// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.job.extensions.insert.streaming;

import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.FeConstants;
import org.apache.doris.httpv2.entity.ResponseBody;
import org.apache.doris.httpv2.rest.RestApiStatusCode;
import org.apache.doris.httpv2.rest.StreamingJobAction.CommitOffsetRequest;
import org.apache.doris.job.base.Job;
import org.apache.doris.job.cdc.request.WriteRecordRequest;
import org.apache.doris.job.cdc.split.BinlogSplit;
import org.apache.doris.job.cdc.split.SnapshotSplit;
import org.apache.doris.job.common.DataSourceType;
import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.offset.SourceOffsetProvider;
import org.apache.doris.job.offset.jdbc.JdbcOffset;
import org.apache.doris.job.offset.jdbc.JdbcSourceOffsetProvider;
import org.apache.doris.job.util.StreamingJobUtils;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PRequestCdcClientResult;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TStatusCode;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import lombok.Getter;
import lombok.extern.log4j.Log4j2;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
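
/**
 * Streaming task for a multi-table (whole-database) sync job. It forwards the
 * current snapshot/binlog split to a CDC client running on a backend via brpc,
 * and is finalized when the client commits the resulting offset back to the FE
 * (see {@link #successCallback(CommitOffsetRequest)}).
 *
 * <p>Illustrative wiring (argument values are hypothetical):
 * <pre>
 * StreamingMultiTblTask task = new StreamingMultiTblTask(jobId, taskId,
 *         dataSourceType, offsetProvider, sourceProps,
 *         "target_db", targetProps, jobProps, userIdentity);
 * task.before();   // resolve the next offset/split to consume
 * task.run();      // ship the split to the backend CDC client
 * </pre>
 */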
@Log4j2
@Getter
public class StreamingMultiTblTask extends AbstractStreamingTask {
    private static final ObjectMapper objectMapper = new ObjectMapper();

    private DataSourceType dataSourceType;
    private SourceOffsetProvider offsetProvider;
    private Map<String, String> sourceProperties;
    private Map<String, String> targetProperties;
    private String targetDb;
    private StreamingJobProperties jobProperties;
    private long scannedRows = 0L;
    private long scannedBytes = 0L;

    public StreamingMultiTblTask(Long jobId,
            long taskId,
            DataSourceType dataSourceType,
            SourceOffsetProvider offsetProvider,
            Map<String, String> sourceProperties,
            String targetDb,
            Map<String, String> targetProperties,
            StreamingJobProperties jobProperties,
            UserIdentity userIdentity) {
        super(jobId, taskId, userIdentity);
        this.dataSourceType = dataSourceType;
        this.offsetProvider = offsetProvider;
        this.sourceProperties = sourceProperties;
        this.targetProperties = targetProperties;
        this.jobProperties = jobProperties;
        this.targetDb = targetDb;
    }
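
    /**
     * Marks the task RUNNING and asks the offset provider for the next
     * offset/split to consume; no-op when the task was already canceled.
     */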
    @Override
    public void before() throws Exception {
        if (getIsCanceled().get()) {
            log.info("streaming multi task has been canceled, task id is {}", getTaskId());
            return;
        }
        this.status = TaskStatus.RUNNING;
        this.startTimeMs = System.currentTimeMillis();
        this.runningOffset = offsetProvider.getNextOffset(null, sourceProperties);
        // Pass the offset object directly; an explicit toString() would NPE on a null offset.
        log.info("streaming multi task {} got running offset: {}", taskId, runningOffset);
    }
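
    /**
     * Executes the task by shipping the resolved split to the backend CDC client.
     */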
    @Override
    public void run() throws JobException {
        if (getIsCanceled().get()) {
            log.info("task has been canceled, task id is {}", getTaskId());
            return;
        }
        log.info("start to run streaming multi task, offset is {}", runningOffset);
        sendWriteRequest();
    }
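
    /**
     * Sends the split to the CDC client on a selected backend through the
     * {@code /api/writeRecords} brpc passthrough. The request params are the
     * Gson-serialized {@link WriteRecordRequest}; an illustrative (not
     * authoritative) payload looks like:
     * <pre>
     * {"jobId": 10086, "taskId": "1", "dataSource": "MYSQL", "targetDb": "db1",
     *  "config": {...}, "meta": {...}, "frontendAddress": "fe_host:8030",
     *  "maxInterval": 10, "token": "..."}
     * </pre>
     */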
    private void sendWriteRequest() throws JobException {
        Backend backend = StreamingJobUtils.selectBackend(jobId);
        WriteRecordRequest params = buildRequestParams();
        InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
                .setApi("/api/writeRecords")
                .setParams(new Gson().toJson(params)).build();
        TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
        try {
            Future<PRequestCdcClientResult> future =
                    BackendServiceProxy.getInstance().requestCdcClient(address, request);
            InternalService.PRequestCdcClientResult result = future.get();
            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
            if (code != TStatusCode.OK) {
                log.error("Failed to send write records request to backend, {}",
                        result.getStatus().getErrorMsgs(0));
                throw new JobException(
                        "Failed to send write records request to backend, " + result.getStatus().getErrorMsgs(0)
                                + ", response: " + result.getResponse());
            }
            String response = result.getResponse();
            try {
                ResponseBody<String> responseObj = objectMapper.readValue(
                        response,
                        new TypeReference<ResponseBody<String>>() {
                        }
                );
                if (responseObj.getCode() == RestApiStatusCode.OK.code) {
                    log.info("Send write records request successfully, response: {}", responseObj.getData());
                    return;
                }
            } catch (JsonProcessingException e) {
                log.error("Failed to parse write records response: {}", response, e);
                throw new JobException("Failed to parse write records response: " + response);
            }
            throw new JobException("Failed to send write records request, error message: " + response);
        } catch (InterruptedException ex) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            log.error("Send write request interrupted: ", ex);
            throw new JobException(ex);
        } catch (ExecutionException ex) {
            log.error("Send write request failed: ", ex);
            throw new JobException(ex);
        }
    }
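
    /**
     * Acquires an auth token from the master FE; the token travels with the
     * write request (see {@link #buildRequestParams()}).
     */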
    private String getToken() throws JobException {
        String token = "";
        try {
            // Acquire token from master
            token = Env.getCurrentEnv().getTokenManager().acquireToken();
        } catch (Exception e) {
            log.warn("Failed to get auth token:", e);
            throw new JobException(e.getMessage());
        }
        return token;
    }
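
    /**
     * Builds the {@link WriteRecordRequest} for the CDC client: source config,
     * split metadata, the FE callback address, and an auth token.
     */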
    private WriteRecordRequest buildRequestParams() throws JobException {
        JdbcOffset offset = (JdbcOffset) runningOffset;
        WriteRecordRequest request = new WriteRecordRequest();
        request.setJobId(getJobId());
        request.setConfig(sourceProperties);
        request.setDataSource(dataSourceType.name());
        request.setTaskId(String.valueOf(getTaskId()));
        request.setToken(getToken());
        request.setTargetDb(targetDb);
        // Flatten the split into a generic map so it can be shipped as JSON.
        Map<String, Object> splitMeta = objectMapper.convertValue(offset.getSplit(),
                new TypeReference<Map<String, Object>>() {
                });
        request.setMeta(splitMeta);
        String feAddr = Env.getCurrentEnv().getMasterHost() + ":" + Env.getCurrentEnv().getMasterHttpPort();
        request.setFrontendAddress(feAddr);
        request.setMaxInterval(jobProperties.getMaxIntervalSecond());
        return request;
    }
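
    /**
     * Called after the write request has been sent; the task is not finished
     * here, it completes in {@link #successCallback(CommitOffsetRequest)} once
     * the CDC client commits the offset.
     */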
    @Override
    public boolean onSuccess() throws JobException {
        if (getIsCanceled().get()) {
            return false;
        }
        log.info("streaming multi task {} sent write request successfully.", getTaskId());
        // The final task state is set in successCallback when the offset is committed.
        return false;
    }

    /**
     * Callback invoked when the CDC client commits an offset for this task:
     * records the committed end offset on the running split (high watermark for
     * snapshot splits, ending offset for binlog splits), updates scan
     * statistics, and notifies the owning job.
     */
    public void successCallback(CommitOffsetRequest offsetRequest) {
        if (getIsCanceled().get()) {
            return;
        }
        this.status = TaskStatus.SUCCESS;
        this.finishTimeMs = System.currentTimeMillis();
        JdbcOffset runOffset = (JdbcOffset) this.runningOffset;
        if (!isCallable()) {
            return;
        }
        // Set the committed end offset on the running offset's split.
        Map<String, String> offsetMeta;
        try {
            offsetMeta = objectMapper.readValue(offsetRequest.getOffset(),
                    new TypeReference<Map<String, String>>() {
                    });
        } catch (JsonProcessingException e) {
            log.warn("Failed to parse offset meta from request: {}", offsetRequest.getOffset(), e);
            throw new RuntimeException(e);
        }
        String splitId = offsetMeta.remove(JdbcSourceOffsetProvider.SPLIT_ID);
        if (runOffset.getSplit().snapshotSplit()
                && !BinlogSplit.BINLOG_SPLIT_ID.equals(splitId)) {
            SnapshotSplit split = (SnapshotSplit) runOffset.getSplit();
            split.setHighWatermark(offsetMeta);
        } else if (!runOffset.getSplit().snapshotSplit()
                && BinlogSplit.BINLOG_SPLIT_ID.equals(splitId)) {
            BinlogSplit split = (BinlogSplit) runOffset.getSplit();
            split.setEndingOffset(offsetMeta);
        } else {
            log.warn("Split id is not consistent, task running split id {},"
                    + " offset commit request split id {}", runOffset.getSplit().getSplitId(), splitId);
            throw new RuntimeException("Split id is not consistent");
        }
        this.scannedRows = offsetRequest.getScannedRows();
        this.scannedBytes = offsetRequest.getScannedBytes();
        Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId());
        if (null == job) {
            log.info("job is null, job id is {}", jobId);
            return;
        }
        StreamingInsertJob streamingInsertJob = (StreamingInsertJob) job;
        streamingInsertJob.onStreamTaskSuccess(this);
    }

    @Override
    protected void onFail(String errMsg) throws JobException {
        super.onFail(errMsg);
    }

    @Override
    public void cancel(boolean needWaitCancelComplete) {
        // No manual cancellation is required; the task ID is checked for consistency in beforeCommit.
        super.cancel(needWaitCancelComplete);
    }

    @Override
    public void closeOrReleaseResources() {
        // TODO: close the CDC client connection once the task holds one; nothing to release yet.
    }

    public boolean isTimeout() {
        // TODO: make the timeout configurable; currently hard-coded to 5 minutes.
        return (System.currentTimeMillis() - createTimeMs) > 300 * 1000L;
    }
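
    /**
     * Extends the parent task row with streaming-specific columns: a statistics
     * JSON blob, the executing user, and the offset range of this task.
     */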
    @Override
    public TRow getTvfInfo(String jobName) {
        TRow trow = super.getTvfInfo(jobName);
        trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
        // Report scan statistics as a JSON map, e.g. {"scannedRows":10,"loadBytes":1024}.
        Map<String, Object> statistic = new HashMap<>();
        statistic.put("scannedRows", scannedRows);
        statistic.put("loadBytes", scannedBytes);
        trow.addToColumnValue(new TCell().setStringVal(new Gson().toJson(statistic)));
        if (this.getUserIdentity() == null) {
            trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
        } else {
            trow.addToColumnValue(new TCell().setStringVal(this.getUserIdentity().getQualifiedUser()));
        }
        trow.addToColumnValue(new TCell().setStringVal(""));
        trow.addToColumnValue(new TCell().setStringVal(runningOffset == null
                ? FeConstants.null_string : runningOffset.showRange()));
        return trow;
    }
}