CloudRollupJobV2.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.alter;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.cloud.datasource.CloudInternalCatalog;
import org.apache.doris.cloud.proto.Cloud;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.cloud.system.CloudSystemInfoService;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.proto.OlapFile;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.task.AgentTask;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.thrift.TTabletType;
import org.apache.doris.thrift.TTaskType;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class CloudRollupJobV2 extends RollupJobV2 {
    private static final Logger LOG = LogManager.getLogger(CloudRollupJobV2.class);

    /**
     * Builds a {@link CloudRollupJobV2} that mirrors the given {@link RollupJobV2}.
     *
     * <p>Every field declared on {@code RollupJobV2} or any of its superclasses that carries a
     * {@link SerializedName} annotation is copied reflectively from {@code job} into the new instance;
     * fields without the annotation are left at the defaults of the no-arg constructor.
     * {@code initAnalyzer()} is called on the new instance before it is returned.
     *
     * @param job the rollup job whose annotated fields are copied
     * @return the newly created cloud rollup job
     * @throws IllegalAccessException if a reflective field read or write is rejected
     * @throws AnalysisException declared for callers; presumably raised by {@code initAnalyzer()}
     */
    public static AlterJobV2 buildCloudRollupJobV2(RollupJobV2 job) throws IllegalAccessException, AnalysisException {
        CloudRollupJobV2 ret = new CloudRollupJobV2();
        // Collect the declared fields of RollupJobV2 and of every class above it in the hierarchy.
        List<Field> allFields = new ArrayList<>();
        Class<?> tmpClass = RollupJobV2.class;
        while (tmpClass != null) {
            allFields.addAll(Arrays.asList(tmpClass.getDeclaredFields()));
            tmpClass = tmpClass.getSuperclass();
        }
        for (Field field : allFields) {
            Annotation annotation = field.getAnnotation(SerializedName.class);
            if (annotation != null) {
                // Only the fields that are actually copied need their access checks lifted.
                field.setAccessible(true);
                field.set(ret, field.get(job));
            }
        }
        ret.initAnalyzer();
        return ret;
    }

    // No-arg constructor; used by buildCloudRollupJobV2 to create the instance it then fills via reflection.
    private CloudRollupJobV2() {}

    /**
     * Creates a cloud rollup job. Do not call this directly; use AlterJobV2Factory instead.
     *
     * <p>All arguments are forwarded unchanged to the {@link RollupJobV2} constructor. Afterwards, if a
     * {@link ConnectContext} is bound to the current thread and it reports a non-empty cloud cluster name,
     * that name is recorded on the job via {@code setCloudClusterName}. A {@link ComputeGroupException}
     * raised while reading the name is logged and treated as "no cluster name".
     *
     * @throws AnalysisException propagated from the superclass constructor
     */
    public CloudRollupJobV2(String rawSql, long jobId, long dbId, long tableId, String tableName, long timeoutMs,
                       long baseIndexId,
                       long rollupIndexId, String baseIndexName, String rollupIndexName, List<Column> rollupSchema,
                       Column whereColumn,
                       int baseSchemaHash, int rollupSchemaHash, KeysType rollupKeysType,
                       short rollupShortKeyColumnCount,
                       OriginStatement origStmt) throws AnalysisException {
        super(rawSql, jobId, dbId, tableId, tableName, timeoutMs, baseIndexId,
                rollupIndexId, baseIndexName, rollupIndexName, rollupSchema, whereColumn,
                baseSchemaHash, rollupSchemaHash, rollupKeysType, rollupShortKeyColumnCount, origStmt);
        ConnectContext ctx = ConnectContext.get();
        if (ctx != null) {
            String computeGroup;
            try {
                computeGroup = ctx.getCloudCluster();
            } catch (ComputeGroupException ex) {
                LOG.warn("failed to get compute group name", ex);
                // Fall back to an empty name so the job is created without a cluster binding.
                computeGroup = "";
            }
            LOG.debug("rollup job add cloud cluster, context not null, cluster: {}", computeGroup);
            boolean hasClusterName = !Strings.isNullOrEmpty(computeGroup);
            if (hasClusterName) {
                setCloudClusterName(computeGroup);
            }
        }
        LOG.debug("rollup job add cloud cluster, context {}", ctx);
    }

    /**
     * Commits the rollup index identified by {@code rollupIndexId} through
     * {@code CloudInternalCatalog#commitMaterializedIndex}.
     *
     * @throws AlterCancelException if the commit call throws; the new exception carries the message of
     *         the original one
     */
    @Override
    protected void onCreateRollupReplicaDone() throws AlterCancelException {
        List<Long> rollupIndexList = new ArrayList<>();
        rollupIndexList.add(rollupIndexId);
        try {
            ((CloudInternalCatalog) Env.getCurrentInternalCatalog())
                .commitMaterializedIndex(dbId, tableId, rollupIndexList, false);
        } catch (Exception e) {
            // No "{}" here: with a lone Throwable argument the (String, Throwable) overload is chosen,
            // which logs the stack trace and would otherwise print the placeholder literally.
            LOG.warn("commitMaterializedIndex Exception:", e);
            throw new AlterCancelException(e.getMessage());
        }

        LOG.info("onCreateRollupReplicaDone finished, dbId:{}, tableId:{}, jobId:{}, rollupIndexList:{}",
                dbId, tableId, jobId, rollupIndexList);
    }

    /**
     * Cleans up cloud-side state for a cancelled rollup job.
     *
     * <p>Drops the rollup index via {@code CloudInternalCatalog#dropMaterializedIndex} and then calls
     * {@code removeSchemaChangeJob} for every (rollup tablet, base tablet) pair recorded in
     * {@code partitionIdToBaseRollupTabletIdMap}. If any step throws, the failure is logged, the thread
     * waits in {@code sleepSeveralSeconds()}, and the whole sequence is attempted again; there is no
     * upper bound on the number of attempts.
     */
    @Override
    protected void onCancel() {
        List<Long> indexIdsToDrop = new ArrayList<>();
        indexIdsToDrop.add(rollupIndexId);

        long attempt = 1;
        boolean cleanedUp = false;
        while (!cleanedUp) {
            try {
                CloudInternalCatalog cloudCatalog = (CloudInternalCatalog) Env.getCurrentInternalCatalog();
                cloudCatalog.dropMaterializedIndex(tableId, indexIdsToDrop, false);
                for (Map.Entry<Long, Map<Long, Long>> perPartition : partitionIdToBaseRollupTabletIdMap.entrySet()) {
                    Long partitionId = perPartition.getKey();
                    Map<Long, Long> rollupToBaseTablet = perPartition.getValue();
                    for (Map.Entry<Long, Long> tabletPair : rollupToBaseTablet.entrySet()) {
                        Long rollupTabletId = tabletPair.getKey();
                        Long baseTabletId = tabletPair.getValue();
                        cloudCatalog.removeSchemaChangeJob(jobId, dbId, tableId, baseIndexId, rollupIndexId,
                                partitionId, baseTabletId, rollupTabletId);
                    }
                    LOG.info("Cancel RollupJob. Remove SchemaChangeJob in ms."
                            + "dbId:{}, tableId:{}, rollupIndexId: {} partitionId:{}. tabletSize:{}",
                            dbId, tableId, rollupIndexId, partitionId, rollupToBaseTablet.size());
                }
                cleanedUp = true;
            } catch (Exception ex) {
                LOG.warn("tryTimes:{}, onCancel exception:", attempt, ex);
                // Back off before the next attempt.
                sleepSeveralSeconds();
                attempt++;
            }
        }

        LOG.info("onCancel finished, dbId:{}, tableId:{}, jobId:{}, rollupIndexList:{}",
                dbId, tableId, jobId, indexIdsToDrop);
    }

    /**
     * Creates the rollup tablets on the cloud side and then adds the rollup index to the catalog.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Resolve the database and the OLAP table; a missing one cancels the job.</li>
     *   <li>Under the table read lock: check that the table state is {@code ROLLUP}, call
     *       {@code prepareMaterializedIndex} for the rollup index, and send the create-tablets requests
     *       for every partition.</li>
     *   <li>Under the table write lock: re-check the state and call {@code addRollupIndexToCatalog}.</li>
     * </ol>
     *
     * @throws AlterCancelException if the database or table cannot be found, if preparing the index or
     *         creating tablets throws, or if the write lock cannot be obtained
     */
    @Override
    protected void createRollupReplica() throws AlterCancelException {
        Database db = Env.getCurrentInternalCatalog()
                .getDbOrException(dbId, s -> new AlterCancelException("Database " + s + " does not exist"));

        // 1. create rollup replicas
        OlapTable tbl;
        try {
            tbl = (OlapTable) db.getTableOrMetaException(tableId, Table.TableType.OLAP);
        } catch (MetaNotFoundException e) {
            throw new AlterCancelException(e.getMessage());
        }

        // Job deadline converted from milliseconds to seconds for prepareMaterializedIndex.
        long expiration = (createTimeMs + timeoutMs) / 1000;
        tbl.readLock();
        try {
            Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
            try {
                List<Long> rollupIndexList = new ArrayList<>();
                rollupIndexList.add(rollupIndexId);
                ((CloudInternalCatalog) Env.getCurrentInternalCatalog())
                    .prepareMaterializedIndex(tbl.getId(), rollupIndexList, expiration);
                createRollupReplicaForPartition(tbl);
            } catch (Exception e) {
                // No "{}" here: with a lone Throwable argument the (String, Throwable) overload is chosen,
                // which logs the stack trace and would otherwise print the placeholder literally.
                LOG.warn("createCloudShadowIndexReplica Exception:", e);
                throw new AlterCancelException(e.getMessage());
            }
        } finally {
            tbl.readUnlock();
        }

        // create all rollup replicas success.
        // add rollup index to catalog
        tbl.writeLockOrAlterCancelException();
        try {
            Preconditions.checkState(tbl.getState() == OlapTableState.ROLLUP);
            addRollupIndexToCatalog(tbl);
        } finally {
            tbl.writeUnlock();
        }
    }

    /**
     * Sends one create-tablets request per partition listed in {@code partitionIdToRollupIndex}.
     *
     * <p>For each partition that still exists in {@code tbl}, a tablet meta is built for every tablet of
     * the partition's rollup index and added to a single {@code CreateTabletsRequest}, which is then sent
     * through {@code CloudInternalCatalog#sendCreateTabletsRpc}. Partitions no longer present in the table
     * are skipped.
     *
     * @param tbl the table that owns the rollup index; table-level properties are read from it
     * @throws Exception whatever the tablet-meta builder or the RPC call throws
     */
    private void createRollupReplicaForPartition(OlapTable tbl) throws Exception {
        for (Map.Entry<Long, MaterializedIndex> indexByPartition : this.partitionIdToRollupIndex.entrySet()) {
            long partitionId = indexByPartition.getKey();
            Partition partition = tbl.getPartition(partitionId);
            if (partition == null) {
                // Partition is gone from the table; nothing to create for it.
                continue;
            }
            TTabletType tabletType = tbl.getPartitionInfo().getTabletType(partitionId);
            MaterializedIndex rollupIndex = indexByPartition.getValue();
            Cloud.CreateTabletsRequest.Builder createTabletsRequest = Cloud.CreateTabletsRequest.newBuilder();
            List<String> rowStoreColumns = tbl.getTableProperty().getCopiedRowStoreColumns();
            CloudInternalCatalog cloudCatalog = (CloudInternalCatalog) Env.getCurrentInternalCatalog();
            for (Tablet rollupTablet : rollupIndex.getTablets()) {
                OlapFile.TabletMetaCloudPB.Builder tabletMeta = cloudCatalog.createTabletMetaBuilder(
                        tableId, rollupIndexId, partitionId, rollupTablet, tabletType, rollupSchemaHash,
                        rollupKeysType, rollupShortKeyColumnCount, tbl.getCopiedBfColumns(),
                        tbl.getBfFpp(), null, rollupSchema,
                        tbl.getDataSortInfo(), tbl.getCompressionType(), tbl.getStoragePolicy(),
                        tbl.isInMemory(), true,
                        tbl.getName(), tbl.getTTLSeconds(),
                        tbl.getEnableUniqueKeyMergeOnWrite(), tbl.storeRowColumn(),
                        tbl.getBaseSchemaVersion(), tbl.getCompactionPolicy(),
                        tbl.getTimeSeriesCompactionGoalSizeMbytes(),
                        tbl.getTimeSeriesCompactionFileCountThreshold(),
                        tbl.getTimeSeriesCompactionTimeThresholdSeconds(),
                        tbl.getTimeSeriesCompactionEmptyRowsetsThreshold(),
                        tbl.getTimeSeriesCompactionLevelThreshold(),
                        tbl.disableAutoCompaction(),
                        tbl.getRowStoreColumnsUniqueIds(rowStoreColumns),
                        null,
                        tbl.rowStorePageSize(),
                        tbl.variantEnableFlattenNested(), null,
                        tbl.storagePageSize());
                createTabletsRequest.addTabletMetas(tabletMeta);
            }
            createTabletsRequest.setDbId(dbId);
            cloudCatalog.sendCreateTabletsRpc(createTabletsRequest);
        }
    }

    /**
     * Verifies that the cloud cluster recorded in {@code cloudClusterName} can still be looked up.
     *
     * <p>If {@code getCloudClusterIdByName} returns {@code null}, every task in {@code tasks} is marked
     * finished and removed from the {@link AgentTaskQueue} as an {@code ALTER} task, and the job is
     * cancelled. Otherwise the method returns without touching the tasks.
     *
     * @param tasks the agent tasks to discard when the cluster is missing
     * @throws AlterCancelException if the cluster id lookup returns {@code null}
     */
    @Override
    protected void ensureCloudClusterExist(List<AgentTask> tasks) throws AlterCancelException {
        CloudSystemInfoService systemInfo = (CloudSystemInfoService) Env.getCurrentSystemInfo();
        if (systemInfo.getCloudClusterIdByName(cloudClusterName) != null) {
            return;
        }

        for (AgentTask pending : tasks) {
            pending.setFinished(true);
            AgentTaskQueue.removeTask(pending.getBackendId(), TTaskType.ALTER, pending.getSignature());
        }
        String reason = "cloud cluster(" + cloudClusterName + ") has been removed, jobId=" + jobId;
        LOG.warn(reason);
        throw new AlterCancelException(reason);
    }

    /**
     * Always reports the table as stable; {@code db} is not inspected.
     *
     * @param db unused by this override
     * @return {@code true} unconditionally
     */
    @Override
    protected boolean checkTableStable(Database db) throws AlterCancelException {
        return true;
    }
}