PublishVersionDaemon.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.transaction;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.common.Config;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.system.Backend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.AgentTaskQueue;
import org.apache.doris.task.PublishVersionTask;
import org.apache.doris.task.UpdateVisibleVersionTask;
import org.apache.doris.thrift.TPartitionVersionInfo;
import org.apache.doris.thrift.TTaskType;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
public class PublishVersionDaemon extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(PublishVersionDaemon.class);
public PublishVersionDaemon() {
super("PUBLISH_VERSION", Config.publish_version_interval_ms);
}
@Override
protected void runAfterCatalogReady() {
Map<Long, Long> partitionVisibleVersions = Maps.newHashMap();
Map<Long, Set<Long>> backendPartitions = Maps.newHashMap();
try {
publishVersion(partitionVisibleVersions, backendPartitions);
sendBackendVisibleVersion(partitionVisibleVersions, backendPartitions);
} catch (Throwable t) {
LOG.error("errors while publish version to all backends", t);
}
}
private void publishVersion(Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> backendPartitions) {
if (DebugPointUtil.isEnable("PublishVersionDaemon.stop_publish")) {
return;
}
GlobalTransactionMgrIface globalTransactionMgr = Env.getCurrentGlobalTransactionMgr();
List<TransactionState> readyTransactionStates = globalTransactionMgr.getReadyToPublishTransactions();
if (readyTransactionStates.isEmpty()) {
return;
}
// ATTN, we publish transaction state to all backends including dead backend, if not publish to dead backend
// then transaction manager will treat it as success
SystemInfoService infoService = Env.getCurrentSystemInfo();
List<Long> allBackends = infoService.getAllBackendIds(false);
if (allBackends.isEmpty()) {
LOG.warn("some transaction state need to publish, but no backend exists");
return;
}
traverseReadyTxnAndDispatchPublishVersionTask(readyTransactionStates, allBackends);
tryFinishTxn(readyTransactionStates, infoService, globalTransactionMgr,
partitionVisibleVersions, backendPartitions);
}
private void traverseReadyTxnAndDispatchPublishVersionTask(List<TransactionState> readyTransactionStates,
List<Long> allBackends) {
long createPublishVersionTaskTime = System.currentTimeMillis();
// every backend-transaction identified a single task
AgentBatchTask batchTask = new AgentBatchTask();
// for delta rows statistics to exclude rollup tablets
Map<Long, Set<Long>> beIdToBaseTabletIds = Maps.newHashMap();
// traverse all ready transactions and dispatch the publish version task to all backends
for (TransactionState transactionState : readyTransactionStates) {
if (transactionState.hasSendTask()) {
continue;
}
try {
genPublishTask(allBackends, transactionState, createPublishVersionTaskTime, beIdToBaseTabletIds,
batchTask);
} catch (Throwable t) {
LOG.error("errors while generate publish task for transaction: {}", transactionState, t);
}
}
if (!batchTask.getAllTasks().isEmpty()) {
AgentTaskExecutor.submit(batchTask);
}
}
private void genPublishTask(List<Long> allBackends, TransactionState transactionState,
long createPublishVersionTaskTime, Map<Long, Set<Long>> beIdToBaseTabletIds, AgentBatchTask batchTask) {
Set<Long> publishBackends = Sets.newHashSet(transactionState.getPublishVersionTasks().keySet());
publishBackends.addAll(transactionState.getInvolvedBackends());
// public version tasks are not persisted in catalog, so publishBackends may be empty.
// so we have to try publish to all backends;
if (publishBackends.isEmpty()) {
// could not just add to it, should new a new object, or the back map will destroyed
publishBackends = Sets.newHashSet();
publishBackends.addAll(allBackends);
}
if (transactionState.getTransactionId() == DebugPointUtil.getDebugParamOrDefault(
"PublishVersionDaemon.genPublishTask.failed", "txnId", -1L)) {
throw new NullPointerException("genPublishTask failed for txnId: " + transactionState.getTransactionId());
}
if (transactionState.getSubTxnIds() != null) {
for (Entry<Long, TableCommitInfo> entry : transactionState.getSubTxnIdToTableCommitInfo().entrySet()) {
long subTxnId = entry.getKey();
List<TPartitionVersionInfo> partitionVersionInfos = generatePartitionVersionInfos(entry.getValue(),
transactionState, beIdToBaseTabletIds);
LOG.debug("add publish task, txnId={}, subTxnId={}, backends={}, partitionVersionInfos={}",
transactionState.getTransactionId(), subTxnId, publishBackends, partitionVersionInfos);
addPublishVersionTask(publishBackends, subTxnId, transactionState, partitionVersionInfos,
beIdToBaseTabletIds, createPublishVersionTaskTime, batchTask);
}
} else {
List<TPartitionVersionInfo> partitionVersionInfos = generatePartitionVersionInfos(
transactionState.getIdToTableCommitInfos().values(), transactionState, beIdToBaseTabletIds);
addPublishVersionTask(publishBackends, transactionState.getTransactionId(), transactionState,
partitionVersionInfos, beIdToBaseTabletIds, createPublishVersionTaskTime, batchTask);
}
transactionState.setSendedTask();
LOG.info("send publish tasks for transaction: {}, db: {}", transactionState.getTransactionId(),
transactionState.getDbId());
}
private void tryFinishTxn(List<TransactionState> readyTransactionStates,
SystemInfoService infoService, GlobalTransactionMgrIface globalTransactionMgr,
Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> backendPartitions) {
for (TransactionState transactionState : readyTransactionStates) {
try {
// try to finish the transaction, if failed just retry in next loop
tryFinishOneTxn(transactionState, infoService, globalTransactionMgr, partitionVisibleVersions,
backendPartitions);
} catch (Throwable t) {
LOG.error("errors while finish transaction: {}, publish tasks: {}", transactionState,
transactionState.getPublishVersionTasks(), t);
}
} // end for readyTransactionStates
}
private void tryFinishOneTxn(TransactionState transactionState, SystemInfoService infoService,
GlobalTransactionMgrIface globalTransactionMgr,
Map<Long, Long> partitionVisibleVersions, Map<Long, Set<Long>> backendPartitions) {
Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows = Maps.newHashMap();
AtomicBoolean hasBackendAliveAndUnfinishedTask = new AtomicBoolean(false);
Set<Long> notFinishTaskBe = Sets.newHashSet();
transactionState.getPublishVersionTasks().forEach((key, tasks) -> {
long beId = key;
for (PublishVersionTask task : tasks) {
if (task.isFinished()) {
calculateTaskUpdateRows(tableIdToTabletDeltaRows, task);
} else {
if (infoService.checkBackendAlive(task.getBackendId())) {
hasBackendAliveAndUnfinishedTask.set(true);
}
notFinishTaskBe.add(beId);
}
}
});
transactionState.setTableIdToTabletDeltaRows(tableIdToTabletDeltaRows);
if (LOG.isDebugEnabled()) {
LOG.debug("notFinishTaskBe {}, trans {}", notFinishTaskBe, transactionState);
}
boolean isPublishSlow = false;
long totalNum = transactionState.getPublishVersionTasks().keySet().size();
boolean allUnFinishTaskIsSlow = notFinishTaskBe.stream().allMatch(beId -> {
Backend be = infoService.getBackend(beId);
if (be == null) {
return false;
}
return be.getPublishTaskLastTimeAccumulated() > Config.publish_version_queued_limit_number;
});
if (totalNum - notFinishTaskBe.size() > totalNum / 2 && allUnFinishTaskIsSlow) {
if (LOG.isDebugEnabled()) {
LOG.debug(" finishNum {}, txn publish tasks {}, notFinishTaskBe {}",
totalNum - notFinishTaskBe.size(), transactionState.getPublishVersionTasks().keySet(),
notFinishTaskBe);
}
isPublishSlow = true;
}
boolean shouldFinishTxn = !hasBackendAliveAndUnfinishedTask.get() || transactionState.isPublishTimeout()
|| isPublishSlow
|| DebugPointUtil.isEnable("PublishVersionDaemon.not_wait_unfinished_tasks");
if (shouldFinishTxn) {
try {
// one transaction exception should not affect other transaction
globalTransactionMgr.finishTransaction(transactionState.getDbId(),
transactionState.getTransactionId(), partitionVisibleVersions, backendPartitions);
} catch (Exception e) {
LOG.warn("error happens when finish transaction {}", transactionState.getTransactionId(), e);
}
if (transactionState.getTransactionStatus() != TransactionStatus.VISIBLE) {
// if finish transaction state failed, then update publish version time, should check
// to finish after some interval
transactionState.updateSendTaskTime();
if (LOG.isDebugEnabled()) {
LOG.debug("publish version for transaction {} failed", transactionState);
}
}
}
if (transactionState.getTransactionStatus() == TransactionStatus.VISIBLE) {
transactionState.getPublishVersionTasks().values().forEach(tasks -> {
for (PublishVersionTask task : tasks) {
AgentTaskQueue.removeTask(task.getBackendId(), TTaskType.PUBLISH_VERSION, task.getSignature());
}
});
transactionState.pruneAfterVisible();
if (MetricRepo.isInit) {
long publishTime = transactionState.getLastPublishVersionTime() - transactionState.getCommitTime();
MetricRepo.HISTO_TXN_PUBLISH_LATENCY.update(publishTime);
}
}
}
// Merge task tablets update rows to tableToTabletsDelta.
private void calculateTaskUpdateRows(Map<Long, Map<Long, Long>> tableIdToTabletDeltaRows, PublishVersionTask task) {
if (CollectionUtils.isEmpty(task.getErrorTablets())) {
LOG.debug("Task backend id {}, update rows info : [{}]",
task.getBackendId(), task.getTableIdToTabletDeltaRows());
for (Entry<Long, Map<Long, Long>> tableEntry : task.getTableIdToTabletDeltaRows().entrySet()) {
tableIdToTabletDeltaRows.putIfAbsent(tableEntry.getKey(), Maps.newHashMap());
Map<Long, Long> tabletsDelta = tableIdToTabletDeltaRows.get(tableEntry.getKey());
for (Entry<Long, Long> tabletEntry : tableEntry.getValue().entrySet()) {
tabletsDelta.computeIfPresent(tabletEntry.getKey(),
(tabletId, origRows) -> origRows + tabletEntry.getValue());
tabletsDelta.putIfAbsent(tabletEntry.getKey(), tabletEntry.getValue());
}
}
}
}
private Map<Long, Set<Long>> getBaseTabletIdsForEachBe(TransactionState transactionState,
TableCommitInfo tableCommitInfo) throws MetaNotFoundException {
OlapTable table = (OlapTable) Env.getCurrentEnv()
.getInternalCatalog()
.getDb(transactionState.getDbId())
.orElseThrow(() -> new MetaNotFoundException(String.format("could not get db by id=%s",
transactionState.getDbId())))
.getTable(tableCommitInfo.getTableId())
.orElseThrow(() -> new MetaNotFoundException(String.format("could not get tbl by id=%s",
tableCommitInfo)));
return tableCommitInfo
.getIdToPartitionCommitInfo()
.values().stream()
.map(PartitionCommitInfo::getPartitionId)
.map(partitionId -> Optional.ofNullable(table.getPartition(partitionId)))
.filter(Optional::isPresent)
.map(Optional::get)
.map(Partition::getBaseIndex)
.map(MaterializedIndex::getTablets)
.flatMap(Collection::stream)
.flatMap(tablet ->
tablet.getBackendIds()
.stream().map(backendId -> Pair.of(backendId, tablet.getId())))
.collect(Collectors.groupingBy(p -> p.first,
Collectors.mapping(p -> p.second, Collectors.toSet())));
}
private void sendBackendVisibleVersion(Map<Long, Long> partitionVisibleVersions,
Map<Long, Set<Long>> backendPartitions) {
if (partitionVisibleVersions.isEmpty() || backendPartitions.isEmpty()) {
return;
}
long createTime = System.currentTimeMillis();
AgentBatchTask batchTask = new AgentBatchTask();
backendPartitions.forEach((backendId, partitionIds) -> {
Map<Long, Long> backendPartitionVersions = partitionVisibleVersions.entrySet().stream()
.filter(entry -> partitionIds.contains(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
UpdateVisibleVersionTask task = new UpdateVisibleVersionTask(backendId, backendPartitionVersions,
createTime);
batchTask.addTask(task);
});
AgentTaskExecutor.submit(batchTask);
}
private List<TPartitionVersionInfo> generatePartitionVersionInfos(Collection<TableCommitInfo> tableCommitInfos,
TransactionState transactionState, Map<Long, Set<Long>> beIdToBaseTabletIds) {
return tableCommitInfos.stream()
.map(c -> generatePartitionVersionInfos(c, transactionState, beIdToBaseTabletIds)).flatMap(List::stream)
.collect(Collectors.toList());
}
private List<TPartitionVersionInfo> generatePartitionVersionInfos(TableCommitInfo tableCommitInfo,
TransactionState transactionState, Map<Long, Set<Long>> beIdToBaseTabletIds) {
try {
Map<Long, Set<Long>> map = getBaseTabletIdsForEachBe(transactionState, tableCommitInfo);
map.forEach((beId, newSet) -> {
beIdToBaseTabletIds.computeIfPresent(beId, (id, orgSet) -> {
orgSet.addAll(newSet);
return orgSet;
});
beIdToBaseTabletIds.putIfAbsent(beId, newSet);
});
} catch (MetaNotFoundException e) {
LOG.warn("exception occur when trying to get rollup tablets info", e);
}
return tableCommitInfo.generateTPartitionVersionInfos();
}
private void addPublishVersionTask(Set<Long> publishBackends, long transactionId, TransactionState transactionState,
List<TPartitionVersionInfo> partitionVersionInfos, Map<Long, Set<Long>> beIdToBaseTabletIds,
long createPublishVersionTaskTime,
AgentBatchTask batchTask) {
for (Long backendId : publishBackends) {
PublishVersionTask task = new PublishVersionTask(backendId,
transactionId,
transactionState.getDbId(),
partitionVersionInfos,
createPublishVersionTaskTime);
task.setBaseTabletsIds(beIdToBaseTabletIds.getOrDefault(backendId, Collections.emptySet()));
// add to AgentTaskQueue for handling finish report.
// not check return value, because the add will success
AgentTaskQueue.addTask(task);
batchTask.addTask(task);
transactionState.addPublishVersionTask(backendId, task);
}
}
}