Coverage Report

Created: 2025-04-25 20:12

/root/doris/be/src/runtime/query_context.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/PaloInternalService_types.h>
21
#include <gen_cpp/RuntimeProfile_types.h>
22
#include <gen_cpp/Types_types.h>
23
#include <glog/logging.h>
24
25
#include <atomic>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>
31
32
#include "common/config.h"
33
#include "common/factory_creator.h"
34
#include "common/object_pool.h"
35
#include "pipeline/dependency.h"
36
#include "runtime/exec_env.h"
37
#include "runtime/memory/mem_tracker_limiter.h"
38
#include "runtime/runtime_predicate.h"
39
#include "runtime/workload_management/resource_context.h"
40
#include "runtime_filter/runtime_filter_mgr.h"
41
#include "util/hash_util.hpp"
42
#include "util/threadpool.h"
43
#include "vec/exec/scan/scanner_scheduler.h"
44
#include "workload_group/workload_group.h"
45
46
namespace doris {
47
48
namespace pipeline {
49
class PipelineFragmentContext;
50
class PipelineTask;
51
} // namespace pipeline
52
53
struct ReportStatusRequest {
54
    const Status status;
55
    std::vector<RuntimeState*> runtime_states;
56
    bool done;
57
    TNetworkAddress coord_addr;
58
    TUniqueId query_id;
59
    int fragment_id;
60
    TUniqueId fragment_instance_id;
61
    int backend_num;
62
    RuntimeState* runtime_state;
63
    std::string load_error_url;
64
    std::function<void(const Status&)> cancel_fn;
65
};
66
67
// Where a query/task executed on this backend originated from.
// Each enumerator is spelled with the value it already had implicitly (0..4), so the
// numbering is visible at a glance and cannot shift silently if entries are reordered.
enum class QuerySource {
    INTERNAL_FRONTEND = 0,
    STREAM_LOAD = 1,
    GROUP_COMMIT_LOAD = 2,
    ROUTINE_LOAD = 3,
    EXTERNAL_CONNECTOR = 4,
};
74
75
// Converts a QuerySource to a string (presumably its printable name). Defined out of line.
// NOTE(review): returning `const std::string` by value prevents callers from moving the result;
// dropping the `const` must be done together with the out-of-line definition.
const std::string toString(QuerySource query_source);
76
77
// Save the common components of fragments in a query.
78
// Some components like DescriptorTbl may be very large
79
// that will slow down each execution of fragments when DeSer them every time.
80
class DescriptorTbl;
81
class QueryContext : public std::enable_shared_from_this<QueryContext> {
82
    ENABLE_FACTORY_CREATOR(QueryContext);
83
84
public:
85
    static std::shared_ptr<QueryContext> create(TUniqueId query_id, ExecEnv* exec_env,
86
                                                const TQueryOptions& query_options,
87
                                                TNetworkAddress coord_addr, bool is_nereids,
88
                                                TNetworkAddress current_connect_fe,
89
                                                QuerySource query_type);
90
91
    // use QueryContext::create, cannot be made private because of ENABLE_FACTORY_CREATOR::create_shared.
92
    QueryContext(TUniqueId query_id, ExecEnv* exec_env, const TQueryOptions& query_options,
93
                 TNetworkAddress coord_addr, bool is_nereids, TNetworkAddress current_connect_fe,
94
                 QuerySource query_type);
95
96
    ~QueryContext();
97
98
    void init_query_task_controller();
99
100
0
    ExecEnv* exec_env() const { return _exec_env; }
101
102
0
    bool is_timeout(timespec now) const {
103
0
        if (_timeout_second <= 0) {
104
0
            return false;
105
0
        }
106
0
        return _query_watcher.elapsed_time_seconds(now) > _timeout_second;
107
0
    }
108
109
0
    void set_thread_token(int concurrency, bool is_serial) {
110
0
        _thread_token = _exec_env->scanner_scheduler()->new_limited_scan_pool_token(
111
0
                is_serial ? ThreadPool::ExecutionMode::SERIAL
112
0
                          : ThreadPool::ExecutionMode::CONCURRENT,
113
0
                concurrency);
114
0
    }
115
116
0
    ThreadPoolToken* get_token() { return _thread_token.get(); }
117
118
    void set_ready_to_execute(Status reason);
119
120
1.45M
    [[nodiscard]] bool is_cancelled() const { return !_exec_status.ok(); }
121
122
    void cancel_all_pipeline_context(const Status& reason, int fragment_id = -1);
123
    std::string print_all_pipeline_context();
124
    void set_pipeline_context(const int fragment_id,
125
                              std::shared_ptr<pipeline::PipelineFragmentContext> pip_ctx);
126
    void cancel(Status new_status, int fragment_id = -1);
127
128
20
    [[nodiscard]] Status exec_status() { return _exec_status.status(); }
129
130
    void set_execution_dependency_ready();
131
132
    void set_memory_sufficient(bool sufficient);
133
134
    void set_ready_to_execute_only();
135
136
22
    bool has_runtime_predicate(int source_node_id) {
137
22
        return _runtime_predicates.contains(source_node_id);
138
22
    }
139
140
0
    vectorized::RuntimePredicate& get_runtime_predicate(int source_node_id) {
141
0
        DCHECK(has_runtime_predicate(source_node_id));
142
0
        return _runtime_predicates.find(source_node_id)->second;
143
0
    }
144
145
0
    void init_runtime_predicates(const std::vector<TTopnFilterDesc>& topn_filter_descs) {
146
0
        for (auto desc : topn_filter_descs) {
147
0
            _runtime_predicates.try_emplace(desc.source_node_id, desc);
148
0
        }
149
0
    }
150
151
    void set_workload_group(WorkloadGroupPtr& wg);
152
153
6
    int execution_timeout() const {
154
6
        return _query_options.__isset.execution_timeout ? _query_options.execution_timeout
155
6
                                                        : _query_options.query_timeout;
156
6
    }
157
158
115
    int32_t runtime_filter_wait_time_ms() const {
159
115
        return _query_options.runtime_filter_wait_time_ms;
160
115
    }
161
162
121
    bool runtime_filter_wait_infinitely() const {
163
121
        return _query_options.__isset.runtime_filter_wait_infinitely &&
164
121
               _query_options.runtime_filter_wait_infinitely;
165
121
    }
166
167
0
    int be_exec_version() const {
168
0
        if (!_query_options.__isset.be_exec_version) {
169
0
            return 0;
170
0
        }
171
0
        return _query_options.be_exec_version;
172
0
    }
173
174
0
    [[nodiscard]] int64_t get_fe_process_uuid() const {
175
0
        return _query_options.__isset.fe_process_uuid ? _query_options.fe_process_uuid : 0;
176
0
    }
177
178
0
    bool ignore_runtime_filter_error() const {
179
0
        return _query_options.__isset.ignore_runtime_filter_error
180
0
                       ? _query_options.ignore_runtime_filter_error
181
0
                       : false;
182
0
    }
183
184
0
    bool enable_force_spill() const {
185
0
        return _query_options.__isset.enable_force_spill && _query_options.enable_force_spill;
186
0
    }
187
6.20k
    const TQueryOptions& query_options() const { return _query_options; }
188
189
    // global runtime filter mgr, the runtime filter have remote target or
190
    // need local merge should regist here. before publish() or push_to_remote()
191
    // the runtime filter should do the local merge work
192
103
    RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); }
193
194
78.1k
    TUniqueId query_id() const { return _query_id; }
195
196
0
    vectorized::SimplifiedScanScheduler* get_scan_scheduler() { return _scan_task_scheduler; }
197
198
0
    vectorized::SimplifiedScanScheduler* get_remote_scan_scheduler() {
199
0
        return _remote_scan_task_scheduler;
200
0
    }
201
202
78.1k
    pipeline::Dependency* get_execution_dependency() { return _execution_dependency.get(); }
203
78.2k
    pipeline::Dependency* get_memory_sufficient_dependency() {
204
78.2k
        return _memory_sufficient_dependency.get();
205
78.2k
    }
206
207
    doris::pipeline::TaskScheduler* get_pipe_exec_scheduler();
208
209
    void set_merge_controller_handler(
210
0
            std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
211
0
        _merge_controller_handler = handler;
212
0
    }
213
0
    std::shared_ptr<RuntimeFilterMergeControllerEntity> get_merge_controller_handler() const {
214
0
        return _merge_controller_handler;
215
0
    }
216
217
0
    bool is_nereids() const { return _is_nereids; }
218
219
78.6k
    WorkloadGroupPtr workload_group() const { return _resource_ctx->workload_group(); }
220
78.9k
    std::shared_ptr<MemTrackerLimiter> query_mem_tracker() const {
221
78.9k
        return _resource_ctx->memory_context()->mem_tracker();
222
78.9k
    }
223
224
168
    int32_t get_slot_count() const {
225
168
        return _query_options.__isset.query_slot_count ? _query_options.query_slot_count : 1;
226
168
    }
227
228
    DescriptorTbl* desc_tbl = nullptr;
229
    bool set_rsc_info = false;
230
    std::string user;
231
    std::string group;
232
    TNetworkAddress coord_addr;
233
    TNetworkAddress current_connect_fe;
234
    TQueryGlobals query_globals;
235
236
    ObjectPool obj_pool;
237
238
5.16k
    std::shared_ptr<ResourceContext> resource_ctx() { return _resource_ctx; }
239
240
    // plan node id -> TFileScanRangeParams
241
    // only for file scan node
242
    std::map<int, TFileScanRangeParams> file_scan_range_params_map;
243
244
    void add_using_brpc_stub(const TNetworkAddress& network_address,
245
0
                             std::shared_ptr<PBackendService_Stub> brpc_stub) {
246
0
        if (network_address.port == 0) {
247
0
            return;
248
0
        }
249
0
        std::lock_guard<std::mutex> lock(_brpc_stubs_mutex);
250
0
        if (!_using_brpc_stubs.contains(network_address)) {
251
0
            _using_brpc_stubs.emplace(network_address, brpc_stub);
252
0
        }
253
254
0
        DCHECK_EQ(_using_brpc_stubs[network_address].get(), brpc_stub.get());
255
0
    }
256
257
    std::unordered_map<TNetworkAddress, std::shared_ptr<PBackendService_Stub>>
258
0
    get_using_brpc_stubs() {
259
0
        std::lock_guard<std::mutex> lock(_brpc_stubs_mutex);
260
0
        return _using_brpc_stubs;
261
0
    }
262
263
1.19k
    void set_low_memory_mode() {
264
        // will not return from low memory mode to non-low memory mode.
265
1.19k
        _resource_ctx->task_controller()->set_low_memory_mode(true);
266
1.19k
    }
267
177k
    bool low_memory_mode() { return _resource_ctx->task_controller()->low_memory_mode(); }
268
269
78.7k
    bool is_pure_load_task() {
270
78.7k
        return _query_source == QuerySource::STREAM_LOAD ||
271
78.7k
               _query_source == QuerySource::ROUTINE_LOAD ||
272
78.7k
               _query_source == QuerySource::GROUP_COMMIT_LOAD;
273
78.7k
    }
274
275
    void set_load_error_url(std::string error_url);
276
    std::string get_load_error_url();
277
278
private:
279
    friend class QueryTaskController;
280
281
    int _timeout_second;
282
    TUniqueId _query_id;
283
    ExecEnv* _exec_env = nullptr;
284
    MonotonicStopWatch _query_watcher;
285
    bool _is_nereids = false;
286
287
    std::shared_ptr<ResourceContext> _resource_ctx;
288
289
    // A token used to submit olap scanner to the "_limited_scan_thread_pool",
290
    // This thread pool token is created from "_limited_scan_thread_pool" from exec env.
291
    // And will be shared by all instances of this query.
292
    // So that we can control the max thread that a query can be used to execute.
293
    // If this token is not set, the scanner will be executed in "_scan_thread_pool" in exec env.
294
    std::unique_ptr<ThreadPoolToken> _thread_token {nullptr};
295
296
    void _init_resource_context();
297
    void _init_query_mem_tracker();
298
299
    std::unordered_map<int, vectorized::RuntimePredicate> _runtime_predicates;
300
301
    std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
302
    const TQueryOptions _query_options;
303
304
    // All pipeline tasks use the same query context to report status. So we need a `_exec_status`
305
    // to report the real message if failed.
306
    AtomicStatus _exec_status;
307
308
    doris::pipeline::TaskScheduler* _task_scheduler = nullptr;
309
    vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr;
310
    vectorized::SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
311
    // This dependency indicates if the 2nd phase RPC received from FE.
312
    std::unique_ptr<pipeline::Dependency> _execution_dependency;
313
    // This dependency indicates if memory is sufficient to execute.
314
    std::unique_ptr<pipeline::Dependency> _memory_sufficient_dependency;
315
316
    // This shared ptr is never used. It is just a reference to hold the object.
317
    // There is a weak ptr in runtime filter manager to reference this object.
318
    std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
319
320
    std::map<int, std::weak_ptr<pipeline::PipelineFragmentContext>> _fragment_id_to_pipeline_ctx;
321
    std::mutex _pipeline_map_write_lock;
322
323
    std::mutex _profile_mutex;
324
    timespec _query_arrival_timestamp;
325
    // Distinguish the query source, for query that comes from fe, we will have some memory structure on FE to
326
    // help us manage the query.
327
    QuerySource _query_source;
328
329
    std::mutex _brpc_stubs_mutex;
330
    std::unordered_map<TNetworkAddress, std::shared_ptr<PBackendService_Stub>> _using_brpc_stubs;
331
332
    // when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile
333
    // flatten profile of one fragment:
334
    // Pipeline 0
335
    //      PipelineTask 0
336
    //              Operator 1
337
    //              Operator 2
338
    //              Scanner
339
    //      PipelineTask 1
340
    //              Operator 1
341
    //              Operator 2
342
    //              Scanner
343
    // Pipeline 1
344
    //      PipelineTask 2
345
    //              Operator 3
346
    //      PipelineTask 3
347
    //              Operator 3
348
    // fragment_id -> list<profile>
349
    std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> _profile_map;
350
    std::unordered_map<int, std::shared_ptr<TRuntimeProfileTree>> _load_channel_profile_map;
351
352
    void _report_query_profile();
353
354
    std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
355
    _collect_realtime_query_profile();
356
357
    std::mutex _error_url_lock;
358
    std::string _load_error_url;
359
360
public:
361
    // when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile
362
    void add_fragment_profile(
363
            int fragment_id,
364
            const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profile,
365
            std::shared_ptr<TRuntimeProfileTree> load_channel_profile);
366
367
    TReportExecStatusParams get_realtime_exec_status();
368
369
78.5k
    bool enable_profile() const {
370
78.5k
        return _query_options.__isset.enable_profile && _query_options.enable_profile;
371
78.5k
    }
372
373
0
    timespec get_query_arrival_timestamp() const { return this->_query_arrival_timestamp; }
374
0
    QuerySource get_query_source() const { return this->_query_source; }
375
};
376
377
} // namespace doris