// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.tablefunction;

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.MapType;
import org.apache.doris.catalog.PrimitiveType;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.FileFormatConstants;
import org.apache.doris.datasource.property.storage.LocalProperties;
import org.apache.doris.datasource.property.storage.StorageProperties;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TMetaScanRange;
import org.apache.doris.thrift.TMetadataType;
import org.apache.doris.thrift.TParquetMetadataParams;

import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Table-valued function parquet_meta for reading Parquet metadata.
 * It currently supports five modes:
 * - parquet_metadata (default): row-group/column statistics, similar to DuckDB parquet_metadata()
 * - parquet_schema: logical schema, similar to DuckDB parquet_schema()
 * - parquet_file_metadata: file-level metadata, aligned with DuckDB parquet_file_metadata()
 * - parquet_kv_metadata: file key/value metadata, aligned with DuckDB parquet_kv_metadata()
 * - parquet_bloom_probe: row-group bloom filter probe, aligned with DuckDB parquet_bloom_probe()
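 *
 * <p>Illustrative usage (a sketch only; property names follow this class's constructor,
 * and storage credentials / provider-specific properties are elided):
 * <pre>
 * SELECT * FROM parquet_meta("uri" = "s3://bucket/data/*.parquet", "mode" = "parquet_schema", ...);
 * SELECT * FROM parquet_meta("file_path" = "/path/to/file.parquet", "mode" = "parquet_metadata");
 * SELECT * FROM parquet_meta("uri" = "s3://bucket/f.parquet", "mode" = "parquet_bloom_probe",
 *                            "column" = "c1", "value" = "42", ...);
 * </pre>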
*/
public class ParquetMetadataTableValuedFunction extends MetadataTableValuedFunction {
public static final String NAME = "parquet_meta";
public static final String NAME_FILE_METADATA = "parquet_file_metadata";
public static final String NAME_KV_METADATA = "parquet_kv_metadata";
public static final String NAME_BLOOM_PROBE = "parquet_bloom_probe";
private static final String MODE = "mode";
private static final String COLUMN = "column";
private static final String VALUE = "value";
private static final String MODE_METADATA = "parquet_metadata";
private static final String MODE_SCHEMA = "parquet_schema";
private static final String MODE_FILE_METADATA = "parquet_file_metadata";
private static final String MODE_KV_METADATA = "parquet_kv_metadata";
private static final String MODE_BLOOM_PROBE = "parquet_bloom_probe";
private static final ImmutableSet<String> SUPPORTED_MODES =
ImmutableSet.of(MODE_METADATA, MODE_SCHEMA, MODE_FILE_METADATA, MODE_KV_METADATA,
            MODE_BLOOM_PROBE);

private static final ImmutableList<Column> PARQUET_SCHEMA_COLUMNS = ImmutableList.of(
// Align with DuckDB parquet_schema() output.
new Column("file_name", ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH), true),
new Column("name", ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH), true),
new Column("type", ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH), true),
new Column("type_length", PrimitiveType.BIGINT, true),
new Column("repetition_type", ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH), true),
new Column("num_children", PrimitiveType.BIGINT, true),
new Column("converted_type", ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH), true),
new Column("scale", PrimitiveType.BIGINT, true),
new Column("precision", PrimitiveType.BIGINT, true),
new Column("field_id", PrimitiveType.BIGINT, true),
new Column("logical_type", ScalarType.createVarcharType(ScalarType.MAX_VARCHAR_LENGTH), true)
    );

private static final ImmutableList<Column> PARQUET_METADATA_COLUMNS = ImmutableList.of(
// Align with DuckDB parquet_metadata() output.
new Column("file_name", ScalarType.createStringType(), true),
new Column("row_group_id", PrimitiveType.BIGINT, true),
new Column("row_group_num_rows", PrimitiveType.BIGINT, true),
new Column("row_group_num_columns", PrimitiveType.BIGINT, true),
new Column("row_group_bytes", PrimitiveType.BIGINT, true),
new Column("column_id", PrimitiveType.BIGINT, true),
new Column("file_offset", PrimitiveType.BIGINT, true),
new Column("num_values", PrimitiveType.BIGINT, true),
new Column("path_in_schema", ScalarType.createStringType(), true),
new Column("type", ScalarType.createStringType(), true),
new Column("stats_min", ScalarType.createStringType(), true),
new Column("stats_max", ScalarType.createStringType(), true),
new Column("stats_null_count", PrimitiveType.BIGINT, true),
new Column("stats_distinct_count", PrimitiveType.BIGINT, true),
new Column("stats_min_value", ScalarType.createStringType(), true),
new Column("stats_max_value", ScalarType.createStringType(), true),
new Column("compression", ScalarType.createStringType(), true),
new Column("encodings", ScalarType.createStringType(), true),
new Column("index_page_offset", PrimitiveType.BIGINT, true),
new Column("dictionary_page_offset", PrimitiveType.BIGINT, true),
new Column("data_page_offset", PrimitiveType.BIGINT, true),
new Column("total_compressed_size", PrimitiveType.BIGINT, true),
new Column("total_uncompressed_size", PrimitiveType.BIGINT, true),
new Column("key_value_metadata", new MapType(
ScalarType.createVarbinaryType(ScalarType.MAX_VARBINARY_LENGTH),
ScalarType.createVarbinaryType(ScalarType.MAX_VARBINARY_LENGTH)), true),
new Column("bloom_filter_offset", PrimitiveType.BIGINT, true),
new Column("bloom_filter_length", PrimitiveType.BIGINT, true),
new Column("min_is_exact", PrimitiveType.BOOLEAN, true),
new Column("max_is_exact", PrimitiveType.BOOLEAN, true),
new Column("row_group_compressed_bytes", PrimitiveType.BIGINT, true)
    );

private static final ImmutableList<Column> PARQUET_FILE_METADATA_COLUMNS = ImmutableList.of(
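            // Align with DuckDB parquet_file_metadata() output.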
new Column("file_name", PrimitiveType.STRING, true),
new Column("created_by", PrimitiveType.STRING, true),
new Column("num_rows", PrimitiveType.BIGINT, true),
new Column("num_row_groups", PrimitiveType.BIGINT, true),
new Column("format_version", PrimitiveType.BIGINT, true),
new Column("encryption_algorithm", PrimitiveType.STRING, true),
new Column("footer_signing_key_metadata", PrimitiveType.STRING, true)
    );

private static final ImmutableList<Column> PARQUET_KV_METADATA_COLUMNS = ImmutableList.of(
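            // Align with DuckDB parquet_kv_metadata() output.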
new Column("file_name", PrimitiveType.STRING, true),
new Column("key", ScalarType.createStringType(), true),
new Column("value", ScalarType.createStringType(), true)
    );

private static final ImmutableList<Column> PARQUET_BLOOM_PROBE_COLUMNS = ImmutableList.of(
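            // Align with DuckDB parquet_bloom_probe() output.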
new Column("file_name", PrimitiveType.STRING, true),
new Column("row_group_id", PrimitiveType.INT, true),
            // 1 = row group excluded by the bloom filter, 0 = might contain the value, -1 = no bloom filter in file
new Column("bloom_filter_excludes", PrimitiveType.INT, true)
    );

private final List<String> paths;
private final String mode;
// File system info for remote Parquet access (e.g. S3).
private final TFileType fileType;
private final Map<String, String> properties;
private final String bloomColumn;
    private final String bloomLiteral;

public ParquetMetadataTableValuedFunction(Map<String, String> params) throws AnalysisException {
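        // Normalize parameter keys to lower case so lookups are case-insensitive;
        // if two keys collide after lower-casing, the later entry wins.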
Map<String, String> normalizedParams = params.entrySet().stream().collect(Collectors.toMap(
entry -> entry.getKey().toLowerCase(),
Map.Entry::getValue,
(value1, value2) -> value2
));
String rawUri = normalizedParams.get(ExternalFileTableValuedFunction.URI_KEY);
boolean uriProvided = !Strings.isNullOrEmpty(rawUri);
String rawPath = uriProvided ? rawUri : normalizedParams.get(LocalProperties.PROP_FILE_PATH);
if (Strings.isNullOrEmpty(rawPath)) {
throw new AnalysisException(
"Property 'uri' or 'file_path' is required for parquet_meta");
}
String parsedPath = rawPath.trim();
if (parsedPath.isEmpty()) {
            throw new AnalysisException(
                    "Property 'uri' or 'file_path' must not be blank for parquet_meta");
}
String rawMode = normalizedParams.getOrDefault(MODE, MODE_METADATA);
mode = rawMode.toLowerCase();
if (!SUPPORTED_MODES.contains(mode)) {
throw new AnalysisException("Unsupported mode '" + rawMode + "' for parquet_meta");
}
String tmpBloomColumn = null;
String tmpBloomLiteral = null;
if (MODE_BLOOM_PROBE.equals(mode)) {
tmpBloomColumn = normalizedParams.get(COLUMN);
tmpBloomLiteral = normalizedParams.get(VALUE);
if (Strings.isNullOrEmpty(tmpBloomColumn) || Strings.isNullOrEmpty(tmpBloomLiteral)) {
throw new AnalysisException(
"Missing 'column' or 'value' for mode parquet_bloom_probe");
}
tmpBloomColumn = tmpBloomColumn.trim();
tmpBloomLiteral = tmpBloomLiteral.trim();
if (tmpBloomColumn.isEmpty() || tmpBloomLiteral.isEmpty()) {
throw new AnalysisException(
"Missing 'column' or 'value' for mode parquet_bloom_probe");
}
}
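        // A remote 'uri' must carry a scheme (e.g. s3://); a local 'file_path' must not.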
String scheme = null;
try {
scheme = new URI(parsedPath).getScheme();
} catch (URISyntaxException e) {
scheme = null;
}
if (uriProvided) {
if (Strings.isNullOrEmpty(scheme)) {
throw new AnalysisException("Property 'uri' must contain a scheme for parquet_meta");
}
} else if (!Strings.isNullOrEmpty(scheme)) {
throw new AnalysisException("Property 'file_path' must not contain a scheme for parquet_meta");
}
Map<String, String> storageParams = new HashMap<>(normalizedParams);
        // StorageProperties detects the storage provider from the "uri" value.
if (uriProvided) {
storageParams.put(ExternalFileTableValuedFunction.URI_KEY, parsedPath);
} else {
            // Local file path: hint that local filesystem access is supported.
storageParams.put(StorageProperties.FS_LOCAL_SUPPORT, "true");
storageParams.put(LocalProperties.PROP_FILE_PATH, parsedPath);
}
StorageProperties storageProperties;
try {
storageProperties = StorageProperties.createPrimary(storageParams);
} catch (RuntimeException e) {
throw new AnalysisException(
"Failed to parse storage properties for parquet_meta: " + e.getMessage(), e);
}
this.fileType = mapToFileType(storageProperties.getType());
Map<String, String> backendProps = storageProperties.getBackendConfigProperties();
String normalizedPath;
try {
normalizedPath = storageProperties.validateAndNormalizeUri(parsedPath);
} catch (UserException e) {
throw new AnalysisException(
"Failed to normalize parquet_meta paths: " + e.getMessage(), e);
}
this.properties = backendProps;
// Expand any glob patterns (e.g. *.parquet) to concrete file paths.
List<String> normalizedPaths =
expandGlobPath(normalizedPath, storageProperties, storageParams, this.fileType);
this.paths = ImmutableList.copyOf(normalizedPaths);
this.bloomColumn = tmpBloomColumn;
this.bloomLiteral = tmpBloomLiteral;
    }

/**
* Expand a wildcard path to matching files.
*/
private static List<String> expandGlobPath(String inputPath,
StorageProperties storageProperties,
Map<String, String> storageParams,
TFileType fileType) throws AnalysisException {
if (Strings.isNullOrEmpty(inputPath)) {
return Collections.emptyList();
}
if (!containsWildcards(inputPath)) {
return ImmutableList.of(inputPath);
}
List<String> expanded =
expandSingleGlob(inputPath, storageProperties, storageParams, fileType);
if (expanded.isEmpty()) {
throw new AnalysisException("No files matched parquet_meta path patterns: " + inputPath);
}
return expanded;
    }

private static boolean containsWildcards(String path) {
if (Strings.isNullOrEmpty(path)) {
return false;
}
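        // Treat '*', '[' and '{' as glob markers; '?' is not treated as a wildcard
        // here, since it can introduce a query string in a URI.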
return path.contains("*") || path.contains("[") || path.contains("{");
    }

private static List<String> expandSingleGlob(String pattern,
StorageProperties storageProperties,
Map<String, String> storageParams,
TFileType fileType) throws AnalysisException {
if (fileType == TFileType.FILE_LOCAL) {
Map<String, String> localProps = new HashMap<>(storageParams);
// Allow Local TVF to pick any alive backend when backend_id is not provided.
localProps.putIfAbsent(LocalTableValuedFunction.PROP_SHARED_STORAGE, "true");
localProps.putIfAbsent(FileFormatConstants.PROP_FORMAT, FileFormatConstants.FORMAT_PARQUET);
// Local TVF expects the uri/path in properties; storageParams already contains it.
LocalTableValuedFunction localTvf = new LocalTableValuedFunction(localProps);
return localTvf.getFileStatuses().stream()
.filter(status -> !status.isIsDir())
.map(TBrokerFileStatus::getPath)
.collect(Collectors.toList());
}
if (fileType == TFileType.FILE_HTTP) {
throw new AnalysisException("Glob patterns are not supported for file type: " + fileType);
}
if (storageProperties == null) {
throw new AnalysisException("Storage properties is required for glob pattern: " + pattern);
}
if (fileType == TFileType.FILE_S3 || fileType == TFileType.FILE_HDFS) {
return globRemoteWithBroker(pattern, storageParams);
}
throw new AnalysisException("Glob patterns are not supported for file type: " + fileType);
    }

private static List<String> globRemoteWithBroker(String pattern,
Map<String, String> storageParams) throws AnalysisException {
List<TBrokerFileStatus> remoteFiles = new ArrayList<>();
BrokerDesc brokerDesc = new BrokerDesc("ParquetMetaTvf", storageParams);
try {
BrokerUtil.parseFile(pattern, brokerDesc, remoteFiles);
} catch (UserException e) {
throw new AnalysisException("Failed to expand glob pattern '" + pattern + "': "
+ e.getMessage(), e);
}
return remoteFiles.stream()
.filter(file -> !file.isIsDir())
.map(TBrokerFileStatus::getPath)
.collect(Collectors.toList());
    }

/**
* Map FE storage type to BE file type.
*/
private static TFileType mapToFileType(StorageProperties.Type type) throws AnalysisException {
switch (type) {
case HDFS:
case OSS_HDFS:
return TFileType.FILE_HDFS;
case HTTP:
return TFileType.FILE_HTTP;
case LOCAL:
return TFileType.FILE_LOCAL;
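            // All S3-compatible object stores map to FILE_S3 (read via the BE's S3-compatible client).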
case S3:
case OSS:
case OBS:
case COS:
case GCS:
case MINIO:
case AZURE:
return TFileType.FILE_S3;
default:
throw new AnalysisException("Unsupported storage type for parquet_meta: " + type);
}
    }

@Override
public TMetadataType getMetadataType() {
return TMetadataType.PARQUET;
    }

@Override
public TMetaScanRange getMetaScanRange(List<String> requiredFields) {
TParquetMetadataParams parquetParams = new TParquetMetadataParams();
parquetParams.setPaths(paths);
parquetParams.setMode(mode);
parquetParams.setFileType(fileType);
parquetParams.setProperties(properties);
if (MODE_BLOOM_PROBE.equals(mode)) {
parquetParams.setBloomColumn(bloomColumn);
parquetParams.setBloomLiteral(bloomLiteral);
}
TMetaScanRange scanRange = new TMetaScanRange();
scanRange.setMetadataType(TMetadataType.PARQUET);
scanRange.setParquetParams(parquetParams);
// Fan out: one file per split so MetadataScanNode can distribute across BEs.
if (paths != null && !paths.isEmpty()) {
scanRange.setSerializedSplits(paths);
}
return scanRange;
    }

@Override
public String getTableName() {
return "ParquetMetadataTableValuedFunction<" + mode + ">";
    }

@Override
public List<Column> getTableColumns() {
if (MODE_SCHEMA.equals(mode)) {
return PARQUET_SCHEMA_COLUMNS;
}
if (MODE_FILE_METADATA.equals(mode)) {
return PARQUET_FILE_METADATA_COLUMNS;
}
if (MODE_KV_METADATA.equals(mode)) {
return PARQUET_KV_METADATA_COLUMNS;
}
if (MODE_BLOOM_PROBE.equals(mode)) {
return PARQUET_BLOOM_PROBE_COLUMNS;
}
return PARQUET_METADATA_COLUMNS;
}
}