AlterReplicaTask.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.task;

import org.apache.doris.alter.AlterJobV2;
import org.apache.doris.analysis.DescriptorTable;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.catalog.Column;
import org.apache.doris.common.Config;
import org.apache.doris.thrift.TAlterMaterializedViewParam;
import org.apache.doris.thrift.TAlterTabletReqV2;
import org.apache.doris.thrift.TAlterTabletType;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TTaskType;

import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/*
 * This task is used for alter table process, such as rollup and schema change
 * The task will do data transformation from base replica to new replica.
 * The new replica should be created before.
 * The new replica can be a rollup replica, or a shadow replica of schema change.
 */
public class AlterReplicaTask extends AgentTask {

    private long baseTabletId;
    private long newReplicaId;
    private int baseSchemaHash;
    private int newSchemaHash;
    private long version;
    private long jobId;
    private AlterJobV2.JobType jobType;

    private Map<String, Expr> defineExprs;
    private Expr whereClause;
    private DescriptorTable descTable;
    private Map<Object, Object> objectPool;
    private List<Column> baseSchemaColumns;

    private long expiration;

    private String vaultId;
    /**
     * AlterReplicaTask constructor.
     *
     */

    public AlterReplicaTask(long backendId, long dbId, long tableId, long partitionId, long rollupIndexId,
            long baseIndexId, long rollupTabletId, long baseTabletId, long newReplicaId, int newSchemaHash,
            int baseSchemaHash, long version, long jobId, AlterJobV2.JobType jobType, Map<String, Expr> defineExprs,
            DescriptorTable descTable, List<Column> baseSchemaColumns, Map<Object, Object> objectPool,
            Expr whereClause, long expiration, String vaultId) {
        super(null, backendId, TTaskType.ALTER, dbId, tableId, partitionId, rollupIndexId, rollupTabletId);

        this.baseTabletId = baseTabletId;
        this.newReplicaId = newReplicaId;

        this.newSchemaHash = newSchemaHash;
        this.baseSchemaHash = baseSchemaHash;

        this.version = version;
        this.jobId = jobId;

        this.jobType = jobType;
        this.defineExprs = defineExprs;
        this.whereClause = whereClause;
        this.descTable = descTable;
        this.baseSchemaColumns = baseSchemaColumns;
        this.objectPool = objectPool;
        this.expiration = expiration;
        this.vaultId = vaultId;
    }

    public long getBaseTabletId() {
        return baseTabletId;
    }

    public long getNewReplicaId() {
        return newReplicaId;
    }

    public int getNewSchemaHash() {
        return newSchemaHash;
    }

    public int getBaseSchemaHash() {
        return baseSchemaHash;
    }

    public long getVersion() {
        return version;
    }

    public long getJobId() {
        return jobId;
    }

    public AlterJobV2.JobType getJobType() {
        return jobType;
    }

    public TAlterTabletReqV2 toThrift() {
        TAlterTabletReqV2 req = new TAlterTabletReqV2(baseTabletId, signature, baseSchemaHash, newSchemaHash);
        req.setAlterVersion(version);
        req.setJobId(jobId);
        req.setExpiration(expiration);
        req.setBeExecVersion(Config.be_exec_version);
        switch (jobType) {
            case ROLLUP:
                req.setAlterTabletType(TAlterTabletType.ROLLUP);
                break;
            case SCHEMA_CHANGE:
                req.setAlterTabletType(TAlterTabletType.SCHEMA_CHANGE);
                break;
            default:
                break;
        }

        if (defineExprs != null) {
            for (Map.Entry<String, Expr> entry : defineExprs.entrySet()) {
                Object value = objectPool.get(entry.getKey());
                if (value == null) {
                    List<SlotRef> slots = Lists.newArrayList();
                    entry.getValue().collect(SlotRef.class, slots);
                    TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(entry.getKey());
                    mvParam.setMvExpr(entry.getValue().treeToThrift());
                    req.addToMaterializedViewParams(mvParam);
                    objectPool.put(entry.getKey(), mvParam);
                } else {
                    TAlterMaterializedViewParam mvParam = (TAlterMaterializedViewParam) value;
                    req.addToMaterializedViewParams(mvParam);
                }
            }
        }

        if (whereClause != null) {
            Object value = objectPool.get(Column.WHERE_SIGN);
            if (value == null) {
                TAlterMaterializedViewParam mvParam = new TAlterMaterializedViewParam(Column.WHERE_SIGN);
                mvParam.setMvExpr(whereClause.treeToThrift());
                req.addToMaterializedViewParams(mvParam);
                objectPool.put(Column.WHERE_SIGN, mvParam);
            } else {
                TAlterMaterializedViewParam mvParam = (TAlterMaterializedViewParam) value;
                req.addToMaterializedViewParams(mvParam);
            }
        }
        req.setDescTbl(descTable.toThrift());

        if (baseSchemaColumns != null) {
            Object value = objectPool.get(baseSchemaColumns);
            if (value == null) {
                List<TColumn> columns = new ArrayList<TColumn>();
                for (Column column : baseSchemaColumns) {
                    columns.add(column.toThrift());
                }
                objectPool.put(baseSchemaColumns, columns);
                req.setColumns(columns);
            } else {
                List<TColumn> columns = (List<TColumn>) value;
                req.setColumns(columns);
            }
        }
        req.setStorageVaultId(this.vaultId);
        return req;
    }
}