StatisticsCache.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.statistics;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeConstants;
import org.apache.doris.common.ThreadPoolManager;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TInvalidateFollowerStatsCacheRequest;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TUpdateFollowerPartitionStatsCacheRequest;
import org.apache.doris.thrift.TUpdateFollowerStatsCacheRequest;
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.commons.collections.CollectionUtils;
import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
public class StatisticsCache {
private static final Logger LOG = LogManager.getLogger(StatisticsCache.class);
/**
* Use a standalone thread pool to avoid interference between this and any other jdk function
* that use the thread of ForkJoinPool#common in the system.
*/
private final ThreadPoolExecutor threadPool
= ThreadPoolManager.newDaemonFixedThreadPool(
10, Integer.MAX_VALUE, "STATS_FETCH", true);
private final ColumnStatisticsCacheLoader columnStatisticCacheLoader = new ColumnStatisticsCacheLoader();
private final HistogramCacheLoader histogramCacheLoader = new HistogramCacheLoader();
private final PartitionColumnStatisticCacheLoader partitionColumnStatisticCacheLoader
= new PartitionColumnStatisticCacheLoader();
private final AsyncLoadingCache<StatisticsCacheKey, Optional<ColumnStatistic>> columnStatisticsCache =
Caffeine.newBuilder()
.maximumSize(Config.stats_cache_size)
.refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL))
.executor(threadPool)
.buildAsync(columnStatisticCacheLoader);
private final AsyncLoadingCache<StatisticsCacheKey, Optional<Histogram>> histogramCache =
Caffeine.newBuilder()
.maximumSize(Config.stats_cache_size)
.refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL))
.executor(threadPool)
.buildAsync(histogramCacheLoader);
private final AsyncLoadingCache<PartitionColumnStatisticCacheKey, Optional<PartitionColumnStatistic>>
partitionColumnStatisticCache =
Caffeine.newBuilder()
.maximumSize(Config.stats_cache_size)
.refreshAfterWrite(Duration.ofHours(StatisticConstants.STATISTICS_CACHE_REFRESH_INTERVAL))
.executor(threadPool)
.buildAsync(partitionColumnStatisticCacheLoader);
public ColumnStatistic getColumnStatistics(long catalogId, long dbId, long tblId, long idxId, String colName) {
ConnectContext ctx = ConnectContext.get();
if (ctx != null && ctx.getSessionVariable().internalSession) {
return ColumnStatistic.UNKNOWN;
}
// Need to change base index id to -1 for OlapTable.
try {
idxId = changeBaseIndexId(catalogId, dbId, tblId, idxId);
} catch (Exception e) {
return ColumnStatistic.UNKNOWN;
}
StatisticsCacheKey k = new StatisticsCacheKey(catalogId, dbId, tblId, idxId, colName);
try {
CompletableFuture<Optional<ColumnStatistic>> f = columnStatisticsCache.get(k);
if (f.isDone()) {
return f.get().orElse(ColumnStatistic.UNKNOWN);
}
} catch (Exception e) {
LOG.warn("Unexpected exception while returning ColumnStatistic", e);
}
return ColumnStatistic.UNKNOWN;
}
public PartitionColumnStatistic getPartitionColumnStatistics(long catalogId, long dbId, long tblId, long idxId,
String partName, String colName) {
ConnectContext ctx = ConnectContext.get();
if (ctx != null && ctx.getSessionVariable().internalSession) {
return PartitionColumnStatistic.UNKNOWN;
}
// Need to change base index id to -1 for OlapTable.
try {
idxId = changeBaseIndexId(catalogId, dbId, tblId, idxId);
} catch (Exception e) {
return PartitionColumnStatistic.UNKNOWN;
}
PartitionColumnStatisticCacheKey k = new PartitionColumnStatisticCacheKey(
catalogId, dbId, tblId, idxId, partName, colName);
try {
CompletableFuture<Optional<PartitionColumnStatistic>> f = partitionColumnStatisticCache.get(k);
if (f.isDone()) {
return f.get().orElse(PartitionColumnStatistic.UNKNOWN);
}
} catch (Exception e) {
LOG.warn("Unexpected exception while returning ColumnStatistic", e);
}
return PartitionColumnStatistic.UNKNOWN;
}
// Base index id should be set to -1 for OlapTable. Because statistics tables use -1 for base index.
// TODO: Need to use the real index id in statistics table in later version.
private long changeBaseIndexId(long catalogId, long dbId, long tblId, long idxId) {
if (idxId != -1) {
TableIf table = StatisticsUtil.findTable(catalogId, dbId, tblId);
if (table instanceof OlapTable) {
OlapTable olapTable = (OlapTable) table;
if (idxId == olapTable.getBaseIndexId()) {
idxId = -1;
}
}
}
return idxId;
}
public Histogram getHistogram(long ctlId, long dbId, long tblId, String colName) {
return getHistogram(ctlId, dbId, tblId, -1, colName).orElse(null);
}
private Optional<Histogram> getHistogram(long ctlId, long dbId, long tblId, long idxId, String colName) {
ConnectContext ctx = ConnectContext.get();
if (ctx != null && ctx.getSessionVariable().internalSession) {
return Optional.empty();
}
StatisticsCacheKey k = new StatisticsCacheKey(ctlId, dbId, tblId, idxId, colName);
try {
CompletableFuture<Optional<Histogram>> f = histogramCache.get(k);
if (f.isDone()) {
return f.get();
}
} catch (Exception e) {
LOG.warn("Unexpected exception while returning Histogram", e);
}
return Optional.empty();
}
public void invalidateColumnStatsCache(long ctlId, long dbId, long tblId, long idxId, String colName) {
columnStatisticsCache.synchronous().invalidate(new StatisticsCacheKey(ctlId, dbId, tblId, idxId, colName));
}
public void invalidatePartitionColumnStatsCache(long ctlId, long dbId, long tblId, long idxId,
String partName, String colName) {
if (partName == null) {
return;
}
partitionColumnStatisticCache.synchronous().invalidate(
new PartitionColumnStatisticCacheKey(ctlId, dbId, tblId, idxId, partName, colName));
}
public void invalidateAllPartitionStatsCache() {
partitionColumnStatisticCache.synchronous().invalidateAll();
}
public void updatePartitionColStatsCache(long ctlId, long dbId, long tblId, long idxId,
String partName, String colName, PartitionColumnStatistic statistic) {
partitionColumnStatisticCache.synchronous().put(
new PartitionColumnStatisticCacheKey(ctlId, dbId, tblId, idxId, partName, colName), Optional.of(statistic));
}
public void updateColStatsCache(long ctlId, long dbId, long tblId, long idxId, String colName,
ColumnStatistic statistic) {
columnStatisticsCache.synchronous()
.put(new StatisticsCacheKey(ctlId, dbId, tblId, idxId, colName), Optional.of(statistic));
}
public void refreshColStatsSync(long ctlId, long dbId, long tblId, long idxId, String colName) {
columnStatisticsCache.synchronous().refresh(new StatisticsCacheKey(ctlId, dbId, tblId, idxId, colName));
}
public void refreshHistogramSync(long ctlId, long dbId, long tblId, long idxId, String colName) {
histogramCache.synchronous().refresh(new StatisticsCacheKey(ctlId, dbId, tblId, idxId, colName));
}
public void preHeat() {
if (!FeConstants.disablePreHeat) {
threadPool.submit(this::doPreHeat);
}
}
private void doPreHeat() {
List<ResultRow> recentStatsUpdatedCols = null;
long retryTimes = 0;
while (!StatisticsUtil.statsTblAvailable()) {
try {
Thread.sleep(100L);
} catch (InterruptedException e) {
// IGNORE
}
}
while (retryTimes < StatisticConstants.PRELOAD_RETRY_TIMES) {
try {
recentStatsUpdatedCols = StatisticsRepository.fetchRecentStatsUpdatedCol();
break;
} catch (Throwable t) {
// IGNORE
}
retryTimes++;
try {
Thread.sleep(StatisticConstants.PRELOAD_RETRY_INTERVAL_IN_SECONDS);
} catch (Throwable t) {
// IGNORE
}
}
if (CollectionUtils.isEmpty(recentStatsUpdatedCols)) {
return;
}
for (ResultRow r : recentStatsUpdatedCols) {
try {
StatsId statsId = new StatsId(r);
final StatisticsCacheKey k =
new StatisticsCacheKey(statsId.catalogId, statsId.dbId, statsId.tblId, statsId.idxId,
statsId.colId);
final ColumnStatistic c = ColumnStatistic.fromResultRow(r);
putCache(k, c);
} catch (Throwable t) {
LOG.warn("Error when preheating stats cache. reason: [{}]. Row:[{}]", t.getMessage(), r);
if (LOG.isDebugEnabled()) {
LOG.debug(t);
}
}
}
}
/**
* Refresh stats cache, invalidate cache if the new data is unknown.
*/
public void syncColStats(ColStatsData data) {
StatsId statsId = data.statsId;
final StatisticsCacheKey k = new StatisticsCacheKey(statsId.catalogId, statsId.dbId, statsId.tblId,
statsId.idxId, statsId.colId);
ColumnStatistic columnStatistic = data.toColumnStatistic();
if (columnStatistic == ColumnStatistic.UNKNOWN) {
invalidateColumnStatsCache(k.catalogId, k.dbId, k.tableId, k.idxId, k.colName);
} else {
putCache(k, columnStatistic);
}
TUpdateFollowerStatsCacheRequest updateFollowerStatsCacheRequest = new TUpdateFollowerStatsCacheRequest();
updateFollowerStatsCacheRequest.key = GsonUtils.GSON.toJson(k);
updateFollowerStatsCacheRequest.colStatsData = GsonUtils.GSON.toJson(data);
// For compatible only, to be deprecated.
updateFollowerStatsCacheRequest.statsRows = new ArrayList<>();
SystemInfoService.HostInfo selfNode = Env.getCurrentEnv().getSelfNode();
for (Frontend frontend : Env.getCurrentEnv().getFrontends(null)) {
if (selfNode.getHost().equals(frontend.getHost())) {
continue;
}
sendStats(frontend, updateFollowerStatsCacheRequest);
}
}
@VisibleForTesting
public void sendStats(Frontend frontend, TUpdateFollowerStatsCacheRequest updateFollowerStatsCacheRequest) {
TNetworkAddress address = new TNetworkAddress(frontend.getHost(), frontend.getRpcPort());
FrontendService.Client client = null;
try {
client = ClientPool.frontendPool.borrowObject(address);
client.updateStatsCache(updateFollowerStatsCacheRequest);
} catch (Throwable t) {
LOG.warn("Failed to sync stats to follower: {}", address, t);
} finally {
if (client != null) {
ClientPool.frontendPool.returnObject(address, client);
}
}
}
@VisibleForTesting
public boolean invalidateStats(Frontend frontend, TInvalidateFollowerStatsCacheRequest request) {
TNetworkAddress address = new TNetworkAddress(frontend.getHost(), frontend.getRpcPort());
FrontendService.Client client = null;
try {
client = ClientPool.frontendPool.borrowObject(address);
client.invalidateStatsCache(request);
} catch (Throwable t) {
LOG.warn("Failed to sync invalidate to follower: {}", address, t);
return false;
} finally {
if (client != null) {
ClientPool.frontendPool.returnObject(address, client);
}
}
return true;
}
public void putCache(StatisticsCacheKey k, ColumnStatistic c) {
CompletableFuture<Optional<ColumnStatistic>> f = new CompletableFuture<Optional<ColumnStatistic>>();
f.obtrudeValue(Optional.of(c));
columnStatisticsCache.put(k, f);
}
@VisibleForTesting
public boolean updatePartitionStats(Frontend frontend, TUpdateFollowerPartitionStatsCacheRequest request) {
TNetworkAddress address = new TNetworkAddress(frontend.getHost(), frontend.getRpcPort());
FrontendService.Client client = null;
try {
client = ClientPool.frontendPool.borrowObject(address);
client.updatePartitionStatsCache(request);
} catch (Throwable t) {
LOG.warn("Failed to update partition stats cache of follower: {}", address, t);
return false;
} finally {
if (client != null) {
ClientPool.frontendPool.returnObject(address, client);
}
}
return true;
}
}