// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.dictionary;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.CustomThreadFactory;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Status;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.dictionary.Dictionary.DictionaryStatus;
import org.apache.doris.job.extensions.insert.InsertTask;
import org.apache.doris.job.manager.TaskDisruptorGroupManager;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.trees.plans.commands.info.CreateDictionaryInfo;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoDictionaryCommand;
import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
import org.apache.doris.persist.CreateDictionaryPersistInfo;
import org.apache.doris.persist.DictionaryDecreaseVersionInfo;
import org.apache.doris.persist.DictionaryIncreaseVersionInfo;
import org.apache.doris.persist.DropDictionaryPersistInfo;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.proto.InternalService;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TDictionaryStatus;
import org.apache.doris.thrift.TDictionaryStatusList;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* Manager for dictionary operations, including creation, deletion, and data loading.
*/
public class DictionaryManager extends MasterDaemon implements Writable {
private static final Logger LOG = LogManager.getLogger(DictionaryManager.class);
private static final long DICTIONARY_JOB_ID = -493209151411825L; // the string "DICTIONARY" encoded as a numeric id
// Lock for protecting dictionaryIds map
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(true);
/// ATTN: we MUST have exactly one container owning the dictionary objects because of GSON deserialization.
/// That container is `idToDictionary`; all other maps MUST be secondary indexes only.
// Map of database name -> dictionary name -> dictionary id
@SerializedName(value = "ids")
private Map<String, Map<String, Long>> dictionaryIds = Maps.newConcurrentMap();
// dbname -> tablename -> dict id
@SerializedName(value = "t")
private Map<String, ListMultimap<String, Long>> dbTableToDicIds = Maps.newConcurrentMap();
@SerializedName(value = "idmap")
private Map<Long, Dictionary> idToDictionary = Maps.newConcurrentMap();
@SerializedName(value = "i")
private long uniqueId = 0;
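// Illustrative lookup chain kept in sync by the maps above (names and ids are made up):
//   dictionaryIds.get("db1").get("dict1")       -> 42L          (db/name -> id, secondary index)
//   dbTableToDicIds.get("db1").get("src_tbl")   -> [42L, ...]   (db/source table -> ids, secondary index)
//   idToDictionary.get(42L)                     -> Dictionary   (the single owning container)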
private static final int DISPATCH_DICTIONARY_THREAD_NUM = Config.job_dictionary_task_consumer_thread_num > 0
? Config.job_dictionary_task_consumer_thread_num
: TaskDisruptorGroupManager.DEFAULT_CONSUMER_THREAD_NUM;
private static final int DISPATCH_DICTIONARY_TASK_QUEUE_SIZE = TaskDisruptorGroupManager
.normalizeRingbufferSize(Config.dictionary_task_queue_size);
// thread pool for dictionary data load and unload
private ExecutorService executor;
public DictionaryManager() {
super("Dictionary Manager", Config.dictionary_auto_refresh_interval_seconds * 1000);
this.executor = new ThreadPoolExecutor(
DISPATCH_DICTIONARY_THREAD_NUM, // core pool size
DISPATCH_DICTIONARY_THREAD_NUM, // max pool size, same as core
0L, // keep-alive time; unused since core == max
TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(DISPATCH_DICTIONARY_TASK_QUEUE_SIZE),
new CustomThreadFactory("dictionary-task-execute"),
new ThreadPoolExecutor.AbortPolicy() // throw when queue is full
);
}
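// Saturation note (standard ThreadPoolExecutor behavior, stated for clarity): with AbortPolicy,
// executor.execute() throws RejectedExecutionException once all DISPATCH_DICTIONARY_THREAD_NUM
// workers are busy and the queue already holds DISPATCH_DICTIONARY_TASK_QUEUE_SIZE tasks, so
// submitDataLoad()/submitDataUnload() callers see that rejection directly.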
@Override
protected void runAfterCatalogReady() {
// interval unit is ms. re-read each cycle so ADMIN SET CONFIG takes effect
setInterval(Config.dictionary_auto_refresh_interval_seconds * 1000);
// Check and update dictionary data in each cycle
try {
checkAndUpdateDictionaries();
} catch (Exception e) {
LOG.warn("Failed to check and update dictionaries", e);
}
}
// If locking both the manager and a dictionary, always lock the manager first!
public void lockRead() {
lock.readLock().lock();
}
public void unlockRead() {
lock.readLock().unlock();
}
public void lockWrite() {
lock.writeLock().lock();
}
public void unlockWrite() {
lock.writeLock().unlock();
}
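// Example of the required lock order when both locks are needed (illustrative only):
//   manager.lockWrite();       // manager lock first...
//   dictionary.writeLock();    // ...then the dictionary's own lock
//   try { /* mutate dictionary state */ } finally {
//       dictionary.writeUnlock();
//       manager.unlockWrite();
//   }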
/**
 * Create a new dictionary based on the provided info.
 *
 * @throws Exception if the dictionary already exists (without IF NOT EXISTS) or the initial load task cannot be submitted
 */
public Dictionary createDictionary(ConnectContext ctx, CreateDictionaryInfo info) throws Exception {
lockWrite();
try {
// 1. Check if dictionary already exists
if (hasDictionaryWithoutLock(info.getDbName(), info.getDictName())) {
if (info.isIfNotExists()) {
return getDictionary(info.getDbName(), info.getDictName());
} else {
throw new DdlException(
"Dictionary " + info.getDictName() + " already exists in database " + info.getDbName());
}
}
// 2. Create the dictionary object. Its initial status is OUT_OF_DATE.
Dictionary dictionary = new Dictionary(info, ++uniqueId);
// Add to the maps. None of these puts can throw, so the scheduling below is safe.
idToDictionary.put(dictionary.getId(), dictionary);
Map<String, Long> dbDictIds = dictionaryIds.computeIfAbsent(info.getDbName(),
k -> Maps.newConcurrentMap());
dbDictIds.put(info.getDictName(), dictionary.getId());
ListMultimap<String, Long> tableToDicIds = dbTableToDicIds.computeIfAbsent(info.getDbName(),
k -> ArrayListMultimap.create());
tableToDicIds.put(info.getSourceTableName(), dictionary.getId());
// 3. Log the creation operation
Env.getCurrentEnv().getEditLog().logCreateDictionary(dictionary);
if (!dictionary.hasNewerSourceVersion()) {
// Should not happen: the dictionary's data version should still be zero here.
LOG.warn("Dictionary {} is too new when creating", dictionary.getName());
}
submitDataLoad(dictionary, false);
return dictionary;
} finally {
unlockWrite();
}
}
/// For all drop operations we do not drop data on the BEs. We drop only metadata; when BEs later report
/// those dictionaries, they show up as unknown and the daemon schedules their removal on the BEs.
/**
* Delete a dictionary.
*
* @throws DdlException if the dictionary does not exist or unload failed
*/
public void dropDictionary(ConnectContext ctx, String dbName, String dictName, boolean ifExists)
throws DdlException {
lockWrite();
Dictionary dictionary = null;
try {
Map<String, Long> dbDictIds = dictionaryIds.get(dbName);
if (dbDictIds == null || !dbDictIds.containsKey(dictName)) {
if (!ifExists) {
throw new DdlException("Dictionary " + dictName + " does not exist in database " + dbName);
}
return;
}
Long id = dbDictIds.remove(dictName);
dictionary = idToDictionary.remove(id);
// remove mapping from table to dict
dbTableToDicIds.get(dbName).remove(dictionary.getSourceTableName(), id);
// Log the drop operation
Env.getCurrentEnv().getEditLog().logDropDictionary(dbName, dictName);
} finally {
unlockWrite();
}
}
/**
 * Drop all dictionaries of a table. Used when dropping a table, so there may be no db or table records.
 */
public void dropTableDictionaries(String dbName, String tableName) {
lockWrite();
try {
ListMultimap<String, Long> tableToDictIds = dbTableToDicIds.get(dbName);
if (tableToDictIds == null) { // this db has no table with dictionary records.
return;
}
// get all dictionary ids of this table
List<Long> dictIds = tableToDictIds.removeAll(tableName);
if (dictIds.isEmpty()) { // this table has no dictionaries. removeAll() never returns null.
return;
}
// all this db's dictionaries. tableToDictIds is not null, so nameToIds must not be null either.
Map<String, Long> nameToIds = dictionaryIds.get(dbName);
for (Long id : dictIds) {
Dictionary dict = idToDictionary.remove(id);
if (dict == null) {
LOG.warn("Dictionary {} does not exist in idToDictionary", id);
continue;
}
nameToIds.remove(dict.getName());
// Log the drop operation
Env.getCurrentEnv().getEditLog().logDropDictionary(dbName, dict.getName());
}
} finally {
unlockWrite();
}
}
/**
* Drop all dictionaries in a database. Used when dropping a database.
*/
public void dropDbDictionaries(String dbName) {
lockWrite();
try {
// remove this db's whole name -> id map
Map<String, Long> dbDictIds = dictionaryIds.remove(dbName);
// Log the drop operation
if (dbDictIds != null) {
for (Map.Entry<String, Long> entry : dbDictIds.entrySet()) {
idToDictionary.remove(entry.getValue()); // also drop the owning record, or it would leak
Env.getCurrentEnv().getEditLog().logDropDictionary(dbName, entry.getKey());
}
// also drop all table -> id mapping records.
dbTableToDicIds.remove(dbName);
}
} finally {
unlockWrite();
}
}
private boolean hasDictionaryWithoutLock(String dbName, String dictName) {
Map<String, Long> dbDictIds = dictionaryIds.get(dbName);
return dbDictIds != null && dbDictIds.containsKey(dictName);
}
public Map<String, Dictionary> getDictionaries(String dbName) {
lockRead();
try {
Map<String, Long> ids = dictionaryIds.computeIfAbsent(dbName, k -> Maps.newConcurrentMap());
return Maps.transformValues(ids, id -> idToDictionary.get(id));
} finally {
unlockRead();
}
}
/**
* Get a dictionary.
*
* @throws DdlException if the dictionary does not exist
*/
public Dictionary getDictionary(String dbName, String dictName) throws DdlException {
lockRead();
try {
Map<String, Long> dbDictIds = dictionaryIds.get(dbName);
if (dbDictIds == null || !dbDictIds.containsKey(dictName)) {
throw new DdlException("Dictionary " + dictName + " does not exist in database " + dbName);
}
return idToDictionary.get(dbDictIds.get(dictName));
} finally {
unlockRead();
}
}
public Dictionary getDictionary(long dictId) {
lockRead();
try {
return idToDictionary.get(dictId);
} finally {
unlockRead();
}
}
/**
 * Get all BEs' dictionary status, then load where dictionary data is missing or stale and
 * unload dictionaries the BEs report that we no longer know about.
 */
private void checkAndUpdateDictionaries() throws Exception {
long now = System.currentTimeMillis();
// get all BE dictionaries' status
Map<Long, List<Long>> unknownDictsIdtoBes = collectDictionaryStatus(null);
// DROP unknown dictionaries
for (Map.Entry<Long, List<Long>> entry : unknownDictsIdtoBes.entrySet()) {
Long dictId = entry.getKey();
List<Long> beIds = entry.getValue();
submitDataUnload(dictId, beIds);
}
// check all dictionaries and REFRESH if needed
lockRead(); // lock to protect against concurrent DROP and LOAD_BY_CREATE
try {
for (Map<String, Long> dbDictIds : dictionaryIds.values()) {
// ATTN: nothing in this loop should throw; an Exception here would block refresh of unrelated dictionaries.
for (Long id : dbDictIds.values()) {
Dictionary dictionary = idToDictionary.get(id);
/// for all dictionaries:
// 1. if it's OUT_OF_DATE (e.g. a previous update failed), try to refresh it.
if (dictionary.getStatus() == DictionaryStatus.OUT_OF_DATE && dictionary.checkBaseDataValid()) {
submitDataLoad(dictionary, false);
continue;
}
// 2. if some BE lost data (newly added or restarted BE), refresh it everywhere.
if (!dictionary.dataCompleted() // relies on the collectDictionaryStatus() we just did.
// 3. if the base table has been updated AND the data is older than its lifetime, refresh it.
|| dictionary.hasNewerSourceVersion() && dictionary.getNextRefreshTime() < now) {
// Schedule a refresh. ONLY trigger when status is NORMAL; otherwise it is
// already refreshing or being dropped.
if (dictionary.trySetStatusIf(DictionaryStatus.NORMAL, DictionaryStatus.OUT_OF_DATE)) {
submitDataLoad(dictionary, true);
}
}
}
}
} finally {
unlockRead();
}
}
/**
 * @param adaptiveLoad true to load only to BEs whose data is outdated; false to load to all BEs.
 */
private void submitDataLoad(Dictionary dictionary, boolean adaptiveLoad) {
LOG.info("Submit dictionary {} refresh task", dictionary.getName());
executor.execute(() -> {
try {
dataLoad(null, dictionary, adaptiveLoad);
} catch (Exception e) {
LOG.warn("Failed to load dictionary " + dictionary.getName(), e);
}
});
}
/**
 * For auto-submitted tasks: if the status is no longer OUT_OF_DATE, skip the task because the
 * dictionary has already been loaded since the task was submitted.
 *
 * @param ctx must be null for auto-scheduled tasks.
 */
public void dataLoad(ConnectContext ctx, Dictionary dictionary, boolean adaptiveLoad) throws Exception {
Dictionary.DictionaryStatus oldStatus = dictionary.getStatus();
if (ctx == null && oldStatus != DictionaryStatus.OUT_OF_DATE) {
LOG.info("skip auto-triggered dataLoad of dictionary " + dictionary.getName());
return;
}
// use atomic status as a lock.
if (!dictionary.trySetStatus(Dictionary.DictionaryStatus.LOADING)) {
throw new AnalysisException("Dictionary " + dictionary.getName() + " cannot load now, status is "
+ dictionary.getStatus().name());
}
if (ctx == null) { // running via the scheduler, not via a command.
// priv check is done in the relevant (caller) command, so using ADMIN here is ok.
ctx = InsertTask.makeConnectContext(UserIdentity.ADMIN, dictionary.getDbName());
}
// do not reuse the refresh command's executor, to avoid potential problems.
StmtExecutor executor = InsertTask.makeStmtExecutor(ctx);
NereidsParser parser = new NereidsParser();
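// The refresh statement is built from the dictionary's metadata as a plain INSERT ... SELECT,
// e.g. (identifiers below are made up for illustration):
//   insert into db1.my_dict select * from internal_ctl.db1.src_table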
InsertIntoTableCommand baseCommand = (InsertIntoTableCommand) parser
.parseSingle("insert into " + dictionary.getDbName() + "." + dictionary.getName() + " select * from "
+ dictionary.getSourceCtlName() + "." + dictionary.getSourceDbName() + "."
+ dictionary.getSourceTableName());
LOG.info("Loading to dictionary {} with query {}", dictionary.getName(), ctx.queryId());
if (!baseCommand.getLabelName().isPresent()) {
baseCommand.setLabelName(Optional.of(DICTIONARY_JOB_ID + "_" + ctx.queryId().toString()));
}
if (baseCommand.getJobId() == 0) {
baseCommand.setJobId(DICTIONARY_JOB_ID);
}
InsertIntoDictionaryCommand command = new InsertIntoDictionaryCommand(baseCommand, dictionary, adaptiveLoad);
// run synchronously, using the atomic status as the guard.
try {
// avoid generating an EmptySetNode, which would prevent us from getting the base table version.
ctx.getSessionVariable().setVarOnce(SessionVariable.DISABLE_NEREIDS_RULES,
"OLAP_SCAN_PARTITION_PRUNE,PRUNE_EMPTY_PARTITION");
command.run(ctx, executor);
} catch (Exception e) {
// wait for the next schedule.
dictionary.trySetStatus(oldStatus);
dictionary.setLastUpdateResult(e.getMessage());
throw e;
}
// some insert failures do not throw; they only set the error status.
if (ctx.getState().getErrorCode() != null && ctx.getState().getErrorMessage() != null) {
dictionary.trySetStatus(oldStatus);
dictionary.setLastUpdateResult(ctx.getState().getErrorMessage());
// a refresh that is guaranteed to fail can be skipped next time. this mark is hacky, but necessary for now.
if (ctx.getState().getErrorMessage().contains("[INVALID_DICT_MARK]")) {
LOG.warn("Dictionary {} load failed with src version {}, mark it invalid", dictionary.getName(),
ctx.getStatementContext().getDictionaryUsedSrcVersion());
dictionary.updateLatestInvalidVersion(ctx.getStatementContext().getDictionaryUsedSrcVersion());
}
throw new RuntimeException(ctx.getState().getErrorMessage());
}
// because deleting does NOT conflict with loading, we must check the dictionary's existence again!
lockRead();
boolean unlocked = false;
try {
if (!dictionaryIds.containsKey(dictionary.getDbName())
|| !dictionaryIds.get(dictionary.getDbName()).containsKey(dictionary.getName())) {
unlockRead();
unlocked = true;
// WITHOUT LOCK HERE. MUST NOT THROW BEFORE HERE!!!
dictionary.trySetStatus(oldStatus); // revert status.
// already dropped. abort the temporary version without holding the lock.
// the version has not been increased yet, so use getVersion() + 1.
if (ctx.getStatementContext().isPartialLoadDictionary()) {
abortNextVersion(ctx, dictionary, dictionary.getVersion());
} else {
abortNextVersion(ctx, dictionary, dictionary.getVersion() + 1);
}
throw new RuntimeException("Dictionary " + dictionary.getName() + " has been dropped during loading");
}
// must run under the read lock here.
// a partial load only completes some (possibly ALL) BEs' data with no source data update,
// so there is no need to increase the version.
if (!ctx.getStatementContext().isPartialLoadDictionary()) {
dictionary.increaseVersion();
Env.getCurrentEnv().getEditLog().logDictionaryIncVersion(dictionary);
}
} finally {
if (!unlocked) {
unlockRead();
}
}
// commit and check the result. this does not modify metadata, so no lock is needed.
if (!commitNextVersion(ctx, dictionary)) {
dictionary.decreaseVersion();
Env.getCurrentEnv().getEditLog().logDictionaryDecVersion(dictionary);
dictionary.trySetStatus(oldStatus);
abortNextVersion(ctx, dictionary, dictionary.getVersion());
throw new RuntimeException(
"Dictionary " + dictionary.getName() + " commit version " + dictionary.getVersion() + " failed");
}
// commit succeeded. update metadata.
dictionary.trySetStatus(Dictionary.DictionaryStatus.NORMAL);
dictionary.updateLastUpdateTime();
dictionary.updateSrcVersion(ctx.getStatementContext().getDictionaryUsedSrcVersion());
if (ctx.getStatementContext().isPartialLoadDictionary()) {
dictionary.setLastUpdateResult("succeed fix version " + dictionary.getVersion());
} else {
dictionary.setLastUpdateResult("succeed");
}
LOG.info("Dictionary {} refresh succeed. now version is {}. used src version {}", dictionary.getName(),
dictionary.getVersion(), ctx.getStatementContext().getDictionaryUsedSrcVersion());
}
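// The two methods below complete a simple two-phase publish, matching dataLoad() above:
// the INSERT ships data to the BEs under the next version, commitNextVersion() then asks every
// BE to publish that version, and abortNextVersion() asks them to discard it when the load or
// the commit failed.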
private boolean commitNextVersion(ConnectContext ctx, Dictionary dictionary) {
// use the same BEs we collected before loading started.
List<Backend> beList = ctx.getStatementContext().getUsedBackendsDistributing();
List<Future<InternalService.PCommitRefreshDictionaryResponse>> futureList = new ArrayList<>();
boolean allSucceed = true;
try {
for (Backend be : beList) {
if (!be.isAlive()) {
throw new RuntimeException("BE " + be.getId() + " is not alive");
}
final InternalService.PCommitRefreshDictionaryRequest request =
InternalService.PCommitRefreshDictionaryRequest.newBuilder().setDictionaryId(dictionary.getId())
.setVersionId(dictionary.getVersion()).build();
Future<InternalService.PCommitRefreshDictionaryResponse> response = BackendServiceProxy.getInstance()
.commitDictionaryAsync(be.getBrpcAddress(), Config.dictionary_rpc_timeout_seconds, request);
futureList.add(response);
}
// wait for all responses and check each commit result.
for (int i = 0; i < futureList.size(); i++) {
Future<InternalService.PCommitRefreshDictionaryResponse> future = futureList.get(i);
Backend be = beList.get(i);
if (future == null) {
throw new RuntimeException("Cannot get response future of BE " + be.getId());
}
InternalService.PCommitRefreshDictionaryResponse response = future
.get(Config.dictionary_rpc_timeout_seconds, TimeUnit.SECONDS);
if (response.hasStatus()) {
Status status = new Status(response.getStatus());
if (status.getErrorCode() != TStatusCode.OK) {
LOG.warn("Failed to commit dictionary " + dictionary.getId() + " on be " + be.getAddress()
+ " because " + status.getErrorMsg());
allSucceed = false;
}
} else {
LOG.warn("Failed to commit dictionary " + dictionary.getId() + " on be " + be.getAddress());
allSucceed = false;
}
}
} catch (Exception e) {
dictionary.setLastUpdateResult("commit failed: " + e.getMessage());
LOG.warn("Failed to commit dictionary " + dictionary.getId(), e);
allSucceed = false;
}
return allSucceed;
}
// abort may be sent to all BEs. swallow any failures.
private void abortNextVersion(ConnectContext ctx, Dictionary dictionary, long versionId) {
// use the same BEs we collected before loading started.
List<Backend> beList = ctx.getStatementContext().getUsedBackendsDistributing();
List<Future<InternalService.PAbortRefreshDictionaryResponse>> futureList = new ArrayList<>();
try {
for (Backend be : beList) {
if (!be.isAlive()) {
throw new RuntimeException("BE " + be.getId() + " is not alive");
}
final InternalService.PAbortRefreshDictionaryRequest request =
InternalService.PAbortRefreshDictionaryRequest.newBuilder().setDictionaryId(dictionary.getId())
.setVersionId(versionId).build();
Future<InternalService.PAbortRefreshDictionaryResponse> response = BackendServiceProxy.getInstance()
.abortDictionaryAsync(be.getBrpcAddress(), Config.dictionary_rpc_timeout_seconds, request);
futureList.add(response);
}
// wait for all responses; abort failures are only logged.
for (int i = 0; i < futureList.size(); i++) {
Future<InternalService.PAbortRefreshDictionaryResponse> future = futureList.get(i);
Backend be = beList.get(i);
if (future == null) {
throw new RuntimeException("Cannot get response future of BE " + be.getId());
}
InternalService.PAbortRefreshDictionaryResponse response = future
.get(Config.dictionary_rpc_timeout_seconds, TimeUnit.SECONDS);
if (response.hasStatus()) {
Status status = new Status(response.getStatus());
if (status.getErrorCode() != TStatusCode.OK) {
LOG.warn("Failed to abort dictionary " + dictionary.getId() + " on be " + be.getAddress()
+ " because " + status.getErrorMsg());
}
} else {
LOG.warn("Failed to abort dictionary " + dictionary.getId() + " on be " + be.getAddress());
}
}
} catch (Exception e) {
dictionary.setLastUpdateResult("abort failed: " + e.getMessage());
LOG.warn("Failed to abort dictionary " + dictionary.getId(), e);
}
}
private void submitDataUnload(long dictId, List<Long> beIds) {
LOG.info("Submit dictionary {} unload data task", dictId);
executor.execute(() -> {
try {
dataUnload(dictId, beIds);
} catch (Exception e) {
// already logged in dataUnload
}
});
}
/**
* Unload dictionary data from backends. Only when dropping an unknown dictionary may this be
* called directly.
*
* @param dictId dictionary id
* @param beIds backend ids to unload from. if null, unload from all alive backends.
* @return true if all succeeded, false if any failed.
*/
private boolean dataUnload(long dictId, List<Long> beIds) {
// some backends may not be alive, which leads to failure. acceptable: we collect their info next time.
List<Backend> aliveBes = Env.getCurrentSystemInfo().getBackends(beIds);
// get all alive BEs and send rpc.
List<Future<InternalService.PDeleteDictionaryResponse>> futureList = new ArrayList<>();
boolean allSucceed = true;
try {
for (Backend be : aliveBes) {
if (!be.isAlive()) {
continue;
}
final InternalService.PDeleteDictionaryRequest request = InternalService.PDeleteDictionaryRequest
.newBuilder().setDictionaryId(dictId).build();
Future<InternalService.PDeleteDictionaryResponse> response = BackendServiceProxy.getInstance()
.deleteDictionaryAsync(be.getBrpcAddress(), Config.dictionary_rpc_timeout_seconds, request);
futureList.add(response);
}
// wait for all responses to confirm the deletes on each BE.
for (int i = 0; i < futureList.size(); i++) {
Future<InternalService.PDeleteDictionaryResponse> future = futureList.get(i);
Backend be = aliveBes.get(i);
if (future == null) {
throw new RuntimeException("Cannot get response future of BE " + be.getId());
}
InternalService.PDeleteDictionaryResponse response = future.get(Config.dictionary_rpc_timeout_seconds,
TimeUnit.SECONDS);
if (response.hasStatus()) {
Status status = new Status(response.getStatus());
if (status.getErrorCode() != TStatusCode.OK) {
LOG.warn("Failed to unload dictionary " + dictId + " on be "
+ be.getAddress() + " because " + status.getErrorMsg());
allSucceed = false;
}
} else {
LOG.warn("Failed to unload dictionary " + dictId + " on be " + be.getAddress());
allSucceed = false;
}
}
} catch (Exception e) {
LOG.warn("Failed to unload dictionary " + dictId, e);
allSucceed = false;
}
if (allSucceed) {
LOG.info("Unload data of dictionary {} succeed", dictId);
}
return allSucceed;
}
/**
* Get dictionary status from all alive backends. Must not run under the lock because of the RPCs.
* If the dictionaries changed meanwhile, just let it fail.
*
* @param queryDicts dictionaries to query. if null, query all dictionaries.
* @return map of unknown dictionary id -> list of BE ids reporting it
*/
public Map<Long, List<Long>> collectDictionaryStatus(List<Long> queryDicts) throws RuntimeException {
Map<Long, List<Long>> unknownDictionaries = Maps.newHashMap();
// expire the old status of the queried dictionaries
if (queryDicts == null) {
queryDicts = ImmutableList.of(); // query all dictionaries
for (Dictionary dictionary : idToDictionary.values()) {
dictionary.resetDataDistributions();
}
} else {
for (Long dictId : queryDicts) {
Dictionary dictionary = getDictionary(dictId);
if (dictionary == null) {
throw new RuntimeException("Dictionary " + dictId + " does not exist");
}
dictionary.resetDataDistributions();
}
}
LOG.info("Collecting all dictionaries status for " + queryDicts.size() + " dictionaries");
if (LOG.isDebugEnabled()) {
LOG.debug("Collecting all dictionaries status for " + queryDicts);
}
// traverse all backends
for (Long backendId : Env.getCurrentSystemInfo().getAllBackendIds(true)) {
Backend backend = Env.getCurrentSystemInfo().getBackend(backendId);
BackendService.Client client = null;
TNetworkAddress address = null;
TDictionaryStatusList allStatusList = null;
try {
address = new TNetworkAddress(backend.getHost(), backend.getBePort());
client = ClientPool.backendPool.borrowObject(address);
// rpc. query for dictionaries status
allStatusList = client.getDictionaryStatus(queryDicts);
ClientPool.backendPool.returnObject(address, client);
} catch (Exception e) {
LOG.warn("failed to get dictionary status from backend[{}]", backend.getId(), e);
ClientPool.backendPool.invalidateObject(address, client);
}
if (allStatusList == null || !allStatusList.isSetDictionaryStatusList()) {
throw new RuntimeException("failed to get dictionary status from backend[" + backend.getId() + "]");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Get dictionary status from backend[{}]: {}", backend.getId(), allStatusList);
}
// traverse all dictionary status in this BE
for (TDictionaryStatus status : allStatusList.getDictionaryStatusList()) {
if (!status.isSetDictionaryId() || !status.isSetVersionId() || !status.isSetDictionaryMemorySize()) {
throw new RuntimeException("invalid dictionary status from backend[" + backend.getId() + "]");
}
long dictionaryId = status.getDictionaryId();
Dictionary dictionary = idToDictionary.get(dictionaryId);
if (dictionary == null) {
// Found an unknown dictionary, record it
unknownDictionaries.computeIfAbsent(dictionaryId, k -> Lists.newArrayList()).add(backend.getId());
continue;
}
// add one record of this dictionary in this BE
DictionaryDistribution newDistribution = new DictionaryDistribution(backend, status.getVersionId(),
status.getDictionaryMemorySize());
// add new distribution to list
dictionary.getDataDistributions().add(newDistribution);
}
}
LOG.info("Collect all dictionaries status succeed");
return unknownDictionaries;
}
public void replayCreateDictionary(CreateDictionaryPersistInfo info) {
Dictionary dictionary = info.getDictionary();
lockWrite();
try {
// Add to dictionaries map
Map<String, Long> dbDictIds = dictionaryIds.computeIfAbsent(dictionary.getDbName(),
k -> Maps.newConcurrentMap());
if (dbDictIds.containsKey(dictionary.getName())) {
LOG.warn("Dictionary {} already exists when replaying create dictionary", dictionary.getName());
return;
}
dbDictIds.put(dictionary.getName(), dictionary.getId());
dbTableToDicIds.computeIfAbsent(dictionary.getDbName(), k -> ArrayListMultimap.create())
.put(dictionary.getSourceTableName(), dictionary.getId());
idToDictionary.put(dictionary.getId(), dictionary);
uniqueId = Math.max(uniqueId, dictionary.getId());
} finally {
unlockWrite();
}
}
public void replayDropDictionary(DropDictionaryPersistInfo info) {
lockWrite();
try {
Map<String, Long> dbDictIds = dictionaryIds.get(info.getDbName());
if (dbDictIds != null) {
Long id = dbDictIds.remove(info.getDictionaryName());
Dictionary dict = idToDictionary.remove(id);
if (dbDictIds.isEmpty()) {
dictionaryIds.remove(info.getDbName());
}
dbTableToDicIds.get(info.getDbName()).remove(dict.getSourceTableName(), id);
} else {
LOG.warn("Database {} does not exist when replaying drop dictionary", info.getDbName());
}
} finally {
unlockWrite();
}
}
public void replayIncreaseVersion(DictionaryIncreaseVersionInfo info) throws DdlException {
String dbName = info.getDictionary().getDbName();
String dictName = info.getDictionary().getName();
Dictionary dictionary = getDictionary(dbName, dictName);
dictionary.writeLock();
dictionary.increaseVersion();
dictionary.writeUnlock();
}
public void replayDecreaseVersion(DictionaryDecreaseVersionInfo info) throws DdlException {
String dbName = info.getDictionary().getDbName();
String dictName = info.getDictionary().getName();
Dictionary dictionary = getDictionary(dbName, dictName);
dictionary.writeLock();
dictionary.decreaseVersion();
dictionary.writeUnlock();
}
// Metadata serialization
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
Text.writeString(out, json);
}
public static DictionaryManager read(DataInput in) throws IOException {
String json = Text.readString(in);
return GsonUtils.GSON.fromJson(json, DictionaryManager.class);
}
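// Illustrative persistence roundtrip for the two methods above (stream setup is hypothetical):
//   manager.write(out);                                 // writes the @SerializedName fields as one JSON string
//   DictionaryManager m = DictionaryManager.read(in);   // parses that JSON back into a manager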
}