// DynamicPartitionScheduler.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.clone;
import org.apache.doris.analysis.AddPartitionClause;
import org.apache.doris.analysis.DistributionDesc;
import org.apache.doris.analysis.DropPartitionClause;
import org.apache.doris.analysis.HashDistributionDesc;
import org.apache.doris.analysis.PartitionKeyDesc;
import org.apache.doris.analysis.PartitionValue;
import org.apache.doris.analysis.RandomDistributionDesc;
import org.apache.doris.analysis.SinglePartitionDesc;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.DataProperty;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.DynamicPartitionProperty;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.MTMV;
import org.apache.doris.catalog.MetaIdGenerator;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.Partition;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.PartitionKey;
import org.apache.doris.catalog.RangePartitionInfo;
import org.apache.doris.catalog.RangePartitionItem;
import org.apache.doris.catalog.Table;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.AutoBucketUtils;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.DynamicPartitionUtil;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.common.util.PropertyAnalyzer;
import org.apache.doris.common.util.RangeUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.datasource.InternalCatalog;
import org.apache.doris.meta.MetaContext;
import org.apache.doris.persist.PartitionPersistInfo;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.thrift.TStorageMedium;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* This class periodically adds or drops partitions on OLAP tables that specify dynamic partition properties.
* Config.dynamic_partition_enable determines whether this feature is enabled;
* Config.dynamic_partition_check_interval_seconds determines how often the task runs.
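* <p>
* A typical table definition that this scheduler acts on (illustrative, not exhaustive):
* <pre>
* PROPERTIES (
*     "dynamic_partition.enable" = "true",
*     "dynamic_partition.time_unit" = "DAY",
*     "dynamic_partition.start" = "-7",
*     "dynamic_partition.end" = "3",
*     "dynamic_partition.prefix" = "p",
*     "dynamic_partition.buckets" = "32"
* )
* </pre>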
*/
public class DynamicPartitionScheduler extends MasterDaemon {
private static final Logger LOG = LogManager.getLogger(DynamicPartitionScheduler.class);
public static final String LAST_SCHEDULER_TIME = "lastSchedulerTime";
public static final String LAST_UPDATE_TIME = "lastUpdateTime";
public static final String DYNAMIC_PARTITION_STATE = "dynamicPartitionState";
public static final String CREATE_PARTITION_MSG = "createPartitionMsg";
public static final String DROP_PARTITION_MSG = "dropPartitionMsg";
private static final String DEFAULT_RUNTIME_VALUE = FeConstants.null_string;
private static final long SLEEP_PIECE = 5000L;
private Map<Long, Map<String, String>> runtimeInfos = Maps.newConcurrentMap();
private Set<Pair<Long, Long>> dynamicPartitionTableInfo = Sets.newConcurrentHashSet();
private boolean initialize;
public enum State {
NORMAL,
ERROR
}
public DynamicPartitionScheduler(String name, long intervalMs) {
super(name, intervalMs);
this.initialize = false;
}
public void executeDynamicPartitionFirstTime(Long dbId, Long tableId) throws DdlException {
List<Pair<Long, Long>> tempDynamicPartitionTableInfo = Lists.newArrayList(Pair.of(dbId, tableId));
executeDynamicPartition(tempDynamicPartitionTableInfo, true);
}
public void registerDynamicPartitionTable(Long dbId, Long tableId) {
dynamicPartitionTableInfo.add(Pair.of(dbId, tableId));
}
// only for test
public boolean containsDynamicPartitionTable(Long dbId, Long tableId) {
return dynamicPartitionTableInfo.contains(Pair.of(dbId, tableId));
}
public void removeDynamicPartitionTable(Long dbId, Long tableId) {
dynamicPartitionTableInfo.remove(Pair.of(dbId, tableId));
}
public String getRuntimeInfo(long tableId, String key) {
Map<String, String> tableRuntimeInfo = runtimeInfos.getOrDefault(tableId, createDefaultRuntimeInfo());
return tableRuntimeInfo.getOrDefault(key, DEFAULT_RUNTIME_VALUE);
}
public void removeRuntimeInfo(long tableId) {
runtimeInfos.remove(tableId);
}
public void createOrUpdateRuntimeInfo(long tableId, String key, String value) {
Map<String, String> runtimeInfo = runtimeInfos.computeIfAbsent(tableId, k -> createDefaultRuntimeInfo());
runtimeInfo.put(key, value);
}
private Map<String, String> createDefaultRuntimeInfo() {
Map<String, String> defaultRuntimeInfo = Maps.newConcurrentMap();
defaultRuntimeInfo.put(LAST_UPDATE_TIME, DEFAULT_RUNTIME_VALUE);
defaultRuntimeInfo.put(LAST_SCHEDULER_TIME, DEFAULT_RUNTIME_VALUE);
defaultRuntimeInfo.put(DYNAMIC_PARTITION_STATE, State.NORMAL.toString());
defaultRuntimeInfo.put(CREATE_PARTITION_MSG, DEFAULT_RUNTIME_VALUE);
defaultRuntimeInfo.put(DROP_PARTITION_MSG, DEFAULT_RUNTIME_VALUE);
return defaultRuntimeInfo;
}
// exponential moving average
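// ema_i = alpha * x_i + (1 - alpha) * ema_(i-1), where alpha = 2 / (period + 1).
// Worked example: history [10, 20, 30] with period 7 gives alpha = 0.25 and
// ema = 0.25 * 30 + 0.75 * (0.25 * 20 + 0.75 * 10) = 16.875, truncated to 16.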
private static long ema(ArrayList<Long> history, int period) {
double alpha = 2.0 / (period + 1);
double ema = history.get(0);
for (int i = 1; i < history.size(); i++) {
ema = alpha * history.get(i) + (1 - alpha) * ema;
}
return (long) ema;
}
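// Predict the next partition's size: if the recent (up to 7) sizes are non-decreasing,
// extrapolate lastSize + ema(deltas); otherwise fall back to the ema of the sizes themselves.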
private static long getNextPartitionSize(ArrayList<Long> historyPartitionsSize) {
if (historyPartitionsSize.size() < 2) {
return historyPartitionsSize.get(0);
}
boolean isAscending = true;
ArrayList<Long> ascendingDeltaSize = new ArrayList<Long>();
for (int i = Math.max(1, historyPartitionsSize.size() - 7); i < historyPartitionsSize.size(); i++) {
long delta = historyPartitionsSize.get(i) - historyPartitionsSize.get(i - 1);
if (delta < 0) {
isAscending = false;
break;
}
ascendingDeltaSize.add(delta);
}
if (isAscending) {
return historyPartitionsSize.get(historyPartitionsSize.size() - 1) + ema(ascendingDeltaSize, 7);
} else {
return ema(historyPartitionsSize, 7);
}
}
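// Decide the bucket number for a new partition. With auto bucket enabled, predict the next
// partition's data size from history, scale it to an estimated uncompressed size, and map
// that size to a bucket count via AutoBucketUtils.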
private static int getBucketsNum(DynamicPartitionProperty property, OlapTable table,
String partitionName, String nowPartitionName, boolean executeFirstTime) {
// on first execution, no partition contains data yet, so use the configured bucket number
if (!table.isAutoBucket() || executeFirstTime) {
return property.getBuckets();
}
// auto bucket
// get all history partitions
RangePartitionInfo info = (RangePartitionInfo) (table.getPartitionInfo());
List<Map.Entry<Long, PartitionItem>> idToItems = new ArrayList<>(info.getIdToItem(false).entrySet());
idToItems.sort(Comparator.comparing(o -> ((RangePartitionItem) o.getValue()).getItems().upperEndpoint()));
List<Partition> partitions = idToItems.stream()
.map(entry -> table.getPartition(entry.getKey()))
.filter(partition -> partition != null && !partition.getName().equals(nowPartitionName))
.collect(Collectors.toList());
List<Long> visibleVersions = null;
try {
visibleVersions = Partition.getVisibleVersions(partitions);
} catch (RpcException e) {
LOG.warn("autobucket: failed to get visible versions, fall back to property's buckets, "
+ "table: [{}-{}], partition: {}, buckets num: {}, exception: ",
table.getName(), table.getId(), partitionName, property.getBuckets(), e);
return property.getBuckets();
}
List<Partition> hasDataPartitions = Lists.newArrayList();
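// visible version 1 is the initial empty version, so version >= 2 implies the partition
// has received at least one successful load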
for (int i = 0; i < partitions.size(); i++) {
if (visibleVersions.get(i) >= 2) {
hasDataPartitions.add(partitions.get(i));
}
}
// no history partition contains data
if (hasDataPartitions.isEmpty()) {
LOG.info("autobucket: fall back to property's buckets because no partition contains data, "
+ "table: [{}-{}], partition: {}, buckets num: {}",
table.getName(), table.getId(), partitionName, property.getBuckets());
return property.getBuckets();
}
ArrayList<Long> partitionSizeArray = hasDataPartitions.stream()
.map(partition -> partition.getAllDataSize(true))
.collect(Collectors.toCollection(ArrayList::new));
long estimatePartitionSize = getNextPartitionSize(partitionSizeArray);
// multiply by 5 to estimate the uncompressed size (assumes roughly 5x compression)
long uncompressedPartitionSize = estimatePartitionSize * 5;
int bucketsNum = AutoBucketUtils.getBucketsNum(uncompressedPartitionSize, Config.autobucket_min_buckets);
LOG.info("autobucket calc with {} history partitions, table: [{}-{}], partition: {}, buckets num: {}, "
+ " estimate partition size: {}, last partitions(partition name, local size, remote size): {}",
hasDataPartitions.size(), table.getName(), table.getId(), partitionName, bucketsNum,
estimatePartitionSize,
hasDataPartitions.stream()
.skip(Math.max(0, hasDataPartitions.size() - 7))
.map(partition -> "(" + partition.getName() + ", " + partition.getDataSize(true)
+ ", " + partition.getRemoteDataSize() + ")")
.collect(Collectors.toList()));
return bucketsNum;
}
private ArrayList<AddPartitionClause> getAddPartitionClause(Database db, OlapTable olapTable,
Column partitionColumn, String partitionFormat, boolean executeFirstTime) throws DdlException {
ArrayList<AddPartitionClause> addPartitionClauses = new ArrayList<>();
DynamicPartitionProperty dynamicPartitionProperty = olapTable.getTableProperty().getDynamicPartitionProperty();
RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) olapTable.getPartitionInfo();
ZonedDateTime now = ZonedDateTime.now(dynamicPartitionProperty.getTimeZone().toZoneId());
boolean createHistoryPartition = dynamicPartitionProperty.isCreateHistoryPartition();
int idx;
// When create_history_partition is enabled, derive the real start offset from start and history_partition_num.
if (createHistoryPartition) {
idx = DynamicPartitionUtil.getRealStart(dynamicPartitionProperty.getStart(),
dynamicPartitionProperty.getHistoryPartitionNum());
} else {
idx = 0;
}
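// idx is the signed offset (in dynamic_partition.time_unit units) relative to now; the loop
// below walks offsets [idx, end], e.g. start = -7, end = 3 with DAY covers the past 7 days,
// today, and the next 3 days.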
int hotPartitionNum = dynamicPartitionProperty.getHotPartitionNum();
String storagePolicyName = dynamicPartitionProperty.getStoragePolicy();
String nowPartitionPrevBorder = DynamicPartitionUtil.getPartitionRangeString(
dynamicPartitionProperty, now, 0, partitionFormat);
String nowPartitionName = dynamicPartitionProperty.getPrefix()
+ DynamicPartitionUtil.getFormattedPartitionName(dynamicPartitionProperty.getTimeZone(),
nowPartitionPrevBorder, dynamicPartitionProperty.getTimeUnit());
for (; idx <= dynamicPartitionProperty.getEnd(); idx++) {
String prevBorder = DynamicPartitionUtil.getPartitionRangeString(
dynamicPartitionProperty, now, idx, partitionFormat);
String nextBorder = DynamicPartitionUtil.getPartitionRangeString(
dynamicPartitionProperty, now, idx + 1, partitionFormat);
PartitionValue lowerValue = new PartitionValue(prevBorder);
PartitionValue upperValue = new PartitionValue(nextBorder);
boolean isPartitionExists = false;
Range<PartitionKey> addPartitionKeyRange;
try {
PartitionKey lowerBound = PartitionKey.createPartitionKey(Collections.singletonList(lowerValue),
Collections.singletonList(partitionColumn));
PartitionKey upperBound = PartitionKey.createPartitionKey(Collections.singletonList(upperValue),
Collections.singletonList(partitionColumn));
addPartitionKeyRange = Range.closedOpen(lowerBound, upperBound);
} catch (Exception e) {
// AnalysisException: keys.size always equals columns.size, so it cannot be thrown here
// IllegalArgumentException: the lower bound is greater than the upper bound
LOG.warn("Error in gen addPartitionKeyRange. db: {}, table: {}, partition idx: {}",
db.getFullName(), olapTable.getName(), idx, e);
if (executeFirstTime) {
throw new DdlException("maybe dynamic_partition.start is too small, error: "
+ e.getMessage());
}
continue;
}
for (PartitionItem partitionItem : rangePartitionInfo.getIdToItem(false).values()) {
// only support single column partition now
try {
RangeUtils.checkRangeIntersect(partitionItem.getItems(), addPartitionKeyRange);
} catch (Exception e) {
isPartitionExists = true;
if (addPartitionKeyRange.equals(partitionItem.getItems())) {
if (LOG.isDebugEnabled()) {
LOG.debug("partition range {} exist in db {} table {} partition idx {}, clear fail msg",
addPartitionKeyRange, db.getFullName(), olapTable.getName(), idx);
}
clearCreatePartitionFailedMsg(olapTable.getId());
} else {
LOG.warn("check partition range {} in db {} table {} partition idx {} fail",
addPartitionKeyRange, db.getFullName(), olapTable.getName(), idx, e);
recordCreatePartitionFailedMsg(db.getFullName(), olapTable.getName(),
e.getMessage(), olapTable.getId());
}
break;
}
}
if (isPartitionExists) {
continue;
}
// construct partition desc
PartitionKeyDesc partitionKeyDesc = PartitionKeyDesc.createFixed(Collections.singletonList(lowerValue),
Collections.singletonList(upperValue));
HashMap<String, String> partitionProperties = new HashMap<>(1);
if (dynamicPartitionProperty.getReplicaAllocation().isNotSet()) {
partitionProperties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION,
olapTable.getDefaultReplicaAllocation().toCreateStmt());
} else {
partitionProperties.put(PropertyAnalyzer.PROPERTIES_REPLICATION_ALLOCATION,
dynamicPartitionProperty.getReplicaAllocation().toCreateStmt());
}
// set storage_medium and storage_cooldown_time based on dynamic_partition.hot_partition_num
setStorageMediumProperty(partitionProperties, dynamicPartitionProperty, now, hotPartitionNum, idx);
if (StringUtils.isNotEmpty(storagePolicyName)) {
setStoragePolicyProperty(partitionProperties, dynamicPartitionProperty, now, idx, storagePolicyName);
}
String partitionName = dynamicPartitionProperty.getPrefix()
+ DynamicPartitionUtil.getFormattedPartitionName(dynamicPartitionProperty.getTimeZone(),
prevBorder, dynamicPartitionProperty.getTimeUnit());
SinglePartitionDesc rangePartitionDesc = new SinglePartitionDesc(true, partitionName,
partitionKeyDesc, partitionProperties);
DistributionDesc distributionDesc = null;
DistributionInfo distributionInfo = olapTable.getDefaultDistributionInfo();
int bucketsNum = getBucketsNum(dynamicPartitionProperty, olapTable, partitionName,
nowPartitionName, executeFirstTime);
if (distributionInfo.getType() == DistributionInfo.DistributionInfoType.HASH) {
HashDistributionInfo hashDistributionInfo = (HashDistributionInfo) distributionInfo;
List<String> distColumnNames = new ArrayList<>();
for (Column distributionColumn : hashDistributionInfo.getDistributionColumns()) {
distColumnNames.add(distributionColumn.getName());
}
distributionDesc = new HashDistributionDesc(bucketsNum, distColumnNames);
} else {
distributionDesc = new RandomDistributionDesc(bucketsNum);
}
// add partition according to partition desc and distribution desc
addPartitionClauses.add(new AddPartitionClause(rangePartitionDesc, distributionDesc, null, false));
}
return addPartitionClauses;
}
/**
* If dynamic_partition.storage_medium is set to SSD,
* ignore the hot_partition_num property and use (SSD, 9999-12-31 23:59:59).
* Otherwise, if hot_partition_num is set, choose SSD or HDD based on the partition's time offset.
*
* @param partitionProperties properties of the partition being created
* @param property the table's dynamic partition property
* @param now the current time in the property's time zone
* @param hotPartitionNum number of latest partitions kept on SSD
* @param offset signed time-unit offset of this partition relative to now
*/
private void setStorageMediumProperty(HashMap<String, String> partitionProperties,
DynamicPartitionProperty property, ZonedDateTime now, int hotPartitionNum, int offset) {
// 1. no hot partitions: use dynamic_partition.storage_medium
// 2. has hot partitions:
//    1) dynamic_partition.storage_medium = 'ssd': use ssd
//    2) otherwise:
//       a. cooled-down partitions: use hdd
//       b. hot partitions: use ssd
if (hotPartitionNum <= 0 || property.getStorageMedium().equalsIgnoreCase("ssd")) {
if (!Strings.isNullOrEmpty(property.getStorageMedium())) {
partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM, property.getStorageMedium());
partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_COOLDOWN_TIME,
TimeUtils.longToTimeString(DataProperty.MAX_COOLDOWN_TIME_MS));
}
} else if (offset + hotPartitionNum <= 0) {
partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM, TStorageMedium.HDD.name());
partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_COOLDOWN_TIME,
TimeUtils.longToTimeString(DataProperty.MAX_COOLDOWN_TIME_MS));
} else {
String cooldownTime = DynamicPartitionUtil.getPartitionRangeString(
property, now, offset + hotPartitionNum, DynamicPartitionUtil.DATETIME_FORMAT);
partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_MEDIUM, TStorageMedium.SSD.name());
partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_COOLDOWN_TIME, cooldownTime);
}
}
private void setStoragePolicyProperty(HashMap<String, String> partitionProperties,
DynamicPartitionProperty property, ZonedDateTime now, int offset,
String storagePolicyName) {
partitionProperties.put(PropertyAnalyzer.PROPERTIES_STORAGE_POLICY, storagePolicyName);
String baseTime = DynamicPartitionUtil.getPartitionRangeString(
property, now, offset, DynamicPartitionUtil.DATETIME_FORMAT);
partitionProperties.put(PropertyAnalyzer.PROPERTIES_DATA_BASE_TIME, baseTime);
}
private Range<PartitionKey> getClosedRange(Database db, OlapTable olapTable, Column partitionColumn,
String partitionFormat, String lowerBorderOfReservedHistory, String upperBorderOfReservedHistory) {
Range<PartitionKey> reservedHistoryPartitionKeyRange = null;
PartitionValue lowerBorderPartitionValue = new PartitionValue(lowerBorderOfReservedHistory);
PartitionValue upperBorderPartitionValue = new PartitionValue(upperBorderOfReservedHistory);
try {
PartitionKey lowerBorderBound = PartitionKey.createPartitionKey(
Collections.singletonList(lowerBorderPartitionValue), Collections.singletonList(partitionColumn));
PartitionKey upperBorderBound = PartitionKey.createPartitionKey(
Collections.singletonList(upperBorderPartitionValue), Collections.singletonList(partitionColumn));
reservedHistoryPartitionKeyRange = Range.closed(lowerBorderBound, upperBorderBound);
} catch (org.apache.doris.common.AnalysisException | org.apache.doris.nereids.exceptions.AnalysisException e) {
// AnalysisException: keys.size always equals columns.size, so it cannot be thrown here
// IllegalArgumentException: the lower bound is greater than the upper bound
LOG.warn("Error in gen reservedHistoryPartitionKeyRange. db: {}, table: {}",
db.getFullName(), olapTable.getName(), e);
}
return reservedHistoryPartitionKeyRange;
}
/**
* 1. Treat the range [start, 0) as the reserved range.
* 2. Build a DropPartitionClause for each partition whose range falls before this reserved range.
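* For example, with time_unit DAY and start = -7, a partition whose whole range lies more
* than 7 days in the past is dropped, unless it intersects a reserved_history_periods range.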
*/
private ArrayList<DropPartitionClause> getDropPartitionClause(Database db, OlapTable olapTable,
Column partitionColumn, String partitionFormat) throws DdlException {
ArrayList<DropPartitionClause> dropPartitionClauses = new ArrayList<>();
DynamicPartitionProperty dynamicPartitionProperty = olapTable.getTableProperty().getDynamicPartitionProperty();
if (dynamicPartitionProperty.getStart() == DynamicPartitionProperty.MIN_START_OFFSET) {
// start offset is not set, so do not drop any partition
return dropPartitionClauses;
}
// drop partition only considering start, not considering history_partition_num.
// int realStart = DynamicPartitionUtil.getRealStart(dynamicPartitionProperty.getStart(),
// dynamicPartitionProperty.getHistoryPartitionNum());
int realStart = dynamicPartitionProperty.getStart();
ZonedDateTime now = ZonedDateTime.now(dynamicPartitionProperty.getTimeZone().toZoneId());
String lowerBorder = DynamicPartitionUtil.getPartitionRangeString(dynamicPartitionProperty,
now, realStart, partitionFormat);
String limitBorder = DynamicPartitionUtil.getPartitionRangeString(dynamicPartitionProperty,
now, 0, partitionFormat);
PartitionValue lowerPartitionValue = new PartitionValue(lowerBorder);
PartitionValue limitPartitionValue = new PartitionValue(limitBorder);
List<Range<PartitionKey>> reservedHistoryPartitionKeyRangeList = new ArrayList<Range<PartitionKey>>();
Range<PartitionKey> reservePartitionKeyRange;
try {
PartitionKey lowerBound = PartitionKey.createPartitionKey(Collections.singletonList(lowerPartitionValue),
Collections.singletonList(partitionColumn));
PartitionKey limitBound = PartitionKey.createPartitionKey(Collections.singletonList(limitPartitionValue),
Collections.singletonList(partitionColumn));
// if the start offset is very small, the computed lowerBound may end up very large
// (overflow), such as '6031-07-01 00:00:00'
if (lowerBound.compareTo(limitBound) >= 0) {
return dropPartitionClauses;
}
reservePartitionKeyRange = Range.atLeast(lowerBound);
reservedHistoryPartitionKeyRangeList.add(reservePartitionKeyRange);
} catch (Exception e) {
// AnalysisException: keys.size always equals columns.size, so it cannot be thrown here
// IllegalArgumentException: the lower bound is greater than the upper bound
String hint = "'dynamic_partition.start' = " + dynamicPartitionProperty.getStart()
+ ", maybe it is too small; use ALTER TABLE to increase it. ";
LOG.warn("Error in gen reservePartitionKeyRange. db: {}, table: {}. {}",
db.getFullName(), olapTable.getName(), hint, e);
recordDropPartitionFailedMsg(db.getFullName(), olapTable.getName(), hint + ", error: " + e.getMessage(),
olapTable.getId());
return dropPartitionClauses;
}
String reservedHistoryPeriods = dynamicPartitionProperty.getReservedHistoryPeriods();
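// reserved_history_periods is a list of ranges such as
// "[2020-06-01,2020-06-20],[2020-10-31,2020-11-15]"; each range becomes a closed reserved
// range below, and partitions intersecting it are never dropped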
List<Range> ranges = DynamicPartitionUtil.convertStringToPeriodsList(reservedHistoryPeriods,
dynamicPartitionProperty.getTimeUnit());
if (!ranges.isEmpty()) {
for (Range range : ranges) {
try {
String lowerBorderOfReservedHistory = DynamicPartitionUtil.getHistoryPartitionRangeString(
dynamicPartitionProperty, range.lowerEndpoint().toString(), partitionFormat);
String upperBorderOfReservedHistory = DynamicPartitionUtil.getHistoryPartitionRangeString(
dynamicPartitionProperty, range.upperEndpoint().toString(), partitionFormat);
Range<PartitionKey> reservedHistoryPartitionKeyRange = getClosedRange(db, olapTable,
partitionColumn, partitionFormat,
lowerBorderOfReservedHistory, upperBorderOfReservedHistory);
reservedHistoryPartitionKeyRangeList.add(reservedHistoryPartitionKeyRange);
} catch (IllegalArgumentException e) {
return dropPartitionClauses;
} catch (org.apache.doris.common.AnalysisException
| org.apache.doris.nereids.exceptions.AnalysisException e) {
throw new DdlException(e.getMessage());
}
}
}
RangePartitionInfo info = (RangePartitionInfo) (olapTable.getPartitionInfo());
List<Map.Entry<Long, PartitionItem>> idToItems = new ArrayList<>(info.getIdToItem(false).entrySet());
idToItems.sort(Comparator.comparing(o -> ((RangePartitionItem) o.getValue()).getItems().upperEndpoint()));
Map<Long, Boolean> isContaineds = new HashMap<>();
for (Map.Entry<Long, PartitionItem> idToItem : idToItems) {
isContaineds.put(idToItem.getKey(), false);
Long checkDropPartitionId = idToItem.getKey();
Range<PartitionKey> checkDropPartitionKey = idToItem.getValue().getItems();
for (Range<PartitionKey> reserveHistoryPartitionKeyRange : reservedHistoryPartitionKeyRangeList) {
if (RangeUtils.checkIsTwoRangesIntersect(reserveHistoryPartitionKeyRange, checkDropPartitionKey)) {
isContaineds.put(checkDropPartitionId, true);
}
}
}
for (Long dropPartitionId : isContaineds.keySet()) {
// Do not drop the partition "by force", or it would be removed immediately instead of
// going into the catalog recycle bin. This is for safety.
if (!isContaineds.get(dropPartitionId)) {
String dropPartitionName = olapTable.getPartition(dropPartitionId).getName();
dropPartitionClauses.add(new DropPartitionClause(false, dropPartitionName, false, false));
}
}
return dropPartitionClauses;
}
// public only for FE unit tests
public void executeDynamicPartition(Collection<Pair<Long, Long>> dynamicPartitionTableInfoCol,
boolean executeFirstTime) throws DdlException {
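// Per (db, table) flow: plan the add/drop clauses under the table read lock, then execute
// the drops and adds after releasing it, so DDL never runs under the planning lock.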
Iterator<Pair<Long, Long>> iterator = dynamicPartitionTableInfoCol.iterator();
while (iterator.hasNext()) {
Pair<Long, Long> tableInfo = iterator.next();
Long dbId = tableInfo.first;
Long tableId = tableInfo.second;
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
iterator.remove();
continue;
}
ArrayList<AddPartitionClause> addPartitionClauses = new ArrayList<>();
ArrayList<DropPartitionClause> dropPartitionClauses = new ArrayList<>();
String tableName = null;
boolean skipAddPartition = false;
OlapTable olapTable = (OlapTable) db.getTableNullable(tableId);
// Only OlapTable has DynamicPartitionProperty
if (olapTable == null
|| olapTable instanceof MTMV
|| !olapTable.dynamicPartitionExists()
|| !olapTable.getTableProperty().getDynamicPartitionProperty().getEnable()) {
iterator.remove();
continue;
} else if (olapTable.isBeingSynced()) {
continue;
}
olapTable.readLock();
try {
if (olapTable.getState() != OlapTable.OlapTableState.NORMAL) {
String errorMsg = "Table[" + olapTable.getName() + "]'s state is not NORMAL. "
+ "Dynamic add partition is not allowed. table state=" + olapTable.getState();
recordCreatePartitionFailedMsg(db.getFullName(), olapTable.getName(), errorMsg, olapTable.getId());
skipAddPartition = true;
}
// Determine the partition column type:
// if the column type is Date, partition names are formatted as yyyyMMdd;
// if the column type is DateTime, partition names are formatted as yyyyMMddHHmmss.
// The scheduler time should be recorded even if no partition is added.
createOrUpdateRuntimeInfo(olapTable.getId(), LAST_SCHEDULER_TIME, TimeUtils.getCurrentFormatTime());
RangePartitionInfo rangePartitionInfo = (RangePartitionInfo) olapTable.getPartitionInfo();
if (rangePartitionInfo.getPartitionColumns().size() != 1) {
// currently only a single partition column is supported
iterator.remove();
continue;
}
Column partitionColumn = rangePartitionInfo.getPartitionColumns().get(0);
String partitionFormat;
try {
partitionFormat = DynamicPartitionUtil.getPartitionFormat(partitionColumn);
} catch (Exception e) {
recordCreatePartitionFailedMsg(db.getFullName(), olapTable.getName(),
e.getMessage(), olapTable.getId());
continue;
}
if (!skipAddPartition) {
addPartitionClauses = getAddPartitionClause(db, olapTable, partitionColumn, partitionFormat,
executeFirstTime);
}
clearDropPartitionFailedMsg(olapTable.getId());
dropPartitionClauses = getDropPartitionClause(db, olapTable, partitionColumn, partitionFormat);
tableName = olapTable.getName();
} catch (Exception e) {
LOG.warn("db [{}-{}], table [{}-{}] dynamic partition scheduling failed",
db.getId(), db.getName(), olapTable.getId(), olapTable.getName(), e);
if (executeFirstTime) {
throw new DdlException(e.getMessage());
}
} finally {
olapTable.readUnlock();
}
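// apply the drop clauses one by one; each drop briefly takes the table write lock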
for (DropPartitionClause dropPartitionClause : dropPartitionClauses) {
if (!olapTable.writeLockIfExist()) {
continue;
}
try {
Env.getCurrentEnv().dropPartition(db, olapTable, dropPartitionClause);
} catch (Exception e) {
recordDropPartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId());
LOG.warn("db [{}-{}], table [{}-{}] dynamic partition scheduling failed",
db.getId(), db.getName(), olapTable.getId(), olapTable.getName(), e);
if (executeFirstTime) {
throw new DdlException(e.getMessage());
}
} finally {
olapTable.writeUnlock();
}
}
if (!skipAddPartition) {
// get partitionIds and indexIds
List<Long> indexIds = new ArrayList<>(olapTable.getCopiedIndexIdToMeta().keySet());
List<Long> generatedPartitionIds = new ArrayList<>();
cloudBatchBeforeCreatePartitions(executeFirstTime, addPartitionClauses, olapTable, indexIds,
db, tableName, generatedPartitionIds);
List<PartitionPersistInfo> partsInfo = new ArrayList<>();
for (int i = 0; i < addPartitionClauses.size(); i++) {
try {
boolean needWriteEditLog = true;
// ATTN: in cloud mode, write the edit log only when !executeFirstTime;
// executeFirstTime means table creation, where the edit log is postponed
// until after the batched cloud commit (see cloudBatchAfterCreatePartitions)
if (Config.isCloudMode()) {
needWriteEditLog = !executeFirstTime;
}
PartitionPersistInfo info =
Env.getCurrentEnv().addPartition(db, tableName, addPartitionClauses.get(i),
executeFirstTime,
executeFirstTime && Config.isCloudMode() ? generatedPartitionIds.get(i) : 0,
needWriteEditLog);
if (info == null) {
throw new Exception("null persisted partition returned");
}
partsInfo.add(info);
clearCreatePartitionFailedMsg(olapTable.getId());
} catch (Exception e) {
recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId());
LOG.warn("db [{}-{}], table [{}-{}] dynamic partition scheduling failed",
db.getId(), db.getName(), olapTable.getId(), olapTable.getName(), e);
if (executeFirstTime) {
throw new DdlException(e.getMessage());
}
}
}
cloudBatchAfterCreatePartitions(executeFirstTime, partsInfo,
addPartitionClauses, db, olapTable, indexIds, tableName);
// ATTN: space out scheduling across dynamic partition tables to avoid a CPU consumption peak
if (!executeFirstTime && !addPartitionClauses.isEmpty()
&& Config.dynamic_partition_step_interval_ms > 0) {
long sleep = Config.dynamic_partition_step_interval_ms;
if (sleep > 1800 * 1000) {
LOG.warn("fe conf dynamic_partition_step_interval_ms is greater than 1800s, please check it");
}
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
LOG.warn("sleep err", e);
}
}
}
}
}
private void cloudBatchAfterCreatePartitions(boolean executeFirstTime, List<PartitionPersistInfo> partsInfo,
ArrayList<AddPartitionClause> addPartitionClauses, Database db,
OlapTable olapTable, List<Long> indexIds,
String tableName) throws DdlException {
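// Cloud mode only: commit all successfully created partitions in one batch, then write the
// postponed edit logs. The commit must precede edit logging so a failed commit RPC cannot
// leave logged-but-uncommitted partitions (see the ATTN comments below).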
if (Config.isNotCloudMode()) {
return;
}
List<Long> succeedPartitionIds = partsInfo.stream()
.map(partitionPersistInfo -> partitionPersistInfo.getPartition().getId())
.collect(Collectors.toList());
if (!executeFirstTime || addPartitionClauses.isEmpty()) {
LOG.info("skip cloud batch commit, not first time: {}, num partition clauses: {}",
!executeFirstTime, addPartitionClauses.size());
return;
}
try {
// ATTN: failedPids = generatedPartitionIds - succeedPartitionIds are the partitions
// that failed in addPartition; they will be reclaimed by the recycler
if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.commitCloudPartition")) {
LOG.info("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition, throw e");
// not commit, not log edit
throw new Exception("debug point FE.DynamicPartitionScheduler.before.commitCloudPartition");
}
Env.getCurrentInternalCatalog().afterCreatePartitions(db.getId(), olapTable.getId(),
succeedPartitionIds, indexIds, true);
LOG.info("begin write edit log to add partitions in batch, "
+ "numPartitions: {}, db: {}, table: {}, tableId: {}",
partsInfo.size(), db.getFullName(), tableName, olapTable.getId());
// ATTN: here, edit log must after commit cloud partition,
// prevent commit RPC failure from causing data loss
if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.before.logEditPartitions")) {
LOG.info("debug point FE.DynamicPartitionScheduler.before.logEditPartitions, throw e");
// committed, but not log edit
throw new Exception("debug point FE.DynamicPartitionScheduler.before.logEditPartitions");
}
for (int i = 0; i < partsInfo.size(); i++) {
Env.getCurrentEnv().getEditLog().logAddPartition(partsInfo.get(i));
if (DebugPointUtil.isEnable("FE.DynamicPartitionScheduler.in.logEditPartitions")) {
if (i == partsInfo.size() / 2) {
LOG.info("debug point FE.DynamicPartitionScheduler.in.logEditPartitions, throw e");
// committed, but log some edit, others failed
throw new Exception("debug point FE.DynamicPartitionScheduler"
+ ".in.logEditPartitions");
}
}
}
LOG.info("finish write edit log to add partitions in batch, "
+ "numPartitions: {}, db: {}, table: {}, tableId: {}",
partsInfo.size(), db.getFullName(), tableName, olapTable.getId());
} catch (Exception e) {
LOG.warn("cloud commit step failed, dbName {}, tableName {}, tableId {}",
db.getFullName(), tableName, olapTable.getId(), e);
recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId());
throw new DdlException("cloud commit step failed: " + e.getMessage());
}
}
private void cloudBatchBeforeCreatePartitions(boolean executeFirstTime,
ArrayList<AddPartitionClause> addPartitionClauses,
OlapTable olapTable, List<Long> indexIds, Database db,
String tableName, List<Long> generatedPartitionIds)
throws DdlException {
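// Cloud mode only, first execution: pre-allocate ids for all new partitions in one batch
// and register them via beforeCreatePartitions; cloudBatchAfterCreatePartitions later
// commits them in one batch.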
if (Config.isNotCloudMode()) {
return;
}
if (!executeFirstTime || addPartitionClauses.isEmpty()) {
LOG.info("skip cloud batch prepare, not first time: {}, num partition clauses: {}",
!executeFirstTime, addPartitionClauses.size());
return;
}
AddPartitionClause addPartitionClause = addPartitionClauses.get(0);
DistributionDesc distributionDesc = addPartitionClause.getDistributionDesc();
try {
// check for null before dereferencing: fall back to the table's default
// distribution when the clause does not carry one
DistributionInfo distributionInfo;
if (distributionDesc == null) {
distributionInfo = olapTable.getDefaultDistributionInfo()
.toDistributionDesc().toDistributionInfo(olapTable.getBaseSchema());
} else {
distributionInfo = distributionDesc.toDistributionInfo(olapTable.getBaseSchema());
}
long allPartitionBufferSize = 0;
for (int i = 0; i < addPartitionClauses.size(); i++) {
long bufferSize = InternalCatalog.checkAndGetBufferSize(indexIds.size(),
distributionInfo.getBucketNum(),
addPartitionClause.getSingeRangePartitionDesc()
.getReplicaAlloc().getTotalReplicaNum(),
db, tableName);
allPartitionBufferSize += bufferSize;
}
MetaIdGenerator.IdGeneratorBuffer idGeneratorBuffer = Env.getCurrentEnv()
.getIdGeneratorBuffer(allPartitionBufferSize);
addPartitionClauses.forEach(p -> generatedPartitionIds.add(idGeneratorBuffer.getNextId()));
// executeFirstTime is guaranteed true here (checked above)
Env.getCurrentInternalCatalog().beforeCreatePartitions(db.getId(), olapTable.getId(),
generatedPartitionIds, indexIds, true);
} catch (Exception e) {
LOG.warn("cloud prepare step failed, dbName {}, tableName {}, tableId {}, indexIds {}",
db.getFullName(), tableName, olapTable.getId(), indexIds, e);
recordCreatePartitionFailedMsg(db.getFullName(), tableName, e.getMessage(), olapTable.getId());
throw new DdlException("cloud prepare step failed: " + e.getMessage());
}
}
private void recordCreatePartitionFailedMsg(String dbName, String tableName, String msg, long tableId) {
LOG.warn("dynamic add partition failed: {}, db: {}, table: {}", msg, dbName, tableName);
createOrUpdateRuntimeInfo(tableId, DYNAMIC_PARTITION_STATE, State.ERROR.toString());
createOrUpdateRuntimeInfo(tableId, CREATE_PARTITION_MSG, msg);
}
private void clearCreatePartitionFailedMsg(long tableId) {
createOrUpdateRuntimeInfo(tableId, DYNAMIC_PARTITION_STATE, State.NORMAL.toString());
createOrUpdateRuntimeInfo(tableId, CREATE_PARTITION_MSG, DEFAULT_RUNTIME_VALUE);
}
private void recordDropPartitionFailedMsg(String dbName, String tableName, String msg, long tableId) {
LOG.warn("dynamic drop partition failed: {}, db: {}, table: {}", msg, dbName, tableName);
createOrUpdateRuntimeInfo(tableId, DYNAMIC_PARTITION_STATE, State.ERROR.toString());
createOrUpdateRuntimeInfo(tableId, DROP_PARTITION_MSG, msg);
}
private void clearDropPartitionFailedMsg(long tableId) {
createOrUpdateRuntimeInfo(tableId, DYNAMIC_PARTITION_STATE, State.NORMAL.toString());
createOrUpdateRuntimeInfo(tableId, DROP_PARTITION_MSG, DEFAULT_RUNTIME_VALUE);
}
private void initDynamicPartitionTable() {
for (Long dbId : Env.getCurrentEnv().getInternalCatalog().getDbIds()) {
Database db = Env.getCurrentInternalCatalog().getDbNullable(dbId);
if (db == null) {
continue;
}
List<Table> tableList = db.getTables();
for (Table table : tableList) {
table.readLock();
try {
if (DynamicPartitionUtil.isDynamicPartitionTable(table)) {
registerDynamicPartitionTable(db.getId(), table.getId());
}
} finally {
table.readUnlock();
}
}
}
initialize = true;
}
// Specialized scheduling logic: split the sleep into small pieces so that a changed
// interval is noticed without waiting out the whole old interval.
@Override
public void run() {
if (metaContext != null) {
metaContext.setThreadLocalInfo();
}
while (!isStop.get()) {
try {
runOneCycle();
} catch (Throwable e) {
LOG.error("daemon thread got exception. name: {}", getName(), e);
}
try {
long oldInterval = intervalMs;
long remainingInterval = oldInterval;
while (remainingInterval > SLEEP_PIECE) {
// if the interval changed, it is noticed within at most 10 seconds; waking every 5 seconds is acceptable
if (intervalMs != oldInterval) { // changed
break;
}
Thread.sleep(SLEEP_PIECE);
remainingInterval -= SLEEP_PIECE;
}
if (remainingInterval <= SLEEP_PIECE) {
Thread.sleep(remainingInterval);
}
} catch (InterruptedException e) {
// This thread should NEVER be interrupted; an interrupt during a BDBJE write would be a disaster.
LOG.fatal("InterruptedException: ", e);
}
}
if (metaContext != null) {
MetaContext.remove();
}
LOG.error("daemon thread exits. name={}", getName());
}
@Override
protected void runAfterCatalogReady() {
if (!initialize) {
// scan for dynamic partition tables only once, when FE starts
initDynamicPartitionTable();
}
setInterval(Config.dynamic_partition_check_interval_seconds * 1000L);
if (Config.dynamic_partition_enable) {
try {
executeDynamicPartition(dynamicPartitionTableInfo, false);
} catch (Exception e) {
// the DdlException was already logged inside executeDynamicPartition
if (LOG.isDebugEnabled()) {
LOG.debug("dynamic partition has error: ", e);
}
}
}
}
}