// JdbcTvfSourceOffsetProvider.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.job.offset.jdbc;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.httpv2.entity.ResponseBody;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.doris.job.cdc.split.AbstractSourceSplit;
import org.apache.doris.job.cdc.split.BinlogSplit;
import org.apache.doris.job.cdc.split.SnapshotSplit;
import org.apache.doris.job.common.DataSourceType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
import org.apache.doris.job.offset.Offset;
import org.apache.doris.job.util.StreamingJobUtils;
import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
import org.apache.doris.nereids.trees.expressions.Properties;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PRequestCdcClientResult;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
import org.apache.doris.tablefunction.CdcStreamTableValuedFunction;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.google.gson.Gson;
import lombok.extern.log4j.Log4j2;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* OffsetProvider for cdc_stream TVF path.
*
* <p>Differs from JdbcSourceOffsetProvider (non-TVF path) in:
* <ul>
* <li>offset commit: FE pulls actual end offset from BE via /api/getTaskOffset/{taskId} in
* beforeCommitted, stores in txn attachment (transactionally safe)</li>
* <li>cloud mode snapshot: attachment carries cumulative chunkHighWatermarkMap so that
* replayOnCloudMode can recover full state from the single latest attachment in MS</li>
* <li>recovery: state is rebuilt from txn replay (chunkHighWatermarkMap populated by
* replayOnCommitted/replayOnCloudMode -> updateOffset), not from EditLog</li>
* <li>updateOffset: during replay remainingSplits is empty so removeIf returns false naturally;
* chunkHighWatermarkMap is always updated unconditionally to support recovery</li>
* <li>replayIfNeed: checks currentOffset directly — snapshot triggers remainingSplits rebuild
* from meta + chunkHighWatermarkMap; binlog needs no action (currentOffset already set)</li>
* </ul>
*/
@Log4j2
public class JdbcTvfSourceOffsetProvider extends JdbcSourceOffsetProvider {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    // Gson is thread-safe and stateless for plain toJson calls; share one instance
    // (consistent with the cached OBJECT_MAPPER) instead of allocating per call.
    private static final Gson GSON = new Gson();
    // Upper bound for the brpc round-trip that fetches a task's end offset from a BE,
    // so a hung backend cannot block the FE commit path indefinitely.
    private static final long GET_TASK_OFFSET_TIMEOUT_SECONDS = 60;

    /**
     * No-arg constructor required by SourceOffsetProviderFactory.createSourceOffsetProvider().
     */
    public JdbcTvfSourceOffsetProvider() {
        super();
    }

    /**
     * Initializes provider state and fetches snapshot splits from BE.
     * splitChunks is called here (rather than in StreamingInsertJob) to keep
     * all cdc_stream-specific init logic inside the provider.
     *
     * @param jobId          owning streaming job id; a non-null {@code this.jobId}
     *                       means we are already initialized and the call is a no-op
     * @param originTvfProps raw TVF properties from the job definition
     * @throws JobException declared for subclass/caller compatibility
     */
    @Override
    public void ensureInitialized(Long jobId, Map<String, String> originTvfProps) throws JobException {
        if (this.jobId != null) {
            // Already initialized (e.g. called again after FE restart recovery).
            return;
        }
        this.jobId = jobId;
        this.sourceProperties = originTvfProps;
        this.chunkHighWatermarkMap = new HashMap<>();
        String type = originTvfProps.get(DataSourceConfigKeys.TYPE);
        Preconditions.checkArgument(type != null, "type is required");
        this.sourceType = DataSourceType.valueOf(type.toUpperCase());
        this.snapshotParallelism = Integer.parseInt(
                originTvfProps.getOrDefault(DataSourceConfigKeys.SNAPSHOT_PARALLELISM,
                        DataSourceConfigKeys.SNAPSHOT_PARALLELISM_DEFAULT));
        String table = originTvfProps.get(DataSourceConfigKeys.TABLE);
        Preconditions.checkArgument(table != null, "table is required for cdc_stream TVF");
    }

    /**
     * Called once on fresh job creation (not on FE restart).
     * Fetches snapshot splits from BE and persists them to the meta table.
     *
     * @throws JobException if chunk splitting fails
     */
    @Override
    public void initOnCreate() throws JobException {
        String table = sourceProperties.get(DataSourceConfigKeys.TABLE);
        splitChunks(Collections.singletonList(table));
    }

    /**
     * Rewrites the cdc_stream TVF SQL with current offset meta and taskId,
     * so the BE knows where to start reading and can report
     * the end offset back via taskOffsetCache.
     *
     * @param originCommand the parsed insert command containing the cdc_stream TVF
     * @param runningOffset offset the task should start from (a {@link JdbcOffset})
     * @param taskId        id the BE uses as the taskOffsetCache key
     * @return a new InsertIntoTableCommand whose TVF carries meta/jobId/taskId properties
     */
    @Override
    public InsertIntoTableCommand rewriteTvfParams(InsertIntoTableCommand originCommand,
            Offset runningOffset, long taskId) {
        JdbcOffset offset = (JdbcOffset) runningOffset;
        Map<String, String> props = new HashMap<>();
        Plan rewritePlan = originCommand.getParsedPlan().get().rewriteUp(plan -> {
            if (plan instanceof UnboundTVFRelation) {
                UnboundTVFRelation originTvfRel = (UnboundTVFRelation) plan;
                // Preserve the user-supplied TVF properties and overlay the runtime keys.
                props.putAll(originTvfRel.getProperties().getMap());
                props.put(CdcStreamTableValuedFunction.META_KEY, GSON.toJson(offset.generateMeta()));
                props.put(CdcStreamTableValuedFunction.JOB_ID_KEY, String.valueOf(jobId));
                props.put(CdcStreamTableValuedFunction.TASK_ID_KEY, String.valueOf(taskId));
                return new UnboundTVFRelation(
                        originTvfRel.getRelationId(), originTvfRel.getFunctionName(), new Properties(props));
            }
            return plan;
        });
        InsertIntoTableCommand cmd = new InsertIntoTableCommand((LogicalPlan) rewritePlan,
                Optional.empty(), Optional.empty(), Optional.empty(), true, Optional.empty());
        cmd.setJobId(originCommand.getJobId());
        return cmd;
    }

    /**
     * Returns the serialized JSON offset to store in txn commit attachment.
     *
     * <p>Calls BE /api/getTaskOffset/{taskId} to get the actual end offset recorded after
     * fetchRecordStream completes (stored in PipelineCoordinator.taskOffsetCache).
     *
     * <p>For cloud + snapshot: returns cumulative list (all previously completed chunks +
     * current task's new splits) so that replayOnCloudMode can recover full state from latest attachment.
     * For non-cloud snapshot / binlog: returns only current task's splits.
     *
     * @param runningOffset  the offset the task ran with (a {@link JdbcOffset})
     * @param taskId         task whose end offset is fetched from the BE
     * @param scanBackendIds candidate BEs that may hold the offset in taskOffsetCache
     * @return JSON array of split maps, or "" when no BE returned an offset
     */
    @Override
    public String getCommitOffsetJson(Offset runningOffset, long taskId, List<Long> scanBackendIds) {
        List<Map<String, String>> currentTaskEndOffset = fetchTaskEndOffset(taskId, scanBackendIds);
        if (CollectionUtils.isEmpty(currentTaskEndOffset)) {
            return "";
        }
        // Cloud + snapshot: prepend all previously completed chunks so the attachment is
        // self-contained for replayOnCloudMode (MS only keeps the latest attachment)
        if (Config.isCloudMode() && ((JdbcOffset) runningOffset).snapshotSplit()) {
            List<Map<String, String>> cumulative = buildCumulativeSnapshotOffset(currentTaskEndOffset);
            return GSON.toJson(cumulative);
        }
        return GSON.toJson(currentTaskEndOffset);
    }

    /**
     * Queries each scan backend in order until one returns a non-empty offset for this taskId.
     * Only the BE that ran the cdc_stream TVF scan node will have the entry in taskOffsetCache.
     *
     * <p>Each rpc waits at most {@link #GET_TASK_OFFSET_TIMEOUT_SECONDS}; per-BE failures are
     * logged and the next BE is tried. An interrupt stops the scan immediately and restores
     * the thread's interrupt flag.
     *
     * @return the first non-empty offset list found, or an empty list if none
     */
    private List<Map<String, String>> fetchTaskEndOffset(long taskId, List<Long> scanBackendIds) {
        InternalService.PRequestCdcClientRequest request =
                InternalService.PRequestCdcClientRequest.newBuilder()
                        .setApi("/api/getTaskOffset/" + taskId).build();
        for (Long beId : scanBackendIds) {
            Backend backend = Env.getCurrentSystemInfo().getBackend(beId);
            if (backend == null) {
                log.info("Backend {} not found for task {}, skipping", beId, taskId);
                continue;
            }
            String rawResponse = null;
            try {
                TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
                Future<PRequestCdcClientResult> future =
                        BackendServiceProxy.getInstance().requestCdcClient(address, request);
                // Bounded wait: a hung BE must not stall the FE commit path forever.
                InternalService.PRequestCdcClientResult result =
                        future.get(GET_TASK_OFFSET_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
                if (code != TStatusCode.OK) {
                    // getErrorMsgsList() is safe on an empty list, unlike getErrorMsgs(0).
                    log.warn("Failed to get task {} offset from BE {}: {}", taskId,
                            backend.getHost(), result.getStatus().getErrorMsgsList());
                    continue;
                }
                rawResponse = result.getResponse();
                ResponseBody<List<Map<String, String>>> responseObj = OBJECT_MAPPER.readValue(
                        rawResponse,
                        new TypeReference<ResponseBody<List<Map<String, String>>>>() {});
                List<Map<String, String>> data = responseObj.getData();
                if (!CollectionUtils.isEmpty(data)) {
                    log.info("Fetched task {} offset from BE {}: {}", taskId, backend.getHost(), data);
                    return data;
                }
            } catch (InterruptedException ie) {
                // Restore the interrupt flag and stop querying further backends.
                Thread.currentThread().interrupt();
                log.warn("Interrupted while getting task {} offset from BE {}",
                        taskId, backend.getHost(), ie);
                break;
            } catch (Exception ex) {
                log.warn("Get task offset error for task {} from BE {}, raw response: {}",
                        taskId, backend.getHost(), rawResponse, ex);
            }
        }
        return Collections.emptyList();
    }

    /**
     * Merges existing chunkHighWatermarkMap (all previously completed chunks) with
     * current task's new splits, deduplicating by splitId.
     *
     * @param currentTaskSplits splits reported by the current task; these win on splitId collision
     * @return previously completed chunks (minus overlaps) followed by the current task's splits
     */
    private List<Map<String, String>> buildCumulativeSnapshotOffset(
            List<Map<String, String>> currentTaskSplits) {
        Set<String> currentSplitIds = currentTaskSplits.stream()
                .map(m -> m.get(SPLIT_ID)).collect(Collectors.toSet());
        List<Map<String, String>> result = new ArrayList<>();
        // Add all previously completed chunks (skip any that overlap with current task)
        if (MapUtils.isNotEmpty(chunkHighWatermarkMap)) {
            for (Map.Entry<String, Map<String, Map<String, String>>> tableEntry
                    : chunkHighWatermarkMap.entrySet()) {
                for (Map.Entry<String, Map<String, String>> splitEntry
                        : tableEntry.getValue().entrySet()) {
                    if (!currentSplitIds.contains(splitEntry.getKey())) {
                        Map<String, String> map = new HashMap<>(splitEntry.getValue());
                        map.put(SPLIT_ID, splitEntry.getKey());
                        result.add(map);
                    }
                }
            }
        }
        result.addAll(currentTaskSplits);
        return result;
    }

    /**
     * TVF path updateOffset.
     *
     * <p>Snapshot: always writes to chunkHighWatermarkMap (needed for cloud cumulative attachment
     * and FE-restart recovery). In normal flow removeIf finds the split in remainingSplits and
     * adds it to finishedSplits. During txn replay remainingSplits is empty so removeIf returns
     * false naturally — chunkHighWatermarkMap is still updated for replayIfNeed to use later.
     *
     * <p>Binlog: currentOffset is set above; no extra state needed for TVF recovery path.
     *
     * @param offset the committed offset (a {@link JdbcOffset})
     */
    @Override
    public void updateOffset(Offset offset) {
        this.currentOffset = (JdbcOffset) offset;
        if (currentOffset.snapshotSplit()) {
            for (AbstractSourceSplit split : currentOffset.getSplits()) {
                SnapshotSplit ss = (SnapshotSplit) split;
                boolean removed = remainingSplits.removeIf(v -> {
                    if (v.getSplitId().equals(ss.getSplitId())) {
                        // Enrich the committed split with the authoritative boundaries
                        // kept in remainingSplits before moving it to finishedSplits.
                        ss.setTableId(v.getTableId());
                        ss.setSplitKey(v.getSplitKey());
                        ss.setSplitStart(v.getSplitStart());
                        ss.setSplitEnd(v.getSplitEnd());
                        return true;
                    }
                    return false;
                });
                if (removed) {
                    finishedSplits.add(ss);
                }
                // Unconditional: during replay remainingSplits is empty but the watermark
                // must still be recorded so replayIfNeed can rebuild state later.
                chunkHighWatermarkMap.computeIfAbsent(buildTableKey(), k -> new HashMap<>())
                        .put(ss.getSplitId(), ss.getHighWatermark());
            }
        }
        // Binlog: currentOffset is already set; no binlogOffsetPersist needed for TVF path.
    }

    /**
     * TVF path recovery: offsetProviderPersist is always null (no EditLog write).
     * currentOffset is set by replayOnCommitted/replayOnCloudMode -> updateOffset before this runs.
     *
     * <ul>
     * <li>snapshot: mid-snapshot restart — rebuild remainingSplits from meta + chunkHighWatermarkMap</li>
     * <li>binlog: currentOffset already correct from updateOffset; nothing to do</li>
     * </ul>
     *
     * @param job the streaming job being replayed
     * @throws JobException if restoring splits from the meta table fails
     */
    @Override
    public void replayIfNeed(StreamingInsertJob job) throws JobException {
        if (currentOffset == null) {
            log.info("Replaying TVF offset provider for job {}: no committed txn, skip", getJobId());
            return;
        }
        if (currentOffset.snapshotSplit()) {
            log.info("Replaying TVF offset provider for job {}: restoring snapshot state from txn replay",
                    getJobId());
            Map<String, List<SnapshotSplit>> snapshotSplits = StreamingJobUtils.restoreSplitsToJob(job.getJobId());
            if (MapUtils.isNotEmpty(snapshotSplits)) {
                List<SnapshotSplit> lastSnapshotSplits =
                        recalculateRemainingSplits(chunkHighWatermarkMap, snapshotSplits);
                if (remainingSplits.isEmpty()) {
                    if (!lastSnapshotSplits.isEmpty()) {
                        // Some chunks were in flight when FE stopped; resume from them.
                        currentOffset = new JdbcOffset(lastSnapshotSplits);
                    } else if (!isSnapshotOnlyMode()) {
                        // Snapshot phase fully done: switch to the binlog phase.
                        BinlogSplit binlogSplit = new BinlogSplit();
                        binlogSplit.setFinishedSplits(finishedSplits);
                        currentOffset = new JdbcOffset(Collections.singletonList(binlogSplit));
                    }
                }
            }
        } else {
            log.info("Replaying TVF offset provider for job {}: binlog offset already set, nothing to do",
                    getJobId());
        }
    }

    /**
     * Builds the chunkHighWatermarkMap outer key as schema.table (if schema is present)
     * or database.table, matching the format used by snapshotSplits keys in
     * recalculateRemainingSplits.
     */
    private String buildTableKey() {
        String schema = sourceProperties.get(DataSourceConfigKeys.SCHEMA);
        String qualifier = (schema != null && !schema.isEmpty())
                ? schema : sourceProperties.get(DataSourceConfigKeys.DATABASE);
        return qualifier + "." + sourceProperties.get(DataSourceConfigKeys.TABLE);
    }

    /**
     * TVF path does not persist to EditLog; state is recovered via txn replay.
     * This override is defensive — the persistOffsetProviderIfNeed() call path
     * only runs in the non-TVF commitOffset flow and won't reach here.
     *
     * @return always null (nothing to write to EditLog)
     */
    @Override
    public String getPersistInfo() {
        return null;
    }

    /**
     * Copies the end offset reported by the BE back onto the running offset.
     *
     * <p>Snapshot: copies per-split high watermarks positionally. Binlog: copies the
     * BE-reported position (stored by deserializeOffset in startingOffset) into the
     * running split's endingOffset. Type or size mismatches are ignored defensively.
     *
     * @param runningOffset offset the task ran with; mutated in place
     * @param endOffset     offset deserialized from the commit attachment
     */
    @Override
    public void applyEndOffsetToTask(Offset runningOffset, Offset endOffset) {
        if (!(runningOffset instanceof JdbcOffset) || !(endOffset instanceof JdbcOffset)) {
            return;
        }
        JdbcOffset running = (JdbcOffset) runningOffset;
        JdbcOffset end = (JdbcOffset) endOffset;
        if (running.snapshotSplit()) {
            for (int i = 0; i < running.getSplits().size() && i < end.getSplits().size(); i++) {
                SnapshotSplit rSplit = (SnapshotSplit) running.getSplits().get(i);
                SnapshotSplit eSplit = (SnapshotSplit) end.getSplits().get(i);
                rSplit.setHighWatermark(eSplit.getHighWatermark());
            }
        } else {
            // Guard against malformed offsets instead of throwing IndexOutOfBoundsException.
            if (CollectionUtils.isEmpty(running.getSplits()) || CollectionUtils.isEmpty(end.getSplits())) {
                return;
            }
            BinlogSplit rSplit = (BinlogSplit) running.getSplits().get(0);
            BinlogSplit eSplit = (BinlogSplit) end.getSplits().get(0);
            // deserializeOffset stores binlog position in startingOffset
            rSplit.setEndingOffset(eSplit.getStartingOffset());
        }
    }
}