Coverage Report

Created: 2025-05-21 07:53

/root/doris/be/src/util/thread.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
// This file is copied from
18
// https://github.com/apache/impala/blob/branch-2.9.0/be/src/util/thread.cc
19
// and modified by Doris
20
21
#include "thread.h"
22
23
#include <sys/resource.h>
24
25
#ifndef __APPLE__
26
// IWYU pragma: no_include <bits/types/struct_sched_param.h>
27
#include <sched.h>
28
#include <sys/prctl.h>
29
#else
30
#include <pthread.h>
31
32
#include <cstdint>
33
#endif
34
35
// IWYU pragma: no_include <bthread/errno.h>
36
#include <errno.h> // IWYU pragma: keep
37
#include <sys/syscall.h>
38
#include <time.h>
39
#include <unistd.h>
40
41
#include <algorithm>
42
// IWYU pragma: no_include <bits/chrono.h>
43
#include <chrono> // IWYU pragma: keep
44
#include <cstring>
45
#include <functional>
46
#include <limits>
47
#include <map>
48
#include <memory>
49
#include <mutex>
50
#include <ostream>
51
#include <string>
52
#include <vector>
53
54
#include "absl/strings/substitute.h"
55
#include "common/config.h"
56
#include "common/logging.h"
57
#include "gutil/atomicops.h"
58
#include "http/web_page_handler.h"
59
#include "runtime/thread_context.h"
60
#include "util/easy_json.h"
61
#include "util/os_util.h"
62
#include "util/scoped_cleanup.h"
63
#include "util/url_coding.h"
64
65
namespace doris {
66
67
class ThreadMgr;
68
69
// Per-thread pointer to the Thread object associated with the calling thread. It is assigned
// in Thread::supervise_thread() and handed out by Thread::current_thread().
__thread Thread* Thread::_tls = nullptr;
70
71
// Singleton instance of ThreadMgr. Only visible in this file, used only by Thread.
72
// The Thread class adds a reference to thread_manager while it is supervising a thread so
73
// that a race between the end of the process's main thread (and therefore the destruction
74
// of thread_manager) and the end of a thread that tries to remove itself from the
75
// manager after the destruction can be avoided.
76
static std::shared_ptr<ThreadMgr> thread_manager;
77
//
78
// Controls the single (lazy) initialization of thread_manager.
79
static std::once_flag once;
80
81
// A singleton class that tracks all live threads, and groups them together for easy
82
// auditing. Used only by Thread.
83
class ThreadMgr {
84
public:
85
4
    ThreadMgr() : _threads_started_metric(0), _threads_running_metric(0) {}
86
87
0
    ~ThreadMgr() {
88
0
        std::unique_lock<std::mutex> lock(_lock);
89
0
        _thread_categories.clear();
90
0
    }
91
92
    static void set_thread_name(const std::string& name, int64_t tid);
93
94
#ifndef __APPLE__
95
    static void set_idle_sched(int64_t tid);
96
97
    static void set_thread_nice_value(int64_t tid);
98
#endif
99
100
    // not the system TID, since pthread_t is less prone to being recycled.
101
    void add_thread(const pthread_t& pthread_id, const std::string& name,
102
                    const std::string& category, int64_t tid);
103
104
    // Removes a thread from the supplied category. If the thread has
105
    // already been removed, this is a no-op.
106
    void remove_thread(const pthread_t& pthread_id, const std::string& category);
107
108
    void display_thread_callback(const WebPageHandler::ArgumentMap& args, EasyJson* ej) const;
109
110
private:
111
    // Container class for any details we want to capture about a thread
112
    // TODO: Add start-time.
113
    // TODO: Track fragment ID.
114
    class ThreadDescriptor {
115
    public:
116
33.7k
        ThreadDescriptor() {}
117
        ThreadDescriptor(std::string category, std::string name, int64_t thread_id)
118
33.7k
                : _name(std::move(name)), _category(std::move(category)), _thread_id(thread_id) {}
119
120
0
        const std::string& name() const { return _name; }
121
0
        const std::string& category() const { return _category; }
122
0
        int64_t thread_id() const { return _thread_id; }
123
124
    private:
125
        std::string _name;
126
        std::string _category;
127
        int64_t _thread_id;
128
    };
129
130
    void summarize_thread_descriptor(const ThreadDescriptor& desc, EasyJson* ej) const;
131
132
    // A ThreadCategory is a set of threads that are logically related.
133
    // TODO: unordered_map is incompatible with pthread_t, but would be more
134
    // efficient here.
135
    typedef std::map<const pthread_t, ThreadDescriptor> ThreadCategory;
136
137
    // All thread categories, keyed on the category name.
138
    typedef std::map<std::string, ThreadCategory> ThreadCategoryMap;
139
140
    // Protects _thread_categories and thread metrics.
141
    mutable std::mutex _lock;
142
143
    // All thread categories that ever contained a thread, even if empty
144
    ThreadCategoryMap _thread_categories;
145
146
    // Counters to track all-time total number of threads, and the
147
    // current number of running threads.
148
    uint64_t _threads_started_metric;
149
    uint64_t _threads_running_metric;
150
151
    DISALLOW_COPY_AND_ASSIGN(ThreadMgr);
152
};
153
154
36.3k
void ThreadMgr::set_thread_name(const std::string& name, int64_t tid) {
155
36.3k
    if (tid == getpid()) {
156
0
        return;
157
0
    }
158
#ifdef __APPLE__
159
    int err = pthread_setname_np(name.c_str());
160
#else
161
36.3k
    int err = prctl(PR_SET_NAME, name.c_str());
162
36.3k
#endif
163
36.3k
    if (err < 0 && errno != EPERM) {
164
0
        LOG(ERROR) << "set_thread_name";
165
0
    }
166
36.3k
}
167
168
#ifndef __APPLE__
169
112
void ThreadMgr::set_idle_sched(int64_t tid) {
170
112
    if (tid == getpid()) {
171
0
        return;
172
0
    }
173
112
    struct sched_param sp = {.sched_priority = 0};
174
112
    int err = sched_setscheduler(0, SCHED_IDLE, &sp);
175
112
    if (err < 0 && errno != EPERM) {
176
0
        LOG(ERROR) << "set_thread_idle_sched";
177
0
    }
178
112
}
179
180
0
void ThreadMgr::set_thread_nice_value(int64_t tid) {
181
0
    if (tid == getpid()) {
182
0
        return;
183
0
    }
184
    // From Linux kernel:
185
    // In the current implementation, each unit of difference in the nice values of two
186
    // processes results in a factor of 1.25 in the degree to which the
187
    // scheduler favors the higher priority process.  This causes very
188
    // low nice values (+19) to truly provide little CPU to a process
189
    // whenever there is any other higher priority load on the system,
190
    // and makes high nice values (-20) deliver most of the CPU to
191
    // applications that require it (e.g., some audio applications).
192
193
    // Choose 5 as lower priority value, default is 0
194
0
    int err = setpriority(PRIO_PROCESS, 0, config::scan_thread_nice_value);
195
0
    if (err < 0 && errno != EPERM) {
196
0
        LOG(ERROR) << "set_thread_low_priority";
197
0
    }
198
0
}
199
#endif
200
201
void ThreadMgr::add_thread(const pthread_t& pthread_id, const std::string& name,
202
33.7k
                           const std::string& category, int64_t tid) {
203
33.7k
    std::unique_lock<std::mutex> l(_lock);
204
33.7k
    _thread_categories[category][pthread_id] = ThreadDescriptor(category, name, tid);
205
33.7k
    _threads_running_metric++;
206
33.7k
    _threads_started_metric++;
207
33.7k
}
208
209
31.2k
void ThreadMgr::remove_thread(const pthread_t& pthread_id, const std::string& category) {
210
31.2k
    std::unique_lock<std::mutex> l(_lock);
211
31.2k
    auto category_it = _thread_categories.find(category);
212
31.2k
    DCHECK(category_it != _thread_categories.end());
213
31.2k
    category_it->second.erase(pthread_id);
214
31.2k
    _threads_running_metric--;
215
31.2k
}
216
217
// Web page callback that renders thread information into 'ej'.
//
// With a "group" argument:
//   - sets "requested_thread_group", "group_name" (HTML-escaped) and "requested_all";
//   - "all" selects the threads of every category, any other value selects the single
//     category of that name; if that category is unknown the function returns early
//     without adding "found";
//   - adds "found" -> "threads", one entry per selected thread.
// Without a "group" argument:
//   - sets "total_threads_running" and a "groups" array holding, for each category,
//     "encoded_group_name" (URL-encoded), "group_name" and "threads_running".
//
// In both modes _lock is held only while copying data out of the manager; URL encoding,
// per-thread stat collection and JSON construction all happen after it is released.
void ThreadMgr::display_thread_callback(const WebPageHandler::ArgumentMap& args,
                                        EasyJson* ej) const {
    const auto group_arg = args.find("group");
    if (group_arg != args.end()) {
        const auto& category_name = group_arg->second;
        const bool requested_all = category_name == "all";
        ej->Set("requested_thread_group", EasyJson::kObject);
        (*ej)["group_name"] = escape_for_html_to_string(category_name);
        (*ej)["requested_all"] = requested_all;

        // The critical section is as short as possible so as to minimize the delay
        // imposed on new threads that acquire the lock.
        std::vector<ThreadDescriptor> descriptors_to_print;
        {
            std::unique_lock<std::mutex> l(_lock);
            if (requested_all) {
                for (const auto& category : _thread_categories) {
                    for (const auto& elem : category.second) {
                        descriptors_to_print.emplace_back(elem.second);
                    }
                }
            } else {
                const auto category_it = _thread_categories.find(category_name);
                if (category_it == _thread_categories.end()) {
                    return;
                }
                for (const auto& elem : category_it->second) {
                    descriptors_to_print.emplace_back(elem.second);
                }
            }
        }

        EasyJson found = (*ej).Set("found", EasyJson::kObject);
        EasyJson threads = found.Set("threads", EasyJson::kArray);
        for (const auto& desc : descriptors_to_print) {
            summarize_thread_descriptor(desc, &threads);
        }
    } else {
        // List all thread groups and the number of threads running in each.
        std::vector<std::pair<std::string, uint64_t>> thread_categories_info;
        uint64_t running;
        {
            // Snapshot under the lock; everything below works on the copy.
            std::unique_lock<std::mutex> l(_lock);
            running = _threads_running_metric;
            thread_categories_info.reserve(_thread_categories.size());
            for (const auto& category : _thread_categories) {
                thread_categories_info.emplace_back(category.first, category.second.size());
            }
        }

        (*ej)["total_threads_running"] = running;
        EasyJson groups = ej->Set("groups", EasyJson::kArray);
        for (const auto& elem : thread_categories_info) {
            std::string category_arg;
            url_encode(elem.first, &category_arg);
            EasyJson group = groups.PushBack(EasyJson::kObject);
            group["encoded_group_name"] = category_arg;
            group["group_name"] = elem.first;
            group["threads_running"] = elem.second;
        }
    }
}
276
277
// Appends a JSON object for 'desc' to the array 'ej' with the thread's name and its
// user / kernel / iowait times converted from nanoseconds to seconds.
//
// If the per-thread statistics cannot be read, a warning is logged and the entry is
// still emitted using whatever values 'stats' holds at that point.
void ThreadMgr::summarize_thread_descriptor(const ThreadMgr::ThreadDescriptor& desc,
                                            EasyJson* ej) const {
    ThreadStats stats;
    Status fetch_status = get_thread_stats(desc.thread_id(), &stats);
    if (!fetch_status.ok()) {
        LOG(WARNING) << "Could not get per-thread statistics: " << fetch_status.to_string();
    }

    const double user_sec = static_cast<double>(stats.user_ns) / 1e9;
    const double kernel_sec = static_cast<double>(stats.kernel_ns) / 1e9;
    const double iowait_sec = static_cast<double>(stats.iowait_ns) / 1e9;

    EasyJson entry = ej->PushBack(EasyJson::kObject);
    entry["thread_name"] = desc.name();
    entry["user_sec"] = user_sec;
    entry["kernel_sec"] = kernel_sec;
    entry["iowait_sec"] = iowait_sec;
}
291
292
31.2k
// Detaches the underlying pthread if it is still marked joinable (i.e. it was started
// and nobody joined it). A failing detach is fatal.
Thread::~Thread() {
    if (!_joinable) {
        return;
    }
    int ret = pthread_detach(_thread);
    CHECK_EQ(ret, 0);
}
298
299
2.60k
void Thread::set_self_name(const std::string& name) {
300
2.60k
    ThreadMgr::set_thread_name(name, current_thread_id());
301
2.60k
}
302
303
#ifndef __APPLE__
304
112
void Thread::set_idle_sched() {
305
112
    ThreadMgr::set_idle_sched(current_thread_id());
306
112
}
307
308
0
void Thread::set_thread_nice_value() {
309
0
    ThreadMgr::set_thread_nice_value(current_thread_id());
310
0
}
311
#endif
312
313
1.11k
void Thread::join() {
314
1.11k
    static_cast<void>(ThreadJoiner(this).join());
315
1.11k
}
316
317
1.00k
int64_t Thread::tid() const {
318
1.00k
    int64_t t = base::subtle::Acquire_Load(&_tid);
319
1.00k
    if (t != PARENT_WAITING_TID) {
320
1.00k
        return _tid;
321
1.00k
    }
322
1
    return wait_for_tid();
323
1.00k
}
324
325
0
pthread_t Thread::pthread_id() const {
326
0
    return _thread;
327
0
}
328
329
33.7k
const std::string& Thread::name() const {
330
33.7k
    return _name;
331
33.7k
}
332
333
64.9k
const std::string& Thread::category() const {
334
64.9k
    return _category;
335
64.9k
}
336
337
0
std::string Thread::to_string() const {
338
0
    return absl::Substitute("Thread $0 (name: \"$1\", category: \"$2\")", tid(), _name, _category);
339
0
}
340
341
1.71M
// Returns the Thread object stored in the calling thread's thread-local slot.
Thread* Thread::current_thread() {
    Thread* const self = _tls;
    return self;
}
344
345
0
// Returns an identifier for the calling thread: the value reported by
// pthread_threadid_np() on Apple platforms, otherwise pthread_self() converted to
// int64_t.
int64_t Thread::unique_thread_id() {
#ifdef __APPLE__
    uint64_t apple_tid;
    pthread_threadid_np(pthread_self(), &apple_tid);
    return apple_tid;
#else
    const pthread_t self_handle = pthread_self();
    return static_cast<int64_t>(self_handle);
#endif
}
354
355
36.4k
// Returns the system thread id of the caller: pthread_threadid_np() on Apple
// platforms, the gettid system call elsewhere.
int64_t Thread::current_thread_id() {
#ifdef __APPLE__
    uint64_t apple_tid;
    pthread_threadid_np(nullptr, &apple_tid);
    return apple_tid;
#else
    const auto kernel_tid = syscall(SYS_gettid);
    return kernel_tid;
#endif
}
364
365
1
int64_t Thread::wait_for_tid() const {
366
1
    int loop_count = 0;
367
36
    while (true) {
368
36
        int64_t t = Acquire_Load(&_tid);
369
36
        if (t != PARENT_WAITING_TID) {
370
1
            return t;
371
1
        }
372
        // copied from boost::detail::yield
373
35
        int k = loop_count++;
374
35
        if (k < 32 || k & 1) {
375
33
            sched_yield();
376
33
        } else {
377
            // g++ -Wextra warns on {} or {0}
378
2
            struct timespec rqtp = {0, 0};
379
380
            // POSIX says that timespec has tv_sec and tv_nsec
381
            // But it doesn't guarantee order or placement
382
383
2
            rqtp.tv_sec = 0;
384
2
            rqtp.tv_nsec = 1000;
385
386
2
            nanosleep(&rqtp, 0);
387
2
        }
388
35
    }
389
1
}
390
391
// Creates a Thread object for (category, name, functor) and launches it on a new
// pthread running supervise_thread().
//
// 'holder', if non-null, receives a reference to the new Thread *before* the pthread
// is created, because the functor itself is allowed to use it. 'flags' is not used by
// this function.
//
// Returns Status::RuntimeError if pthread_create fails, Status::OK otherwise.
Status Thread::start_thread(const std::string& category, const std::string& name,
                            const ThreadFunctor& functor, uint64_t flags,
                            scoped_refptr<Thread>* holder) {
    // Make sure the thread manager singleton exists.
    std::call_once(once, init_threadmgr);

    // Local reference that lives for the duration of this function.
    scoped_refptr<Thread> new_thread(new Thread(category, name, functor));

    if (holder != nullptr) {
        *holder = new_thread;
    }

    // Marks the tid as "not yet published by the child".
    new_thread->_tid = PARENT_WAITING_TID;

    // Extra reference on behalf of supervise_thread(): the caller may drop its own
    // reference as soon as we return. It is released in finish_thread().
    new_thread->AddRef();

    // Rolls the preparation back if we leave before the thread was created.
    auto undo_prep = MakeScopedCleanup([&]() {
        new_thread->_tid = INVALID_TID;
        new_thread->Release();
    });

    int ret = pthread_create(&new_thread->_thread, nullptr, &Thread::supervise_thread,
                             new_thread.get());
    if (ret != 0) {
        return Status::RuntimeError("Could not create thread. (error {}) {}", ret, strerror(ret));
    }

    // The thread exists, so it can be joined from now on. This is set by the parent
    // rather than the child because only the parent (or someone it hands the Thread
    // to) can join, so the flag must be in place before we return.
    new_thread->_joinable = true;
    undo_prep.cancel();

    VLOG_NOTICE << "Started thread " << new_thread->tid() << " - " << category << ":" << name;
    return Status::OK();
}
437
438
33.7k
void* Thread::supervise_thread(void* arg) {
439
33.7k
    Thread* t = static_cast<Thread*>(arg);
440
33.7k
    int64_t system_tid = Thread::current_thread_id();
441
33.7k
    PCHECK(system_tid != -1);
442
443
    // Take an additional reference to the thread manager, which we'll need below.
444
33.7k
    std::shared_ptr<ThreadMgr> thread_mgr_ref = thread_manager;
445
446
    // Set up the TLS.
447
    //
448
    // We could store a scoped_refptr in the TLS itself, but as its
449
    // lifecycle is poorly defined, we'll use a bare pointer. We
450
    // already incremented the reference count in StartThread.
451
33.7k
    Thread::_tls = t;
452
453
    // Create thread context, there is no need to create it when func is executed.
454
33.7k
    ThreadLocalHandle::create_thread_local_if_not_exits();
455
456
    // Publish our tid to '_tid', which unblocks any callers waiting in
457
    // WaitForTid().
458
33.7k
    Release_Store(&t->_tid, system_tid);
459
460
33.7k
    std::string name = absl::Substitute("$0-$1", t->name(), system_tid);
461
33.7k
    thread_manager->set_thread_name(name, t->_tid);
462
33.7k
    thread_manager->add_thread(pthread_self(), name, t->category(), t->_tid);
463
464
    // FinishThread() is guaranteed to run (even if functor_ throws an
465
    // exception) because pthread_cleanup_push() creates a scoped object
466
    // whose destructor invokes the provided callback.
467
33.7k
    pthread_cleanup_push(&Thread::finish_thread, t);
468
33.7k
    t->_functor();
469
33.7k
    pthread_cleanup_pop(true);
470
471
33.7k
    return nullptr;
472
33.7k
}
473
474
31.2k
void Thread::finish_thread(void* arg) {
475
31.2k
    Thread* t = static_cast<Thread*>(arg);
476
477
    // We're here either because of the explicit pthread_cleanup_pop() in
478
    // SuperviseThread() or through pthread_exit(). In either case,
479
    // thread_manager is guaranteed to be live because thread_mgr_ref in
480
    // SuperviseThread() is still live.
481
31.2k
    thread_manager->remove_thread(pthread_self(), t->category());
482
483
    // Signal any Joiner that we're done.
484
31.2k
    t->_done.count_down();
485
486
31.2k
    VLOG_CRITICAL << "Ended thread " << t->_tid << " - " << t->category() << ":" << t->name();
487
31.2k
    t->Release();
488
    // NOTE: the above 'Release' call could be the last reference to 'this',
489
    // so 'this' could be destructed at this point. Do not add any code
490
    // following here!
491
492
31.2k
    ThreadLocalHandle::del_thread_local_if_count_is_zero();
493
31.2k
}
494
495
4
void Thread::init_threadmgr() {
496
4
    thread_manager.reset(new ThreadMgr());
497
4
}
498
499
// Builds a joiner for 'thr', which must not be null, with the default warn-after,
// warn-every and give-up intervals.
ThreadJoiner::ThreadJoiner(Thread* thr)
        : _thread(CHECK_NOTNULL(thr)),
          _warn_after_ms(kDefaultWarnAfterMs),
          _warn_every_ms(kDefaultWarnEveryMs),
          _give_up_after_ms(kDefaultGiveUpAfterMs) {
}
504
505
1
// Sets how long join() waits (in ms) before logging its first warning. Returns *this
// so calls can be chained.
ThreadJoiner& ThreadJoiner::warn_after_ms(int ms) {
    this->_warn_after_ms = ms;
    return *this;
}
509
510
1
// Sets the interval (in ms) between warnings once join() has started warning. Returns
// *this so calls can be chained.
ThreadJoiner& ThreadJoiner::warn_every_ms(int ms) {
    this->_warn_every_ms = ms;
    return *this;
}
514
515
1
// Sets the total time (in ms) after which join() gives up; join() treats -1 as
// "never give up". Returns *this so calls can be chained.
ThreadJoiner& ThreadJoiner::give_up_after_ms(int ms) {
    this->_give_up_after_ms = ms;
    return *this;
}
519
520
1.12k
// Waits for the target thread to finish and then pthread_join()s it.
//
// Returns:
//   - InvalidArgument if the caller is the thread being joined;
//   - OK immediately if the thread is not joinable (e.g. it was joined before);
//   - OK once the thread has signalled completion and has been joined;
//   - Aborted if '_give_up_after_ms' (when not -1) elapses first.
//
// While waiting, a warning is logged once '_warn_after_ms' has passed and then again
// every '_warn_every_ms'.
Status ThreadJoiner::join() {
    Thread* const caller = Thread::current_thread();
    if (caller != nullptr && caller->tid() == _thread->tid()) {
        return Status::InvalidArgument("Can't join on own thread. (error {}) {}", -1,
                                       _thread->_name);
    }

    // Early exit: double join is a no-op.
    if (!_thread->_joinable) {
        return Status::OK();
    }

    int waited_ms = 0;
    for (bool final_round = false; !final_round;) {
        if (waited_ms >= _warn_after_ms) {
            LOG(WARNING) << absl::Substitute("Waited for $0ms trying to join with $1 (tid $2)",
                                             waited_ms, _thread->_name, _thread->_tid);
        }

        // Time left until we stop trying; effectively unbounded when give-up is -1.
        const int until_giveup = (_give_up_after_ms == -1) ? std::numeric_limits<int>::max()
                                                           : _give_up_after_ms - waited_ms;

        // Time left until the next warning is due.
        const int until_warn =
                (waited_ms < _warn_after_ms) ? _warn_after_ms - waited_ms : _warn_every_ms;

        // If the give-up deadline comes first, this is the last wait.
        final_round = until_giveup < until_warn;

        int wait_for = std::min(until_giveup, until_warn);

        if (_thread->_done.wait_for(std::chrono::milliseconds(wait_for))) {
            // Unconditionally join before returning, to guarantee that any TLS
            // has been destroyed (pthread_key_create() destructors only run
            // after a pthread's user method has returned).
            int ret = pthread_join(_thread->_thread, nullptr);
            CHECK_EQ(ret, 0);
            _thread->_joinable = false;
            return Status::OK();
        }
        waited_ms += wait_for;
    }
    return Status::Aborted("Timed out after {}ms joining on {}", waited_ms, _thread->_name);
}
568
569
4
// Registers the "/threadz" template page ("Threads") on 'web_page_handler', served by
// ThreadMgr::display_thread_callback().
//
// The thread manager is created lazily, so it may not exist yet if no thread has been
// started through Thread::start_thread(). It is therefore initialized here through the
// same once-flag before being bound; otherwise the handler would be bound to a null
// ThreadMgr. The handler stores a shared_ptr copy, which keeps the manager alive for
// as long as the page stays registered.
void register_thread_display_page(WebPageHandler* web_page_handler) {
    std::call_once(once, [] { thread_manager.reset(new ThreadMgr()); });

    web_page_handler->register_template_page(
            "/threadz", "Threads",
            std::bind(&ThreadMgr::display_thread_callback, thread_manager,
                      std::placeholders::_1, std::placeholders::_2),
            true);
}
576
} // namespace doris