// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.service;
import org.apache.doris.analysis.AbstractBackupTableRefClause;
import org.apache.doris.analysis.AddPartitionClause;
import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.LabelName;
import org.apache.doris.analysis.PartitionExprUtil;
import org.apache.doris.analysis.PartitionNames;
import org.apache.doris.analysis.RestoreStmt;
import org.apache.doris.analysis.SetType;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.backup.Snapshot;
import org.apache.doris.binlog.BinlogLagInfo;
import org.apache.doris.catalog.AutoIncrementGenerator;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.MaterializedIndex;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionInfo;
import org.apache.doris.catalog.PartitionType;
import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.catalog.Tablet;
import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.catalog.View;
import org.apache.doris.cloud.catalog.CloudPartition;
import org.apache.doris.cloud.catalog.CloudTablet;
import org.apache.doris.cloud.proto.Cloud.CommitTxnResponse;
import org.apache.doris.cluster.ClusterNamespace;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.AuthenticationException;
import org.apache.doris.common.CaseSensibility;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.DuplicatedRequestException;
import org.apache.doris.common.GZIPUtils;
import org.apache.doris.common.InternalErrorCode;
import org.apache.doris.common.LabelAlreadyUsedException;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.PatternMatcherException;
import org.apache.doris.common.Status;
import org.apache.doris.common.ThriftServerContext;
import org.apache.doris.common.ThriftServerEventProcessor;
import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
import org.apache.doris.common.annotation.LogException;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.Util;
import org.apache.doris.cooldown.CooldownDelete;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalDatabase;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.SplitSource;
import org.apache.doris.insertoverwrite.InsertOverwriteManager;
import org.apache.doris.insertoverwrite.InsertOverwriteUtil;
import org.apache.doris.load.StreamLoadHandler;
import org.apache.doris.load.routineload.ErrorReason;
import org.apache.doris.load.routineload.RoutineLoadJob;
import org.apache.doris.load.routineload.RoutineLoadJob.JobState;
import org.apache.doris.load.routineload.RoutineLoadManager;
import org.apache.doris.master.MasterImpl;
import org.apache.doris.mysql.privilege.AccessControllerManager;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.trees.plans.PlanNodeAndHash;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.planner.OlapTableSink;
import org.apache.doris.plsql.metastore.PlsqlPackage;
import org.apache.doris.plsql.metastore.PlsqlProcedureKey;
import org.apache.doris.plsql.metastore.PlsqlStoredProcedure;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.ConnectContext.ConnectType;
import org.apache.doris.qe.ConnectProcessor;
import org.apache.doris.qe.Coordinator;
import org.apache.doris.qe.DdlExecutor;
import org.apache.doris.qe.GlobalVariable;
import org.apache.doris.qe.HttpStreamParams;
import org.apache.doris.qe.MasterCatalogExecutor;
import org.apache.doris.qe.MasterOpExecutor;
import org.apache.doris.qe.MysqlConnectProcessor;
import org.apache.doris.qe.QeProcessorImpl;
import org.apache.doris.qe.QueryState;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.service.arrowflight.FlightSqlConnectProcessor;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.ColStatsData;
import org.apache.doris.statistics.ColumnStatistic;
import org.apache.doris.statistics.InvalidateStatsTarget;
import org.apache.doris.statistics.StatisticsCacheKey;
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.statistics.UpdatePartitionStatsTarget;
import org.apache.doris.statistics.hbo.RecentRunsPlanStatistics;
import org.apache.doris.statistics.query.QueryStats;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.tablefunction.MetadataGenerator;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.FrontendServiceVersion;
import org.apache.doris.thrift.TAddPlsqlPackageRequest;
import org.apache.doris.thrift.TAddPlsqlStoredProcedureRequest;
import org.apache.doris.thrift.TAutoIncrementRangeRequest;
import org.apache.doris.thrift.TAutoIncrementRangeResult;
import org.apache.doris.thrift.TBackend;
import org.apache.doris.thrift.TBeginTxnRequest;
import org.apache.doris.thrift.TBeginTxnResult;
import org.apache.doris.thrift.TBinlog;
import org.apache.doris.thrift.TCheckAuthRequest;
import org.apache.doris.thrift.TCheckAuthResult;
import org.apache.doris.thrift.TColumnDef;
import org.apache.doris.thrift.TColumnDesc;
import org.apache.doris.thrift.TColumnInfo;
import org.apache.doris.thrift.TCommitTxnRequest;
import org.apache.doris.thrift.TCommitTxnResult;
import org.apache.doris.thrift.TConfirmUnusedRemoteFilesRequest;
import org.apache.doris.thrift.TConfirmUnusedRemoteFilesResult;
import org.apache.doris.thrift.TCreatePartitionRequest;
import org.apache.doris.thrift.TCreatePartitionResult;
import org.apache.doris.thrift.TDescribeTablesParams;
import org.apache.doris.thrift.TDescribeTablesResult;
import org.apache.doris.thrift.TDropPlsqlPackageRequest;
import org.apache.doris.thrift.TDropPlsqlStoredProcedureRequest;
import org.apache.doris.thrift.TFeResult;
import org.apache.doris.thrift.TFetchResourceResult;
import org.apache.doris.thrift.TFetchRoutineLoadJobRequest;
import org.apache.doris.thrift.TFetchRoutineLoadJobResult;
import org.apache.doris.thrift.TFetchRunningQueriesRequest;
import org.apache.doris.thrift.TFetchRunningQueriesResult;
import org.apache.doris.thrift.TFetchSchemaTableDataRequest;
import org.apache.doris.thrift.TFetchSchemaTableDataResult;
import org.apache.doris.thrift.TFetchSplitBatchRequest;
import org.apache.doris.thrift.TFetchSplitBatchResult;
import org.apache.doris.thrift.TFinishTaskRequest;
import org.apache.doris.thrift.TFrontendPingFrontendRequest;
import org.apache.doris.thrift.TFrontendPingFrontendResult;
import org.apache.doris.thrift.TFrontendPingFrontendStatusCode;
import org.apache.doris.thrift.TFrontendReportAliveSessionRequest;
import org.apache.doris.thrift.TFrontendReportAliveSessionResult;
import org.apache.doris.thrift.TGetBackendMetaRequest;
import org.apache.doris.thrift.TGetBackendMetaResult;
import org.apache.doris.thrift.TGetBinlogLagResult;
import org.apache.doris.thrift.TGetBinlogRequest;
import org.apache.doris.thrift.TGetBinlogResult;
import org.apache.doris.thrift.TGetColumnInfoRequest;
import org.apache.doris.thrift.TGetColumnInfoResult;
import org.apache.doris.thrift.TGetDbsParams;
import org.apache.doris.thrift.TGetDbsResult;
import org.apache.doris.thrift.TGetMasterTokenRequest;
import org.apache.doris.thrift.TGetMasterTokenResult;
import org.apache.doris.thrift.TGetMetaDB;
import org.apache.doris.thrift.TGetMetaRequest;
import org.apache.doris.thrift.TGetMetaResult;
import org.apache.doris.thrift.TGetMetaTable;
import org.apache.doris.thrift.TGetQueryStatsRequest;
import org.apache.doris.thrift.TGetSnapshotRequest;
import org.apache.doris.thrift.TGetSnapshotResult;
import org.apache.doris.thrift.TGetTablesParams;
import org.apache.doris.thrift.TGetTablesResult;
import org.apache.doris.thrift.TGetTabletReplicaInfosRequest;
import org.apache.doris.thrift.TGetTabletReplicaInfosResult;
import org.apache.doris.thrift.TGroupCommitInfo;
import org.apache.doris.thrift.TInitExternalCtlMetaRequest;
import org.apache.doris.thrift.TInitExternalCtlMetaResult;
import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest;
import org.apache.doris.thrift.TListPrivilegesResult;
import org.apache.doris.thrift.TListTableMetadataNameIdsResult;
import org.apache.doris.thrift.TListTableStatusResult;
import org.apache.doris.thrift.TLoadTxn2PCRequest;
import org.apache.doris.thrift.TLoadTxn2PCResult;
import org.apache.doris.thrift.TLoadTxnBeginRequest;
import org.apache.doris.thrift.TLoadTxnBeginResult;
import org.apache.doris.thrift.TLoadTxnCommitRequest;
import org.apache.doris.thrift.TLoadTxnCommitResult;
import org.apache.doris.thrift.TLoadTxnRollbackRequest;
import org.apache.doris.thrift.TLoadTxnRollbackResult;
import org.apache.doris.thrift.TLockBinlogRequest;
import org.apache.doris.thrift.TLockBinlogResult;
import org.apache.doris.thrift.TMasterOpRequest;
import org.apache.doris.thrift.TMasterOpResult;
import org.apache.doris.thrift.TMasterResult;
import org.apache.doris.thrift.TMySqlLoadAcquireTokenResult;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TNodeInfo;
import org.apache.doris.thrift.TNullableStringLiteral;
import org.apache.doris.thrift.TOlapTableIndexTablets;
import org.apache.doris.thrift.TOlapTablePartition;
import org.apache.doris.thrift.TPipelineFragmentParams;
import org.apache.doris.thrift.TPipelineWorkloadGroup;
import org.apache.doris.thrift.TPlsqlPackageResult;
import org.apache.doris.thrift.TPlsqlStoredProcedureResult;
import org.apache.doris.thrift.TPrivilegeCtrl;
import org.apache.doris.thrift.TPrivilegeHier;
import org.apache.doris.thrift.TPrivilegeStatus;
import org.apache.doris.thrift.TPrivilegeType;
import org.apache.doris.thrift.TQueryStatsResult;
import org.apache.doris.thrift.TQueryType;
import org.apache.doris.thrift.TReplacePartitionRequest;
import org.apache.doris.thrift.TReplacePartitionResult;
import org.apache.doris.thrift.TReplicaInfo;
import org.apache.doris.thrift.TReportCommitTxnResultRequest;
import org.apache.doris.thrift.TReportExecStatusParams;
import org.apache.doris.thrift.TReportExecStatusResult;
import org.apache.doris.thrift.TReportRequest;
import org.apache.doris.thrift.TRestoreSnapshotRequest;
import org.apache.doris.thrift.TRestoreSnapshotResult;
import org.apache.doris.thrift.TRollbackTxnRequest;
import org.apache.doris.thrift.TRollbackTxnResult;
import org.apache.doris.thrift.TRoutineLoadJob;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.doris.thrift.TSchemaTableName;
import org.apache.doris.thrift.TShowProcessListRequest;
import org.apache.doris.thrift.TShowProcessListResult;
import org.apache.doris.thrift.TShowUserRequest;
import org.apache.doris.thrift.TShowUserResult;
import org.apache.doris.thrift.TShowVariableRequest;
import org.apache.doris.thrift.TShowVariableResult;
import org.apache.doris.thrift.TSnapshotLoaderReportRequest;
import org.apache.doris.thrift.TSnapshotType;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStreamLoadMultiTablePutResult;
import org.apache.doris.thrift.TStreamLoadPutRequest;
import org.apache.doris.thrift.TStreamLoadPutResult;
import org.apache.doris.thrift.TSubTxnInfo;
import org.apache.doris.thrift.TSyncQueryColumns;
import org.apache.doris.thrift.TTableIndexQueryStats;
import org.apache.doris.thrift.TTableMetadataNameIds;
import org.apache.doris.thrift.TTableQueryStats;
import org.apache.doris.thrift.TTableRef;
import org.apache.doris.thrift.TTableStatus;
import org.apache.doris.thrift.TTabletLocation;
import org.apache.doris.thrift.TTxnParams;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.thrift.TUpdateExportTaskStatusRequest;
import org.apache.doris.thrift.TUpdateFollowerPartitionStatsCacheRequest;
import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest;
import org.apache.doris.thrift.TUpdatePlanStatsCacheRequest;
import org.apache.doris.thrift.TWaitingTxnStatusRequest;
import org.apache.doris.thrift.TWaitingTxnStatusResult;
import org.apache.doris.transaction.SubTransactionState;
import org.apache.doris.transaction.TabletCommitInfo;
import org.apache.doris.transaction.TransactionState;
import org.apache.doris.transaction.TransactionState.TxnCoordinator;
import org.apache.doris.transaction.TransactionState.TxnSourceType;
import org.apache.doris.transaction.TransactionStatus;
import org.apache.doris.transaction.TxnCommitAttachment;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
// Frontend service used to serve all requests for this frontend through the
// thrift protocol
public class FrontendServiceImpl implements FrontendService.Iface {
private static final Logger LOG = LogManager.getLogger(FrontendServiceImpl.class);
private static final String NOT_MASTER_ERR_MSG = "FE is not master";
private MasterImpl masterImpl;
private ExecuteEnv exeEnv;
// key is txn id, value is the index of the plan fragment instance; used by
// multi-table request plans
private ConcurrentHashMap<Long, AtomicInteger> multiTableFragmentInstanceIdIndexMap =
new ConcurrentHashMap<>(64);
private final Map<TUniqueId, ConnectContext> proxyQueryIdToConnCtx =
new ConcurrentHashMap<>(64);
private static TNetworkAddress getMasterAddress() {
Env env = Env.getCurrentEnv();
String masterHost = env.getMasterHost();
int masterRpcPort = env.getMasterRpcPort();
return new TNetworkAddress(masterHost, masterRpcPort);
}
public FrontendServiceImpl(ExecuteEnv exeEnv) {
masterImpl = new MasterImpl();
this.exeEnv = exeEnv;
}
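// Confirm which tablets' unused remote (cooldown) files can be deleted.
// A tablet is confirmed only when its cooldown replica id and term match the
// current cooldown conf, there are enough replicas, all replicas are alive,
// and all replicas report the same cooldown meta id. Master-only RPC.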
@Override
public TConfirmUnusedRemoteFilesResult confirmUnusedRemoteFiles(TConfirmUnusedRemoteFilesRequest request)
throws TException {
if (!Env.getCurrentEnv().isMaster()) {
throw new TException("FE is not master");
}
TConfirmUnusedRemoteFilesResult res = new TConfirmUnusedRemoteFilesResult();
if (!request.isSetConfirmList()) {
throw new TException("confirm_list is null");
}
request.getConfirmList().forEach(info -> {
if (!info.isSetCooldownMetaId()) {
LOG.warn("cooldown_meta_id is null");
return;
}
TabletMeta tabletMeta = Env.getCurrentEnv().getTabletInvertedIndex().getTabletMeta(info.tablet_id);
if (tabletMeta == null) {
LOG.warn("tablet {} not found", info.tablet_id);
return;
}
Tablet tablet;
int replicaNum;
try {
OlapTable table = (OlapTable) Env.getCurrentInternalCatalog().getDbNullable(tabletMeta.getDbId())
.getTable(tabletMeta.getTableId())
.get();
table.readLock();
replicaNum = table.getPartitionInfo().getReplicaAllocation(tabletMeta.getPartitionId())
.getTotalReplicaNum();
try {
tablet = table.getPartition(tabletMeta.getPartitionId()).getIndex(tabletMeta.getIndexId())
.getTablet(info.tablet_id);
} finally {
table.readUnlock();
}
} catch (RuntimeException e) {
LOG.warn("tablet {} not found", info.tablet_id);
return;
}
// check cooldownReplicaId
Pair<Long, Long> cooldownConf = tablet.getCooldownConf();
if (cooldownConf.first != info.cooldown_replica_id) {
LOG.info("cooldown replica id not match({} vs {}), tablet={}", cooldownConf.first,
info.cooldown_replica_id, info.tablet_id);
return;
}
// check that the cooldownMetaId of all replicas is the same
List<Replica> replicas = Env.getCurrentEnv().getTabletInvertedIndex().getReplicas(info.tablet_id);
// FIXME(plat1ko): We only delete remote files when the tablet is in a stable
// state: enough replicas and all replicas are alive. Are these conditions
// really sufficient or necessary?
if (replicas.size() < replicaNum) {
LOG.info("num replicas are not enough, tablet={}", info.tablet_id);
return;
}
for (Replica replica : replicas) {
if (!replica.isAlive()) {
LOG.info("replica is not alive, tablet={}, replica={}", info.tablet_id, replica.getId());
return;
}
if (replica.getCooldownTerm() != cooldownConf.second) {
LOG.info("replica's cooldown term not match({} vs {}), tablet={}", cooldownConf.second,
replica.getCooldownTerm(), info.tablet_id);
return;
}
if (!info.cooldown_meta_id.equals(replica.getCooldownMetaId())) {
LOG.info("cooldown meta id is not the same, tablet={}", info.tablet_id);
return;
}
}
res.addToConfirmedTablets(info.tablet_id);
});
if (res.isSetConfirmedTablets() && !res.getConfirmedTablets().isEmpty()) {
if (Env.getCurrentEnv().isMaster()) {
// ensure FE is real master
Env.getCurrentEnv().getEditLog().logCooldownDelete(new CooldownDelete());
} else {
throw new TException("FE is not master");
}
}
return res;
}
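// List database names (with catalog names and ids) visible to the requesting
// user, across all catalogs or the specified one, optionally filtered by a
// MySQL-style pattern.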
@Override
public TGetDbsResult getDbNames(TGetDbsParams params) throws TException {
if (LOG.isDebugEnabled()) {
LOG.debug("get db request: {}", params);
}
TGetDbsResult result = new TGetDbsResult();
List<String> dbNames = Lists.newArrayList();
List<String> catalogNames = Lists.newArrayList();
List<Long> dbIds = Lists.newArrayList();
List<Long> catalogIds = Lists.newArrayList();
PatternMatcher matcher = null;
if (params.isSetPattern()) {
try {
matcher = PatternMatcher.createMysqlPattern(params.getPattern(),
CaseSensibility.DATABASE.getCaseSensibility());
} catch (PatternMatcherException e) {
throw new TException("Pattern is in bad format: " + params.getPattern());
}
}
Env env = Env.getCurrentEnv();
List<CatalogIf> catalogIfs = Lists.newArrayList();
// list all catalogs or the specified catalog.
if (Strings.isNullOrEmpty(params.catalog)) {
catalogIfs = env.getCatalogMgr().listCatalogs();
} else {
catalogIfs.add(env.getCatalogMgr()
.getCatalogOrException(params.catalog,
catalog -> new TException("Unknown catalog " + catalog)));
}
for (CatalogIf catalog : catalogIfs) {
Collection<DatabaseIf> dbs = new HashSet<DatabaseIf>();
try {
dbs = catalog.getAllDbs();
} catch (Exception e) {
LOG.warn("failed to get database names for catalog {}", catalog.getName(), e);
// Some external catalog may fail to get databases due to wrong connection info.
}
if (LOG.isDebugEnabled()) {
LOG.debug("get db size: {}, in catalog: {}", dbs.size(), catalog.getName());
}
if (dbs.isEmpty() && params.isSetGetNullCatalog() && params.get_null_catalog) {
catalogNames.add(catalog.getName());
dbNames.add("NULL");
catalogIds.add(catalog.getId());
dbIds.add(-1L);
continue;
}
if (dbs.isEmpty()) {
continue;
}
UserIdentity currentUser = null;
if (params.isSetCurrentUserIdent()) {
currentUser = UserIdentity.fromThrift(params.current_user_ident);
} else {
currentUser = UserIdentity.createAnalyzedUserIdentWithIp(params.user, params.user_ip);
}
for (DatabaseIf db : dbs) {
String dbName = db.getFullName();
if (!env.getAccessManager()
.checkDbPriv(currentUser, catalog.getName(), dbName, PrivPredicate.SHOW)) {
continue;
}
if (matcher != null && !matcher.match(getMysqlTableSchema(catalog.getName(), dbName))) {
continue;
}
catalogNames.add(catalog.getName());
dbNames.add(getMysqlTableSchema(catalog.getName(), dbName));
catalogIds.add(catalog.getId());
dbIds.add(db.getId());
}
}
result.setDbs(dbNames);
result.setCatalogs(catalogNames);
result.setCatalogIds(catalogIds);
result.setDbIds(dbIds);
return result;
}
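// Return the schema name exposed to MySQL clients: "catalog.db" for external
// catalogs when showFullDbNameInInfoSchemaDb is enabled, otherwise the plain
// db name.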
private String getMysqlTableSchema(String ctl, String db) {
if (!GlobalVariable.showFullDbNameInInfoSchemaDb) {
return db;
}
if (ctl.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
return db;
}
return ctl + "." + db;
}
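// Reverse of getMysqlTableSchema(): strip the "catalog." prefix from a
// MySQL-style schema name of an external catalog.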
private String getDbNameFromMysqlTableSchema(String ctl, String db) {
if (ctl.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
return db;
}
String[] parts = db.split("\\.");
if (parts.length == 2) {
return parts[1];
}
return db;
}
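// List table names in the given db that the requesting user has SHOW
// privilege on, optionally filtered by a MySQL-style pattern.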
@LogException
@Override
public TGetTablesResult getTableNames(TGetTablesParams params) throws TException {
if (LOG.isDebugEnabled()) {
LOG.debug("get table name request: {}", params);
}
TGetTablesResult result = new TGetTablesResult();
List<String> tablesResult = Lists.newArrayList();
result.setTables(tablesResult);
PatternMatcher matcher = null;
if (params.isSetPattern()) {
try {
matcher = PatternMatcher.createMysqlPattern(params.getPattern(),
CaseSensibility.TABLE.getCaseSensibility());
} catch (PatternMatcherException e) {
throw new TException("Pattern is in bad format: " + params.getPattern());
}
}
// database privs should be checked in analysis phase
UserIdentity currentUser;
if (params.isSetCurrentUserIdent()) {
currentUser = UserIdentity.fromThrift(params.current_user_ident);
} else {
currentUser = UserIdentity.createAnalyzedUserIdentWithIp(params.user, params.user_ip);
}
String catalogName = Strings.isNullOrEmpty(params.catalog) ? InternalCatalog.INTERNAL_CATALOG_NAME
: params.catalog;
String dbName = getDbNameFromMysqlTableSchema(catalogName, params.db);
DatabaseIf<TableIf> db = Env.getCurrentEnv().getCatalogMgr()
.getCatalogOrException(catalogName, catalog -> new TException("Unknown catalog: " + catalog))
.getDbNullable(dbName);
if (db != null) {
Set<String> tableNames;
try {
tableNames = db.getTableNamesOrEmptyWithLock();
for (String tableName : tableNames) {
if (LOG.isDebugEnabled()) {
LOG.debug("get table: {}, wait to check", tableName);
}
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(currentUser, catalogName, dbName, tableName,
PrivPredicate.SHOW)) {
continue;
}
if (matcher != null && !matcher.match(tableName)) {
continue;
}
tablesResult.add(tableName);
}
} catch (Exception e) {
LOG.warn("failed to get table names for db {} in catalog {}", params.db, catalogName, e);
}
}
return result;
}
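// Return table status (engine, comment, row count, data sizes, timestamps,
// view definition, etc.) for tables the user can SHOW, optionally filtered by
// pattern, table type or a specific table name.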
@Override
public TListTableStatusResult listTableStatus(TGetTablesParams params) throws TException {
if (LOG.isDebugEnabled()) {
LOG.debug("get list table request: {}", params);
}
TListTableStatusResult result = new TListTableStatusResult();
List<TTableStatus> tablesResult = Lists.newArrayList();
result.setTables(tablesResult);
PatternMatcher matcher = null;
String specifiedTable = null;
if (params.isSetPattern()) {
try {
matcher = PatternMatcher.createMysqlPattern(params.getPattern(),
CaseSensibility.TABLE.getCaseSensibility());
} catch (PatternMatcherException e) {
throw new TException("Pattern is in bad format " + params.getPattern());
}
}
if (params.isSetTable()) {
specifiedTable = params.getTable();
}
// database privs should be checked in analysis phase
UserIdentity currentUser;
if (params.isSetCurrentUserIdent()) {
currentUser = UserIdentity.fromThrift(params.current_user_ident);
} else {
currentUser = UserIdentity.createAnalyzedUserIdentWithIp(params.user, params.user_ip);
}
String catalogName = InternalCatalog.INTERNAL_CATALOG_NAME;
if (params.isSetCatalog()) {
catalogName = params.catalog;
}
String dbName = getDbNameFromMysqlTableSchema(catalogName, params.db);
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
if (catalog != null) {
DatabaseIf db = catalog.getDbNullable(dbName);
if (db != null) {
try {
List<TableIf> tables;
if (!params.isSetType() || params.getType() == null || params.getType().isEmpty()) {
tables = db.getTablesIgnoreException();
} else {
switch (params.getType()) {
case "VIEW":
tables = db.getViewsOrEmpty();
break;
default:
tables = db.getTablesIgnoreException();
}
}
for (TableIf table : tables) {
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(currentUser, catalogName, dbName,
table.getName(), PrivPredicate.SHOW)) {
continue;
}
if (matcher != null && !matcher.match(table.getName())) {
continue;
}
if (specifiedTable != null && !specifiedTable.equals(table.getName())) {
continue;
}
// For the follower node in cloud mode,
// when querying the information_schema table,
// the version needs to be updated.
// Otherwise, the version will always be the old value
// unless there is a query for the table in the follower node.
if (!Env.getCurrentEnv().isMaster() && Config.isCloudMode()
&& table instanceof OlapTable) {
OlapTable olapTable = (OlapTable) table;
List<CloudPartition> partitions = olapTable.getAllPartitions().stream()
.filter(p -> p instanceof CloudPartition)
.map(cloudPartition -> (CloudPartition) cloudPartition)
.collect(Collectors.toList());
CloudPartition.getSnapshotVisibleVersion(partitions);
}
table.readLock();
try {
long lastCheckTime = table.getLastCheckTime() <= 0 ? 0 : table.getLastCheckTime();
TTableStatus status = new TTableStatus();
status.setName(table.getName());
status.setType(table.getMysqlType());
status.setEngine(table.getEngine());
status.setComment(table.getComment());
status.setCreateTime(table.getCreateTime());
status.setLastCheckTime(lastCheckTime / 1000);
status.setUpdateTime(table.getUpdateTime() / 1000);
status.setCheckTime(lastCheckTime / 1000);
status.setCollation("utf-8");
status.setRows(table.getCachedRowCount());
status.setDataLength(table.getDataLength());
status.setAvgRowLength(table.getAvgRowLength());
status.setIndexLength(table.getIndexLength());
if (table instanceof View) {
status.setDdlSql(((View) table).getInlineViewDef());
}
tablesResult.add(status);
} finally {
table.readUnlock();
}
}
} catch (Exception e) {
LOG.warn("failed to get tables for db {} in catalog {}", db.getFullName(), catalogName, e);
}
}
}
return result;
}
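// Return only table names and ids for the given db. External catalogs are
// queried asynchronously with a timeout (query_metadata_name_ids_timeout) so
// a slow catalog does not block the request.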
public TListTableMetadataNameIdsResult listTableMetadataNameIds(TGetTablesParams params) throws TException {
if (LOG.isDebugEnabled()) {
LOG.debug("get list simple table request: {}", params);
}
TListTableMetadataNameIdsResult result = new TListTableMetadataNameIdsResult();
List<TTableMetadataNameIds> tablesResult = Lists.newArrayList();
result.setTables(tablesResult);
UserIdentity currentUser;
if (params.isSetCurrentUserIdent()) {
currentUser = UserIdentity.fromThrift(params.current_user_ident);
} else {
currentUser = UserIdentity.createAnalyzedUserIdentWithIp(params.user, params.user_ip);
}
String catalogName;
if (params.isSetCatalog()) {
catalogName = params.catalog;
} else {
catalogName = InternalCatalog.INTERNAL_CATALOG_NAME;
}
PatternMatcher matcher = null;
if (params.isSetPattern()) {
try {
matcher = PatternMatcher.createMysqlPattern(params.getPattern(),
CaseSensibility.TABLE.getCaseSensibility());
} catch (PatternMatcherException e) {
throw new TException("Pattern is in bad format " + params.getPattern());
}
}
PatternMatcher finalMatcher = matcher;
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(() -> {
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogName);
if (catalog != null) {
String dbName = getDbNameFromMysqlTableSchema(catalogName, params.db);
DatabaseIf db = catalog.getDbNullable(dbName);
if (db != null) {
List<TableIf> tables = db.getTables();
for (TableIf table : tables) {
if (!Env.getCurrentEnv().getAccessManager().checkTblPriv(currentUser, catalogName, dbName,
table.getName(), PrivPredicate.SHOW)) {
continue;
}
table.readLock();
try {
if (finalMatcher != null && !finalMatcher.match(table.getName())) {
continue;
}
TTableMetadataNameIds status = new TTableMetadataNameIds();
status.setName(table.getName());
status.setId(table.getId());
tablesResult.add(status);
} finally {
table.readUnlock();
}
}
}
}
});
try {
if (catalogName.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
future.get();
} else {
future.get(Config.query_metadata_name_ids_timeout, TimeUnit.SECONDS);
}
} catch (TimeoutException e) {
future.cancel(true);
LOG.info("get tables from catalog:{}, db:{} timed out.", catalogName, params.db);
} catch (InterruptedException | ExecutionException e) {
future.cancel(true);
} finally {
executor.shutdown();
}
return result;
}
@Override
public TListPrivilegesResult listTablePrivilegeStatus(TGetTablesParams params) throws TException {
if (LOG.isDebugEnabled()) {
LOG.debug("get list table privileges request: {}", params);
}
TListPrivilegesResult result = new TListPrivilegesResult();
List<TPrivilegeStatus> tblPrivResult = Lists.newArrayList();
result.setPrivileges(tblPrivResult);
UserIdentity currentUser = null;
if (params.isSetCurrentUserIdent()) {
currentUser = UserIdentity.fromThrift(params.current_user_ident);
} else {
currentUser = UserIdentity.createAnalyzedUserIdentWithIp(params.user, params.user_ip);
}
Env.getCurrentEnv().getAuth().getTablePrivStatus(tblPrivResult, currentUser);
return result;
}
@Override
public TListPrivilegesResult listSchemaPrivilegeStatus(TGetTablesParams params) throws TException {
if (LOG.isDebugEnabled()) {
LOG.debug("get list schema privileges request: {}", params);
}
TListPrivilegesResult result = new TListPrivilegesResult();
List<TPrivilegeStatus> tblPrivResult = Lists.newArrayList();
result.setPrivileges(tblPrivResult);
UserIdentity currentUser = null;
if (params.isSetCurrentUserIdent()) {
currentUser = UserIdentity.fromThrift(params.current_user_ident);
} else {
currentUser = UserIdentity.createAnalyzedUserIdentWithIp(params.user, params.user_ip);
}
Env.getCurrentEnv().getAuth().getSchemaPrivStatus(tblPrivResult, currentUser);
return result;
}
@Override
public TListPrivilegesResult listUserPrivilegeStatus(TGetTablesParams params) throws TException {
if (LOG.isDebugEnabled()) {
LOG.debug("get list user privileges request: {}", params);
}
TListPrivilegesResult result = new TListPrivilegesResult();
List<TPrivilegeStatus> userPrivResult = Lists.newArrayList();
result.setPrivileges(userPrivResult);
UserIdentity currentUser = null;
if (params.isSetCurrentUserIdent()) {
currentUser = UserIdentity.fromThrift(params.current_user_ident);
} else {
currentUser = UserIdentity.createAnalyzedUserIdentWithIp(params.user, params.user_ip);
}
Env.getCurrentEnv().getAuth().getGlobalPrivStatus(userPrivResult, currentUser);
return result;
}
@Override
public TFeResult updateExportTaskStatus(TUpdateExportTaskStatusRequest request) throws TException {
TStatus status = new TStatus(TStatusCode.OK);
TFeResult result = new TFeResult(FrontendServiceVersion.V1, status);
return result;
}
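// Describe the columns of the requested tables, returning a flat column list
// plus per-table offsets. Returns an empty result if the user lacks SHOW
// privilege on any of the requested tables.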
@Override
public TDescribeTablesResult describeTables(TDescribeTablesParams params) throws TException {
if (LOG.isDebugEnabled()) {
LOG.debug("get desc tables request: {}", params);
}
TDescribeTablesResult result = new TDescribeTablesResult();
List<TColumnDef> columns = Lists.newArrayList();
List<Integer> tablesOffset = Lists.newArrayList();
List<String> tables = params.getTablesName();
result.setColumns(columns);
result.setTablesOffset(tablesOffset);
// database privs should be checked in analysis phase
UserIdentity currentUser = null;
if (params.isSetCurrentUserIdent()) {
currentUser = UserIdentity.fromThrift(params.current_user_ident);
} else {
currentUser = UserIdentity.createAnalyzedUserIdentWithIp(params.user, params.user_ip);
}
String dbName = getDbNameFromMysqlTableSchema(params.catalog, params.db);
for (String tableName : tables) {
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(currentUser, params.catalog, dbName, tableName, PrivPredicate.SHOW)) {
return result;
}
}
String catalogName = Strings.isNullOrEmpty(params.catalog) ? InternalCatalog.INTERNAL_CATALOG_NAME
: params.catalog;
DatabaseIf<TableIf> db = Env.getCurrentEnv().getCatalogMgr()
.getCatalogOrException(catalogName, catalog -> new TException("Unknown catalog " + catalog))
.getDbNullable(dbName);
if (db != null) {
for (String tableName : tables) {
TableIf table = db.getTableNullableIfException(tableName);
if (table != null) {
table.readLock();
try {
List<Column> baseSchema = table.getBaseSchemaOrEmpty();
for (Column column : baseSchema) {
final TColumnDesc desc = getColumnDesc(column);
final TColumnDef colDef = new TColumnDef(desc);
final String comment = column.getComment();
if (comment != null) {
if (Config.column_comment_length_limit > 0
&& comment.length() > Config.column_comment_length_limit) {
colDef.setComment(comment.substring(0, Config.column_comment_length_limit));
} else {
colDef.setComment(comment);
}
}
if (column.isKey()) {
if (table instanceof OlapTable) {
desc.setColumnKey(((OlapTable) table).getKeysType().toMetadata());
}
}
columns.add(colDef);
}
} finally {
table.readUnlock();
}
tablesOffset.add(columns.size());
}
}
}
return result;
}
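// Convert a catalog Column into a thrift TColumnDesc, filling in precision,
// length, scale, nullability, child columns (for complex types) and the
// default value if present.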
private TColumnDesc getColumnDesc(Column column) {
final TColumnDesc desc = new TColumnDesc(column.getName(), column.getDataType().toThrift());
final Integer precision = column.getOriginType().getPrecision();
if (precision != null) {
desc.setColumnPrecision(precision);
}
final Integer columnLength = column.getOriginType().getColumnSize();
if (columnLength != null) {
desc.setColumnLength(columnLength);
}
final Integer decimalDigits = column.getOriginType().getDecimalDigits();
if (decimalDigits != null) {
desc.setColumnScale(decimalDigits);
}
desc.setIsAllowNull(column.isAllowNull());
if (column.getChildren().size() > 0) {
ArrayList<TColumnDesc> children = new ArrayList<>();
for (Column child : column.getChildren()) {
children.add(getColumnDesc(child));
}
desc.setChildren(children);
}
String defaultValue = column.getDefaultValue();
if (defaultValue != null) {
desc.setDefaultValue(defaultValue);
}
return desc;
}
@Override
public TShowVariableResult showVariables(TShowVariableRequest params) throws TException {
TShowVariableResult result = new TShowVariableResult();
List<List<String>> vars = Lists.newArrayList();
result.setVariables(vars);
// Find the connect context by thread id
ConnectContext ctx = exeEnv.getScheduler().getContext((int) params.getThreadId());
if (ctx == null) {
return result;
}
vars = VariableMgr.dump(SetType.fromThrift(params.getVarType()), ctx.getSessionVariable(), null);
result.setVariables(vars);
return result;
}
@Override
public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params) throws TException {
return QeProcessorImpl.INSTANCE.reportExecStatus(params, getClientAddr());
}
@Override
public TFetchSplitBatchResult fetchSplitBatch(TFetchSplitBatchRequest request) throws TException {
TFetchSplitBatchResult result = new TFetchSplitBatchResult();
SplitSource splitSource =
Env.getCurrentEnv().getSplitSourceManager().getSplitSource(request.getSplitSourceId());
if (splitSource == null) {
throw new TException("Split source " + request.getSplitSourceId() + " is released");
}
try {
List<TScanRangeLocations> locations = splitSource.getNextBatch(request.getMaxNumSplits());
result.setSplits(locations);
result.status = new TStatus(TStatusCode.OK);
return result;
} catch (Exception e) {
LOG.warn("failed to fetch split batch with source id {}", request.getSplitSourceId(), e);
result.status = new TStatus(TStatusCode.INTERNAL_ERROR);
result.status.addToErrorMsgs(e.getMessage());
}
return result;
}
@Override
public TStatus updatePartitionStatsCache(TUpdateFollowerPartitionStatsCacheRequest request) {
UpdatePartitionStatsTarget target = GsonUtils.GSON.fromJson(request.key, UpdatePartitionStatsTarget.class);
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
analysisManager.updateLocalPartitionStatsCache(target.catalogId, target.dbId, target.tableId,
target.indexId, target.partitions, target.columnName);
return new TStatus(TStatusCode.OK);
}
@Override
public TMasterResult finishTask(TFinishTaskRequest request) throws TException {
return masterImpl.finishTask(request);
}
@Override
public TMasterResult report(TReportRequest request) throws TException {
return masterImpl.report(request);
}
// This interface is kept for backward compatibility
@Override
public TFetchResourceResult fetchResource() throws TException {
throw new TException("not supported");
}
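// Entry point for requests forwarded from non-master FEs: handles journal
// sync, group commit info, query cancellation, and proxies statement
// execution through a ConnectProcessor.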
@Override
public TMasterOpResult forward(TMasterOpRequest params) throws TException {
Frontend fe = Env.getCurrentEnv().checkFeExist(params.getClientNodeHost(), params.getClientNodePort());
if (fe == null) {
LOG.warn("reject request from invalid host. client: {}", params.getClientNodeHost());
throw new TException("request from invalid host was rejected.");
}
if (params.isSyncJournalOnly()) {
final TMasterOpResult result = new TMasterOpResult();
result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
// just make the protocol happy
result.setPacket("".getBytes());
return result;
}
if (params.getGroupCommitInfo() != null && params.getGroupCommitInfo().isGetGroupCommitLoadBeId()) {
final TGroupCommitInfo info = params.getGroupCommitInfo();
final TMasterOpResult result = new TMasterOpResult();
try {
result.setGroupCommitLoadBeId(Env.getCurrentEnv().getGroupCommitManager()
.selectBackendForGroupCommitInternal(info.groupCommitLoadTableId, info.cluster));
} catch (LoadException | DdlException e) {
throw new TException(e.getMessage());
}
// just make the protocol happy
result.setPacket("".getBytes());
return result;
}
if (params.getGroupCommitInfo() != null && params.getGroupCommitInfo().isUpdateLoadData()) {
final TGroupCommitInfo info = params.getGroupCommitInfo();
final TMasterOpResult result = new TMasterOpResult();
Env.getCurrentEnv().getGroupCommitManager()
.updateLoadData(info.tableId, info.receiveData);
// just make the protocol happy
result.setPacket("".getBytes());
return result;
}
if (params.isSetCancelQeury() && params.isCancelQeury()) {
if (!params.isSetQueryId()) {
throw new TException("a query id is needed to cancel a query");
}
TUniqueId queryId = params.getQueryId();
ConnectContext ctx = proxyQueryIdToConnCtx.get(queryId);
if (ctx != null) {
ctx.cancelQuery(new Status(TStatusCode.CANCELLED, "cancel query by forward request."));
}
final TMasterOpResult result = new TMasterOpResult();
result.setStatusCode(0);
result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
// just make the protocol happy
result.setPacket("".getBytes());
return result;
}
// add this log so that we can track this stmt
if (LOG.isDebugEnabled()) {
LOG.debug("receive forwarded stmt {} from FE: {}", params.getStmtId(), params.getClientNodeHost());
}
ConnectContext context = new ConnectContext(null, true, params.getSessionId());
// Set current connected FE to the client address, so that we can know where
// this request comes from.
context.setCurrentConnectedFEIp(params.getClientNodeHost());
if (Config.isCloudMode() && !Strings.isNullOrEmpty(params.getCloudCluster())) {
context.setCloudCluster(params.getCloudCluster());
}
ConnectProcessor processor = null;
if (context.getConnectType().equals(ConnectType.MYSQL)) {
processor = new MysqlConnectProcessor(context);
} else if (context.getConnectType().equals(ConnectType.ARROW_FLIGHT_SQL)) {
processor = new FlightSqlConnectProcessor(context);
} else {
throw new TException("unknown ConnectType: " + context.getConnectType());
}
Runnable clearCallback = () -> {};
if (params.isSetQueryId()) {
proxyQueryIdToConnCtx.put(params.getQueryId(), context);
clearCallback = () -> proxyQueryIdToConnCtx.remove(params.getQueryId());
}
TMasterOpResult result = processor.proxyExecute(params);
if (QueryState.MysqlStateType.ERR.name().equalsIgnoreCase(result.getStatus())) {
context.getState().setError(result.getStatus());
} else {
context.getState().setOk();
}
ConnectContext.remove();
clearCallback.run();
return result;
}
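// Resolve table ids to table names within the given internal database,
// throwing if the db or any table id cannot be found.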
private List<String> getTableNames(String dbName, List<Long> tableIds) throws UserException {
final String fullDbName = dbName;
Database db = Env.getCurrentInternalCatalog().getDbNullable(fullDbName);
if (db == null) {
throw new UserException(String.format("can't find db named: %s", dbName));
}
List<String> tableNames = Lists.newArrayList();
for (Long id : tableIds) {
Table table = db.getTableNullable(id);
if (table == null) {
throw new UserException(String.format("can't find table id: %d in db: %s", id, dbName));
}
tableNames.add(table.getName());
}
return tableNames;
}
private void checkSingleTablePasswordAndPrivs(String user, String passwd, String db, String tbl,
String clientIp, PrivPredicate predicate) throws AuthenticationException {
checkPasswordAndPrivs(user, passwd, db, Lists.newArrayList(tbl), clientIp, predicate);
}
private void checkDbPasswordAndPrivs(String user, String passwd, String db, String clientIp,
PrivPredicate predicate) throws AuthenticationException {
checkPasswordAndPrivs(user, passwd, db, null, clientIp, predicate);
}
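// Authenticate the user with a plain password and verify the required
// privilege on the db (when tables is empty) or on each listed table.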
private void checkPasswordAndPrivs(String user, String passwd, String db, List<String> tables,
String clientIp, PrivPredicate predicate) throws AuthenticationException {
final String fullUserName = ClusterNamespace.getNameFromFullName(user);
final String fullDbName = db;
List<UserIdentity> currentUser = Lists.newArrayList();
Env.getCurrentEnv().getAuth().checkPlainPassword(fullUserName, clientIp, passwd, currentUser);
Preconditions.checkState(currentUser.size() == 1);
if (tables == null || tables.isEmpty()) {
if (!Env.getCurrentEnv().getAccessManager()
.checkDbPriv(currentUser.get(0), InternalCatalog.INTERNAL_CATALOG_NAME, fullDbName, predicate)) {
throw new AuthenticationException(
"Access denied; you need (at least one of) the (" + predicate.toString()
+ ") privilege(s) for this operation");
}
return;
}
for (String tbl : tables) {
if (!Env.getCurrentEnv().getAccessManager()
.checkTblPriv(currentUser.get(0), InternalCatalog.INTERNAL_CATALOG_NAME, fullDbName, tbl,
predicate)) {
throw new AuthenticationException(
"Access denied; you need (at least one of) the (" + predicate.toString()
+ ") privilege(s) for this operation");
}
}
}
private void checkPassword(String user, String passwd, String clientIp)
throws AuthenticationException {
final String fullUserName = ClusterNamespace.getNameFromFullName(user);
List<UserIdentity> currentUser = Lists.newArrayList();
Env.getCurrentEnv().getAuth().checkPlainPassword(fullUserName, clientIp, passwd, currentUser);
Preconditions.checkState(currentUser.size() == 1);
}
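// Begin a stream load transaction (called by BE). Master-only; a duplicated
// request returns the previously created txn id.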
@Override
public TLoadTxnBeginResult loadTxnBegin(TLoadTxnBeginRequest request) throws TException {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive txn begin request: {}, backend: {}", request, clientAddr);
}
TLoadTxnBeginResult result = new TLoadTxnBeginResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
if (!Env.getCurrentEnv().isMaster()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
LOG.error("failed to loadTxnBegin:{}, request:{}, backend:{}",
NOT_MASTER_ERR_MSG, request, clientAddr);
return result;
}
try {
TLoadTxnBeginResult tmpRes = loadTxnBeginImpl(request, clientAddr);
result.setTxnId(tmpRes.getTxnId()).setDbId(tmpRes.getDbId());
} catch (DuplicatedRequestException e) {
// this is a duplicate request, just return previous txn id
LOG.warn("duplicate request for stream load. request id: {}, txn: {}", e.getDuplicatedRequestId(),
e.getTxnId());
result.setTxnId(e.getTxnId());
} catch (LabelAlreadyUsedException e) {
status.setStatusCode(TStatusCode.LABEL_ALREADY_EXISTS);
status.addToErrorMsgs(e.getMessage());
result.setJobStatus(e.getJobStatus());
} catch (MetaNotFoundException e) {
LOG.warn("failed to begin", e);
status.setStatusCode(TStatusCode.NOT_FOUND);
status.addToErrorMsgs(e.getMessage());
} catch (UserException e) {
LOG.warn("failed to begin: {}", e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
return result;
}
return result;
}
private TLoadTxnBeginResult loadTxnBeginImpl(TLoadTxnBeginRequest request, String clientIp) throws UserException {
if (request.isSetAuthCode()) {
// TODO: deprecated, removed in 3.1, use token instead.
} else if (Strings.isNullOrEmpty(request.getToken())) {
checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
request.getTbl(),
request.getUserIp(), PrivPredicate.LOAD);
} else {
if (!checkToken(request.getToken())) {
throw new AuthenticationException("Invalid token: " + request.getToken());
}
}
// check label
if (Strings.isNullOrEmpty(request.getLabel())) {
throw new UserException("empty label in begin request");
}
// check database
Env env = Env.getCurrentEnv();
String fullDbName = request.getDb();
Database db = env.getInternalCatalog().getDbNullable(fullDbName);
if (db == null) {
String dbName = fullDbName;
if (Strings.isNullOrEmpty(request.getCluster())) {
dbName = request.getDb();
}
throw new MetaNotFoundException("unknown database, database=" + dbName);
}
OlapTable table = (OlapTable) db.getTableOrMetaException(request.tbl, TableType.OLAP);
// begin
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second;
Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId());
long startTime = backend != null ? backend.getLastStartTime() : 0;
TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, request.getBackendId(), clientIp, startTime);
if (request.isSetToken()) {
txnCoord.isFromInternal = true;
}
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
db.getId(), Lists.newArrayList(table.getId()), request.getLabel(), request.getRequestId(),
txnCoord,
TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond);
TLoadTxnBeginResult result = new TLoadTxnBeginResult();
result.setTxnId(txnId).setDbId(db.getId());
return result;
}
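// Begin a transaction through the thrift API: checks auth, label and db, then
// opens the transaction and optionally pre-allocates sub-transaction ids when
// sub_txn_num is set. Master-only.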
@Override
public TBeginTxnResult beginTxn(TBeginTxnRequest request) throws TException {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive txn begin request: {}, client: {}", request, clientAddr);
}
TBeginTxnResult result = new TBeginTxnResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
if (!Env.getCurrentEnv().isMaster()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
result.setMasterAddress(getMasterAddress());
LOG.error("failed to get beginTxn: {}", NOT_MASTER_ERR_MSG);
return result;
}
try {
TBeginTxnResult tmpRes = beginTxnImpl(request, clientAddr);
result.setTxnId(tmpRes.getTxnId()).setDbId(tmpRes.getDbId());
if (tmpRes.isSetSubTxnIds()) {
result.setSubTxnIds(tmpRes.getSubTxnIds());
}
} catch (DuplicatedRequestException e) {
// this is a duplicate request, just return previous txn id
LOG.warn("duplicate request for stream load. request id: {}, txn: {}", e.getDuplicatedRequestId(),
e.getTxnId());
result.setTxnId(e.getTxnId());
} catch (LabelAlreadyUsedException e) {
status.setStatusCode(TStatusCode.LABEL_ALREADY_EXISTS);
status.addToErrorMsgs(e.getMessage());
result.setJobStatus(e.getJobStatus());
} catch (UserException e) {
LOG.warn("failed to begin: {}", e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
return result;
}
return result;
}
private TBeginTxnResult beginTxnImpl(TBeginTxnRequest request, String clientIp) throws UserException {
// Check required args: user, passwd, db, table ids, label
if (!request.isSetUser()) {
throw new UserException("user is not set");
}
if (!request.isSetPasswd()) {
throw new UserException("passwd is not set");
}
if (!request.isSetDb()) {
throw new UserException("db is not set");
}
if (!request.isSetTableIds()) {
throw new UserException("table ids is not set");
}
if (!request.isSetLabel()) {
throw new UserException("label is not set");
}
// step 1: check auth
if (Strings.isNullOrEmpty(request.getToken())) {
// lookup table ids && convert into tableNameList
List<String> tableNameList = getTableNames(request.getDb(), request.getTableIds());
checkPasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(), tableNameList,
request.getUserIp(), PrivPredicate.LOAD);
}
// step 2: check label
if (Strings.isNullOrEmpty(request.getLabel())) {
throw new UserException("empty label in begin request");
}
// step 3: check database
Env env = Env.getCurrentEnv();
String fullDbName = request.getDb();
Database db = env.getInternalCatalog().getDbNullable(fullDbName);
if (db == null) {
String dbName = fullDbName;
if (Strings.isNullOrEmpty(request.getCluster())) {
dbName = request.getDb();
}
throw new MetaNotFoundException("unknown database, database=" + dbName);
}
// step 4: fetch all tableIds
// table ids are checked at step 1
List<Long> tableIdList = request.getTableIds();
// step 5: get timeout
long timeoutSecond = request.isSetTimeout() ? request.getTimeout() : Config.stream_load_default_timeout_second;
Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId());
long startTime = backend != null ? backend.getLastStartTime() : 0;
TxnCoordinator txnCoord = new TxnCoordinator(TxnSourceType.BE, request.getBackendId(), clientIp, startTime);
// step 6: begin transaction
long txnId = Env.getCurrentGlobalTransactionMgr().beginTransaction(
db.getId(), tableIdList, request.getLabel(), request.getRequestId(), txnCoord,
TransactionState.LoadJobSourceType.BACKEND_STREAMING, -1, timeoutSecond);
// step 7: return result
TBeginTxnResult result = new TBeginTxnResult();
result.setTxnId(txnId).setDbId(db.getId());
if (request.isSetSubTxnNum() && request.getSubTxnNum() > 0) {
result.addToSubTxnIds(txnId);
for (int i = 0; i < request.getSubTxnNum() - 1; i++) {
result.addToSubTxnIds(Env.getCurrentGlobalTransactionMgr().getNextTransactionId());
}
}
return result;
}
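// Pre-commit a 2PC stream load transaction (phase one). Master-only.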
@Override
public TLoadTxnCommitResult loadTxnPreCommit(TLoadTxnCommitRequest request) throws TException {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive txn pre-commit request: {}, backend: {}", request, clientAddr);
}
TLoadTxnCommitResult result = new TLoadTxnCommitResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
if (!Env.getCurrentEnv().isMaster()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
LOG.error("failed to loadTxnPreCommit:{}, request:{}, backend:{}",
NOT_MASTER_ERR_MSG, request, clientAddr);
return result;
}
try {
loadTxnPreCommitImpl(request);
} catch (UserException e) {
LOG.warn("failed to pre-commit txn: {}: {}", request.getTxnId(), e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
return result;
}
return result;
}
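// Resolve the tables involved in a load commit: by table id if set, otherwise
// by table name(s). For multi-table loads, also records the involved table
// ids on the running transaction.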
private List<Table> queryLoadCommitTables(TLoadTxnCommitRequest request, Database db) throws UserException {
if (request.isSetTableId() && request.getTableId() > 0) {
Table table = Env.getCurrentEnv().getInternalCatalog().getTableByTableId(request.getTableId());
if (table == null) {
throw new MetaNotFoundException("unknown table, table_id=" + request.getTableId());
}
return Collections.singletonList(table);
}
List<String> tbNames;
// check whether the request has multiple tables
if (CollectionUtils.isNotEmpty(request.getTbls())) {
tbNames = request.getTbls();
} else {
tbNames = Collections.singletonList(request.getTbl());
}
List<Table> tables = new ArrayList<>(tbNames.size());
for (String tbl : tbNames) {
OlapTable table = (OlapTable) db.getTableOrMetaException(tbl, TableType.OLAP);
tables.add(table);
}
if (tables.size() > 1) {
tables.sort(Comparator.comparing(Table::getId));
}
// if the request has multiple tables, update the running transaction's
// table ids for the multi-table load
if (CollectionUtils.isNotEmpty(request.getTbls())) {
List<Long> multiTableIds = tables.stream().map(Table::getId).collect(Collectors.toList());
Env.getCurrentGlobalTransactionMgr()
.updateMultiTableRunningTransactionTableIds(db.getId(), request.getTxnId(), multiTableIds);
if (LOG.isDebugEnabled()) {
LOG.debug("txn {} has multi table {}", request.getTxnId(), request.getTbls());
}
}
return tables;
}
private void loadTxnPreCommitImpl(TLoadTxnCommitRequest request) throws UserException {
if (request.isSetAuthCode()) {
// TODO: deprecated, removed in 3.1, use token instead.
} else if (request.isSetToken()) {
if (!checkToken(request.getToken())) {
throw new AuthenticationException("Invalid token: " + request.getToken());
}
} else {
// TODO: refactor this check
if (CollectionUtils.isNotEmpty(request.getTbls())) {
for (String tbl : request.getTbls()) {
checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
tbl,
request.getUserIp(), PrivPredicate.LOAD);
}
} else {
checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
request.getTbl(),
request.getUserIp(), PrivPredicate.LOAD);
}
}
// get database
Env env = Env.getCurrentEnv();
String fullDbName = request.getDb();
Database db;
if (request.isSetDbId() && request.getDbId() > 0) {
db = env.getInternalCatalog().getDbNullable(request.getDbId());
} else {
db = env.getInternalCatalog().getDbNullable(fullDbName);
}
if (db == null) {
String dbName = fullDbName;
if (Strings.isNullOrEmpty(request.getCluster())) {
dbName = request.getDb();
}
throw new UserException("unknown database, database=" + dbName);
}
long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() / 2 : 5000;
List<Table> tables = queryLoadCommitTables(request, db);
Env.getCurrentGlobalTransactionMgr()
.preCommitTransaction2PC(db, tables, request.getTxnId(),
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
TxnCommitAttachment.fromThrift(request.txnCommitAttachment));
}
@Override
public TLoadTxn2PCResult loadTxn2PC(TLoadTxn2PCRequest request) throws TException {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive txn 2PC request: {}, backend: {}", request, clientAddr);
}
TLoadTxn2PCResult result = new TLoadTxn2PCResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
if (!Env.getCurrentEnv().isMaster()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
LOG.error("failed to loadTxn2PC:{}, request:{}, backend:{}",
NOT_MASTER_ERR_MSG, request, clientAddr);
return result;
}
try {
loadTxn2PCImpl(request);
} catch (UserException e) {
LOG.warn("failed to {} txn {}: {}", request.getOperation(), request.getTxnId(), e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
return result;
}
return result;
}
private void loadTxn2PCImpl(TLoadTxn2PCRequest request) throws UserException {
String dbName = request.getDb();
if (Strings.isNullOrEmpty(dbName)) {
throw new UserException("No database selected.");
}
String fullDbName = dbName;
// get database
Env env = Env.getCurrentEnv();
Database database = env.getInternalCatalog().getDbNullable(fullDbName);
if (database == null) {
throw new UserException("unknown database, database=" + fullDbName);
}
String txnOperation = request.getOperation().trim();
if (!request.isSetTxnId()) {
List<TransactionStatus> statusList = new ArrayList<>();
statusList.add(TransactionStatus.PRECOMMITTED);
if (txnOperation.equalsIgnoreCase("abort")) {
statusList.add(TransactionStatus.PREPARE);
}
request.setTxnId(Env.getCurrentGlobalTransactionMgr()
.getTransactionIdByLabel(database.getId(), request.getLabel(), statusList));
}
TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(database.getId(), request.getTxnId());
if (transactionState == null) {
throw new UserException("transaction [" + request.getTxnId() + "] not found");
}
if (LOG.isDebugEnabled()) {
LOG.debug("txn {} has multi table {}", request.getTxnId(), transactionState.getTableIdList());
}
List<Long> tableIdList = transactionState.getTableIdList();
List<Table> tableList = new ArrayList<>();
// If a table was dropped, the stream load must still be able to abort.
if (txnOperation.equalsIgnoreCase("abort")) {
tableList = database.getTablesOnIdOrderIfExist(tableIdList);
} else {
tableList = database.getTablesOnIdOrderOrThrowException(tableIdList);
}
for (Table table : tableList) {
// check auth
checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
table.getName(),
request.getUserIp(), PrivPredicate.LOAD);
}
if (txnOperation.equalsIgnoreCase("commit")) {
long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() / 2 : 5000;
Env.getCurrentGlobalTransactionMgr()
.commitTransaction2PC(database, tableList, request.getTxnId(), timeoutMs);
} else if (txnOperation.equalsIgnoreCase("abort")) {
Env.getCurrentGlobalTransactionMgr().abortTransaction2PC(database.getId(), request.getTxnId(), tableList);
} else {
throw new UserException("transaction operation should be \'commit\' or \'abort\'");
}
}
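// Thrift entry point for stream load commit: cleans up multi-table load bookkeeping for the txn,
// then commits and waits for publish via loadTxnCommitImpl.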
@Override
public TLoadTxnCommitResult loadTxnCommit(TLoadTxnCommitRequest request) throws TException {
multiTableFragmentInstanceIdIndexMap.remove(request.getTxnId());
deleteMultiTableStreamLoadJobIndex(request.getTxnId());
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive txn commit request: {}, backend: {}", request, clientAddr);
}
TLoadTxnCommitResult result = new TLoadTxnCommitResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
if (!Env.getCurrentEnv().isMaster()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
LOG.error("failed to loadTxnCommit:{}, request:{}, backend:{}",
NOT_MASTER_ERR_MSG, request, clientAddr);
return result;
}
if (DebugPointUtil.isEnable("load.commit_timeout")) {
try {
Thread.sleep(60 * 1000);
} catch (InterruptedException e) {
LOG.warn("failed to sleep", e);
}
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs("load commit timeout");
return result;
}
try {
if (!loadTxnCommitImpl(request)) {
// committed successfully, but data is not yet visible
status.setStatusCode(TStatusCode.PUBLISH_TIMEOUT);
status.addToErrorMsgs("transaction commit successfully, BUT data will be visible later");
}
} catch (UserException e) {
LOG.warn("failed to commit txn: {}", request.getTxnId(), e);
// DELETE_BITMAP_LOCK_ERR will be retried on be
if (e.getErrorCode() == InternalErrorCode.DELETE_BITMAP_LOCK_ERR) {
status.setStatusCode(TStatusCode.DELETE_BITMAP_LOCK_ERROR);
status.addToErrorMsgs(e.getMessage());
} else {
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
}
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
return result;
}
return result;
}
// Returns true if both commit and publish succeed; returns false if publish times out.
private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws UserException {
if (request.isSetAuthCode()) {
// TODO: deprecated, removed in 3.1, use token instead.
} else if (request.isSetToken()) {
checkToken(request.getToken());
} else {
if (CollectionUtils.isNotEmpty(request.getTbls())) {
checkPasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
request.getTbls(), request.getUserIp(), PrivPredicate.LOAD);
} else {
checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
request.getTbl(), request.getUserIp(), PrivPredicate.LOAD);
}
}
if (request.groupCommit) {
try {
Env.getCurrentEnv().getGroupCommitManager().updateLoadData(request.table_id, request.receiveBytes);
} catch (Exception e) {
LOG.warn("Failed to update group commit load data, {}", e.getMessage());
}
}
// get database
Env env = Env.getCurrentEnv();
String fullDbName = request.getDb();
Database db;
if (request.isSetDbId() && request.getDbId() > 0) {
db = env.getInternalCatalog().getDbNullable(request.getDbId());
} else {
db = env.getInternalCatalog().getDbNullable(fullDbName);
}
if (db == null) {
String dbName = fullDbName;
if (Strings.isNullOrEmpty(request.getCluster())) {
dbName = request.getDb();
}
throw new UserException("unknown database, database=" + dbName + " request.isSetDbId(): "
+ request.isSetDbId() + " id: " + Long.toString(request.isSetDbId() ? request.getDbId() : 0)
+ " fullDbName: " + fullDbName);
}
long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() / 2
: Config.try_commit_lock_timeout_seconds * 1000;
List<Table> tables = queryLoadCommitTables(request, db);
return Env.getCurrentGlobalTransactionMgr()
.commitAndPublishTransaction(db, tables, request.getTxnId(),
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
TxnCommitAttachment.fromThrift(request.txnCommitAttachment));
}
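// Thrift entry point for committing a transaction by txn id; supports both plain commits and
// transactional-insert commits carrying sub-transaction infos.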
@Override
public TCommitTxnResult commitTxn(TCommitTxnRequest request) throws TException {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive txn commit request: {}, client: {}", request, clientAddr);
}
TCommitTxnResult result = new TCommitTxnResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
if (!Env.getCurrentEnv().isMaster()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
result.setMasterAddress(getMasterAddress());
LOG.error("failed to get commitTxn: {}", NOT_MASTER_ERR_MSG);
return result;
}
try {
if (!commitTxnImpl(request)) {
// committed successfully, but data is not yet visible
status.setStatusCode(TStatusCode.PUBLISH_TIMEOUT);
status.addToErrorMsgs("transaction commit successfully, BUT data will be visible later");
}
} catch (UserException e) {
LOG.warn("failed to commit txn: {}: {}", request.getTxnId(), e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
return result;
}
return result;
}
// Returns true if both commit and publish succeed; returns false if publish times out.
private boolean commitTxnImpl(TCommitTxnRequest request) throws UserException {
// Check required args: user, passwd, db, txn_id, and commit_infos (or sub_txn_infos for transactional insert)
if (!request.isSetUser()) {
throw new UserException("user is not set");
}
if (!request.isSetPasswd()) {
throw new UserException("passwd is not set");
}
if (!request.isSetDb()) {
throw new UserException("db is not set");
}
if (!request.isSetTxnId()) {
throw new UserException("txn_id is not set");
}
if (request.isSetTxnInsert() && request.isTxnInsert()) {
if (!request.isSetSubTxnInfos()) {
throw new UserException("sub_txn_infos is not set");
}
} else {
if (!request.isSetCommitInfos()) {
throw new UserException("commit_infos is not set");
}
}
// Step 1: get && check database
Env env = Env.getCurrentEnv();
String fullDbName = request.getDb();
Database db;
if (request.isSetDbId() && request.getDbId() > 0) {
db = env.getInternalCatalog().getDbNullable(request.getDbId());
} else {
db = env.getInternalCatalog().getDbNullable(fullDbName);
}
if (db == null) {
String dbName = fullDbName;
if (Strings.isNullOrEmpty(request.getCluster())) {
dbName = request.getDb();
}
throw new UserException("unknown database, database=" + dbName);
}
// Step 2: get tables
TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(db.getId(), request.getTxnId());
if (transactionState == null) {
throw new UserException("transaction [" + request.getTxnId() + "] not found");
}
List<Long> tableIdList = transactionState.getTableIdList();
// if table was dropped, transaction must be aborted
List<Table> tableList = db.getTablesOnIdOrderOrThrowException(tableIdList);
// Step 3: check auth
if (request.isSetAuthCode()) {
// TODO: deprecated, removed in 3.1, use token instead.
} else if (request.isSetToken()) {
checkToken(request.getToken());
} else {
List<String> tables = tableList.stream().map(Table::getName).collect(Collectors.toList());
checkPasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(), tables,
request.getUserIp(), PrivPredicate.LOAD);
}
// Step 4: get timeout
long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() / 2
: Config.try_commit_lock_timeout_seconds * 1000;
// Step 5: commit and publish
if (request.isSetTxnInsert() && request.isTxnInsert()) {
List<Long> subTxnIds = new ArrayList<>();
List<SubTransactionState> subTransactionStates = new ArrayList<>();
for (TSubTxnInfo subTxnInfo : request.getSubTxnInfos()) {
TableIf table = db.getTableNullable(subTxnInfo.getTableId());
if (table == null) {
continue;
}
subTxnIds.add(subTxnInfo.getSubTxnId());
subTransactionStates.add(
new SubTransactionState(subTxnInfo.getSubTxnId(), (Table) table,
subTxnInfo.getTabletCommitInfos(), null));
}
transactionState.setSubTxnIds(subTxnIds);
return Env.getCurrentGlobalTransactionMgr()
.commitAndPublishTransaction(db, request.getTxnId(),
subTransactionStates, timeoutMs);
} else if (!request.isOnlyCommit()) {
return Env.getCurrentGlobalTransactionMgr()
.commitAndPublishTransaction(db, tableList,
request.getTxnId(),
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
} else {
// single table commit, so don't need to wait for publish.
Env.getCurrentGlobalTransactionMgr()
.commitTransaction(db, tableList,
request.getTxnId(),
TabletCommitInfo.fromThrift(request.getCommitInfos()), timeoutMs,
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()));
return true;
}
}
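// Thrift entry point for rolling back a load transaction; master-only.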
@Override
public TLoadTxnRollbackResult loadTxnRollback(TLoadTxnRollbackRequest request) throws TException {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive txn rollback request: {}, backend: {}", request, clientAddr);
}
TLoadTxnRollbackResult result = new TLoadTxnRollbackResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
if (!Env.getCurrentEnv().isMaster()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
LOG.error("failed to loadTxnRollback:{}, request:{}, backend:{}",
NOT_MASTER_ERR_MSG, request, clientAddr);
return result;
}
try {
if (DebugPointUtil.isEnable("FrontendServiceImpl.loadTxnRollback.error")) {
throw new UserException("FrontendServiceImpl.loadTxnRollback.error");
}
loadTxnRollbackImpl(request);
} catch (MetaNotFoundException e) {
LOG.warn("failed to rollback txn, id: {}, label: {}", request.getTxnId(), request.getLabel(), e);
status.setStatusCode(TStatusCode.NOT_FOUND);
status.addToErrorMsgs(e.getMessage());
} catch (UserException e) {
LOG.warn("failed to rollback txn, id: {}, label: {}", request.getTxnId(), request.getLabel(), e);
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
return result;
}
return result;
}
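// Checks auth, resolves the database, and aborts the transaction identified by txn_id or label.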
private void loadTxnRollbackImpl(TLoadTxnRollbackRequest request) throws UserException {
if (request.isSetAuthCode()) {
// TODO: deprecated, removed in 3.1, use token instead.
} else if (request.isSetToken()) {
checkToken(request.getToken());
} else {
// multi table load
if (CollectionUtils.isNotEmpty(request.getTbls())) {
for (String tbl : request.getTbls()) {
checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
tbl,
request.getUserIp(), PrivPredicate.LOAD);
}
} else {
checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
request.getTbl(),
request.getUserIp(), PrivPredicate.LOAD);
}
}
String dbName = request.getDb();
Database db;
if (request.isSetDbId() && request.getDbId() > 0) {
db = Env.getCurrentInternalCatalog().getDbNullable(request.getDbId());
} else {
db = Env.getCurrentInternalCatalog().getDbNullable(dbName);
}
if (db == null) {
throw new MetaNotFoundException("db " + request.getDb() + " does not exist");
}
long dbId = db.getId();
if (request.getTxnId() > 0) { // txnId is required in thrift
TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(dbId, request.getTxnId());
if (transactionState == null) {
throw new UserException("transaction [" + request.getTxnId() + "] not found");
}
List<Table> tableList = db.getTablesOnIdOrderIfExist(transactionState.getTableIdList());
Env.getCurrentGlobalTransactionMgr().abortTransaction(dbId, request.getTxnId(),
request.isSetReason() ? request.getReason() : "system cancel",
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()), tableList);
} else if (request.isSetLabel()) {
Env.getCurrentGlobalTransactionMgr()
.abortTransaction(db.getId(), request.getLabel(),
request.isSetReason() ? request.getReason() : "system cancel");
} else {
throw new UserException("must set txn_id or label");
}
}
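// Thrift entry point for rolling back a transaction by txn id; master-only.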
@Override
public TRollbackTxnResult rollbackTxn(TRollbackTxnRequest request) throws TException {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive txn rollback request: {}, client: {}", request, clientAddr);
}
TRollbackTxnResult result = new TRollbackTxnResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
if (!Env.getCurrentEnv().isMaster()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
result.setMasterAddress(getMasterAddress());
LOG.error("failed to get rollbackTxn: {}", NOT_MASTER_ERR_MSG);
return result;
}
try {
rollbackTxnImpl(request);
} catch (UserException e) {
LOG.warn("failed to rollback txn {}: {}", request.getTxnId(), e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
return result;
}
return result;
}
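// Validates required args, resolves the database and the transaction's tables, checks auth, and
// aborts the transaction.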
private void rollbackTxnImpl(TRollbackTxnRequest request) throws UserException {
// Check required args: user, passwd, db, txn_id
if (!request.isSetUser()) {
throw new UserException("user is not set");
}
if (!request.isSetPasswd()) {
throw new UserException("passwd is not set");
}
if (!request.isSetDb()) {
throw new UserException("db is not set");
}
if (!request.isSetTxnId()) {
throw new UserException("txn_id is not set");
}
// Step 1: get && check database
Env env = Env.getCurrentEnv();
String fullDbName = request.getDb();
Database db;
if (request.isSetDbId() && request.getDbId() > 0) {
db = env.getInternalCatalog().getDbNullable(request.getDbId());
} else {
db = env.getInternalCatalog().getDbNullable(fullDbName);
}
if (db == null) {
String dbName = fullDbName;
if (Strings.isNullOrEmpty(request.getCluster())) {
dbName = request.getDb();
}
throw new UserException("unknown database, database=" + dbName);
}
// Step 2: get tables
TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
.getTransactionState(db.getId(), request.getTxnId());
if (transactionState == null) {
throw new UserException("transaction [" + request.getTxnId() + "] not found");
}
List<Long> tableIdList = transactionState.getTableIdList();
List<Table> tableList = db.getTablesOnIdOrderOrThrowException(tableIdList);
// Step 3: check auth
if (request.isSetAuthCode()) {
// TODO: deprecated, removed in 3.1, use token instead.
} else if (request.isSetToken()) {
checkToken(request.getToken());
} else {
List<String> tables = tableList.stream().map(Table::getName).collect(Collectors.toList());
checkPasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(), tables,
request.getUserIp(), PrivPredicate.LOAD);
}
// Step 4: abort txn
Env.getCurrentGlobalTransactionMgr().abortTransaction(db.getId(), request.getTxnId(),
request.isSetReason() ? request.getReason() : "system cancel",
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()), tableList);
}
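// Builds the execution plan for a stream load put request (or an http stream load when load_sql is
// set) and attaches workload group information when enabled.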
@Override
public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive stream load put request: {}, backend: {}", request, clientAddr);
}
TStreamLoadPutResult result = new TStreamLoadPutResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
StreamLoadHandler streamLoadHandler = new StreamLoadHandler(request, null, result, clientAddr);
try {
streamLoadHandler.setCloudCluster();
List<TPipelineWorkloadGroup> tWorkloadGroupList = null;
// mysql load requests do not carry user info; this needs to be fixed later.
boolean hasUserName = !StringUtils.isEmpty(request.getUser());
if (Config.enable_workload_group && hasUserName) {
tWorkloadGroupList = Env.getCurrentEnv().getWorkloadGroupMgr()
.getWorkloadGroup(ConnectContext.get());
}
if (!Strings.isNullOrEmpty(request.getLoadSql())) {
httpStreamPutImpl(request, result);
if (tWorkloadGroupList != null && tWorkloadGroupList.size() > 0) {
result.pipeline_params.setWorkloadGroups(tWorkloadGroupList);
}
return result;
} else {
streamLoadHandler.generatePlan();
result.setPipelineParams((TPipelineFragmentParams) streamLoadHandler.getFragmentParams().get(0));
}
if (tWorkloadGroupList != null && tWorkloadGroupList.size() > 0) {
result.pipeline_params.setWorkloadGroups(tWorkloadGroupList);
}
} catch (MetaNotFoundException e) {
LOG.warn("failed to rollback txn, id: {}, label: {}", request.getTxnId(), request.getLabel(), e);
status.setStatusCode(TStatusCode.NOT_FOUND);
status.addToErrorMsgs(e.getMessage());
} catch (UserException e) {
LOG.warn("failed to get stream load plan, label: {}", request.getLabel(), e);
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("stream load catch unknown result, label: {}", request.getLabel(), e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(e.getClass().getSimpleName() + ": " + Strings.nullToEmpty(e.getMessage()));
return result;
} finally {
ConnectContext.remove();
}
return result;
}
private void deleteMultiTableStreamLoadJobIndex(long txnId) {
try {
Env.getCurrentEnv().getRoutineLoadManager().removeMultiLoadTaskTxnIdToRoutineLoadJobId(txnId);
} catch (Exception e) {
LOG.warn("failed to delete multi table stream load job index: {}", e.getMessage());
}
}
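// Builds execution plans for a multi-table stream load; if planning fails, the owning routine load
// job is paused with the error reason.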
@Override
public TStreamLoadMultiTablePutResult streamLoadMultiTablePut(TStreamLoadPutRequest request) {
TStreamLoadMultiTablePutResult result = new TStreamLoadMultiTablePutResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
List planFragmentParamsList = new ArrayList<>();
multiTableFragmentInstanceIdIndexMap.putIfAbsent(request.getTxnId(), new AtomicInteger(1));
AtomicInteger index = multiTableFragmentInstanceIdIndexMap.get(request.getTxnId());
StreamLoadHandler streamLoadHandler = new StreamLoadHandler(request, index, null,
getClientAddrAsString());
try {
streamLoadHandler.generatePlan();
planFragmentParamsList.addAll(streamLoadHandler.getFragmentParams());
if (LOG.isDebugEnabled()) {
LOG.debug("receive stream load multi table put request result: {}", result);
}
} catch (UserException exception) {
LOG.warn("failed to get stream load plan: {}", exception.getMessage());
status = new TStatus(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(exception.getMessage());
result.setStatus(status);
try {
RoutineLoadJob routineLoadJob = Env.getCurrentEnv().getRoutineLoadManager()
.getRoutineLoadJobByMultiLoadTaskTxnId(request.getTxnId());
routineLoadJob.updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.INTERNAL_ERR,
"failed to get stream load plan, " + exception.getMessage()), false);
} catch (Throwable e) {
LOG.warn("catch update routine load job error.", e);
}
return result;
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(e.getClass().getSimpleName() + ": " + Strings.nullToEmpty(e.getMessage()));
return result;
} finally {
ConnectContext.remove();
}
result.setPipelineParams(planFragmentParamsList);
if (LOG.isDebugEnabled()) {
LOG.debug("receive stream load multi table put request result: {}", result);
}
return result;
}
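// Parses the load SQL with StmtExecutor, generates the http stream plan, and uses a Coordinator to
// produce the stream load plan fragment.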
private HttpStreamParams initHttpStreamPlan(TStreamLoadPutRequest request, ConnectContext ctx)
throws UserException {
String originStmt = request.getLoadSql();
HttpStreamParams httpStreamParams;
try {
while (DebugPointUtil.isEnable("FE.FrontendServiceImpl.initHttpStreamPlan.block")) {
Thread.sleep(1000);
LOG.info("block initHttpStreamPlan");
}
StmtExecutor executor = new StmtExecutor(ctx, originStmt);
ctx.setExecutor(executor);
httpStreamParams = executor.generateHttpStreamPlan(ctx.queryId());
Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx);
Coordinator coord = new Coordinator(ctx, analyzer, executor.planner());
coord.setLoadMemLimit(request.getExecMemLimit());
coord.setQueryType(TQueryType.LOAD);
TableIf table = httpStreamParams.getTable();
if (table instanceof OlapTable) {
boolean isEnableMemtableOnSinkNode = ((OlapTable) table).getTableProperty().getUseSchemaLightChange()
? coord.getQueryOptions().isEnableMemtableOnSinkNode()
: false;
coord.getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
}
httpStreamParams.setParams(coord.getStreamLoadPlan());
} catch (UserException e) {
LOG.warn("exec sql error", e);
throw e;
} catch (Throwable e) {
LOG.warn("exec sql error catch unknown result.", e);
throw new UserException("exec sql error catch unknown result." + e);
}
return httpStreamParams;
}
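// Handles an http stream load put: applies session options from the request, builds the plan, and
// re-points the transaction coordinator to the requesting BE.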
private void httpStreamPutImpl(TStreamLoadPutRequest request, TStreamLoadPutResult result)
throws UserException {
if (LOG.isDebugEnabled()) {
LOG.debug("receive http stream put request: {}", request);
}
ConnectContext ctx = ConnectContext.get();
if (request.isSetAuthCode()) {
// TODO: deprecated, removed in 3.1, use token instead.
} else if (Strings.isNullOrEmpty(request.getToken())) {
checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
request.getTbl(),
request.getUserIp(), PrivPredicate.LOAD);
}
if (request.isSetMemtableOnSinkNode()) {
ctx.getSessionVariable().enableMemtableOnSinkNode = request.isMemtableOnSinkNode();
} else {
ctx.getSessionVariable().enableMemtableOnSinkNode = Config.stream_load_default_memtable_on_sink_node;
}
ctx.getSessionVariable().groupCommit = request.getGroupCommitMode();
if (request.isSetPartialUpdate() && !request.isPartialUpdate()) {
ctx.getSessionVariable().setEnableUniqueKeyPartialUpdate(false);
}
try {
HttpStreamParams httpStreamParams = initHttpStreamPlan(request, ctx);
int loadStreamPerNode = 2;
if (request.getStreamPerNode() > 0) {
loadStreamPerNode = request.getStreamPerNode();
}
httpStreamParams.getParams().setLoadStreamPerNode(loadStreamPerNode);
httpStreamParams.getParams().setTotalLoadStreams(loadStreamPerNode);
httpStreamParams.getParams().setNumLocalSink(1);
TransactionState txnState = Env.getCurrentGlobalTransactionMgr().getTransactionState(
httpStreamParams.getDb().getId(), httpStreamParams.getTxnId());
if (txnState == null) {
LOG.warn("Not found http stream related txn, txn id = {}", httpStreamParams.getTxnId());
} else {
TxnCoordinator txnCoord = txnState.getCoordinator();
Backend backend = Env.getCurrentSystemInfo().getBackend(request.getBackendId());
if (backend != null) {
// only modify txnCoord in memory, not write editlog yet.
txnCoord.sourceType = TxnSourceType.BE;
txnCoord.id = backend.getId();
txnCoord.ip = backend.getHost();
txnCoord.startTime = backend.getLastStartTime();
LOG.info("Change http stream related txn {} to coordinator {}",
httpStreamParams.getTxnId(), txnCoord);
}
}
result.setPipelineParams(httpStreamParams.getParams());
result.getPipelineParams().setDbName(httpStreamParams.getDb().getFullName());
result.getPipelineParams().setTableName(httpStreamParams.getTable().getName());
result.getPipelineParams().setTxnConf(new TTxnParams().setTxnId(httpStreamParams.getTxnId()));
result.getPipelineParams().setImportLabel(httpStreamParams.getLabel());
result.getPipelineParams()
.setIsMowTable(((OlapTable) httpStreamParams.getTable()).getEnableUniqueKeyMergeOnWrite());
result.setDbId(httpStreamParams.getDb().getId());
result.setTableId(httpStreamParams.getTable().getId());
result.setBaseSchemaVersion(((OlapTable) httpStreamParams.getTable()).getBaseSchemaVersion());
result.setGroupCommitIntervalMs(((OlapTable) httpStreamParams.getTable()).getGroupCommitIntervalMs());
result.setGroupCommitDataBytes(((OlapTable) httpStreamParams.getTable()).getGroupCommitDataBytes());
result.setWaitInternalGroupCommitFinish(Config.wait_internal_group_commit_finish);
} catch (UserException e) {
LOG.warn("exec sql error", e);
throw e;
} catch (Throwable e) {
LOG.warn("exec sql error catch unknown result.", e);
throw new UserException("exec sql error catch unknown result." + e);
}
}
@Override
public TStatus snapshotLoaderReport(TSnapshotLoaderReportRequest request) throws TException {
if (Env.getCurrentEnv().getBackupHandler().report(request.getTaskType(), request.getJobId(),
request.getTaskId(), request.getFinishedNum(), request.getTotalNum())) {
return new TStatus(TStatusCode.OK);
}
return new TStatus(TStatusCode.CANCELLED);
}
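// Returns the ids of all alive sessions on this FE after validating the cluster id and token.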
@Override
public TFrontendReportAliveSessionResult getAliveSessions(TFrontendReportAliveSessionRequest request)
throws TException {
TFrontendReportAliveSessionResult result = new TFrontendReportAliveSessionResult();
result.setStatus(TStatusCode.OK);
if (LOG.isDebugEnabled()) {
LOG.debug("receive get alive sessions request: {}", request);
}
Env env = Env.getCurrentEnv();
if (env.isReady()) {
if (request.getClusterId() != env.getClusterId()) {
result.setStatus(TStatusCode.INVALID_ARGUMENT);
result.setMsg("invalid cluster id: " + Env.getCurrentEnv().getClusterId());
} else if (!request.getToken().equals(env.getToken())) {
result.setStatus(TStatusCode.INVALID_ARGUMENT);
result.setMsg("invalid token: " + Env.getCurrentEnv().getToken());
} else {
result.setMsg("success");
result.setSessionIdList(env.getAllAliveSessionIds());
}
} else {
result.setStatus(TStatusCode.ILLEGAL_STATE);
result.setMsg("not ready");
}
return result;
}
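// Frontend heartbeat handler: validates cluster id, deploy mode and token, then reports ports,
// replayed journal id, process info and disk info.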
@Override
public TFrontendPingFrontendResult ping(TFrontendPingFrontendRequest request) throws TException {
boolean isReady = Env.getCurrentEnv().isReady();
TFrontendPingFrontendResult result = new TFrontendPingFrontendResult();
// The following fields are required in thrift.
// So must give them a default value to avoid "Required field xx was not present" error.
result.setStatus(TFrontendPingFrontendStatusCode.OK);
result.setMsg("");
result.setQueryPort(0);
result.setRpcPort(0);
result.setReplayedJournalId(0);
result.setVersion(Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH);
if (isReady) {
if (request.getClusterId() != Env.getCurrentEnv().getClusterId()) {
result.setStatus(TFrontendPingFrontendStatusCode.FAILED);
result.setMsg("invalid cluster id: " + Env.getCurrentEnv().getClusterId());
}
if (result.getStatus() == TFrontendPingFrontendStatusCode.OK) {
// If the version of FE is too old, we need to ensure compatibility.
if (request.getDeployMode() == null) {
LOG.warn("Couldn't find deployMode in heartbeat info, "
+ "maybe you need upgrade FE master.");
} else if (!request.getDeployMode().equals(Env.getCurrentEnv().getDeployMode())) {
result.setStatus(TFrontendPingFrontendStatusCode.FAILED);
result.setMsg("expected deployMode: "
+ request.getDeployMode()
+ ", but found deployMode: "
+ Env.getCurrentEnv().getDeployMode());
}
}
if (result.getStatus() == TFrontendPingFrontendStatusCode.OK) {
if (!request.getToken().equals(Env.getCurrentEnv().getToken())) {
result.setStatus(TFrontendPingFrontendStatusCode.FAILED);
result.setMsg("invalid token: " + Env.getCurrentEnv().getToken());
}
}
if (result.getStatus() == TFrontendPingFrontendStatusCode.OK) {
// cluster id and token are valid, return replayed journal id
long replayedJournalId = Env.getCurrentEnv().getReplayedJournalId();
result.setMsg("success");
result.setReplayedJournalId(replayedJournalId);
result.setQueryPort(Config.query_port);
result.setRpcPort(Config.rpc_port);
result.setArrowFlightSqlPort(Config.arrow_flight_sql_port);
result.setVersion(Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH);
result.setLastStartupTime(exeEnv.getStartupTime());
result.setProcessUUID(exeEnv.getProcessUUID());
if (exeEnv.getDiskInfos() != null) {
result.setDiskInfos(FeDiskInfo.toThrifts(exeEnv.getDiskInfos()));
}
}
} else {
result.setStatus(TFrontendPingFrontendStatusCode.FAILED);
result.setMsg("not ready");
}
return result;
}
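// Serves metadata table (TVF) queries and information_schema table scans via MetadataGenerator.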
@Override
public TFetchSchemaTableDataResult fetchSchemaTableData(TFetchSchemaTableDataRequest request) throws TException {
try {
if (!request.isSetSchemaTableName()) {
return MetadataGenerator.errorResult("Fetch schema table name is not set");
}
// tvf queries
if (request.getSchemaTableName() == TSchemaTableName.METADATA_TABLE) {
return MetadataGenerator.getMetadataTable(request);
} else {
// database information_schema's tables
return MetadataGenerator.getSchemaTableData(request);
}
} catch (Exception e) {
LOG.warn("Failed to fetchSchemaTableData", e);
return MetadataGenerator.errorResult(e.getMessage());
}
}
private TNetworkAddress getClientAddr() {
ThriftServerContext connectionContext = ThriftServerEventProcessor.getConnectionContext();
// For NonBlockingServer, we can not get client ip.
if (connectionContext != null) {
return connectionContext.getClient();
}
return null;
}
private String getClientAddrAsString() {
TNetworkAddress addr = getClientAddr();
return addr == null ? "unknown" : addr.hostname;
}
@Override
public TWaitingTxnStatusResult waitingTxnStatus(TWaitingTxnStatusRequest request) throws TException {
TWaitingTxnStatusResult result = new TWaitingTxnStatusResult();
result.setStatus(new TStatus());
try {
result = Env.getCurrentGlobalTransactionMgr().getWaitingTxnStatus(request);
result.status.setStatusCode(TStatusCode.OK);
} catch (TimeoutException e) {
result.status.setStatusCode(TStatusCode.INCOMPLETE);
result.status.addToErrorMsgs(e.getMessage());
} catch (AnalysisException e) {
result.status.setStatusCode(TStatusCode.INTERNAL_ERROR);
result.status.addToErrorMsgs(e.getMessage());
}
return result;
}
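// Forwarded request to initialize external catalog metadata on the master FE: initializes a whole
// catalog or a single database depending on which ids are set.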
@Override
public TInitExternalCtlMetaResult initExternalCtlMeta(TInitExternalCtlMetaRequest request) throws TException {
if (request.isSetCatalogId() && request.isSetDbId()) {
return initDb(request.catalogId, request.dbId);
} else if (request.isSetCatalogId()) {
return initCatalog(request.catalogId);
} else {
throw new TException("Catalog name is not set. Init failed.");
}
}
private TInitExternalCtlMetaResult initCatalog(long catalogId) throws TException {
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
if (!(catalog instanceof ExternalCatalog)) {
throw new TException("Only support forward ExternalCatalog init operation.");
}
TInitExternalCtlMetaResult result = new TInitExternalCtlMetaResult();
try {
((ExternalCatalog) catalog).makeSureInitialized();
result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
result.setStatus(MasterCatalogExecutor.STATUS_OK);
} catch (Throwable t) {
LOG.warn("init catalog failed. catalog: {}", catalog.getName(), t);
result.setStatus(Util.getRootCauseStack(t));
}
return result;
}
private TInitExternalCtlMetaResult initDb(long catalogId, long dbId) throws TException {
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(catalogId);
if (!(catalog instanceof ExternalCatalog)) {
throw new TException("Only support forward ExternalCatalog init operation.");
}
DatabaseIf db = catalog.getDbNullable(dbId);
if (db == null) {
throw new TException("database " + dbId + " is null");
}
if (!(db instanceof ExternalDatabase)) {
throw new TException("Only support forward ExternalDatabase init operation.");
}
TInitExternalCtlMetaResult result = new TInitExternalCtlMetaResult();
try {
((ExternalDatabase) db).makeSureInitialized();
result.setMaxJournalId(Env.getCurrentEnv().getMaxJournalId());
result.setStatus(MasterCatalogExecutor.STATUS_OK);
} catch (Throwable t) {
LOG.warn("init database failed. catalog.database: {}", catalog.getName(), db.getFullName(), t);
result.setStatus(Util.getRootCauseStack(t));
}
return result;
}
@Override
public TMySqlLoadAcquireTokenResult acquireToken() throws TException {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive acquire token request from client: {}", clientAddr);
}
TMySqlLoadAcquireTokenResult result = new TMySqlLoadAcquireTokenResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
try {
String token = Env.getCurrentEnv().getTokenManager().acquireToken();
result.setToken(token);
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
return result;
}
return result;
}
@Override
public boolean checkToken(String token) {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive check token request from client: {}", clientAddr);
}
try {
return Env.getCurrentEnv().getTokenManager().checkAuthToken(token);
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
return false;
}
}
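// Verifies the user's password and then checks the requested privilege at the global, catalog,
// database, table, column or resource level.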
@Override
public TCheckAuthResult checkAuth(TCheckAuthRequest request) throws TException {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive auth request: {}, backend: {}", request, clientAddr);
}
TCheckAuthResult result = new TCheckAuthResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
// check account and password
final String fullUserName = ClusterNamespace.getNameFromFullName(request.getUser());
List<UserIdentity> currentUser = Lists.newArrayList();
try {
Env.getCurrentEnv().getAuth().checkPlainPassword(fullUserName, request.getUserIp(), request.getPasswd(),
currentUser);
} catch (AuthenticationException e) {
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
return result;
}
Preconditions.checkState(currentUser.size() == 1);
PrivPredicate predicate = getPrivPredicate(request.getPrivType());
if (predicate == null) {
return result;
}
// check privilege
AccessControllerManager accessManager = Env.getCurrentEnv().getAccessManager();
TPrivilegeCtrl privCtrl = request.getPrivCtrl();
TPrivilegeHier privHier = privCtrl.getPrivHier();
if (privHier == TPrivilegeHier.GLOBAL) {
if (!accessManager.checkGlobalPriv(currentUser.get(0), predicate)) {
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs("Global permissions error");
}
} else if (privHier == TPrivilegeHier.CATALOG) {
if (!accessManager.checkCtlPriv(currentUser.get(0), privCtrl.getCtl(), predicate)) {
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs("Catalog permissions error");
}
} else if (privHier == TPrivilegeHier.DATABASE) {
String fullDbName = privCtrl.getDb();
if (!accessManager.checkDbPriv(currentUser.get(0), privCtrl.getCtl(), fullDbName,
predicate)) {
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs("Database permissions error");
}
} else if (privHier == TPrivilegeHier.TABLE) {
String fullDbName = privCtrl.getDb();
if (!accessManager.checkTblPriv(currentUser.get(0), privCtrl.getCtl(), fullDbName, privCtrl.getTbl(),
predicate)) {
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs("Table permissions error");
}
} else if (privHier == TPrivilegeHier.COLUMNS) {
String fullDbName = privCtrl.getDb();
try {
accessManager.checkColumnsPriv(currentUser.get(0), InternalCatalog.INTERNAL_CATALOG_NAME, fullDbName,
privCtrl.getTbl(), privCtrl.getCols(),
predicate);
} catch (UserException e) {
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs("Columns permissions error:" + e.getMessage());
}
} else if (privHier == TPrivilegeHier.RESOURSE) {
if (!accessManager.checkResourcePriv(currentUser.get(0), privCtrl.getRes(), predicate)) {
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs("Resourse permissions error");
}
} else {
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs("Privilege control error");
}
return result;
}
private PrivPredicate getPrivPredicate(TPrivilegeType privType) {
if (privType == null) {
return null;
}
switch (privType) {
case SHOW:
return PrivPredicate.SHOW;
case SHOW_RESOURCES:
return PrivPredicate.SHOW_RESOURCES;
case GRANT:
return PrivPredicate.GRANT;
case ADMIN:
return PrivPredicate.ADMIN;
case LOAD:
return PrivPredicate.LOAD;
case ALTER:
return PrivPredicate.ALTER;
case USAGE:
return PrivPredicate.USAGE;
case CREATE:
return PrivPredicate.CREATE;
case ALL:
return PrivPredicate.ALL;
case OPERATOR:
return PrivPredicate.OPERATOR;
case DROP:
return PrivPredicate.DROP;
default:
return null;
}
}
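// Returns query statistics at the requested granularity: catalog, database, table, index or tablet.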
@Override
public TQueryStatsResult getQueryStats(TGetQueryStatsRequest request) throws TException {
TQueryStatsResult result = new TQueryStatsResult();
result.setStatus(new TStatus(TStatusCode.OK));
if (!request.isSetType()) {
TStatus status = new TStatus(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs("type is not set");
result.setStatus(status);
return result;
}
try {
switch (request.getType()) {
case CATALOG: {
if (!request.isSetCatalog()) {
TStatus status = new TStatus(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs("catalog is not set");
result.setStatus(status);
} else {
result.setSimpleResult(Env.getCurrentEnv().getQueryStats().getCatalogStats(request.catalog));
}
break;
}
case DATABASE: {
if (!request.isSetCatalog() || !request.isSetDb()) {
TStatus status = new TStatus(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs("catalog or db is not set");
result.setStatus(status);
return result;
} else {
result.setSimpleResult(
Env.getCurrentEnv().getQueryStats().getDbStats(request.catalog, request.db));
}
break;
}
case TABLE: {
if (!request.isSetCatalog() || !request.isSetDb() || !request.isSetTbl()) {
TStatus status = new TStatus(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs("catalog or db or table is not set");
result.setStatus(status);
return result;
} else {
Env.getCurrentEnv().getQueryStats().getTblStats(request.catalog, request.db, request.tbl)
.forEach((col, stat) -> {
TTableQueryStats columnStat = new TTableQueryStats();
columnStat.setField(col);
columnStat.setQueryStats(stat.first);
columnStat.setFilterStats(stat.second);
result.addToTableStats(columnStat);
});
}
break;
}
case TABLE_ALL: {
if (!request.isSetCatalog() || !request.isSetDb() || !request.isSetTbl()) {
TStatus status = new TStatus(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs("catalog or db or table is not set");
result.setStatus(status);
} else {
result.setSimpleResult(Env.getCurrentEnv().getQueryStats()
.getTblAllStats(request.catalog, request.db, request.tbl));
}
break;
}
case TABLE_ALL_VERBOSE: {
if (!request.isSetCatalog() || !request.isSetDb() || !request.isSetTbl()) {
TStatus status = new TStatus(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs("catalog or db or table is not set");
result.setStatus(status);
} else {
Env.getCurrentEnv().getQueryStats()
.getTblAllVerboseStats(request.catalog, request.db, request.tbl)
.forEach((indexName, indexStats) -> {
TTableIndexQueryStats indexStat = new TTableIndexQueryStats();
indexStat.setIndexName(indexName);
indexStats.forEach((col, stat) -> {
TTableQueryStats columnStat = new TTableQueryStats();
columnStat.setField(col);
columnStat.setQueryStats(stat.first);
columnStat.setFilterStats(stat.second);
indexStat.addToTableStats(columnStat);
});
result.addToTableVerbosStats(indexStat);
});
}
break;
}
case TABLET: {
if (!request.isSetReplicaId()) {
TStatus status = new TStatus(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs("Replica Id is not set");
result.setStatus(status);
} else {
Map<Long, Long> tabletStats = new HashMap<>();
tabletStats.put(request.getReplicaId(),
Env.getCurrentEnv().getQueryStats().getStats(request.getReplicaId()));
result.setTabletStats(tabletStats);
}
break;
}
case TABLETS: {
if (!request.isSetReplicaIds()) {
TStatus status = new TStatus(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs("Replica Ids is not set");
result.setStatus(status);
} else {
Map<Long, Long> tabletStats = new HashMap<>();
QueryStats qs = Env.getCurrentEnv().getQueryStats();
for (long replicaId : request.getReplicaIds()) {
tabletStats.put(replicaId, qs.getStats(replicaId));
}
result.setTabletStats(tabletStats);
}
break;
}
default: {
TStatus status = new TStatus(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs("unknown type: " + request.getType());
result.setStatus(status);
break;
}
}
} catch (UserException e) {
TStatus status = new TStatus(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
result.setStatus(status);
}
return result;
}
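// Returns host and port information of all normal replicas for the requested tablet ids.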
public TGetTabletReplicaInfosResult getTabletReplicaInfos(TGetTabletReplicaInfosRequest request) {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive get replicas request: {}, backend: {}", request, clientAddr);
}
TGetTabletReplicaInfosResult result = new TGetTabletReplicaInfosResult();
List<Long> tabletIds = request.getTabletIds();
Map<Long, List<TReplicaInfo>> tabletReplicaInfos = Maps.newHashMap();
for (Long tabletId : tabletIds) {
if (DebugPointUtil.isEnable("getTabletReplicaInfos.returnEmpty")) {
LOG.info("enable getTabletReplicaInfos.returnEmpty");
continue;
}
List<TReplicaInfo> replicaInfos = Lists.newArrayList();
List<Replica> replicas = Env.getCurrentEnv().getCurrentInvertedIndex()
.getReplicasByTabletId(tabletId);
for (Replica replica : replicas) {
if (!replica.isNormal()) {
LOG.warn("replica {} not normal", replica.getId());
continue;
}
Backend backend = Env.getCurrentSystemInfo().getBackend(replica.getBackendIdWithoutException());
if (backend != null) {
TReplicaInfo replicaInfo = new TReplicaInfo();
replicaInfo.setHost(backend.getHost());
replicaInfo.setBePort(backend.getBePort());
replicaInfo.setHttpPort(backend.getHttpPort());
replicaInfo.setBrpcPort(backend.getBrpcPort());
replicaInfo.setReplicaId(replica.getId());
replicaInfos.add(replicaInfo);
}
}
tabletReplicaInfos.put(tabletId, replicaInfos);
}
result.setTabletReplicaInfos(tabletReplicaInfos);
result.setToken(Env.getCurrentEnv().getToken());
result.setStatus(new TStatus(TStatusCode.OK));
return result;
}
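// Allocates a range of auto-increment values for the given table column; master-only.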
@Override
public TAutoIncrementRangeResult getAutoIncrementRange(TAutoIncrementRangeRequest request) {
String clientAddr = getClientAddrAsString();
LOG.info("[auto-inc] receive getAutoIncrementRange request: {}, backend: {}", request, clientAddr);
TAutoIncrementRangeResult result = new TAutoIncrementRangeResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
if (!Env.getCurrentEnv().isMaster()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
result.setMasterAddress(getMasterAddress());
LOG.error("[auto-inc] failed to getAutoIncrementRange:{}, request:{}, backend:{}",
NOT_MASTER_ERR_MSG, request, getClientAddrAsString());
return result;
}
try {
Env env = Env.getCurrentEnv();
Database db = env.getInternalCatalog().getDbOrMetaException(request.getDbId());
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(request.getTableId(), TableType.OLAP);
AutoIncrementGenerator autoIncrementGenerator = olapTable.getAutoIncrementGenerator();
long columnId = request.getColumnId();
long length = request.getLength();
Pair<Long, Long> range = autoIncrementGenerator.getAutoIncrementRange(columnId, length, -1);
result.setStart(range.first);
result.setLength(range.second);
} catch (UserException e) {
LOG.warn("[auto-inc] failed to get auto-increment range of db_id={}, table_id={}, column_id={}, errmsg={}",
request.getDbId(), request.getTableId(), request.getColumnId(), e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("[auto-inc] catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(e.getClass().getSimpleName() + ": " + Strings.nullToEmpty(e.getMessage()));
}
return result;
}
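// Returns binlogs after the given commit seq; a follower may serve the request after syncing the
// journal when allow_follower_read is set.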
public TGetBinlogResult getBinlog(TGetBinlogRequest request) throws TException {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive get binlog request: {}", request);
}
TGetBinlogResult result = new TGetBinlogResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
boolean syncJournal = false;
if (!Env.getCurrentEnv().isMaster()) {
if (!request.isAllowFollowerRead()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
result.setMasterAddress(getMasterAddress());
LOG.error("failed to get binlog: {}", NOT_MASTER_ERR_MSG);
return result;
}
syncJournal = true;
}
try {
// Check all required args: user, passwd, db, prev_commit_seq
if (!request.isSetUser()) {
throw new UserException("user is not set");
}
if (!request.isSetPasswd()) {
throw new UserException("passwd is not set");
}
if (!request.isSetDb()) {
throw new UserException("db is not set");
}
if (!request.isSetPrevCommitSeq()) {
throw new UserException("prev_commit_seq is not set");
}
if (syncJournal) {
ConnectContext ctx = new ConnectContext(null);
ctx.setDatabase(request.getDb());
ctx.setQualifiedUser(request.getUser());
ctx.setEnv(Env.getCurrentEnv());
MasterOpExecutor executor = new MasterOpExecutor(ctx);
executor.syncJournal();
}
result = getBinlogImpl(request, clientAddr);
} catch (UserException e) {
LOG.warn("failed to get binlog: {}", e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
return result;
}
return result;
}
private TGetBinlogResult getBinlogImpl(TGetBinlogRequest request, String clientIp) throws UserException {
// step 1: check auth
if (Strings.isNullOrEmpty(request.getToken())) {
checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
request.getTable(),
request.getUserIp(), PrivPredicate.SELECT);
}
// step 2: check database
Env env = Env.getCurrentEnv();
String fullDbName = request.getDb();
Database db = env.getInternalCatalog().getDbNullable(fullDbName);
if (db == null) {
String dbName = fullDbName;
if (Strings.isNullOrEmpty(request.getCluster())) {
dbName = request.getDb();
}
throw new UserException("unknown database, database=" + dbName);
}
// step 3: resolve the target table id from table_id or table name (stays -1 if neither is set)
long tableId = -1;
if (request.isSetTableId()) {
tableId = request.getTableId();
} else if (request.isSetTable()) {
String tableName = request.getTable();
Table table = db.getTableOrMetaException(tableName, TableType.OLAP);
if (table == null) {
throw new UserException("unknown table, table=" + tableName);
}
tableId = table.getId();
}
// step 4: get binlog
long dbId = db.getId();
TGetBinlogResult result = new TGetBinlogResult();
result.setStatus(new TStatus(TStatusCode.OK));
long prevCommitSeq = request.getPrevCommitSeq();
long numAcquired = request.getNumAcquired();
Pair<TStatus, List<TBinlog>> statusBinlogPair = env.getBinlogManager()
.getBinlog(dbId, tableId, prevCommitSeq, numAcquired);
TStatus status = statusBinlogPair.first;
if (status != null && status.getStatusCode() != TStatusCode.OK) {
result.setStatus(status);
// On BINLOG_TOO_OLD_COMMIT_SEQ, still return the first existing binlog.
if (status.getStatusCode() != TStatusCode.BINLOG_TOO_OLD_COMMIT_SEQ) {
return result;
}
}
List<TBinlog> binlogs = statusBinlogPair.second;
if (binlogs != null) {
result.setBinlogs(binlogs);
}
return result;
}
// getSnapshot: returns the meta and job info of a backup snapshot identified by label; master-only.
public TGetSnapshotResult getSnapshot(TGetSnapshotRequest request) throws TException {
String clientAddr = getClientAddrAsString();
LOG.trace("receive get snapshot info request: {}", request);
TGetSnapshotResult result = new TGetSnapshotResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
if (!Env.getCurrentEnv().isMaster()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
result.setMasterAddress(getMasterAddress());
LOG.error("failed to get getSnapshot: {}", NOT_MASTER_ERR_MSG);
return result;
}
try {
result = getSnapshotImpl(request, clientAddr);
} catch (UserException e) {
LOG.warn("failed to get snapshot info: {}", e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
return result;
}
return result;
}
// getSnapshotImpl: validates required args and auth, looks up the snapshot by label, and returns
// its (optionally compressed) meta and job info.
private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String clientIp)
throws UserException, IOException {
// Step 1: Check all required args: user, passwd, db, label_name, snapshot_name, snapshot_type
if (!request.isSetUser()) {
throw new UserException("user is not set");
}
if (!request.isSetPasswd()) {
throw new UserException("passwd is not set");
}
if (!request.isSetDb()) {
throw new UserException("db is not set");
}
if (!request.isSetLabelName()) {
throw new UserException("label_name is not set");
}
if (!request.isSetSnapshotName()) {
throw new UserException("snapshot_name is not set");
}
if (!request.isSetSnapshotType()) {
throw new UserException("snapshot_type is not set");
} else if (request.getSnapshotType() != TSnapshotType.LOCAL) {
throw new UserException("snapshot_type is not LOCAL");
}
LOG.info("get snapshot info, user: {}, db: {}, label_name: {}, snapshot_name: {}, snapshot_type: {}",
request.getUser(), request.getDb(), request.getLabelName(), request.getSnapshotName(),
request.getSnapshotType());
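// Step 2: check auth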
if (Strings.isNullOrEmpty(request.getToken())) {
checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
request.getTable(), clientIp, PrivPredicate.SELECT);
}
// Step 3: get snapshot
String label = request.getLabelName();
TGetSnapshotResult result = new TGetSnapshotResult();
result.setStatus(new TStatus(TStatusCode.OK));
Snapshot snapshot = Env.getCurrentEnv().getBackupHandler().getSnapshot(label);
if (snapshot == null) {
result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_NOT_EXIST);
result.getStatus().addToErrorMsgs(String.format("snapshot %s not exist", label));
} else if (snapshot.isExpired()) {
result.getStatus().setStatusCode(TStatusCode.SNAPSHOT_EXPIRED);
result.getStatus().addToErrorMsgs(String.format("snapshot %s is expired", label));
} else {
byte[] meta = snapshot.getMeta();
byte[] jobInfo = snapshot.getJobInfo();
long expiredAt = snapshot.getExpiredAt();
long commitSeq = snapshot.getCommitSeq();
LOG.info("get snapshot info, snapshot: {}, meta size: {}, job info size: {}, "
+ "expired at: {}, commit seq: {}", label, meta.length, jobInfo.length, expiredAt, commitSeq);
if (request.isEnableCompress()) {
meta = GZIPUtils.compress(meta);
jobInfo = GZIPUtils.compress(jobInfo);
result.setCompressed(true);
if (LOG.isDebugEnabled()) {
LOG.debug("get snapshot info with compress, snapshot: {}, compressed meta "
+ "size {}, compressed job info size {}", label, meta.length, jobInfo.length);
}
}
result.setMeta(meta);
result.setJobInfo(jobInfo);
result.setExpiredAt(expiredAt);
result.setCommitSeq(commitSeq);
}
return result;
}
// restoreSnapshot: thrift entry point for restoring a snapshot into this cluster; master-only.
public TRestoreSnapshotResult restoreSnapshot(TRestoreSnapshotRequest request) throws TException {
String clientAddr = getClientAddrAsString();
LOG.trace("receive restore snapshot info request: {}", request);
TRestoreSnapshotResult result = new TRestoreSnapshotResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
if (!Env.getCurrentEnv().isMaster()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
result.setMasterAddress(getMasterAddress());
LOG.error("failed to get restoreSnapshot: {}", NOT_MASTER_ERR_MSG);
return result;
}
try {
result = restoreSnapshotImpl(request, clientAddr);
} catch (UserException e) {
LOG.warn("failed to get snapshot info: {}", e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
return result;
}
return result;
}
// restoreSnapshotImpl: validates args and auth, decompresses meta/job info if needed, then builds
// and executes a RestoreStmt in being-synced mode.
private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest request, String clientIp)
throws UserException {
// Step 1: Check all required args: user, passwd, db, label_name, repo_name, meta, job_info
if (!request.isSetUser()) {
throw new UserException("user is not set");
}
if (!request.isSetPasswd()) {
throw new UserException("passwd is not set");
}
if (!request.isSetDb()) {
throw new UserException("db is not set");
}
if (!request.isSetLabelName()) {
throw new UserException("label_name is not set");
}
if (!request.isSetRepoName()) {
throw new UserException("repo_name is not set");
}
if (!request.isSetMeta()) {
throw new UserException("meta is not set");
}
if (!request.isSetJobInfo()) {
throw new UserException("job_info is not set");
}
// Step 2: check auth
if (Strings.isNullOrEmpty(request.getToken())) {
checkDbPasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(), clientIp,
PrivPredicate.LOAD);
}
// Step 3: build and execute the restore statement
TRestoreSnapshotResult result = new TRestoreSnapshotResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
LabelName label = new LabelName(request.getDb(), request.getLabelName());
String repoName = request.getRepoName();
Map<String, String> properties = request.getProperties();
// Restore requires that all properties are known, so the old version of FE will not be able
// to recognize the properties of the new version. Therefore, request parameters are used here
// instead of directly putting them in properties to avoid compatibility issues of cross-version
// synchronization.
if (request.isCleanPartitions()) {
properties.put(RestoreStmt.PROP_CLEAN_PARTITIONS, "true");
}
if (request.isCleanTables()) {
properties.put(RestoreStmt.PROP_CLEAN_TABLES, "true");
}
if (request.isAtomicRestore()) {
properties.put(RestoreStmt.PROP_ATOMIC_RESTORE, "true");
}
if (request.isForceReplace()) {
properties.put(RestoreStmt.PROP_FORCE_REPLACE, "true");
}
AbstractBackupTableRefClause restoreTableRefClause = null;
if (request.isSetTableRefs()) {
List<TableRef> tableRefs = new ArrayList<>();
for (TTableRef tTableRef : request.getTableRefs()) {
tableRefs.add(new TableRef(new TableName(tTableRef.getTable()), tTableRef.getAliasName()));
}
if (tableRefs.size() > 0) {
boolean isExclude = false;
restoreTableRefClause = new AbstractBackupTableRefClause(isExclude, tableRefs);
}
}
byte[] meta = request.getMeta();
byte[] jobInfo = request.getJobInfo();
if (Config.enable_restore_snapshot_rpc_compression && request.isCompressed()) {
if (LOG.isDebugEnabled()) {
LOG.debug("decompress meta and job info, compressed meta size {}, compressed job info size {}",
meta.length, jobInfo.length);
}
try {
meta = GZIPUtils.decompress(meta);
jobInfo = GZIPUtils.decompress(jobInfo);
} catch (Exception e) {
LOG.warn("decompress meta and job info failed", e);
throw new UserException("decompress meta and job info failed", e);
}
} else if (GZIPUtils.isGZIPCompressed(jobInfo) || GZIPUtils.isGZIPCompressed(meta)) {
throw new UserException("The request is compressed, but the config "
+ "`enable_restore_snapshot_rpc_compressed` is not enabled.");
}
RestoreStmt restoreStmt = new RestoreStmt(label, repoName, restoreTableRefClause, properties, meta, jobInfo);
restoreStmt.setIsBeingSynced();
LOG.debug("restore snapshot info, restoreStmt: {}", restoreStmt);
try {
ConnectContext ctx = new ConnectContext();
ctx.setQualifiedUser(request.getUser());
String fullUserName = ClusterNamespace.getNameFromFullName(request.getUser());
ctx.setCurrentUserIdentity(UserIdentity.createAnalyzedUserIdentWithIp(fullUserName, "%"));
ctx.setThreadLocalInfo();
Analyzer analyzer = new Analyzer(ctx.getEnv(), ctx);
restoreStmt.analyze(analyzer);
DdlExecutor.execute(Env.getCurrentEnv(), restoreStmt);
} catch (UserException e) {
LOG.warn("failed to restore: {}, stmt: {}", e.getMessage(), restoreStmt, e);
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage() + ", stmt: " + restoreStmt.toString());
} catch (Throwable e) {
LOG.warn("catch unknown result. stmt: {}", restoreStmt, e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()) + ", stmt: " + restoreStmt.toString());
} finally {
ConnectContext.remove();
}
return result;
}
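// addPlsqlStoredProcedure: registers a PL/SQL stored procedure on the master FE; non-master FEs reject the request.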
@Override
public TPlsqlStoredProcedureResult addPlsqlStoredProcedure(TAddPlsqlStoredProcedureRequest request) {
TPlsqlStoredProcedureResult result = new TPlsqlStoredProcedureResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
if (!Env.getCurrentEnv().isMaster()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
LOG.error("failed to addPlsqlStoredProcedure:{}, request:{}, backend:{}",
NOT_MASTER_ERR_MSG, request, getClientAddrAsString());
return result;
}
if (!request.isSetPlsqlStoredProcedure()) {
status.setStatusCode(TStatusCode.INVALID_ARGUMENT);
status.addToErrorMsgs("missing stored procedure.");
return result;
}
try {
Env.getCurrentEnv().getPlsqlManager()
.addPlsqlStoredProcedure(PlsqlStoredProcedure.fromThrift(request.getPlsqlStoredProcedure()),
request.isSetIsForce() && request.isIsForce());
} catch (RuntimeException e) {
status.setStatusCode(TStatusCode.ALREADY_EXIST);
status.addToErrorMsgs(e.getMessage());
return result;
}
return result;
}
@Override
public TPlsqlStoredProcedureResult dropPlsqlStoredProcedure(TDropPlsqlStoredProcedureRequest request) {
TPlsqlStoredProcedureResult result = new TPlsqlStoredProcedureResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
if (!Env.getCurrentEnv().isMaster()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
LOG.error("failed to dropPlsqlStoredProcedure:{}, request:{}, backend:{}",
NOT_MASTER_ERR_MSG, request, getClientAddrAsString());
return result;
}
if (!request.isSetPlsqlProcedureKey()) {
status.setStatusCode(TStatusCode.INVALID_ARGUMENT);
status.addToErrorMsgs("missing stored key.");
return result;
}
Env.getCurrentEnv().getPlsqlManager().dropPlsqlStoredProcedure(PlsqlProcedureKey.fromThrift(
request.getPlsqlProcedureKey()));
return result;
}
@Override
public TPlsqlPackageResult addPlsqlPackage(TAddPlsqlPackageRequest request) throws TException {
TPlsqlPackageResult result = new TPlsqlPackageResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
if (!Env.getCurrentEnv().isMaster()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
LOG.error("failed to addPlsqlPackage:{}, request:{}, backend:{}",
NOT_MASTER_ERR_MSG, request, getClientAddrAsString());
return result;
}
if (!request.isSetPlsqlPackage()) {
status.setStatusCode(TStatusCode.INVALID_ARGUMENT);
status.addToErrorMsgs("missing plsql package.");
return result;
}
try {
Env.getCurrentEnv().getPlsqlManager().addPackage(PlsqlPackage.fromThrift(request.getPlsqlPackage()),
request.isSetIsForce() && request.isIsForce());
} catch (RuntimeException e) {
status.setStatusCode(TStatusCode.ALREADY_EXIST);
status.addToErrorMsgs(e.getMessage());
return result;
}
return result;
}
@Override
public TPlsqlPackageResult dropPlsqlPackage(TDropPlsqlPackageRequest request) throws TException {
TPlsqlPackageResult result = new TPlsqlPackageResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
if (!Env.getCurrentEnv().isMaster()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
LOG.error("failed to dropPlsqlPackage:{}, request:{}, backend:{}",
NOT_MASTER_ERR_MSG, request, getClientAddrAsString());
return result;
}
if (!request.isSetPlsqlProcedureKey()) {
status.setStatusCode(TStatusCode.INVALID_ARGUMENT);
status.addToErrorMsgs("missing stored key.");
return result;
}
Env.getCurrentEnv().getPlsqlManager().dropPackage(PlsqlProcedureKey.fromThrift(request.getPlsqlProcedureKey()));
return result;
}
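// getMasterToken: verifies the user's password and returns the cluster token held by the master FE.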
public TGetMasterTokenResult getMasterToken(TGetMasterTokenRequest request) throws TException {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive get master token request: {}", request);
}
TGetMasterTokenResult result = new TGetMasterTokenResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
if (!Env.getCurrentEnv().isMaster()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
result.setMasterAddress(getMasterAddress());
LOG.error("failed to get getMasterToken: {}", NOT_MASTER_ERR_MSG);
return result;
}
try {
checkPassword(request.getUser(), request.getPassword(), clientAddr);
result.setToken(Env.getCurrentEnv().getToken());
} catch (AuthenticationException e) {
LOG.warn("failed to get master token: {}", e.getMessage());
status.setStatusCode(TStatusCode.NOT_AUTHORIZED);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
}
return result;
}
// getBinlogLag
public TGetBinlogLagResult getBinlogLag(TGetBinlogRequest request) throws TException {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive get binlog lag request: {}", request);
}
TGetBinlogLagResult result = new TGetBinlogLagResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
if (!Env.getCurrentEnv().isMaster()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
result.setMasterAddress(getMasterAddress());
LOG.error("failed to get binlog lag: {}", NOT_MASTER_ERR_MSG);
return result;
}
try {
result = getBinlogLagImpl(request, clientAddr);
} catch (UserException e) {
LOG.warn("failed to get binlog lag: {}", e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
return result;
}
return result;
}
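// getBinlogLagImpl: validates the request, checks privileges, resolves the db/table, and queries the binlog manager for lag info.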
private TGetBinlogLagResult getBinlogLagImpl(TGetBinlogRequest request, String clientIp) throws UserException {
// Check all required args: user, passwd, db, prev_commit_seq
if (!request.isSetUser()) {
throw new UserException("user is not set");
}
if (!request.isSetPasswd()) {
throw new UserException("passwd is not set");
}
if (!request.isSetDb()) {
throw new UserException("db is not set");
}
if (!request.isSetPrevCommitSeq()) {
throw new UserException("prev_commit_seq is not set");
}
// step 1: check auth
if (Strings.isNullOrEmpty(request.getToken())) {
checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
request.getTable(),
request.getUserIp(), PrivPredicate.SELECT);
}
// step 2: check database
Env env = Env.getCurrentEnv();
String fullDbName = request.getDb();
Database db = env.getInternalCatalog().getDbNullable(fullDbName);
if (db == null) {
String dbName = fullDbName;
if (Strings.isNullOrEmpty(request.getCluster())) {
dbName = request.getDb();
}
throw new UserException("unknown database, database=" + dbName);
}
// step 3: resolve the table id (by id if given, otherwise by name)
long tableId = -1;
if (request.isSetTableId()) {
tableId = request.getTableId();
} else if (request.isSetTable()) {
String tableName = request.getTable();
Table table = db.getTableOrMetaException(tableName, TableType.OLAP);
if (table == null) {
throw new UserException("unknown table, table=" + tableName);
}
tableId = table.getId();
}
// step 4: get binlog lag
long dbId = db.getId();
TGetBinlogLagResult result = new TGetBinlogLagResult();
result.setStatus(new TStatus(TStatusCode.OK));
long prevCommitSeq = request.getPrevCommitSeq();
Pair<TStatus, BinlogLagInfo> binlogLagInfo = env.getBinlogManager().getBinlogLag(dbId, tableId, prevCommitSeq);
TStatus status = binlogLagInfo.first;
if (status != null && status.getStatusCode() != TStatusCode.OK) {
result.setStatus(status);
}
BinlogLagInfo lagInfo = binlogLagInfo.second;
if (lagInfo != null) {
result.setLag(lagInfo.getLag());
result.setFirstCommitSeq(lagInfo.getFirstCommitSeq());
result.setLastCommitSeq(lagInfo.getLastCommitSeq());
result.setFirstBinlogTimestamp(lagInfo.getFirstCommitTs());
result.setLastBinlogTimestamp(lagInfo.getLastCommitTs());
result.setNextCommitSeq(lagInfo.getNextCommitSeq());
result.setNextBinlogTimestamp(lagInfo.getNextCommitTs());
}
return result;
}
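// lockBinlog: master-only RPC entry; validation and the actual binlog locking are done in lockBinlogImpl below.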
public TLockBinlogResult lockBinlog(TLockBinlogRequest request) throws TException {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive lock binlog request: {}", request);
}
TLockBinlogResult result = new TLockBinlogResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
if (!Env.getCurrentEnv().isMaster()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
result.setMasterAddress(getMasterAddress());
LOG.error("failed to lock binlog: {}", NOT_MASTER_ERR_MSG);
return result;
}
try {
result = lockBinlogImpl(request, clientAddr);
} catch (UserException e) {
LOG.warn("failed to lock binlog: {}", e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
return result;
}
return result;
}
private TLockBinlogResult lockBinlogImpl(TLockBinlogRequest request, String clientIp) throws UserException {
// Check all required args: user, passwd, db, job_unique_id
if (!request.isSetUser()) {
throw new UserException("user is not set");
}
if (!request.isSetPasswd()) {
throw new UserException("passwd is not set");
}
if (!request.isSetDb()) {
throw new UserException("db is not set");
}
if (!request.isSetJobUniqueId()) {
throw new UserException("job_unique_id is not set");
}
// step 1: check auth
if (Strings.isNullOrEmpty(request.getToken())) {
checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
request.getTable(), clientIp, PrivPredicate.SELECT);
}
// step 2: check database
Env env = Env.getCurrentEnv();
String fullDbName = request.getDb();
Database db = env.getInternalCatalog().getDbNullable(fullDbName);
if (db == null) {
String dbName = fullDbName;
if (Strings.isNullOrEmpty(request.getCluster())) {
dbName = request.getDb();
}
throw new UserException("unknown database, database=" + dbName);
}
// step 3: resolve the table id (by id if given, otherwise by name)
long tableId = -1;
if (request.isSetTableId()) {
tableId = request.getTableId();
} else if (request.isSetTable()) {
String tableName = request.getTable();
Table table = db.getTableOrMetaException(tableName, TableType.OLAP);
if (table == null) {
throw new UserException("unknown table, table=" + tableName);
}
tableId = table.getId();
}
// step 4: lock binlog
long dbId = db.getId();
String jobUniqueId = request.getJobUniqueId();
long lockCommitSeq = -1L;
if (request.isSetLockCommitSeq()) {
lockCommitSeq = request.getLockCommitSeq();
}
Pair<TStatus, Long> statusSeqPair = env.getBinlogManager().lockBinlog(
dbId, tableId, jobUniqueId, lockCommitSeq);
TLockBinlogResult result = new TLockBinlogResult();
result.setStatus(statusSeqPair.first);
result.setLockedCommitSeq(statusSeqPair.second);
return result;
}
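// updateStatsCache: applies a column statistics entry pushed by the master to this FE's statistics cache, or invalidates the cached entry when the statistic is UNKNOWN.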
@Override
public TStatus updateStatsCache(TUpdateFollowerStatsCacheRequest request) throws TException {
StatisticsCacheKey k = GsonUtils.GSON.fromJson(request.key, StatisticsCacheKey.class);
ColStatsData data = GsonUtils.GSON.fromJson(request.colStatsData, ColStatsData.class);
ColumnStatistic c = data.toColumnStatistic();
if (c == ColumnStatistic.UNKNOWN) {
Env.getCurrentEnv().getStatisticsCache().invalidateColumnStatsCache(k.catalogId, k.dbId, k.tableId,
k.idxId, k.colName);
} else {
Env.getCurrentEnv().getStatisticsCache().updateColStatsCache(
k.catalogId, k.dbId, k.tableId, k.idxId, k.colName, c);
}
// Return Ok anyway
return new TStatus(TStatusCode.OK);
}
@Override
public TStatus updatePlanStatsCache(TUpdatePlanStatsCacheRequest request) throws TException {
PlanNodeAndHash key = GsonUtils.GSON.fromJson(request.key, PlanNodeAndHash.class);
RecentRunsPlanStatistics data = GsonUtils.GSON.fromJson(request.planStatsData, RecentRunsPlanStatistics.class);
Env.getCurrentEnv().getHboPlanStatisticsManager().getHboPlanStatisticsProvider()
.updatePlanStats(key, data);
return new TStatus(TStatusCode.OK);
}
@Override
public TStatus invalidateStatsCache(TInvalidateFollowerStatsCacheRequest request) throws TException {
InvalidateStatsTarget target = GsonUtils.GSON.fromJson(request.key, InvalidateStatsTarget.class);
AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
TableStatsMeta tableStats = analysisManager.findTableStatsStatus(target.tableId);
PartitionNames partitionNames = null;
if (target.partitions != null) {
partitionNames = new PartitionNames(false, new ArrayList<>(target.partitions));
}
if (target.isTruncate) {
TableIf table = StatisticsUtil.findTable(target.catalogId, target.dbId, target.tableId);
analysisManager.submitAsyncDropStatsTask(table, target.catalogId, target.dbId,
target.tableId, tableStats, partitionNames, false);
} else {
analysisManager.invalidateLocalStats(target.catalogId, target.dbId, target.tableId,
target.columns, tableStats, partitionNames);
}
return new TStatus(TStatusCode.OK);
}
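// createPartition: creates the partitions described by the request (presumably sent by BEs when loading into auto-partitioned tables) and returns the resulting partition, tablet and node layout.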
@Override
public TCreatePartitionResult createPartition(TCreatePartitionRequest request) throws TException {
LOG.info("Receive create partition request: {}", request);
long dbId = request.getDbId();
long tableId = request.getTableId();
TCreatePartitionResult result = new TCreatePartitionResult();
TStatus errorStatus = new TStatus(TStatusCode.RUNTIME_ERROR);
if (!Env.getCurrentEnv().isMaster()) {
errorStatus.setStatusCode(TStatusCode.NOT_MASTER);
errorStatus.addToErrorMsgs(NOT_MASTER_ERR_MSG);
LOG.warn("failed to createPartition: {}", NOT_MASTER_ERR_MSG);
return result;
}
Database db = Env.getCurrentEnv().getInternalCatalog().getDbNullable(dbId);
if (db == null) {
errorStatus.setErrorMsgs(Lists.newArrayList(String.format("dbId=%d is not exists", dbId)));
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}
Table table = db.getTable(tableId).orElse(null);
if (table == null) {
errorStatus.setErrorMsgs(
Lists.newArrayList(String.format("dbId=%d tableId=%d does not exist", dbId, tableId)));
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}
if (!(table instanceof OlapTable)) {
errorStatus.setErrorMsgs(
Lists.newArrayList(String.format("dbId=%d tableId=%d is not an olap table", dbId, tableId)));
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}
if (request.partitionValues == null) {
errorStatus.setErrorMsgs(Lists.newArrayList("partitionValues should not null."));
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}
OlapTable olapTable = (OlapTable) table;
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
ArrayList<List<TNullableStringLiteral>> partitionValues = new ArrayList<>();
for (int i = 0; i < request.partitionValues.size(); i++) {
if (partitionInfo.getType() == PartitionType.RANGE && request.partitionValues.get(i).size() != 1) {
errorStatus.setErrorMsgs(
Lists.newArrayList(
"Only single-column RANGE partition is supported, so each partitionValues entry must contain exactly 1 value."));
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}
partitionValues.add(request.partitionValues.get(i));
}
Map<String, AddPartitionClause> addPartitionClauseMap;
try {
addPartitionClauseMap = PartitionExprUtil.getAddPartitionClauseFromPartitionValues(olapTable,
partitionValues, partitionInfo);
} catch (AnalysisException ex) {
errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage()));
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}
// check partition's number limit.
int partitionNum = olapTable.getPartitionNum() + addPartitionClauseMap.size();
if (partitionNum > Config.max_auto_partition_num) {
String errorMessage = String.format(
"create partition failed. partition number %d would exceed the limit "
+ "max_auto_partition_num %d",
partitionNum, Config.max_auto_partition_num);
LOG.warn(errorMessage);
errorStatus.setErrorMsgs(Lists.newArrayList(errorMessage));
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}
for (AddPartitionClause addPartitionClause : addPartitionClauseMap.values()) {
try {
// TODO: maybe further check and limit the number of partitions created here
Env.getCurrentEnv().addPartition(db, olapTable.getName(), addPartitionClause, false, 0, true);
} catch (DdlException e) {
LOG.warn("failed to add partition for table " + olapTable.getName(), e);
errorStatus.setErrorMsgs(
Lists.newArrayList(String.format("create partition failed. error:%s", e.getMessage())));
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}
}
// build partition & tablets
List<TOlapTablePartition> partitions = Lists.newArrayList();
List<TTabletLocation> tablets = Lists.newArrayList();
List<TTabletLocation> slaveTablets = new ArrayList<>();
for (String partitionName : addPartitionClauseMap.keySet()) {
Partition partition = table.getPartition(partitionName);
TOlapTablePartition tPartition = new TOlapTablePartition();
tPartition.setId(partition.getId());
int partColNum = partitionInfo.getPartitionColumns().size();
try {
OlapTableSink.setPartitionKeys(tPartition, partitionInfo.getItem(partition.getId()), partColNum);
} catch (UserException ex) {
errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage()));
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
tPartition.addToIndexes(new TOlapTableIndexTablets(index.getId(), Lists.newArrayList(
index.getTablets().stream().map(Tablet::getId).collect(Collectors.toList()))));
tPartition.setNumBuckets(index.getTablets().size());
}
tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId()));
partitions.add(tPartition);
// tablet
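// majority quorum: more than half of the partition's replicas must be on healthy backends; fewer only triggers a warning below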
int quorum = olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum() / 2
+ 1;
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
for (Tablet tablet : index.getTablets()) {
// we should ensure the replica backend is alive
// otherwise, there will be an 'unknown node id, id=xxx' error for stream load
// BE id -> path hash
Multimap<Long, Long> bePathsMap;
try {
if (Config.isCloudMode() && request.isSetBeEndpoint()) {
bePathsMap = ((CloudTablet) tablet)
.getNormalReplicaBackendPathMapCloud(request.be_endpoint);
} else {
bePathsMap = tablet.getNormalReplicaBackendPathMap();
}
} catch (UserException ex) {
errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage()));
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}
if (bePathsMap.keySet().size() < quorum) {
LOG.warn("auto go quorum exception");
}
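// single-replica load: randomly pick one backend as the master replica; the remaining backends receive the tablet as slave replicas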
if (request.isSetWriteSingleReplica() && request.isWriteSingleReplica()) {
Long[] nodes = bePathsMap.keySet().toArray(new Long[0]);
Random random = new SecureRandom();
Long masterNode = nodes[random.nextInt(nodes.length)];
Multimap<Long, Long> slaveBePathsMap = bePathsMap;
slaveBePathsMap.removeAll(masterNode);
tablets.add(new TTabletLocation(tablet.getId(),
Lists.newArrayList(Sets.newHashSet(masterNode))));
slaveTablets.add(new TTabletLocation(tablet.getId(),
Lists.newArrayList(slaveBePathsMap.keySet())));
} else {
tablets.add(new TTabletLocation(tablet.getId(), Lists.newArrayList(bePathsMap.keySet())));
}
}
}
}
result.setPartitions(partitions);
result.setTablets(tablets);
result.setSlaveTablets(slaveTablets);
// build nodes
List<TNodeInfo> nodeInfos = Lists.newArrayList();
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
for (Long id : systemInfoService.getAllBackendByCurrentCluster(false)) {
Backend backend = systemInfoService.getBackend(id);
nodeInfos.add(new TNodeInfo(backend.getId(), 0, backend.getHost(), backend.getBrpcPort()));
}
result.setNodes(nodeInfos);
result.setStatus(new TStatus(TStatusCode.OK));
if (LOG.isDebugEnabled()) {
LOG.debug("send create partition result: {}", result);
}
return result;
}
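// replacePartition: used by INSERT OVERWRITE auto-detect to swap the requested partitions with newly created temp partitions and return the layout of the partitions to write into.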
@Override
public TReplacePartitionResult replacePartition(TReplacePartitionRequest request) throws TException {
LOG.info("Receive replace partition request: {}", request);
long dbId = request.getDbId();
long tableId = request.getTableId();
List<Long> reqPartitionIds = request.getPartitionIds();
long taskGroupId = request.getOverwriteGroupId();
TReplacePartitionResult result = new TReplacePartitionResult();
TStatus errorStatus = new TStatus(TStatusCode.RUNTIME_ERROR);
if (!Env.getCurrentEnv().isMaster()) {
errorStatus.setStatusCode(TStatusCode.NOT_MASTER);
errorStatus.addToErrorMsgs(NOT_MASTER_ERR_MSG);
LOG.warn("failed to replace Partition: {}", NOT_MASTER_ERR_MSG);
return result;
}
Database db = Env.getCurrentEnv().getInternalCatalog().getDbNullable(dbId);
if (db == null) {
errorStatus.setErrorMsgs(Lists.newArrayList(String.format("dbId=%d is not exists", dbId)));
result.setStatus(errorStatus);
LOG.warn("send replace partition error status: {}", result);
return result;
}
Table table = db.getTable(tableId).orElse(null);
if (table == null) {
errorStatus.setErrorMsgs(
Lists.newArrayList(String.format("dbId=%d tableId=%d does not exist", dbId, tableId)));
result.setStatus(errorStatus);
LOG.warn("send replace partition error status: {}", result);
return result;
}
if (!(table instanceof OlapTable)) {
errorStatus.setErrorMsgs(
Lists.newArrayList(String.format("dbId=%d tableId=%d is not an olap table", dbId, tableId)));
result.setStatus(errorStatus);
LOG.warn("send replace partition error status: {}", result);
return result;
}
OlapTable olapTable = (OlapTable) table;
InsertOverwriteManager overwriteManager = Env.getCurrentEnv().getInsertOverwriteManager();
ReentrantLock taskLock = overwriteManager.getLock(taskGroupId);
if (taskLock == null) {
errorStatus.setErrorMsgs(Lists
.newArrayList("cannot find task group " + taskGroupId + ", maybe already failed."));
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}
ArrayList<Long> resultPartitionIds = new ArrayList<>(); // [1 2 5 6] -> [7 8 5 6]
ArrayList<Long> pendingPartitionIds = new ArrayList<>(); // pending: [1 2]
ArrayList<Long> newPartitionIds = new ArrayList<>(); // requested temp partition ids. for [7 8]
boolean needReplace = false;
try {
taskLock.lock();
// double-check the lock: taskLock may be non-null but already removed from the map, which means the task failed.
if (overwriteManager.getLock(taskGroupId) == null) {
errorStatus.setErrorMsgs(Lists
.newArrayList("cannot find task group " + taskGroupId + ", maybe already failed."));
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}
// we don't lock the table; other threads in this txn are coordinated by taskLock.
// if the replacement has already been done, don't do it again, but fetch the recorded new partitions directly.
// if the partitions were changed by another txn, just letting this one fail naturally is ok.
needReplace = overwriteManager.tryReplacePartitionIds(taskGroupId, reqPartitionIds, resultPartitionIds);
// request: [1 2 3 4] result: [1 2 5 6] means ONLY 1 and 2 need replace.
if (needReplace) {
// names for [1 2]. if another txn dropped the origin partitions, we fail here and return the exception.
// that's reasonable because we want the iot-auto-detect txn to have as little impact as possible, so we
// only fail the txn when a conflict is detected in this RPC, which allows more concurrent transactions.
List<String> pendingPartitionNames = olapTable.getEqualPartitionNames(reqPartitionIds,
resultPartitionIds);
for (String name : pendingPartitionNames) {
pendingPartitionIds.add(olapTable.getPartition(name).getId()); // put [1 2]
}
// names for [7 8]
List<String> newTempNames = InsertOverwriteUtil
.generateTempPartitionNames(pendingPartitionNames);
// a task means one time insert overwrite
long taskId = overwriteManager.registerTask(dbId, tableId, newTempNames);
overwriteManager.registerTaskInGroup(taskGroupId, taskId);
InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, newTempNames);
// now temp partitions are bumped up and use new names. we get their ids and record them.
for (String newPartName : newTempNames) {
newPartitionIds.add(olapTable.getPartition(newPartName).getId()); // put [7 8]
}
overwriteManager.recordPartitionPairs(taskGroupId, pendingPartitionIds, newPartitionIds);
if (LOG.isDebugEnabled()) {
LOG.debug("partition replacement: ");
for (int i = 0; i < pendingPartitionIds.size(); i++) {
LOG.debug("[" + pendingPartitionIds.get(i) + " - " + pendingPartitionNames.get(i) + ", "
+ newPartitionIds.get(i) + " - " + newTempNames.get(i) + "], ");
}
}
}
} catch (DdlException | RuntimeException ex) {
errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage()));
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
} finally {
taskLock.unlock();
}
// result: [1 2 5 6], make it [7 8 5 6]
int idx = 0;
if (needReplace) {
for (int i = 0; i < reqPartitionIds.size(); i++) {
if (reqPartitionIds.get(i).equals(resultPartitionIds.get(i))) {
resultPartitionIds.set(i, newPartitionIds.get(idx++));
}
}
}
if (idx != newPartitionIds.size()) {
errorStatus.addToErrorMsgs("changed partition number " + idx + " is not correct");
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}
if (LOG.isDebugEnabled()) {
LOG.debug("replace partition origin ids: ["
+ String.join(", ", reqPartitionIds.stream().map(String::valueOf).collect(Collectors.toList()))
+ ']');
LOG.debug("replace partition result ids: ["
+ String.join(", ", resultPartitionIds.stream().map(String::valueOf).collect(Collectors.toList()))
+ ']');
}
// build partition & tablets. now all partitions in allReqPartNames are replaced an recorded.
// so they won't be changed again. if other transaction changing it. just let it fail.
List<TOlapTablePartition> partitions = new ArrayList<>();
List<TTabletLocation> tablets = new ArrayList<>();
List<TTabletLocation> slaveTablets = new ArrayList<>();
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
for (long partitionId : resultPartitionIds) {
Partition partition = olapTable.getPartition(partitionId);
TOlapTablePartition tPartition = new TOlapTablePartition();
tPartition.setId(partition.getId());
// set partition keys
int partColNum = partitionInfo.getPartitionColumns().size();
try {
OlapTableSink.setPartitionKeys(tPartition, partitionInfo.getItem(partition.getId()), partColNum);
} catch (UserException ex) {
errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage()));
result.setStatus(errorStatus);
LOG.warn("send replace partition error status: {}", result);
return result;
}
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
tPartition.addToIndexes(new TOlapTableIndexTablets(index.getId(), Lists.newArrayList(
index.getTablets().stream().map(Tablet::getId).collect(Collectors.toList()))));
tPartition.setNumBuckets(index.getTablets().size());
}
tPartition.setIsMutable(olapTable.getPartitionInfo().getIsMutable(partition.getId()));
partitions.add(tPartition);
// tablet
int quorum = olapTable.getPartitionInfo().getReplicaAllocation(partition.getId()).getTotalReplicaNum() / 2
+ 1;
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
for (Tablet tablet : index.getTablets()) {
// we should ensure the replica backend is alive
// otherwise, there will be an 'unknown node id, id=xxx' error for stream load
// BE id -> path hash
Multimap<Long, Long> bePathsMap;
try {
if (Config.isCloudMode() && request.isSetBeEndpoint()) {
bePathsMap = ((CloudTablet) tablet)
.getNormalReplicaBackendPathMapCloud(request.be_endpoint);
} else {
bePathsMap = tablet.getNormalReplicaBackendPathMap();
}
} catch (UserException ex) {
errorStatus.setErrorMsgs(Lists.newArrayList(ex.getMessage()));
result.setStatus(errorStatus);
LOG.warn("send create partition error status: {}", result);
return result;
}
if (bePathsMap.keySet().size() < quorum) {
LOG.warn("auto go quorum exception");
}
if (request.isSetWriteSingleReplica() && request.isWriteSingleReplica()) {
Long[] nodes = bePathsMap.keySet().toArray(new Long[0]);
Random random = new SecureRandom();
Long masterNode = nodes[random.nextInt(nodes.length)];
Multimap<Long, Long> slaveBePathsMap = bePathsMap;
slaveBePathsMap.removeAll(masterNode);
tablets.add(new TTabletLocation(tablet.getId(),
Lists.newArrayList(Sets.newHashSet(masterNode))));
slaveTablets.add(new TTabletLocation(tablet.getId(),
Lists.newArrayList(slaveBePathsMap.keySet())));
} else {
tablets.add(new TTabletLocation(tablet.getId(), Lists.newArrayList(bePathsMap.keySet())));
}
}
}
}
result.setPartitions(partitions);
result.setTablets(tablets);
result.setSlaveTablets(slaveTablets);
// build nodes
List<TNodeInfo> nodeInfos = Lists.newArrayList();
SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
for (Long id : systemInfoService.getAllBackendByCurrentCluster(false)) {
Backend backend = systemInfoService.getBackend(id);
nodeInfos.add(new TNodeInfo(backend.getId(), 0, backend.getHost(), backend.getBrpcPort()));
}
result.setNodes(nodeInfos);
// successfully return
result.setStatus(new TStatus(TStatusCode.OK));
if (LOG.isDebugEnabled()) {
LOG.debug("send replace partition result: {}", result);
}
return result;
}
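// getMeta: master-only RPC that returns metadata of a database and, optionally, a chosen set of its tables.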
public TGetMetaResult getMeta(TGetMetaRequest request) throws TException {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive get meta request: {}", request);
}
TGetMetaResult result = new TGetMetaResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
if (!Env.getCurrentEnv().isMaster()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
result.setMasterAddress(getMasterAddress());
LOG.error("failed to get beginTxn: {}", NOT_MASTER_ERR_MSG);
return result;
}
try {
result = getMetaImpl(request, clientAddr);
} catch (UserException e) {
LOG.warn("failed to get meta: {}", e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
}
return result;
}
private TGetMetaResult getMetaImpl(TGetMetaRequest request, String clientIp)
throws Exception {
// Step 1: check fields
if (!request.isSetUser()) {
throw new UserException("user is not set");
}
if (!request.isSetPasswd()) {
throw new UserException("passwd is not set");
}
if (!request.isSetDb()) {
throw new UserException("db is not set");
}
// Step 2: check auth
TGetMetaResult result = new TGetMetaResult();
result.setStatus(new TStatus(TStatusCode.OK));
Database db = null;
List<Table> tables = null;
if (Strings.isNullOrEmpty(request.getToken())) {
TGetMetaDB getMetaDb = request.getDb();
if (getMetaDb.isSetId()) {
db = Env.getCurrentInternalCatalog().getDbNullable(getMetaDb.getId());
} else if (getMetaDb.isSetName()) {
db = Env.getCurrentInternalCatalog().getDbNullable(getMetaDb.getName());
}
if (db == null) {
LOG.warn("db not found {}", getMetaDb);
return result;
}
if (getMetaDb.isSetTables()) {
tables = Lists.newArrayList();
List<TGetMetaTable> getMetaTables = getMetaDb.getTables();
for (TGetMetaTable getMetaTable : getMetaTables) {
Table table = null;
if (getMetaTable.isSetId()) {
table = db.getTableNullable(getMetaTable.getId());
} else {
table = db.getTableNullable(getMetaTable.getName());
}
if (table == null) {
// Since Database.getTableNullable is lock-free, we need to take lock and check again,
// to ensure the visibility of the table.
db.readLock();
try {
if (getMetaTable.isSetId()) {
table = db.getTableNullable(getMetaTable.getId());
} else {
table = db.getTableNullable(getMetaTable.getName());
}
} finally {
db.readUnlock();
}
}
if (table == null) {
LOG.warn("table not found {}", getMetaTable);
continue;
}
tables.add(table);
}
}
if (tables == null) {
checkDbPasswordAndPrivs(request.getUser(), request.getPasswd(), db.getFullName(), clientIp,
PrivPredicate.SELECT);
} else {
List<String> tableList = Lists.newArrayList();
for (Table table : tables) {
tableList.add(table.getName());
}
checkPasswordAndPrivs(request.getUser(), request.getPasswd(), db.getFullName(), tableList,
clientIp,
PrivPredicate.SELECT);
}
}
// Step 3: get meta
return Env.getMeta(db, tables);
}
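// getColumnInfo: returns the name and unique id of every column in the base schema of the given table.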
@Override
public TGetColumnInfoResult getColumnInfo(TGetColumnInfoRequest request) {
TGetColumnInfoResult result = new TGetColumnInfoResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
long dbId = request.getDbId();
long tableId = request.getTableId();
if (!Env.getCurrentEnv().isMaster()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
LOG.error("failed to getColumnInfo: {}", NOT_MASTER_ERR_MSG);
return result;
}
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
status.setStatusCode(TStatusCode.NOT_FOUND);
status.setErrorMsgs(Lists.newArrayList(String.format("dbId=%d is not exists", dbId)));
return result;
}
Table table = db.getTableNullable(tableId);
if (table == null) {
status.setStatusCode(TStatusCode.NOT_FOUND);
status.setErrorMsgs(
Lists.newArrayList(String.format("dbId=%d tableId=%d does not exist", dbId, tableId)));
return result;
}
List<TColumnInfo> columnsResult = Lists.newArrayList();
for (Column column : table.getBaseSchema(true)) {
final TColumnInfo info = new TColumnInfo();
info.setColumnName(column.getName());
info.setColumnId(column.getUniqueId());
columnsResult.add(info);
}
result.setColumns(columnsResult);
return result;
}
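// getBackendMeta: master-only RPC that returns id, host, ports and alive state of all backends across clusters.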
public TGetBackendMetaResult getBackendMeta(TGetBackendMetaRequest request) {
String clientAddr = getClientAddrAsString();
if (LOG.isDebugEnabled()) {
LOG.debug("receive get backend meta request: {}", request);
}
TGetBackendMetaResult result = new TGetBackendMetaResult();
TStatus status = new TStatus(TStatusCode.OK);
result.setStatus(status);
if (!Env.getCurrentEnv().isMaster()) {
status.setStatusCode(TStatusCode.NOT_MASTER);
status.addToErrorMsgs(NOT_MASTER_ERR_MSG);
result.setMasterAddress(getMasterAddress());
LOG.error("failed to get beginTxn: {}", NOT_MASTER_ERR_MSG);
return result;
}
try {
result = getBackendMetaImpl(request, clientAddr);
} catch (UserException e) {
LOG.warn("failed to get backend meta: {}", e.getMessage());
status.setStatusCode(TStatusCode.ANALYSIS_ERROR);
status.addToErrorMsgs(e.getMessage());
} catch (Throwable e) {
LOG.warn("catch unknown result.", e);
status.setStatusCode(TStatusCode.INTERNAL_ERROR);
status.addToErrorMsgs(Strings.nullToEmpty(e.getMessage()));
}
return result;
}
private TGetBackendMetaResult getBackendMetaImpl(TGetBackendMetaRequest request, String clientAddr)
throws UserException {
// Step 1: Check fields
if (!request.isSetUser()) {
throw new UserException("user is not set");
}
if (!request.isSetPasswd()) {
throw new UserException("passwd is not set");
}
// Step 2: check auth
checkPassword(request.getUser(), request.getPasswd(), clientAddr);
// TODO: check getBackendMeta privilege, which privilege should be checked?
// Step 3: get meta
TGetBackendMetaResult result = new TGetBackendMetaResult();
result.setStatus(new TStatus(TStatusCode.OK));
final SystemInfoService systemInfoService = Env.getCurrentSystemInfo();
List<Backend> backends = systemInfoService.getAllBackendsByAllCluster().values().asList();
for (Backend backend : backends) {
TBackend tBackend = new TBackend();
tBackend.setId(backend.getId());
tBackend.setHost(backend.getHost());
tBackend.setHttpPort(backend.getHttpPort());
tBackend.setBrpcPort(backend.getBrpcPort());
tBackend.setBePort(backend.getBePort());
tBackend.setIsAlive(backend.isAlive());
result.addToBackends(tBackend);
}
return result;
}
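// simple holder for a table's updated row count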
class TableStats {
public long updatedRowCount;
}
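// reportCommitTxnResult: receives a serialized CommitTxnResponse (load stats report) and hands it to the global transaction manager on the master.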
public TStatus reportCommitTxnResult(TReportCommitTxnResultRequest request) throws TException {
String clientAddr = getClientAddrAsString();
// only the master FE handles the report; since there is a single master at a time, rejecting here is fine
if (!Env.getCurrentEnv().isMaster()) {
LOG.error("failed to handle load stats report: not master, backend:{}",
clientAddr);
return new TStatus(TStatusCode.NOT_MASTER);
}
LOG.info("receive load stats report request: {}, backend: {}, dbId: {}, txnId: {}, label: {}",
request, clientAddr, request.getDbId(), request.getTxnId(), request.getLabel());
try {
byte[] receivedProtobufBytes = request.getPayload();
if (receivedProtobufBytes == null || receivedProtobufBytes.length <= 0) {
return new TStatus(TStatusCode.INVALID_ARGUMENT);
}
CommitTxnResponse commitTxnResponse = CommitTxnResponse.parseFrom(receivedProtobufBytes);
Env.getCurrentGlobalTransactionMgr().afterCommitTxnResp(commitTxnResponse);
} catch (InvalidProtocolBufferException e) {
// the payload cannot be parsed as a CommitTxnResponse; log it and still return OK
LOG.warn("failed to parse commit txn response from report payload", e);
}
return new TStatus(TStatusCode.OK);
}
@Override
public TShowProcessListResult showProcessList(TShowProcessListRequest request) {
boolean isShowFullSql = false;
if (request.isSetShowFullSql()) {
isShowFullSql = request.isShowFullSql();
}
UserIdentity userIdentity = UserIdentity.ROOT;
if (request.isSetCurrentUserIdent()) {
userIdentity = UserIdentity.fromThrift(request.getCurrentUserIdent());
}
String timeZone = VariableMgr.getDefaultSessionVariable().getTimeZone();
if (request.isSetTimeZone()) {
timeZone = request.getTimeZone();
}
List<List<String>> processList = ExecuteEnv.getInstance().getScheduler()
.listConnectionForRpc(userIdentity, isShowFullSql, Optional.of(timeZone));
TShowProcessListResult result = new TShowProcessListResult();
result.setProcessList(processList);
return result;
}
@Override
public TShowUserResult showUser(TShowUserRequest request) {
List<List<String>> userInfo = Env.getCurrentEnv().getAuth().getAllUserInfo();
TShowUserResult result = new TShowUserResult();
result.setUserinfoList(userInfo);
return result;
}
public TStatus syncQueryColumns(TSyncQueryColumns request) throws TException {
Env.getCurrentEnv().getAnalysisManager().mergeFollowerQueryColumns(request.highPriorityColumns,
request.midPriorityColumns);
return new TStatus(TStatusCode.OK);
}
@Override
public TFetchRunningQueriesResult fetchRunningQueries(TFetchRunningQueriesRequest request) {
TFetchRunningQueriesResult result = new TFetchRunningQueriesResult();
if (!Env.getCurrentEnv().isReady()) {
result.setStatus(new TStatus(TStatusCode.ILLEGAL_STATE));
return result;
}
List<TUniqueId> runningQueries = Lists.newArrayList();
List<Coordinator> allCoordinators = QeProcessorImpl.INSTANCE.getAllCoordinators();
for (Coordinator coordinator : allCoordinators) {
runningQueries.add(coordinator.getQueryId());
}
result.setStatus(new TStatus(TStatusCode.OK));
result.setRunningQueries(runningQueries);
return result;
}
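// fetchRoutineLoadJob: returns a snapshot of all routine load jobs with their state, progress and statistics.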
@Override
public TFetchRoutineLoadJobResult fetchRoutineLoadJob(TFetchRoutineLoadJobRequest request) {
TFetchRoutineLoadJobResult result = new TFetchRoutineLoadJobResult();
if (!Env.getCurrentEnv().isReady()) {
return result;
}
RoutineLoadManager routineLoadManager = Env.getCurrentEnv().getRoutineLoadManager();
List<TRoutineLoadJob> jobInfos = Lists.newArrayList();
List<RoutineLoadJob> routineLoadJobs = routineLoadManager.getAllRoutineLoadJobs();
for (RoutineLoadJob job : routineLoadJobs) {
TRoutineLoadJob jobInfo = new TRoutineLoadJob();
jobInfo.setJobId(String.valueOf(job.getId()));
jobInfo.setJobName(job.getName());
jobInfo.setCreateTime(job.getCreateTimestampString());
jobInfo.setPauseTime(job.getPauseTimestampString());
jobInfo.setEndTime(job.getEndTimestampString());
String dbName = "";
String tableName = "";
try {
dbName = job.getDbFullName();
tableName = job.getTableName();
} catch (MetaNotFoundException e) {
LOG.warn("Failed to get db or table name for routine load job: {}", job.getId(), e);
}
jobInfo.setDbName(dbName);
jobInfo.setTableName(tableName);
jobInfo.setState(job.getState().name());
jobInfo.setCurrentTaskNum(String.valueOf(job.getSizeOfRoutineLoadTaskInfoList()));
jobInfo.setJobProperties(job.jobPropertiesToJsonString());
jobInfo.setDataSourceProperties(job.dataSourcePropertiesJsonToString());
jobInfo.setCustomProperties(job.customPropertiesJsonToString());
jobInfo.setStatistic(job.getStatistic());
jobInfo.setProgress(job.getProgress().toJsonString());
jobInfo.setLag(job.getLag());
jobInfo.setReasonOfStateChanged(job.getStateReason());
jobInfo.setErrorLogUrls(Joiner.on(", ").join(job.getErrorLogUrls()));
jobInfo.setUserName(job.getUserIdentity().getQualifiedUser());
jobInfo.setCurrentAbortTaskNum(job.getJobStatistic().currentAbortedTaskNum);
jobInfo.setIsAbnormalPause(job.isAbnormalPause());
jobInfos.add(jobInfo);
}
if (LOG.isDebugEnabled()) {
LOG.debug("routine load job infos: {}", jobInfos);
}
result.setRoutineLoadJobs(jobInfos);
return result;
}
}