PluginDrivenEsScanNode.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.common.UserException;
import org.apache.doris.connector.api.Connector;
import org.apache.doris.connector.api.ConnectorMetadata;
import org.apache.doris.connector.api.ConnectorSession;
import org.apache.doris.connector.api.handle.ConnectorColumnHandle;
import org.apache.doris.connector.api.handle.ConnectorTableHandle;
import org.apache.doris.connector.api.handle.NamedColumnHandle;
import org.apache.doris.connector.api.pushdown.ConnectorExpression;
import org.apache.doris.connector.api.pushdown.ConnectorFilterConstraint;
import org.apache.doris.connector.api.pushdown.FilterApplicationResult;
import org.apache.doris.connector.api.pushdown.ProjectionApplicationResult;
import org.apache.doris.connector.api.scan.ConnectorScanPlanProvider;
import org.apache.doris.connector.api.scan.ConnectorScanRange;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanContext;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TEsScanNode;
import org.apache.doris.thrift.TEsScanRange;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
/**
 * Scan node for plugin-driven ES catalogs.
 *
 * <p>Extends {@link ExternalScanNode} (like the legacy EsScanNode) to produce
 * {@code ES_HTTP_SCAN_NODE} Thrift types. All ES-specific logic (shard routing,
 * query DSL building, field context resolution) is delegated to the connector
 * plugin via {@link ConnectorScanPlanProvider}.</p>
 *
 * <p>This node is used when a {@link PluginDrivenExternalTable} belongs to an
 * ES-type connector (detected via {@code getScanRangeType() == ES_SCAN}).</p>
 *
 * <p>Planning flow, as driven by {@link #finalizeForNereids()}:</p>
 * <ol>
 *   <li>offer the conjuncts to the connector via {@code applyFilter};</li>
 *   <li>offer the projected columns via {@code applyProjection};</li>
 *   <li>ask the scan plan provider for scan ranges and node-level properties;</li>
 *   <li>drop the conjuncts the connector reports as pushed down;</li>
 *   <li>translate every connector scan range into a {@link TScanRangeLocations}.</li>
 * </ol>
 */
public class PluginDrivenEsScanNode extends ExternalScanNode {
    private static final Logger LOG = LogManager.getLogger(PluginDrivenEsScanNode.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final TypeReference<Map<String, String>> MAP_TYPE_REF =
            new TypeReference<Map<String, String>>() {};
    /** Port used for an ES host entry that does not carry an explicit port. */
    private static final int DEFAULT_ES_HTTP_PORT = 9200;

    private final Connector connector;
    private final ConnectorSession connectorSession;
    /** Table handle; replaced whenever the connector accepts a filter or projection pushdown. */
    private ConnectorTableHandle currentHandle;
    /** Node-level properties returned by the scan plan provider; populated during finalize. */
    private Map<String, String> nodeProperties = Collections.emptyMap();

    /**
     * Creates a scan node bound to an already resolved connector table handle.
     *
     * @param id plan node id
     * @param desc tuple descriptor whose slots define the scanned columns
     * @param scanContext scan context forwarded to the parent class
     * @param connector connector that owns the table
     * @param connectorSession session used for every connector call made by this node
     * @param tableHandle initial handle of the table to scan
     */
    public PluginDrivenEsScanNode(PlanNodeId id, TupleDescriptor desc,
            ScanContext scanContext, Connector connector,
            ConnectorSession connectorSession, ConnectorTableHandle tableHandle) {
        super(id, desc, "PluginDrivenEsScanNode", scanContext, false);
        this.connector = connector;
        this.connectorSession = connectorSession;
        this.currentHandle = tableHandle;
    }

    /**
     * Creates a scan node from catalog and table references.
     *
     * <p>The table handle is looked up through the connector metadata using the
     * remote database name (empty string when the table has no database) and the
     * remote table name.</p>
     *
     * @throws RuntimeException if the connector does not return a handle for the table
     */
    public static PluginDrivenEsScanNode create(PlanNodeId id, TupleDescriptor desc,
            ScanContext scanContext, PluginDrivenExternalCatalog catalog,
            PluginDrivenExternalTable table) {
        Connector conn = catalog.getConnector();
        ConnectorSession session = catalog.buildConnectorSession();
        ConnectorMetadata metadata = conn.getMetadata(session);
        String dbName = table.getDb() != null ? table.getDb().getRemoteName() : "";
        String tableName = table.getRemoteName();
        ConnectorTableHandle handle = metadata.getTableHandle(session, dbName, tableName)
                .orElseThrow(() -> new RuntimeException(
                        "Table handle not found for ES table: " + dbName + "." + tableName));
        return new PluginDrivenEsScanNode(id, desc, scanContext, conn, session, handle);
    }

    /**
     * Runs filter pushdown, then builds the scan ranges and node properties.
     */
    @Override
    public void finalizeForNereids() throws UserException {
        // Attempt filter pushdown
        pushdownFilter();
        // Build scan ranges and node properties
        createScanRangeLocations();
    }

    /**
     * Offers the current conjuncts to the connector via {@code applyFilter}.
     *
     * <p>When the connector accepts the filter, the returned handle replaces
     * {@link #currentHandle}. The conjuncts are cleared only when the connector
     * reports no remaining filter; otherwise they are all kept.</p>
     */
    private void pushdownFilter() {
        if (conjuncts == null || conjuncts.isEmpty()) {
            return;
        }
        ConnectorMetadata metadata = connector.getMetadata(connectorSession);
        ConnectorExpression combined = ExprToConnectorExpressionConverter.convertConjuncts(conjuncts);
        ConnectorFilterConstraint constraint = new ConnectorFilterConstraint(
                combined, Collections.emptyMap());
        Optional<FilterApplicationResult<ConnectorTableHandle>> result =
                metadata.applyFilter(connectorSession, currentHandle, constraint);
        if (!result.isPresent()) {
            return;
        }
        FilterApplicationResult<ConnectorTableHandle> filterResult = result.get();
        currentHandle = filterResult.getHandle();
        if (filterResult.getRemainingFilter() == null) {
            conjuncts.clear();
            LOG.debug("ES filter fully pushed down via applyFilter, cleared conjuncts");
        } else {
            LOG.debug("ES filter pushdown accepted with remaining, keeping conjuncts");
        }
    }

    /**
     * Attempts to push the projection down via the SPI applyProjection() protocol.
     *
     * <p>Does nothing for an empty column list. When the connector accepts the
     * projection, the returned handle replaces {@link #currentHandle}.</p>
     */
    private void tryPushDownProjection(List<ConnectorColumnHandle> columns) {
        if (columns.isEmpty()) {
            return;
        }
        ConnectorMetadata metadata = connector.getMetadata(connectorSession);
        Optional<ProjectionApplicationResult<ConnectorTableHandle>> result =
                metadata.applyProjection(connectorSession, currentHandle, columns);
        if (result.isPresent()) {
            currentHandle = result.get().getHandle();
            LOG.debug("ES projection pushed down via applyProjection");
        }
    }

    /**
     * Plans the scan through the connector and fills {@code scanRangeLocations}.
     *
     * @throws UserException if a scan range carries malformed properties or the
     *         connector reports malformed unpushed-conjunct indices
     */
    @Override
    protected void createScanRangeLocations() throws UserException {
        ConnectorScanPlanProvider scanProvider = connector.getScanPlanProvider();
        List<ConnectorColumnHandle> columns = buildColumnHandles();
        // Attempt projection pushdown via SPI protocol
        tryPushDownProjection(columns);
        Optional<ConnectorExpression> remainingFilter = buildRemainingFilter();
        // Get scan ranges (shard routing)
        List<ConnectorScanRange> ranges = scanProvider.planScan(
                connectorSession, currentHandle, columns, remainingFilter);
        // Get node-level properties (query DSL, auth, doc values mode)
        Map<String, String> props = scanProvider.getScanNodeProperties(
                connectorSession, currentHandle, columns, remainingFilter);
        // toThrift() and getNodeExplainString() dereference nodeProperties unconditionally,
        // so never let it become null.
        nodeProperties = props != null ? props : Collections.emptyMap();
        // Remove pushed-down conjuncts to avoid redundant evaluation by BE.
        // The connector returns indices of conjuncts that could NOT be pushed.
        pruneConjuncts(nodeProperties);
        // Convert to Thrift scan range locations
        for (ConnectorScanRange range : ranges) {
            scanRangeLocations.add(convertToThrift(range));
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("ES plugin scan: {} ranges, query_dsl={}",
                    scanRangeLocations.size(),
                    nodeProperties.getOrDefault("query_dsl", "N/A"));
        }
    }

    /**
     * Converts one connector scan range into Thrift scan range locations.
     *
     * <p>Backends are taken from a {@link FederationBackendPolicy} initialised with
     * the range's hosts. The ES part is read from the range properties
     * {@code es.hosts}, {@code es.index}, {@code es.type} and {@code es.shard_id}.</p>
     *
     * @throws UserException if backend selection fails, or if {@code es.hosts} or
     *         {@code es.shard_id} cannot be parsed
     */
    private TScanRangeLocations convertToThrift(ConnectorScanRange range) throws UserException {
        Map<String, String> props = range.getProperties();
        if (props == null) {
            props = Collections.emptyMap();
        }
        TScanRangeLocations locations = new TScanRangeLocations();

        // Assign BE backends
        FederationBackendPolicy bePolicy = new FederationBackendPolicy();
        bePolicy.init(range.getHosts());
        int numBackends = bePolicy.numBackends();
        for (int i = 0; i < numBackends; i++) {
            Backend be = bePolicy.getNextBe();
            TScanRangeLocation location = new TScanRangeLocation();
            location.setBackendId(be.getId());
            location.setServer(new TNetworkAddress(be.getHost(), be.getBePort()));
            locations.addToLocations(location);
        }

        // Build TEsScanRange
        TEsScanRange esScanRange = new TEsScanRange();
        esScanRange.setEsHosts(parseEsHosts(props.getOrDefault("es.hosts", "")));
        esScanRange.setIndex(props.getOrDefault("es.index", ""));
        String type = props.get("es.type");
        if (type != null) {
            esScanRange.setType(type);
        }
        esScanRange.setShardId(parseShardId(props.get("es.shard_id")));

        TScanRange scanRange = new TScanRange();
        scanRange.setEsScanRange(esScanRange);
        locations.setScanRange(scanRange);
        return locations;
    }

    /**
     * Parses a comma-separated ES host list; blank entries are skipped.
     *
     * @param hostsStr value of the {@code es.hosts} property, may be null or empty
     * @return parsed addresses, empty when there is nothing to parse
     * @throws UserException if any entry is malformed
     */
    private static List<TNetworkAddress> parseEsHosts(String hostsStr) throws UserException {
        List<TNetworkAddress> esHosts = new ArrayList<>();
        if (hostsStr == null) {
            return esHosts;
        }
        for (String hostPort : hostsStr.split(",")) {
            String trimmed = hostPort.trim();
            if (!trimmed.isEmpty()) {
                esHosts.add(parseEsHost(trimmed));
            }
        }
        return esHosts;
    }

    /**
     * Parses a single ES host entry of the form {@code [scheme://]host[:port][/path]}.
     *
     * <p>The scheme and anything from the first {@code /} after it are discarded.
     * For a bracketed IPv6 literal only a colon after the closing bracket is treated
     * as the port separator, and the brackets stay part of the host. A missing port
     * defaults to {@link #DEFAULT_ES_HTTP_PORT}.</p>
     *
     * @param entry non-empty, trimmed host entry
     * @throws UserException if the host part is empty or the port is not an integer
     */
    private static TNetworkAddress parseEsHost(String entry) throws UserException {
        String hostAndPort = entry;
        // Strip scheme prefix (http:// or https://) if present
        int schemeEnd = hostAndPort.indexOf("://");
        if (schemeEnd >= 0) {
            hostAndPort = hostAndPort.substring(schemeEnd + 3);
        }
        // Strip a trailing path such as the "/" in "http://es:9200/"
        int pathStart = hostAndPort.indexOf('/');
        if (pathStart >= 0) {
            hostAndPort = hostAndPort.substring(0, pathStart);
        }
        // Colons inside "[...]" belong to the IPv6 address, not to the port.
        int bracketEnd = hostAndPort.startsWith("[") ? hostAndPort.indexOf(']') : -1;
        int colonIdx = hostAndPort.lastIndexOf(':');
        boolean hasPort = colonIdx >= 0 && colonIdx > bracketEnd;

        String host = hasPort ? hostAndPort.substring(0, colonIdx) : hostAndPort;
        if (host.isEmpty()) {
            throw new UserException("Missing host name in ES host: " + entry);
        }
        int port = DEFAULT_ES_HTTP_PORT;
        if (hasPort) {
            String portStr = hostAndPort.substring(colonIdx + 1);
            try {
                port = Integer.parseInt(portStr);
            } catch (NumberFormatException e) {
                throw new UserException("Invalid port '" + portStr + "' in ES host: " + entry, e);
            }
        }
        return new TNetworkAddress(host, port);
    }

    /**
     * Parses the {@code es.shard_id} property.
     *
     * @param raw property value, may be null
     * @return the shard id, or {@code -1} when the property is absent
     * @throws UserException if the value is not an integer
     */
    private static int parseShardId(String raw) throws UserException {
        if (raw == null) {
            return -1;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new UserException("Invalid es.shard_id: " + raw, e);
        }
    }

    /**
     * Fills the Thrift plan node with an {@code ES_HTTP_SCAN_NODE} payload.
     *
     * <p>{@code docvalue_context_json} and {@code fields_context_json} are removed
     * from a copy of the node properties and deserialized into the dedicated
     * Thrift maps; all other properties are passed through unchanged.</p>
     */
    @Override
    protected void toThrift(TPlanNode msg) {
        msg.node_type = TPlanNodeType.ES_HTTP_SCAN_NODE;
        Map<String, String> esProperties = new HashMap<>(nodeProperties);
        TEsScanNode esScanNode = new TEsScanNode(desc.getId().asInt());
        // Deserialize docvalue_context and fields_context from JSON flat properties.
        // The ES connector serializes these Map<String,String> as JSON strings
        // into the generic flat properties to avoid ES-specific API on the SPI.
        readContext("docvalue_context_json", esProperties.remove("docvalue_context_json"))
                .ifPresent(esScanNode::setDocvalueContext);
        readContext("fields_context_json", esProperties.remove("fields_context_json"))
                .ifPresent(esScanNode::setFieldsContext);
        esScanNode.setProperties(esProperties);
        msg.es_scan_node = esScanNode;
        super.toThrift(msg);
    }

    /**
     * Deserializes a JSON object of string values.
     *
     * <p>Best effort: a parse failure is logged at WARN level and reported as an
     * empty result instead of failing the plan serialization.</p>
     *
     * @param key property name, used only for the log message
     * @param json JSON text, may be null or empty
     * @return the parsed map, or empty when there is no usable content
     */
    private static Optional<Map<String, String>> readContext(String key, String json) {
        if (json == null || json.isEmpty()) {
            return Optional.empty();
        }
        try {
            Map<String, String> parsed = OBJECT_MAPPER.readValue(json, MAP_TYPE_REF);
            return Optional.ofNullable(parsed);
        } catch (Exception e) {
            LOG.warn("Failed to deserialize {}", key, e);
            return Optional.empty();
        }
    }

    /**
     * Renders the EXPLAIN text of this node.
     *
     * <p>At {@code BRIEF} level only the table line is printed. Otherwise the sort
     * column, local predicates, remote predicates (query DSL), ES index/type and
     * doc-value mode are appended when available.</p>
     */
    @Override
    public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
        StringBuilder output = new StringBuilder();
        String fallbackName = desc != null && desc.getTable() != null
                ? desc.getTable().getName() : "PluginDrivenES";
        String tableName = nodeProperties.getOrDefault("_table_name", fallbackName);
        output.append(prefix).append("TABLE: ").append(tableName).append("\n");
        if (detailLevel == TExplainLevel.BRIEF) {
            return output.toString();
        }
        String sortColumn = nodeProperties.get("sort_column");
        if (sortColumn != null) {
            output.append(prefix).append("SORT COLUMN: ").append(sortColumn).append("\n");
        }
        // Null-checked like every other use of conjuncts in this class.
        if (conjuncts != null && !conjuncts.isEmpty()) {
            output.append(prefix).append("LOCAL_PREDICATES: ")
                    .append(getExplainString(conjuncts)).append("\n");
        }
        String queryDsl = nodeProperties.getOrDefault("query_dsl", "N/A");
        output.append(prefix).append("REMOTE_PREDICATES: ")
                .append(queryDsl).append("\n");
        String indexName = nodeProperties.get("_es_index");
        String typeName = nodeProperties.get("_es_type");
        if (indexName != null) {
            output.append(prefix).append(String.format("ES index/type: %s/%s",
                    indexName, typeName != null ? typeName : "_doc")).append("\n");
        }
        String docValueScan = nodeProperties.get("doc_values_mode");
        if (docValueScan != null) {
            output.append(prefix).append("DOC_VALUE_SCAN: ").append(docValueScan).append("\n");
        }
        return output.toString();
    }

    /**
     * Builds one {@link NamedColumnHandle} per slot of the tuple descriptor,
     * named after the slot's column.
     *
     * @return the handles in slot order, or an empty list when there are no slots
     */
    private List<ConnectorColumnHandle> buildColumnHandles() {
        List<SlotDescriptor> slots = desc.getSlots();
        if (slots == null || slots.isEmpty()) {
            return Collections.emptyList();
        }
        List<ConnectorColumnHandle> handles = new ArrayList<>(slots.size());
        for (SlotDescriptor slot : slots) {
            handles.add(new NamedColumnHandle(slot.getColumn().getName()));
        }
        return handles;
    }

    /**
     * Converts the conjuncts still held by this node into a connector expression.
     *
     * @return the converted expression, or empty when there are no conjuncts
     */
    private Optional<ConnectorExpression> buildRemainingFilter() {
        if (conjuncts == null || conjuncts.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(ExprToConnectorExpressionConverter.convertConjuncts(conjuncts));
    }

    /**
     * Prune conjuncts that were successfully pushed down to ES.
     *
     * <p>The connector returns, under {@code _not_pushed_conjunct_indices}, a
     * comma-separated list of indices for conjuncts that could NOT be pushed.
     * We keep only those and remove the rest, matching the old EsScanNode
     * behavior. A missing property means every conjunct was pushed. Blank
     * entries are ignored, so an empty value also clears every conjunct.
     * Indices outside the conjunct list are ignored.</p>
     *
     * @param props node properties returned by the scan plan provider
     * @throws UserException if an entry is not an integer
     */
    private void pruneConjuncts(Map<String, String> props) throws UserException {
        if (conjuncts == null || conjuncts.isEmpty()) {
            return;
        }
        String notPushedStr = props.get("_not_pushed_conjunct_indices");
        if (notPushedStr == null) {
            // No info → all conjuncts were pushed
            conjuncts.clear();
            return;
        }
        Set<Integer> notPushedSet = new HashSet<>();
        for (String idx : notPushedStr.split(",")) {
            String token = idx.trim();
            if (token.isEmpty()) {
                // "".split(",") yields a single empty token; it names no conjunct.
                continue;
            }
            try {
                notPushedSet.add(Integer.parseInt(token));
            } catch (NumberFormatException e) {
                throw new UserException(
                        "Invalid _not_pushed_conjunct_indices from ES connector: " + notPushedStr, e);
            }
        }
        List<Expr> remaining = new ArrayList<>();
        for (int i = 0; i < conjuncts.size(); i++) {
            if (notPushedSet.contains(i)) {
                remaining.add(conjuncts.get(i));
            }
        }
        conjuncts.clear();
        conjuncts.addAll(remaining);
    }
}