be/src/runtime/exec_env_init.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | // IWYU pragma: no_include <bthread/errno.h> |
19 | | #include <gen_cpp/HeartbeatService_types.h> |
20 | | #include <gen_cpp/Metrics_types.h> |
21 | | #include <simdjson.h> |
22 | | #include <sys/resource.h> |
23 | | |
24 | | #include <cerrno> // IWYU pragma: keep |
25 | | #include <cstdint> |
26 | | #include <cstdlib> |
27 | | #include <cstring> |
28 | | #include <limits> |
29 | | #include <memory> |
30 | | #include <ostream> |
31 | | #include <string> |
32 | | #include <vector> |
33 | | |
34 | | #include "cloud/cloud_cluster_info.h" |
35 | | #include "cloud/cloud_storage_engine.h" |
36 | | #include "cloud/cloud_stream_load_executor.h" |
37 | | #include "cloud/cloud_tablet_hotspot.h" |
38 | | #include "cloud/cloud_warm_up_manager.h" |
39 | | #include "cloud/config.h" |
40 | | #include "common/cast_set.h" |
41 | | #include "common/config.h" |
42 | | #include "common/kerberos/kerberos_ticket_mgr.h" |
43 | | #include "common/logging.h" |
44 | | #include "common/metrics/doris_metrics.h" |
45 | | #include "common/multi_version.h" |
46 | | #include "common/status.h" |
47 | | #include "cpp/s3_rate_limiter.h" |
48 | | #include "exec/exchange/vdata_stream_mgr.h" |
49 | | #include "exec/pipeline/pipeline_tracing.h" |
50 | | #include "exec/pipeline/task_queue.h" |
51 | | #include "exec/pipeline/task_scheduler.h" |
52 | | #include "exec/scan/scanner_scheduler.h" |
53 | | #include "exec/sink/delta_writer_v2_pool.h" |
54 | | #include "exec/sink/load_stream_map_pool.h" |
55 | | #include "exec/spill/spill_file_manager.h" |
56 | | #include "exprs/function/dictionary_factory.h" |
57 | | #include "format/orc/orc_memory_pool.h" |
58 | | #include "format/parquet/arrow_memory_pool.h" |
59 | | #include "io/cache/block_file_cache_downloader.h" |
60 | | #include "io/cache/block_file_cache_factory.h" |
61 | | #include "io/cache/fs_file_cache_storage.h" |
62 | | #include "io/fs/file_meta_cache.h" |
63 | | #include "io/fs/local_file_reader.h" |
64 | | #include "load/channel/load_channel_mgr.h" |
65 | | #include "load/channel/load_stream_mgr.h" |
66 | | #include "load/group_commit/group_commit_mgr.h" |
67 | | #include "load/group_commit/wal/wal_manager.h" |
68 | | #include "load/load_path_mgr.h" |
69 | | #include "load/memtable/memtable_memory_limiter.h" |
70 | | #include "load/routine_load/routine_load_task_executor.h" |
71 | | #include "load/stream_load/new_load_stream_mgr.h" |
72 | | #include "load/stream_load/stream_load_executor.h" |
73 | | #include "load/stream_load/stream_load_recorder_manager.h" |
74 | | #include "runtime/broker_mgr.h" |
75 | | #include "runtime/cache/result_cache.h" |
76 | | #include "runtime/cdc_client_mgr.h" |
77 | | #include "runtime/exec_env.h" |
78 | | #include "runtime/external_scan_context_mgr.h" |
79 | | #include "runtime/fragment_mgr.h" |
80 | | #include "runtime/heartbeat_flags.h" |
81 | | #include "runtime/index_policy/index_policy_mgr.h" |
82 | | #include "runtime/memory/cache_manager.h" |
83 | | #include "runtime/memory/heap_profiler.h" |
84 | | #include "runtime/memory/mem_tracker.h" |
85 | | #include "runtime/memory/mem_tracker_limiter.h" |
86 | | #include "runtime/memory/thread_mem_tracker_mgr.h" |
87 | | #include "runtime/process_profile.h" |
88 | | #include "runtime/query_cache/query_cache.h" |
89 | | #include "runtime/result_buffer_mgr.h" |
90 | | #include "runtime/result_queue_mgr.h" |
91 | | #include "runtime/runtime_query_statistics_mgr.h" |
92 | | #include "runtime/small_file_mgr.h" |
93 | | #include "runtime/thread_context.h" |
94 | | #include "runtime/user_function_cache.h" |
95 | | #include "runtime/workload_group/workload_group_manager.h" |
96 | | #include "runtime/workload_management/workload_sched_policy_mgr.h" |
97 | | #include "service/backend_options.h" |
98 | | #include "service/backend_service.h" |
99 | | #include "service/point_query_executor.h" |
100 | | #include "storage/cache/page_cache.h" |
101 | | #include "storage/cache/schema_cache.h" |
102 | | #include "storage/id_manager.h" |
103 | | #include "storage/index/inverted/inverted_index_cache.h" |
104 | | #include "storage/olap_define.h" |
105 | | #include "storage/options.h" |
106 | | #include "storage/segment/condition_cache.h" |
107 | | #include "storage/segment/encoding_info.h" |
108 | | #include "storage/segment/segment_loader.h" |
109 | | #include "storage/storage_engine.h" |
110 | | #include "storage/storage_policy.h" |
111 | | #include "storage/tablet/tablet_column_object_pool.h" |
112 | | #include "storage/tablet/tablet_meta.h" |
113 | | #include "storage/tablet/tablet_schema_cache.h" |
114 | | #include "udf/python/python_server.h" |
115 | | #include "util/bfd_parser.h" |
116 | | #include "util/bit_util.h" |
117 | | #include "util/brpc_client_cache.h" |
118 | | #include "util/client_cache.h" |
119 | | #include "util/cpu_info.h" |
120 | | #include "util/disk_info.h" |
121 | | #include "util/dns_cache.h" |
122 | | #include "util/mem_info.h" |
123 | | #include "util/parse_util.h" |
124 | | #include "util/pretty_printer.h" |
125 | | #include "util/threadpool.h" |
126 | | #include "util/thrift_rpc_helper.h" |
127 | | #include "util/timezone_utils.h" |
128 | | |
129 | | // clang-format off |
130 | | // this must after util/brpc_client_cache.h |
131 | | // /doris/thirdparty/installed/include/brpc/errno.pb.h:69:3: error: expected identifier |
132 | | // EINTERNAL = 2001, |
133 | | // ^ |
134 | | // /doris/thirdparty/installed/include/hadoop_hdfs/hdfs.h:61:19: note: expanded from macro 'EINTERNAL' |
135 | | // #define EINTERNAL 255 |
136 | | #include "io/fs/hdfs/hdfs_mgr.h" |
137 | | #include "io/fs/packed_file_manager.h" |
138 | | // clang-format on |
139 | | |
140 | | namespace doris { |
141 | | |
142 | | #include "common/compile_check_begin.h" |
// Forward declarations for the brpc service stubs used by the BrpcClientCache
// instantiations below; full definitions live in generated proto headers.
class PBackendService_Stub;
class PFunctionService_Stub;

// Warmup download rate limiter metrics
// Exported bvar counters fed by the rate-limit callback installed in
// ExecEnv::_init(): the latency recorder receives the wait divided by 1000
// (i.e. microseconds), the Adder accumulates the raw wait in nanoseconds.
bvar::LatencyRecorder warmup_download_rate_limit_latency("warmup_download_rate_limit_latency");
bvar::Adder<int64_t> warmup_download_rate_limit_ns("warmup_download_rate_limit_ns");
bvar::Adder<int64_t> warmup_download_rate_limit_exceed_req_num(
        "warmup_download_rate_limit_exceed_req_num");
151 | | |
152 | 7 | static void init_doris_metrics(const std::vector<StorePath>& store_paths) { |
153 | 7 | bool init_system_metrics = config::enable_system_metrics; |
154 | 7 | std::set<std::string> disk_devices; |
155 | 7 | std::vector<std::string> network_interfaces; |
156 | 7 | std::vector<std::string> paths; |
157 | 11 | for (const auto& store_path : store_paths) { |
158 | 11 | paths.emplace_back(store_path.path); |
159 | 11 | } |
160 | 7 | if (init_system_metrics) { |
161 | 3 | auto st = DiskInfo::get_disk_devices(paths, &disk_devices); |
162 | 3 | if (!st.ok()) { |
163 | 0 | LOG(WARNING) << "get disk devices failed, status=" << st; |
164 | 0 | return; |
165 | 0 | } |
166 | 3 | st = get_inet_interfaces(&network_interfaces, BackendOptions::is_bind_ipv6()); |
167 | 3 | if (!st.ok()) { |
168 | 0 | LOG(WARNING) << "get inet interfaces failed, status=" << st; |
169 | 0 | return; |
170 | 0 | } |
171 | 3 | } |
172 | 7 | DorisMetrics::instance()->initialize(init_system_metrics, disk_devices, network_interfaces); |
173 | 7 | } |
174 | | |
175 | | // Used to calculate the num of min thread and max thread based on the passed config |
176 | 14 | static std::pair<size_t, size_t> get_num_threads(size_t min_num, size_t max_num) { |
177 | 14 | auto num_cores = doris::CpuInfo::num_cores(); |
178 | 14 | min_num = (min_num == 0) ? num_cores : min_num; |
179 | 14 | max_num = (max_num == 0) ? num_cores : max_num; |
180 | 14 | auto factor = max_num / min_num; |
181 | 14 | min_num = std::min(num_cores * factor, min_num); |
182 | 14 | max_num = std::min(min_num * factor, max_num); |
183 | 14 | return {min_num, max_num}; |
184 | 14 | } |
185 | | |
186 | 20.5k | ThreadPool* ExecEnv::non_block_close_thread_pool() { |
187 | 20.5k | return _non_block_close_thread_pool.get(); |
188 | 20.5k | } |
189 | | |
// All heavy setup is deferred to _init(); construction itself does nothing.
ExecEnv::ExecEnv() = default;
191 | | |
ExecEnv::~ExecEnv() {
    // Tear down all managers/pools created in _init(). destroy() is defined
    // elsewhere in this file — presumably idempotent so that an explicit
    // destroy() before destruction is also safe. TODO confirm.
    destroy();
}
195 | | |
// Static entry point used at BE startup: forwards to the instance-level
// _init() of the given environment. _init() is a no-op once ready() is true,
// so repeated calls are harmless.
Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths,
                     const std::vector<StorePath>& spill_store_paths,
                     const std::set<std::string>& broken_paths) {
    return env->_init(store_paths, spill_store_paths, broken_paths);
}
201 | | |
202 | | // pick simdjson implementation based on CPU capabilities |
203 | 7 | inline void init_simdjson_parser() { |
204 | | // haswell: AVX2 (2013 Intel Haswell or later, all AMD Zen processors) |
205 | 7 | const auto* haswell_implementation = simdjson::get_available_implementations()["haswell"]; |
206 | 7 | if (!haswell_implementation || !haswell_implementation->supported_by_runtime_system()) { |
207 | | // pick available implementation |
208 | 0 | for (const auto* implementation : simdjson::get_available_implementations()) { |
209 | 0 | if (implementation->supported_by_runtime_system()) { |
210 | 0 | LOG(INFO) << "Using SimdJSON implementation : " << implementation->name() << ": " |
211 | 0 | << implementation->description(); |
212 | 0 | simdjson::get_active_implementation() = implementation; |
213 | 0 | return; |
214 | 0 | } |
215 | 0 | } |
216 | 0 | LOG(WARNING) << "No available SimdJSON implementation found."; |
217 | 7 | } else { |
218 | 7 | LOG(INFO) << "Using SimdJSON Haswell implementation"; |
219 | 7 | } |
220 | 7 | } |
221 | | |
222 | | Status ExecEnv::_init(const std::vector<StorePath>& store_paths, |
223 | | const std::vector<StorePath>& spill_store_paths, |
224 | 7 | const std::set<std::string>& broken_paths) { |
225 | | //Only init once before be destroyed |
226 | 7 | if (ready()) { |
227 | 0 | return Status::OK(); |
228 | 0 | } |
229 | 7 | std::unordered_map<std::string, std::unique_ptr<SpillDataDir>> spill_store_map; |
230 | 11 | for (const auto& spill_path : spill_store_paths) { |
231 | 11 | spill_store_map.emplace(spill_path.path, std::make_unique<SpillDataDir>( |
232 | 11 | spill_path.path, spill_path.capacity_bytes, |
233 | 11 | spill_path.storage_medium)); |
234 | 11 | } |
235 | 7 | init_doris_metrics(store_paths); |
236 | 7 | _store_paths = store_paths; |
237 | 7 | _tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(_store_paths); |
238 | 7 | RETURN_IF_ERROR(_tmp_file_dirs->init()); |
239 | 7 | _user_function_cache = new UserFunctionCache(); |
240 | 7 | static_cast<void>(_user_function_cache->init(doris::config::user_function_dir)); |
241 | 7 | _external_scan_context_mgr = new ExternalScanContextMgr(this); |
242 | 7 | set_stream_mgr(new doris::VDataStreamMgr()); |
243 | 7 | _result_mgr = new ResultBufferMgr(); |
244 | 7 | _result_queue_mgr = new ResultQueueMgr(); |
245 | 7 | _backend_client_cache = new BackendServiceClientCache(config::max_client_cache_size_per_host); |
246 | 7 | _frontend_client_cache = new FrontendServiceClientCache(config::max_client_cache_size_per_host); |
247 | 7 | _broker_client_cache = new BrokerServiceClientCache(config::max_client_cache_size_per_host); |
248 | | |
249 | 7 | TimezoneUtils::load_timezones_to_cache(); |
250 | | |
251 | 7 | static_cast<void>(ThreadPoolBuilder("SendBatchThreadPool") |
252 | 7 | .set_min_threads(config::send_batch_thread_pool_thread_num) |
253 | 7 | .set_max_threads(config::send_batch_thread_pool_thread_num) |
254 | 7 | .set_max_queue_size(config::send_batch_thread_pool_queue_size) |
255 | 7 | .build(&_send_batch_thread_pool)); |
256 | | |
257 | 7 | static_cast<void>(ThreadPoolBuilder("UDFCloseWorkers") |
258 | 7 | .set_min_threads(4) |
259 | 7 | .set_max_threads(std::min(32, CpuInfo::num_cores())) |
260 | 7 | .build(&_udf_close_workers_thread_pool)); |
261 | | |
262 | 7 | auto [buffered_reader_min_threads, buffered_reader_max_threads] = |
263 | 7 | get_num_threads(config::num_buffered_reader_prefetch_thread_pool_min_thread, |
264 | 7 | config::num_buffered_reader_prefetch_thread_pool_max_thread); |
265 | 7 | static_cast<void>(ThreadPoolBuilder("BufferedReaderPrefetchThreadPool") |
266 | 7 | .set_min_threads(cast_set<int>(buffered_reader_min_threads)) |
267 | 7 | .set_max_threads(cast_set<int>(buffered_reader_max_threads)) |
268 | 7 | .build(&_buffered_reader_prefetch_thread_pool)); |
269 | | |
270 | 7 | static_cast<void>(ThreadPoolBuilder("SegmentPrefetchThreadPool") |
271 | 7 | .set_min_threads(cast_set<int>( |
272 | 7 | config::segment_prefetch_thread_pool_thread_num_min)) |
273 | 7 | .set_max_threads(cast_set<int>( |
274 | 7 | config::segment_prefetch_thread_pool_thread_num_max)) |
275 | 7 | .build(&_segment_prefetch_thread_pool)); |
276 | | |
277 | 7 | static_cast<void>(ThreadPoolBuilder("SendTableStatsThreadPool") |
278 | 7 | .set_min_threads(8) |
279 | 7 | .set_max_threads(32) |
280 | 7 | .build(&_send_table_stats_thread_pool)); |
281 | | |
282 | 7 | auto [s3_file_upload_min_threads, s3_file_upload_max_threads] = |
283 | 7 | get_num_threads(config::num_s3_file_upload_thread_pool_min_thread, |
284 | 7 | config::num_s3_file_upload_thread_pool_max_thread); |
285 | 7 | static_cast<void>(ThreadPoolBuilder("S3FileUploadThreadPool") |
286 | 7 | .set_min_threads(cast_set<int>(s3_file_upload_min_threads)) |
287 | 7 | .set_max_threads(cast_set<int>(s3_file_upload_max_threads)) |
288 | 7 | .build(&_s3_file_upload_thread_pool)); |
289 | | |
290 | | // min num equal to fragment pool's min num |
291 | | // max num is useless because it will start as many as requested in the past |
292 | | // queue size is useless because the max thread num is very large |
293 | 7 | static_cast<void>(ThreadPoolBuilder("LazyReleaseMemoryThreadPool") |
294 | 7 | .set_min_threads(1) |
295 | 7 | .set_max_threads(1) |
296 | 7 | .set_max_queue_size(1000000) |
297 | 7 | .build(&_lazy_release_obj_pool)); |
298 | 7 | static_cast<void>(ThreadPoolBuilder("NonBlockCloseThreadPool") |
299 | 7 | .set_min_threads(cast_set<int>(config::min_nonblock_close_thread_num)) |
300 | 7 | .set_max_threads(cast_set<int>(config::max_nonblock_close_thread_num)) |
301 | 7 | .build(&_non_block_close_thread_pool)); |
302 | 7 | static_cast<void>(ThreadPoolBuilder("S3FileSystemThreadPool") |
303 | 7 | .set_min_threads(config::min_s3_file_system_thread_num) |
304 | 7 | .set_max_threads(config::max_s3_file_system_thread_num) |
305 | 7 | .build(&_s3_file_system_thread_pool)); |
306 | 7 | RETURN_IF_ERROR(init_mem_env()); |
307 | | |
308 | | // NOTE: runtime query statistics mgr could be visited by query and daemon thread |
309 | | // so it should be created before all query begin and deleted after all query and daemon thread stoppped |
310 | 7 | _runtime_query_statistics_mgr = new RuntimeQueryStatisticsMgr(); |
311 | 7 | CgroupCpuCtl::init_doris_cgroup_path(); |
312 | 7 | _file_cache_open_fd_cache = std::make_unique<io::FDCache>(); |
313 | 7 | _file_cache_factory = new io::FileCacheFactory(); |
314 | 7 | std::vector<doris::CachePath> cache_paths; |
315 | 7 | init_file_cache_factory(cache_paths); |
316 | 7 | doris::io::BeConfDataDirReader::init_be_conf_data_dir(store_paths, spill_store_paths, |
317 | 7 | cache_paths); |
318 | 7 | _pipeline_tracer_ctx = std::make_unique<PipelineTracerContext>(); // before query |
319 | 7 | _init_runtime_filter_timer_queue(); |
320 | | |
321 | 7 | _workload_group_manager = new WorkloadGroupMgr(); |
322 | | |
323 | 7 | _fragment_mgr = new FragmentMgr(this); |
324 | 7 | _result_cache = new ResultCache(config::query_cache_max_size_mb, |
325 | 7 | config::query_cache_elasticity_size_mb); |
326 | 7 | if (config::is_cloud_mode()) { |
327 | 1 | _cluster_info = new CloudClusterInfo(); |
328 | 6 | } else { |
329 | 6 | _cluster_info = new ClusterInfo(); |
330 | 6 | } |
331 | | |
332 | 7 | _load_path_mgr = new LoadPathMgr(this); |
333 | 7 | _bfd_parser = BfdParser::create(); |
334 | 7 | _broker_mgr = new BrokerMgr(this); |
335 | 7 | _load_channel_mgr = new LoadChannelMgr(); |
336 | 7 | auto num_flush_threads = std::min( |
337 | 7 | _store_paths.size() * config::flush_thread_num_per_store, |
338 | 7 | static_cast<size_t>(CpuInfo::num_cores()) * config::max_flush_thread_num_per_cpu); |
339 | 7 | _load_stream_mgr = std::make_unique<LoadStreamMgr>(num_flush_threads); |
340 | 7 | _new_load_stream_mgr = NewLoadStreamMgr::create_unique(); |
341 | 7 | _internal_client_cache = new BrpcClientCache<PBackendService_Stub>(); |
342 | 7 | _streaming_client_cache = |
343 | 7 | new BrpcClientCache<PBackendService_Stub>("baidu_std", "single", "streaming"); |
344 | 7 | _function_client_cache = |
345 | 7 | new BrpcClientCache<PFunctionService_Stub>(config::function_service_protocol); |
346 | 7 | if (config::is_cloud_mode()) { |
347 | 1 | _stream_load_executor = CloudStreamLoadExecutor::create_unique(this); |
348 | 6 | } else { |
349 | 6 | _stream_load_executor = StreamLoadExecutor::create_unique(this); |
350 | 6 | } |
351 | 7 | _routine_load_task_executor = new RoutineLoadTaskExecutor(this); |
352 | 7 | RETURN_IF_ERROR(_routine_load_task_executor->init(MemInfo::mem_limit())); |
353 | 7 | _small_file_mgr = new SmallFileMgr(this, config::small_file_dir); |
354 | 7 | _group_commit_mgr = new GroupCommitMgr(this); |
355 | 7 | _cdc_client_mgr = new CdcClientMgr(); |
356 | 7 | _memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>(); |
357 | 7 | _load_stream_map_pool = std::make_unique<LoadStreamMapPool>(); |
358 | 7 | _delta_writer_v2_pool = std::make_unique<DeltaWriterV2Pool>(); |
359 | 7 | _wal_manager = WalManager::create_unique(this, config::group_commit_wal_path); |
360 | 7 | _dns_cache = new DNSCache(); |
361 | 7 | _write_cooldown_meta_executors = std::make_unique<WriteCooldownMetaExecutors>(); |
362 | | |
363 | 7 | _spill_file_mgr = new SpillFileManager(std::move(spill_store_map)); |
364 | 7 | _kerberos_ticket_mgr = new kerberos::KerberosTicketMgr(config::kerberos_ccache_path); |
365 | 7 | _hdfs_mgr = new io::HdfsMgr(); |
366 | 7 | _backend_client_cache->init_metrics("backend"); |
367 | 7 | _frontend_client_cache->init_metrics("frontend"); |
368 | 7 | _broker_client_cache->init_metrics("broker"); |
369 | 7 | static_cast<void>(_result_mgr->init()); |
370 | 7 | Status status = _load_path_mgr->init(); |
371 | 7 | if (!status.ok()) { |
372 | 0 | LOG(ERROR) << "Load path mgr init failed. " << status; |
373 | 0 | return status; |
374 | 0 | } |
375 | 7 | _broker_mgr->init(); |
376 | 7 | static_cast<void>(_small_file_mgr->init()); |
377 | 7 | if (!status.ok()) { |
378 | 0 | LOG(ERROR) << "Scanner scheduler init failed. " << status; |
379 | 0 | return status; |
380 | 0 | } |
381 | | |
382 | 7 | RETURN_IF_ERROR(_memtable_memory_limiter->init(MemInfo::mem_limit())); |
383 | 7 | RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit())); |
384 | 7 | RETURN_IF_ERROR(_wal_manager->init()); |
385 | 7 | _heartbeat_flags = new HeartbeatFlags(); |
386 | | |
387 | 7 | _tablet_schema_cache = |
388 | 7 | TabletSchemaCache::create_global_schema_cache(config::tablet_schema_cache_capacity); |
389 | | |
390 | 7 | _tablet_column_object_pool = TabletColumnObjectPool::create_global_column_cache( |
391 | 7 | config::tablet_schema_cache_capacity); |
392 | | |
393 | | // Storage engine |
394 | 7 | doris::EngineOptions options; |
395 | 7 | options.store_paths = store_paths; |
396 | 7 | options.broken_paths = broken_paths; |
397 | 7 | options.backend_uid = doris::UniqueId::gen_uid(); |
398 | | // Check if the startup mode has been modified |
399 | 7 | RETURN_IF_ERROR(_check_deploy_mode()); |
400 | 7 | if (config::is_cloud_mode()) { |
401 | 1 | std::cout << "start BE in cloud mode, cloud_unique_id: " << config::cloud_unique_id |
402 | 1 | << ", meta_service_endpoint: " << config::meta_service_endpoint << std::endl; |
403 | 1 | _storage_engine = std::make_unique<CloudStorageEngine>(options); |
404 | 6 | } else { |
405 | 6 | std::cout << "start BE in local mode" << std::endl; |
406 | 6 | _storage_engine = std::make_unique<StorageEngine>(options); |
407 | 6 | } |
408 | 7 | auto st = _storage_engine->open(); |
409 | 7 | if (!st.ok()) { |
410 | 0 | LOG(ERROR) << "Fail to open StorageEngine, res=" << st; |
411 | 0 | return st; |
412 | 0 | } |
413 | 7 | _storage_engine->set_heartbeat_flags(this->heartbeat_flags()); |
414 | 7 | if (st = _storage_engine->start_bg_threads(nullptr); !st.ok()) { |
415 | 0 | LOG(ERROR) << "Failed to starge bg threads of storage engine, res=" << st; |
416 | 0 | return st; |
417 | 0 | } |
418 | | |
419 | | // should start after storage_engine->open() |
420 | 7 | _stream_load_recorder_manager = new StreamLoadRecorderManager(); |
421 | 7 | _stream_load_recorder_manager->start(); |
422 | | |
423 | | // create internal workload group should be after storage_engin->open() |
424 | 7 | RETURN_IF_ERROR(_create_internal_workload_group()); |
425 | 7 | _workload_sched_mgr = new WorkloadSchedPolicyMgr(); |
426 | 7 | _workload_sched_mgr->start(this); |
427 | | |
428 | | // Initialize packed file manager |
429 | 7 | _packed_file_manager = io::PackedFileManager::instance(); |
430 | 7 | if (config::is_cloud_mode()) { |
431 | 1 | RETURN_IF_ERROR(_packed_file_manager->init()); |
432 | 1 | _packed_file_manager->start_background_manager(); |
433 | | |
434 | | // Start cluster info background worker for compaction read-write separation |
435 | 1 | static_cast<CloudClusterInfo*>(_cluster_info)->start_bg_worker(); |
436 | 1 | } |
437 | | |
438 | 7 | _index_policy_mgr = new IndexPolicyMgr(); |
439 | | |
440 | | // Initialize warmup download rate limiter for cloud mode |
441 | | // Always create the rate limiter in cloud mode to support dynamic rate limit changes |
442 | 7 | if (config::is_cloud_mode()) { |
443 | 1 | int64_t rate_limit = config::file_cache_warmup_download_rate_limit_bytes_per_second; |
444 | | // When rate_limit <= 0, pass 0 to disable rate limiting |
445 | 1 | int64_t rate = rate_limit > 0 ? rate_limit : 0; |
446 | | // max_burst is the same as rate (1 second burst) |
447 | | // limit is 0 which means no total limit |
448 | | // When rate is 0, S3RateLimiter will not throttle (no rate limiting) |
449 | 1 | _warmup_download_rate_limiter = new S3RateLimiterHolder(rate, rate, 0, [&](int64_t ns) { |
450 | 0 | if (ns > 0) { |
451 | 0 | warmup_download_rate_limit_latency << ns / 1000; |
452 | 0 | warmup_download_rate_limit_ns << ns; |
453 | 0 | warmup_download_rate_limit_exceed_req_num << 1; |
454 | 0 | } |
455 | 0 | }); |
456 | 1 | } |
457 | | |
458 | 7 | RETURN_IF_ERROR(_spill_file_mgr->init()); |
459 | 7 | RETURN_IF_ERROR(_runtime_query_statistics_mgr->start_report_thread()); |
460 | 7 | _dict_factory = new doris::DictionaryFactory(); |
461 | 7 | _s_ready = true; |
462 | | |
463 | 7 | init_simdjson_parser(); |
464 | | |
465 | | // Make aws-sdk-cpp InitAPI and ShutdownAPI called in the same thread |
466 | 7 | S3ClientFactory::instance(); |
467 | 7 | return Status::OK(); |
468 | 7 | } |
469 | | |
// When the user does not specify a workload group in FE, queries fall back to
// this internal (dummy) workload group. Must run after the storage engine is
// opened (see call site in _init()).
Status ExecEnv::_create_internal_workload_group() {
    LOG(INFO) << "begin create internal workload group.";

    RETURN_IF_ERROR(_workload_group_manager->create_internal_wg());
    return Status::OK();
}
478 | | |
void ExecEnv::_init_runtime_filter_timer_queue() {
    // Raw owning pointer, presumably released in destroy() — TODO confirm.
    _runtime_filter_timer_queue = new doris::RuntimeFilterTimerQueue();
    // run() presumably starts the queue's worker so timers fire immediately
    // after init — verify against RuntimeFilterTimerQueue.
    _runtime_filter_timer_queue->run();
}
483 | | |
// Validate file-cache configuration and create one file cache per configured
// path, appending the parsed paths to `cache_paths` for the caller. Fatal
// misconfiguration aborts the process; a broken cache disk is fatal unless
// config::ignore_broken_disk is set.
void ExecEnv::init_file_cache_factory(std::vector<doris::CachePath>& cache_paths) {
    // Load file cache before starting up daemon threads to make sure StorageEngine is read.
    if (!config::enable_file_cache) {
        if (config::is_cloud_mode()) {
            // Cloud mode cannot run without a local file cache. LOG(FATAL)
            // already aborts; the exit(-1) is an unreachable safety net.
            LOG(FATAL) << "Cloud mode requires to enable file cache, plz set "
                          "config::enable_file_cache "
                          "= true";
            exit(-1);
        }
        return;
    }
    // The S3 write buffer must be a whole multiple of the cache block size so
    // buffered writes map cleanly onto cache blocks.
    if (config::file_cache_each_block_size > config::s3_write_buffer_size ||
        config::s3_write_buffer_size % config::file_cache_each_block_size != 0) {
        LOG_FATAL(
                "The config file_cache_each_block_size {} must less than or equal to config "
                "s3_write_buffer_size {} and config::s3_write_buffer_size % "
                "config::file_cache_each_block_size must be zero",
                config::file_cache_each_block_size, config::s3_write_buffer_size);
        exit(-1);
    }
    std::unordered_set<std::string> cache_path_set;
    Status rest = doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths);
    if (!rest) {
        // Unparseable cache path config is unrecoverable at startup.
        throw Exception(
                Status::FatalError("parse config file cache path failed, path={}, reason={}",
                                   doris::config::file_cache_path, rest.msg()));
    }

    doris::Status cache_status;
    for (auto& cache_path : cache_paths) {
        // Skip duplicate paths so the same directory is not initialized twice.
        if (cache_path_set.find(cache_path.path) != cache_path_set.end()) {
            LOG(WARNING) << fmt::format("cache path {} is duplicate", cache_path.path);
            continue;
        }

        cache_status = doris::io::FileCacheFactory::instance()->create_file_cache(
                cache_path.path, cache_path.init_settings());
        if (!cache_status.ok()) {
            // With ignore_broken_disk the BE keeps running on the remaining
            // cache paths; otherwise a failed cache disk aborts startup.
            if (!doris::config::ignore_broken_disk) {
                throw Exception(
                        Status::FatalError("failed to init file cache, err: {}", cache_status));
            }
            LOG(WARNING) << "failed to init file cache, err: " << cache_status;
        }
        cache_path_set.emplace(cache_path.path);
    }
}
531 | | |
532 | 7 | Status ExecEnv::init_mem_env() { |
533 | 7 | bool is_percent = false; |
534 | 7 | std::stringstream ss; |
535 | | // 1. init mem tracker |
536 | 7 | _process_profile = ProcessProfile::create_global_instance(); |
537 | 7 | _heap_profiler = HeapProfiler::create_global_instance(); |
538 | 7 | init_mem_tracker(); |
539 | 7 | thread_context()->thread_mem_tracker_mgr->init(); |
540 | | |
541 | 7 | if (!BitUtil::IsPowerOf2(config::min_buffer_size)) { |
542 | 0 | ss << "Config min_buffer_size must be a power-of-two: " << config::min_buffer_size; |
543 | 0 | return Status::InternalError(ss.str()); |
544 | 0 | } |
545 | | |
546 | 7 | _id_manager = new IdManager(); |
547 | 7 | _cache_manager = CacheManager::create_global_instance(); |
548 | | |
549 | 7 | int64_t storage_cache_limit = |
550 | 7 | ParseUtil::parse_mem_spec(config::storage_page_cache_limit, MemInfo::mem_limit(), |
551 | 7 | MemInfo::physical_mem(), &is_percent); |
552 | 7 | while (!is_percent && storage_cache_limit > MemInfo::mem_limit() / 2) { |
553 | 0 | storage_cache_limit = storage_cache_limit / 2; |
554 | 0 | } |
555 | 7 | int32_t index_percentage = config::index_page_cache_percentage; |
556 | 7 | int32_t num_shards = config::storage_page_cache_shard_size; |
557 | 7 | if ((num_shards & (num_shards - 1)) != 0) { |
558 | 0 | int old_num_shards = num_shards; |
559 | 0 | num_shards = cast_set<int>(BitUtil::RoundUpToPowerOfTwo(num_shards)); |
560 | 0 | LOG(WARNING) << "num_shards should be power of two, but got " << old_num_shards |
561 | 0 | << ". Rounded up to " << num_shards |
562 | 0 | << ". Please modify the 'storage_page_cache_shard_size' parameter in your " |
563 | 0 | "conf file to be a power of two for better performance."; |
564 | 0 | } |
565 | 7 | if (storage_cache_limit < num_shards * 2) { |
566 | 0 | LOG(WARNING) << "storage_cache_limit(" << storage_cache_limit << ") less than num_shards(" |
567 | 0 | << num_shards |
568 | 0 | << ") * 2, cache capacity will be 0, continuing to use " |
569 | 0 | "cache will only have negative effects, will be disabled."; |
570 | 0 | } |
571 | 7 | int64_t pk_storage_page_cache_limit = |
572 | 7 | ParseUtil::parse_mem_spec(config::pk_storage_page_cache_limit, MemInfo::mem_limit(), |
573 | 7 | MemInfo::physical_mem(), &is_percent); |
574 | 7 | while (!is_percent && pk_storage_page_cache_limit > MemInfo::mem_limit() / 2) { |
575 | 0 | pk_storage_page_cache_limit = storage_cache_limit / 2; |
576 | 0 | } |
577 | 7 | _storage_page_cache = StoragePageCache::create_global_cache( |
578 | 7 | storage_cache_limit, index_percentage, pk_storage_page_cache_limit, num_shards); |
579 | 7 | LOG(INFO) << "Storage page cache memory limit: " |
580 | 7 | << PrettyPrinter::print(storage_cache_limit, TUnit::BYTES) |
581 | 7 | << ", origin config value: " << config::storage_page_cache_limit; |
582 | | |
583 | | // Init row cache |
584 | 7 | int64_t row_cache_mem_limit = |
585 | 7 | ParseUtil::parse_mem_spec(config::row_cache_mem_limit, MemInfo::mem_limit(), |
586 | 7 | MemInfo::physical_mem(), &is_percent); |
587 | 7 | while (!is_percent && row_cache_mem_limit > MemInfo::mem_limit() / 2) { |
588 | | // Reason same as buffer_pool_limit |
589 | 0 | row_cache_mem_limit = row_cache_mem_limit / 2; |
590 | 0 | } |
591 | 7 | _row_cache = RowCache::create_global_cache(row_cache_mem_limit); |
592 | 7 | LOG(INFO) << "Row cache memory limit: " |
593 | 7 | << PrettyPrinter::print(row_cache_mem_limit, TUnit::BYTES) |
594 | 7 | << ", origin config value: " << config::row_cache_mem_limit; |
595 | | |
596 | 7 | uint64_t fd_number = config::min_file_descriptor_number; |
597 | 7 | struct rlimit l; |
598 | 7 | int ret = getrlimit(RLIMIT_NOFILE, &l); |
599 | 7 | if (ret != 0) { |
600 | 0 | LOG(WARNING) << "call getrlimit() failed. errno=" << strerror(errno) |
601 | 0 | << ", use default configuration instead."; |
602 | 7 | } else { |
603 | 7 | fd_number = static_cast<uint64_t>(l.rlim_cur); |
604 | 7 | } |
605 | | |
606 | 7 | int64_t segment_cache_capacity = 0; |
607 | 7 | if (config::is_cloud_mode()) { |
608 | | // when in cloud mode, segment cache hold no system FD |
609 | | // thus the FD num limit makes no sense |
610 | | // cloud mode use FDCache to control FD |
611 | 1 | segment_cache_capacity = UINT32_MAX; |
612 | 6 | } else { |
613 | | // SegmentLoader caches segments in rowset granularity. So the size of |
614 | | // opened files will greater than segment_cache_capacity. |
615 | 6 | segment_cache_capacity = config::segment_cache_capacity; |
616 | 6 | int64_t segment_cache_fd_limit = fd_number / 100 * config::segment_cache_fd_percentage; |
617 | 6 | if (segment_cache_capacity < 0 || segment_cache_capacity > segment_cache_fd_limit) { |
618 | 6 | segment_cache_capacity = segment_cache_fd_limit; |
619 | 6 | } |
620 | 6 | } |
621 | | |
622 | 7 | int64_t segment_cache_mem_limit = |
623 | 7 | MemInfo::mem_limit() / 100 * config::segment_cache_memory_percentage; |
624 | | |
625 | 7 | _segment_loader = new SegmentLoader(segment_cache_mem_limit, segment_cache_capacity); |
626 | 7 | LOG(INFO) << "segment_cache_capacity <= fd_number * 1 / 5, fd_number: " << fd_number |
627 | 7 | << " segment_cache_capacity: " << segment_cache_capacity |
628 | 7 | << " min_segment_cache_mem_limit " << segment_cache_mem_limit; |
629 | | |
630 | 7 | _schema_cache = new SchemaCache(config::schema_cache_capacity); |
631 | | |
632 | 7 | size_t block_file_cache_fd_cache_size = |
633 | 7 | std::min((uint64_t)config::file_cache_max_file_reader_cache_size, fd_number / 3); |
634 | 7 | LOG(INFO) << "max file reader cache size is: " << block_file_cache_fd_cache_size |
635 | 7 | << ", resource hard limit is: " << fd_number |
636 | 7 | << ", config file_cache_max_file_reader_cache_size is: " |
637 | 7 | << config::file_cache_max_file_reader_cache_size; |
638 | 7 | config::file_cache_max_file_reader_cache_size = block_file_cache_fd_cache_size; |
639 | | |
640 | 7 | _file_meta_cache = new FileMetaCache(config::max_external_file_meta_cache_num); |
641 | | |
642 | 7 | _lookup_connection_cache = |
643 | 7 | LookupConnectionCache::create_global_instance(config::lookup_connection_cache_capacity); |
644 | | |
645 | | // use memory limit |
646 | 7 | int64_t inverted_index_cache_limit = |
647 | 7 | ParseUtil::parse_mem_spec(config::inverted_index_searcher_cache_limit, |
648 | 7 | MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent); |
649 | 7 | while (!is_percent && inverted_index_cache_limit > MemInfo::mem_limit() / 2) { |
650 | | // Reason same as buffer_pool_limit |
651 | 0 | inverted_index_cache_limit = inverted_index_cache_limit / 2; |
652 | 0 | } |
653 | 7 | _inverted_index_searcher_cache = |
654 | 7 | InvertedIndexSearcherCache::create_global_instance(inverted_index_cache_limit, 256); |
655 | 7 | LOG(INFO) << "Inverted index searcher cache memory limit: " |
656 | 7 | << PrettyPrinter::print(inverted_index_cache_limit, TUnit::BYTES) |
657 | 7 | << ", origin config value: " << config::inverted_index_searcher_cache_limit; |
658 | | |
659 | | // use memory limit |
660 | 7 | int64_t inverted_index_query_cache_limit = |
661 | 7 | ParseUtil::parse_mem_spec(config::inverted_index_query_cache_limit, |
662 | 7 | MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent); |
663 | 7 | while (!is_percent && inverted_index_query_cache_limit > MemInfo::mem_limit() / 2) { |
664 | | // Reason same as buffer_pool_limit |
665 | 0 | inverted_index_query_cache_limit = inverted_index_query_cache_limit / 2; |
666 | 0 | } |
667 | 7 | _inverted_index_query_cache = InvertedIndexQueryCache::create_global_cache( |
668 | 7 | inverted_index_query_cache_limit, config::inverted_index_query_cache_shards); |
669 | 7 | LOG(INFO) << "Inverted index query match cache memory limit: " |
670 | 7 | << PrettyPrinter::print(inverted_index_query_cache_limit, TUnit::BYTES) |
671 | 7 | << ", origin config value: " << config::inverted_index_query_cache_limit; |
672 | | |
673 | | // use memory limit |
674 | 7 | int64_t condition_cache_limit = config::condition_cache_limit * 1024L * 1024L; |
675 | 7 | _condition_cache = ConditionCache::create_global_cache(condition_cache_limit); |
676 | 7 | LOG(INFO) << "Condition cache memory limit: " |
677 | 7 | << PrettyPrinter::print(condition_cache_limit, TUnit::BYTES) |
678 | 7 | << ", origin config value: " << config::condition_cache_limit; |
679 | | |
680 | | // Initialize encoding info resolver |
681 | 7 | _encoding_info_resolver = new segment_v2::EncodingInfoResolver(); |
682 | | |
683 | | // init orc memory pool |
684 | 7 | _orc_memory_pool = new doris::ORCMemoryPool(); |
685 | 7 | _arrow_memory_pool = new doris::ArrowMemoryPool(); |
686 | | |
687 | 7 | _query_cache = QueryCache::create_global_cache(config::query_cache_size * 1024L * 1024L); |
688 | 7 | LOG(INFO) << "query cache memory limit: " << config::query_cache_size << "MB"; |
689 | | |
690 | | // The default delete bitmap cache is set to 100MB, |
691 | | // which can be insufficient and cause performance issues when the amount of user data is large. |
692 | | // To mitigate the problem of an inadequate cache, |
693 | | // we will take the larger of 0.5% of the total memory and 100MB as the delete bitmap cache size. |
694 | 7 | int64_t delete_bitmap_agg_cache_cache_limit = |
695 | 7 | ParseUtil::parse_mem_spec(config::delete_bitmap_dynamic_agg_cache_limit, |
696 | 7 | MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent); |
697 | 7 | _delete_bitmap_agg_cache = DeleteBitmapAggCache::create_instance(std::max( |
698 | 7 | delete_bitmap_agg_cache_cache_limit, config::delete_bitmap_agg_cache_capacity)); |
699 | | |
700 | 7 | return Status::OK(); |
701 | 7 | } |
702 | | |
703 | 8 | void ExecEnv::init_mem_tracker() { |
704 | 8 | mem_tracker_limiter_pool.resize(MEM_TRACKER_GROUP_NUM, |
705 | 8 | TrackerLimiterGroup()); // before all mem tracker init. |
706 | 8 | _s_tracking_memory = true; |
707 | 8 | _orphan_mem_tracker = |
708 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "Orphan"); |
709 | 8 | _brpc_iobuf_block_memory_tracker = |
710 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "IOBufBlockMemory"); |
711 | 8 | _segcompaction_mem_tracker = |
712 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::COMPACTION, "SegCompaction"); |
713 | 8 | _tablets_no_cache_mem_tracker = MemTrackerLimiter::create_shared( |
714 | 8 | MemTrackerLimiter::Type::METADATA, "Tablets(not in SchemaCache, TabletSchemaCache)"); |
715 | 8 | _segments_no_cache_mem_tracker = MemTrackerLimiter::create_shared( |
716 | 8 | MemTrackerLimiter::Type::METADATA, "Segments(not in SegmentCache)"); |
717 | 8 | _rowsets_no_cache_mem_tracker = |
718 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::METADATA, "Rowsets"); |
719 | 8 | _point_query_executor_mem_tracker = |
720 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "PointQueryExecutor"); |
721 | 8 | _query_cache_mem_tracker = |
722 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::CACHE, "QueryCache"); |
723 | 8 | _block_compression_mem_tracker = |
724 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "BlockCompression"); |
725 | 8 | _rowid_storage_reader_tracker = |
726 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "RowIdStorageReader"); |
727 | 8 | _subcolumns_tree_tracker = |
728 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SubcolumnsTree"); |
729 | 8 | _s3_file_buffer_tracker = |
730 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "S3FileBuffer"); |
731 | 8 | _stream_load_pipe_tracker = |
732 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD, "StreamLoadPipe"); |
733 | 8 | _parquet_meta_tracker = |
734 | 8 | MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::METADATA, "ParquetMeta"); |
735 | 8 | } |
736 | | |
737 | 7 | Status ExecEnv::_check_deploy_mode() { |
738 | 11 | for (auto _path : _store_paths) { |
739 | 11 | auto deploy_mode_path = fmt::format("{}/{}", _path.path, DEPLOY_MODE_PREFIX); |
740 | 11 | std::string expected_mode = doris::config::is_cloud_mode() ? "cloud" : "local"; |
741 | 11 | bool exists = false; |
742 | 11 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(deploy_mode_path, &exists)); |
743 | 11 | if (exists) { |
744 | | // check if is ok |
745 | 8 | io::FileReaderSPtr reader; |
746 | 8 | RETURN_IF_ERROR(io::global_local_filesystem()->open_file(deploy_mode_path, &reader)); |
747 | 8 | size_t fsize = reader->size(); |
748 | 8 | if (fsize > 0) { |
749 | 8 | std::string actual_mode; |
750 | 8 | actual_mode.resize(fsize, '\0'); |
751 | 8 | size_t bytes_read = 0; |
752 | 8 | RETURN_IF_ERROR(reader->read_at(0, {actual_mode.data(), fsize}, &bytes_read)); |
753 | 8 | DCHECK_EQ(fsize, bytes_read); |
754 | 8 | if (expected_mode != actual_mode) { |
755 | 0 | return Status::InternalError( |
756 | 0 | "You can't switch deploy mode from {} to {}, " |
757 | 0 | "maybe you need to check be.conf\n", |
758 | 0 | actual_mode.c_str(), expected_mode.c_str()); |
759 | 0 | } |
760 | 8 | LOG(INFO) << "The current deployment mode is " << expected_mode << "."; |
761 | 8 | } |
762 | 8 | } else { |
763 | 3 | io::FileWriterPtr file_writer; |
764 | 3 | RETURN_IF_ERROR( |
765 | 3 | io::global_local_filesystem()->create_file(deploy_mode_path, &file_writer)); |
766 | 3 | RETURN_IF_ERROR(file_writer->append(expected_mode)); |
767 | 3 | RETURN_IF_ERROR(file_writer->close()); |
768 | 3 | LOG(INFO) << "The file deploy_mode doesn't exist, create it."; |
769 | 3 | auto cluster_id_path = fmt::format("{}/{}", _path.path, CLUSTER_ID_PREFIX); |
770 | 3 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(cluster_id_path, &exists)); |
771 | 3 | if (exists) { |
772 | 0 | LOG(WARNING) << "This may be an upgrade from old version," |
773 | 0 | << "or the deploy_mode file has been manually deleted"; |
774 | 0 | } |
775 | 3 | } |
776 | 11 | } |
777 | 7 | return Status::OK(); |
778 | 7 | } |
779 | | |
780 | | #ifdef BE_TEST |
781 | | void ExecEnv::set_new_load_stream_mgr(std::unique_ptr<NewLoadStreamMgr>&& new_load_stream_mgr) { |
782 | | this->_new_load_stream_mgr = std::move(new_load_stream_mgr); |
783 | | } |
784 | | |
785 | | void ExecEnv::clear_new_load_stream_mgr() { |
786 | | this->_new_load_stream_mgr.reset(); |
787 | | } |
788 | | |
789 | | void ExecEnv::set_stream_load_executor(std::unique_ptr<StreamLoadExecutor>&& stream_load_executor) { |
790 | | this->_stream_load_executor = std::move(stream_load_executor); |
791 | | } |
792 | | |
793 | | void ExecEnv::clear_stream_load_executor() { |
794 | | this->_stream_load_executor.reset(); |
795 | | } |
796 | | |
797 | | void ExecEnv::set_wal_mgr(std::unique_ptr<WalManager>&& wm) { |
798 | | this->_wal_manager = std::move(wm); |
799 | | } |
800 | | void ExecEnv::clear_wal_mgr() { |
801 | | this->_wal_manager.reset(); |
802 | | } |
803 | | #endif |
804 | | // TODO(zhiqiang): Need refactor all thread pool. Each thread pool must have a Stop method. |
805 | | // We need to stop all threads before releasing resource. |
806 | 16 | void ExecEnv::destroy() { |
807 | | //Only destroy once after init |
808 | 16 | if (!ready()) { |
809 | 9 | return; |
810 | 9 | } |
811 | | // Memory barrier to prevent other threads from accessing destructed resources |
812 | 7 | _s_ready = false; |
813 | | |
814 | 7 | SAFE_STOP(_wal_manager); |
815 | 7 | _wal_manager.reset(); |
816 | 7 | SAFE_STOP(_load_channel_mgr); |
817 | 7 | SAFE_STOP(_broker_mgr); |
818 | 7 | SAFE_STOP(_load_path_mgr); |
819 | 7 | SAFE_STOP(_result_mgr); |
820 | 7 | SAFE_STOP(_group_commit_mgr); |
821 | | // _routine_load_task_executor should be stopped before _new_load_stream_mgr. |
822 | 7 | SAFE_STOP(_routine_load_task_executor); |
823 | 7 | SAFE_STOP(_stream_load_recorder_manager); |
824 | | // stop workload scheduler |
825 | 7 | SAFE_STOP(_workload_sched_mgr); |
826 | | // stop pipline step 2, cgroup execution |
827 | 7 | SAFE_STOP(_workload_group_manager); |
828 | | |
829 | 7 | SAFE_STOP(_external_scan_context_mgr); |
830 | 7 | SAFE_STOP(_fragment_mgr); |
831 | 7 | SAFE_STOP(_runtime_filter_timer_queue); |
832 | | // NewLoadStreamMgr should be destoried before storage_engine & after fragment_mgr stopped. |
833 | 7 | _load_stream_mgr.reset(); |
834 | 7 | _new_load_stream_mgr.reset(); |
835 | 7 | _stream_load_executor.reset(); |
836 | 7 | _memtable_memory_limiter.reset(); |
837 | 7 | _delta_writer_v2_pool.reset(); |
838 | 7 | _load_stream_map_pool.reset(); |
839 | 7 | SAFE_STOP(_write_cooldown_meta_executors); |
840 | | |
841 | | // _id_manager must be destoried before tablet schema cache |
842 | 7 | SAFE_DELETE(_id_manager); |
843 | | |
844 | | // Stop cluster info background worker before storage engine is destroyed |
845 | 7 | if (config::is_cloud_mode() && _cluster_info) { |
846 | 0 | static_cast<CloudClusterInfo*>(_cluster_info)->stop_bg_worker(); |
847 | 0 | } |
848 | | |
849 | | // StorageEngine must be destoried before _cache_manager destory |
850 | 7 | SAFE_STOP(_storage_engine); |
851 | 7 | _storage_engine.reset(); |
852 | | |
853 | 7 | SAFE_STOP(_spill_file_mgr); |
854 | 7 | if (_runtime_query_statistics_mgr) { |
855 | 3 | _runtime_query_statistics_mgr->stop_report_thread(); |
856 | 3 | } |
857 | 7 | SAFE_SHUTDOWN(_buffered_reader_prefetch_thread_pool); |
858 | 7 | SAFE_SHUTDOWN(_segment_prefetch_thread_pool); |
859 | 7 | SAFE_SHUTDOWN(_s3_file_upload_thread_pool); |
860 | 7 | SAFE_SHUTDOWN(_lazy_release_obj_pool); |
861 | 7 | SAFE_SHUTDOWN(_non_block_close_thread_pool); |
862 | 7 | SAFE_SHUTDOWN(_s3_file_system_thread_pool); |
863 | 7 | SAFE_SHUTDOWN(_send_batch_thread_pool); |
864 | 7 | SAFE_SHUTDOWN(_udf_close_workers_thread_pool); |
865 | 7 | SAFE_SHUTDOWN(_send_table_stats_thread_pool); |
866 | | |
867 | 7 | SAFE_DELETE(_load_channel_mgr); |
868 | | |
869 | 7 | SAFE_DELETE(_inverted_index_query_cache); |
870 | 7 | SAFE_DELETE(_inverted_index_searcher_cache); |
871 | 7 | SAFE_DELETE(_condition_cache); |
872 | 7 | SAFE_DELETE(_encoding_info_resolver); |
873 | 7 | SAFE_DELETE(_lookup_connection_cache); |
874 | 7 | SAFE_DELETE(_schema_cache); |
875 | 7 | SAFE_DELETE(_segment_loader); |
876 | 7 | SAFE_DELETE(_row_cache); |
877 | 7 | SAFE_DELETE(_query_cache); |
878 | 7 | SAFE_DELETE(_delete_bitmap_agg_cache); |
879 | | |
880 | | // Free resource after threads are stopped. |
881 | | // Some threads are still running, like threads created by _new_load_stream_mgr ... |
882 | 7 | SAFE_DELETE(_tablet_schema_cache); |
883 | 7 | SAFE_DELETE(_tablet_column_object_pool); |
884 | | |
885 | | // _storage_page_cache must be destoried before _cache_manager |
886 | 7 | SAFE_DELETE(_storage_page_cache); |
887 | | |
888 | 7 | SAFE_DELETE(_small_file_mgr); |
889 | 7 | SAFE_DELETE(_broker_mgr); |
890 | 7 | SAFE_DELETE(_load_path_mgr); |
891 | 7 | SAFE_DELETE(_result_mgr); |
892 | 7 | SAFE_DELETE(_file_meta_cache); |
893 | 7 | SAFE_DELETE(_group_commit_mgr); |
894 | 7 | SAFE_DELETE(_cdc_client_mgr); |
895 | 7 | SAFE_DELETE(_routine_load_task_executor); |
896 | 7 | SAFE_DELETE(_stream_load_recorder_manager); |
897 | | // _stream_load_executor |
898 | 7 | SAFE_DELETE(_function_client_cache); |
899 | 7 | SAFE_DELETE(_streaming_client_cache); |
900 | 7 | SAFE_DELETE(_internal_client_cache); |
901 | | |
902 | 7 | SAFE_DELETE(_bfd_parser); |
903 | 7 | SAFE_DELETE(_result_cache); |
904 | 7 | SAFE_DELETE(_vstream_mgr); |
905 | | // When _vstream_mgr is deconstructed, it will try call query context's dctor and will |
906 | | // access spill stream mgr, so spill stream mgr should be deconstructed after data stream manager |
907 | 7 | SAFE_DELETE(_spill_file_mgr); |
908 | 7 | SAFE_DELETE(_fragment_mgr); |
909 | 7 | SAFE_DELETE(_workload_sched_mgr); |
910 | 7 | SAFE_DELETE(_workload_group_manager); |
911 | 7 | SAFE_DELETE(_file_cache_factory); |
912 | 7 | SAFE_DELETE(_runtime_filter_timer_queue); |
913 | 7 | SAFE_DELETE(_dict_factory); |
914 | | // TODO(zhiqiang): Maybe we should call shutdown before release thread pool? |
915 | 7 | _lazy_release_obj_pool.reset(nullptr); |
916 | 7 | _non_block_close_thread_pool.reset(nullptr); |
917 | 7 | _s3_file_system_thread_pool.reset(nullptr); |
918 | 7 | _send_table_stats_thread_pool.reset(nullptr); |
919 | 7 | _buffered_reader_prefetch_thread_pool.reset(nullptr); |
920 | 7 | _segment_prefetch_thread_pool.reset(nullptr); |
921 | 7 | _s3_file_upload_thread_pool.reset(nullptr); |
922 | 7 | _send_batch_thread_pool.reset(nullptr); |
923 | 7 | _udf_close_workers_thread_pool.reset(nullptr); |
924 | 7 | _write_cooldown_meta_executors.reset(nullptr); |
925 | | |
926 | 7 | SAFE_DELETE(_broker_client_cache); |
927 | 7 | SAFE_DELETE(_frontend_client_cache); |
928 | 7 | SAFE_DELETE(_backend_client_cache); |
929 | 7 | SAFE_DELETE(_result_queue_mgr); |
930 | | |
931 | 7 | SAFE_DELETE(_external_scan_context_mgr); |
932 | 7 | SAFE_DELETE(_user_function_cache); |
933 | | |
934 | | // cache_manager must be destoried after all cache. |
935 | | // https://github.com/apache/doris/issues/24082#issuecomment-1712544039 |
936 | 7 | SAFE_DELETE(_cache_manager); |
937 | 7 | _file_cache_open_fd_cache.reset(nullptr); |
938 | | |
939 | | // _heartbeat_flags must be destoried after staroge engine |
940 | 7 | SAFE_DELETE(_heartbeat_flags); |
941 | | |
942 | | // Master Info is a thrift object, it could be the last one to deconstruct. |
943 | | // Master info should be deconstruct later than fragment manager, because fragment will |
944 | | // access cluster_info.backend_id to access some info. If there is a running query and master |
945 | | // info is deconstructed then BE process will core at coordinator back method in fragment mgr. |
946 | 7 | SAFE_DELETE(_cluster_info); |
947 | | |
948 | | // NOTE: runtime query statistics mgr could be visited by query and daemon thread |
949 | | // so it should be created before all query begin and deleted after all query and daemon thread stoppped |
950 | 7 | SAFE_DELETE(_runtime_query_statistics_mgr); |
951 | | |
952 | 7 | SAFE_DELETE(_arrow_memory_pool); |
953 | | |
954 | 7 | SAFE_DELETE(_orc_memory_pool); |
955 | | |
956 | | // dns cache is a global instance and need to be released at last |
957 | 7 | SAFE_DELETE(_dns_cache); |
958 | 7 | SAFE_DELETE(_kerberos_ticket_mgr); |
959 | 7 | SAFE_DELETE(_hdfs_mgr); |
960 | | // PackedFileManager is a singleton, just stop its background thread |
961 | 7 | if (_packed_file_manager) { |
962 | 3 | _packed_file_manager->stop_background_manager(); |
963 | 3 | _packed_file_manager = nullptr; |
964 | 3 | } |
965 | | |
966 | 7 | SAFE_DELETE(_process_profile); |
967 | 7 | SAFE_DELETE(_heap_profiler); |
968 | | |
969 | 7 | SAFE_DELETE(_index_policy_mgr); |
970 | 7 | SAFE_DELETE(_warmup_download_rate_limiter); |
971 | | |
972 | 7 | _s_tracking_memory = false; |
973 | | |
974 | 7 | clear_storage_resource(); |
975 | 7 | PythonServerManager::instance().shutdown(); |
976 | | LOG(INFO) << "Doris exec envorinment is destoried."; |
977 | 7 | } |
978 | | |
979 | | } // namespace doris |
980 | | |
981 | | namespace doris::config { |
982 | | // Callback to update warmup download rate limiter when config changes is registered |
983 | | DEFINE_ON_UPDATE(file_cache_warmup_download_rate_limit_bytes_per_second, |
984 | | [](int64_t old_val, int64_t new_val) { |
985 | | auto* rate_limiter = ExecEnv::GetInstance()->warmup_download_rate_limiter(); |
986 | | if (rate_limiter != nullptr && new_val != old_val) { |
987 | | // Reset rate limiter with new rate limit value |
988 | | // When new_val <= 0, pass 0 to disable rate limiting |
989 | | int64_t rate = new_val > 0 ? new_val : 0; |
990 | | rate_limiter->reset(rate, rate, 0); |
991 | | if (rate > 0) { |
992 | | LOG(INFO) << "Warmup download rate limiter updated from " << old_val |
993 | | << " to " << new_val << " bytes/s"; |
994 | | } else { |
995 | | LOG(INFO) << "Warmup download rate limiter disabled"; |
996 | | } |
997 | | } |
998 | | }); |
999 | | } // namespace doris::config |