QueryStatsUtil.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.statistics.query;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.ClientPool;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.system.Frontend;
import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TGetQueryStatsRequest;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TQueryStatsResult;
import org.apache.doris.thrift.TQueryStatsType;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TTableIndexQueryStats;
import org.apache.doris.thrift.TTableQueryStats;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Static helpers that combine the query statistics held by this frontend with the statistics
 * reported over RPC by the other frontends.
 *
 * <p>Every {@code getMerged*} method reads the local value from
 * {@code Env.getCurrentEnv().getQueryStats()}, asks the remote frontends for the same scope through
 * {@link #getStats(TGetQueryStatsRequest)} and sums the counters. Collection from remote frontends
 * is best effort: a frontend that cannot be reached or returns an unusable answer is logged and
 * left out of the sum.
 */
public class QueryStatsUtil {
    private static final Logger LOG = LogManager.getLogger(QueryStatsUtil.class);

    /** RPC timeout, in seconds, used when no {@link ConnectContext} is bound to the current thread. */
    private static final int DEFAULT_RPC_TIMEOUT_S = 300;

    /**
     * Returns the catalog-level counters of this frontend summed with those of the remote frontends.
     *
     * @param catalog catalog name
     * @return the map obtained from the local query stats, updated in place with the remote counters
     * @throws UserException declared for callers; propagated from the local stats lookup if it throws
     */
    public static Map<String, Long> getMergedCatalogStats(String catalog) throws UserException {
        Map<String, Long> result = Env.getCurrentEnv().getQueryStats().getCatalogStats(catalog);
        TGetQueryStatsRequest request = new TGetQueryStatsRequest();
        request.setType(TQueryStatsType.CATALOG);
        request.setCatalog(catalog);
        return mergeSimpleResults(result, request);
    }

    /**
     * Returns the database-level counters of this frontend summed with those of the remote frontends.
     *
     * @param catalog catalog name
     * @param db database name
     * @return the map obtained from the local query stats, updated in place with the remote counters
     * @throws UserException declared for callers; propagated from the local stats lookup if it throws
     */
    public static Map<String, Long> getMergedDatabaseStats(String catalog, String db) throws UserException {
        Map<String, Long> result = Env.getCurrentEnv().getQueryStats().getDbStats(catalog, db);
        TGetQueryStatsRequest request = new TGetQueryStatsRequest();
        request.setType(TQueryStatsType.DATABASE);
        request.setCatalog(catalog);
        request.setDb(db);
        return mergeSimpleResults(result, request);
    }

    /**
     * Returns the per-field statistics of a table, merged across frontends.
     *
     * @param catalog catalog name
     * @param db database name
     * @param table table name
     * @return field name mapped to a pair whose {@code first} accumulates the reported query stats
     *         and whose {@code second} accumulates the reported filter stats
     * @throws UserException declared for callers; propagated from the local stats lookup if it throws
     */
    public static Map<String, Pair<Long, Long>> getMergedTableStats(String catalog, String db, String table)
            throws UserException {
        Map<String, Pair<Long, Long>> result = Env.getCurrentEnv().getQueryStats().getTblStats(catalog, db, table);
        TGetQueryStatsRequest request = new TGetQueryStatsRequest();
        request.setType(TQueryStatsType.TABLE);
        request.setCatalog(catalog);
        request.setDb(db);
        request.setTbl(table);
        for (TQueryStatsResult other : getStats(request)) {
            mergeFieldStats(result, other.getTableStats());
        }
        return result;
    }

    /**
     * Returns the {@code TABLE_ALL} counters of a table, merged across frontends.
     *
     * @param catalog catalog name
     * @param db database name
     * @param table table name
     * @return the map obtained from the local query stats, updated in place with the remote counters
     * @throws UserException declared for callers; propagated from the local stats lookup if it throws
     */
    public static Map<String, Long> getMergedTableAllStats(String catalog, String db, String table)
            throws UserException {
        Map<String, Long> result = Env.getCurrentEnv().getQueryStats().getTblAllStats(catalog, db, table);
        TGetQueryStatsRequest request = new TGetQueryStatsRequest();
        request.setType(TQueryStatsType.TABLE_ALL);
        request.setCatalog(catalog);
        request.setDb(db);
        request.setTbl(table);
        return mergeSimpleResults(result, request);
    }

    /**
     * Returns the per-index, per-field statistics of a table, merged across frontends.
     *
     * @param catalog catalog name
     * @param db database name
     * @param table table name
     * @return index name mapped to (field name mapped to a pair of accumulated query stats and
     *         accumulated filter stats)
     * @throws UserException declared for callers; propagated from the local stats lookup if it throws
     */
    public static Map<String, Map<String, Pair<Long, Long>>> getMergedTableAllVerboseStats(String catalog, String db,
            String table) throws UserException {
        Map<String, Map<String, Pair<Long, Long>>> result = Env.getCurrentEnv().getQueryStats()
                .getTblAllVerboseStats(catalog, db, table);
        TGetQueryStatsRequest request = new TGetQueryStatsRequest();
        request.setType(TQueryStatsType.TABLE_ALL_VERBOSE);
        request.setCatalog(catalog);
        request.setDb(db);
        request.setTbl(table);
        for (TQueryStatsResult other : getStats(request)) {
            for (TTableIndexQueryStats tis : other.getTableVerbosStats()) {
                // An index that is unknown locally starts from an empty map and is then filled
                // through the same merge path as a known index.
                Map<String, Pair<Long, Long>> indexStats =
                        result.computeIfAbsent(tis.getIndexName(), name -> new HashMap<>());
                mergeFieldStats(indexStats, tis.getTableStats());
            }
        }
        return result;
    }

    /**
     * Returns the query hits of one replica, summed across frontends.
     *
     * @param replicaId replica to look up
     * @return the local value plus the value reported by each remote frontend; a remote answer that
     *         carries no entry for {@code replicaId} contributes 0
     */
    public static long getMergedReplicaStats(long replicaId) {
        long queryHits = Env.getCurrentEnv().getQueryStats().getStats(replicaId);
        TGetQueryStatsRequest request = new TGetQueryStatsRequest();
        request.setType(TQueryStatsType.TABLET);
        request.setReplicaId(replicaId);
        for (TQueryStatsResult other : getStats(request)) {
            // getStats() only guarantees a non-empty map, not that it holds this replica id;
            // unboxing the null returned by a plain get() would throw a NullPointerException.
            queryHits += other.getTabletStats().getOrDefault(replicaId, 0L);
        }
        return queryHits;
    }

    /**
     * Returns the query hits of several replicas, summed across frontends.
     *
     * @param replicaIds replicas to look up
     * @return a new map holding one entry per requested replica id, plus any additional ids present
     *         in the remote answers
     */
    public static Map<Long, Long> getMergedReplicasStats(List<Long> replicaIds) {
        Map<Long, Long> result = new HashMap<>();
        QueryStats qs = Env.getCurrentEnv().getQueryStats();
        for (long replicaId : replicaIds) {
            result.put(replicaId, qs.getStats(replicaId));
        }
        TGetQueryStatsRequest request = new TGetQueryStatsRequest();
        request.setType(TQueryStatsType.TABLETS);
        request.setReplicaIds(replicaIds);
        for (TQueryStatsResult other : getStats(request)) {
            other.getTabletStats().forEach((k, v) -> result.merge(k, v, Long::sum));
        }
        return result;
    }

    /** Adds the simple-result counters of every remote answer to {@code local} and returns it. */
    private static Map<String, Long> mergeSimpleResults(Map<String, Long> local, TGetQueryStatsRequest request) {
        for (TQueryStatsResult other : getStats(request)) {
            other.getSimpleResult().forEach((k, v) -> local.merge(k, v, Long::sum));
        }
        return local;
    }

    /** Adds each field's query/filter stats to the matching pair in {@code target}, creating it if absent. */
    private static void mergeFieldStats(Map<String, Pair<Long, Long>> target, List<TTableQueryStats> stats) {
        for (TTableQueryStats ts : stats) {
            Pair<Long, Long> current = target.get(ts.getField());
            if (current == null) {
                target.put(ts.getField(), Pair.of(ts.getQueryStats(), ts.getFilterStats()));
            } else {
                current.first += ts.getQueryStats();
                current.second += ts.getFilterStats();
            }
        }
    }

    /**
     * Sends {@code request} to every frontend that is alive and whose host differs from this
     * node's host, and collects the answers that have an OK status and a non-empty payload for the
     * requested type.
     *
     * <p>Failures are logged and the affected frontend is skipped; this method does not throw for
     * a failing frontend.
     *
     * <p>NOTE(review): the self check compares hosts only, so another frontend running on the same
     * host as this node would be skipped as well - confirm whether that deployment is supported.
     *
     * @param request fully populated request; its type selects which payload field is validated
     * @return the usable answers, possibly empty
     */
    private static List<TQueryStatsResult> getStats(TGetQueryStatsRequest request) {
        List<TQueryStatsResult> results = new ArrayList<>();
        ConnectContext ctx = ConnectContext.get();
        int timeoutS = ctx == null ? DEFAULT_RPC_TIMEOUT_S : ctx.getExecTimeoutS();
        // Convert in long and clamp so a very large timeout cannot overflow int.
        int timeoutMs = (int) Math.min(timeoutS * 1000L, Integer.MAX_VALUE);
        for (Frontend fe : Env.getCurrentEnv().getFrontends(null /* all */)) {
            if (!fe.isAlive() || fe.getHost().equals(Env.getCurrentEnv().getSelfNode().getHost())) {
                continue;
            }
            TNetworkAddress address = new TNetworkAddress(fe.getHost(), fe.getRpcPort());
            FrontendService.Client client = null;
            // Becomes true once the RPC round trip finished, i.e. the connection is still usable.
            boolean rpcCompleted = false;
            try {
                client = ClientPool.frontendPool.borrowObject(address, timeoutMs);
                TQueryStatsResult other = client.getQueryStats(request);
                rpcCompleted = true;
                if (!other.isSetStatus() || other.getStatus().getStatusCode() != TStatusCode.OK) {
                    LOG.info("Failed to collect stats from " + fe.getHost());
                    continue;
                }
                if (hasPayload(request.getType(), other, fe.getHost())) {
                    results.add(other);
                }
            } catch (Exception e) {
                // Covers borrow failures, RPC failures and payload validation failures alike.
                LOG.warn("Failed to get query stats from frontend {}", fe.getHost(), e);
            } finally {
                // Always hand the client back; otherwise every call leaks a pooled connection.
                if (client != null) {
                    if (rpcCompleted) {
                        ClientPool.frontendPool.returnObject(address, client);
                    } else {
                        ClientPool.frontendPool.invalidateObject(address, client);
                    }
                }
            }
        }
        return results;
    }

    /**
     * Checks that {@code other} carries the payload field matching {@code type}.
     *
     * @return {@code true} if the payload is set and non-empty, {@code false} if it is set but empty
     * @throws DdlException if the payload field is not set or {@code type} is not handled
     */
    private static boolean hasPayload(TQueryStatsType type, TQueryStatsResult other, String host)
            throws DdlException {
        boolean empty;
        switch (type) {
            case TABLET:
            case TABLETS:
                requireSet(other.isSetTabletStats(), host);
                empty = other.getTabletStats().isEmpty();
                break;
            case TABLE:
                requireSet(other.isSetTableStats(), host);
                empty = other.getTableStats().isEmpty();
                break;
            case CATALOG:
            case DATABASE:
            case TABLE_ALL:
                requireSet(other.isSetSimpleResult(), host);
                empty = other.getSimpleResult().isEmpty();
                break;
            case TABLE_ALL_VERBOSE:
                requireSet(other.isSetTableVerbosStats(), host);
                empty = other.getTableVerbosStats().isEmpty();
                break;
            default:
                throw new DdlException("Unknown stats type: " + type);
        }
        if (empty) {
            LOG.info("get empty stats from " + host);
        }
        return !empty;
    }

    /** Throws if the expected payload field is missing from the answer of {@code host}. */
    private static void requireSet(boolean present, String host) throws DdlException {
        if (!present) {
            throw new DdlException("Failed to collect stats from " + host);
        }
    }

}