GroupCommitTableValuedFunction.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.tablefunction;

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.Config;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.planner.GroupCommitPlanner;
import org.apache.doris.planner.GroupCommitScanNode;
import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.thrift.TFileType;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * The Implement of table valued function
 * group_commit().
 */
public class GroupCommitTableValuedFunction extends ExternalFileTableValuedFunction {

    public static final String NAME = "group_commit";

    private static final String TABLE_ID_KEY = "table_id";

    private final long tableId;

    public GroupCommitTableValuedFunction(Map<String, String> params) throws AnalysisException {
        if (params == null || !params.containsKey(TABLE_ID_KEY)) {
            throw new AnalysisException(NAME + " should contains property " + TABLE_ID_KEY);
        }
        tableId = Long.parseLong(params.get(TABLE_ID_KEY));
    }

    // =========== implement abstract methods of ExternalFileTableValuedFunction =================

    @Override
    public List<Column> getTableColumns() throws AnalysisException {
        List<Column> fileColumns = new ArrayList<>();
        Table table = Env.getCurrentInternalCatalog().getTableByTableId(tableId);
        if (table == null) {
            throw new AnalysisException("table with table_id " + tableId + " is not exists");
        }
        if (!(table instanceof OlapTable)) {
            throw new AnalysisException("Only support OLAP table, but table type of table_id "
                    + tableId + " is " + table.getType());
        }
        if (Env.getCurrentEnv().getGroupCommitManager().isBlock(tableId)) {
            String msg = "insert table " + tableId + GroupCommitPlanner.SCHEMA_CHANGE;
            LOG.info(msg);
            throw new AnalysisException(msg);
        }
        if (Config.group_commit_timeout_multipler > 0) {
            int timeoutS = Math.max((int) (((OlapTable) table).getGroupCommitIntervalMs() / 1000.0
                    * Config.group_commit_timeout_multipler), 600);
            ConnectContext.get().getSessionVariable().setInsertTimeoutS(timeoutS);
            ConnectContext.get().getSessionVariable().setQueryTimeoutS(timeoutS);
        }

        List<Column> tableColumns = table.getBaseSchema(true);
        for (int i = 1; i <= tableColumns.size(); i++) {
            fileColumns.add(new Column("c" + i, tableColumns.get(i - 1).getType(), true));
        }
        return fileColumns;
    }

    @Override
    public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, SessionVariable sv) {
        return new GroupCommitScanNode(id, desc, tableId);
    }

    @Override
    public TFileType getTFileType() {
        return TFileType.FILE_STREAM;
    }

    @Override
    public String getFilePath() {
        return null;
    }

    @Override
    public BrokerDesc getBrokerDesc() {
        return new BrokerDesc("GroupCommitTvfBroker", StorageType.STREAM, processedParams);
    }

    // =========== implement abstract methods of TableValuedFunctionIf =================
    @Override
    public String getTableName() {
        return "GroupCommitTableValuedFunction";
    }
}