WorkloadRuntimeStatusMgr.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.resource.workloadschedpolicy;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.Config;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.MasterDaemon;
import org.apache.doris.plugin.AuditEvent;
import org.apache.doris.thrift.TQueryStatistics;
import org.apache.doris.thrift.TReportWorkloadRuntimeStatusParams;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// NOTE: we do not take a lock when updating beToQueryStatsMap, to avoid a global lock across all BEs.
// In rare corner cases this can lose a statistics update, for example:
// time1: the clear logic judges that query 1 has timed out
// time2: query 1 is updated by a BE report
// time3: the clear logic removes query 1
// Losing query stats in this case is acceptable: the query report timeout is 60s by default,
// so if it happens, the first thing to investigate is why the BE has not reported for so long.
public class WorkloadRuntimeStatusMgr extends MasterDaemon {

    private static final Logger LOG = LogManager.getLogger(WorkloadRuntimeStatusMgr.class);
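    // per-BE runtime statistics keyed by backend id; updated lock-free by BE report RPCs (see NOTE above)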
    private Map<Long, BeReportInfo> beToQueryStatsMap = Maps.newConcurrentMap();
    private final ReentrantReadWriteLock queryAuditEventLock = new ReentrantReadWriteLock();
    private List<AuditEvent> queryAuditEventList = Lists.newLinkedList();

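    // one BE's view of its running queries: when the BE last reported, plus
    // a (report time, stats) pair per query id so stale queries can expire independently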
    private static class BeReportInfo {
        volatile long beLastReportTime;
        Map<String, Pair<Long, TQueryStatistics>> queryStatsMap = Maps.newConcurrentMap();

        BeReportInfo(long beLastReportTime) {
            this.beLastReportTime = beLastReportTime;
        }
    }

    public WorkloadRuntimeStatusMgr() {
        super("workload-runtime-stats-thread", Config.workload_runtime_status_thread_interval_ms);
    }

    @Override
    protected void runAfterCatalogReady() {
        // 1 merge be query statistics
        Map<String, TQueryStatistics> queryStatisticsMap = getQueryStatisticsMap();

        // 2 log query audit
        try {
            List<AuditEvent> auditEventList = getQueryNeedAudit();
            int missedLogCount = 0;
            int succLogCount = 0;
            for (AuditEvent auditEvent : auditEventList) {
                TQueryStatistics queryStats = queryStatisticsMap.get(auditEvent.queryId);
                if (queryStats != null) {
                    auditEvent.scanRows = queryStats.scan_rows;
                    auditEvent.scanBytes = queryStats.scan_bytes;
                    auditEvent.scanBytesFromLocalStorage = queryStats.scan_bytes_from_local_storage;
                    auditEvent.scanBytesFromRemoteStorage = queryStats.scan_bytes_from_remote_storage;
                    auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes;
                    auditEvent.cpuTimeMs = queryStats.cpu_ms;
                    auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes;
                    auditEvent.shuffleSendRows = queryStats.shuffle_send_rows;
                    auditEvent.spillWriteBytesToLocalStorage = queryStats.spill_write_bytes_to_local_storage;
                    auditEvent.spillReadBytesFromLocalStorage = queryStats.spill_read_bytes_from_local_storage;
                }
                boolean ret = Env.getCurrentAuditEventProcessor().handleAuditEvent(auditEvent);
                if (!ret) {
                    missedLogCount++;
                } else {
                    succLogCount++;
                }
            }
            if (missedLogCount > 0) {
                LOG.warn("discard audit event because of log queue is full, discard num : {}, succ num : {}",
                        missedLogCount, succLogCount);
            }
        } catch (Throwable t) {
            LOG.warn("exception happens when handleAuditEvent, ", t);
        }

        // 3 clear beToQueryStatsMap when be report timeout
        clearReportTimeoutBeStatistics();
    }

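    // called when a query finishes: the event is queued so its audit log entry can be
    // enriched with BE-reported statistics, unless the queue is already full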
    public void submitFinishQueryToAudit(AuditEvent event) {
        queryAuditEventLogWriteLock();
        try {
            if (queryAuditEventList.size() > Config.audit_event_log_queue_size) {
                // if queryAuditEventList is full, we do not put the event into it,
                // so the statistics of this audit event are skipped and the event is logged directly.
                LOG.warn("audit log event queue size {} is full, this may cause audit logs to miss statistics. "
                                + "you can check whether qps is too high or "
                                + "set audit_event_log_queue_size to a larger value in fe.conf. query id: {}",
                        queryAuditEventList.size(), event.queryId);
                Env.getCurrentAuditEventProcessor().handleAuditEvent(event);
            } else {
                // put the event into queryAuditEventList and let the worker thread handle it.
                // the worker thread waits up to query_audit_log_timeout_ms for the statistics before logging the event.
                event.pushToAuditLogQueueTime = System.currentTimeMillis();
                queryAuditEventList.add(event);
            }
        } finally {
            queryAuditEventLogWriteUnlock();
        }
    }

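    // drain events that have waited longer than query_audit_log_timeout_ms; by then the
    // BE statistics reports for those queries should have arrived (or are treated as lost)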
    private List<AuditEvent> getQueryNeedAudit() {
        List<AuditEvent> ret = new ArrayList<>();
        long currentTime = System.currentTimeMillis();
        queryAuditEventLogWriteLock();
        try {
            int queryAuditLogTimeout = Config.query_audit_log_timeout_ms;
            Iterator<AuditEvent> iter = queryAuditEventList.iterator();
            while (iter.hasNext()) {
                AuditEvent ae = iter.next();
                if (currentTime - ae.pushToAuditLogQueueTime > queryAuditLogTimeout) {
                    ret.add(ae);
                    iter.remove();
                } else {
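                    // queryAuditEventList is ordered by push time, so once an event
                    // has not timed out, none of the later ones have either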
                    break;
                }
            }
        } finally {
            queryAuditEventLogWriteUnlock();
        }
        return ret;
    }

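    // handler for a BE's periodic workload runtime status report: refreshes the BE's
    // last report time and upserts the per-query statistics it carries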
    public void updateBeQueryStats(TReportWorkloadRuntimeStatusParams params) {
        if (!params.isSetBackendId()) {
            LOG.warn("be report workload runtime status but without beid");
            return;
        }
        if (!params.isSetQueryStatisticsMap()) {
            LOG.warn("be report workload runtime status but without query stats map");
            return;
        }
        long beId = params.backend_id;
        // NOTE(wb): each BE sends its update requests one at a time,
        // so a global lock on beToQueryStatsMap is not needed here;
        // it is enough that put/remove/get on a single BE's entry are atomic.
        long currentTime = System.currentTimeMillis();
        BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
        if (beReportInfo == null) {
            beReportInfo = new BeReportInfo(currentTime);
            beToQueryStatsMap.put(beId, beReportInfo);
        } else {
            beReportInfo.beLastReportTime = currentTime;
        }
        for (Map.Entry<String, TQueryStatistics> entry : params.query_statistics_map.entrySet()) {
            beReportInfo.queryStatsMap.put(entry.getKey(), Pair.of(currentTime, entry.getValue()));
        }
    }

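    // expire state for BEs (and individual queries) whose last report is older than
    // be_report_query_statistics_timeout_ms; only this daemon thread removes entries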
    void clearReportTimeoutBeStatistics() {
        // 1 clear report timeout be
        Set<Long> currentBeIdSet = beToQueryStatsMap.keySet();
        long currentTime = System.currentTimeMillis();
        for (Long beId : currentBeIdSet) {
            BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
            if (currentTime - beReportInfo.beLastReportTime > Config.be_report_query_statistics_timeout_ms) {
                beToQueryStatsMap.remove(beId);
                continue;
            }
            Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet();
            for (String queryId : queryIdSet) {
                Pair<Long, TQueryStatistics> pair = beReportInfo.queryStatsMap.get(queryId);
                long queryLastReportTime = pair.first;
                if (currentTime - queryLastReportTime > Config.be_report_query_statistics_timeout_ms) {
                    beReportInfo.queryStatsMap.remove(queryId);
                }
            }
        }
    }

// NOTE: currently getQueryStatisticsMap must be called before beToQueryStatsMap is cleared,
// so no lock or null check is needed when visiting beToQueryStatsMap.
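// A hypothetical example of the merge: if be1 reports {scan_rows=100, max_peak_memory_bytes=1g}
// and be2 reports {scan_rows=50, max_peak_memory_bytes=2g} for the same query id, the merged
// result is {scan_rows=150, max_peak_memory_bytes=2g}: counters are summed, peak memory takes the max.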
    public Map<String, TQueryStatistics> getQueryStatisticsMap() {
        // 1 merge query stats in all be
        Set<Long> beIdSet = beToQueryStatsMap.keySet();
        Map<String, TQueryStatistics> resultQueryMap = Maps.newHashMap();
        for (Long beId : beIdSet) {
            BeReportInfo beReportInfo = beToQueryStatsMap.get(beId);
            Set<String> queryIdSet = beReportInfo.queryStatsMap.keySet();
            for (String queryId : queryIdSet) {
                TQueryStatistics curQueryStats = beReportInfo.queryStatsMap.get(queryId).second;

                TQueryStatistics retQuery = resultQueryMap.get(queryId);
                if (retQuery == null) {
                    retQuery = new TQueryStatistics();
                    resultQueryMap.put(queryId, retQuery);
                }
                mergeQueryStatistics(retQuery, curQueryStats);
            }
        }

        return resultQueryMap;
    }

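    // accumulate src into dst: additive counters are summed, max_peak_memory_bytes keeps the maximum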
    private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics src) {
        dst.scan_rows += src.scan_rows;
        dst.scan_bytes += src.scan_bytes;
        dst.scan_bytes_from_local_storage += src.scan_bytes_from_local_storage;
        dst.scan_bytes_from_remote_storage += src.scan_bytes_from_remote_storage;
        dst.cpu_ms += src.cpu_ms;
        dst.shuffle_send_bytes += src.shuffle_send_bytes;
        dst.shuffle_send_rows += src.shuffle_send_rows;
        if (dst.max_peak_memory_bytes < src.max_peak_memory_bytes) {
            dst.max_peak_memory_bytes = src.max_peak_memory_bytes;
        }
        dst.spill_write_bytes_to_local_storage += src.spill_write_bytes_to_local_storage;
        dst.spill_read_bytes_from_local_storage += src.spill_read_bytes_from_local_storage;
    }

    private void queryAuditEventLogWriteLock() {
        queryAuditEventLock.writeLock().lock();
    }

    private void queryAuditEventLogWriteUnlock() {
        queryAuditEventLock.writeLock().unlock();
    }
}