EsExternalCatalog.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.es;

import org.apache.doris.catalog.EsResource;
import org.apache.doris.common.DdlException;
import org.apache.doris.datasource.CatalogProperty;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.InitCatalogLog;
import org.apache.doris.datasource.SessionContext;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;

/**
 * External catalog for Elasticsearch.
 *
 * <p>The catalog exposes a single database, {@link #DEFAULT_DB}; table names are
 * obtained from the {@link EsRestClient}. The REST client is created in
 * {@link #initLocalObjectsImpl()}, so every method that touches it must call
 * {@code makeSureInitialized()} first.
 */
@Getter
public class EsExternalCatalog extends ExternalCatalog {
    /** Name of the only database this catalog reports. */
    public static final String DEFAULT_DB = "default_db";

    private static final Logger LOG = LogManager.getLogger(EsExternalCatalog.class);

    /** REST client for the ES cluster; {@code null} until {@link #initLocalObjectsImpl()} has run. */
    private EsRestClient esRestClient;

    /** Property keys that {@link #checkProperties()} requires to be present. */
    private static final List<String> REQUIRED_PROPERTIES = ImmutableList.of(
            EsResource.HOSTS
    );

    /**
     * Creates an ES catalog.
     *
     * @param catalogId id passed to the parent catalog
     * @param name      catalog name passed to the parent catalog
     * @param resource  resource name handed to {@link CatalogProperty}
     * @param props     user-supplied properties; keys are normalized by
     *                  {@link #processCompatibleProperties(Map)} before being stored
     * @param comment   catalog comment passed to the parent catalog
     */
    public EsExternalCatalog(long catalogId, String name, String resource, Map<String, String> props, String comment) {
        super(catalogId, name, InitCatalogLog.Type.ES, comment);
        this.catalogProperty = new CatalogProperty(resource, processCompatibleProperties(props));
    }

    /**
     * Normalizes property keys so that the "Doris On ES" spelling is accepted.
     *
     * <p>The {@link EsResource#ES_PROPERTIES_PREFIX} prefix is stripped from every key,
     * then the legacy keys {@code ssl} and {@code username} are renamed to
     * {@link EsResource#HTTP_SSL_ENABLED} and {@link EsResource#USER}.
     *
     * @param props the raw properties; not modified
     * @return a new mutable map with normalized keys
     */
    private Map<String, String> processCompatibleProperties(Map<String, String> props) {
        // Compatible with "Doris On ES" interfaces
        Map<String, String> properties = Maps.newHashMap();
        for (Map.Entry<String, String> kv : props.entrySet()) {
            properties.put(StringUtils.removeStart(kv.getKey(), EsResource.ES_PROPERTIES_PREFIX), kv.getValue());
        }
        if (properties.containsKey("ssl")) {
            properties.put(EsResource.HTTP_SSL_ENABLED, properties.remove("ssl"));
        }
        if (properties.containsKey("username")) {
            properties.put(EsResource.USER, properties.remove("username"));
        }
        return properties;
    }

    /**
     * Returns the configured ES hosts, split on commas.
     *
     * <p>The array is passed through {@link EsResource#fillUrlsWithSchema} together with
     * the SSL flag; presumably that helper prepends the http/https scheme where it is
     * missing (verify against EsResource).
     *
     * @return the host URLs; when no hosts are configured the split of an empty string
     *         yields a one-element array
     */
    public String[] getNodes() {
        String hosts = catalogProperty.getOrDefault(EsResource.HOSTS, "");
        String[] hostUrls = hosts.trim().split(",");
        // enableSsl() reads the same property with the same default as the lookup it replaces.
        EsResource.fillUrlsWithSchema(hostUrls, enableSsl());
        return hostUrls;
    }

    /** Returns the configured user name, or an empty string when none is set. */
    public String getUsername() {
        return catalogProperty.getOrDefault(EsResource.USER, "");
    }

    /** Returns the configured password, or an empty string when none is set. */
    public String getPassword() {
        return catalogProperty.getOrDefault(EsResource.PASSWORD, "");
    }

    /** Returns the {@link EsResource#DOC_VALUE_SCAN} property parsed as a boolean. */
    public boolean enableDocValueScan() {
        return Boolean.parseBoolean(catalogProperty.getOrDefault(EsResource.DOC_VALUE_SCAN,
                EsResource.DOC_VALUE_SCAN_DEFAULT_VALUE));
    }

    /** Returns the {@link EsResource#KEYWORD_SNIFF} property parsed as a boolean. */
    public boolean enableKeywordSniff() {
        return Boolean.parseBoolean(catalogProperty.getOrDefault(EsResource.KEYWORD_SNIFF,
                EsResource.KEYWORD_SNIFF_DEFAULT_VALUE));
    }

    /** Returns the {@link EsResource#HTTP_SSL_ENABLED} property parsed as a boolean. */
    public boolean enableSsl() {
        return Boolean.parseBoolean(catalogProperty.getOrDefault(EsResource.HTTP_SSL_ENABLED,
                EsResource.HTTP_SSL_ENABLED_DEFAULT_VALUE));
    }

    /** Returns the {@link EsResource#NODES_DISCOVERY} property parsed as a boolean. */
    public boolean enableNodesDiscovery() {
        return Boolean.parseBoolean(catalogProperty.getOrDefault(EsResource.NODES_DISCOVERY,
                EsResource.NODES_DISCOVERY_DEFAULT_VALUE));
    }

    /** Returns the {@link EsResource#MAPPING_ES_ID} property parsed as a boolean. */
    public boolean enableMappingEsId() {
        return Boolean.parseBoolean(catalogProperty.getOrDefault(EsResource.MAPPING_ES_ID,
                EsResource.MAPPING_ES_ID_DEFAULT_VALUE));
    }

    /** Returns the {@link EsResource#LIKE_PUSH_DOWN} property parsed as a boolean. */
    public boolean enableLikePushDown() {
        return Boolean.parseBoolean(catalogProperty.getOrDefault(EsResource.LIKE_PUSH_DOWN,
                EsResource.LIKE_PUSH_DOWN_DEFAULT_VALUE));
    }

    /** Returns the {@link EsResource#INCLUDE_HIDDEN_INDEX} property parsed as a boolean. */
    public boolean enableIncludeHiddenIndex() {
        return Boolean.parseBoolean(catalogProperty.getOrDefault(EsResource.INCLUDE_HIDDEN_INDEX,
                EsResource.INCLUDE_HIDDEN_INDEX_DEFAULT_VALUE));
    }

    /**
     * Builds the REST client from the catalog properties and checks cluster health.
     *
     * @throws DorisEsException if the health check reports failure
     */
    @Override
    protected void initLocalObjectsImpl() {
        esRestClient = new EsRestClient(getNodes(), getUsername(), getPassword(), enableSsl());
        if (!esRestClient.health()) {
            throw new DorisEsException("Failed to connect to ES cluster,"
                    + " please check your ES cluster or your ES catalog configuration.");
        }
    }

    /**
     * Lists the tables reported by the REST client.
     *
     * @param ctx    session context; not used here
     * @param dbName database name; not used here, the result does not depend on it
     * @return the names returned by {@code EsRestClient.listTable}
     */
    @Override
    public List<String> listTableNames(SessionContext ctx, String dbName) {
        makeSureInitialized();
        return esRestClient.listTable(enableIncludeHiddenIndex());
    }

    /**
     * Checks whether an index named {@code tblName} exists.
     *
     * @param ctx     session context; not used here
     * @param dbName  database name; not used here
     * @param tblName name passed to {@code EsRestClient.existIndex}
     * @return the result of {@code EsRestClient.existIndex}
     */
    @Override
    public boolean tableExist(SessionContext ctx, String dbName, String tblName) {
        // esRestClient is only assigned in initLocalObjectsImpl(); initialize first, as
        // listTableNames does, so this call cannot dereference a null client.
        makeSureInitialized();
        return esRestClient.existIndex(esRestClient.getClient(), tblName);
    }

    /** Returns a new list containing only {@link #DEFAULT_DB}. */
    @Override
    protected List<String> listDatabaseNames() {
        return Lists.newArrayList(DEFAULT_DB);
    }

    /**
     * Runs the parent checks, then verifies every key in {@code REQUIRED_PROPERTIES} is present.
     *
     * @throws DdlException if a required property is missing (or the parent check fails)
     */
    @Override
    public void checkProperties() throws DdlException {
        super.checkProperties();
        for (String requiredProperty : REQUIRED_PROPERTIES) {
            if (!catalogProperty.getProperties().containsKey(requiredProperty)) {
                throw new DdlException("Required property '" + requiredProperty + "' is missing");
            }
        }
    }
}