Coverage Report

Created: 2026-04-24 12:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/thread_context.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/thread_context.h"
19
20
#include "common/signal_handler.h"
21
#include "runtime/query_context.h"
22
#include "runtime/runtime_state.h"
23
24
namespace doris {
25
class MemTracker;
26
27
14.6M
void AttachTask::init(const std::shared_ptr<ResourceContext>& rc) {
28
    // Validate the ResourceContext chain before mutating any thread-local
29
    // or signal state. If any link is null we throw immediately, so the
30
    // caller's stack-unwind sees a clean state (no thread-local handle is
31
    // acquired and no signal task id is set). Without these the previous
32
    // code would silently propagate a null mem_tracker into
33
    // ThreadMemTrackerMgr and crash much later inside the allocator.
34
14.6M
    if (UNLIKELY(rc == nullptr)) {
35
0
        throw Exception(
36
0
                Status::FatalError("AttachTask::init: rc is null. signal_query_id={:x}-{:x}",
37
0
                                   signal::query_id_hi, signal::query_id_lo));
38
0
    }
39
14.6M
    if (UNLIKELY(rc->memory_context() == nullptr)) {
40
0
        throw Exception(Status::FatalError(
41
0
                "AttachTask::init: rc->memory_context() is null. signal_query_id={:x}-{:x}",
42
0
                signal::query_id_hi, signal::query_id_lo));
43
0
    }
44
14.6M
    if (UNLIKELY(rc->memory_context()->mem_tracker() == nullptr)) {
45
0
        throw Exception(Status::FatalError(
46
0
                "AttachTask::init: rc->memory_context()->mem_tracker() is null. "
47
0
                "ResourceContext was created but set_mem_tracker has not been called yet "
48
0
                "(likely a half-initialized QueryContext used before _init_query_mem_tracker). "
49
0
                "signal_query_id={:x}-{:x}",
50
0
                signal::query_id_hi, signal::query_id_lo));
51
0
    }
52
14.6M
    if (UNLIKELY(rc->task_controller() == nullptr)) {
53
0
        throw Exception(Status::FatalError(
54
0
                "AttachTask::init: rc->task_controller() is null. signal_query_id={:x}-{:x}",
55
0
                signal::query_id_hi, signal::query_id_lo));
56
0
    }
57
14.6M
    ThreadLocalHandle::create_thread_local_if_not_exits();
58
14.6M
    signal::set_signal_task_id(rc->task_controller()->task_id());
59
14.6M
    thread_context()->attach_task(rc);
60
14.6M
}
61
62
725k
AttachTask::AttachTask(const std::shared_ptr<ResourceContext>& rc) {
63
725k
    init(rc);
64
725k
}
65
66
2.44M
AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
67
    // if parameter is `orphan_mem_tracker`, if you do not switch thraed mem tracker afterwards,
68
    // alloc or free memory from Allocator will fail DCHECK. unless you know for sure that
69
    // the thread will not alloc or free memory from Allocator later.
70
    //
71
    // Validate before constructing the ResourceContext: MemoryContext::set_mem_tracker()
72
    // immediately dereferences mem_tracker->limit(), so a null shared_ptr would
73
    // crash there before reaching init()'s diagnostics.
74
2.44M
    if (UNLIKELY(mem_tracker == nullptr)) {
75
0
        throw Exception(Status::FatalError(
76
0
                "AttachTask(MemTrackerLimiter): mem_tracker is null. signal_query_id={:x}-{:x}",
77
0
                signal::query_id_hi, signal::query_id_lo));
78
0
    }
79
2.44M
    std::shared_ptr<ResourceContext> rc = ResourceContext::create_shared();
80
2.44M
    rc->memory_context()->set_mem_tracker(mem_tracker);
81
2.44M
    init(rc);
82
2.44M
}
83
84
11.1M
AttachTask::AttachTask(RuntimeState* runtime_state) {
85
    // Walk the chain `runtime_state -> get_query_ctx() -> resource_ctx()`
86
    // step by step so that an unexpected null pinpoints exactly which link
87
    // failed instead of crashing with a generic NPE inside attach_task() or
88
    // even later inside the allocator.
89
11.1M
    if (UNLIKELY(runtime_state == nullptr)) {
90
0
        throw Exception(Status::FatalError(
91
0
                "AttachTask(RuntimeState*): runtime_state is null. signal_query_id={:x}-{:x}",
92
0
                signal::query_id_hi, signal::query_id_lo));
93
0
    }
94
11.1M
    if (UNLIKELY(runtime_state->get_query_ctx() == nullptr)) {
95
0
        throw Exception(Status::FatalError(
96
0
                "AttachTask(RuntimeState*): runtime_state->get_query_ctx() is null. "
97
0
                "signal_query_id={:x}-{:x}",
98
0
                signal::query_id_hi, signal::query_id_lo));
99
0
    }
100
11.1M
    if (UNLIKELY(runtime_state->get_query_ctx()->resource_ctx() == nullptr)) {
101
0
        throw Exception(
102
0
                Status::FatalError("AttachTask(RuntimeState*): query_ctx->resource_ctx() is null. "
103
0
                                   "signal_query_id={:x}-{:x}",
104
0
                                   signal::query_id_hi, signal::query_id_lo));
105
0
    }
106
11.1M
    signal::set_signal_is_nereids(runtime_state->is_nereids());
107
11.1M
    init(runtime_state->get_query_ctx()->resource_ctx());
108
11.1M
}
109
110
267k
AttachTask::AttachTask(QueryContext* query_ctx) {
111
267k
    if (UNLIKELY(query_ctx == nullptr)) {
112
0
        throw Exception(Status::FatalError(
113
0
                "AttachTask(QueryContext*): query_ctx is null. signal_query_id={:x}-{:x}",
114
0
                signal::query_id_hi, signal::query_id_lo));
115
0
    }
116
267k
    init(query_ctx->resource_ctx());
117
267k
}
118
119
14.5M
AttachTask::~AttachTask() {
120
14.5M
    signal::set_signal_task_id(TUniqueId());
121
14.5M
    thread_context()->detach_task();
122
14.5M
    ThreadLocalHandle::del_thread_local_if_count_is_zero();
123
14.5M
}
124
125
0
SwitchResourceContext::SwitchResourceContext(const std::shared_ptr<ResourceContext>& rc) {
126
0
    DCHECK(rc != nullptr);
127
    // Validate the chain before mutating any thread-local or signal state,
128
    // symmetric to AttachTask::init(). Throwing after the thread-local
129
    // handle was acquired or the signal task id was set would skip this
130
    // object's destructor (because construction failed) and leak the
131
    // handle / leave a stale signal task id behind.
132
0
    if (UNLIKELY(rc == nullptr)) {
133
0
        throw Exception(
134
0
                Status::FatalError("SwitchResourceContext: rc is null. signal_query_id={:x}-{:x}",
135
0
                                   signal::query_id_hi, signal::query_id_lo));
136
0
    }
137
0
    if (UNLIKELY(rc->memory_context() == nullptr)) {
138
0
        throw Exception(Status::FatalError(
139
0
                "SwitchResourceContext: rc->memory_context() is null. signal_query_id={:x}-{:x}",
140
0
                signal::query_id_hi, signal::query_id_lo));
141
0
    }
142
0
    if (UNLIKELY(rc->memory_context()->mem_tracker() == nullptr)) {
143
0
        throw Exception(Status::FatalError(
144
0
                "SwitchResourceContext: rc->memory_context()->mem_tracker() is null. "
145
0
                "ResourceContext was switched in before _init_query_mem_tracker ran. "
146
0
                "signal_query_id={:x}-{:x}",
147
0
                signal::query_id_hi, signal::query_id_lo));
148
0
    }
149
0
    if (UNLIKELY(rc->task_controller() == nullptr)) {
150
0
        throw Exception(Status::FatalError(
151
0
                "SwitchResourceContext: rc->task_controller() is null. signal_query_id={:x}-{:x}",
152
0
                signal::query_id_hi, signal::query_id_lo));
153
0
    }
154
0
    doris::ThreadLocalHandle::create_thread_local_if_not_exits();
155
0
    DCHECK(thread_context()->is_attach_task());
156
0
    old_resource_ctx_ = thread_context()->resource_ctx();
157
0
    if (rc != old_resource_ctx_) {
158
0
        signal::set_signal_task_id(rc->task_controller()->task_id());
159
0
        thread_context()->resource_ctx_ = rc;
160
0
        thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(
161
0
                rc->memory_context()->mem_tracker(), rc->workload_group());
162
0
    }
163
0
}
164
165
0
SwitchResourceContext::~SwitchResourceContext() {
166
0
    if (old_resource_ctx_ != thread_context()->resource_ctx()) {
167
0
        DCHECK(old_resource_ctx_ != nullptr);
168
0
        signal::set_signal_task_id(old_resource_ctx_->task_controller()->task_id());
169
0
        thread_context()->resource_ctx_ = old_resource_ctx_;
170
0
        thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker();
171
0
    }
172
0
    doris::ThreadLocalHandle::del_thread_local_if_count_is_zero();
173
0
}
174
175
SwitchThreadMemTrackerLimiter::SwitchThreadMemTrackerLimiter(
176
16.0M
        const std::shared_ptr<doris::MemTrackerLimiter>& mem_tracker) {
177
16.0M
    DCHECK(mem_tracker);
178
    // Third entry point that calls attach_limiter_tracker(). Without this
179
    // null guard a null mem_tracker silently propagates and the next
180
    // allocation on this thread would NPE deep inside the allocator. Throw
181
    // before acquiring the thread-local handle / doing any side effect so
182
    // the destructor (which is noexcept) never runs in a dirty state.
183
16.0M
    if (UNLIKELY(mem_tracker == nullptr)) {
184
0
        throw Exception(Status::FatalError(
185
0
                "SwitchThreadMemTrackerLimiter: mem_tracker is null. signal_query_id={:x}-{:x}",
186
0
                signal::query_id_hi, signal::query_id_lo));
187
0
    }
188
16.0M
    doris::ThreadLocalHandle::create_thread_local_if_not_exits();
189
16.0M
    if (mem_tracker != thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker_sptr()) {
190
4.50M
        thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
191
4.50M
        is_switched_ = true;
192
4.50M
    }
193
16.0M
}
194
195
16.6M
SwitchThreadMemTrackerLimiter::~SwitchThreadMemTrackerLimiter() {
196
16.6M
    if (is_switched_) {
197
5.10M
        thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker();
198
5.10M
    }
199
16.6M
    doris::ThreadLocalHandle::del_thread_local_if_count_is_zero();
200
16.6M
}
201
202
1.59M
AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker* mem_tracker) {
203
1.59M
    ThreadLocalHandle::create_thread_local_if_not_exits();
204
1.59M
    if (mem_tracker) {
205
1.59M
        _need_pop = thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker);
206
1.59M
    }
207
1.59M
}
208
209
AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(
210
        const std::shared_ptr<MemTracker>& mem_tracker)
211
391k
        : _mem_tracker(mem_tracker) {
212
391k
    ThreadLocalHandle::create_thread_local_if_not_exits();
213
391k
    if (_mem_tracker) {
214
391k
        _need_pop =
215
391k
                thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get());
216
391k
    }
217
391k
}
218
219
1.98M
AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() {
220
1.98M
    if (_need_pop) {
221
1.98M
        thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker();
222
1.98M
    }
223
1.98M
    ThreadLocalHandle::del_thread_local_if_count_is_zero();
224
1.98M
}
225
226
} // namespace doris