ResultFileSink.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.planner;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.OutFileClause;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Column;
import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.common.util.Util;
import org.apache.doris.thrift.TDataSink;
import org.apache.doris.thrift.TDataSinkType;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TResultFileSink;
import org.apache.doris.thrift.TResultFileSinkOptions;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.ArrayList;
public class ResultFileSink extends DataSink {
private PlanNodeId exchNodeId;
private TResultFileSinkOptions fileSinkOptions;
private String brokerName;
private StorageBackend.StorageType storageType;
private DataPartition outputPartition;
private TupleId outputTupleId;
private String header = "";
private String headerType = "";
private ResultFileSink(PlanNodeId exchNodeId, OutFileClause outFileClause) {
this.exchNodeId = exchNodeId;
this.fileSinkOptions = outFileClause.toSinkOptions();
this.brokerName = outFileClause.getBrokerDesc() == null ? null :
outFileClause.getBrokerDesc().getName();
this.storageType = outFileClause.getBrokerDesc() == null ? StorageBackend.StorageType.LOCAL :
outFileClause.getBrokerDesc().getStorageType();
}
//gen header names
private String genNames(ArrayList<String> headerNames, String columnSeparator, String lineDelimiter) {
StringBuilder sb = new StringBuilder();
for (String name : headerNames) {
sb.append(name).append(columnSeparator);
}
String headerName = sb.substring(0, sb.length() - columnSeparator.length());
headerName += lineDelimiter;
return headerName;
}
public ResultFileSink(PlanNodeId exchNodeId, OutFileClause outFileClause, ArrayList<String> labels) {
this(exchNodeId, outFileClause);
if (Util.isCsvFormat(outFileClause.getFileFormatType())) {
if (outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES)
|| outFileClause.getHeaderType().equals(FileFormatConstants.FORMAT_CSV_WITH_NAMES_AND_TYPES)) {
header = genNames(labels, outFileClause.getColumnSeparator(), outFileClause.getLineDelimiter());
}
headerType = outFileClause.getHeaderType();
}
}
public String getBrokerName() {
return brokerName;
}
public StorageBackend.StorageType getStorageType() {
return storageType;
}
public void setBrokerAddr(String ip, int port) {
Preconditions.checkNotNull(fileSinkOptions);
fileSinkOptions.setBrokerAddresses(Lists.newArrayList(new TNetworkAddress(ip, port)));
}
public void resetByDataStreamSink(DataStreamSink dataStreamSink) {
exchNodeId = dataStreamSink.getExchNodeId();
outputPartition = dataStreamSink.getOutputPartition();
}
public void setOutputTupleId(TupleId tupleId) {
outputTupleId = tupleId;
}
@Override
public String getExplainString(String prefix, TExplainLevel explainLevel) {
StringBuilder strBuilder = new StringBuilder();
strBuilder.append(prefix);
strBuilder.append("RESULT FILE SINK\n");
strBuilder.append(" FILE PATH: " + fileSinkOptions.getFilePath() + "\n");
strBuilder.append(" STORAGE TYPE: " + storageType.name() + "\n");
switch (storageType) {
case BROKER:
strBuilder.append(" broker name: " + brokerName + "\n");
break;
default:
break;
}
return strBuilder.toString();
}
@Override
protected TDataSink toThrift() {
TDataSink result = new TDataSink(TDataSinkType.RESULT_FILE_SINK);
TResultFileSink tResultFileSink = new TResultFileSink();
tResultFileSink.setFileOptions(fileSinkOptions);
tResultFileSink.setStorageBackendType(storageType.toThrift());
tResultFileSink.setDestNodeId(exchNodeId.asInt());
tResultFileSink.setHeaderType(headerType);
tResultFileSink.setHeader(header);
if (outputTupleId != null) {
tResultFileSink.setOutputTupleId(outputTupleId.asInt());
}
result.setResultFileSink(tResultFileSink);
return result;
}
@Override
public PlanNodeId getExchNodeId() {
return exchNodeId;
}
@Override
public DataPartition getOutputPartition() {
return outputPartition;
}
/**
* Construct a tuple for file status, the tuple schema as following:
* | FileNumber | Int |
* | TotalRows | Bigint |
* | FileSize | Bigint |
* | URL | Varchar |
* | WriteTimeSec | Varchar |
* | WriteSpeedKB | Varchar |
*/
public static TupleDescriptor constructFileStatusTupleDesc(DescriptorTable descriptorTable) {
TupleDescriptor resultFileStatusTupleDesc =
descriptorTable.createTupleDescriptor("result_file_status");
resultFileStatusTupleDesc.setIsMaterialized(true);
for (int i = 0; i < OutFileClause.RESULT_COL_NAMES.size(); ++i) {
SlotDescriptor slotDescriptor = descriptorTable.addSlotDescriptor(resultFileStatusTupleDesc);
slotDescriptor.setLabel(OutFileClause.RESULT_COL_NAMES.get(i));
slotDescriptor.setType(OutFileClause.RESULT_COL_TYPES.get(i));
slotDescriptor.setColumn(new Column(OutFileClause.RESULT_COL_NAMES.get(i),
OutFileClause.RESULT_COL_TYPES.get(i)));
slotDescriptor.setIsMaterialized(true);
slotDescriptor.setIsNullable(false);
}
resultFileStatusTupleDesc.computeStatAndMemLayout();
return resultFileStatusTupleDesc;
}
}