CanalUtils.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.sync.canal;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.sync.model.Events;
import org.apache.doris.load.sync.position.EntryPosition;
import com.alibaba.otter.canal.common.CanalException;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.UnsupportedEncodingException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.List;
public class CanalUtils {
private static Logger logger = LogManager.getLogger(CanalUtils.class);
private static final String SEP = SystemUtils.LINE_SEPARATOR;
private static String context_format = null;
private static String row_format = null;
private static String transaction_format = null;
static {
context_format = SEP + "----------- Batch Summary ------------------------------>" + SEP;
context_format += "| Batch Id: [{}] ,count : [{}] , Mem size : [{}] , Time : {}" + SEP;
context_format += "| Start : [{}] " + SEP;
context_format += "| End : [{}] " + SEP;
context_format += "----------------------------------------------------------" + SEP;
row_format = SEP
+ "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} ,"
+ " executeTime : {}({}) , gtid : ({}) , delay : {} ms"
+ SEP;
transaction_format = SEP
+ "================> binlog[{}:{}] , executeTime : {}({}) , gtid : ({}) , delay : {}ms"
+ SEP;
}
public static void printSummary(Events<CanalEntry.Entry, EntryPosition> dataEvents) {
List<CanalEntry.Entry> entries = dataEvents.getDatas();
if (CollectionUtils.isEmpty(entries)) {
return;
}
String startPosition = buildPositionForDump(entries.get(0));
String endPosition = buildPositionForDump(entries.get(entries.size() - 1));
logger.info(context_format, dataEvents.getId(), entries.size(), dataEvents.getMemSize(),
TimeUtils.getDatetimeFormatWithTimeZone().format(LocalDateTime.now()), startPosition, endPosition);
}
public static void printSummary(Message message, int size, long memsize) {
List<CanalEntry.Entry> entries = message.getEntries();
if (CollectionUtils.isEmpty(entries)) {
return;
}
String startPosition = buildPositionForDump(message.getEntries().get(0));
String endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1));
logger.info(context_format, message.getId(), size, memsize,
TimeUtils.getDatetimeFormatWithTimeZone().format(LocalDateTime.now()), startPosition, endPosition);
}
public static String buildPositionForDump(CanalEntry.Entry entry) {
CanalEntry.Header header = entry.getHeader();
long time = entry.getHeader().getExecuteTime();
LocalDateTime date = LocalDateTime.ofInstant(Instant.ofEpochMilli(time), ZoneId.systemDefault());
StringBuilder sb = new StringBuilder();
sb.append(header.getLogfileName())
.append(":")
.append(header.getLogfileOffset())
.append(":")
.append(header.getExecuteTime())
.append("(")
.append(TimeUtils.getDatetimeFormatWithTimeZone().format(date))
.append(")");
if (StringUtils.isNotEmpty(entry.getHeader().getGtid())) {
sb.append(" gtid(").append(entry.getHeader().getGtid())
.append(")");
}
return sb.toString();
}
public static String getFullName(String schemaName, String tableName) {
StringBuilder sb = new StringBuilder();
if (schemaName != null) {
sb.append(schemaName).append(".");
}
sb.append(tableName);
return sb.toString().intern();
}
public static void printRow(CanalEntry.RowChange rowChange, CanalEntry.Header header) {
long executeTime = header.getExecuteTime();
long delayTime = System.currentTimeMillis() - executeTime;
LocalDateTime date = LocalDateTime.ofInstant(Instant.ofEpochMilli(executeTime), ZoneId.systemDefault());
CanalEntry.EventType eventType = rowChange.getEventType();
logger.info(row_format, header.getLogfileName(),
String.valueOf(header.getLogfileOffset()), header.getSchemaName(),
header.getTableName(), eventType,
String.valueOf(header.getExecuteTime()), TimeUtils.getDatetimeFormatWithTimeZone().format(date),
header.getGtid(), String.valueOf(delayTime));
if (eventType == CanalEntry.EventType.QUERY || rowChange.getIsDdl()) {
logger.info(" sql ----> " + rowChange.getSql() + SEP);
return;
}
printXAInfo(rowChange.getPropsList());
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (eventType == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
printColumn(rowData.getAfterColumnsList());
}
}
}
public static void printColumn(List<CanalEntry.Column> columns) {
StringBuilder builder = new StringBuilder();
for (CanalEntry.Column column : columns) {
try {
if (StringUtils.containsIgnoreCase(column.getMysqlType(), "BLOB")
|| StringUtils.containsIgnoreCase(column.getMysqlType(), "BINARY")) {
// get value bytes
builder.append(column.getName())
.append(" : ")
.append(new String(column.getValue().getBytes("ISO-8859-1"), "UTF-8"));
} else {
builder.append(column.getName())
.append(" : ")
.append(column.getValue());
}
} catch (UnsupportedEncodingException e) {
// CHECKSTYLE IGNORE THIS LINE
}
builder.append(" type=").append(column.getMysqlType());
if (column.getUpdated()) {
builder.append(" update=").append(column.getUpdated());
}
builder.append(SEP);
}
logger.info(builder.toString());
}
public static void printXAInfo(List<CanalEntry.Pair> pairs) {
if (pairs == null) {
return;
}
String xaType = null;
String xaXid = null;
for (CanalEntry.Pair pair : pairs) {
String key = pair.getKey();
if (StringUtils.endsWithIgnoreCase(key, "XA_TYPE")) {
xaType = pair.getValue();
} else if (StringUtils.endsWithIgnoreCase(key, "XA_XID")) {
xaXid = pair.getValue();
}
}
if (xaType != null && xaXid != null) {
logger.info(" ------> " + xaType + " " + xaXid);
}
}
public static void transactionBegin(CanalEntry.Entry entry) {
long executeTime = entry.getHeader().getExecuteTime();
long delayTime = System.currentTimeMillis() - executeTime;
LocalDateTime date = LocalDateTime.ofInstant(Instant.ofEpochMilli(executeTime), ZoneId.systemDefault());
CanalEntry.TransactionBegin begin = null;
try {
begin = CanalEntry.TransactionBegin.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
throw new CanalException("parse event has an error , data:" + entry.toString(), e);
}
// print transaction begin info, thread ID, time consumption
logger.info(transaction_format, entry.getHeader().getLogfileName(),
String.valueOf(entry.getHeader().getLogfileOffset()),
String.valueOf(entry.getHeader().getExecuteTime()),
TimeUtils.getDatetimeFormatWithTimeZone().format(date),
entry.getHeader().getGtid(), String.valueOf(delayTime));
logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
printXAInfo(begin.getPropsList());
}
public static void transactionEnd(CanalEntry.Entry entry) {
long executeTime = entry.getHeader().getExecuteTime();
long delayTime = System.currentTimeMillis() - executeTime;
LocalDateTime date = LocalDateTime.ofInstant(Instant.ofEpochMilli(executeTime), ZoneId.systemDefault());
CanalEntry.TransactionEnd end = null;
try {
end = CanalEntry.TransactionEnd.parseFrom(entry.getStoreValue());
} catch (InvalidProtocolBufferException e) {
throw new CanalException("parse event has an error , data:" + entry.toString(), e);
}
// print transaction end info, transaction ID
logger.info("----------------\n");
logger.info(" END ----> transaction id: {}", end.getTransactionId());
printXAInfo(end.getPropsList());
logger.info(transaction_format, entry.getHeader().getLogfileName(),
String.valueOf(entry.getHeader().getLogfileOffset()),
String.valueOf(entry.getHeader().getExecuteTime()),
TimeUtils.getDatetimeFormatWithTimeZone().format(date),
entry.getHeader().getGtid(), String.valueOf(delayTime));
}
public static boolean isDML(CanalEntry.EventType eventType) {
switch (eventType) {
case INSERT:
case UPDATE:
case DELETE:
return true;
default:
return false;
}
}
}