// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.cloud;

import org.apache.doris.analysis.DbName;
import org.apache.doris.analysis.SetType;
import org.apache.doris.analysis.UserIdentity;
import org.apache.doris.analysis.VariableExpr;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Table;
import org.apache.doris.cloud.qe.ComputeGroupException;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.FeConstants;
import org.apache.doris.nereids.trees.plans.commands.CreateDatabaseCommand;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.qe.VariableMgr;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.thrift.TUniqueId;

import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
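
/**
 * Utility methods for maintaining the internal file-cache hotspot table in cloud mode:
 * creating the table, batch-inserting per-partition cache statistics, and querying which
 * partitions are hot for a given compute cluster.
 */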
public class CacheHotspotManagerUtils {
public static final int CACHE_HOT_SPOT_INSERT_TIMEOUT_IN_SEC = 300;
private static final Logger LOG = LogManager.getLogger(CacheHotspotManagerUtils.class);
private static final String TABLE_NAME = String.format("%s.%s",
FeConstants.INTERNAL_DB_NAME, FeConstants.INTERNAL_FILE_CACHE_HOTSPOT_TABLE_NAME);
// TODO(yuejing): how to add new columns to this table?
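// DDL for the hotspot table: a UNIQUE KEY table range-partitioned by insert_day, with
// dynamic partitioning keeping 7 days of history and pre-creating 3 days ahead, so old
// statistics age out automatically as their daily partitions are dropped.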
private static final String CREATE_CACHE_TABLE =
"create table " + TABLE_NAME + " (\n"
+ " cluster_id varchar(65530),\n"
+ " backend_id bigint,\n"
+ " table_id bigint,\n"
+ " index_id bigint,\n"
+ " partition_id bigint,\n"
+ " insert_day DATEV2,\n"
+ " table_name varchar(65530),\n"
+ " index_name varchar(65530),\n"
+ " partition_name varchar(65530),\n"
+ " cluster_name varchar(65530),\n"
+ " file_cache_size bigint,\n"
+ " query_per_day bigint,\n"
+ " query_per_week bigint,\n"
+ " last_access_time DATETIMEV2)\n"
+ " UNIQUE KEY(cluster_id, backend_id, table_id, index_id, partition_id, insert_day)\n"
+ " PARTITION BY RANGE (insert_day) ()\n"
+ " DISTRIBUTED BY HASH (cluster_id) BUCKETS 1\n"
+ " PROPERTIES (\n"
+ " \"dynamic_partition.enable\" = \"true\",\n"
+ " \"dynamic_partition.buckets\" = \"1\",\n"
+ " \"dynamic_partition.time_unit\" = \"DAY\",\n"
+ " \"dynamic_partition.start\" = \"-7\",\n"
+ " \"dynamic_partition.end\" = \"3\",\n"
+ " \"dynamic_partition.prefix\" = \"p\",\n"
+ " \"dynamic_partition.create_history_partition\" = \"true\",\n"
+ " \"dynamic_partition.history_partition_num\" = \"7\"\n"
+ " );";
private static final String BATCH_INSERT_INTO_CACHE_TABLE_TEMPLATE =
"INSERT INTO " + TABLE_NAME + " values";
private static final String INSERT_INTO_CACHE_TABLE_TEMPLATE =
"('${cluster_id}', '${backend_id}', '${table_id}', '${index_id}',"
+ " '${partition_id}', '${insert_day}', '${table_name}', "
+ " '${index_name}', '${partition_name}', '${cluster_name}', "
+ "'${file_cache_size}', '${qpd}', '${qpw}', '${last_access_time}')";
private static final String CONTAINS_CLUSTER_TEMPLATE =
"SELECT COUNT(*) FROM " + TABLE_NAME
+ " WHERE cluster_id = '${cluster_id}'";
private static final String GET_CLUSTER_PARTITIONS_TEMPLATE = "WITH t as (SELECT\n"
+ "table_name, table_id, partition_id,\n"
+ "partition_name, index_id, insert_day, sum(query_per_day) as query_per_day_total,\n"
+ "sum(query_per_week) as query_per_week_total\n"
+ "FROM " + TABLE_NAME + "\n"
+ "where cluster_id = '${cluster_id}' \n"
+ "group by cluster_id, cluster_name, table_id, table_name, partition_id,\n"
+ "partition_name, index_id, insert_day order by insert_day desc,\n"
+ "query_per_day_total desc, query_per_week_total desc)\n"
+ "select distinct table_id, table_name, partition_id, index_id from t;";
private static String INTERNAL_TABLE_ID;
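
/**
 * Reads the global session variable {@link SessionVariable#INTERNAL_CACHE_HOT_SPOT_TIMEOUT},
 * falling back to {@link #CACHE_HOT_SPOT_INSERT_TIMEOUT_IN_SEC} if the lookup fails.
 */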
private static int getCacheHotSpotInsertTimeoutInSec() {
try {
SessionVariable sessionVariable = VariableMgr.getDefaultSessionVariable();
VariableExpr variableExpr = new VariableExpr(SessionVariable.INTERNAL_CACHE_HOT_SPOT_TIMEOUT,
SetType.GLOBAL);
VariableMgr.getValue(sessionVariable, variableExpr);
return sessionVariable.cacheHotSpotTimeoutS;
} catch (Exception e) {
LOG.warn("Failed to get value of table_stats_health_threshold, return default", e);
}
return CACHE_HOT_SPOT_INSERT_TIMEOUT_IN_SEC;
}
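
/**
 * Returns true if the hotspot table already contains at least one row for the given
 * cluster id.
 */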
public static boolean clusterContains(String clusterId) {
if (clusterId == null) {
return false;
}
Map<String, String> params = new HashMap<>();
params.put("cluster_id", clusterId);
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(CONTAINS_CLUSTER_TEMPLATE);
List<ResultRow> result = null;
try {
result = StatisticsUtil.execStatisticQuery(sql, false);
} catch (Exception e) {
throw new RuntimeException(e);
}
if (result == null || result.isEmpty()) {
return false;
}
// The query is a COUNT(*), so it always returns exactly one row; the cluster is
// present only if the counted value is non-zero.
return Long.parseLong(result.get(0).getValues().get(0)) > 0;
}

// Each returned row holds: table_id, table_name, partition_id, index_id (in the order
// selected by GET_CLUSTER_PARTITIONS_TEMPLATE).
public static List<List<String>> getClusterTopNPartitions(String clusterId) {
if (clusterId == null) {
String err = String.format("cluster doesn't exist, clusterId %s", clusterId);
LOG.warn(err);
throw new RuntimeException(err);
}
Map<String, String> params = new HashMap<>();
params.put("cluster_id", clusterId);
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(GET_CLUSTER_PARTITIONS_TEMPLATE);
List<ResultRow> result = null;
try {
result = StatisticsUtil.execStatisticQuery(sql, false);
} catch (Exception e) {
throw new RuntimeException(e);
}
if (result == null) {
String err = String.format("Could not find the src cluster id %s in %s",
clusterId, TABLE_NAME);
LOG.warn(err);
throw new RuntimeException(err);
}
return result.stream().map(ResultRow::getValues).collect(Collectors.toList());
}
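
/**
 * Renders one row tuple for the batch insert from the given placeholder map and appends it
 * to {@code values}. Rows describing the hotspot table itself are skipped so the table
 * never records its own cache statistics. The map is expected to carry one entry per
 * placeholder in INSERT_INTO_CACHE_TABLE_TEMPLATE, e.g. (illustrative values only):
 * cluster_id=..., backend_id=10001, table_id=..., insert_day=2024-01-01, ...
 */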
public static void transformIntoCacheHotSpotTableValue(Map<String, String> params, List<String> values) {
if (INTERNAL_TABLE_ID.equals(params.get("table_id"))) {
// we don't insert into internal table
return;
}
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(INSERT_INTO_CACHE_TABLE_TEMPLATE);
values.add(sql);
}
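
/**
 * Joins the pre-rendered row tuples into a single statement of the form
 * {@code INSERT INTO <table> values (...),(...),...;} and executes it; a no-op for an
 * empty list.
 */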
public static void doBatchInsert(List<String> values) throws Exception {
if (values.isEmpty()) {
return;
}
StringBuilder query = new StringBuilder(BATCH_INSERT_INTO_CACHE_TABLE_TEMPLATE);
for (int i = 0; i < values.size(); i++) {
query.append(values.get(i));
if (i + 1 != values.size()) {
query.append(",");
} else {
query.append(";");
}
}
execUpdate(query.toString());
}
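
/**
 * Executes a single statement under a fresh internal connect context; the context is
 * installed for the duration of the call and restored afterwards via
 * {@link AutoCloseConnectContext}.
 */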
public static void execUpdate(String sql) throws Exception {
try (AutoCloseConnectContext r = buildConnectContext()) {
StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql);
r.connectContext.setExecutor(stmtExecutor);
stmtExecutor.execute();
}
}
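
/**
 * Creates the internal database if it does not exist yet (the command is built with
 * ifNotExists = true); failures are logged and retried on the next invocation.
 */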
private static void execCreateDatabase() throws Exception {
CreateDatabaseCommand command = new CreateDatabaseCommand(true,
new DbName("", FeConstants.INTERNAL_DB_NAME),
null);
try {
Env.getCurrentEnv().createDb(command);
} catch (DdlException e) {
LOG.warn("Failed to create database: {}, will try again later",
FeConstants.INTERNAL_DB_NAME, e);
}
}
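
/**
 * Creates the internal database and the hotspot table, then caches the table's id in
 * INTERNAL_TABLE_ID so that inserts about the table itself can be filtered out.
 */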
public static void execCreateCacheTable() throws Exception {
try (AutoCloseConnectContext r = buildConnectContext()) {
execCreateDatabase();
StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, CREATE_CACHE_TABLE);
r.connectContext.setExecutor(stmtExecutor);
stmtExecutor.execute();
}
Database db = Env.getCurrentInternalCatalog().getDbNullable(FeConstants.INTERNAL_DB_NAME);
// fail fast instead of dereferencing a null db/table below
if (db == null) {
LOG.warn("{} database doesn't exist", FeConstants.INTERNAL_DB_NAME);
throw new RuntimeException(FeConstants.INTERNAL_DB_NAME + " database doesn't exist");
}
Table t = db.getTableNullable(FeConstants.INTERNAL_FILE_CACHE_HOTSPOT_TABLE_NAME);
if (t == null) {
LOG.warn("{} table doesn't exist", FeConstants.INTERNAL_FILE_CACHE_HOTSPOT_TABLE_NAME);
throw new RuntimeException(FeConstants.INTERNAL_FILE_CACHE_HOTSPOT_TABLE_NAME + " table doesn't exist");
}
INTERNAL_TABLE_ID = String.valueOf(t.getId());
}
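
/**
 * Builds a throw-away internal connect context (admin identity, internal db, strict insert,
 * fresh query id) for executing the statements above. A minimal usage sketch:
 *
 * <pre>
 * try (AutoCloseConnectContext r = buildConnectContext()) {
 *     StmtExecutor executor = new StmtExecutor(r.connectContext, sql);
 *     r.connectContext.setExecutor(executor);
 *     executor.execute();
 * }
 * </pre>
 */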
public static AutoCloseConnectContext buildConnectContext() {
ConnectContext connectContext = new ConnectContext();
SessionVariable sessionVariable = connectContext.getSessionVariable();
sessionVariable.internalSession = true;
// sessionVariable.setMaxExecMemByte(StatisticConstants.STATISTICS_MAX_MEM_PER_QUERY_IN_BYTES);
sessionVariable.setEnableInsertStrict(true);
sessionVariable.setInsertMaxFilterRatio(1);
// sessionVariable.parallelExecInstanceNum = StatisticConstants.STATISTIC_PARALLEL_EXEC_INSTANCE_NUM;
sessionVariable.enableProfile = false;
connectContext.setEnv(Env.getCurrentEnv());
connectContext.setDatabase(FeConstants.INTERNAL_DB_NAME);
UUID uuid = UUID.randomUUID();
TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits());
connectContext.setQueryId(queryId);
connectContext.setStartTime();
connectContext.setCurrentUserIdentity(UserIdentity.ADMIN);
connectContext.setQualifiedUser(UserIdentity.ADMIN.getQualifiedUser());
connectContext.setUserInsertTimeout(getCacheHotSpotInsertTimeoutInSec());
return new AutoCloseConnectContext(connectContext);
}
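
/**
 * Installs the wrapped context as the thread-local ConnectContext on construction and
 * restores the previously installed context (if any) on close, so internal statements do
 * not clobber the caller's session.
 */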
public static class AutoCloseConnectContext implements AutoCloseable {
public final ConnectContext connectContext;
private final ConnectContext previousContext;
public AutoCloseConnectContext(ConnectContext connectContext) {
this.previousContext = ConnectContext.get();
this.connectContext = connectContext;
connectContext.setThreadLocalInfo();
try {
connectContext.getCloudCluster();
} catch (ComputeGroupException e) {
LOG.warn("failed to get compute group name", e);
}
}

@Override
public void close() {
ConnectContext.remove();
if (previousContext != null) {
previousContext.setThreadLocalInfo();
}
}
}
}