EsTable.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.datasource.es.EsMetaStateTracker;
import org.apache.doris.datasource.es.EsRestClient;
import org.apache.doris.datasource.es.EsTablePartitions;
import org.apache.doris.datasource.es.EsUtil;
import org.apache.doris.persist.gson.GsonPostProcessable;
import org.apache.doris.thrift.TEsTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import lombok.Setter;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Elasticsearch table.
 *
 * <p>Connection and scan settings are persisted uniformly in {@link #tableContext}; the typed fields
 * (hosts, credentials, index, scan flags, ...) are rebuilt from it after deserialization.
 **/
@Getter
@Setter
public class EsTable extends Table implements GsonPostProcessable {
    // reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/doc-values.html
    // https://www.elastic.co/guide/en/elasticsearch/reference/current/text.html
    public static final Set<String> DEFAULT_DOCVALUE_DISABLED_FIELDS =
            new HashSet<>(Arrays.asList("text", "annotated_text", "match_only_text"));

    private static final Logger LOG = LogManager.getLogger(EsTable.class);

    // Findings from the Solr "doc_values vs stored_fields" performance smackdown:
    // retrieving a high number of fields via DocValues noticeably worsens performance,
    // whereas when fewer than 20 fields are returned DocValues performs better than
    // stored fields, and the difference shrinks as the number of returned fields grows.
    // Asking for 9 DocValues fields and 1 stored field takes an average query time of 6.86
    // (more than returning 10 stored fields).
    // 20 is therefore a slightly conservative default; experts can tune it through the
    // configurable parameter @see `MAX_DOCVALUE_FIELDS`.
    @Getter
    private static final int DEFAULT_MAX_DOCVALUE_FIELDS = 20;

    private String hosts;
    private String[] seeds;
    private String userName = "";
    private String passwd = "";
    // index name: may be a concrete index, a wildcard pattern, or an alias.
    private String indexName;
    // mapping type used for `indexName`
    private String mappingType = null;
    // Only the partition definition (the partition key) is saved here; the partition list
    // is fetched dynamically from the ES cluster.
    @SerializedName("pi")
    private PartitionInfo partitionInfo;
    private EsTablePartitions esTablePartitions;
    // Whether to enable the doc_values scan optimization for fetching fields faster, default to true
    private boolean enableDocValueScan = Boolean.parseBoolean(EsResource.DOC_VALUE_SCAN_DEFAULT_VALUE);
    // Whether to enable keyword sniffing for more reasonable filtering, default to true
    private boolean enableKeywordSniff = Boolean.parseBoolean(EsResource.KEYWORD_SNIFF_DEFAULT_VALUE);
    // If the number of fields whose values are extracted from `doc_value` exceeds this limit,
    // extraction is downgraded to `stored_fields`.
    private int maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS;
    // Whether to enable discovery of ES nodes; disable it when running under network isolation
    private boolean nodesDiscovery = Boolean.parseBoolean(EsResource.NODES_DISCOVERY_DEFAULT_VALUE);
    // Whether to use SSL when calling ES (BE and FE access it through trust)
    private boolean httpSslEnabled = Boolean.parseBoolean(EsResource.HTTP_SSL_ENABLED_DEFAULT_VALUE);
    // Whether to push down LIKE expressions; LIKE is translated to a wildcard query, which can
    // consume a lot of ES CPU.
    private boolean likePushDown = Boolean.parseBoolean(EsResource.LIKE_PUSH_DOWN_DEFAULT_VALUE);
    // Whether to include hidden indices, default to false
    private boolean includeHiddenIndex = Boolean.parseBoolean(EsResource.INCLUDE_HIDDEN_INDEX_DEFAULT_VALUE);
    // tableContext is used to persist configuration parameters uniformly
    @SerializedName("tc")
    private Map<String, String> tableContext = new HashMap<>();
    // the most recent exception raised while syncing ES table metadata (mapping, shard location)
    private Throwable lastMetaDataSyncException = null;
    // REST client used to talk to ES.
    private EsRestClient client = null;
    // Periodically pulls ES metadata; created lazily by initEsMetaStateTracker().
    private EsMetaStateTracker esMetaStateTracker;
    // column name -> elasticsearch field data type
    private Map<String, String> column2typeMap = new HashMap<>();

    public EsTable() {
        super(TableType.ELASTICSEARCH);
    }

    /**
     * Create table for user.
     *
     * @param name table name
     * @param properties user-supplied ES properties (hosts, index, credentials, scan options, ...)
     * @throws DdlException if the properties are rejected during validation or parsing
     **/
    public EsTable(String name, Map<String, String> properties) throws DdlException {
        super(TableType.ELASTICSEARCH);
        this.name = name;
        validate(properties);
        this.client = new EsRestClient(seeds, userName, passwd, httpSslEnabled);
    }

    /**
     * Create table for test.
     **/
    public EsTable(long id, String name, List<Column> schema, Map<String, String> properties,
            PartitionInfo partitionInfo) throws DdlException {
        super(id, name, TableType.ELASTICSEARCH, schema);
        this.partitionInfo = partitionInfo;
        validate(properties);
        this.client = new EsRestClient(seeds, userName, passwd, httpSslEnabled);
    }

    public EsTable(long id, String name, List<Column> schema, TableType tableType) {
        super(id, name, tableType, schema);
    }

    /**
     * Returns the fetch-fields context from the ES search context, creating the meta state tracker if needed.
     */
    public Map<String, String> fieldsContext() throws UserException {
        initEsMetaStateTracker();
        return esMetaStateTracker.searchContext().fetchFieldsContext();
    }

    /**
     * Returns the doc_value fields context from the ES search context, creating the meta state tracker if needed.
     */
    public Map<String, String> docValueContext() throws UserException {
        initEsMetaStateTracker();
        return esMetaStateTracker.searchContext().docValueFieldsContext();
    }

    /**
     * Returns the date fields that need compatibility handling, as reported by the ES search context.
     */
    public List<String> needCompatDateFields() throws UserException {
        initEsMetaStateTracker();
        return esMetaStateTracker.searchContext().needCompatDateFields();
    }

    // Lazily creates the meta state tracker bound to the current client. Not synchronized.
    private void initEsMetaStateTracker() {
        if (esMetaStateTracker == null) {
            esMetaStateTracker = new EsMetaStateTracker(client, this);
        }
    }

    /**
     * Validates user-supplied properties, populates the typed fields and mirrors them into
     * {@link #tableContext} for persistence.
     *
     * @throws DdlException if the properties are rejected during validation or parsing
     */
    private void validate(Map<String, String> properties) throws DdlException {
        EsResource.valid(properties, false);
        if (properties.containsKey(EsResource.USER)) {
            userName = properties.get(EsResource.USER).trim();
        }
        if (properties.containsKey(EsResource.PASSWORD)) {
            passwd = properties.get(EsResource.PASSWORD).trim();
        }
        indexName = properties.get(EsResource.INDEX).trim();
        // enable doc value scan for Elasticsearch
        if (properties.containsKey(EsResource.DOC_VALUE_SCAN)) {
            enableDocValueScan = EsUtil.getBoolean(properties, EsResource.DOC_VALUE_SCAN);
        }
        if (properties.containsKey(EsResource.KEYWORD_SNIFF)) {
            enableKeywordSniff = EsUtil.getBoolean(properties, EsResource.KEYWORD_SNIFF);
        }
        if (properties.containsKey(EsResource.NODES_DISCOVERY)) {
            nodesDiscovery = EsUtil.getBoolean(properties, EsResource.NODES_DISCOVERY);
        }
        if (properties.containsKey(EsResource.HTTP_SSL_ENABLED)) {
            httpSslEnabled = EsUtil.getBoolean(properties, EsResource.HTTP_SSL_ENABLED);
        }
        if (properties.containsKey(EsResource.LIKE_PUSH_DOWN)) {
            likePushDown = EsUtil.getBoolean(properties, EsResource.LIKE_PUSH_DOWN);
        }
        if (StringUtils.isNotBlank(properties.get(EsResource.TYPE))) {
            mappingType = properties.get(EsResource.TYPE).trim();
        }
        if (properties.containsKey(EsResource.MAX_DOCVALUE_FIELDS)) {
            try {
                maxDocValueFields = Integer.parseInt(properties.get(EsResource.MAX_DOCVALUE_FIELDS).trim());
                if (maxDocValueFields < 0) {
                    maxDocValueFields = 0;
                }
            } catch (Exception e) {
                // Unparseable (or missing) value: fall back to the default limit.
                maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS;
            }
        }
        hosts = properties.get(EsResource.HOSTS).trim();
        seeds = hosts.split(",");
        // httpSslEnabled must already be parsed here because it is passed to fillUrlsWithSchema.
        EsResource.fillUrlsWithSchema(seeds, httpSslEnabled);
        // Look up the property by its key, not by its default value.
        if (properties.containsKey(EsResource.INCLUDE_HIDDEN_INDEX)) {
            includeHiddenIndex = EsUtil.getBoolean(properties, EsResource.INCLUDE_HIDDEN_INDEX);
        }
        tableContext.put("hosts", hosts);
        tableContext.put("userName", userName);
        tableContext.put("passwd", passwd);
        tableContext.put("indexName", indexName);
        if (mappingType != null) {
            tableContext.put("mappingType", mappingType);
        }
        tableContext.put("enableDocValueScan", String.valueOf(enableDocValueScan));
        tableContext.put("enableKeywordSniff", String.valueOf(enableKeywordSniff));
        tableContext.put("maxDocValueFields", String.valueOf(maxDocValueFields));
        tableContext.put(EsResource.NODES_DISCOVERY, String.valueOf(nodesDiscovery));
        tableContext.put(EsResource.HTTP_SSL_ENABLED, String.valueOf(httpSslEnabled));
        tableContext.put(EsResource.LIKE_PUSH_DOWN, String.valueOf(likePushDown));
        tableContext.put(EsResource.INCLUDE_HIDDEN_INDEX, String.valueOf(includeHiddenIndex));
    }

    /**
     * Builds the thrift descriptor for this table; the attached {@link TEsTable} carries no fields.
     */
    @Override
    public TTableDescriptor toThrift() {
        TEsTable tEsTable = new TEsTable();
        TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.ES_TABLE, fullSchema.size(), 0,
                getName(), "");
        tTableDescriptor.setEsTable(tEsTable);
        return tTableDescriptor;
    }

    /**
     * Computes an md5 signature over the table name, type and connection settings.
     *
     * <p>Uses {@link #tableContext} entries when present, otherwise the individual fields.
     */
    @Override
    public String getSignature(int signatureVersion) {
        // NOTE: signatureVersion is the builder's initial capacity, not part of the signed content;
        // kept unchanged so that computed signatures stay stable.
        StringBuilder sb = new StringBuilder(signatureVersion);
        sb.append(name);
        sb.append(type.name());
        if (tableContext.isEmpty()) {
            sb.append(hosts);
            sb.append(userName);
            sb.append(passwd);
            sb.append(indexName);
            if (mappingType != null) {
                sb.append(mappingType);
            }
        } else {
            for (Map.Entry<String, String> entry : tableContext.entrySet()) {
                sb.append(entry.getKey());
                sb.append(entry.getValue());
            }
        }
        String md5 = DigestUtils.md5Hex(sb.toString());
        if (LOG.isDebugEnabled()) {
            // Do not log the raw signature string: it contains the ES password.
            LOG.debug("get signature of es table {}: {}", name, md5);
        }
        return md5;
    }

    /**
     * Legacy (non-GSON) deserialization: reads the table context and partition info, then
     * rebuilds the typed fields and the REST client.
     *
     * @throws IOException on read failure or an unsupported partition type
     */
    @Deprecated
    @Override
    public void readFields(DataInput in) throws IOException {
        super.readFields(in);
        int size = in.readInt();
        for (int i = 0; i < size; ++i) {
            String key = Text.readString(in);
            String value = Text.readString(in);
            tableContext.put(key, value);
        }
        PartitionType partType = PartitionType.valueOf(Text.readString(in));
        if (partType == PartitionType.UNPARTITIONED) {
            partitionInfo = SinglePartitionInfo.read(in);
        } else if (partType == PartitionType.RANGE) {
            partitionInfo = RangePartitionInfo.read(in);
        } else {
            throw new IOException("invalid partition type: " + partType);
        }
        restoreFromTableContext();
    }

    /**
     * Rebuilds the typed fields and the REST client from the GSON-restored {@link #tableContext}.
     */
    @Override
    public void gsonPostProcess() throws IOException {
        restoreFromTableContext();
    }

    /**
     * Restores the typed connection/scan settings from {@link #tableContext} and recreates the REST client.
     * Shared by {@link #readFields(DataInput)} and {@link #gsonPostProcess()}.
     */
    private void restoreFromTableContext() throws IOException {
        hosts = tableContext.get("hosts");
        seeds = hosts.split(",");
        userName = tableContext.get("userName");
        passwd = tableContext.get("passwd");
        indexName = tableContext.get("indexName");
        mappingType = tableContext.get("mappingType");
        enableDocValueScan = Boolean.parseBoolean(
                tableContext.getOrDefault("enableDocValueScan", EsResource.DOC_VALUE_SCAN_DEFAULT_VALUE));
        enableKeywordSniff = Boolean.parseBoolean(
                tableContext.getOrDefault("enableKeywordSniff", EsResource.KEYWORD_SNIFF_DEFAULT_VALUE));
        if (tableContext.containsKey("maxDocValueFields")) {
            try {
                maxDocValueFields = Integer.parseInt(tableContext.get("maxDocValueFields"));
            } catch (Exception e) {
                // Corrupt persisted value: fall back to the default limit.
                maxDocValueFields = DEFAULT_MAX_DOCVALUE_FIELDS;
            }
        }
        nodesDiscovery = Boolean.parseBoolean(
                tableContext.getOrDefault(EsResource.NODES_DISCOVERY, EsResource.NODES_DISCOVERY_DEFAULT_VALUE));
        httpSslEnabled = Boolean.parseBoolean(
                tableContext.getOrDefault(EsResource.HTTP_SSL_ENABLED, EsResource.HTTP_SSL_ENABLED_DEFAULT_VALUE));
        likePushDown = Boolean.parseBoolean(
                tableContext.getOrDefault(EsResource.LIKE_PUSH_DOWN, EsResource.LIKE_PUSH_DOWN_DEFAULT_VALUE));
        includeHiddenIndex = Boolean.parseBoolean(tableContext.getOrDefault(EsResource.INCLUDE_HIDDEN_INDEX,
                EsResource.INCLUDE_HIDDEN_INDEX_DEFAULT_VALUE));
        // httpSslEnabled must already be parsed here because it is passed to fillUrlsWithSchema.
        EsResource.fillUrlsWithSchema(seeds, httpSslEnabled);
        client = new EsRestClient(seeds, userName, passwd, httpSslEnabled);
    }

    /**
     * Sync es index meta from remote ES Cluster.
     *
     * <p>On failure, {@link #esTablePartitions} is cleared and the exception is recorded in
     * {@link #lastMetaDataSyncException} instead of being propagated.
     */
    public void syncTableMetaData() {
        initEsMetaStateTracker();
        try {
            esMetaStateTracker.run();
            this.esTablePartitions = esMetaStateTracker.searchContext().tablePartitions();
        } catch (Throwable e) {
            LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster. "
                    + "table: {}, table id: {}, err: ", this.indexName, this.name, this.id, e);
            this.esTablePartitions = null;
            this.lastMetaDataSyncException = e;
        }
    }

    /**
     * Generates table columns from the ES mapping of {@link #indexName}, filling
     * {@link #column2typeMap} via {@link EsUtil#genColumnsFromEs}.
     */
    public List<Column> genColumnsFromEs() {
        return EsUtil.genColumnsFromEs(client, indexName, mappingType, false, column2typeMap);
    }
}