/root/doris/be/src/runtime/fragment_mgr.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <gen_cpp/Types_types.h> |
21 | | #include <gen_cpp/types.pb.h> |
22 | | #include <stdint.h> |
23 | | |
24 | | #include <condition_variable> |
25 | | #include <functional> |
26 | | #include <iosfwd> |
27 | | #include <memory> |
28 | | #include <mutex> |
29 | | #include <string> |
30 | | #include <unordered_map> |
31 | | #include <vector> |
32 | | |
33 | | #include "common/status.h" |
34 | | #include "gutil/ref_counted.h" |
35 | | #include "http/rest_monitor_iface.h" |
36 | | #include "runtime/query_context.h" |
37 | | #include "runtime_filter_mgr.h" |
38 | | #include "util/countdown_latch.h" |
39 | | #include "util/hash_util.hpp" // IWYU pragma: keep |
40 | | #include "util/metrics.h" |
41 | | |
42 | | namespace butil { |
43 | | class IOBufAsZeroCopyInputStream; |
44 | | } |
45 | | |
46 | | namespace doris { |
47 | | |
48 | | namespace pipeline { |
49 | | class PipelineFragmentContext; |
50 | | class PipelineXFragmentContext; |
51 | | } // namespace pipeline |
52 | | class QueryContext; |
53 | | class ExecEnv; |
54 | | class PlanFragmentExecutor; |
55 | | class ThreadPool; |
56 | | class TExecPlanFragmentParams; |
57 | | class PExecPlanFragmentStartRequest; |
58 | | class PMergeFilterRequest; |
59 | | class PPublishFilterRequest; |
60 | | class RuntimeProfile; |
61 | | class RuntimeState; |
62 | | class TPipelineFragmentParams; |
63 | | class TPipelineInstanceParams; |
64 | | class TScanColumnDesc; |
65 | | class TScanOpenParams; |
66 | | class Thread; |
67 | | class WorkloadQueryInfo; |
68 | | |
69 | | std::string to_load_error_http_path(const std::string& file_name); |
70 | | // A map made of several sub-maps, each guarded by its own shared_mutex (see _internal_map). |
71 | | template <typename Key, typename Value, typename ValueType> |
72 | | class ConcurrentContextMap { |
73 | | public: |
74 | | using ApplyFunction = std::function<Status(phmap::flat_hash_map<Key, Value>&)>; |
75 | | ConcurrentContextMap(); |
76 | | Value find(const Key& query_id); |
77 | | void insert(const Key& query_id, std::shared_ptr<ValueType>); |
78 | | void clear(); |
79 | | void erase(const Key& query_id); |
80 | 4 | size_t num_items() const {
81 | 4 | size_t n = 0;
82 | 512 | for (auto& pair : _internal_map) {
83 | 512 | std::shared_lock lock(*pair.first); // shared lock: the sub-map is only read here
84 | 512 | auto& map = pair.second;
85 | 512 | n += map.size();
86 | 512 | }
87 | 4 | return n;
88 | 4 | } _ZNK5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_12QueryContextEES3_E9num_itemsEv Line | Count | Source | 80 | 4 | size_t num_items() const { | 81 | 4 | size_t n = 0; | 82 | 512 | for (auto& pair : _internal_map) { | 83 | 512 | std::shared_lock lock(*pair.first); | 84 | 512 | auto& map = pair.second; | 85 | 512 | n += map.size(); | 86 | 512 | } | 87 | 4 | return n; | 88 | 4 | } |
Unexecuted instantiation: _ZNK5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_20PlanFragmentExecutorEES3_E9num_itemsEv Unexecuted instantiation: _ZNK5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_8pipeline23PipelineFragmentContextEES4_E9num_itemsEv |
89 | 20 | void apply(ApplyFunction&& function) {
90 | 2.56k | for (auto& pair : _internal_map) {
91 | | // TODO: Now only the cancel worker do the GC the _query_ctx_map. each query must |
92 | | // do erase the finish query unless in _query_ctx_map. Rethink the logic is ok |
93 | 2.56k | std::unique_lock lock(*pair.first); // exclusive lock: function receives a mutable sub-map
94 | 2.56k | static_cast<void>(function(pair.second)); // the Status returned by function is discarded
95 | 2.56k | }
96 | 20 | } _ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_8pipeline23PipelineFragmentContextEES4_E5applyEOSt8functionIFNS_6StatusERN5phmap13flat_hash_mapIS1_S5_NS9_4HashIS1_EENS9_7EqualToIS1_EESaISt4pairIKS1_S5_EEEEEE Line | Count | Source | 89 | 8 | void apply(ApplyFunction&& function) { | 90 | 1.02k | for (auto& pair : _internal_map) { | 91 | | // TODO: Now only the cancel worker do the GC the _query_ctx_map. each query must | 92 | | // do erase the finish query unless in _query_ctx_map. Rethink the logic is ok | 93 | 1.02k | std::unique_lock lock(*pair.first); | 94 | 1.02k | static_cast<void>(function(pair.second)); | 95 | 1.02k | } | 96 | 8 | } |
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_20PlanFragmentExecutorEES3_E5applyEOSt8functionIFNS_6StatusERN5phmap13flat_hash_mapIS1_S4_NS8_4HashIS1_EENS8_7EqualToIS1_EESaISt4pairIKS1_S4_EEEEEE Line | Count | Source | 89 | 4 | void apply(ApplyFunction&& function) { | 90 | 512 | for (auto& pair : _internal_map) { | 91 | | // TODO: Now only the cancel worker do the GC the _query_ctx_map. each query must | 92 | | // do erase the finish query unless in _query_ctx_map. Rethink the logic is ok | 93 | 512 | std::unique_lock lock(*pair.first); | 94 | 512 | static_cast<void>(function(pair.second)); | 95 | 512 | } | 96 | 4 | } |
_ZN5doris20ConcurrentContextMapINS_9TUniqueIdESt10shared_ptrINS_12QueryContextEES3_E5applyEOSt8functionIFNS_6StatusERN5phmap13flat_hash_mapIS1_S4_NS8_4HashIS1_EENS8_7EqualToIS1_EESaISt4pairIKS1_S4_EEEEEE Line | Count | Source | 89 | 8 | void apply(ApplyFunction&& function) { | 90 | 1.02k | for (auto& pair : _internal_map) { | 91 | | // TODO: Now only the cancel worker do the GC the _query_ctx_map. each query must | 92 | | // do erase the finish query unless in _query_ctx_map. Rethink the logic is ok | 93 | 1.02k | std::unique_lock lock(*pair.first); | 94 | 1.02k | static_cast<void>(function(pair.second)); | 95 | 1.02k | } | 96 | 8 | } |
|
97 | | 
98 | | Status apply_if_not_exists(const Key& query_id, std::shared_ptr<ValueType>& query_ctx, |
99 | | ApplyFunction&& function); |
100 | | 
101 | | private: |
102 | | // The lock should only be used to protect the structures in fragment manager. It has to be |
103 | | // held in a very small scope because it may deadlock. For example, if the _lock is held |
104 | | // in the prepare stage, the call path is prepare --> expr prepare --> may call allocator; |
105 | | // when allocation fails, the allocator may call query_is_cancelled, which will also |
106 | | // take _lock, so there is a deadlock. |
107 | | std::vector<std::pair<std::unique_ptr<std::shared_mutex>, phmap::flat_hash_map<Key, Value>>> |
108 | | _internal_map; |
109 | | }; |
110 | | |
111 | | // This class is used to manage all the fragment executions in this instance |
112 | | class FragmentMgr : public RestMonitorIface { |
113 | | public: |
114 | | using FinishCallback = std::function<void(RuntimeState*, Status*)>; |
115 | | 
116 | | FragmentMgr(ExecEnv* exec_env); |
117 | | ~FragmentMgr() override; |
118 | | 
119 | | void stop(); |
120 | | 
121 | | // execute one plan fragment |
122 | | Status exec_plan_fragment(const TExecPlanFragmentParams& params, const QuerySource query_type); |
123 | | 
124 | | Status exec_plan_fragment(const TPipelineFragmentParams& params, const QuerySource query_type); |
125 | | 
126 | | void remove_pipeline_context( |
127 | | std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_context); |
128 | | 
129 | | // TODO(zc): report this is over |
130 | | Status exec_plan_fragment(const TExecPlanFragmentParams& params, const QuerySource query_type, |
131 | | const FinishCallback& cb); |
132 | | 
133 | | Status exec_plan_fragment(const TPipelineFragmentParams& params, const QuerySource query_type, |
134 | | const FinishCallback& cb); |
135 | | 
136 | | Status start_query_execution(const PExecPlanFragmentStartRequest* request); |
137 | | 
138 | | Status trigger_pipeline_context_report(const ReportStatusRequest, |
139 | | std::shared_ptr<pipeline::PipelineFragmentContext>&&); |
140 | | 
141 | | // Cancel instance (pipeline or nonpipeline). |
142 | | void cancel_instance(const TUniqueId& instance_id, const PPlanFragmentCancelReason& reason, |
143 | | const std::string& msg = ""); |
144 | | // Cancel fragment (only pipelineX). |
145 | | // {query id fragment} -> PipelineXFragmentContext |
146 | | void cancel_fragment(const TUniqueId& query_id, int32_t fragment_id, |
147 | | const PPlanFragmentCancelReason& reason, const std::string& msg = ""); |
148 | | 
149 | | // Can be used in both versions. |
150 | | void cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason, |
151 | | const std::string& msg = ""); |
152 | | 
153 | | void cancel_worker(); |
154 | | 
155 | | void debug(std::stringstream& ss) override; |
156 | | 
157 | | // input: TScanOpenParams fragment_instance_id |
158 | | // output: selected_columns |
159 | | // execute external query, all query info is packed in TScanOpenParams |
160 | | Status exec_external_plan_fragment(const TScanOpenParams& params, |
161 | | const TUniqueId& fragment_instance_id, |
162 | | std::vector<TScanColumnDesc>* selected_columns); |
163 | | 
164 | | Status apply_filter(const PPublishFilterRequest* request, |
165 | | butil::IOBufAsZeroCopyInputStream* attach_data); |
166 | | 
167 | | Status apply_filterv2(const PPublishFilterRequestV2* request, |
168 | | butil::IOBufAsZeroCopyInputStream* attach_data); |
169 | | 
170 | | Status merge_filter(const PMergeFilterRequest* request, |
171 | | butil::IOBufAsZeroCopyInputStream* attach_data); |
172 | | 
173 | | Status send_filter_size(const PSendFilterSizeRequest* request); |
174 | | 
175 | | Status sync_filter_size(const PSyncFilterSizeRequest* request); |
176 | | 
177 | | std::string to_http_path(const std::string& file_name); |
178 | | 
179 | | void coordinator_callback(const ReportStatusRequest& req); |
180 | | 
181 | 0 | ThreadPool* get_thread_pool() { return _thread_pool.get(); }
182 | | 
183 | | std::shared_ptr<QueryContext> get_query_context(const TUniqueId& query_id); |
184 | | // NOTE(review): num_items() returns size_t; the value is narrowed to int32_t below. |
185 | 0 | int32_t running_query_num() { return _query_ctx_map.num_items(); }
186 | | 
187 | | std::string dump_pipeline_tasks(int64_t duration = 0); |
188 | | std::string dump_pipeline_tasks(TUniqueId& query_id); |
189 | | 
190 | | void get_runtime_query_info(std::vector<WorkloadQueryInfo>* _query_info_list); |
191 | | 
192 | | private: |
193 | | void cancel_unlocked_impl(const TUniqueId& id, const PPlanFragmentCancelReason& reason, |
194 | | const std::unique_lock<std::mutex>& state_lock, bool is_pipeline, |
195 | | const std::string& msg = ""); |
196 | | 
197 | | void _exec_actual(std::shared_ptr<PlanFragmentExecutor> fragment_executor, |
198 | | const FinishCallback& cb); |
199 | | struct BrpcItem { |
200 | | TNetworkAddress network_address; |
201 | | std::vector<std::weak_ptr<QueryContext>> queries; |
202 | | }; |
203 | | 
204 | | std::shared_ptr<QueryContext> _get_or_erase_query_ctx(const TUniqueId& query_id); |
205 | | 
206 | | template <typename Param> |
207 | | void _set_scan_concurrency(const Param& params, QueryContext* query_ctx); |
208 | | 
209 | | void _setup_shared_hashtable_for_broadcast_join(const TExecPlanFragmentParams& params, |
210 | | QueryContext* query_ctx); |
211 | | 
212 | | void _setup_shared_hashtable_for_broadcast_join(const TPipelineFragmentParams& params, |
213 | | const TPipelineInstanceParams& local_params, |
214 | | QueryContext* query_ctx); |
215 | | 
216 | | void _setup_shared_hashtable_for_broadcast_join(const TPipelineFragmentParams& params, |
217 | | QueryContext* query_ctx); |
218 | | 
219 | | template <typename Params> |
220 | | Status _get_query_ctx(const Params& params, TUniqueId query_id, bool pipeline, |
221 | | QuerySource query_type, std::shared_ptr<QueryContext>& query_ctx); |
222 | | 
223 | | void _check_brpc_available(const std::shared_ptr<PBackendService_Stub>& brpc_stub, |
224 | | const BrpcItem& brpc_item); |
225 | | 
226 | | // This is an input param |
227 | | ExecEnv* _exec_env = nullptr; |
228 | | 
229 | | // Make sure that remove this before no data reference PlanFragmentExecutor |
230 | | // TUniqueId -> PlanFragmentExecutor (presumably keyed by fragment instance id; TODO confirm) |
231 | | ConcurrentContextMap<TUniqueId, std::shared_ptr<PlanFragmentExecutor>, PlanFragmentExecutor> |
232 | | _fragment_instance_map; |
233 | | // TUniqueId -> PipelineFragmentContext; NOTE(review): key may encode (QueryID, FragmentID), confirm |
234 | | ConcurrentContextMap<TUniqueId, std::shared_ptr<pipeline::PipelineFragmentContext>, |
235 | | pipeline::PipelineFragmentContext> |
236 | | _pipeline_map; |
237 | | 
238 | | // query id -> QueryContext |
239 | | ConcurrentContextMap<TUniqueId, std::shared_ptr<QueryContext>, QueryContext> _query_ctx_map; |
240 | | std::unordered_map<TUniqueId, std::unordered_map<int, int64_t>> _bf_size_map; |
241 | | 
242 | | CountDownLatch _stop_background_threads_latch; |
243 | | scoped_refptr<Thread> _cancel_thread; |
244 | | // every job is a pool |
245 | | std::unique_ptr<ThreadPool> _thread_pool; |
246 | | 
247 | | std::shared_ptr<MetricEntity> _entity; |
248 | | UIntGauge* timeout_canceled_fragment_count = nullptr; |
249 | | 
250 | | RuntimeFilterMergeController _runtimefilter_controller; |
251 | | std::unique_ptr<ThreadPool> _async_report_thread_pool = |
252 | | nullptr; // used for pipeline context report |
253 | | }; |
254 | | |
255 | | } // namespace doris |