Coverage Report

Created: 2024-11-20 12:56

/root/doris/be/src/util/runtime_profile.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/runtime-profile.cc
19
// and modified by Doris
20
21
#include "util/runtime_profile.h"
22
23
#include <gen_cpp/RuntimeProfile_types.h>
24
#include <opentelemetry/nostd/shared_ptr.h>
25
#include <opentelemetry/trace/span.h>
26
#include <opentelemetry/trace/tracer.h>
27
#include <rapidjson/encodings.h>
28
#include <rapidjson/stringbuffer.h>
29
#include <rapidjson/writer.h>
30
31
#include <iomanip>
32
#include <iostream>
33
34
#include "common/object_pool.h"
35
#include "util/container_util.hpp"
36
37
namespace doris {
38
39
// Names used for the per-thread counters.
static const std::string THREAD_TOTAL_TIME {"TotalWallClockTime"};
static const std::string THREAD_VOLUNTARY_CONTEXT_SWITCHES {"VoluntaryContextSwitches"};
static const std::string THREAD_INVOLUNTARY_CONTEXT_SWITCHES {"InvoluntaryContextSwitches"};

// Placed between the profile name and the counter/info-string name when
// building span attribute keys.
static const std::string SPAN_ATTRIBUTE_KEY_SEPARATOR {"-"};

// Parent name under which all top-level counters are registered. It is
// default-constructed, i.e. the empty string.
static const std::string ROOT_COUNTER;
48
49
// Creates a profile node named `name`. Metadata and timestamp start at -1,
// the local-time percentage at 0, and the built-in total-time counter
// (unit TIME_NS, value 0) is registered in the counter map as "TotalTime".
RuntimeProfile::RuntimeProfile(const std::string& name, bool is_averaged_profile)
        : _pool(new ObjectPool()),
          _name(name),
          _metadata(-1),
          _timestamp(-1),
          _is_averaged_profile(is_averaged_profile),
          _counter_total_time(TUnit::TIME_NS, 0),
          _local_time_percent(0) {
    // The map only points at the member counter; it does not own it.
    _counter_map.insert_or_assign("TotalTime", &_counter_total_time);
}

// Nothing to release by hand: the members clean up after themselves.
RuntimeProfile::~RuntimeProfile() = default;
61
62
0
// Merges `other` into this profile, recursively.
//
//  - Counters present in both profiles are summed (DOUBLE_VALUE counters are
//    summed as doubles, all others through Counter::update()).
//  - Counters that exist only in `other` are copied into this profile's pool.
//  - The parent -> child-counter-name relations of `other` are unioned in.
//  - Children are matched by name; unmatched children of `other` are created
//    here (copying local-time percent, metadata, timestamp and indent flag)
//    and then merged recursively.
//
// `other` must be non-null and must not be `this`.
void RuntimeProfile::merge(RuntimeProfile* other) {
    DCHECK(other != nullptr);
    // Both profiles' mutexes are acquired below; locking the same mutex twice
    // from one thread is undefined behaviour.
    DCHECK(other != this);

    // Merge this level
    {
        // std::scoped_lock acquires both mutexes with a deadlock-avoidance
        // algorithm, so a concurrent a->merge(b) / b->merge(a) cannot deadlock
        // the way two consecutive lock_guards could.
        std::scoped_lock counters_guard(_counter_map_lock, other->_counter_map_lock);

        for (const auto& [counter_name, src_counter] : other->_counter_map) {
            auto dst_iter = _counter_map.find(counter_name);

            if (dst_iter == _counter_map.end()) {
                _counter_map[counter_name] =
                        _pool->add(new Counter(src_counter->type(), src_counter->value()));
                continue;
            }

            Counter* dst_counter = dst_iter->second;
            DCHECK(dst_counter->type() == src_counter->type());

            if (dst_counter->type() == TUnit::DOUBLE_VALUE) {
                double new_val = dst_counter->double_value() + src_counter->double_value();
                dst_counter->set(new_val);
            } else {
                dst_counter->update(src_counter->value());
            }
        }

        for (const auto& [parent_name, src_child_names] : other->_child_counter_map) {
            std::set<std::string>* child_counters =
                    find_or_insert(&_child_counter_map, parent_name, std::set<std::string>());
            child_counters->insert(src_child_names.begin(), src_child_names.end());
        }
    }

    {
        std::scoped_lock children_guard(_children_lock, other->_children_lock);

        // Recursively merge children with matching names
        for (const auto& [other_child, indent_other_child] : other->_children) {
            RuntimeProfile* child = nullptr;
            auto existing = _child_map.find(other_child->_name);

            if (existing != _child_map.end()) {
                child = existing->second;
            } else {
                child = _pool->add(new RuntimeProfile(other_child->_name));
                child->_local_time_percent = other_child->_local_time_percent;
                child->_metadata = other_child->_metadata;
                child->_timestamp = other_child->_timestamp;
                _child_map[child->_name] = child;
                _children.push_back(std::make_pair(child, indent_other_child));
            }

            child->merge(other_child);
        }
    }
}
129
130
0
void RuntimeProfile::update(const TRuntimeProfileTree& thrift_profile) {
131
0
    int idx = 0;
132
0
    update(thrift_profile.nodes, &idx);
133
0
    DCHECK_EQ(idx, thrift_profile.nodes.size());
134
0
}
135
136
0
void RuntimeProfile::update(const std::vector<TRuntimeProfileNode>& nodes, int* idx) {
137
0
    DCHECK_LT(*idx, nodes.size());
138
0
    const TRuntimeProfileNode& node = nodes[*idx];
139
0
    {
140
0
        std::lock_guard<std::mutex> l(_counter_map_lock);
141
        // update this level
142
0
        std::map<std::string, Counter*>::iterator dst_iter;
143
144
0
        for (int i = 0; i < node.counters.size(); ++i) {
145
0
            const TCounter& tcounter = node.counters[i];
146
0
            CounterMap::iterator j = _counter_map.find(tcounter.name);
147
148
0
            if (j == _counter_map.end()) {
149
0
                _counter_map[tcounter.name] =
150
0
                        _pool->add(new Counter(tcounter.type, tcounter.value));
151
0
            } else {
152
0
                if (j->second->type() != tcounter.type) {
153
0
                    LOG(ERROR) << "Cannot update counters with the same name (" << j->first
154
0
                               << ") but different types.";
155
0
                } else {
156
0
                    j->second->set(tcounter.value);
157
0
                }
158
0
            }
159
0
        }
160
161
0
        ChildCounterMap::const_iterator child_counter_src_itr;
162
163
0
        for (child_counter_src_itr = node.child_counters_map.begin();
164
0
             child_counter_src_itr != node.child_counters_map.end(); ++child_counter_src_itr) {
165
0
            std::set<std::string>* child_counters = find_or_insert(
166
0
                    &_child_counter_map, child_counter_src_itr->first, std::set<std::string>());
167
0
            child_counters->insert(child_counter_src_itr->second.begin(),
168
0
                                   child_counter_src_itr->second.end());
169
0
        }
170
0
    }
171
172
0
    {
173
0
        std::lock_guard<std::mutex> l(_info_strings_lock);
174
0
        const InfoStrings& info_strings = node.info_strings;
175
0
        for (const std::string& key : node.info_strings_display_order) {
176
            // Look for existing info strings and update in place. If there
177
            // are new strings, add them to the end of the display order.
178
            // TODO: Is nodes.info_strings always a superset of
179
            // _info_strings? If so, can just copy the display order.
180
0
            InfoStrings::const_iterator it = info_strings.find(key);
181
0
            DCHECK(it != info_strings.end());
182
0
            InfoStrings::iterator existing = _info_strings.find(key);
183
184
0
            if (existing == _info_strings.end()) {
185
0
                _info_strings.insert(std::make_pair(key, it->second));
186
0
                _info_strings_display_order.push_back(key);
187
0
            } else {
188
0
                _info_strings[key] = it->second;
189
0
            }
190
0
        }
191
0
    }
192
193
0
    ++*idx;
194
0
    {
195
0
        std::lock_guard<std::mutex> l(_children_lock);
196
197
        // update children with matching names; create new ones if they don't match
198
0
        for (int i = 0; i < node.num_children; ++i) {
199
0
            const TRuntimeProfileNode& tchild = nodes[*idx];
200
0
            ChildMap::iterator j = _child_map.find(tchild.name);
201
0
            RuntimeProfile* child = nullptr;
202
203
0
            if (j != _child_map.end()) {
204
0
                child = j->second;
205
0
            } else {
206
0
                child = _pool->add(new RuntimeProfile(tchild.name));
207
0
                child->_metadata = tchild.metadata;
208
0
                child->_timestamp = tchild.timestamp;
209
0
                _child_map[tchild.name] = child;
210
0
                _children.push_back(std::make_pair(child, tchild.indent));
211
0
            }
212
213
0
            child->update(nodes, idx);
214
0
        }
215
0
    }
216
0
}
217
218
0
void RuntimeProfile::divide(int n) {
219
0
    DCHECK_GT(n, 0);
220
0
    std::map<std::string, Counter*>::iterator iter;
221
0
    {
222
0
        std::lock_guard<std::mutex> l(_counter_map_lock);
223
224
0
        for (iter = _counter_map.begin(); iter != _counter_map.end(); ++iter) {
225
0
            if (iter->second->type() == TUnit::DOUBLE_VALUE) {
226
0
                iter->second->set(iter->second->double_value() / n);
227
0
            } else {
228
0
                int64_t value = iter->second->_value.load();
229
0
                value = value / n;
230
0
                iter->second->_value.store(value);
231
0
            }
232
0
        }
233
0
    }
234
0
    {
235
0
        std::lock_guard<std::mutex> l(_children_lock);
236
237
0
        for (ChildMap::iterator i = _child_map.begin(); i != _child_map.end(); ++i) {
238
0
            i->second->divide(n);
239
0
        }
240
0
    }
241
0
}
242
243
0
void RuntimeProfile::clear_children() {
244
0
    std::lock_guard<std::mutex> l(_children_lock);
245
0
    _children.clear();
246
0
}
247
248
0
void RuntimeProfile::compute_time_in_profile() {
249
0
    compute_time_in_profile(total_time_counter()->value());
250
0
}
251
252
0
void RuntimeProfile::compute_time_in_profile(int64_t total) {
253
0
    if (total == 0) {
254
0
        return;
255
0
    }
256
257
    // Add all the total times in all the children
258
0
    int64_t total_child_time = 0;
259
0
    std::lock_guard<std::mutex> l(_children_lock);
260
261
0
    for (int i = 0; i < _children.size(); ++i) {
262
0
        total_child_time += _children[i].first->total_time_counter()->value();
263
0
    }
264
265
0
    int64_t local_time = total_time_counter()->value() - total_child_time;
266
    // Counters have some margin, set to 0 if it was negative.
267
0
    local_time = std::max<int64_t>(0L, local_time);
268
0
    _local_time_percent = static_cast<double>(local_time) / total;
269
0
    _local_time_percent = std::min(1.0, _local_time_percent) * 100;
270
271
    // Recurse on children
272
0
    for (int i = 0; i < _children.size(); ++i) {
273
0
        _children[i].first->compute_time_in_profile(total);
274
0
    }
275
0
}
276
277
8
// Creates a new child profile called `name`, owned by this profile's object
// pool, and links it into the child list.
//
//  - `indent`:  stored alongside the child and used when printing.
//  - `prepend`: when true the child is inserted before the current first
//               child; otherwise it is appended at the end.
//
// A child with the same name must not already exist (checked in debug builds).
// Returns the newly created child.
RuntimeProfile* RuntimeProfile::create_child(const std::string& name, bool indent, bool prepend) {
    std::lock_guard<std::mutex> l(_children_lock);
    DCHECK(_child_map.find(name) == _child_map.end());
    RuntimeProfile* child = _pool->add(new RuntimeProfile(name));

    // add_child_unlock() inserts before `loc`, or appends when `loc` is null.
    // The previous code dereferenced _children.end() for the append case on a
    // non-empty list, which is undefined behaviour; pass nullptr instead.
    RuntimeProfile* insert_before =
            (prepend && !_children.empty()) ? _children.front().first : nullptr;
    add_child_unlock(child, indent, insert_before);
    return child;
}
289
290
0
// Registers `child` in the child map under its name and places it at the
// front of the ordered child list. Takes the children lock.
void RuntimeProfile::insert_child_head(doris::RuntimeProfile* child, bool indent) {
    std::lock_guard<std::mutex> children_guard(_children_lock);
    DCHECK(child != nullptr);
    _child_map[child->_name] = child;
    _children.insert(_children.begin(), std::make_pair(child, indent));
}
298
299
8
// Links `child` into this profile without taking the children lock itself.
// The child is always recorded in the child map. It is appended to the
// ordered list when `loc` is null; otherwise it is inserted immediately
// before `loc`, which must already be in the list (checked in debug builds).
void RuntimeProfile::add_child_unlock(RuntimeProfile* child, bool indent, RuntimeProfile* loc) {
    DCHECK(child != nullptr);
    _child_map[child->_name] = child;

    if (loc == nullptr) {
        _children.push_back(std::make_pair(child, indent));
        return;
    }

    // Locate `loc` in the ordered list.
    ChildVector::iterator pos = _children.begin();
    while (pos != _children.end() && pos->first != loc) {
        ++pos;
    }

    if (pos == _children.end()) {
        DCHECK(false) << "Invalid loc";
        return;
    }
    _children.insert(pos, std::make_pair(child, indent));
}
315
316
0
// Locking wrapper around add_child_unlock(): takes the children lock and
// forwards all arguments unchanged.
void RuntimeProfile::add_child(RuntimeProfile* child, bool indent, RuntimeProfile* loc) {
    std::lock_guard<std::mutex> children_guard(_children_lock);
    add_child_unlock(child, indent, loc);
}
320
321
0
// Replaces the contents of `children` with the direct children of this
// profile, in child-map iteration order.
void RuntimeProfile::get_children(std::vector<RuntimeProfile*>* children) {
    children->clear();
    std::lock_guard<std::mutex> children_guard(_children_lock);

    for (const auto& entry : _child_map) {
        children->push_back(entry.second);
    }
}
329
330
0
// Appends every descendant of this profile to `children`, depth first: each
// child is pushed and then immediately followed by its own descendants.
// Unlike get_children(), the output vector is not cleared first.
void RuntimeProfile::get_all_children(std::vector<RuntimeProfile*>* children) {
    std::lock_guard<std::mutex> children_guard(_children_lock);

    for (const auto& entry : _child_map) {
        RuntimeProfile* child = entry.second;
        children->push_back(child);
        child->get_all_children(children);
    }
}
338
339
0
void RuntimeProfile::add_info_string(const std::string& key, const std::string& value) {
340
0
    std::lock_guard<std::mutex> l(_info_strings_lock);
341
0
    InfoStrings::iterator it = _info_strings.find(key);
342
343
0
    if (it == _info_strings.end()) {
344
0
        _info_strings.insert(std::make_pair(key, value));
345
0
        _info_strings_display_order.push_back(key);
346
0
    } else {
347
0
        it->second = value;
348
0
    }
349
0
}
350
351
0
const std::string* RuntimeProfile::get_info_string(const std::string& key) {
352
0
    std::lock_guard<std::mutex> l(_info_strings_lock);
353
0
    InfoStrings::const_iterator it = _info_strings.find(key);
354
355
0
    if (it == _info_strings.end()) {
356
0
        return nullptr;
357
0
    }
358
359
0
    return &it->second;
360
0
}
361
362
// Generates RuntimeProfile::NAME(name, unit, parent_counter_name), which
// returns the counter registered under `name`, creating a pool-owned T when
// none exists yet. A newly created counter is also recorded as a child of
// `parent_counter_name`. An already registered counter is returned through
// reinterpret_cast<T*>, without checking its real type.
// Must not be called on an averaged profile (checked in debug builds).
#define ADD_COUNTER_IMPL(NAME, T)                                                                  \
    RuntimeProfile::T* RuntimeProfile::NAME(const std::string& name, TUnit::type unit,             \
                                            const std::string& parent_counter_name) {              \
        DCHECK_EQ(_is_averaged_profile, false);                                                    \
        std::lock_guard<std::mutex> counters_guard(_counter_map_lock);                             \
        auto registered = _counter_map.find(name);                                                 \
        if (registered != _counter_map.end()) {                                                    \
            return reinterpret_cast<T*>(registered->second);                                       \
        }                                                                                          \
        DCHECK(parent_counter_name == ROOT_COUNTER ||                                              \
               _counter_map.find(parent_counter_name) != _counter_map.end());                      \
        T* created = _pool->add(new T(unit));                                                      \
        _counter_map[name] = created;                                                              \
        find_or_insert(&_child_counter_map, parent_counter_name, std::set<std::string>())          \
                ->insert(name);                                                                    \
        return created;                                                                            \
    }

//ADD_COUNTER_IMPL(AddCounter, Counter);
ADD_COUNTER_IMPL(AddHighWaterMarkCounter, HighWaterMarkCounter);
//ADD_COUNTER_IMPL(AddConcurrentTimerCounter, ConcurrentTimerCounter);
383
384
std::shared_ptr<RuntimeProfile::HighWaterMarkCounter> RuntimeProfile::AddSharedHighWaterMarkCounter(
385
0
        const std::string& name, TUnit::type unit, const std::string& parent_counter_name) {
386
0
    DCHECK_EQ(_is_averaged_profile, false);
387
0
    std::lock_guard<std::mutex> l(_counter_map_lock);
388
0
    if (_shared_counter_pool.find(name) != _shared_counter_pool.end()) {
389
0
        return _shared_counter_pool[name];
390
0
    }
391
0
    DCHECK(parent_counter_name == ROOT_COUNTER ||
392
0
           _counter_map.find(parent_counter_name) != _counter_map.end());
393
0
    std::shared_ptr<HighWaterMarkCounter> counter = std::make_shared<HighWaterMarkCounter>(unit);
394
0
    _shared_counter_pool[name] = counter;
395
396
0
    DCHECK(_counter_map.find(name) == _counter_map.end())
397
0
            << "already has a raw counter named " << name;
398
399
    // it's OK to insert shared counter to _counter_map, cuz _counter_map is not the owner of counters
400
0
    _counter_map[name] = counter.get();
401
0
    std::set<std::string>* child_counters =
402
0
            find_or_insert(&_child_counter_map, parent_counter_name, std::set<std::string>());
403
0
    child_counters->insert(name);
404
0
    return counter;
405
0
}
406
407
// Returns the counter registered under `name`. When none exists, a
// pool-owned Counter of `type` with value 0 is created, registered, and
// recorded as a child of `parent_counter_name`.
RuntimeProfile::Counter* RuntimeProfile::add_counter(const std::string& name, TUnit::type type,
                                                     const std::string& parent_counter_name) {
    std::lock_guard<std::mutex> counters_guard(_counter_map_lock);

    // TODO(yingchun): Can we ensure that 'name' is not exist in '_counter_map'? Use CHECK instead?
    auto registered = _counter_map.find(name);
    if (registered != _counter_map.end()) {
        // TODO: should we make sure that we don't return existing derived counters?
        return registered->second;
    }

    DCHECK(parent_counter_name == ROOT_COUNTER ||
           _counter_map.find(parent_counter_name) != _counter_map.end());
    Counter* created = _pool->add(new Counter(type, 0));
    _counter_map[name] = created;
    find_or_insert(&_child_counter_map, parent_counter_name, std::set<std::string>())
            ->insert(name);
    return created;
}
426
427
// Registers a pool-owned DerivedCounter built from `type` and `counter_fn`
// under `name`, as a child of `parent_counter_name`.
// Returns the new counter, or nullptr when `name` is already taken.
RuntimeProfile::DerivedCounter* RuntimeProfile::add_derived_counter(
        const std::string& name, TUnit::type type, const DerivedCounterFunction& counter_fn,
        const std::string& parent_counter_name) {
    std::lock_guard<std::mutex> counters_guard(_counter_map_lock);

    const bool name_taken = _counter_map.find(name) != _counter_map.end();
    if (name_taken) {
        return nullptr;
    }

    DerivedCounter* derived = _pool->add(new DerivedCounter(type, counter_fn));
    _counter_map[name] = derived;
    find_or_insert(&_child_counter_map, parent_counter_name, std::set<std::string>())
            ->insert(name);
    return derived;
}
443
444
110
// Returns the counter registered under `name` in this profile only, or
// nullptr when there is none.
RuntimeProfile::Counter* RuntimeProfile::get_counter(const std::string& name) {
    std::lock_guard<std::mutex> counters_guard(_counter_map_lock);
    auto found = _counter_map.find(name);
    return found == _counter_map.end() ? nullptr : found->second;
}
453
454
0
void RuntimeProfile::get_counters(const std::string& name, std::vector<Counter*>* counters) {
455
0
    Counter* c = get_counter(name);
456
457
0
    if (c != nullptr) {
458
0
        counters->push_back(c);
459
0
    }
460
461
0
    std::lock_guard<std::mutex> l(_children_lock);
462
463
0
    for (int i = 0; i < _children.size(); ++i) {
464
0
        _children[i].first->get_counters(name, counters);
465
0
    }
466
0
}
467
468
// Print the profile:
469
//  1. Profile Name
470
//  2. Info Strings
471
//  3. Counters
472
//  4. Children
473
0
void RuntimeProfile::pretty_print(std::ostream* s, const std::string& prefix) const {
474
0
    std::ostream& stream = *s;
475
476
    // create copy of _counter_map and _child_counter_map so we don't need to hold lock
477
    // while we call value() on the counters
478
0
    CounterMap counter_map;
479
0
    ChildCounterMap child_counter_map;
480
0
    {
481
0
        std::lock_guard<std::mutex> l(_counter_map_lock);
482
0
        counter_map = _counter_map;
483
0
        child_counter_map = _child_counter_map;
484
0
    }
485
486
0
    std::map<std::string, Counter*>::const_iterator total_time = counter_map.find("TotalTime");
487
0
    DCHECK(total_time != counter_map.end());
488
489
0
    stream.flags(std::ios::fixed);
490
0
    stream << prefix << _name << ":";
491
492
0
    if (total_time->second->value() != 0) {
493
0
        stream << "(Active: "
494
0
               << PrettyPrinter::print(total_time->second->value(), total_time->second->type())
495
0
               << ", non-child: " << std::setprecision(2) << _local_time_percent << "%)";
496
0
    }
497
498
0
    stream << std::endl;
499
500
0
    {
501
0
        std::lock_guard<std::mutex> l(_info_strings_lock);
502
0
        for (const std::string& key : _info_strings_display_order) {
503
0
            stream << prefix << "   - " << key << ": " << _info_strings.find(key)->second
504
0
                   << std::endl;
505
0
        }
506
0
    }
507
508
0
    {
509
        // Print all the event timers as the following:
510
        // <EventKey> Timeline: 2s719ms
511
        //     - Event 1: 6.522us (6.522us)
512
        //     - Event 2: 2s288ms (2s288ms)
513
        //     - Event 3: 2s410ms (121.138ms)
514
        // The times in parentheses are the time elapsed since the last event.
515
0
        std::lock_guard<std::mutex> l(_event_sequences_lock);
516
0
        for (const EventSequenceMap::value_type& event_sequence : _event_sequence_map) {
517
0
            stream << prefix << "  " << event_sequence.first << ": "
518
0
                   << PrettyPrinter::print(event_sequence.second->elapsed_time(), TUnit::TIME_NS)
519
0
                   << std::endl;
520
521
0
            int64_t last = 0L;
522
0
            for (const EventSequence::Event& event : event_sequence.second->events()) {
523
0
                stream << prefix << "     - " << event.first << ": "
524
0
                       << PrettyPrinter::print(event.second, TUnit::TIME_NS) << " ("
525
0
                       << PrettyPrinter::print(event.second - last, TUnit::TIME_NS) << ")"
526
0
                       << std::endl;
527
0
                last = event.second;
528
0
            }
529
0
        }
530
0
    }
531
532
0
    RuntimeProfile::print_child_counters(prefix, ROOT_COUNTER, counter_map, child_counter_map, s);
533
534
    // create copy of _children so we don't need to hold lock while we call
535
    // pretty_print() on the children
536
0
    ChildVector children;
537
0
    {
538
0
        std::lock_guard<std::mutex> l(_children_lock);
539
0
        children = _children;
540
0
    }
541
542
0
    for (int i = 0; i < children.size(); ++i) {
543
0
        RuntimeProfile* profile = children[i].first;
544
0
        bool indent = children[i].second;
545
0
        profile->pretty_print(s, prefix + (indent ? "  " : ""));
546
0
    }
547
0
}
548
549
6
// Exports this profile, and recursively its children, as attributes on
// `span`. Attribute keys have the form "<short profile name>-<item name>".
// Does nothing when the span is null, is not recording, or this profile has
// already been exported.
void RuntimeProfile::add_to_span(OpentelemetrySpan span) {
    const bool span_usable = span && span->IsRecording();
    if (!span_usable || _added_to_span) {
        return;
    }
    _added_to_span = true;

    // Work on snapshots so the counter lock is not held while exporting.
    CounterMap counters_snapshot;
    ChildCounterMap child_counters_snapshot;
    {
        std::lock_guard<std::mutex> counters_guard(_counter_map_lock);
        counters_snapshot = _counter_map;
        child_counters_snapshot = _child_counter_map;
    }

    auto total_entry = counters_snapshot.find("TotalTime");
    DCHECK(total_entry != counters_snapshot.end());

    // profile name like "VDataBufferSender  (dst_fragment_instance_id=-2608c96868f3b77d--713968f450bfbe0d):"
    // to "VDataBufferSender"
    const auto short_name = _name.substr(0, _name.find_first_of("(: "));
    span->SetAttribute(short_name + SPAN_ATTRIBUTE_KEY_SEPARATOR + "TotalTime",
                       print_counter(total_entry->second));

    {
        std::lock_guard<std::mutex> info_guard(_info_strings_lock);
        for (const std::string& key : _info_strings_display_order) {
            // nlohmann json will core dump when serializing 'KeyRanges', here temporarily skip it.
            if (key.compare("KeyRanges") != 0) {
                span->SetAttribute(short_name + SPAN_ATTRIBUTE_KEY_SEPARATOR + key,
                                   _info_strings.find(key)->second);
            }
        }
    }

    RuntimeProfile::add_child_counters_to_span(span, short_name, ROOT_COUNTER, counters_snapshot,
                                               child_counters_snapshot);

    // Snapshot the children so the lock is not held while recursing.
    ChildVector children_snapshot;
    {
        std::lock_guard<std::mutex> children_guard(_children_lock);
        children_snapshot = _children;
    }

    for (auto& child_entry : children_snapshot) {
        child_entry.first->add_to_span(span);
    }
}
598
599
void RuntimeProfile::add_child_counters_to_span(OpentelemetrySpan span,
600
                                                const std::string& profile_name,
601
                                                const std::string& counter_name,
602
                                                const CounterMap& counter_map,
603
0
                                                const ChildCounterMap& child_counter_map) {
604
0
    ChildCounterMap::const_iterator itr = child_counter_map.find(counter_name);
605
606
0
    if (itr != child_counter_map.end()) {
607
0
        const std::set<std::string>& child_counters = itr->second;
608
0
        for (const std::string& child_counter : child_counters) {
609
0
            CounterMap::const_iterator iter = counter_map.find(child_counter);
610
0
            DCHECK(iter != counter_map.end());
611
0
            span->SetAttribute(profile_name + SPAN_ATTRIBUTE_KEY_SEPARATOR + iter->first,
612
0
                               print_counter(iter->second));
613
0
            RuntimeProfile::add_child_counters_to_span(span, profile_name, child_counter,
614
0
                                                       counter_map, child_counter_map);
615
0
        }
616
0
    }
617
0
}
618
619
0
// Serializes this profile and its descendants into `tree`, replacing
// whatever nodes the tree held before.
void RuntimeProfile::to_thrift(TRuntimeProfileTree* tree) {
    auto& flattened = tree->nodes;
    flattened.clear();
    to_thrift(&flattened);
}
623
624
0
void RuntimeProfile::to_thrift(std::vector<TRuntimeProfileNode>* nodes) {
625
0
    nodes->reserve(nodes->size() + _children.size());
626
627
0
    int index = nodes->size();
628
0
    nodes->push_back(TRuntimeProfileNode());
629
0
    TRuntimeProfileNode& node = (*nodes)[index];
630
0
    node.name = _name;
631
0
    node.metadata = _metadata;
632
0
    node.timestamp = _timestamp;
633
0
    node.indent = true;
634
635
0
    CounterMap counter_map;
636
0
    {
637
0
        std::lock_guard<std::mutex> l(_counter_map_lock);
638
0
        counter_map = _counter_map;
639
0
        node.child_counters_map = _child_counter_map;
640
0
    }
641
642
0
    for (std::map<std::string, Counter*>::const_iterator iter = counter_map.begin();
643
0
         iter != counter_map.end(); ++iter) {
644
0
        TCounter counter;
645
0
        counter.name = iter->first;
646
0
        counter.value = iter->second->value();
647
0
        counter.type = iter->second->type();
648
0
        node.counters.push_back(counter);
649
0
    }
650
651
0
    {
652
0
        std::lock_guard<std::mutex> l(_info_strings_lock);
653
0
        node.info_strings = _info_strings;
654
0
        node.info_strings_display_order = _info_strings_display_order;
655
0
    }
656
657
0
    ChildVector children;
658
0
    {
659
0
        std::lock_guard<std::mutex> l(_children_lock);
660
0
        children = _children;
661
0
    }
662
0
    node.num_children = children.size();
663
664
0
    for (int i = 0; i < children.size(); ++i) {
665
0
        int child_idx = nodes->size();
666
0
        children[i].first->to_thrift(nodes);
667
        // fix up indentation flag
668
0
        (*nodes)[child_idx].indent = children[i].second;
669
0
    }
670
0
}
671
672
// Returns the value of `total_counter` divided by the time in `timer`,
// expressed per second. `timer` holds nanoseconds; the result is 0 when the
// timer reads 0. In debug builds, `total_counter` must be of type BYTES or
// UNIT and `timer` of type TIME_NS.
int64_t RuntimeProfile::units_per_second(const RuntimeProfile::Counter* total_counter,
                                         const RuntimeProfile::Counter* timer) {
    DCHECK(total_counter->type() == TUnit::BYTES || total_counter->type() == TUnit::UNIT);
    DCHECK(timer->type() == TUnit::TIME_NS);

    if (timer->value() == 0) {
        return 0;
    }

    // Nanoseconds to seconds.
    const double elapsed_secs = static_cast<double>(timer->value()) / 1000.0 / 1000.0 / 1000.0;
    return static_cast<int64_t>(total_counter->value() / elapsed_secs);
}
684
685
0
// Returns the sum of value() over every counter in `*counters` (0 for an
// empty vector).
int64_t RuntimeProfile::counter_sum(const std::vector<Counter*>* counters) {
    int64_t sum = 0;

    for (const Counter* counter : *counters) {
        sum += counter->value();
    }

    return sum;
}
694
695
// Adds a plain counter called `name` whose unit is the per-second form of
// `src_counter`'s unit (BYTES -> BYTES_PER_SECOND, UNIT -> UNIT_PER_SECOND).
// Any other source unit fails a DCHECK and returns nullptr.
RuntimeProfile::Counter* RuntimeProfile::add_rate_counter(const std::string& name,
                                                          Counter* src_counter) {
    const auto src_type = src_counter->type();

    if (src_type == TUnit::BYTES) {
        return add_counter(name, TUnit::BYTES_PER_SECOND);
    }
    if (src_type == TUnit::UNIT) {
        return add_counter(name, TUnit::UNIT_PER_SECOND);
    }

    DCHECK(false) << "Unsupported src counter type: " << src_counter->type();
    return nullptr;
}
716
717
// Adds a plain counter called `name` with unit `dst_type`.
// The sampling function `fn` is accepted but not used here.
RuntimeProfile::Counter* RuntimeProfile::add_rate_counter(const std::string& name, SampleFn fn,
                                                          TUnit::type dst_type) {
    Counter* rate_counter = add_counter(name, dst_type);
    return rate_counter;
}
721
722
// Adds a DOUBLE_VALUE counter called `name`. `src_counter` must be of type
// UNIT (checked in debug builds); it is not otherwise used here.
RuntimeProfile::Counter* RuntimeProfile::add_sampling_counter(const std::string& name,
                                                              Counter* src_counter) {
    DCHECK(src_counter->type() == TUnit::UNIT);
    Counter* sampling_counter = add_counter(name, TUnit::DOUBLE_VALUE);
    return sampling_counter;
}
727
728
// Adds a DOUBLE_VALUE counter called `name`.
// The sampling function `sample_fn` is accepted but not used here.
RuntimeProfile::Counter* RuntimeProfile::add_sampling_counter(const std::string& name,
                                                              SampleFn sample_fn) {
    Counter* sampling_counter = add_counter(name, TUnit::DOUBLE_VALUE);
    return sampling_counter;
}
732
733
0
// Returns the event sequence registered under `name`, creating a pool-owned
// one on first use.
RuntimeProfile::EventSequence* RuntimeProfile::add_event_sequence(const std::string& name) {
    std::lock_guard<std::mutex> events_guard(_event_sequences_lock);

    auto registered = _event_sequence_map.find(name);
    if (registered == _event_sequence_map.end()) {
        EventSequence* created = _pool->add(new EventSequence());
        _event_sequence_map[name] = created;
        return created;
    }

    return registered->second;
}
745
746
void RuntimeProfile::print_child_counters(const std::string& prefix,
747
                                          const std::string& counter_name,
748
                                          const CounterMap& counter_map,
749
                                          const ChildCounterMap& child_counter_map,
750
0
                                          std::ostream* s) {
751
0
    std::ostream& stream = *s;
752
0
    ChildCounterMap::const_iterator itr = child_counter_map.find(counter_name);
753
754
0
    if (itr != child_counter_map.end()) {
755
0
        const std::set<std::string>& child_counters = itr->second;
756
0
        for (const std::string& child_counter : child_counters) {
757
0
            CounterMap::const_iterator iter = counter_map.find(child_counter);
758
0
            DCHECK(iter != counter_map.end());
759
0
            stream << prefix << "   - " << iter->first << ": "
760
0
                   << PrettyPrinter::print(iter->second->value(), iter->second->type())
761
0
                   << std::endl;
762
0
            RuntimeProfile::print_child_counters(prefix + "  ", child_counter, counter_map,
763
0
                                                 child_counter_map, s);
764
0
        }
765
0
    }
766
0
}
767
768
} // namespace doris