// JdbcSourceOffsetProvider.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.offset.jdbc;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.httpv2.entity.ResponseBody;
import org.apache.doris.httpv2.rest.RestApiStatusCode;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.doris.job.cdc.request.CompareOffsetRequest;
import org.apache.doris.job.cdc.request.FetchTableSplitsRequest;
import org.apache.doris.job.cdc.request.JobBaseConfig;
import org.apache.doris.job.cdc.split.AbstractSourceSplit;
import org.apache.doris.job.cdc.split.BinlogSplit;
import org.apache.doris.job.cdc.split.SnapshotSplit;
import org.apache.doris.job.common.DataSourceType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.streaming.DataSourceConfigValidator;
import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
import org.apache.doris.job.offset.Offset;
import org.apache.doris.job.offset.SourceOffsetProvider;
import org.apache.doris.job.util.StreamingJobUtils;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PRequestCdcClientResult;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
// Offset provider for JDBC-based CDC sources. Drives snapshot-split distribution,
// the snapshot-to-binlog phase transition, and persistence/replay of split progress.
@Getter
@Setter
@Log4j2
public class JdbcSourceOffsetProvider implements SourceOffsetProvider {
// Key carried in serialized offset maps to identify the split.
public static final String SPLIT_ID = "splitId";
// Jackson ObjectMapper is thread-safe; shared across all instances.
private static final ObjectMapper objectMapper = new ObjectMapper();
// Max snapshot splits handed out per task batch; refreshed from properties on ALTER JOB.
protected int snapshotParallelism = 1;
protected Long jobId;
protected DataSourceType sourceType;
protected Map<String, String> sourceProperties = new HashMap<>();
// Splits fetched from cdc_client but not yet committed by a task. Guarded by splitsLock.
List<SnapshotSplit> remainingSplits = new ArrayList<>();
// Splits whose data has been loaded; keep their high-watermark for the binlog phase.
List<SnapshotSplit> finishedSplits = new ArrayList<>();
// Offset of the batch currently being consumed (snapshot splits or the binlog split).
JdbcOffset currentOffset;
// Latest end-of-binlog offset fetched from the source; consulted by hasMoreDataToConsume().
Map<String, String> endBinlogOffset;
@SerializedName("chw")
// tableID -> splitId -> chunk of highWatermark
Map<String, Map<String, Map<String, String>>> chunkHighWatermarkMap;
// Last committed binlog offset (plus SPLIT_ID marker); persisted for replay.
@SerializedName("bop")
Map<String, String> binlogOffsetPersist;
@SerializedName("ts")
String tableSchemas;
/** Split progress (task-commit view). */
@SerializedName("csp")
SplitProgress committedSplitProgress;
// Flipped to false when the end offset stops advancing past the consumed offset.
volatile boolean hasMoreData = true;
transient volatile String cloudCluster;
/** Split progress (cdc-fetch view), >= committedSplitProgress. Rebuilt on restart. */
transient SplitProgress cdcSplitProgress = new SplitProgress();
/** Cache of Job.syncTables, set by initSplitProgress / replayIfNeed. */
transient List<String> cachedSyncTables;
/** Guards cdcSplitProgress/committedSplitProgress/remainingSplits/finishedSplits. */
protected final transient Object splitsLock = new Object();
/**
 * No-arg constructor for subclass use; initializes only the high-watermark map.
 */
public JdbcSourceOffsetProvider() {
    chunkHighWatermarkMap = new HashMap<>();
}
/**
* Constructor for FROM Source TO Database.
*/
public JdbcSourceOffsetProvider(Long jobId, DataSourceType sourceType, Map<String, String> sourceProperties) {
this.jobId = jobId;
this.sourceType = sourceType;
this.sourceProperties = sourceProperties;
this.chunkHighWatermarkMap = new HashMap<>();
this.snapshotParallelism = Integer.parseInt(
sourceProperties.getOrDefault(DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
DataSourceConfigKeys.SNAPSHOT_PARALLELISM_DEFAULT));
}
/**
 * Refresh fields that may be changed via ALTER JOB; called before each use.
 *
 * @param jobId id of the owning job (unused here; part of the provider contract)
 * @param newProps the current source properties to re-read parallelism from
 */
@Override
public void ensureInitialized(Long jobId, Map<String, String> newProps) throws JobException {
    this.sourceProperties = newProps;
    String parallelism = newProps.getOrDefault(DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
            DataSourceConfigKeys.SNAPSHOT_PARALLELISM_DEFAULT);
    this.snapshotParallelism = Integer.parseInt(parallelism);
}
/** @return the provider type tag for this source family. */
@Override
public String getSourceType() {
    final String typeTag = "jdbc";
    return typeTag;
}
/**
 * Compute the offset for the next task batch, under splitsLock.
 * Priority: pending snapshot splits -> snapshot-to-binlog transition -> binlog tail.
 */
@Override
public Offset getNextOffset(StreamingJobProperties jobProps, Map<String, String> properties) {
synchronized (splitsLock) {
JdbcOffset nextOffset = new JdbcOffset();
if (!remainingSplits.isEmpty()) {
// Hand out at most snapshotParallelism splits; they are removed from
// remainingSplits only when committed via updateOffset().
int splitsNum = Math.min(remainingSplits.size(), snapshotParallelism);
List<SnapshotSplit> snapshotSplits = new ArrayList<>(remainingSplits.subList(0, splitsNum));
nextOffset.setSplits(snapshotSplits);
return nextOffset;
} else if (currentOffset != null && currentOffset.snapshotSplit() && noMoreSplits()) {
// initial mode: snapshot to binlog. noMoreSplits() guards against switching while
// splitting is still in progress (remainingSplits empty doesn't mean fully cut).
// snapshot-only mode is intercepted by hasReachedEnd() before reaching here.
// (noMoreSplits() re-enters splitsLock; safe — the monitor is reentrant.)
BinlogSplit binlogSplit = new BinlogSplit();
binlogSplit.setFinishedSplits(new ArrayList<>(finishedSplits));
nextOffset.setSplits(Collections.singletonList(binlogSplit));
return nextOffset;
} else {
// only binlog: first call hands out an empty binlog split, later calls reuse currentOffset
return currentOffset == null
? new JdbcOffset(Collections.singletonList(new BinlogSplit())) : currentOffset;
}
}
}
/**
 * Render the in-flight offset for SHOW: the split list in snapshot phase, or a map of
 * {splitId, starting offset...} in binlog phase. Null when nothing has been consumed yet.
 */
@Override
public String getShowCurrentOffset() {
    if (this.currentOffset == null) {
        return null;
    }
    Gson gson = new Gson();
    if (currentOffset.snapshotSplit()) {
        return gson.toJson(currentOffset.getSplits());
    }
    BinlogSplit binlogSplit = (BinlogSplit) currentOffset.getSplits().get(0);
    HashMap<String, Object> view = new HashMap<>();
    view.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
    Map<String, String> starting = binlogSplit.getStartingOffset();
    if (starting != null) {
        view.putAll(starting);
    }
    return gson.toJson(view);
}
/** Render the latest fetched end-of-binlog offset for SHOW, or null if never fetched. */
@Override
public String getShowMaxOffset() {
    return endBinlogOffset == null ? null : new Gson().toJson(endBinlogOffset);
}
/**
 * TVF rewriting does not apply to JDBC CDC sources; task SQL is built elsewhere.
 * Should never call this.
 *
 * @throws UnsupportedOperationException always
 */
@Override
public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand originCommand, Offset nextOffset,
long taskId) {
    throw new UnsupportedOperationException("rewriteTvfParams not supported for " + getClass().getSimpleName());
}
/**
 * Commit a consumed offset. For snapshot batches, moves each committed split from
 * remainingSplits to finishedSplits, records its high-watermark, and advances the
 * committed split progress. For binlog batches, snapshots the starting offset for persist.
 */
@Override
public void updateOffset(Offset offset) {
this.currentOffset = (JdbcOffset) offset;
if (currentOffset.snapshotSplit()) {
synchronized (splitsLock) {
List<? extends AbstractSourceSplit> splits = currentOffset.getSplits();
for (AbstractSourceSplit split : splits) {
SnapshotSplit snapshotSplit = (SnapshotSplit) split;
String splitId = split.getSplitId();
// Match by splitId and copy the authoritative split definition (key/bounds)
// from the remaining entry onto the committed one before removing it.
boolean remove = remainingSplits.removeIf(v -> {
if (v.getSplitId().equals(splitId)) {
snapshotSplit.setTableId(v.getTableId());
snapshotSplit.setSplitKey(v.getSplitKey());
snapshotSplit.setSplitStart(v.getSplitStart());
snapshotSplit.setSplitEnd(v.getSplitEnd());
return true;
}
return false;
});
if (remove) {
finishedSplits.add(snapshotSplit);
chunkHighWatermarkMap.computeIfAbsent(snapshotSplit.getTableId(), k -> new HashMap<>())
.put(snapshotSplit.getSplitId(), snapshotSplit.getHighWatermark());
// Advance committedSplitProgress to this committed chunk.
if (committedSplitProgress != null) {
applySplitToProgress(committedSplitProgress, snapshotSplit,
getTableName(snapshotSplit.getTableId()));
}
} else {
// Replay before remainingSplits is restored, or a duplicate commit.
log.warn("Cannot find snapshot split {} in remainingSplits for job {}", splitId, getJobId());
}
}
}
} else {
// Binlog phase: persist a defensive copy of the starting offset plus the split-id marker.
// NOTE(review): assumes a committed binlog split always has a non-null startingOffset — confirm.
BinlogSplit binlogSplit = (BinlogSplit) currentOffset.getSplits().get(0);
binlogOffsetPersist = new HashMap<>(binlogSplit.getStartingOffset());
binlogOffsetPersist.put(SPLIT_ID, BinlogSplit.BINLOG_SPLIT_ID);
}
}
/**
 * Fetch the source's current end-of-binlog offset via a cdc_client RPC on a backend.
 * Updates {@link #endBinlogOffset} and flips {@code hasMoreData} back to true whenever
 * the end offset changed (null -> value also counts as a change).
 *
 * @param properties unused; part of the provider contract
 * @throws JobException on RPC failure/timeout, backend error status, or unparsable response
 */
@Override
public void fetchRemoteMeta(Map<String, String> properties) throws Exception {
    Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
    JobBaseConfig requestParams =
            new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties, getFrontendAddress());
    InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
            .setApi("/api/fetchEndOffset")
            .setParams(new Gson().toJson(requestParams)).build();
    TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
    try {
        Future<PRequestCdcClientResult> future = BackendServiceProxy.getInstance()
                .requestCdcClient(address, request, Config.streaming_cdc_light_rpc_timeout_sec);
        PRequestCdcClientResult result = future.get(Config.streaming_cdc_light_rpc_timeout_sec, TimeUnit.SECONDS);
        TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
        if (code != TStatusCode.OK) {
            log.warn("Failed to get end offset from backend, {}", result.getStatus().getErrorMsgs(0));
            throw new JobException(
                    "Failed to get end offset from backend," + result.getStatus().getErrorMsgs(0) + ", response: "
                            + result.getResponse());
        }
        String response = result.getResponse();
        try {
            ResponseBody<Map<String, String>> responseObj = objectMapper.readValue(
                    response,
                    new TypeReference<ResponseBody<Map<String, String>>>() {
                    }
            );
            Map<String, String> newEndOffset = responseObj.getData();
            // null→value also counts as a change: upstream may have advanced while fetch was blocked.
            if (endBinlogOffset == null || !endBinlogOffset.equals(newEndOffset)) {
                hasMoreData = true;
            }
            endBinlogOffset = newEndOffset;
        } catch (JsonProcessingException e) {
            log.warn("Failed to parse end offset response: {}", response);
            throw new JobException(response);
        }
    } catch (TimeoutException te) {
        log.warn("cdc_client RPC timeout api=/api/fetchEndOffset jobId={} backend={}:{} timeout_sec={}",
                getJobId(), backend.getHost(), backend.getBrpcPort(),
                Config.streaming_cdc_light_rpc_timeout_sec);
        throw new JobException("cdc_client RPC timeout: /api/fetchEndOffset jobId=" + getJobId());
    } catch (InterruptedException ie) {
        // Restore the interrupt flag so the calling scheduler thread can observe cancellation.
        Thread.currentThread().interrupt();
        log.warn("Get end offset error: ", ie);
        throw new JobException(ie);
    } catch (ExecutionException ex) {
        log.warn("Get end offset error: ", ex);
        throw new JobException(ex);
    }
}
/**
 * Decide whether the scheduler should dispatch another task.
 * Snapshot phase: true while splits remain or (initial mode) the binlog phase is next.
 * Binlog phase: true only when the remote end offset is ahead of the consumed offset.
 */
@Override
public boolean hasMoreDataToConsume() {
if (currentOffset == null) {
return true;
}
synchronized (splitsLock) {
if (currentOffset.snapshotSplit()) {
if (!remainingSplits.isEmpty()) {
return true;
}
// Splitting still in progress: wait for next tick.
if (!noMoreSplits()) {
return false;
}
// Splitting done: snapshot-only completes; initial mode falls through to binlog.
return !isSnapshotOnlyMode();
}
if (!hasMoreData) {
return false;
}
if (CollectionUtils.isNotEmpty(remainingSplits)) {
return true;
}
}
// No known remote end offset yet: nothing to compare against.
if (MapUtils.isEmpty(endBinlogOffset)) {
return false;
}
try {
// NOTE(review): currentOffset is re-read outside splitsLock here; assumes updateOffset
// transitions are tolerable mid-check — confirm.
if (!currentOffset.snapshotSplit()) {
BinlogSplit binlogSplit = (BinlogSplit) currentOffset.getSplits().get(0);
if (MapUtils.isEmpty(binlogSplit.getStartingOffset())) {
// snapshot to binlog phase
return true;
}
// Ask cdc_client whether the end offset is strictly ahead of where we are.
hasMoreData = compareOffset(endBinlogOffset, new HashMap<>(binlogSplit.getStartingOffset()));
return hasMoreData;
} else {
// snapshot means has data to consume
return true;
}
} catch (Exception ex) {
// Best-effort: on compare failure, report "no data" and retry on a later tick.
log.info("Compare offset error: ", ex);
return false;
}
}
/**
 * Ask a backend's cdc_client to compare two binlog offsets.
 *
 * @param offsetFirst offset expected to be ahead (typically the fetched end offset)
 * @param offsetSecond offset to compare against (typically the consumed starting offset)
 * @return true when {@code offsetFirst} is strictly greater than {@code offsetSecond}
 * @throws JobException on RPC failure/timeout, backend error status, or unparsable response
 */
private boolean compareOffset(Map<String, String> offsetFirst, Map<String, String> offsetSecond)
        throws JobException {
    Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
    CompareOffsetRequest requestParams =
            new CompareOffsetRequest(getJobId(), sourceType.name(), sourceProperties,
                    getFrontendAddress(), offsetFirst, offsetSecond);
    InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
            .setApi("/api/compareOffset")
            .setParams(new Gson().toJson(requestParams)).build();
    TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
    try {
        Future<PRequestCdcClientResult> future = BackendServiceProxy.getInstance()
                .requestCdcClient(address, request, Config.streaming_cdc_light_rpc_timeout_sec);
        PRequestCdcClientResult result = future.get(Config.streaming_cdc_light_rpc_timeout_sec, TimeUnit.SECONDS);
        TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
        if (code != TStatusCode.OK) {
            log.warn("Failed to compare offset , {}", result.getStatus().getErrorMsgs(0));
            throw new JobException(
                    "Failed to compare offset ," + result.getStatus().getErrorMsgs(0) + ", response: "
                            + result.getResponse());
        }
        String response = result.getResponse();
        try {
            ResponseBody<Integer> responseObj = objectMapper.readValue(
                    response,
                    new TypeReference<ResponseBody<Integer>>() {
                    }
            );
            return responseObj.getData() > 0;
        } catch (JsonProcessingException e) {
            log.warn("Failed to parse compare offset response: {}", response);
            throw new JobException("Failed to parse compare offset response: " + response);
        }
    } catch (TimeoutException te) {
        log.warn("cdc_client RPC timeout api=/api/compareOffset jobId={} backend={}:{} timeout_sec={}",
                getJobId(), backend.getHost(), backend.getBrpcPort(),
                Config.streaming_cdc_light_rpc_timeout_sec);
        throw new JobException("cdc_client RPC timeout: /api/compareOffset jobId=" + getJobId());
    } catch (InterruptedException ie) {
        // Restore the interrupt flag so the calling thread can observe cancellation.
        Thread.currentThread().interrupt();
        log.warn("Compare offset error: ", ie);
        throw new JobException(ie);
    } catch (ExecutionException ex) {
        log.warn("Compare offset error: ", ex);
        throw new JobException(ex);
    }
}
/**
 * Deserialize a persisted offset string (JSON array of maps) back into a JdbcOffset.
 * Each map carries SPLIT_ID plus either a chunk high-watermark (snapshot split) or a
 * binlog position; a binlog entry short-circuits into a single-binlog-split offset.
 */
@Override
public Offset deserializeOffset(String offset) {
    try {
        // chunk is highWatermark, binlog is offset map
        List<Map<String, String>> entries =
                objectMapper.readValue(offset, new TypeReference<List<Map<String, String>>>() {});
        List<SnapshotSplit> snapshotSplits = new ArrayList<>(entries.size());
        for (Map<String, String> entry : entries) {
            String id = entry.remove(SPLIT_ID);
            if (BinlogSplit.BINLOG_SPLIT_ID.equals(id)) {
                BinlogSplit binlog = new BinlogSplit();
                binlog.setSplitId(id);
                binlog.setStartingOffset(entry);
                return new JdbcOffset(Collections.singletonList(binlog));
            }
            SnapshotSplit snapshot = new SnapshotSplit();
            snapshot.setSplitId(id);
            snapshot.setHighWatermark(entry);
            snapshotSplits.add(snapshot);
        }
        return new JdbcOffset(snapshotSplits);
    } catch (JsonProcessingException e) {
        log.warn("Failed to deserialize offset: {}", offset, e);
        throw new RuntimeException(e);
    }
}
/**
 * Parse a user-supplied offset property into an offset. Only the JSON form is accepted,
 * e.g. {"file":"binlog.000003","pos":154} or {"lsn":"123456"}; blank or non-JSON input
 * (and any parse failure) yields null.
 */
@Override
public Offset deserializeOffsetProperty(String offset) {
    if (offset == null || offset.trim().isEmpty()) {
        return null;
    }
    if (!DataSourceConfigValidator.isJsonOffset(offset)) {
        return null;
    }
    try {
        Map<String, String> offsetMap = objectMapper.readValue(offset,
                new TypeReference<Map<String, String>>() {});
        return new JdbcOffset(Collections.singletonList(new BinlogSplit(offsetMap)));
    } catch (Exception e) {
        log.warn("Failed to parse JSON offset: {}", offset, e);
        return null;
    }
}
/**
 * Reject ALTER JOB offsets that are not in the JSON binlog-position form.
 *
 * @throws AnalysisException when the offset string is not JSON
 */
@Override
public void validateAlterOffset(String offset) throws Exception {
    if (DataSourceConfigValidator.isJsonOffset(offset)) {
        return;
    }
    throw new AnalysisException(
            "ALTER JOB for CDC only supports JSON specific offset, "
                    + "e.g. '{\"file\":\"binlog.000001\",\"pos\":\"154\"}' for MySQL "
                    + "or '{\"lsn\":\"12345678\"}' for PostgreSQL");
}
/**
 * Rebuild in-memory split state after an FE restart / job replay.
 * Restores binlog offset or snapshot splits from the persisted provider state (or from
 * chunk meta when the persist blob is absent), then resumes cdcSplitProgress so the next
 * advanceSplits() RPC continues from the right cursor instead of re-cutting splits.
 */
@Override
public void replayIfNeed(StreamingInsertJob job) throws JobException {
synchronized (splitsLock) {
this.cachedSyncTables = job.getSyncTables();
}
String offsetProviderPersist = job.getOffsetProviderPersist();
if (offsetProviderPersist != null) {
// Restore the persisted (@SerializedName) portion of a previous provider instance.
JdbcSourceOffsetProvider replayFromPersist = GsonUtils.GSON.fromJson(offsetProviderPersist,
JdbcSourceOffsetProvider.class);
this.binlogOffsetPersist = replayFromPersist.getBinlogOffsetPersist();
this.chunkHighWatermarkMap = replayFromPersist.getChunkHighWatermarkMap();
this.tableSchemas = replayFromPersist.getTableSchemas();
synchronized (splitsLock) {
this.committedSplitProgress = replayFromPersist.getCommittedSplitProgress() != null
? replayFromPersist.getCommittedSplitProgress() : new SplitProgress();
this.cdcSplitProgress = this.committedSplitProgress.copy();
}
log.info("Replaying offset provider for job {}, binlogOffset size {}, chunkHighWatermark size {}",
getJobId(),
binlogOffsetPersist == null ? 0 : binlogOffsetPersist.size(),
chunkHighWatermarkMap == null ? 0 : chunkHighWatermarkMap.size());
if (MapUtils.isNotEmpty(binlogOffsetPersist)) {
// Already in binlog phase: resume directly from the persisted binlog offset.
currentOffset = new JdbcOffset();
currentOffset.setSplits(Collections.singletonList(new BinlogSplit(binlogOffsetPersist)));
} else {
// Still in snapshot phase: reload split definitions from meta and replay HW commits.
Map<String, List<SnapshotSplit>> snapshotSplits = StreamingJobUtils.restoreSplitsToJob(job.getJobId());
if (MapUtils.isNotEmpty(chunkHighWatermarkMap) && MapUtils.isNotEmpty(snapshotSplits)) {
List<SnapshotSplit> lastSnapshotSplits =
recalculateRemainingSplits(chunkHighWatermarkMap, snapshotSplits);
if (this.remainingSplits.isEmpty()) {
if (!lastSnapshotSplits.isEmpty()) {
currentOffset = new JdbcOffset();
currentOffset.setSplits(lastSnapshotSplits);
} else if (!isSnapshotOnlyMode()) {
// initial mode: rebuild binlog split for snapshot-to-binlog transition
currentOffset = new JdbcOffset();
BinlogSplit binlogSplit = new BinlogSplit();
binlogSplit.setFinishedSplits(finishedSplits);
currentOffset.setSplits(Collections.singletonList(binlogSplit));
} else {
// snapshot-only completed: leave currentOffset as null,
// hasReachedEnd() detects completion via finishedSplits
log.info("Replaying offset provider for job {}: snapshot-only mode completed,"
+ " finishedSplits={}, skip currentOffset restoration",
getJobId(), finishedSplits.size());
}
}
}
}
} else if (checkNeedSplitChunks(sourceProperties)
&& CollectionUtils.isEmpty(remainingSplits)
&& CollectionUtils.isEmpty(finishedSplits)
&& MapUtils.isEmpty(chunkHighWatermarkMap)
&& MapUtils.isEmpty(binlogOffsetPersist)) {
// After the Job is created for the first time, starting from the initial offset,
// the task for the first split is scheduled, When the task status is running or failed,
// If FE restarts, the split needs to be restore from the meta again.
log.info("Replaying offset provider for job {}, offsetProviderPersist is empty", getJobId());
Map<String, List<SnapshotSplit>> snapshotSplits = StreamingJobUtils.restoreSplitsToJob(job.getJobId());
recalculateRemainingSplits(new HashMap<>(), snapshotSplits);
} else {
log.info("No need to replay offset provider for job {}", getJobId());
}
// Resume cdcSplitProgress from the at-most-one table cut to mid so the next
// advanceSplits() RPC won't re-cut already-fetched splits.
synchronized (splitsLock) {
if (cachedSyncTables == null || cachedSyncTables.isEmpty()) {
return;
}
SnapshotSplit mid = findResumeMidSplit(cachedSyncTables, finishedSplits, remainingSplits);
if (mid != null) {
applySplitToProgress(cdcSplitProgress, mid, getTableName(mid.getTableId()));
} else {
clearProgress(cdcSplitProgress);
}
log.info("Replay summary for job {}: finishedSplits={}, remainingSplits={}, "
+ "committedSplitProgress=(table={}, nextStart={}, nextSplitId={}), "
+ "cdcSplitProgress=(table={}, nextStart={}, nextSplitId={})",
getJobId(), finishedSplits.size(), remainingSplits.size(),
committedSplitProgress == null ? null : committedSplitProgress.getCurrentSplittingTable(),
committedSplitProgress == null ? null : Arrays.toString(committedSplitProgress.getNextSplitStart()),
committedSplitProgress == null ? null : committedSplitProgress.getNextSplitId(),
cdcSplitProgress.getCurrentSplittingTable(),
Arrays.toString(cdcSplitProgress.getNextSplitStart()),
cdcSplitProgress.getNextSplitId());
}
}
/**
 * Find the at-most-one table cut to mid (its largest-id split has non-null splitEnd).
 * Returns null when every table in {@code syncTables} is either untouched or fully cut.
 */
static SnapshotSplit findResumeMidSplit(List<String> syncTables,
List<SnapshotSplit> finishedSplits,
List<SnapshotSplit> remainingSplits) {
    // Largest-id split per table, across both finished and remaining splits.
    Map<String, SnapshotSplit> lastPerTable = new HashMap<>();
    pickLastById(finishedSplits, lastPerTable);
    pickLastById(remainingSplits, lastPerTable);
    for (String table : syncTables) {
        SnapshotSplit candidate = lastPerTable.get(table);
        if (candidate == null) {
            continue;
        }
        // A non-empty splitEnd means the table was cut to mid — resume from here.
        if (candidate.getSplitEnd() != null && candidate.getSplitEnd().length > 0) {
            return candidate;
        }
    }
    return null;
}
/** For each split, keep in {@code out} the per-tableId split with the largest numeric id. */
private static void pickLastById(List<SnapshotSplit> splits, Map<String, SnapshotSplit> out) {
    for (SnapshotSplit candidate : splits) {
        out.merge(candidate.getTableId(), candidate,
                (kept, incoming) ->
                        splitIdOf(incoming.getSplitId()) > splitIdOf(kept.getSplitId()) ? incoming : kept);
    }
}
/**
 * Assign the HW value to the synchronized Split,
 * and remove the Split from remainSplit and place it in finishedSplit.
 * Rebuilds {@code finishedSplits}/{@code remainingSplits} from meta-restored splits and the
 * replayed high-watermark map, and returns the batch that was in flight before the restart.
 *
 * @param chunkHighWatermarkMap tableId -> splitId -> committed high-watermark
 * @param snapshotSplits bare table name -> splits restored from meta (mutated: finished ones removed)
 * @return up to snapshotParallelism head splits of remainingSplits (the last in-flight batch)
 */
protected List<SnapshotSplit> recalculateRemainingSplits(
Map<String, Map<String, Map<String, String>>> chunkHighWatermarkMap,
Map<String, List<SnapshotSplit>> snapshotSplits) {
this.finishedSplits = new ArrayList<>();
for (Map.Entry<String, Map<String, Map<String, String>>> entry : chunkHighWatermarkMap.entrySet()) {
String tableId = entry.getKey();
Map<String, Map<String, String>> splitIdToHighWatermark = entry.getValue();
if (MapUtils.isEmpty(splitIdToHighWatermark)) {
continue;
}
// db.schema.table
String tableName = getTableName(tableId);
if (tableName == null) {
continue;
}
List<SnapshotSplit> tableSplits = snapshotSplits.get(tableName);
if (CollectionUtils.isEmpty(tableSplits)) {
continue;
}
// A split with a recorded HW was already committed: attach the HW and move it
// to finishedSplits (the lambda intentionally mutates the field being rebuilt).
tableSplits.removeIf(split -> {
String splitId = split.getSplitId();
Map<String, String> highWatermark = splitIdToHighWatermark.get(splitId);
if (highWatermark != null) {
split.setHighWatermark(highWatermark);
finishedSplits.add(split);
return true;
}
return false;
});
}
// Whatever survived the removeIf above is still pending.
this.remainingSplits = snapshotSplits.values().stream()
.flatMap(List::stream)
.collect(Collectors.toList());
// The splits that were last syncing before the restart
int splitsNum = Math.min(remainingSplits.size(), snapshotParallelism);
List<SnapshotSplit> lastSnapshotSplits = new ArrayList<>(remainingSplits.subList(0, splitsNum));
return lastSnapshotSplits;
}
/** Extract the bare (last) segment from a dot-qualified table id such as "db.schema.table"; null-safe. */
private String getTableName(String tableId) {
    if (tableId == null) {
        return null;
    }
    String[] parts = tableId.split("\\.");
    return parts[parts.length - 1];
}
/** Snapshot this provider's state as JSON for job meta persistence. */
@Override
public String getPersistInfo() {
    return GsonUtils.GSON.toJson(this);
}
// ============ Async split progress (driven by scheduler each tick) ============
/**
 * One-time setup at CREATE.
 * - initial/snapshot mode: init split progress; scheduler will drive advanceSplits() each tick.
 * - latest mode (and other non-splitting modes): open the remote reader (e.g. PG slot) so the
 *   binlog phase can start immediately; no snapshot splitting will happen.
 */
@Override
public void initOnCreate(List<String> syncTables) throws JobException {
    boolean needsSplitting = checkNeedSplitChunks(sourceProperties);
    if (!needsSplitting) {
        initSourceReader();
        return;
    }
    synchronized (splitsLock) {
        cachedSyncTables = syncTables;
        committedSplitProgress = new SplitProgress();
        cdcSplitProgress = new SplitProgress();
    }
}
/**
 * True when snapshot splitting is finished (or not needed at all): no table is mid-cut
 * and every sync table has already been touched by cdc fetching.
 */
@Override
public boolean noMoreSplits() {
    if (!checkNeedSplitChunks(sourceProperties)) {
        return true;
    }
    synchronized (splitsLock) {
        boolean tableMidCut = cdcSplitProgress.getCurrentSplittingTable() != null;
        return !tableMidCut && computeCdcRemainingTables().isEmpty();
    }
}
/** Tables not yet touched by cdc fetching. Caller must hold splitsLock. */
private List<String> computeCdcRemainingTables() {
    if (cachedSyncTables == null || cachedSyncTables.isEmpty()) {
        return Collections.emptyList();
    }
    // SnapshotSplit.tableId is qualified ("schema.table"/"db.table"); cachedSyncTables is bare — normalize.
    Set<String> touched = new HashSet<>();
    finishedSplits.forEach(s -> touched.add(getTableName(s.getTableId())));
    remainingSplits.forEach(s -> touched.add(getTableName(s.getTableId())));
    String current = cdcSplitProgress.getCurrentSplittingTable();
    if (current != null) {
        touched.add(getTableName(current));
    }
    List<String> untouched = new ArrayList<>(cachedSyncTables.size());
    for (String table : cachedSyncTables) {
        if (!touched.contains(getTableName(table))) {
            untouched.add(table);
        }
    }
    return untouched;
}
/**
 * Fetch the next batch of snapshot splits from cdc_client and publish them.
 * Five-phase protocol alternating short locked sections with slow unlocked RPC/persist
 * work, so scheduler ticks and updateOffset() are never blocked by network calls.
 * Persist-before-publish ordering makes a crash between phases recoverable.
 */
@Override
public void advanceSplits() throws JobException {
// Phase 1 (locked, fast): pick next table & snapshot the resume cursor.
String tbl;
Object[] startVal;
Integer splitId;
synchronized (splitsLock) {
if (cdcSplitProgress.getCurrentSplittingTable() == null) {
List<String> remaining = computeCdcRemainingTables();
if (remaining.isEmpty()) {
return;
}
cdcSplitProgress.setCurrentSplittingTable(remaining.get(0));
cdcSplitProgress.setNextSplitStart(null);
cdcSplitProgress.setNextSplitId(null);
}
tbl = cdcSplitProgress.getCurrentSplittingTable();
startVal = cdcSplitProgress.getNextSplitStart();
splitId = cdcSplitProgress.getNextSplitId();
}
// Phase 2 (unlocked, slow): RPC. Keeps updateOffset / scheduler tick unblocked
// so task dispatch can drain remainingSplits while we fetch the next batch.
List<SnapshotSplit> batch = rpcFetchSplitsBatch(tbl, startVal, splitId);
if (batch == null || batch.isEmpty()) {
return;
}
// Phase 3 (locked, fast): compute newSplits + splitsOfTbl WITHOUT mutating in-memory.
// Persist-before-publish keeps streaming_job_meta from lagging cdcSplitProgress, so a
// crash never leaves an HW recorded for a split whose definition was not written.
List<SnapshotSplit> newSplits;
List<SnapshotSplit> splitsOfTbl;
synchronized (splitsLock) {
// replayIfNeed / pause-resume may have moved the cursor during the RPC — discard.
if (!tbl.equals(cdcSplitProgress.getCurrentSplittingTable())
|| !Objects.equals(splitId, cdcSplitProgress.getNextSplitId())) {
log.info("advanceSplits discard batch for job {} table {}: state moved on "
+ "during RPC (now table={}, splitId={})",
getJobId(), tbl,
cdcSplitProgress.getCurrentSplittingTable(),
cdcSplitProgress.getNextSplitId());
return;
}
// Dedup against splits already known (a retried RPC regenerates identical splitIds).
Set<String> existingIds = new HashSet<>();
finishedSplits.forEach(s -> existingIds.add(s.getSplitId()));
remainingSplits.forEach(s -> existingIds.add(s.getSplitId()));
newSplits = new ArrayList<>();
for (SnapshotSplit s : batch) {
if (!existingIds.contains(s.getSplitId())) {
newSplits.add(s);
}
}
if (newSplits.size() < batch.size()) {
log.info("advanceSplits dedup'd {} duplicate splits (batch={}, new={}) for job {} table {}",
batch.size() - newSplits.size(), batch.size(), newSplits.size(), getJobId(), tbl);
}
// Post-batch meta state: finished + remaining + newSplits, filtered by table.
List<SnapshotSplit> allForTbl = new ArrayList<>(
finishedSplits.size() + remainingSplits.size() + newSplits.size());
allForTbl.addAll(finishedSplits);
allForTbl.addAll(remainingSplits);
allForTbl.addAll(newSplits);
// tbl is bare (matches cachedSyncTables); SnapshotSplit.tableId is qualified.
splitsOfTbl = allForTbl.stream()
.filter(s -> tbl.equals(getTableName(s.getTableId())))
.sorted(Comparator.comparingInt(s -> splitIdOf(s.getSplitId())))
.collect(Collectors.toList());
}
// Phase 4 (unlocked, slow): persist FIRST. On failure, throw → advanceSplitsIfNeed
// PAUSEs the job; autoResume re-picks the same (tbl, startVal, splitId), so cdc_client
// regenerates identical splitIds and the retried UPSERT is idempotent.
try {
StreamingJobUtils.upsertChunkList(getJobId(), tbl, splitsOfTbl);
} catch (Exception e) {
throw new JobException("Failed to persist chunk_list for job " + getJobId()
+ " table " + tbl + ": " + e.getMessage(), e);
}
// Phase 5 (locked, fast): publish. Skip if cursor moved during Phase 4 — splits are
// already in meta, next iteration / replayIfNeed reconciles via splitId.
synchronized (splitsLock) {
if (!tbl.equals(cdcSplitProgress.getCurrentSplittingTable())
|| !Objects.equals(splitId, cdcSplitProgress.getNextSplitId())) {
log.info("advanceSplits discard publish for job {} table {}: state moved on "
+ "after UPSERT (now table={}, splitId={})",
getJobId(), tbl,
cdcSplitProgress.getCurrentSplittingTable(),
cdcSplitProgress.getNextSplitId());
return;
}
remainingSplits.addAll(newSplits);
// Advance the cursor past the last split of the batch (clears it on the final split).
applySplitToProgress(cdcSplitProgress, batch.get(batch.size() - 1), tbl);
log.info("advanceSplits jobId={} table={} request(nextStart={}, nextSplitId={}) "
+ "published {} new splits, cdcSplitProgress -> (table={}, nextStart={}, nextSplitId={})",
getJobId(), tbl, Arrays.toString(startVal), splitId, newSplits.size(),
cdcSplitProgress.getCurrentSplittingTable(),
Arrays.toString(cdcSplitProgress.getNextSplitStart()),
cdcSplitProgress.getNextSplitId());
}
}
/** Parse the trailing integer id from flink-cdc splitId format "tableId:id". */
static int splitIdOf(String splitId) {
    if (splitId == null) {
        throw new IllegalArgumentException("splitId is null");
    }
    int sep = splitId.lastIndexOf(':');
    boolean malformed = sep < 0 || sep == splitId.length() - 1;
    if (malformed) {
        throw new IllegalArgumentException("malformed splitId, expected 'tableId:id': " + splitId);
    }
    try {
        return Integer.parseInt(splitId.substring(sep + 1));
    } catch (NumberFormatException e) {
        throw new IllegalArgumentException("malformed splitId, expected 'tableId:id': " + splitId, e);
    }
}
/** Reset progress to "no table being split" state. */
private static void clearProgress(SplitProgress progress) {
    progress.setNextSplitId(null);
    progress.setNextSplitStart(null);
    progress.setCurrentSplittingTable(null);
}
/**
 * Apply a split's position to a progress object.
 * - splitEnd null/empty (final split of table) → clear all fields.
 * - splitEnd non-empty → set currentSplittingTable to tableName (bare, matching the
 *   form used in cachedSyncTables / snapshotTable), advance start/id.
 * tableName must be the bare name; SnapshotSplit.tableId is qualified (schema.table)
 * and would break the fetchSplits RPC contract if reused as currentSplittingTable.
 */
private static void applySplitToProgress(SplitProgress progress, SnapshotSplit split, String tableName) {
    boolean finalSplitOfTable = split.getSplitEnd() == null || split.getSplitEnd().length == 0;
    if (finalSplitOfTable) {
        clearProgress(progress);
        return;
    }
    progress.setCurrentSplittingTable(tableName);
    progress.setNextSplitStart(split.getSplitEnd());
    progress.setNextSplitId(splitIdOf(split.getSplitId()) + 1);
}
/**
 * RPC fetchSplits with (table, nextSplitStart, nextSplitId, batchSize). protected for UT subclass.
 *
 * @return the next batch of splits for {@code table}, possibly null/empty when the table is fully cut
 * @throws JobException on timeout, backend error status, interruption, or any other failure;
 *         the original cause is preserved where available
 */
protected List<SnapshotSplit> rpcFetchSplitsBatch(String table, Object[] nextSplitStart, Integer nextSplitId)
        throws JobException {
    Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
    FetchTableSplitsRequest requestParams = new FetchTableSplitsRequest(
            getJobId(), sourceType.name(), sourceProperties, getFrontendAddress(), table);
    requestParams.setNextSplitStart(nextSplitStart);
    requestParams.setNextSplitId(nextSplitId);
    requestParams.setBatchSize(Config.streaming_cdc_fetch_splits_batch_size);
    InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
            .setApi("/api/fetchSplits")
            .setParams(new Gson().toJson(requestParams))
            .build();
    TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
    try {
        Future<PRequestCdcClientResult> future = BackendServiceProxy.getInstance()
                .requestCdcClient(address, request, Config.streaming_cdc_heavy_rpc_timeout_sec);
        PRequestCdcClientResult result = future.get(
                Config.streaming_cdc_heavy_rpc_timeout_sec, TimeUnit.SECONDS);
        TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
        if (code != TStatusCode.OK) {
            throw new JobException("fetchSplits backend error: " + result.getStatus().getErrorMsgs(0));
        }
        ResponseBody<List<SnapshotSplit>> resp = objectMapper.readValue(
                result.getResponse(),
                new TypeReference<ResponseBody<List<SnapshotSplit>>>() {});
        return resp.getData();
    } catch (JobException je) {
        // Rethrow as-is: the broad catch below used to re-wrap our own backend-error
        // exception, double-prefixing the message.
        throw je;
    } catch (TimeoutException te) {
        throw new JobException("fetchSplits RPC timeout: jobId=" + getJobId() + " table=" + table);
    } catch (InterruptedException ie) {
        // Restore the interrupt flag so the scheduler thread can observe cancellation.
        Thread.currentThread().interrupt();
        throw new JobException("fetchSplits interrupted: jobId=" + getJobId() + " table=" + table, ie);
    } catch (Exception ex) {
        // Preserve the cause chain instead of flattening it to getMessage().
        throw new JobException("fetchSplits failed: " + ex.getMessage(), ex);
    }
}
/**
 * Whether the configured start mode requires chunk splitting of the snapshot phase.
 * True for "initial" and "snapshot" offsets; false otherwise (including unset).
 */
protected boolean checkNeedSplitChunks(Map<String, String> sourceProperties) {
    String startMode = sourceProperties.get(DataSourceConfigKeys.OFFSET);
    // equalsIgnoreCase(null) is false, so a missing OFFSET key naturally yields false.
    return DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(startMode)
            || DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(startMode);
}
/** True when the job is configured with the snapshot-only start offset. */
protected boolean isSnapshotOnlyMode() {
    return DataSourceConfigKeys.OFFSET_SNAPSHOT
            .equalsIgnoreCase(sourceProperties.get(DataSourceConfigKeys.OFFSET));
}
/**
 * Report consumption lag in seconds as a string.
 * Empty string: lag is not applicable (no offset yet, still in snapshot phase,
 * or the binlog offset map is empty). "0": source is idle or no event time available.
 */
@Override
public String getLag() {
    if (currentOffset == null || currentOffset.snapshotSplit()) {
        return "";
    }
    if (!hasMoreData) {
        // Last task consumed no data, so the source is idle: report zero lag.
        return "0";
    }
    BinlogSplit split = (BinlogSplit) currentOffset.getSplits().get(0);
    Map<String, String> startingOffset = split.getStartingOffset();
    if (MapUtils.isEmpty(startingOffset)) {
        return "";
    }
    long eventMs = extractEventTimeMs(startingOffset);
    if (eventMs <= 0) {
        return "0";
    }
    // Clamp to zero in case of clock skew between FE and the source database.
    long lagSeconds = Math.max((System.currentTimeMillis() - eventMs) / 1000, 0);
    return Long.toString(lagSeconds);
}
/**
 * Extract event timestamp in milliseconds from binlog offset map.
 * MySQL: ts_sec (seconds), PostgreSQL: ts_usec (microseconds).
 * Returns -1 when neither key is present or the value fails to parse.
 */
protected long extractEventTimeMs(Map<String, String> offsetMap) {
    try {
        String seconds = offsetMap.get("ts_sec");
        if (seconds != null) {
            return Long.parseLong(seconds) * 1000;
        }
        String micros = offsetMap.get("ts_usec");
        if (micros != null) {
            return Long.parseLong(micros) / 1000;
        }
    } catch (NumberFormatException e) {
        // Malformed timestamp is logged but not fatal; caller treats -1 as "unknown".
        log.warn("Failed to parse event timestamp from offset: {}", offsetMap, e);
    }
    return -1;
}
/**
 * Track source idleness after a task commit: a commit with zero rows and zero
 * bytes means the source produced nothing, flipping hasMoreData off so getLag()
 * reports zero until new data arrives.
 */
@Override
public void onTaskCommitted(long scannedRows, long loadBytes) {
    boolean emptyCommit = scannedRows == 0 && loadBytes == 0;
    if (emptyCommit) {
        hasMoreData = false;
    }
}
/**
 * A snapshot-only job ends once at least one split finished and no splits remain
 * anywhere; continuous (binlog) jobs never reach an end on their own.
 */
@Override
public boolean hasReachedEnd() {
    if (!isSnapshotOnlyMode()) {
        return false;
    }
    synchronized (splitsLock) {
        boolean someSplitsDone = CollectionUtils.isNotEmpty(finishedSplits);
        boolean nothingPending = remainingSplits.isEmpty() && noMoreSplits();
        return someSplitsDone && nothingPending;
    }
}
/**
 * Source reader needs to be initialized here.
 * For example, PG slots need to be created first;
 * otherwise, conflicts will occur in multi-backends scenarios.
 *
 * @throws JobException on backend error, response-parse failure, RPC timeout, or interruption
 */
private void initSourceReader() throws JobException {
    Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
    JobBaseConfig requestParams =
            new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties, getFrontendAddress());
    InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
            .setApi("/api/initReader")
            .setParams(new Gson().toJson(requestParams)).build();
    TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
    try {
        Future<PRequestCdcClientResult> future = BackendServiceProxy.getInstance()
                .requestCdcClient(address, request, Config.streaming_cdc_heavy_rpc_timeout_sec);
        InternalService.PRequestCdcClientResult result =
                future.get(Config.streaming_cdc_heavy_rpc_timeout_sec, TimeUnit.SECONDS);
        TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
        if (code != TStatusCode.OK) {
            log.warn("Failed to init job {} reader, {}", getJobId(), result.getStatus().getErrorMsgs(0));
            throw new JobException(
                    "Failed to init source reader," + result.getStatus().getErrorMsgs(0) + ", response: "
                            + result.getResponse());
        }
        String response = result.getResponse();
        try {
            ResponseBody<String> responseObj = objectMapper.readValue(
                    response,
                    new TypeReference<ResponseBody<String>>() {
                    }
            );
            if (responseObj.getCode() == RestApiStatusCode.OK.code) {
                log.info("Init {} source reader successfully, response: {}", getJobId(), responseObj.getData());
                return;
            }
            throw new JobException("Failed to init source reader, error: " + responseObj.getData());
        } catch (JobException jobex) {
            log.warn("Failed to init {} source reader, {}", getJobId(), response);
            // Rethrow as-is instead of re-wrapping; the message is already descriptive
            // and re-wrapping would discard the original stack trace.
            throw jobex;
        } catch (Exception e) {
            // Include the exception so the parse failure's stack trace reaches the log.
            log.warn("Failed to init {} source reader, {}", getJobId(), response, e);
            throw new JobException("Failed to init source reader, cause " + e.getMessage());
        }
    } catch (TimeoutException te) {
        log.warn("cdc_client RPC timeout api=/api/initReader jobId={} backend={}:{} timeout_sec={}",
                getJobId(), backend.getHost(), backend.getBrpcPort(),
                Config.streaming_cdc_heavy_rpc_timeout_sec);
        throw new JobException("cdc_client RPC timeout: /api/initReader jobId=" + getJobId());
    } catch (InterruptedException ie) {
        // Restore interrupt status so the surrounding executor can observe it.
        Thread.currentThread().interrupt();
        log.warn("init source reader interrupted: ", ie);
        throw new JobException(ie);
    } catch (ExecutionException ex) {
        log.warn("init source reader: ", ex);
        throw new JobException(ex);
    }
}
/**
 * Clean up job metadata and ask the backend cdc_client to close the source
 * (best-effort: RPC failures are logged, not rethrown).
 *
 * @param jobId id of the job whose meta rows are deleted
 * @throws JobException propagated from meta deletion or backend selection
 */
public void cleanMeta(Long jobId) throws JobException {
    // clean meta table
    StreamingJobUtils.deleteJobMeta(jobId);
    Backend backend = StreamingJobUtils.selectBackend(cloudCluster);
    // NOTE(review): the close request uses getJobId() while meta deletion uses the
    // jobId parameter — confirm these are always the same job.
    JobBaseConfig requestParams =
            new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties, getFrontendAddress());
    InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
            .setApi("/api/close")
            .setParams(new Gson().toJson(requestParams)).build();
    TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
    try {
        Future<PRequestCdcClientResult> future = BackendServiceProxy.getInstance()
                .requestCdcClient(address, request, Config.streaming_cdc_light_rpc_timeout_sec);
        InternalService.PRequestCdcClientResult result =
                future.get(Config.streaming_cdc_light_rpc_timeout_sec, TimeUnit.SECONDS);
        TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
        if (code != TStatusCode.OK) {
            log.warn("Failed to close job {} source {}", jobId, result.getStatus().getErrorMsgs(0));
        }
    } catch (TimeoutException te) {
        log.warn("cdc_client RPC timeout api=/api/close jobId={} backend={}:{} timeout_sec={}",
                jobId, backend.getHost(), backend.getBrpcPort(),
                Config.streaming_cdc_light_rpc_timeout_sec);
    } catch (InterruptedException ie) {
        // Close is best-effort, but the interrupt status must still be restored.
        Thread.currentThread().interrupt();
        log.warn("Close job interrupted: ", ie);
    } catch (ExecutionException ex) {
        log.warn("Close job error: ", ex);
    }
}
/** Master FE address as "host:httpPort", passed to the backend cdc_client for callbacks. */
private String getFrontendAddress() {
    Env env = Env.getCurrentEnv();
    return env.getMasterHost() + ":" + env.getMasterHttpPort();
}
/** Mirrors flink-cdc ChunkSplitterState. */
@Getter
@Setter
public static class SplitProgress {
    // Bare name of the table currently being chunk-split; null when no split is in flight.
    @SerializedName("ct")
    private String currentSplittingTable;
    // Resume point for the next fetchSplits batch within that table.
    @SerializedName("ns")
    private Object[] nextSplitStart;
    // Id to assign to the next split within that table.
    @SerializedName("ni")
    private Integer nextSplitId;

    public SplitProgress() {}

    /** Deep copy for transferring committed -> cdc view on restart. */
    public SplitProgress copy() {
        SplitProgress dup = new SplitProgress();
        dup.currentSplittingTable = this.currentSplittingTable;
        dup.nextSplitId = this.nextSplitId;
        // Shallow array clone is enough: elements are shared, the container is not.
        dup.nextSplitStart = this.nextSplitStart == null ? null : this.nextSplitStart.clone();
        return dup;
    }
}
}