// CanalSyncJob.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.load.sync.canal;
import org.apache.doris.analysis.BinlogDesc;
import org.apache.doris.analysis.ChannelDescription;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.LogBuilder;
import org.apache.doris.common.util.LogKey;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.load.sync.DataSyncJobType;
import org.apache.doris.load.sync.SyncFailMsg;
import org.apache.doris.load.sync.SyncFailMsg.MsgType;
import org.apache.doris.load.sync.SyncJob;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
/**
 * A {@link SyncJob} of type {@link DataSyncJobType#CANAL}: it connects to a single canal server
 * through a {@link CanalConnector} and hands the connection, together with one
 * {@link CanalSyncChannel} per channel description, to a {@link SyncCanalClient}.
 *
 * <p>The connection settings ({@code remote}, {@code username}, {@code password},
 * {@code batchSize}, {@code debug}) carry {@code @SerializedName} annotations; the client is
 * {@code transient} and is created lazily by {@link #execute()}.
 */
public class CanalSyncJob extends SyncJob {
    private static final Logger LOG = LogManager.getLogger(CanalSyncJob.class);

    // Property keys of the binlog description. The ip/port/destination keys are not read in this
    // class; presumably they are consumed by CanalDestination.parse() -- confirm there.
    protected static final String CANAL_SERVER_IP = "canal.server.ip";
    protected static final String CANAL_SERVER_PORT = "canal.server.port";
    protected static final String CANAL_DESTINATION = "canal.destination";
    protected static final String CANAL_USERNAME = "canal.username";
    protected static final String CANAL_PASSWORD = "canal.password";
    protected static final String CANAL_BATCH_SIZE = "canal.batchSize";
    protected static final String CANAL_DEBUG = "canal.debug";

    // Address and destination of the canal server; filled in by checkAndSetBinlogInfo().
    @SerializedName(value = "remote")
    private final CanalDestination remote;
    @SerializedName(value = "username")
    private String username;
    @SerializedName(value = "password")
    private String password;
    // Optional setting, overridden by the "canal.batchSize" property.
    @SerializedName(value = "batchSize")
    private int batchSize = 8192;
    // Optional setting, overridden by the "canal.debug" property.
    @SerializedName(value = "debug")
    private boolean debug = false;

    // Not serialized; created by init() the first time the job is executed.
    private transient SyncCanalClient client;

    /**
     * Creates a canal sync job with an empty remote destination. The real destination is set
     * later by {@link #checkAndSetBinlogInfo(BinlogDesc)}.
     *
     * @param id      job id
     * @param jobName job name
     * @param dbId    id of the database the job belongs to
     */
    public CanalSyncJob(long id, String jobName, long dbId) {
        super(id, jobName, dbId);
        this.dataSyncJobType = DataSyncJobType.CANAL;
        this.remote = new CanalDestination("", 0, "");
    }

    /**
     * Builds the connector, the channels and the client, then registers the channels into the
     * client.
     *
     * @throws DdlException if the channels cannot be created
     */
    private void init() throws DdlException {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(remote.getIp(), remote.getPort()),
                remote.getDestination(), username, password);
        // create channels
        initChannels();
        // create client
        client = new SyncCanalClient(this, remote.getDestination(), connector, batchSize, debug);
        // register channels into client
        client.registerChannels(channels);
    }

    /**
     * Creates one {@link CanalSyncChannel} per channel description and appends it to
     * {@code channels}, holding the database write lock while doing so.
     *
     * <p>Every target table must be a UNIQUE_KEYS table with a delete sign. When a description
     * has no explicit column list, the column names of the table's base schema are used.
     *
     * <p>If any description fails, the channels added by this call are removed again, so that a
     * later retry does not register the same channel twice.
     *
     * @throws DdlException if the database or a target table cannot be found, or a target table
     *                      does not support batch delete
     */
    public void initChannels() throws DdlException {
        if (channels == null) {
            channels = Lists.newArrayList();
        }
        Database db = Env.getCurrentInternalCatalog().getDbOrDdlException(dbId);
        // Remember where this call starts appending, so a failure can be rolled back.
        final int sizeBeforeInit = channels.size();
        db.writeLock();
        try {
            for (ChannelDescription channelDescription : channelDescriptions) {
                String tableName = channelDescription.getTargetTable();
                OlapTable olapTable = db.getOlapTableOrDdlException(tableName);
                if (olapTable.getKeysType() != KeysType.UNIQUE_KEYS || !olapTable.hasDeleteSign()) {
                    throw new DdlException("Table[" + tableName + "] don't support batch delete.");
                }
                List<String> colNames = channelDescription.getColNames();
                if (colNames == null) {
                    // No explicit column list: fall back to the table's base schema.
                    colNames = Lists.newArrayList();
                    for (Column column : olapTable.getBaseSchema(false)) {
                        colNames.add(column.getName());
                    }
                }
                CanalSyncChannel syncChannel = new CanalSyncChannel(channelDescription.getChannelId(), this, db,
                        olapTable, colNames, channelDescription.getSrcDatabase(),
                        channelDescription.getSrcTableName());
                if (channelDescription.getPartitionNames() != null) {
                    syncChannel.setPartitions(channelDescription.getPartitionNames());
                }
                channels.add(syncChannel);
            }
        } catch (DdlException | RuntimeException e) {
            // Drop the partially built channels; otherwise a retry would append duplicates.
            channels.subList(sizeBeforeInit, channels.size()).clear();
            throw e;
        } finally {
            db.writeUnlock();
        }
    }

    /**
     * Validates the binlog description and copies its canal settings into this job.
     *
     * <p>{@code canal.username} and {@code canal.password} are mandatory;
     * {@code canal.batchSize} and {@code canal.debug} are optional.
     *
     * @param binlogDesc binlog description carrying the canal properties
     * @throws DdlException if a mandatory property is missing or the batch size is not an int
     */
    @Override
    public void checkAndSetBinlogInfo(BinlogDesc binlogDesc) throws DdlException {
        super.checkAndSetBinlogInfo(binlogDesc);
        Map<String, String> properties = binlogDesc.getProperties();
        remote.parse(properties);

        // required
        if (!properties.containsKey(CANAL_USERNAME)) {
            throw new DdlException("Missing " + CANAL_USERNAME + " property in binlog properties");
        }
        username = properties.get(CANAL_USERNAME);

        // required
        if (!properties.containsKey(CANAL_PASSWORD)) {
            throw new DdlException("Missing " + CANAL_PASSWORD + " property in binlog properties");
        }
        password = properties.get(CANAL_PASSWORD);

        // optional
        if (properties.containsKey(CANAL_BATCH_SIZE)) {
            try {
                batchSize = Integer.parseInt(properties.get(CANAL_BATCH_SIZE));
            } catch (NumberFormatException e) {
                throw new DdlException("Property " + CANAL_BATCH_SIZE + " is not int");
            }
        }

        // optional
        if (properties.containsKey(CANAL_DEBUG)) {
            debug = Boolean.parseBoolean(properties.get(CANAL_DEBUG));
        }
    }

    /**
     * @return true if both the client and the channels have been created
     */
    public boolean isInit() {
        return client != null && channels != null;
    }

    /**
     * @return true if the job is in RUNNING state but has not been initialized
     */
    public boolean isNeedReschedule() {
        return jobState == JobState.RUNNING && !isInit();
    }

    /**
     * Initializes the job if necessary and starts the canal client.
     *
     * @throws UserException if initialization or the state update fails
     */
    @Override
    public void execute() throws UserException {
        LOG.info(new LogBuilder(LogKey.SYNC_JOB, id)
                .add("remote ip", remote.getIp())
                .add("remote port", remote.getPort())
                .add("msg", "Try to start canal client.")
                .add("debug", debug)
                .build());
        // init
        if (!isInit()) {
            init();
        }
        // start client
        unprotectedStartClient();
    }

    /**
     * Stops the client and records the failure message.
     *
     * <p>SUBMIT_FAIL, RUN_FAIL and UNKNOWN move the job to PAUSED; SCHEDULE_FAIL and USER_CANCEL
     * move it to CANCELLED. Any other message type fails the {@link Preconditions} check.
     * A {@link UserException} raised while stopping is logged and not rethrown; in that case the
     * failure message is not recorded.
     *
     * @param msgType reason category of the cancellation
     * @param errMsg  detail message stored in the fail message
     */
    @Override
    public void cancel(MsgType msgType, String errMsg) {
        try {
            switch (msgType) {
                case SUBMIT_FAIL:
                case RUN_FAIL:
                case UNKNOWN:
                    unprotectedStopClient(JobState.PAUSED);
                    break;
                case SCHEDULE_FAIL:
                case USER_CANCEL:
                    unprotectedStopClient(JobState.CANCELLED);
                    break;
                default:
                    Preconditions.checkState(false, "unknown msg type: " + msgType.name());
                    break;
            }
            failMsg = new SyncFailMsg(msgType, errMsg);
            LOG.info(new LogBuilder(LogKey.SYNC_JOB, id)
                    .add("MsgType", msgType.name())
                    .add("msg", "Cancel canal sync job.")
                    .add("errMsg", errMsg)
                    .build());
        } catch (UserException e) {
            LOG.warn(new LogBuilder(LogKey.SYNC_JOB, id)
                    .add("msg", "Failed to cancel canal sync job.")
                    .build(), e);
        }
    }

    /**
     * Stops the client and moves the job to PAUSED.
     *
     * @throws UserException if the state update fails
     */
    @Override
    public void pause() throws UserException {
        unprotectedStopClient(JobState.PAUSED);
        LOG.info(new LogBuilder(LogKey.SYNC_JOB, id)
                .add("remote ip", remote.getIp())
                .add("remote port", remote.getPort())
                .add("msg", "Pause canal sync job.")
                .add("debug", debug)
                .build());
    }

    /**
     * Moves the job back to PENDING. The client is not started here.
     *
     * @throws UserException if the state update fails
     */
    @Override
    public void resume() throws UserException {
        updateState(JobState.PENDING, false);
        LOG.info(new LogBuilder(LogKey.SYNC_JOB, id)
                .add("remote ip", remote.getIp())
                .add("remote port", remote.getPort())
                .add("msg", "Resume canal sync job.")
                .add("debug", debug)
                .build());
    }

    /**
     * Starts the client and moves the job to RUNNING.
     *
     * @throws UserException if the client has not been created yet, or the state update fails
     */
    public void unprotectedStartClient() throws UserException {
        // Fail with the exception type callers already handle instead of a bare NPE.
        if (client == null) {
            throw new UserException("Canal client has not been initialized.");
        }
        client.startup();
        updateState(JobState.RUNNING, false);
        LOG.info(new LogBuilder(LogKey.SYNC_JOB, id)
                .add("name", jobName)
                .add("msg", "Client has been started.")
                .build());
    }

    /**
     * Shuts the client down (if there is one) and moves the job to the given state.
     * Does nothing unless the target state is CANCELLED or PAUSED.
     *
     * @param jobState target state, expected to be CANCELLED or PAUSED
     * @throws UserException if the state update fails
     */
    public void unprotectedStopClient(JobState jobState) throws UserException {
        if (jobState != JobState.CANCELLED && jobState != JobState.PAUSED) {
            return;
        }
        if (client != null) {
            client.shutdown(true);
        }
        updateState(jobState, false);
        LOG.info(new LogBuilder(LogKey.SYNC_JOB, id)
                .add("name", jobName)
                .add("msg", "Client has been stopped.")
                .build());
    }

    /**
     * Applies a previously recorded state change: restores the timestamps and the fail message,
     * then updates the state with the second {@code updateState} argument set to {@code true}.
     * A failure to update the state is logged and not rethrown.
     *
     * @param info recorded state change
     */
    @Override
    public void replayUpdateSyncJobState(SyncJobUpdateStateInfo info) {
        lastStartTimeMs = info.getLastStartTimeMs();
        lastStopTimeMs = info.getLastStopTimeMs();
        finishTimeMs = info.getFinishTimeMs();
        failMsg = info.getFailMsg();
        try {
            JobState replayedState = info.getJobState();
            switch (replayedState) {
                case PENDING:
                case RUNNING:
                case PAUSED:
                case CANCELLED:
                    updateState(replayedState, true);
                    break;
                default:
                    throw new UserException("job state is invalid: " + replayedState);
            }
        } catch (UserException e) {
            LOG.error(new LogBuilder(LogKey.SYNC_JOB, id)
                    .add("desired_state", info.getJobState())
                    .add("msg", "replay update state error.")
                    .add("reason", e.getMessage())
                    .build(), e);
        }
        LOG.info(new LogBuilder(LogKey.SYNC_JOB, info.getId())
                .add("desired_state", info.getJobState())
                .add("msg", "replay update sync job state")
                .build());
    }

    /**
     * @return the client's position info, or the literal {@code \N} when there is no client
     */
    @Override
    public String getStatus() {
        if (client != null) {
            return client.getPositionInfo();
        }
        return "\\N";
    }

    /**
     * @return a one-line summary of the form
     *         {@code address:<ip>:<port>,destination:<destination>,batchSize:<batchSize>}
     */
    @Override
    public String getJobConfig() {
        StringBuilder sb = new StringBuilder();
        sb.append("address:").append(remote.getIp()).append(":").append(remote.getPort()).append(",")
                .append("destination:").append(remote.getDestination()).append(",")
                .append("batchSize:").append(batchSize);
        return sb.toString();
    }

    /**
     * @return the canal destination this job reads from
     */
    public CanalDestination getRemote() {
        return remote;
    }

    @Override
    public String toString() {
        return "SyncJob [jobId=" + id
                + ", jobName=" + jobName
                + ", dbId=" + dbId
                + ", state=" + jobState
                + ", createTimeMs=" + TimeUtils.longToTimeString(createTimeMs)
                + ", lastStartTimeMs=" + TimeUtils.longToTimeString(lastStartTimeMs)
                + ", lastStopTimeMs=" + TimeUtils.longToTimeString(lastStopTimeMs)
                + ", finishTimeMs=" + TimeUtils.longToTimeString(finishTimeMs)
                + "]";
    }
}