be/src/runtime/exec_env_init.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | // IWYU pragma: no_include <bthread/errno.h> |
19 | | #include <gen_cpp/HeartbeatService_types.h> |
20 | | #include <gen_cpp/Metrics_types.h> |
21 | | #include <simdjson.h> |
22 | | #include <sys/resource.h> |
23 | | |
24 | | #include <cerrno> // IWYU pragma: keep |
25 | | #include <cstdint> |
26 | | #include <cstdlib> |
27 | | #include <cstring> |
28 | | #include <limits> |
29 | | #include <memory> |
30 | | #include <ostream> |
31 | | #include <string> |
32 | | #include <vector> |
33 | | |
34 | | #include "cloud/cloud_cluster_info.h" |
35 | | #include "cloud/cloud_storage_engine.h" |
36 | | #include "cloud/cloud_stream_load_executor.h" |
37 | | #include "cloud/cloud_tablet_hotspot.h" |
38 | | #include "cloud/cloud_warm_up_manager.h" |
39 | | #include "cloud/config.h" |
40 | | #include "common/cast_set.h" |
41 | | #include "common/config.h" |
42 | | #include "common/kerberos/kerberos_ticket_mgr.h" |
43 | | #include "common/logging.h" |
44 | | #include "common/metrics/doris_metrics.h" |
45 | | #include "common/multi_version.h" |
46 | | #include "common/status.h" |
47 | | #include "cpp/s3_rate_limiter.h" |
48 | | #include "exec/exchange/vdata_stream_mgr.h" |
49 | | #include "exec/pipeline/pipeline_tracing.h" |
50 | | #include "exec/pipeline/task_queue.h" |
51 | | #include "exec/pipeline/task_scheduler.h" |
52 | | #include "exec/scan/scanner_scheduler.h" |
53 | | #include "exec/sink/delta_writer_v2_pool.h" |
54 | | #include "exec/sink/load_stream_map_pool.h" |
55 | | #include "exec/spill/spill_stream_manager.h" |
56 | | #include "exprs/function/dictionary_factory.h" |
57 | | #include "format/orc/orc_memory_pool.h" |
58 | | #include "format/parquet/arrow_memory_pool.h" |
59 | | #include "io/cache/block_file_cache_downloader.h" |
60 | | #include "io/cache/block_file_cache_factory.h" |
61 | | #include "io/cache/fs_file_cache_storage.h" |
62 | | #include "io/fs/file_meta_cache.h" |
63 | | #include "io/fs/local_file_reader.h" |
64 | | #include "load/channel/load_channel_mgr.h" |
65 | | #include "load/channel/load_stream_mgr.h" |
66 | | #include "load/group_commit/group_commit_mgr.h" |
67 | | #include "load/group_commit/wal/wal_manager.h" |
68 | | #include "load/load_path_mgr.h" |
69 | | #include "load/memtable/memtable_memory_limiter.h" |
70 | | #include "load/routine_load/routine_load_task_executor.h" |
71 | | #include "load/stream_load/new_load_stream_mgr.h" |
72 | | #include "load/stream_load/stream_load_executor.h" |
73 | | #include "load/stream_load/stream_load_recorder_manager.h" |
74 | | #include "runtime/broker_mgr.h" |
75 | | #include "runtime/cache/result_cache.h" |
76 | | #include "runtime/cdc_client_mgr.h" |
77 | | #include "runtime/exec_env.h" |
78 | | #include "runtime/external_scan_context_mgr.h" |
79 | | #include "runtime/fragment_mgr.h" |
80 | | #include "runtime/heartbeat_flags.h" |
81 | | #include "runtime/index_policy/index_policy_mgr.h" |
82 | | #include "runtime/memory/cache_manager.h" |
83 | | #include "runtime/memory/heap_profiler.h" |
84 | | #include "runtime/memory/mem_tracker.h" |
85 | | #include "runtime/memory/mem_tracker_limiter.h" |
86 | | #include "runtime/memory/thread_mem_tracker_mgr.h" |
87 | | #include "runtime/process_profile.h" |
88 | | #include "runtime/query_cache/query_cache.h" |
89 | | #include "runtime/result_buffer_mgr.h" |
90 | | #include "runtime/result_queue_mgr.h" |
91 | | #include "runtime/runtime_query_statistics_mgr.h" |
92 | | #include "runtime/small_file_mgr.h" |
93 | | #include "runtime/thread_context.h" |
94 | | #include "runtime/user_function_cache.h" |
95 | | #include "runtime/workload_group/workload_group_manager.h" |
96 | | #include "runtime/workload_management/workload_sched_policy_mgr.h" |
97 | | #include "service/backend_options.h" |
98 | | #include "service/backend_service.h" |
99 | | #include "service/point_query_executor.h" |
100 | | #include "storage/cache/page_cache.h" |
101 | | #include "storage/cache/schema_cache.h" |
102 | | #include "storage/id_manager.h" |
103 | | #include "storage/index/inverted/inverted_index_cache.h" |
104 | | #include "storage/olap_define.h" |
105 | | #include "storage/options.h" |
106 | | #include "storage/segment/condition_cache.h" |
107 | | #include "storage/segment/encoding_info.h" |
108 | | #include "storage/segment/segment_loader.h" |
109 | | #include "storage/storage_engine.h" |
110 | | #include "storage/storage_policy.h" |
111 | | #include "storage/tablet/tablet_column_object_pool.h" |
112 | | #include "storage/tablet/tablet_meta.h" |
113 | | #include "storage/tablet/tablet_schema_cache.h" |
114 | | #include "udf/python/python_server.h" |
115 | | #include "util/bfd_parser.h" |
116 | | #include "util/bit_util.h" |
117 | | #include "util/brpc_client_cache.h" |
118 | | #include "util/client_cache.h" |
119 | | #include "util/cpu_info.h" |
120 | | #include "util/disk_info.h" |
121 | | #include "util/dns_cache.h" |
122 | | #include "util/mem_info.h" |
123 | | #include "util/parse_util.h" |
124 | | #include "util/pretty_printer.h" |
125 | | #include "util/threadpool.h" |
126 | | #include "util/thrift_rpc_helper.h" |
127 | | #include "util/timezone_utils.h" |
128 | | // clang-format off |
129 | | // this must after util/brpc_client_cache.h |
130 | | // /doris/thirdparty/installed/include/brpc/errno.pb.h:69:3: error: expected identifier |
131 | | // EINTERNAL = 2001, |
132 | | // ^ |
133 | | // /doris/thirdparty/installed/include/hadoop_hdfs/hdfs.h:61:19: note: expanded from macro 'EINTERNAL' |
134 | | // #define EINTERNAL 255 |
135 | | #include "io/fs/hdfs/hdfs_mgr.h" |
136 | | #include "io/fs/packed_file_manager.h" |
137 | | // clang-format on |
138 | | |
139 | | namespace doris { |
140 | | |
141 | | #include "common/compile_check_begin.h" |
// Forward declarations of the brpc service stubs used by the client caches
// created in ExecEnv::_init (BrpcClientCache<PBackendService_Stub> etc.).
class PBackendService_Stub;
class PFunctionService_Stub;

// Warmup download rate limiter metrics
// Fed by the S3RateLimiterHolder callback installed in ExecEnv::_init (cloud
// mode only): wait latency, total throttled nanoseconds, throttled requests.
bvar::LatencyRecorder warmup_download_rate_limit_latency("warmup_download_rate_limit_latency");
bvar::Adder<int64_t> warmup_download_rate_limit_ns("warmup_download_rate_limit_ns");
bvar::Adder<int64_t> warmup_download_rate_limit_exceed_req_num(
        "warmup_download_rate_limit_exceed_req_num");
150 | | |
151 | 7 | static void init_doris_metrics(const std::vector<StorePath>& store_paths) { |
152 | 7 | bool init_system_metrics = config::enable_system_metrics; |
153 | 7 | std::set<std::string> disk_devices; |
154 | 7 | std::vector<std::string> network_interfaces; |
155 | 7 | std::vector<std::string> paths; |
156 | 11 | for (const auto& store_path : store_paths) { |
157 | 11 | paths.emplace_back(store_path.path); |
158 | 11 | } |
159 | 7 | if (init_system_metrics) { |
160 | 1 | auto st = DiskInfo::get_disk_devices(paths, &disk_devices); |
161 | 1 | if (!st.ok()) { |
162 | 0 | LOG(WARNING) << "get disk devices failed, status=" << st; |
163 | 0 | return; |
164 | 0 | } |
165 | 1 | st = get_inet_interfaces(&network_interfaces, BackendOptions::is_bind_ipv6()); |
166 | 1 | if (!st.ok()) { |
167 | 0 | LOG(WARNING) << "get inet interfaces failed, status=" << st; |
168 | 0 | return; |
169 | 0 | } |
170 | 1 | } |
171 | 7 | DorisMetrics::instance()->initialize(init_system_metrics, disk_devices, network_interfaces); |
172 | 7 | } |
173 | | |
174 | | // Used to calculate the num of min thread and max thread based on the passed config |
175 | 14 | static std::pair<size_t, size_t> get_num_threads(size_t min_num, size_t max_num) { |
176 | 14 | auto num_cores = doris::CpuInfo::num_cores(); |
177 | 14 | min_num = (min_num == 0) ? num_cores : min_num; |
178 | 14 | max_num = (max_num == 0) ? num_cores : max_num; |
179 | 14 | auto factor = max_num / min_num; |
180 | 14 | min_num = std::min(num_cores * factor, min_num); |
181 | 14 | max_num = std::min(min_num * factor, max_num); |
182 | 14 | return {min_num, max_num}; |
183 | 14 | } |
184 | | |
// Accessor for the shared non-blocking-close thread pool (built in _init).
// Returns a non-owning pointer; ownership remains with ExecEnv.
ThreadPool* ExecEnv::non_block_close_thread_pool() {
    return _non_block_close_thread_pool.get();
}
188 | | |
ExecEnv::ExecEnv() = default;

ExecEnv::~ExecEnv() {
    // Tear down all subsystems owned by the environment.
    destroy();
}

// Static entry point: forwards to the instance _init with the configured data
// store paths, spill store paths, and the set of paths found broken at startup.
Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths,
                     const std::vector<StorePath>& spill_store_paths,
                     const std::set<std::string>& broken_paths) {
    return env->_init(store_paths, spill_store_paths, broken_paths);
}
200 | | |
201 | | // pick simdjson implementation based on CPU capabilities |
202 | 7 | inline void init_simdjson_parser() { |
203 | | // haswell: AVX2 (2013 Intel Haswell or later, all AMD Zen processors) |
204 | 7 | const auto* haswell_implementation = simdjson::get_available_implementations()["haswell"]; |
205 | 7 | if (!haswell_implementation || !haswell_implementation->supported_by_runtime_system()) { |
206 | | // pick available implementation |
207 | 0 | for (const auto* implementation : simdjson::get_available_implementations()) { |
208 | 0 | if (implementation->supported_by_runtime_system()) { |
209 | 0 | LOG(INFO) << "Using SimdJSON implementation : " << implementation->name() << ": " |
210 | 0 | << implementation->description(); |
211 | 0 | simdjson::get_active_implementation() = implementation; |
212 | 0 | return; |
213 | 0 | } |
214 | 0 | } |
215 | 0 | LOG(WARNING) << "No available SimdJSON implementation found."; |
216 | 7 | } else { |
217 | 7 | LOG(INFO) << "Using SimdJSON Haswell implementation"; |
218 | 7 | } |
219 | 7 | } |
220 | | |
221 | | Status ExecEnv::_init(const std::vector<StorePath>& store_paths, |
222 | | const std::vector<StorePath>& spill_store_paths, |
223 | 7 | const std::set<std::string>& broken_paths) { |
224 | | //Only init once before be destroyed |
225 | 7 | if (ready()) { |
226 | 0 | return Status::OK(); |
227 | 0 | } |
228 | 7 | std::unordered_map<std::string, std::unique_ptr<SpillDataDir>> spill_store_map; |
229 | 11 | for (const auto& spill_path : spill_store_paths) { |
230 | 11 | spill_store_map.emplace(spill_path.path, std::make_unique<SpillDataDir>( |
231 | 11 | spill_path.path, spill_path.capacity_bytes, |
232 | 11 | spill_path.storage_medium)); |
233 | 11 | } |
234 | 7 | init_doris_metrics(store_paths); |
235 | 7 | _store_paths = store_paths; |
236 | 7 | _tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(_store_paths); |
237 | 7 | RETURN_IF_ERROR(_tmp_file_dirs->init()); |
238 | 7 | _user_function_cache = new UserFunctionCache(); |
239 | 7 | static_cast<void>(_user_function_cache->init(doris::config::user_function_dir)); |
240 | 7 | _external_scan_context_mgr = new ExternalScanContextMgr(this); |
241 | 7 | set_stream_mgr(new doris::VDataStreamMgr()); |
242 | 7 | _result_mgr = new ResultBufferMgr(); |
243 | 7 | _result_queue_mgr = new ResultQueueMgr(); |
244 | 7 | _backend_client_cache = new BackendServiceClientCache(config::max_client_cache_size_per_host); |
245 | 7 | _frontend_client_cache = new FrontendServiceClientCache(config::max_client_cache_size_per_host); |
246 | 7 | _broker_client_cache = new BrokerServiceClientCache(config::max_client_cache_size_per_host); |
247 | | |
248 | 7 | TimezoneUtils::load_timezones_to_cache(); |
249 | | |
250 | 7 | static_cast<void>(ThreadPoolBuilder("SendBatchThreadPool") |
251 | 7 | .set_min_threads(config::send_batch_thread_pool_thread_num) |
252 | 7 | .set_max_threads(config::send_batch_thread_pool_thread_num) |
253 | 7 | .set_max_queue_size(config::send_batch_thread_pool_queue_size) |
254 | 7 | .build(&_send_batch_thread_pool)); |
255 | | |
256 | 7 | static_cast<void>(ThreadPoolBuilder("UDFCloseWorkers") |
257 | 7 | .set_min_threads(4) |
258 | 7 | .set_max_threads(std::min(32, CpuInfo::num_cores())) |
259 | 7 | .build(&_udf_close_workers_thread_pool)); |
260 | | |
261 | 7 | auto [buffered_reader_min_threads, buffered_reader_max_threads] = |
262 | 7 | get_num_threads(config::num_buffered_reader_prefetch_thread_pool_min_thread, |
263 | 7 | config::num_buffered_reader_prefetch_thread_pool_max_thread); |
264 | 7 | static_cast<void>(ThreadPoolBuilder("BufferedReaderPrefetchThreadPool") |
265 | 7 | .set_min_threads(cast_set<int>(buffered_reader_min_threads)) |
266 | 7 | .set_max_threads(cast_set<int>(buffered_reader_max_threads)) |
267 | 7 | .build(&_buffered_reader_prefetch_thread_pool)); |
268 | | |
269 | 7 | static_cast<void>(ThreadPoolBuilder("SegmentPrefetchThreadPool") |
270 | 7 | .set_min_threads(cast_set<int>( |
271 | 7 | config::segment_prefetch_thread_pool_thread_num_min)) |
272 | 7 | .set_max_threads(cast_set<int>( |
273 | 7 | config::segment_prefetch_thread_pool_thread_num_max)) |
274 | 7 | .build(&_segment_prefetch_thread_pool)); |
275 | | |
276 | 7 | static_cast<void>(ThreadPoolBuilder("SendTableStatsThreadPool") |
277 | 7 | .set_min_threads(8) |
278 | 7 | .set_max_threads(32) |
279 | 7 | .build(&_send_table_stats_thread_pool)); |
280 | | |
281 | 7 | auto [s3_file_upload_min_threads, s3_file_upload_max_threads] = |
282 | 7 | get_num_threads(config::num_s3_file_upload_thread_pool_min_thread, |
283 | 7 | config::num_s3_file_upload_thread_pool_max_thread); |
284 | 7 | static_cast<void>(ThreadPoolBuilder("S3FileUploadThreadPool") |
285 | 7 | .set_min_threads(cast_set<int>(s3_file_upload_min_threads)) |
286 | 7 | .set_max_threads(cast_set<int>(s3_file_upload_max_threads)) |
287 | 7 | .build(&_s3_file_upload_thread_pool)); |
288 | | |
289 | | // min num equal to fragment pool's min num |
290 | | // max num is useless because it will start as many as requested in the past |
291 | | // queue size is useless because the max thread num is very large |
292 | 7 | static_cast<void>(ThreadPoolBuilder("LazyReleaseMemoryThreadPool") |
293 | 7 | .set_min_threads(1) |
294 | 7 | .set_max_threads(1) |
295 | 7 | .set_max_queue_size(1000000) |
296 | 7 | .build(&_lazy_release_obj_pool)); |
297 | 7 | static_cast<void>(ThreadPoolBuilder("NonBlockCloseThreadPool") |
298 | 7 | .set_min_threads(cast_set<int>(config::min_nonblock_close_thread_num)) |
299 | 7 | .set_max_threads(cast_set<int>(config::max_nonblock_close_thread_num)) |
300 | 7 | .build(&_non_block_close_thread_pool)); |
301 | 7 | static_cast<void>(ThreadPoolBuilder("S3FileSystemThreadPool") |
302 | 7 | .set_min_threads(config::min_s3_file_system_thread_num) |
303 | 7 | .set_max_threads(config::max_s3_file_system_thread_num) |
304 | 7 | .build(&_s3_file_system_thread_pool)); |
305 | 7 | RETURN_IF_ERROR(init_mem_env()); |
306 | | |
307 | | // NOTE: runtime query statistics mgr could be visited by query and daemon thread |
308 | | // so it should be created before all query begin and deleted after all query and daemon thread stoppped |
309 | 7 | _runtime_query_statistics_mgr = new RuntimeQueryStatisticsMgr(); |
310 | 7 | CgroupCpuCtl::init_doris_cgroup_path(); |
311 | 7 | _file_cache_open_fd_cache = std::make_unique<io::FDCache>(); |
312 | 7 | _file_cache_factory = new io::FileCacheFactory(); |
313 | 7 | std::vector<doris::CachePath> cache_paths; |
314 | 7 | init_file_cache_factory(cache_paths); |
315 | 7 | doris::io::BeConfDataDirReader::init_be_conf_data_dir(store_paths, spill_store_paths, |
316 | 7 | cache_paths); |
317 | 7 | _pipeline_tracer_ctx = std::make_unique<PipelineTracerContext>(); // before query |
318 | 7 | _init_runtime_filter_timer_queue(); |
319 | | |
320 | 7 | _workload_group_manager = new WorkloadGroupMgr(); |
321 | | |
322 | 7 | _fragment_mgr = new FragmentMgr(this); |
323 | 7 | _result_cache = new ResultCache(config::query_cache_max_size_mb, |
324 | 7 | config::query_cache_elasticity_size_mb); |
325 | 7 | if (config::is_cloud_mode()) { |
326 | 1 | _cluster_info = new CloudClusterInfo(); |
327 | 6 | } else { |
328 | 6 | _cluster_info = new ClusterInfo(); |
329 | 6 | } |
330 | | |
331 | 7 | _load_path_mgr = new LoadPathMgr(this); |
332 | 7 | _bfd_parser = BfdParser::create(); |
333 | 7 | _broker_mgr = new BrokerMgr(this); |
334 | 7 | _load_channel_mgr = new LoadChannelMgr(); |
335 | 7 | auto num_flush_threads = std::min( |
336 | 7 | _store_paths.size() * config::flush_thread_num_per_store, |
337 | 7 | static_cast<size_t>(CpuInfo::num_cores()) * config::max_flush_thread_num_per_cpu); |
338 | 7 | _load_stream_mgr = std::make_unique<LoadStreamMgr>(num_flush_threads); |
339 | 7 | _new_load_stream_mgr = NewLoadStreamMgr::create_unique(); |
340 | 7 | _internal_client_cache = new BrpcClientCache<PBackendService_Stub>(); |
341 | 7 | _streaming_client_cache = |
342 | 7 | new BrpcClientCache<PBackendService_Stub>("baidu_std", "single", "streaming"); |
343 | 7 | _function_client_cache = |
344 | 7 | new BrpcClientCache<PFunctionService_Stub>(config::function_service_protocol); |
345 | 7 | if (config::is_cloud_mode()) { |
346 | 1 | _stream_load_executor = CloudStreamLoadExecutor::create_unique(this); |
347 | 6 | } else { |
348 | 6 | _stream_load_executor = StreamLoadExecutor::create_unique(this); |
349 | 6 | } |
350 | 7 | _routine_load_task_executor = new RoutineLoadTaskExecutor(this); |
351 | 7 | RETURN_IF_ERROR(_routine_load_task_executor->init(MemInfo::mem_limit())); |
352 | 7 | _small_file_mgr = new SmallFileMgr(this, config::small_file_dir); |
353 | 7 | _group_commit_mgr = new GroupCommitMgr(this); |
354 | 7 | _cdc_client_mgr = new CdcClientMgr(); |
355 | 7 | _memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>(); |
356 | 7 | _load_stream_map_pool = std::make_unique<LoadStreamMapPool>(); |
357 | 7 | _delta_writer_v2_pool = std::make_unique<DeltaWriterV2Pool>(); |
358 | 7 | _wal_manager = WalManager::create_unique(this, config::group_commit_wal_path); |
359 | 7 | _dns_cache = new DNSCache(); |
360 | 7 | _write_cooldown_meta_executors = std::make_unique<WriteCooldownMetaExecutors>(); |
361 | 7 | _spill_stream_mgr = new SpillStreamManager(std::move(spill_store_map)); |
362 | 7 | _kerberos_ticket_mgr = new kerberos::KerberosTicketMgr(config::kerberos_ccache_path); |
363 | 7 | _hdfs_mgr = new io::HdfsMgr(); |
364 | 7 | _backend_client_cache->init_metrics("backend"); |
365 | 7 | _frontend_client_cache->init_metrics("frontend"); |
366 | 7 | _broker_client_cache->init_metrics("broker"); |
367 | 7 | static_cast<void>(_result_mgr->init()); |
368 | 7 | Status status = _load_path_mgr->init(); |
369 | 7 | if (!status.ok()) { |
370 | 0 | LOG(ERROR) << "Load path mgr init failed. " << status; |
371 | 0 | return status; |
372 | 0 | } |
373 | 7 | _broker_mgr->init(); |
374 | 7 | static_cast<void>(_small_file_mgr->init()); |
375 | 7 | if (!status.ok()) { |
376 | 0 | LOG(ERROR) << "Scanner scheduler init failed. " << status; |
377 | 0 | return status; |
378 | 0 | } |
379 | | |
380 | 7 | RETURN_IF_ERROR(_memtable_memory_limiter->init(MemInfo::mem_limit())); |
381 | 7 | RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit())); |
382 | 7 | RETURN_IF_ERROR(_wal_manager->init()); |
383 | 7 | _heartbeat_flags = new HeartbeatFlags(); |
384 | | |
385 | 7 | _tablet_schema_cache = |
386 | 7 | TabletSchemaCache::create_global_schema_cache(config::tablet_schema_cache_capacity); |
387 | | |
388 | 7 | _tablet_column_object_pool = TabletColumnObjectPool::create_global_column_cache( |
389 | 7 | config::tablet_schema_cache_capacity); |
390 | | |
391 | | // Storage engine |
392 | 7 | doris::EngineOptions options; |
393 | 7 | options.store_paths = store_paths; |
394 | 7 | options.broken_paths = broken_paths; |
395 | 7 | options.backend_uid = doris::UniqueId::gen_uid(); |
396 | | // Check if the startup mode has been modified |
397 | 7 | RETURN_IF_ERROR(_check_deploy_mode()); |
398 | 7 | if (config::is_cloud_mode()) { |
399 | 1 | std::cout << "start BE in cloud mode, cloud_unique_id: " << config::cloud_unique_id |
400 | 1 | << ", meta_service_endpoint: " << config::meta_service_endpoint << std::endl; |
401 | 1 | _storage_engine = std::make_unique<CloudStorageEngine>(options); |
402 | 6 | } else { |
403 | 6 | std::cout << "start BE in local mode" << std::endl; |
404 | 6 | _storage_engine = std::make_unique<StorageEngine>(options); |
405 | 6 | } |
406 | 7 | auto st = _storage_engine->open(); |
407 | 7 | if (!st.ok()) { |
408 | 0 | LOG(ERROR) << "Fail to open StorageEngine, res=" << st; |
409 | 0 | return st; |
410 | 0 | } |
411 | 7 | _storage_engine->set_heartbeat_flags(this->heartbeat_flags()); |
412 | 7 | if (st = _storage_engine->start_bg_threads(nullptr); !st.ok()) { |
413 | 0 | LOG(ERROR) << "Failed to starge bg threads of storage engine, res=" << st; |
414 | 0 | return st; |
415 | 0 | } |
416 | | |
417 | | // should start after storage_engine->open() |
418 | 7 | _stream_load_recorder_manager = new StreamLoadRecorderManager(); |
419 | 7 | _stream_load_recorder_manager->start(); |
420 | | |
421 | | // create internal workload group should be after storage_engin->open() |
422 | 7 | RETURN_IF_ERROR(_create_internal_workload_group()); |
423 | 7 | _workload_sched_mgr = new WorkloadSchedPolicyMgr(); |
424 | 7 | _workload_sched_mgr->start(this); |
425 | | |
426 | | // Initialize packed file manager |
427 | 7 | _packed_file_manager = io::PackedFileManager::instance(); |
428 | 7 | if (config::is_cloud_mode()) { |
429 | 1 | RETURN_IF_ERROR(_packed_file_manager->init()); |
430 | 1 | _packed_file_manager->start_background_manager(); |
431 | | |
432 | | // Start cluster info background worker for compaction read-write separation |
433 | 1 | static_cast<CloudClusterInfo*>(_cluster_info)->start_bg_worker(); |
434 | 1 | } |
435 | | |
436 | 7 | _index_policy_mgr = new IndexPolicyMgr(); |
437 | | |
438 | | // Initialize warmup download rate limiter for cloud mode |
439 | | // Always create the rate limiter in cloud mode to support dynamic rate limit changes |
440 | 7 | if (config::is_cloud_mode()) { |
441 | 1 | int64_t rate_limit = config::file_cache_warmup_download_rate_limit_bytes_per_second; |
442 | | // When rate_limit <= 0, pass 0 to disable rate limiting |
443 | 1 | int64_t rate = rate_limit > 0 ? rate_limit : 0; |
444 | | // max_burst is the same as rate (1 second burst) |
445 | | // limit is 0 which means no total limit |
446 | | // When rate is 0, S3RateLimiter will not throttle (no rate limiting) |
447 | 1 | _warmup_download_rate_limiter = new S3RateLimiterHolder(rate, rate, 0, [&](int64_t ns) { |
448 | 0 | if (ns > 0) { |
449 | 0 | warmup_download_rate_limit_latency << ns / 1000; |
450 | 0 | warmup_download_rate_limit_ns << ns; |
451 | 0 | warmup_download_rate_limit_exceed_req_num << 1; |
452 | 0 | } |
453 | 0 | }); |
454 | 1 | } |
455 | | |
456 | 7 | RETURN_IF_ERROR(_spill_stream_mgr->init()); |
457 | 7 | RETURN_IF_ERROR(_runtime_query_statistics_mgr->start_report_thread()); |
458 | 7 | _dict_factory = new doris::DictionaryFactory(); |
459 | 7 | _s_ready = true; |
460 | | |
461 | 7 | init_simdjson_parser(); |
462 | | |
463 | | // Make aws-sdk-cpp InitAPI and ShutdownAPI called in the same thread |
464 | 7 | S3ClientFactory::instance(); |
465 | 7 | return Status::OK(); |
466 | 7 | } |
467 | | |
468 | | // when user not sepcify a workload group in FE, then query could |
469 | | // use dummy workload group. |
470 | 7 | Status ExecEnv::_create_internal_workload_group() { |
471 | 7 | LOG(INFO) << "begin create internal workload group."; |
472 | | |
473 | 7 | RETURN_IF_ERROR(_workload_group_manager->create_internal_wg()); |
474 | 7 | return Status::OK(); |
475 | 7 | } |
476 | | |
// Create the global runtime-filter timer queue and start it immediately.
// The member is assigned before run() is invoked so the queue is reachable
// through ExecEnv as soon as its thread starts.
void ExecEnv::_init_runtime_filter_timer_queue() {
    _runtime_filter_timer_queue = new doris::RuntimeFilterTimerQueue();
    _runtime_filter_timer_queue->run();
}
481 | | |
// Parse the file-cache configuration and create one file cache per configured
// path, appending the parsed paths to `cache_paths` for the caller.
// Aborts the process (FATAL/exit) on configuration errors; a per-path init
// failure is tolerated only when config::ignore_broken_disk is set.
void ExecEnv::init_file_cache_factory(std::vector<doris::CachePath>& cache_paths) {
    // Load file cache before starting up daemon threads to make sure StorageEngine is read.
    if (!config::enable_file_cache) {
        // Cloud mode cannot operate without the file cache: abort startup.
        if (config::is_cloud_mode()) {
            LOG(FATAL) << "Cloud mode requires to enable file cache, plz set "
                          "config::enable_file_cache "
                          "= true";
            exit(-1);
        }
        return;
    }
    // Each S3 write buffer must hold a whole number of cache blocks, so the
    // block size must divide the buffer size exactly.
    if (config::file_cache_each_block_size > config::s3_write_buffer_size ||
        config::s3_write_buffer_size % config::file_cache_each_block_size != 0) {
        LOG_FATAL(
                "The config file_cache_each_block_size {} must less than or equal to config "
                "s3_write_buffer_size {} and config::s3_write_buffer_size % "
                "config::file_cache_each_block_size must be zero",
                config::file_cache_each_block_size, config::s3_write_buffer_size);
        exit(-1);
    }
    std::unordered_set<std::string> cache_path_set;
    Status rest = doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths);
    if (!rest) {
        throw Exception(
                Status::FatalError("parse config file cache path failed, path={}, reason={}",
                                   doris::config::file_cache_path, rest.msg()));
    }

    doris::Status cache_status;
    for (auto& cache_path : cache_paths) {
        // Skip duplicate paths so the same directory is never initialized twice.
        if (cache_path_set.find(cache_path.path) != cache_path_set.end()) {
            LOG(WARNING) << fmt::format("cache path {} is duplicate", cache_path.path);
            continue;
        }

        cache_status = doris::io::FileCacheFactory::instance()->create_file_cache(
                cache_path.path, cache_path.init_settings());
        if (!cache_status.ok()) {
            // Without ignore_broken_disk a broken cache path is fatal; with it,
            // the path is logged, skipped, and startup continues.
            if (!doris::config::ignore_broken_disk) {
                throw Exception(
                        Status::FatalError("failed to init file cache, err: {}", cache_status));
            }
            LOG(WARNING) << "failed to init file cache, err: " << cache_status;
        }
        cache_path_set.emplace(cache_path.path);
    }
}
529 | | |
530 | 7 | Status ExecEnv::init_mem_env() { |
531 | 7 | bool is_percent = false; |
532 | 7 | std::stringstream ss; |
533 | | // 1. init mem tracker |
534 | 7 | _process_profile = ProcessProfile::create_global_instance(); |
535 | 7 | _heap_profiler = HeapProfiler::create_global_instance(); |
536 | 7 | init_mem_tracker(); |
537 | 7 | thread_context()->thread_mem_tracker_mgr->init(); |
538 | | |
539 | 7 | if (!BitUtil::IsPowerOf2(config::min_buffer_size)) { |
540 | 0 | ss << "Config min_buffer_size must be a power-of-two: " << config::min_buffer_size; |
541 | 0 | return Status::InternalError(ss.str()); |
542 | 0 | } |
543 | | |
544 | 7 | _id_manager = new IdManager(); |
545 | 7 | _cache_manager = CacheManager::create_global_instance(); |
546 | | |
547 | 7 | int64_t storage_cache_limit = |
548 | 7 | ParseUtil::parse_mem_spec(config::storage_page_cache_limit, MemInfo::mem_limit(), |
549 | 7 | MemInfo::physical_mem(), &is_percent); |
550 | 7 | while (!is_percent && storage_cache_limit > MemInfo::mem_limit() / 2) { |
551 | 0 | storage_cache_limit = storage_cache_limit / 2; |
552 | 0 | } |
553 | 7 | int32_t index_percentage = config::index_page_cache_percentage; |
554 | 7 | int32_t num_shards = config::storage_page_cache_shard_size; |
555 | 7 | if ((num_shards & (num_shards - 1)) != 0) { |
556 | 0 | int old_num_shards = num_shards; |
557 | 0 | num_shards = cast_set<int>(BitUtil::RoundUpToPowerOfTwo(num_shards)); |
558 | 0 | LOG(WARNING) << "num_shards should be power of two, but got " << old_num_shards |
559 | 0 | << ". Rounded up to " << num_shards |
560 | 0 | << ". Please modify the 'storage_page_cache_shard_size' parameter in your " |
561 | 0 | "conf file to be a power of two for better performance."; |
562 | 0 | } |
563 | 7 | if (storage_cache_limit < num_shards * 2) { |
564 | 0 | LOG(WARNING) << "storage_cache_limit(" << storage_cache_limit << ") less than num_shards(" |
565 | 0 | << num_shards |
566 | 0 | << ") * 2, cache capacity will be 0, continuing to use " |
567 | 0 | "cache will only have negative effects, will be disabled."; |
568 | 0 | } |
569 | 7 | int64_t pk_storage_page_cache_limit = |
570 | 7 | ParseUtil::parse_mem_spec(config::pk_storage_page_cache_limit, MemInfo::mem_limit(), |
571 | 7 | MemInfo::physical_mem(), &is_percent); |
572 | 7 | while (!is_percent && pk_storage_page_cache_limit > MemInfo::mem_limit() / 2) { |
573 | 0 | pk_storage_page_cache_limit = storage_cache_limit / 2; |
574 | 0 | } |
575 | 7 | _storage_page_cache = StoragePageCache::create_global_cache( |
576 | 7 | storage_cache_limit, index_percentage, pk_storage_page_cache_limit, num_shards); |
577 | 7 | LOG(INFO) << "Storage page cache memory limit: " |
578 | 7 | << PrettyPrinter::print(storage_cache_limit, TUnit::BYTES) |
579 | 7 | << ", origin config value: " << config::storage_page_cache_limit; |
580 | | |
581 | | // Init row cache |
582 | 7 | int64_t row_cache_mem_limit = |
583 | 7 | ParseUtil::parse_mem_spec(config::row_cache_mem_limit, MemInfo::mem_limit(), |
584 | 7 | MemInfo::physical_mem(), &is_percent); |
585 | 7 | while (!is_percent && row_cache_mem_limit > MemInfo::mem_limit() / 2) { |
586 | | // Reason same as buffer_pool_limit |
587 | 0 | row_cache_mem_limit = row_cache_mem_limit / 2; |
588 | 0 | } |
589 | 7 | _row_cache = RowCache::create_global_cache(row_cache_mem_limit); |
590 | 7 | LOG(INFO) << "Row cache memory limit: " |
591 | 7 | << PrettyPrinter::print(row_cache_mem_limit, TUnit::BYTES) |
592 | 7 | << ", origin config value: " << config::row_cache_mem_limit; |
593 | | |
594 | 7 | uint64_t fd_number = config::min_file_descriptor_number; |
595 | 7 | struct rlimit l; |
596 | 7 | int ret = getrlimit(RLIMIT_NOFILE, &l); |
597 | 7 | if (ret != 0) { |
598 | 0 | LOG(WARNING) << "call getrlimit() failed. errno=" << strerror(errno) |
599 | 0 | << ", use default configuration instead."; |
600 | 7 | } else { |
601 | 7 | fd_number = static_cast<uint64_t>(l.rlim_cur); |
602 | 7 | } |
603 | | |
604 | 7 | int64_t segment_cache_capacity = 0; |
605 | 7 | if (config::is_cloud_mode()) { |
606 | | // when in cloud mode, segment cache hold no system FD |
607 | | // thus the FD num limit makes no sense |
608 | | // cloud mode use FDCache to control FD |
609 | 1 | segment_cache_capacity = UINT32_MAX; |
610 | 6 | } else { |
611 | | // SegmentLoader caches segments in rowset granularity. So the size of |
612 | | // opened files will greater than segment_cache_capacity. |
613 | 6 | segment_cache_capacity = config::segment_cache_capacity; |
614 | 6 | int64_t segment_cache_fd_limit = fd_number / 100 * config::segment_cache_fd_percentage; |
615 | 6 | if (segment_cache_capacity < 0 || segment_cache_capacity > segment_cache_fd_limit) { |
616 | 6 | segment_cache_capacity = segment_cache_fd_limit; |
617 | 6 | } |
618 | 6 | } |
619 | | |
620 | 7 | int64_t segment_cache_mem_limit = |
621 | 7 | MemInfo::mem_limit() / 100 * config::segment_cache_memory_percentage; |
622 | | |
623 | 7 | _segment_loader = new SegmentLoader(segment_cache_mem_limit, segment_cache_capacity); |
624 | 7 | LOG(INFO) << "segment_cache_capacity <= fd_number * 1 / 5, fd_number: " << fd_number |
625 | 7 | << " segment_cache_capacity: " << segment_cache_capacity |
626 | 7 | << " min_segment_cache_mem_limit " << segment_cache_mem_limit; |
627 | | |
628 | 7 | _schema_cache = new SchemaCache(config::schema_cache_capacity); |
629 | | |
630 | 7 | size_t block_file_cache_fd_cache_size = |
631 | 7 | std::min((uint64_t)config::file_cache_max_file_reader_cache_size, fd_number / 3); |
632 | 7 | LOG(INFO) << "max file reader cache size is: " << block_file_cache_fd_cache_size |
633 | 7 | << ", resource hard limit is: " << fd_number |
634 | 7 | << ", config file_cache_max_file_reader_cache_size is: " |
635 | 7 | << config::file_cache_max_file_reader_cache_size; |
636 | 7 | config::file_cache_max_file_reader_cache_size = block_file_cache_fd_cache_size; |
637 | | |
638 | 7 | _file_meta_cache = new FileMetaCache(config::max_external_file_meta_cache_num); |
639 | | |
640 | 7 | _lookup_connection_cache = |
641 | 7 | LookupConnectionCache::create_global_instance(config::lookup_connection_cache_capacity); |
642 | | |
643 | | // use memory limit |
644 | 7 | int64_t inverted_index_cache_limit = |
645 | 7 | ParseUtil::parse_mem_spec(config::inverted_index_searcher_cache_limit, |
646 | 7 | MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent); |
647 | 7 | while (!is_percent && inverted_index_cache_limit > MemInfo::mem_limit() / 2) { |
648 | | // Reason same as buffer_pool_limit |
649 | 0 | inverted_index_cache_limit = inverted_index_cache_limit / 2; |
650 | 0 | } |
651 | 7 | _inverted_index_searcher_cache = |
652 | 7 | InvertedIndexSearcherCache::create_global_instance(inverted_index_cache_limit, 256); |
653 | 7 | LOG(INFO) << "Inverted index searcher cache memory limit: " |
654 | 7 | << PrettyPrinter::print(inverted_index_cache_limit, TUnit::BYTES) |
655 | 7 | << ", origin config value: " << config::inverted_index_searcher_cache_limit; |
656 | | |
657 | | // use memory limit |
658 | 7 | int64_t inverted_index_query_cache_limit = |
659 | 7 | ParseUtil::parse_mem_spec(config::inverted_index_query_cache_limit, |
660 | 7 | MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent); |
661 | 7 | while (!is_percent && inverted_index_query_cache_limit > MemInfo::mem_limit() / 2) { |
662 | | // Reason same as buffer_pool_limit |
663 | 0 | inverted_index_query_cache_limit = inverted_index_query_cache_limit / 2; |
664 | 0 | } |
665 | 7 | _inverted_index_query_cache = InvertedIndexQueryCache::create_global_cache( |
666 | 7 | inverted_index_query_cache_limit, config::inverted_index_query_cache_shards); |
667 | 7 | LOG(INFO) << "Inverted index query match cache memory limit: " |
668 | 7 | << PrettyPrinter::print(inverted_index_query_cache_limit, TUnit::BYTES) |
669 | 7 | << ", origin config value: " << config::inverted_index_query_cache_limit; |
670 | | |
671 | | // use memory limit |
672 | 7 | int64_t condition_cache_limit = config::condition_cache_limit * 1024L * 1024L; |
673 | 7 | _condition_cache = ConditionCache::create_global_cache(condition_cache_limit); |
674 | 7 | LOG(INFO) << "Condition cache memory limit: " |
675 | 7 | << PrettyPrinter::print(condition_cache_limit, TUnit::BYTES) |
676 | 7 | << ", origin config value: " << config::condition_cache_limit; |
677 | | |
678 | | // Initialize encoding info resolver |
679 | 7 | _encoding_info_resolver = new segment_v2::EncodingInfoResolver(); |
680 | | |
681 | | // init orc memory pool |
682 | 7 | _orc_memory_pool = new doris::ORCMemoryPool(); |
683 | 7 | _arrow_memory_pool = new doris::ArrowMemoryPool(); |
684 | | |
685 | 7 | _query_cache = QueryCache::create_global_cache(config::query_cache_size * 1024L * 1024L); |
686 | 7 | LOG(INFO) << "query cache memory limit: " << config::query_cache_size << "MB"; |
687 | | |
688 | | // The default delete bitmap cache is set to 100MB, |
689 | | // which can be insufficient and cause performance issues when the amount of user data is large. |
690 | | // To mitigate the problem of an inadequate cache, |
691 | | // we will take the larger of 0.5% of the total memory and 100MB as the delete bitmap cache size. |
692 | 7 | int64_t delete_bitmap_agg_cache_cache_limit = |
693 | 7 | ParseUtil::parse_mem_spec(config::delete_bitmap_dynamic_agg_cache_limit, |
694 | 7 | MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent); |
695 | 7 | _delete_bitmap_agg_cache = DeleteBitmapAggCache::create_instance(std::max( |
696 | 7 | delete_bitmap_agg_cache_cache_limit, config::delete_bitmap_agg_cache_capacity)); |
697 | | |
698 | 7 | return Status::OK(); |
699 | 7 | } |
700 | | |
701 | 8 | void ExecEnv::init_mem_tracker() { |
702 | 8 | mem_tracker_limiter_pool.resize(MEM_TRACKER_GROUP_NUM, |
703 | 8 | TrackerLimiterGroup()); // before all mem tracker init. |
704 | 8 | _s_tracking_memory = true; |
705 | 8 | _orphan_mem_tracker = |
706 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "Orphan"); |
707 | 8 | _brpc_iobuf_block_memory_tracker = |
708 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "IOBufBlockMemory"); |
709 | 8 | _segcompaction_mem_tracker = |
710 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::COMPACTION, "SegCompaction"); |
711 | 8 | _tablets_no_cache_mem_tracker = MemTrackerLimiter::create_shared( |
712 | 8 | MemTrackerLimiter::Type::METADATA, "Tablets(not in SchemaCache, TabletSchemaCache)"); |
713 | 8 | _segments_no_cache_mem_tracker = MemTrackerLimiter::create_shared( |
714 | 8 | MemTrackerLimiter::Type::METADATA, "Segments(not in SegmentCache)"); |
715 | 8 | _rowsets_no_cache_mem_tracker = |
716 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::METADATA, "Rowsets"); |
717 | 8 | _point_query_executor_mem_tracker = |
718 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "PointQueryExecutor"); |
719 | 8 | _query_cache_mem_tracker = |
720 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::CACHE, "QueryCache"); |
721 | 8 | _block_compression_mem_tracker = |
722 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "BlockCompression"); |
723 | 8 | _rowid_storage_reader_tracker = |
724 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "RowIdStorageReader"); |
725 | 8 | _subcolumns_tree_tracker = |
726 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SubcolumnsTree"); |
727 | 8 | _s3_file_buffer_tracker = |
728 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "S3FileBuffer"); |
729 | 8 | _stream_load_pipe_tracker = |
730 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD, "StreamLoadPipe"); |
731 | 8 | _parquet_meta_tracker = |
732 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::METADATA, "ParquetMeta"); |
733 | 8 | } |
734 | | |
735 | 7 | Status ExecEnv::_check_deploy_mode() { |
736 | 11 | for (auto _path : _store_paths) { |
737 | 11 | auto deploy_mode_path = fmt::format("{}/{}", _path.path, DEPLOY_MODE_PREFIX); |
738 | 11 | std::string expected_mode = doris::config::is_cloud_mode() ? "cloud" : "local"; |
739 | 11 | bool exists = false; |
740 | 11 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(deploy_mode_path, &exists)); |
741 | 11 | if (exists) { |
742 | | // check if is ok |
743 | 10 | io::FileReaderSPtr reader; |
744 | 10 | RETURN_IF_ERROR(io::global_local_filesystem()->open_file(deploy_mode_path, &reader)); |
745 | 10 | size_t fsize = reader->size(); |
746 | 10 | if (fsize > 0) { |
747 | 10 | std::string actual_mode; |
748 | 10 | actual_mode.resize(fsize, '\0'); |
749 | 10 | size_t bytes_read = 0; |
750 | 10 | RETURN_IF_ERROR(reader->read_at(0, {actual_mode.data(), fsize}, &bytes_read)); |
751 | 10 | DCHECK_EQ(fsize, bytes_read); |
752 | 10 | if (expected_mode != actual_mode) { |
753 | 0 | return Status::InternalError( |
754 | 0 | "You can't switch deploy mode from {} to {}, " |
755 | 0 | "maybe you need to check be.conf\n", |
756 | 0 | actual_mode.c_str(), expected_mode.c_str()); |
757 | 0 | } |
758 | 10 | LOG(INFO) << "The current deployment mode is " << expected_mode << "."; |
759 | 10 | } |
760 | 10 | } else { |
761 | 1 | io::FileWriterPtr file_writer; |
762 | 1 | RETURN_IF_ERROR( |
763 | 1 | io::global_local_filesystem()->create_file(deploy_mode_path, &file_writer)); |
764 | 1 | RETURN_IF_ERROR(file_writer->append(expected_mode)); |
765 | 1 | RETURN_IF_ERROR(file_writer->close()); |
766 | 1 | LOG(INFO) << "The file deploy_mode doesn't exist, create it."; |
767 | 1 | auto cluster_id_path = fmt::format("{}/{}", _path.path, CLUSTER_ID_PREFIX); |
768 | 1 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(cluster_id_path, &exists)); |
769 | 1 | if (exists) { |
770 | 0 | LOG(WARNING) << "This may be an upgrade from old version," |
771 | 0 | << "or the deploy_mode file has been manually deleted"; |
772 | 0 | } |
773 | 1 | } |
774 | 11 | } |
775 | 7 | return Status::OK(); |
776 | 7 | } |
777 | | |
778 | | #ifdef BE_TEST |
779 | | void ExecEnv::set_new_load_stream_mgr(std::unique_ptr<NewLoadStreamMgr>&& new_load_stream_mgr) { |
780 | | this->_new_load_stream_mgr = std::move(new_load_stream_mgr); |
781 | | } |
782 | | |
783 | | void ExecEnv::clear_new_load_stream_mgr() { |
784 | | this->_new_load_stream_mgr.reset(); |
785 | | } |
786 | | |
787 | | void ExecEnv::set_stream_load_executor(std::unique_ptr<StreamLoadExecutor>&& stream_load_executor) { |
788 | | this->_stream_load_executor = std::move(stream_load_executor); |
789 | | } |
790 | | |
791 | | void ExecEnv::clear_stream_load_executor() { |
792 | | this->_stream_load_executor.reset(); |
793 | | } |
794 | | |
795 | | void ExecEnv::set_wal_mgr(std::unique_ptr<WalManager>&& wm) { |
796 | | this->_wal_manager = std::move(wm); |
797 | | } |
798 | | void ExecEnv::clear_wal_mgr() { |
799 | | this->_wal_manager.reset(); |
800 | | } |
801 | | #endif |
802 | | // TODO(zhiqiang): Need to refactor all thread pools. Each thread pool must have a Stop method. |
803 | | // We need to stop all threads before releasing resources. |
804 | 16 | void ExecEnv::destroy() { |
805 | | //Only destroy once after init |
806 | 16 | if (!ready()) { |
807 | 9 | return; |
808 | 9 | } |
809 | | // Memory barrier to prevent other threads from accessing destructed resources |
810 | 7 | _s_ready = false; |
811 | | |
812 | 7 | SAFE_STOP(_wal_manager); |
813 | 7 | _wal_manager.reset(); |
814 | 7 | SAFE_STOP(_load_channel_mgr); |
815 | 7 | SAFE_STOP(_broker_mgr); |
816 | 7 | SAFE_STOP(_load_path_mgr); |
817 | 7 | SAFE_STOP(_result_mgr); |
818 | 7 | SAFE_STOP(_group_commit_mgr); |
819 | | // _routine_load_task_executor should be stopped before _new_load_stream_mgr. |
820 | 7 | SAFE_STOP(_routine_load_task_executor); |
821 | 7 | SAFE_STOP(_stream_load_recorder_manager); |
822 | | // stop workload scheduler |
823 | 7 | SAFE_STOP(_workload_sched_mgr); |
824 | | // stop pipline step 2, cgroup execution |
825 | 7 | SAFE_STOP(_workload_group_manager); |
826 | | |
827 | 7 | SAFE_STOP(_external_scan_context_mgr); |
828 | 7 | SAFE_STOP(_fragment_mgr); |
829 | 7 | SAFE_STOP(_runtime_filter_timer_queue); |
830 | | // NewLoadStreamMgr should be destoried before storage_engine & after fragment_mgr stopped. |
831 | 7 | _load_stream_mgr.reset(); |
832 | 7 | _new_load_stream_mgr.reset(); |
833 | 7 | _stream_load_executor.reset(); |
834 | 7 | _memtable_memory_limiter.reset(); |
835 | 7 | _delta_writer_v2_pool.reset(); |
836 | 7 | _load_stream_map_pool.reset(); |
837 | 7 | SAFE_STOP(_write_cooldown_meta_executors); |
838 | | |
839 | | // _id_manager must be destoried before tablet schema cache |
840 | 7 | SAFE_DELETE(_id_manager); |
841 | | |
842 | | // Stop cluster info background worker before storage engine is destroyed |
843 | 7 | if (config::is_cloud_mode() && _cluster_info) { |
844 | 0 | static_cast<CloudClusterInfo*>(_cluster_info)->stop_bg_worker(); |
845 | 0 | } |
846 | | |
847 | | // StorageEngine must be destoried before _cache_manager destory |
848 | 7 | SAFE_STOP(_storage_engine); |
849 | 7 | _storage_engine.reset(); |
850 | | |
851 | 7 | SAFE_STOP(_spill_stream_mgr); |
852 | 7 | if (_runtime_query_statistics_mgr) { |
853 | 3 | _runtime_query_statistics_mgr->stop_report_thread(); |
854 | 3 | } |
855 | 7 | SAFE_SHUTDOWN(_buffered_reader_prefetch_thread_pool); |
856 | 7 | SAFE_SHUTDOWN(_segment_prefetch_thread_pool); |
857 | 7 | SAFE_SHUTDOWN(_s3_file_upload_thread_pool); |
858 | 7 | SAFE_SHUTDOWN(_lazy_release_obj_pool); |
859 | 7 | SAFE_SHUTDOWN(_non_block_close_thread_pool); |
860 | 7 | SAFE_SHUTDOWN(_s3_file_system_thread_pool); |
861 | 7 | SAFE_SHUTDOWN(_send_batch_thread_pool); |
862 | 7 | SAFE_SHUTDOWN(_udf_close_workers_thread_pool); |
863 | 7 | SAFE_SHUTDOWN(_send_table_stats_thread_pool); |
864 | | |
865 | 7 | SAFE_DELETE(_load_channel_mgr); |
866 | | |
867 | 7 | SAFE_DELETE(_inverted_index_query_cache); |
868 | 7 | SAFE_DELETE(_inverted_index_searcher_cache); |
869 | 7 | SAFE_DELETE(_condition_cache); |
870 | 7 | SAFE_DELETE(_encoding_info_resolver); |
871 | 7 | SAFE_DELETE(_lookup_connection_cache); |
872 | 7 | SAFE_DELETE(_schema_cache); |
873 | 7 | SAFE_DELETE(_segment_loader); |
874 | 7 | SAFE_DELETE(_row_cache); |
875 | 7 | SAFE_DELETE(_query_cache); |
876 | 7 | SAFE_DELETE(_delete_bitmap_agg_cache); |
877 | | |
878 | | // Free resource after threads are stopped. |
879 | | // Some threads are still running, like threads created by _new_load_stream_mgr ... |
880 | 7 | SAFE_DELETE(_tablet_schema_cache); |
881 | 7 | SAFE_DELETE(_tablet_column_object_pool); |
882 | | |
883 | | // _storage_page_cache must be destoried before _cache_manager |
884 | 7 | SAFE_DELETE(_storage_page_cache); |
885 | | |
886 | 7 | SAFE_DELETE(_small_file_mgr); |
887 | 7 | SAFE_DELETE(_broker_mgr); |
888 | 7 | SAFE_DELETE(_load_path_mgr); |
889 | 7 | SAFE_DELETE(_result_mgr); |
890 | 7 | SAFE_DELETE(_file_meta_cache); |
891 | 7 | SAFE_DELETE(_group_commit_mgr); |
892 | 7 | SAFE_DELETE(_cdc_client_mgr); |
893 | 7 | SAFE_DELETE(_routine_load_task_executor); |
894 | 7 | SAFE_DELETE(_stream_load_recorder_manager); |
895 | | // _stream_load_executor |
896 | 7 | SAFE_DELETE(_function_client_cache); |
897 | 7 | SAFE_DELETE(_streaming_client_cache); |
898 | 7 | SAFE_DELETE(_internal_client_cache); |
899 | | |
900 | 7 | SAFE_DELETE(_bfd_parser); |
901 | 7 | SAFE_DELETE(_result_cache); |
902 | 7 | SAFE_DELETE(_vstream_mgr); |
903 | | // When _vstream_mgr is deconstructed, it will try call query context's dctor and will |
904 | | // access spill stream mgr, so spill stream mgr should be deconstructed after data stream manager |
905 | 7 | SAFE_DELETE(_spill_stream_mgr); |
906 | 7 | SAFE_DELETE(_fragment_mgr); |
907 | 7 | SAFE_DELETE(_workload_sched_mgr); |
908 | 7 | SAFE_DELETE(_workload_group_manager); |
909 | 7 | SAFE_DELETE(_file_cache_factory); |
910 | 7 | SAFE_DELETE(_runtime_filter_timer_queue); |
911 | 7 | SAFE_DELETE(_dict_factory); |
912 | | // TODO(zhiqiang): Maybe we should call shutdown before release thread pool? |
913 | 7 | _lazy_release_obj_pool.reset(nullptr); |
914 | 7 | _non_block_close_thread_pool.reset(nullptr); |
915 | 7 | _s3_file_system_thread_pool.reset(nullptr); |
916 | 7 | _send_table_stats_thread_pool.reset(nullptr); |
917 | 7 | _buffered_reader_prefetch_thread_pool.reset(nullptr); |
918 | 7 | _segment_prefetch_thread_pool.reset(nullptr); |
919 | 7 | _s3_file_upload_thread_pool.reset(nullptr); |
920 | 7 | _send_batch_thread_pool.reset(nullptr); |
921 | 7 | _udf_close_workers_thread_pool.reset(nullptr); |
922 | 7 | _write_cooldown_meta_executors.reset(nullptr); |
923 | | |
924 | 7 | SAFE_DELETE(_broker_client_cache); |
925 | 7 | SAFE_DELETE(_frontend_client_cache); |
926 | 7 | SAFE_DELETE(_backend_client_cache); |
927 | 7 | SAFE_DELETE(_result_queue_mgr); |
928 | | |
929 | 7 | SAFE_DELETE(_external_scan_context_mgr); |
930 | 7 | SAFE_DELETE(_user_function_cache); |
931 | | |
932 | | // cache_manager must be destoried after all cache. |
933 | | // https://github.com/apache/doris/issues/24082#issuecomment-1712544039 |
934 | 7 | SAFE_DELETE(_cache_manager); |
935 | 7 | _file_cache_open_fd_cache.reset(nullptr); |
936 | | |
937 | | // _heartbeat_flags must be destoried after staroge engine |
938 | 7 | SAFE_DELETE(_heartbeat_flags); |
939 | | |
940 | | // Master Info is a thrift object, it could be the last one to deconstruct. |
941 | | // Master info should be deconstruct later than fragment manager, because fragment will |
942 | | // access cluster_info.backend_id to access some info. If there is a running query and master |
943 | | // info is deconstructed then BE process will core at coordinator back method in fragment mgr. |
944 | 7 | SAFE_DELETE(_cluster_info); |
945 | | |
946 | | // NOTE: runtime query statistics mgr could be visited by query and daemon thread |
947 | | // so it should be created before all query begin and deleted after all query and daemon thread stoppped |
948 | 7 | SAFE_DELETE(_runtime_query_statistics_mgr); |
949 | | |
950 | 7 | SAFE_DELETE(_arrow_memory_pool); |
951 | | |
952 | 7 | SAFE_DELETE(_orc_memory_pool); |
953 | | |
954 | | // dns cache is a global instance and need to be released at last |
955 | 7 | SAFE_DELETE(_dns_cache); |
956 | 7 | SAFE_DELETE(_kerberos_ticket_mgr); |
957 | 7 | SAFE_DELETE(_hdfs_mgr); |
958 | | // PackedFileManager is a singleton, just stop its background thread |
959 | 7 | if (_packed_file_manager) { |
960 | 3 | _packed_file_manager->stop_background_manager(); |
961 | 3 | _packed_file_manager = nullptr; |
962 | 3 | } |
963 | | |
964 | 7 | SAFE_DELETE(_process_profile); |
965 | 7 | SAFE_DELETE(_heap_profiler); |
966 | | |
967 | 7 | SAFE_DELETE(_index_policy_mgr); |
968 | 7 | SAFE_DELETE(_warmup_download_rate_limiter); |
969 | | |
970 | 7 | _s_tracking_memory = false; |
971 | | |
972 | 7 | clear_storage_resource(); |
973 | 7 | PythonServerManager::instance().shutdown(); |
974 | | LOG(INFO) << "Doris exec envorinment is destoried."; |
975 | 7 | } |
976 | | |
977 | | } // namespace doris |
978 | | |
979 | | namespace doris::config { |
980 | | // Callback to update warmup download rate limiter when config changes is registered |
981 | | DEFINE_ON_UPDATE(file_cache_warmup_download_rate_limit_bytes_per_second, |
982 | | [](int64_t old_val, int64_t new_val) { |
983 | | auto* rate_limiter = ExecEnv::GetInstance()->warmup_download_rate_limiter(); |
984 | | if (rate_limiter != nullptr && new_val != old_val) { |
985 | | // Reset rate limiter with new rate limit value |
986 | | // When new_val <= 0, pass 0 to disable rate limiting |
987 | | int64_t rate = new_val > 0 ? new_val : 0; |
988 | | rate_limiter->reset(rate, rate, 0); |
989 | | if (rate > 0) { |
990 | | LOG(INFO) << "Warmup download rate limiter updated from " << old_val |
991 | | << " to " << new_val << " bytes/s"; |
992 | | } else { |
993 | | LOG(INFO) << "Warmup download rate limiter disabled"; |
994 | | } |
995 | | } |
996 | | }); |
997 | | } // namespace doris::config |