InsertStreamTxnExecutor.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.qe;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.UserException;
import org.apache.doris.nereids.load.NereidsStreamLoadPlanner;
import org.apache.doris.nereids.load.NereidsStreamLoadTask;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.system.Backend;
import org.apache.doris.system.BeSelectionPolicy;
import org.apache.doris.thrift.TFileCompressType;
import org.apache.doris.thrift.TFileFormatType;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPipelineFragmentParams;
import org.apache.doris.thrift.TPipelineFragmentParamsList;
import org.apache.doris.thrift.TScanRangeParams;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TTxnParams;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.TransactionEntry;
import org.apache.doris.transaction.TransactionState;
import org.apache.thrift.TException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class InsertStreamTxnExecutor {
private long txnId;
private TUniqueId loadId;
private TransactionEntry txnEntry;
public InsertStreamTxnExecutor(TransactionEntry txnEntry) {
this.txnEntry = txnEntry;
}
public void beginTransaction(TStreamLoadPutRequest request) throws UserException, TException, TimeoutException,
InterruptedException, ExecutionException {
TTxnParams txnConf = txnEntry.getTxnConf();
OlapTable table = (OlapTable) txnEntry.getTable();
// StreamLoadTask's id == request's load_id
NereidsStreamLoadTask streamLoadTask = NereidsStreamLoadTask.fromTStreamLoadPutRequest(request);
NereidsStreamLoadPlanner planner = new NereidsStreamLoadPlanner((Database) txnEntry.getDb(), table,
streamLoadTask);
boolean isMowTable = ((OlapTable) txnEntry.getTable()).getEnableUniqueKeyMergeOnWrite();
TPipelineFragmentParamsList pipelineParamsList = new TPipelineFragmentParamsList();
if (!table.tryReadLock(1, TimeUnit.MINUTES)) {
throw new UserException("get table read lock timeout, database=" + table.getDatabase().getId() + ",table="
+ table.getName());
}
try {
// Will using load id as query id in fragment
TPipelineFragmentParams tRequest = planner.plan(streamLoadTask.getId());
tRequest.setTxnConf(txnConf).setImportLabel(txnEntry.getLabel());
tRequest.setIsMowTable(isMowTable);
for (Map.Entry<Integer, List<TScanRangeParams>> entry : tRequest.local_params.get(0).per_node_scan_ranges
.entrySet()) {
for (TScanRangeParams scanRangeParams : entry.getValue()) {
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setFormatType(
TFileFormatType.FORMAT_PROTO);
scanRangeParams.scan_range.ext_scan_range.file_scan_range.params.setCompressType(
TFileCompressType.PLAIN);
}
}
txnConf.setFragmentInstanceId(tRequest.local_params.get(0).fragment_instance_id);
this.loadId = request.getLoadId();
this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
.setHi(loadId.getHi())
.setLo(loadId.getLo()).build());
pipelineParamsList.addToParamsList(tRequest);
TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(table.getDatabase().getId(), streamLoadTask.getTxnId());
if (transactionState != null) {
transactionState.addTableIndexes(table);
}
} finally {
table.readUnlock();
}
BeSelectionPolicy policy = new BeSelectionPolicy.Builder().needLoadAvailable().needQueryAvailable().build();
List<Long> beIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
if (beIds.isEmpty()) {
throw new UserException("No available backend to match the policy: " + policy);
}
Backend backend = Env.getCurrentSystemInfo().getBackendsByCurrentCluster().get(beIds.get(0));
txnConf.setUserIp(backend.getHost());
txnEntry.setBackend(backend);
TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
try {
Future<InternalService.PExecPlanFragmentResult> future;
future = BackendServiceProxy.getInstance().execPlanFragmentsAsync(address, pipelineParamsList, false);
InternalService.PExecPlanFragmentResult result = future.get(5, TimeUnit.SECONDS);
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
throw new TException("failed to execute plan fragment: " + result.getStatus().getErrorMsgsList());
}
} catch (RpcException e) {
throw new TException(e);
}
}
public void commitTransaction() throws TException, TimeoutException,
InterruptedException, ExecutionException {
TTxnParams txnConf = txnEntry.getTxnConf();
Types.PUniqueId fragmentInstanceId = Types.PUniqueId.newBuilder()
.setHi(txnConf.getFragmentInstanceId().getHi())
.setLo(txnConf.getFragmentInstanceId().getLo()).build();
Backend backend = txnEntry.getBackend();
TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
try {
Future<InternalService.PCommitResult> future = BackendServiceProxy
.getInstance().commit(address, fragmentInstanceId, this.txnEntry.getpLoadId());
InternalService.PCommitResult result = future.get(5, TimeUnit.SECONDS);
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
throw new TException("failed to commit txn: " + result.getStatus().getErrorMsgsList());
}
} catch (RpcException e) {
throw new TException(e);
}
}
public void abortTransaction() throws TException, TimeoutException,
InterruptedException, ExecutionException {
TTxnParams txnConf = txnEntry.getTxnConf();
Types.PUniqueId fragmentInstanceId = Types.PUniqueId.newBuilder()
.setHi(txnConf.getFragmentInstanceId().getHi())
.setLo(txnConf.getFragmentInstanceId().getLo()).build();
Backend be = txnEntry.getBackend();
TNetworkAddress address = new TNetworkAddress(be.getHost(), be.getBrpcPort());
try {
Future<InternalService.PRollbackResult> future = BackendServiceProxy.getInstance().rollback(address,
fragmentInstanceId, this.txnEntry.getpLoadId());
InternalService.PRollbackResult result = future.get(5, TimeUnit.SECONDS);
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
throw new TException("failed to rollback txn: " + result.getStatus().getErrorMsgsList());
}
} catch (RpcException e) {
throw new TException(e);
}
}
public void sendData() throws TException, TimeoutException,
InterruptedException, ExecutionException {
if (txnEntry.getDataToSend() == null || txnEntry.getDataToSend().isEmpty()) {
return;
}
TTxnParams txnConf = txnEntry.getTxnConf();
Types.PUniqueId fragmentInstanceId = Types.PUniqueId.newBuilder()
.setHi(txnConf.getFragmentInstanceId().getHi())
.setLo(txnConf.getFragmentInstanceId().getLo()).build();
Backend backend = txnEntry.getBackend();
TNetworkAddress address = new TNetworkAddress(backend.getHost(), backend.getBrpcPort());
try {
Future<InternalService.PSendDataResult> future = BackendServiceProxy.getInstance().sendData(
address, fragmentInstanceId, this.txnEntry.getpLoadId(), txnEntry.getDataToSend());
InternalService.PSendDataResult result = future.get(5, TimeUnit.SECONDS);
TStatusCode code = TStatusCode.findByValue(result.getStatus().getStatusCode());
if (code != TStatusCode.OK) {
throw new TException("failed to insert data: " + result.getStatus().getErrorMsgsList());
}
} catch (RpcException e) {
throw new TException(e);
} finally {
txnEntry.clearDataToSend();
}
}
public TUniqueId getLoadId() {
return loadId;
}
public void setLoadId(TUniqueId loadId) {
this.loadId = loadId;
this.txnEntry.setpLoadId(Types.PUniqueId.newBuilder()
.setHi(loadId.getHi())
.setLo(loadId.getLo()).build());
}
public long getTxnId() {
return txnId;
}
public void setTxnId(long txnId) {
this.txnId = txnId;
}
public TransactionEntry getTxnEntry() {
return txnEntry;
}
}