FeServiceClient.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.datasource.doris;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.UserException;
import org.apache.doris.common.io.Text;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TAbortRemoteTxnRequest;
import org.apache.doris.thrift.TAbortRemoteTxnResult;
import org.apache.doris.thrift.TAddOrDropPartitionsRequest;
import org.apache.doris.thrift.TAddOrDropPartitionsResult;
import org.apache.doris.thrift.TBeginRemoteTxnRequest;
import org.apache.doris.thrift.TBeginRemoteTxnResult;
import org.apache.doris.thrift.TCommitRemoteTxnRequest;
import org.apache.doris.thrift.TCommitRemoteTxnResult;
import org.apache.doris.thrift.TGetBackendMetaRequest;
import org.apache.doris.thrift.TGetBackendMetaResult;
import org.apache.doris.thrift.TGetOlapTableMetaRequest;
import org.apache.doris.thrift.TGetOlapTableMetaResult;
import org.apache.doris.thrift.TInsertOverwriteRecordRequest;
import org.apache.doris.thrift.TInsertOverwriteRecordResult;
import org.apache.doris.thrift.TInsertOverwriteRegisterRequest;
import org.apache.doris.thrift.TInsertOverwriteRegisterResult;
import org.apache.doris.thrift.TInsertOverwriteTaskRequest;
import org.apache.doris.thrift.TInsertOverwriteTaskResult;
import org.apache.doris.thrift.TMasterAddressRequest;
import org.apache.doris.thrift.TMasterAddressResult;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPartitionMeta;
import org.apache.doris.thrift.TRecordFinishedLoadJobRequest;
import org.apache.doris.thrift.TRecordFinishedLoadJobResult;
import org.apache.doris.thrift.TReplacePartitionsRequest;
import org.apache.doris.thrift.TReplacePartitionsResult;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
public class FeServiceClient {
private static final Logger LOG = LogManager.getLogger(FeServiceClient.class);
private final Random random = new Random(System.currentTimeMillis());
private final String name;
private final List<TNetworkAddress> addresses;
private final String user;
private final String password;
private final int retryCount;
private final int timeoutMs;
private volatile TNetworkAddress master;
public FeServiceClient(String name, List<TNetworkAddress> addresses, String user, String password,
int retryCount, int timeoutS) {
this.name = name;
this.addresses = addresses;
this.user = user;
this.password = password;
this.retryCount = retryCount;
this.timeoutMs = (int) TimeUnit.SECONDS.toMillis(timeoutS);
}
private List<TNetworkAddress> getAddresses() {
return addresses;
}
private FrontendService.Client getRemoteFeClient(TNetworkAddress address, int timeoutMs) {
try {
return ClientPool.frontendPool.borrowObject(address, timeoutMs);
} catch (Exception e) {
String msg = String.format("failed to get remote doris:%s fe connection", name);
throw new RuntimeException(msg, e);
}
}
private void returnClient(TNetworkAddress address, FrontendService.Client client, boolean returnObj) {
if (returnObj) {
ClientPool.frontendPool.returnObject(address, client);
} else {
ClientPool.frontendPool.invalidateObject(address, client);
}
}
private <T> T randomCallWithRetry(ThriftCall<T> call, String errorMsg, int timeout) {
List<TNetworkAddress> addresses = getAddresses();
int retries = 0;
Exception lastException = null;
while (retries < retryCount) {
int index = random.nextInt(addresses.size());
FrontendService.Client client = null;
for (int i = 0; i < addresses.size() && retries < retryCount; i++) {
TNetworkAddress address = addresses.get((index + i) % addresses.size());
client = getRemoteFeClient(address, timeout);
boolean returnObj = false;
try {
T result = call.call(client);
returnObj = true;
return result;
} catch (TException | IOException e) {
lastException = e;
retries++;
} catch (Exception e) {
throw new RuntimeException(errorMsg + ":" + e.getMessage(), e);
} finally {
returnClient(address, client, returnObj);
}
}
}
String lastMessage = lastException == null ? "unknown" : lastException.getMessage();
throw new RuntimeException(errorMsg + ":" + lastMessage, lastException);
}
public static class TResultAdapter {
public static TStatus getStatus(Object obj) {
if (obj instanceof TBeginRemoteTxnResult) {
return ((TBeginRemoteTxnResult) obj).getStatus();
} else if (obj instanceof TCommitRemoteTxnResult) {
return ((TCommitRemoteTxnResult) obj).getStatus();
} else if (obj instanceof TAbortRemoteTxnResult) {
return ((TAbortRemoteTxnResult) obj).getStatus();
} else if (obj instanceof TMasterAddressResult) {
return ((TMasterAddressResult) obj).getStatus();
} else if (obj instanceof TAddOrDropPartitionsResult) {
return ((TAddOrDropPartitionsResult) obj).getStatus();
} else if (obj instanceof TReplacePartitionsResult) {
return ((TReplacePartitionsResult) obj).getStatus();
} else if (obj instanceof TInsertOverwriteRegisterResult) {
return ((TInsertOverwriteRegisterResult) obj).getStatus();
} else if (obj instanceof TInsertOverwriteTaskResult) {
return ((TInsertOverwriteTaskResult) obj).getStatus();
} else if (obj instanceof TInsertOverwriteRecordResult) {
return ((TInsertOverwriteRecordResult) obj).getStatus();
} else if (obj instanceof TRecordFinishedLoadJobResult) {
return ((TRecordFinishedLoadJobResult) obj).getStatus();
}
throw new IllegalArgumentException("unsupported result type: " + obj.getClass().getName());
}
public static TNetworkAddress getMasterAddress(Object obj) {
if (obj instanceof TBeginRemoteTxnResult) {
return ((TBeginRemoteTxnResult) obj).getMasterAddress();
} else if (obj instanceof TCommitRemoteTxnResult) {
return ((TCommitRemoteTxnResult) obj).getMasterAddress();
} else if (obj instanceof TAbortRemoteTxnResult) {
return ((TAbortRemoteTxnResult) obj).getMasterAddress();
} else if (obj instanceof TMasterAddressResult) {
return ((TMasterAddressResult) obj).getMasterAddress();
} else if (obj instanceof TAddOrDropPartitionsResult) {
return ((TAddOrDropPartitionsResult) obj).getMasterAddress();
} else if (obj instanceof TReplacePartitionsResult) {
return ((TReplacePartitionsResult) obj).getMasterAddress();
} else if (obj instanceof TInsertOverwriteRegisterResult) {
return ((TInsertOverwriteRegisterResult) obj).getMasterAddress();
} else if (obj instanceof TInsertOverwriteTaskResult) {
return ((TInsertOverwriteTaskResult) obj).getMasterAddress();
} else if (obj instanceof TInsertOverwriteRecordResult) {
return ((TInsertOverwriteRecordResult) obj).getMasterAddress();
} else if (obj instanceof TRecordFinishedLoadJobResult) {
return ((TRecordFinishedLoadJobResult) obj).getMasterAddress();
}
throw new IllegalArgumentException("unsupported result type: " + obj.getClass().getName());
}
}
private <T> T masterCallWithRetry(ThriftCall<T> call, String errorMsg, int timeout) {
List<TNetworkAddress> addresses = getAddresses();
int retries = 0;
Exception lastException = null;
int index = random.nextInt(addresses.size());
if (master == null) {
master = addresses.get((index) % addresses.size());
}
FrontendService.Client client = null;
while (retries < retryCount) {
TNetworkAddress clientAddr = master;
client = getRemoteFeClient(clientAddr, timeout);
boolean returnObj = false;
try {
T result = call.call(client);
returnObj = true;
if (TResultAdapter.getStatus(result).getStatusCode() == TStatusCode.NOT_MASTER) {
if (TResultAdapter.getMasterAddress(result) != null) {
master = TResultAdapter.getMasterAddress(result);
}
} else {
return result;
}
index++;
retries++;
} catch (TException | IOException e) {
lastException = e;
retries++;
index++;
master = addresses.get((index) % addresses.size());
} catch (Exception e) {
throw new RuntimeException(errorMsg + ":" + e.getMessage(), e);
} finally {
returnClient(clientAddr, client, returnObj);
}
}
String lastMessage = lastException == null ? "master not found in retry times" : lastException.getMessage();
throw new RuntimeException(errorMsg + ":" + lastMessage, lastException);
}
public List<Backend> listBackends() {
TGetBackendMetaRequest request = new TGetBackendMetaRequest();
request.setUser(user);
request.setPasswd(password);
String msg = String.format("failed to get backends from remote doris:%s", name);
return randomCallWithRetry(client -> {
TGetBackendMetaResult result = client.getBackendMeta(request);
return result.getBackends().stream()
.map(b -> Backend.fromThrift(b))
.collect(Collectors.toList());
}, msg, timeoutMs);
}
public RemoteOlapTable getOlapTable(String dbName, String table, long tableId, List<Partition> partitions,
List<Partition> tempPartitions) {
TGetOlapTableMetaRequest request = new TGetOlapTableMetaRequest();
request.setDb(dbName);
request.setTable(table);
request.setTableId(tableId);
request.setUser(user);
request.setPasswd(password);
request.setVersion(FeConstants.meta_version);
for (Partition partition : partitions) {
TPartitionMeta meta = new TPartitionMeta();
meta.setId(partition.getId());
meta.setVisibleVersion(partition.getVisibleVersion());
meta.setVisibleVersionTime(partition.getVisibleVersionTime());
request.addToPartitions(meta);
}
for (Partition partition : tempPartitions) {
TPartitionMeta meta = new TPartitionMeta();
meta.setId(partition.getId());
meta.setVisibleVersion(partition.getVisibleVersion());
meta.setVisibleVersionTime(partition.getVisibleVersionTime());
request.addToTempPartitions(meta);
}
String msg = String.format("failed to get table meta from remote doris:%s", name);
return randomCallWithRetry(client -> {
TGetOlapTableMetaResult result = client.getOlapTableMeta(request);
if (result.getStatus().getStatusCode() != TStatusCode.OK) {
throw new UserException(result.getStatus().toString());
}
RemoteOlapTable remoteOlapTable = null;
try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(result.getTableMeta()))) {
OlapTable olapTable = OlapTable.read(in);
remoteOlapTable = RemoteOlapTable.fromOlapTable(olapTable);
}
List<Partition> updatedPartitions = new ArrayList<>(result.getUpdatedPartitionsSize());
if (result.getUpdatedPartitionsSize() > 0) {
for (ByteBuffer buffer : result.getUpdatedPartitions()) {
try (ByteArrayInputStream in =
new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.remaining());
DataInputStream dataInputStream = new DataInputStream(in)) {
String partitionStr = Text.readString(dataInputStream);
Partition partition = GsonUtils.GSON.fromJson(partitionStr, Partition.class);
updatedPartitions.add(partition);
}
}
}
List<Long> removedPartitions = result.getRemovedPartitions();
if (removedPartitions == null) {
removedPartitions = new ArrayList<>();
}
remoteOlapTable.rebuildPartitions(partitions, updatedPartitions, removedPartitions);
// rebuild temp partitions
if (result.isSetUpdatedTempPartitions() && result.getUpdatedTempPartitionsSize() > 0) {
updatedPartitions = new ArrayList<>(result.getUpdatedTempPartitionsSize());
for (ByteBuffer buffer : result.getUpdatedTempPartitions()) {
try (ByteArrayInputStream in =
new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.remaining());
DataInputStream dataInputStream = new DataInputStream(in)) {
String partitionStr = Text.readString(dataInputStream);
Partition partition = GsonUtils.GSON.fromJson(partitionStr, Partition.class);
updatedPartitions.add(partition);
}
}
}
removedPartitions = result.getRemovedTempPartitions();
if (removedPartitions == null) {
removedPartitions = new ArrayList<>();
}
remoteOlapTable.rebuildTempPartitions(tempPartitions, updatedPartitions, removedPartitions);
return remoteOlapTable;
}, msg, timeoutMs);
}
public TBeginRemoteTxnResult beginRemoteTxn(TBeginRemoteTxnRequest request) throws Exception {
request.setUser(user);
request.setPasswd(password);
String msg = String.format("failed to begin remote txn from remote doris:%s, label=%s", name,
request.getLabel());
long startTime = System.currentTimeMillis();
TBeginRemoteTxnResult result;
try {
result = masterCallWithRetry(client -> client.beginRemoteTxn(request), msg,
Math.max(timeoutMs, (int) request.getTimeoutMs()));
} catch (Exception e) {
long costMs = System.currentTimeMillis() - startTime;
LOG.warn("begin remote txn for catalog {} failed, cost={}ms", name, costMs, e);
throw e;
}
long costMs = System.currentTimeMillis() - startTime;
LOG.info("begin remote txn for catalog {} finished, cost={}ms, statusCode={}",
name, costMs, result.getStatus().getStatusCode());
return result;
}
public TCommitRemoteTxnResult commitRemoteTxn(TCommitRemoteTxnRequest request) throws Exception {
request.setUser(user);
request.setPasswd(password);
String msg = String.format("failed to commit remote txn from remote doris:%s, txnId=%d", name,
request.getTxnId());
long startTime = System.currentTimeMillis();
TCommitRemoteTxnResult result;
try {
result = masterCallWithRetry(client -> client.commitRemoteTxn(request), msg,
Math.max(timeoutMs, (int) request.getInsertVisibleTimeoutMs()));
} catch (Exception e) {
long costMs = System.currentTimeMillis() - startTime;
LOG.warn("commit remote txn for catalog {} failed, txnId={}, cost={}ms",
name, request.getTxnId(), costMs, e);
throw e;
}
long costMs = System.currentTimeMillis() - startTime;
LOG.info("commit remote txn for catalog {} finished, txnId={}, cost={}ms, statusCode={}",
name, request.getTxnId(), costMs, result.getStatus().getStatusCode());
return result;
}
public TAbortRemoteTxnResult abortRemoteTxn(TAbortRemoteTxnRequest request) throws Exception {
request.setUser(user);
request.setPasswd(password);
String msg = String.format("failed to abort remote txn from remote doris:%s, txnId=%d", name,
request.getTxnId());
long startTime = System.currentTimeMillis();
TAbortRemoteTxnResult result;
try {
result = masterCallWithRetry(client -> client.abortRemoteTxn(request), msg, timeoutMs);
} catch (Exception e) {
long costMs = System.currentTimeMillis() - startTime;
LOG.warn("abort remote txn for catalog {} failed, txnId={}, cost={}ms",
name, request.getTxnId(), costMs, e);
throw e;
}
long costMs = System.currentTimeMillis() - startTime;
LOG.info("abort remote txn for catalog {} finished, txnId={}, cost={}ms, statusCode={}",
name, request.getTxnId(), costMs, result.getStatus().getStatusCode());
return result;
}
private interface ThriftCall<T> {
public T call(FrontendService.Client client) throws Exception;
}
public TNetworkAddress getMasterAddress() throws RuntimeException {
TMasterAddressRequest request = new TMasterAddressRequest();
request.setUser(user);
request.setPasswd(password);
long startTime = System.currentTimeMillis();
TMasterAddressResult result;
try {
result = masterCallWithRetry(client -> client.getMasterAddress(request),
"failed to get master address from remote doris:" + name, timeoutMs);
if (result.isSetMasterAddress()) {
master = result.getMasterAddress();
}
if (result.getStatus().getStatusCode() != TStatusCode.OK) {
throw new RuntimeException(result.getStatus().getErrorMsgs().get(0));
}
} catch (Exception e) {
long costMs = System.currentTimeMillis() - startTime;
LOG.warn("get master address for catalog {} failed, cost={}ms", name, costMs, e);
throw new RuntimeException(e.getMessage());
}
return master;
}
public void addPartitions(String dbName, String tableName, List<String> partitionNames,
List<String> tempPartitionNames, boolean isTemp) throws DdlException {
TAddOrDropPartitionsRequest request = new TAddOrDropPartitionsRequest();
request.setUser(user);
request.setPasswd(password);
request.setDb(dbName);
request.setTbl(tableName);
request.setPartitionNames(partitionNames);
request.setTempPartitionNames(tempPartitionNames);
request.setIsTemp(isTemp);
request.setIsDrop(false);
long startTime = System.currentTimeMillis();
TAddOrDropPartitionsResult result;
try {
result = masterCallWithRetry(client -> client.addOrDropPartitions(request),
"failed to add partitions to remote doris:" + name, timeoutMs);
if (result.isSetMasterAddress()) {
master = result.getMasterAddress();
}
if (result.getStatus().getStatusCode() != TStatusCode.OK) {
throw new DdlException(result.getStatus().getErrorMsgs().get(0));
}
} catch (Exception e) {
long costMs = System.currentTimeMillis() - startTime;
LOG.warn("add partitions to catalog {} failed, cost={}ms", name, costMs, e);
throw new DdlException(e.getMessage());
}
}
public boolean dropPartitions(String dbName, String tableName, List<String> partitionNames, boolean isTemp,
boolean isForce) {
TAddOrDropPartitionsRequest request = new TAddOrDropPartitionsRequest();
request.setUser(user);
request.setPasswd(password);
request.setDb(dbName);
request.setTbl(tableName);
request.setPartitionNames(partitionNames);
request.setIsTemp(isTemp);
request.setIsForce(isForce);
request.setIsDrop(true);
long startTime = System.currentTimeMillis();
TAddOrDropPartitionsResult result;
try {
result = masterCallWithRetry(client -> client.addOrDropPartitions(request),
"failed to drop partitions to remote doris:" + name, timeoutMs);
if (result.isSetMasterAddress()) {
master = result.getMasterAddress();
}
if (result.getStatus().getStatusCode() != TStatusCode.OK) {
return false;
}
} catch (Exception e) {
long costMs = System.currentTimeMillis() - startTime;
LOG.warn("drop partitions to catalog {} failed, cost={}ms", name, costMs, e);
return false;
}
return true;
}
public void replacePartitions(String dbName, String tableName, List<String> partitionNames,
List<String> tempPartitionNames, boolean isForce) throws DdlException {
TReplacePartitionsRequest request = new TReplacePartitionsRequest();
request.setUser(user);
request.setPasswd(password);
request.setDb(dbName);
request.setTbl(tableName);
request.setPartitionNames(partitionNames);
request.setTempPartitionNames(tempPartitionNames);
request.setIsForce(isForce);
long startTime = System.currentTimeMillis();
TReplacePartitionsResult result;
try {
result = masterCallWithRetry(client -> client.replacePartitions(request),
"failed to replace partitions to remote doris:" + name, timeoutMs);
if (result.isSetMasterAddress()) {
master = result.getMasterAddress();
}
if (result.getStatus().getStatusCode() != TStatusCode.OK) {
throw new DdlException(result.getStatus().getErrorMsgs().get(0));
}
} catch (Exception e) {
long costMs = System.currentTimeMillis() - startTime;
LOG.warn("replace partitions to catalog {} failed, cost={}ms", name, costMs, e);
throw new DdlException(e.getMessage());
}
}
public long registerTask(String dbName, String tableName, List<String> tempPartitionNames) throws Exception {
TInsertOverwriteRegisterRequest request = new TInsertOverwriteRegisterRequest();
request.setUser(user);
request.setPasswd(password);
request.setDb(dbName);
request.setTbl(tableName);
request.setPartitionNames(tempPartitionNames);
long startTime = System.currentTimeMillis();
TInsertOverwriteRegisterResult result;
try {
result = masterCallWithRetry(client -> client.registerInsertOverwriteTask(request),
"failed to register insert overwrite task to remote doris:" + name, timeoutMs);
if (result.isSetMasterAddress()) {
master = result.getMasterAddress();
}
if (result.getStatus().getStatusCode() != TStatusCode.OK) {
throw new DdlException(result.getStatus().getErrorMsgs().get(0));
}
return result.getTaskId();
} catch (Exception e) {
long costMs = System.currentTimeMillis() - startTime;
LOG.warn("register insert overwrite task to catalog {} failed, cost={}ms", name, costMs, e);
throw e;
}
}
public long registerTaskGroup(String dbName, String tableName) throws Exception {
TInsertOverwriteRegisterRequest request = new TInsertOverwriteRegisterRequest();
request.setUser(user);
request.setPasswd(password);
request.setDb(dbName);
request.setTbl(tableName);
long startTime = System.currentTimeMillis();
try {
TInsertOverwriteRegisterResult result =
masterCallWithRetry(client -> client.registerInsertOverwriteTask(request),
"failed to register insert overwrite task to remote doris:" + name, timeoutMs);
if (result.isSetMasterAddress()) {
master = result.getMasterAddress();
}
if (result.getStatus().getStatusCode() != TStatusCode.OK) {
throw new DdlException(result.getStatus().getErrorMsgs().get(0));
}
return result.getGroupId();
} catch (Exception e) {
long costMs = System.currentTimeMillis() - startTime;
LOG.warn("register insert overwrite task to catalog {} failed, cost={}ms", name, costMs, e);
throw e;
}
}
public void registerTaskInGroup(long groupId, long taskId) throws Exception {
TInsertOverwriteRegisterRequest request = new TInsertOverwriteRegisterRequest();
request.setUser(user);
request.setPasswd(password);
request.setGroupId(groupId);
request.setTaskId(taskId);
long startTime = System.currentTimeMillis();
try {
TInsertOverwriteRegisterResult result =
masterCallWithRetry(client -> client.registerInsertOverwriteTask(request),
"failed to register insert overwrite task to remote doris:" + name, timeoutMs);
if (result.isSetMasterAddress()) {
master = result.getMasterAddress();
}
if (result.getStatus().getStatusCode() != TStatusCode.OK) {
throw new DdlException(result.getStatus().getErrorMsgs().get(0));
}
} catch (Exception e) {
long costMs = System.currentTimeMillis() - startTime;
LOG.warn("register insert overwrite task to catalog {} failed, cost={}ms", name, costMs, e);
throw e;
}
}
public void taskGroupSuccess(String dbName, String tableName, long groupId) throws DdlException {
TInsertOverwriteTaskRequest request = new TInsertOverwriteTaskRequest();
request.setUser(user);
request.setPasswd(password);
request.setDb(dbName);
request.setTbl(tableName);
request.setGroupId(groupId);
request.setIsSuccess(true);
long startTime = System.currentTimeMillis();
try {
TInsertOverwriteTaskResult result =
masterCallWithRetry(client -> client.insertOverwriteTaskAction(request),
"failed to task group success to remote doris:" + name, timeoutMs);
if (result.isSetMasterAddress()) {
master = result.getMasterAddress();
}
if (result.getStatus().getStatusCode() != TStatusCode.OK) {
throw new DdlException(result.getStatus().getErrorMsgs().get(0));
}
} catch (Exception e) {
long costMs = System.currentTimeMillis() - startTime;
LOG.warn("task group success to catalog {} failed, cost={}ms", name, costMs, e);
throw new DdlException(e.getMessage());
}
}
public void taskSuccess(long taskId) throws Exception {
TInsertOverwriteTaskRequest request = new TInsertOverwriteTaskRequest();
request.setUser(user);
request.setPasswd(password);
request.setIsSuccess(true);
request.setTaskId(taskId);
long startTime = System.currentTimeMillis();
try {
TInsertOverwriteTaskResult result =
masterCallWithRetry(client -> client.insertOverwriteTaskAction(request),
"failed to task success to remote doris:" + name, timeoutMs);
if (result.isSetMasterAddress()) {
master = result.getMasterAddress();
}
if (result.getStatus().getStatusCode() != TStatusCode.OK) {
throw new Exception(result.getStatus().getErrorMsgs().get(0));
}
} catch (Exception e) {
long costMs = System.currentTimeMillis() - startTime;
LOG.warn("task success to catalog {} failed, cost={}ms", name, costMs, e);
throw e;
}
}
public void taskFail(long taskId) throws Exception {
TInsertOverwriteTaskRequest request = new TInsertOverwriteTaskRequest();
request.setUser(user);
request.setPasswd(password);
request.setIsSuccess(false);
request.setTaskId(taskId);
long startTime = System.currentTimeMillis();
try {
TInsertOverwriteTaskResult result =
masterCallWithRetry(client -> client.insertOverwriteTaskAction(request),
"failed to task fail to remote doris:" + name, timeoutMs);
if (result.isSetMasterAddress()) {
master = result.getMasterAddress();
}
if (result.getStatus().getStatusCode() != TStatusCode.OK) {
throw new Exception(result.getStatus().getErrorMsgs().get(0));
}
} catch (Exception e) {
long costMs = System.currentTimeMillis() - startTime;
LOG.warn("task fail to catalog {} failed, cost={}ms", name, costMs, e);
throw e;
}
}
public void taskGroupFail(long groupId) throws Exception {
TInsertOverwriteTaskRequest request = new TInsertOverwriteTaskRequest();
request.setUser(user);
request.setPasswd(password);
request.setGroupId(groupId);
request.setIsSuccess(false);
long startTime = System.currentTimeMillis();
try {
TInsertOverwriteTaskResult result =
masterCallWithRetry(client -> client.insertOverwriteTaskAction(request),
"failed to task group fail to remote doris:" + name, timeoutMs);
if (result.isSetMasterAddress()) {
master = result.getMasterAddress();
}
if (result.getStatus().getStatusCode() != TStatusCode.OK) {
throw new Exception(result.getStatus().getErrorMsgs().get(0));
}
} catch (Exception e) {
long costMs = System.currentTimeMillis() - startTime;
LOG.warn("task group fail to catalog {} failed, cost={}ms", name, costMs, e);
throw e;
}
}
public void recordRunningTableOrException(String dbName, String tableName) throws Exception {
TInsertOverwriteRecordRequest request = new TInsertOverwriteRecordRequest();
request.setUser(user);
request.setPasswd(password);
request.setDb(dbName);
request.setTbl(tableName);
request.setIsAdd(true);
long startTime = System.currentTimeMillis();
try {
TInsertOverwriteRecordResult result =
masterCallWithRetry(client -> client.addOrDropInsertOverwriteRecord(request),
"failed to record running table or exception to remote doris:" + name, timeoutMs);
if (result.isSetMasterAddress()) {
master = result.getMasterAddress();
}
if (result.getStatus().getStatusCode() != TStatusCode.OK) {
throw new Exception(result.getStatus().getErrorMsgs().get(0));
}
} catch (Exception e) {
long costMs = System.currentTimeMillis() - startTime;
LOG.warn("record running table or exception to catalog {} failed, cost={}ms", name, costMs, e);
throw e;
}
}
public void dropRunningRecord(String dbName, String tableName) throws Exception {
TInsertOverwriteRecordRequest request = new TInsertOverwriteRecordRequest();
request.setUser(user);
request.setPasswd(password);
request.setDb(dbName);
request.setTbl(tableName);
request.setIsAdd(false);
long startTime = System.currentTimeMillis();
try {
TInsertOverwriteRecordResult result =
masterCallWithRetry(client -> client.addOrDropInsertOverwriteRecord(request),
"failed to drop running record to remote doris:" + name, timeoutMs);
if (result.isSetMasterAddress()) {
master = result.getMasterAddress();
}
if (result.getStatus().getStatusCode() != TStatusCode.OK) {
throw new Exception(result.getStatus().getErrorMsgs().get(0));
}
} catch (Exception e) {
long costMs = System.currentTimeMillis() - startTime;
LOG.warn("drop running record to catalog {} failed, cost={}ms", name, costMs, e);
throw e;
}
}
public void recordFinishedLoadJob(String label, long transactionId, String dbName, String tableName,
long createTimestamp, String failMsg, String trackingUrl, String firstErrorMsg,
long jobId) throws MetaNotFoundException {
TRecordFinishedLoadJobRequest request = new TRecordFinishedLoadJobRequest();
request.setUser(user);
request.setPasswd(password);
request.setLabel(label);
request.setDb(dbName);
request.setTbl(tableName);
request.setTxnId(transactionId);
request.setCreateTs(createTimestamp);
request.setFailMsg(failMsg);
request.setTrackingUrl(trackingUrl);
request.setFirstErrMsg(firstErrorMsg);
request.setJobId(jobId);
long startTime = System.currentTimeMillis();
try {
TRecordFinishedLoadJobResult result =
masterCallWithRetry(client -> client.recordFinishedLoadJobRequest(request),
"failed to record finished load job to remote doris:" + name, timeoutMs);
if (result.isSetMasterAddress()) {
master = result.getMasterAddress();
}
if (result.getStatus().getStatusCode() != TStatusCode.OK) {
throw new MetaNotFoundException(result.getStatus().getErrorMsgs().get(0));
}
} catch (Exception e) {
long costMs = System.currentTimeMillis() - startTime;
LOG.warn("record finished load job to catalog {} failed, cost={}ms", name, costMs, e);
throw new MetaNotFoundException(e.getMessage());
}
}
}