NereidsSortedPartitionsCacheManager.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.common.cache;
import org.apache.doris.catalog.DatabaseIf;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.PartitionItem;
import org.apache.doris.catalog.SupportBinarySearchFilteringPartitions;
import org.apache.doris.common.Config;
import org.apache.doris.common.ConfigBase.DefaultConfHandler;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.nereids.rules.expression.rules.MultiColumnBound;
import org.apache.doris.nereids.rules.expression.rules.PartitionItemToRange;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges.PartitionItemAndId;
import org.apache.doris.nereids.rules.expression.rules.SortedPartitionRanges.PartitionItemAndRange;
import org.apache.doris.nereids.trees.plans.algebra.CatalogRelation;
import org.apache.doris.qe.ConnectContext;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.Range;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.hadoop.util.Lists;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Optional;
/**
 * This cache is used to sort the table partitions by range, so we can do binary search to skip
 * filtering huge numbers of partitions. For example, if the table partition column is `dt date`
 * with one partition per date, ranging from '2017-01-01' to '2025-01-01', then for the partition
 * predicate `where dt = '2024-12-24'` we can jump to '2024-12-24' within a few partition range
 * comparisons, which improves QPS.
 *
 * <p>Entries are keyed by (catalog name, database full name, table name). On every lookup an entry
 * is validated against the table id and the table's partition meta version; a mismatch evicts the
 * entry and triggers a reload.
 */
public class NereidsSortedPartitionsCacheManager {
    /**
     * Minimum age, in milliseconds, of a table's partition metadata before its partitions are sorted
     * and cached. Metadata that changed more recently than this is treated as "hot" (e.g. frequent
     * inserts) and is not cached.
     */
    private static final long MIN_PARTITION_META_AGE_MILLIS = 10 * 1000L;

    // volatile: the whole cache instance is swapped by updateConfig() when the related configs change
    private volatile Cache<TableIdentifier, PartitionCacheContext> partitionCaches;

    public NereidsSortedPartitionsCacheManager() {
        partitionCaches = buildCaches(
                Config.cache_partition_meta_table_manage_num,
                Config.expire_cache_partition_meta_table_in_fe_second
        );
    }

    /**
     * Returns the partitions of {@code table} sorted by range, loading and caching them if needed.
     *
     * @param table the table whose partitions should be sorted
     * @param scan the relation scanning the table; forwarded to the table's partition accessors
     * @return the sorted partition ranges, or empty when binary search filtering is disabled for the
     *         current session, the table's database or catalog is unavailable, or the partition
     *         metadata changed too recently to be worth caching
     */
    public Optional<SortedPartitionRanges<?>> get(
            SupportBinarySearchFilteringPartitions table, CatalogRelation scan) {
        ConnectContext connectContext = ConnectContext.get();
        if (connectContext != null && !connectContext.getSessionVariable().enableBinarySearchFilteringPartitions) {
            return Optional.empty();
        }
        DatabaseIf<?> database = table.getDatabase();
        if (database == null) {
            return Optional.empty();
        }
        CatalogIf<?> catalog = database.getCatalog();
        if (catalog == null) {
            return Optional.empty();
        }
        TableIdentifier key = new TableIdentifier(
                catalog.getName(), database.getFullName(), table.getName());
        PartitionCacheContext partitionCacheContext = partitionCaches.getIfPresent(key);
        if (partitionCacheContext == null) {
            return Optional.ofNullable(loadCache(key, table, scan));
        }
        if (table.getId() != partitionCacheContext.tableId
                || !Objects.equals(table.getPartitionMetaVersion(scan), partitionCacheContext.partitionMetaVersion)) {
            // the key is name-based: a different table id means another table now owns this name,
            // and a different version means the partitions changed since the entry was built
            partitionCaches.invalidate(key);
            return Optional.ofNullable(loadCache(key, table, scan));
        }
        return Optional.of(partitionCacheContext.sortedPartitionRanges);
    }

    /**
     * Sorts the partitions of {@code table} by range and stores the result under {@code key}.
     *
     * @return the sorted partition ranges, or null (nothing cached) when the partition metadata load
     *         time is not older than {@link #MIN_PARTITION_META_AGE_MILLIS}, or lies in the future
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private SortedPartitionRanges<?> loadCache(
            TableIdentifier key, SupportBinarySearchFilteringPartitions table, CatalogRelation scan) {
        long now = System.currentTimeMillis();
        long partitionMetaLoadTime = table.getPartitionMetaLoadTimeMillis(scan);
        // if insert too frequently, we will skip sort partitions
        if (now <= partitionMetaLoadTime || (now - partitionMetaLoadTime) <= MIN_PARTITION_META_AGE_MILLIS) {
            return null;
        }

        // Capture the version BEFORE reading the partitions. If the partitions change while we are
        // sorting, the cached entry then carries the older version and is invalidated by the next
        // lookup in get(). Reading the version afterwards could pair a newer version with stale
        // partitions, and that stale entry would pass the version check in get().
        Object partitionMetaVersion = table.getPartitionMetaVersion(scan);
        Map<?, PartitionItem> unsortedMap = table.getOriginPartitions(scan);

        List<PartitionItemAndRange<?>> sortedRanges = Lists.newArrayListWithCapacity(unsortedMap.size());
        List<PartitionItemAndId<?>> defaultPartitions = Lists.newArrayList();
        for (Entry<?, PartitionItem> entry : unsortedMap.entrySet()) {
            PartitionItem partitionItem = entry.getValue();
            Object id = entry.getKey();
            if (!partitionItem.isDefaultPartition()) {
                // one partition item may map to several ranges; each becomes its own sortable entry
                List<Range<MultiColumnBound>> ranges = PartitionItemToRange.toRanges(partitionItem);
                for (Range<MultiColumnBound> range : ranges) {
                    sortedRanges.add(new PartitionItemAndRange<>(id, partitionItem, range));
                }
            } else {
                // default partitions are kept aside, outside the range-sorted list
                defaultPartitions.add(new PartitionItemAndId<>(id, partitionItem));
            }
        }
        // order by lower endpoint, breaking ties by upper endpoint
        sortedRanges.sort((o1, o2) -> {
            Range<MultiColumnBound> span1 = o1.range;
            Range<MultiColumnBound> span2 = o2.range;
            int result = span1.lowerEndpoint().compareTo(span2.lowerEndpoint());
            if (result != 0) {
                return result;
            }
            return span1.upperEndpoint().compareTo(span2.upperEndpoint());
        });
        SortedPartitionRanges<?> sortedPartitionRanges = new SortedPartitionRanges(
                sortedRanges, defaultPartitions
        );
        PartitionCacheContext context = new PartitionCacheContext(
                table.getId(), partitionMetaVersion, sortedPartitionRanges);
        partitionCaches.put(key, context);
        return sortedPartitionRanges;
    }

    /**
     * Builds the backing Caffeine cache.
     *
     * @param sortedPartitionTableManageNum maximum number of cached tables; values {@code <= 0}
     *        leave the size unbounded
     * @param expireSortedPartitionTableInFeSecond seconds after the last access before an entry
     *        expires; values {@code <= 0} disable access-based expiration
     */
    private static Cache<TableIdentifier, PartitionCacheContext> buildCaches(
            int sortedPartitionTableManageNum, int expireSortedPartitionTableInFeSecond) {
        Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder()
                // auto evict cache when jvm memory too low
                .softValues();
        if (sortedPartitionTableManageNum > 0) {
            cacheBuilder = cacheBuilder.maximumSize(sortedPartitionTableManageNum);
        }
        if (expireSortedPartitionTableInFeSecond > 0) {
            cacheBuilder = cacheBuilder.expireAfterAccess(Duration.ofSeconds(expireSortedPartitionTableInFeSecond));
        }
        return cacheBuilder.build();
    }

    /**
     * Rebuilds the current environment's cache from the latest config values, copying over the
     * existing entries. Does nothing if there is no current {@link Env} or cache manager.
     */
    public static synchronized void updateConfig() {
        Env currentEnv = Env.getCurrentEnv();
        if (currentEnv == null) {
            return;
        }
        NereidsSortedPartitionsCacheManager cacheManager = currentEnv.getSortedPartitionsCacheManager();
        if (cacheManager == null) {
            return;
        }
        Cache<TableIdentifier, PartitionCacheContext> caches = buildCaches(
                Config.cache_partition_meta_table_manage_num,
                Config.expire_cache_partition_meta_table_in_fe_second
        );
        caches.putAll(cacheManager.partitionCaches.asMap());
        cacheManager.partitionCaches = caches;
    }

    /** Cache key: the fully qualified name of a table (catalog, database, table). */
    @Data
    @AllArgsConstructor
    private static class TableIdentifier {
        public final String catalog;
        public final String db;
        public final String table;
    }

    /** Cached sorted ranges of one table, plus the table id and version used to validate them. */
    private static class PartitionCacheContext {
        private final long tableId;
        // must be non-null; compared with Objects.equals against the table's current version
        private final Object partitionMetaVersion;
        private final SortedPartitionRanges<?> sortedPartitionRanges;

        public PartitionCacheContext(
                long tableId, Object partitionMetaVersion, SortedPartitionRanges<?> sortedPartitionRanges) {
            this.tableId = tableId;
            this.partitionMetaVersion
                    = Objects.requireNonNull(partitionMetaVersion, "partitionMetaVersion cannot be null");
            this.sortedPartitionRanges = sortedPartitionRanges;
        }

        @Override
        public String toString() {
            return "PartitionCacheContext(tableId="
                    + tableId + ", tableVersion=" + partitionMetaVersion
                    + ", partitionNum=" + sortedPartitionRanges.sortedPartitions.size() + ")";
        }
    }

    // NOTE: used in Config.cache_partition_meta_table_manage_num.callbackClassString and
    // Config.expire_cache_partition_meta_table_in_fe_second.callbackClassString,
    // don't remove it!
    public static class UpdateConfig extends DefaultConfHandler {
        @Override
        public void handle(Field field, String confVal) throws Exception {
            super.handle(field, confVal);
            NereidsSortedPartitionsCacheManager.updateConfig();
        }
    }
}