Coverage Report

Created: 2026-04-01 16:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/query_context.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/PaloInternalService_types.h>
21
#include <gen_cpp/RuntimeProfile_types.h>
22
#include <gen_cpp/Types_types.h>
23
#include <glog/logging.h>
24
25
#include <atomic>
26
#include <cstdint>
27
#include <memory>
28
#include <mutex>
29
#include <string>
30
#include <unordered_map>
31
32
#include "common/config.h"
33
#include "common/factory_creator.h"
34
#include "common/object_pool.h"
35
#include "common/status.h"
36
#include "exec/common/memory.h"
37
#include "exec/runtime_filter/runtime_filter_mgr.h"
38
#include "exec/scan/scanner_scheduler.h"
39
#include "runtime/exec_env.h"
40
#include "runtime/memory/mem_tracker_limiter.h"
41
#include "runtime/runtime_predicate.h"
42
#include "runtime/workload_group/workload_group.h"
43
#include "runtime/workload_management/resource_context.h"
44
#include "util/hash_util.hpp"
45
#include "util/threadpool.h"
46
47
namespace doris {
48
49
// Forward declarations so this header does not need the full definitions of
// these types.
class Dependency;
class PipelineFragmentContext;
class PipelineTask;
class RecCTEScanLocalState;
53
54
struct ReportStatusRequest {
55
    const Status status;
56
    std::vector<RuntimeState*> runtime_states;
57
    bool done;
58
    TNetworkAddress coord_addr;
59
    TUniqueId query_id;
60
    int fragment_id;
61
    TUniqueId fragment_instance_id;
62
    int backend_num;
63
    RuntimeState* runtime_state;
64
    std::string load_error_url;
65
    std::string first_error_msg;
66
    std::function<void(const Status&)> cancel_fn;
67
};
68
69
// Where the work tracked by a QueryContext originates.
enum class QuerySource {
    // Query issued by the frontend (FE).
    INTERNAL_FRONTEND,
    // The next three are load sources; QueryContext::is_pure_load_task()
    // treats exactly these as pure load tasks.
    STREAM_LOAD,
    GROUP_COMMIT_LOAD,
    ROUTINE_LOAD,
    EXTERNAL_CONNECTOR,
};

// Printable name of `query_source`. Only declared here; the definition lives out
// of line, so the (const) return type must stay in sync with it.
const std::string toString(QuerySource query_source);
78
79
// Save the common components of fragments in a query.
80
// Some components like DescriptorTbl may be very large
81
// that will slow down each execution of fragments when DeSer them every time.
82
class DescriptorTbl;
83
class QueryContext : public std::enable_shared_from_this<QueryContext> {
84
    ENABLE_FACTORY_CREATOR(QueryContext);
85
86
public:
87
    static std::shared_ptr<QueryContext> create(TUniqueId query_id, ExecEnv* exec_env,
88
                                                const TQueryOptions& query_options,
89
                                                TNetworkAddress coord_addr, bool is_nereids,
90
                                                TNetworkAddress current_connect_fe,
91
                                                QuerySource query_type);
92
93
    // use QueryContext::create, cannot be made private because of ENABLE_FACTORY_CREATOR::create_shared.
94
    QueryContext(TUniqueId query_id, ExecEnv* exec_env, const TQueryOptions& query_options,
95
                 TNetworkAddress coord_addr, bool is_nereids, TNetworkAddress current_connect_fe,
96
                 QuerySource query_type);
97
98
    ~QueryContext();
99
100
    void init_query_task_controller();
101
102
16.8k
    ExecEnv* exec_env() const { return _exec_env; }
103
104
53.5k
    bool is_timeout(timespec now) const {
105
53.5k
        if (_timeout_second <= 0) {
106
0
            return false;
107
0
        }
108
53.5k
        return _query_watcher.elapsed_time_seconds(now) > _timeout_second;
109
53.5k
    }
110
111
372k
    bool is_single_backend_query() const { return _is_single_backend_query; }
112
113
428k
    void set_single_backend_query(bool is_single_backend_query) {
114
428k
        _is_single_backend_query = is_single_backend_query;
115
428k
    }
116
117
0
    int64_t get_remaining_query_time_seconds() const {
118
0
        timespec now;
119
0
        clock_gettime(CLOCK_MONOTONIC, &now);
120
0
        if (is_timeout(now)) {
121
0
            return -1;
122
0
        }
123
0
        int64_t elapsed_seconds = _query_watcher.elapsed_time_seconds(now);
124
0
        return _timeout_second - elapsed_seconds;
125
0
    }
126
127
    void set_ready_to_execute(Status reason);
128
129
50.7M
    [[nodiscard]] bool is_cancelled() const { return !_exec_status.ok(); }
130
131
    void cancel_all_pipeline_context(const Status& reason, int fragment_id = -1);
132
    std::string print_all_pipeline_context();
133
    void set_pipeline_context(const int fragment_id,
134
                              std::shared_ptr<PipelineFragmentContext> pip_ctx);
135
    void cancel(Status new_status, int fragment_id = -1);
136
137
868k
    [[nodiscard]] Status exec_status() { return _exec_status.status(); }
138
139
    void set_execution_dependency_ready();
140
141
    void set_memory_sufficient(bool sufficient);
142
143
    void set_ready_to_execute_only();
144
145
295k
    bool has_runtime_predicate(int source_node_id) {
146
295k
        return _runtime_predicates.contains(source_node_id);
147
295k
    }
148
149
53.4k
    RuntimePredicate& get_runtime_predicate(int source_node_id) {
150
53.4k
        DCHECK(has_runtime_predicate(source_node_id));
151
53.4k
        return _runtime_predicates.find(source_node_id)->second;
152
53.4k
    }
153
154
1.90k
    void init_runtime_predicates(const std::vector<TTopnFilterDesc>& topn_filter_descs) {
155
1.92k
        for (auto desc : topn_filter_descs) {
156
1.92k
            _runtime_predicates.try_emplace(desc.source_node_id, desc);
157
1.92k
        }
158
1.90k
    }
159
160
    Status set_workload_group(WorkloadGroupPtr& wg);
161
162
60.5k
    int execution_timeout() const {
163
60.6k
        return _query_options.__isset.execution_timeout ? _query_options.execution_timeout
164
18.4E
                                                        : _query_options.query_timeout;
165
60.5k
    }
166
167
3.94k
    int32_t runtime_filter_wait_time_ms() const {
168
3.94k
        return _query_options.runtime_filter_wait_time_ms;
169
3.94k
    }
170
171
0
    int be_exec_version() const {
172
0
        if (!_query_options.__isset.be_exec_version) {
173
0
            return 0;
174
0
        }
175
0
        return _query_options.be_exec_version;
176
0
    }
177
178
53.5k
    [[nodiscard]] int64_t get_fe_process_uuid() const {
179
53.5k
        return _query_options.__isset.fe_process_uuid ? _query_options.fe_process_uuid : 0;
180
53.5k
    }
181
182
2.28k
    bool ignore_runtime_filter_error() const {
183
2.28k
        return _query_options.__isset.ignore_runtime_filter_error
184
2.28k
                       ? _query_options.ignore_runtime_filter_error
185
2.28k
                       : false;
186
2.28k
    }
187
188
0
    bool enable_force_spill() const {
189
0
        return _query_options.__isset.enable_force_spill && _query_options.enable_force_spill;
190
0
    }
191
15.9M
    const TQueryOptions& query_options() const { return _query_options; }
192
6.72k
    bool should_be_shuffled_agg(int node_id) const {
193
6.72k
        return _query_options.__isset.shuffled_agg_ids &&
194
6.72k
               std::any_of(_query_options.shuffled_agg_ids.begin(),
195
6.72k
                           _query_options.shuffled_agg_ids.end(),
196
6.72k
                           [&](const int id) -> bool { return id == node_id; });
197
6.72k
    }
198
199
    // global runtime filter mgr, the runtime filter have remote target or
200
    // need local merge should regist here. before publish() or push_to_remote()
201
    // the runtime filter should do the local merge work
202
501k
    RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); }
203
204
801k
    TUniqueId query_id() const { return _query_id; }
205
206
543k
    ScannerScheduler* get_scan_scheduler() { return _scan_task_scheduler; }
207
208
30.5k
    ScannerScheduler* get_remote_scan_scheduler() { return _remote_scan_task_scheduler; }
209
210
2.01M
    Dependency* get_execution_dependency() { return _execution_dependency.get(); }
211
2.94M
    Dependency* get_memory_sufficient_dependency() { return _memory_sufficient_dependency.get(); }
212
213
    doris::TaskScheduler* get_pipe_exec_scheduler();
214
215
    void set_merge_controller_handler(
216
283k
            std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
217
283k
        _merge_controller_handler = handler;
218
283k
    }
219
5.71k
    std::shared_ptr<RuntimeFilterMergeControllerEntity> get_merge_controller_handler() const {
220
5.71k
        return _merge_controller_handler;
221
5.71k
    }
222
223
12.2M
    bool is_nereids() const { return _is_nereids; }
224
241k
    std::shared_ptr<MemShareArbitrator> mem_arb() const { return _mem_arb; }
225
226
6.62M
    WorkloadGroupPtr workload_group() const { return _resource_ctx->workload_group(); }
227
12.5M
    std::shared_ptr<MemTrackerLimiter> query_mem_tracker() const {
228
12.5M
        DCHECK(_resource_ctx->memory_context()->mem_tracker() != nullptr);
229
12.5M
        return _resource_ctx->memory_context()->mem_tracker();
230
12.5M
    }
231
232
860k
    int32_t get_slot_count() const {
233
860k
        return _query_options.__isset.query_slot_count ? _query_options.query_slot_count : 1;
234
860k
    }
235
236
    DescriptorTbl* desc_tbl = nullptr;
237
    bool set_rsc_info = false;
238
    std::string user;
239
    std::string group;
240
    TNetworkAddress coord_addr;
241
    TNetworkAddress current_connect_fe;
242
    TQueryGlobals query_globals;
243
943
    const TQueryGlobals get_query_globals() const { return query_globals; }
244
245
    ObjectPool obj_pool;
246
247
37.0M
    std::shared_ptr<ResourceContext> resource_ctx() { return _resource_ctx; }
248
249
    // plan node id -> TFileScanRangeParams
250
    // only for file scan node
251
    std::map<int, TFileScanRangeParams> file_scan_range_params_map;
252
253
    void add_using_brpc_stub(const TNetworkAddress& network_address,
254
1.63M
                             std::shared_ptr<PBackendService_Stub> brpc_stub) {
255
1.63M
        if (network_address.port == 0) {
256
99
            return;
257
99
        }
258
1.63M
        std::lock_guard<std::mutex> lock(_brpc_stubs_mutex);
259
1.63M
        if (!_using_brpc_stubs.contains(network_address)) {
260
44.1k
            _using_brpc_stubs.emplace(network_address, brpc_stub);
261
44.1k
        }
262
263
1.63M
        DCHECK_EQ(_using_brpc_stubs[network_address].get(), brpc_stub.get());
264
1.63M
    }
265
266
355k
    void set_ai_resources(std::map<std::string, TAIResource> ai_resources) {
267
355k
        _ai_resources =
268
355k
                std::make_shared<std::map<std::string, TAIResource>>(std::move(ai_resources));
269
355k
    }
270
271
63
    const std::shared_ptr<std::map<std::string, TAIResource>>& get_ai_resources() const {
272
63
        return _ai_resources;
273
63
    }
274
275
    std::unordered_map<TNetworkAddress, std::shared_ptr<PBackendService_Stub>>
276
43.8k
    get_using_brpc_stubs() {
277
43.8k
        std::lock_guard<std::mutex> lock(_brpc_stubs_mutex);
278
43.8k
        return _using_brpc_stubs;
279
43.8k
    }
280
281
0
    void set_low_memory_mode() {
282
        // will not return from low memory mode to non-low memory mode.
283
0
        _resource_ctx->task_controller()->set_low_memory_mode(true);
284
0
    }
285
16.3M
    bool low_memory_mode() { return _resource_ctx->task_controller()->low_memory_mode(); }
286
287
2.27M
    bool is_pure_load_task() {
288
2.27M
        return _query_source == QuerySource::STREAM_LOAD ||
289
2.27M
               _query_source == QuerySource::ROUTINE_LOAD ||
290
2.27M
               _query_source == QuerySource::GROUP_COMMIT_LOAD;
291
2.27M
    }
292
293
    void set_load_error_url(std::string error_url);
294
    std::string get_load_error_url();
295
    void set_first_error_msg(std::string error_msg);
296
    std::string get_first_error_msg();
297
298
    Status send_block_to_cte_scan(const TUniqueId& instance_id, int node_id,
299
                                  const google::protobuf::RepeatedPtrField<doris::PBlock>& pblocks,
300
                                  bool eos);
301
    void registe_cte_scan(const TUniqueId& instance_id, int node_id, RecCTEScanLocalState* scan);
302
    void deregiste_cte_scan(const TUniqueId& instance_id, int node_id);
303
304
0
    std::vector<int> get_fragment_ids() {
305
0
        std::vector<int> fragment_ids;
306
0
        for (const auto& it : _fragment_id_to_pipeline_ctx) {
307
0
            fragment_ids.push_back(it.first);
308
0
        }
309
0
        return fragment_ids;
310
0
    }
311
312
    Status reset_global_rf(const google::protobuf::RepeatedField<int32_t>& filter_ids);
313
314
private:
315
    friend class QueryTaskController;
316
317
    int _timeout_second;
318
    TUniqueId _query_id;
319
    ExecEnv* _exec_env = nullptr;
320
    MonotonicStopWatch _query_watcher;
321
    bool _is_nereids = false;
322
323
    std::shared_ptr<ResourceContext> _resource_ctx;
324
325
    void _init_resource_context();
326
    void _init_query_mem_tracker();
327
328
    std::unordered_map<int, RuntimePredicate> _runtime_predicates;
329
330
    std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
331
    const TQueryOptions _query_options;
332
333
    // All pipeline tasks use the same query context to report status. So we need a `_exec_status`
334
    // to report the real message if failed.
335
    AtomicStatus _exec_status;
336
337
    doris::TaskScheduler* _task_scheduler = nullptr;
338
    ScannerScheduler* _scan_task_scheduler = nullptr;
339
    ScannerScheduler* _remote_scan_task_scheduler = nullptr;
340
    // This dependency indicates if the 2nd phase RPC received from FE.
341
    std::unique_ptr<Dependency> _execution_dependency;
342
    // This dependency indicates if memory is sufficient to execute.
343
    std::unique_ptr<Dependency> _memory_sufficient_dependency;
344
345
    // This shared ptr is never used. It is just a reference to hold the object.
346
    // There is a weak ptr in runtime filter manager to reference this object.
347
    std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
348
349
    std::map<int, std::weak_ptr<PipelineFragmentContext>> _fragment_id_to_pipeline_ctx;
350
    std::mutex _pipeline_map_write_lock;
351
352
    std::mutex _profile_mutex;
353
    timespec _query_arrival_timestamp;
354
    // Distinguish the query source, for query that comes from fe, we will have some memory structure on FE to
355
    // help us manage the query.
356
    QuerySource _query_source;
357
358
    std::mutex _brpc_stubs_mutex;
359
    std::unordered_map<TNetworkAddress, std::shared_ptr<PBackendService_Stub>> _using_brpc_stubs;
360
361
    // when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile
362
    // flatten profile of one fragment:
363
    // Pipeline 0
364
    //      PipelineTask 0
365
    //              Operator 1
366
    //              Operator 2
367
    //              Scanner
368
    //      PipelineTask 1
369
    //              Operator 1
370
    //              Operator 2
371
    //              Scanner
372
    // Pipeline 1
373
    //      PipelineTask 2
374
    //              Operator 3
375
    //      PipelineTask 3
376
    //              Operator 3
377
    // fragment_id -> list<profile>
378
    std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> _profile_map;
379
    std::unordered_map<int, std::shared_ptr<TRuntimeProfileTree>> _load_channel_profile_map;
380
381
    std::shared_ptr<std::map<std::string, TAIResource>> _ai_resources;
382
383
    void _report_query_profile();
384
385
    std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
386
    _collect_realtime_query_profile();
387
388
    std::mutex _error_url_lock;
389
    std::string _load_error_url;
390
    std::string _first_error_msg;
391
392
    bool _is_single_backend_query = false;
393
394
    // file cache context holders
395
    std::vector<io::BlockFileCache::QueryFileCacheContextHolderPtr> _query_context_holders;
396
    // instance id + node id -> cte scan
397
    std::map<std::pair<TUniqueId, int>, RecCTEScanLocalState*> _cte_scan;
398
    std::mutex _cte_scan_lock;
399
    std::shared_ptr<MemShareArbitrator> _mem_arb = nullptr;
400
401
public:
402
    // when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile
403
    void add_fragment_profile(
404
            int fragment_id,
405
            const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profile,
406
            std::shared_ptr<TRuntimeProfileTree> load_channel_profile);
407
408
    TReportExecStatusParams get_realtime_exec_status();
409
410
839k
    bool enable_profile() const {
411
839k
        return _query_options.__isset.enable_profile && _query_options.enable_profile;
412
839k
    }
413
414
0
    timespec get_query_arrival_timestamp() const { return this->_query_arrival_timestamp; }
415
6.69k
    QuerySource get_query_source() const { return this->_query_source; }
416
417
818k
    TQueryOptions get_query_options() const { return _query_options; }
418
};
419
420
} // namespace doris