// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load.sync.canal;

import org.apache.doris.common.Config;
import org.apache.doris.common.Status;
import org.apache.doris.common.UserException;
import org.apache.doris.load.sync.SyncChannelHandle;
import org.apache.doris.load.sync.SyncDataConsumer;
import org.apache.doris.load.sync.SyncFailMsg;
import org.apache.doris.load.sync.model.Events;
import org.apache.doris.load.sync.position.EntryPosition;
import org.apache.doris.load.sync.position.PositionMeta;
import org.apache.doris.load.sync.position.PositionRange;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.CanalException;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
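
// Consumes data for a canal sync job: drains event batches that the receiver
// puts into the blocking queue, distributes row changes to per-table sync
// channels, and commits or aborts the corresponding transactions, acking
// batches back to the canal server only after a successful commit.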
public class CanalSyncDataConsumer extends SyncDataConsumer {
private static final Logger logger = LogManager.getLogger(CanalSyncDataConsumer.class);
private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";
private static final long MIN_COMMIT_EVENT_SIZE = Config.min_sync_commit_size;
private static final long MIN_COMMIT_MEM_SIZE = Config.min_bytes_sync_commit;
private static final long MAX_COMMIT_MEM_SIZE = Config.max_bytes_sync_commit;
private CanalSyncJob syncJob;
private CanalConnector connector;
private Map<Long, CanalSyncChannel> idToChannels;
private Set<Long> ackBatches;
private PositionMeta<EntryPosition> positionMeta;
private LinkedBlockingQueue<Events<CanalEntry.Entry, EntryPosition>> dataBlockingQueue;
private SyncChannelHandle handle;
private ReentrantLock getLock;
private int sleepTimeMs;
private long commitIntervalSecond;
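
// Registers the sync channels, seeds each channel's commit position with the
// minimum position, and attaches the shared handle as the channel callback.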
public void setChannels(Map<Long, CanalSyncChannel> idToChannels) {
for (CanalSyncChannel channel : idToChannels.values()) {
this.positionMeta.setCommitPosition(channel.getId(), EntryPosition.MIN_POS);
channel.setCallback(handle);
}
this.idToChannels = idToChannels;
}
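
// The get lock is shared with the receiver so that fetching from the canal
// server and rolling back cannot interleave.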
public CanalSyncDataConsumer(CanalSyncJob syncJob, CanalConnector connector, ReentrantLock getLock, boolean debug) {
super(debug);
this.syncJob = syncJob;
this.connector = connector;
this.dataBlockingQueue = Queues.newLinkedBlockingQueue(1024);
this.ackBatches = Sets.newHashSet();
this.positionMeta = new PositionMeta<>();
this.getLock = getLock;
this.handle = new SyncChannelHandle();
this.commitIntervalSecond = Config.sync_commit_interval_second;
this.sleepTimeMs = 500;
}
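
// Stops the consumer; optionally clears queued data and position metadata.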
public void stop(boolean needCleanUp) {
super.stop();
if (needCleanUp) {
cleanUp();
}
}
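
// Begins a transaction on every channel and marks each one on the shared
// handle so that waitForTxn() can wait for all of them to finish.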
@Override
public void beginForTxn() {
handle.reset(idToChannels.size());
for (CanalSyncChannel channel : idToChannels.values()) {
channel.initTxn(Config.max_stream_load_timeout_second);
handle.mark(channel);
}
}
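
// Aborts the open transaction on each given channel, then rolls back all
// unacked batches so the data will be fetched again.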
@Override
public void abortForTxn(String reason) {
abortForTxn(idToChannels.values(), reason);
}
public void abortForTxn(Collection<CanalSyncChannel> channels, String reason) {
logger.info("client is aborting transactions. JobId: {}, reason: {}", syncJob.getId(), reason);
for (CanalSyncChannel channel : channels) {
if (channel.isTxnBegin()) {
try {
channel.abortTxn(reason);
} catch (Exception e) {
logger.warn("Abort channel failed. jobId: {},channel: {}, target: {}, msg: {}",
syncJob.getId(), channel.getId(), channel.getTargetTable(), e.getMessage());
}
}
}
rollback();
}
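
// Commits the open transaction on every channel and records the new commit
// position. If any commit fails, the failing channel is aborted and all
// unacked batches are rolled back; otherwise the batches are acked.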
@Override
public void commitForTxn() {
logger.info("client is committing transactions. JobId: {}", syncJob.getId());
boolean success = true;
EntryPosition latestPosition = positionMeta.getLatestPosition();
for (CanalSyncChannel channel : idToChannels.values()) {
if (channel.isTxnBegin()) {
try {
channel.commitTxn();
this.positionMeta.setCommitPosition(channel.getId(), latestPosition);
} catch (Exception ce) {
logger.warn("Commit channel failed. JobId: {}, channel: {}, target: {}, msg: {}",
syncJob.getId(), channel.getId(), channel.getTargetTable(), ce.getMessage());
try {
channel.abortTxn(ce.getMessage());
} catch (Exception ae) {
logger.warn("Abort channel failed. JobId: {},channel: {}, target: {}, msg: {}",
syncJob.getId(), channel.getId(), channel.getTargetTable(), ae.getMessage());
}
success = false;
}
}
}
if (success) {
ack();
} else {
rollback();
}
}
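
// Sends an EOF marker to every channel and blocks until all channels have
// finished their in-flight load, returning the aggregated status.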
public Status waitForTxn() {
for (CanalSyncChannel channel : idToChannels.values()) {
channel.submitEOF();
}
Status st = Status.CANCELLED;
try {
handle.join();
st = handle.getStatus();
} catch (InterruptedException e) {
logger.warn("InterruptedException: ", e);
}
return st;
}
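
// Clears transaction state on every channel whose transaction was initialized.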
public void cleanForTxn() {
for (CanalSyncChannel channel : idToChannels.values()) {
if (channel.isTxnInit()) {
channel.clearTxn();
}
}
}
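
// Main consumer loop. Each iteration opens a transaction on all channels,
// drains batches from the queue until the event-count, memory, or time
// threshold is reached, waits for the channels to finish, then commits, or
// aborts and cancels the job on failure.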
@Override
public void process() {
while (running) {
try {
int totalSize = 0;
long totalMemSize = 0L;
long startTime = System.currentTimeMillis();
beginForTxn();
while (running) {
Events<CanalEntry.Entry, EntryPosition> dataEvents = null;
try {
dataEvents = dataBlockingQueue.poll(CanalConfigs.pollWaitingTimeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// do nothing
}
if (dataEvents == null) {
// No new data yet. Commit if enough has been buffered; otherwise keep waiting for the next batch.
if (totalSize >= MIN_COMMIT_EVENT_SIZE || totalMemSize >= MIN_COMMIT_MEM_SIZE) {
break;
}
try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException e) {
// do nothing
}
} else {
if (debug) {
// print summary of the batch
CanalUtils.printSummary(dataEvents);
}
List<CanalEntry.Entry> entries = dataEvents.getDatas();
int size = entries.size();
ackBatches.add(dataEvents.getId());
positionMeta.addBatch(dataEvents.getId(), dataEvents.getPositionRange());
executeOneBatch(dataEvents);
totalSize += size;
totalMemSize += dataEvents.getMemSize();
// Commit early once the bytes received so far exceed the max commit memory size.
if (totalMemSize >= MAX_COMMIT_MEM_SIZE) {
break;
}
}
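// Commit once the commit interval has elapsed.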
if (System.currentTimeMillis() - startTime >= commitIntervalSecond * 1000) {
break;
}
}
// wait for all channels to finish
Status st = waitForTxn();
if (!running) {
abortForTxn("stopping client");
continue;
}
if (st.ok()) {
commitForTxn();
} else {
abortForTxn(st.getErrorMsg());
syncJob.cancel(SyncFailMsg.MsgType.RUN_FAIL, st.getErrorMsg());
}
} catch (Exception e) {
logger.error("Executor is error!", e);
abortForTxn(e.getMessage());
syncJob.cancel(SyncFailMsg.MsgType.SUBMIT_FAIL, e.getMessage());
} finally {
cleanForTxn();
}
}
}
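
// Enqueues a batch fetched from the canal server. Raw entries are parsed
// first, and entries at or before the last acked position are dropped so
// that already-acked data is not loaded twice.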
public void put(Message message, int size) {
List<CanalEntry.Entry> entries;
if (message.isRaw()) {
entries = new ArrayList<>(message.getRawEntries().size());
for (ByteString rawEntry : message.getRawEntries()) {
try {
entries.add(CanalEntry.Entry.parseFrom(rawEntry));
} catch (InvalidProtocolBufferException e) {
throw new CanalException(e);
}
}
} else {
entries = message.getEntries();
}
int startIndex = 0;
// if the last ack position is null, this is the first batch to be consumed
EntryPosition lastAckPosition = positionMeta.getAckPosition();
if (lastAckPosition != null) {
EntryPosition firstPosition = EntryPosition.createPosition(entries.get(0));
// only get data after the last ack position
if (EntryPosition.min(firstPosition, lastAckPosition).equals(firstPosition)) {
for (int i = 0; i < entries.size(); i++) {
startIndex++;
if (EntryPosition.checkPosition(entries.get(i), lastAckPosition)) {
break;
}
}
}
}
if (startIndex < size) {
Events<CanalEntry.Entry, EntryPosition> dataEvents = new Events<>(message.getId());
PositionRange<EntryPosition> range = new PositionRange<>();
dataEvents.setPositionRange(range);
range.setStart(EntryPosition.createPosition(entries.get(startIndex)));
range.setEnd(EntryPosition.createPosition(entries.get(size - 1)));
dataEvents.setDatas(entries);
long memSize = 0L;
for (CanalEntry.Entry entry : entries) {
memSize += entry.getHeader().getEventLength();
}
dataEvents.setMemSize(memSize);
try {
dataBlockingQueue.put(dataEvents);
} catch (InterruptedException e) {
logger.error("put message to executor error:", e);
throw new CanalException(e);
}
}
}
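
// Distributes the rows of one batch to their target channels. A channel whose
// commit position is before the batch start takes rows directly (prefer);
// a channel whose commit position falls inside the batch only takes rows
// after that position (secondary), which avoids replaying committed data.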
private void executeOneBatch(Events<CanalEntry.Entry, EntryPosition> dataEvents) throws UserException {
final long batchId = dataEvents.getId();
Map<String, CanalSyncChannel> preferChannels = Maps.newHashMap();
Map<String, CanalSyncChannel> secondaryChannels = Maps.newHashMap();
EntryPosition startPosition = dataEvents.getPositionRange().getStart();
EntryPosition endPosition = dataEvents.getPositionRange().getEnd();
for (CanalSyncChannel channel : idToChannels.values()) {
String key = CanalUtils.getFullName(channel.getSrcDataBase(), channel.getSrcTable());
// if the last commit position is null, this is the first batch to be executed
EntryPosition commitPosition = positionMeta.getCommitPosition(channel.getId());
if (commitPosition != null) {
if (commitPosition.compareTo(startPosition) < 0) {
preferChannels.put(key, channel);
} else if (commitPosition.compareTo(endPosition) < 0) {
secondaryChannels.put(key, channel);
}
} else {
preferChannels.put(key, channel);
}
}
// distribute data to channels
for (CanalEntry.Entry entry : dataEvents.getDatas()) {
CanalEntry.EntryType entryType = entry.getEntryType();
try {
if (entryType == CanalEntry.EntryType.ROWDATA) {
CanalEntry.RowChange rowChange;
try {
rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
throw new CanalException("parse event has an error , data:" + entry.toString(), e);
}
final CanalEntry.Header header = entry.getHeader();
final CanalEntry.EventType eventType = rowChange.getEventType();
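// Non-DML and DDL events are only logged; the rest of this batch is skipped.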
if (!CanalUtils.isDML(eventType) || rowChange.getIsDdl()) {
String sql = rowChange.getSql();
processDDL(header, eventType, sql);
return;
}
String schemaTableName = CanalUtils.getFullName(header.getSchemaName(), header.getTableName());
if (preferChannels.containsKey(schemaTableName)) {
CanalSyncChannel channel = preferChannels.get(schemaTableName);
channel.submit(batchId, eventType, rowChange);
} else if (secondaryChannels.containsKey(schemaTableName)) {
CanalSyncChannel channel = secondaryChannels.get(schemaTableName);
EntryPosition position = EntryPosition.createPosition(entry);
EntryPosition commitPosition = positionMeta.getCommitPosition(channel.getId());
if (position.compareTo(commitPosition) > 0) {
channel.submit(batchId, eventType, rowChange);
}
}
// print row
if (debug) {
CanalUtils.printRow(rowChange, header);
}
}
} catch (Exception e) {
logger.error("execute event has an error, data: {}, msg: {}", entry.toString(), e);
throw new UserException("execute batch failed, batchId: " + batchId + " ,msg: " + e.getMessage());
}
}
}
// DDL is not supported yet; DDL and other non-DML events are only logged
private void processDDL(CanalEntry.Header header, CanalEntry.EventType eventType, String sql) {
String table = header.getSchemaName() + "." + header.getTableName();
switch (eventType) {
case CREATE:
logger.warn("parse create table event, table: {}, sql: {}", table, sql);
return;
case ALTER:
logger.warn("parse alter table event, table: {}, sql: {}", table, sql);
return;
case TRUNCATE:
logger.warn("parse truncate table event, table: {}, sql: {}", table, sql);
return;
case ERASE:
case QUERY:
logger.warn("parse event : {}, sql: {} . ignored!", eventType.name(), sql);
return;
case RENAME:
logger.warn("parse rename table event, table: {}, sql: {}", table, sql);
return;
case CINDEX:
logger.warn("parse create index event, table: {}, sql: {}", table, sql);
return;
case DINDEX:
logger.warn("parse delete index event, table: {}, sql: {}", table, sql);
return;
default:
logger.warn("parse unknown event: {}, table: {}, sql: {}", eventType.name(), table, sql);
break;
}
}
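
// Acks pending batches to the canal server in ascending batch id order and
// advances the ack position to the end position of each acked batch.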
private void ack() {
if (!ackBatches.isEmpty()) {
logger.info("client ack batches: {}", ackBatches);
while (!ackBatches.isEmpty()) {
// ack the oldest batch
long minBatchId = Collections.min(ackBatches);
connector.ack(minBatchId);
ackBatches.remove(minBatchId);
PositionRange<EntryPosition> positionRange = positionMeta.removeBatch(minBatchId);
positionMeta.setAckPosition(positionRange.getEnd());
positionMeta.setAckTime(System.currentTimeMillis());
}
}
}
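
// Rolls back all unacked batches on the canal server and clears local state,
// holding the get lock so no new batch is fetched during the rollback.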
private void rollback() {
holdGetLock();
try {
// Wait for the receiver to put the last message into the queue before clearing the queue
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
// ignore
}
if (!ackBatches.isEmpty()) {
connector.rollback();
}
} finally {
releaseGetLock();
}
dataBlockingQueue.clear();
ackBatches.clear();
positionMeta.clearAllBatch();
}
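
// Returns a human-readable summary of the last acked position: the binlog
// execute time and the delay between execution and ack.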
public String getPositionInfo() {
EntryPosition ackPosition = positionMeta.getAckPosition();
long ackTime = positionMeta.getAckTime();
StringBuilder sb = new StringBuilder();
if (ackPosition != null) {
DateTimeFormatter format = DateTimeFormatter.ofPattern(DATE_FORMAT).withZone(ZoneId.systemDefault());
long executeTime = ackPosition.getExecuteTime();
long delayTime = ackTime - executeTime;
LocalDateTime date = LocalDateTime.ofInstant(Instant.ofEpochMilli(executeTime),
ZoneId.systemDefault());
sb.append("position:").append(ackPosition)
.append(", executeTime:[").append(format.format(date)).append("], ")
.append("delay:").append(delayTime).append("ms");
if (StringUtils.isNotEmpty(ackPosition.getGtid())) {
sb.append(", gtid(").append(ackPosition.getGtid())
.append(") ");
}
} else {
sb.append("position:").append("N/A");
}
return sb.toString();
}
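
// Drops all buffered data, pending ack batches, and position metadata.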
private void cleanUp() {
dataBlockingQueue.clear();
ackBatches.clear();
positionMeta.cleanUp();
}
private void holdGetLock() {
getLock.lock();
}
private void releaseGetLock() {
getLock.unlock();
}
}