MasterOpExecutor.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe;

import org.apache.doris.analysis.RedirectStatus;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.DdlException;
import org.apache.doris.thrift.TGroupCommitInfo;
import org.apache.doris.thrift.TMasterOpRequest;
import org.apache.doris.thrift.TNetworkAddress;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/**
 * MasterOpExecutor is used to send request to Master FE.
 * It is inherited from FEOpExecutor. The difference is that MasterOpExecutor may need to wait the journal being
 * synced before returning.
 */
public class MasterOpExecutor extends FEOpExecutor {
    private static final Logger LOG = LogManager.getLogger(MasterOpExecutor.class);
    private final int journalWaitTimeoutMs;

    public MasterOpExecutor(OriginStatement originStmt, ConnectContext ctx, RedirectStatus status, boolean isQuery) {
        super(new TNetworkAddress(ctx.getEnv().getMasterHost(), ctx.getEnv().getMasterRpcPort()),
                originStmt, ctx, isQuery);
        if (status.isNeedToWaitJournalSync()) {
            this.journalWaitTimeoutMs = (int) (ctx.getExecTimeoutS() * 1000 * RPC_TIMEOUT_COEFFICIENT);
        } else {
            this.journalWaitTimeoutMs = 0;
        }
    }

    /**
     * used for simply syncing journal with master under strong consistency mode
     */
    public MasterOpExecutor(ConnectContext ctx) {
        this(null, ctx, RedirectStatus.FORWARD_WITH_SYNC, true);
    }

    @Override
    public void execute() throws Exception {
        super.execute();
        waitOnReplaying();
    }

    @Override
    public void cancel() throws Exception {
        super.cancel();
        waitOnReplaying();
    }

    private void waitOnReplaying() throws DdlException {
        LOG.info("forwarding to master get result max journal id: {}", result.maxJournalId);
        ctx.getEnv().getJournalObservable().waitOn(result.maxJournalId, journalWaitTimeoutMs);
    }

    public void syncJournal() throws Exception {
        result = forward(buildSyncJournalParams());
        waitOnReplaying();
    }

    public long getGroupCommitLoadBeId(long tableId, String cluster) throws Exception {
        result = forward(buildGetGroupCommitLoadBeIdParmas(tableId, cluster));
        waitOnReplaying();
        return result.groupCommitLoadBeId;
    }

    public void updateLoadData(long tableId, long receiveData) throws Exception {
        result = forward(buildUpdateLoadDataParams(tableId, receiveData));
        waitOnReplaying();
    }

    private TMasterOpRequest buildSyncJournalParams() {
        final TMasterOpRequest params = new TMasterOpRequest();
        // node ident
        params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
        params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
        params.setSyncJournalOnly(true);
        params.setDb(ctx.getDatabase());
        params.setUser(ctx.getQualifiedUser());
        // just make the protocol happy
        params.setSql("");
        return params;
    }

    private TMasterOpRequest buildGetGroupCommitLoadBeIdParmas(long tableId, String cluster) {
        final TGroupCommitInfo groupCommitParams = new TGroupCommitInfo();
        groupCommitParams.setGetGroupCommitLoadBeId(true);
        groupCommitParams.setGroupCommitLoadTableId(tableId);
        groupCommitParams.setCluster(cluster);
        return getMasterOpRequestForGroupCommit(groupCommitParams);
    }

    private TMasterOpRequest buildUpdateLoadDataParams(long tableId, long receiveData) {
        final TGroupCommitInfo groupCommitParams = new TGroupCommitInfo();
        groupCommitParams.setUpdateLoadData(true);
        groupCommitParams.setTableId(tableId);
        groupCommitParams.setReceiveData(receiveData);
        return getMasterOpRequestForGroupCommit(groupCommitParams);
    }

    private TMasterOpRequest getMasterOpRequestForGroupCommit(TGroupCommitInfo groupCommitParams) {
        final TMasterOpRequest params = new TMasterOpRequest();
        // node ident
        params.setClientNodeHost(Env.getCurrentEnv().getSelfNode().getHost());
        params.setClientNodePort(Env.getCurrentEnv().getSelfNode().getPort());
        params.setGroupCommitInfo(groupCommitParams);
        params.setDb(ctx.getDatabase());
        params.setUser(ctx.getQualifiedUser());
        // just make the protocol happy
        params.setSql("");
        return params;
    }

}