Env.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.catalog;
import org.apache.doris.alter.Alter;
import org.apache.doris.alter.AlterJobV2;
import org.apache.doris.alter.AlterJobV2.JobType;
import org.apache.doris.alter.MaterializedViewHandler;
import org.apache.doris.alter.QuotaType;
import org.apache.doris.alter.SchemaChangeHandler;
import org.apache.doris.alter.SystemHandler;
import org.apache.doris.analysis.AddPartitionClause;
import org.apache.doris.analysis.AddPartitionLikeClause;
import org.apache.doris.analysis.AdminCheckTabletsStmt;
import org.apache.doris.analysis.AdminCheckTabletsStmt.CheckType;
import org.apache.doris.analysis.AdminCleanTrashStmt;
import org.apache.doris.analysis.AdminCompactTableStmt;
import org.apache.doris.analysis.AdminSetConfigStmt;
import org.apache.doris.analysis.AdminSetPartitionVersionStmt;
import org.apache.doris.analysis.AdminSetReplicaStatusStmt;
import org.apache.doris.analysis.AdminSetReplicaVersionStmt;
import org.apache.doris.analysis.AdminSetTableStatusStmt;
import org.apache.doris.analysis.AlterDatabasePropertyStmt;
import org.apache.doris.analysis.AlterDatabaseQuotaStmt;
import org.apache.doris.analysis.AlterDatabaseRename;
import org.apache.doris.analysis.AlterMultiPartitionClause;
import org.apache.doris.analysis.AlterSystemStmt;
import org.apache.doris.analysis.AlterTableStmt;
import org.apache.doris.analysis.AlterViewStmt;
import org.apache.doris.analysis.BackupStmt;
import org.apache.doris.analysis.CancelAlterSystemStmt;
import org.apache.doris.analysis.CancelAlterTableStmt;
import org.apache.doris.analysis.CancelBackupStmt;
import org.apache.doris.analysis.ColumnRenameClause;
import org.apache.doris.analysis.CreateDbStmt;
import org.apache.doris.analysis.CreateFunctionStmt;
import org.apache.doris.analysis.CreateMaterializedViewStmt;
import org.apache.doris.analysis.CreateTableAsSelectStmt;
import org.apache.doris.analysis.CreateTableLikeStmt;
import org.apache.doris.analysis.CreateTableStmt;
import org.apache.doris.analysis.CreateViewStmt;
import org.apache.doris.analysis.DdlStmt;
import org.apache.doris.analysis.DistributionDesc;
import org.apache.doris.analysis.DropDbStmt;
import org.apache.doris.analysis.DropFunctionStmt;
import org.apache.doris.analysis.DropMaterializedViewStmt;
import org.apache.doris.analysis.DropPartitionClause;
import org.apache.doris.analysis.DropTableStmt;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.FunctionName;
import org.apache.doris.analysis.InstallPluginStmt;
import org.apache.doris.analysis.ModifyDistributionClause;
import org.apache.doris.analysis.PartitionRenameClause;
import org.apache.doris.analysis.RecoverDbStmt;
import org.apache.doris.analysis.RecoverPartitionStmt;
import org.apache.doris.analysis.RecoverTableStmt;
import org.apache.doris.analysis.ReplacePartitionClause;
import org.apache.doris.analysis.RestoreStmt;
import org.apache.doris.analysis.RollupRenameClause;
import org.apache.doris.analysis.SetType;
import org.apache.doris.analysis.ShowAlterStmt.AlterType;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TableRenameClause;
import org.apache.doris.analysis.TruncateTableStmt;
import org.apache.doris.analysis.UninstallPluginStmt;
import org.apache.doris.backup.BackupHandler;
import org.apache.doris.backup.RestoreJob;
import org.apache.doris.binlog.BinlogGcer;
import org.apache.doris.binlog.BinlogManager;
import org.apache.doris.blockrule.SqlBlockRuleMgr;
import org.apache.doris.catalog.ColocateTableIndex.GroupId;
import org.apache.doris.catalog.DistributionInfo.DistributionInfoType;
import org.apache.doris.catalog.MaterializedIndex.IndexExtState;
import org.apache.doris.catalog.MetaIdGenerator.IdGeneratorBuffer;
import org.apache.doris.catalog.OlapTable.OlapTableState;
import org.apache.doris.catalog.Replica.ReplicaStatus;
import org.apache.doris.catalog.TableIf.TableType;
import org.apache.doris.clone.ColocateTableCheckerAndBalancer;
import org.apache.doris.clone.DynamicPartitionScheduler;
import org.apache.doris.clone.TabletChecker;
import org.apache.doris.clone.TabletScheduler;
import org.apache.doris.clone.TabletSchedulerStat;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.ConfigBase;
import org.apache.doris.common.ConfigException;
import org.apache.doris.common.DNSCache;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.ErrorCode;
import org.apache.doris.common.ErrorReport;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.FeMetaVersion;
import org.apache.doris.common.LogUtils;
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.common.UserException;
import org.apache.doris.common.cache.NereidsSortedPartitionsCacheManager;
import org.apache.doris.common.cache.NereidsSqlCacheManager;
import org.apache.doris.common.io.CountingDataOutputStream;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.lock.MonitoredReentrantLock;
import org.apache.doris.common.publish.TopicPublisher;
import org.apache.doris.common.publish.TopicPublisherThread;
import org.apache.doris.common.publish.WorkloadGroupPublisher;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.HttpURLUtil;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.MetaLockUtils;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.SmallFileMgr;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.common.util.Util;
import org.apache.doris.consistency.ConsistencyChecker;
import org.apache.doris.cooldown.CooldownConfHandler;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.CatalogMgr;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.ExternalMetaCacheMgr;
import org.apache.doris.datasource.ExternalMetaIdMgr;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.datasource.SplitSourceManager;
import org.apache.doris.datasource.es.EsExternalCatalog;
import org.apache.doris.datasource.es.EsRepository;
import org.apache.doris.datasource.hive.HiveTransactionMgr;
import org.apache.doris.datasource.hive.event.MetastoreEventsProcessor;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.deploy.DeployManager;
import org.apache.doris.deploy.impl.AmbariDeployManager;
import org.apache.doris.deploy.impl.K8sDeployManager;
import org.apache.doris.deploy.impl.LocalFileDeployManager;
import org.apache.doris.dictionary.DictionaryManager;
import org.apache.doris.event.EventProcessor;
import org.apache.doris.event.ReplacePartitionEvent;
import org.apache.doris.ha.BDBHA;
import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.ha.HAProtocol;
import org.apache.doris.ha.MasterInfo;
import org.apache.doris.httpv2.entity.ResponseBody;
import org.apache.doris.httpv2.meta.MetaBaseAction;
import org.apache.doris.httpv2.rest.RestApiStatusCode;
import org.apache.doris.insertoverwrite.InsertOverwriteManager;
import org.apache.doris.job.base.AbstractJob;
import org.apache.doris.job.extensions.mtmv.MTMVTask;
import org.apache.doris.job.manager.JobManager;
import org.apache.doris.journal.JournalCursor;
import org.apache.doris.journal.JournalEntity;
import org.apache.doris.journal.bdbje.Timestamp;
import org.apache.doris.load.DeleteHandler;
import org.apache.doris.load.ExportJob;
import org.apache.doris.load.ExportJobState;
import org.apache.doris.load.ExportMgr;
import org.apache.doris.load.GroupCommitManager;
import org.apache.doris.load.Load;
import org.apache.doris.load.StreamLoadRecordMgr;
import org.apache.doris.load.loadv2.LoadEtlChecker;
import org.apache.doris.load.loadv2.LoadJobScheduler;
import org.apache.doris.load.loadv2.LoadLoadingChecker;
import org.apache.doris.load.loadv2.LoadManager;
import org.apache.doris.load.loadv2.LoadManagerAdapter;
import org.apache.doris.load.loadv2.LoadTask;
import org.apache.doris.load.loadv2.ProgressManager;
import org.apache.doris.load.routineload.RoutineLoadManager;
import org.apache.doris.load.routineload.RoutineLoadScheduler;
import org.apache.doris.load.routineload.RoutineLoadTaskScheduler;
import org.apache.doris.load.sync.SyncChecker;
import org.apache.doris.load.sync.SyncJobManager;
import org.apache.doris.master.Checkpoint;
import org.apache.doris.master.MetaHelper;
import org.apache.doris.master.PartitionInfoCollector;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.metric.MetricRepo;
import org.apache.doris.mtmv.MTMVAlterOpType;
import org.apache.doris.mtmv.MTMVPartitionInfo;
import org.apache.doris.mtmv.MTMVPartitionInfo.MTMVPartitionType;
import org.apache.doris.mtmv.MTMVRefreshPartitionSnapshot;
import org.apache.doris.mtmv.MTMVRelation;
import org.apache.doris.mtmv.MTMVService;
import org.apache.doris.mtmv.MTMVStatus;
import org.apache.doris.mtmv.MTMVUtil;
import org.apache.doris.mysql.authenticate.AuthenticateType;
import org.apache.doris.mysql.authenticate.AuthenticatorManager;
import org.apache.doris.mysql.privilege.AccessControllerManager;
import org.apache.doris.mysql.privilege.Auth;
import org.apache.doris.mysql.privilege.PrivPredicate;
import org.apache.doris.nereids.jobs.load.LabelProcessor;
import org.apache.doris.nereids.stats.HboPlanStatisticsManager;
import org.apache.doris.nereids.trees.plans.commands.AdminSetFrontendConfigCommand;
import org.apache.doris.nereids.trees.plans.commands.AdminSetReplicaStatusCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterSystemCommand;
import org.apache.doris.nereids.trees.plans.commands.AlterTableCommand;
import org.apache.doris.nereids.trees.plans.commands.AnalyzeCommand;
import org.apache.doris.nereids.trees.plans.commands.CancelAlterTableCommand;
import org.apache.doris.nereids.trees.plans.commands.CancelBackupCommand;
import org.apache.doris.nereids.trees.plans.commands.CancelBuildIndexCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateDatabaseCommand;
import org.apache.doris.nereids.trees.plans.commands.CreateMaterializedViewCommand;
import org.apache.doris.nereids.trees.plans.commands.DropCatalogRecycleBinCommand.IdType;
import org.apache.doris.nereids.trees.plans.commands.TruncateTableCommand;
import org.apache.doris.nereids.trees.plans.commands.UninstallPluginCommand;
import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVPropertyInfo;
import org.apache.doris.nereids.trees.plans.commands.info.AlterMTMVRefreshInfo;
import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
import org.apache.doris.persist.AlterMTMV;
import org.apache.doris.persist.AutoIncrementIdUpdateLog;
import org.apache.doris.persist.BackendReplicasInfo;
import org.apache.doris.persist.BackendTabletsInfo;
import org.apache.doris.persist.BinlogGcInfo;
import org.apache.doris.persist.CleanQueryStatsInfo;
import org.apache.doris.persist.CreateDbInfo;
import org.apache.doris.persist.CreateTableInfo;
import org.apache.doris.persist.DropDbInfo;
import org.apache.doris.persist.DropPartitionInfo;
import org.apache.doris.persist.EditLog;
import org.apache.doris.persist.GlobalVarPersistInfo;
import org.apache.doris.persist.ModifyPartitionInfo;
import org.apache.doris.persist.ModifyTableDefaultDistributionBucketNumOperationLog;
import org.apache.doris.persist.ModifyTablePropertyOperationLog;
import org.apache.doris.persist.OperationType;
import org.apache.doris.persist.PartitionPersistInfo;
import org.apache.doris.persist.RecoverInfo;
import org.apache.doris.persist.RefreshExternalTableInfo;
import org.apache.doris.persist.ReplacePartitionOperationLog;
import org.apache.doris.persist.ReplicaPersistInfo;
import org.apache.doris.persist.SetPartitionVersionOperationLog;
import org.apache.doris.persist.SetReplicaStatusOperationLog;
import org.apache.doris.persist.SetReplicaVersionOperationLog;
import org.apache.doris.persist.SetTableStatusOperationLog;
import org.apache.doris.persist.Storage;
import org.apache.doris.persist.StorageInfo;
import org.apache.doris.persist.TableInfo;
import org.apache.doris.persist.TablePropertyInfo;
import org.apache.doris.persist.TableRenameColumnInfo;
import org.apache.doris.persist.TruncateTableInfo;
import org.apache.doris.persist.meta.MetaHeader;
import org.apache.doris.persist.meta.MetaReader;
import org.apache.doris.persist.meta.MetaWriter;
import org.apache.doris.planner.TabletLoadIndexRecorderMgr;
import org.apache.doris.plsql.metastore.PlsqlManager;
import org.apache.doris.plugin.PluginInfo;
import org.apache.doris.plugin.PluginMgr;
import org.apache.doris.policy.PolicyMgr;
import org.apache.doris.qe.AuditEventProcessor;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.FEOpExecutor;
import org.apache.doris.qe.GlobalVariable;
import org.apache.doris.qe.JournalObservable;
import org.apache.doris.qe.QueryCancelWorker;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.resource.AdmissionControl;
import org.apache.doris.resource.Tag;
import org.apache.doris.resource.computegroup.ComputeGroupMgr;
import org.apache.doris.resource.workloadgroup.WorkloadGroupChecker;
import org.apache.doris.resource.workloadgroup.WorkloadGroupMgr;
import org.apache.doris.resource.workloadschedpolicy.WorkloadRuntimeStatusMgr;
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyMgr;
import org.apache.doris.resource.workloadschedpolicy.WorkloadSchedPolicyPublisher;
import org.apache.doris.scheduler.manager.TransientTaskManager;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.statistics.AnalysisManager;
import org.apache.doris.statistics.FollowerColumnSender;
import org.apache.doris.statistics.StatisticsAutoCollector;
import org.apache.doris.statistics.StatisticsCache;
import org.apache.doris.statistics.StatisticsCleaner;
import org.apache.doris.statistics.StatisticsJobAppender;
import org.apache.doris.statistics.query.QueryStats;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.HeartbeatMgr;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.system.SystemInfoService.HostInfo;
import org.apache.doris.task.AgentBatchTask;
import org.apache.doris.task.AgentTaskExecutor;
import org.apache.doris.task.CleanTrashTask;
import org.apache.doris.task.CleanUDFCacheTask;
import org.apache.doris.task.CompactionTask;
import org.apache.doris.task.MasterTaskExecutor;
import org.apache.doris.task.PriorityMasterTaskExecutor;
import org.apache.doris.thrift.TCompressionType;
import org.apache.doris.thrift.TFrontendInfo;
import org.apache.doris.thrift.TGetMetaDBMeta;
import org.apache.doris.thrift.TGetMetaIndexMeta;
import org.apache.doris.thrift.TGetMetaPartitionMeta;
import org.apache.doris.thrift.TGetMetaReplicaMeta;
import org.apache.doris.thrift.TGetMetaResult;
import org.apache.doris.thrift.TGetMetaTableMeta;
import org.apache.doris.thrift.TGetMetaTabletMeta;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatus;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TStorageMedium;
import org.apache.doris.transaction.DbUsedDataQuotaInfoCollector;
import org.apache.doris.transaction.GlobalExternalTransactionInfoMgr;
import org.apache.doris.transaction.GlobalTransactionMgrIface;
import org.apache.doris.transaction.PublishVersionDaemon;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Queues;
import com.sleepycat.je.rep.InsufficientLogException;
import com.sleepycat.je.rep.NetworkRestore;
import com.sleepycat.je.rep.NetworkRestoreConfig;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
* A singleton class can also be seen as an entry point of Doris.
* All manager classes can be obtained through this class.
*/
public class Env {
private static final Logger LOG = LogManager.getLogger(Env.class);
// 0 ~ 9999 used for qe
public static final long NEXT_ID_INIT_VALUE = 10000;
private static final int HTTP_TIMEOUT_SECOND = Config.sync_image_timeout_second;
private static final int STATE_CHANGE_CHECK_INTERVAL_MS = 100;
private static final int REPLAY_INTERVAL_MS = 1;
private static final String BDB_DIR = "/bdb";
public static final String IMAGE_DIR = "/image";
public static final String CLIENT_NODE_HOST_KEY = "CLIENT_NODE_HOST";
public static final String CLIENT_NODE_PORT_KEY = "CLIENT_NODE_PORT";
private String metaDir;
private String bdbDir;
protected String imageDir;
private MetaContext metaContext;
private long epoch = 0;
// Lock used to perform atomic modifications on maps like 'idToDb' and 'fullNameToDb'.
// These maps are all thread safe; the lock is only used to make compound operations atomic.
// Simple operations like get or put do not need the lock.
// We use a fair ReentrantLock to avoid starvation. Do not use this lock on critical code paths,
// because fair locks have poor performance.
// MonitoredReentrantLock is used so that the owner thread can be printed in debug mode.
private MonitoredReentrantLock lock;
private CatalogMgr catalogMgr;
private GlobalFunctionMgr globalFunctionMgr;
private Load load;
protected LoadManager loadManager;
private ProgressManager progressManager;
private StreamLoadRecordMgr streamLoadRecordMgr;
private TabletLoadIndexRecorderMgr tabletLoadIndexRecorderMgr;
private RoutineLoadManager routineLoadManager;
private GroupCommitManager groupCommitManager;
private SqlBlockRuleMgr sqlBlockRuleMgr;
private ExportMgr exportMgr;
private SyncJobManager syncJobManager;
private Alter alter;
private ConsistencyChecker consistencyChecker;
private BackupHandler backupHandler;
private PublishVersionDaemon publishVersionDaemon;
private DeleteHandler deleteHandler;
private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector;
private PartitionInfoCollector partitionInfoCollector;
private CooldownConfHandler cooldownConfHandler;
private ExternalMetaIdMgr externalMetaIdMgr;
private MetastoreEventsProcessor metastoreEventsProcessor;
private JobManager<? extends AbstractJob<?, ?>, ?> jobManager;
private LabelProcessor labelProcessor;
private TransientTaskManager transientTaskManager;
private MasterDaemon labelCleaner; // To clean old LabelInfo, ExportJobInfos
private MasterDaemon txnCleaner; // To clean aborted or timeout txns
private Daemon feDiskUpdater; // Update fe disk info
private Daemon replayer;
private Daemon timePrinter;
private Daemon listener;
private ColumnIdFlushDaemon columnIdFlusher;
protected boolean isFirstTimeStartUp = false;
protected boolean isElectable;
// set to true after all metadata has been replayed and the FE is ready to serve
// set to false when the catalog is not ready.
private AtomicBoolean isReady = new AtomicBoolean(false);
// set to true after http server start
private AtomicBoolean httpReady = new AtomicBoolean(false);
// set to true if this FE can offer READ service.
// canRead can be true even if isReady is false.
// for example: when an OBSERVER transfers to UNKNOWN, isReady is set to false, but canRead can still be true
private AtomicBoolean canRead = new AtomicBoolean(false);
private String toMasterProgress = "";
private BlockingQueue<FrontendNodeType> typeTransferQueue;
// node name is used for bdbje NodeName.
protected String nodeName;
protected FrontendNodeType role;
protected FrontendNodeType feType;
// replicas and observers use this value to decide whether or not to provide read service
private long synchronizedTimeMs;
private MasterInfo masterInfo;
private MetaIdGenerator idGenerator = new MetaIdGenerator(NEXT_ID_INIT_VALUE);
private EditLog editLog;
protected int clusterId;
protected String token;
// journal id that has been replayed in memory, used as a marker by checkpoint and observers
private AtomicLong replayedJournalId;
private static Env CHECKPOINT = null;
private static long checkpointThreadId = -1;
private Checkpoint checkpointer;
protected List<HostInfo> helperNodes = Lists.newArrayList();
protected HostInfo selfNode = null;
// node name -> Frontend
protected ConcurrentHashMap<String, Frontend> frontends;
// names of removed frontends, used to check whether a name is duplicated in bdbje
private ConcurrentLinkedQueue<String> removedFrontends;
private HAProtocol haProtocol = null;
private JournalObservable journalObservable;
protected SystemInfoService systemInfo;
private HeartbeatMgr heartbeatMgr;
private FESessionMgr feSessionMgr;
private TemporaryTableMgr temporaryTableMgr;
// alive sessions of the current FE
private Set<String> aliveSessionSet;
private TabletInvertedIndex tabletInvertedIndex;
private ColocateTableIndex colocateTableIndex;
private CatalogRecycleBin recycleBin;
private FunctionSet functionSet;
// for nereids
private FunctionRegistry functionRegistry;
private MetaReplayState metaReplayState;
private BrokerMgr brokerMgr;
private ResourceMgr resourceMgr;
private StorageVaultMgr storageVaultMgr;
private GlobalTransactionMgrIface globalTransactionMgr;
private DeployManager deployManager;
private MasterDaemon tabletStatMgr;
private Auth auth;
private AccessControllerManager accessManager;
private AuthenticatorManager authenticatorManager;
private DomainResolver domainResolver;
private TabletSchedulerStat stat;
private TabletScheduler tabletScheduler;
private TabletChecker tabletChecker;
// Thread pools for pending and loading tasks, respectively
private MasterTaskExecutor pendingLoadTaskScheduler;
private PriorityMasterTaskExecutor<LoadTask> loadingLoadTaskScheduler;
protected LoadJobScheduler loadJobScheduler;
private LoadEtlChecker loadEtlChecker;
private LoadLoadingChecker loadLoadingChecker;
private RoutineLoadScheduler routineLoadScheduler;
private RoutineLoadTaskScheduler routineLoadTaskScheduler;
private SyncChecker syncChecker;
private SmallFileMgr smallFileMgr;
private DynamicPartitionScheduler dynamicPartitionScheduler;
private PluginMgr pluginMgr;
private AuditEventProcessor auditEventProcessor;
private RefreshManager refreshManager;
private PolicyMgr policyMgr;
private AnalysisManager analysisManager;
private HboPlanStatisticsManager hboPlanStatisticsManager;
private ExternalMetaCacheMgr extMetaCacheMgr;
private AtomicLong stmtIdCounter;
private WorkloadGroupMgr workloadGroupMgr;
private ComputeGroupMgr computeGroupMgr;
private WorkloadSchedPolicyMgr workloadSchedPolicyMgr;
private WorkloadRuntimeStatusMgr workloadRuntimeStatusMgr;
private AdmissionControl admissionControl;
private QueryStats queryStats;
private StatisticsCleaner statisticsCleaner;
private PlsqlManager plsqlManager;
private BinlogManager binlogManager;
private BinlogGcer binlogGcer;
private QueryCancelWorker queryCancelWorker;
/**
* TODO(tsy): to be removed after load refactor
*/
private final LoadManagerAdapter loadManagerAdapter;
private StatisticsAutoCollector statisticsAutoCollector;
private StatisticsJobAppender statisticsJobAppender;
private FollowerColumnSender followerColumnSender;
private HiveTransactionMgr hiveTransactionMgr;
private TopicPublisherThread topicPublisherThread;
private WorkloadGroupChecker workloadGroupCheckerThread;
private MTMVService mtmvService;
private EventProcessor eventProcessor;
private InsertOverwriteManager insertOverwriteManager;
private DNSCache dnsCache;
private final NereidsSqlCacheManager sqlCacheManager;
private final NereidsSortedPartitionsCacheManager sortedPartitionsCacheManager;
private final SplitSourceManager splitSourceManager;
private final GlobalExternalTransactionInfoMgr globalExternalTransactionInfoMgr;
private final List<String> forceSkipJournalIds = Arrays.asList(Config.force_skip_journal_ids);
// last heartbeat time of all sessions across all FEs
private static volatile Map<String, Long> sessionReportTimeMap = new HashMap<>();
private TokenManager tokenManager;
private DictionaryManager dictionaryManager;
// if a config is related to a daemon thread, record the relation here; we will proactively change the thread's interval when the config changes.
private final Map<String, Supplier<MasterDaemon>> configtoThreads = ImmutableMap
.of("dynamic_partition_check_interval_seconds", this::getDynamicPartitionScheduler);
public List<TFrontendInfo> getFrontendInfos() {
List<TFrontendInfo> res = new ArrayList<>();
for (Frontend fe : frontends.values()) {
TFrontendInfo feInfo = new TFrontendInfo();
feInfo.setCoordinatorAddress(new TNetworkAddress(fe.getHost(), fe.getRpcPort()));
feInfo.setProcessUuid(fe.getProcessUUID());
res.add(feInfo);
}
return res;
}
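// Return all frontends with the given role, or all known frontends if nodeType is null.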
public List<Frontend> getFrontends(FrontendNodeType nodeType) {
if (nodeType == null) {
// get all
return Lists.newArrayList(frontends.values());
}
List<Frontend> result = Lists.newArrayList();
for (Frontend frontend : frontends.values()) {
if (frontend.getRole() == nodeType) {
result.add(frontend);
}
}
return result;
}
public List<String> getRemovedFrontendNames() {
return Lists.newArrayList(removedFrontends);
}
public JournalObservable getJournalObservable() {
return journalObservable;
}
public SystemInfoService getClusterInfo() {
return this.systemInfo;
}
private HeartbeatMgr getHeartbeatMgr() {
return this.heartbeatMgr;
}
public TabletInvertedIndex getTabletInvertedIndex() {
return this.tabletInvertedIndex;
}
// only for test
public void setColocateTableIndex(ColocateTableIndex colocateTableIndex) {
this.colocateTableIndex = colocateTableIndex;
}
public ColocateTableIndex getColocateTableIndex() {
return this.colocateTableIndex;
}
private CatalogRecycleBin getRecycleBin() {
return this.recycleBin;
}
public MetaReplayState getMetaReplayState() {
return metaReplayState;
}
public DynamicPartitionScheduler getDynamicPartitionScheduler() {
return this.dynamicPartitionScheduler;
}
public CatalogMgr getCatalogMgr() {
return catalogMgr;
}
public ExternalMetaCacheMgr getExtMetaCacheMgr() {
return extMetaCacheMgr;
}
public CatalogIf getCurrentCatalog() {
ConnectContext ctx = ConnectContext.get();
if (ctx == null) {
return catalogMgr.getInternalCatalog();
}
return ctx.getCurrentCatalog();
}
public InternalCatalog getInternalCatalog() {
return catalogMgr.getInternalCatalog();
}
public static InternalCatalog getCurrentInternalCatalog() {
return getCurrentEnv().getInternalCatalog();
}
public BinlogManager getBinlogManager() {
return binlogManager;
}
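// Initialization-on-demand holder idiom: the non-checkpoint Env singleton is created lazily
// and thread-safely the first time SingletonHolder is referenced.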
private static class SingletonHolder {
private static final Env INSTANCE = EnvFactory.getInstance().createEnv(false);
}
private Env() {
this(false);
}
// if isCheckpointCatalog is true, it means that thread pool metrics should not be collected
public Env(boolean isCheckpointCatalog) {
this.catalogMgr = new CatalogMgr();
this.load = new Load();
this.routineLoadManager = EnvFactory.getInstance().createRoutineLoadManager();
this.groupCommitManager = new GroupCommitManager();
this.sqlBlockRuleMgr = new SqlBlockRuleMgr();
this.exportMgr = new ExportMgr();
this.syncJobManager = new SyncJobManager();
this.alter = new Alter();
this.consistencyChecker = new ConsistencyChecker();
this.lock = new MonitoredReentrantLock(true);
this.backupHandler = new BackupHandler(this);
this.metaDir = Config.meta_dir;
this.publishVersionDaemon = new PublishVersionDaemon();
this.deleteHandler = new DeleteHandler();
this.dbUsedDataQuotaInfoCollector = new DbUsedDataQuotaInfoCollector();
this.partitionInfoCollector = new PartitionInfoCollector();
if (Config.enable_storage_policy) {
this.cooldownConfHandler = new CooldownConfHandler();
}
this.externalMetaIdMgr = new ExternalMetaIdMgr();
this.metastoreEventsProcessor = new MetastoreEventsProcessor();
this.jobManager = new JobManager<>();
this.labelProcessor = new LabelProcessor();
this.transientTaskManager = new TransientTaskManager();
this.replayedJournalId = new AtomicLong(0L);
this.stmtIdCounter = new AtomicLong(0L);
this.isElectable = false;
this.synchronizedTimeMs = 0;
this.feType = FrontendNodeType.INIT;
this.typeTransferQueue = Queues.newLinkedBlockingDeque();
this.role = FrontendNodeType.UNKNOWN;
this.frontends = new ConcurrentHashMap<>();
this.removedFrontends = new ConcurrentLinkedQueue<>();
this.journalObservable = new JournalObservable();
this.masterInfo = new MasterInfo();
this.systemInfo = EnvFactory.getInstance().createSystemInfoService();
this.heartbeatMgr = new HeartbeatMgr(systemInfo, !isCheckpointCatalog);
this.feSessionMgr = new FESessionMgr();
this.temporaryTableMgr = new TemporaryTableMgr();
this.aliveSessionSet = new HashSet<>();
this.tabletInvertedIndex = new TabletInvertedIndex();
this.colocateTableIndex = new ColocateTableIndex();
this.recycleBin = new CatalogRecycleBin();
this.functionSet = new FunctionSet();
this.functionSet.init();
this.functionRegistry = new FunctionRegistry();
this.metaReplayState = new MetaReplayState();
this.brokerMgr = new BrokerMgr();
this.resourceMgr = new ResourceMgr();
this.storageVaultMgr = new StorageVaultMgr(systemInfo);
this.globalTransactionMgr = EnvFactory.getInstance().createGlobalTransactionMgr(this);
this.tabletStatMgr = EnvFactory.getInstance().createTabletStatMgr();
this.auth = new Auth();
this.accessManager = new AccessControllerManager(auth);
this.authenticatorManager = new AuthenticatorManager(AuthenticateType.getAuthTypeConfigString());
this.domainResolver = new DomainResolver(auth);
this.metaContext = new MetaContext();
this.metaContext.setThreadLocalInfo();
this.stat = new TabletSchedulerStat();
this.tabletScheduler = new TabletScheduler(this, systemInfo, tabletInvertedIndex, stat,
Config.tablet_rebalancer_type);
this.tabletChecker = new TabletChecker(this, systemInfo, tabletScheduler, stat);
// The pendingLoadTaskScheduler's queue size should be no less than Config.desired_max_waiting_jobs,
// so that we can guarantee that all submitted load jobs can be scheduled without being starved.
this.pendingLoadTaskScheduler = new MasterTaskExecutor("pending-load-task-scheduler",
Config.async_pending_load_task_pool_size, Config.desired_max_waiting_jobs, !isCheckpointCatalog);
// The loadingLoadTaskScheduler's queue size is unlimited, so that it can receive all loading tasks
// created after pending tasks finish. High concurrency is not a concern, because the
// concurrency is limited by Config.desired_max_waiting_jobs and Config.async_loading_load_task_pool_size.
this.loadingLoadTaskScheduler = new PriorityMasterTaskExecutor<>("loading-load-task-scheduler",
Config.async_loading_load_task_pool_size, LoadTask.COMPARATOR, LoadTask.class, !isCheckpointCatalog);
this.loadJobScheduler = new LoadJobScheduler();
this.loadManager = EnvFactory.getInstance().createLoadManager(loadJobScheduler);
this.progressManager = new ProgressManager();
this.streamLoadRecordMgr = new StreamLoadRecordMgr("stream_load_record_manager",
Config.fetch_stream_load_record_interval_second * 1000L);
this.tabletLoadIndexRecorderMgr = new TabletLoadIndexRecorderMgr();
this.loadEtlChecker = new LoadEtlChecker(loadManager);
this.loadLoadingChecker = new LoadLoadingChecker(loadManager);
this.routineLoadScheduler = new RoutineLoadScheduler(routineLoadManager);
this.routineLoadTaskScheduler = new RoutineLoadTaskScheduler(routineLoadManager);
this.syncChecker = new SyncChecker(syncJobManager);
this.smallFileMgr = new SmallFileMgr();
this.dynamicPartitionScheduler = new DynamicPartitionScheduler("DynamicPartitionScheduler",
Config.dynamic_partition_check_interval_seconds * 1000L);
this.metaDir = Config.meta_dir;
this.bdbDir = this.metaDir + BDB_DIR;
this.imageDir = this.metaDir + IMAGE_DIR;
this.pluginMgr = new PluginMgr();
this.auditEventProcessor = new AuditEventProcessor(this.pluginMgr);
this.refreshManager = new RefreshManager();
this.policyMgr = new PolicyMgr();
this.extMetaCacheMgr = new ExternalMetaCacheMgr(isCheckpointCatalog);
this.analysisManager = new AnalysisManager();
this.hboPlanStatisticsManager = new HboPlanStatisticsManager();
this.statisticsCleaner = new StatisticsCleaner();
this.statisticsAutoCollector = new StatisticsAutoCollector();
this.statisticsJobAppender = new StatisticsJobAppender();
this.globalFunctionMgr = new GlobalFunctionMgr();
this.workloadGroupMgr = new WorkloadGroupMgr();
this.computeGroupMgr = new ComputeGroupMgr(systemInfo);
this.workloadSchedPolicyMgr = new WorkloadSchedPolicyMgr();
this.workloadRuntimeStatusMgr = new WorkloadRuntimeStatusMgr();
this.admissionControl = new AdmissionControl(systemInfo);
this.queryStats = new QueryStats();
this.loadManagerAdapter = new LoadManagerAdapter();
this.hiveTransactionMgr = new HiveTransactionMgr();
this.plsqlManager = new PlsqlManager();
this.binlogManager = new BinlogManager();
this.binlogGcer = new BinlogGcer();
this.columnIdFlusher = new ColumnIdFlushDaemon();
this.queryCancelWorker = new QueryCancelWorker(systemInfo);
this.topicPublisherThread = new TopicPublisherThread(
"TopicPublisher", Config.publish_topic_info_interval_ms, systemInfo);
this.workloadGroupCheckerThread = new WorkloadGroupChecker();
this.mtmvService = new MTMVService();
this.eventProcessor = new EventProcessor(mtmvService);
this.insertOverwriteManager = new InsertOverwriteManager();
this.dnsCache = new DNSCache();
this.sqlCacheManager = new NereidsSqlCacheManager();
this.sortedPartitionsCacheManager = new NereidsSortedPartitionsCacheManager();
this.splitSourceManager = new SplitSourceManager();
this.globalExternalTransactionInfoMgr = new GlobalExternalTransactionInfoMgr();
this.tokenManager = new TokenManager();
this.dictionaryManager = new DictionaryManager();
}
public static Map<String, Long> getSessionReportTimeMap() {
return sessionReportTimeMap;
}
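// Register a newly created temporary table with the current connect context (if any)
// and refresh the heartbeat of the session that owns it.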
public void registerTempTableAndSession(Table table) {
if (ConnectContext.get() != null) {
ConnectContext.get().addTempTableToDB(table.getQualifiedDbName(), table.getName());
}
refreshSession(Util.getTempTableSessionId(table.getName()));
}
public void unregisterTempTable(Table table) {
if (ConnectContext.get() != null) {
ConnectContext.get().removeTempTableFromDB(table.getQualifiedDbName(), table.getName());
}
}
private void refreshSession(String sessionId) {
sessionReportTimeMap.put(sessionId, System.currentTimeMillis());
}
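// Refresh the heartbeat time only for sessions that are already being tracked.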
public void checkAndRefreshSession(String sessionId) {
if (sessionReportTimeMap.containsKey(sessionId)) {
sessionReportTimeMap.put(sessionId, System.currentTimeMillis());
}
}
public void refreshAllAliveSession() {
for (String sessionId : sessionReportTimeMap.keySet()) {
refreshSession(sessionId);
}
}
public static void destroyCheckpoint() {
if (CHECKPOINT != null) {
CHECKPOINT = null;
}
}
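// Return the Env instance for the current thread: the checkpoint Env for the checkpoint thread,
// otherwise the serving singleton. Typical usage: Env.getCurrentEnv().getInternalCatalog().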
public static Env getCurrentEnv() {
if (isCheckpointThread()) {
// only the checkpoint thread itself can reach here,
// so there is no need to worry about thread safety.
if (CHECKPOINT == null) {
CHECKPOINT = EnvFactory.getInstance().createEnv(true);
}
return CHECKPOINT;
} else {
return SingletonHolder.INSTANCE;
}
}
// NOTICE: in most cases, getCurrentEnv() should be used to get the right env.
// but in some cases, the serving env must be obtained explicitly.
public static Env getServingEnv() {
return SingletonHolder.INSTANCE;
}
public BrokerMgr getBrokerMgr() {
return brokerMgr;
}
public ResourceMgr getResourceMgr() {
return resourceMgr;
}
public StorageVaultMgr getStorageVaultMgr() {
return storageVaultMgr;
}
public static GlobalTransactionMgrIface getCurrentGlobalTransactionMgr() {
return getCurrentEnv().globalTransactionMgr;
}
public GlobalTransactionMgrIface getGlobalTransactionMgr() {
return globalTransactionMgr;
}
public PluginMgr getPluginMgr() {
return pluginMgr;
}
public Auth getAuth() {
return auth;
}
public AccessControllerManager getAccessManager() {
return accessManager;
}
public AuthenticatorManager getAuthenticatorManager() {
return authenticatorManager;
}
public MTMVService getMtmvService() {
return mtmvService;
}
public EventProcessor getEventProcessor() {
return eventProcessor;
}
public InsertOverwriteManager getInsertOverwriteManager() {
return insertOverwriteManager;
}
public TabletScheduler getTabletScheduler() {
return tabletScheduler;
}
public TabletChecker getTabletChecker() {
return tabletChecker;
}
public AuditEventProcessor getAuditEventProcessor() {
return auditEventProcessor;
}
public ComputeGroupMgr getComputeGroupMgr() {
return computeGroupMgr;
}
public WorkloadGroupMgr getWorkloadGroupMgr() {
return workloadGroupMgr;
}
public WorkloadSchedPolicyMgr getWorkloadSchedPolicyMgr() {
return workloadSchedPolicyMgr;
}
public WorkloadRuntimeStatusMgr getWorkloadRuntimeStatusMgr() {
return workloadRuntimeStatusMgr;
}
public AdmissionControl getAdmissionControl() {
return admissionControl;
}
public ExternalMetaIdMgr getExternalMetaIdMgr() {
return externalMetaIdMgr;
}
public MetastoreEventsProcessor getMetastoreEventsProcessor() {
return metastoreEventsProcessor;
}
public PlsqlManager getPlsqlManager() {
return plsqlManager;
}
// use this to get the correct SystemInfoService instance
public static SystemInfoService getCurrentSystemInfo() {
return getCurrentEnv().getClusterInfo();
}
public static HeartbeatMgr getCurrentHeartbeatMgr() {
return getCurrentEnv().getHeartbeatMgr();
}
// use this to get correct TabletInvertedIndex instance
public static TabletInvertedIndex getCurrentInvertedIndex() {
return getCurrentEnv().getTabletInvertedIndex();
}
// use this to get correct ColocateTableIndex instance
public static ColocateTableIndex getCurrentColocateIndex() {
return getCurrentEnv().getColocateTableIndex();
}
public static CatalogRecycleBin getCurrentRecycleBin() {
return getCurrentEnv().getRecycleBin();
}
// use this to get correct env's journal version
public static int getCurrentEnvJournalVersion() {
if (MetaContext.get() == null) {
return FeMetaVersion.VERSION_CURRENT;
}
return MetaContext.get().getMetaVersion();
}
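// A thread is the checkpoint thread if its id matches the recorded checkpoint thread id.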
public static final boolean isCheckpointThread() {
return Thread.currentThread().getId() == checkpointThreadId;
}
public static PluginMgr getCurrentPluginMgr() {
return getCurrentEnv().getPluginMgr();
}
public static AuditEventProcessor getCurrentAuditEventProcessor() {
return getCurrentEnv().getAuditEventProcessor();
}
// For unit test only
public Checkpoint getCheckpointer() {
return checkpointer;
}
public HiveTransactionMgr getHiveTransactionMgr() {
return hiveTransactionMgr;
}
public static HiveTransactionMgr getCurrentHiveTransactionMgr() {
return getCurrentEnv().getHiveTransactionMgr();
}
public DNSCache getDnsCache() {
return dnsCache;
}
public List<String> getForceSkipJournalIds() {
return forceSkipJournalIds;
}
// Use tryLock to avoid potential deadlock
private boolean tryLock(boolean mustLock) {
while (true) {
try {
if (!lock.tryLock(Config.catalog_try_lock_timeout_ms, TimeUnit.MILLISECONDS)) {
// log which thread has held this lock for a long time.
Thread owner = lock.getOwner();
if (owner != null) {
LOG.info("env lock is held by: {}", Util.dumpThread(owner, 10));
}
if (mustLock) {
continue;
} else {
return false;
}
}
return true;
} catch (InterruptedException e) {
LOG.warn("got exception while getting env lock", e);
if (mustLock) {
continue;
} else {
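// Interrupted while not required to lock: report whether this thread already holds the lock.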
return lock.isHeldByCurrentThread();
}
}
}
}
private void unlock() {
if (lock.isHeldByCurrentThread()) {
this.lock.unlock();
}
}
public String getBdbDir() {
return bdbDir;
}
public String getImageDir() {
return imageDir;
}
public void initialize(String[] args) throws Exception {
// set meta dir first.
// these variables are already set in the constructor, but Env is a singleton class,
// so they may have been set before Config was initialized.
// set them here again to make sure they use the values from fe.conf.
this.metaDir = Config.meta_dir;
this.bdbDir = this.metaDir + BDB_DIR;
this.imageDir = this.metaDir + IMAGE_DIR;
// 0. get local node and helper node info
getSelfHostPort();
getHelperNodes(args);
// 1. check and create dirs and files
File meta = new File(metaDir);
if (!meta.exists()) {
LOG.warn("Doris' meta dir {} does not exist." + " You need to create it before starting FE",
meta.getAbsolutePath());
throw new Exception(meta.getAbsolutePath() + " does not exist, will exit");
}
if (Config.edit_log_type.equalsIgnoreCase("bdb")) {
File bdbDir = new File(this.bdbDir);
if (!bdbDir.exists()) {
bdbDir.mkdirs();
}
}
File imageDir = new File(this.imageDir);
if (!imageDir.exists()) {
imageDir.mkdirs();
}
// init plugin manager
pluginMgr.init();
auditEventProcessor.start();
// 2. get cluster id and role (Observer or Follower)
if (!Config.enable_check_compatibility_mode) {
checkDeployMode();
getClusterIdAndRole();
} else {
isElectable = true;
role = FrontendNodeType.FOLLOWER;
nodeName = genFeNodeName(selfNode.getHost(),
selfNode.getPort(), false /* new style */);
}
// 3. Load image first and replay edits
this.editLog = new EditLog(nodeName);
loadImage(this.imageDir); // load image file
editLog.open(); // open bdb env
this.globalTransactionMgr.setEditLog(editLog);
this.idGenerator.setEditLog(editLog);
if (Config.enable_check_compatibility_mode) {
replayJournalsAndExit();
}
// 4. create load and export job label cleaner thread
createLabelCleaner();
// 5. create txn cleaner thread
createTxnCleaner();
// 6. start state listener thread
startStateListener();
// 7. create fe disk updater
createFeDiskUpdater();
if (!Config.edit_log_type.equalsIgnoreCase("bdb")) {
// If not using bdb, we need to notify the FE type transfer manually.
notifyNewFETypeTransfer(FrontendNodeType.MASTER);
}
queryCancelWorker.start();
StmtExecutor.initBlockSqlAstNames();
}
// wait until FE is ready.
public void waitForReady() throws InterruptedException {
long counter = 0;
while (true) {
if (isReady()) {
LOG.info("catalog is ready. FE type: {}", feType);
break;
}
Thread.sleep(100);
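// Log the waiting progress roughly every 10 seconds (100 iterations of a 100 ms sleep).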
if (counter++ % 100 == 0) {
String reason = editLog == null ? "editlog is null" : editLog.getNotReadyReason();
LOG.info("wait catalog to be ready. feType:{} isReady:{}, counter:{} reason: {}",
feType, isReady.get(), counter, reason);
}
}
}
public boolean isReady() {
return isReady.get();
}
public boolean isHttpReady() {
return httpReady.get();
}
public void setHttpReady(boolean httpReady) {
this.httpReady.set(httpReady);
}
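// The FE starts from empty metadata only if neither the ROLE nor the VERSION file exists in the image dir.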
protected boolean isStartFromEmpty() {
File roleFile = new File(this.imageDir, Storage.ROLE_FILE);
File versionFile = new File(this.imageDir, Storage.VERSION_FILE);
return !roleFile.exists() && !versionFile.exists();
}
private void getClusterIdFromStorage(Storage storage) throws IOException {
clusterId = storage.getClusterID();
if (Config.cluster_id != -1 && Config.cluster_id != this.clusterId) {
LOG.warn("Configured cluster_id {} does not match stored cluster_id {}. "
+ "This may indicate a configuration error.",
Config.cluster_id, this.clusterId);
throw new IOException("Configured cluster_id does not match stored cluster_id. "
+ "Please check your configuration.");
}
}
protected void getClusterIdAndRole() throws IOException {
File roleFile = new File(this.imageDir, Storage.ROLE_FILE);
File versionFile = new File(this.imageDir, Storage.VERSION_FILE);
// if the helper node points to self, or the ROLE and VERSION files exist locally,
// get the node type from local storage
if (isMyself() || (roleFile.exists() && versionFile.exists())) {
if (!isMyself()) {
LOG.info("find ROLE and VERSION file in local, ignore helper nodes: {}", helperNodes);
}
// check file integrity, if the files exist.
if ((roleFile.exists() && !versionFile.exists()) || (!roleFile.exists() && versionFile.exists())) {
throw new IOException("role file and version file must both exist or both not exist. "
+ "please specific one helper node to recover. will exit.");
}
// ATTN:
// If the version file and role file do not exist and the helper node is itself,
// this should be the very first startup of the cluster, so we create the ROLE and VERSION files,
// set isFirstTimeStartUp to true, and add this node to the frontends list.
// If the ROLE and VERSION files were deleted for some reason, we may arbitrarily start this node as
// FOLLOWER, which may cause UNDEFINED behavior.
// Everything may be OK if the original role was exactly FOLLOWER,
// but if not, the FE process will exit at some point.
Storage storage = new Storage(this.imageDir);
if (!roleFile.exists()) {
// This is the very first time the first node of the cluster is started.
// It should become a Master node (a Master node's role is also FOLLOWER, which means electable),
// for compatibility. Because this is the very first startup, we arbitrarily choose
// a new name for this node.
role = FrontendNodeType.FOLLOWER;
nodeName = genFeNodeName(selfNode.getHost(),
selfNode.getPort(), false /* new style */);
storage.writeFrontendRoleAndNodeName(role, nodeName);
LOG.info("very first time to start this node. role: {}, node name: {}", role.name(), nodeName);
} else {
role = storage.getRole();
if (role == FrontendNodeType.REPLICA) {
// for compatibility
role = FrontendNodeType.FOLLOWER;
}
nodeName = storage.getNodeName();
if (Strings.isNullOrEmpty(nodeName)) {
// In the normal case, if the ROLE file exists, role and nodeName should both exist.
// But we may get an empty nodeName after upgrading.
// So for forward compatibility, we use the "old-style" way of naming: "ip_port",
// and update the ROLE file.
nodeName = genFeNodeName(selfNode.getHost(), selfNode.getPort(), true/* old style */);
storage.writeFrontendRoleAndNodeName(role, nodeName);
LOG.info("forward compatibility. role: {}, node name: {}", role.name(), nodeName);
}
// Notice:
// With the introduction of FQDN, the nodeName is no longer bound to an IP address,
// so consistency is no longer checked here. Otherwise, the startup will fail.
}
Preconditions.checkNotNull(role);
Preconditions.checkNotNull(nodeName);
if (!versionFile.exists()) {
clusterId = Config.cluster_id == -1 ? Storage.newClusterID() : Config.cluster_id;
token = Strings.isNullOrEmpty(Config.auth_token) ? Storage.newToken() : Config.auth_token;
storage = new Storage(clusterId, token, this.imageDir);
storage.writeClusterIdAndToken();
isFirstTimeStartUp = true;
Frontend self = new Frontend(role, nodeName, selfNode.getHost(),
selfNode.getPort());
// Set self alive to true; BDBEnvironment.getReplicationGroupAdmin() relies on this to get the
// helper node before the heartbeat thread is started.
self.setIsAlive(true);
// We don't need to check whether frontends already contains self.
// frontends must be empty because no image has been loaded and no journal has been replayed yet.
// And this frontend will be persisted later, after the bdbje environment is opened.
frontends.put(nodeName, self);
LOG.info("add self frontend: {}", self);
} else {
getClusterIdFromStorage(storage);
if (storage.getToken() == null) {
token = Strings.isNullOrEmpty(Config.auth_token) ? Storage.newToken() : Config.auth_token;
LOG.info("refresh new token");
storage.setToken(token);
storage.writeClusterIdAndToken();
} else {
token = storage.getToken();
}
isFirstTimeStartUp = false;
}
} else {
// try to get the role and node name from the helper nodes.
// this loop will not end until we get a definite role type and name
while (true) {
if (!getFeNodeTypeAndNameFromHelpers()) {
LOG.warn("current node {} is not added to the group. please add it first. "
+ "sleep 5 seconds and retry, current helper nodes: {}", selfNode, helperNodes);
try {
Thread.sleep(5000);
continue;
} catch (InterruptedException e) {
LOG.warn("", e);
System.exit(-1);
}
}
if (role == FrontendNodeType.REPLICA) {
// for compatibility
role = FrontendNodeType.FOLLOWER;
}
break;
}
Preconditions.checkState(helperNodes.size() == 1);
Preconditions.checkNotNull(role);
Preconditions.checkNotNull(nodeName);
HostInfo rightHelperNode = helperNodes.get(0);
Storage storage = new Storage(this.imageDir);
if (roleFile.exists() && (role != storage.getRole() || !nodeName.equals(storage.getNodeName()))
|| !roleFile.exists()) {
storage.writeFrontendRoleAndNodeName(role, nodeName);
}
if (!versionFile.exists()) {
// If the version file doesn't exist, download it from helper node
if (!getVersionFileFromHelper(rightHelperNode)) {
throw new IOException("fail to download version file from "
+ rightHelperNode.getHost() + " will exit.");
}
// NOTE: cluster_id is initialized when the Storage object is constructed,
// so we create a new one.
storage = new Storage(this.imageDir);
getClusterIdFromStorage(storage);
token = storage.getToken();
if (Strings.isNullOrEmpty(token)) {
token = Config.auth_token;
}
} else {
// If the version file exist, read the cluster id and check the
// id with helper node to make sure they are identical
getClusterIdFromStorage(storage);
token = storage.getToken();
try {
String url = "http://" + NetUtils
.getHostPortInAccessibleFormat(rightHelperNode.getHost(), Config.http_port) + "/check";
HttpURLConnection conn = HttpURLUtil.getConnectionWithNodeIdent(url);
conn.setConnectTimeout(2 * 1000);
conn.setReadTimeout(2 * 1000);
String clusterIdString = conn.getHeaderField(MetaBaseAction.CLUSTER_ID);
int remoteClusterId = Integer.parseInt(clusterIdString);
if (remoteClusterId != clusterId) {
LOG.error("cluster id is not equal with helper node {}. will exit.",
rightHelperNode.getHost());
throw new IOException(
"cluster id is not equal with helper node "
+ rightHelperNode.getHost() + ". will exit.");
}
String remoteToken = conn.getHeaderField(MetaBaseAction.TOKEN);
if (token == null && remoteToken != null) {
LOG.info("get token from helper node. token={}.", remoteToken);
token = remoteToken;
storage.writeClusterIdAndToken();
storage.reload();
}
if (Config.enable_token_check) {
Preconditions.checkNotNull(token);
Preconditions.checkNotNull(remoteToken);
if (!token.equals(remoteToken)) {
throw new IOException(
"token is not equal with helper node "
+ rightHelperNode.getHost() + ". will exit.");
}
}
} catch (Exception e) {
throw new IOException("fail to check cluster_id and token with helper node.", e);
}
}
getNewImage(rightHelperNode);
}
if (Config.cluster_id != -1 && clusterId != Config.cluster_id) {
throw new IOException("cluster id is not equal with config item cluster_id. will exit. "
+ "If you are in recovery mode, please also modify the cluster_id in 'doris-meta/image/VERSION'");
}
if (role.equals(FrontendNodeType.FOLLOWER)) {
isElectable = true;
} else {
isElectable = false;
}
Preconditions.checkState(helperNodes.size() == 1);
LOG.info("finished to get cluster id: {}, isElectable: {}, role: {} and node name: {}",
clusterId, isElectable, role.name(), nodeName);
}
/**
* write cloud/local to MODE_FILE.
*/
protected void checkDeployMode() throws IOException {
File modeFile = new File(this.imageDir, Storage.DEPLOY_MODE_FILE);
Storage storage = new Storage(this.imageDir);
String expectedMode = getDeployMode();
if (modeFile.exists()) {
String actualMode = storage.getDeployMode();
Preconditions.checkArgument(expectedMode.equals(actualMode),
"You can't switch deploy mode from %s to %s, maybe you need to check fe.conf",
actualMode, expectedMode);
LOG.info("The current deployment mode is " + expectedMode + ".");
} else {
storage.setDeployMode(expectedMode);
storage.writeClusterMode();
LOG.info("The file DEPLOY_MODE doesn't exist, create it.");
File versionFile = new File(this.imageDir, Storage.VERSION_FILE);
if (versionFile.exists()) {
LOG.warn("This may be an upgrade from old version, "
+ "or the DEPLOY_MODE file has been manually deleted");
}
}
}
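// Generate a frontend node name. The old style is "host_port"; the new style is
// "fe_" followed by a random UUID, which decouples the node name from the IP address.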
public static String genFeNodeName(String host, int port, boolean isOldStyle) {
if (isOldStyle) {
return host + "_" + port;
} else {
return "fe_" + UUID.randomUUID().toString().replace("-", "_");
}
}
// Get the role info and node name from the helper nodes.
// Return false on failure.
protected boolean getFeNodeTypeAndNameFromHelpers() {
// we try to get info from helper nodes, once we get the right helper node,
// other helper nodes will be ignored and removed.
HostInfo rightHelperNode = null;
for (HostInfo helperNode : helperNodes) {
try {
// For upgrade compatibility, the host parameter name remains the same
// and the new hostname parameter is added
String url = "http://" + NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(), Config.http_port)
+ "/role?host=" + selfNode.getHost()
+ "&port=" + selfNode.getPort();
HttpURLConnection conn = HttpURLUtil.getConnectionWithNodeIdent(url);
if (conn.getResponseCode() != 200) {
LOG.warn("failed to get fe node type from helper node: {}. response code: {}", helperNode,
conn.getResponseCode());
continue;
}
String type = conn.getHeaderField("role");
if (type == null) {
LOG.warn("failed to get fe node type from helper node: {}.", helperNode);
continue;
}
role = FrontendNodeType.valueOf(type);
nodeName = conn.getHeaderField("name");
// get the role and node name before checking them, because we want to throw any exception
// as early as we encounter it.
if (role == FrontendNodeType.UNKNOWN) {
LOG.warn("frontend {} is not added to cluster yet. role UNKNOWN", selfNode);
return false;
}
if (Strings.isNullOrEmpty(nodeName)) {
// For forward compatibility, we use old-style name: "ip_port"
nodeName = genFeNodeName(selfNode.getHost(), selfNode.getPort(), true /* old style */);
}
} catch (Exception e) {
LOG.warn("failed to get fe node type from helper node: {}.", helperNode, e);
continue;
}
LOG.info("get fe node type {}, name {} from {}:{}:{}", role, nodeName,
helperNode.getHost(), helperNode.getHost(), Config.http_port);
rightHelperNode = helperNode;
break;
}
if (rightHelperNode == null) {
return false;
}
helperNodes.clear();
helperNodes.add(rightHelperNode);
return true;
}
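// Resolve this node's local host address and edit log port.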
private void getSelfHostPort() {
String host = Strings.nullToEmpty(FrontendOptions.getLocalHostAddress());
selfNode = new HostInfo(host, Config.edit_log_port);
LOG.info("get self node: {}", selfNode);
}
private void getHelperNodes(String[] args) throws Exception {
String helpers = null;
for (int i = 0; i < args.length; i++) {
if (args[i].equalsIgnoreCase("-helper")) {
if (i + 1 >= args.length) {
throw new AnalysisException("-helper need parameter host:port,host:port");
}
helpers = args[i + 1];
break;
}
}
if (!Config.enable_deploy_manager.equalsIgnoreCase("disable")) {
if (Config.enable_deploy_manager.equalsIgnoreCase("k8s")) {
deployManager = new K8sDeployManager(this, 5000 /* 5s interval */);
} else if (Config.enable_deploy_manager.equalsIgnoreCase("ambari")) {
deployManager = new AmbariDeployManager(this, 5000 /* 5s interval */);
} else if (Config.enable_deploy_manager.equalsIgnoreCase("local")) {
deployManager = new LocalFileDeployManager(this, 5000 /* 5s interval */);
} else {
throw new AnalysisException("Unknow deploy manager: " + Config.enable_deploy_manager);
}
getHelperNodeFromDeployManager();
} else {
if (helpers != null) {
String[] splittedHelpers = helpers.split(",");
for (String helper : splittedHelpers) {
HostInfo helperHostPort = SystemInfoService.getHostAndPort(helper);
if (helperHostPort.isSame(selfNode)) {
/**
 * If the user points the helper node to this FE itself,
 * we stop the FE startup process and report an error.
 * First, it is meaningless to point the helper to itself.
 * Second, when adding an FE for the first time, some users mistakenly
 * point the helper, which should point to the Master, to the new FE itself.
 * The resulting errors have confused users in the past,
 * so we exit the program directly and inform the user, to avoid unnecessary trouble.
 */
throw new AnalysisException("Do not specify the helper node to FE itself. "
+ "Please specify it to the existing running Master or Follower FE");
}
helperNodes.add(helperHostPort);
}
} else {
// If helper node is not designated, use local node as helper node.
helperNodes.add(new HostInfo(selfNode.getHost(), Config.edit_log_port));
}
}
LOG.info("get helper nodes: {}", helperNodes);
}
@SuppressWarnings("unchecked")
private void getHelperNodeFromDeployManager() throws Exception {
Preconditions.checkNotNull(deployManager);
// 1. check if this is the first time to start up
File roleFile = new File(this.imageDir, Storage.ROLE_FILE);
File versionFile = new File(this.imageDir, Storage.VERSION_FILE);
if ((roleFile.exists() && !versionFile.exists()) || (!roleFile.exists() && versionFile.exists())) {
throw new Exception("role file and version file must both exist or both not exist. "
+ "please specific one helper node to recover. will exit.");
}
if (roleFile.exists()) {
// This is not the first time this node starts up.
// It should have already been added to the FE group, so just set the helper node to itself.
LOG.info("role file exists. this is not the first time to start up");
helperNodes = Lists.newArrayList(new HostInfo(selfNode.getHost(),
Config.edit_log_port));
return;
}
// This is the first time this node starts up.
// Get helper node from deploy manager.
helperNodes = deployManager.getHelperNodes();
if (helperNodes == null || helperNodes.isEmpty()) {
throw new Exception("failed to get helper node from deploy manager. exit");
}
}
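// Called by the state listener when this FE becomes master: stop the replayer, replay the
// remaining journals, roll the edit log, apply upgrade-related default session variables,
// then start the master-only and common daemon threads before marking this node ready.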
@SuppressWarnings({"checkstyle:WhitespaceAfter", "checkstyle:LineLength"})
private void transferToMaster() {
try {
// stop replayer
if (replayer != null) {
replayer.exit();
try {
replayer.join();
} catch (InterruptedException e) {
LOG.warn("got exception when stopping the replayer thread", e);
}
replayer = null;
}
// Set these after the replay thread has stopped, to avoid the replay thread modifying them.
isReady.set(false);
canRead.set(false);
toMasterProgress = "open editlog";
editLog.open();
if (Config.edit_log_type.equalsIgnoreCase("bdb")) {
if (!haProtocol.fencing()) {
LOG.error("fencing failed. will exit.");
System.exit(-1);
}
}
toMasterProgress = "replay journal";
long replayStartTime = System.currentTimeMillis();
// replay journals. -1 means replay all the journals larger than current journal id.
replayJournal(-1);
long replayEndTime = System.currentTimeMillis();
LOG.info("finish replay in " + (replayEndTime - replayStartTime) + " msec");
removeDroppedFrontends(removedFrontends);
if (Config.enable_check_compatibility_mode) {
String msg = "check metadata compatibility successfully";
LOG.info(msg);
LogUtils.stdout(msg);
System.exit(0);
}
checkCurrentNodeExist();
checkBeExecVersion();
toMasterProgress = "roll editlog";
editLog.rollEditLog();
if (Config.enable_advance_next_id) {
advanceNextId();
}
// Log meta_version
long journalVersion = MetaContext.get().getMetaVersion();
if (journalVersion < FeConstants.meta_version) {
toMasterProgress = "log meta version";
editLog.logMetaVersion(FeConstants.meta_version);
MetaContext.get().setMetaVersion(FeConstants.meta_version);
}
// Log the first frontend
if (isFirstTimeStartUp) {
// if isFirstTimeStartUp is true, frontends must contain this node.
Frontend self = frontends.get(nodeName);
Preconditions.checkNotNull(self);
// OP_ADD_FIRST_FRONTEND is emitted, so it can write to BDBJE even if canWrite is false
editLog.logAddFirstFrontend(self);
initLowerCaseTableNames();
// Set the initial root password when the master FE launches for the first time.
auth.setInitialRootPassword(Config.initial_root_password);
} else {
VariableMgr.forceUpdateVariables();
if (journalVersion <= FeMetaVersion.VERSION_114) {
// if the journal version is not greater than 114, the cluster was upgraded from a version before 2.0.
// When upgrading from 1.2 to 2.0,
// we need to make sure that the parallelism of queries remains unchanged
// when switching to the pipeline engine, otherwise it may impact the load of the entire cluster,
// because the default parallelism of the pipeline engine is higher than in the previous version.
// so set parallel_pipeline_task_num to parallel_fragment_exec_instance_num
int newVal = VariableMgr.newSessionVariable().parallelExecInstanceNum;
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x",
SessionVariable.PARALLEL_PIPELINE_TASK_NUM,
String.valueOf(newVal));
// similar reason as above, need to upgrade broadcast scale factor during 1.2 to 2.x
// if the default value has been upgraded
double newBcFactorVal = VariableMgr.newSessionVariable().getBroadcastRightTableScaleFactor();
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x",
SessionVariable.BROADCAST_RIGHT_TABLE_SCALE_FACTOR,
String.valueOf(newBcFactorVal));
// similar reason as above, need to upgrade enable_nereids_planner to true
VariableMgr.refreshDefaultSessionVariables("1.x to 2.x", SessionVariable.ENABLE_NEREIDS_PLANNER,
"true");
}
if (journalVersion <= FeMetaVersion.VERSION_123) {
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1", SessionVariable.ENABLE_NEREIDS_DML,
"true");
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
SessionVariable.FRAGMENT_TRANSMISSION_COMPRESSION_CODEC, "none");
if (VariableMgr.newSessionVariable().nereidsTimeoutSecond == 5) {
VariableMgr.refreshDefaultSessionVariables("2.0 to 2.1",
SessionVariable.NEREIDS_TIMEOUT_SECOND, "30");
}
}
if (journalVersion <= FeMetaVersion.VERSION_129) {
VariableMgr.refreshDefaultSessionVariables("2.1 to 3.0", SessionVariable.ENABLE_NEREIDS_PLANNER,
"true");
VariableMgr.refreshDefaultSessionVariables("2.1 to 3.0", SessionVariable.ENABLE_NEREIDS_DML,
"true");
VariableMgr.refreshDefaultSessionVariables("2.1 to 3.0",
SessionVariable.ENABLE_FALLBACK_TO_ORIGINAL_PLANNER,
"false");
VariableMgr.refreshDefaultSessionVariables("2.1 to 3.0",
SessionVariable.ENABLE_MATERIALIZED_VIEW_REWRITE,
"true");
}
}
getPolicyMgr().createDefaultStoragePolicy();
// MUST set master ip before starting the checkpoint thread,
// because the checkpoint thread needs this info to select a non-master FE to push the image to
toMasterProgress = "log master info";
this.masterInfo = new MasterInfo(Env.getCurrentEnv().getSelfNode().getHost(),
Config.http_port,
Config.rpc_port);
editLog.logMasterInfo(masterInfo);
LOG.info("logMasterInfo:{}", masterInfo);
// for master, 'isReady' is set later.
// but we are sure that all metadata has been replayed if we get here.
// so there is no need to check the 'isReady' flag in this method
postProcessAfterMetadataReplayed(false);
insertOverwriteManager.allTaskFail();
toMasterProgress = "start daemon threads";
// Because the current FE was not the master FE before, it did not receive alive-session reports
// from all FEs, so sessionReportTimeMap is not up-to-date.
// reset all sessions' last heartbeat time. must run before init of TemporaryTableMgr
refreshAllAliveSession();
// start all daemon threads that run only on the MASTER FE
startMasterOnlyDaemonThreads();
// start other daemon threads that should run on all FEs
startNonMasterDaemonThreads();
MetricRepo.init();
toMasterProgress = "finished";
canRead.set(true);
isReady.set(true);
checkLowerCaseTableNames();
String msg = "master finished to replay journal, can write now.";
LogUtils.stdout(msg);
LOG.info(msg);
// for master, there are some new thread pools that need to register metrics
ThreadPoolManager.registerAllThreadPoolMetric();
if (analysisManager != null) {
analysisManager.getStatisticsCache().preHeat();
}
} catch (Throwable e) {
// When failed to transfer to master, we need to exit the process.
// Otherwise, the process will be in an unknown state.
LOG.error("failed to transfer to master. progress: {}", toMasterProgress, e);
System.exit(-1);
}
}
/*
* Advance the id generator, ensuring it doesn't roll back.
*
* If we need to support time travel, the next id cannot be rolled back to avoid
* errors in the corresponding relationship of the metadata recorded in BE/MS.
*/
void advanceNextId() {
long currentId = idGenerator.getBatchEndId();
long currentMill = System.currentTimeMillis();
long nextId = currentId + 1;
// Reserve ~1 trillion for use in case of bugs or frequent reboots (~2 billion reboots)
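// Note: 1L << 63 is Long.MIN_VALUE, so (1L << 63) - nextId relies on two's-complement
// wrap-around and effectively evaluates to 2^63 - nextId for positive nextId.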
if ((1L << 63) - nextId < (1L << 40)) {
LOG.warn("nextId is too large: {}, it may be a bug and consider backup and migration", nextId);
} else {
// Keep compatible with the previous impl, which may have resulted in an extremely large nextId;
// we assume no more than 1L << 32 (~4e9) ids have been used since the last reboot.
nextId = (currentId + 1) < currentMill ? currentMill : currentId + (1L << 32);
}
// ATTN: Because MetaIdGenerator has guaranteed that each id it returns must have
// been persisted, there is no need to perform persistence again here.
idGenerator.setId(nextId);
LOG.info("advance the next id from {} to {}", currentId, nextId);
}
/*
 * There are some things that need to be done after metadata is replayed, such as
 * 1. bug fixes for metadata
 * 2. registering some hooks.
 * If there are more, add them here.
*/
public boolean postProcessAfterMetadataReplayed(boolean waitCatalogReady) {
if (waitCatalogReady) {
while (!isReady()) {
// Avoid endless waiting if the state has changed.
//
// Consider the following situation:
// 1. The follower replays journals but is not set to ready because the synchronization interval
// exceeds the meta delay toleration seconds.
// 2. The underlying BDBJE node of this follower is selected as the master, but the state listener
// thread is waiting for catalog ready.
if (typeTransferQueue.peek() != null) {
return false;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
LOG.warn("", e);
}
}
}
auth.rectifyPrivs();
catalogMgr.registerCatalogRefreshListener(this);
// MTMV needs to be compatible with old metadata, and during the compatibility process,
// it needs to wait for all catalog data to be ready, so it cannot be processed through gsonPostProcess()
// We catch all possible exceptions to avoid FE startup failure
try {
MTMVUtil.compatibleMTMV(catalogMgr);
} catch (Throwable t) {
LOG.warn("compatibleMTMV failed", t);
}
return true;
}
// start all daemon threads only running on Master
protected void startMasterOnlyDaemonThreads() {
// start checkpoint thread
checkpointer = new Checkpoint(editLog);
checkpointer.setMetaContext(metaContext);
// set "checkpointThreadId" before the checkpoint thread start, because the thread
// need to check the "checkpointThreadId" when running.
checkpointThreadId = checkpointer.getId();
checkpointer.start();
LOG.info("checkpointer thread started. thread id is {}", checkpointThreadId);
// heartbeat mgr
heartbeatMgr.setMaster(clusterId, token, epoch);
heartbeatMgr.start();
// manager of alive sessions of all FEs
feSessionMgr.setClusterId(clusterId);
feSessionMgr.setToken(token);
feSessionMgr.start();
temporaryTableMgr.start();
// New load scheduler
pendingLoadTaskScheduler.start();
loadingLoadTaskScheduler.start();
loadManager.prepareJobs();
loadJobScheduler.start();
loadEtlChecker.start();
loadLoadingChecker.start();
if (Config.isNotCloudMode()) {
// Tablet checker and scheduler
tabletChecker.start();
tabletScheduler.start();
// Colocate tables checker and balancer
ColocateTableCheckerAndBalancer.getInstance().start();
// Publish Version Daemon
publishVersionDaemon.start();
// Start txn cleaner
txnCleaner.start();
// Consistency checker
getConsistencyChecker().start();
// Backup handler
getBackupHandler().start();
// start daemon thread to update global partition version and in memory information periodically
partitionInfoCollector.start();
}
jobManager.start();
// transient task manager
transientTaskManager.start();
// Alter
getAlterInstance().start();
// catalog recycle bin
getRecycleBin().start();
// time printer
createTimePrinter();
timePrinter.start();
// deploy manager
if (!Config.enable_deploy_manager.equalsIgnoreCase("disable")) {
LOG.info("deploy manager {} start", deployManager.getName());
deployManager.start();
deployManager.startListener();
}
// start routine load scheduler
routineLoadScheduler.start();
routineLoadTaskScheduler.start();
// start sync checker
syncChecker.start();
// start dynamic partition task
dynamicPartitionScheduler.start();
// start daemon thread to update db used data quota for db txn manager periodically
dbUsedDataQuotaInfoCollector.start();
if (Config.enable_storage_policy) {
cooldownConfHandler.start();
}
streamLoadRecordMgr.start();
tabletLoadIndexRecorderMgr.start();
new InternalSchemaInitializer().start();
getRefreshManager().start();
// binlog gcer
binlogGcer.start();
columnIdFlusher.start();
insertOverwriteManager.start();
dictionaryManager.start();
TopicPublisher wgPublisher = new WorkloadGroupPublisher(this);
topicPublisherThread.addToTopicPublisherList(wgPublisher);
WorkloadSchedPolicyPublisher wpPublisher = new WorkloadSchedPolicyPublisher(this);
topicPublisherThread.addToTopicPublisherList(wpPublisher);
topicPublisherThread.start();
workloadGroupCheckerThread.start();
// auto analyze related threads.
statisticsCleaner.start();
statisticsAutoCollector.start();
statisticsJobAppender.start();
}
// start threads that should run on all FEs
protected void startNonMasterDaemonThreads() {
// start load manager thread
tokenManager.start();
loadManager.start();
tabletStatMgr.start();
// load and export job label cleaner thread
labelCleaner.start();
// es repository
getInternalCatalog().getEsRepository().start();
// domain resolver
domainResolver.start();
// fe disk updater
feDiskUpdater.start();
metastoreEventsProcessor.start();
dnsCache.start();
workloadGroupMgr.start();
workloadSchedPolicyMgr.start();
workloadRuntimeStatusMgr.start();
admissionControl.start();
splitSourceManager.start();
}
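// Called when this FE transfers to a non-master type. A FOLLOWER/OBSERVER moving to UNKNOWN only
// marks the meta replay state and keeps offering reads; otherwise the replayer is started (if
// needed), metadata post-processing is awaited, and the daemon threads common to all FEs start.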
private void transferToNonMaster(FrontendNodeType newType) {
isReady.set(false);
try {
if (feType == FrontendNodeType.OBSERVER || feType == FrontendNodeType.FOLLOWER) {
Preconditions.checkState(newType == FrontendNodeType.UNKNOWN);
LOG.warn("{} to UNKNOWN, still offer read service", feType.name());
// do not set canRead here; leave canRead as it was.
// if meta is out of date, canRead will be set to false in the replayer thread.
metaReplayState.setTransferToUnknown();
return;
}
// transfer from INIT/UNKNOWN to OBSERVER/FOLLOWER
if (replayer == null) {
createReplayer();
replayer.start();
}
// 'isReady' will be set to true in 'setCanRead()' method
if (!postProcessAfterMetadataReplayed(true)) {
// the state has changed, exit early.
return;
}
checkLowerCaseTableNames();
startNonMasterDaemonThreads();
MetricRepo.init();
if (analysisManager != null) {
analysisManager.getStatisticsCache().preHeat();
}
if (followerColumnSender == null) {
followerColumnSender = new FollowerColumnSender();
followerColumnSender.start();
}
} catch (Throwable e) {
// When failed to transfer to non-master, we need to exit the process.
// Otherwise, the process will be in an unknown state.
LOG.error("failed to transfer to non-master.", e);
System.exit(-1);
}
}
// Set global variable 'lower_case_table_names' only when the cluster is initialized.
private void initLowerCaseTableNames() {
if (Config.lower_case_table_names > 2 || Config.lower_case_table_names < 0) {
LOG.error("Unsupported configuration value of lower_case_table_names: " + Config.lower_case_table_names);
System.exit(-1);
}
try {
VariableMgr.setLowerCaseTableNames(Config.lower_case_table_names);
} catch (Exception e) {
LOG.error("Initialization of lower_case_table_names failed.", e);
System.exit(-1);
}
LOG.info("Finish initializing lower_case_table_names, value is {}", GlobalVariable.lowerCaseTableNames);
}
// After the cluster initialization is complete, 'lower_case_table_names' cannot be modified during
// cluster restart or upgrade.
private void checkLowerCaseTableNames() {
while (!isReady()) {
// Waiting for lower_case_table_names to initialize value from image or editlog.
try {
LOG.info("Waiting for \'lower_case_table_names\' initialization.");
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
LOG.error("Sleep got exception while waiting for lower_case_table_names initialization. ", e);
}
}
if (Config.lower_case_table_names != GlobalVariable.lowerCaseTableNames) {
LOG.error("The configuration of \'lower_case_table_names\' does not support modification, "
+ "the expected value is {}, but the actual value is {}",
GlobalVariable.lowerCaseTableNames,
Config.lower_case_table_names);
System.exit(-1);
}
LOG.info("lower_case_table_names is {}", GlobalVariable.lowerCaseTableNames);
}
/*
* If the current node is not in the frontend list, then exit. This may
* happen when this node is removed from frontend list, and the drop
* frontend log is deleted because of checkpoint.
*/
private void checkCurrentNodeExist() {
boolean metadataFailureRecovery = null != System.getProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY);
if (metadataFailureRecovery) {
return;
}
Frontend fe = checkFeExist(selfNode.getHost(), selfNode.getPort());
if (fe == null) {
LOG.error("current node {}:{} is not added to the cluster, will exit."
+ " Your FE IP maybe changed, please set 'priority_networks' config in fe.conf properly.",
selfNode.getHost(), selfNode.getPort());
System.exit(-1);
} else if (fe.getRole() != role) {
LOG.error("current node role is {} not match with frontend recorded role {}. will exit", role,
fe.getRole());
System.exit(-1);
}
}
private void checkBeExecVersion() {
if (Config.be_exec_version < Config.min_be_exec_version
|| Config.be_exec_version > Config.max_be_exec_version) {
LOG.error("be_exec_version={} is not supported, please set be_exec_version in interval [{}, {}]",
Config.be_exec_version, Config.min_be_exec_version, Config.max_be_exec_version);
System.exit(-1);
}
}
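// Download the VERSION file from the helper FE over HTTP into the local image dir.
// Return true on success, false if the download fails.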
protected boolean getVersionFileFromHelper(HostInfo helperNode) throws IOException {
try {
String url = "http://" + NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(), Config.http_port)
+ "/version";
File dir = new File(this.imageDir);
MetaHelper.getRemoteFile(url, HTTP_TIMEOUT_SECOND * 1000,
MetaHelper.getFile(Storage.VERSION_FILE, dir));
MetaHelper.complete(Storage.VERSION_FILE, dir);
return true;
} catch (Exception e) {
LOG.warn(e);
}
return false;
}
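// Query the helper FE's /info endpoint and download a newer image file if its image
// sequence is ahead of the local one.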
protected void getNewImage(HostInfo helperNode) throws IOException {
long localImageVersion = 0;
Storage storage = new Storage(this.imageDir);
localImageVersion = storage.getLatestImageSeq();
try {
String hostPort = NetUtils.getHostPortInAccessibleFormat(helperNode.getHost(), Config.http_port);
String infoUrl = "http://" + hostPort + "/info";
ResponseBody<StorageInfo> responseBody = MetaHelper
.doGet(infoUrl, HTTP_TIMEOUT_SECOND * 1000, StorageInfo.class);
if (responseBody.getCode() != RestApiStatusCode.OK.code) {
LOG.warn("get image failed,responseBody:{}", responseBody);
throw new IOException(responseBody.toString());
}
StorageInfo info = responseBody.getData();
long version = info.getImageSeq();
if (version > localImageVersion) {
String url = "http://" + hostPort + "/image?version=" + version;
String filename = Storage.IMAGE + "." + version;
File dir = new File(this.imageDir);
MetaHelper.getRemoteFile(url, Config.sync_image_timeout_second * 1000,
MetaHelper.getFile(filename, dir));
MetaHelper.complete(filename, dir);
} else {
LOG.warn("get an image with a lower version, localImageVersion: {}, got version: {}",
localImageVersion, version);
}
} catch (Exception e) {
throw new IOException(e);
}
}
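// Return true if this node itself is among the helper nodes; in that case only self is kept
// as the helper node.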
protected boolean isMyself() {
Preconditions.checkNotNull(selfNode);
Preconditions.checkNotNull(helperNodes);
if (LOG.isDebugEnabled()) {
LOG.debug("self: {}. helpers: {}", selfNode, helperNodes);
}
// if helper nodes contain itself, remove other helpers
boolean containSelf = false;
for (HostInfo helperNode : helperNodes) {
// WARN: cannot use equals() here, because the hostname may not equal to helperNode.getHostName()
if (selfNode.isSame(helperNode)) {
containSelf = true;
break;
}
}
if (containSelf) {
helperNodes.clear();
helperNodes.add(selfNode);
}
return containSelf;
}
public StatisticsCache getStatisticsCache() {
return analysisManager.getStatisticsCache();
}
public boolean hasReplayer() {
return replayer != null;
}
public void loadImage(String imageDir) throws IOException, DdlException {
Storage storage = new Storage(imageDir);
getClusterIdFromStorage(storage);
File curFile = storage.getCurrentImageFile();
if (!curFile.exists()) {
// image.0 may not exist
LOG.info("image does not exist: {}", curFile.getAbsolutePath());
return;
}
replayedJournalId.set(storage.getLatestImageSeq());
MetaReader.read(curFile, this);
}
public long loadHeader(DataInputStream dis, MetaHeader metaHeader, long checksum) throws IOException, DdlException {
switch (metaHeader.getMetaFormat()) {
case COR1:
return loadHeaderCOR1(dis, checksum);
default:
throw new DdlException("unsupported image format.");
}
}
public long loadHeaderCOR1(DataInputStream dis, long checksum) throws IOException {
int journalVersion = dis.readInt();
if (journalVersion > FeMetaVersion.VERSION_CURRENT) {
throw new IOException("The meta version of image is " + journalVersion
+ ", which is higher than FE current version " + FeMetaVersion.VERSION_CURRENT
+ ". Please upgrade your cluster to the latest version first.");
}
long newChecksum = checksum ^ journalVersion;
MetaContext.get().setMetaVersion(journalVersion);
long replayedJournalId = dis.readLong();
newChecksum ^= replayedJournalId;
long catalogId = dis.readLong();
newChecksum ^= catalogId;
idGenerator.setId(catalogId);
// compatible with isDefaultClusterCreated, which is now deprecated.
// just read and skip it.
dis.readBoolean();
LOG.info("finished replay header from image");
return newChecksum;
}
public long loadMasterInfo(DataInputStream dis, long checksum) throws IOException {
masterInfo = MasterInfo.read(dis);
long newChecksum = checksum ^ masterInfo.getRpcPort();
newChecksum ^= masterInfo.getHttpPort();
LOG.info("finished replay masterInfo from image");
return newChecksum;
}
public long loadFrontends(DataInputStream dis, long checksum) throws IOException {
int size = dis.readInt();
long newChecksum = checksum ^ size;
for (int i = 0; i < size; i++) {
Frontend fe = Frontend.read(dis);
replayAddFrontend(fe);
}
size = dis.readInt();
newChecksum ^= size;
for (int i = 0; i < size; i++) {
removedFrontends.add(Text.readString(dis));
}
LOG.info("finished replay frontends from image");
return newChecksum;
}
public long loadBackends(DataInputStream dis, long checksum) throws IOException {
LOG.info("start loading backends from image");
return systemInfo.loadBackends(dis, checksum);
}
public long loadDb(DataInputStream dis, long checksum) throws IOException, DdlException {
LOG.info("start loading db from image");
return getInternalCatalog().loadDb(dis, checksum);
}
public long loadExportJob(DataInputStream dis, long checksum) throws IOException, DdlException {
long curTime = System.currentTimeMillis();
long newChecksum = checksum;
int size = dis.readInt();
newChecksum = checksum ^ size;
for (int i = 0; i < size; ++i) {
long jobId = dis.readLong();
newChecksum ^= jobId;
ExportJob job = ExportJob.read(dis);
job.cancelReplayedExportJob();
if (!job.isExpired(curTime)) {
exportMgr.unprotectAddJob(job);
}
}
LOG.info("finished replay exportJob from image");
return newChecksum;
}
public long loadSyncJobs(DataInputStream dis, long checksum) throws IOException, DdlException {
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_103) {
syncJobManager.readField(dis);
}
LOG.info("finished replay syncJobMgr from image");
return checksum;
}
public long loadAlterJob(DataInputStream dis, long checksum)
throws IOException, AnalysisException {
long newChecksum = checksum;
for (JobType type : JobType.values()) {
newChecksum = loadAlterJob(dis, newChecksum, type);
}
LOG.info("finished replay alterJob from image");
return newChecksum;
}
public long loadAlterJob(DataInputStream dis, long checksum, JobType type)
throws IOException, AnalysisException {
// alter jobs
int size = dis.readInt();
long newChecksum = checksum ^ size;
if (size > 0) {
// There should be no old-style alter jobs; if any exist, throw an exception. This FE version should not be used.
throw new IOException("There are [" + size + "] old alter jobs."
+ " Please downgrade FE to an older version and handle residual jobs");
}
// finished or cancelled jobs
size = dis.readInt();
newChecksum ^= size;
if (size > 0) {
throw new IOException("There are [" + size + "] old finished or cancelled alter jobs."
+ " Please downgrade FE to an older version and handle residual jobs");
}
// alter job v2
size = dis.readInt();
newChecksum ^= size;
for (int i = 0; i < size; i++) {
AlterJobV2 alterJobV2 = AlterJobV2.read(dis);
if (alterJobV2.isExpire()) {
LOG.info("alter job {} is expired, type: {}, ignore it", alterJobV2.getJobId(), alterJobV2.getType());
continue;
}
if (type == JobType.ROLLUP || type == JobType.SCHEMA_CHANGE) {
if (type == JobType.ROLLUP) {
this.getMaterializedViewHandler().addAlterJobV2(alterJobV2);
} else {
this.getSchemaChangeHandler().addAlterJobV2(alterJobV2);
}
// ATTN: we only want to add tablets into TabletInvertedIndex when a PendingJob is checkpointed,
// to prevent TabletInvertedIndex data loss,
// so just use AlterJob.replay() instead of AlterHandler.replay().
if (alterJobV2.getJobState() == AlterJobV2.JobState.PENDING) {
alterJobV2.replay(alterJobV2);
LOG.info("replay pending alter job when load alter job {} ", alterJobV2.getJobId());
}
} else {
throw new IOException("Invalid alter job type: " + type.name());
}
}
return newChecksum;
}
public long loadBackupHandler(DataInputStream dis, long checksum) throws IOException {
getBackupHandler().readFields(dis);
getBackupHandler().setEnv(this);
LOG.info("finished replay backupHandler from image");
return checksum;
}
public long loadDeleteHandler(DataInputStream dis, long checksum) throws IOException {
this.deleteHandler = DeleteHandler.read(dis);
LOG.info("finished replay deleteHandler from image");
return checksum;
}
public long loadAuth(DataInputStream dis, long checksum) throws IOException {
// CAN NOT use Auth.read(), because this auth instance has already been passed to DomainResolver
auth.readFields(dis);
LOG.info("finished replay auth from image");
return checksum;
}
public long loadTransactionState(DataInputStream dis, long checksum) throws IOException {
int size = dis.readInt();
long newChecksum = checksum ^ size;
globalTransactionMgr.readFields(dis);
LOG.info("finished replay transactionState from image");
return newChecksum;
}
public long loadRecycleBin(DataInputStream dis, long checksum) throws IOException {
recycleBin = CatalogRecycleBin.read(dis);
// add tablet in Recycle bin to TabletInvertedIndex
recycleBin.addTabletToInvertedIndex();
// create DatabaseTransactionMgr for db in recycle bin.
// these dbs do not exist in `idToDb` of the catalog.
for (Long dbId : recycleBin.getAllDbIds()) {
globalTransactionMgr.addDatabaseTransactionMgr(dbId);
}
LOG.info("finished replay recycleBin from image");
return checksum;
}
// global variable persistence
public long loadGlobalVariable(DataInputStream in, long checksum) throws IOException, DdlException {
VariableMgr.read(in);
LOG.info("finished replay globalVariable from image");
return checksum;
}
// load binlogs
public long loadBinlogs(DataInputStream dis, long checksum) throws IOException {
binlogManager.read(dis, checksum);
LOG.info("finished replay binlogMgr from image");
return checksum;
}
public long loadColocateTableIndex(DataInputStream dis, long checksum) throws IOException {
Env.getCurrentColocateIndex().readFields(dis);
LOG.info("finished replay colocateTableIndex from image");
return checksum;
}
public long loadRoutineLoadJobs(DataInputStream dis, long checksum) throws IOException {
Env.getCurrentEnv().getRoutineLoadManager().readFields(dis);
LOG.info("finished replay routineLoadJobs from image");
return checksum;
}
public long loadLoadJobsV2(DataInputStream in, long checksum) throws IOException {
loadManager.readFields(in);
LOG.info("finished replay loadJobsV2 from image");
return checksum;
}
public long loadAsyncJobManager(DataInputStream in, long checksum) throws IOException {
jobManager.readFields(in);
LOG.info("finished replay asyncJobMgr from image");
return checksum;
}
public long saveAsyncJobManager(CountingDataOutputStream out, long checksum) throws IOException {
jobManager.write(out);
LOG.info("finished save analysisMgr to image");
return checksum;
}
public long loadResources(DataInputStream in, long checksum) throws IOException {
resourceMgr = ResourceMgr.read(in);
LOG.info("finished replay resources from image");
return checksum;
}
public long loadWorkloadGroups(DataInputStream in, long checksum) throws IOException {
workloadGroupMgr = WorkloadGroupMgr.read(in);
LOG.info("finished replay workload groups from image");
return checksum;
}
public long loadWorkloadSchedPolicy(DataInputStream in, long checksum) throws IOException {
workloadSchedPolicyMgr = WorkloadSchedPolicyMgr.read(in);
LOG.info("finished replay workload sched policy from image");
return checksum;
}
public long loadPlsqlProcedure(DataInputStream in, long checksum) throws IOException {
plsqlManager = PlsqlManager.read(in);
LOG.info("finished replay plsql procedure from image");
return checksum;
}
public long loadSmallFiles(DataInputStream in, long checksum) throws IOException {
smallFileMgr.readFields(in);
LOG.info("finished replay smallFiles from image");
return checksum;
}
public long loadSqlBlockRule(DataInputStream in, long checksum) throws IOException {
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_104) {
sqlBlockRuleMgr = SqlBlockRuleMgr.read(in);
}
LOG.info("finished replay sqlBlockRule from image");
return checksum;
}
/**
* Load policy through file.
**/
public long loadPolicy(DataInputStream in, long checksum) throws IOException {
if (Env.getCurrentEnvJournalVersion() >= FeMetaVersion.VERSION_109) {
policyMgr = PolicyMgr.read(in);
}
LOG.info("finished replay policy from image");
return checksum;
}
/**
* Load catalogs through file.
**/
public long loadCatalog(DataInputStream in, long checksum) throws IOException {
LOG.info("start loading catalog from image");
CatalogMgr mgr = CatalogMgr.read(in);
// When enabling multi-catalog for the first time, "mgr" will be a null value,
// so ignore it and use the default catalog manager.
if (mgr != null) {
this.catalogMgr = mgr;
}
LOG.info("finished replay catalog from image");
return checksum;
}
/**
* Load global function.
**/
public long loadGlobalFunction(DataInputStream in, long checksum) throws IOException {
this.globalFunctionMgr = GlobalFunctionMgr.read(in);
LOG.info("finished replay global function from image");
return checksum;
}
public long loadAnalysisManager(DataInputStream in, long checksum) throws IOException {
this.analysisManager = AnalysisManager.readFields(in);
LOG.info("finished replay AnalysisMgr from image");
return checksum;
}
public long loadInsertOverwrite(DataInputStream in, long checksum) throws IOException {
this.insertOverwriteManager = InsertOverwriteManager.read(in);
LOG.info("finished replay iot from image");
return checksum;
}
public long saveInsertOverwrite(CountingDataOutputStream out, long checksum) throws IOException {
this.insertOverwriteManager.write(out);
LOG.info("finished save iot to image");
return checksum;
}
public long loadDictionaryManager(DataInputStream in, long checksum) throws IOException {
this.dictionaryManager = DictionaryManager.read(in);
LOG.info("finished replay dictMgr from image");
return checksum;
}
public long saveDictionaryManager(CountingDataOutputStream out, long checksum) throws IOException {
this.dictionaryManager.write(out);
LOG.info("finished save dictMgr to image");
return checksum;
}
// Only called by checkpoint thread
// return the latest image file's absolute path
public String saveImage() throws IOException {
// Write image.ckpt
Storage storage = new Storage(this.imageDir);
File curFile = storage.getImageFile(replayedJournalId.get());
File ckpt = new File(this.imageDir, Storage.IMAGE_NEW);
saveImage(ckpt, replayedJournalId.get());
// Move image.ckpt to image.<replayedJournalId>
LOG.info("Move " + ckpt.getAbsolutePath() + " to " + curFile.getAbsolutePath());
if (!ckpt.renameTo(curFile)) {
curFile.delete();
throw new IOException();
}
return curFile.getAbsolutePath();
}
public void saveImage(File curFile, long replayedJournalId) throws IOException {
if (curFile.exists()) {
if (!curFile.delete()) {
throw new IOException(curFile.getName() + " can not be deleted.");
}
}
if (!curFile.createNewFile()) {
throw new IOException(curFile.getName() + " can not be created.");
}
MetaWriter.write(curFile, this);
}
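// The save*/load* image methods accumulate a checksum by XOR-ing each value as it is written
// or read, so the writer and reader should produce the same checksum for the same image.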
public long saveHeader(CountingDataOutputStream dos, long replayedJournalId, long checksum) throws IOException {
// Write meta version
checksum ^= FeConstants.meta_version;
dos.writeInt(FeConstants.meta_version);
// Write replayed journal id
checksum ^= replayedJournalId;
dos.writeLong(replayedJournalId);
// Write id
long id = idGenerator.getBatchEndId();
checksum ^= id;
dos.writeLong(id);
// compatible with the isDefaultClusterCreated value, which is now deprecated,
// so just write a true value.
dos.writeBoolean(true);
return checksum;
}
public long saveMasterInfo(CountingDataOutputStream dos, long checksum) throws IOException {
masterInfo.write(dos);
checksum ^= masterInfo.getRpcPort();
checksum ^= masterInfo.getHttpPort();
return checksum;
}
public long saveFrontends(CountingDataOutputStream dos, long checksum) throws IOException {
int size = frontends.size();
checksum ^= size;
dos.writeInt(size);
for (Frontend fe : frontends.values()) {
fe.write(dos);
}
size = removedFrontends.size();
checksum ^= size;
dos.writeInt(size);
for (String feName : removedFrontends) {
Text.writeString(dos, feName);
}
return checksum;
}
public long saveBackends(CountingDataOutputStream dos, long checksum) throws IOException {
return systemInfo.saveBackends(dos, checksum);
}
public long saveDb(CountingDataOutputStream dos, long checksum) throws IOException {
return getInternalCatalog().saveDb(dos, checksum);
}
public long saveExportJob(CountingDataOutputStream dos, long checksum) throws IOException {
long curTime = System.currentTimeMillis();
List<ExportJob> jobs = exportMgr.getJobs().stream().filter(t -> !t.isExpired(curTime))
.collect(Collectors.toList());
if (jobs.size() > Config.max_export_history_job_num) {
jobs.sort(Comparator.comparingLong(ExportJob::getCreateTimeMs));
Iterator<ExportJob> iterator = jobs.iterator();
while (jobs.size() > Config.max_export_history_job_num && iterator.hasNext()) {
ExportJob job = iterator.next();
if (job.getState() == ExportJobState.FINISHED || job.getState() == ExportJobState.CANCELLED) {
iterator.remove();
}
}
}
int size = jobs.size();
checksum ^= size;
dos.writeInt(size);
for (ExportJob job : jobs) {
long jobId = job.getId();
checksum ^= jobId;
dos.writeLong(jobId);
job.write(dos);
}
return checksum;
}
public long saveSyncJobs(CountingDataOutputStream dos, long checksum) throws IOException {
syncJobManager.write(dos);
return checksum;
}
public long saveAlterJob(CountingDataOutputStream dos, long checksum) throws IOException {
for (JobType type : JobType.values()) {
checksum = saveAlterJob(dos, checksum, type);
}
return checksum;
}
public long saveAlterJob(CountingDataOutputStream dos, long checksum, JobType type) throws IOException {
Map<Long, AlterJobV2> alterJobsV2;
if (type == JobType.ROLLUP) {
alterJobsV2 = this.getMaterializedViewHandler().getAlterJobsV2();
} else if (type == JobType.SCHEMA_CHANGE) {
alterJobsV2 = this.getSchemaChangeHandler().getAlterJobsV2();
} else if (type == JobType.DECOMMISSION_BACKEND) {
// The load side iterates all job types, so DECOMMISSION_BACKEND must also be written, even though it has no v2 jobs
alterJobsV2 = Maps.newHashMap();
} else {
throw new IOException("Invalid alter job type: " + type.name());
}
// alter jobs == 0
// If the FE is upgraded from an old version and still has old-style alter jobs, it will fail during startup,
// so the number of old-style alter jobs written here has to be 0
int size = 0;
checksum ^= size;
dos.writeInt(size);
// finished or cancelled jobs
size = 0;
checksum ^= size;
dos.writeInt(size);
// alter job v2
size = alterJobsV2.size();
checksum ^= size;
dos.writeInt(size);
for (AlterJobV2 alterJobV2 : alterJobsV2.values()) {
alterJobV2.write(dos);
}
return checksum;
}
public long saveBackupHandler(CountingDataOutputStream dos, long checksum) throws IOException {
getBackupHandler().write(dos);
return checksum;
}
public long saveDeleteHandler(CountingDataOutputStream dos, long checksum) throws IOException {
getDeleteHandler().write(dos);
return checksum;
}
public long saveAuth(CountingDataOutputStream dos, long checksum) throws IOException {
auth.write(dos);
return checksum;
}
public long saveTransactionState(CountingDataOutputStream dos, long checksum) throws IOException {
int size = globalTransactionMgr.getTransactionNum();
checksum ^= size;
dos.writeInt(size);
globalTransactionMgr.write(dos);
return checksum;
}
public long saveRecycleBin(CountingDataOutputStream dos, long checksum) throws IOException {
CatalogRecycleBin recycleBin = Env.getCurrentRecycleBin();
recycleBin.write(dos);
return checksum;
}
public long saveColocateTableIndex(CountingDataOutputStream dos, long checksum) throws IOException {
Env.getCurrentColocateIndex().write(dos);
return checksum;
}
public long saveRoutineLoadJobs(CountingDataOutputStream dos, long checksum) throws IOException {
Env.getCurrentEnv().getRoutineLoadManager().write(dos);
return checksum;
}
public long saveGlobalVariable(CountingDataOutputStream dos, long checksum) throws IOException {
VariableMgr.write(dos);
return checksum;
}
public void replayGlobalVariableV2(GlobalVarPersistInfo info) throws IOException, DdlException {
VariableMgr.replayGlobalVariableV2(info, false);
}
public long saveLoadJobsV2(CountingDataOutputStream dos, long checksum) throws IOException {
Env.getCurrentEnv().getLoadManager().write(dos);
return checksum;
}
public long saveResources(CountingDataOutputStream dos, long checksum) throws IOException {
Env.getCurrentEnv().getResourceMgr().write(dos);
return checksum;
}
public long saveWorkloadGroups(CountingDataOutputStream dos, long checksum) throws IOException {
Env.getCurrentEnv().getWorkloadGroupMgr().write(dos);
return checksum;
}
public long saveWorkloadSchedPolicy(CountingDataOutputStream dos, long checksum) throws IOException {
Env.getCurrentEnv().getWorkloadSchedPolicyMgr().write(dos);
return checksum;
}
public long savePlsqlProcedure(CountingDataOutputStream dos, long checksum) throws IOException {
Env.getCurrentEnv().getPlsqlManager().write(dos);
return checksum;
}
public long saveSmallFiles(CountingDataOutputStream dos, long checksum) throws IOException {
smallFileMgr.write(dos);
return checksum;
}
public long saveSqlBlockRule(CountingDataOutputStream out, long checksum) throws IOException {
Env.getCurrentEnv().getSqlBlockRuleMgr().write(out);
return checksum;
}
public long savePolicy(CountingDataOutputStream out, long checksum) throws IOException {
Env.getCurrentEnv().getPolicyMgr().write(out);
return checksum;
}
/**
* Save catalog image.
*/
public long saveCatalog(CountingDataOutputStream out, long checksum) throws IOException {
Env.getCurrentEnv().getCatalogMgr().write(out);
return checksum;
}
public long saveGlobalFunction(CountingDataOutputStream out, long checksum) throws IOException {
this.globalFunctionMgr.write(out);
LOG.info("Save global function to image");
return checksum;
}
public long saveBinlogs(CountingDataOutputStream out, long checksum) throws IOException {
if (!Config.enable_feature_binlog) {
return checksum;
}
this.binlogManager.write(out, checksum);
LOG.info("Save binlogs to image");
return checksum;
}
public long saveAnalysisMgr(CountingDataOutputStream dos, long checksum) throws IOException {
analysisManager.write(dos);
return checksum;
}
public void createLabelCleaner() {
labelCleaner = new MasterDaemon("LoadLabelCleaner", Config.label_clean_interval_second * 1000L) {
@Override
protected void runAfterCatalogReady() {
load.removeOldLoadJobs();
loadManager.removeOldLoadJob();
exportMgr.removeOldExportJobs();
deleteHandler.removeOldDeleteInfos();
loadManager.removeOverLimitLoadJob();
}
};
}
public void createTxnCleaner() {
txnCleaner = new MasterDaemon("txnCleaner", Config.transaction_clean_interval_second * 1000L) {
@Override
protected void runAfterCatalogReady() {
globalTransactionMgr.removeExpiredAndTimeoutTxns();
}
};
}
public void createFeDiskUpdater() {
feDiskUpdater = new Daemon("feDiskUpdater", Config.heartbeat_interval_second * 1000L) {
@Override
protected void runOneCycle() {
ExecuteEnv.getInstance().refreshAndGetDiskInfo(true);
}
};
}
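// Create the replayer daemon. On non-master FEs it keeps replaying new journals and updates
// canRead/isReady according to how far replay lags behind the master.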
public void createReplayer() {
replayer = new Daemon("replayer", REPLAY_INTERVAL_MS) {
// Avoid numerous 'meta out of date' logs
private long lastLogMetaOutOfDateTime = System.currentTimeMillis();
@Override
protected void runOneCycle() {
boolean err = false;
boolean hasLog = false;
try {
hasLog = replayJournal(-1);
metaReplayState.setOk();
} catch (InsufficientLogException insufficientLogEx) {
// Copy the missing log files from a member of the
// replication group that owns the files
LOG.error("catch insufficient log exception. please restart.", insufficientLogEx);
NetworkRestore restore = new NetworkRestore();
NetworkRestoreConfig config = new NetworkRestoreConfig();
config.setRetainLogFiles(false);
restore.execute(insufficientLogEx, config);
System.exit(-1);
} catch (Throwable e) {
LOG.error("replayer thread catch an exception when replay journal.", e);
metaReplayState.setException(e);
try {
Thread.sleep(5000);
} catch (InterruptedException e1) {
LOG.error("sleep got exception. ", e);
}
err = true;
}
setCanRead(hasLog, err);
}
private void setCanRead(boolean hasLog, boolean err) {
if (err) {
canRead.set(false);
isReady.set(false);
return;
}
if (Config.ignore_meta_check) {
// can still offer read, but is not ready
canRead.set(true);
isReady.set(false);
return;
}
long currentTimeMs = System.currentTimeMillis();
if (currentTimeMs - synchronizedTimeMs > Config.meta_delay_toleration_second * 1000) {
if (currentTimeMs - lastLogMetaOutOfDateTime > 5000L) {
// we still need this log to observe this situation,
// but service may continue when there is no log being replayed.
LOG.warn("meta out of date. currentTime:{}, syncTime:{}, delta:{}ms, hasLog:{}, feType:{}",
currentTimeMs, synchronizedTimeMs, (currentTimeMs - synchronizedTimeMs),
hasLog, feType);
lastLogMetaOutOfDateTime = currentTimeMs;
}
if (hasLog || feType == FrontendNodeType.UNKNOWN) {
// 1. if we read a log from BDB, the master is still alive,
// so we need to set meta out of date.
// 2. if we didn't read any log from BDB and feType is UNKNOWN,
// this non-master node is disconnected from the master,
// so we need to set meta out of date as well.
metaReplayState.setOutOfDate(currentTimeMs, synchronizedTimeMs);
canRead.set(false);
isReady.set(false);
if (editLog != null) {
String reason = editLog.getNotReadyReason();
if (!Strings.isNullOrEmpty(reason)) {
LOG.warn("Not ready reason:{}", reason);
}
}
}
} else {
canRead.set(true);
isReady.set(true);
}
}
};
replayer.setMetaContext(metaContext);
}
public void notifyNewFETypeTransfer(FrontendNodeType newType) {
try {
String msg = "notify new FE type transfer: " + newType;
LOG.warn(msg);
LogUtils.stdout(msg);
this.typeTransferQueue.put(newType);
} catch (InterruptedException e) {
LOG.error("failed to put new FE type: {}", newType, e);
}
}
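// The state listener consumes FE type transitions from typeTransferQueue and drives
// transferToMaster()/transferToNonMaster() according to the current and new type.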
public void startStateListener() {
listener = new Daemon("stateListener", STATE_CHANGE_CHECK_INTERVAL_MS) {
@Override
protected synchronized void runOneCycle() {
while (true) {
FrontendNodeType newType = null;
try {
newType = typeTransferQueue.take();
} catch (InterruptedException e) {
LOG.error("got exception when take FE type from queue", e);
LogUtils.stdout("got exception when take FE type from queue. " + e.getMessage());
System.exit(-1);
}
Preconditions.checkNotNull(newType);
LOG.info("begin to transfer FE type from {} to {}", feType, newType);
if (feType == newType) {
return;
}
/*
* INIT -> MASTER: transferToMaster
* INIT -> FOLLOWER/OBSERVER: transferToNonMaster
* UNKNOWN -> MASTER: transferToMaster
* UNKNOWN -> FOLLOWER/OBSERVER: transferToNonMaster
* FOLLOWER -> MASTER: transferToMaster
* FOLLOWER/OBSERVER -> INIT/UNKNOWN: set isReady to false
*/
switch (feType) {
case INIT: {
switch (newType) {
case MASTER: {
transferToMaster();
break;
}
case FOLLOWER:
case OBSERVER: {
transferToNonMaster(newType);
break;
}
case UNKNOWN:
break;
default:
break;
}
break;
}
case UNKNOWN: {
switch (newType) {
case MASTER: {
transferToMaster();
break;
}
case FOLLOWER:
case OBSERVER: {
transferToNonMaster(newType);
break;
}
default:
break;
}
break;
}
case FOLLOWER: {
switch (newType) {
case MASTER: {
transferToMaster();
break;
}
case UNKNOWN: {
transferToNonMaster(newType);
break;
}
default:
break;
}
break;
}
case OBSERVER: {
switch (newType) {
case UNKNOWN: {
transferToNonMaster(newType);
break;
}
default:
break;
}
break;
}
case MASTER: {
// exit if master changed to any other type
String msg = "transfer FE type from MASTER to " + newType.name() + ". exit";
LOG.error(msg);
LogUtils.stdout(msg);
System.exit(-1);
break;
}
default:
break;
} // end switch formerFeType
feType = newType;
LOG.info("finished to transfer FE type to {}", feType);
}
} // end runOneCycle
};
listener.setMetaContext(metaContext);
listener.start();
}
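// Replay journals from replayedJournalId + 1 up to toJournalId (-1 means up to the latest
// journal id). Return true if at least one journal entry was applied.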
public synchronized boolean replayJournal(long toJournalId) {
long newToJournalId = toJournalId;
if (newToJournalId == -1) {
newToJournalId = getMaxJournalId();
}
if (newToJournalId <= replayedJournalId.get()) {
return false;
}
LOG.info("replayed journal id is {}, replay to journal id is {}", replayedJournalId, newToJournalId);
JournalCursor cursor = editLog.read(replayedJournalId.get() + 1, newToJournalId);
if (cursor == null) {
LOG.warn("failed to get cursor from {} to {}", replayedJournalId.get() + 1, newToJournalId);
return false;
}
long startTime = System.currentTimeMillis();
boolean hasLog = false;
while (true) {
long entityStartTime = System.currentTimeMillis();
Pair<Long, JournalEntity> kv = cursor.next();
if (kv == null) {
break;
}
Long logId = kv.first;
JournalEntity entity = kv.second;
if (entity == null) {
if (logId != null && forceSkipJournalIds.contains(String.valueOf(logId))) {
replayedJournalId.incrementAndGet();
String msg = "journal " + replayedJournalId + " has skipped by config force_skip_journal_id";
LOG.info(msg);
LogUtils.stdout(msg);
if (MetricRepo.isInit) {
// Metric repo may not be initialized when this replay thread starts
MetricRepo.COUNTER_EDIT_LOG_READ.increase(1L);
}
continue;
} else {
break;
}
}
hasLog = true;
EditLog.loadJournal(this, logId, entity);
long loadJournalEndTime = System.currentTimeMillis();
replayedJournalId.incrementAndGet();
if (LOG.isDebugEnabled()) {
LOG.debug("journal {} replayed.", replayedJournalId);
}
if (feType != FrontendNodeType.MASTER) {
journalObservable.notifyObservers(replayedJournalId.get());
}
if (MetricRepo.isInit) {
// Metric repo may not be initialized when this replay thread starts
MetricRepo.COUNTER_EDIT_LOG_READ.increase(1L);
}
long entityCost = System.currentTimeMillis() - entityStartTime;
if (entityCost >= 1000) {
long loadJournalCost = loadJournalEndTime - entityStartTime;
LOG.warn("entityCost:{} loadJournalCost:{} logId:{} replayedJournalId:{} code:{} size:{}",
entityCost, loadJournalCost, logId, replayedJournalId, entity.getOpCode(),
entity.getDataSize());
}
}
long cost = System.currentTimeMillis() - startTime;
if (LOG.isDebugEnabled() && cost >= 1000) {
LOG.debug("replay journal cost too much time: {} replayedJournalId: {}", cost, replayedJournalId);
}
return hasLog;
}
public void createTimePrinter() {
// time printer will write timestamp edit log every 10 seconds
timePrinter = new MasterDaemon("timePrinter", 10 * 1000L) {
@Override
protected void runAfterCatalogReady() {
Timestamp stamp = new Timestamp();
editLog.logTimestamp(stamp);
}
};
}
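// Add a frontend to the cluster. The shorter overloads default nodeName and cloudUniqueId to "".
// Usage sketch (host and port below are hypothetical, shown only for illustration):
//   Env.getCurrentEnv().addFrontend(FrontendNodeType.FOLLOWER, "172.16.0.12", 9010);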
public void addFrontend(FrontendNodeType role, String host, int editLogPort) throws DdlException {
addFrontend(role, host, editLogPort, "");
}
public void addFrontend(FrontendNodeType role, String host, int editLogPort, String nodeName) throws DdlException {
addFrontend(role, host, editLogPort, nodeName, "");
}
public void addFrontend(FrontendNodeType role, String host, int editLogPort, String nodeName, String cloudUniqueId)
throws DdlException {
if (!tryLock(false)) {
throw new DdlException("Failed to acquire env lock. Try again");
}
try {
if (Strings.isNullOrEmpty(nodeName)) {
nodeName = genFeNodeName(host, editLogPort, false /* new name style */);
}
Frontend fe = checkFeExist(host, editLogPort);
if (fe != null) {
throw new DdlException("frontend already exists " + fe);
}
if (Config.enable_fqdn_mode && StringUtils.isEmpty(host)) {
throw new DdlException("frontend's hostName should not be empty while enable_fqdn_mode is true");
}
if (removedFrontends.contains(nodeName)) {
throw new DdlException("frontend name already exists " + nodeName + ". Try again");
}
BDBHA bdbha = (BDBHA) haProtocol;
bdbha.removeConflictNodeIfExist(host, editLogPort);
int targetFollowerCount = getFollowerCount() + 1;
if (role == FrontendNodeType.FOLLOWER || role == FrontendNodeType.REPLICA) {
helperNodes.add(new HostInfo(host, editLogPort));
bdbha.addUnReadyElectableNode(nodeName, targetFollowerCount);
}
// Only add frontend after removing the conflict nodes, to ensure the exception safety.
fe = new Frontend(role, nodeName, host, editLogPort);
fe.setCloudUniqueId(cloudUniqueId);
frontends.put(nodeName, fe);
LOG.info("add frontend: {}", fe);
editLog.logAddFrontend(fe);
} finally {
unlock();
}
}
public void modifyFrontendHostName(String srcHost, int srcPort, String destHost) throws DdlException {
Frontend fe = checkFeExist(srcHost, srcPort);
if (fe == null) {
throw new DdlException("frontend does not exist, host:" + srcHost);
}
modifyFrontendHost(fe.getNodeName(), destHost);
}
private void modifyFrontendHost(String nodeName, String destHost) throws DdlException {
if (!tryLock(false)) {
throw new DdlException("Failed to acquire env lock. Try again");
}
try {
Frontend fe = getFeByName(nodeName);
if (fe == null) {
throw new DdlException("frontend does not exist, nodeName:" + nodeName);
}
LOG.info("modify frontend with new host: {}, exist frontend: {}", fe.getHost(), fe);
boolean needLog = false;
// we use hostname as address of bdbha, so we don't need to update node address when ip changed
if (!Strings.isNullOrEmpty(destHost) && !destHost.equals(fe.getHost())) {
fe.setHost(destHost);
BDBHA bdbha = (BDBHA) haProtocol;
bdbha.updateNodeAddress(fe.getNodeName(), destHost, fe.getEditLogPort());
needLog = true;
}
if (needLog) {
Env.getCurrentEnv().getEditLog().logModifyFrontend(fe);
}
} finally {
unlock();
}
}
public void dropFrontend(FrontendNodeType role, String host, int port) throws DdlException {
dropFrontendFromBDBJE(role, host, port);
}
public void dropFrontendFromBDBJE(FrontendNodeType role, String host, int port) throws DdlException {
if (port == selfNode.getPort() && feType == FrontendNodeType.MASTER
&& selfNode.getHost().equals(host)) {
throw new DdlException("can not drop current master node.");
}
if (!tryLock(false)) {
throw new DdlException("Failed to acquire env lock. Try again");
}
try {
Frontend fe = checkFeExist(host, port);
if (fe == null) {
throw new DdlException("frontend does not exist[" + NetUtils
.getHostPortInAccessibleFormat(host, port) + "]");
}
if (fe.getRole() != role) {
throw new DdlException(role.toString() + " does not exist[" + NetUtils
.getHostPortInAccessibleFormat(host, port) + "]");
}
if (role == FrontendNodeType.FOLLOWER && fe.isAlive()) {
// Trying to drop an alive follower; check quorum safety.
ensureSafeToDropAliveFollower();
}
editLog.logRemoveFrontend(fe);
LOG.info("remove frontend: {}", fe);
int targetFollowerCount = getFollowerCount() - 1;
if (fe.getRole() == FrontendNodeType.FOLLOWER || fe.getRole() == FrontendNodeType.REPLICA) {
haProtocol.removeElectableNode(fe.getNodeName());
removeHelperNode(host, port);
BDBHA ha = (BDBHA) haProtocol;
ha.removeUnReadyElectableNode(fe.getNodeName(), targetFollowerCount);
}
// Only remove the frontend after the electable node has been removed successfully, to ensure
// exception safety.
frontends.remove(fe.getNodeName());
removedFrontends.add(fe.getNodeName());
} finally {
unlock();
}
}
private void removeDroppedFrontends(ConcurrentLinkedQueue<String> removedFrontends) {
if (removedFrontends.size() == 0) {
return;
}
if (!Strings.isNullOrEmpty(System.getProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY))) {
// metadata recovery mode
LOG.info("Metadata failure recovery({}), ignore removing dropped frontends",
System.getProperty(FeConstants.METADATA_FAILURE_RECOVERY_KEY));
return;
}
if (haProtocol != null && haProtocol instanceof BDBHA) {
BDBHA bdbha = (BDBHA) haProtocol;
LOG.info("remove frontends, num {} frontends {}", removedFrontends.size(), removedFrontends);
bdbha.removeDroppedMember(removedFrontends);
}
}
private void removeHelperNode(String host, int port) {
// The ip may have changed, so we need to use both ip and hostname to check.
// Use node.getIdent() for simplicity here.
helperNodes.removeIf(node -> node.getHost().equals(host) && node.getPort() == port);
}
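// Quorum check before dropping an alive follower. For example, with 3 followers the majority
// after dropping one is ((3 - 1) / 2) + 1 = 2, so the drop is only considered safe when at
// least nextMajority + 1 = 3 followers are currently alive.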
private void ensureSafeToDropAliveFollower() throws DdlException {
int numFollower = 0;
int numAliveFollower = 0;
for (Frontend fe : frontends.values()) {
if (fe.getRole() == FrontendNodeType.FOLLOWER) {
numFollower += 1;
if (fe.isAlive()) {
numAliveFollower += 1;
}
}
}
int nextMajority = ((numFollower - 1) / 2) + 1;
if (nextMajority + 1 <= numAliveFollower) {
return;
}
LOG.warn("Drop an alive follower is not safety. Current alive followers {}, followers {}, next majority: {}",
numAliveFollower, numFollower, nextMajority);
throw new DdlException("Unable to drop this alive follower, because the quorum requirements "
+ "are not met after this command execution. Current num alive followers "
+ numAliveFollower + ", num followers " + numFollower + ", majority after execution " + nextMajority);
}
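// Find a frontend that matches both the given host and edit log port; return null if none matches.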
public Frontend checkFeExist(String host, int port) {
for (Frontend fe : frontends.values()) {
if (fe.getEditLogPort() != port) {
continue;
}
if (fe.getHost().equals(host)) {
return fe;
}
}
return null;
}
public Frontend getFeByName(String name) {
for (Frontend fe : frontends.values()) {
if (fe.getNodeName().equals(name)) {
return fe;
}
}
return null;
}
// The interface which DdlExecutor needs.
public void createDb(CreateDbStmt stmt) throws DdlException {
CatalogIf<?> catalogIf;
if (StringUtils.isEmpty(stmt.getCtlName())) {
catalogIf = getCurrentCatalog();
} else {
catalogIf = catalogMgr.getCatalog(stmt.getCtlName());
}
catalogIf.createDb(stmt);
}
// The interface which DdlExecutor needs.
public void createDb(CreateDatabaseCommand command) throws DdlException {
CatalogIf<?> catalogIf;
if (StringUtils.isEmpty(command.getCtlName())) {
catalogIf = getCurrentCatalog();
} else {
catalogIf = catalogMgr.getCatalog(command.getCtlName());
}
catalogIf.createDb(command);
}
// For replaying edit log, no need to lock metadata
public void unprotectCreateDb(Database db) {
getInternalCatalog().unprotectCreateDb(db);
}
public void replayCreateDb(CreateDbInfo dbInfo) {
if (dbInfo.getInternalDb() != null) {
getInternalCatalog().replayCreateDb(dbInfo.getInternalDb(), "");
} else {
ExternalCatalog externalCatalog = (ExternalCatalog) catalogMgr.getCatalog(dbInfo.getCtlName());
if (externalCatalog != null) {
externalCatalog.replayCreateDb(dbInfo.getDbName());
}
}
}
public void dropDb(DropDbStmt stmt) throws DdlException {
dropDb(stmt.getCtlName(), stmt.getDbName(), stmt.isSetIfExists(), stmt.isForceDrop());
}
public void dropDb(String catalogName, String dbName, boolean ifExists, boolean force) throws DdlException {
CatalogIf<?> catalogIf;
if (StringUtils.isEmpty(catalogName)) {
catalogIf = getCurrentCatalog();
} else {
catalogIf = catalogMgr.getCatalog(catalogName);
}
catalogIf.dropDb(dbName, ifExists, force);
}
public void replayDropDb(DropDbInfo info) throws DdlException {
if (Strings.isNullOrEmpty(info.getCtlName()) || info.getCtlName()
.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
getInternalCatalog().replayDropDb(info.getDbName(), info.isForceDrop(), info.getRecycleTime());
} else {
ExternalCatalog externalCatalog = (ExternalCatalog) catalogMgr.getCatalog(info.getCtlName());
if (externalCatalog != null) {
externalCatalog.replayDropDb(info.getDbName());
}
}
}
public void recoverDatabase(RecoverDbStmt recoverStmt) throws DdlException {
getInternalCatalog().recoverDatabase(recoverStmt);
}
public void recoverDatabase(String dbName, long dbId, String newDbName) throws DdlException {
getInternalCatalog().recoverDatabase(dbName, dbId, newDbName);
}
public void recoverTable(RecoverTableStmt recoverStmt) throws DdlException {
getInternalCatalog().recoverTable(recoverStmt);
}
public void recoverTable(String dbName, String tableName, String newTableName, long tableId) throws DdlException {
getInternalCatalog().recoverTable(dbName, tableName, newTableName, tableId);
}
public void recoverPartition(RecoverPartitionStmt recoverStmt) throws DdlException {
getInternalCatalog().recoverPartition(recoverStmt);
}
public void recoverPartition(String dbName, String tableName, String partitionName,
String newPartitionName, long partitionId) throws DdlException {
getInternalCatalog().recoverPartition(dbName, tableName, partitionName, newPartitionName, partitionId);
}
public void dropCatalogRecycleBin(IdType idType, long id) throws DdlException {
getInternalCatalog().dropCatalogRecycleBin(idType, id);
}
public void replayEraseDatabase(long dbId) throws DdlException {
Env.getCurrentRecycleBin().replayEraseDatabase(dbId);
}
public void replayRecoverDatabase(RecoverInfo info) {
getInternalCatalog().replayRecoverDatabase(info);
}
public void alterDatabaseQuota(AlterDatabaseQuotaStmt stmt) throws DdlException {
getInternalCatalog().alterDatabaseQuota(stmt.getDbName(), stmt.getQuotaType(), stmt.getQuota());
}
public void replayAlterDatabaseQuota(String dbName, long quota, QuotaType quotaType) throws MetaNotFoundException {
getInternalCatalog().replayAlterDatabaseQuota(dbName, quota, quotaType);
}
public void alterDatabaseProperty(AlterDatabasePropertyStmt stmt) throws DdlException {
getInternalCatalog().alterDatabaseProperty(stmt);
}
public void alterDatabaseProperty(String dbName, Map<String, String> properties) throws DdlException {
getInternalCatalog().alterDatabaseProperty(dbName, properties);
}
public void replayAlterDatabaseProperty(String dbName, Map<String, String> properties)
throws MetaNotFoundException {
getInternalCatalog().replayAlterDatabaseProperty(dbName, properties);
}
public void renameDatabase(AlterDatabaseRename stmt) throws DdlException {
getInternalCatalog().renameDatabase(stmt.getDbName(), stmt.getNewDbName());
}
public void replayRenameDatabase(String dbName, String newDbName) {
getInternalCatalog().replayRenameDatabase(dbName, newDbName);
}
/**
* Following is the step to create an olap table:
* 1. create columns
* 2. create partition info
* 3. create distribution info
* 4. set table id and base index id
* 5. set bloom filter columns
* 6. set and build TableProperty includes:
* 6.1. dynamicProperty
* 6.2. replicationNum
* 6.3. inMemory
* 6.4. storageFormat
* 6.5. compressionType
* 7. set index meta
* 8. check colocation properties
* 9. create tablet in BE
* 10. add this table to FE's meta
* 11. add this table to ColocateGroup if necessary
* @return if CreateTableStmt.isIfNotExists is true, returns true when the table already exists;
* otherwise returns false
*/
public boolean createTable(CreateTableStmt stmt) throws UserException {
CatalogIf<?> catalogIf = catalogMgr.getCatalogOrException(stmt.getCatalogName(),
catalog -> new DdlException(("Unknown catalog " + catalog)));
return catalogIf.createTable(stmt);
}
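// Illustrative example (hypothetical table and SQL, not from the original source): a statement such as
//   CREATE TABLE IF NOT EXISTS db1.t1 (k1 INT, v1 BIGINT SUM)
//   AGGREGATE KEY(k1)
//   DISTRIBUTED BY HASH(k1) BUCKETS 10
//   PROPERTIES ("replication_num" = "3")
// goes through the steps listed in the javadoc above; with IF NOT EXISTS, createTable returns true
// when db1.t1 already exists and false when the table was actually created.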
public void createTableAsSelect(CreateTableAsSelectStmt stmt) throws DdlException {
getInternalCatalog().createTableAsSelect(stmt);
}
/**
 * Adds a partition to a table.
 *
 * @param db the database containing the table
 * @param tableName the name of the table to add the partition to
 * @param addPartitionClause clause in the CreateTableStmt
 * @param isCreateTable whether this call is part of creating the table
 * @param generatedPartitionId the preset partition id for the partition to add
 * @param writeEditLog whether to write an edit log for this addition
 * @return PartitionPersistInfo to be written to the edit log; may be null if no partition was added
 * @throws DdlException
 */
public PartitionPersistInfo addPartition(Database db, String tableName, AddPartitionClause addPartitionClause,
boolean isCreateTable, long generatedPartitionId,
boolean writeEditLog) throws DdlException {
return getInternalCatalog().addPartition(db, tableName, addPartitionClause,
isCreateTable, generatedPartitionId, writeEditLog);
}
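// Illustrative example (hypothetical names and syntax, not from the original source): this method backs
// statements such as
//   ALTER TABLE db1.t1 ADD PARTITION p20240101 VALUES [("2024-01-01"), ("2024-01-02"));
// When writeEditLog is false, the returned PartitionPersistInfo lets the caller write the edit log itself.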
public void addMultiPartitions(Database db, String tableName, AlterMultiPartitionClause multiPartitionClause)
throws DdlException {
getInternalCatalog().addMultiPartitions(db, tableName, multiPartitionClause);
}
public void addPartitionLike(Database db, String tableName, AddPartitionLikeClause addPartitionLikeClause)
throws DdlException {
getInternalCatalog().addPartitionLike(db, tableName, addPartitionLikeClause);
}
public void replayAddPartition(PartitionPersistInfo info) throws MetaNotFoundException {
getInternalCatalog().replayAddPartition(info);
}
public void dropPartition(Database db, OlapTable olapTable, DropPartitionClause clause) throws DdlException {
getInternalCatalog().dropPartition(db, olapTable, clause);
}
public void replayDropPartition(DropPartitionInfo info) throws MetaNotFoundException {
getInternalCatalog().replayDropPartition(info);
}
public void replayErasePartition(long partitionId) {
getInternalCatalog().replayErasePartition(partitionId);
}
public void replayRecoverPartition(RecoverInfo info) throws MetaNotFoundException, DdlException {
getInternalCatalog().replayRecoverPartition(info);
}
public void replayGcBinlog(BinlogGcInfo binlogGcInfo) {
binlogManager.replayGc(binlogGcInfo);
}
public static void getDdlStmt(TableIf table, List<String> createTableStmt, List<String> addPartitionStmt,
List<String> createRollupStmt, boolean separatePartition, boolean hidePassword,
long specificVersion) {
getDdlStmt(null, null, table, createTableStmt, addPartitionStmt, createRollupStmt, separatePartition,
hidePassword, false, specificVersion, false, false);
}
public static void getSyncedDdlStmt(TableIf table, List<String> createTableStmt, List<String> addPartitionStmt,
List<String> createRollupStmt, boolean separatePartition, boolean hidePassword,
long specificVersion) {
getDdlStmt(null, null, table, createTableStmt, addPartitionStmt, createRollupStmt, separatePartition,
hidePassword, false, specificVersion, false, true);
}
public static String getMTMVDdl(MTMV mtmv) throws AnalysisException {
if (!mtmv.tryReadLock(1, TimeUnit.MINUTES)) {
throw new AnalysisException(
"get table read lock timeout, database=" + mtmv.getDBName() + ",table=" + mtmv.getName());
}
try {
StringBuilder sb = new StringBuilder("CREATE MATERIALIZED VIEW ");
sb.append(mtmv.getName());
addMTMVCols(mtmv, sb);
sb.append("\n");
sb.append(mtmv.getRefreshInfo());
addMTMVKeyInfo(mtmv, sb);
addTableComment(mtmv, sb);
addMTMVPartitionInfo(mtmv, sb);
DistributionInfo distributionInfo = mtmv.getDefaultDistributionInfo();
sb.append("\n").append(distributionInfo.toSql());
// properties
sb.append("\nPROPERTIES (\n");
addOlapTablePropertyInfo(mtmv, sb, false, false, null);
addMTMVPropertyInfo(mtmv, sb);
sb.append("\n)");
sb.append("\nAS ");
sb.append(mtmv.getQuerySql());
return sb.toString();
} finally {
mtmv.readUnlock();
}
}
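/*
 * Illustrative shape of the DDL built by getMTMVDdl above (hypothetical MTMV, not from the original
 * source); the exact clause text comes from the MTMV's own refresh, key, comment and partition info:
 *   CREATE MATERIALIZED VIEW mv1
 *   (k1, v1)
 *   <refresh info>
 *   <optional key / comment / partition clauses>
 *   DISTRIBUTED BY HASH(k1) BUCKETS 10
 *   PROPERTIES (
 *   ...
 *   )
 *   AS SELECT ...
 */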
private static void addMTMVKeyInfo(MTMV mtmv, StringBuilder sb) {
if (!mtmv.isDuplicateWithoutKey()) {
String keySql = mtmv.getKeysType().toSql();
sb.append("\n").append(keySql).append("(");
List<String> keysColumnNames = Lists.newArrayList();
for (Column column : mtmv.getBaseSchema()) {
if (column.isKey()) {
keysColumnNames.add("`" + column.getName() + "`");
}
}
sb.append(Joiner.on(", ").join(keysColumnNames)).append(")");
}
}
private static void addMTMVPartitionInfo(MTMV mtmv, StringBuilder sb) {
MTMVPartitionInfo mvPartitionInfo = mtmv.getMvPartitionInfo();
if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
return;
}
sb.append("\n").append("PARTITION BY (");
if (mvPartitionInfo.getPartitionType() == MTMVPartitionType.FOLLOW_BASE_TABLE) {
sb.append("`" + mvPartitionInfo.getPartitionCol() + "`");
} else {
sb.append(mvPartitionInfo.getExpr().toSql());
}
sb.append(")");
}
private static void addMTMVCols(MTMV mtmv, StringBuilder sb) {
sb.append("\n(");
List<Column> columns = mtmv.getBaseSchema();
for (int i = 0; i < columns.size(); i++) {
if (i != 0) {
sb.append(",");
}
Column column = columns.get(i);
sb.append(column.getName());
if (!StringUtils.isEmpty(column.getComment())) {
sb.append(" comment '");
sb.append(column.getComment());
sb.append("'");
}
}
sb.append(")");
}
private static void addMTMVPropertyInfo(MTMV mtmv, StringBuilder sb) {
Map<String, String> mvProperties = mtmv.getMvProperties();
for (Entry<String, String> entry : mvProperties.entrySet()) {
sb.append(",\n\"").append(entry.getKey()).append("\" = \"");
sb.append(entry.getValue()).append("\"");
}
}
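/*
 * Illustrative fragment of the properties text appended by addOlapTablePropertyInfo below
 * (hypothetical values, non-cloud mode, not from the original source):
 *   "replication_allocation" = "tag.location.default: 3",
 *   "min_load_replica_num" = "-1",
 *   "is_being_synced" = "false",
 *   "storage_format" = "V2",
 *   ...
 * Each property is emitted as a ",\n"-separated "key" = "value" pair inside the PROPERTIES ( )
 * block written by the caller.
 */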
private static void addOlapTablePropertyInfo(OlapTable olapTable, StringBuilder sb, boolean separatePartition,
boolean getDdlForSync, List<Long> partitionId) {
// replicationNum
ReplicaAllocation replicaAlloc = olapTable.getDefaultReplicaAllocation();
if (Config.isCloudMode()) {
sb.append("\"").append(PropertyAnalyzer.PROPERTIES_FILE_CACHE_TTL_SECONDS).append("\" = \"");
sb.append(olapTable.getTTLSeconds()).append("\"");
} else {
sb.append("\"").append(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION).append("\" = \"");
sb.append(replicaAlloc.toCreateStmt()).append("\"");
// min load replica num
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_MIN_LOAD_REPLICA_NUM).append("\" = \"");
sb.append(olapTable.getMinLoadReplicaNum()).append("\"");
}
// bloom filter
Set<String> bfColumnNames = olapTable.getCopiedBfColumns();
if (bfColumnNames != null) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_BF_COLUMNS).append("\" = \"");
sb.append(Joiner.on(", ").join(olapTable.getCopiedBfColumns())).append("\"");
}
if (separatePartition) {
// version info
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_VERSION_INFO).append("\" = \"");
Partition partition = null;
if (olapTable.getPartitionInfo().getType() == PartitionType.UNPARTITIONED) {
partition = olapTable.getPartition(olapTable.getName());
} else {
Preconditions.checkState(partitionId.size() == 1);
partition = olapTable.getPartition(partitionId.get(0));
}
sb.append(partition.getVisibleVersion()).append("\"");
}
// mark whether this table is being synced
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_IS_BEING_SYNCED).append("\" = \"");
sb.append(String.valueOf(olapTable.isBeingSynced() || getDdlForSync)).append("\"");
// mark this table if it is an auto bucket table
if (getDdlForSync && olapTable.isAutoBucket()) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_AUTO_BUCKET).append("\" = \"");
sb.append("true").append("\"");
}
// colocateTable
String colocateTable = olapTable.getColocateGroup();
if (colocateTable != null) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH).append("\" = \"");
sb.append(colocateTable).append("\"");
}
// dynamic partition
if (olapTable.dynamicPartitionExists()) {
sb.append(olapTable.getTableProperty().getDynamicPartitionProperty().getProperties(replicaAlloc));
}
// only display z-order sort info
if (olapTable.isZOrderSort()) {
sb.append(olapTable.getDataSortInfo().toSql());
}
// in memory
if (olapTable.isInMemory()) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_INMEMORY).append("\" = \"");
sb.append(olapTable.isInMemory()).append("\"");
}
// storage medium
if (olapTable.getStorageMedium() != null) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM).append("\" = \"");
sb.append(olapTable.getStorageMedium().name().toLowerCase());
sb.append("\"");
}
// storage type
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_STORAGE_FORMAT).append("\" = \"");
sb.append(olapTable.getStorageFormat()).append("\"");
// inverted index storage type
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_INVERTED_INDEX_STORAGE_FORMAT).append("\" = \"");
sb.append(olapTable.getInvertedIndexFileStorageFormat()).append("\"");
// compression type
if (olapTable.getCompressionType() != TCompressionType.LZ4F) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_COMPRESSION).append("\" = \"");
sb.append(olapTable.getCompressionType()).append("\"");
}
// estimate_partition_size
if (!olapTable.getEstimatePartitionSize().equals("")) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_ESTIMATE_PARTITION_SIZE).append("\" = \"");
sb.append(olapTable.getEstimatePartitionSize()).append("\"");
}
// unique key table with merge-on-write; always print this property for unique key tables
if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS) {
sb.append(",\n\"").append(PropertyAnalyzer.ENABLE_UNIQUE_KEY_MERGE_ON_WRITE).append("\" = \"");
sb.append(olapTable.getEnableUniqueKeyMergeOnWrite()).append("\"");
}
// enable_unique_key_skip_bitmap; printed for merge-on-write unique key tables when it is enabled
if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS && olapTable.getEnableUniqueKeyMergeOnWrite()
&& olapTable.getEnableUniqueKeySkipBitmap()) {
sb.append(",\n\"").append(PropertyAnalyzer.ENABLE_UNIQUE_KEY_SKIP_BITMAP_COLUMN).append("\" = \"");
sb.append(olapTable.getEnableUniqueKeySkipBitmap()).append("\"");
}
// show lightSchemaChange only when it is set true
if (olapTable.getEnableLightSchemaChange()) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_ENABLE_LIGHT_SCHEMA_CHANGE).append("\" = \"");
sb.append(olapTable.getEnableLightSchemaChange()).append("\"");
}
// storage policy
if (olapTable.getStoragePolicy() != null && !olapTable.getStoragePolicy().equals("")) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY).append("\" = \"");
sb.append(olapTable.getStoragePolicy()).append("\"");
}
// sequence type
if (olapTable.hasSequenceCol()) {
if (olapTable.getSequenceMapCol() != null) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_FUNCTION_COLUMN + "."
+ PropertyAnalyzer.PROPERTIES_SEQUENCE_COL).append("\" = \"");
sb.append(olapTable.getSequenceMapCol()).append("\"");
} else {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_FUNCTION_COLUMN + "."
+ PropertyAnalyzer.PROPERTIES_SEQUENCE_TYPE).append("\" = \"");
sb.append(olapTable.getSequenceType().toString()).append("\"");
}
}
// store row column
if (olapTable.storeRowColumn()) {
List<String> rsColumnNames = olapTable.getTableProperty().getCopiedRowStoreColumns();
if (rsColumnNames != null && !rsColumnNames.isEmpty()) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_ROW_STORE_COLUMNS).append("\" = \"");
sb.append(Joiner.on(",").join(rsColumnNames)).append("\"");
} else {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_STORE_ROW_COLUMN).append("\" = \"");
sb.append(olapTable.storeRowColumn()).append("\"");
}
// row store page size
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_ROW_STORE_PAGE_SIZE).append("\" = \"");
sb.append(olapTable.rowStorePageSize()).append("\"");
}
// storage page size
if (olapTable.storagePageSize() != PropertyAnalyzer.STORAGE_PAGE_SIZE_DEFAULT_VALUE) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_STORAGE_PAGE_SIZE).append("\" = \"");
sb.append(olapTable.storagePageSize()).append("\"");
}
// skip inverted index on load
if (olapTable.skipWriteIndexOnLoad()) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_SKIP_WRITE_INDEX_ON_LOAD).append("\" = \"");
sb.append(olapTable.skipWriteIndexOnLoad()).append("\"");
}
// compaction policy
if (olapTable.getCompactionPolicy() != null && !olapTable.getCompactionPolicy().equals("")
&& !olapTable.getCompactionPolicy().equals(PropertyAnalyzer.SIZE_BASED_COMPACTION_POLICY)) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_COMPACTION_POLICY).append("\" = \"");
sb.append(olapTable.getCompactionPolicy()).append("\"");
}
// time series compaction goal size
if (olapTable.getCompactionPolicy() != null && olapTable.getCompactionPolicy()
.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)) {
sb.append(",\n\"").append(PropertyAnalyzer
.PROPERTIES_TIME_SERIES_COMPACTION_GOAL_SIZE_MBYTES).append("\" = \"");
sb.append(olapTable.getTimeSeriesCompactionGoalSizeMbytes()).append("\"");
}
// time series compaction file count threshold
if (olapTable.getCompactionPolicy() != null && olapTable.getCompactionPolicy()
.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)) {
sb.append(",\n\"").append(PropertyAnalyzer
.PROPERTIES_TIME_SERIES_COMPACTION_FILE_COUNT_THRESHOLD).append("\" = \"");
sb.append(olapTable.getTimeSeriesCompactionFileCountThreshold()).append("\"");
}
// time series compaction time threshold
if (olapTable.getCompactionPolicy() != null && olapTable.getCompactionPolicy()
.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)) {
sb.append(",\n\"").append(PropertyAnalyzer
.PROPERTIES_TIME_SERIES_COMPACTION_TIME_THRESHOLD_SECONDS).append("\" = \"");
sb.append(olapTable.getTimeSeriesCompactionTimeThresholdSeconds()).append("\"");
}
// time series compaction empty rowsets threshold
if (olapTable.getCompactionPolicy() != null && olapTable.getCompactionPolicy()
.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)) {
sb.append(",\n\"").append(PropertyAnalyzer
.PROPERTIES_TIME_SERIES_COMPACTION_EMPTY_ROWSETS_THRESHOLD).append("\" = \"");
sb.append(olapTable.getTimeSeriesCompactionEmptyRowsetsThreshold()).append("\"");
}
// time series compaction level threshold
if (olapTable.getCompactionPolicy() != null && olapTable.getCompactionPolicy()
.equals(PropertyAnalyzer.TIME_SERIES_COMPACTION_POLICY)) {
sb.append(",\n\"").append(PropertyAnalyzer
.PROPERTIES_TIME_SERIES_COMPACTION_LEVEL_THRESHOLD).append("\" = \"");
sb.append(olapTable.getTimeSeriesCompactionLevelThreshold()).append("\"");
}
// Storage Vault
if (!Strings.isNullOrEmpty(olapTable.getStorageVaultId())) {
sb.append(",\n\"").append(PropertyAnalyzer
.PROPERTIES_STORAGE_VAULT_ID).append("\" = \"");
sb.append(olapTable.getStorageVaultId()).append("\"");
sb.append(",\n\"").append(PropertyAnalyzer
.PROPERTIES_STORAGE_VAULT_NAME).append("\" = \"");
sb.append(olapTable.getStorageVaultName()).append("\"");
}
// disable auto compaction
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_DISABLE_AUTO_COMPACTION).append("\" = \"");
sb.append(olapTable.disableAutoCompaction()).append("\"");
if (olapTable.variantEnableFlattenNested()) {
// enable flatten nested type in variant
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_VARIANT_ENABLE_FLATTEN_NESTED).append("\" = \"");
sb.append(olapTable.variantEnableFlattenNested()).append("\"");
}
// binlog
if (Config.enable_feature_binlog) {
BinlogConfig binlogConfig = olapTable.getBinlogConfig();
binlogConfig.appendToShowCreateTable(sb);
}
// enable single replica compaction
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_ENABLE_SINGLE_REPLICA_COMPACTION).append("\" = \"");
sb.append(olapTable.enableSingleReplicaCompaction()).append("\"");
// group commit interval ms
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_INTERVAL_MS).append("\" = \"");
sb.append(olapTable.getGroupCommitIntervalMs()).append("\"");
// group commit data bytes
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_GROUP_COMMIT_DATA_BYTES).append("\" = \"");
sb.append(olapTable.getGroupCommitDataBytes()).append("\"");
// enable mow light delete (delete on delete predicate)
if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS && olapTable.getEnableUniqueKeyMergeOnWrite()) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_ENABLE_MOW_LIGHT_DELETE)
.append("\" = \"");
sb.append(olapTable.getEnableMowLightDelete()).append("\"");
}
// enable duplicate without keys by default
if (olapTable.isDuplicateWithoutKey()) {
sb.append(",\n\"")
.append(PropertyAnalyzer.PROPERTIES_ENABLE_DUPLICATE_WITHOUT_KEYS_BY_DEFAULT)
.append("\" = \"");
sb.append(olapTable.isDuplicateWithoutKey()).append("\"");
}
if (olapTable.isInAtomicRestore()) {
sb.append(",\n\"").append(PropertyAnalyzer.PROPERTIES_IN_ATOMIC_RESTORE).append("\" = \"true\"");
}
}
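/*
 * Illustrative output of getDdlStmt for a simple OLAP table (hypothetical schema and values,
 * not from the original source):
 *   CREATE TABLE `db1`.`t1` (
 *     `k1` INT,
 *     `v1` BIGINT SUM
 *   ) ENGINE=OLAP
 *   AGGREGATE KEY(`k1`)
 *   DISTRIBUTED BY HASH(`k1`) BUCKETS 10
 *   PROPERTIES (
 *   "replication_allocation" = "tag.location.default: 3",
 *   ...
 *   );
 */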
/**
* Get table ddl stmt.
*
* @param getDdlForLike whether to get the schema for 'create table like'; when true, hidden columns are excluded.
*/
public static void getDdlStmt(DdlStmt ddlStmt, String dbName, TableIf table, List<String> createTableStmt,
List<String> addPartitionStmt, List<String> createRollupStmt,
boolean separatePartition,
boolean hidePassword, boolean getDdlForLike, long specificVersion,
boolean getBriefDdl, boolean getDdlForSync) {
StringBuilder sb = new StringBuilder();
// 1. create table
// 1.1 view
if (table.getType() == TableType.VIEW) {
View view = (View) table;
sb.append("CREATE VIEW `").append(table.getName()).append("`");
if (StringUtils.isNotBlank(table.getComment())) {
sb.append(" COMMENT '").append(table.getComment()).append("'");
}
sb.append(" AS ").append(view.getInlineViewDef());
createTableStmt.add(sb + ";");
return;
}
// 1.2 other table type
sb.append("CREATE ");
if (table.getType() == TableType.ODBC || table.getType() == TableType.MYSQL
|| table.getType() == TableType.ELASTICSEARCH || table.getType() == TableType.BROKER
|| table.getType() == TableType.HIVE || table.getType() == TableType.JDBC) {
sb.append("EXTERNAL ");
}
// create table like a temporary table or create temporary table like a table
if (ddlStmt instanceof CreateTableLikeStmt) {
if (((CreateTableLikeStmt) ddlStmt).isTemp()) {
sb.append("TEMPORARY ");
}
} else if (table.isTemporary()) {
// used for show create table
sb.append("TEMPORARY ");
}
sb.append(table.getType() != TableType.MATERIALIZED_VIEW ? "TABLE " : "MATERIALIZED VIEW ");
if (!Strings.isNullOrEmpty(dbName)) {
sb.append("`").append(dbName).append("`.");
}
if (table.isTemporary()) {
sb.append("`").append(Util.getTempTableDisplayName(table.getName())).append("`");
} else {
sb.append("`").append(table.getName()).append("`");
}
sb.append(" (\n");
int idx = 0;
List<Column> columns = table.getBaseSchema(false);
for (Column column : columns) {
if (idx++ != 0) {
sb.append(",\n");
}
// There MUST BE 2 spaces in front of each column description line,
// because sqlalchemy requires this to parse the SHOW CREATE TABLE stmt.
if (table.isManagedTable()) {
sb.append(" ").append(
column.toSql(((OlapTable) table).getKeysType() == KeysType.UNIQUE_KEYS, true));
} else {
sb.append(" ").append(column.toSql());
}
}
if (table instanceof OlapTable) {
OlapTable olapTable = (OlapTable) table;
if (CollectionUtils.isNotEmpty(olapTable.getIndexes())) {
for (Index index : olapTable.getIndexes()) {
sb.append(",\n");
sb.append(" ").append(index.toSql());
}
}
}
sb.append("\n) ENGINE=");
sb.append(table.getType().name());
if (table instanceof OlapTable) {
OlapTable olapTable = (OlapTable) table;
// keys
String keySql = olapTable.getKeysType().toSql();
if (olapTable.isDuplicateWithoutKey()) {
// after #18621, user can create a DUP_KEYS olap table without key columns
// and get a ddl schema without key type and key columns
} else {
sb.append("\n").append(keySql).append("(");
List<String> keysColumnNames = Lists.newArrayList();
Map<Integer, String> clusterKeysColumnNamesToId = new TreeMap<>();
for (Column column : olapTable.getBaseSchema()) {
if (column.isKey()) {
keysColumnNames.add("`" + column.getName() + "`");
}
if (column.isClusterKey()) {
clusterKeysColumnNamesToId.put(column.getClusterKeyId(), column.getName());
}
}
sb.append(Joiner.on(", ").join(keysColumnNames)).append(")");
// show cluster keys
if (!clusterKeysColumnNamesToId.isEmpty()) {
sb.append("\n").append("CLUSTER BY (`");
sb.append(Joiner.on("`, `").join(clusterKeysColumnNamesToId.values())).append("`)");
}
}
if (specificVersion != -1) {
// for copy tablet operation
sb.append("\nDISTRIBUTED BY HASH(").append(olapTable.getBaseSchema().get(0).getName())
.append(") BUCKETS 1");
sb.append("\nPROPERTIES (\n" + "\"replication_num\" = \"1\",\n" + "\"version_info\" = \""
+ specificVersion + "\"\n" + ")");
createTableStmt.add(sb + ";");
return;
}
addTableComment(olapTable, sb);
// partition
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
List<Long> partitionId = null;
if (separatePartition) {
partitionId = Lists.newArrayList();
}
if (!getBriefDdl && (partitionInfo.getType() == PartitionType.RANGE
|| partitionInfo.getType() == PartitionType.LIST)) {
sb.append("\n").append(partitionInfo.toSql(olapTable, partitionId));
}
// distribution
DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
sb.append("\n").append(distributionInfo.toSql());
// rollup index
if (ddlStmt instanceof CreateTableLikeStmt) {
CreateTableLikeStmt stmt = (CreateTableLikeStmt) ddlStmt;
ArrayList<String> rollupNames = stmt.getRollupNames();
boolean withAllRollup = stmt.isWithAllRollup();
List<Long> addIndexIdList = Lists.newArrayList();
if (!CollectionUtils.isEmpty(rollupNames)) {
for (String rollupName : rollupNames) {
addIndexIdList.add(olapTable.getIndexIdByName(rollupName));
}
} else if (withAllRollup) {
addIndexIdList = olapTable.getIndexIdListExceptBaseIndex();
}
if (!addIndexIdList.isEmpty()) {
sb.append("\n").append("rollup (");
}
int size = addIndexIdList.size();
int index = 1;
for (long indexId : addIndexIdList) {
String indexName = olapTable.getIndexNameById(indexId);
sb.append("\n").append(indexName).append("(");
List<Column> indexSchema = olapTable.getSchemaByIndexId(indexId, false);
for (int i = 0; i < indexSchema.size(); i++) {
Column column = indexSchema.get(i);
sb.append(column.getName());
if (i != indexSchema.size() - 1) {
sb.append(", ");
}
}
if (index != size) {
sb.append("),");
} else {
sb.append(")");
sb.append("\n)");
}
index++;
}
}
// with all rollup (only when getDdlForSync); the do { ... } while (false) below is a breakable block for early exit
do {
if (!getDdlForSync) {
break;
}
List<Long> indexIds = new ArrayList<>(olapTable.getIndexIdToMeta().keySet());
if (indexIds.size() == 1 && indexIds.get(0) == olapTable.getBaseIndexId()) {
break;
}
indexIds = indexIds.stream().filter(item -> item != olapTable.getBaseIndexId())
.collect(Collectors.toList());
sb.append("\nROLLUP (\n");
for (int i = 0; i < indexIds.size(); i++) {
Long indexId = indexIds.get(i);
MaterializedIndexMeta materializedIndexMeta = olapTable.getIndexIdToMeta().get(indexId);
String indexName = olapTable.getIndexNameById(indexId);
sb.append(indexName).append(" (");
List<Column> indexSchema = materializedIndexMeta.getSchema();
for (int j = 0; j < indexSchema.size(); j++) {
Column column = indexSchema.get(j);
sb.append(column.getName());
if (j != indexSchema.size() - 1) {
sb.append(", ");
}
}
sb.append(")");
if (i != indexIds.size() - 1) {
sb.append(",\n");
}
}
sb.append("\n)");
} while (false);
// properties
sb.append("\nPROPERTIES (\n");
addOlapTablePropertyInfo(olapTable, sb, separatePartition, getDdlForSync, partitionId);
sb.append("\n)");
} else if (table.getType() == TableType.MYSQL) {
MysqlTable mysqlTable = (MysqlTable) table;
addTableComment(mysqlTable, sb);
// properties
sb.append("\nPROPERTIES (\n");
if (mysqlTable.getOdbcCatalogResourceName() == null) {
sb.append("\"host\" = \"").append(mysqlTable.getHost()).append("\",\n");
sb.append("\"port\" = \"").append(mysqlTable.getPort()).append("\",\n");
sb.append("\"user\" = \"").append(mysqlTable.getUserName()).append("\",\n");
sb.append("\"password\" = \"").append(hidePassword ? "" : mysqlTable.getPasswd()).append("\",\n");
sb.append("\"charset\" = \"").append(mysqlTable.getCharset()).append("\",\n");
} else {
sb.append("\"odbc_catalog_resource\" = \"").append(mysqlTable.getOdbcCatalogResourceName())
.append("\",\n");
}
sb.append("\"database\" = \"").append(mysqlTable.getMysqlDatabaseName()).append("\",\n");
sb.append("\"table\" = \"").append(mysqlTable.getMysqlTableName()).append("\"\n");
sb.append(")");
} else if (table.getType() == TableType.ODBC) {
OdbcTable odbcTable = (OdbcTable) table;
addTableComment(odbcTable, sb);
// properties
sb.append("\nPROPERTIES (\n");
if (odbcTable.getOdbcCatalogResourceName() == null) {
sb.append("\"host\" = \"").append(odbcTable.getHost()).append("\",\n");
sb.append("\"port\" = \"").append(odbcTable.getPort()).append("\",\n");
sb.append("\"user\" = \"").append(odbcTable.getUserName()).append("\",\n");
sb.append("\"password\" = \"").append(hidePassword ? "" : odbcTable.getPasswd()).append("\",\n");
sb.append("\"driver\" = \"").append(odbcTable.getOdbcDriver()).append("\",\n");
sb.append("\"odbc_type\" = \"").append(odbcTable.getOdbcTableTypeName()).append("\",\n");
sb.append("\"charest\" = \"").append(odbcTable.getCharset()).append("\",\n");
} else {
sb.append("\"odbc_catalog_resource\" = \"").append(odbcTable.getOdbcCatalogResourceName())
.append("\",\n");
}
sb.append("\"database\" = \"").append(odbcTable.getOdbcDatabaseName()).append("\",\n");
sb.append("\"table\" = \"").append(odbcTable.getOdbcTableName()).append("\"\n");
sb.append(")");
} else if (table.getType() == TableType.BROKER) {
BrokerTable brokerTable = (BrokerTable) table;
addTableComment(brokerTable, sb);
// properties
sb.append("\nPROPERTIES (\n");
sb.append("\"broker_name\" = \"").append(brokerTable.getBrokerName()).append("\",\n");
sb.append("\"path\" = \"").append(Joiner.on(",").join(brokerTable.getEncodedPaths())).append("\",\n");
sb.append("\"column_separator\" = \"").append(brokerTable.getReadableColumnSeparator()).append("\",\n");
sb.append("\"line_delimiter\" = \"").append(brokerTable.getReadableLineDelimiter()).append("\",\n");
sb.append(")");
if (!brokerTable.getBrokerProperties().isEmpty()) {
sb.append("\nBROKER PROPERTIES (\n");
sb.append(new PrintableMap<>(brokerTable.getBrokerProperties(), " = ", true, true,
hidePassword).toString());
sb.append("\n)");
}
} else if (table.getType() == TableType.ELASTICSEARCH) {
EsTable esTable = (EsTable) table;
addTableComment(esTable, sb);
// partition
PartitionInfo partitionInfo = esTable.getPartitionInfo();
if (partitionInfo.getType() == PartitionType.RANGE) {
sb.append("\n");
sb.append("PARTITION BY RANGE(");
RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) partitionInfo;
for (Column column : rangePartitionInfo.getPartitionColumns()) {
sb.append("`").append(column.getName()).append("`");
}
sb.append(")\n()");
}
// properties
sb.append("\nPROPERTIES (\n");
sb.append("\"hosts\" = \"").append(esTable.getHosts()).append("\",\n");
sb.append("\"user\" = \"").append(esTable.getUserName()).append("\",\n");
sb.append("\"password\" = \"").append(hidePassword ? "" : esTable.getPasswd()).append("\",\n");
sb.append("\"index\" = \"").append(esTable.getIndexName()).append("\",\n");
if (esTable.getMappingType() != null) {
sb.append("\"type\" = \"").append(esTable.getMappingType()).append("\",\n");
}
sb.append("\"enable_docvalue_scan\" = \"").append(esTable.isEnableDocValueScan()).append("\",\n");
sb.append("\"max_docvalue_fields\" = \"").append(esTable.getMaxDocValueFields()).append("\",\n");
sb.append("\"enable_keyword_sniff\" = \"").append(esTable.isEnableKeywordSniff()).append("\",\n");
sb.append("\"nodes_discovery\" = \"").append(esTable.isNodesDiscovery()).append("\",\n");
sb.append("\"http_ssl_enabled\" = \"").append(esTable.isHttpSslEnabled()).append("\",\n");
sb.append("\"like_push_down\" = \"").append(esTable.isLikePushDown()).append("\"\n");
sb.append(")");
} else if (table.getType() == TableType.HIVE) {
HiveTable hiveTable = (HiveTable) table;
addTableComment(hiveTable, sb);
// properties
sb.append("\nPROPERTIES (\n");
sb.append("\"database\" = \"").append(hiveTable.getHiveDb()).append("\",\n");
sb.append("\"table\" = \"").append(hiveTable.getHiveTable()).append("\",\n");
sb.append(new PrintableMap<>(hiveTable.getHiveProperties(), " = ", true, true, hidePassword).toString());
sb.append("\n)");
} else if (table.getType() == TableType.JDBC) {
JdbcTable jdbcTable = (JdbcTable) table;
addTableComment(jdbcTable, sb);
sb.append("\nPROPERTIES (\n");
sb.append("\"resource\" = \"").append(jdbcTable.getResourceName()).append("\",\n");
sb.append("\"table\" = \"").append(jdbcTable.getJdbcTable()).append("\",\n");
sb.append("\"table_type\" = \"").append(jdbcTable.getJdbcTypeName()).append("\"");
sb.append("\n)");
} else if (table.getType() == TableType.ICEBERG_EXTERNAL_TABLE) {
addTableComment(table, sb);
org.apache.iceberg.Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
sb.append("\nLOCATION '").append(icebergTable.location()).append("'");
sb.append("\nPROPERTIES (");
Iterator<Entry<String, String>> iterator = icebergTable.properties().entrySet().iterator();
while (iterator.hasNext()) {
Entry<String, String> prop = iterator.next();
sb.append("\n \"").append(prop.getKey()).append("\" = \"").append(prop.getValue()).append("\"");
if (iterator.hasNext()) {
sb.append(",");
}
}
sb.append("\n)");
}
createTableStmt.add(sb + ";");
// 2. add partition
if (separatePartition && (table instanceof OlapTable) && ((OlapTable) table).getPartitions().size() > 1) {
if (((OlapTable) table).getPartitionInfo().getType() == PartitionType.RANGE
|| ((OlapTable) table).getPartitionInfo().getType() == PartitionType.LIST) {
OlapTable olapTable = (OlapTable) table;
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
boolean first = true;
for (Map.Entry<Long, PartitionItem> entry : partitionInfo.getPartitionItemEntryList(false, true)) {
if (first) {
first = false;
continue;
}
sb = new StringBuilder();
Partition partition = olapTable.getPartition(entry.getKey());
sb.append("ALTER TABLE ").append(table.getName());
sb.append(" ADD PARTITION ").append(partition.getName()).append(" VALUES ");
if (partitionInfo.getType() == PartitionType.RANGE) {
sb.append("[");
sb.append(((RangePartitionItem) entry.getValue()).getItems().lowerEndpoint().toSql());
sb.append(", ");
sb.append(((RangePartitionItem) entry.getValue()).getItems().upperEndpoint().toSql());
sb.append(")");
} else if (partitionInfo.getType() == PartitionType.LIST) {
sb.append("IN (");
sb.append(((ListPartitionItem) entry.getValue()).toSql());
sb.append(")");
}
sb.append("(\"version_info\" = \"");
sb.append(partition.getVisibleVersion()).append("\"");
sb.append(");");
addPartitionStmt.add(sb + ";");
}
}
}
// 3. rollup
if (createRollupStmt != null && (table instanceof OlapTable)) {
OlapTable olapTable = (OlapTable) table;
for (Map.Entry<Long, MaterializedIndexMeta> entry : olapTable.getIndexIdToMeta().entrySet()) {
if (entry.getKey() == olapTable.getBaseIndexId()) {
continue;
}
MaterializedIndexMeta materializedIndexMeta = entry.getValue();
sb = new StringBuilder();
String indexName = olapTable.getIndexNameById(entry.getKey());
sb.append("ALTER TABLE ").append(table.getName()).append(" ADD ROLLUP ").append(indexName);
sb.append("(");
List<Column> indexSchema = materializedIndexMeta.getSchema();
for (int i = 0; i < indexSchema.size(); i++) {
Column column = indexSchema.get(i);
sb.append(column.getName());
if (i != indexSchema.size() - 1) {
sb.append(", ");
}
}
sb.append(");");
createRollupStmt.add(sb + ";");
}
}
}
public void replayCreateTable(CreateTableInfo info) throws MetaNotFoundException {
if (Strings.isNullOrEmpty(info.getCtlName()) || info.getCtlName()
.equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
getInternalCatalog().replayCreateTable(info.getDbName(), info.getTable());
} else {
ExternalCatalog externalCatalog = (ExternalCatalog) catalogMgr.getCatalog(info.getCtlName());
if (externalCatalog != null) {
externalCatalog.replayCreateTable(info.getDbName(), info.getTblName());
}
}
}
public void replayAlterExternalTableSchema(String dbName, String tableName, List<Column> newSchema)
throws MetaNotFoundException {
getInternalCatalog().replayAlterExternalTableSchema(dbName, tableName, newSchema);
}
// Drop table
public void dropTable(DropTableStmt stmt) throws DdlException {
if (stmt == null) {
throw new DdlException("DropTableStmt is null");
}
dropTable(stmt.getCatalogName(), stmt.getDbName(), stmt.getTableName(), stmt.isView(),
stmt.isMaterializedView(), stmt.isSetIfExists(), stmt.isForceDrop());
}
public void dropTable(String catalogName, String dbName, String tableName, boolean isView, boolean isMtmv,
boolean ifExists, boolean force) throws DdlException {
CatalogIf<?> catalogIf = catalogMgr.getCatalogOrException(catalogName,
catalog -> new DdlException(("Unknown catalog " + catalog)));
catalogIf.dropTable(dbName, tableName, isView, isMtmv, ifExists, force);
}
public void dropView(String catalogName, String dbName, String tableName, boolean ifExists) throws DdlException {
CatalogIf<?> catalogIf = catalogMgr.getCatalogOrException(catalogName,
catalog -> new DdlException(("Unknown catalog " + catalog)));
catalogIf.dropTable(dbName, tableName, true, false, ifExists, false);
}
public boolean unprotectDropTable(Database db, Table table, boolean isForceDrop, boolean isReplay,
Long recycleTime) {
return getInternalCatalog().unprotectDropTable(db, table, isForceDrop, isReplay, recycleTime);
}
public void replayDropTable(Database db, long tableId, boolean isForceDrop,
Long recycleTime) throws MetaNotFoundException {
getInternalCatalog().replayDropTable(db, tableId, isForceDrop, recycleTime);
}
public void replayEraseTable(long tableId) {
getInternalCatalog().replayEraseTable(tableId);
}
public void replayRecoverTable(RecoverInfo info) throws MetaNotFoundException, DdlException {
getInternalCatalog().replayRecoverTable(info);
}
public void replayAddReplica(ReplicaPersistInfo info) throws MetaNotFoundException {
getInternalCatalog().replayAddReplica(info);
}
public void replayUpdateReplica(ReplicaPersistInfo info) throws MetaNotFoundException {
getInternalCatalog().replayUpdateReplica(info);
}
public void unprotectDeleteReplica(OlapTable olapTable, ReplicaPersistInfo info) {
getInternalCatalog().unprotectDeleteReplica(olapTable, info);
}
public void replayDeleteReplica(ReplicaPersistInfo info) throws MetaNotFoundException {
getInternalCatalog().replayDeleteReplica(info);
}
public void replayAddFrontend(Frontend fe) {
tryLock(true);
try {
Frontend existFe = checkFeExist(fe.getHost(), fe.getEditLogPort());
if (existFe != null) {
LOG.warn("fe {} already exist.", existFe);
if (existFe.getRole() != fe.getRole()) {
/*
* This may happen if:
* 1. first, add a FE as OBSERVER.
* 2. This OBSERVER is restarted with its ROLE and VERSION files DELETED.
* In this case, this OBSERVER will be started as a FOLLOWER, and will add itself to the frontends.
* 3. This "FOLLOWER" begins to load the image or replay the journal,
* and then finds the original OBSERVER in the image or journal.
* This will cause UNDEFINED behavior, so it is better to exit and fix it manually.
*/
System.err.println("Try to add an already exist FE with different role" + fe.getRole());
System.exit(-1);
}
return;
}
LOG.info("replay add frontend: {}", fe);
frontends.put(fe.getNodeName(), fe);
if (fe.getRole() == FrontendNodeType.FOLLOWER || fe.getRole() == FrontendNodeType.REPLICA) {
// DO NOT add helper sockets here, cause BDBHA is not instantiated yet.
// helper sockets will be added after start BDBHA
// But add to helperNodes, just for show
helperNodes.add(new HostInfo(fe.getHost(), fe.getEditLogPort()));
}
} finally {
unlock();
}
}
public void replayModifyFrontend(Frontend fe) {
tryLock(true);
try {
Frontend existFe = getFeByName(fe.getNodeName());
if (existFe == null) {
// The frontend may already have been dropped. This can happen because
// drop and modify operations do not guarantee ordering.
return;
}
// modify fe in frontends
LOG.info("replay modify frontend with new host: {}, exist frontend: {}", fe.getHost(), existFe);
existFe.setHost(fe.getHost());
} finally {
unlock();
}
}
public void replayDropFrontend(Frontend frontend) {
tryLock(true);
try {
Frontend removedFe = frontends.remove(frontend.getNodeName());
if (removedFe == null) {
LOG.error(frontend + " does not exist.");
return;
}
LOG.info("replay drop frontend: {}", removedFe);
if (removedFe.getRole() == FrontendNodeType.FOLLOWER || removedFe.getRole() == FrontendNodeType.REPLICA) {
removeHelperNode(removedFe.getHost(), removedFe.getEditLogPort());
}
removedFrontends.add(removedFe.getNodeName());
} finally {
unlock();
}
}
public int getClusterId() {
return this.clusterId;
}
public String getDeployMode() {
return Config.isCloudMode() ? Storage.CLOUD_MODE : Storage.LOCAL_MODE;
}
public String getToken() {
return token;
}
public EditLog getEditLog() {
return editLog;
}
// Get the next available id; no need to lock because nextId is atomic.
public long getNextId() {
return idGenerator.getNextId();
}
// counter for prepared statement id
public long getNextStmtId() {
return this.stmtIdCounter.getAndIncrement();
}
public IdGeneratorBuffer getIdGeneratorBuffer(long bufferSize) {
return idGenerator.getIdGeneratorBuffer(bufferSize);
}
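/*
 * Descriptive summary of the method below (behavior is defined by the code): it does a two-pass
 * scan. Pass 1 collects, under table read locks, partitions whose SSD data property has passed
 * its cooldown time into dbId -> (tableId -> partitionId); pass 2 re-acquires each table with a
 * try-write-lock, switches those partitions to HDD and writes a ModifyPartitionInfo edit-log entry.
 * Partitions that are not due keep their current storage medium in the returned map.
 */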
public HashMap<Long, TStorageMedium> getPartitionIdToStorageMediumMap() {
HashMap<Long, TStorageMedium> storageMediumMap = new HashMap<Long, TStorageMedium>();
// record partitions which need to change storage medium
// dbId -> (tableId -> partitionId)
HashMap<Long, Multimap<Long, Long>> changedPartitionsMap = new HashMap<Long, Multimap<Long, Long>>();
long currentTimeMs = System.currentTimeMillis();
List<Long> dbIds = getInternalCatalog().getDbIds();
for (long dbId : dbIds) {
Database db = getInternalCatalog().getDbNullable(dbId);
if (db == null) {
LOG.warn("db {} does not exist while doing backend report", dbId);
continue;
}
List<Table> tableList = db.getTables();
for (Table table : tableList) {
if (!table.isManagedTable()) {
continue;
}
long tableId = table.getId();
OlapTable olapTable = (OlapTable) table;
olapTable.readLock();
try {
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
for (Partition partition : olapTable.getAllPartitions()) {
long partitionId = partition.getId();
DataProperty dataProperty = partitionInfo.getDataProperty(partition.getId());
Preconditions.checkNotNull(dataProperty,
partition.getName() + ", pId:" + partitionId + ", db: " + dbId + ", tbl: " + tableId);
if (dataProperty.getStorageMedium() == TStorageMedium.SSD
&& dataProperty.getCooldownTimeMs() < currentTimeMs) {
// cooldown expired: change to HDD.
// record it now and apply the change later while holding the write lock
Multimap<Long, Long> multimap = changedPartitionsMap.get(dbId);
if (multimap == null) {
multimap = HashMultimap.create();
changedPartitionsMap.put(dbId, multimap);
}
multimap.put(tableId, partitionId);
} else {
storageMediumMap.put(partitionId, dataProperty.getStorageMedium());
}
} // end for partitions
} finally {
olapTable.readUnlock();
}
} // end for tables
} // end for dbs
// handle data property changed
for (Long dbId : changedPartitionsMap.keySet()) {
Database db = getInternalCatalog().getDbNullable(dbId);
if (db == null) {
LOG.warn("db {} does not exist while checking backend storage medium", dbId);
continue;
}
Multimap<Long, Long> tableIdToPartitionIds = changedPartitionsMap.get(dbId);
for (Long tableId : tableIdToPartitionIds.keySet()) {
TableIf table = db.getTableNullable(tableId);
if (table == null) {
continue;
}
OlapTable olapTable = (OlapTable) table;
// use try-lock to avoid blocking for a long time.
// if blocked too long, the backend report rpc will time out.
if (!olapTable.tryWriteLockIfExist(Table.TRY_LOCK_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
LOG.warn("try get table {} writelock but failed" + " when checking backend storage medium",
table.getName());
continue;
}
Preconditions.checkState(olapTable.isWriteLockHeldByCurrentThread());
try {
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
Collection<Long> partitionIds = tableIdToPartitionIds.get(tableId);
for (Long partitionId : partitionIds) {
Partition partition = olapTable.getPartition(partitionId);
if (partition == null) {
continue;
}
DataProperty dataProperty = partitionInfo.getDataProperty(partition.getId());
if (dataProperty.getStorageMedium() == TStorageMedium.SSD
&& dataProperty.getCooldownTimeMs() < currentTimeMs) {
// cooldown expired: change to HDD.
DataProperty hddProperty = new DataProperty(TStorageMedium.HDD);
partitionInfo.setDataProperty(partition.getId(), hddProperty);
storageMediumMap.put(partitionId, TStorageMedium.HDD);
LOG.info("partition[{}-{}-{}] storage medium changed from SSD to HDD. "
+ "cooldown time: {}. current time: {}", dbId, tableId, partitionId,
TimeUtils.longToTimeString(dataProperty.getCooldownTimeMs()),
TimeUtils.longToTimeString(currentTimeMs));
// log
ModifyPartitionInfo info = new ModifyPartitionInfo(db.getId(), olapTable.getId(),
partition.getId(), hddProperty, ReplicaAllocation.NOT_SET,
partitionInfo.getIsInMemory(partition.getId()),
partitionInfo.getStoragePolicy(partitionId), Maps.newHashMap());
editLog.logModifyPartition(info);
}
} // end for partitions
} finally {
olapTable.writeUnlock();
}
} // end for tables
} // end for dbs
return storageMediumMap;
}
public ConsistencyChecker getConsistencyChecker() {
return this.consistencyChecker;
}
public Alter getAlterInstance() {
return this.alter;
}
public SchemaChangeHandler getSchemaChangeHandler() {
return (SchemaChangeHandler) this.alter.getSchemaChangeHandler();
}
public MaterializedViewHandler getMaterializedViewHandler() {
return (MaterializedViewHandler) this.alter.getMaterializedViewHandler();
}
public CooldownConfHandler getCooldownConfHandler() {
return cooldownConfHandler;
}
public SystemHandler getSystemHandler() {
return (SystemHandler) this.alter.getSystemHandler();
}
public BackupHandler getBackupHandler() {
return this.backupHandler;
}
public DeleteHandler getDeleteHandler() {
return this.deleteHandler;
}
public Load getLoadInstance() {
return this.load;
}
public LoadManager getLoadManager() {
return loadManager;
}
public TokenManager getTokenManager() {
return tokenManager;
}
public ProgressManager getProgressManager() {
return progressManager;
}
public static ProgressManager getCurrentProgressManager() {
return getCurrentEnv().getProgressManager();
}
public StreamLoadRecordMgr getStreamLoadRecordMgr() {
return streamLoadRecordMgr;
}
public TabletLoadIndexRecorderMgr getTabletLoadIndexRecorderMgr() {
return tabletLoadIndexRecorderMgr;
}
public MasterTaskExecutor getPendingLoadTaskScheduler() {
return pendingLoadTaskScheduler;
}
public MasterTaskExecutor getLoadingLoadTaskScheduler() {
return loadingLoadTaskScheduler;
}
public RoutineLoadManager getRoutineLoadManager() {
return routineLoadManager;
}
public GroupCommitManager getGroupCommitManager() {
return groupCommitManager;
}
public SqlBlockRuleMgr getSqlBlockRuleMgr() {
return sqlBlockRuleMgr;
}
public RoutineLoadTaskScheduler getRoutineLoadTaskScheduler() {
return routineLoadTaskScheduler;
}
public ExportMgr getExportMgr() {
return this.exportMgr;
}
public SyncJobManager getSyncJobManager() {
return this.syncJobManager;
}
public JobManager getJobManager() {
return jobManager;
}
public LabelProcessor getLabelProcessor() {
return labelProcessor;
}
public TransientTaskManager getTransientTaskManager() {
return transientTaskManager;
}
public SmallFileMgr getSmallFileMgr() {
return this.smallFileMgr;
}
public RefreshManager getRefreshManager() {
return this.refreshManager;
}
public DictionaryManager getDictionaryManager() {
return this.dictionaryManager;
}
public long getReplayedJournalId() {
return this.replayedJournalId.get();
}
public HAProtocol getHaProtocol() {
return this.haProtocol;
}
public Long getMaxJournalId() {
return this.editLog.getMaxJournalId();
}
public long getEpoch() {
return this.epoch;
}
public void setEpoch(long epoch) {
this.epoch = epoch;
}
public FrontendNodeType getRole() {
return this.role;
}
public HostInfo getHelperNode() {
Preconditions.checkState(helperNodes.size() >= 1);
return this.helperNodes.get(0);
}
public List<HostInfo> getHelperNodes() {
return Lists.newArrayList(helperNodes);
}
public HostInfo getSelfNode() {
return this.selfNode;
}
public String getNodeName() {
return this.nodeName;
}
public FrontendNodeType getFeType() {
return this.feType;
}
public int getMasterRpcPort() {
if (!isReady()) {
return 0;
}
return this.masterInfo.getRpcPort();
}
public int getMasterHttpPort() {
if (!isReady()) {
return 0;
}
return this.masterInfo.getHttpPort();
}
public String getMasterHost() {
if (!isReady()) {
return "";
}
return this.masterInfo.getHost();
}
public EsRepository getEsRepository() {
return getInternalCatalog().getEsRepository();
}
public PolicyMgr getPolicyMgr() {
return this.policyMgr;
}
public void setMaster(MasterInfo info) {
this.masterInfo = info;
LOG.info("setMaster MasterInfo:{}", info);
}
public boolean canRead() {
return this.canRead.get();
}
public boolean isElectable() {
return this.isElectable;
}
public boolean isMaster() {
return feType == FrontendNodeType.MASTER;
}
public void setSynchronizedTime(long time) {
this.synchronizedTimeMs = time;
}
public void setEditLog(EditLog editLog) {
this.editLog = editLog;
}
public void setNextId(long id) {
idGenerator.setId(id);
}
public void setHaProtocol(HAProtocol protocol) {
this.haProtocol = protocol;
}
public static short calcShortKeyColumnCount(List<Column> columns, Map<String, String> properties,
boolean isKeysRequired) throws DdlException {
List<Column> indexColumns = new ArrayList<Column>();
Map<Integer, Column> clusterColumns = new TreeMap<>();
boolean hasValueColumn = false;
for (Column column : columns) {
if (column.isKey()) {
if (hasValueColumn && isKeysRequired) {
throw new DdlException("The materialized view not support value column before key column");
}
indexColumns.add(column);
} else {
hasValueColumn = true;
}
if (column.isClusterKey()) {
clusterColumns.put(column.getClusterKeyId(), column);
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("index column size: {}, cluster column size: {}", indexColumns.size(), clusterColumns.size());
}
if (isKeysRequired && indexColumns.isEmpty()) {
throw new DdlException("The materialized view need key column");
}
// sort by cluster keys for mow if set, otherwise by index columns
List<Column> sortKeyColumns = clusterColumns.isEmpty() ? indexColumns
: clusterColumns.values().stream().collect(Collectors.toList());
// figure out shortKeyColumnCount
short shortKeyColumnCount = (short) -1;
try {
shortKeyColumnCount = PropertyAnalyzer.analyzeShortKeyColumnCount(properties);
} catch (AnalysisException e) {
throw new DdlException(e.getMessage());
}
if (shortKeyColumnCount != (short) -1) {
// use user specified short key column count
if (shortKeyColumnCount <= 0) {
throw new DdlException("Invalid short key: " + shortKeyColumnCount);
}
if (shortKeyColumnCount > sortKeyColumns.size()) {
throw new DdlException("Short key is too large. should less than: " + sortKeyColumns.size());
}
for (int pos = 0; pos < shortKeyColumnCount; pos++) {
if (sortKeyColumns.get(pos).getDataType() == PrimitiveType.VARCHAR && pos != shortKeyColumnCount - 1) {
throw new DdlException("Varchar should not in the middle of short keys.");
}
}
} else {
/*
* Calc short key column count. NOTE: short key column count is
* calculated as follows:
* 1. All sort key columns (cluster keys if set, otherwise key columns) are taken into account.
* 2. Max short key column count is Min(num of sort key columns, META_MAX_SHORT_KEY_NUM).
* 3. The short key list can contain at most one VARCHAR column, and if it does,
*    it must be at the last position of the short key list.
*/
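// Illustrative walk-through (hypothetical schema, not from the original source): for sort key
// columns (INT k1, INT k2, VARCHAR(20) k3) that all fit within the size budget, k1 and k2 are
// counted, then k3 is VARCHAR so it is counted and the loop stops, giving shortKeyColumnCount = 3.
// If the byte budget were already exceeded at k2, the loop would stop without counting it and
// shortKeyColumnCount would stay 1.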
shortKeyColumnCount = 0;
int shortKeySizeByte = 0;
int maxShortKeyColumnCount = Math.min(sortKeyColumns.size(), FeConstants.shortkey_max_column_count);
for (int i = 0; i < maxShortKeyColumnCount; i++) {
Column column = sortKeyColumns.get(i);
shortKeySizeByte += column.getOlapColumnIndexSize();
if (shortKeySizeByte > FeConstants.shortkey_maxsize_bytes) {
if (column.getDataType().isCharFamily()) {
++shortKeyColumnCount;
}
break;
}
if (!column.getType().couldBeShortKey()) {
break;
}
if (column.getDataType() == PrimitiveType.VARCHAR) {
++shortKeyColumnCount;
break;
}
++shortKeyColumnCount;
}
if (isKeysRequired && shortKeyColumnCount == 0) {
throw new DdlException("The first column could not be float or double type, use decimal instead");
}
} // end calc shortKeyColumnCount
if (clusterColumns.size() > 0 && shortKeyColumnCount < clusterColumns.size()) {
boolean sameKey = true;
for (int i = 0; i < shortKeyColumnCount && i < indexColumns.size(); i++) {
if (!clusterColumns.get(i).getName().equals(indexColumns.get(i).getName())) {
sameKey = false;
break;
}
}
if (sameKey && !Config.random_add_cluster_keys_for_mow) {
throw new DdlException(shortKeyColumnCount + " short keys is a part of unique keys");
}
}
return shortKeyColumnCount;
}
/*
* used for handling AlterTableStmt (for the client, this is the ALTER TABLE command),
* including SchemaChangeHandler and RollupHandler
*/
public void alterTable(AlterTableStmt stmt) throws UserException {
this.alter.processAlterTable(stmt);
}
public void alterTable(AlterTableCommand command) throws UserException {
this.alter.processAlterTable(command);
}
/**
* used for handling AlterViewStmt (the ALTER VIEW command).
*/
public void alterView(AlterViewStmt stmt) throws UserException {
this.alter.processAlterView(stmt, ConnectContext.get());
}
public void createMaterializedView(CreateMaterializedViewStmt stmt)
throws AnalysisException, DdlException, MetaNotFoundException {
this.alter.processCreateMaterializedView(stmt);
}
public void createMaterializedView(CreateMaterializedViewCommand command)
throws AnalysisException, DdlException, MetaNotFoundException {
this.alter.processCreateMaterializedView(command);
}
public void dropMaterializedView(DropMaterializedViewStmt stmt) throws DdlException, MetaNotFoundException {
this.alter.processDropMaterializedView(stmt);
}
/*
* used for handling CancelAlterCommand (for the client, this is the CANCEL ALTER command),
* including SchemaChangeHandler and RollupHandler
*/
public void cancelAlter(CancelAlterTableCommand command) throws DdlException {
if (command.getAlterType() == CancelAlterTableCommand.AlterType.ROLLUP
|| command.getAlterType() == CancelAlterTableCommand.AlterType.MV) {
this.getMaterializedViewHandler().cancel(command);
} else if (command.getAlterType() == CancelAlterTableCommand.AlterType.COLUMN) {
this.getSchemaChangeHandler().cancel(command);
} else {
throw new DdlException("Cancel " + command.getAlterType() + " does not implement yet");
}
}
/*
* used for handling CancelIndexCommand
*/
public void cancelBuildIndex(CancelBuildIndexCommand command) throws DdlException {
this.getSchemaChangeHandler().cancelIndexJob(command);
}
/*
* used for handling CancelAlterStmt (for the client, this is the CANCEL ALTER command),
* including SchemaChangeHandler and RollupHandler
*/
public void cancelAlter(CancelAlterTableStmt stmt) throws DdlException {
if (stmt.getAlterType() == AlterType.ROLLUP) {
this.getMaterializedViewHandler().cancel(stmt);
} else if (stmt.getAlterType() == AlterType.COLUMN
|| stmt.getAlterType() == AlterType.INDEX) {
this.getSchemaChangeHandler().cancel(stmt);
} else {
throw new DdlException("Cancel " + stmt.getAlterType() + " does not implement yet");
}
}
/*
* used for handling backup opt
*/
public void backup(BackupStmt stmt) throws DdlException {
getBackupHandler().process(stmt);
}
public void restore(RestoreStmt stmt) throws DdlException {
getBackupHandler().process(stmt);
}
public void cancelBackup(CancelBackupCommand command) throws DdlException {
getBackupHandler().cancel(command);
}
public void cancelBackup(CancelBackupStmt stmt) throws DdlException {
getBackupHandler().cancel(stmt);
}
public void renameTable(Database db, Table table, TableRenameClause tableRenameClause) throws DdlException {
renameTable(db, table, tableRenameClause.getNewTableName());
}
// entry of rename table operation
public void renameTable(Database db, Table table, String newTableName) throws DdlException {
db.writeLockOrDdlException();
try {
table.writeLockOrDdlException();
try {
if (table instanceof OlapTable) {
OlapTable olapTable = (OlapTable) table;
olapTable.checkNormalStateForAlter();
}
String oldTableName = table.getName();
if (Env.isStoredTableNamesLowerCase() && !Strings.isNullOrEmpty(newTableName)) {
newTableName = newTableName.toLowerCase();
}
if (oldTableName.equals(newTableName)) {
throw new DdlException("Same table name");
}
// check if name is already used
if (db.getTable(newTableName).isPresent()) {
throw new DdlException("Table name[" + newTableName + "] is already used");
}
if (db.getTable(RestoreJob.tableAliasWithAtomicRestore(newTableName)).isPresent()) {
throw new DdlException("Table name[" + newTableName + "] is already used (in restoring)");
}
if (table.isManagedTable()) {
// Check the name first; if db.unregisterTable were executed first and the name
// only checked in setName, atomicity could not be guaranteed.
((OlapTable) table).checkAndSetName(newTableName, true);
}
db.unregisterTable(oldTableName);
if (table.isManagedTable()) {
// olap table should also check if any rollup has same name as "newTableName"
((OlapTable) table).checkAndSetName(newTableName, false);
} else {
table.setName(newTableName);
}
db.registerTable(table);
TableInfo tableInfo = TableInfo.createForTableRename(db.getId(), table.getId(), oldTableName,
newTableName);
editLog.logTableRename(tableInfo);
LOG.info("rename table[{}] to {}", oldTableName, newTableName);
} finally {
table.writeUnlock();
}
} finally {
db.writeUnlock();
}
}
public void refreshExternalTableSchema(Database db, Table table, List<Column> newSchema) {
RefreshExternalTableInfo refreshExternalTableInfo = new RefreshExternalTableInfo(db.getFullName(),
table.getName(), newSchema);
editLog.logRefreshExternalTableSchema(refreshExternalTableInfo);
LOG.info("refresh db[{}] table[{}] for schema change", db.getFullName(), table.getName());
}
public void replayRenameTable(TableInfo tableInfo) throws MetaNotFoundException {
long dbId = tableInfo.getDbId();
long tableId = tableInfo.getTableId();
String newTableName = tableInfo.getNewTableName();
Database db = getInternalCatalog().getDbOrMetaException(dbId);
db.writeLock();
try {
Table table = db.getTableOrMetaException(tableId);
table.writeLock();
try {
String tableName = table.getName();
db.unregisterTable(tableName);
table.setName(newTableName);
db.registerTable(table);
LOG.info("replay rename table[{}] to {}", tableName, newTableName);
} finally {
table.writeUnlock();
}
} finally {
db.writeUnlock();
}
}
// the invoker should keep table's write lock
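// Illustrative trigger (assumed client SQL, names are examples):
// ALTER TABLE example_tbl SET ("colocate_with" = "example_group");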
public void modifyTableColocate(Database db, OlapTable table, String assignedGroup, boolean isReplay,
GroupId assignedGroupId)
throws DdlException {
String oldGroup = table.getColocateGroup();
GroupId groupId = null;
if (!Strings.isNullOrEmpty(assignedGroup)) {
String fullAssignedGroupName = GroupId.getFullGroupName(db.getId(), assignedGroup);
// When the new group name is the same as the old one, return early to prevent an NPE
if (!Strings.isNullOrEmpty(oldGroup)) {
String oldFullGroupName = GroupId.getFullGroupName(db.getId(), oldGroup);
if (oldFullGroupName.equals(fullAssignedGroupName)) {
LOG.warn("modify table[{}] group name same as old group name,skip.", table.getName());
return;
}
}
if (!isReplay && table.isAutoBucket()) {
throw new DdlException("table " + table.getName() + " is auto buckets");
}
ColocateGroupSchema groupSchema = colocateTableIndex.getGroupSchema(fullAssignedGroupName);
if (groupSchema == null) {
// The user set a new colocate group;
// check that all partitions of this table have the same bucket number and replication allocation
PartitionInfo partitionInfo = table.getPartitionInfo();
if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) {
int bucketsNum = -1;
ReplicaAllocation replicaAlloc = null;
for (Partition partition : table.getPartitions()) {
if (bucketsNum == -1) {
bucketsNum = partition.getDistributionInfo().getBucketNum();
} else if (bucketsNum != partition.getDistributionInfo().getBucketNum()) {
throw new DdlException(
"Partitions in table " + table.getName() + " have different buckets number");
}
if (replicaAlloc == null) {
replicaAlloc = partitionInfo.getReplicaAllocation(partition.getId());
} else if (!replicaAlloc.equals(partitionInfo.getReplicaAllocation(partition.getId()))) {
throw new DdlException(
"Partitions in table " + table.getName() + " have different replica allocation.");
}
}
}
} else {
// set to an already exist colocate group, check if this table can be added to this group.
groupSchema.checkColocateSchema(table);
}
if (Config.isCloudMode()) {
groupId = colocateTableIndex.changeGroup(db.getId(), table, oldGroup, assignedGroup, assignedGroupId);
} else {
Map<Tag, List<List<Long>>> backendsPerBucketSeq = null;
if (groupSchema == null) {
// assign to a newly created group, set backends sequence.
// we arbitrarily choose a tablet backends sequence from this table,
// let the colocation balancer do the work.
backendsPerBucketSeq = table.getArbitraryTabletBucketsSeq();
}
// Change the group after getting the backends sequence (if any), in case 'getArbitraryTabletBucketsSeq' fails
groupId = colocateTableIndex.changeGroup(db.getId(), table, oldGroup, assignedGroup, assignedGroupId);
if (groupSchema == null) {
Preconditions.checkNotNull(backendsPerBucketSeq);
colocateTableIndex.addBackendsPerBucketSeq(groupId, backendsPerBucketSeq);
}
// set this group as unstable
colocateTableIndex.markGroupUnstable(groupId, "Colocation group modified by user",
false /* edit log is along with modify table log */);
}
table.setColocateGroup(assignedGroup);
} else {
// unset colocation group
if (Strings.isNullOrEmpty(oldGroup)) {
// this table is not a colocate table, do nothing
return;
}
// when replayModifyTableColocate, we need the groupId info
String fullGroupName = GroupId.getFullGroupName(db.getId(), oldGroup);
groupId = colocateTableIndex.getGroupSchema(fullGroupName).getGroupId();
colocateTableIndex.removeTable(table.getId());
table.setColocateGroup(null);
}
if (!isReplay) {
Map<String, String> properties = Maps.newHashMapWithExpectedSize(1);
properties.put(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH, assignedGroup);
TablePropertyInfo info = new TablePropertyInfo(db.getId(), table.getId(), groupId, properties);
editLog.logModifyTableColocate(info);
}
LOG.info("finished modify table's colocation property. table: {}, is replay: {}", table.getName(), isReplay);
}
public void replayModifyTableColocate(TablePropertyInfo info) throws MetaNotFoundException {
long dbId = info.getGroupId().dbId;
if (dbId == 0) {
dbId = info.getDbId();
}
Preconditions.checkState(dbId != 0, "replay modify table colocate failed, table id: " + info.getTableId());
long tableId = info.getTableId();
Map<String, String> properties = info.getPropertyMap();
Database db = getInternalCatalog().getDbOrMetaException(dbId);
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
olapTable.writeLock();
try {
modifyTableColocate(db, olapTable, properties.get(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH), true,
info.getGroupId());
} catch (DdlException e) {
// should not happen
LOG.warn("failed to replay modify table colocate", e);
} finally {
olapTable.writeUnlock();
}
}
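// Rename a rollup (materialized index) of an OLAP table.
// Illustrative trigger (assumed client SQL): ALTER TABLE example_tbl RENAME ROLLUP old_rollup new_rollup;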
public void renameRollup(Database db, OlapTable table, RollupRenameClause renameClause) throws DdlException {
table.writeLockOrDdlException();
try {
table.checkNormalStateForAlter();
String rollupName = renameClause.getRollupName();
// check if it is base table name
if (rollupName.equals(table.getName())) {
throw new DdlException("Using ALTER TABLE RENAME to change table name");
}
String newRollupName = renameClause.getNewRollupName();
if (rollupName.equals(newRollupName)) {
throw new DdlException("Same rollup name");
}
Map<String, Long> indexNameToIdMap = table.getIndexNameToId();
if (indexNameToIdMap.get(rollupName) == null) {
throw new DdlException("Rollup index[" + rollupName + "] does not exists");
}
// check if name is already used
if (indexNameToIdMap.get(newRollupName) != null) {
throw new DdlException("Rollup name[" + newRollupName + "] is already used");
}
long indexId = indexNameToIdMap.remove(rollupName);
indexNameToIdMap.put(newRollupName, indexId);
// log
TableInfo tableInfo = TableInfo.createForRollupRename(db.getId(), table.getId(), indexId,
rollupName, newRollupName);
editLog.logRollupRename(tableInfo);
LOG.info("rename rollup[{}] to {}", rollupName, newRollupName);
} finally {
table.writeUnlock();
}
}
public void replayRenameRollup(TableInfo tableInfo) throws MetaNotFoundException {
long dbId = tableInfo.getDbId();
long tableId = tableInfo.getTableId();
long indexId = tableInfo.getIndexId();
String newRollupName = tableInfo.getNewRollupName();
Database db = getInternalCatalog().getDbOrMetaException(dbId);
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
olapTable.writeLock();
try {
String rollupName = olapTable.getIndexNameById(indexId);
Map<String, Long> indexNameToIdMap = olapTable.getIndexNameToId();
indexNameToIdMap.remove(rollupName);
indexNameToIdMap.put(newRollupName, indexId);
LOG.info("replay rename rollup[{}] to {}", rollupName, newRollupName);
} finally {
olapTable.writeUnlock();
}
}
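// Rename a partition of a range/list partitioned OLAP table.
// Illustrative trigger (assumed client SQL): ALTER TABLE example_tbl RENAME PARTITION old_part new_part;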
public void renamePartition(Database db, OlapTable table, PartitionRenameClause renameClause) throws DdlException {
table.writeLockOrDdlException();
try {
table.checkNormalStateForAlter();
if (table.getPartitionInfo().getType() != PartitionType.RANGE
&& table.getPartitionInfo().getType() != PartitionType.LIST) {
throw new DdlException(
"Table[" + table.getName() + "] is single partitioned. " + "no need to rename partition name.");
}
String partitionName = renameClause.getPartitionName();
String newPartitionName = renameClause.getNewPartitionName();
if (partitionName.equalsIgnoreCase(newPartitionName)) {
throw new DdlException("Same partition name");
}
Partition partition = table.getPartition(partitionName);
if (partition == null) {
throw new DdlException("Partition[" + partitionName + "] does not exists");
}
// check if name is already used
if (table.checkPartitionNameExist(newPartitionName)) {
throw new DdlException("Partition name[" + newPartitionName + "] is already used");
}
table.renamePartition(partitionName, newPartitionName);
// log
TableInfo tableInfo = TableInfo.createForPartitionRename(db.getId(), table.getId(), partition.getId(),
partitionName, newPartitionName);
editLog.logPartitionRename(tableInfo);
LOG.info("rename partition[{}] to {}", partitionName, newPartitionName);
} finally {
table.writeUnlock();
}
}
public void replayRenamePartition(TableInfo tableInfo) throws MetaNotFoundException {
long dbId = tableInfo.getDbId();
long tableId = tableInfo.getTableId();
long partitionId = tableInfo.getPartitionId();
String newPartitionName = tableInfo.getNewPartitionName();
Database db = getInternalCatalog().getDbOrMetaException(dbId);
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
olapTable.writeLock();
try {
Partition partition = olapTable.getPartition(partitionId);
olapTable.renamePartition(partition.getName(), newPartitionName);
LOG.info("replay rename partition[{}] to {}", partition.getName(), newPartitionName);
} finally {
olapTable.writeUnlock();
}
}
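// Rename a column across index metas, indexes, distribution info, sequence map column and bloom filter columns.
// Illustrative trigger (assumed client SQL): ALTER TABLE example_tbl RENAME COLUMN old_col new_col;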
private void renameColumn(Database db, OlapTable table, String colName,
String newColName, Map<Long, Integer> indexIdToSchemaVersion,
boolean isReplay) throws DdlException {
table.checkNormalStateForAlter();
if (colName.equalsIgnoreCase(newColName)) {
throw new DdlException("Same column name");
}
// @NOTE: Rename partition columns should also rename column names in partition expressions
// but this is not implemented currently. Therefore, forbid renaming partition columns temporarily.
PartitionInfo partitionInfo = table.getPartitionInfo();
if (partitionInfo.getPartitionColumns().stream().anyMatch(c -> c.getName().equalsIgnoreCase(colName))) {
throw new DdlException("Renaming partition columns has problems, forbidden in current Doris version");
}
Map<Long, MaterializedIndexMeta> indexIdToMeta = table.getIndexIdToMeta();
for (Map.Entry<Long, MaterializedIndexMeta> entry : indexIdToMeta.entrySet()) {
// rename column is not implemented for non-light-schema-change table.
if (!table.getEnableLightSchemaChange()) {
throw new DdlException("not implemented for table without column unique id,"
+ " which are created with property 'light_schema_change'.");
}
// check if new name is already used
if (entry.getValue().getColumnByName(newColName) != null) {
throw new DdlException("Column name[" + newColName + "] is already used");
}
// check if have materialized view on rename column
// and check whether colName is referenced by generated columns
for (Column column : entry.getValue().getSchema()) {
if (column.getName().equals(colName) && !column.getGeneratedColumnsThatReferToThis().isEmpty()) {
throw new DdlException(
"Cannot rename column, because column '" + colName
+ "' has a generated column dependency on :"
+ column.getGeneratedColumnsThatReferToThis());
}
Expr expr = column.getDefineExpr();
if (expr == null) {
continue;
}
List<SlotRef> slots = new ArrayList<>();
expr.collect(SlotRef.class, slots);
for (SlotRef slot : slots) {
String name = MaterializedIndexMeta
.normalizeName(CreateMaterializedViewStmt.mvColumnBreaker(slot.toSqlWithoutTbl()));
if (!isReplay && name.equals(colName)) {
throw new DdlException("Column[" + colName + "] have materialized view index");
}
}
}
}
// 1. modify old MaterializedIndexMeta
boolean hasColumn = false;
for (Map.Entry<Long, MaterializedIndexMeta> entry : indexIdToMeta.entrySet()) {
Column column = entry.getValue().getColumnByName(colName);
if (column != null) {
column.setName(newColName);
hasColumn = true;
Env.getCurrentEnv().getQueryStats()
.rename(Env.getCurrentEnv().getCurrentCatalog().getId(), db.getId(),
table.getId(), entry.getKey(), colName, newColName);
if (!isReplay) {
indexIdToSchemaVersion.put(entry.getKey(), entry.getValue().getSchemaVersion() + 1);
}
if (indexIdToSchemaVersion != null) {
entry.getValue().setSchemaVersion(indexIdToSchemaVersion.get(entry.getKey()));
}
}
}
if (!hasColumn) {
throw new DdlException("Column[" + colName + "] does not exists");
}
// @NOTE: Rename partition columns should also rename column names in partition expressions
// but this is not implemented currently. Therefore, forbid renaming partition columns temporarily.
//
// 2. modify partition key
// PartitionInfo partitionInfo = table.getPartitionInfo();
// List<Column> partitionColumns = partitionInfo.getPartitionColumns();
// for (Column column : partitionColumns) {
// if (column.getName().equalsIgnoreCase(colName)) {
// column.setName(newColName);
// }
//}
// 3. modify index
List<Index> indexes = table.getIndexes();
for (Index index : indexes) {
List<String> columns = index.getColumns();
for (int i = 0; i < columns.size(); i++) {
if (columns.get(i).equalsIgnoreCase(colName)) {
columns.set(i, newColName);
}
}
}
// 4. modify distribution info
DistributionInfo distributionInfo = table.getDefaultDistributionInfo();
if (distributionInfo.getType() == DistributionInfoType.HASH) {
// modify default distribution info
List<Column> distributionColumns = ((HashDistributionInfo) distributionInfo).getDistributionColumns();
for (Column column : distributionColumns) {
if (column.getName().equalsIgnoreCase(colName)) {
column.setName(newColName);
}
}
// modify distribution info inside partitions
for (Partition p : table.getPartitions()) {
DistributionInfo partDistInfo = p.getDistributionInfo();
if (partDistInfo.getType() != DistributionInfoType.HASH) {
continue;
}
List<Column> partDistColumns = ((HashDistributionInfo) partDistInfo).getDistributionColumns();
for (Column column : partDistColumns) {
if (column.getName().equalsIgnoreCase(colName)) {
column.setName(newColName);
}
}
}
}
// 5. modify sequence map col
if (table.hasSequenceCol() && table.getSequenceMapCol() != null
&& table.getSequenceMapCol().equalsIgnoreCase(colName)) {
table.setSequenceMapCol(newColName);
}
// 6. modify bloom filter col
Set<String> bfCols = table.getCopiedBfColumns();
if (bfCols != null) {
Set<String> newBfCols = new HashSet<>();
for (String bfCol : bfCols) {
if (bfCol.equalsIgnoreCase(colName)) {
newBfCols.add(newColName);
} else {
newBfCols.add(bfCol);
}
}
table.setBloomFilterInfo(newBfCols, table.getBfFpp());
}
table.rebuildFullSchema();
if (!isReplay) {
// log
TableRenameColumnInfo info = new TableRenameColumnInfo(db.getId(), table.getId(), colName, newColName,
indexIdToSchemaVersion);
editLog.logColumnRename(info);
LOG.info("rename coloumn[{}] to {}", colName, newColName);
Env.getCurrentEnv().getAnalysisManager().dropStats(table, null);
}
}
public void renameColumn(Database db, OlapTable table, ColumnRenameClause renameClause) throws DdlException {
table.writeLockOrDdlException();
try {
String colName = renameClause.getColName();
String newColName = renameClause.getNewColName();
Map<Long, Integer> indexIdToSchemaVersion = new HashMap<Long, Integer>();
renameColumn(db, table, colName, newColName, indexIdToSchemaVersion, false);
} finally {
table.writeUnlock();
}
}
public void replayRenameColumn(TableRenameColumnInfo info) throws MetaNotFoundException {
if (LOG.isDebugEnabled()) {
LOG.debug("info:{}", info);
}
long dbId = info.getDbId();
long tableId = info.getTableId();
String colName = info.getColName();
String newColName = info.getNewColName();
Map<Long, Integer> indexIdToSchemaVersion = info.getIndexIdToSchemaVersion();
Database db = getCurrentEnv().getInternalCatalog().getDbOrMetaException(dbId);
OlapTable table = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
table.writeLock();
try {
renameColumn(db, table, colName, newColName, indexIdToSchemaVersion, true);
} catch (DdlException e) {
// should not happen
LOG.warn("failed to replay rename column", e);
} finally {
table.writeUnlock();
}
}
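// Modify the dynamic partition properties of an OLAP table.
// Illustrative trigger (assumed client SQL, property values are examples):
// ALTER TABLE example_tbl SET ("dynamic_partition.enable" = "true", "dynamic_partition.time_unit" = "DAY",
// "dynamic_partition.end" = "3", "dynamic_partition.prefix" = "p");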
public void modifyTableDynamicPartition(Database db, OlapTable table, Map<String, String> properties)
throws UserException {
convertDynamicPartitionReplicaNumToReplicaAllocation(properties);
if (properties.containsKey(DynamicPartitionProperty.REPLICATION_ALLOCATION)) {
table.checkChangeReplicaAllocation();
}
Map<String, String> logProperties = new HashMap<>(properties);
TableProperty tableProperty = table.getTableProperty();
if (tableProperty == null) {
DynamicPartitionUtil.checkAndSetDynamicPartitionProperty(table, properties, db);
} else {
// Merge the new properties with origin properties, and then analyze them
Map<String, String> origDynamicProperties = tableProperty.getOriginDynamicPartitionProperty();
origDynamicProperties.putAll(properties);
Map<String, String> analyzedDynamicPartition = DynamicPartitionUtil.analyzeDynamicPartition(
origDynamicProperties, table, db, false);
tableProperty.modifyTableProperties(analyzedDynamicPartition);
tableProperty.buildDynamicProperty();
}
DynamicPartitionUtil.registerOrRemoveDynamicPartitionTable(db.getId(), table, false);
dynamicPartitionScheduler.createOrUpdateRuntimeInfo(table.getId(), DynamicPartitionScheduler.LAST_UPDATE_TIME,
TimeUtils.getCurrentFormatTime());
ModifyTablePropertyOperationLog info =
new ModifyTablePropertyOperationLog(db.getId(), table.getId(), table.getName(),
logProperties);
editLog.logDynamicPartition(info);
}
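// Converts the legacy "dynamic_partition.replication_num" property into the equivalent
// "dynamic_partition.replication_allocation" form, e.g. (illustrative) a value of "3"
// becomes something like "tag.location.default: 3".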
private void convertDynamicPartitionReplicaNumToReplicaAllocation(Map<String, String> properties) {
if (properties.containsKey(DynamicPartitionProperty.REPLICATION_NUM)) {
short repNum = Short.parseShort(properties.remove(DynamicPartitionProperty.REPLICATION_NUM));
ReplicaAllocation replicaAlloc = new ReplicaAllocation(repNum);
properties.put(DynamicPartitionProperty.REPLICATION_ALLOCATION, replicaAlloc.toCreateStmt());
}
}
/**
 * Set the replication number for an unpartitioned table.
 *
 * @param db the database the table belongs to
 * @param table the unpartitioned OLAP table to modify
 * @param properties the properties carrying the new replication number/allocation
 * @throws DdlException if the table is partitioned or the replication allocation is invalid
 */
// The caller needs to hold the table's write lock
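// Illustrative trigger on an unpartitioned table (assumed client SQL):
// ALTER TABLE example_tbl SET ("replication_num" = "3");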
public void modifyTableReplicaAllocation(Database db, OlapTable table, Map<String, String> properties)
throws UserException {
Preconditions.checkArgument(table.isWriteLockHeldByCurrentThread());
String defaultReplicationNumName = "default." + PropertyAnalyzer.PROPERTIES_REPLICATION_NUM;
PartitionInfo partitionInfo = table.getPartitionInfo();
if (partitionInfo.getType() == PartitionType.RANGE || partitionInfo.getType() == PartitionType.LIST) {
throw new DdlException(
"This is a partitioned table, you should specify partitions" + " with MODIFY PARTITION clause."
+ " If you want to set default replication number, please use '" + defaultReplicationNumName
+ "' instead of '" + PropertyAnalyzer.PROPERTIES_REPLICATION_NUM
+ "' to escape misleading.");
}
String partitionName = table.getName();
Partition partition = table.getPartition(partitionName);
if (partition == null) {
throw new DdlException("Partition does not exist. name: " + partitionName);
}
ReplicaAllocation replicaAlloc = PropertyAnalyzer.analyzeReplicaAllocation(properties, "");
table.checkChangeReplicaAllocation();
Env.getCurrentSystemInfo().checkReplicaAllocation(replicaAlloc);
Preconditions.checkState(!replicaAlloc.isNotSet());
boolean isInMemory = partitionInfo.getIsInMemory(partition.getId());
DataProperty newDataProperty = partitionInfo.getDataProperty(partition.getId());
partitionInfo.setReplicaAllocation(partition.getId(), replicaAlloc);
// set table's default replication number.
Map<String, String> tblProperties = Maps.newHashMap();
tblProperties.put("default." + PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION, replicaAlloc.toCreateStmt());
table.setReplicaAllocation(tblProperties);
// log
ModifyPartitionInfo info = new ModifyPartitionInfo(db.getId(), table.getId(), partition.getId(),
newDataProperty, replicaAlloc, isInMemory, partitionInfo.getStoragePolicy(partition.getId()),
tblProperties);
editLog.logModifyPartition(info);
if (LOG.isDebugEnabled()) {
LOG.debug("modify partition[{}-{}-{}] replica allocation to {}",
db.getId(), table.getId(), partition.getName(), replicaAlloc.toCreateStmt());
}
}
/**
 * Set the default replication allocation for a specified table.
 * The default replication allocation can be seen by executing the SHOW CREATE TABLE statement.
 *
 * @param db the database the table belongs to
 * @param table the OLAP table to modify
 * @param properties the properties carrying the new default replication allocation
 */
// The caller needs to hold the table's write lock
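// Illustrative trigger (assumed client SQL): ALTER TABLE example_tbl SET ("default.replication_num" = "3");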
public void modifyTableDefaultReplicaAllocation(Database db, OlapTable table,
Map<String, String> properties) throws UserException {
Preconditions.checkArgument(table.isWriteLockHeldByCurrentThread());
table.checkChangeReplicaAllocation();
table.setReplicaAllocation(properties);
ModifyTablePropertyOperationLog info =
new ModifyTablePropertyOperationLog(db.getId(), table.getId(), table.getName(),
properties);
editLog.logModifyReplicationNum(info);
if (LOG.isDebugEnabled()) {
LOG.debug("modify table[{}] replication num to {}", table.getName(),
properties.get(PropertyAnalyzer.PROPERTIES_REPLICATION_NUM));
}
}
// The caller needs to hold the table's write lock
public void modifyTableProperties(Database db, OlapTable table, Map<String, String> properties) {
Preconditions.checkArgument(table.isWriteLockHeldByCurrentThread());
TableProperty tableProperty = table.getTableProperty();
if (tableProperty == null) {
tableProperty = new TableProperty(properties);
} else {
tableProperty.modifyTableProperties(properties);
}
tableProperty.buildInMemory()
.buildMinLoadReplicaNum()
.buildStoragePolicy()
.buildStorageMedium()
.buildIsBeingSynced()
.buildCompactionPolicy()
.buildTimeSeriesCompactionGoalSizeMbytes()
.buildTimeSeriesCompactionFileCountThreshold()
.buildTimeSeriesCompactionTimeThresholdSeconds()
.buildSkipWriteIndexOnLoad()
.buildDisableAutoCompaction()
.buildEnableSingleReplicaCompaction()
.buildTimeSeriesCompactionEmptyRowsetsThreshold()
.buildTimeSeriesCompactionLevelThreshold()
.buildTTLSeconds()
.buildAutoAnalyzeProperty();
// need to update partition info meta
for (Partition partition : table.getPartitions()) {
table.getPartitionInfo().setIsInMemory(partition.getId(), tableProperty.isInMemory());
table.getPartitionInfo().setStoragePolicy(partition.getId(), tableProperty.getStoragePolicy());
}
ModifyTablePropertyOperationLog info =
new ModifyTablePropertyOperationLog(db.getId(), table.getId(), table.getName(),
properties);
editLog.logModifyTableProperties(info);
}
public void updateBinlogConfig(Database db, OlapTable table, BinlogConfig newBinlogConfig) {
Preconditions.checkArgument(table.isWriteLockHeldByCurrentThread());
table.setBinlogConfig(newBinlogConfig);
ModifyTablePropertyOperationLog info =
new ModifyTablePropertyOperationLog(db.getId(), table.getId(), table.getName(),
newBinlogConfig.toProperties());
editLog.logUpdateBinlogConfig(info);
}
public void replayModifyTableProperty(short opCode, ModifyTablePropertyOperationLog info)
throws MetaNotFoundException {
String ctlName = info.getCtlName();
long dbId = info.getDbId();
long tableId = info.getTableId();
Map<String, String> properties = info.getProperties();
// Handle HMSExternalTable set auto analyze policy.
if (ctlName != null && !(InternalCatalog.INTERNAL_CATALOG_NAME.equalsIgnoreCase(ctlName))) {
setExternalTableAutoAnalyze(properties, info);
return;
}
Database db = getInternalCatalog().getDbOrMetaException(dbId);
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
olapTable.writeLock();
try {
TableProperty tableProperty = olapTable.getTableProperty();
if (tableProperty == null) {
tableProperty = new TableProperty(properties).buildProperty(opCode);
olapTable.setTableProperty(tableProperty);
} else {
tableProperty.modifyTableProperties(properties);
tableProperty.buildProperty(opCode);
}
// need to replay partition info meta
switch (opCode) {
case OperationType.OP_MODIFY_TABLE_PROPERTIES:
for (Partition partition : olapTable.getPartitions()) {
olapTable.getPartitionInfo().setIsInMemory(partition.getId(), tableProperty.isInMemory());
// storage policy replay reuses the modify-in-memory code path
Optional.ofNullable(tableProperty.getStoragePolicy()).filter(p -> !p.isEmpty())
.ifPresent(p -> olapTable.getPartitionInfo().setStoragePolicy(partition.getId(), p));
Optional.ofNullable(tableProperty.getStoragePolicy()).filter(p -> !p.isEmpty())
.ifPresent(p -> olapTable.getPartitionInfo().getDataProperty(partition.getId())
.setStoragePolicy(p));
}
break;
case OperationType.OP_UPDATE_BINLOG_CONFIG:
BinlogConfig newBinlogConfig = new BinlogConfig();
newBinlogConfig.mergeFromProperties(properties);
olapTable.setBinlogConfig(newBinlogConfig);
break;
default:
break;
}
} finally {
olapTable.writeUnlock();
}
}
private void setExternalTableAutoAnalyze(Map<String, String> properties, ModifyTablePropertyOperationLog info) {
if (properties.size() != 1) {
LOG.warn("External table property should contain exactly 1 entry.");
return;
}
if (!properties.containsKey(PropertyAnalyzer.PROPERTIES_AUTO_ANALYZE_POLICY)) {
LOG.warn("External table property should only contain auto_analyze_policy");
return;
}
String value = properties.get(PropertyAnalyzer.PROPERTIES_AUTO_ANALYZE_POLICY);
if (!PropertyAnalyzer.ENABLE_AUTO_ANALYZE_POLICY.equalsIgnoreCase(value)
&& !PropertyAnalyzer.DISABLE_AUTO_ANALYZE_POLICY.equalsIgnoreCase(value)
&& !PropertyAnalyzer.USE_CATALOG_AUTO_ANALYZE_POLICY.equalsIgnoreCase(value)) {
LOG.warn("External table property should be 'enable', 'disable' or 'base_on_catalog'");
return;
}
try {
CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr()
.getCatalogOrException(info.getCtlName(),
ctlName -> new DdlException("Unknown catalog " + ctlName));
value = value.equalsIgnoreCase(PropertyAnalyzer.USE_CATALOG_AUTO_ANALYZE_POLICY) ? null : value;
((ExternalCatalog) catalog).setAutoAnalyzePolicy(info.getDbName(), info.getTableName(), value);
} catch (Exception e) {
LOG.warn("Failed to replay external table set property.", e);
}
}
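// Modify the default bucket number of a partitioned table's distribution.
// Illustrative trigger (assumed client SQL): ALTER TABLE example_tbl MODIFY DISTRIBUTION DISTRIBUTED BY HASH(k1) BUCKETS 32;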
public void modifyDefaultDistributionBucketNum(Database db, OlapTable olapTable,
ModifyDistributionClause modifyDistributionClause)
throws DdlException {
olapTable.writeLockOrDdlException();
try {
if (olapTable.isColocateTable()) {
throw new DdlException("Cannot change default bucket number of colocate table.");
}
if (olapTable.getPartitionInfo().getType() != PartitionType.RANGE
&& olapTable.getPartitionInfo().getType() != PartitionType.LIST) {
throw new DdlException("Only support change partitioned table's distribution.");
}
DistributionDesc distributionDesc = modifyDistributionClause.getDistributionDesc();
if (distributionDesc != null) {
DistributionInfo defaultDistributionInfo = olapTable.getDefaultDistributionInfo();
List<Column> baseSchema = olapTable.getBaseSchema();
DistributionInfo distributionInfo = distributionDesc.toDistributionInfo(baseSchema);
// for now. we only support modify distribution's bucket num
if (distributionInfo.getType() != defaultDistributionInfo.getType()) {
throw new DdlException(
"Cannot change distribution type when modifying the default distribution bucket number");
}
if (distributionInfo.getType() == DistributionInfoType.HASH) {
HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
if (!hashDistributionInfo.sameDistributionColumns((HashDistributionInfo) defaultDistributionInfo)) {
throw new DdlException("Cannot assign hash distribution with different distribution cols. "
+ "new is: " + hashDistributionInfo.getDistributionColumns() + " default is: "
+ ((HashDistributionInfo) defaultDistributionInfo).getDistributionColumns());
}
}
int bucketNum = distributionInfo.getBucketNum();
if (bucketNum <= 0) {
throw new DdlException("Cannot assign hash distribution buckets less than 1");
}
defaultDistributionInfo.setBucketNum(bucketNum);
ModifyTableDefaultDistributionBucketNumOperationLog info
= new ModifyTableDefaultDistributionBucketNumOperationLog(db.getId(), olapTable.getId(),
distributionInfo.getType(), distributionInfo.getAutoBucket(), bucketNum,
defaultDistributionInfo.getColumnsName());
editLog.logModifyDefaultDistributionBucketNum(info);
LOG.info("modify table[{}] default bucket num to {}", olapTable.getName(), bucketNum);
}
} finally {
olapTable.writeUnlock();
}
}
public void replayModifyTableDefaultDistributionBucketNum(ModifyTableDefaultDistributionBucketNumOperationLog info)
throws MetaNotFoundException {
long dbId = info.getDbId();
long tableId = info.getTableId();
int bucketNum = info.getBucketNum();
Database db = getInternalCatalog().getDbOrMetaException(dbId);
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableId, TableType.OLAP);
olapTable.writeLock();
try {
DistributionInfo defaultDistributionInfo = olapTable.getDefaultDistributionInfo();
defaultDistributionInfo.setBucketNum(bucketNum);
} finally {
olapTable.writeUnlock();
}
}
/*
 * Used for handling AlterSystemStmt
 * (the ALTER SYSTEM command on the client side).
 */
public void alterSystem(AlterSystemStmt stmt) throws DdlException, UserException {
this.alter.processAlterSystem(stmt);
}
public void alterSystem(AlterSystemCommand command) throws UserException {
this.alter.processAlterSystem(command);
}
public void analyze(AnalyzeCommand command, boolean isProxy) throws DdlException, AnalysisException {
this.analysisManager.createAnalyze(command, isProxy);
}
public void cancelAlterSystem(CancelAlterSystemStmt stmt) throws DdlException {
this.alter.getSystemHandler().cancel(stmt);
}
// Switch catalog of this session
public void changeCatalog(ConnectContext ctx, String catalogName) throws DdlException {
CatalogIf catalogIf = catalogMgr.getCatalog(catalogName);
if (catalogIf == null) {
throw new DdlException(ErrorCode.ERR_UNKNOWN_CATALOG.formatErrorMsg(catalogName),
ErrorCode.ERR_UNKNOWN_CATALOG);
}
String currentDB = ctx.getDatabase();
if (StringUtils.isNotEmpty(currentDB)) {
// If the current catalog of this context has been dropped, ctx.getCurrentCatalog() will be null.
if (ctx.getCurrentCatalog() != null) {
ctx.addLastDBOfCatalog(ctx.getCurrentCatalog().getName(), currentDB);
}
}
ctx.changeDefaultCatalog(catalogName);
String lastDb = ctx.getLastDBOfCatalog(catalogName);
if (StringUtils.isNotEmpty(lastDb)) {
ctx.setDatabase(lastDb);
}
if (catalogIf instanceof EsExternalCatalog) {
ctx.setDatabase(EsExternalCatalog.DEFAULT_DB);
}
}
// Change current database of this session.
public void changeDb(ConnectContext ctx, String qualifiedDb) throws DdlException {
if (!accessManager.checkDbPriv(ctx, ctx.getDefaultCatalog(), qualifiedDb, PrivPredicate.SHOW)) {
ErrorReport.reportDdlException(ErrorCode.ERR_DBACCESS_DENIED_ERROR, ctx.getQualifiedUser(), qualifiedDb);
}
ctx.getCurrentCatalog().getDbOrDdlException(qualifiedDb);
ctx.setDatabase(qualifiedDb);
}
// for test only
public void clear() {
getInternalCatalog().clearDbs();
if (load.getIdToLoadJob() != null) {
load.getIdToLoadJob().clear();
// load = null;
}
System.gc();
}
public void createView(CreateViewStmt stmt) throws DdlException {
String dbName = stmt.getDbName();
String tableName = stmt.getTable();
// check if db exists
Database db = getInternalCatalog().getDbOrDdlException(dbName);
// check if table exists in db
boolean replace = false;
if (db.getTable(tableName).isPresent()) {
if (stmt.isSetIfNotExists()) {
LOG.info("create view[{}] which already exists", tableName);
return;
} else if (stmt.isSetOrReplace()) {
replace = true;
LOG.info("view[{}] already exists, need to replace it", tableName);
} else {
ErrorReport.reportDdlException(ErrorCode.ERR_TABLE_EXISTS_ERROR, tableName);
}
}
if (replace) {
String comment = stmt.getComment();
comment = comment == null || comment.isEmpty() ? null : comment;
AlterViewStmt alterViewStmt = new AlterViewStmt(stmt.getTableName(), stmt.getColWithComments(),
stmt.getViewDefStmt(), comment);
alterViewStmt.setInlineViewDef(stmt.getInlineViewDef());
alterViewStmt.setFinalColumns(stmt.getColumns());
try {
alterView(alterViewStmt);
} catch (UserException e) {
throw new DdlException("failed to replace view[" + tableName + "], reason=" + e.getMessage());
}
LOG.info("successfully replace view[{}]", tableName);
} else {
List<Column> columns = stmt.getColumns();
long tableId = Env.getCurrentEnv().getNextId();
View newView = new View(tableId, tableName, columns);
newView.setComment(stmt.getComment());
newView.setInlineViewDefWithSqlMode(stmt.getInlineViewDef(),
ConnectContext.get().getSessionVariable().getSqlMode());
if (!db.createTableWithLock(newView, false, stmt.isSetIfNotExists()).first) {
throw new DdlException("Failed to create view[" + tableName + "].");
}
LOG.info("successfully create view[" + tableName + "-" + newView.getId() + "]");
}
}
public FunctionRegistry getFunctionRegistry() {
return functionRegistry;
}
/**
* Returns the function that best matches 'desc' that is registered with the
* catalog using 'mode' to check for matching. If desc matches multiple
* functions in the catalog, it will return the function with the strictest
* matching mode. If multiple functions match at the same matching mode,
* ties are broken by comparing argument types in lexical order. Argument
* types are ordered by argument precision (e.g. double is preferred over
* float) and then by alphabetical order of argument type name, to guarantee
* deterministic results.
*/
public Function getFunction(Function desc, Function.CompareMode mode) {
return functionSet.getFunction(desc, mode);
}
public List<Function> getBuiltinFunctions() {
return functionSet.getBulitinFunctions();
}
public Function getTableFunction(Function desc, Function.CompareMode mode) {
return functionSet.getFunction(desc, mode, true);
}
public boolean isNondeterministicFunction(String funcName) {
return functionSet.isNondeterministicFunction(funcName);
}
public boolean isNullResultWithOneNullParamFunction(String funcName) {
return functionSet.isNullResultWithOneNullParamFunctions(funcName);
}
public boolean isAggFunctionName(String name) {
return functionSet.isAggFunctionName(name);
}
@Deprecated
public long loadCluster(DataInputStream dis, long checksum) throws IOException, DdlException {
return getInternalCatalog().loadCluster(dis, checksum);
}
public long saveCluster(CountingDataOutputStream dos, long checksum) throws IOException {
// do nothing
return checksum;
}
public long saveBrokers(CountingDataOutputStream dos, long checksum) throws IOException {
Map<String, List<FsBroker>> addressListMap = brokerMgr.getBrokerListMap();
int size = addressListMap.size();
checksum ^= size;
dos.writeInt(size);
for (Map.Entry<String, List<FsBroker>> entry : addressListMap.entrySet()) {
Text.writeString(dos, entry.getKey());
final List<FsBroker> addrs = entry.getValue();
size = addrs.size();
checksum ^= size;
dos.writeInt(size);
for (FsBroker addr : addrs) {
addr.write(dos);
}
}
return checksum;
}
public long loadBrokers(DataInputStream dis, long checksum) throws IOException, DdlException {
int count = dis.readInt();
checksum ^= count;
for (long i = 0; i < count; ++i) {
String brokerName = Text.readString(dis);
int size = dis.readInt();
checksum ^= size;
List<FsBroker> addrs = Lists.newArrayList();
for (int j = 0; j < size; j++) {
FsBroker addr = FsBroker.readIn(dis);
addrs.add(addr);
}
brokerMgr.replayAddBrokers(brokerName, addrs);
}
LOG.info("finished replay brokerMgr from image");
return checksum;
}
public String dumpImage() {
LOG.info("begin to dump meta data");
String dumpFilePath;
List<Database> databases = Lists.newArrayList();
List<List<Table>> tableLists = Lists.newArrayList();
tryLock(true);
try {
// sort all dbs to avoid potential dead lock
for (long dbId : getInternalCatalog().getDbIds()) {
Database db = getInternalCatalog().getDbNullable(dbId);
databases.add(db);
}
databases.sort(Comparator.comparing(DatabaseIf::getId));
// lock all dbs
MetaLockUtils.readLockDatabases(databases);
LOG.info("acquired all the dbs' read lock.");
// lock all tables
for (Database db : databases) {
List<Table> tableList = db.getTablesOnIdOrder();
MetaLockUtils.readLockTables(tableList);
tableLists.add(tableList);
}
LOG.info("acquired all the tables' read lock.");
load.readLock();
LOG.info("acquired all jobs' read lock.");
long journalId = getMaxJournalId();
File dumpFile = new File(Config.meta_dir, "image." + journalId);
if (Config.enable_check_compatibility_mode) {
dumpFile = new File(imageDir, "image." + journalId);
}
dumpFilePath = dumpFile.getAbsolutePath();
try {
LOG.info("begin to dump {}", dumpFilePath);
saveImage(dumpFile, journalId);
} catch (IOException e) {
LOG.error("failed to dump image to {}", dumpFilePath, e);
return null;
}
} finally {
// unlock all
load.readUnlock();
for (int i = databases.size() - 1; i >= 0; i--) {
MetaLockUtils.readUnlockTables(tableLists.get(i));
}
MetaLockUtils.readUnlockDatabases(databases);
unlock();
}
LOG.info("finished dumping image to {}", dumpFilePath);
return dumpFilePath;
}
/*
 * Truncate specified table or partitions.
 * The main idea is:
 *
 * 1. use the same schema to create new partitions (or a new table)
 * 2. use the newly created partitions (or table) to replace the old ones.
 *
 * If no partition is specified, it truncates all partitions of this table, including all temp partitions;
 * otherwise, it only truncates the specified partitions.
 *
 */
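// Illustrative trigger (assumed client SQL): TRUNCATE TABLE example_db.example_tbl PARTITION (p1, p2);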
public void truncateTable(TruncateTableStmt stmt) throws DdlException {
CatalogIf<?> catalogIf = catalogMgr.getCatalogOrException(stmt.getTblRef().getName().getCtl(),
catalog -> new DdlException(("Unknown catalog " + catalog)));
catalogIf.truncateTable(stmt);
}
/*
 * Truncate specified table or partitions.
 * The main idea is:
 *
 * 1. use the same schema to create new partitions (or a new table)
 * 2. use the newly created partitions (or table) to replace the old ones.
 *
 * If no partition is specified, it truncates all partitions of this table, including all temp partitions;
 * otherwise, it only truncates the specified partitions.
 *
 */
public void truncateTable(TruncateTableCommand command) throws DdlException {
CatalogIf<?> catalogIf = catalogMgr.getCatalogOrException(command.getTableNameInfo().getCtl(),
catalog -> new DdlException(("Unknown catalog " + catalog)));
catalogIf.truncateTable(command);
}
public void replayTruncateTable(TruncateTableInfo info) throws MetaNotFoundException {
if (Strings.isNullOrEmpty(info.getCtl()) || info.getCtl().equals(InternalCatalog.INTERNAL_CATALOG_NAME)) {
// In previous versions (before 2.1.8), there was no catalog info in TruncateTableInfo,
// so if the catalog info is empty, we assume it is an internal table.
getInternalCatalog().replayTruncateTable(info);
} else {
ExternalCatalog ctl = (ExternalCatalog) catalogMgr.getCatalog(info.getCtl());
if (ctl != null) {
ctl.replayTruncateTable(info);
}
}
}
public void createFunction(CreateFunctionStmt stmt) throws UserException {
if (SetType.GLOBAL.equals(stmt.getType())) {
globalFunctionMgr.addFunction(stmt.getFunction(), stmt.isIfNotExists());
} else {
Database db = getInternalCatalog().getDbOrDdlException(stmt.getFunctionName().getDb());
db.addFunction(stmt.getFunction(), stmt.isIfNotExists());
if (stmt.getFunction().isUDTFunction()) {
// Every table function in Doris registers two functions:
// the normal one and an "outer" variant. They differ in how empty results are handled,
// i.e. whether a NULL row needs to be produced when the function returns no output.
Function outerFunction = stmt.getFunction().clone();
FunctionName name = outerFunction.getFunctionName();
name.setFn(name.getFunction() + "_outer");
db.addFunction(outerFunction, stmt.isIfNotExists());
}
}
}
public void replayCreateFunction(Function function) throws MetaNotFoundException {
String dbName = function.getFunctionName().getDb();
Database db = getInternalCatalog().getDbOrMetaException(dbName);
db.replayAddFunction(function);
}
public void replayCreateGlobalFunction(Function function) {
globalFunctionMgr.replayAddFunction(function);
}
public void dropFunction(DropFunctionStmt stmt) throws UserException {
FunctionName name = stmt.getFunctionName();
if (SetType.GLOBAL.equals(stmt.getType())) {
globalFunctionMgr.dropFunction(stmt.getFunction(), stmt.isIfExists());
} else {
Database db = getInternalCatalog().getDbOrDdlException(name.getDb());
db.dropFunction(stmt.getFunction(), stmt.isIfExists());
}
cleanUDFCacheTask(stmt); // BE caches the UDF class loader; when a function is dropped, BE needs to clear that cache
}
public void replayDropFunction(FunctionSearchDesc functionSearchDesc) throws MetaNotFoundException {
String dbName = functionSearchDesc.getName().getDb();
Database db = getInternalCatalog().getDbOrMetaException(dbName);
db.replayDropFunction(functionSearchDesc);
}
public void replayDropGlobalFunction(FunctionSearchDesc functionSearchDesc) {
globalFunctionMgr.replayDropFunction(functionSearchDesc);
}
/**
 * We cannot attach callbacks (which live in fe-core) to config items (which live in fe-common),
 * so we wrap them here. Not ideal, but the best option we have for now.
 */
public void setMutableConfigWithCallback(String key, String value) throws ConfigException {
ConfigBase.setMutableConfig(key, value);
if (configtoThreads.get(key) != null) {
try {
// Not atomic; the new value may take effect with some delay, which is acceptable.
configtoThreads.get(key).get().setInterval(Config.getField(key).getLong(null) * 1000L);
// Do not interrupt the thread, to keep any in-flight BDBJE write safe.
LOG.info("set config " + key + " to " + value);
} catch (IllegalAccessException e) {
LOG.warn("set config " + key + " failed: " + e.getMessage());
}
}
}
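// Handle ADMIN SET FRONTEND CONFIG. Illustrative trigger (assumed client SQL; the key and value are hypothetical examples):
// ADMIN SET FRONTEND CONFIG ("example_config_key" = "example_value");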
public void setConfig(AdminSetConfigStmt stmt) throws Exception {
Map<String, String> configs = stmt.getConfigs();
Preconditions.checkState(configs.size() == 1);
for (Map.Entry<String, String> entry : configs.entrySet()) {
try {
setMutableConfigWithCallback(entry.getKey(), entry.getValue());
} catch (ConfigException e) {
throw new DdlException(e.getMessage());
}
}
if (stmt.isApplyToAll()) {
for (Frontend fe : Env.getCurrentEnv().getFrontends(null /* all */)) {
if (!fe.isAlive() || fe.getHost().equals(Env.getCurrentEnv().getSelfNode().getHost())) {
continue;
}
TNetworkAddress feAddr = new TNetworkAddress(fe.getHost(), fe.getRpcPort());
FEOpExecutor executor = new FEOpExecutor(feAddr, stmt.getLocalSetStmt(), ConnectContext.get(), false);
executor.execute();
if (executor.getStatusCode() != TStatusCode.OK.getValue()) {
throw new DdlException(String.format("failed to apply to fe %s:%s, error message: %s",
fe.getHost(), fe.getRpcPort(), executor.getErrMsg()));
}
}
}
}
public void setConfig(AdminSetFrontendConfigCommand command) throws Exception {
Map<String, String> configs = command.getConfigs();
Preconditions.checkState(configs.size() == 1);
for (Map.Entry<String, String> entry : configs.entrySet()) {
try {
setMutableConfigWithCallback(entry.getKey(), entry.getValue());
} catch (ConfigException e) {
throw new DdlException(e.getMessage());
}
}
if (command.isApplyToAll()) {
for (Frontend fe : Env.getCurrentEnv().getFrontends(null /* all */)) {
if (!fe.isAlive() || fe.getHost().equals(Env.getCurrentEnv().getSelfNode().getHost())) {
continue;
}
TNetworkAddress feAddr = new TNetworkAddress(fe.getHost(), fe.getRpcPort());
FEOpExecutor executor = new FEOpExecutor(feAddr, command.getLocalSetStmt(),
ConnectContext.get(), false);
executor.execute();
if (executor.getStatusCode() != TStatusCode.OK.getValue()) {
throw new DdlException(String.format("failed to apply to fe %s:%s, error message: %s",
fe.getHost(), fe.getRpcPort(), executor.getErrMsg()));
}
}
}
}
public void replayBackendReplicasInfo(BackendReplicasInfo backendReplicasInfo) {
long backendId = backendReplicasInfo.getBackendId();
List<BackendReplicasInfo.ReplicaReportInfo> replicaInfos = backendReplicasInfo.getReplicaReportInfos();
for (BackendReplicasInfo.ReplicaReportInfo info : replicaInfos) {
if (tabletInvertedIndex.getTabletMeta(info.tabletId) == null) {
// The tablet has been deleted. Tablet reporting and tablet deletion are two
// independent events with no mutual exclusion between them, so the tablet may be
// deleted first while its report is processed later.
// Here we simply ignore the deleted tablet.
continue;
}
Replica replica = tabletInvertedIndex.getReplica(info.tabletId, backendId);
if (replica == null) {
LOG.warn("failed to find replica of tablet {} on backend {} when replaying backend report info",
info.tabletId, backendId);
continue;
}
switch (info.type) {
case BAD:
replica.setBad(true);
break;
case MISSING_VERSION:
replica.updateLastFailedVersion(info.lastFailedVersion);
break;
default:
break;
}
}
}
@Deprecated
public void replayBackendTabletsInfo(BackendTabletsInfo backendTabletsInfo) {
List<Pair<Long, Integer>> tabletsWithSchemaHash = backendTabletsInfo.getTabletSchemaHash();
if (!tabletsWithSchemaHash.isEmpty()) {
// In previous version, we save replica info in `tabletsWithSchemaHash`,
// but it is wrong because we can not get replica from `tabletInvertedIndex` when doing checkpoint,
// because when doing checkpoint, the tabletInvertedIndex is not initialized at all.
//
// So we can only discard this information, in this case, it is equivalent to losing the record of these
// operations. But it doesn't matter, these records are currently only used to record whether a replica is
// in a bad state. This state has little effect on the system, and it can be restored after the system
// has processed the bad state replica.
for (Pair<Long, Integer> tabletInfo : tabletsWithSchemaHash) {
LOG.warn("find an old backendTabletsInfo for tablet {}, ignore it", tabletInfo.first);
}
return;
}
// in new version, replica info is saved here.
// but we need to get replica from db->tbl->partition->...
List<ReplicaPersistInfo> replicaPersistInfos = backendTabletsInfo.getReplicaPersistInfos();
for (ReplicaPersistInfo info : replicaPersistInfos) {
OlapTable olapTable = (OlapTable) getInternalCatalog().getDb(info.getDbId())
.flatMap(db -> db.getTable(info.getTableId())).filter(t -> t.isManagedTable())
.orElse(null);
if (olapTable == null) {
continue;
}
olapTable.writeLock();
try {
Partition partition = olapTable.getPartition(info.getPartitionId());
if (partition == null) {
continue;
}
MaterializedIndex mindex = partition.getIndex(info.getIndexId());
if (mindex == null) {
continue;
}
Tablet tablet = mindex.getTablet(info.getTabletId());
if (tablet == null) {
continue;
}
Replica replica = tablet.getReplicaById(info.getReplicaId());
if (replica != null) {
replica.setBad(true);
if (LOG.isDebugEnabled()) {
LOG.debug("get replica {} of tablet {} on backend {} to bad when replaying",
info.getReplicaId(), info.getTabletId(), info.getBackendId());
}
}
} finally {
olapTable.writeUnlock();
}
}
}
// Convert table's distribution type from hash to random.
public void convertDistributionType(Database db, OlapTable tbl) throws DdlException {
tbl.writeLockOrDdlException();
try {
if (tbl.isColocateTable()) {
throw new DdlException("Cannot change distribution type of colocate table.");
}
if (tbl.getKeysType() == KeysType.UNIQUE_KEYS) {
throw new DdlException("Cannot change distribution type of unique keys table.");
}
if (tbl.getKeysType() == KeysType.AGG_KEYS) {
for (Column column : tbl.getBaseSchema()) {
if (column.getAggregationType() == AggregateType.REPLACE
|| column.getAggregationType() == AggregateType.REPLACE_IF_NOT_NULL) {
throw new DdlException("Cannot change distribution type of aggregate keys table which has value"
+ " columns with " + column.getAggregationType() + " type.");
}
}
}
if (!tbl.convertHashDistributionToRandomDistribution()) {
throw new DdlException("Table " + tbl.getName() + " is not hash distributed");
}
TableInfo tableInfo = TableInfo.createForModifyDistribution(db.getId(), tbl.getId());
editLog.logModifyDistributionType(tableInfo);
LOG.info("finished to modify distribution type of table from hash to random : " + tbl.getName());
} finally {
tbl.writeUnlock();
}
}
public void replayConvertDistributionType(TableInfo info) throws MetaNotFoundException {
Database db = getInternalCatalog().getDbOrMetaException(info.getDbId());
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(info.getTableId(), TableType.OLAP);
olapTable.writeLock();
try {
olapTable.convertHashDistributionToRandomDistribution();
LOG.info("replay modify distribution type of table from hash to random : " + olapTable.getName());
} finally {
olapTable.writeUnlock();
}
}
/*
* The entry of replacing partitions with temp partitions.
*/
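// Illustrative trigger (assumed client SQL): ALTER TABLE example_tbl REPLACE PARTITION (p1) WITH TEMPORARY PARTITION (tp1);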
public void replaceTempPartition(Database db, OlapTable olapTable, ReplacePartitionClause clause)
throws DdlException {
Preconditions.checkState(olapTable.isWriteLockHeldByCurrentThread());
List<String> partitionNames = clause.getPartitionNames();
List<String> tempPartitionNames = clause.getTempPartitionNames();
boolean isStrictRange = clause.isStrictRange();
boolean useTempPartitionName = clause.useTempPartitionName();
boolean isForceDropOld = clause.isForceDropOldPartition();
// check partition exist
for (String partName : partitionNames) {
if (!olapTable.checkPartitionNameExist(partName, false)) {
throw new DdlException("Partition[" + partName + "] does not exist");
}
}
for (String partName : tempPartitionNames) {
if (!olapTable.checkPartitionNameExist(partName, true)) {
throw new DdlException("Temp partition[" + partName + "] does not exist");
}
}
List<Long> replacedPartitionIds = olapTable.replaceTempPartitions(db.getId(), partitionNames,
tempPartitionNames, isStrictRange,
useTempPartitionName, isForceDropOld);
long version = 0L;
long versionTime = System.currentTimeMillis();
// In cloud mode, the internal partition deletion logic will update the table version,
// so here we only need to handle non-cloud mode.
if (Config.isNotCloudMode()) {
version = olapTable.getNextVersion();
olapTable.updateVisibleVersionAndTime(version, versionTime);
}
// Here, we only wait for the EventProcessor to finish processing the event;
// regardless of whether it succeeds or fails,
// it does not affect the logic of replacing the partition
try {
Env.getCurrentEnv().getEventProcessor().processEvent(
new ReplacePartitionEvent(db.getCatalog().getId(), db.getId(),
olapTable.getId()));
} catch (Throwable t) {
// According to normal logic, no exceptions will be thrown,
// but in order to avoid bugs affecting the original logic, all exceptions are caught
LOG.warn("produceEvent failed: ", t);
}
// write log
ReplacePartitionOperationLog info = new ReplacePartitionOperationLog(db.getId(), db.getFullName(),
olapTable.getId(), olapTable.getName(),
partitionNames, tempPartitionNames, replacedPartitionIds, isStrictRange, useTempPartitionName, version,
versionTime,
isForceDropOld);
editLog.logReplaceTempPartition(info);
LOG.info("finished to replace partitions {} with temp partitions {} from table: {}", clause.getPartitionNames(),
clause.getTempPartitionNames(), olapTable.getName());
}
public void replayReplaceTempPartition(ReplacePartitionOperationLog replaceTempPartitionLog)
throws MetaNotFoundException {
long dbId = replaceTempPartitionLog.getDbId();
long tableId = replaceTempPartitionLog.getTblId();
Database db = getInternalCatalog().getDbOrMetaException(dbId);
OlapTable olapTable = (OlapTable) db
.getTableOrMetaException(tableId, Lists.newArrayList(TableType.OLAP, TableType.MATERIALIZED_VIEW));
olapTable.writeLock();
try {
olapTable.replaceTempPartitions(dbId, replaceTempPartitionLog.getPartitions(),
replaceTempPartitionLog.getTempPartitions(), replaceTempPartitionLog.isStrictRange(),
replaceTempPartitionLog.useTempPartitionName(), replaceTempPartitionLog.isForce());
// In cloud mode, the internal partition deletion logic will update the table version,
// so here we only need to handle non-cloud mode.
if (Config.isNotCloudMode()) {
olapTable.updateVisibleVersionAndTime(replaceTempPartitionLog.getVersion(),
replaceTempPartitionLog.getVersionTime());
}
} catch (DdlException e) {
throw new MetaNotFoundException(e);
} finally {
olapTable.writeUnlock();
}
}
public void installPlugin(InstallPluginStmt stmt) throws UserException, IOException {
pluginMgr.installPlugin(stmt);
}
public long savePlugins(CountingDataOutputStream dos, long checksum) throws IOException {
Env.getCurrentPluginMgr().write(dos);
return checksum;
}
public long loadPlugins(DataInputStream dis, long checksum) throws IOException {
Env.getCurrentPluginMgr().readFields(dis);
LOG.info("finished replay plugins from image");
return checksum;
}
public void replayInstallPlugin(PluginInfo pluginInfo) throws MetaNotFoundException {
try {
pluginMgr.replayLoadDynamicPlugin(pluginInfo);
} catch (Exception e) {
throw new MetaNotFoundException(e);
}
}
public void uninstallPlugin(UninstallPluginStmt stmt) throws IOException, UserException {
PluginInfo info = pluginMgr.uninstallPlugin(stmt.getPluginName());
if (null != info) {
editLog.logUninstallPlugin(info);
}
LOG.info("uninstall plugin = " + stmt.getPluginName());
}
public void uninstallPlugin(UninstallPluginCommand cmd) throws IOException, UserException {
PluginInfo info = pluginMgr.uninstallPlugin(cmd.getPluginName());
if (null != info) {
editLog.logUninstallPlugin(info);
}
LOG.info("uninstall plugin = " + cmd.getPluginName());
}
public void replayUninstallPlugin(PluginInfo pluginInfo) throws MetaNotFoundException {
try {
pluginMgr.uninstallPlugin(pluginInfo.getName());
} catch (Exception e) {
throw new MetaNotFoundException(e);
}
}
// entry of checking tablets operation
public void checkTablets(AdminCheckTabletsStmt stmt) {
CheckType type = stmt.getType();
switch (type) {
case CONSISTENCY:
consistencyChecker.addTabletsToCheck(stmt.getTabletIds());
break;
default:
break;
}
}
public void setTableStatus(AdminSetTableStatusStmt stmt) throws MetaNotFoundException {
String dbName = stmt.getDbName();
String tableName = stmt.getTblName();
setTableStatusInternal(dbName, tableName, stmt.getTableState(), false);
}
public void replaySetTableStatus(SetTableStatusOperationLog log) throws MetaNotFoundException {
setTableStatusInternal(log.getDbName(), log.getTblName(), log.getState(), true);
}
public void setTableStatusInternal(String dbName, String tableName, OlapTableState state, boolean isReplay)
throws MetaNotFoundException {
Database db = getInternalCatalog().getDbOrMetaException(dbName);
OlapTable olapTable = (OlapTable) db.getTableOrMetaException(tableName, TableType.OLAP);
olapTable.writeLockOrMetaException();
try {
OlapTableState oldState = olapTable.getState();
if (state != null && oldState != state) {
olapTable.setState(state);
if (!isReplay) {
SetTableStatusOperationLog log = new SetTableStatusOperationLog(dbName, tableName, state);
editLog.logSetTableStatus(log);
}
LOG.info("set table {} state from {} to {}. is replay: {}.",
tableName, oldState, state, isReplay);
} else {
LOG.warn("ignore set same state {} for table {}. is replay: {}.",
olapTable.getState(), tableName, isReplay);
}
} finally {
olapTable.writeUnlock();
}
}
// Set specified replica's status. If replica does not exist, just ignore it.
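// Illustrative trigger (assumed client SQL, ids are examples):
// ADMIN SET REPLICA STATUS PROPERTIES ("tablet_id" = "10010", "backend_id" = "10001", "status" = "bad");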
public void setReplicaStatus(AdminSetReplicaStatusCommand command) throws MetaNotFoundException {
long tabletId = command.getTabletId();
long backendId = command.getBackendId();
ReplicaStatus status = command.getStatus();
long userDropTime = status == ReplicaStatus.DROP ? System.currentTimeMillis() : -1L;
setReplicaStatusInternal(tabletId, backendId, status, userDropTime, false);
}
public void setReplicaStatus(AdminSetReplicaStatusStmt stmt) throws MetaNotFoundException {
long tabletId = stmt.getTabletId();
long backendId = stmt.getBackendId();
ReplicaStatus status = stmt.getStatus();
long userDropTime = status == ReplicaStatus.DROP ? System.currentTimeMillis() : -1L;
setReplicaStatusInternal(tabletId, backendId, status, userDropTime, false);
}
public void replaySetReplicaStatus(SetReplicaStatusOperationLog log) throws MetaNotFoundException {
setReplicaStatusInternal(log.getTabletId(), log.getBackendId(), log.getReplicaStatus(),
log.getUserDropTime(), true);
}
private void setReplicaStatusInternal(long tabletId, long backendId, ReplicaStatus status, long userDropTime,
boolean isReplay)
throws MetaNotFoundException {
try {
TabletMeta meta = tabletInvertedIndex.getTabletMeta(tabletId);
if (meta == null) {
throw new MetaNotFoundException("tablet does not exist");
}
Database db = getInternalCatalog().getDbOrMetaException(meta.getDbId());
Table table = db.getTableOrMetaException(meta.getTableId());
table.writeLockOrMetaException();
try {
Replica replica = tabletInvertedIndex.getReplica(tabletId, backendId);
if (replica == null) {
throw new MetaNotFoundException("replica does not exist on backend, beId=" + backendId);
}
boolean updated = false;
if (status == ReplicaStatus.BAD || status == ReplicaStatus.OK) {
replica.setUserDropTime(-1L);
if (replica.setBad(status == ReplicaStatus.BAD)) {
updated = true;
LOG.info("set replica {} of tablet {} on backend {} as {}. is replay: {}", replica.getId(),
tabletId, backendId, status, isReplay);
}
} else if (status == ReplicaStatus.DROP) {
replica.setUserDropTime(userDropTime);
updated = true;
LOG.info("set replica {} of tablet {} on backend {} as {}.", replica.getId(),
tabletId, backendId, status);
}
if (updated && !isReplay) {
SetReplicaStatusOperationLog log = new SetReplicaStatusOperationLog(backendId, tabletId,
status, userDropTime);
getEditLog().logSetReplicaStatus(log);
}
} finally {
table.writeUnlock();
}
} catch (MetaNotFoundException e) {
throw new MetaNotFoundException("set replica status failed, tabletId=" + tabletId, e);
}
}
// Set specified replica's version. If replica does not exist, just ignore it.
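// Illustrative trigger (assumed client SQL, ids and version are examples):
// ADMIN SET REPLICA VERSION PROPERTIES ("tablet_id" = "10010", "backend_id" = "10001", "version" = "100");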
public void setReplicaVersion(AdminSetReplicaVersionStmt stmt) throws MetaNotFoundException {
long tabletId = stmt.getTabletId();
long backendId = stmt.getBackendId();
Long version = stmt.getVersion();
Long lastSuccessVersion = stmt.getLastSuccessVersion();
Long lastFailedVersion = stmt.getLastFailedVersion();
long updateTime = System.currentTimeMillis();
setReplicaVersionInternal(tabletId, backendId, version, lastSuccessVersion, lastFailedVersion,
updateTime, false);
}
public void replaySetReplicaVersion(SetReplicaVersionOperationLog log) throws MetaNotFoundException {
setReplicaVersionInternal(log.getTabletId(), log.getBackendId(), log.getVersion(),
log.getLastSuccessVersion(), log.getLastFailedVersion(), log.getUpdateTime(), true);
}
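// Internal worker for ADMIN SET REPLICA VERSION and its edit-log replay. Rejected in cloud mode;
// otherwise updates the replica's version / last-success / last-failed versions under the table
// write lock and, on the master, logs a SetReplicaVersionOperationLog for followers.
// Rough sketch of the triggering SQL (grammar not verified here; ids are made up):
//   ADMIN SET REPLICA VERSION PROPERTIES("tablet_id" = "10010", "backend_id" = "10001", "version" = "100");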
private void setReplicaVersionInternal(long tabletId, long backendId, Long version, Long lastSuccessVersion,
Long lastFailedVersion, long updateTime, boolean isReplay)
throws MetaNotFoundException {
try {
if (Config.isCloudMode()) {
throw new MetaNotFoundException("not support modify replica version in cloud mode");
}
TabletMeta meta = tabletInvertedIndex.getTabletMeta(tabletId);
if (meta == null) {
throw new MetaNotFoundException("tablet does not exist");
}
Database db = getInternalCatalog().getDbOrMetaException(meta.getDbId());
Table table = db.getTableOrMetaException(meta.getTableId());
table.writeLockOrMetaException();
try {
Replica replica = tabletInvertedIndex.getReplica(tabletId, backendId);
if (replica == null) {
throw new MetaNotFoundException("replica does not exist on backend, beId=" + backendId);
}
replica.adminUpdateVersionInfo(version, lastFailedVersion, lastSuccessVersion, updateTime);
if (!isReplay) {
SetReplicaVersionOperationLog log = new SetReplicaVersionOperationLog(backendId, tabletId,
version, lastSuccessVersion, lastFailedVersion, updateTime);
getEditLog().logSetReplicaVersion(log);
}
LOG.info("set replica {} of tablet {} on backend {} as version {}, last success version {}, "
+ "last failed version {}, update time {}. is replay: {}", replica.getId(), tabletId,
backendId, version, lastSuccessVersion, lastFailedVersion, updateTime, isReplay);
} finally {
table.writeUnlock();
}
} catch (MetaNotFoundException e) {
throw new MetaNotFoundException("set replica version failed, tabletId=" + tabletId, e);
}
}
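// Final cleanup when a database is permanently erased: drop its pending load jobs and its
// database-level transaction manager, and, when needEditLog is true, persist an erase-db edit log
// entry so followers do the same.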
public void eraseDatabase(long dbId, boolean needEditLog) {
// remove jobs
Env.getCurrentEnv().getLoadInstance().removeDbLoadJob(dbId);
// remove database transaction manager
Env.getCurrentGlobalTransactionMgr().removeDatabaseTransactionMgr(dbId);
if (needEditLog) {
Env.getCurrentEnv().getEditLog().logEraseDb(dbId);
}
}
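// Cleanup hook invoked when an OlapTable is permanently erased: remove all of its tablets from the
// inverted index, detach the table from the colocation index, and ask the backends to drop the
// table's replicas.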
public void onEraseOlapTable(OlapTable olapTable, boolean isReplay) {
// inverted index
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
Collection<Partition> allPartitions = olapTable.getAllPartitions();
for (Partition partition : allPartitions) {
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
for (Tablet tablet : index.getTablets()) {
invertedIndex.deleteTablet(tablet.getId());
}
}
}
// TODO: does checkpoint need update colocate index ?
// colocation
Env.getCurrentColocateIndex().removeTable(olapTable.getId());
getInternalCatalog().eraseTableDropBackendReplicas(olapTable, isReplay);
}
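// Cleanup hook invoked when a single partition is permanently erased: remove its tablets from the
// inverted index and ask the backends to drop the partition's replicas.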
public void onErasePartition(Partition partition) {
// remove tablet in inverted index
TabletInvertedIndex invertedIndex = Env.getCurrentInvertedIndex();
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
for (Tablet tablet : index.getTablets()) {
invertedIndex.deleteTablet(tablet.getId());
}
}
getInternalCatalog().erasePartitionDropBackendReplicas(Lists.newArrayList(partition));
}
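// Fan out a CleanTrashTask to the backends selected by the statement so they purge their local
// trash directories. Rough sketch of the triggering SQL (grammar not verified here):
//   ADMIN CLEAN TRASH ON ("host1:heartbeat_port1", "host2:heartbeat_port2");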
public void cleanTrash(AdminCleanTrashStmt stmt) {
List<Backend> backends = stmt.getBackends();
AgentBatchTask batchTask = new AgentBatchTask();
for (Backend backend : backends) {
CleanTrashTask cleanTrashTask = new CleanTrashTask(backend.getId());
batchTask.addTask(cleanTrashTask);
LOG.info("clean trash in be {}, beId {}", backend.getHost(), backend.getId());
}
AgentTaskExecutor.submit(batchTask);
}
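// Broadcast a CleanUDFCacheTask to every backend so the cached library for the dropped function
// signature is evicted after DROP FUNCTION.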
public void cleanUDFCacheTask(DropFunctionStmt stmt) throws UserException {
ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getAllBackendsByAllCluster();
String functionSignature = stmt.signatureString();
AgentBatchTask batchTask = new AgentBatchTask();
for (Backend backend : backendsInfo.values()) {
CleanUDFCacheTask cleanUDFCacheTask = new CleanUDFCacheTask(backend.getId(), functionSignature);
batchTask.addTask(cleanUDFCacheTask);
LOG.info("clean udf cache in be {}, beId {}", backend.getHost(), backend.getId());
}
AgentTaskExecutor.submit(batchTask);
}
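// Entry point for forcefully overwriting a partition's visible version (and next version) in repair
// scenarios. Rough sketch of the triggering SQL (grammar not verified here; values are made up):
//   ADMIN SET TABLE tbl PARTITION VERSION PROPERTIES("partition_id" = "10017", "visible_version" = "100");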
public void setPartitionVersion(AdminSetPartitionVersionStmt stmt) throws DdlException {
String database = stmt.getDatabase();
String table = stmt.getTable();
long partitionId = stmt.getPartitionId();
long visibleVersion = stmt.getVisibleVersion();
int setSuccess = setPartitionVersionInternal(database, table, partitionId, visibleVersion, false);
if (setSuccess == -1) {
throw new DdlException("Failed to set partition visible version to " + visibleVersion + ". " + "Partition "
+ partitionId + " not exists. Database " + database + ", Table " + table + ".");
}
}
public void replaySetPartitionVersion(SetPartitionVersionOperationLog log) throws DdlException {
int setSuccess = setPartitionVersionInternal(log.getDatabase(), log.getTable(),
log.getPartitionId(), log.getVisibleVersion(), true);
if (setSuccess == -1) {
LOG.warn("Failed to set partition visible version to {}. "
+ "Database {}, Table {}, Partition {} not exists.", log.getDatabase(), log.getTable(),
log.getVisibleVersion(), log.getPartitionId());
}
}
public int setPartitionVersionInternal(String database, String table, long partitionId,
long visibleVersion, boolean isReplay) throws DdlException {
int result = -1;
Database db = getInternalCatalog().getDbOrDdlException(database);
OlapTable olapTable = db.getOlapTableOrDdlException(table);
olapTable.writeLockOrDdlException();
try {
Partition partition = olapTable.getPartition(partitionId);
if (partition != null) {
Long oldVersion = partition.getVisibleVersion();
partition.updateVisibleVersion(visibleVersion);
partition.setNextVersion(visibleVersion + 1);
result = 0;
if (!isReplay) {
SetPartitionVersionOperationLog log = new SetPartitionVersionOperationLog(
database, table, partitionId, visibleVersion);
getEditLog().logSetPartitionVersion(log);
}
LOG.info("set partition {} visible version from {} to {}. Database {}, Table {}, is replay:"
+ " {}.", partitionId, oldVersion, visibleVersion, database, table, isReplay);
}
} finally {
olapTable.writeUnlock();
}
return result;
}
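// Helpers for the lowerCaseTableNames global variable, following the usual MySQL convention:
// 0 - table names are stored and compared case-sensitively;
// 1 - table names are stored in lower case;
// 2 - table names are stored as given but compared case-insensitively.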
public static boolean isStoredTableNamesLowerCase() {
return GlobalVariable.lowerCaseTableNames == 1;
}
public static boolean isTableNamesCaseInsensitive() {
return GlobalVariable.lowerCaseTableNames == 2;
}
public static boolean isTableNamesCaseSensitive() {
return GlobalVariable.lowerCaseTableNames == 0;
}
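// Serialize one OlapTable's layout (partitions -> indexes -> tablets -> replicas) into the Thrift
// TGetMetaTableMeta structure under the table read lock, and attach it to the given db meta.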
private static void getTableMeta(OlapTable olapTable, TGetMetaDBMeta dbMeta) {
if (LOG.isDebugEnabled()) {
LOG.debug("get table meta. table: {}", olapTable.getName());
}
TGetMetaTableMeta tableMeta = new TGetMetaTableMeta();
olapTable.readLock();
try {
tableMeta.setId(olapTable.getId());
tableMeta.setName(olapTable.getName());
tableMeta.setType(olapTable.getType().name());
PartitionInfo tblPartitionInfo = olapTable.getPartitionInfo();
Collection<Partition> partitions = olapTable.getAllPartitions();
for (Partition partition : partitions) {
TGetMetaPartitionMeta partitionMeta = new TGetMetaPartitionMeta();
long partitionId = partition.getId();
partitionMeta.setId(partitionId);
partitionMeta.setName(partition.getName());
String partitionRange = tblPartitionInfo.getPartitionRangeString(partitionId);
partitionMeta.setRange(partitionRange);
partitionMeta.setVisibleVersion(partition.getVisibleVersion());
// partitionMeta.setTemp(partition.isTemp());
for (MaterializedIndex index : partition.getMaterializedIndices(IndexExtState.ALL)) {
TGetMetaIndexMeta indexMeta = new TGetMetaIndexMeta();
indexMeta.setId(index.getId());
indexMeta.setName(olapTable.getIndexNameById(index.getId()));
for (Tablet tablet : index.getTablets()) {
TGetMetaTabletMeta tabletMeta = new TGetMetaTabletMeta();
tabletMeta.setId(tablet.getId());
for (Replica replica : tablet.getReplicas()) {
TGetMetaReplicaMeta replicaMeta = new TGetMetaReplicaMeta();
replicaMeta.setId(replica.getId());
replicaMeta.setBackendId(replica.getBackendIdWithoutException());
replicaMeta.setVersion(replica.getVersion());
tabletMeta.addToReplicas(replicaMeta);
}
indexMeta.addToTablets(tabletMeta);
}
partitionMeta.addToIndexes(indexMeta);
}
tableMeta.addToPartitions(partitionMeta);
}
dbMeta.addToTables(tableMeta);
} finally {
olapTable.readUnlock();
}
}
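// Build the TGetMetaResult for a database. When no table list is given, all managed (OLAP) tables
// of the database are included. If the binlog feature is enabled, dropped partition/table/index ids
// and their commit sequence numbers are attached as well.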
public static TGetMetaResult getMeta(Database db, List<Table> tables) throws MetaNotFoundException {
TGetMetaResult result = new TGetMetaResult();
result.setStatus(new TStatus(TStatusCode.OK));
TGetMetaDBMeta dbMeta = new TGetMetaDBMeta();
dbMeta.setId(db.getId());
dbMeta.setName(db.getFullName());
if (tables == null) {
db.readLock();
try {
tables = db.getTables();
} finally {
db.readUnlock();
}
}
for (Table table : tables) {
if (!table.isManagedTable()) {
continue;
}
OlapTable olapTable = (OlapTable) table;
getTableMeta(olapTable, dbMeta);
}
if (Config.enable_feature_binlog) {
BinlogManager binlogManager = Env.getCurrentEnv().getBinlogManager();
// id -> commit seq
List<Pair<Long, Long>> droppedPartitions = binlogManager.getDroppedPartitions(db.getId());
List<Pair<Long, Long>> droppedTables = binlogManager.getDroppedTables(db.getId());
List<Pair<Long, Long>> droppedIndexes = binlogManager.getDroppedIndexes(db.getId());
dbMeta.setDroppedPartitionMap(droppedPartitions.stream()
.collect(Collectors.toMap(p -> p.first, p -> p.second)));
dbMeta.setDroppedTableMap(droppedTables.stream()
.collect(Collectors.toMap(p -> p.first, p -> p.second)));
dbMeta.setDroppedIndexMap(droppedIndexes.stream()
.collect(Collectors.toMap(p -> p.first, p -> p.second)));
// Keep compatibility with old version
dbMeta.setDroppedPartitions(droppedPartitions.stream()
.map(p -> p.first)
.collect(Collectors.toList()));
dbMeta.setDroppedTables(droppedTables.stream()
.map(p -> p.first)
.collect(Collectors.toList()));
dbMeta.setDroppedIndexes(droppedIndexes.stream()
.map(p -> p.first)
.collect(Collectors.toList()));
}
result.setDbMeta(dbMeta);
return result;
}
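// Entry point for table compaction: sends a CompactionTask for every replica of the named
// partitions. Rough sketch of the triggering SQL (grammar not verified here):
//   ADMIN COMPACT TABLE tbl PARTITION (p1) TYPE "cumulative";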
public void compactTable(AdminCompactTableStmt stmt) throws DdlException {
String dbName = stmt.getDbName();
String tableName = stmt.getTblName();
String type = stmt.getCompactionType();
List<String> partitionNames = stmt.getPartitions();
compactTable(dbName, tableName, type, partitionNames);
}
public void compactTable(String dbName, String tableName, String type, List<String> partitionNames)
throws DdlException {
Database db = getInternalCatalog().getDbOrDdlException(dbName);
OlapTable olapTable = db.getOlapTableOrDdlException(tableName);
AgentBatchTask batchTask = new AgentBatchTask();
olapTable.readLock();
try {
LOG.info("Table compaction. database: {}, table: {}, partition: {}, type: {}", dbName, tableName,
Joiner.on(", ").join(partitionNames), type);
for (String parName : partitionNames) {
Partition partition = olapTable.getPartition(parName);
if (partition == null) {
throw new DdlException("partition[" + parName + "] not exist in table[" + tableName + "]");
}
for (MaterializedIndex idx : partition.getMaterializedIndices(IndexExtState.VISIBLE)) {
for (Tablet tablet : idx.getTablets()) {
for (Replica replica : tablet.getReplicas()) {
CompactionTask compactionTask = new CompactionTask(replica.getBackendIdWithoutException(),
db.getId(), olapTable.getId(), partition.getId(), idx.getId(), tablet.getId(),
olapTable.getSchemaHashByIndexId(idx.getId()), type);
batchTask.addTask(compactionTask);
}
}
} // indices
}
} finally {
olapTable.readUnlock();
}
// send task immediately
AgentTaskExecutor.submit(batchTask);
}
private static void addTableComment(TableIf table, StringBuilder sb) {
if (StringUtils.isNotBlank(table.getComment())) {
sb.append("\nCOMMENT '").append(table.getComment(true)).append("'");
}
}
public int getFollowerCount() {
int count = 0;
for (Frontend fe : frontends.values()) {
if (fe.getRole() == FrontendNodeType.FOLLOWER) {
count++;
}
}
return count;
}
public AnalysisManager getAnalysisManager() {
return analysisManager;
}
public HboPlanStatisticsManager getHboPlanStatisticsManager() {
return hboPlanStatisticsManager;
}
public GlobalFunctionMgr getGlobalFunctionMgr() {
return globalFunctionMgr;
}
public StatisticsCleaner getStatisticsCleaner() {
return statisticsCleaner;
}
public LoadManagerAdapter getLoadManagerAdapter() {
return loadManagerAdapter;
}
public QueryStats getQueryStats() {
return queryStats;
}
public void cleanQueryStats(CleanQueryStatsInfo info) throws DdlException {
queryStats.clear(info);
editLog.logCleanQueryStats(info);
}
public void replayAutoIncrementIdUpdateLog(AutoIncrementIdUpdateLog log) throws Exception {
getInternalCatalog().replayAutoIncrementIdUpdateLog(log);
}
public ColumnIdFlushDaemon getColumnIdFlusher() {
return columnIdFlusher;
}
public StatisticsAutoCollector getStatisticsAutoCollector() {
return statisticsAutoCollector;
}
public MasterDaemon getTabletStatMgr() {
return tabletStatMgr;
}
public NereidsSqlCacheManager getSqlCacheManager() {
return sqlCacheManager;
}
public NereidsSortedPartitionsCacheManager getSortedPartitionsCacheManager() {
return sortedPartitionsCacheManager;
}
public SplitSourceManager getSplitSourceManager() {
return splitSourceManager;
}
public GlobalExternalTransactionInfoMgr getGlobalExternalTransactionInfoMgr() {
return globalExternalTransactionInfoMgr;
}
public StatisticsJobAppender getStatisticsJobAppender() {
return statisticsJobAppender;
}
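// The alterMTMV* helpers below each wrap a single async-materialized-view change into an AlterMTMV
// operation and hand it to the Alter module on the master path (isReplay == false).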
public void alterMTMVRefreshInfo(AlterMTMVRefreshInfo info) {
AlterMTMV alter = new AlterMTMV(info.getMvName(), info.getRefreshInfo(), MTMVAlterOpType.ALTER_REFRESH_INFO);
this.alter.processAlterMTMV(alter, false);
}
public void alterMTMVProperty(AlterMTMVPropertyInfo info) {
AlterMTMV alter = new AlterMTMV(info.getMvName(), MTMVAlterOpType.ALTER_PROPERTY);
alter.setMvProperties(info.getProperties());
this.alter.processAlterMTMV(alter, false);
}
public void alterMTMVStatus(TableNameInfo mvName, MTMVStatus status) {
AlterMTMV alter = new AlterMTMV(mvName, MTMVAlterOpType.ALTER_STATUS);
alter.setStatus(status);
this.alter.processAlterMTMV(alter, false);
}
public void addMTMVTaskResult(TableNameInfo mvName, MTMVTask task, MTMVRelation relation,
Map<String, MTMVRefreshPartitionSnapshot> partitionSnapshots) {
AlterMTMV alter = new AlterMTMV(mvName, MTMVAlterOpType.ADD_TASK);
alter.setTask(task);
alter.setRelation(relation);
alter.setPartitionSnapshots(partitionSnapshots);
this.alter.processAlterMTMV(alter, false);
}
// Ensure the env is ready, otherwise throw an exception.
public void checkReadyOrThrow() throws Exception {
if (isReady()) {
return;
}
StringBuilder sb = new StringBuilder();
sb.append("Node catalog is not ready, please wait for a while. ")
.append("To master progress: " + toMasterProgress + ".\n")
.append("Frontends: \n");
for (String name : frontends.keySet()) {
Frontend frontend = frontends.get(name);
if (frontend == null) {
continue;
}
sb.append(frontend.toString()).append("\n");
}
String reason = editLog.getNotReadyReason();
if (!Strings.isNullOrEmpty(reason)) {
sb.append("Reason: ").append(reason).append("%\n");
}
if (haProtocol instanceof BDBHA) {
try {
BDBHA ha = (BDBHA) haProtocol;
List<InetSocketAddress> electableNodes = ha.getElectableNodes(true);
if (!electableNodes.isEmpty()) {
sb.append("Electable nodes: \n");
for (InetSocketAddress addr : electableNodes) {
sb.append(addr.toString()).append("\n");
}
}
List<InetSocketAddress> observerNodes = ha.getObserverNodes();
if (!observerNodes.isEmpty()) {
sb.append("Observer nodes: \n");
for (InetSocketAddress addr : observerNodes) {
sb.append(addr.toString()).append("\n");
}
}
} catch (Exception e) {
LOG.warn("checkReadyOrThrow:", e);
}
}
throw new Exception(sb.toString());
}
public void checkReadyOrThrowTException() throws TException {
try {
checkReadyOrThrow();
} catch (Exception e) {
throw new TException(e);
}
}
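// Used by the metadata-compatibility check mode: replay all journals, optionally dump a fresh image
// when Config.checkpoint_after_check_compatibility is set, then exit the process.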
private void replayJournalsAndExit() {
replayJournal(-1);
LOG.info("check metadata compatibility successfully");
LogUtils.stdout("check metadata compatibility successfully");
if (Config.checkpoint_after_check_compatibility) {
String imagePath = dumpImage();
String msg = "the new image file path is: " + imagePath;
LOG.info(msg);
LogUtils.stdout(msg);
}
System.exit(0);
}
public void registerSessionInfo(String sessionId) {
this.aliveSessionSet.add(sessionId);
}
public void unregisterSessionInfo(String sessionId) {
this.aliveSessionSet.remove(sessionId);
}
public List<String> getAllAliveSessionIds() {
return new ArrayList<>(aliveSessionSet);
}
}