UnboundTableSinkCreator.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.nereids.analyzer;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.hive.HMSExternalCatalog;
import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
import org.apache.doris.dictionary.Dictionary;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.exceptions.ParseException;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.info.DMLCommandType;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.logical.LogicalSink;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.qe.ConnectContext;

import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.Optional;

/**
 * Create unbound table sink
 */
public class UnboundTableSinkCreator {

    /**
     * create unbound sink without DML command
     */
    public static LogicalSink<? extends Plan> createUnboundTableSink(List<String> nameParts,
                List<String> colNames, List<String> hints, List<String> partitions, Plan query)
            throws UserException {
        String catalogName = RelationUtil.getQualifierName(ConnectContext.get(), nameParts).get(0);
        CatalogIf<?> curCatalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
        if (curCatalog instanceof InternalCatalog) {
            return new UnboundTableSink<>(nameParts, colNames, hints, partitions, query);
        } else if (curCatalog instanceof HMSExternalCatalog) {
            return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions, query);
        } else if (curCatalog instanceof IcebergExternalCatalog) {
            return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions, query);
        }
        throw new UserException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported.");
    }

    /**
     * create unbound sink for DML plan
     */
    public static LogicalSink<? extends Plan> createUnboundTableSink(List<String> nameParts,
                List<String> colNames, List<String> hints, boolean temporaryPartition, List<String> partitions,
                boolean isPartialUpdate, DMLCommandType dmlCommandType, LogicalPlan plan) {
        String catalogName = RelationUtil.getQualifierName(ConnectContext.get(), nameParts).get(0);
        CatalogIf<?> curCatalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
        if (curCatalog instanceof InternalCatalog) {
            return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions,
                    isPartialUpdate, dmlCommandType, Optional.empty(),
                    Optional.empty(), plan);
        } else if (curCatalog instanceof HMSExternalCatalog) {
            return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions,
                    dmlCommandType, Optional.empty(), Optional.empty(), plan);
        } else if (curCatalog instanceof IcebergExternalCatalog) {
            return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions,
                    dmlCommandType, Optional.empty(), Optional.empty(), plan);
        } else if (curCatalog instanceof JdbcExternalCatalog) {
            return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions,
                    dmlCommandType, Optional.empty(), Optional.empty(), plan);
        }
        throw new RuntimeException("Load data to " + curCatalog.getClass().getSimpleName() + " is not supported.");
    }

    /**
     * create unbound sink for DML plan with auto detect overwrite partition enable.
     */
    public static LogicalSink<? extends Plan> createUnboundTableSinkMaybeOverwrite(List<String> nameParts,
            List<String> colNames, List<String> hints, boolean temporaryPartition, List<String> partitions,
            boolean isAutoDetectPartition, boolean isOverwrite, boolean isPartialUpdate, DMLCommandType dmlCommandType,
            LogicalPlan plan) {
        if (isAutoDetectPartition) { // partitions is null
            if (!isOverwrite) {
                throw new ParseException("ASTERISK is only supported in overwrite partition for OLAP table");
            }
            temporaryPartition = false;
            partitions = ImmutableList.of();
        }

        String catalogName = RelationUtil.getQualifierName(ConnectContext.get(), nameParts).get(0);
        CatalogIf<?> curCatalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
        if (curCatalog instanceof InternalCatalog) {
            return new UnboundTableSink<>(nameParts, colNames, hints, temporaryPartition, partitions,
                    isAutoDetectPartition,
                    isPartialUpdate, dmlCommandType, Optional.empty(),
                    Optional.empty(), plan);
        } else if (curCatalog instanceof HMSExternalCatalog && !isAutoDetectPartition) {
            return new UnboundHiveTableSink<>(nameParts, colNames, hints, partitions,
                    dmlCommandType, Optional.empty(), Optional.empty(), plan);
        } else if (curCatalog instanceof IcebergExternalCatalog && !isAutoDetectPartition) {
            return new UnboundIcebergTableSink<>(nameParts, colNames, hints, partitions,
                    dmlCommandType, Optional.empty(), Optional.empty(), plan);
        } else if (curCatalog instanceof JdbcExternalCatalog) {
            return new UnboundJdbcTableSink<>(nameParts, colNames, hints, partitions,
                    dmlCommandType, Optional.empty(), Optional.empty(), plan);
        }

        throw new AnalysisException(
                (isOverwrite ? "insert overwrite" : "insert") + " data to " + curCatalog.getClass().getSimpleName()
                        + " is not supported."
                        + (isAutoDetectPartition
                        ? " PARTITION(*) is only supported in overwrite partition for OLAP table" : ""));
    }

    /**
     * create unbound sink for dictionary sink
     */
    public static UnboundDictionarySink<? extends Plan> createUnboundDictionarySink(Dictionary dictionary,
            LogicalPlan child, boolean adaptiveLoad) {
        return new UnboundDictionarySink<>(dictionary, child, adaptiveLoad);
    }
}