/root/doris/be/src/runtime/exec_env.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <stddef.h> |
21 | | |
22 | | #include <algorithm> |
23 | | #include <map> |
24 | | #include <memory> |
25 | | #include <mutex> |
26 | | #include <string> |
27 | | #include <unordered_map> |
28 | | #include <vector> |
29 | | |
30 | | #include "common/status.h" |
31 | | #include "olap/options.h" |
32 | | #include "util/threadpool.h" |
33 | | |
34 | | namespace doris { |
35 | | namespace vectorized { |
36 | | class VDataStreamMgr; |
37 | | class ScannerScheduler; |
38 | | } // namespace vectorized |
39 | | namespace pipeline { |
40 | | class TaskScheduler; |
41 | | } |
42 | | namespace taskgroup { |
43 | | class TaskGroupManager; |
44 | | } |
45 | | class BfdParser; |
46 | | class BrokerMgr; |
47 | | template <class T> |
48 | | class BrpcClientCache; |
49 | | class ExternalScanContextMgr; |
50 | | class FragmentMgr; |
51 | | class ResultCache; |
52 | | class LoadPathMgr; |
53 | | class NewLoadStreamMgr; |
54 | | class MemTrackerLimiter; |
55 | | class MemTracker; |
56 | | class StorageEngine; |
57 | | class ResultBufferMgr; |
58 | | class ResultQueueMgr; |
59 | | class RuntimeQueryStatiticsMgr; |
60 | | class TMasterInfo; |
61 | | class LoadChannelMgr; |
62 | | class StreamLoadExecutor; |
63 | | class RoutineLoadTaskExecutor; |
64 | | class SmallFileMgr; |
65 | | class BlockSpillManager; |
66 | | class BackendServiceClient; |
67 | | class TPaloBrokerServiceClient; |
68 | | class PBackendService_Stub; |
69 | | class PFunctionService_Stub; |
70 | | template <class T> |
71 | | class ClientCache; |
72 | | class HeartbeatFlags; |
73 | | class FrontendServiceClient; |
74 | | class FileMetaCache; |
75 | | class DNSCache; |
76 | | |
77 | | // Execution environment for queries/plan fragments. |
78 | | // Contains all required global structures, and handles to |
79 | | // singleton services. Clients must call StartServices exactly |
80 | | // once to properly initialise service state. |
81 | | class ExecEnv { |
82 | | public: |
83 | | // Initial exec environment. must call this to init all |
84 | | static Status init(ExecEnv* env, const std::vector<StorePath>& store_paths); |
85 | | static void destroy(ExecEnv* exec_env); |
86 | | |
87 | | /// Returns the first created exec env instance. In a normal doris, this is |
88 | | /// the only instance. In test setups with multiple ExecEnv's per process, |
89 | | /// we return the most recently created instance. |
90 | 24.5k | static ExecEnv* GetInstance() { |
91 | 24.5k | static ExecEnv s_exec_env; |
92 | 24.5k | return &s_exec_env; |
93 | 24.5k | } |
94 | | |
95 | | // only used for test |
96 | | ExecEnv(); |
97 | | |
98 | | // Empty destructor because the compiler-generated one requires full |
99 | | // declarations for classes in scoped_ptrs. |
100 | | ~ExecEnv(); |
101 | | |
102 | 111 | bool initialized() const { return _is_init; } |
103 | | const std::string& token() const; |
104 | 0 | ExternalScanContextMgr* external_scan_context_mgr() { return _external_scan_context_mgr; } |
105 | 0 | doris::vectorized::VDataStreamMgr* vstream_mgr() { return _vstream_mgr; } |
106 | 0 | ResultBufferMgr* result_mgr() { return _result_mgr; } |
107 | 1 | ResultQueueMgr* result_queue_mgr() { return _result_queue_mgr; } |
108 | 0 | ClientCache<BackendServiceClient>* client_cache() { return _backend_client_cache; } |
109 | 0 | ClientCache<FrontendServiceClient>* frontend_client_cache() { return _frontend_client_cache; } |
110 | 0 | ClientCache<TPaloBrokerServiceClient>* broker_client_cache() { return _broker_client_cache; } |
111 | | |
112 | 0 | pipeline::TaskScheduler* pipeline_task_scheduler() { return _pipeline_task_scheduler; } |
113 | 0 | pipeline::TaskScheduler* pipeline_task_group_scheduler() { |
114 | 0 | return _pipeline_task_group_scheduler; |
115 | 0 | } |
116 | 0 | taskgroup::TaskGroupManager* task_group_manager() { return _task_group_manager; } |
117 | | |
118 | 12 | RuntimeQueryStatiticsMgr* runtime_query_statistics_mgr() { |
119 | 12 | return _runtime_query_statistics_mgr; |
120 | 12 | } |
121 | | |
122 | | // using template to simplify client cache management |
123 | | template <typename T> |
124 | | inline ClientCache<T>* get_client_cache() { |
125 | | return nullptr; |
126 | | } |
127 | | |
128 | | void init_mem_tracker(); |
129 | 177 | std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker() { return _orphan_mem_tracker; } |
130 | 45 | MemTrackerLimiter* orphan_mem_tracker_raw() { return _orphan_mem_tracker_raw; } |
131 | 114 | MemTrackerLimiter* experimental_mem_tracker() { return _experimental_mem_tracker.get(); } |
132 | 23.9k | std::shared_ptr<MemTracker> page_no_cache_mem_tracker() { return _page_no_cache_mem_tracker; } |
133 | 0 | MemTracker* brpc_iobuf_block_memory_tracker() { return _brpc_iobuf_block_memory_tracker.get(); } |
134 | | |
135 | 5 | ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); } |
136 | 1 | ThreadPool* download_cache_thread_pool() { return _download_cache_thread_pool.get(); } |
137 | 4 | ThreadPool* buffered_reader_prefetch_thread_pool() { |
138 | 4 | return _buffered_reader_prefetch_thread_pool.get(); |
139 | 4 | } |
140 | 0 | ThreadPool* send_report_thread_pool() { return _send_report_thread_pool.get(); } |
141 | 0 | ThreadPool* join_node_thread_pool() { return _join_node_thread_pool.get(); } |
142 | | |
143 | 1 | void set_serial_download_cache_thread_token() { |
144 | 1 | _serial_download_cache_thread_token = |
145 | 1 | download_cache_thread_pool()->new_token(ThreadPool::ExecutionMode::SERIAL, 1); |
146 | 1 | } |
147 | 0 | ThreadPoolToken* get_serial_download_cache_thread_token() { |
148 | 0 | return _serial_download_cache_thread_token.get(); |
149 | 0 | } |
150 | | void init_download_cache_buf(); |
151 | | void init_download_cache_required_components(); |
152 | | Status init_pipeline_task_scheduler(); |
153 | 0 | char* get_download_cache_buf(ThreadPoolToken* token) { |
154 | 0 | if (_download_cache_buf_map.find(token) == _download_cache_buf_map.end()) { |
155 | 0 | return nullptr; |
156 | 0 | } |
157 | 0 | return _download_cache_buf_map[token].get(); |
158 | 0 | } |
159 | 1 | FragmentMgr* fragment_mgr() { return _fragment_mgr; } |
160 | 0 | ResultCache* result_cache() { return _result_cache; } |
161 | 14 | TMasterInfo* master_info() { return _master_info; } |
162 | 1 | LoadPathMgr* load_path_mgr() { return _load_path_mgr; } |
163 | 1 | BfdParser* bfd_parser() const { return _bfd_parser; } |
164 | 0 | BrokerMgr* broker_mgr() const { return _broker_mgr; } |
165 | 26 | BrpcClientCache<PBackendService_Stub>* brpc_internal_client_cache() const { |
166 | 26 | return _internal_client_cache; |
167 | 26 | } |
168 | 0 | BrpcClientCache<PFunctionService_Stub>* brpc_function_client_cache() const { |
169 | 0 | return _function_client_cache; |
170 | 0 | } |
171 | 0 | LoadChannelMgr* load_channel_mgr() { return _load_channel_mgr; } |
172 | 3 | std::shared_ptr<NewLoadStreamMgr> new_load_stream_mgr() { return _new_load_stream_mgr; } |
173 | 0 | SmallFileMgr* small_file_mgr() { return _small_file_mgr; } |
174 | 7 | BlockSpillManager* block_spill_mgr() { return _block_spill_mgr; } |
175 | | |
176 | 1 | const std::vector<StorePath>& store_paths() const { return _store_paths; } |
177 | | |
178 | 0 | StorageEngine* storage_engine() { return _storage_engine; } |
179 | 30 | void set_storage_engine(StorageEngine* storage_engine) { _storage_engine = storage_engine; } |
180 | | |
181 | 1 | std::shared_ptr<StreamLoadExecutor> stream_load_executor() { return _stream_load_executor; } |
182 | 0 | RoutineLoadTaskExecutor* routine_load_task_executor() { return _routine_load_task_executor; } |
183 | 0 | HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; } |
184 | 0 | doris::vectorized::ScannerScheduler* scanner_scheduler() { return _scanner_scheduler; } |
185 | 0 | FileMetaCache* file_meta_cache() { return _file_meta_cache; } |
186 | 0 | DNSCache* dns_cache() { return _dns_cache; } |
187 | | |
188 | | // only for unit test |
189 | 1 | void set_master_info(TMasterInfo* master_info) { this->_master_info = master_info; } |
190 | 1 | void set_new_load_stream_mgr(std::shared_ptr<NewLoadStreamMgr> new_load_stream_mgr) { |
191 | 1 | this->_new_load_stream_mgr = new_load_stream_mgr; |
192 | 1 | } |
193 | 1 | void set_stream_load_executor(std::shared_ptr<StreamLoadExecutor> stream_load_executor) { |
194 | 1 | this->_stream_load_executor = stream_load_executor; |
195 | 1 | } |
196 | | |
197 | | private: |
198 | | Status _init(const std::vector<StorePath>& store_paths); |
199 | | void _destroy(); |
200 | | |
201 | | Status _init_mem_env(); |
202 | | |
203 | | void _register_metrics(); |
204 | | void _deregister_metrics(); |
205 | | |
206 | | bool _is_init; |
207 | | std::vector<StorePath> _store_paths; |
208 | | |
209 | | // Leave protected so that subclasses can override |
210 | | ExternalScanContextMgr* _external_scan_context_mgr = nullptr; |
211 | | doris::vectorized::VDataStreamMgr* _vstream_mgr = nullptr; |
212 | | ResultBufferMgr* _result_mgr = nullptr; |
213 | | ResultQueueMgr* _result_queue_mgr = nullptr; |
214 | | ClientCache<BackendServiceClient>* _backend_client_cache = nullptr; |
215 | | ClientCache<FrontendServiceClient>* _frontend_client_cache = nullptr; |
216 | | ClientCache<TPaloBrokerServiceClient>* _broker_client_cache = nullptr; |
217 | | |
218 | | // The default tracker consumed by mem hook. If the thread does not attach other trackers, |
219 | | // by default all consumption will be passed to the process tracker through the orphan tracker. |
220 | | // In real time, `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption`. |
221 | | // Ideally, all threads are expected to attach to the specified tracker, so that "all memory has its own ownership", |
222 | | // and the consumption of the orphan mem tracker is close to 0, but greater than 0. |
223 | | std::shared_ptr<MemTrackerLimiter> _orphan_mem_tracker; |
224 | | MemTrackerLimiter* _orphan_mem_tracker_raw; |
225 | | std::shared_ptr<MemTrackerLimiter> _experimental_mem_tracker; |
226 | | // page size not in cache, data page/index page/etc. |
227 | | std::shared_ptr<MemTracker> _page_no_cache_mem_tracker; |
228 | | std::shared_ptr<MemTracker> _brpc_iobuf_block_memory_tracker; |
229 | | |
230 | | std::unique_ptr<ThreadPool> _send_batch_thread_pool; |
231 | | |
232 | | // Threadpool used to download cache from remote storage |
233 | | std::unique_ptr<ThreadPool> _download_cache_thread_pool; |
234 | | // Threadpool used to prefetch remote file for buffered reader |
235 | | std::unique_ptr<ThreadPool> _buffered_reader_prefetch_thread_pool; |
236 | | // A token used to submit download cache task serially |
237 | | std::unique_ptr<ThreadPoolToken> _serial_download_cache_thread_token; |
238 | | // Pool used by fragment manager to send profile or status to FE coordinator |
239 | | std::unique_ptr<ThreadPool> _send_report_thread_pool; |
240 | | // Pool used by join node to build hash table |
241 | | std::unique_ptr<ThreadPool> _join_node_thread_pool; |
242 | | // ThreadPoolToken -> buffer |
243 | | std::unordered_map<ThreadPoolToken*, std::unique_ptr<char[]>> _download_cache_buf_map; |
244 | | FragmentMgr* _fragment_mgr = nullptr; |
245 | | pipeline::TaskScheduler* _pipeline_task_scheduler = nullptr; |
246 | | pipeline::TaskScheduler* _pipeline_task_group_scheduler = nullptr; |
247 | | taskgroup::TaskGroupManager* _task_group_manager = nullptr; |
248 | | |
249 | | ResultCache* _result_cache = nullptr; |
250 | | TMasterInfo* _master_info = nullptr; |
251 | | LoadPathMgr* _load_path_mgr = nullptr; |
252 | | |
253 | | BfdParser* _bfd_parser = nullptr; |
254 | | BrokerMgr* _broker_mgr = nullptr; |
255 | | LoadChannelMgr* _load_channel_mgr = nullptr; |
256 | | std::shared_ptr<NewLoadStreamMgr> _new_load_stream_mgr; |
257 | | BrpcClientCache<PBackendService_Stub>* _internal_client_cache = nullptr; |
258 | | BrpcClientCache<PFunctionService_Stub>* _function_client_cache = nullptr; |
259 | | |
260 | | StorageEngine* _storage_engine = nullptr; |
261 | | |
262 | | std::shared_ptr<StreamLoadExecutor> _stream_load_executor; |
263 | | RoutineLoadTaskExecutor* _routine_load_task_executor = nullptr; |
264 | | SmallFileMgr* _small_file_mgr = nullptr; |
265 | | HeartbeatFlags* _heartbeat_flags = nullptr; |
266 | | doris::vectorized::ScannerScheduler* _scanner_scheduler = nullptr; |
267 | | |
268 | | BlockSpillManager* _block_spill_mgr = nullptr; |
269 | | // To save meta info of external file, such as parquet footer. |
270 | | FileMetaCache* _file_meta_cache = nullptr; |
271 | | DNSCache* _dns_cache = nullptr; |
272 | | |
273 | | RuntimeQueryStatiticsMgr* _runtime_query_statistics_mgr = nullptr; |
274 | | }; |
275 | | |
276 | | template <> |
277 | 0 | inline ClientCache<BackendServiceClient>* ExecEnv::get_client_cache<BackendServiceClient>() { |
278 | 0 | return _backend_client_cache; |
279 | 0 | } |
280 | | template <> |
281 | 0 | inline ClientCache<FrontendServiceClient>* ExecEnv::get_client_cache<FrontendServiceClient>() { |
282 | 0 | return _frontend_client_cache; |
283 | 0 | } |
284 | | template <> |
285 | | inline ClientCache<TPaloBrokerServiceClient>* |
286 | 0 | ExecEnv::get_client_cache<TPaloBrokerServiceClient>() { |
287 | 0 | return _broker_client_cache; |
288 | 0 | } |
289 | | |
290 | | } // namespace doris |