Coverage Report

Created: 2024-11-21 23:27

/root/doris/be/src/runtime/query_context.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/PaloInternalService_types.h>
21
#include <gen_cpp/RuntimeProfile_types.h>
22
#include <gen_cpp/Types_types.h>
23
#include <glog/logging.h>
24
25
#include <atomic>
26
#include <memory>
27
#include <mutex>
28
#include <string>
29
#include <unordered_map>
30
31
#include "common/config.h"
32
#include "common/factory_creator.h"
33
#include "common/object_pool.h"
34
#include "runtime/exec_env.h"
35
#include "runtime/memory/mem_tracker_limiter.h"
36
#include "runtime/query_statistics.h"
37
#include "runtime/runtime_filter_mgr.h"
38
#include "runtime/runtime_predicate.h"
39
#include "util/hash_util.hpp"
40
#include "util/threadpool.h"
41
#include "vec/exec/scan/scanner_scheduler.h"
42
#include "vec/runtime/shared_hash_table_controller.h"
43
#include "workload_group/workload_group.h"
44
45
namespace doris {
46
47
namespace pipeline {
48
class PipelineFragmentContext;
49
} // namespace pipeline
50
51
struct ReportStatusRequest {
52
    const Status status;
53
    std::vector<RuntimeState*> runtime_states;
54
    bool done;
55
    TNetworkAddress coord_addr;
56
    TUniqueId query_id;
57
    int fragment_id;
58
    TUniqueId fragment_instance_id;
59
    int backend_num;
60
    RuntimeState* runtime_state;
61
    std::function<void(const Status&)> cancel_fn;
62
};
63
64
enum class QuerySource {
65
    INTERNAL_FRONTEND,
66
    STREAM_LOAD,
67
    GROUP_COMMIT_LOAD,
68
    ROUTINE_LOAD,
69
    EXTERNAL_CONNECTOR
70
};
71
72
const std::string toString(QuerySource query_source);
73
74
// Save the common components of fragments in a query.
75
// Some components like DescriptorTbl may be very large
76
// that will slow down each execution of fragments when DeSer them every time.
77
class DescriptorTbl;
78
class QueryContext {
79
    ENABLE_FACTORY_CREATOR(QueryContext);
80
81
public:
82
    QueryContext(TUniqueId query_id, ExecEnv* exec_env, const TQueryOptions& query_options,
83
                 TNetworkAddress coord_addr, bool is_nereids, TNetworkAddress current_connect_fe,
84
                 QuerySource query_type);
85
86
    ~QueryContext();
87
88
0
    ExecEnv* exec_env() { return _exec_env; }
89
90
0
    bool is_timeout(timespec now) const {
91
0
        if (_timeout_second <= 0) {
92
0
            return false;
93
0
        }
94
0
        return _query_watcher.elapsed_time_seconds(now) > _timeout_second;
95
0
    }
96
97
0
    void set_thread_token(int concurrency, bool is_serial) {
98
0
        _thread_token = _exec_env->scanner_scheduler()->new_limited_scan_pool_token(
99
0
                is_serial ? ThreadPool::ExecutionMode::SERIAL
100
0
                          : ThreadPool::ExecutionMode::CONCURRENT,
101
0
                concurrency);
102
0
    }
103
104
0
    ThreadPoolToken* get_token() { return _thread_token.get(); }
105
106
    void set_ready_to_execute(Status reason);
107
108
0
    [[nodiscard]] bool is_cancelled() const { return !_exec_status.ok(); }
109
110
    void cancel_all_pipeline_context(const Status& reason, int fragment_id = -1);
111
    std::string print_all_pipeline_context();
112
    void set_pipeline_context(const int fragment_id,
113
                              std::shared_ptr<pipeline::PipelineFragmentContext> pip_ctx);
114
    void cancel(Status new_status, int fragment_id = -1);
115
116
0
    [[nodiscard]] Status exec_status() { return _exec_status.status(); }
117
118
    void set_execution_dependency_ready();
119
120
    void set_ready_to_execute_only();
121
122
0
    std::shared_ptr<vectorized::SharedHashTableController> get_shared_hash_table_controller() {
123
0
        return _shared_hash_table_controller;
124
0
    }
125
126
0
    bool has_runtime_predicate(int source_node_id) {
127
0
        return _runtime_predicates.contains(source_node_id);
128
0
    }
129
130
0
    vectorized::RuntimePredicate& get_runtime_predicate(int source_node_id) {
131
0
        DCHECK(has_runtime_predicate(source_node_id));
132
0
        return _runtime_predicates.find(source_node_id)->second;
133
0
    }
134
135
0
    void init_runtime_predicates(const std::vector<TTopnFilterDesc>& topn_filter_descs) {
136
0
        for (auto desc : topn_filter_descs) {
137
0
            _runtime_predicates.try_emplace(desc.source_node_id, desc);
138
0
        }
139
0
    }
140
141
    Status set_workload_group(WorkloadGroupPtr& tg);
142
143
0
    int execution_timeout() const {
144
0
        return _query_options.__isset.execution_timeout ? _query_options.execution_timeout
145
0
                                                        : _query_options.query_timeout;
146
0
    }
147
148
0
    int32_t runtime_filter_wait_time_ms() const {
149
0
        return _query_options.runtime_filter_wait_time_ms;
150
0
    }
151
152
0
    bool runtime_filter_wait_infinitely() const {
153
0
        return _query_options.__isset.runtime_filter_wait_infinitely &&
154
0
               _query_options.runtime_filter_wait_infinitely;
155
0
    }
156
157
0
    int be_exec_version() const {
158
0
        if (!_query_options.__isset.be_exec_version) {
159
0
            return 0;
160
0
        }
161
0
        return _query_options.be_exec_version;
162
0
    }
163
164
0
    [[nodiscard]] int64_t get_fe_process_uuid() const {
165
0
        return _query_options.__isset.fe_process_uuid ? _query_options.fe_process_uuid : 0;
166
0
    }
167
168
0
    bool ignore_runtime_filter_error() const {
169
0
        return _query_options.__isset.ignore_runtime_filter_error
170
0
                       ? _query_options.ignore_runtime_filter_error
171
0
                       : false;
172
0
    }
173
174
    // global runtime filter mgr, the runtime filter have remote target or
175
    // need local merge should regist here. before publish() or push_to_remote()
176
    // the runtime filter should do the local merge work
177
0
    RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); }
178
179
0
    TUniqueId query_id() const { return _query_id; }
180
181
0
    vectorized::SimplifiedScanScheduler* get_scan_scheduler() { return _scan_task_scheduler; }
182
183
0
    vectorized::SimplifiedScanScheduler* get_remote_scan_scheduler() {
184
0
        return _remote_scan_task_scheduler;
185
0
    }
186
187
0
    pipeline::Dependency* get_execution_dependency() { return _execution_dependency.get(); }
188
189
    void register_query_statistics(std::shared_ptr<QueryStatistics> qs);
190
191
    std::shared_ptr<QueryStatistics> get_query_statistics();
192
193
    void register_memory_statistics();
194
195
    void register_cpu_statistics();
196
197
0
    std::shared_ptr<QueryStatistics> get_cpu_statistics() { return _cpu_statistics; }
198
199
    doris::pipeline::TaskScheduler* get_pipe_exec_scheduler();
200
201
    ThreadPool* get_memtable_flush_pool();
202
203
0
    int64_t mem_limit() const { return _bytes_limit; }
204
205
    void set_merge_controller_handler(
206
0
            std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
207
0
        _merge_controller_handler = handler;
208
0
    }
209
210
0
    bool is_nereids() const { return _is_nereids; }
211
212
0
    WorkloadGroupPtr workload_group() const { return _workload_group; }
213
214
0
    void inc_running_big_mem_op_num() {
215
0
        _running_big_mem_op_num.fetch_add(1, std::memory_order_relaxed);
216
0
    }
217
0
    void dec_running_big_mem_op_num() {
218
0
        _running_big_mem_op_num.fetch_sub(1, std::memory_order_relaxed);
219
0
    }
220
0
    int32_t get_running_big_mem_op_num() {
221
0
        return _running_big_mem_op_num.load(std::memory_order_relaxed);
222
0
    }
223
224
0
    void set_spill_threshold(int64_t spill_threshold) { _spill_threshold = spill_threshold; }
225
0
    int64_t spill_threshold() { return _spill_threshold; }
226
    DescriptorTbl* desc_tbl = nullptr;
227
    bool set_rsc_info = false;
228
    std::string user;
229
    std::string group;
230
    TNetworkAddress coord_addr;
231
    TNetworkAddress current_connect_fe;
232
    TQueryGlobals query_globals;
233
234
    ObjectPool obj_pool;
235
    // MemTracker that is shared by all fragment instances running on this host.
236
    std::shared_ptr<MemTrackerLimiter> query_mem_tracker;
237
238
    std::vector<TUniqueId> fragment_instance_ids;
239
240
    // plan node id -> TFileScanRangeParams
241
    // only for file scan node
242
    std::map<int, TFileScanRangeParams> file_scan_range_params_map;
243
244
0
    void update_wg_cpu_adder(int64_t delta_cpu_time) {
245
0
        if (_workload_group != nullptr) {
246
0
            _workload_group->update_cpu_adder(delta_cpu_time);
247
0
        }
248
0
    }
249
250
    void add_using_brpc_stub(const TNetworkAddress& network_address,
251
0
                             std::shared_ptr<PBackendService_Stub> brpc_stub) {
252
0
        if (network_address.port == 0) {
253
0
            return;
254
0
        }
255
0
        std::lock_guard<std::mutex> lock(_brpc_stubs_mutex);
256
0
        if (!_using_brpc_stubs.contains(network_address)) {
257
0
            _using_brpc_stubs.emplace(network_address, brpc_stub);
258
0
        }
259
260
0
        DCHECK_EQ(_using_brpc_stubs[network_address].get(), brpc_stub.get());
261
0
    }
262
263
    std::unordered_map<TNetworkAddress, std::shared_ptr<PBackendService_Stub>>
264
0
    get_using_brpc_stubs() {
265
0
        std::lock_guard<std::mutex> lock(_brpc_stubs_mutex);
266
0
        return _using_brpc_stubs;
267
0
    }
268
269
private:
270
    int _timeout_second;
271
    TUniqueId _query_id;
272
    ExecEnv* _exec_env = nullptr;
273
    MonotonicStopWatch _query_watcher;
274
    int64_t _bytes_limit = 0;
275
    bool _is_nereids = false;
276
    std::atomic<int> _running_big_mem_op_num = 0;
277
278
    // A token used to submit olap scanner to the "_limited_scan_thread_pool",
279
    // This thread pool token is created from "_limited_scan_thread_pool" from exec env.
280
    // And will be shared by all instances of this query.
281
    // So that we can control the max thread that a query can be used to execute.
282
    // If this token is not set, the scanner will be executed in "_scan_thread_pool" in exec env.
283
    std::unique_ptr<ThreadPoolToken> _thread_token {nullptr};
284
285
    void _init_query_mem_tracker();
286
287
    std::shared_ptr<vectorized::SharedHashTableController> _shared_hash_table_controller;
288
    std::unordered_map<int, vectorized::RuntimePredicate> _runtime_predicates;
289
290
    WorkloadGroupPtr _workload_group = nullptr;
291
    std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr;
292
    const TQueryOptions _query_options;
293
294
    // All pipeline tasks use the same query context to report status. So we need a `_exec_status`
295
    // to report the real message if failed.
296
    AtomicStatus _exec_status;
297
298
    doris::pipeline::TaskScheduler* _task_scheduler = nullptr;
299
    vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr;
300
    ThreadPool* _memtable_flush_pool = nullptr;
301
    vectorized::SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr;
302
    std::unique_ptr<pipeline::Dependency> _execution_dependency;
303
304
    std::shared_ptr<QueryStatistics> _cpu_statistics = nullptr;
305
    // This shared ptr is never used. It is just a reference to hold the object.
306
    // There is a weak ptr in runtime filter manager to reference this object.
307
    std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
308
309
    std::map<int, std::weak_ptr<pipeline::PipelineFragmentContext>> _fragment_id_to_pipeline_ctx;
310
    std::mutex _pipeline_map_write_lock;
311
312
    std::atomic<int64_t> _spill_threshold {0};
313
314
    std::mutex _profile_mutex;
315
    timespec _query_arrival_timestamp;
316
    // Distinguish the query source, for query that comes from fe, we will have some memory structure on FE to
317
    // help us manage the query.
318
    QuerySource _query_source;
319
320
    std::mutex _brpc_stubs_mutex;
321
    std::unordered_map<TNetworkAddress, std::shared_ptr<PBackendService_Stub>> _using_brpc_stubs;
322
323
    // when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile
324
    // flatten profile of one fragment:
325
    // Pipeline 0
326
    //      PipelineTask 0
327
    //              Operator 1
328
    //              Operator 2
329
    //              Scanner
330
    //      PipelineTask 1
331
    //              Operator 1
332
    //              Operator 2
333
    //              Scanner
334
    // Pipeline 1
335
    //      PipelineTask 2
336
    //              Operator 3
337
    //      PipelineTask 3
338
    //              Operator 3
339
    // fragment_id -> list<profile>
340
    std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> _profile_map;
341
    std::unordered_map<int, std::shared_ptr<TRuntimeProfileTree>> _load_channel_profile_map;
342
343
    void _report_query_profile();
344
345
    std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
346
    _collect_realtime_query_profile() const;
347
348
public:
349
    // when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile
350
    void add_fragment_profile(
351
            int fragment_id,
352
            const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profile,
353
            std::shared_ptr<TRuntimeProfileTree> load_channel_profile);
354
355
    TReportExecStatusParams get_realtime_exec_status() const;
356
357
0
    bool enable_profile() const {
358
0
        return _query_options.__isset.enable_profile && _query_options.enable_profile;
359
0
    }
360
361
0
    timespec get_query_arrival_timestamp() const { return this->_query_arrival_timestamp; }
362
0
    QuerySource get_query_source() const { return this->_query_source; }
363
};
364
365
} // namespace doris