Coverage Report

Created: 2024-11-21 14:46

/root/doris/be/src/runtime/query_context.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/PaloInternalService_types.h>
21
#include <gen_cpp/Types_types.h>
22
#include <glog/logging.h>
23
24
#include <atomic>
25
#include <memory>
26
#include <string>
27
28
#include "common/config.h"
29
#include "common/factory_creator.h"
30
#include "common/object_pool.h"
31
#include "runtime/exec_env.h"
32
#include "runtime/memory/mem_tracker_limiter.h"
33
#include "runtime/query_statistics.h"
34
#include "runtime/runtime_filter_mgr.h"
35
#include "runtime/runtime_predicate.h"
36
#include "util/threadpool.h"
37
#include "vec/exec/scan/scanner_scheduler.h"
38
#include "vec/runtime/shared_hash_table_controller.h"
39
#include "vec/runtime/shared_scanner_controller.h"
40
#include "workload_group/workload_group.h"
41
42
namespace doris {
43
44
namespace pipeline {
45
class PipelineFragmentContext;
46
} // namespace pipeline
47
48
struct ReportStatusRequest {
49
    bool is_pipeline_x;
50
    const Status status;
51
    std::vector<RuntimeState*> runtime_states;
52
    RuntimeProfile* profile = nullptr;
53
    RuntimeProfile* load_channel_profile = nullptr;
54
    bool done;
55
    TNetworkAddress coord_addr;
56
    TUniqueId query_id;
57
    int fragment_id;
58
    TUniqueId fragment_instance_id;
59
    int backend_num;
60
    RuntimeState* runtime_state;
61
    std::function<Status(Status)> update_fn;
62
    std::function<void(const PPlanFragmentCancelReason&, const std::string&)> cancel_fn;
63
};
64
65
enum class QuerySource {
66
    INTERNAL_FRONTEND,
67
    STREAM_LOAD,
68
    GROUP_COMMIT_LOAD,
69
    ROUTINE_LOAD,
70
    EXTERNAL_CONNECTOR
71
};
72
73
const std::string toString(QuerySource query_source);
74
75
// Save the common components of fragments in a query.
76
// Some components like DescriptorTbl may be very large
77
// that will slow down each execution of fragments when DeSer them every time.
78
class DescriptorTbl;
79
class QueryContext {
80
    ENABLE_FACTORY_CREATOR(QueryContext);
81
82
public:
83
    QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* exec_env,
84
                 const TQueryOptions& query_options, TNetworkAddress coord_addr, bool is_pipeline,
85
                 bool is_nereids, TNetworkAddress current_connect_fe, QuerySource query_type);
86
87
    ~QueryContext();
88
89
    // Notice. For load fragments, the fragment_num sent by FE has a small probability of 0.
90
    // this may be a bug, bug <= 1 in theory it shouldn't cause any problems at this stage.
91
0
    bool countdown(int instance_num) {
92
0
        return fragment_num.fetch_sub(instance_num) <= instance_num;
93
0
    }
94
95
0
    ExecEnv* exec_env() { return _exec_env; }
96
97
0
    bool is_timeout(const VecDateTimeValue& now) const {
98
0
        if (timeout_second <= 0) {
99
0
            return false;
100
0
        }
101
0
        if (now.second_diff(_start_time) > timeout_second) {
102
0
            return true;
103
0
        }
104
0
        return false;
105
0
    }
106
107
0
    int64_t query_time(VecDateTimeValue& now) { return now.second_diff(_start_time); }
108
109
0
    void set_thread_token(int concurrency, bool is_serial) {
110
0
        _thread_token = _exec_env->scanner_scheduler()->new_limited_scan_pool_token(
111
0
                is_serial ? ThreadPool::ExecutionMode::SERIAL
112
0
                          : ThreadPool::ExecutionMode::CONCURRENT,
113
0
                concurrency);
114
0
    }
115
116
0
    ThreadPoolToken* get_token() { return _thread_token.get(); }
117
118
    void set_ready_to_execute(bool is_cancelled);
119
120
0
    [[nodiscard]] bool is_cancelled() const { return _is_cancelled.load(); }
121
122
    void cancel_all_pipeline_context(const PPlanFragmentCancelReason& reason,
123
                                     const std::string& msg);
124
    Status cancel_pipeline_context(const int fragment_id, const PPlanFragmentCancelReason& reason,
125
                                   const std::string& msg);
126
    std::string print_all_pipeline_context();
127
    void set_pipeline_context(const int fragment_id,
128
                              std::shared_ptr<pipeline::PipelineFragmentContext> pip_ctx);
129
    void cancel(std::string msg, Status new_status, int fragment_id = -1);
130
131
0
    void set_exec_status(Status new_status) {
132
0
        if (new_status.ok()) {
133
0
            return;
134
0
        }
135
0
        std::lock_guard<std::mutex> l(_exec_status_lock);
136
0
        if (!_exec_status.ok()) {
137
0
            return;
138
0
        }
139
0
        _exec_status = new_status;
140
0
    }
141
142
0
    [[nodiscard]] Status exec_status() {
143
0
        std::lock_guard<std::mutex> l(_exec_status_lock);
144
0
        return _exec_status;
145
0
    }
146
147
    void set_execution_dependency_ready();
148
149
    void set_ready_to_execute_only();
150
151
0
    bool is_ready_to_execute() {
152
0
        std::lock_guard<std::mutex> l(_start_lock);
153
0
        return _ready_to_execute;
154
0
    }
155
156
0
    bool wait_for_start() {
157
0
        int wait_time = config::max_fragment_start_wait_time_seconds;
158
0
        std::unique_lock<std::mutex> l(_start_lock);
159
0
        while (!_ready_to_execute.load() && !_is_cancelled.load() && --wait_time > 0) {
160
0
            _start_cond.wait_for(l, std::chrono::seconds(1));
161
0
        }
162
0
        return _ready_to_execute.load() && !_is_cancelled.load();
163
0
    }
164
165
0
    std::shared_ptr<vectorized::SharedHashTableController> get_shared_hash_table_controller() {
166
0
        return _shared_hash_table_controller;
167
0
    }
168
169
0
    std::shared_ptr<vectorized::SharedScannerController> get_shared_scanner_controller() {
170
0
        return _shared_scanner_controller;
171
0
    }
172
173
0
    vectorized::RuntimePredicate& get_runtime_predicate(int source_node_id) {
174
0
        DCHECK(_runtime_predicates.contains(source_node_id) || _runtime_predicates.contains(0));
175
0
        if (_runtime_predicates.contains(source_node_id)) {
176
0
            return _runtime_predicates[source_node_id];
177
0
        }
178
0
        return _runtime_predicates[0];
179
0
    }
180
181
0
    void init_runtime_predicates(std::vector<int> source_node_ids) {
182
0
        for (int id : source_node_ids) {
183
0
            _runtime_predicates.try_emplace(id);
184
0
        }
185
0
    }
186
187
    Status set_workload_group(WorkloadGroupPtr& tg);
188
189
0
    int execution_timeout() const {
190
0
        return _query_options.__isset.execution_timeout ? _query_options.execution_timeout
191
0
                                                        : _query_options.query_timeout;
192
0
    }
193
194
0
    int32_t runtime_filter_wait_time_ms() const {
195
0
        return _query_options.runtime_filter_wait_time_ms;
196
0
    }
197
198
0
    bool runtime_filter_wait_infinitely() const {
199
0
        return _query_options.__isset.runtime_filter_wait_infinitely &&
200
0
               _query_options.runtime_filter_wait_infinitely;
201
0
    }
202
203
0
    bool enable_pipeline_exec() const {
204
0
        return _query_options.__isset.enable_pipeline_engine &&
205
0
               _query_options.enable_pipeline_engine;
206
0
    }
207
208
0
    bool enable_pipeline_x_exec() const {
209
0
        return _query_options.__isset.enable_pipeline_x_engine &&
210
0
               _query_options.enable_pipeline_x_engine;
211
0
    }
212
213
0
    int be_exec_version() const {
214
0
        if (!_query_options.__isset.be_exec_version) {
215
0
            return 0;
216
0
        }
217
0
        return _query_options.be_exec_version;
218
0
    }
219
220
0
    [[nodiscard]] int64_t get_fe_process_uuid() const {
221
0
        return _query_options.__isset.fe_process_uuid ? _query_options.fe_process_uuid : 0;
222
0
    }
223
224
    // global runtime filter mgr, the runtime filter have remote target or
225
    // need local merge should regist here. before publish() or push_to_remote()
226
    // the runtime filter should do the local merge work
227
0
    RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); }
228
229
0
    TUniqueId query_id() const { return _query_id; }
230
231
0
    vectorized::SimplifiedScanScheduler* get_scan_scheduler() { return _scan_task_scheduler; }
232
233
0
    vectorized::SimplifiedScanScheduler* get_remote_scan_scheduler() {
234
0
        return _remote_scan_task_scheduler;
235
0
    }
236
237
0
    pipeline::Dependency* get_execution_dependency() { return _execution_dependency.get(); }
238
239
    void register_query_statistics(std::shared_ptr<QueryStatistics> qs);
240
241
    std::shared_ptr<QueryStatistics> get_query_statistics();
242
243
    void register_memory_statistics();
244
245
    void register_cpu_statistics();
246
247
0
    std::shared_ptr<QueryStatistics> get_cpu_statistics() { return _cpu_statistics; }
248
249
    doris::pipeline::TaskScheduler* get_pipe_exec_scheduler();
250
251
    ThreadPool* get_memtable_flush_pool();
252
253
0
    int64_t mem_limit() const { return _bytes_limit; }
254
255
    void set_merge_controller_handler(
256
0
            std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
257
0
        _merge_controller_handler = handler;
258
0
    }
259
260
0
    bool is_nereids() const { return _is_nereids; }
261
262
0
    WorkloadGroupPtr workload_group() const { return _workload_group; }
263
264
0
    void inc_running_big_mem_op_num() {
265
0
        _running_big_mem_op_num.fetch_add(1, std::memory_order_relaxed);
266
0
    }
267
0
    void dec_running_big_mem_op_num() {
268
0
        _running_big_mem_op_num.fetch_sub(1, std::memory_order_relaxed);
269
0
    }
270
0
    int32_t get_running_big_mem_op_num() {
271
0
        return _running_big_mem_op_num.load(std::memory_order_relaxed);
272
0
    }
273
274
0
    void set_spill_threshold(int64_t spill_threshold) { _spill_threshold = spill_threshold; }
275
0
    int64_t spill_threshold() { return _spill_threshold; }
276
    DescriptorTbl* desc_tbl = nullptr;
277
    bool set_rsc_info = false;
278
    std::string user;
279
    std::string group;
280
    TNetworkAddress coord_addr;
281
    TNetworkAddress current_connect_fe;
282
    TQueryGlobals query_globals;
283
284
    /// In the current implementation, for multiple fragments executed by a query on the same BE node,
285
    /// we store some common components in QueryContext, and save QueryContext in FragmentMgr.
286
    /// When all Fragments are executed, QueryContext needs to be deleted from FragmentMgr.
287
    /// Here we use a counter to store the number of Fragments that have not yet been completed,
288
    /// and after each Fragment is completed, this value will be reduced by one.
289
    /// When the last Fragment is completed, the counter is cleared, and the worker thread of the last Fragment
290
    /// will clean up QueryContext.
291
    std::atomic<int> fragment_num;
292
    int timeout_second;
293
    ObjectPool obj_pool;
294
    // MemTracker that is shared by all fragment instances running on this host.
295
    std::shared_ptr<MemTrackerLimiter> query_mem_tracker;
296
297
    std::vector<TUniqueId> fragment_instance_ids;
298
299
    // plan node id -> TFileScanRangeParams
300
    // only for file scan node
301
    std::map<int, TFileScanRangeParams> file_scan_range_params_map;
302
303
0
    void update_wg_cpu_adder(int64_t delta_cpu_time) {
304
0
        if (_workload_group != nullptr) {
305
0
            _workload_group->update_cpu_adder(delta_cpu_time);
306
0
        }
307
0
    }
308
309
    void add_using_brpc_stub(const TNetworkAddress& network_address,
310
0
                             std::shared_ptr<PBackendService_Stub> brpc_stub) {
311
0
        if (network_address.port == 0) {
312
0
            return;
313
0
        }
314
0
        std::lock_guard<std::mutex> lock(_brpc_stubs_mutex);
315
0
        if (!_using_brpc_stubs.contains(network_address)) {
316
0
            _using_brpc_stubs.emplace(network_address, brpc_stub);
317
0
        }
318
319
0
        DCHECK_EQ(_using_brpc_stubs[network_address].get(), brpc_stub.get());
320
0
    }
321
322
    std::unordered_map<TNetworkAddress, std::shared_ptr<PBackendService_Stub>>
323
0
    get_using_brpc_stubs() {
324
0
        std::lock_guard<std::mutex> lock(_brpc_stubs_mutex);
325
0
        return _using_brpc_stubs;
326
0
    }
327
328
private:
329
    TUniqueId _query_id;
330
    ExecEnv* _exec_env = nullptr;
331
    VecDateTimeValue _start_time;
332
    int64_t _bytes_limit = 0;
333
    bool _is_pipeline = false;
334
    bool _is_nereids = false;
335
    std::atomic<int> _running_big_mem_op_num = 0;
336
337
    // A token used to submit olap scanner to the "_limited_scan_thread_pool",
338
    // This thread pool token is created from "_limited_scan_thread_pool" from exec env.
339
    // And will be shared by all instances of this query.
340
    // So that we can control the max thread that a query can be used to execute.
341
    // If this token is not set, the scanner will be executed in "_scan_thread_pool" in exec env.
342
    std::unique_ptr<ThreadPoolToken> _thread_token;
343
344
    std::mutex _start_lock;
345
    std::condition_variable _start_cond;
346
    // Only valid when _need_wait_execution_trigger is set to true in PlanFragmentExecutor.
347
    // And all fragments of this query will start execution when this is set to true.
348
    std::atomic<bool> _ready_to_execute {false};
349
    std::atomic<bool> _is_cancelled {false};
350
351
    void _init_query_mem_tracker();
352
353
    std::shared_ptr<vectorized::SharedHashTableController> _shared_hash_table_controller;
354
    std::shared_ptr<vectorized::SharedScannerController> _shared_scanner_controller;
355
    std::unordered_map<int, vectorized::RuntimePredicate> _runtime_predicates;
356
357
    WorkloadGroupPtr _workload_group = nullptr;
358
    std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
359
    const TQueryOptions _query_options;
360
361
    std::mutex _exec_status_lock;
362
    // All pipeline tasks use the same query context to report status. So we need a `_exec_status`
363
    // to report the real message if failed.
364
    Status _exec_status = Status::OK();
365
366
    doris::pipeline::TaskScheduler* _task_scheduler = nullptr;
367
    vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr;
368
    ThreadPool* _memtable_flush_pool = nullptr;
369
    vectorized::SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
370
    std::unique_ptr<pipeline::Dependency> _execution_dependency;
371
372
    std::shared_ptr<QueryStatistics> _cpu_statistics = nullptr;
373
    // This shared ptr is never used. It is just a reference to hold the object.
374
    // There is a weak ptr in runtime filter manager to reference this object.
375
    std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
376
377
    std::map<int, std::weak_ptr<pipeline::PipelineFragmentContext>> _fragment_id_to_pipeline_ctx;
378
    std::mutex _pipeline_map_write_lock;
379
380
    std::atomic<int64_t> _spill_threshold {0};
381
    timespec _query_arrival_timestamp;
382
    // Distinguish the query source, for query that comes from fe, we will have some memory structure on FE to
383
    // help us manage the query.
384
    QuerySource _query_source;
385
386
    std::mutex _brpc_stubs_mutex;
387
    std::unordered_map<TNetworkAddress, std::shared_ptr<PBackendService_Stub>> _using_brpc_stubs;
388
389
public:
390
0
    timespec get_query_arrival_timestamp() const { return this->_query_arrival_timestamp; }
391
0
    QuerySource get_query_source() const { return this->_query_source; }
392
};
393
394
} // namespace doris