CanalSyncChannel.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.sync.canal;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.UserException;
import org.apache.doris.load.sync.SyncChannel;
import org.apache.doris.load.sync.SyncChannelCallback;
import org.apache.doris.load.sync.SyncJob;
import org.apache.doris.load.sync.model.Data;
import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.InsertStreamTxnExecutor;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.task.SyncTask;
import org.apache.doris.task.SyncTaskPool;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TFileType;
import org.apache.doris.thrift.TMergeType;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TTxnParams;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.BeginTransactionException;
import org.apache.doris.transaction.GlobalTransactionMgrIface;
import org.apache.doris.transaction.TransactionEntry;
import org.apache.doris.transaction.TransactionState;
import com.alibaba.otter.canal.common.CanalException;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
/**
 * {@link SyncChannel} that replays Canal row changes (INSERT / UPDATE / DELETE) into a Doris table
 * through an {@link InsertStreamTxnExecutor}.
 *
 * <p>Every row sent by this channel carries one extra trailing value for {@code _delete_sign_}:
 * "1" for a delete and "0" for an insert. The load request uses {@link TMergeType#MERGE} with the
 * delete condition {@code _delete_sign_=1}, so deletes and inserts travel in the same transaction.
 * An UPDATE is emitted as a delete of the before-image followed by an insert of the after-image.
 */
public class CanalSyncChannel extends SyncChannel {
    private static final Logger LOG = LogManager.getLogger(CanalSyncChannel.class);

    // Hidden column appended to the target column list; its value marks the row as a delete or an insert.
    private static final String DELETE_COLUMN = "_delete_sign_";
    // Delete condition sent with each load request; matches the "1" marker appended to delete rows.
    private static final String DELETE_CONDITION = DELETE_COLUMN + "=1";
    // Value written for columns that Canal reports as null.
    private static final String NULL_VALUE_FOR_LOAD = "\\N";

    // Index obtained from SyncTaskPool.getNextIndex() and passed to every task this channel submits.
    private final int index;
    // Transaction timeout handed to beginTransaction; -1 until initTxn() records it.
    private long timeoutSecond;
    // Highest batch id seen since the last commit/abort; -1 when no batch has been seen.
    private long lastBatchId;
    // Rows accumulated since the last send.
    private Data<InternalService.PDataRow> batchBuffer;
    // Executor of the current transaction; null before initTxn() and after clearTxn().
    private InsertStreamTxnExecutor txnExecutor;

    /**
     * Creates a channel mapping {@code srcDataBase.srcTable} onto {@code table}.
     *
     * <p>No transaction executor is created here; call {@link #initTxn(long)} before submitting rows.
     */
    public CanalSyncChannel(long id, SyncJob syncJob, Database db, OlapTable table, List<String> columns,
            String srcDataBase, String srcTable) {
        super(id, syncJob, db, table, columns, srcDataBase, srcTable);
        this.index = SyncTaskPool.getNextIndex();
        this.batchBuffer = new Data<>();
        this.lastBatchId = -1L;
        this.timeoutSecond = -1L;
    }

    /** Pool task that ships one buffered set of rows through the transaction executor. */
    private static final class SendTask extends SyncTask {
        private final InsertStreamTxnExecutor executor;
        private final Data<InternalService.PDataRow> rows;

        public SendTask(long signature, int index, SyncChannelCallback callback, Data<InternalService.PDataRow> rows,
                InsertStreamTxnExecutor executor) {
            super(signature, index, callback);
            this.executor = executor;
            this.rows = rows;
        }

        /** Hands the captured rows to the executor's transaction entry and sends them. */
        public void exec() throws Exception {
            TransactionEntry txnEntry = executor.getTxnEntry();
            txnEntry.setDataToSend(rows.getDatas());
            executor.sendData();
        }
    }

    /** Pool task that reports completion of this channel to its callback. */
    private static final class EOFTask extends SyncTask {
        public EOFTask(long signature, int index, SyncChannelCallback callback) {
            super(signature, index, callback);
        }

        /** Notifies the callback that the channel identified by {@code signature} is finished. */
        public void exec() throws Exception {
            callback.onFinished(signature);
        }
    }

    /**
     * Begins a load transaction for {@code batchId} unless one is already in progress.
     *
     * <p>The FE-side transaction is opened first. Then the stream-load request is built, and
     * {@link InsertStreamTxnExecutor#beginTransaction} is invoked with it. If the label is already
     * used, the method returns without beginning a transaction; this happens when the channel
     * re-consumes the same batch.
     *
     * @param batchId id of the batch that triggers the transaction; it is embedded in the label
     * @throws BeginTransactionException if the database already runs at least its transaction quota,
     *         or the FE refuses to begin the transaction
     * @throws UserException on other FE-side failures (including token acquisition)
     * @throws TException if the begin RPC fails
     * @throws TimeoutException if waiting for the begin response times out
     * @throws InterruptedException if interrupted while waiting for the begin response
     * @throws ExecutionException if the asynchronous begin call fails
     */
    @Override
    public void beginTxn(long batchId) throws UserException, TException, TimeoutException,
            InterruptedException, ExecutionException {
        if (isTxnBegin()) {
            return;
        }
        long currentTime = System.currentTimeMillis();
        String label = "label_job" + jobId + "_channel" + id + "_db" + db.getId() + "_tbl" + tbl.getId()
                + "_batch" + batchId + "_" + currentTime;
        // Load columns are the mapped columns plus the hidden delete-sign column appended by parseRow.
        String targetColumn = Joiner.on(",").join(columns) + "," + DELETE_COLUMN;
        GlobalTransactionMgrIface globalTransactionMgr = Env.getCurrentGlobalTransactionMgr();
        long txnLimit = db.getTransactionQuotaSize();
        long runningTxnNums = globalTransactionMgr.getRunningTxnNums(db.getId());
        if (runningTxnNums >= txnLimit) {
            String failMsg = "current running txns on db " + db.getId() + " is "
                    + runningTxnNums
                    + ", larger than limit " + txnLimit;
            LOG.warn(failMsg);
            throw new BeginTransactionException(failMsg);
        }

        TransactionEntry txnEntry = txnExecutor.getTxnEntry();
        TTxnParams txnConf = txnEntry.getTxnConf();
        TransactionState.LoadJobSourceType sourceType = TransactionState.LoadJobSourceType.INSERT_STREAMING;
        // Definitely assigned on every path that reaches the executor call below, so it can't be null there.
        TStreamLoadPutRequest request;
        try {
            long txnId;
            try {
                txnId = globalTransactionMgr.beginTransaction(db.getId(),
                        Lists.newArrayList(tbl.getId()), label,
                        new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, 0,
                                FrontendOptions.getLocalHostAddress(),
                                ExecuteEnv.getInstance().getStartupTime()),
                        sourceType, timeoutSecond);
            } catch (DuplicatedRequestException e) {
                LOG.warn("duplicate request for sync channel. channel: {},"
                        + " request id: {}, txn: {}, table: {}",
                        id, e.getDuplicatedRequestId(), e.getTxnId(), targetTable);
                // Reuse the existing transaction. Previously this path left the request null and the
                // txn conf without id/token, yet still called beginTransaction(request).
                txnId = e.getTxnId();
            }
            request = prepareLoadRequest(txnId, label, targetColumn, txnEntry, txnConf);
        } catch (LabelAlreadyUsedException e) {
            // this happens when channel re-consume same batch,
            // we should just pass through it without begin a new txn
            LOG.warn("Label already used in channel {}, label: {}, table: {}, batch: {}",
                    id, label, targetTable, batchId);
            return;
        } catch (AnalysisException | BeginTransactionException e) {
            LOG.warn("encounter an error when beginning txn in channel {}, table: {}",
                    id, targetTable);
            throw e;
        } catch (UserException e) {
            LOG.warn("encounter an error when creating plan in channel {}, table: {}",
                    id, targetTable);
            throw e;
        }

        try {
            // async exec begin transaction
            if (txnExecutor.getTxnId() != -1L) {
                this.txnExecutor.beginTransaction(request);
                LOG.info("begin txn in channel {}, table: {}, label:{}, txn id: {}",
                        id, targetTable, label, txnExecutor.getTxnId());
            }
        } catch (TException e) {
            LOG.warn("Failed to begin txn in channel {}, table: {}, txn: {}, msg:{}",
                    id, targetTable, txnExecutor.getTxnId(), e.getMessage());
            throw e;
        } catch (TimeoutException | InterruptedException | ExecutionException e) {
            LOG.warn("Error occur while waiting begin txn response in channel {},"
                    + " table: {}, txn: {}, msg:{}", id, targetTable, txnExecutor.getTxnId(), e.getMessage());
            throw e;
        }
    }

    /**
     * Binds {@code txnId} to the executor and builds the stream-load request for it.
     *
     * <p>Acquires a token and stores txn id and token in {@code txnConf}. Sets the label on
     * {@code txnEntry} and sets the executor's txn id. Returns a CSV, MERGE-type request whose
     * delete condition is {@code _delete_sign_=1}.
     *
     * @throws UserException if acquiring the token fails
     */
    private TStreamLoadPutRequest prepareLoadRequest(long txnId, String label, String targetColumn,
            TransactionEntry txnEntry, TTxnParams txnConf) throws UserException {
        String token = Env.getCurrentEnv().getTokenManager().acquireToken();
        TStreamLoadPutRequest request = new TStreamLoadPutRequest()
                .setTxnId(txnId).setDb(txnConf.getDb()).setTbl(txnConf.getTbl())
                .setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
                .setThriftRpcTimeoutMs(5000).setLoadId(txnExecutor.getLoadId())
                .setMergeType(TMergeType.MERGE).setDeleteCondition(DELETE_CONDITION)
                .setColumns(targetColumn);
        txnConf.setTxnId(txnId).setToken(token);
        txnEntry.setLabel(label);
        txnExecutor.setTxnId(txnId);
        return request;
    }

    /**
     * Aborts the current transaction, or only logs a warning when none has begun.
     *
     * <p>The row buffer is discarded and the last batch id reset even if the abort call fails.
     *
     * @param reason free-form reason, used only for logging
     */
    @Override
    public void abortTxn(String reason) throws TException, TimeoutException, InterruptedException, ExecutionException {
        if (!isTxnBegin()) {
            LOG.warn("No transaction to abort in channel {}, table: {}", id, targetTable);
            return;
        }
        try {
            this.txnExecutor.abortTransaction();
            LOG.info("abort txn in channel {}, table: {}, txn id: {}, last batch: {}, reason: {}",
                    id, targetTable, txnExecutor.getTxnId(), lastBatchId, reason);
        } catch (TException e) {
            LOG.warn("Failed to abort txn in channel {}, table: {}, txn: {}, msg:{}",
                    id, targetTable, txnExecutor.getTxnId(), e.getMessage());
            throw e;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            LOG.warn("Error occur while waiting abort txn response in channel {}, table: {}, txn: {}, msg:{}",
                    id, targetTable, txnExecutor.getTxnId(), e.getMessage());
            throw e;
        } finally {
            this.batchBuffer = new Data<>();
            updateBatchId(-1L);
        }
    }

    /**
     * Flushes buffered rows synchronously and commits the current transaction.
     *
     * <p>Only logs a warning when no transaction has begun. The row buffer is discarded and the
     * last batch id reset even if flushing or committing fails.
     */
    @Override
    public void commitTxn() throws TException, TimeoutException, InterruptedException, ExecutionException {
        if (!isTxnBegin()) {
            LOG.warn("No transaction to commit in channel {}, table: {}", id, targetTable);
            return;
        }
        try {
            flushData();
            this.txnExecutor.commitTransaction();
            LOG.info("commit txn in channel {}, table: {}, txn id: {}, last batch: {}",
                    id, targetTable, txnExecutor.getTxnId(), lastBatchId);
        } catch (TException e) {
            LOG.warn("Failed to commit txn in channel {}, table: {}, txn: {}, msg:{}",
                    id, targetTable, txnExecutor.getTxnId(), e.getMessage());
            throw e;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            LOG.warn("Error occur while waiting commit txn return in channel {}, table: {}, txn: {}, msg:{}",
                    id, targetTable, txnExecutor.getTxnId(), e.getMessage());
            throw e;
        } finally {
            this.batchBuffer = new Data<>();
            updateBatchId(-1L);
        }
    }

    /**
     * Creates the transaction executor with a fresh random load id and txn id -1 (not begun).
     *
     * <p>This is a no-op when an executor already exists, in which case {@code timeoutSecond} is
     * also ignored.
     *
     * @param timeoutSecond timeout passed to later {@code beginTransaction} calls
     */
    @Override
    public void initTxn(long timeoutSecond) {
        if (isTxnInit()) {
            return;
        }
        UUID uuid = UUID.randomUUID();
        TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
        this.timeoutSecond = timeoutSecond;
        TTxnParams txnConf = new TTxnParams().setNeedTxn(true).setThriftRpcTimeoutMs(5000).setTxnId(-1)
                .setDb(db.getFullName()).setTbl(tbl.getName()).setDbId(db.getId());
        this.txnExecutor = new InsertStreamTxnExecutor(new TransactionEntry(txnConf, db, tbl));
        txnExecutor.setTxnId(-1L);
        txnExecutor.setLoadId(loadId);
    }

    /** Drops the executor so that the next {@link #initTxn(long)} creates a fresh one. */
    public void clearTxn() {
        this.txnExecutor = null;
    }

    /**
     * Buffers the rows of one Canal row change.
     *
     * <p>The first time a batch id greater than the last one is seen, either a transaction is begun
     * (if none is in progress) or the rows buffered so far are handed to an asynchronous
     * {@link SendTask}.
     *
     * @throws CanalException wrapping any failure, including calling this before {@link #initTxn(long)}
     */
    public void submit(long batchId, CanalEntry.EventType eventType, CanalEntry.RowChange rowChange) {
        for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
            List<InternalService.PDataRow> rows = parseRow(eventType, rowData);
            try {
                Preconditions.checkState(isTxnInit());
                if (batchId > lastBatchId) {
                    if (!isTxnBegin()) {
                        beginTxn(batchId);
                    } else {
                        // New batch inside a running txn: ship earlier rows and start a fresh buffer.
                        SendTask task = new SendTask(id, index, callback, batchBuffer, txnExecutor);
                        SyncTaskPool.submit(task);
                        this.batchBuffer = new Data<>();
                    }
                    updateBatchId(batchId);
                }
            } catch (Exception e) {
                String errMsg = "encounter exception when submit in channel " + id + ", table: "
                        + targetTable + ", batch: " + batchId;
                LOG.error(errMsg, e);
                throw new CanalException(errMsg, e);
            }
            this.batchBuffer.addRows(rows);
        }
    }

    /** Queues an {@link EOFTask} that calls {@code callback.onFinished(id)}. */
    public void submitEOF() {
        EOFTask task = new EOFTask(id, index, callback);
        SyncTaskPool.submit(task);
    }

    /**
     * Converts one Canal row into load rows.
     *
     * <p>DELETE produces the before-image marked as a delete, and INSERT produces the after-image.
     * UPDATE produces both, delete first. Other event types are logged and produce no rows.
     */
    private List<InternalService.PDataRow> parseRow(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {
        List<InternalService.PDataRow> rows = Lists.newArrayList();
        switch (eventType) {
            case DELETE:
                rows.add(parseRow(CanalEntry.EventType.DELETE, rowData.getBeforeColumnsList()));
                break;
            case INSERT:
                rows.add(parseRow(CanalEntry.EventType.INSERT, rowData.getAfterColumnsList()));
                break;
            case UPDATE:
                // update is to delete first and then insert
                rows.add(parseRow(CanalEntry.EventType.DELETE, rowData.getBeforeColumnsList()));
                rows.add(parseRow(CanalEntry.EventType.INSERT, rowData.getAfterColumnsList()));
                break;
            default:
                LOG.warn("ignore event, channel: {}, schema: {}, table: {}", id, srcDataBase, srcTable);
        }
        return rows;
    }

    /**
     * Builds one load row from the Canal columns in their given order.
     *
     * <p>Null columns become {@code \N}. A trailing delete-sign value is appended: "1" for DELETE and
     * "0" otherwise.
     */
    private InternalService.PDataRow parseRow(CanalEntry.EventType eventType, List<CanalEntry.Column> columns) {
        InternalService.PDataRow.Builder row = InternalService.PDataRow.newBuilder();
        for (CanalEntry.Column column : columns) {
            if (column.getIsNull()) {
                row.addColBuilder().setValue(NULL_VALUE_FOR_LOAD);
            } else {
                row.addColBuilder().setValue(column.getValue());
            }
        }
        // add batch delete condition to the tail
        if (eventType == CanalEntry.EventType.DELETE) {
            row.addColBuilder().setValue("1");
        } else {
            row.addColBuilder().setValue("0");
        }
        return row.build();
    }

    /** Sends buffered rows synchronously and clears the buffer; does nothing if the buffer is empty. */
    public void flushData() throws TException, TimeoutException,
            InterruptedException, ExecutionException {
        if (this.batchBuffer.isNotEmpty()) {
            TransactionEntry txnEntry = txnExecutor.getTxnEntry();
            txnEntry.setDataToSend(batchBuffer.getDatas());
            this.txnExecutor.sendData();
            this.batchBuffer = new Data<>();
        }
    }

    /** Returns true when an executor exists and a transaction id has been assigned to it. */
    public boolean isTxnBegin() {
        return isTxnInit() && this.txnExecutor.getTxnId() != -1;
    }

    /** Returns true when {@link #initTxn(long)} has created an executor that was not cleared since. */
    public boolean isTxnInit() {
        return this.txnExecutor != null;
    }

    private void updateBatchId(long batchId) {
        this.lastBatchId = batchId;
    }

    /** Returns a {@code "srcDb.srcTable->targetTable"} description of this channel. */
    public String getInfo() {
        StringBuilder stringBuilder = new StringBuilder();
        stringBuilder.append(srcDataBase).append(".").append(srcTable);
        stringBuilder.append("->");
        stringBuilder.append(targetTable);
        return stringBuilder.toString();
    }

    public long getId() {
        return id;
    }

    public String getSrcTable() {
        return srcTable;
    }

    public String getSrcDataBase() {
        return srcDataBase;
    }

    public String getTargetTable() {
        return targetTable;
    }

    public void setCallback(SyncChannelCallback callback) {
        this.callback = callback;
    }

    public void setPartitions(PartitionNames partitionNames) {
        this.partitionNames = partitionNames;
    }
}