EsUtil.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.es;
import org.apache.doris.analysis.DistributionDesc;
import org.apache.doris.analysis.PartitionDesc;
import org.apache.doris.analysis.RangePartitionDesc;
import org.apache.doris.catalog.ArrayType;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.ScalarType;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.util.JsonUtil;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.StreamSupport;
/**
* Util for ES, some static method.
**/
public class EsUtil {
private static final Logger LOG = LogManager.getLogger(EsUtil.class);
/**
* Analyze partition and distributionDesc.
**/
public static void analyzePartitionAndDistributionDesc(PartitionDesc partitionDesc,
DistributionDesc distributionDesc) throws AnalysisException {
if (partitionDesc == null && distributionDesc == null) {
return;
}
if (partitionDesc != null) {
if (!(partitionDesc instanceof RangePartitionDesc)) {
throw new AnalysisException("Elasticsearch table only permit range partition");
}
RangePartitionDesc rangePartitionDesc = (RangePartitionDesc) partitionDesc;
analyzePartitionDesc(rangePartitionDesc);
}
if (distributionDesc != null) {
throw new AnalysisException("could not support distribution clause");
}
}
private static void analyzePartitionDesc(RangePartitionDesc partDesc) throws AnalysisException {
if (partDesc.getPartitionColNames() == null || partDesc.getPartitionColNames().isEmpty()) {
throw new AnalysisException("No partition columns.");
}
if (partDesc.getPartitionColNames().size() > 1) {
throw new AnalysisException("Elasticsearch table's partition column could only be a single column");
}
}
/**
* Get boolean throw DdlException when parse error
**/
public static boolean getBoolean(Map<String, String> properties, String name) throws DdlException {
String property = properties.get(name).trim();
try {
return Boolean.parseBoolean(property);
} catch (Exception e) {
throw new DdlException(String.format("fail to parse %s, %s = %s, `%s` should be like 'true' or 'false', "
+ "value should be double quotation marks", name, name, property, name));
}
}
@VisibleForTesting
public static ObjectNode getMapping(String indexMapping) {
ObjectNode jsonNodes = JsonUtil.parseObject(indexMapping);
// If the indexName use alias takes the first mapping
return (ObjectNode) jsonNodes.iterator().next().get("mappings");
}
@VisibleForTesting
public static ObjectNode getRootSchema(ObjectNode mappings, String mappingType, List<String> arrayFields) {
// Type is null in the following three cases
// 1. Equal 6.8.x and after
// 2. Multi-catalog auto infer
// 3. Equal 6.8.x and before user not passed
if (mappingType == null) {
// remove dynamic templates, for ES 7.x and 8.x
checkNonPropertiesFields(mappings, arrayFields);
String firstType = mappings.fieldNames().next();
//The first parameter may not be properties, so we need to first determine whether it is 7.x or above.
if (StreamSupport.stream(Spliterators
.spliteratorUnknownSize(mappings.fieldNames(), Spliterator.ORDERED), false)
.anyMatch(s -> s.contains("properties"))) {
// Equal 7.x and after
return mappings;
} else {
ObjectNode firstData = (ObjectNode) mappings.get(firstType);
// check for ES 6.x and before
checkNonPropertiesFields(firstData, arrayFields);
return firstData;
}
} else {
if (mappings.has(mappingType)) {
ObjectNode jsonData = (ObjectNode) mappings.get(mappingType);
// check for ES 6.x and before
checkNonPropertiesFields(jsonData, arrayFields);
return jsonData;
}
// Compatible type error
return getRootSchema(mappings, null, arrayFields);
}
}
/**
* Check non properties fields
*
* @param mappings
*/
private static void checkNonPropertiesFields(ObjectNode mappings, List<String> arrayFields) {
// remove `_meta` field and parse array_fields
JsonNode metaNode = mappings.remove("_meta");
if (metaNode != null) {
JsonNode dorisMeta = metaNode.get("doris");
if (dorisMeta != null) {
JsonNode arrayNode = dorisMeta.get("array_fields");
if (arrayNode != null) {
for (JsonNode jsonNode : arrayNode) {
arrayFields.add(jsonNode.asText());
}
}
}
}
// remove `dynamic_templates` field
mappings.remove("dynamic_templates");
// remove `dynamic` field
mappings.remove("dynamic");
// remove `_default` field, we do not parse `_default_` mapping, only explicit mapping.
// `_default` _mapping type is deprecated in 7.0 and removed in 8.0
// https://www.elastic.co/guide/en/elasticsearch/reference/7.17/removal-of-types.html
mappings.remove("_default_");
// check explicit mapping
if (mappings.isEmpty()) {
throw new DorisEsException("Do not support index without explicit mapping.");
}
}
/**
* Get mapping properties transform to ObjectNode.
**/
public static ObjectNode getMappingProps(String sourceIndex, String indexMapping, String mappingType) {
ObjectNode mappings = getMapping(indexMapping);
ObjectNode rootSchema = getRootSchema(mappings, mappingType, new ArrayList<>());
ObjectNode properties = (ObjectNode) rootSchema.get("properties");
if (properties == null) {
throw new DorisEsException(
"index[" + sourceIndex + "] type[" + mappingType + "] mapping not found for the ES Cluster");
}
return properties;
}
/**
* Generate columns from ES Cluster.
* Add mappingEsId config in es external catalog.
**/
public static List<Column> genColumnsFromEs(EsRestClient client, String indexName, String mappingType,
boolean mappingEsId, Map<String, String> column2typeMap) {
String mapping = client.getMapping(indexName);
ObjectNode mappings = getMapping(mapping);
// Get array_fields while removing _meta property.
List<String> arrayFields = new ArrayList<>();
ObjectNode rootSchema = getRootSchema(mappings, mappingType, arrayFields);
return genColumnsFromEs(indexName, mappingType, rootSchema, mappingEsId, arrayFields, column2typeMap);
}
@VisibleForTesting
public static List<Column> genColumnsFromEs(String indexName, String mappingType, ObjectNode rootSchema,
boolean mappingEsId, List<String> arrayFields, Map<String, String> column2typeMap) {
List<Column> columns = new ArrayList<>();
if (mappingEsId) {
Column column = new Column();
column.setName("_id");
column.setIsKey(true);
column.setType(ScalarType.createVarcharType(255));
column.setIsAllowNull(true);
column.setUniqueId(-1);
columns.add(column);
}
ObjectNode mappingProps = (ObjectNode) rootSchema.get("properties");
if (mappingProps == null) {
throw new DorisEsException(
"index[" + indexName + "] type[" + mappingType + "] mapping not found for the ES Cluster");
}
Iterator<String> iterator = mappingProps.fieldNames();
while (iterator.hasNext()) {
String fieldName = iterator.next();
ObjectNode fieldValue = (ObjectNode) mappingProps.get(fieldName);
Column column = parseEsField(fieldName, replaceFieldAlias(mappingProps, fieldValue), arrayFields,
column2typeMap);
columns.add(column);
}
return columns;
}
private static ObjectNode replaceFieldAlias(ObjectNode mappingProps, ObjectNode fieldValue) {
if (!fieldValue.has("type")) {
return fieldValue;
}
String typeStr = fieldValue.get("type").asText();
if ("alias".equals(typeStr)) {
String path = fieldValue.get("path").asText();
if ("_id".equals(path)) {
// _id is not in mappingProps, use keyword type.
fieldValue.put("type", "keyword");
} else {
if (mappingProps.has(path)) {
return (ObjectNode) mappingProps.get(path);
}
}
}
return fieldValue;
}
private static Column parseEsField(String fieldName, ObjectNode fieldValue, List<String> arrayFields,
Map<String, String> column2typeMap) {
Column column = new Column();
column.setName(fieldName);
column.setIsKey(true);
column.setIsAllowNull(true);
column.setUniqueId(-1);
Type type;
// Complex types are treating as String types for now.
if (fieldValue.has("type")) {
String typeStr = fieldValue.get("type").asText();
column.setComment("Elasticsearch type is " + typeStr);
column2typeMap.put(fieldName, typeStr);
// reference https://www.elastic.co/guide/en/elasticsearch/reference/8.3/sql-data-types.html
switch (typeStr) {
case "null":
type = Type.NULL;
break;
case "boolean":
type = Type.BOOLEAN;
break;
case "byte":
type = Type.TINYINT;
break;
case "short":
type = Type.SMALLINT;
break;
case "integer":
type = Type.INT;
break;
case "long":
type = Type.BIGINT;
break;
case "unsigned_long":
type = Type.LARGEINT;
break;
case "float":
case "half_float":
type = Type.FLOAT;
break;
case "double":
case "scaled_float":
type = Type.DOUBLE;
break;
case "date":
type = parseEsDateType(column, fieldValue);
break;
case "keyword":
case "text":
case "ip":
case "wildcard":
case "constant_keyword":
type = ScalarType.createStringType();
break;
case "nested":
type = Type.JSONB;
break;
default:
type = Type.UNSUPPORTED;
}
} else {
// When there is no explicit type in mapping, it indicates this type is an `object` in Elasticsearch.
// reference: https://www.elastic.co/guide/en/elasticsearch/reference/current/object.html
type = Type.JSONB;
column.setComment("Elasticsearch type is object");
column2typeMap.put(fieldName, "object");
}
if (arrayFields.contains(fieldName)) {
column.setType(ArrayType.create(type, true));
} else {
column.setType(type);
}
return column;
}
private static final List<String> ALLOW_DATE_FORMATS = Lists.newArrayList("yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd",
"epoch_millis");
/**
* Parse es date to doris type by format
**/
private static Type parseEsDateType(Column column, ObjectNode field) {
if (!field.has("format")) {
// default format
column.setComment("Elasticsearch type is date, no format");
return ScalarType.createDatetimeV2Type(0);
} else {
String originFormat = field.get("format").asText();
String[] formats = originFormat.split("\\|\\|");
boolean dateTimeFlag = false;
boolean dateFlag = false;
boolean bigIntFlag = false;
for (String format : formats) {
// pre-check format
String trimFormat = format.trim();
if (!ALLOW_DATE_FORMATS.contains(trimFormat)) {
column.setComment(
"Elasticsearch type is date, format is " + trimFormat + " not support, use String type");
return ScalarType.createStringType();
}
switch (trimFormat) {
case "yyyy-MM-dd HH:mm:ss":
dateTimeFlag = true;
break;
case "yyyy-MM-dd":
dateFlag = true;
break;
case "epoch_millis":
default:
bigIntFlag = true;
}
}
column.setComment("Elasticsearch type is date, format is " + originFormat);
if (dateTimeFlag) {
return ScalarType.createDatetimeV2Type(0);
}
if (dateFlag) {
return ScalarType.createDateV2Type();
}
if (bigIntFlag) {
return Type.BIGINT;
}
return ScalarType.createStringType();
}
}
}