ExternalFileTableValuedFunction.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.tablefunction;
import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.Resource;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.StructField;
import org.apache.doris.catalog.StructType;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.common.util.FileFormatUtils;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.datasource.property.fileformat.CsvFileFormatProperties;
import org.apache.doris.datasource.property.fileformat.FileFormatProperties;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.datasource.tvf.source.TVFScanNode;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PFetchTableSchemaRequest;
import org.apache.doris.proto.Types.PScalarType;
import org.apache.doris.proto.Types.PStructField;
import org.apache.doris.proto.Types.PTypeDesc;
import org.apache.doris.proto.Types.PTypeNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileScanRange;
import org.apache.doris.thrift.TFileScanRangeParams;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.THdfsParams;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPrimitiveType;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import org.apache.thrift.TSerializer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
/**
 * ExternalFileTableValuedFunction is the base class for the S3/HDFS/LOCAL table-valued functions.
 */
public abstract class ExternalFileTableValuedFunction extends TableValuedFunctionIf {
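// Usage sketch (illustrative only; how a concrete TVF is constructed is an assumption here,
// see the S3/HDFS/local subclasses for the real entry points):
//
//   Map<String, String> props = new HashMap<>();
//   props.put("uri", "s3://bucket/path/*.csv");   // URI_KEY
//   props.put("format", "csv");                   // FileFormatConstants.PROP_FORMAT
//   ExternalFileTableValuedFunction tvf = ...;    // built by the matching subclass/factory
//   List<Column> schema = tvf.getTableColumns();  // may fetch the schema from an alive BE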
public static final Logger LOG = LogManager.getLogger(ExternalFileTableValuedFunction.class);
protected static final String URI_KEY = "uri";
public static final String PROP_TABLE_ID = "table_id";
// Columns obtained from the file and from the path (if any)
protected List<Column> columns = null;
// User-specified CSV columns; they override the columns obtained from the file
private final List<Column> csvSchema = Lists.newArrayList();
// Partition columns from path, e.g. /path/to/columnName=columnValue.
private List<String> pathPartitionKeys;
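// File statuses matched by the (possibly globbed) file path; populated by parseFile()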
protected List<TBrokerFileStatus> fileStatuses = Lists.newArrayList();
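// Connection-related properties forwarded to the BE together with the scan range params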
protected Map<String, String> backendConnectProperties = Maps.newHashMap();
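// Resolved storage configuration; its backend config properties are used e.g. to build HDFS params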
protected StorageProperties storageProperties;
// Processed parameters derived from user input; includes normalization and default value filling.
Map<String, String> processedParams;
protected String filePath;
protected Optional<String> resourceName = Optional.empty();
public FileFormatProperties fileFormatProperties;
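// Target table id (PROP_TABLE_ID, default -1); only used by the WAL format to look up the table schema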
private long tableId;
public abstract TFileType getTFileType();
public abstract String getFilePath();
public abstract BrokerDesc getBrokerDesc();
public TFileFormatType getTFileFormatType() {
return fileFormatProperties.getFileFormatType();
}
public TFileCompressType getTFileCompressType() {
return fileFormatProperties.getCompressionType();
}
public Map<String, String> getBackendConnectProperties() {
return backendConnectProperties;
}
public List<String> getPathPartitionKeys() {
return pathPartitionKeys;
}
protected void parseFile() throws AnalysisException {
String path = getFilePath();
BrokerDesc brokerDesc = getBrokerDesc();
try {
BrokerUtil.parseFile(path, brokerDesc, fileStatuses);
} catch (UserException e) {
throw new AnalysisException("parse file failed, err: " + e.getMessage(), e);
}
}
// The keys in the properties map need to be lowercase.
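// Example input (illustrative; the exact keys depend on the concrete TVF and the user statement):
//   {"uri": "hdfs://ns1/user/doris/*.csv", "format": "csv",
//    "column_separator": ",", "path_partition_keys": "dt,city"}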
protected Map<String, String> parseCommonProperties(Map<String, String> properties) throws AnalysisException {
Map<String, String> mergedProperties = Maps.newHashMap();
if (properties.containsKey("resource")) {
Resource resource = Env.getCurrentEnv().getResourceMgr().getResource(properties.get("resource"));
if (resource == null) {
throw new AnalysisException("Can not find resource: " + properties.get("resource"));
}
this.resourceName = Optional.of(properties.get("resource"));
mergedProperties = resource.getCopiedProperties();
}
mergedProperties.putAll(properties);
// Copy the properties, because consumed keys will be removed from the copy while parsing.
Map<String, String> copiedProps = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER);
copiedProps.putAll(mergedProperties);
tableId = Long.valueOf(getOrDefaultAndRemove(copiedProps, PROP_TABLE_ID, "-1"));
String formatString = getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_FORMAT, "").toLowerCase();
fileFormatProperties = FileFormatProperties.createFileFormatProperties(formatString);
fileFormatProperties.analyzeFileFormatProperties(copiedProps, true);
if (fileFormatProperties instanceof CsvFileFormatProperties) {
FileFormatUtils.parseCsvSchema(csvSchema, getOrDefaultAndRemove(copiedProps,
CsvFileFormatProperties.PROP_CSV_SCHEMA, ""));
if (LOG.isDebugEnabled()) {
LOG.debug("get csv schema: {}", csvSchema);
}
}
pathPartitionKeys = Optional.ofNullable(
getOrDefaultAndRemove(copiedProps, FileFormatConstants.PROP_PATH_PARTITION_KEYS, null))
.map(str -> Arrays.stream(str.split(","))
.map(String::trim)
.collect(Collectors.toList()))
.orElse(Lists.newArrayList());
this.processedParams = new HashMap<>(copiedProps);
return copiedProps;
}
protected String getOrDefaultAndRemove(Map<String, String> props, String key, String defaultValue) {
String value = props.getOrDefault(key, defaultValue);
props.remove(key);
return value;
}
public List<TBrokerFileStatus> getFileStatuses() {
return fileStatuses;
}
public TFileAttributes getFileAttributes() {
return fileFormatProperties.toTFileAttributes();
}
@Override
public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, SessionVariable sv) {
return new TVFScanNode(id, desc, false, sv);
}
@Override
public List<Column> getTableColumns() throws AnalysisException {
if (!csvSchema.isEmpty()) {
return csvSchema;
}
// if (FeConstants.runningUnitTest) {
// Object mockedUtObj = FeConstants.unitTestConstant;
// if (mockedUtObj instanceof List) {
// return ((List<Column>) mockedUtObj);
// }
// return new ArrayList<>();
// }
if (this.columns != null) {
return columns;
}
// get one BE address
columns = Lists.newArrayList();
Backend be = getBackend();
if (be == null) {
throw new AnalysisException("No Alive backends");
}
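// For the WAL format the schema is not read from the file itself: it is copied from the base
// schema of the target table identified by table_id, with every column marked nullable.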
if (fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_WAL) {
List<Column> fileColumns = new ArrayList<>();
Table table = Env.getCurrentInternalCatalog().getTableByTableId(tableId);
List<Column> tableColumns = table.getBaseSchema(true);
for (int i = 0; i < tableColumns.size(); i++) {
Column column = new Column(tableColumns.get(i).getName(), tableColumns.get(i).getType(), true);
column.setUniqueId(tableColumns.get(i).getUniqueId());
column.setIsAllowNull(true);
fileColumns.add(column);
}
return fileColumns;
}
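// The schema fetch is an internal-service (brpc) call, so connect to the backend's brpc port.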
TNetworkAddress address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
try {
PFetchTableSchemaRequest request = getFetchTableStructureRequest();
InternalService.PFetchTableSchemaResult result = null;
// `request == null` means we don't need to get schemas from BE,
// and we fill a dummy col for this table.
if (request != null) {
Future<InternalService.PFetchTableSchemaResult> future = BackendServiceProxy.getInstance()
.fetchTableStructureAsync(address, request);
result = future.get();
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
String errMsg;
if (code != TStatusCode.OK) {
if (!result.getStatus().getErrorMsgsList().isEmpty()) {
errMsg = result.getStatus().getErrorMsgsList().get(0);
} else {
errMsg = "fetchTableStructureAsync failed. backend address: "
+ NetUtils
.getHostPortInAccessibleFormat(address.getHostname(), address.getPort());
}
throw new AnalysisException(errMsg);
}
}
fillColumns(result);
} catch (RpcException e) {
throw new AnalysisException("fetchTableStructureResult rpc exception", e);
} catch (InterruptedException e) {
throw new AnalysisException("fetchTableStructureResult interrupted exception", e);
} catch (ExecutionException e) {
throw new AnalysisException("fetchTableStructureResult exception", e);
} catch (TException e) {
throw new AnalysisException("getFetchTableStructureRequest exception", e);
}
return columns;
}
protected Backend getBackend() {
// For the http_stream task, we must use the specific BE that was assigned to process the task
ImmutableMap<Long, Backend> beIdToBe;
try {
beIdToBe = Env.getCurrentSystemInfo().getBackendsByCurrentCluster();
} catch (AnalysisException e) {
LOG.warn("get backend failed, ", e);
return null;
}
if (getTFileType() == TFileType.FILE_STREAM) {
long backendId = ConnectContext.get().getBackendId();
Backend be = beIdToBe.get(backendId);
if (be == null || !be.isAlive()) {
LOG.warn("Backend {} is not alive", backendId);
return null;
} else {
return be;
}
}
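// For all other file types any alive backend can serve the schema fetch; pick the first one found.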
for (Backend be : beIdToBe.values()) {
if (be.isAlive()) {
return be;
}
}
return null;
}
/**
 * Converts a PTypeDesc into a Doris column type.
 *
 * @param typeNodes the list of PTypeNodes in the PTypeDesc
 * @param start the index of the first PTypeNode to parse
 * @return the parsed column type and the number of PTypeNodes consumed
 */
private Pair<Type, Integer> getColumnType(List<PTypeNode> typeNodes, int start) {
PScalarType columnType = typeNodes.get(start).getScalarType();
TPrimitiveType tPrimitiveType = TPrimitiveType.findByValue(columnType.getType());
Type type;
int parsedNodes;
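// Illustrative layout (an assumption written down for readability, mirroring how the recursion
// below consumes nodes): a column of type array<map<int, string>> arrives as the flat pre-order
// list [ARRAY, MAP, INT, STRING]; each nested call reports how many nodes it consumed so the
// caller can advance past them.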
if (tPrimitiveType == TPrimitiveType.ARRAY) {
Pair<Type, Integer> itemType = getColumnType(typeNodes, start + 1);
type = ArrayType.create(itemType.key(), true);
parsedNodes = 1 + itemType.value();
} else if (tPrimitiveType == TPrimitiveType.MAP) {
Pair<Type, Integer> keyType = getColumnType(typeNodes, start + 1);
Pair<Type, Integer> valueType = getColumnType(typeNodes, start + 1 + keyType.value());
type = new MapType(keyType.key(), valueType.key());
parsedNodes = 1 + keyType.value() + valueType.value();
} else if (tPrimitiveType == TPrimitiveType.STRUCT) {
parsedNodes = 1;
ArrayList<StructField> fields = new ArrayList<>();
for (int i = 0; i < typeNodes.get(start).getStructFieldsCount(); ++i) {
Pair<Type, Integer> fieldType = getColumnType(typeNodes, start + parsedNodes);
PStructField structField = typeNodes.get(start).getStructFields(i);
fields.add(new StructField(structField.getName(), fieldType.key(), structField.getComment(),
structField.getContainsNull()));
parsedNodes += fieldType.value();
}
type = new StructType(fields);
} else {
type = ScalarType.createType(PrimitiveType.fromThrift(tPrimitiveType),
columnType.getLen(), columnType.getPrecision(), columnType.getScale());
parsedNodes = 1;
}
return Pair.of(type, parsedNodes);
}
private void fillColumns(InternalService.PFetchTableSchemaResult result) {
// `result == null` means we don't need to get schemas from BE,
// and we fill a dummy col for this table.
if (result == null) {
columns.add(new Column("__dummy_col", ScalarType.createStringType(), true));
return;
}
// add fetched file columns
for (int idx = 0; idx < result.getColumnNums(); ++idx) {
PTypeDesc type = result.getColumnTypes(idx);
String colName = result.getColumnNames(idx);
columns.add(new Column(colName, getColumnType(type.getTypesList(), 0).key(), true));
}
// add path columns
// HACK(tsy): path columns are all treated as STRING type for now; once BE can report every
// column type from file metadata in all format readers, the path column types may be read
// from BE as well.
for (String colName : pathPartitionKeys) {
columns.add(new Column(colName, ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH), false));
}
}
private PFetchTableSchemaRequest getFetchTableStructureRequest() throws TException {
// set TFileScanRangeParams
TFileScanRangeParams fileScanRangeParams = new TFileScanRangeParams();
fileScanRangeParams.setFormatType(fileFormatProperties.getFileFormatType());
Map<String, String> beProperties = new HashMap<>();
beProperties.putAll(backendConnectProperties);
fileScanRangeParams.setProperties(beProperties);
if (fileFormatProperties instanceof CsvFileFormatProperties) {
fileScanRangeParams.setTextSerdeType(((CsvFileFormatProperties) fileFormatProperties).getTextSerdeType());
}
fileScanRangeParams.setFileAttributes(getFileAttributes());
ConnectContext ctx = ConnectContext.get();
fileScanRangeParams.setLoadId(ctx.queryId());
if (getTFileType() == TFileType.FILE_STREAM) {
fileStatuses.add(new TBrokerFileStatus("", false, -1, true));
fileScanRangeParams.setFileType(getTFileType());
}
if (getTFileType() == TFileType.FILE_HDFS) {
THdfsParams tHdfsParams = HdfsResource.generateHdfsParam(storageProperties.getBackendConfigProperties());
String fsName = storageProperties.getBackendConfigProperties().get(HdfsResource.HADOOP_FS_NAME);
tHdfsParams.setFsName(fsName);
fileScanRangeParams.setHdfsParams(tHdfsParams);
}
// get the first file with content; it is used to parse the table schema
TBrokerFileStatus firstFile = null;
for (TBrokerFileStatus fileStatus : fileStatuses) {
if (isFileContentEmpty(fileStatus)) {
continue;
}
firstFile = fileStatus;
break;
}
// `firstFile == null` means:
// 1. No matching file path exists
// 2. All matched files have a size of 0
// For these two situations, we don't need to get schema from BE
if (firstFile == null) {
return null;
}
// set TFileRangeDesc
TFileRangeDesc fileRangeDesc = new TFileRangeDesc();
fileRangeDesc.setLoadId(ctx.queryId());
fileRangeDesc.setFileType(getTFileType());
fileRangeDesc.setCompressType(Util.getOrInferCompressType(
fileFormatProperties.getCompressionType(), firstFile.getPath()));
fileRangeDesc.setPath(firstFile.getPath());
fileRangeDesc.setStartOffset(0);
fileRangeDesc.setSize(firstFile.getSize());
fileRangeDesc.setFileSize(firstFile.getSize());
fileRangeDesc.setModificationTime(firstFile.getModificationTime());
// set TFileScanRange
TFileScanRange fileScanRange = new TFileScanRange();
fileScanRange.addToRanges(fileRangeDesc);
fileScanRange.setParams(fileScanRangeParams);
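// The scan range is thrift-serialized and embedded into the protobuf request sent to the BE.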
return InternalService.PFetchTableSchemaRequest.newBuilder()
.setFileScanRange(ByteString.copyFrom(new TSerializer().serialize(fileScanRange))).build();
}
private boolean isFileContentEmpty(TBrokerFileStatus fileStatus) {
if (fileStatus.isIsDir() || fileStatus.size == 0) {
return true;
}
if (Util.isCsvFormat(fileFormatProperties.getFileFormatType())
|| fileFormatProperties.getFileFormatType() == TFileFormatType.FORMAT_JSON) {
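// Heuristic: a compressed CSV/JSON file whose size does not exceed the threshold below cannot
// hold any payload beyond the compression header/magic bytes, so treat it as empty.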
int magicNumberBytes = 0;
switch (fileFormatProperties.getCompressionType()) {
case GZ:
magicNumberBytes = 20;
break;
case LZO:
case LZOP:
magicNumberBytes = 42;
break;
case DEFLATE:
magicNumberBytes = 8;
break;
case SNAPPYBLOCK:
case LZ4BLOCK:
case LZ4FRAME:
magicNumberBytes = 4;
break;
case BZ2:
magicNumberBytes = 14;
break;
case UNKNOWN:
case PLAIN:
default:
break;
}
// fileStatus.size may be -1 in http_stream
if (fileStatus.size >= 0 && fileStatus.size <= magicNumberBytes) {
return true;
}
}
return false;
}
public void checkAuth(ConnectContext ctx) {
if (resourceName.isPresent()) {
if (!Env.getCurrentEnv().getAccessManager()
.checkResourcePriv(ctx, resourceName.get(), PrivPredicate.USAGE)) {
String message = ErrorCode.ERR_RESOURCE_ACCESS_DENIED_ERROR.formatErrorMsg(
PrivPredicate.USAGE.getPrivs().toString(), resourceName.get());
throw new org.apache.doris.nereids.exceptions.AnalysisException(message);
}
}
}
}