RuntimeProfile.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.common.profile;

import org.apache.doris.common.Pair;
import org.apache.doris.common.Reference;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.thrift.TCounter;
import org.apache.doris.thrift.TPlanNodeRuntimeStatsItem;
import org.apache.doris.thrift.TRuntimeProfileNode;
import org.apache.doris.thrift.TRuntimeProfileTree;
import org.apache.doris.thrift.TUnit;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Formatter;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * It is accessed by two kinds of thread, one is to create this RuntimeProfile
 * , named 'query thread', the other is to call
 * {@link org.apache.doris.common.proc.CurrentQueryInfoProvider}.
 */
public class RuntimeProfile {
    // TODO: 这里维护性太差了
    // BE 上的 OperatorXBase::init 里面有 Operator 的命名规则
    // 简而言之是把 XXX_NODE 平替为 XXX_OPERATOR
    // 所以这里应该把 gensrc/thrift/PlanNodes.thrift 里面定义的 NodeType
    // 手动转为了其对应 BE 上 operator 的名字
    // 有一些 Node 比较特殊,它在 BE 上是作为 Pipeline breaker 的,比如 AGGREGATION_NODE,
    // 也得手动把这类 operator 的名字添加到这里
    public static final ImmutableSet<String> PLAN_NODE_TYPE_MAP =
            ImmutableSet.<String>builder()
                    .add("OLAP_SCAN_OPERATOR")
                    .add("AGGREGATION_OPERATOR")
                    .add("AGGREGATION_SINK_OPERATOR")
                    .add("STREAMING_AGGREGATION_OPERATOR")
                    .add("HASH_JOIN_OPERATOR")
                    .add("HASH_JOIN_SINK_OPERATOR").build();
    private static final Logger LOG = LogManager.getLogger(RuntimeProfile.class);
    public static String ROOT_COUNTER = "";
    public static String MAX_TIME_PRE = "max ";
    public static String MIN_TIME_PRE = "min ";
    public static String AVG_TIME_PRE = "avg ";
    public static String SUM_TIME_PRE = "sum ";
    @SerializedName(value = "counterTotalTime")
    private Counter counterTotalTime;
    @SerializedName(value = "localTimePercent")
    private double localTimePercent = 0;
    @SerializedName(value = "infoStrings")
    private Map<String, String> infoStrings = Maps.newHashMap();
    @SerializedName(value = "infoStringsDisplayOrder")
    private List<String> infoStringsDisplayOrder = Lists.newArrayList();
    private transient ReentrantReadWriteLock infoStringsLock = new ReentrantReadWriteLock();

    @SerializedName(value = "counterMap")
    private Map<String, Counter> counterMap = Maps.newConcurrentMap();
    @SerializedName(value = "childCounterMap")
    private Map<String, TreeSet<String>> childCounterMap = Maps.newConcurrentMap();
    // protect TreeSet in ChildCounterMap
    private transient ReentrantReadWriteLock counterLock = new ReentrantReadWriteLock();
    @SerializedName(value = "childMap")
    private Map<String, RuntimeProfile> childMap = Maps.newConcurrentMap();
    @SerializedName(value = "childList")
    private LinkedList<Pair<RuntimeProfile, Boolean>> childList = Lists.newLinkedList();
    private transient ReentrantReadWriteLock childLock = new ReentrantReadWriteLock();
    @SerializedName(value = "planNodeInfos")
    private List<String> planNodeInfos = Lists.newArrayList();

    @SerializedName(value = "name")
    private String name = "";
    @SerializedName(value = "timestamp")
    private Long timestamp = -1L;
    @SerializedName(value = "isDone")
    private Boolean isDone = false;
    @SerializedName(value = "isCancel")
    private Boolean isCancel = false;
    // In pipelineX, we have explicitly split the Operator into sink and operator,
    // and we can distinguish them using tags.
    // In the old pipeline, we can only differentiate them based on their position
    // in the profile, which is quite tricky and only transitional.
    @SerializedName(value = "isSinkOperator")
    private Boolean isSinkOperator = false;
    @SerializedName(value = "nodeid")
    private int nodeid = -1;

    public Map<String, Long> rowsProducedMap = new HashMap<>();

    public RuntimeProfile() {
        init();
    }

    public RuntimeProfile(String name) {
        if (Strings.isNullOrEmpty(name)) {
            throw new RuntimeException("Profile name must not be null");
        }
        this.name = name;
        this.counterTotalTime = new Counter(TUnit.TIME_NS, 0, 1);
        this.counterMap.put("TotalTime", counterTotalTime);
        init();
    }

    public RuntimeProfile(String name, int nodeId) {
        if (Strings.isNullOrEmpty(name)) {
            throw new RuntimeException("Profile name must not be null");
        }
        this.name = name;
        this.nodeid = nodeId;
        this.counterTotalTime = new Counter(TUnit.TIME_NS, 0, 3);
        this.counterMap.put("TotalTime", counterTotalTime);
        init();
    }

    private void init() {
        this.infoStringsLock = new ReentrantReadWriteLock();
        this.childLock = new ReentrantReadWriteLock();
        this.counterLock = new ReentrantReadWriteLock();
    }

    public static RuntimeProfile read(DataInput input) throws IOException {
        return GsonUtils.GSON.fromJson(Text.readString(input), RuntimeProfile.class);
    }

    public void setIsCancel(Boolean isCancel) {
        this.isCancel = isCancel;
    }

    public Boolean getIsCancel() {
        return isCancel;
    }

    public void setIsDone(Boolean isDone) {
        this.isDone = isDone;
    }

    public Boolean getIsDone() {
        return isDone;
    }

    public String getName() {
        return name;
    }

    public Counter getCounterTotalTime() {
        Counter totalTimeCounter = counterMap.get("TotalTime");
        if (totalTimeCounter == null) {
            return counterTotalTime;
        }
        return totalTimeCounter;
    }

    public int nodeId() {
        return this.nodeid;
    }

    public Map<String, Counter> getCounterMap() {
        return counterMap;
    }

    public List<Pair<RuntimeProfile, Boolean>> getChildList() {
        return childList;
    }

    public boolean isEmpty() {
        return childList.isEmpty();
    }

    public Map<String, RuntimeProfile> getChildMap() {
        return childMap;
    }

    public Map<String, TreeSet<String>> getChildCounterMap() {
        return childCounterMap;
    }



    public Counter addCounter(String name, TUnit type, String parentCounterName) {
        counterLock.writeLock().lock();
        try {
            Counter counter = this.counterMap.get(name);
            if (counter != null) {
                return counter;
            } else {
                Preconditions.checkState(parentCounterName.equals(ROOT_COUNTER)
                        || this.counterMap.containsKey(parentCounterName));
                Counter newCounter = new Counter(type, 0);
                this.counterMap.put(name, newCounter);

                Set<String> childCounters = childCounterMap.get(parentCounterName);
                if (childCounters == null) {
                    childCounterMap.put(parentCounterName, new TreeSet<String>());
                    childCounters = childCounterMap.get(parentCounterName);
                }
                childCounters.add(name);
                return newCounter;
            }
        } finally {
            counterLock.writeLock().unlock();
        }
    }

    public void addCounter(String name, Counter newCounter, String parentCounterName) {
        counterLock.writeLock().lock();
        try {
            Counter counter = this.counterMap.get(name);
            if (counter != null) {
                return;
            } else {
                Preconditions.checkState(parentCounterName.equals(ROOT_COUNTER)
                        || this.counterMap.containsKey(parentCounterName));
                this.counterMap.put(name, newCounter);

                Set<String> childCounters = childCounterMap.get(parentCounterName);
                if (childCounters == null) {
                    childCounterMap.put(parentCounterName, new TreeSet<String>());
                    childCounters = childCounterMap.get(parentCounterName);
                }
                childCounters.add(name);
            }
        } finally {
            counterLock.writeLock().unlock();
        }
    }

    public void update(final TRuntimeProfileTree thriftProfile) {
        Reference<Integer> idx = new Reference<Integer>(0);
        update(thriftProfile.nodes, idx);
        Preconditions.checkState(idx.getRef().equals(thriftProfile.nodes.size()));
    }

    // preorder traversal, idx should be modified in the traversal process
    private void update(List<TRuntimeProfileNode> nodes, Reference<Integer> idx) {
        TRuntimeProfileNode node = nodes.get(idx.getRef());
        // Make sure to update the latest LoadChannel profile according to the
        // timestamp.
        if (node.timestamp != -1 && node.timestamp < timestamp) {
            return;
        }
        if (node.isSetMetadata()) {
            this.nodeid = (int) node.getMetadata();
        }

        Preconditions.checkState(timestamp == -1 || node.timestamp != -1);
        // update this level's counters
        if (node.counters != null) {
            for (TCounter tcounter : node.counters) {
                // If different node has counter with the same name, it will lead to chaos.
                Counter counter = this.counterMap.get(tcounter.name);
                if (counter == null) {
                    if (tcounter.isSetDescription()) {
                        counterMap.put(tcounter.name, new Counter(tcounter.description));
                    } else {
                        counterMap.put(tcounter.name, new Counter(tcounter.type, tcounter.value, tcounter.level));
                    }
                } else {
                    counter.setLevel(tcounter.level);
                    if (counter.getType() != tcounter.type) {
                        LOG.error("Cannot update counters with the same name but different types"
                                + " type=" + tcounter.type);
                    } else if (tcounter.isSetDescription()) {
                        continue;
                    } else {
                        counter.setValue(tcounter.type, tcounter.value);
                    }
                }
            }

            if (node.child_counters_map != null) {
                // update childCounters
                for (Map.Entry<String, Set<String>> entry : node.child_counters_map.entrySet()) {
                    String parentCounterName = entry.getKey();

                    counterLock.writeLock().lock();
                    try {
                        Set<String> childCounters = childCounterMap.get(parentCounterName);
                        if (childCounters == null) {
                            childCounterMap.put(parentCounterName, new TreeSet<String>());
                            childCounters = childCounterMap.get(parentCounterName);
                        }
                        childCounters.addAll(entry.getValue());
                    } finally {
                        counterLock.writeLock().unlock();
                    }
                }
            }
        }

        if (node.info_strings_display_order != null) {
            Map<String, String> nodeInfoStrings = node.info_strings;
            for (String key : node.info_strings_display_order) {
                String value = nodeInfoStrings.get(key);
                Preconditions.checkState(value != null);
                infoStringsLock.writeLock().lock();
                try {
                    if (this.infoStrings.containsKey(key)) {
                        // exists then replace
                        this.infoStrings.put(key, value);
                    } else {
                        this.infoStrings.put(key, value);
                        this.infoStringsDisplayOrder.add(key);
                    }
                } finally {
                    infoStringsLock.writeLock().unlock();
                }
            }
        }

        idx.setRef(idx.getRef() + 1);

        for (int i = 0; i < node.num_children; i++) {
            TRuntimeProfileNode tchild = nodes.get(idx.getRef());
            String childName = tchild.name;
            RuntimeProfile childProfile;

            childLock.writeLock().lock();
            try {
                childProfile = this.childMap.get(childName);
                if (childProfile == null) {
                    childMap.put(childName, new RuntimeProfile(childName));
                    childProfile = this.childMap.get(childName);
                    Pair<RuntimeProfile, Boolean> pair = Pair.of(childProfile, tchild.indent);
                    this.childList.add(pair);
                }
            } finally {
                childLock.writeLock().unlock();
            }
            childProfile.update(nodes, idx);
        }
    }

    public class Brief {
        String name;
        long rowsReturned = 0;
        String totalTime = "";
        List<Brief> children = new ArrayList<>();
    }

    public Brief toBrief() {
        Brief brief = new Brief();
        brief.name = this.name;
        brief.rowsReturned = 0L;

        counterLock.readLock().lock();
        try {
            Counter rowsReturnedCounter = counterMap.get("RowsReturned");
            if (rowsReturnedCounter != null) {
                brief.rowsReturned = rowsReturnedCounter.getValue();
            }
            Counter totalTimeCounter = counterMap.get("TotalTime");
            if (totalTimeCounter != null) {
                brief.totalTime = printCounter(totalTimeCounter.getValue(), totalTimeCounter.getType());
            }
        } finally {
            counterLock.readLock().unlock();
        }

        childLock.readLock().lock();
        try {
            for (Pair<RuntimeProfile, Boolean> pair : childList) {
                brief.children.add(pair.first.toBrief());
            }
        } finally {
            childLock.readLock().unlock();
        }

        return brief;
    }

    private void printActimeCounter(StringBuilder builder) {
        Counter counter = this.counterMap.get("ExecTime");
        if (counter == null) {
            counter = this.counterMap.get("TotalTime");
        }
        if (counter.getValue() != 0) {
            try (Formatter fmt = new Formatter()) {
                builder.append("(ExecTime: ")
                        .append(RuntimeProfile.printCounter(counter.getValue(), counter.getType()))
                        .append(")");
            }
        }
    }

    // Print the profile:
    // 1. Profile Name
    // 2. Info Strings
    // 3. Counters
    // 4. Children
    public void prettyPrint(StringBuilder builder, String prefix) {
        // 1. profile name
        builder.append(prefix).append(name).append(":");
        // total time
        printActimeCounter(builder);

        builder.append("\n");

        // plan node info
        printPlanNodeInfo(prefix + "   ", builder);

        // 2. info String
        infoStringsLock.readLock().lock();
        try {
            for (String key : this.infoStringsDisplayOrder) {
                builder.append(prefix);
                if (SummaryProfile.EXECUTION_SUMMARY_KEYS_INDENTATION.containsKey(key)) {
                    for (int i = 0; i < SummaryProfile.EXECUTION_SUMMARY_KEYS_INDENTATION.get(key); i++) {
                        builder.append("  ");
                    }
                }
                builder.append("   - ").append(key).append(": ")
                        .append(this.infoStrings.get(key)).append("\n");
            }
        } finally {
            infoStringsLock.readLock().unlock();
        }

        // 3. counters
        try {
            printChildCounters(prefix, ROOT_COUNTER, builder);
        } catch (Exception e) {
            builder.append("print child counters error: ").append(e.getMessage());
        }


        // 4. children
        childLock.readLock().lock();
        try {
            for (int i = 0; i < childList.size(); i++) {
                Pair<RuntimeProfile, Boolean> pair = childList.get(i);
                boolean indent = pair.second;
                RuntimeProfile profile = pair.first;
                profile.prettyPrint(builder, prefix + (indent ? "  " : ""));
            }
        } finally {
            childLock.readLock().unlock();
        }
    }

    private void printPlanNodeInfo(String prefix, StringBuilder builder) {
        if (planNodeInfos.isEmpty()) {
            return;
        }
        builder.append(prefix).append("- ").append("PlanInfo\n");

        for (String info : planNodeInfos) {
            builder.append(prefix).append("   - ").append(info).append("\n");
        }
    }

    private static List<RuntimeProfile> getChildListFromLists(String profileName, List<RuntimeProfile> profiles) {
        List<RuntimeProfile> ret = new ArrayList<RuntimeProfile>();
        for (RuntimeProfile profile : profiles) {
            RuntimeProfile tmp = profile.getChildMap().get(profileName);
            if (tmp != null) {
                ret.add(profile.getChildMap().get(profileName));
            } else {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("could not find {} from {}", profileName, profile.toString());
                }
            }
        }
        return ret;
    }

    public String toString() {
        StringBuilder builder = new StringBuilder();
        prettyPrint(builder, "");
        return builder.toString();
    }

    public static void mergeProfiles(List<RuntimeProfile> profiles,
            RuntimeProfile simpleProfile, Map<Integer, String> planNodeMap) {
        mergeCounters(ROOT_COUNTER, profiles, simpleProfile);
        if (profiles.isEmpty()) {
            return;
        }
        RuntimeProfile templateProfile = profiles.get(0);
        for (int i = 0; i < templateProfile.childList.size(); i++) {
            RuntimeProfile templateChildProfile = templateProfile.childList.get(i).first;
            // Traverse all profiles and get the child profile with the same name
            List<RuntimeProfile> allChilds = getChildListFromLists(templateChildProfile.name, profiles);
            RuntimeProfile newCreatedMergedChildProfile = new RuntimeProfile(templateChildProfile.name,
                    templateChildProfile.nodeId());
            mergeProfiles(allChilds, newCreatedMergedChildProfile, planNodeMap);
            // RuntimeProfile has at least one counter named TotalTime, should exclude it.
            if (newCreatedMergedChildProfile.counterMap.size() > 1) {
                simpleProfile.addChildWithCheck(newCreatedMergedChildProfile, planNodeMap,
                                            templateProfile.childList.get(i).second);
                simpleProfile.rowsProducedMap.putAll(newCreatedMergedChildProfile.rowsProducedMap);
            }
        }
    }

    static void mergeCounters(String parentCounterName, List<RuntimeProfile> profiles,
            RuntimeProfile simpleProfile) {
        if (profiles.isEmpty()) {
            return;
        }
        RuntimeProfile templateProfile = profiles.get(0);
        Map<String, Counter> templateCounterMap = templateProfile.counterMap;
        Pattern pattern = Pattern.compile("nereids_id=(\\d+)");
        Matcher matcher = pattern.matcher(templateProfile.getName());
        String nereidsId = null;
        if (matcher.find()) {
            nereidsId = matcher.group(1);
        }
        Set<String> childCounterSet = templateProfile.childCounterMap.get(parentCounterName);
        if (childCounterSet == null) {
            return;
        }
        for (String childCounterName : childCounterSet) {
            Counter counter = templateProfile.counterMap.get(childCounterName);
            if (counter == null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Child counter {} of {} not found in profile {}", childCounterName, parentCounterName,
                            templateProfile.toString());
                }
                continue;
            }
            if (counter.getLevel() == 1) {
                Counter oldCounter = templateCounterMap.get(childCounterName);
                AggCounter aggCounter = new AggCounter(oldCounter.getType());
                for (RuntimeProfile profile : profiles) {
                    // orgCounter could be null if counter-structure is changed
                    // change of counter structure happens when NonZeroCounter is involved.
                    // So here we have to ignore the counter if it is not found in the profile.
                    Counter orgCounter = profile.counterMap.get(childCounterName);
                    aggCounter.addCounter(orgCounter);
                }
                if (nereidsId != null && childCounterName.equals("RowsProduced")) {
                    simpleProfile.rowsProducedMap.put(nereidsId, aggCounter.sum.getValue());
                }
                if (simpleProfile.counterMap.containsKey(parentCounterName)) {
                    simpleProfile.addCounter(childCounterName, aggCounter, parentCounterName);
                } else {
                    simpleProfile.addCounter(childCounterName, aggCounter, ROOT_COUNTER);
                }
            }
            mergeCounters(childCounterName, profiles, simpleProfile);
        }
    }

    private void printChildCounters(String prefix, String counterName, StringBuilder builder) {
        Set<String> childCounterSet = childCounterMap.get(counterName);
        if (childCounterSet == null) {
            return;
        }

        counterLock.readLock().lock();
        try {
            for (String childCounterName : childCounterSet) {
                Counter counter = this.counterMap.get(childCounterName);
                if (counter != null) {
                    builder.append(prefix).append("   - ").append(childCounterName).append(": ")
                            .append(counter.print()).append("\n");
                    this.printChildCounters(prefix + "  ", childCounterName, builder);
                } else {
                    throw new RuntimeException("Child counter " + childCounterName + " of " + counterName
                            + " not found in profile");
                }
            }
        } finally {
            counterLock.readLock().unlock();
        }
    }

    public static String printCounter(long value, TUnit type) {
        StringBuilder builder = new StringBuilder();
        long tmpValue = value;
        switch (type) {
            case NONE: {
                // Do nothing, it is just a label
                break;
            }
            case UNIT: {
                Pair<Double, String> pair = DebugUtil.getUint(tmpValue);
                if (pair.second.isEmpty()) {
                    builder.append(tmpValue);
                } else {
                    builder.append(pair.first).append(pair.second)
                            .append(" (").append(tmpValue).append(")");
                }
                break;
            }
            case TIME_NS: {
                if (tmpValue >= DebugUtil.BILLION) {
                    // If the time is over a second, print it up to ms.
                    tmpValue /= DebugUtil.MILLION;
                    DebugUtil.printTimeMs(tmpValue, builder);
                } else if (tmpValue >= DebugUtil.MILLION) {
                    // if the time is over a ms, print it up to microsecond in the unit of ms.
                    tmpValue /= 1000;
                    builder.append(tmpValue / 1000).append(".").append(tmpValue % 1000).append("ms");
                } else if (tmpValue > 1000) {
                    // if the time is over a microsecond, print it using unit microsecond
                    builder.append(tmpValue / 1000).append(".").append(tmpValue % 1000).append("us");
                } else {
                    builder.append(tmpValue).append("ns");
                }
                break;
            }
            case TIME_MS: {
                if (tmpValue >= DebugUtil.THOUSAND) {
                    // If the time is over a second, print it up to ms.
                    DebugUtil.printTimeMs(tmpValue, builder);
                } else {
                    builder.append(tmpValue).append("ms");
                }
                break;
            }
            case BYTES: {
                Pair<Double, String> pair = DebugUtil.getByteUint(tmpValue);
                Formatter fmt = new Formatter();
                builder.append(fmt.format("%.2f", pair.first)).append(" ").append(pair.second);
                fmt.close();
                break;
            }
            case BYTES_PER_SECOND: {
                Pair<Double, String> pair = DebugUtil.getByteUint(tmpValue);
                builder.append(pair.first).append(" ").append(pair.second).append("/sec");
                break;
            }
            case DOUBLE_VALUE: {
                Formatter fmt = new Formatter();
                builder.append(fmt.format("%.2f", (double) tmpValue));
                fmt.close();
                break;
            }
            case UNIT_PER_SECOND: {
                Pair<Double, String> pair = DebugUtil.getUint(tmpValue);
                if (pair.second.isEmpty()) {
                    builder.append(tmpValue);
                } else {
                    builder.append(pair.first).append(pair.second)
                            .append(" ").append("/sec");
                }
                break;
            }
            default: {
                Preconditions.checkState(false, "type=" + type);
                break;
            }
        }
        return builder.toString();
    }

    public void addChild(RuntimeProfile child, boolean indent) {
        if (child == null) {
            return;
        }

        childLock.writeLock().lock();
        try {
            if (childMap.containsKey(child.name)) {
                // Pipeline/Instance has alread finished.
                // This could happen because the report profile rpc is async.
                if (childMap.get(child.name).getIsDone() || childMap.get(child.name).getIsCancel()) {
                    return;
                } else {
                    childList.removeIf(e -> e.first.name.equals(child.name));
                }
            }
            this.childMap.put(child.name, child);
            Pair<RuntimeProfile, Boolean> pair = Pair.of(child, indent);
            this.childList.add(pair);
        } finally {
            childLock.writeLock().unlock();
        }
    }

    public void addChildWithCheck(RuntimeProfile child, Map<Integer, String> planNodeMap, boolean indent) {
        // check name
        if (child.name.startsWith("PipelineTask") || child.name.startsWith("PipelineContext")) {
            return;
        }
        childLock.writeLock().lock();
        try {
            Pair<RuntimeProfile, Boolean> pair = Pair.of(child, indent);
            this.childList.add(pair);
        } finally {
            childLock.writeLock().unlock();
        }
        // insert plan node info to profile string info
        if (planNodeMap == null || !planNodeMap.containsKey(child.nodeId())) {
            return;
        }

        /*
         * The check for SINK_OPERATOR is performed because SINK_OPERATOR does not have
         * a corresponding plan node ID.
         * Currently, the plan node info is only printed for non-sink nodes in the merge
         * profile.
         */
        if (name.contains("_SINK_OPERATOR")) {
            child.addPlanNodeInfos(planNodeMap.get(child.nodeId()));
            planNodeMap.remove(child.nodeId());
        }
    }

    public void addPlanNodeInfos(String infos) {
        String[] infoList = infos.split("\n");
        for (String info : infoList) {
            planNodeInfos.add(info);
        }
    }

    public void computeTimeInProfile() {
        computeTimeInProfile(this.counterTotalTime.getValue());
    }

    private void computeTimeInProfile(long total) {
        if (total == 0) {
            return;
        }

        childLock.readLock().lock();
        try {
            // Add all the total times in all the children
            long totalChildTime = 0;
            for (int i = 0; i < this.childList.size(); ++i) {
                totalChildTime += childList.get(i).first.getCounterTotalTime().getValue();
            }

            long localTime = this.getCounterTotalTime().getValue() - totalChildTime;
            // Counters have some margin, set to 0 if it was negative.
            localTime = Math.max(0, localTime);
            this.localTimePercent = Double.valueOf(localTime) / Double.valueOf(total);
            this.localTimePercent = Math.min(1.0, this.localTimePercent) * 100;

            // Recurse on children
            for (int i = 0; i < this.childList.size(); i++) {
                childList.get(i).first.computeTimeInProfile(total);
            }
        } finally {
            childLock.readLock().unlock();
        }
    }

    // from bigger to smaller
    public void sortChildren() {
        childLock.writeLock().lock();
        try {
            this.childList.sort((profile1, profile2) -> Long.compare(profile2.first.getCounterTotalTime().getValue(),
                    profile1.first.getCounterTotalTime().getValue()));
        } catch (IllegalArgumentException e) {
            // This exception may be thrown if the counter total time of the child is
            // updated in the update method
            // during the sorting process. This sorting only affects the profile instance
            // display order, so this
            // exception is temporarily ignored here.
            if (LOG.isDebugEnabled()) {
                LOG.debug("sort child list error: ", e);
            }
        } finally {
            childLock.writeLock().unlock();
        }
    }

    public void addInfoString(String key, String value) {
        infoStringsLock.writeLock().lock();
        try {
            String target = this.infoStrings.get(key);
            this.infoStrings.put(key, value);
            if (target == null) {
                this.infoStringsDisplayOrder.add(key);
            }
        } finally {
            infoStringsLock.writeLock().unlock();
        }
    }

    // Returns the value to which the specified key is mapped;
    // or null if this map contains no mapping for the key.
    public String getInfoString(String key) {
        return infoStrings.getOrDefault(key, "");
    }

    public Map<String, String> getInfoStrings() {
        return infoStrings;
    }

    public void write(DataOutput output) throws IOException {
        Text.writeString(output, GsonUtils.GSON.toJson(this));
    }

    private TPlanNodeRuntimeStatsItem toTPlanNodeRuntimeStatsItem() {
        TPlanNodeRuntimeStatsItem item = new TPlanNodeRuntimeStatsItem();
        item.setNodeId(this.nodeId());
        boolean isBuildSinkOperator = this.getName().startsWith("HASH_JOIN_SINK_OPERATOR");
        for (Map.Entry<String, Counter> entry : this.counterMap.entrySet()) {
            if (entry.getValue() instanceof AggCounter) {
                AggCounter value = (AggCounter) entry.getValue();
                String key = entry.getKey();
                item.setInstanceNum(value.number);
                switch (key) {
                    case "RowsProduced":
                        item.setOutputRows(value.sum.getValue());
                        break;
                    case "InputRows":
                        item.setInputRows(value.sum.getValue());
                        if (isBuildSinkOperator) {
                            item.setJoinBuilderRows(value.sum.getValue());
                            if (value.sum.getValue() == 0 || value.sum.getValue() / value.number == 0) {
                                // builder side may be empty, set 1 directly for passing skew checking
                                item.setJoinBuilderSkewRatio(1);
                            } else {
                                long avgBuildValue = value.sum.getValue() / value.number;
                                item.setJoinBuilderSkewRatio((int) (value.max.getValue() / avgBuildValue));
                            }
                        }
                        break;
                    case "ProbeRows":
                        item.setJoinProbeRows(value.sum.getValue());
                        if (value.sum.getValue() == 0 || value.sum.getValue() / value.number == 0) {
                            // probe side may be empty, set 1 directly for passing skew checking
                            item.setJoinProberSkewRatio(1);
                        } else {
                            long avgProbeValue = value.sum.getValue() / value.number;
                            item.setJoinProberSkewRatio((int) (value.max.getValue() / avgProbeValue));
                        }
                        break;
                    default:
                        break;
                }
            }
        }

        // Handle runtime profile structure
        /*
- RuntimeFilterInfo: sum 0, avg 0, max 0, min 0
- RF0 FilterRows: sum 1, avg 0, max 1, min 0
- RF0 InputRows: sum 390.144K (390144), avg 8.128K (8128), max 8.128K (8128), min 8.128K (8128)
- RF1 FilterRows: sum 9.349313M (9349313), avg 194.777K (194777), max 196.512K (196512), min 192.736K (192736)
- RF1 InputRows: sum 11.692823M (11692823), avg 243.6K (243600), max 245.822K (245822), min 240.939K (240939)
- RF2 FilterRows: sum 0, avg 0, max 0, min 0
- RF2 InputRows: sum 390.143K (390143), avg 8.127K (8127), max 8.128K (8128), min 8.127K (8127)
- RF3 FilterRows: sum 0, avg 0, max 0, min 0
- RF3 InputRows: sum 0, avg 0, max 0, min 0
- RF4 FilterRows: sum 0, avg 0, max 0, min 0
- RF4 InputRows: sum 0, avg 0, max 0, min 0
- RF5 FilterRows: sum 86.632484M (86632484), avg 1.804843M (1804843), max 1.809673M (1809673), min 1.7993M (1799300)
- RF5 InputRows: sum 144.419127M (144419127), avg 3.008731M (3008731), max 3.015499M (3015499), min 3.001376M (3001376)
- RF6 FilterRows: sum 0, avg 0, max 0, min 0
- RF6 InputRows: sum 361.696K (361696), avg 7.535K (7535), max 16.256K (16256), min 0
- RF7 FilterRows: sum 46.09382M (46093820), avg 960.287K (960287), max 962.989K (962989), min 956.812K (956812)
- RF7 InputRows: sum 57.786643M (57786643), avg 1.203888M (1203888), max 1.206715M (1206715), min 1.199902M (1199902)
         */
        for (Entry<String, TreeSet<String>> entry : this.childCounterMap.entrySet()) {
            if (entry.getKey().equals("RuntimeFilterInfo")) {
                AggCounter filteredAggCounter = new AggCounter(TUnit.UNIT);
                AggCounter inputAggCounter = new AggCounter(TUnit.UNIT);
                Set<String> childCounterSet = entry.getValue();
                for (String childCounterName : childCounterSet) {
                    Counter rhsAggCounter = this.counterMap.get(childCounterName);
                    if (rhsAggCounter == null) {
                        LOG.warn("Should not happen, childCounter {} not found.", childCounterName);
                        throw new RuntimeException("Invalid profile");
                    }
                    // Only merged profile will call toTPlanNodeRuntimeStatsItem()
                    if (!(rhsAggCounter instanceof AggCounter)) {
                        LOG.warn("Should not happen, invalid counter type, counter name: {}", childCounterName);
                        throw new RuntimeException("Invalid profile");
                    }

                    if (childCounterName.endsWith("FilterRows")) {
                        filteredAggCounter.mergeCounter((AggCounter) rhsAggCounter);
                    } else if (childCounterName.endsWith("InputRows")) {
                        inputAggCounter.mergeCounter((AggCounter) rhsAggCounter);
                    } else {
                        LOG.warn("Should not happen, childCounterName: " + childCounterName);
                    }
                }
                item.setRuntimeFilterInputRows(inputAggCounter.sum.getValue());
                item.setRuntimeFilterRows(filteredAggCounter.sum.getValue());
            }
        }
        return item;
    }

    public static List<TPlanNodeRuntimeStatsItem> toTPlanNodeRuntimeStatsItem(RuntimeProfile profile,
            List<TPlanNodeRuntimeStatsItem> itemsFromParent) {
        if (itemsFromParent == null) {
            itemsFromParent = new ArrayList<>();
        }

        // Recurse on children
        profile.childLock.readLock().lock();
        try {
            for (int i = 0; i < profile.childList.size(); i++) {
                List<TPlanNodeRuntimeStatsItem> childItems = new ArrayList<>();
                itemsFromParent.addAll(toTPlanNodeRuntimeStatsItem(profile.childList.get(i).first, childItems));
            }
        } finally {
            profile.childLock.readLock().unlock();
        }

        String name = profile.getName();
        if (!name.contains("_OPERATOR")) {
            return itemsFromParent;
        } else {
            int index = name.indexOf('(');
            name = name.substring(0, index).trim();
        }

        // Check if current node is valid for processing
        if (!PLAN_NODE_TYPE_MAP.contains(name)) {
            return itemsFromParent; // Skip current node
        }
        // Add stats for current node
        itemsFromParent.add(profile.toTPlanNodeRuntimeStatsItem());

        if (LOG.isDebugEnabled()) {
            List<TPlanNodeRuntimeStatsItem> currentItem = new ArrayList<TPlanNodeRuntimeStatsItem>();
            currentItem.add(profile.toTPlanNodeRuntimeStatsItem());
            LOG.debug("Current node {}({}) hbo items\n{},\nparent\n{}",
                    profile.getName(), profile.nodeid,
                    DebugUtil.prettyPrintPlanNodeRuntimeStatsItems(currentItem),
                    DebugUtil.prettyPrintPlanNodeRuntimeStatsItems(itemsFromParent));
        }

        return itemsFromParent;
    }

    public static List<TPlanNodeRuntimeStatsItem> mergeTPlanNodeRuntimeStatsItem(
            List<TPlanNodeRuntimeStatsItem> items) {
        Map<Integer, TPlanNodeRuntimeStatsItem> itemMap = new HashMap<>();
        for (TPlanNodeRuntimeStatsItem item : items) {
            int nodeId = item.getNodeId();
            if (itemMap.containsKey(nodeId)) {
                TPlanNodeRuntimeStatsItem oldItem = itemMap.get(nodeId);
                oldItem.setInputRows(oldItem.getInputRows() + item.getInputRows());
                oldItem.setOutputRows(oldItem.getOutputRows() + item.getOutputRows());
                oldItem.setJoinProbeRows(oldItem.getJoinProbeRows() + item.getJoinProbeRows());
                oldItem.setJoinProberSkewRatio(oldItem.getJoinProberSkewRatio() + item.getJoinProberSkewRatio());
                oldItem.setJoinBuilderRows(oldItem.getJoinBuilderRows() + item.getJoinBuilderRows());
                oldItem.setJoinBuilderSkewRatio(oldItem.getJoinBuilderSkewRatio() + item.getJoinBuilderSkewRatio());
                oldItem.setInstanceNum(oldItem.getInstanceNum() + item.getInstanceNum());
            } else {
                itemMap.put(nodeId, item);
            }
        }
        return new ArrayList<>(itemMap.values());
    }
}