// MappingPhase.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.es;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.EsTable;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.commons.lang3.StringUtils;

import java.util.Iterator;

/**
 * Search phase that fetches the index mapping from the remote ES cluster and resolves, for every
 * column of the search context, the date-compatibility, {@code keyword} sub-field and
 * {@code doc_values} information recorded on the {@link SearchContext}.
 */
public class MappingPhase implements SearchPhase {

    private final EsRestClient client;

    // json response for `{index}/_mapping` API, populated by execute()
    private String jsonMapping;

    public MappingPhase(EsRestClient client) {
        this.client = client;
    }

    /**
     * Fetches the raw mapping json of the context's source index and keeps it for {@link #postProcess}.
     */
    @Override
    public void execute(SearchContext context) throws DorisEsException {
        jsonMapping = client.getMapping(context.sourceIndex());
    }

    /**
     * Resolves the field information from the mapping fetched by {@link #execute}.
     */
    @Override
    public void postProcess(SearchContext context) {
        resolveFields(context, jsonMapping);
    }

    /**
     * Parse the required field information from the json.
     *
     * <p>For every column except {@code _id} the column must exist in the mapping properties.
     * Properties that are not json objects or carry no {@code type} are skipped.
     *
     * @param searchContext the current associated column searchContext
     * @param indexMapping the return value of _mapping
     * @throws DorisEsException if a column (other than {@code _id}) is missing from the mapping
     */
    public static void resolveFields(SearchContext searchContext, String indexMapping) throws DorisEsException {
        ObjectNode properties = EsUtil.getMappingProps(searchContext.sourceIndex(), indexMapping, searchContext.type());
        for (Column col : searchContext.columns()) {
            String colName = col.getName();
            // _id does not exist in the mapping, but BE can still query it.
            if ("_id".equals(colName)) {
                continue;
            }
            if (!properties.has(colName)) {
                throw new DorisEsException(
                        "index[" + searchContext.sourceIndex() + "] mapping[" + indexMapping + "] not found "
                                + "column " + colName + " for the ES Cluster");
            }
            JsonNode fieldNode = properties.get(colName);
            // Only json objects carrying a `type` can be resolved; anything else is skipped
            // instead of failing on a cast.
            if (!fieldNode.isObject() || !fieldNode.has("type")) {
                continue;
            }
            ObjectNode fieldObject = (ObjectNode) fieldNode;
            String fieldType = fieldObject.get("type").asText();
            resolveDateFields(searchContext, fieldObject, colName, fieldType);
            resolveKeywordFields(searchContext, fieldObject, colName, fieldType);
            resolveDocValuesFields(searchContext, fieldObject, colName, fieldType);
        }
    }

    /**
     * Records {@code date} columns that use the default format (no {@code format} given) or
     * {@code strict_date_optional_time} in {@code needCompatDateFields}, so that their values
     * can be handled in a compatible way later.
     */
    private static void resolveDateFields(SearchContext searchContext, ObjectNode fieldObject, String colName,
            String fieldType) {
        if (!"date".equals(fieldType)) {
            return;
        }
        if (!fieldObject.has("format") || "strict_date_optional_time".equals(fieldObject.get("format").asText())) {
            searchContext.needCompatDateFields().add(colName);
        }
    }

    /**
     * For a {@code text} column, records its {@code keyword} sub-field (from {@code fields}) in
     * {@code fetchFieldsContext} as {@code colName.subField}; string predicates are generated
     * against that keyword field. If several keyword sub-fields exist, the last one wins.
     */
    private static void resolveKeywordFields(SearchContext searchContext, ObjectNode fieldObject, String colName,
            String fieldType) {
        if (!"text".equals(fieldType)) {
            return;
        }
        JsonNode subFields = fieldObject.get("fields");
        if (subFields == null) {
            return;
        }
        Iterator<String> subFieldNames = subFields.fieldNames();
        while (subFieldNames.hasNext()) {
            String subFieldName = subFieldNames.next();
            JsonNode subFieldType = subFields.get(subFieldName).get("type");
            // A sub-field without a `type` cannot be a keyword field; skip it.
            if (subFieldType != null && "keyword".equals(subFieldType.asText())) {
                searchContext.fetchFieldsContext().put(colName, colName + "." + subFieldName);
            }
        }
    }

    /**
     * Records in {@code docValueFieldsContext} the field whose doc_values can be used to read the column.
     *
     * <p>Types listed in {@code EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS} have no doc_values of their
     * own, so a usable sub-field is looked up instead; other types use the column itself unless
     * doc_values are unavailable. Nothing is recorded when no usable field is found.
     */
    private static void resolveDocValuesFields(SearchContext searchContext, ObjectNode fieldObject, String colName,
            String fieldType) {
        String docValueField;
        if (EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(fieldType)) {
            docValueField = findDocValueSubField(fieldObject, colName);
        } else {
            docValueField = hasUsableDocValues(fieldObject, fieldType) ? colName : null;
        }
        if (StringUtils.isNotEmpty(docValueField)) {
            searchContext.docValueFieldsContext().put(colName, docValueField);
        }
    }

    /**
     * Returns the full path ({@code colName.subField}) of the last sub-field whose doc_values can be
     * used, or {@code null} if there is none.
     */
    private static String findDocValueSubField(ObjectNode fieldObject, String colName) {
        JsonNode subFields = fieldObject.get("fields");
        if (subFields == null) {
            return null;
        }
        String docValueField = null;
        Iterator<String> subFieldNames = subFields.fieldNames();
        while (subFieldNames.hasNext()) {
            String subFieldName = subFieldNames.next();
            JsonNode subField = subFields.get(subFieldName);
            JsonNode subFieldType = subField.get("type");
            // Skip sub-fields without a type and those whose type has doc_values disabled by default.
            if (subFieldType == null || EsTable.DEFAULT_DOCVALUE_DISABLED_FIELDS.contains(subFieldType.asText())) {
                continue;
            }
            if (subField.has("doc_values")) {
                if (subField.get("doc_values").asBoolean()) {
                    // The doc_values belong to the sub-field, so reference it by its full path;
                    // the parent field's type has doc_values disabled.
                    docValueField = colName + "." + subFieldName;
                }
            } else if (!subField.has("ignore_above")) {
                // a : {c : {}} -> a -> a.c
                docValueField = colName + "." + subFieldName;
            }
            // Otherwise the sub-field has `ignore_above`: strings longer than that setting are not
            // indexed or stored, so its doc_values cannot be relied on. Reference:
            // https://www.elastic.co/guide/en/elasticsearch/reference/current/keyword.html#keyword-params
        }
        return docValueField;
    }

    /**
     * Tells whether the doc_values of a field (whose type keeps doc_values enabled by default) can be used.
     */
    private static boolean hasUsableDocValues(ObjectNode fieldObject, String fieldType) {
        // An explicit `doc_values` setting takes precedence over every other check.
        if (fieldObject.has("doc_values")) {
            return fieldObject.get("doc_values").asBoolean();
        }
        // nested does not support doc values. The null check is purely defensive: resolveFields
        // already skips fields that have no type.
        if (fieldType == null || "nested".equals(fieldType)) {
            return false;
        }
        // With `ignore_above`, strings longer than the setting are not indexed or stored, so the
        // doc_values cannot be relied on. Note that default dynamic mapping rules create a keyword
        // sub-field with ignore_above: 256. Reference:
        // https://www.elastic.co/guide/en/elasticsearch/reference/current/keyword.html#keyword-params
        return !fieldObject.has("ignore_above");
    }

}