TSOService.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.tso;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.journal.local.LocalJournal;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.persist.EditLog;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.concurrent.locks.ReentrantLock;
public class TSOService extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(TSOService.class);
// Global timestamp with physical time and logical counter
private final TSOTimestamp globalTimestamp = new TSOTimestamp();
// Lock for thread-safe access to global timestamp
private final ReentrantLock lock = new ReentrantLock();
// Guard value for time window updates (in milliseconds)
private static final long UPDATE_TIME_WINDOW_GUARD = 1;
/**
* Constructor initializes the TSO service with update interval
*/
public TSOService() {
super("TSO-service", Config.tso_service_update_interval_ms);
}
/**
* Start the TSO service and calibrate timestamp if needed
*/
@Override
public synchronized void start() {
super.start();
for (int i = 0; i < Config.max_update_tso_retry_count; i++) {
lock.lock();
boolean isInitialized = false;
try {
isInitialized = (globalTimestamp.getPhysicalTimestamp() != 0);
} finally {
lock.unlock();
}
if (!isInitialized) {
LOG.info("TSO service timestamp is not calibrated, start calibrate timestamp");
try {
calibrateTimestamp();
} catch (Exception e) {
LOG.warn("TSO service calibrate timestamp failed", e);
}
}
}
}
/**
* Periodically update timestamp after catalog is ready
* This method is called by the MasterDaemon framework
*/
@Override
protected void runAfterCatalogReady() {
if (!Config.experimental_enable_feature_tso) {
return;
}
boolean updated = false;
Throwable lastFailure = null;
for (int i = 0; i < Config.max_update_tso_retry_count; i++) {
lock.lock();
boolean isInitialized = false;
try {
isInitialized = (globalTimestamp.getPhysicalTimestamp() != 0);
} finally {
lock.unlock();
}
if (!isInitialized) {
LOG.info("TSO service timestamp is not calibrated, start calibrate timestamp");
try {
calibrateTimestamp();
return;
} catch (Exception e) {
lastFailure = e;
LOG.warn("TSO service calibrate timestamp failed", e);
}
}
try {
updateTimestamp();
updated = true;
break; // Update successful, exit the loop
} catch (Exception e) {
lastFailure = e;
LOG.warn("TSO service update timestamp failed, retry: {}", i, e);
MetricRepo.COUNTER_TSO_CLOCK_UPDATE_FAILED.increase(1L);
try {
sleep(Config.tso_service_update_interval_ms);
} catch (InterruptedException ie) {
LOG.warn("TSO service sleep interrupted", ie);
Thread.currentThread().interrupt();
}
}
}
if (updated) {
if (LOG.isDebugEnabled()) {
LOG.debug("TSO service updated timestamp");
}
} else if (lastFailure != null) {
LOG.warn("TSO service update timestamp failed after {} retries",
Config.max_update_tso_retry_count, lastFailure);
} else {
LOG.warn("TSO service update timestamp failed after {} retries", Config.max_update_tso_retry_count);
}
}
/**
* Generate a single TSO timestamp
*
* @return Composed TSO timestamp combining physical time and logical counter
* @throws RuntimeException if TSO is not calibrated or other errors occur
*/
public Long getTSO() {
int maxGetTSORetryCount = Math.max(1, Config.max_get_tso_retry_count);
for (int i = 0; i < maxGetTSORetryCount; i++) {
// Wait for environment to be ready and ensure we're running on master FE
Env env = Env.getCurrentEnv();
if (env == null || !env.isReady()) {
LOG.warn("TSO service wait for catalog ready");
try {
sleep(200);
} catch (InterruptedException ie) {
LOG.warn("TSO service sleep interrupted", ie);
Thread.currentThread().interrupt();
}
continue;
} else if (!env.isMaster()) {
LOG.warn("TSO service only run on master FE");
try {
sleep(200);
} catch (InterruptedException ie) {
LOG.warn("TSO service sleep interrupted", ie);
Thread.currentThread().interrupt();
}
continue;
}
Pair<Long, Long> pair = generateTSO();
long physical = pair.first;
long logical = pair.second;
if (physical == 0) {
throw new RuntimeException("TSO timestamp is not calibrated, please check");
}
// Check for logical counter overflow
if (logical >= TSOTimestamp.MAX_LOGICAL_COUNTER) {
LOG.warn("TSO timestamp logical counter overflow, please check");
try {
sleep(Config.tso_service_update_interval_ms);
} catch (InterruptedException ie) {
LOG.warn("TSO service sleep interrupted", ie);
Thread.currentThread().interrupt();
}
continue;
}
if (MetricRepo.isInit) {
MetricRepo.COUNTER_TSO_CLOCK_GET_SUCCESS.increase(1L);
}
return TSOTimestamp.composeTimestamp(physical, logical);
}
return -1L;
}
/**
* Get the current composed TSO timestamp
*
* @return Current TSO timestamp combining physical time and logical counter
*/
public long getCurrentTSO() {
lock.lock();
try {
return globalTimestamp.composeTimestamp();
} finally {
lock.unlock();
}
}
/**
* Calibrate the TSO timestamp when service starts
* This ensures the timestamp is consistent with the last persisted value
*
* Algorithm:
* - If Tnow - Tlast < 1ms, then Tnext = Tlast + 1
* - Otherwise Tnext = Tnow
*/
private void calibrateTimestamp() {
// Check if Env is ready before calibration
Env env = Env.getCurrentEnv();
if (env == null || !env.isReady() || !env.isMaster()) {
LOG.warn("Env is not ready or not master, skip TSO timestamp calibration");
return;
}
long timeLast = env.getWindowEndTSO(); // Last timestamp from EditLog replay
long timeNow = System.currentTimeMillis() + Config.tso_time_offset_debug_mode;
// Calculate next physical time to ensure monotonicity
long nextPhysicalTime;
if (timeNow - timeLast < 1) {
nextPhysicalTime = timeLast + 1;
} else {
nextPhysicalTime = timeNow;
}
// Construct new timestamp (physical time with reset logical counter)
setTSOPhysical(nextPhysicalTime, true);
// Write the right boundary of time window to BDBJE for persistence
long timeWindowEnd = nextPhysicalTime + Config.tso_service_window_duration_ms;
env.setWindowEndTSO(timeWindowEnd);
writeTimestampToBDBJE(timeWindowEnd);
LOG.info("TSO timestamp calibrated: lastTimestamp={}, currentMillis={}, nextPhysicalTime={}, timeWindowEnd={}",
timeLast, timeNow, nextPhysicalTime, timeWindowEnd);
if (MetricRepo.isInit) {
MetricRepo.COUNTER_TSO_CLOCK_CALCULATED.increase(1L);
}
}
/**
* Update timestamp periodically to maintain time window
* This method handles various time-related issues:
* 1. Clock drift detection
* 2. Clock backward detection
* 3. Logical counter overflow handling
* 4. Time window renewal
*/
private void updateTimestamp() {
// Check if Env is ready
Env env = Env.getCurrentEnv();
if (env == null || !env.isReady() || !env.isMaster()) {
LOG.warn("Env is not ready or not master, skip TSO timestamp update");
return;
}
// 1. Check if TSO has been calibrated
long currentTime = System.currentTimeMillis() + Config.tso_time_offset_debug_mode;
long prevPhysicalTime = 0;
long prevLogicalCounter = 0;
lock.lock();
try {
prevPhysicalTime = globalTimestamp.getPhysicalTimestamp();
prevLogicalCounter = globalTimestamp.getLogicalCounter();
} finally {
lock.unlock();
}
if (prevPhysicalTime == 0) {
LOG.error("TSO timestamp is not calibrated, please check");
}
// 2. Check for serious clock issues
long timeLag = currentTime - prevPhysicalTime;
if (timeLag >= 3 * Config.tso_service_update_interval_ms) {
// Clock drift (time difference too large), log clearly and trigger corresponding metric
LOG.warn("TSO clock drift detected, lastPhysicalTime={}, currentTime={}, "
+ "timeLag={} (exceeds 3 * update interval {})",
prevPhysicalTime, currentTime, timeLag, 3 * Config.tso_service_update_interval_ms);
if (MetricRepo.isInit) {
MetricRepo.COUNTER_TSO_CLOCK_DRIFT_DETECTED.increase(1L);
}
} else if (timeLag < 0) {
// Clock backward (current time earlier than last recorded time)
// log clearly and trigger corresponding metric
LOG.warn("TSO clock backward detected, lastPhysicalTime={}, currentTime={}, "
+ "timeLag={} (current time is earlier than last physical time)",
prevPhysicalTime, currentTime, timeLag);
if (MetricRepo.isInit) {
MetricRepo.COUNTER_TSO_CLOCK_BACKWARD_DETECTED.increase(1L);
}
}
// 3. Update time based on conditions
long nextPhysicalTime = prevPhysicalTime;
if (timeLag > UPDATE_TIME_WINDOW_GUARD) {
// Align physical time to current time
nextPhysicalTime = currentTime;
} else if (prevLogicalCounter > TSOTimestamp.MAX_LOGICAL_COUNTER / 2) {
// Logical counter nearly full → advance to next millisecond
nextPhysicalTime = prevPhysicalTime + 1;
} else {
// Logical counter not nearly full → just increment logical counter
// do nothing
}
// 4. Check if time window right boundary needs renewal
if ((env.getWindowEndTSO() - nextPhysicalTime) <= UPDATE_TIME_WINDOW_GUARD) {
// Time window right boundary needs renewal
long nextWindowEnd = nextPhysicalTime + Config.tso_service_window_duration_ms;
env.setWindowEndTSO(nextWindowEnd);
writeTimestampToBDBJE(nextWindowEnd);
}
// 5. Update global timestamp
setTSOPhysical(nextPhysicalTime, false);
if (MetricRepo.isInit) {
MetricRepo.COUNTER_TSO_CLOCK_UPDATED.increase(1L);
}
}
/**
* Write the right boundary of TSO time window to BDBJE for persistence
*
* @param timestamp The timestamp to write
*/
private void writeTimestampToBDBJE(long timestamp) {
try {
// Check if Env is ready
Env env = Env.getCurrentEnv();
if (env == null) {
LOG.warn("Env is null, skip writing TSO timestamp to BDBJE");
return;
}
// Check if Env is ready and is master
if (!env.isReady()) {
LOG.warn("Env is not ready, skip writing TSO timestamp to BDBJE");
return;
}
if (!env.isMaster()) {
LOG.warn("Current node is not master, skip writing TSO timestamp to BDBJE");
return;
}
TSOTimestamp tsoTimestamp = new TSOTimestamp(timestamp, 0);
// Check if EditLog is available
EditLog editLog = env.getEditLog();
if (editLog == null) {
LOG.warn("EditLog is null, skip writing TSO timestamp to BDBJE");
return;
}
// Additional check to ensure EditLog's journal is properly initialized
if (editLog.getJournal() == null) {
LOG.warn("EditLog's journal is null, skip writing TSO timestamp to BDBJE");
return;
}
if (editLog.getJournal() instanceof LocalJournal) {
if (!((LocalJournal) editLog.getJournal()).isReadyToFlush()) {
LOG.warn("EditLog's journal is not ready to flush, skip writing TSO timestamp to BDBJE");
return;
}
}
if (Config.enable_tso_persist_journal) {
editLog.logTSOTimestampWindowEnd(tsoTimestamp);
} else {
LOG.debug("TSO timestamp {} is not persisted to journal, "
+ "please check if enable_tso_persist_journal is set to true",
tsoTimestamp);
}
} catch (Exception e) {
LOG.error("Failed to write TSO timestamp to BDBJE", e);
}
}
/**
* Generate a single TSO timestamp by incrementing the logical counter
*
* @return Pair of (physicalTime, updatedLogicalCounter) for the base timestamp
*/
private Pair<Long, Long> generateTSO() {
lock.lock();
try {
long physicalTime = globalTimestamp.getPhysicalTimestamp();
if (physicalTime == 0) {
return Pair.of(0L, 0L);
}
long logicalCounter = globalTimestamp.getLogicalCounter();
globalTimestamp.setLogicalCounter(logicalCounter + 1);
logicalCounter = globalTimestamp.getLogicalCounter();
return Pair.of(physicalTime, logicalCounter);
} finally {
lock.unlock();
}
}
/**
* Set the physical time component of the global timestamp
*
* @param next New physical time value
* @param force Whether to force update even if physical time is zero
*/
private void setTSOPhysical(long next, boolean force) {
lock.lock();
try {
// Do not update the zero physical time if the `force` flag is false.
if (!force && globalTimestamp.getPhysicalTimestamp() == 0) {
return;
}
if (next - globalTimestamp.getPhysicalTimestamp() > 0) {
globalTimestamp.setPhysicalTimestamp(next);
globalTimestamp.setLogicalCounter(0L);
}
} finally {
lock.unlock();
}
}
/**
* Replay handler for TSO window end timestamp from edit log.
* This method updates the Env's window end TSO value.
* It is safe to call during checkpoint replay when TSOService may not be initialized.
*
* @param windowEnd New window end physical time
*/
public void replayWindowEndTSO(TSOTimestamp windowEnd) {
Env env = Env.getCurrentEnv();
if (env == null) {
return;
}
env.setWindowEndTSO(windowEnd.getPhysicalTimestamp());
}
}