// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.job.extensions.insert.streaming;

import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.FeConstants;
import org.apache.doris.httpv2.entity.ResponseBody;
import org.apache.doris.httpv2.rest.RestApiStatusCode;
import org.apache.doris.httpv2.rest.StreamingJobAction.CommitOffsetRequest;
import org.apache.doris.job.base.Job;
import org.apache.doris.job.common.DataSourceType;
import org.apache.doris.job.common.TaskStatus;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.offset.SourceOffsetProvider;
import org.apache.doris.job.offset.jdbc.JdbcOffset;
import org.apache.doris.job.offset.jdbc.JdbcSourceOffsetProvider;
import org.apache.doris.job.offset.jdbc.split.BinlogSplit;
import org.apache.doris.job.offset.jdbc.split.SnapshotSplit;
import org.apache.doris.job.util.StreamingJobUtils;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.InternalService.PRequestCdcClientResult;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TRow;
import org.apache.doris.thrift.TStatusCode;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import lombok.Getter;
import lombok.extern.log4j.Log4j2;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

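/**
 * Streaming task that ingests multi-table change data from an external source (e.g. a JDBC
 * binlog) by delegating the actual read and write work to a CDC client running on a backend.
 * <p>
 * Lifecycle: {@link #before()} resolves the next offset to consume, {@link #run()} sends a
 * writeRecords request to a selected backend, and {@link #successCallback(CommitOffsetRequest)}
 * finalizes the task once the backend commits the resulting offset.
 * <p>
 * Illustrative construction (a sketch; the argument values are placeholders, not taken from
 * real job code):
 * <pre>{@code
 * StreamingMultiTblTask task = new StreamingMultiTblTask(jobId, taskId,
 *         dataSourceType, offsetProvider, sourceProperties,
 *         targetDb, targetProperties, jobProperties, userIdentity);
 * }</pre>
 */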
@Log4j2
@Getter
public class StreamingMultiTblTask extends AbstractStreamingTask {
    private static final ObjectMapper objectMapper = new ObjectMapper();
    private DataSourceType dataSourceType;
    private SourceOffsetProvider offsetProvider;
    private Map<String, String> sourceProperties;
    private Map<String, String> targetProperties;
    private String targetDb;
    private StreamingJobProperties jobProperties;
    private long scannedRows = 0L;
    private long scannedBytes = 0L;

    public StreamingMultiTblTask(Long jobId,
            long taskId,
            DataSourceType dataSourceType,
            SourceOffsetProvider offsetProvider,
            Map<String, String> sourceProperties,
            String targetDb,
            Map<String, String> targetProperties,
            StreamingJobProperties jobProperties,
            UserIdentity userIdentity) {
        super(jobId, taskId, userIdentity);
        this.dataSourceType = dataSourceType;
        this.offsetProvider = offsetProvider;
        this.sourceProperties = sourceProperties;
        this.targetProperties = targetProperties;
        this.jobProperties = jobProperties;
        this.targetDb = targetDb;
    }

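    /**
     * Marks the task as RUNNING and asks the offset provider for the next offset to consume.
     */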
    @Override
    public void before() throws Exception {
        if (getIsCanceled().get()) {
            log.info("streaming multi task has been canceled, task id is {}", getTaskId());
            return;
        }
        this.status = TaskStatus.RUNNING;
        this.startTimeMs = System.currentTimeMillis();
        this.runningOffset = offsetProvider.getNextOffset(null, sourceProperties);
        log.info("streaming multi task {} get running offset: {}", taskId, runningOffset.toString());
    }

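    /**
     * Sends the write request to a backend unless the task has already been canceled.
     */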
    @Override
    public void run() throws JobException {
        if (getIsCanceled().get()) {
            log.info("task has been canceled, task id is {}", getTaskId());
            return;
        }
        log.info("start to run streaming multi task, offset is {}", runningOffset.toString());
        sendWriteRequest();
    }

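    /**
     * Sends a writeRecords request to the CDC client on a selected backend over brpc, checking
     * both the rpc status code and the REST response code embedded in the payload.
     */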
    private void sendWriteRequest() throws JobException {
        Backend backend = StreamingJobUtils.selectBackend(jobId);
        Map<String, Object> params = buildRequestParams();
        InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
                .setApi("/api/writeRecords")
                .setParams(new Gson().toJson(params)).build();
        TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
        InternalService.PRequestCdcClientResult result = null;
        try {
            Future<PRequestCdcClientResult> future =
                    BackendServiceProxy.getInstance().requestCdcClient(address, request);
            result = future.get();
            TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
            if (code != TStatusCode.OK) {
                log.error("Failed to get split from backend, {}", result.getStatus().getErrorMsgs(0));
                throw new JobException(
                        "Failed to get split from backend," + result.getStatus().getErrorMsgs(0) + ", response: "
                                + result.getResponse());
            }
            String response = result.getResponse();
            try {
                ResponseBody<String> responseObj = objectMapper.readValue(
                        response,
                        new TypeReference<ResponseBody<String>>() {
                        }
                );
                if (responseObj.getCode() == RestApiStatusCode.OK.code) {
                    log.info("Send write records request successfully, response: {}", responseObj.getData());
                    return;
                }
            } catch (JsonProcessingException e) {
                log.error("Failed to parse write records response: {}", response, e);
                throw new JobException("Failed to parse write records response: " + response);
            }
            throw new JobException("Failed to send write records request , error message: " + response);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            log.error("Send write request interrupted: ", ex);
            throw new JobException(ex);
        } catch (ExecutionException ex) {
            log.error("Send write request failed: ", ex);
            throw new JobException(ex);
        }
    }

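    /**
     * Acquires an auth token from the master FE. The token is shipped with the request params,
     * presumably so the CDC client can authenticate its calls back to the frontend.
     */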
    private String getToken() throws JobException {
        String token = "";
        try {
            // Acquire token from master
            token = Env.getCurrentEnv().getTokenManager().acquireToken();
        } catch (Exception e) {
            log.warn("Failed to get auth token:", e);
            throw new JobException(e.getMessage());
        }
        return token;
    }

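    /**
     * Builds the parameter map for the writeRecords request. An illustrative payload (the
     * values are examples only; real ones depend on the job and source):
     * <pre>{@code
     * {
     *   "jobId": 10001,
     *   "labelName": "streaming_job_10001_task_1",
     *   "dataSource": "MYSQL",
     *   "meta": { ... current split ... },
     *   "config": { ... source properties ... },
     *   "targetDb": "target_db",
     *   "token": "<auth token>",
     *   "taskId": 1,
     *   "frontendAddress": "127.0.0.1:8030",
     *   "maxInterval": 10
     * }
     * }</pre>
     */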
    private Map<String, Object> buildRequestParams() throws JobException {
        JdbcOffset offset = (JdbcOffset) runningOffset;
        Map<String, Object> params = new HashMap<>();
        params.put("jobId", getJobId());
        params.put("labelName", getLabelName());
        params.put("dataSource", dataSourceType);
        params.put("meta", offset.getSplit());
        params.put("config", sourceProperties);
        params.put("targetDb", targetDb);
        params.put("token", getToken());
        params.put("taskId", getTaskId());
        params.put("frontendAddress",
                Env.getCurrentEnv().getMasterHost() + ":" + Env.getCurrentEnv().getMasterHttpPort());
        params.put("maxInterval", jobProperties.getMaxIntervalSecond());
        return params;
    }

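    /**
     * Returns false so that completion is deferred to
     * {@link #successCallback(CommitOffsetRequest)}, which runs after the offset commit arrives.
     */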
    @Override
    public boolean onSuccess() throws JobException {
        if (getIsCanceled().get()) {
            return false;
        }
        log.info("streaming multi task {} send write request run successfully.", getTaskId());
        return false;
    }

    /**
     * Callback invoked when the CDC client reports a successful offset commit: applies the
     * committed offset to the running split and notifies the owning job.
     */
    public void successCallback(CommitOffsetRequest offsetRequest) {
        if (getIsCanceled().get()) {
            return;
        }
        this.status = TaskStatus.SUCCESS;
        this.finishTimeMs = System.currentTimeMillis();
        JdbcOffset runOffset = (JdbcOffset) this.runningOffset;
        if (!isCallable()) {
            return;
        }
        // apply the committed end offset to the running split
        Map<String, String> offsetMeta;
        try {
            offsetMeta = objectMapper.readValue(offsetRequest.getOffset(), new TypeReference<Map<String, String>>() {
            });
        } catch (JsonProcessingException e) {
            log.warn("Failed to parse offset meta from request: {}", offsetRequest.getOffset(), e);
            throw new RuntimeException(e);
        }
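        // The committed offset is a flat string map; the SPLIT_ID entry routes the remaining
        // entries to the matching split. An illustrative, connector-dependent shape:
        //   {"splitId": "binlog-split", "filename": "mysql-bin.000003", "position": "1024"}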
        String splitId = offsetMeta.remove(JdbcSourceOffsetProvider.SPLIT_ID);
        if (runOffset.getSplit().snapshotSplit()
                && !BinlogSplit.BINLOG_SPLIT_ID.equals(splitId)) {
            SnapshotSplit split = (SnapshotSplit) runOffset.getSplit();
            split.setHighWatermark(offsetMeta);
        } else if (!runOffset.getSplit().snapshotSplit()
                && BinlogSplit.BINLOG_SPLIT_ID.equals(splitId)) {
            BinlogSplit split = (BinlogSplit) runOffset.getSplit();
            split.setEndingOffset(offsetMeta);
        } else {
            log.warn("Split id is not consistent, task running split id {},"
                    + " offset commit request split id {}", runOffset.getSplit().getSplitId(), splitId);
            throw new RuntimeException("Split id is not consistent");
        }
        this.scannedRows = offsetRequest.getScannedRows();
        this.scannedBytes = offsetRequest.getScannedBytes();
        Job job = Env.getCurrentEnv().getJobManager().getJob(getJobId());
        if (null == job) {
            log.info("job is null, job id is {}", jobId);
            return;
        }
        StreamingInsertJob streamingInsertJob = (StreamingInsertJob) job;
        streamingInsertJob.onStreamTaskSuccess(this);
    }

    @Override
    protected void onFail(String errMsg) throws JobException {
        super.onFail(errMsg);
    }

    @Override
    public void cancel(boolean needWaitCancelComplete) {
        // No manual cancellation is required; the task ID will be checked for consistency in the beforeCommit function.
        super.cancel(needWaitCancelComplete);
    }

    @Override
    public void closeOrReleaseResources() {
        // TODO: close the cdc client connection
    }

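    /**
     * Whether the task has run longer than the hard-coded 5 minute window since creation.
     */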
    public boolean isTimeout() {
        // TODO: make the timeout configurable instead of hard-coding it
        return (System.currentTimeMillis() - createTimeMs) > 300 * 1000; // 5 minutes
    }

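    /**
     * Appends the multi-table specific columns (statistics JSON, user identity, and the
     * current offset range) to the base TVF row.
     */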
    @Override
    public TRow getTvfInfo(String jobName) {
        TRow trow = super.getTvfInfo(jobName);
        trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
        Map<String, Object> statistic = new HashMap<>();
        statistic.put("scannedRows", scannedRows);
        statistic.put("loadBytes", scannedBytes);
        trow.addToColumnValue(new TCell().setStringVal(new Gson().toJson(statistic)));

        if (this.getUserIdentity() == null) {
            trow.addToColumnValue(new TCell().setStringVal(FeConstants.null_string));
        } else {
            trow.addToColumnValue(new TCell().setStringVal(this.getUserIdentity().getQualifiedUser()));
        }
        trow.addToColumnValue(new TCell().setStringVal(""));
        // show the offset range currently being consumed, if any
        trow.addToColumnValue(new TCell().setStringVal(runningOffset == null
                ? FeConstants.null_string : runningOffset.showRange()));
        return trow;
    }
}