// CloudReportHandler.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.cloud.master;
import org.apache.doris.catalog.Env;
import org.apache.doris.cloud.catalog.CloudEnv;
import org.apache.doris.master.ReportHandler;
import org.apache.doris.system.Backend;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.DropReplicaTask;
import org.apache.doris.thrift.TTablet;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
/**
 * {@link ReportHandler} specialization living in the cloud package.
 *
 * <p>It overrides the tablet report handling: tablets that a backend reports but that
 * are absent from the FE-side snapshot for that backend are scheduled to be dropped
 * from the backend.
 */
public class CloudReportHandler extends ReportHandler {
    private static final Logger LOG = LogManager.getLogger(CloudReportHandler.class);

    /**
     * Handles a tablet report coming from one backend.
     *
     * <p>The tablet ids reported by the backend are compared against the snapshot returned by
     * the cloud tablet rebalancer for the same backend; every reported id missing from that
     * snapshot gets a drop task submitted for it.
     *
     * @param backendId id of the reporting backend
     * @param backendTablets tablets reported by the backend, keyed by tablet id; only the key
     *        set and the size are read here
     * @param backendPartitionsVersion not used by this implementation
     * @param backendReportVersion report version, only logged
     * @param numTablets total tablet count announced by the backend, only logged
     */
    @Override
    public void tabletReport(long backendId, Map<Long, TTablet> backendTablets,
            Map<Long, Long> backendPartitionsVersion, long backendReportVersion, long numTablets) {
        final long beginMs = System.currentTimeMillis();
        LOG.info("backend[{}] have {} tablet(s), {} need deal tablet(s). report version: {}",
                backendId, numTablets, backendTablets.size(), backendReportVersion);

        // Tablet ids the FE currently tracks for this backend (primary or secondary).
        CloudEnv cloudEnv = (CloudEnv) Env.getCurrentEnv();
        Set<Long> feSideTabletIds = cloudEnv.getCloudTabletRebalancer()
                .getSnapshotTabletsInPrimaryAndSecondaryByBeId(backendId);

        // Tablet ids the backend says it holds.
        Set<Long> beSideTabletIds = backendTablets.keySet();

        // (reported by BE) minus (known to FE) => candidates to drop on the backend.
        Set<Long> staleTabletIds = diffTablets(feSideTabletIds, beSideTabletIds);
        deleteFromBackend(backendId, staleTabletIds);

        // The backend may no longer be registered; fall back to an empty host in the log.
        Backend reporter = Env.getCurrentSystemInfo().getBackend(backendId);
        LOG.info("finished to handle task report from backend {}-{}, "
                        + "diff task num: {}, cost: {} ms.",
                backendId, reporter != null ? reporter.getHost() : "",
                staleTabletIds.size(),
                (System.currentTimeMillis() - beginMs));
    }

    /**
     * Computes the tablet ids that must be dropped from a backend.
     *
     * @param tabletIdsInFe tablet ids in use as primary or secondary according to the FE
     * @param tabletIdsInBe tablet ids reported by the backend that need to be checked
     * @return a new set holding the ids present in {@code tabletIdsInBe} but not in
     *         {@code tabletIdsInFe}; neither input set is modified
     */
    private Set<Long> diffTablets(Set<Long> tabletIdsInFe, Set<Long> tabletIdsInBe) {
        // Work on a copy so the caller's (possibly map-backed) set stays untouched.
        Set<Long> onlyOnBackend = new HashSet<>(tabletIdsInBe);
        onlyOnBackend.removeAll(tabletIdsInFe);
        return onlyOnBackend;
    }

    /**
     * Builds one {@link DropReplicaTask} per tablet id and submits them as a single batch.
     * Nothing is submitted when no task was added; the summary line is logged either way.
     *
     * @param backendId backend the drop tasks are addressed to
     * @param tabletIdsWillDrop ids of the tablets to drop from that backend
     */
    private static void deleteFromBackend(long backendId, Set<Long> tabletIdsWillDrop) {
        AgentBatchTask dropBatch = new AgentBatchTask();
        int scheduled = 0;
        for (Long staleTabletId : tabletIdsWillDrop) {
            // NOTE(review): the -1, -1, false arguments are passed through unchanged;
            // their meaning is defined by DropReplicaTask, which is not visible here.
            dropBatch.addTask(new DropReplicaTask(backendId, staleTabletId, -1, -1, false));
            LOG.info("delete tablet[{}] from backend[{}]", staleTabletId, backendId);
            scheduled++;
        }

        if (dropBatch.getTaskNum() != 0) {
            AgentTaskExecutor.submit(dropBatch);
        }
        LOG.info("delete {} tablet(s) from backend[{}]", scheduled, backendId);
    }
}