Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
#include <bthread/bthread.h>

#include <functional>
#include <memory>
#include <optional>
#include <type_traits>
#include <utility>

#include "runtime/thread_context.h"
26 | | |
27 | | namespace doris { |
28 | | |
29 | | // Launch a callable in a background bthread (fire-and-forget). |
30 | | // The callable is heap-allocated and deleted after execution. |
31 | | // Returns true on success, false if bthread creation failed (callable is still deleted). |
32 | | // |
33 | | // init_thread_ctx: when true, initialises the bthread's thread-local ThreadContext via |
34 | | // ScopedInitThreadContext before invoking fn. Set this when the bthread needs memory-tracker |
35 | | // or workload-group context (e.g. attaches an AttachTask inside fn). Defaults to false so |
36 | | // existing call-sites that do not need thread-context initialisation are unaffected. |
37 | | template <typename Fn> |
38 | 12 | bool start_bthread(Fn&& fn, bool init_thread_ctx = false, const bthread_attr_t* attr = nullptr) { |
39 | 12 | struct Args { |
40 | 12 | std::function<void()> fn; |
41 | 12 | bool init_thread_ctx; |
42 | 12 | }; |
43 | 12 | auto args = std::make_unique<Args>(Args {std::forward<Fn>(fn), init_thread_ctx}); |
44 | 12 | auto entry = [](void* arg) -> void* { |
45 | | // Reclaim ownership so Args is deleted when this scope exits. |
46 | 12 | auto a = std::unique_ptr<Args>(reinterpret_cast<Args*>(arg)); |
47 | 12 | std::optional<ScopedInitThreadContext> scoped; |
48 | 12 | if (a->init_thread_ctx) { |
49 | 11 | scoped.emplace(); |
50 | 11 | } |
51 | 12 | a->fn(); |
52 | 12 | return nullptr; |
53 | 12 | }; cached_remote_file_reader.cpp:_ZZN5doris13start_bthreadIZNS_2io22CachedRemoteFileReader20_execute_remote_readERKSt6vectorISt10shared_ptrINS1_9FileBlockEESaIS6_EEmmRSt10unique_ptrIA_cSt14default_deleteISC_EEPNS1_15PeerFetchResultERNS1_14ReadStatisticsEPKNS1_9IOContextEE3$_0EEbOT_bPK14bthread_attr_tENKUlPvE_clESU_ Line | Count | Source | 44 | 1 | auto entry = [](void* arg) -> void* { | 45 | | // Reclaim ownership so Args is deleted when this scope exits. | 46 | 1 | auto a = std::unique_ptr<Args>(reinterpret_cast<Args*>(arg)); | 47 | 1 | std::optional<ScopedInitThreadContext> scoped; | 48 | 1 | if (a->init_thread_ctx) { | 49 | 0 | scoped.emplace(); | 50 | 0 | } | 51 | 1 | a->fn(); | 52 | 1 | return nullptr; | 53 | 1 | }; |
cached_remote_file_reader.cpp:_ZZN5doris13start_bthreadIZNS_2io22CachedRemoteFileReader20_execute_winner_raceERKSt6vectorISt10shared_ptrINS1_9FileBlockEESaIS6_EEmmRSt10unique_ptrIA_cSt14default_deleteISC_EEPNS1_15PeerFetchResultERNS1_14ReadStatisticsEPKNS1_9IOContextERKS3_INS_13PeerCandidateESaISO_EElE3$_0EEbOT_bPK14bthread_attr_tENKUlPvE_clESZ_ Line | Count | Source | 44 | 11 | auto entry = [](void* arg) -> void* { | 45 | | // Reclaim ownership so Args is deleted when this scope exits. | 46 | 11 | auto a = std::unique_ptr<Args>(reinterpret_cast<Args*>(arg)); | 47 | 11 | std::optional<ScopedInitThreadContext> scoped; | 48 | 11 | if (a->init_thread_ctx) { | 49 | 11 | scoped.emplace(); | 50 | 11 | } | 51 | 11 | a->fn(); | 52 | 11 | return nullptr; | 53 | 11 | }; |
|
54 | 12 | bthread_t tid; |
55 | 12 | if (bthread_start_background(&tid, attr, entry, args.get()) != 0) { |
56 | 0 | return false; // args unique_ptr destructs here, no leak |
57 | 0 | } |
58 | 12 | args.release(); // bthread entry now owns the pointer |
59 | 12 | return true; |
60 | 12 | } cached_remote_file_reader.cpp:_ZN5doris13start_bthreadIZNS_2io22CachedRemoteFileReader20_execute_remote_readERKSt6vectorISt10shared_ptrINS1_9FileBlockEESaIS6_EEmmRSt10unique_ptrIA_cSt14default_deleteISC_EEPNS1_15PeerFetchResultERNS1_14ReadStatisticsEPKNS1_9IOContextEE3$_0EEbOT_bPK14bthread_attr_t Line | Count | Source | 38 | 1 | bool start_bthread(Fn&& fn, bool init_thread_ctx = false, const bthread_attr_t* attr = nullptr) { | 39 | 1 | struct Args { | 40 | 1 | std::function<void()> fn; | 41 | 1 | bool init_thread_ctx; | 42 | 1 | }; | 43 | 1 | auto args = std::make_unique<Args>(Args {std::forward<Fn>(fn), init_thread_ctx}); | 44 | 1 | auto entry = [](void* arg) -> void* { | 45 | | // Reclaim ownership so Args is deleted when this scope exits. | 46 | 1 | auto a = std::unique_ptr<Args>(reinterpret_cast<Args*>(arg)); | 47 | 1 | std::optional<ScopedInitThreadContext> scoped; | 48 | 1 | if (a->init_thread_ctx) { | 49 | 1 | scoped.emplace(); | 50 | 1 | } | 51 | 1 | a->fn(); | 52 | 1 | return nullptr; | 53 | 1 | }; | 54 | 1 | bthread_t tid; | 55 | 1 | if (bthread_start_background(&tid, attr, entry, args.get()) != 0) { | 56 | 0 | return false; // args unique_ptr destructs here, no leak | 57 | 0 | } | 58 | 1 | args.release(); // bthread entry now owns the pointer | 59 | 1 | return true; | 60 | 1 | } |
cached_remote_file_reader.cpp:_ZN5doris13start_bthreadIZNS_2io22CachedRemoteFileReader20_execute_winner_raceERKSt6vectorISt10shared_ptrINS1_9FileBlockEESaIS6_EEmmRSt10unique_ptrIA_cSt14default_deleteISC_EEPNS1_15PeerFetchResultERNS1_14ReadStatisticsEPKNS1_9IOContextERKS3_INS_13PeerCandidateESaISO_EElE3$_0EEbOT_bPK14bthread_attr_t Line | Count | Source | 38 | 11 | bool start_bthread(Fn&& fn, bool init_thread_ctx = false, const bthread_attr_t* attr = nullptr) { | 39 | 11 | struct Args { | 40 | 11 | std::function<void()> fn; | 41 | 11 | bool init_thread_ctx; | 42 | 11 | }; | 43 | 11 | auto args = std::make_unique<Args>(Args {std::forward<Fn>(fn), init_thread_ctx}); | 44 | 11 | auto entry = [](void* arg) -> void* { | 45 | | // Reclaim ownership so Args is deleted when this scope exits. | 46 | 11 | auto a = std::unique_ptr<Args>(reinterpret_cast<Args*>(arg)); | 47 | 11 | std::optional<ScopedInitThreadContext> scoped; | 48 | 11 | if (a->init_thread_ctx) { | 49 | 11 | scoped.emplace(); | 50 | 11 | } | 51 | 11 | a->fn(); | 52 | 11 | return nullptr; | 53 | 11 | }; | 54 | 11 | bthread_t tid; | 55 | 11 | if (bthread_start_background(&tid, attr, entry, args.get()) != 0) { | 56 | 0 | return false; // args unique_ptr destructs here, no leak | 57 | 0 | } | 58 | 11 | args.release(); // bthread entry now owns the pointer | 59 | 11 | return true; | 60 | 11 | } |
|
61 | | |
62 | | } // namespace doris |