// PluginDrivenScanNode.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import org.apache.doris.analysis.CastExpr;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprToSqlVisitor;
import org.apache.doris.analysis.ToSqlParams;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.UserException;
import org.apache.doris.connector.api.Connector;
import org.apache.doris.connector.api.ConnectorMetadata;
import org.apache.doris.connector.api.ConnectorSession;
import org.apache.doris.connector.api.handle.ConnectorColumnHandle;
import org.apache.doris.connector.api.handle.ConnectorTableHandle;
import org.apache.doris.connector.api.handle.PassthroughQueryTableHandle;
import org.apache.doris.connector.api.pushdown.ConnectorExpression;
import org.apache.doris.connector.api.pushdown.ConnectorFilterConstraint;
import org.apache.doris.connector.api.pushdown.FilterApplicationResult;
import org.apache.doris.connector.api.pushdown.LimitApplicationResult;
import org.apache.doris.connector.api.pushdown.ProjectionApplicationResult;
import org.apache.doris.connector.api.scan.ConnectorScanPlanProvider;
import org.apache.doris.connector.api.scan.ConnectorScanRange;
import org.apache.doris.connector.api.scan.ScanNodePropertiesResult;
import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.spi.Split;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TFileAttributes;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileRangeDesc;
import org.apache.doris.thrift.TFileTextScanRangeParams;
import org.apache.doris.thrift.TTableFormatFileDesc;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
/**
* Generic scan node that delegates scan planning to the connector SPI.
*
* <p>Replaces connector-specific ScanNode subclasses for plugin-driven catalogs.
* Extends {@link FileQueryScanNode} to reuse the existing split-to-Thrift
* conversion pipeline. Uses {@code FORMAT_JNI} by default, which routes
* to BE's JNI scanner framework.</p>
*
* <p>Scan flow:</p>
* <ol>
* <li>{@link #getSplits} calls {@link ConnectorScanPlanProvider#planScan}
* to obtain {@link ConnectorScanRange}s from the connector plugin</li>
* <li>Each range is wrapped in a {@link PluginDrivenSplit}</li>
* <li>{@link FileQueryScanNode#createScanRangeLocations} distributes splits
* to backends</li>
* <li>{@link #setScanParams} populates {@link TTableFormatFileDesc} with
* connector-specific properties for each split</li>
* </ol>
*/
public class PluginDrivenScanNode extends FileQueryScanNode {
// Logger for this scan node.
private static final Logger LOG = LogManager.getLogger(PluginDrivenScanNode.class);
// NOTE(review): not referenced by any method visible in this class; setScanParams() reads the
// table format type from each ConnectorScanRange instead. Confirm whether it is still needed.
private static final String TABLE_FORMAT_TYPE = "plugin_driven";
/** Scan node property keys (shared with connector plugins). */
// Names the file format; mapped to a TFileFormatType by mapFileFormatType().
private static final String PROP_FILE_FORMAT_TYPE = "file_format_type";
// Comma-separated path partition key names, split in getPathPartitionKeys().
private static final String PROP_PATH_PARTITION_KEYS = "path_partition_keys";
// Key prefix of location properties; getLocationProperties() strips it from matching keys.
private static final String PROP_LOCATION_PREFIX = "location.";
// Key prefix of the text-format options read by getFileAttributes().
private static final String PROP_HIVE_TEXT_PREFIX = "hive.text.";
// Connector plugin this node delegates metadata lookups and scan planning to.
private final Connector connector;
// Session handed to every connector SPI call made by this node.
private final ConnectorSession connectorSession;
// Starts as the handle given to the constructor and is replaced by the handle a connector
// returns whenever it accepts a filter, limit or projection pushdown.
private ConnectorTableHandle currentHandle;
// Nereids partition-pruning result, injected by the translator. Defaults to NOT_PRUNED
// so that connectors / non-partitioned tables read all partitions unless pruning applies.
private SelectedPartitions selectedPartitions = SelectedPartitions.NOT_PRUNED;
// Cached isBatchMode() result. isBatchMode is read on both the dispatch (FileQueryScanNode)
// and explain (FileScanNode) paths and num_partitions_in_batch_mode is fuzzy, so cache it to
// keep the decision stable across reads (mirrors IcebergScanNode). null means "not computed yet".
private Boolean isBatchModeCache;
// Populated lazily from ConnectorScanPlanProvider.getScanNodePropertiesResult(); reset to null
// by convertPredicate() so it is rebuilt against the updated conjuncts/handle.
private ScanNodePropertiesResult cachedPropertiesResult;
// Property map extracted from cachedPropertiesResult; reset together with it.
private Map<String, String> scanNodeProperties;
// Maps filtered conjunct indices (after CAST removal) back to original conjunct indices.
// null means no conjunct was removed, i.e. the indices map 1:1.
private List<Integer> filteredToOriginalIndex;
/**
 * Builds a scan node around an already-resolved connector, session and table handle.
 *
 * @param id plan node id forwarded to the parent scan node
 * @param desc tuple descriptor forwarded to the parent scan node
 * @param needCheckColumnPriv forwarded to the parent scan node
 * @param sv session variables forwarded to the parent scan node
 * @param scanContext scan context forwarded to the parent scan node
 * @param connector connector plugin that serves metadata and scan planning
 * @param connectorSession session used for every connector SPI call
 * @param tableHandle initial table handle; later pushdowns may replace it
 */
public PluginDrivenScanNode(PlanNodeId id, TupleDescriptor desc,
        boolean needCheckColumnPriv, SessionVariable sv,
        ScanContext scanContext, Connector connector,
        ConnectorSession connectorSession, ConnectorTableHandle tableHandle) {
    super(id, desc, "PluginDrivenScanNode", scanContext, needCheckColumnPriv, sv);
    this.currentHandle = tableHandle;
    this.connectorSession = connectorSession;
    this.connector = connector;
}
/**
 * Factory that resolves the connector, a fresh connector session and the table handle from a
 * plugin-driven catalog/table pair, then builds the scan node.
 *
 * <p>The remote database name is the empty string when the table has no database.</p>
 *
 * @throws RuntimeException if the connector metadata reports no handle for the table
 */
public static PluginDrivenScanNode create(PlanNodeId id, TupleDescriptor desc,
        boolean needCheckColumnPriv, SessionVariable sv,
        ScanContext scanContext, PluginDrivenExternalCatalog catalog,
        PluginDrivenExternalTable table) {
    final Connector pluginConnector = catalog.getConnector();
    final ConnectorSession pluginSession = catalog.buildConnectorSession();
    final ConnectorMetadata pluginMetadata = pluginConnector.getMetadata(pluginSession);

    final String remoteDb;
    if (table.getDb() != null) {
        remoteDb = table.getDb().getRemoteName();
    } else {
        remoteDb = "";
    }
    final String remoteTable = table.getRemoteName();

    ConnectorTableHandle resolvedHandle = pluginMetadata
            .getTableHandle(pluginSession, remoteDb, remoteTable)
            .orElseThrow(() -> new RuntimeException(
                    "Table handle not found for plugin-driven table: " + remoteDb + "." + remoteTable));
    return new PluginDrivenScanNode(id, desc, needCheckColumnPriv, sv,
            scanContext, pluginConnector, pluginSession, resolvedHandle);
}
/**
 * Stores the Nereids partition-pruning result on this node.
 *
 * <p>The translator calls this so that {@link #getSplits} can hand the surviving partitions to
 * the connector's scan plan.</p>
 *
 * @param selectedPartitions pruning result to use for subsequent split planning
 */
public void setSelectedPartitions(SelectedPartitions selectedPartitions) {
    this.selectedPartitions = selectedPartitions;
}
/**
 * Turns the pruning result into the partition-name list pushed to the connector SPI.
 *
 * <p>Three outcomes, mirroring legacy {@code MaxComputeScanNode.getSplits()}:</p>
 * <ul>
 * <li>{@code null} input or {@code isPruned == false} → {@code null}, meaning "scan every
 * partition";</li>
 * <li>pruned to a non-empty set → a new list holding that set's partition names;</li>
 * <li>pruned to nothing → an empty list, which the caller treats as "no splits".</li>
 * </ul>
 *
 * @return a fresh mutable list of partition names, or {@code null} when no restriction applies
 */
static List<String> resolveRequiredPartitions(SelectedPartitions selectedPartitions) {
    boolean pruned = selectedPartitions != null && selectedPartitions.isPruned;
    if (pruned) {
        return new ArrayList<>(selectedPartitions.selectedPartitions.keySet());
    }
    return null;
}
/**
 * Computes the {@code {selectedPartitionNum, totalPartitionNum}} pair to publish on this node,
 * or {@code null} when there is nothing to show and the fields should keep their defaults.
 * The pair feeds the EXPLAIN {@code partition=N/M} line and SQL-block-rule enforcement (via
 * {@code getSelectedPartitionNum()}).
 *
 * <p>The gate is identity against {@link SelectedPartitions#NOT_PRUNED}, deliberately
 * <b>not</b> {@code isPruned}. A partitioned table queried without a partition predicate keeps
 * the initial all-partitions selection from {@link ExternalTable#initSelectedPartitions}
 * ({@code isPruned=false} but a full, non-{@code NOT_PRUNED} map, because
 * {@code PruneFileScanPartition} only runs under a {@code LogicalFilter}) and must still report
 * {@code total/total} — e.g. {@code SELECT *} over a 2-partition table shows {@code 2/2}; an
 * {@code isPruned} gate would wrongly show {@code 0/0}. This mirrors the display gate of legacy
 * {@code MaxComputeScanNode}.</p>
 *
 * <p>The connector pushdown gate ({@link #resolveRequiredPartitions}) intentionally differs and
 * stays on {@code isPruned}: an unpruned scan must read ALL partitions, so it pushes no
 * restriction.</p>
 */
static long[] displayPartitionCounts(SelectedPartitions selectedPartitions) {
    boolean nothingToShow = selectedPartitions == null
            || selectedPartitions == SelectedPartitions.NOT_PRUNED;
    if (nothingToShow) {
        return null;
    }
    long[] counts = new long[2];
    counts[0] = selectedPartitions.selectedPartitions.size();
    counts[1] = selectedPartitions.totalPartitionNum;
    return counts;
}
/**
 * Renders the EXPLAIN text for this scan node.
 *
 * <p>A passthrough-query handle prints only the {@code TABLE VALUE FUNCTION} header and its
 * query. Any other handle prints, in order: the table, the backing catalog type, the optional
 * {@code query} scan property, the remaining predicates, the {@code partition=N/M} summary,
 * whatever the connector's scan plan provider appends, and the ES {@code terminate_after} hint.
 * The TOPN line is appended last in both cases when a top-n filter is in use.</p>
 */
@Override
public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
    StringBuilder explain = new StringBuilder();
    if (!(currentHandle instanceof PassthroughQueryTableHandle)) {
        Map<String, String> nodeProps = getOrLoadScanNodeProperties();
        String pushedQuery = nodeProps.get("query");
        explain.append(prefix).append("TABLE: ");
        explain.append(desc.getTable().getNameWithFullQualifiers()).append("\n");
        // Name the backing connector/catalog type (e.g. es, jdbc, max_compute): the generic node
        // name alone does not reveal which connector this scan delegates to.
        explain.append(prefix).append("CONNECTOR: ");
        explain.append(desc.getTable().getDatabase().getCatalog().getType()).append("\n");
        if (pushedQuery != null) {
            explain.append(prefix).append("QUERY: ").append(pushedQuery).append("\n");
        }
        if (!conjuncts.isEmpty()) {
            Expr combined = convertConjunctsToAndCompoundPredicate(conjuncts);
            explain.append(prefix).append("PREDICATES: ");
            explain.append(combined.accept(ExprToSqlVisitor.INSTANCE, ToSqlParams.WITH_TABLE));
            explain.append("\n");
        }
        // This override replaces the parent's body wholesale, so the partition summary line has
        // to be emitted here; the counts are filled in by getSplits()/startSplit().
        explain.append(prefix).append("partition=").append(selectedPartitionNum);
        explain.append("/").append(totalPartitionNum).append("\n");
        // Connector-specific EXPLAIN details come from the SPI.
        ConnectorScanPlanProvider provider = connector.getScanPlanProvider();
        if (provider != null) {
            provider.appendExplainInfo(explain, prefix, nodeProps);
        }
        // ES terminate_after hint: shown when a limit exists and no conjunct is left on this node.
        boolean esFormat = "es_http".equals(nodeProps.get(PROP_FILE_FORMAT_TYPE));
        if (limit > 0 && conjuncts.isEmpty() && esFormat) {
            explain.append(prefix).append("ES terminate_after: ");
            explain.append(limit).append("\n");
        }
    } else {
        explain.append(prefix).append("TABLE VALUE FUNCTION\n");
        String passthroughQuery = ((PassthroughQueryTableHandle) currentHandle).getQuery();
        explain.append(prefix).append("QUERY: ").append(passthroughQuery).append("\n");
    }
    if (useTopnFilter()) {
        String sortNodeIds = topnFilterSortNodes.stream()
                .map(sortNode -> String.valueOf(sortNode.getId().asInt()))
                .collect(Collectors.joining(","));
        explain.append(prefix).append("TOPN OPT:").append(sortNodeIds).append("\n");
    }
    return explain.toString();
}
/**
 * Returns the file format for this scan: the mapped {@code file_format_type} scan property
 * when the connector supplies one, otherwise {@code FORMAT_JNI}.
 */
@Override
protected TFileFormatType getFileFormatType() throws UserException {
    String declaredFormat = getOrLoadScanNodeProperties().get(PROP_FILE_FORMAT_TYPE);
    return declaredFormat == null ? TFileFormatType.FORMAT_JNI : mapFileFormatType(declaredFormat);
}
/**
 * Returns the path partition keys listed in the {@code path_partition_keys} scan property,
 * split on commas, or an empty list when the property is missing or empty.
 */
@Override
protected List<String> getPathPartitionKeys() throws UserException {
    String joinedKeys = getOrLoadScanNodeProperties().get(PROP_PATH_PARTITION_KEYS);
    if (joinedKeys == null || joinedKeys.isEmpty()) {
        return Collections.emptyList();
    }
    String[] keyNames = joinedKeys.split(",");
    return Arrays.asList(keyNames);
}
/**
 * Returns the table attached to this node's tuple descriptor.
 */
@Override
protected TableIf getTargetTable() throws UserException {
    TableIf scannedTable = desc.getTable();
    return scannedTable;
}
/**
 * Collects the scan properties whose key starts with {@code "location."} into a new map,
 * with that prefix removed from each key.
 */
@Override
protected Map<String, String> getLocationProperties() throws UserException {
    final int prefixLength = PROP_LOCATION_PREFIX.length();
    Map<String, String> stripped = new HashMap<>();
    getOrLoadScanNodeProperties().forEach((key, value) -> {
        if (key.startsWith(PROP_LOCATION_PREFIX)) {
            stripped.put(key.substring(prefixLength), value);
        }
    });
    return stripped;
}
/**
 * Builds the text-format file attributes from the {@code hive.text.*} scan properties.
 *
 * <p>When {@code hive.text.serde_lib} is missing or empty an empty {@link TFileAttributes} is
 * returned. Otherwise each present option (skip lines, separators, delimiters, escape, null
 * format, enclose, JSON flag) is copied onto the attributes; absent options are left unset.</p>
 *
 * <p>Only the first byte of the {@code escape} and {@code enclose} values is used. The bytes
 * are taken from the UTF-8 encoding so the result does not depend on the JVM's default
 * charset. A {@code skip_lines} value that is not an integer is ignored (best effort) but is
 * logged so the misconfiguration is visible.</p>
 */
@Override
protected TFileAttributes getFileAttributes() throws UserException {
    Map<String, String> props = getOrLoadScanNodeProperties();
    String serDeLib = props.get(PROP_HIVE_TEXT_PREFIX + "serde_lib");
    if (serDeLib == null || serDeLib.isEmpty()) {
        return new TFileAttributes();
    }
    TFileAttributes attrs = new TFileAttributes();
    String skipLinesStr = props.get(PROP_HIVE_TEXT_PREFIX + "skip_lines");
    if (skipLinesStr != null) {
        try {
            attrs.setSkipLines(Integer.parseInt(skipLinesStr.trim()));
        } catch (NumberFormatException e) {
            // Deliberately best-effort: keep planning, but do not hide the bad value.
            LOG.warn("Ignoring non-integer {}skip_lines value '{}', leaving skip_lines unset",
                    PROP_HIVE_TEXT_PREFIX, skipLinesStr);
        }
    }
    TFileTextScanRangeParams textParams = new TFileTextScanRangeParams();
    String colSep = props.get(PROP_HIVE_TEXT_PREFIX + "column_separator");
    if (colSep != null) {
        textParams.setColumnSeparator(colSep);
    }
    String lineSep = props.get(PROP_HIVE_TEXT_PREFIX + "line_delimiter");
    if (lineSep != null) {
        textParams.setLineDelimiter(lineSep);
    }
    String mapkvDelim = props.get(PROP_HIVE_TEXT_PREFIX + "mapkv_delimiter");
    if (mapkvDelim != null) {
        textParams.setMapkvDelimiter(mapkvDelim);
    }
    String collDelim = props.get(PROP_HIVE_TEXT_PREFIX + "collection_delimiter");
    if (collDelim != null) {
        textParams.setCollectionDelimiter(collDelim);
    }
    String escape = props.get(PROP_HIVE_TEXT_PREFIX + "escape");
    if (escape != null && !escape.isEmpty()) {
        // Explicit charset: the no-arg getBytes() depends on the platform default encoding.
        textParams.setEscape(escape.getBytes(java.nio.charset.StandardCharsets.UTF_8)[0]);
    }
    String nullFmt = props.get(PROP_HIVE_TEXT_PREFIX + "null_format");
    if (nullFmt != null) {
        textParams.setNullFormat(nullFmt);
    }
    String enclose = props.get(PROP_HIVE_TEXT_PREFIX + "enclose");
    if (enclose != null && !enclose.isEmpty()) {
        textParams.setEnclose(enclose.getBytes(java.nio.charset.StandardCharsets.UTF_8)[0]);
        // An enclose character also turns on double-quote trimming.
        attrs.setTrimDoubleQuotes(true);
    }
    attrs.setTextParams(textParams);
    attrs.setHeaderType("");
    attrs.setEnableTextValidateUtf8(sessionVariable.enableTextValidateUtf8);
    String isJson = props.get(PROP_HIVE_TEXT_PREFIX + "is_json");
    if ("true".equals(isJson)) {
        attrs.setReadJsonByLine(true);
        attrs.setReadByColumnDef(true);
    }
    return attrs;
}
/**
 * Offers the current conjuncts to the connector through {@code applyFilter}.
 *
 * <p>Nothing happens when there are no conjuncts. If the connector accepts the filter, the
 * returned handle becomes the current handle and the remaining filter decides what stays on
 * this node: a {@code null} remainder means everything was pushed down, so the conjuncts are
 * cleared; any non-null remainder conservatively keeps all conjuncts for BE-side evaluation
 * (matching individual remaining sub-expressions back to conjuncts is left for a future
 * enhancement). Whether or not the connector accepted, the cached scan properties and the
 * index mapping are reset so they are rebuilt against the updated conjuncts/handle.</p>
 */
@Override
protected void convertPredicate() {
    if (conjuncts == null || conjuncts.isEmpty()) {
        return;
    }
    ConnectorMetadata connectorMetadata = connector.getMetadata(connectorSession);
    ConnectorFilterConstraint offered = buildFilterConstraint(conjuncts);
    Optional<FilterApplicationResult<ConnectorTableHandle>> outcome =
            connectorMetadata.applyFilter(connectorSession, currentHandle, offered);
    if (outcome.isPresent()) {
        FilterApplicationResult<ConnectorTableHandle> accepted = outcome.get();
        currentHandle = accepted.getHandle();
        ConnectorExpression leftover = accepted.getRemainingFilter();
        if (leftover != null) {
            // Some or all predicates remain: keep every conjunct so BE still evaluates them.
            LOG.debug("Filter pushdown accepted with remaining filter, keeping conjuncts");
        } else {
            // Fully pushed down: dropping the conjuncts avoids evaluating them twice.
            conjuncts.clear();
            LOG.debug("Filter fully pushed down for plugin-driven scan, cleared conjuncts");
        }
    }
    // Drop everything derived from the previous conjuncts/handle.
    filteredToOriginalIndex = null;
    cachedPropertiesResult = null;
    scanNodeProperties = null;
}
/**
 * Offers the node's limit to the connector through the SPI {@code applyLimit()} protocol.
 * Invoked from {@link #getSplits}.
 *
 * <p>Does nothing when the limit is not positive. When the connector accepts, the handle it
 * returns becomes the current handle. The limit is additionally passed to {@code planScan()}
 * as a parameter for connectors that handle the limit there.</p>
 */
private void tryPushDownLimit() {
    if (limit <= 0) {
        return;
    }
    connector.getMetadata(connectorSession)
            .applyLimit(connectorSession, currentHandle, limit)
            .ifPresent(accepted -> {
                currentHandle = accepted.getHandle();
                LOG.debug("Limit {} pushed down via applyLimit for plugin-driven scan", limit);
            });
}
/**
 * Offers the required columns to the connector through the SPI {@code applyProjection()}
 * protocol. Invoked from {@link #getSplits} and {@link #startSplit} before planning.
 *
 * <p>Does nothing for an empty column list. When the connector accepts, the handle it returns
 * becomes the current handle.</p>
 *
 * @param columns column handles required by the query
 */
private void tryPushDownProjection(List<ConnectorColumnHandle> columns) {
    if (columns.isEmpty()) {
        return;
    }
    connector.getMetadata(connectorSession)
            .applyProjection(connectorSession, currentHandle, columns)
            .ifPresent(accepted -> {
                currentHandle = accepted.getHandle();
                LOG.debug("Projection pushed down via applyProjection for plugin-driven scan");
            });
}
/**
 * Plans the scan through the connector SPI and wraps every returned range in a
 * {@link PluginDrivenSplit}.
 *
 * <p>Steps, in order:</p>
 * <ol>
 * <li>build the filter handed to the connector (CAST-containing conjuncts are stripped when
 * the connector cannot push them);</li>
 * <li>offer the limit via {@code applyLimit} — only when nothing was stripped, see below;</li>
 * <li>return no splits when the connector has no scan plan provider;</li>
 * <li>publish the partition counts for EXPLAIN / SQL block rules, then return no splits when
 * pruning left zero partitions;</li>
 * <li>offer the projection and call {@code planScan}.</li>
 * </ol>
 *
 * <p><b>Limit safety.</b> When CAST conjuncts were stripped, the connector sees a filter that
 * no longer reflects them, while BE re-evaluates them only on the rows the source returns. Any
 * source-side limit would then be applied to rows the stripped predicate has not filtered and
 * the query could under-return. Both source-side limit channels are therefore suppressed in
 * that case: the {@code applyLimit} handle pushdown is skipped and {@code planScan} receives
 * {@code -1}. Legacy disabled limit-split whenever a non-partition-equality (incl. CAST)
 * predicate was present; this mirrors it.</p>
 *
 * @param numBackends number of backends (not used by this implementation)
 * @return the planned splits, possibly empty
 */
@Override
public List<Split> getSplits(int numBackends) throws UserException {
    // Build the filter first: whether conjuncts were stripped decides if a limit may be pushed.
    // buildRemainingFilter() reads only the conjuncts and the connector's CAST capability, so
    // evaluating it before the limit/projection pushdown does not change its result.
    Optional<ConnectorExpression> remainingFilter = buildRemainingFilter();
    boolean nonPushableConjunctsStripped = filteredToOriginalIndex != null;
    if (!nonPushableConjunctsStripped) {
        tryPushDownLimit();
    }
    ConnectorScanPlanProvider scanProvider = connector.getScanPlanProvider();
    if (scanProvider == null) {
        LOG.warn("Connector does not provide a scan plan provider, returning empty splits");
        return Collections.emptyList();
    }
    // Push the Nereids partition-pruning result down to the connector so the read session
    // covers only the surviving partitions.
    List<String> requiredPartitions = resolveRequiredPartitions(selectedPartitions);
    // Publish the counts BEFORE the pruned-to-zero short-circuit so a 0-partition selection
    // still reports partition=0/total (e.g. WHERE part=<absent value>). Batch mode populates
    // them in startSplit() instead.
    long[] partitionCounts = displayPartitionCounts(selectedPartitions);
    if (partitionCounts != null) {
        this.selectedPartitionNum = partitionCounts[0];
        this.totalPartitionNum = partitionCounts[1];
    }
    // Pruned to zero partitions: nothing to read, mirroring legacy MaxComputeScanNode.
    if (requiredPartitions != null && requiredPartitions.isEmpty()) {
        return Collections.emptyList();
    }
    List<ConnectorColumnHandle> columns = buildColumnHandles();
    tryPushDownProjection(columns);
    long sourceLimit = effectiveSourceLimit(limit, nonPushableConjunctsStripped);
    List<ConnectorScanRange> ranges = scanProvider.planScan(
            connectorSession, currentHandle, columns, remainingFilter, sourceLimit, requiredPartitions);
    List<Split> splits = new ArrayList<>(ranges.size());
    for (ConnectorScanRange range : ranges) {
        splits.add(new PluginDrivenSplit(range));
    }
    return splits;
}
/**
 * Chooses the source-side LIMIT handed to {@code planScan}.
 *
 * <p>Normally that is the real limit. When non-pushable conjuncts were stripped from the
 * filter the result is {@code -1} (no source limit): a limit applied at the source before a
 * BE-only predicate would return too few rows, because BE can only filter the returned rows
 * down and cannot recover rows the source never returned. Kept as a pure static so this
 * correctness-critical decision is unit-testable without a {@link FileQueryScanNode}.</p>
 *
 * @param limit the node's limit
 * @param nonPushableConjunctsStripped whether any conjunct was removed from the pushed filter
 * @return {@code limit}, or {@code -1} when conjuncts were stripped
 */
static long effectiveSourceLimit(long limit, boolean nonPushableConjunctsStripped) {
    if (nonPushableConjunctsStripped) {
        return -1L;
    }
    return limit;
}
/**
 * Reports whether splits should be generated in batched / streaming mode, mirroring legacy
 * {@code MaxComputeScanNode.isBatchMode()}.
 *
 * <p>The decision is computed once by {@link #computeBatchMode()} and then served from the
 * cache. Three gates come from state the node already holds (partition pruning, slots and the
 * {@code num_partitions_in_batch_mode} threshold); the connector-specific gate (legacy
 * {@code odpsTable.getFileNum() > 0}) is delegated to
 * {@link ConnectorScanPlanProvider#supportsBatchScan}.</p>
 */
@Override
public boolean isBatchMode() {
    Boolean decision = isBatchModeCache;
    if (decision == null) {
        decision = computeBatchMode();
        isBatchModeCache = decision;
    }
    return decision;
}
/**
 * Gathers the inputs of the batch-mode decision from this node and evaluates
 * {@link #shouldUseBatchMode}.
 *
 * <p>A connector without a scan plan provider counts as "does not support batch scan", so this
 * never dereferences a {@code null} provider on the dispatch and explain paths.</p>
 */
private boolean computeBatchMode() {
    ConnectorScanPlanProvider provider = connector.getScanPlanProvider();
    boolean connectorSupportsBatch;
    if (provider == null) {
        connectorSupportsBatch = false;
    } else {
        connectorSupportsBatch = provider.supportsBatchScan(connectorSession, currentHandle);
    }
    boolean hasSlots = !desc.getSlots().isEmpty();
    int threshold = sessionVariable.getNumPartitionsInBatchMode();
    return shouldUseBatchMode(selectedPartitions, hasSlots, connectorSupportsBatch, threshold);
}
/**
 * Pure batch-mode gate, mirroring legacy {@code MaxComputeScanNode.isBatchMode()} (its
 * connector {@code odpsTable.getFileNum() > 0} check is folded into {@code supportsBatchScan}).
 * Static so the four-input decision is unit-testable without constructing a
 * {@link FileQueryScanNode} (the async/wiring half is covered by live e2e — see DV-019).
 *
 * <p>Returns {@code false} when any of these holds:</p>
 * <ul>
 * <li>{@code selectedPartitions} is {@code null} or not pruned;</li>
 * <li>there are no required slots;</li>
 * <li>the connector does not support batch scan (including "no scan provider").</li>
 * </ul>
 * <p>Otherwise batch mode is used iff {@code numPartitionsInBatchMode > 0} and the number of
 * pruned partitions reaches that threshold.</p>
 *
 * <p>The {@code !isPruned} check covers BOTH legacy gates
 * ({@code getPartitionColumns().isEmpty()} and the reference check {@code != NOT_PRUNED}): a
 * non-partitioned external table always carries {@code NOT_PRUNED}, which has
 * {@code isPruned=false}, so no gate is dropped — the check is marginally stronger than
 * legacy's reference comparison.</p>
 */
static boolean shouldUseBatchMode(SelectedPartitions selectedPartitions, boolean hasSlots,
        boolean supportsBatchScan, int numPartitionsInBatchMode) {
    boolean pruned = selectedPartitions != null && selectedPartitions.isPruned;
    boolean eligible = pruned && hasSlots && supportsBatchScan;
    if (!eligible || numPartitionsInBatchMode <= 0) {
        return false;
    }
    int prunedPartitionCount = selectedPartitions.selectedPartitions.size();
    return prunedPartitionCount >= numPartitionsInBatchMode;
}
/**
 * Returns the number of selected partitions as the approximate split count, or {@code -1}
 * when no partition selection is set.
 *
 * <p>Batch mode requires a non-negative value (FileQueryScanNode rejects negative); under the
 * {@link #isBatchMode} gate the count is at least {@code num_partitions_in_batch_mode}, which
 * is at least 1.</p>
 */
@Override
public int numApproximateSplits() {
    if (selectedPartitions == null) {
        return -1;
    }
    return selectedPartitions.selectedPartitions.size();
}
/**
 * Asynchronously generates splits in batches of {@code num_partitions_in_batch_mode}
 * partitions, streaming each batch into {@link #splitAssignment}. Mirrors legacy
 * {@code MaxComputeScanNode.startSplit}: one read session per partition batch (built by the
 * connector via {@link ConnectorScanPlanProvider#planScanForPartitionBatch}) on the shared
 * schedule executor, with the same completion/error protocol against {@code SplitAssignment}.
 *
 * <p>Protocol: a dispatcher task walks the partition list in strides of the batch size and
 * submits one planning task per batch. Each planning task enqueues its splits while the
 * assignment still wants more, records the first failure in a shared reference, reports that
 * failure to the assignment, and calls {@code finishSchedule()} once the finished-partition
 * counter reaches the total. The dispatcher stops submitting as soon as a failure was recorded
 * or the assignment was stopped.</p>
 *
 * <p>Batch mode deliberately does NOT push the limit (passes {@code -1}): legacy's batch path
 * ignores limit, and the LIMIT-split optimization stays on the non-batch {@link #getSplits}
 * path only (the two are mutually exclusive).</p>
 *
 * <p>The stride is clamped to at least 1. The batch size is re-read from the session variable
 * here rather than taken from the cached {@link #isBatchMode} decision; a non-positive stride
 * would never advance the loop and would submit empty batches forever.</p>
 *
 * @param numBackends number of backends (not used by this implementation)
 */
@Override
public void startSplit(int numBackends) {
    long[] partitionCounts = displayPartitionCounts(selectedPartitions);
    if (partitionCounts != null) {
        this.selectedPartitionNum = partitionCounts[0];
        this.totalPartitionNum = partitionCounts[1];
    }
    if (selectedPartitions.selectedPartitions.isEmpty()) {
        // Unreachable under the isBatchMode gate (size >= num_partitions_in_batch_mode >= 1);
        // kept for fidelity with legacy MaxComputeScanNode.startSplit's empty short-circuit.
        return;
    }
    // Mirror getSplits()'s projection + filter pushdown (but NOT the limit) before going async.
    // tryPushDownProjection mutates currentHandle, so capture the resolved handle afterwards.
    final List<ConnectorColumnHandle> columns = buildColumnHandles();
    tryPushDownProjection(columns);
    final Optional<ConnectorExpression> remainingFilter = buildRemainingFilter();
    final ConnectorTableHandle handle = currentHandle;
    final ConnectorScanPlanProvider scanProvider = connector.getScanPlanProvider();
    final List<String> allPartitions =
            new ArrayList<>(selectedPartitions.selectedPartitions.keySet());
    // Clamp so the dispatcher loop below always makes progress.
    final int batchSize = Math.max(1, sessionVariable.getNumPartitionsInBatchMode());
    Executor scheduleExecutor = Env.getCurrentEnv().getExtMetaCacheMgr().getScheduleExecutor();
    // First failure seen by any task; once set, no further batch is submitted.
    AtomicReference<UserException> batchException = new AtomicReference<>(null);
    // Partitions whose batch has completed (successfully or not); drives finishSchedule().
    AtomicInteger numFinishedPartitions = new AtomicInteger(0);
    CompletableFuture.runAsync(() -> {
        for (int begin = 0; begin < allPartitions.size(); begin += batchSize) {
            int end = Math.min(begin + batchSize, allPartitions.size());
            if (batchException.get() != null || splitAssignment.isStop()) {
                break;
            }
            List<String> batch = allPartitions.subList(begin, end);
            int curBatchSize = end - begin;
            try {
                CompletableFuture.runAsync(() -> {
                    try {
                        List<ConnectorScanRange> ranges = scanProvider.planScanForPartitionBatch(
                                connectorSession, handle, columns, remainingFilter, -1L, batch);
                        List<Split> batchSplits = new ArrayList<>(ranges.size());
                        for (ConnectorScanRange range : ranges) {
                            batchSplits.add(new PluginDrivenSplit(range));
                        }
                        if (splitAssignment.needMoreSplit()) {
                            splitAssignment.addToQueue(batchSplits);
                        }
                    } catch (Exception e) {
                        batchException.set(new UserException(e.getMessage(), e));
                    } finally {
                        if (batchException.get() != null) {
                            splitAssignment.setException(batchException.get());
                        }
                        // The batch that completes the count closes the schedule.
                        if (numFinishedPartitions.addAndGet(curBatchSize) == allPartitions.size()) {
                            splitAssignment.finishSchedule();
                        }
                    }
                }, scheduleExecutor);
            } catch (Exception e) {
                // Submission itself failed (the task never ran).
                batchException.set(new UserException(e.getMessage(), e));
            }
            if (batchException.get() != null) {
                splitAssignment.setException(batchException.get());
            }
        }
    }, scheduleExecutor);
}
/**
 * Fills the table-format section of a file range descriptor for one split.
 *
 * <p>Splits that are not {@link PluginDrivenSplit}s are left untouched. For plugin-driven
 * splits the table format type comes from the connector scan range, which is also asked to
 * populate the format-specific Thrift fields before the descriptor is attached to the range.</p>
 */
@Override
protected void setScanParams(TFileRangeDesc rangeDesc, Split split) {
    if (split instanceof PluginDrivenSplit) {
        ConnectorScanRange connectorRange = ((PluginDrivenSplit) split).getConnectorScanRange();
        TTableFormatFileDesc formatDesc = new TTableFormatFileDesc();
        formatDesc.setTableFormatType(connectorRange.getTableFormatType());
        // Format-specific Thrift construction is the connector's job.
        connectorRange.populateRangeParams(formatDesc, rangeDesc);
        rangeDesc.setTableFormatParams(formatDesc);
    }
}
/**
 * Returns the serialized table produced by the connector's scan plan provider from the scan
 * node properties, or an empty {@link Optional} when there is no provider or it returns
 * {@code null}.
 */
@Override
protected Optional<String> getSerializedTable() {
    ConnectorScanPlanProvider provider = connector.getScanPlanProvider();
    if (provider == null) {
        return Optional.empty();
    }
    Map<String, String> nodeProps = getOrLoadScanNodeProperties();
    return Optional.ofNullable(provider.getSerializedTable(nodeProps));
}
/**
 * Runs the parent's scan-range creation, then lets the connector fill the scan-level Thrift
 * params, prunes the conjuncts the connector reported as pushed, and finally records the ES
 * limit.
 *
 * <p>The limit is written into the ES properties only when it is positive, fits in one batch
 * ({@code limit <= batchSize}), no conjunct is left on this node, and ES properties are set —
 * in that case ES can use {@code terminate_after} to stop early instead of scrolling all
 * results.</p>
 */
@Override
public void createScanRangeLocations() throws UserException {
    super.createScanRangeLocations();
    ConnectorScanPlanProvider provider = connector.getScanPlanProvider();
    if (provider != null) {
        Map<String, String> nodeProps = getOrLoadScanNodeProperties();
        provider.populateScanLevelParams(params, nodeProps);
    }
    pruneConjunctsFromNodeProperties();
    boolean limitFitsOneBatch = limit > 0 && limit <= sessionVariable.batchSize;
    if (limitFitsOneBatch && conjuncts.isEmpty() && params.isSetEsProperties()) {
        params.getEsProperties().put("limit", String.valueOf(limit));
    }
}
/**
 * Removes from {@code conjuncts} the predicates the connector reported as pushed down, using
 * the structured result of {@link ConnectorScanPlanProvider#getScanNodePropertiesResult}.
 *
 * <ul>
 * <li>No conjuncts, or a result without conjunct tracking → nothing is pruned.</li>
 * <li>With tracking, a conjunct is kept when its index is in the connector's not-pushed set.
 * Those indices refer to the filtered list (after CAST removal) and are translated back to
 * original positions; out-of-range filtered indices are ignored.</li>
 * <li>Conjuncts that were filtered out before the connector saw them (CAST expressions) are
 * always kept, since they were never offered for pushdown.</li>
 * </ul>
 * <p>Consequently, with tracking and an empty not-pushed set, every conjunct that was sent to
 * the connector is dropped.</p>
 */
private void pruneConjunctsFromNodeProperties() {
    if (conjuncts == null || conjuncts.isEmpty()) {
        return;
    }
    ScanNodePropertiesResult propertiesResult = getOrLoadPropertiesResult();
    if (!propertiesResult.hasConjunctTracking()) {
        // Without tracking we cannot tell what was pushed; keep everything for safety.
        return;
    }
    Set<Integer> notPushedFiltered = propertiesResult.getNotPushedConjunctIndices();
    Set<Integer> keepOriginal = new HashSet<>();
    if (filteredToOriginalIndex == null) {
        // No CAST filtering happened, so filtered and original indices coincide.
        keepOriginal.addAll(notPushedFiltered);
    } else {
        final int mappedCount = filteredToOriginalIndex.size();
        for (int filteredPos : notPushedFiltered) {
            if (filteredPos < mappedCount) {
                keepOriginal.add(filteredToOriginalIndex.get(filteredPos));
            }
        }
        // Anything never sent to the connector must stay on this node.
        Set<Integer> offered = new HashSet<>(filteredToOriginalIndex);
        for (int pos = 0; pos < conjuncts.size(); pos++) {
            if (!offered.contains(pos)) {
                keepOriginal.add(pos);
            }
        }
    }
    List<Expr> survivors = new ArrayList<>();
    for (int pos = 0; pos < conjuncts.size(); pos++) {
        if (keepOriginal.contains(pos)) {
            survivors.add(conjuncts.get(pos));
        }
    }
    conjuncts.clear();
    conjuncts.addAll(survivors);
}
/**
 * Returns the connector's {@link ScanNodePropertiesResult}, loading and caching it on first
 * use so that {@link #getOrLoadScanNodeProperties()} and
 * {@link #pruneConjunctsFromNodeProperties()} share one computation.
 *
 * <p>When the connector has no scan plan provider, or the provider returns {@code null}, a
 * result wrapping an empty property map is cached instead.</p>
 */
private ScanNodePropertiesResult getOrLoadPropertiesResult() {
    if (cachedPropertiesResult != null) {
        return cachedPropertiesResult;
    }
    ScanNodePropertiesResult loaded = null;
    ConnectorScanPlanProvider provider = connector.getScanPlanProvider();
    if (provider != null) {
        List<ConnectorColumnHandle> requiredColumns = buildColumnHandles();
        Optional<ConnectorExpression> pushedFilter = buildRemainingFilter();
        loaded = provider.getScanNodePropertiesResult(
                connectorSession, currentHandle, requiredColumns, pushedFilter);
    }
    if (loaded == null) {
        loaded = new ScanNodePropertiesResult(Collections.emptyMap());
    }
    cachedPropertiesResult = loaded;
    return loaded;
}
/**
 * Returns the scan node property map, taking it from the cached properties result on first
 * use. A {@code null} map from the result is replaced by an empty map, so callers never see
 * {@code null}.
 */
private Map<String, String> getOrLoadScanNodeProperties() {
    if (scanNodeProperties != null) {
        return scanNodeProperties;
    }
    Map<String, String> loaded = getOrLoadPropertiesResult().getProperties();
    if (loaded == null) {
        loaded = Collections.emptyMap();
    }
    scanNodeProperties = loaded;
    return scanNodeProperties;
}
/**
 * Maps a file format name to the corresponding {@link TFileFormatType}.
 *
 * <p>Matching is case-insensitive. The name is lower-cased with {@code Locale.ROOT} so the
 * lookup does not depend on the JVM's default locale. {@code text} and {@code csv} both map to
 * {@code FORMAT_CSV_PLAIN}; any unrecognized name falls back to {@code FORMAT_JNI}.</p>
 *
 * @param format non-null format name
 * @return the matching file format type, or {@code FORMAT_JNI} for unknown names
 */
private static TFileFormatType mapFileFormatType(String format) {
    switch (format.toLowerCase(java.util.Locale.ROOT)) {
        case "parquet":
            return TFileFormatType.FORMAT_PARQUET;
        case "orc":
            return TFileFormatType.FORMAT_ORC;
        case "text":
        case "csv":
            return TFileFormatType.FORMAT_CSV_PLAIN;
        case "json":
            return TFileFormatType.FORMAT_JSON;
        case "avro":
            return TFileFormatType.FORMAT_AVRO;
        case "es_http":
            return TFileFormatType.FORMAT_ES_HTTP;
        default:
            return TFileFormatType.FORMAT_JNI;
    }
}
/**
 * Builds the connector column handles for the slots of this node's tuple descriptor, so the
 * connector knows which columns the query needs (e.g. SELECT col1, col2 instead of SELECT *).
 *
 * <p>Handles are looked up by column name in the connector's handle map for the current table
 * handle. Slots without a column, and columns the connector has no handle for, are skipped.
 * An empty handle map yields an empty list.</p>
 */
private List<ConnectorColumnHandle> buildColumnHandles() {
    Map<String, ConnectorColumnHandle> handlesByName = connector.getMetadata(connectorSession)
            .getColumnHandles(connectorSession, currentHandle);
    if (handlesByName.isEmpty()) {
        return Collections.emptyList();
    }
    return desc.getSlots().stream()
            .filter(slot -> slot.getColumn() != null)
            .map(slot -> handlesByName.get(slot.getColumn().getName()))
            .filter(columnHandle -> columnHandle != null)
            .collect(Collectors.toCollection(ArrayList::new));
}
/**
 * Wraps the given conjuncts into a {@link ConnectorFilterConstraint}: the conjuncts are
 * converted into a single connector expression and paired with an empty map.
 *
 * @param exprs conjuncts to convert
 */
private ConnectorFilterConstraint buildFilterConstraint(List<Expr> exprs) {
    return new ConnectorFilterConstraint(
            ExprToConnectorExpressionConverter.convertConjuncts(exprs), Collections.emptyMap());
}
/**
 * Builds the filter expression handed to the connector from the conjuncts still on this node,
 * and maintains {@code filteredToOriginalIndex} as a side effect.
 *
 * <ul>
 * <li>No conjuncts → empty result, mapping cleared.</li>
 * <li>Connector supports CAST predicate pushdown → all conjuncts are converted, mapping
 * cleared.</li>
 * <li>Otherwise conjuncts containing a CAST are left out. The mapping records the original
 * index of every conjunct that was kept, and is cleared again when nothing was left out
 * (indices then map 1:1). If every conjunct was left out the result is empty.</li>
 * </ul>
 */
private Optional<ConnectorExpression> buildRemainingFilter() {
    if (conjuncts == null || conjuncts.isEmpty()) {
        filteredToOriginalIndex = null;
        return Optional.empty();
    }
    boolean castPushable = connector.getMetadata(connectorSession)
            .supportsCastPredicatePushdown(connectorSession);
    if (castPushable) {
        filteredToOriginalIndex = null;
        return Optional.of(ExprToConnectorExpressionConverter.convertConjuncts(conjuncts));
    }
    List<Integer> keptIndices = new ArrayList<>();
    filteredToOriginalIndex = keptIndices;
    List<Expr> castFree = new ArrayList<>();
    for (int pos = 0; pos < conjuncts.size(); pos++) {
        Expr candidate = conjuncts.get(pos);
        if (containsCastExpr(candidate)) {
            continue;
        }
        castFree.add(conjuncts.get(pos));
        keptIndices.add(pos);
    }
    if (keptIndices.size() == conjuncts.size()) {
        // Nothing was left out, so no index translation is needed.
        filteredToOriginalIndex = null;
    }
    return castFree.isEmpty()
            ? Optional.empty()
            : Optional.of(ExprToConnectorExpressionConverter.convertConjuncts(castFree));
}
/**
 * Tells whether the expression tree contains at least one {@link CastExpr}, as reported by
 * {@code Expr.collect}.
 */
private static boolean containsCastExpr(Expr expr) {
    List<CastExpr> foundCasts = new ArrayList<>();
    expr.collect(CastExpr.class, foundCasts);
    return foundCasts.size() > 0;
}
}