// TimeBasedChangeVisibleWaiter.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe;
import org.apache.doris.analysis.TableScanParams;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;
import org.apache.doris.tso.TSOTimestamp;
import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
 * Before executing a time-based incremental read, block until every transaction that committed at
 * or before the requested read timestamp of the target tables becomes visible. This guarantees the
 * read sees a complete set of changes up to that time point.
 *
 * <p>Skipped entirely when the session enables eventual-consistent change reads, or when no table
 * is involved. Waiting is bounded by session variable {@code change_visible_timeout_ms}; timing out
 * raises a {@link UserException}.
 *
 * <p>An interrupt delivered while waiting does not shorten the wait, but the thread's interrupt
 * status is restored before control returns to the caller so that it is not silently lost.
 */
public class TimeBasedChangeVisibleWaiter {
    private final ConnectContext context;

    private TimeBasedChangeVisibleWaiter(ConnectContext context) {
        this.context = context;
    }

    /**
     * Entry point: waits until all relevant committed transactions of the incrementally-read
     * tables in {@code plan} are visible.
     *
     * @param context session whose variables control whether and how long to wait
     * @param plan plan that is walked for relations doing an incremental read
     * @param tables resolved tables keyed by fully qualified name parts
     * @throws UserException if the committed transactions cannot be listed, or a transaction is
     *         still COMMITTED when the timeout expires
     */
    public static void waitForVisible(ConnectContext context, Plan plan, Map<List<String>, TableIf> tables)
            throws UserException {
        if (context.getSessionVariable().isEnableEventualConsistentChange() || tables.isEmpty()) {
            return;
        }
        // The current wall-clock time is the default read end timestamp for relations that do
        // not specify one explicitly.
        Map<Long, Map<Long, Long>> dbToTableEndTSO = collectDbToTableEndTSO(
                context, plan, tables, System.currentTimeMillis());
        new TimeBasedChangeVisibleWaiter(context).waitForDbToTableEndTSO(dbToTableEndTSO);
    }

    /**
     * Walk the plan, pick out relations doing an incremental read, and for each OlapTable record the
     * read end timestamp (converted to a full TSO) aggregated as dbId -> (tableId -> max endTSO).
     *
     * @param context session used to qualify relation names
     * @param plan plan to walk
     * @param tables resolved tables keyed by fully qualified name parts
     * @param defaultEndTsMs end timestamp (ms) used when a relation does not provide a positive one
     * @return dbId -> (tableId -> endTSO); empty when no OlapTable is read incrementally
     */
    @VisibleForTesting
    static Map<Long, Map<Long, Long>> collectDbToTableEndTSO(ConnectContext context, Plan plan,
            Map<List<String>, TableIf> tables, long defaultEndTsMs) {
        Map<Long, Map<Long, Long>> dbToTableEndTSO = new HashMap<>();
        plan.foreach(node -> {
            if (!(node instanceof UnboundRelation)) {
                return;
            }
            UnboundRelation relation = (UnboundRelation) node;
            TableScanParams scanParams = relation.getScanParams();
            if (scanParams == null || !scanParams.incrementalRead()) {
                return;
            }
            TableIf table = tables.get(RelationUtil.getQualifierName(context, relation.getNameParts()));
            // Relations that are unresolved in `tables` or are not OlapTables are ignored.
            if (table instanceof OlapTable) {
                addTableEndTSO(dbToTableEndTSO, (OlapTable) table, getEndTsMs(scanParams, defaultEndTsMs));
            }
        });
        return dbToTableEndTSO;
    }

    /**
     * For each db, scan its committed-but-not-visible transactions; whenever a transaction's commit
     * TSO is at or before a target table's endTSO, wait for that transaction to become visible.
     * One deadline, derived from {@code change_visible_timeout_ms}, is shared by all waits.
     *
     * @param dbToTableEndTSO dbId -> (tableId -> endTSO) as built by {@link #collectDbToTableEndTSO}
     * @throws UserException if listing transactions fails or the deadline passes
     */
    private void waitForDbToTableEndTSO(Map<Long, Map<Long, Long>> dbToTableEndTSO) throws UserException {
        if (dbToTableEndTSO.isEmpty()) {
            return;
        }
        long nowMs = System.currentTimeMillis();
        long timeoutMs = context.getSessionVariable().getChangeVisibleTimeoutMs();
        // Saturate instead of overflowing: a very large timeout must mean "wait (practically)
        // forever", not wrap around to a deadline in the past and time out immediately.
        long deadlineMs = timeoutMs > Long.MAX_VALUE - nowMs ? Long.MAX_VALUE : nowMs + timeoutMs;
        for (Map.Entry<Long, Map<Long, Long>> dbEntry : dbToTableEndTSO.entrySet()) {
            long dbId = dbEntry.getKey();
            Map<Long, Long> tableEndTSO = dbEntry.getValue();
            for (TransactionState txn : getCommittedTransactions(dbId)) {
                Pair<Long, Long> matchedTableEndTSO = findMatchedTableEndTSO(txn, tableEndTSO);
                if (matchedTableEndTSO != null) {
                    waitTransactionVisible(txn, dbId, matchedTableEndTSO.first, matchedTableEndTSO.second,
                            deadlineMs);
                }
            }
        }
    }

    /**
     * Return (tableId, endTSO) if the transaction is COMMITTED and its commit TSO is within the
     * requested endTSO of one of its tables; otherwise null (no need to wait).
     *
     * <p>Only the first matching table is reported; it is used for the timeout error message.
     *
     * @param txn transaction to inspect
     * @param tableEndTSO tableId -> endTSO for the database the transaction belongs to
     * @return the matched (tableId, endTSO) pair, or null when the transaction can be ignored
     */
    private Pair<Long, Long> findMatchedTableEndTSO(TransactionState txn, Map<Long, Long> tableEndTSO) {
        long commitTSO = txn.getCommitTSO();
        // A negative commit TSO is treated as "no usable commit TSO" and is skipped.
        if (txn.getTransactionStatus() != TransactionStatus.COMMITTED || commitTSO < 0) {
            return null;
        }
        for (Long tableId : txn.getTableIdList()) {
            Long endTSO = tableEndTSO.get(tableId);
            if (endTSO != null && commitTSO <= endTSO) {
                return Pair.of(tableId, endTSO);
            }
        }
        return null;
    }

    /**
     * Fetch the committed transactions of a database from the global transaction manager,
     * converting any failure into a {@link UserException} that carries the original cause.
     *
     * @param dbId database id
     * @return the transactions returned by the transaction manager
     * @throws UserException if the lookup throws
     */
    private List<TransactionState> getCommittedTransactions(long dbId) throws UserException {
        try {
            return Env.getCurrentGlobalTransactionMgr().getCommittedTransactions(dbId);
        } catch (Exception e) {
            throw new UserException("get committed transactions failed. dbId=" + dbId, e);
        }
    }

    /**
     * Poll-wait until the transaction leaves COMMITTED (becomes visible) or the deadline passes;
     * throw if it is still COMMITTED at timeout.
     *
     * <p>Interrupts do not end the wait early. If one occurred, the interrupt status of the
     * current thread is restored once waiting is over.
     *
     * @param txn transaction to wait for
     * @param dbId database id, used in the timeout message
     * @param tableId matched table id, used in the timeout message
     * @param endTSO requested end TSO of the matched table, used in the timeout message
     * @param deadlineMs absolute wall-clock deadline in milliseconds
     * @throws UserException if the transaction is still COMMITTED once the deadline has passed
     */
    private void waitTransactionVisible(TransactionState txn, long dbId, long tableId,
            long endTSO, long deadlineMs) throws UserException {
        boolean interrupted = false;
        try {
            long remainingMs = deadlineMs - System.currentTimeMillis();
            while (txn.getTransactionStatus() == TransactionStatus.COMMITTED && remainingMs > 0) {
                try {
                    txn.waitTransactionVisible(remainingMs);
                } catch (InterruptedException e) {
                    // Keep waiting until the deadline as before, but remember the interrupt.
                    // The flag is deliberately NOT re-set here: presumably the blocking wait
                    // would then fail immediately on every iteration and the loop would spin.
                    interrupted = true;
                }
                remainingMs = deadlineMs - System.currentTimeMillis();
            }
        } finally {
            if (interrupted) {
                // Hand the interrupt back so code further up the stack can still observe it.
                Thread.currentThread().interrupt();
            }
        }
        if (txn.getTransactionStatus() == TransactionStatus.COMMITTED) {
            throw new UserException(String.format(
                    "timeout waiting transaction become visible for time-based read, "
                            + "txnId=%d dbId=%d tableId=%d endTSO=%d",
                    txn.getTransactionId(), dbId, tableId, endTSO));
        }
    }

    /**
     * Resolve the read end timestamp (ms) from scan params; fall back to {@code defaultEndTsMs}
     * when the end-timestamp parameter is absent or parses to a non-positive value.
     *
     * @param scanParams scan parameters of the relation
     * @param defaultEndTsMs fallback end timestamp in milliseconds
     * @return the effective end timestamp in milliseconds
     */
    private static long getEndTsMs(TableScanParams scanParams, long defaultEndTsMs) {
        if (scanParams.getMapParams().containsKey(OlapScanNode.OLAP_END_TIMESTAMP)) {
            long endTsMs = OlapScanNode.parseChangeTimestamp(
                    scanParams.getMapParams().get(OlapScanNode.OLAP_END_TIMESTAMP));
            return endTsMs > 0 ? endTsMs : defaultEndTsMs;
        }
        return defaultEndTsMs;
    }

    /**
     * Compose {@code endTsMs} into a full TSO and merge it into dbId -> (tableId -> endTSO),
     * keeping the larger value when the same table is recorded more than once.
     *
     * @param dbToTableEndTSO accumulator that is updated in place
     * @param table table being read incrementally
     * @param endTsMs read end timestamp in milliseconds
     */
    private static void addTableEndTSO(Map<Long, Map<Long, Long>> dbToTableEndTSO, OlapTable table, long endTsMs) {
        dbToTableEndTSO.computeIfAbsent(table.getDatabase().getId(), ignored -> new HashMap<>())
                .merge(table.getId(), TSOTimestamp.composeFullTimestamp(endTsMs), Math::max);
    }
}