EsScanNode.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.es.source;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.EsResource;
import org.apache.doris.catalog.EsTable;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalScanNode;
import org.apache.doris.datasource.FederationBackendPolicy;
import org.apache.doris.datasource.es.EsExternalTable;
import org.apache.doris.datasource.es.EsShardPartitions;
import org.apache.doris.datasource.es.EsShardRouting;
import org.apache.doris.datasource.es.EsTablePartitions;
import org.apache.doris.datasource.es.QueryBuilders;
import org.apache.doris.datasource.es.QueryBuilders.BoolQueryBuilder;
import org.apache.doris.datasource.es.QueryBuilders.BuilderOptions;
import org.apache.doris.datasource.es.QueryBuilders.QueryBuilder;
import org.apache.doris.planner.PartitionPruner;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.RangePartitionPrunerV2;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.query.StatsDelta;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TEsScanNode;
import org.apache.doris.thrift.TEsScanRange;
import org.apache.doris.thrift.TExplainLevel;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.SneakyThrows;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* ScanNode for Elasticsearch.
**/
public class EsScanNode extends ExternalScanNode {

    private static final Logger LOG = LogManager.getLogger(EsScanNode.class);

    // Partition/shard layout read from the table when this node is constructed.
    private final EsTablePartitions esTablePartitions;
    private final EsTable table;
    // Remote query DSL sent to ES; (re)built by buildQuery().
    private QueryBuilder queryBuilder;
    // Conjuncts that a previous buildQuery() call translated into ES DSL. They are removed from `conjuncts`
    // once pushed down, so they are remembered here to let a later buildQuery() call rebuild the same
    // remote query instead of silently dropping those filters.
    private final List<Expr> pushedDownConjuncts = new ArrayList<>();
    private boolean isFinalized = false;

    /**
     * Creates a scan node whose tuple's table is an {@link EsTable}.
     */
    public EsScanNode(PlanNodeId id, TupleDescriptor desc) {
        this(id, desc, false);
    }

    /**
     * For multicatalog es.
     *
     * @param id plan node id
     * @param desc tuple descriptor of the scanned table
     * @param esExternalTable true when {@code desc.getTable()} is an {@link EsExternalTable} (its underlying
     *         {@link EsTable} is used); false when it is an {@link EsTable} itself
     **/
    public EsScanNode(PlanNodeId id, TupleDescriptor desc, boolean esExternalTable) {
        super(id, desc, "EsScanNode", StatisticalType.ES_SCAN_NODE, false);
        if (esExternalTable) {
            EsExternalTable externalTable = (EsExternalTable) (desc.getTable());
            table = externalTable.getEsTable();
        } else {
            table = (EsTable) (desc.getTable());
        }
        esTablePartitions = table.getEsTablePartitions();
    }

    @Override
    public void init(Analyzer analyzer) throws UserException {
        super.init(analyzer);
        buildQuery();
    }

    @Override
    public void init() throws UserException {
        super.init();
        buildQuery();
    }

    @Override
    public void finalize(Analyzer analyzer) throws UserException {
        buildQuery();
        doFinalize();
    }

    @Override
    public void finalizeForNereids() throws UserException {
        buildQuery();
        doFinalize();
    }

    // Computes scan ranges at most once, no matter how many times the node is finalized.
    private void doFinalize() throws UserException {
        if (isFinalized) {
            return;
        }
        createScanRangeLocations();
        isFinalized = true;
    }

    @Override
    protected void createScanRangeLocations() throws UserException {
        scanRangeLocations = getShardLocations();
    }

    /**
     * Returns whether the doc_values scan can be used.
     * 0 and 1 are returned (instead of a boolean) to facilitate Doris BE processing.
     *
     * <p>Returns 1 only when the number of selected columns does not exceed
     * {@code table.getMaxDocValueFields()} and every selected column has an entry in {@code docValueContext}.
     *
     * @param desc the fields needs to read from ES
     * @param docValueContext the mapping for docvalues fields from origin field to doc_value fields
     */
    private int useDocValueScan(TupleDescriptor desc, Map<String, String> docValueContext) {
        List<SlotDescriptor> slotDescriptors = desc.getSlots();
        if (slotDescriptors.size() > table.getMaxDocValueFields()) {
            return 0;
        }
        Set<String> docValueFields = docValueContext.keySet();
        for (SlotDescriptor slotDescriptor : slotDescriptors) {
            if (!docValueFields.contains(slotDescriptor.getColumn().getName())) {
                return 0;
            }
        }
        return 1;
    }

    /**
     * Serializes this node as an ES_HTTP_SCAN_NODE. The properties carry credentials (when set), the SSL flag,
     * the doc-values mode (when doc-value scan is enabled) and the remote query DSL.
     */
    @SneakyThrows
    @Override
    protected void toThrift(TPlanNode msg) {
        msg.node_type = TPlanNodeType.ES_HTTP_SCAN_NODE;
        Map<String, String> properties = Maps.newHashMap();
        if (table.getUserName() != null) {
            properties.put(EsResource.USER, table.getUserName());
        }
        if (table.getPasswd() != null) {
            properties.put(EsResource.PASSWORD, table.getPasswd());
        }
        properties.put(EsResource.HTTP_SSL_ENABLED, String.valueOf(table.isHttpSslEnabled()));
        TEsScanNode esScanNode = new TEsScanNode(desc.getId().asInt());
        if (table.isEnableDocValueScan()) {
            esScanNode.setDocvalueContext(table.docValueContext());
            properties.put(EsResource.DOC_VALUES_MODE, String.valueOf(useDocValueScan(desc, table.docValueContext())));
        }
        properties.put(EsResource.QUERY_DSL, queryBuilder.toJson());
        if (table.isEnableKeywordSniff() && !table.fieldsContext().isEmpty()) {
            esScanNode.setFieldsContext(table.fieldsContext());
        }
        esScanNode.setProperties(properties);
        msg.es_scan_node = esScanNode;
        super.toThrift(msg);
    }

    // only do partition(es index level) prune
    private List<TScanRangeLocations> getShardLocations() throws UserException {
        // has to get partition info from es state not from table because the partition
        // info is generated from es cluster state dynamically
        if (esTablePartitions == null) {
            if (table.getLastMetaDataSyncException() != null) {
                throw new UserException("fetch es table [" + table.getName() + "] metadata failure: "
                        + table.getLastMetaDataSyncException().getLocalizedMessage());
            }
            throw new UserException("EsTable metadata has not been synced, Try it later");
        }
        Collection<Long> partitionIds = partitionPrune(esTablePartitions.getPartitionInfo());
        List<EsShardPartitions> selectedIndex = Lists.newArrayList();
        ArrayList<String> unPartitionedIndices = Lists.newArrayList();
        ArrayList<String> partitionedIndices = Lists.newArrayList();
        // Unpartitioned indices are never pruned.
        for (EsShardPartitions esShardPartitions : esTablePartitions.getUnPartitionedIndexStates().values()) {
            selectedIndex.add(esShardPartitions);
            unPartitionedIndices.add(esShardPartitions.getIndexName());
        }
        if (partitionIds != null) {
            for (Long partitionId : partitionIds) {
                EsShardPartitions indexState = esTablePartitions.getEsShardPartitions(partitionId);
                selectedIndex.add(indexState);
                partitionedIndices.add(indexState.getIndexName());
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("partition prune finished, unpartitioned index [{}], partitioned index [{}]",
                    String.join(",", unPartitionedIndices), String.join(",", partitionedIndices));
        }
        List<TScanRangeLocations> result = Lists.newArrayList();
        boolean enableESParallelScroll = isEnableESParallelScroll();
        for (EsShardPartitions indexState : selectedIndex) {
            // When disabling parallel scroll, only use the routing of shard 0,
            // because only a single scan range needs to be planned per index.
            List<List<EsShardRouting>> shardRoutings = enableESParallelScroll
                    ? new ArrayList<>(indexState.getShardRoutings().values())
                    : Collections.singletonList(indexState.getShardRoutings().get(0));
            for (List<EsShardRouting> shardRouting : shardRoutings) {
                result.add(createShardScanRangeLocations(indexState, shardRouting, enableESParallelScroll));
            }
        }
        if (LOG.isDebugEnabled()) {
            StringBuilder scratchBuilder = new StringBuilder();
            for (TScanRangeLocations rangeLocations : result) {
                scratchBuilder.append(rangeLocations.toString()).append(" ");
            }
            LOG.debug("ES table {} scan ranges {}", table.getName(), scratchBuilder.toString());
        }
        return result;
    }

    /**
     * Builds one scan range (ES hosts, index, mapping type, shard id) plus the backend locations to run it.
     *
     * @param indexState index being scanned
     * @param shardRouting routing entries of one shard; their HTTP addresses become the ES hosts of the range
     * @param enableESParallelScroll whether each shard is scanned by its own range
     */
    private TScanRangeLocations createShardScanRangeLocations(EsShardPartitions indexState,
            List<EsShardRouting> shardRouting, boolean enableESParallelScroll) throws UserException {
        // get backends
        List<TNetworkAddress> shardAllocations = new ArrayList<>();
        List<String> preLocations = new ArrayList<>();
        for (EsShardRouting item : shardRouting) {
            shardAllocations.add(item.getHttpAddress());
            preLocations.add(item.getHttpAddress().getHostname());
        }
        FederationBackendPolicy backendPolicy = new FederationBackendPolicy();
        backendPolicy.init(preLocations);
        TScanRangeLocations locations = new TScanRangeLocations();
        // When disabling parallel scroll, only use the first backend,
        // because the single query is planned to one backend.
        int numBackends = enableESParallelScroll ? backendPolicy.numBackends() : 1;
        for (int i = 0; i < numBackends; ++i) {
            Backend be = backendPolicy.getNextBe();
            TScanRangeLocation location = new TScanRangeLocation();
            location.setBackendId(be.getId());
            location.setServer(new TNetworkAddress(be.getHost(), be.getBePort()));
            locations.addToLocations(location);
        }
        // Generate one es scan range
        TEsScanRange esScanRange = new TEsScanRange();
        esScanRange.setEsHosts(shardAllocations);
        // When disabling parallel scroll, use the index state's index name to prevent the index aliases from
        // being expanded.
        // eg: index alias `log-20240501` may point to multiple indices,
        // such as `log-20240501-1`/`log-20240501-2`.
        // When we plan a single query, we should use the index alias instead of the real indices names.
        esScanRange.setIndex(enableESParallelScroll ? shardRouting.get(0).getIndexName() : indexState.getIndexName());
        // Check the value that is actually sent (previously table.getType() was checked instead).
        String mappingType = table.getMappingType();
        if (mappingType != null) {
            esScanRange.setType(mappingType);
        }
        // When disabling parallel scroll, set shard id to -1 to disable shard preference in query option.
        esScanRange.setShardId(enableESParallelScroll ? shardRouting.get(0).getShardId() : -1);
        TScanRange scanRange = new TScanRange();
        scanRange.setEsScanRange(esScanRange);
        locations.setScanRange(scanRange);
        return locations;
    }

    // Parallel scroll is off when there is no ConnectContext on the current thread.
    private boolean isEnableESParallelScroll() {
        ConnectContext connectContext = ConnectContext.get();
        return connectContext != null && connectContext.getSessionVariable().isEnableESParallelScroll();
    }

    /**
     * if the index name is an alias or index pattern, then the es table is related
     * with one or more indices some indices could be pruned by using partition info
     * in index settings currently only support range partition setting
     *
     * @param partitionInfo partitionInfo, may be null
     * @return ids of the partitions that survive pruning, or null when there is no partition info
     *         or the partition type is not RANGE
     */
    private Collection<Long> partitionPrune(PartitionInfo partitionInfo) throws AnalysisException {
        if (partitionInfo == null) {
            return null;
        }
        switch (partitionInfo.getType()) {
            case RANGE: {
                RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) partitionInfo;
                Map<Long, PartitionItem> keyRangeById = rangePartitionInfo.getIdToItem(false);
                PartitionPruner partitionPruner = new RangePartitionPrunerV2(keyRangeById,
                        rangePartitionInfo.getPartitionColumns(), columnNameToRange);
                return partitionPruner.prune();
            }
            case UNPARTITIONED:
            default:
                return null;
        }
    }

    @Override
    public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
        StringBuilder output = new StringBuilder();
        output.append(prefix).append("TABLE: ").append(table.getName()).append("\n");
        if (detailLevel == TExplainLevel.BRIEF) {
            return output.toString();
        }
        if (null != sortColumn) {
            output.append(prefix).append("SORT COLUMN: ").append(sortColumn).append("\n");
        }
        if (!conjuncts.isEmpty()) {
            output.append(prefix).append("LOCAL_PREDICATES: ").append(getExplainString(conjuncts)).append("\n");
        }
        output.append(prefix).append("REMOTE_PREDICATES: ").append(queryBuilder.toJson()).append("\n");
        String indexName = table.getIndexName();
        String typeName = table.getMappingType();
        output.append(prefix).append(String.format("ES index/type: %s/%s", indexName, typeName)).append("\n");
        if (useTopnFilter()) {
            String topnFilterSources = topnFilterSortNodes.stream()
                    .map(node -> String.valueOf(node.getId().asInt()))
                    .collect(Collectors.joining(","));
            output.append(prefix).append("TOPN OPT:").append(topnFilterSources).append("\n");
        }
        return output.toString();
    }

    /**
     * Translates conjuncts into the remote ES query stored in {@link #queryBuilder} and keeps in
     * {@code conjuncts} only the predicates that were reported as not pushed down.
     *
     * <p>Safe to call repeatedly (it runs from both init and finalize): predicates pushed down by an earlier
     * call are re-translated together with the current conjuncts, so the remote query is never reset to
     * match_all merely because the pushed-down predicates were already stripped from {@code conjuncts}.
     */
    private void buildQuery() throws UserException {
        List<Expr> candidates = new ArrayList<>(pushedDownConjuncts);
        for (Expr expr : conjuncts) {
            if (!candidates.contains(expr)) {
                candidates.add(expr);
            }
        }
        if (candidates.isEmpty()) {
            queryBuilder = QueryBuilders.matchAllQuery();
            return;
        }
        // col -> col.keyword
        Map<String, String> fieldsContext = new HashMap<>();
        if (table.isEnableKeywordSniff() && !table.fieldsContext().isEmpty()) {
            fieldsContext = table.fieldsContext();
        }
        if (table.getColumn2typeMap() == null) {
            table.genColumnsFromEs();
        }
        BuilderOptions builderOptions = BuilderOptions.builder().likePushDown(table.isLikePushDown())
                .needCompatDateFields(table.needCompatDateFields()).build();
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        List<Expr> notPushDownList = new ArrayList<>();
        List<Expr> pushed = new ArrayList<>();
        for (Expr expr : candidates) {
            QueryBuilder exprQuery = QueryBuilders.toEsDsl(expr, notPushDownList, fieldsContext, builderOptions,
                    table.getColumn2typeMap());
            if (exprQuery != null) {
                boolQueryBuilder.must(exprQuery);
                pushed.add(expr);
            }
        }
        queryBuilder = pushed.isEmpty() ? QueryBuilders.matchAllQuery() : boolQueryBuilder;
        pushedDownConjuncts.clear();
        pushedDownConjuncts.addAll(pushed);
        // Only predicates that could not be pushed down stay as local predicates.
        conjuncts.removeIf(expr -> !notPushDownList.contains(expr));
    }

    @Override
    public StatsDelta genStatsDelta() throws AnalysisException {
        return new StatsDelta(Env.getCurrentEnv().getCurrentCatalog().getId(),
                Env.getCurrentEnv().getCurrentCatalog().getDbOrAnalysisException(table.getQualifiedDbName()).getId(),
                table.getId(), -1L);
    }
}