Coverage Report

Created: 2025-03-11 14:37

/root/doris/be/src/runtime/query_context.h

 Count| Source
      | // Licensed to the Apache Software Foundation (ASF) under one
      | // or more contributor license agreements.  See the NOTICE file
      | // distributed with this work for additional information
      | // regarding copyright ownership.  The ASF licenses this file
      | // to you under the Apache License, Version 2.0 (the
      | // "License"); you may not use this file except in compliance
      | // with the License.  You may obtain a copy of the License at
      | //
      | //   http://www.apache.org/licenses/LICENSE-2.0
      | //
      | // Unless required by applicable law or agreed to in writing,
      | // software distributed under the License is distributed on an
      | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
      | // KIND, either express or implied.  See the License for the
      | // specific language governing permissions and limitations
      | // under the License.
      |
      | #pragma once
      |
      | #include <gen_cpp/PaloInternalService_types.h>
      | #include <gen_cpp/RuntimeProfile_types.h>
      | #include <gen_cpp/Types_types.h>
      | #include <glog/logging.h>
      |
      | #include <atomic>
      | #include <cstdint>
      | #include <memory>
      | #include <mutex>
      | #include <string>
      | #include <unordered_map>
      |
      | #include "common/config.h"
      | #include "common/factory_creator.h"
      | #include "common/object_pool.h"
      | #include "pipeline/dependency.h"
      | #include "runtime/exec_env.h"
      | #include "runtime/memory/mem_tracker_limiter.h"
      | #include "runtime/runtime_filter_mgr.h"
      | #include "runtime/runtime_predicate.h"
      | #include "runtime/workload_management/resource_context.h"
      | #include "util/hash_util.hpp"
      | #include "util/threadpool.h"
      | #include "vec/exec/scan/scanner_scheduler.h"
      | #include "vec/runtime/shared_hash_table_controller.h"
      | #include "workload_group/workload_group.h"
      |
      | namespace doris {
      |
      | namespace pipeline {
      | class PipelineFragmentContext;
      | class PipelineTask;
      | } // namespace pipeline
      |
      | struct ReportStatusRequest {
      |     const Status status;
      |     std::vector<RuntimeState*> runtime_states;
      |     bool done;
      |     TNetworkAddress coord_addr;
      |     TUniqueId query_id;
      |     int fragment_id;
      |     TUniqueId fragment_instance_id;
      |     int backend_num;
      |     RuntimeState* runtime_state;
      |     std::function<void(const Status&)> cancel_fn;
      | };
      |
      | enum class QuerySource {
      |     INTERNAL_FRONTEND,
      |     STREAM_LOAD,
      |     GROUP_COMMIT_LOAD,
      |     ROUTINE_LOAD,
      |     EXTERNAL_CONNECTOR
      | };
      |
      | const std::string toString(QuerySource query_source);
      |
      | // Saves the components shared by all fragments of a query. Some of them,
      | // like DescriptorTbl, may be very large, and deserializing them again for
      | // every fragment execution would slow each execution down.
      | class DescriptorTbl;
      | class QueryContext : public std::enable_shared_from_this<QueryContext> {
      |     ENABLE_FACTORY_CREATOR(QueryContext);
      |
      | public:
      |     class QueryTaskController : public TaskController {
      |         ENABLE_FACTORY_CREATOR(QueryTaskController);
      |
      |     public:
      |         static std::unique_ptr<TaskController> create(QueryContext* query_ctx);
      |
      |         bool is_cancelled() const override;
      |         Status cancel(const Status& reason, int fragment_id);
     0|         Status cancel(const Status& reason) override { return cancel(reason, -1); }
      |
      |     private:
      |         QueryTaskController(const std::shared_ptr<QueryContext>& query_ctx)
    11|                 : query_ctx_(query_ctx) {}
      |
      |         const std::weak_ptr<QueryContext> query_ctx_;
      |     };
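The one-argument cancel() above delegates to the two-argument overload with fragment_id = -1, which by convention means "all fragments". The fragment-aware overload lives in the .cpp; a minimal sketch of what it plausibly does with the weak_ptr (an assumption, not the actual Doris implementation):

    Status QueryContext::QueryTaskController::cancel(const Status& reason, int fragment_id) {
        // The controller holds only a weak_ptr, so the query may already be gone.
        std::shared_ptr<QueryContext> ctx = query_ctx_.lock();
        if (ctx == nullptr) {
            return Status::InternalError("QueryContext already released"); // hypothetical message
        }
        ctx->cancel(reason, fragment_id); // fragment_id == -1 cancels every fragment
        return Status::OK();
    }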
      |
      |     class QueryMemoryContext : public MemoryContext {
      |         ENABLE_FACTORY_CREATOR(QueryMemoryContext);
      |
      |     public:
      |         static std::unique_ptr<MemoryContext> create();
      |
     0|         int64_t revokable_bytes() override {
      |             // TODO
     0|             return 0;
     0|         }
      |
     0|         bool ready_do_revoke() override {
      |             // TODO
     0|             return true;
     0|         }
      |
     0|         Status revoke(int64_t bytes) override {
      |             // TODO
     0|             return Status::OK();
     0|         }
      |
     0|         Status enter_arbitration(Status reason) override {
      |             // TODO, pause the pipeline
     0|             return Status::OK();
     0|         }
      |
     0|         Status leave_arbitration(Status reason) override {
      |             // TODO, start pipeline
     0|             return Status::OK();
     0|         }
      |
      |     private:
   172|         QueryMemoryContext() = default;
      |     };
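All five MemoryContext overrides above are still TODO stubs, hence the zero hit counts. A hedged sketch of the flow an arbitrator could drive once they are implemented; the helper name and policy are illustrative only:

    // Hypothetical caller-side flow, not actual Doris code.
    Status try_reclaim(MemoryContext* mem_ctx, int64_t target_bytes) {
        int64_t candidate = mem_ctx->revokable_bytes(); // currently always 0 (TODO)
        if (candidate <= 0 || !mem_ctx->ready_do_revoke()) {
            return Status::OK(); // nothing to reclaim from this query right now
        }
        // Pause the pipeline, spill, then resume (enter/leave are stubs today).
        RETURN_IF_ERROR(mem_ctx->enter_arbitration(Status::OK()));
        Status st = mem_ctx->revoke(std::min<int64_t>(candidate, target_bytes));
        RETURN_IF_ERROR(mem_ctx->leave_arbitration(Status::OK()));
        return st;
    }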
      |
      |     static std::shared_ptr<QueryContext> create(TUniqueId query_id, ExecEnv* exec_env,
      |                                                 const TQueryOptions& query_options,
      |                                                 TNetworkAddress coord_addr, bool is_nereids,
      |                                                 TNetworkAddress current_connect_fe,
      |                                                 QuerySource query_type);
      |
      |     // Use QueryContext::create; the constructor cannot be made private because of
      |     // ENABLE_FACTORY_CREATOR::create_shared.
      |     QueryContext(TUniqueId query_id, ExecEnv* exec_env, const TQueryOptions& query_options,
      |                  TNetworkAddress coord_addr, bool is_nereids, TNetworkAddress current_connect_fe,
      |                  QuerySource query_type);
      |
      |     ~QueryContext();
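A minimal usage sketch of the factory; in Doris the call site is typically the fragment manager when a query first reaches this backend, and all argument values below are placeholders:

    std::shared_ptr<QueryContext> query_ctx = QueryContext::create(
            query_id, ExecEnv::GetInstance(), query_options, coord_addr,
            /*is_nereids=*/true, current_connect_fe, QuerySource::INTERNAL_FRONTEND);
    query_ctx->init_query_task_controller();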
      |
      |     void init_query_task_controller();
      |
     0|     ExecEnv* exec_env() { return _exec_env; }
      |
     0|     bool is_timeout(timespec now) const {
     0|         if (_timeout_second <= 0) {
     0|             return false;
     0|         }
     0|         return _query_watcher.elapsed_time_seconds(now) > _timeout_second;
     0|     }
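is_timeout() only compares elapsed time against _timeout_second; the caller supplies "now" so one clock read can be shared across many queries. A sketch of watchdog-style usage (the clock source is assumed to match MonotonicStopWatch, and the cancellation message is illustrative):

    timespec now;
    clock_gettime(CLOCK_MONOTONIC, &now);
    if (query_ctx->is_timeout(now)) {
        query_ctx->cancel(Status::Cancelled("query timeout reached"));
    }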
      |
     0|     void set_thread_token(int concurrency, bool is_serial) {
     0|         _thread_token = _exec_env->scanner_scheduler()->new_limited_scan_pool_token(
     0|                 is_serial ? ThreadPool::ExecutionMode::SERIAL
     0|                           : ThreadPool::ExecutionMode::CONCURRENT,
     0|                 concurrency);
     0|     }
      |
     0|     ThreadPoolToken* get_token() { return _thread_token.get(); }
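A sketch of how the token throttles a query's scanners; the submitted lambda is a stand-in for real scan work:

    // SERIAL allows one scan task at a time; CONCURRENT allows up to `concurrency`.
    query_ctx->set_thread_token(/*concurrency=*/4, /*is_serial=*/false);
    if (ThreadPoolToken* token = query_ctx->get_token()) {
        // Tasks submitted through the token share the query-wide cap instead of
        // competing freely in the global scan pool.
        RETURN_IF_ERROR(token->submit_func([]() { /* run one scan task */ }));
    }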
      |
      |     void set_ready_to_execute(Status reason);
      |
   504|     [[nodiscard]] bool is_cancelled() const { return !_exec_status.ok(); }
      |
      |     void cancel_all_pipeline_context(const Status& reason, int fragment_id = -1);
      |     std::string print_all_pipeline_context();
      |     void set_pipeline_context(const int fragment_id,
      |                               std::shared_ptr<pipeline::PipelineFragmentContext> pip_ctx);
      |     void cancel(Status new_status, int fragment_id = -1);
      |
     5|     [[nodiscard]] Status exec_status() { return _exec_status.status(); }
      |
      |     void set_execution_dependency_ready();
      |
      |     void set_memory_sufficient(bool sufficient);
      |
      |     void set_ready_to_execute_only();
      |
     0|     std::shared_ptr<vectorized::SharedHashTableController> get_shared_hash_table_controller() {
     0|         return _shared_hash_table_controller;
     0|     }
      |
     0|     bool has_runtime_predicate(int source_node_id) {
     0|         return _runtime_predicates.contains(source_node_id);
     0|     }
      |
     0|     vectorized::RuntimePredicate& get_runtime_predicate(int source_node_id) {
     0|         DCHECK(has_runtime_predicate(source_node_id));
     0|         return _runtime_predicates.find(source_node_id)->second;
     0|     }
      |
     0|     void init_runtime_predicates(const std::vector<TTopnFilterDesc>& topn_filter_descs) {
     0|         for (const auto& desc : topn_filter_descs) {
     0|             _runtime_predicates.try_emplace(desc.source_node_id, desc);
     0|         }
     0|     }
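Runtime predicates are created once from the TopN filter descriptors and then looked up by source node id; get_runtime_predicate() DCHECKs that the id exists, so guard with has_runtime_predicate() first. A small usage sketch (the node id is a placeholder):

    query_ctx->init_runtime_predicates(topn_filter_descs);
    if (query_ctx->has_runtime_predicate(/*source_node_id=*/3)) {
        vectorized::RuntimePredicate& pred = query_ctx->get_runtime_predicate(3);
        // ... use pred to tighten the scan range for node 3 ...
    }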
      |
      |     void set_workload_group(WorkloadGroupPtr& wg);
      |
     0|     int execution_timeout() const {
     0|         return _query_options.__isset.execution_timeout ? _query_options.execution_timeout
     0|                                                         : _query_options.query_timeout;
     0|     }
      |
     3|     int32_t runtime_filter_wait_time_ms() const {
     3|         return _query_options.runtime_filter_wait_time_ms;
     3|     }
      |
     3|     bool runtime_filter_wait_infinitely() const {
     3|         return _query_options.__isset.runtime_filter_wait_infinitely &&
     3|                _query_options.runtime_filter_wait_infinitely;
     3|     }
      |
     0|     int be_exec_version() const {
     0|         if (!_query_options.__isset.be_exec_version) {
     0|             return 0;
     0|         }
     0|         return _query_options.be_exec_version;
     0|     }
      |
     0|     [[nodiscard]] int64_t get_fe_process_uuid() const {
     0|         return _query_options.__isset.fe_process_uuid ? _query_options.fe_process_uuid : 0;
     0|     }
      |
     0|     bool ignore_runtime_filter_error() const {
     0|         return _query_options.__isset.ignore_runtime_filter_error
     0|                        ? _query_options.ignore_runtime_filter_error
     0|                        : false;
     0|     }
      |
     0|     bool enable_force_spill() const {
     0|         return _query_options.__isset.enable_force_spill && _query_options.enable_force_spill;
     0|     }
      |
      |     // Global runtime filter manager. Runtime filters that have a remote target or
      |     // need a local merge should register here; before publish() or push_to_remote(),
      |     // such a runtime filter should do its local merge work.
    13|     RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); }
      |
   560|     TUniqueId query_id() const { return _query_id; }
      |
     0|     vectorized::SimplifiedScanScheduler* get_scan_scheduler() { return _scan_task_scheduler; }
      |
     0|     vectorized::SimplifiedScanScheduler* get_remote_scan_scheduler() {
     0|         return _remote_scan_task_scheduler;
     0|     }
      |
    39|     pipeline::Dependency* get_execution_dependency() { return _execution_dependency.get(); }
    36|     pipeline::Dependency* get_memory_sufficient_dependency() {
    36|         return _memory_sufficient_dependency.get();
    36|     }
      |
      |     std::vector<pipeline::PipelineTask*> get_revocable_tasks() const;
      |
      |     Status revoke_memory();
      |
      |     doris::pipeline::TaskScheduler* get_pipe_exec_scheduler();
      |
      |     ThreadPool* get_memtable_flush_pool();
      |
      |     void set_merge_controller_handler(
     0|             std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
     0|         _merge_controller_handler = handler;
     0|     }
      |
     0|     bool is_nereids() const { return _is_nereids; }
      |
   425|     WorkloadGroupPtr workload_group() const { return _resource_ctx->workload_group(); }
 1.46k|     std::shared_ptr<MemTrackerLimiter> query_mem_tracker() const {
 1.46k|         return _resource_ctx->memory_context()->mem_tracker();
 1.46k|     }
      |
     0|     void increase_revoking_tasks_count() { _revoking_tasks_count.fetch_add(1); }
      |
      |     void decrease_revoking_tasks_count();
      |
     0|     int get_revoking_tasks_count() const { return _revoking_tasks_count.load(); }
      |
      |     void get_revocable_info(size_t* revocable_size, size_t* memory_usage,
      |                             bool* has_running_task) const;
      |     size_t get_revocable_size() const;
      |
      |     // Called by the workload group manager to set the query's mem limit using slots.
      |     // If the user set a query limit explicitly, the smaller of the two is used.
   205|     void set_mem_limit(int64_t new_mem_limit) {
   205|         _resource_ctx->memory_context()->mem_tracker()->set_limit(new_mem_limit);
   205|     }
      |
   510|     int64_t get_mem_limit() const {
   510|         return _resource_ctx->memory_context()->mem_tracker()->limit();
   510|     }
      |
      |     // The new mem limit must not exceed the user-set mem limit.
   202|     void set_adjusted_mem_limit(int64_t new_mem_limit) {
   202|         _adjusted_mem_limit = std::min<int64_t>(new_mem_limit, _user_set_mem_limit);
   202|     }
      |
      |     // The mem limit expected once the workload group has reached its limit.
   304|     int64_t adjusted_mem_limit() { return _adjusted_mem_limit; }
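set_adjusted_mem_limit() clamps the workload group's slot-based suggestion to the user-set limit. With an illustrative _user_set_mem_limit of 2 GB:

    query_ctx->set_adjusted_mem_limit(3LL << 30); // min(3 GB, 2 GB) -> stays 2 GB
    query_ctx->set_adjusted_mem_limit(1LL << 30); // min(1 GB, 2 GB) -> lowered to 1 GB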
      |
   603|     MemTrackerLimiter* get_mem_tracker() {
   603|         return _resource_ctx->memory_context()->mem_tracker().get();
   603|     }
      |
   404|     int32_t get_slot_count() const {
   404|         return _query_options.__isset.query_slot_count ? _query_options.query_slot_count : 1;
   404|     }
      |
      |     DescriptorTbl* desc_tbl = nullptr;
      |     bool set_rsc_info = false;
      |     std::string user;
      |     std::string group;
      |     TNetworkAddress coord_addr;
      |     TNetworkAddress current_connect_fe;
      |     TQueryGlobals query_globals;
      |
      |     ObjectPool obj_pool;
      |
   112|     std::shared_ptr<ResourceContext> resource_ctx() { return _resource_ctx; }
      |
      |     std::vector<TUniqueId> fragment_instance_ids;
      |
      |     // plan node id -> TFileScanRangeParams
      |     // only for file scan node
      |     std::map<int, TFileScanRangeParams> file_scan_range_params_map;
      |
      |     void add_using_brpc_stub(const TNetworkAddress& network_address,
     0|                              std::shared_ptr<PBackendService_Stub> brpc_stub) {
     0|         if (network_address.port == 0) {
     0|             return;
     0|         }
     0|         std::lock_guard<std::mutex> lock(_brpc_stubs_mutex);
     0|         if (!_using_brpc_stubs.contains(network_address)) {
     0|             _using_brpc_stubs.emplace(network_address, brpc_stub);
     0|         }
      |
     0|         DCHECK_EQ(_using_brpc_stubs[network_address].get(), brpc_stub.get());
     0|     }
      |
      |     std::unordered_map<TNetworkAddress, std::shared_ptr<PBackendService_Stub>>
     0|     get_using_brpc_stubs() {
     0|         std::lock_guard<std::mutex> lock(_brpc_stubs_mutex);
     0|         return _using_brpc_stubs;
     0|     }
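A sketch of recording the stub used to reach an exchange peer; the address values are placeholders, and the client-cache accessor is an assumption about what ExecEnv exposes rather than a confirmed API:

    TNetworkAddress addr;
    addr.__set_hostname("10.0.0.2");
    addr.__set_port(8060); // a port of 0 would be silently ignored above
    std::shared_ptr<PBackendService_Stub> stub =
            ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr);
    if (stub != nullptr) {
        query_ctx->add_using_brpc_stub(addr, stub);
    }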
      |
    10|     void set_low_memory_mode() { _low_memory_mode = true; }
      |
    26|     bool low_memory_mode() { return _low_memory_mode; }
      |
     2|     void disable_reserve_memory() { _enable_reserve_memory = false; }
      |
    11|     bool enable_reserve_memory() const {
    11|         return _query_options.__isset.enable_reserve_memory &&
    11|                _query_options.enable_reserve_memory && _enable_reserve_memory;
    11|     }
      |
    10|     void update_paused_reason(const Status& st) {
    10|         std::lock_guard l(_paused_mutex);
    10|         if (_paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
     0|             return;
    10|         } else if (_paused_reason.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) {
     0|             if (st.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) {
     0|                 _paused_reason = st;
     0|                 return;
     0|             } else {
     0|                 return;
     0|             }
    10|         } else {
    10|             _paused_reason = st;
    10|         }
    10|     }
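update_paused_reason() keeps the highest-priority pause cause: QUERY_MEMORY_EXCEEDED is sticky once set, WORKLOAD_GROUP_MEMORY_EXCEEDED can only be upgraded to QUERY_MEMORY_EXCEEDED, and any other reason is simply overwritten. For example (messages are illustrative):

    query_ctx->update_paused_reason(
            Status::Error<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>("workload group full"));
    query_ctx->update_paused_reason(
            Status::Error<ErrorCode::QUERY_MEMORY_EXCEEDED>("query over its limit"));
    // paused_reason() now reports QUERY_MEMORY_EXCEEDED; weaker reasons no longer replace it.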
      |
   915|     Status paused_reason() {
   915|         std::lock_guard l(_paused_mutex);
   915|         return _paused_reason;
   915|     }
      |
   576|     bool is_pure_load_task() {
   576|         return _query_source == QuerySource::STREAM_LOAD ||
   576|                _query_source == QuerySource::ROUTINE_LOAD ||
   576|                _query_source == QuerySource::GROUP_COMMIT_LOAD;
   576|     }
      |
      |     std::string debug_string();
      |
      | private:
      |     int _timeout_second;
      |     TUniqueId _query_id;
      |     ExecEnv* _exec_env = nullptr;
      |     MonotonicStopWatch _query_watcher;
      |     bool _is_nereids = false;
      |
      |     std::shared_ptr<ResourceContext> _resource_ctx;
      |
      |     std::mutex _revoking_tasks_mutex;
      |     std::atomic<int> _revoking_tasks_count = 0;
      |
      |     // A token used to submit OLAP scanners to the "_limited_scan_thread_pool".
      |     // The token is created from the "_limited_scan_thread_pool" of the exec env
      |     // and is shared by all instances of this query, so we can cap the number of
      |     // threads a query may use for execution. If the token is not set, scanners
      |     // are executed in the exec env's "_scan_thread_pool" instead.
      |     std::unique_ptr<ThreadPoolToken> _thread_token {nullptr};
      |
      |     void _init_resource_context();
      |     void _init_query_mem_tracker();
      |
      |     std::shared_ptr<vectorized::SharedHashTableController> _shared_hash_table_controller;
      |     std::unordered_map<int, vectorized::RuntimePredicate> _runtime_predicates;
      |
      |     std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
      |     const TQueryOptions _query_options;
      |
      |     // All pipeline tasks use the same query context to report status, so we need
      |     // an `_exec_status` to report the real message on failure.
      |     AtomicStatus _exec_status;
      |
      |     doris::pipeline::TaskScheduler* _task_scheduler = nullptr;
      |     vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr;
      |     ThreadPool* _memtable_flush_pool = nullptr;
      |     vectorized::SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
      |     // This dependency indicates whether the 2nd-phase RPC has been received from the FE.
      |     std::unique_ptr<pipeline::Dependency> _execution_dependency;
      |     // This dependency indicates whether memory is sufficient to execute.
      |     std::unique_ptr<pipeline::Dependency> _memory_sufficient_dependency;
      |
      |     // This shared_ptr is never dereferenced; it only keeps the object alive.
      |     // The runtime filter manager holds a weak_ptr that references this object.
      |     std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
      |
      |     std::map<int, std::weak_ptr<pipeline::PipelineFragmentContext>> _fragment_id_to_pipeline_ctx;
      |     std::mutex _pipeline_map_write_lock;
      |
      |     std::mutex _paused_mutex;
      |     Status _paused_reason;
      |     std::atomic<int64_t> _paused_count = 0;
      |     std::atomic<bool> _low_memory_mode = false;
      |     std::atomic<bool> _enable_reserve_memory = true;
      |     int64_t _user_set_mem_limit = 0;
      |     std::atomic<int64_t> _adjusted_mem_limit = 0;
      |
      |     std::mutex _profile_mutex;
      |     timespec _query_arrival_timestamp;
      |     // Distinguishes the query source. For queries that come from an FE, the FE
      |     // keeps some in-memory structures to help us manage the query.
      |     QuerySource _query_source;
      |
      |     std::mutex _brpc_stubs_mutex;
      |     std::unordered_map<TNetworkAddress, std::shared_ptr<PBackendService_Stub>> _using_brpc_stubs;
      |
      |     // When a pipeline fragment is closed, it registers its profile in this map
      |     // via add_fragment_profile. The flattened profile of one fragment looks like:
      |     // Pipeline 0
      |     //      PipelineTask 0
      |     //              Operator 1
      |     //              Operator 2
      |     //              Scanner
      |     //      PipelineTask 1
      |     //              Operator 1
      |     //              Operator 2
      |     //              Scanner
      |     // Pipeline 1
      |     //      PipelineTask 2
      |     //              Operator 3
      |     //      PipelineTask 3
      |     //              Operator 3
      |     // fragment_id -> list<profile>
      |     std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> _profile_map;
      |     std::unordered_map<int, std::shared_ptr<TRuntimeProfileTree>> _load_channel_profile_map;
      |
      |     void _report_query_profile();
      |
      |     std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
      |     _collect_realtime_query_profile() const;
      |
      | public:
      |     // When a pipeline fragment is closed, it registers its profile in this map
      |     // via add_fragment_profile.
      |     void add_fragment_profile(
      |             int fragment_id,
      |             const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profile,
      |             std::shared_ptr<TRuntimeProfileTree> load_channel_profile);
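A sketch of how a finishing fragment might publish its profiles; the profile contents and fragment id are placeholders:

    std::vector<std::shared_ptr<TRuntimeProfileTree>> pipeline_profiles;
    pipeline_profiles.push_back(std::make_shared<TRuntimeProfileTree>()); // one tree per pipeline
    query_ctx->add_fragment_profile(/*fragment_id=*/0, pipeline_profiles,
                                    /*load_channel_profile=*/nullptr);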
491
492
    TReportExecStatusParams get_realtime_exec_status() const;
493
494
172
    bool enable_profile() const {
495
172
        return _query_options.__isset.enable_profile && _query_options.enable_profile;
496
172
    }
497
498
0
    timespec get_query_arrival_timestamp() const { return this->_query_arrival_timestamp; }
499
0
    QuerySource get_query_source() const { return this->_query_source; }
500
};
501
502
} // namespace doris