MemoryHboPlanStatisticsProvider.java
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.nereids.stats;
import org.apache.doris.catalog.Env;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.Config;
import org.apache.doris.common.ConfigBase.DefaultConfHandler;
import org.apache.doris.nereids.trees.plans.PlanNodeAndHash;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.statistics.hbo.RecentRunsPlanStatistics;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TUpdatePlanStatsCacheRequest;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* HboPlanStatisticsProvider's in-memory implementation.
*/
public class MemoryHboPlanStatisticsProvider implements HboPlanStatisticsProvider {
private static final Logger LOG = LogManager.getLogger(MemoryHboPlanStatisticsProvider.class);
private volatile Cache<String, RecentRunsPlanStatistics> hboPlanStatsCache;
public MemoryHboPlanStatisticsProvider() {
hboPlanStatsCache = buildHboPlanStatsCaches(
Config.hbo_plan_stats_cache_num,
Config.expire_hbo_plan_stats_cache_in_fe_second
);
}
@Override
public RecentRunsPlanStatistics getHboPlanStats(PlanNodeAndHash planNodeAndHash) {
if (planNodeAndHash.getHash().isPresent()) {
return hboPlanStatsCache.asMap().getOrDefault(planNodeAndHash.getHash().get(),
RecentRunsPlanStatistics.empty());
}
return RecentRunsPlanStatistics.empty();
}
@Override
public Map<PlanNodeAndHash, RecentRunsPlanStatistics> getHboPlanStats(List<PlanNodeAndHash> planNodeHashes) {
return planNodeHashes.stream().collect(Collectors.toMap(
planNodeAndHash -> planNodeAndHash,
planNodeAndHash -> {
if (planNodeAndHash.getHash().isPresent()) {
return hboPlanStatsCache.asMap().getOrDefault(planNodeAndHash.getHash().get(),
RecentRunsPlanStatistics.empty());
}
return RecentRunsPlanStatistics.empty();
}));
}
@Override
public void putHboPlanStats(Map<PlanNodeAndHash, RecentRunsPlanStatistics> hashStatisticsMap) {
hashStatisticsMap.forEach((planNodeAndHash, recentRunsPlanStatistics) -> {
if (planNodeAndHash.getHash().isPresent()) {
hboPlanStatsCache.put(planNodeAndHash.getHash().get(), recentRunsPlanStatistics);
}
});
}
@Override
public void updatePlanStats(PlanNodeAndHash hash, RecentRunsPlanStatistics planStatistics) {
hboPlanStatsCache.put(hash.getHash().get(), planStatistics);
}
/**
* sync hbo plan stats to other fe client.
* @param planKey planKey
* @param planStatsData planStatsData
*/
public void syncHboPlanStats(PlanNodeAndHash planKey, RecentRunsPlanStatistics planStatsData) {
TUpdatePlanStatsCacheRequest updateFollowerPlanStatsCacheRequest = new TUpdatePlanStatsCacheRequest();
updateFollowerPlanStatsCacheRequest.key = GsonUtils.GSON.toJson(planKey);
updateFollowerPlanStatsCacheRequest.planStatsData = GsonUtils.GSON.toJson(planStatsData);
SystemInfoService.HostInfo selfNode = Env.getCurrentEnv().getSelfNode();
for (Frontend frontend : Env.getCurrentEnv().getFrontends(null)) {
if (selfNode.getHost().equals(frontend.getHost())) {
continue;
}
sendPlanStats(frontend, updateFollowerPlanStatsCacheRequest);
}
}
private void sendPlanStats(Frontend frontend, TUpdatePlanStatsCacheRequest updateFollowerPlanStatsCacheRequest) {
TNetworkAddress address = new TNetworkAddress(frontend.getHost(), frontend.getRpcPort());
FrontendService.Client client = null;
try {
client = ClientPool.frontendPool.borrowObject(address);
client.updatePlanStatsCache(updateFollowerPlanStatsCacheRequest);
} catch (Throwable t) {
LOG.warn("Failed to sync plan stats to fe client: {}", address, t);
} finally {
if (client != null) {
ClientPool.frontendPool.returnObject(address, client);
}
}
}
private static Cache<String, RecentRunsPlanStatistics> buildHboPlanStatsCaches(
int cacheNum, long expireAfterAccessSeconds) {
Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder()
.softValues();
if (cacheNum > 0) {
cacheBuilder.maximumSize(cacheNum);
}
if (expireAfterAccessSeconds > 0) {
cacheBuilder = cacheBuilder.expireAfterAccess(Duration.ofSeconds(expireAfterAccessSeconds));
}
return cacheBuilder.build();
}
/**
* NOTE: used in Config.hbo_plan_stats_cache_num.callbackClassString and
* Config.expire_hbo_plan_stats_cache_in_fe_second.callbackClassString,
*/
public static class UpdateConfig extends DefaultConfHandler {
@Override
public void handle(Field field, String confVal) throws Exception {
super.handle(field, confVal);
MemoryHboPlanStatisticsProvider.updateConfig();
}
}
/**
* Reference the above UpdateConfig comments.
*/
public static synchronized void updateConfig() {
HboPlanStatisticsManager hboManger = Env.getCurrentEnv().getHboPlanStatisticsManager();
if (hboManger == null) {
return;
}
HboPlanStatisticsProvider hboPlanStatsProvider = hboManger.getHboPlanStatisticsProvider();
if (!(hboPlanStatsProvider instanceof MemoryHboPlanStatisticsProvider)) {
return;
}
MemoryHboPlanStatisticsProvider inMemHboPlanStatsProvider =
(MemoryHboPlanStatisticsProvider) hboPlanStatsProvider;
Cache<String, RecentRunsPlanStatistics> hboPlanStatsCache = buildHboPlanStatsCaches(
Config.hbo_plan_stats_cache_num,
Config.expire_hbo_plan_stats_cache_in_fe_second
);
hboPlanStatsCache.putAll(inMemHboPlanStatsProvider.hboPlanStatsCache.asMap());
inMemHboPlanStatsProvider.hboPlanStatsCache = hboPlanStatsCache;
}
}