Coverage Report

Created: 2025-08-27 03:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/util/thread.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/thread.cc
19
// and modified by Doris
20
21
#include "thread.h"
22
23
#include <sys/resource.h>
24
25
#ifndef __APPLE__
26
// IWYU pragma: no_include <bits/types/struct_sched_param.h>
27
#include <sched.h>
28
#include <sys/prctl.h>
29
#else
30
#include <pthread.h>
31
32
#include <cstdint>
33
#endif
34
35
// IWYU pragma: no_include <bthread/errno.h>
36
#include <errno.h> // IWYU pragma: keep
37
#include <sys/syscall.h>
38
#include <time.h>
39
#include <unistd.h>
40
41
#include <algorithm>
42
// IWYU pragma: no_include <bits/chrono.h>
43
#include <chrono> // IWYU pragma: keep
44
#include <cstring>
45
#include <functional>
46
#include <limits>
47
#include <map>
48
#include <memory>
49
#include <mutex>
50
#include <ostream>
51
#include <string>
52
#include <vector>
53
54
#include "absl/strings/substitute.h"
55
#include "common/config.h"
56
#include "common/logging.h"
57
#include "http/web_page_handler.h"
58
#include "runtime/thread_context.h"
59
#include "util/easy_json.h"
60
#include "util/os_util.h"
61
#include "util/url_coding.h"
62
63
namespace doris {
64
65
class ThreadMgr;
66
67
__thread Thread* Thread::_tls = nullptr;
68
69
// Singleton instance of ThreadMgr. Only visible in this file, used only by Thread.
70
// The Thread class adds a reference to thread_manager while it is supervising a thread so
71
// that a race between the end of the process's main thread (and therefore the destruction
72
// of thread_manager) and the end of a thread that tries to remove itself from the
73
// manager after the destruction can be avoided.
74
static std::shared_ptr<ThreadMgr> thread_manager;
75
//
76
// Controls the single (lazy) initialization of thread_manager.
77
static std::once_flag once;
78
79
// A singleton class that tracks all live threads, and groups them together for easy
80
// auditing. Used only by Thread.
81
class ThreadMgr {
82
public:
83
9
    ThreadMgr() : _threads_started_metric(0), _threads_running_metric(0) {}
84
85
0
    ~ThreadMgr() {
86
0
        std::unique_lock<std::mutex> lock(_lock);
87
0
        _thread_categories.clear();
88
0
    }
89
90
    static void set_thread_name(const std::string& name, int64_t tid);
91
92
#ifndef __APPLE__
93
    static void set_idle_sched(int64_t tid);
94
95
    static void set_thread_nice_value(int64_t tid);
96
#endif
97
98
    // not the system TID, since pthread_t is less prone to being recycled.
99
    void add_thread(const pthread_t& pthread_id, const std::string& name,
100
                    const std::string& category, int64_t tid);
101
102
    // Removes a thread from the supplied category. If the thread has
103
    // already been removed, this is a no-op.
104
    void remove_thread(const pthread_t& pthread_id, const std::string& category);
105
106
    void display_thread_callback(const WebPageHandler::ArgumentMap& args, EasyJson* ej) const;
107
108
private:
109
    // Container class for any details we want to capture about a thread
110
    // TODO: Add start-time.
111
    // TODO: Track fragment ID.
112
    class ThreadDescriptor {
113
    public:
114
72.8k
        ThreadDescriptor() {}
115
        ThreadDescriptor(std::string category, std::string name, int64_t thread_id)
116
72.8k
                : _name(std::move(name)), _category(std::move(category)), _thread_id(thread_id) {}
117
118
0
        const std::string& name() const { return _name; }
119
0
        const std::string& category() const { return _category; }
120
0
        int64_t thread_id() const { return _thread_id; }
121
122
    private:
123
        std::string _name;
124
        std::string _category;
125
        int64_t _thread_id;
126
    };
127
128
    void summarize_thread_descriptor(const ThreadDescriptor& desc, EasyJson* ej) const;
129
130
    // A ThreadCategory is a set of threads that are logically related.
131
    // TODO: unordered_map is incompatible with pthread_t, but would be more
132
    // efficient here.
133
    typedef std::map<const pthread_t, ThreadDescriptor> ThreadCategory;
134
135
    // All thread categories, keyed on the category name.
136
    typedef std::map<std::string, ThreadCategory> ThreadCategoryMap;
137
138
    // Protects _thread_categories and thread metrics.
139
    mutable std::mutex _lock;
140
141
    // All thread categories that ever contained a thread, even if empty
142
    ThreadCategoryMap _thread_categories;
143
144
    // Counters to track all-time total number of threads, and the
145
    // current number of running threads.
146
    uint64_t _threads_started_metric;
147
    uint64_t _threads_running_metric;
148
149
    DISALLOW_COPY_AND_ASSIGN(ThreadMgr);
150
};
151
152
80.5k
void ThreadMgr::set_thread_name(const std::string& name, int64_t tid) {
153
80.5k
    if (tid == getpid()) {
154
0
        return;
155
0
    }
156
#ifdef __APPLE__
157
    int err = pthread_setname_np(name.c_str());
158
#else
159
80.5k
    int err = prctl(PR_SET_NAME, name.c_str());
160
80.5k
#endif
161
80.5k
    if (err < 0 && errno != EPERM) {
162
0
        LOG(ERROR) << "set_thread_name";
163
0
    }
164
80.5k
}
165
166
#ifndef __APPLE__
167
386
void ThreadMgr::set_idle_sched(int64_t tid) {
168
386
    if (tid == getpid()) {
169
0
        return;
170
0
    }
171
386
    struct sched_param sp = {.sched_priority = 0};
172
386
    int err = sched_setscheduler(0, SCHED_IDLE, &sp);
173
386
    if (err < 0 && errno != EPERM) {
174
0
        LOG(ERROR) << "set_thread_idle_sched";
175
0
    }
176
386
}
177
178
0
void ThreadMgr::set_thread_nice_value(int64_t tid) {
179
0
    if (tid == getpid()) {
180
0
        return;
181
0
    }
182
    // From Linux kernel:
183
    // In the current implementation, each unit of difference in the nice values of two
184
    // processes results in a factor of 1.25 in the degree to which the
185
    // scheduler favors the higher priority process.  This causes very
186
    // low nice values (+19) to truly provide little CPU to a process
187
    // whenever there is any other higher priority load on the system,
188
    // and makes high nice values (-20) deliver most of the CPU to
189
    // applications that require it (e.g., some audio applications).
190
191
    // Choose 5 as lower priority value, default is 0
192
0
    int err = setpriority(PRIO_PROCESS, 0, config::scan_thread_nice_value);
193
0
    if (err < 0 && errno != EPERM) {
194
0
        LOG(ERROR) << "set_thread_low_priority";
195
0
    }
196
0
}
197
#endif
198
199
void ThreadMgr::add_thread(const pthread_t& pthread_id, const std::string& name,
200
72.7k
                           const std::string& category, int64_t tid) {
201
72.7k
    std::unique_lock<std::mutex> l(_lock);
202
72.7k
    _thread_categories[category][pthread_id] = ThreadDescriptor(category, name, tid);
203
72.7k
    _threads_running_metric++;
204
72.7k
    _threads_started_metric++;
205
72.7k
}
206
207
68.1k
void ThreadMgr::remove_thread(const pthread_t& pthread_id, const std::string& category) {
208
68.1k
    std::unique_lock<std::mutex> l(_lock);
209
68.1k
    auto category_it = _thread_categories.find(category);
210
68.1k
    DCHECK(category_it != _thread_categories.end());
211
68.1k
    category_it->second.erase(pthread_id);
212
68.1k
    _threads_running_metric--;
213
68.1k
}
214
215
// Web callback that renders thread information into 'ej'.
//
// With a "group" argument: emits the escaped group name, whether "all" was
// requested, and a "found.threads" array describing every thread of that group
// (or of all groups). If a specific group is unknown the function returns
// before "found" is written.
//
// Without a "group" argument: emits "total_threads_running" and a "groups"
// array with one entry per category.
//
// In both modes the lock only covers copying the needed data out of the
// manager; JSON construction and per-thread stat collection run unlocked so
// that threads registering or unregistering themselves are delayed as little
// as possible.
void ThreadMgr::display_thread_callback(const WebPageHandler::ArgumentMap& args,
                                        EasyJson* ej) const {
    if (args.contains("group")) {
        const auto& category_name = args.at("group");
        bool requested_all = category_name == "all";
        ej->Set("requested_thread_group", EasyJson::kObject);
        (*ej)["group_name"] = escape_for_html_to_string(category_name);
        (*ej)["requested_all"] = requested_all;

        // Snapshot the descriptors under the lock.
        std::vector<ThreadDescriptor> descriptors_to_print;
        {
            std::unique_lock<std::mutex> l(_lock);
            if (requested_all) {
                for (const auto& category : _thread_categories) {
                    for (const auto& elem : category.second) {
                        descriptors_to_print.emplace_back(elem.second);
                    }
                }
            } else {
                auto category_it = _thread_categories.find(category_name);
                if (category_it == _thread_categories.end()) {
                    return;
                }
                for (const auto& elem : category_it->second) {
                    descriptors_to_print.emplace_back(elem.second);
                }
            }
        }

        EasyJson found = (*ej).Set("found", EasyJson::kObject);
        EasyJson threads = found.Set("threads", EasyJson::kArray);
        for (const auto& desc : descriptors_to_print) {
            summarize_thread_descriptor(desc, &threads);
        }
    } else {
        // List all thread groups and the number of threads running in each.
        std::vector<std::pair<std::string, uint64_t>> thread_categories_info;
        uint64_t running = 0;
        {
            std::unique_lock<std::mutex> l(_lock);
            running = _threads_running_metric;
            thread_categories_info.reserve(_thread_categories.size());
            for (const auto& category : _thread_categories) {
                thread_categories_info.emplace_back(category.first, category.second.size());
            }
        }

        (*ej)["total_threads_running"] = running;
        EasyJson groups = ej->Set("groups", EasyJson::kArray);
        for (const auto& elem : thread_categories_info) {
            std::string category_arg;
            url_encode(elem.first, &category_arg);
            EasyJson group = groups.PushBack(EasyJson::kObject);
            group["encoded_group_name"] = category_arg;
            group["group_name"] = elem.first;
            group["threads_running"] = elem.second;
        }
    }
}
274
275
// Appends one object to the JSON array 'ej' holding the thread's name and its
// user / kernel / iowait times converted from nanoseconds to seconds. A
// failure to read the statistics is logged and whatever 'stats' holds is still
// reported.
void ThreadMgr::summarize_thread_descriptor(const ThreadMgr::ThreadDescriptor& desc,
                                            EasyJson* ej) const {
    constexpr double kNanosPerSecond = 1e9;

    ThreadStats stats;
    if (Status status = get_thread_stats(desc.thread_id(), &stats); !status.ok()) {
        LOG(WARNING) << "Could not get per-thread statistics: " << status.to_string();
    }

    EasyJson thread = ej->PushBack(EasyJson::kObject);
    thread["thread_name"] = desc.name();
    thread["user_sec"] = static_cast<double>(stats.user_ns) / kNanosPerSecond;
    thread["kernel_sec"] = static_cast<double>(stats.kernel_ns) / kNanosPerSecond;
    thread["iowait_sec"] = static_cast<double>(stats.iowait_ns) / kNanosPerSecond;
}
289
290
68.1k
// A thread that is still joinable when its Thread object dies is detached;
// a failing pthread_detach aborts via CHECK_EQ.
Thread::~Thread() {
    if (!_joinable) {
        return;
    }
    int ret = pthread_detach(_thread);
    CHECK_EQ(ret, 0);
}
296
297
7.73k
void Thread::set_self_name(const std::string& name) {
298
7.73k
    ThreadMgr::set_thread_name(name, current_thread_id());
299
7.73k
}
300
301
#ifndef __APPLE__
302
386
void Thread::set_idle_sched() {
303
386
    ThreadMgr::set_idle_sched(current_thread_id());
304
386
}
305
306
0
void Thread::set_thread_nice_value() {
307
0
    ThreadMgr::set_thread_nice_value(current_thread_id());
308
0
}
309
#endif
310
311
1.25k
void Thread::join() {
312
1.25k
    static_cast<void>(ThreadJoiner(this).join());
313
1.25k
}
314
315
1.00k
int64_t Thread::tid() const {
316
1.00k
    int64_t t = _tid.load();
317
1.00k
    if (t != PARENT_WAITING_TID) {
318
1.00k
        return t;
319
1.00k
    }
320
0
    return wait_for_tid();
321
1.00k
}
322
323
0
pthread_t Thread::pthread_id() const {
324
0
    return _thread;
325
0
}
326
327
72.7k
const std::string& Thread::name() const {
328
72.7k
    return _name;
329
72.7k
}
330
331
140k
const std::string& Thread::category() const {
332
140k
    return _category;
333
140k
}
334
335
0
std::string Thread::to_string() const {
336
0
    return absl::Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid(), _name, _category);
337
0
}
338
339
1.90M
// Returns the thread-local Thread pointer; it is nullptr unless
// supervise_thread() has set it on the calling thread.
Thread* Thread::current_thread() {
    return Thread::_tls;
}
342
343
0
// Returns an identifier for the calling thread: the pthread_threadid_np value
// on Apple platforms, otherwise pthread_self() cast to int64_t.
int64_t Thread::unique_thread_id() {
#ifdef __APPLE__
    uint64_t apple_tid;
    pthread_threadid_np(pthread_self(), &apple_tid);
    return apple_tid;
#else
    const pthread_t self = pthread_self();
    return static_cast<int64_t>(self);
#endif
}
352
353
80.9k
// Returns the system thread id of the caller: pthread_threadid_np on Apple
// platforms, the gettid system call elsewhere.
int64_t Thread::current_thread_id() {
#ifdef __APPLE__
    uint64_t apple_tid;
    pthread_threadid_np(nullptr, &apple_tid);
    return apple_tid;
#else
    const long kernel_tid = syscall(SYS_gettid);
    return kernel_tid;
#endif
}
362
363
0
int64_t Thread::wait_for_tid() const {
364
0
    int loop_count = 0;
365
0
    while (true) {
366
0
        int64_t t = _tid.load();
367
0
        if (t != PARENT_WAITING_TID) {
368
0
            return t;
369
0
        }
370
        // copied from boost::detail::yield
371
0
        int k = loop_count++;
372
0
        if (k < 32 || k & 1) {
373
0
            sched_yield();
374
0
        } else {
375
            // g++ -Wextra warns on {} or {0}
376
0
            struct timespec rqtp = {0, 0};
377
378
            // POSIX says that timespec has tv_sec and tv_nsec
379
            // But it doesn't guarantee order or placement
380
381
0
            rqtp.tv_sec = 0;
382
0
            rqtp.tv_nsec = 1000;
383
384
0
            nanosleep(&rqtp, 0);
385
0
        }
386
0
    }
387
0
}
388
389
// Creates a Thread object for (category, name, functor) and launches it with
// pthread_create, running supervise_thread() on the new thread.
//
// 'holder' is optional. When given it receives the new Thread before the
// thread is started, because the functor is allowed to access it.
//
// Returns Status::OK() on success, or a RuntimeError carrying the
// pthread_create error code and its strerror text on failure.
Status Thread::start_thread(const std::string& category, const std::string& name,
                            const ThreadFunctor& functor, std::shared_ptr<Thread>* holder) {
    // The thread manager is created lazily, exactly once.
    std::call_once(once, init_threadmgr);

    // Local reference that lives for the duration of this function.
    auto new_thread = std::make_shared<Thread>(category, name, functor);
    if (holder != nullptr) {
        *holder = new_thread;
    }

    new_thread->_tid = PARENT_WAITING_TID;

    // supervise_thread() works on the raw pointer and the caller may drop its
    // reference right after we return, so the object holds a reference to
    // itself; finish_thread() releases it.
    new_thread->_shared_self = new_thread;

    const int create_rc = pthread_create(&new_thread->_thread, nullptr, &Thread::supervise_thread,
                                         new_thread.get());
    if (create_rc != 0) {
        // Creation failed: undo the preparation above.
        new_thread->_tid = INVALID_TID;
        new_thread->_shared_self.reset();
        return Status::RuntimeError("Could not create thread. (error {}) {}", create_rc,
                                    strerror(create_rc));
    }

    // Joinability is set by the parent, not the child: only the parent (or
    // someone it talks to) can join, so the flag must be in place before we
    // return.
    new_thread->_joinable = true;
    VLOG_NOTICE << "Started thread " << new_thread->tid() << " - " << category << ":" << name;
    return Status::OK();
}
429
430
72.8k
void* Thread::supervise_thread(void* arg) {
431
72.8k
    Thread* t = static_cast<Thread*>(arg);
432
72.8k
    int64_t system_tid = Thread::current_thread_id();
433
72.8k
    PCHECK(system_tid != -1);
434
435
    // Take an additional reference to the thread manager, which we'll need below.
436
72.8k
    std::shared_ptr<ThreadMgr> thread_mgr_ref = thread_manager;
437
438
    // Set up the TLS.
439
    //
440
    // We could store a ptr in the TLS itself, but as its
441
    // lifecycle is poorly defined, we'll use a bare pointer. We
442
    // already incremented the reference count in StartThread.
443
72.8k
    Thread::_tls = t;
444
445
    // Create thread context, there is no need to create it when func is executed.
446
72.8k
    ThreadLocalHandle::create_thread_local_if_not_exits();
447
448
    // Publish our tid to '_tid', which unblocks any callers waiting in
449
    // WaitForTid().
450
72.8k
    t->_tid.store(system_tid);
451
452
72.8k
    std::string name = absl::Substitute("$0-$1", t->name(), system_tid);
453
72.8k
    ThreadMgr::set_thread_name(name, t->_tid);
454
72.8k
    thread_manager->add_thread(pthread_self(), name, t->category(), t->_tid);
455
456
    // FinishThread() is guaranteed to run (even if functor_ throws an
457
    // exception) because pthread_cleanup_push() creates a scoped object
458
    // whose destructor invokes the provided callback.
459
72.8k
    pthread_cleanup_push(&Thread::finish_thread, t);
460
72.8k
    t->_functor();
461
72.8k
    pthread_cleanup_pop(true);
462
463
72.8k
    return nullptr;
464
72.8k
}
465
466
68.1k
void Thread::finish_thread(void* arg) {
467
68.1k
    Thread* t = static_cast<Thread*>(arg);
468
469
    // We're here either because of the explicit pthread_cleanup_pop() in
470
    // SuperviseThread() or through pthread_exit(). In either case,
471
    // thread_manager is guaranteed to be live because thread_mgr_ref in
472
    // SuperviseThread() is still live.
473
68.1k
    thread_manager->remove_thread(pthread_self(), t->category());
474
475
    // Signal any Joiner that we're done.
476
68.1k
    t->_done.count_down();
477
478
18.4E
    VLOG_CRITICAL << "Ended thread " << t->_tid << " - " << t->category() << ":" << t->name();
479
68.1k
    t->_shared_self.reset();
480
    // NOTE: the above 'Release' call could be the last reference to 'this',
481
    // so 'this' could be destructed at this point. Do not add any code
482
    // following here!
483
484
68.1k
    ThreadLocalHandle::del_thread_local_if_count_is_zero();
485
68.1k
}
486
487
9
void Thread::init_threadmgr() {
488
9
    thread_manager.reset(new ThreadMgr());
489
9
}
490
491
// Builds a joiner for 'thr' (which must not be null) with the default
// warn-after, warn-every and give-up intervals.
ThreadJoiner::ThreadJoiner(Thread* thr)
        : _thread(CHECK_NOTNULL(thr)),             // aborts on a null thread
          _warn_after_ms(kDefaultWarnAfterMs),     // first warning delay
          _warn_every_ms(kDefaultWarnEveryMs),     // interval between later warnings
          _give_up_after_ms(kDefaultGiveUpAfterMs) // total wait budget
{}
496
497
1
// Sets how long join() waits before its first warning; returns *this so calls
// can be chained.
ThreadJoiner& ThreadJoiner::warn_after_ms(int ms) {
    this->_warn_after_ms = ms;
    return *this;
}
501
502
1
// Sets the interval between warnings after the first one; returns *this so
// calls can be chained.
ThreadJoiner& ThreadJoiner::warn_every_ms(int ms) {
    this->_warn_every_ms = ms;
    return *this;
}
506
507
1
// Sets the total time join() may wait before giving up (-1 disables the
// limit); returns *this so calls can be chained.
ThreadJoiner& ThreadJoiner::give_up_after_ms(int ms) {
    this->_give_up_after_ms = ms;
    return *this;
}
511
512
1.26k
// Waits for the target thread to finish and then pthread_join()s it.
//
// Returns:
//   InvalidArgument - the caller is the thread being joined.
//   OK              - the thread was joined, or it was not joinable (a second
//                     join is a no-op).
//   Aborted         - the give-up deadline passed before the thread finished.
//
// While waiting, a warning is logged once 'warn_after_ms' has elapsed and then
// again every 'warn_every_ms'.
Status ThreadJoiner::join() {
    Thread* const caller = Thread::current_thread();
    if (caller && caller->tid() == _thread->tid()) {
        return Status::InvalidArgument("Can't join on own thread. (error {}) {}", -1,
                                       _thread->_name);
    }

    // Early exit: double join is a no-op.
    if (!_thread->_joinable) {
        return Status::OK();
    }

    int waited_ms = 0;
    for (;;) {
        if (waited_ms >= _warn_after_ms) {
            LOG(WARNING) << absl::Substitute("Waited for $0ms trying to join with $1 (tid $2)",
                                             waited_ms, _thread->_name, _thread->_tid.load());
        }

        // Time left until we give up; effectively unlimited when disabled (-1).
        const int remaining_before_giveup = (_give_up_after_ms != -1)
                                                    ? _give_up_after_ms - waited_ms
                                                    : std::numeric_limits<int>::max();

        // Time left until the next warning is due.
        const int remaining_before_next_warn =
                (waited_ms < _warn_after_ms) ? _warn_after_ms - waited_ms : _warn_every_ms;

        // If the deadline comes before the next warning, this wait is the last.
        const bool final_attempt = remaining_before_giveup < remaining_before_next_warn;
        const int wait_for = std::min(remaining_before_giveup, remaining_before_next_warn);

        if (_thread->_done.wait_for(std::chrono::milliseconds(wait_for))) {
            // Unconditionally join before returning, to guarantee that any TLS
            // has been destroyed (pthread_key_create() destructors only run
            // after a pthread's user method has returned).
            int ret = pthread_join(_thread->_thread, nullptr);
            CHECK_EQ(ret, 0);
            _thread->_joinable = false;
            return Status::OK();
        }

        waited_ms += wait_for;
        if (final_attempt) {
            break;
        }
    }
    return Status::Aborted("Timed out after {}ms joining on {}", waited_ms, _thread->_name);
}
560
561
9
// Registers the "/threadz" page on 'web_page_handler', served by
// ThreadMgr::display_thread_callback.
//
// The thread manager is created lazily by the first Thread::start_thread()
// call, so it may not exist yet when the page is registered. The same
// once-flag is used here to create it on demand, which keeps the bound
// callback from ever being invoked on a null manager. The callback holds a
// shared_ptr copy so the manager stays alive for as long as the page handler
// keeps the callback.
void register_thread_display_page(WebPageHandler* web_page_handler) {
    std::call_once(once, [] { thread_manager.reset(new ThreadMgr()); });
    std::shared_ptr<ThreadMgr> manager = thread_manager;
    web_page_handler->register_template_page(
            "/threadz", "Threads",
            std::bind(&ThreadMgr::display_thread_callback, manager, std::placeholders::_1,
                      std::placeholders::_2),
            true);
}
568
} // namespace doris