AlterHandler.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.alter;
import org.apache.doris.analysis.AlterClause;
import org.apache.doris.analysis.CancelStmt;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.nereids.trees.plans.commands.AlterCommand;
import org.apache.doris.persist.RemoveAlterJobV2OperationLog;
import org.apache.doris.persist.ReplicaPersistInfo;
import org.apache.doris.task.AlterReplicaTask;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantLock;
public abstract class AlterHandler extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(AlterHandler.class);
// queue of alter jobs (v2)
protected ConcurrentMap<Long, AlterJobV2> alterJobsV2 = Maps.newConcurrentMap();
/**
 * Lock to perform atomic operations.
 * e.g.
 * When a job is finished, it is moved from alterJobs to finishedOrCancelledAlterJobs,
 * which requires atomicity, so the lock must be held while doing these operations.
 * Operations like get or put do not need the lock.
 */
protected ReentrantLock lock = new ReentrantLock();
public AlterHandler(String name) {
this(name, FeConstants.default_scheduler_interval_millisecond);
}
public AlterHandler(String name, int schedulerIntervalMillisecond) {
super(name, schedulerIntervalMillisecond);
}
protected void lock() {
lock.lock();
}
protected void unlock() {
lock.unlock();
}
protected void addAlterJobV2(AlterJobV2 alterJob) throws AnalysisException {
this.alterJobsV2.put(alterJob.getJobId(), alterJob);
LOG.info("add {} job {}", alterJob.getType(), alterJob.getJobId());
}
public List<AlterJobV2> getUnfinishedAlterJobV2ByTableId(long tblId) {
List<AlterJobV2> unfinishedAlterJobList = new ArrayList<>();
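// A job counts as unfinished as long as it has not reached a terminal state (FINISHED or CANCELLED).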
for (AlterJobV2 alterJob : alterJobsV2.values()) {
if (alterJob.getTableId() == tblId
&& alterJob.getJobState() != AlterJobV2.JobState.FINISHED
&& alterJob.getJobState() != AlterJobV2.JobState.CANCELLED) {
unfinishedAlterJobList.add(alterJob);
}
}
return unfinishedAlterJobList;
}
public AlterJobV2 getUnfinishedAlterJobV2ByJobId(long jobId) {
for (AlterJobV2 alterJob : alterJobsV2.values()) {
if (alterJob.getJobId() == jobId && !alterJob.isDone()) {
return alterJob;
}
}
return null;
}
public Map<Long, AlterJobV2> getAlterJobsV2() {
return this.alterJobsV2;
}
private void clearExpireFinishedOrCancelledAlterJobsV2() {
Iterator<Map.Entry<Long, AlterJobV2>> iterator = alterJobsV2.entrySet().iterator();
while (iterator.hasNext()) {
AlterJobV2 alterJobV2 = iterator.next().getValue();
if (alterJobV2.isExpire()) {
iterator.remove();
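// Log the removal to the edit log so it is also replayed on non-master FEs and after restart.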
RemoveAlterJobV2OperationLog log = new RemoveAlterJobV2OperationLog(
alterJobV2.getJobId(), alterJobV2.getType());
Env.getCurrentEnv().getEditLog().logRemoveExpiredAlterJobV2(log);
LOG.info("remove expired {} job {}. finish at {}", alterJobV2.getType(),
alterJobV2.getJobId(), TimeUtils.longToTimeString(alterJobV2.getFinishedTimeMs()));
}
}
}
public void replayRemoveAlterJobV2(RemoveAlterJobV2OperationLog log) {
if (alterJobsV2.remove(log.getJobId()) != null) {
LOG.info("replay removing expired {} job {}.", log.getType(), log.getJobId());
} else {
// should not happen, but it does not matter; just log a warning here for observability
LOG.warn("failed to find {} job {} when replay removing expired job.", log.getType(), log.getJobId());
}
}
public Long getAlterJobV2Num(AlterJobV2.JobState state, long dbId) {
return alterJobsV2.values().stream().filter(e -> e.getJobState() == state && e.getDbId() == dbId).count();
}
public Long getAlterJobV2Num(AlterJobV2.JobState state) {
Long counter = 0L;
for (AlterJobV2 job : alterJobsV2.values()) {
// no need to check priv here. This method is only called in the show proc stmt,
// which already checks the ADMIN priv.
if (job.getJobState() == state) {
counter++;
}
}
return counter;
}
@Override
protected void runAfterCatalogReady() {
clearExpireFinishedOrCancelledAlterJobsV2();
}
@Override
public void start() {
super.start();
}
/*
* abstract
*/
/*
* get alter job's info for show
*/
public abstract List<List<Comparable>> getAlterJobInfosByDb(Database db);
/*
* entry function. handle alter ops
*/
public abstract void process(String rawSql, List<AlterClause> alterClauses, Database db,
OlapTable olapTable)
throws UserException;
public abstract void processForNereids(String rawSql, List<AlterCommand> alterCommands, Database db,
OlapTable olapTable)
throws UserException;
/*
* entry function. handle alter ops
*/
public void process(List<AlterClause> alterClauses, Database db, OlapTable olapTable)
throws UserException {
process("", alterClauses, db, olapTable);
}
public void processForNereids(List<AlterCommand> alterSystemCommands, Database db, OlapTable olapTable)
throws UserException {
processForNereids("", alterSystemCommands, db, olapTable);
}
/*
* entry function. handle alter ops for external table
*/
public void processExternalTable(List<AlterClause> alterClauses, Database db, Table externalTable)
throws UserException {}
/*
* cancel alter ops
*/
public abstract void cancel(CancelStmt stmt) throws DdlException;
/*
 * Handle the finish report of an alter task.
 * If the task succeeds, the history data before the specified version has been transformed successfully,
 * so we should modify the replica's version here.
 * Assume that the specified version is X.
 * Case 1:
 * After the alter table process starts, no new load job is submitted. So the new replica
 * should still be at version (0-1), and we just set the replica's version to the
 * partition's visible version, which is X.
 * Case 2:
 * After the alter table process starts, some load jobs are processed.
 * Case 2.1:
 * None of them succeed on this replica, so the version is still 1.
 * We should set the replica's version to X.
 * Case 2.2:
 * There are new load jobs after the alter task, and at least one of them succeeds on this replica.
 * So the replica's version is already larger than X, and we don't need to modify it
 * because it already looks normal.
 * In summary, we only need to update the replica's version when it is smaller than X.
 */
public void handleFinishAlterTask(AlterReplicaTask task) throws MetaNotFoundException {
Database db = Env.getCurrentInternalCatalog().getDbOrMetaException(task.getDbId());
OlapTable tbl = (OlapTable) db.getTableOrMetaException(task.getTableId(), Table.TableType.OLAP);
tbl.writeLockOrMetaException();
try {
Partition partition = tbl.getPartition(task.getPartitionId());
if (partition == null) {
throw new MetaNotFoundException("partition " + task.getPartitionId() + " does not exist");
}
MaterializedIndex index = partition.getIndex(task.getIndexId());
if (index == null) {
throw new MetaNotFoundException("index " + task.getIndexId() + " does not exist");
}
Tablet tablet = index.getTablet(task.getTabletId());
Preconditions.checkNotNull(tablet, task.getTabletId());
Replica replica = tablet.getReplicaById(task.getNewReplicaId());
if (replica == null) {
throw new MetaNotFoundException("replica " + task.getNewReplicaId() + " does not exist");
}
LOG.info("before handle alter task tablet {}, replica: {}, task version: {}",
task.getSignature(), replica, task.getVersion());
boolean versionChanged = false;
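// Only advance the replica version when it is behind the task version (Case 1 and Case 2.1 above);
// in Case 2.2 the replica already carries a newer version from later load jobs and is left as is.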
if (replica.getVersion() < task.getVersion()) {
replica.updateVersion(task.getVersion());
versionChanged = true;
}
if (versionChanged) {
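// Persist the version change to the edit log so it is not lost on FE restart and can be replayed.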
ReplicaPersistInfo info = ReplicaPersistInfo.createForClone(task.getDbId(), task.getTableId(),
task.getPartitionId(), task.getIndexId(), task.getTabletId(), task.getBackendId(),
replica.getId(), replica.getVersion(), -1,
replica.getDataSize(), replica.getRemoteDataSize(), replica.getRowCount(),
replica.getLastFailedVersion(), replica.getLastSuccessVersion());
Env.getCurrentEnv().getEditLog().logUpdateReplica(info);
}
LOG.info("after handle alter task tablet: {}, replica: {}", task.getSignature(), replica);
} finally {
tbl.writeUnlock();
}
}
// replay the alter job v2
public void replayAlterJobV2(AlterJobV2 alterJob) throws AnalysisException {
AlterJobV2 existingJob = alterJobsV2.get(alterJob.getJobId());
if (existingJob == null) {
// This is the first time the alter job is replayed, so just use the replayed alterJob itself to call replay().
alterJob.replay(alterJob);
alterJobsV2.put(alterJob.getJobId(), alterJob);
} else {
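// The job already exists in memory: copy the failed tablet backends from the replayed job
// and apply its state to the existing one.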
existingJob.failedTabletBackends = alterJob.failedTabletBackends;
existingJob.replay(alterJob);
}
}
/**
 * Schema change may cause an OOM if the table has too many replicas.
 */
protected void checkReplicaCount(OlapTable olapTable) throws DdlException {
olapTable.readLock();
try {
long replicaCount = olapTable.getReplicaCount();
long maxReplicaCount = Config.max_replica_count_when_schema_change;
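// Reject the alter operation up front when the replica count exceeds the configured limit,
// to avoid the OOM described above.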
if (replicaCount > maxReplicaCount) {
String msg = String.format("table %s has %d replicas, exceeding the limit of %d for schema change.",
olapTable.getName(), replicaCount, maxReplicaCount);
LOG.warn(msg);
throw new DdlException(msg);
}
} finally {
olapTable.readUnlock();
}
}
}