// TimeBasedChangeVisibleWaiter.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe;
import org.apache.doris.analysis.TableScanParams;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;
import org.apache.doris.tso.TSOTimestamp;
import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Blocks a statement that performs incremental (time-based) reads on OLAP tables until the
 * committed transactions it has to observe have left the {@code COMMITTED} state.
 *
 * <p>The waiter works in two steps:
 * <ol>
 *   <li>collect, per database and per table, the largest end TSO requested by the unbound
 *       relations of the plan that carry incremental-read scan params;</li>
 *   <li>for every committed transaction of those databases whose commit TSO is not greater than
 *       the end TSO of one of its tables, wait until the transaction is no longer
 *       {@code COMMITTED} or the session's change-visible timeout expires.</li>
 * </ol>
 *
 * <p>Nothing is waited for when the session enables eventually consistent change reads or when
 * no tables were supplied.
 */
public class TimeBasedChangeVisibleWaiter {
    private final ConnectContext context;
    private final Plan plan;
    /** Tables of the statement, keyed by their qualified name parts. */
    private final Map<List<String>, TableIf> tables;
    /** End timestamp (milliseconds) used for relations that do not specify a usable one. */
    private final long defaultEndTsMs;

    /**
     * Waits until every committed transaction relevant to the incremental reads in {@code plan}
     * is visible, using the current wall-clock time as the default end timestamp.
     *
     * @param context connect context providing the session variables
     * @param plan plan whose unbound relations are inspected
     * @param tables tables referenced by the plan, keyed by qualified name parts
     * @throws UserException if the committed transactions cannot be fetched, or a transaction is
     *         still {@code COMMITTED} when the timeout expires
     */
    public static void waitForVisible(ConnectContext context, Plan plan, Map<List<String>, TableIf> tables)
            throws UserException {
        new TimeBasedChangeVisibleWaiter(context, plan, tables, System.currentTimeMillis()).waitInternal();
    }

    /**
     * Creates a waiter with an explicit default end timestamp.
     *
     * @param context connect context providing the session variables
     * @param plan plan whose unbound relations are inspected
     * @param tables tables referenced by the plan, keyed by qualified name parts
     * @param defaultEndTsMs end timestamp in milliseconds applied when a relation gives none
     */
    @VisibleForTesting
    TimeBasedChangeVisibleWaiter(ConnectContext context, Plan plan,
            Map<List<String>, TableIf> tables, long defaultEndTsMs) {
        this.context = context;
        this.plan = plan;
        this.tables = tables;
        this.defaultEndTsMs = defaultEndTsMs;
    }

    /** Runs the collect-then-wait sequence unless the session or the table set makes it unnecessary. */
    private void waitInternal() throws UserException {
        if (context.getSessionVariable().isEnableEventualConsistentChange() || tables.isEmpty()) {
            return;
        }
        waitForDbToTableEndTSO(collectDbToTableEndTSO());
    }

    /**
     * Collects the end TSO of every OLAP table that is read incrementally by the plan.
     *
     * @return map of database id to (table id to end TSO); empty when the plan contains no
     *         incremental read on an OLAP table
     */
    @VisibleForTesting
    Map<Long, Map<Long, Long>> collectDbToTableEndTSO() {
        Map<Long, Map<Long, Long>> dbToTableEndTSO = new HashMap<>();
        for (UnboundRelation relation : plan.<UnboundRelation>collectToList(UnboundRelation.class::isInstance)) {
            TableScanParams scanParams = relation.getScanParams();
            if (scanParams == null || !scanParams.incrementalRead()) {
                continue;
            }
            TableIf table = tables.get(RelationUtil.getQualifierName(context, relation.getNameParts()));
            // Relations that resolve to no table or to a non-OLAP table are ignored.
            if (table instanceof OlapTable) {
                addTableEndTSO(dbToTableEndTSO, (OlapTable) table, getEndTsMs(scanParams));
            }
        }
        return dbToTableEndTSO;
    }

    /**
     * Waits for every committed transaction that matches one of the collected end TSOs.
     *
     * @param dbToTableEndTSO map of database id to (table id to end TSO)
     * @throws UserException if fetching transactions fails or the timeout expires
     */
    private void waitForDbToTableEndTSO(Map<Long, Map<Long, Long>> dbToTableEndTSO) throws UserException {
        if (dbToTableEndTSO.isEmpty()) {
            return;
        }
        // A single deadline is shared by all transactions, so the timeout bounds the whole wait.
        long deadlineMs = System.currentTimeMillis() + context.getSessionVariable().getChangeVisibleTimeoutMs();
        for (Map.Entry<Long, Map<Long, Long>> dbEntry : dbToTableEndTSO.entrySet()) {
            long dbId = dbEntry.getKey();
            Map<Long, Long> tableEndTSO = dbEntry.getValue();
            for (TransactionState txn : getCommittedTransactions(dbId)) {
                Pair<Long, Long> matchedTableEndTSO = findMatchedTableEndTSO(txn, tableEndTSO);
                if (matchedTableEndTSO != null) {
                    waitTransactionVisible(txn, dbId, matchedTableEndTSO.first, matchedTableEndTSO.second,
                            deadlineMs);
                }
            }
        }
    }

    /**
     * Finds the first table of {@code txn} whose end TSO is not smaller than the commit TSO.
     *
     * @param txn transaction to examine
     * @param tableEndTSO map of table id to end TSO for the transaction's database
     * @return pair of (table id, end TSO) for the first matching table, or {@code null} when the
     *         transaction is not {@code COMMITTED}, has a negative commit TSO, or touches no
     *         table with a large enough end TSO
     */
    private Pair<Long, Long> findMatchedTableEndTSO(TransactionState txn, Map<Long, Long> tableEndTSO) {
        long commitTSO = txn.getCommitTSO();
        if (txn.getTransactionStatus() != TransactionStatus.COMMITTED || commitTSO < 0) {
            return null;
        }
        for (Long tableId : txn.getTableIdList()) {
            Long endTSO = tableEndTSO.get(tableId);
            if (endTSO != null && commitTSO <= endTSO) {
                return Pair.of(tableId, endTSO);
            }
        }
        return null;
    }

    /**
     * Fetches the committed transactions of a database from the global transaction manager.
     *
     * @param dbId database id
     * @return committed transactions of the database
     * @throws UserException wrapping any exception raised by the transaction manager
     */
    private List<TransactionState> getCommittedTransactions(long dbId) throws UserException {
        try {
            return Env.getCurrentGlobalTransactionMgr().getCommittedTransactions(dbId);
        } catch (Exception e) {
            throw new UserException("get committed transactions failed. dbId=" + dbId, e);
        }
    }

    /**
     * Waits until {@code txn} is no longer {@code COMMITTED} or the deadline passes.
     *
     * <p>An interrupt does not abort the wait, but the thread's interrupt status is restored
     * before this method returns or throws so that callers can still observe it.
     *
     * @param txn transaction to wait for
     * @param dbId database id, used only in the timeout message
     * @param tableId matched table id, used only in the timeout message
     * @param endTSO matched end TSO, used only in the timeout message
     * @param deadlineMs absolute deadline in epoch milliseconds
     * @throws UserException if the transaction is still {@code COMMITTED} at the deadline
     */
    private void waitTransactionVisible(TransactionState txn, long dbId, long tableId,
            long endTSO, long deadlineMs) throws UserException {
        boolean interrupted = false;
        try {
            long remainingMs = deadlineMs - System.currentTimeMillis();
            while (txn.getTransactionStatus() == TransactionStatus.COMMITTED && remainingMs > 0) {
                try {
                    txn.waitTransactionVisible(remainingMs);
                } catch (InterruptedException ignored) {
                    // Keep waiting as before, but remember the interrupt. Re-interrupting here
                    // would make the next timed wait throw immediately and spin.
                    interrupted = true;
                }
                remainingMs = deadlineMs - System.currentTimeMillis();
            }
        } finally {
            if (interrupted) {
                // Restore the interrupt status that catching InterruptedException cleared.
                Thread.currentThread().interrupt();
            }
        }
        if (txn.getTransactionStatus() == TransactionStatus.COMMITTED) {
            throw new UserException(String.format(
                    "timeout waiting transaction become visible for time-based read, "
                            + "txnId=%d dbId=%d tableId=%d endTSO=%d",
                    txn.getTransactionId(), dbId, tableId, endTSO));
        }
    }

    /**
     * Resolves the end timestamp of a relation.
     *
     * @param scanParams scan params of the relation
     * @return the parsed end timestamp when the param is present and positive, otherwise the
     *         default end timestamp
     */
    private long getEndTsMs(TableScanParams scanParams) {
        if (scanParams.getMapParams().containsKey(OlapScanNode.OLAP_END_TIMESTAMP)) {
            long endTsMs = OlapScanNode.parseChangeTimestamp(
                    scanParams.getMapParams().get(OlapScanNode.OLAP_END_TIMESTAMP));
            return endTsMs > 0 ? endTsMs : defaultEndTsMs;
        }
        return defaultEndTsMs;
    }

    /**
     * Records the end TSO of a table, keeping the largest value when the table appears more
     * than once.
     *
     * @param dbToTableEndTSO accumulator keyed by database id, then table id
     * @param table table being read incrementally
     * @param endTsMs end timestamp in milliseconds, composed into a TSO before being stored
     */
    private void addTableEndTSO(Map<Long, Map<Long, Long>> dbToTableEndTSO, OlapTable table, long endTsMs) {
        dbToTableEndTSO.computeIfAbsent(table.getDatabase().getId(), ignored -> new HashMap<>())
                .merge(table.getId(), TSOTimestamp.composeFullTimestamp(endTsMs), Math::max);
    }
}