TimeBasedChangeVisibleWaiter.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.qe;

import org.apache.doris.analysis.TableScanParams;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.util.RelationUtil;
import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionStatus;
import org.apache.doris.tso.TSOTimestamp;

import com.google.common.annotations.VisibleForTesting;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Blocks a time-based (incremental) read until the transactions it must observe are visible.
 *
 * <p>For every {@link UnboundRelation} in the plan that uses incremental-read scan params on an
 * {@link OlapTable}, an end TSO is derived from the relation's end timestamp. The end timestamp is
 * taken from {@link OlapScanNode#OLAP_END_TIMESTAMP}, or falls back to the default end time.
 *
 * <p>Any transaction is waited on until it leaves the COMMITTED state if it is still COMMITTED,
 * touches one of those tables, and has a commit TSO at or before the table's end TSO. A single
 * deadline, derived from the session's change-visible timeout, bounds the whole wait.
 *
 * <p>Instances are single-use: {@link #waitForVisible} creates one per call.
 */
public class TimeBasedChangeVisibleWaiter {
    private final ConnectContext context;
    private final Plan plan;
    private final Map<List<String>, TableIf> tables;
    // End timestamp (ms) used when a relation does not specify a positive end timestamp.
    private final long defaultEndTsMs;

    /**
     * Waits until committed transactions relevant to the plan's incremental reads become visible.
     *
     * <p>The current wall-clock time is used as the default end timestamp.
     *
     * @param context the connection context supplying session variables
     * @param plan the plan whose unbound relations are inspected
     * @param tables tables referenced by the plan, keyed by qualified name parts
     * @throws UserException if listing committed transactions fails, or if a matching transaction
     *         is still COMMITTED when the deadline expires
     */
    public static void waitForVisible(ConnectContext context, Plan plan, Map<List<String>, TableIf> tables)
            throws UserException {
        new TimeBasedChangeVisibleWaiter(context, plan, tables, System.currentTimeMillis()).waitInternal();
    }

    @VisibleForTesting
    TimeBasedChangeVisibleWaiter(ConnectContext context, Plan plan,
            Map<List<String>, TableIf> tables, long defaultEndTsMs) {
        this.context = context;
        this.plan = plan;
        this.tables = tables;
        this.defaultEndTsMs = defaultEndTsMs;
    }

    /** Skips waiting when the session opts into eventual consistency or no tables are involved. */
    private void waitInternal() throws UserException {
        if (context.getSessionVariable().isEnableEventualConsistentChange() || tables.isEmpty()) {
            return;
        }
        waitForDbToTableEndTSO(collectDbToTableEndTSO());
    }

    /**
     * Collects the end TSO of every incremental read on an OLAP table in the plan.
     *
     * @return map of database id to (table id to end TSO). When a table is read more than once,
     *         the largest end TSO is kept.
     */
    @VisibleForTesting
    Map<Long, Map<Long, Long>> collectDbToTableEndTSO() {
        Map<Long, Map<Long, Long>> dbToTableEndTSO = new HashMap<>();
        for (UnboundRelation relation : plan.<UnboundRelation>collectToList(UnboundRelation.class::isInstance)) {
            TableScanParams scanParams = relation.getScanParams();
            if (scanParams == null || !scanParams.incrementalRead()) {
                continue;
            }
            TableIf table = tables.get(RelationUtil.getQualifierName(context, relation.getNameParts()));
            if (table instanceof OlapTable) {
                addTableEndTSO(dbToTableEndTSO, (OlapTable) table, getEndTsMs(scanParams));
            }
        }
        return dbToTableEndTSO;
    }

    /**
     * Waits on every committed transaction that matches one of the collected table end TSOs.
     *
     * <p>All waits share one deadline, computed once from the session's change-visible timeout.
     */
    private void waitForDbToTableEndTSO(Map<Long, Map<Long, Long>> dbToTableEndTSO) throws UserException {
        if (dbToTableEndTSO.isEmpty()) {
            return;
        }
        long deadlineMs = System.currentTimeMillis() + context.getSessionVariable().getChangeVisibleTimeoutMs();
        for (Map.Entry<Long, Map<Long, Long>> dbEntry : dbToTableEndTSO.entrySet()) {
            long dbId = dbEntry.getKey();
            Map<Long, Long> tableEndTSO = dbEntry.getValue();
            for (TransactionState txn : getCommittedTransactions(dbId)) {
                Pair<Long, Long> matchedTableEndTSO = findMatchedTableEndTSO(txn, tableEndTSO);
                if (matchedTableEndTSO != null) {
                    waitTransactionVisible(txn, dbId, matchedTableEndTSO.first, matchedTableEndTSO.second,
                            deadlineMs);
                }
            }
        }
    }

    /**
     * Returns the first (tableId, endTSO) pair for which {@code txn} must be waited on.
     *
     * @return {@code null} if any of the following holds: the transaction is not COMMITTED, its
     *         commit TSO is negative (presumably "no TSO assigned" — TODO confirm), or none of its
     *         tables has an end TSO at or after the commit TSO
     */
    private Pair<Long, Long> findMatchedTableEndTSO(TransactionState txn, Map<Long, Long> tableEndTSO) {
        long commitTSO = txn.getCommitTSO();
        if (txn.getTransactionStatus() != TransactionStatus.COMMITTED || commitTSO < 0) {
            return null;
        }
        for (Long tableId : txn.getTableIdList()) {
            Long endTSO = tableEndTSO.get(tableId);
            if (endTSO != null && commitTSO <= endTSO) {
                return Pair.of(tableId, endTSO);
            }
        }
        return null;
    }

    /** Lists committed transactions of {@code dbId}, wrapping any failure in a {@link UserException}. */
    private List<TransactionState> getCommittedTransactions(long dbId) throws UserException {
        try {
            return Env.getCurrentGlobalTransactionMgr().getCommittedTransactions(dbId);
        } catch (Exception e) {
            throw new UserException("get committed transactions failed. dbId=" + dbId, e);
        }
    }

    /**
     * Waits until {@code txn} leaves the COMMITTED state or {@code deadlineMs} passes.
     *
     * <p>An interrupt does not abort the wait; waiting continues until the deadline, as before.
     * The thread's interrupt status is restored before returning or throwing, so callers can
     * still observe the interrupt.
     *
     * @throws UserException if the transaction is still COMMITTED when the deadline is reached
     */
    private void waitTransactionVisible(TransactionState txn, long dbId, long tableId,
            long endTSO, long deadlineMs) throws UserException {
        boolean interrupted = false;
        try {
            long remainingMs = deadlineMs - System.currentTimeMillis();
            while (txn.getTransactionStatus() == TransactionStatus.COMMITTED && remainingMs > 0) {
                try {
                    txn.waitTransactionVisible(remainingMs);
                } catch (InterruptedException e) {
                    // Keep the previous wait behavior (continue until the deadline), but remember the
                    // interrupt: throwing InterruptedException cleared the flag, so it would otherwise
                    // be lost.
                    interrupted = true;
                }
                remainingMs = deadlineMs - System.currentTimeMillis();
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
        if (txn.getTransactionStatus() == TransactionStatus.COMMITTED) {
            throw new UserException(String.format(
                    "timeout waiting transaction become visible for time-based read, "
                            + "txnId=%d dbId=%d tableId=%d endTSO=%d",
                    txn.getTransactionId(), dbId, tableId, endTSO));
        }
    }

    /**
     * Returns the end timestamp (ms) requested by the scan params.
     *
     * <p>Falls back to {@code defaultEndTsMs} when the parameter is absent or parses to a
     * non-positive value.
     */
    private long getEndTsMs(TableScanParams scanParams) {
        if (scanParams.getMapParams().containsKey(OlapScanNode.OLAP_END_TIMESTAMP)) {
            long endTsMs = OlapScanNode.parseChangeTimestamp(
                    scanParams.getMapParams().get(OlapScanNode.OLAP_END_TIMESTAMP));
            return endTsMs > 0 ? endTsMs : defaultEndTsMs;
        }
        return defaultEndTsMs;
    }

    /**
     * Records the end TSO for {@code table} under its database, keeping the maximum per table.
     *
     * <p>The millisecond timestamp is converted with {@link TSOTimestamp#composeFullTimestamp}.
     */
    private void addTableEndTSO(Map<Long, Map<Long, Long>> dbToTableEndTSO, OlapTable table, long endTsMs) {
        dbToTableEndTSO.computeIfAbsent(table.getDatabase().getId(), ignored -> new HashMap<>())
                .merge(table.getId(), TSOTimestamp.composeFullTimestamp(endTsMs), Math::max);
    }
}