Coverage Report

Created: 2026-07-01 22:22

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/exec_env_init.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
// IWYU pragma: no_include <bthread/errno.h>
19
#include <gen_cpp/HeartbeatService_types.h>
20
#include <gen_cpp/Metrics_types.h>
21
#include <simdjson.h>
22
#include <sys/resource.h>
23
24
#include <cerrno> // IWYU pragma: keep
25
#include <cstdint>
26
#include <cstdlib>
27
#include <cstring>
28
#include <limits>
29
#include <memory>
30
#include <ostream>
31
#include <string>
32
#include <vector>
33
34
#include "cloud/cloud_cluster_info.h"
35
#include "cloud/cloud_meta_mgr.h"
36
#include "cloud/cloud_ms_rpc_rate_limit_services.h"
37
#include "cloud/cloud_ms_rpc_rate_limiters.h"
38
#include "cloud/cloud_storage_engine.h"
39
#include "cloud/cloud_stream_load_executor.h"
40
#include "cloud/cloud_tablet_hotspot.h"
41
#include "cloud/cloud_warm_up_manager.h"
42
#include "cloud/config.h"
43
#include "common/cast_set.h"
44
#include "common/config.h"
45
#include "common/kerberos/kerberos_ticket_mgr.h"
46
#include "common/logging.h"
47
#include "common/metrics/doris_metrics.h"
48
#include "common/multi_version.h"
49
#include "common/status.h"
50
#include "cpp/token_bucket_rate_limiter.h"
51
#include "exec/exchange/vdata_stream_mgr.h"
52
#include "exec/pipeline/task_queue.h"
53
#include "exec/pipeline/task_scheduler.h"
54
#include "exec/scan/scanner_scheduler.h"
55
#include "exec/sink/delta_writer_v2_pool.h"
56
#include "exec/sink/load_stream_map_pool.h"
57
#include "exec/spill/spill_file_manager.h"
58
#include "exprs/function/dictionary_factory.h"
59
#include "format/orc/orc_memory_pool.h"
60
#include "format/parquet/arrow_memory_pool.h"
61
#include "io/cache/block_file_cache_downloader.h"
62
#include "io/cache/block_file_cache_factory.h"
63
#include "io/cache/fs_file_cache_storage.h"
64
#include "io/fs/file_meta_cache.h"
65
#include "io/fs/local_file_reader.h"
66
#include "load/channel/load_channel_mgr.h"
67
#include "load/channel/load_stream_mgr.h"
68
#include "load/group_commit/group_commit_mgr.h"
69
#include "load/group_commit/wal/wal_manager.h"
70
#include "load/load_path_mgr.h"
71
#include "load/memtable/memtable_memory_limiter.h"
72
#include "load/routine_load/routine_load_task_executor.h"
73
#include "load/stream_load/new_load_stream_mgr.h"
74
#include "load/stream_load/stream_load_executor.h"
75
#include "load/stream_load/stream_load_recorder_manager.h"
76
#include "runtime/broker_mgr.h"
77
#include "runtime/cache/result_cache.h"
78
#include "runtime/cdc_client_mgr.h"
79
#include "runtime/exec_env.h"
80
#include "runtime/external_scan_context_mgr.h"
81
#include "runtime/fragment_mgr.h"
82
#include "runtime/heartbeat_flags.h"
83
#include "runtime/index_policy/index_policy_mgr.h"
84
#include "runtime/memory/cache_manager.h"
85
#include "runtime/memory/heap_profiler.h"
86
#include "runtime/memory/mem_tracker.h"
87
#include "runtime/memory/mem_tracker_limiter.h"
88
#include "runtime/memory/thread_mem_tracker_mgr.h"
89
#include "runtime/process_profile.h"
90
#include "runtime/query_cache/query_cache.h"
91
#include "runtime/result_buffer_mgr.h"
92
#include "runtime/result_queue_mgr.h"
93
#include "runtime/runtime_query_statistics_mgr.h"
94
#include "runtime/small_file_mgr.h"
95
#include "runtime/thread_context.h"
96
#include "runtime/user_function_cache.h"
97
#include "runtime/workload_group/workload_group_manager.h"
98
#include "runtime/workload_management/workload_sched_policy_mgr.h"
99
#include "service/backend_options.h"
100
#include "service/backend_service.h"
101
#include "service/point_query_executor.h"
102
#include "storage/cache/ann_index_ivf_list_cache.h"
103
#include "storage/cache/page_cache.h"
104
#include "storage/id_manager.h"
105
#include "storage/index/ann/ann_index_result_cache/ann_index_result_cache.h"
106
#include "storage/index/inverted/inverted_index_cache.h"
107
#include "storage/olap_define.h"
108
#include "storage/options.h"
109
#include "storage/segment/condition_cache.h"
110
#include "storage/segment/encoding_info.h"
111
#include "storage/segment/segment_loader.h"
112
#include "storage/storage_engine.h"
113
#include "storage/storage_policy.h"
114
#include "storage/tablet/tablet_column_object_pool.h"
115
#include "storage/tablet/tablet_meta.h"
116
#include "storage/tablet/tablet_schema_cache.h"
117
#include "udf/python/python_server.h"
118
#include "util/bfd_parser.h"
119
#include "util/bit_util.h"
120
#include "util/brpc_client_cache.h"
121
#include "util/client_cache.h"
122
#include "util/cpu_info.h"
123
#include "util/disk_info.h"
124
#include "util/dns_cache.h"
125
#include "util/mem_info.h"
126
#include "util/parse_util.h"
127
#include "util/pretty_printer.h"
128
#include "util/threadpool.h"
129
#include "util/thrift_rpc_helper.h"
130
#include "util/timezone_utils.h"
131
132
// clang-format off
133
// this must after util/brpc_client_cache.h
134
// /doris/thirdparty/installed/include/brpc/errno.pb.h:69:3: error: expected identifier
135
//  EINTERNAL = 2001,
136
//   ^
137
//  /doris/thirdparty/installed/include/hadoop_hdfs/hdfs.h:61:19: note: expanded from macro 'EINTERNAL'
138
//  #define EINTERNAL 255
139
#include "io/fs/hdfs/hdfs_mgr.h"
140
#include "io/fs/packed_file_manager.h"
141
// clang-format on
142
143
namespace doris {
144
145
class PBackendService_Stub;
146
class PFunctionService_Stub;
147
148
// Warmup download rate limiter metrics
149
bvar::LatencyRecorder warmup_download_rate_limit_latency("warmup_download_rate_limit_latency");
150
bvar::Adder<int64_t> warmup_download_rate_limit_ns("warmup_download_rate_limit_ns");
151
bvar::Adder<int64_t> warmup_download_rate_limit_exceed_req_num(
152
        "warmup_download_rate_limit_exceed_req_num");
153
154
0
static void init_doris_metrics(const std::vector<StorePath>& store_paths) {
155
0
    bool init_system_metrics = config::enable_system_metrics;
156
0
    std::set<std::string> disk_devices;
157
0
    std::vector<std::string> network_interfaces;
158
0
    std::vector<std::string> paths;
159
0
    for (const auto& store_path : store_paths) {
160
0
        paths.emplace_back(store_path.path);
161
0
    }
162
0
    if (init_system_metrics) {
163
0
        auto st = DiskInfo::get_disk_devices(paths, &disk_devices);
164
0
        if (!st.ok()) {
165
0
            LOG(WARNING) << "get disk devices failed, status=" << st;
166
0
            return;
167
0
        }
168
0
        st = get_inet_interfaces(&network_interfaces, BackendOptions::is_bind_ipv6());
169
0
        if (!st.ok()) {
170
0
            LOG(WARNING) << "get inet interfaces failed, status=" << st;
171
0
            return;
172
0
        }
173
0
    }
174
0
    DorisMetrics::instance()->initialize(init_system_metrics, disk_devices, network_interfaces);
175
0
}
176
177
// Used to calculate the num of min thread and max thread based on the passed config
178
0
static std::pair<size_t, size_t> get_num_threads(size_t min_num, size_t max_num) {
179
0
    auto num_cores = doris::CpuInfo::num_cores();
180
0
    min_num = (min_num == 0) ? num_cores : min_num;
181
0
    max_num = (max_num == 0) ? num_cores : max_num;
182
0
    auto factor = max_num / min_num;
183
0
    min_num = std::min(num_cores * factor, min_num);
184
0
    max_num = std::min(min_num * factor, max_num);
185
0
    return {min_num, max_num};
186
0
}
187
188
1.00k
// Returns a non-owning pointer to the "NonBlockCloseThreadPool" built in _init()
// (null if that pool was never built).
ThreadPool* ExecEnv::non_block_close_thread_pool() {
    ThreadPool* pool = _non_block_close_thread_pool.get();
    return pool;
}
191
192
7
// Defaulted constructor: no services are created here; they are set up later by
// ExecEnv::init() / _init().
ExecEnv::ExecEnv() = default;
193
194
7
// All teardown is delegated to destroy().
ExecEnv::~ExecEnv() {
    this->destroy();
}
197
198
// Static entry point: initializes `env` by forwarding all arguments to its
// private _init(). `env` must be non-null.
Status ExecEnv::init(ExecEnv* env, const std::vector<StorePath>& store_paths,
                     const std::vector<StorePath>& spill_store_paths,
                     const std::set<std::string>& broken_paths) {
    ExecEnv& target = *env;
    return target._init(store_paths, spill_store_paths, broken_paths);
}
203
204
// pick simdjson implementation based on CPU capabilities
205
0
inline void init_simdjson_parser() {
206
    // haswell: AVX2 (2013 Intel Haswell or later, all AMD Zen processors)
207
0
    const auto* haswell_implementation = simdjson::get_available_implementations()["haswell"];
208
0
    if (!haswell_implementation || !haswell_implementation->supported_by_runtime_system()) {
209
        // pick available implementation
210
0
        for (const auto* implementation : simdjson::get_available_implementations()) {
211
0
            if (implementation->supported_by_runtime_system()) {
212
0
                LOG(INFO) << "Using SimdJSON implementation : " << implementation->name() << ": "
213
0
                          << implementation->description();
214
0
                simdjson::get_active_implementation() = implementation;
215
0
                return;
216
0
            }
217
0
        }
218
0
        LOG(WARNING) << "No available SimdJSON implementation found.";
219
0
    } else {
220
0
        LOG(INFO) << "Using SimdJSON Haswell implementation";
221
0
    }
222
0
}
223
224
// Creates and wires up the process-wide services owned by ExecEnv, in order:
// - thread pools, client caches and memory/caches (via init_mem_env());
// - file cache, load/stream managers and spill manager;
// - the storage engine (CloudStorageEngine in cloud mode, StorageEngine otherwise);
// - workload management;
// - in cloud mode only: the packed file manager, MS RPC rate limiting, and the
//   warmup download rate limiter.
//
// Returns OK immediately if ready() is already true. When a step fails, its
// error Status is returned and initialization stops part-way; objects created
// before the failure are left in place.
Status ExecEnv::_init(const std::vector<StorePath>& store_paths,
                      const std::vector<StorePath>& spill_store_paths,
                      const std::set<std::string>& broken_paths) {
    // Only init once before be destroyed
    if (ready()) {
        return Status::OK();
    }
    std::unordered_map<std::string, std::unique_ptr<SpillDataDir>> spill_store_map;
    for (const auto& spill_path : spill_store_paths) {
        spill_store_map.emplace(spill_path.path, std::make_unique<SpillDataDir>(
                                                         spill_path.path, spill_path.capacity_bytes,
                                                         spill_path.storage_medium));
    }
    init_doris_metrics(store_paths);
    _store_paths = store_paths;
    _tmp_file_dirs = std::make_unique<segment_v2::TmpFileDirs>(_store_paths);
    RETURN_IF_ERROR(_tmp_file_dirs->init());
    _user_function_cache = new UserFunctionCache();
    static_cast<void>(_user_function_cache->init(doris::config::user_function_dir));
    _external_scan_context_mgr = new ExternalScanContextMgr(this);
    set_stream_mgr(new doris::VDataStreamMgr());
    _result_mgr = new ResultBufferMgr();
    _result_queue_mgr = new ResultQueueMgr();
    _backend_client_cache = new BackendServiceClientCache(config::max_client_cache_size_per_host);
    _frontend_client_cache = new FrontendServiceClientCache(config::max_client_cache_size_per_host);
    _broker_client_cache = new BrokerServiceClientCache(config::max_client_cache_size_per_host);

    TimezoneUtils::load_timezones_to_cache();

    static_cast<void>(ThreadPoolBuilder("SendBatchThreadPool")
                              .set_min_threads(config::send_batch_thread_pool_thread_num)
                              .set_max_threads(config::send_batch_thread_pool_thread_num)
                              .set_max_queue_size(config::send_batch_thread_pool_queue_size)
                              .build(&_send_batch_thread_pool));

    static_cast<void>(ThreadPoolBuilder("UDFCloseWorkers")
                              .set_min_threads(4)
                              .set_max_threads(std::min(32, CpuInfo::num_cores()))
                              .build(&_udf_close_workers_thread_pool));

    auto [buffered_reader_min_threads, buffered_reader_max_threads] =
            get_num_threads(config::num_buffered_reader_prefetch_thread_pool_min_thread,
                            config::num_buffered_reader_prefetch_thread_pool_max_thread);
    static_cast<void>(ThreadPoolBuilder("BufferedReaderPrefetchThreadPool")
                              .set_min_threads(cast_set<int>(buffered_reader_min_threads))
                              .set_max_threads(cast_set<int>(buffered_reader_max_threads))
                              .build(&_buffered_reader_prefetch_thread_pool));

    static_cast<void>(ThreadPoolBuilder("SegmentPrefetchThreadPool")
                              .set_min_threads(cast_set<int>(
                                      config::segment_prefetch_thread_pool_thread_num_min))
                              .set_max_threads(cast_set<int>(
                                      config::segment_prefetch_thread_pool_thread_num_max))
                              .build(&_segment_prefetch_thread_pool));

    static_cast<void>(ThreadPoolBuilder("SendTableStatsThreadPool")
                              .set_min_threads(8)
                              .set_max_threads(32)
                              .build(&_send_table_stats_thread_pool));

    auto [s3_file_upload_min_threads, s3_file_upload_max_threads] =
            get_num_threads(config::num_s3_file_upload_thread_pool_min_thread,
                            config::num_s3_file_upload_thread_pool_max_thread);
    static_cast<void>(ThreadPoolBuilder("S3FileUploadThreadPool")
                              .set_min_threads(cast_set<int>(s3_file_upload_min_threads))
                              .set_max_threads(cast_set<int>(s3_file_upload_max_threads))
                              .build(&_s3_file_upload_thread_pool));

    // Single worker with a large (1,000,000-entry) queue for lazily released objects.
    static_cast<void>(ThreadPoolBuilder("LazyReleaseMemoryThreadPool")
                              .set_min_threads(1)
                              .set_max_threads(1)
                              .set_max_queue_size(1000000)
                              .build(&_lazy_release_obj_pool));
    static_cast<void>(ThreadPoolBuilder("NonBlockCloseThreadPool")
                              .set_min_threads(cast_set<int>(config::min_nonblock_close_thread_num))
                              .set_max_threads(cast_set<int>(config::max_nonblock_close_thread_num))
                              .build(&_non_block_close_thread_pool));
    static_cast<void>(ThreadPoolBuilder("S3FileSystemThreadPool")
                              .set_min_threads(config::min_s3_file_system_thread_num)
                              .set_max_threads(config::max_s3_file_system_thread_num)
                              .build(&_s3_file_system_thread_pool));
    static_cast<void>(ThreadPoolBuilder("PeerRaceS3ThreadPool")
                              .set_min_threads(config::min_peer_race_s3_thread_num)
                              .set_max_threads(config::max_peer_race_s3_thread_num)
                              .build(&_peer_race_s3_thread_pool));
    RETURN_IF_ERROR(init_mem_env());

    // NOTE: runtime query statistics mgr could be visited by query and daemon thread
    // so it should be created before all query begin and deleted after all query and daemon thread stopped
    _runtime_query_statistics_mgr = new RuntimeQueryStatisticsMgr();
    CgroupCpuCtl::init_doris_cgroup_path();
    _file_cache_open_fd_cache = std::make_unique<io::FDCache>();
    _file_cache_factory = new io::FileCacheFactory();
    std::vector<doris::CachePath> cache_paths;
    init_file_cache_factory(cache_paths);
    doris::io::BeConfDataDirReader::init_be_conf_data_dir(store_paths, spill_store_paths,
                                                          cache_paths);
    _init_runtime_filter_timer_queue();

    _workload_group_manager = new WorkloadGroupMgr();

    _fragment_mgr = new FragmentMgr(this);
    _result_cache = new ResultCache(config::query_cache_max_size_mb,
                                    config::query_cache_elasticity_size_mb);
    if (config::is_cloud_mode()) {
        _cluster_info = new CloudClusterInfo();
    } else {
        _cluster_info = new ClusterInfo();
    }

    _load_path_mgr = new LoadPathMgr(this);
    _bfd_parser = BfdParser::create();
    _broker_mgr = new BrokerMgr(this);
    _load_channel_mgr = new LoadChannelMgr();
    auto num_flush_threads = std::min(
            _store_paths.size() * config::flush_thread_num_per_store,
            static_cast<size_t>(CpuInfo::num_cores()) * config::max_flush_thread_num_per_cpu);
    _load_stream_mgr = std::make_unique<LoadStreamMgr>(num_flush_threads);
    _new_load_stream_mgr = NewLoadStreamMgr::create_unique();
    _internal_client_cache = new BrpcClientCache<PBackendService_Stub>();
    _streaming_client_cache =
            new BrpcClientCache<PBackendService_Stub>("baidu_std", "single", "streaming");
    _function_client_cache =
            new BrpcClientCache<PFunctionService_Stub>(config::function_service_protocol);
    if (config::is_cloud_mode()) {
        _stream_load_executor = CloudStreamLoadExecutor::create_unique(this);
    } else {
        _stream_load_executor = StreamLoadExecutor::create_unique(this);
    }
    _routine_load_task_executor = new RoutineLoadTaskExecutor(this);
    RETURN_IF_ERROR(_routine_load_task_executor->init(MemInfo::mem_limit()));
    _small_file_mgr = new SmallFileMgr(this, config::small_file_dir);
    _group_commit_mgr = new GroupCommitMgr(this);
    _cdc_client_mgr = new CdcClientMgr();
    _memtable_memory_limiter = std::make_unique<MemTableMemoryLimiter>();
    _load_stream_map_pool = std::make_unique<LoadStreamMapPool>();
    _delta_writer_v2_pool = std::make_unique<DeltaWriterV2Pool>();
    _wal_manager = WalManager::create_unique(this, config::group_commit_wal_path);
    _dns_cache = new DNSCache();
    _write_cooldown_meta_executors = std::make_unique<WriteCooldownMetaExecutors>();

    _spill_file_mgr = new SpillFileManager(std::move(spill_store_map));
    _kerberos_ticket_mgr = new kerberos::KerberosTicketMgr(config::kerberos_ccache_path);
    _hdfs_mgr = new io::HdfsMgr();
    _backend_client_cache->init_metrics("backend");
    _frontend_client_cache->init_metrics("frontend");
    _broker_client_cache->init_metrics("broker");
    static_cast<void>(_result_mgr->init());
    Status status = _load_path_mgr->init();
    if (!status.ok()) {
        LOG(ERROR) << "Load path mgr init failed. " << status;
        return status;
    }
    _broker_mgr->init();
    // The result is discarded, like the other static_cast<void> init calls above.
    // (A stale "Scanner scheduler init failed" re-check of the already-OK `status`
    // used to follow here; it could never fire and has been removed.)
    // NOTE(review): confirm small file mgr init is meant to be best-effort.
    static_cast<void>(_small_file_mgr->init());

    RETURN_IF_ERROR(_memtable_memory_limiter->init(MemInfo::mem_limit()));
    RETURN_IF_ERROR(_load_channel_mgr->init(MemInfo::mem_limit()));
    RETURN_IF_ERROR(_wal_manager->init());
    _heartbeat_flags = new HeartbeatFlags();

    _tablet_schema_cache =
            TabletSchemaCache::create_global_schema_cache(config::tablet_schema_cache_capacity);

    _tablet_column_object_pool = TabletColumnObjectPool::create_global_column_cache(
            config::tablet_schema_cache_capacity);

    // Storage engine
    doris::EngineOptions options;
    options.store_paths = store_paths;
    options.broken_paths = broken_paths;
    options.backend_uid = doris::UniqueId::gen_uid();
    // Check if the startup mode has been modified
    RETURN_IF_ERROR(_check_deploy_mode());
    if (config::is_cloud_mode()) {
        std::cout << "start BE in cloud mode, cloud_unique_id: " << config::cloud_unique_id
                  << ", meta_service_endpoint: " << config::meta_service_endpoint << std::endl;
        _storage_engine = std::make_unique<CloudStorageEngine>(options);
    } else {
        std::cout << "start BE in local mode" << std::endl;
        _storage_engine = std::make_unique<StorageEngine>(options);
    }
    auto st = _storage_engine->open();
    if (!st.ok()) {
        LOG(ERROR) << "Fail to open StorageEngine, res=" << st;
        return st;
    }
    _storage_engine->set_heartbeat_flags(this->heartbeat_flags());
    if (st = _storage_engine->start_bg_threads(nullptr); !st.ok()) {
        LOG(ERROR) << "Failed to start bg threads of storage engine, res=" << st;
        return st;
    }

    // should start after storage_engine->open()
    _stream_load_recorder_manager = new StreamLoadRecorderManager();
    _stream_load_recorder_manager->start();

    // create internal workload group should be after storage_engine->open()
    RETURN_IF_ERROR(_create_internal_workload_group());
    _workload_sched_mgr = new WorkloadSchedPolicyMgr();
    _workload_sched_mgr->start(this);

    // Initialize packed file manager
    _packed_file_manager = io::PackedFileManager::instance();
    if (config::is_cloud_mode()) {
        RETURN_IF_ERROR(_packed_file_manager->init());
        _packed_file_manager->start_background_manager();

        // Start cluster info background worker for compaction read-write separation
        static_cast<CloudClusterInfo*>(_cluster_info)->start_bg_worker();

        // Initialize MS RPC rate limiters and table-level backpressure handling.
        _ms_rpc_rate_limit_services = std::make_unique<MSRpcRateLimitServices>();
        static_cast<CloudStorageEngine*>(_storage_engine.get())
                ->meta_mgr()
                .set_host_level_ms_rpc_rate_limiters(
                        _ms_rpc_rate_limit_services->host_level_ms_rpc_rate_limiters());
        static_cast<CloudStorageEngine*>(_storage_engine.get())
                ->meta_mgr()
                .set_ms_backpressure_handler(
                        _ms_rpc_rate_limit_services->ms_backpressure_handler());
    }

    _index_policy_mgr = new IndexPolicyMgr();

    // Initialize warmup download rate limiter for cloud mode
    // Always create the rate limiter in cloud mode to support dynamic rate limit changes
    if (config::is_cloud_mode()) {
        int64_t rate_limit = config::file_cache_warmup_download_rate_limit_bytes_per_second;
        // When rate_limit <= 0, pass 0 to disable rate limiting
        int64_t rate = rate_limit > 0 ? rate_limit : 0;
        // max_burst is the same as rate (1 second burst)
        // limit is 0 which means no total limit
        // When rate is 0, S3RateLimiter will not throttle (no rate limiting)
        // The callback is handed to the limiter and may run after this function
        // returns, so it captures nothing; it only updates namespace-scope bvars.
        _warmup_download_rate_limiter = new S3RateLimiterHolder(rate, rate, 0, [](int64_t ns) {
            if (ns > 0) {
                warmup_download_rate_limit_latency << ns / 1000;
                warmup_download_rate_limit_ns << ns;
                warmup_download_rate_limit_exceed_req_num << 1;
            }
        });
    }

    RETURN_IF_ERROR(_spill_file_mgr->init());
    RETURN_IF_ERROR(_runtime_query_statistics_mgr->start_report_thread());
    _dict_factory = new doris::DictionaryFactory();
    _s_ready = true;

    init_simdjson_parser();

    // Make aws-sdk-cpp InitAPI and ShutdownAPI called in the same thread
    S3ClientFactory::instance();
    return Status::OK();
}
485
486
// when user not sepcify a workload group in FE, then query could
487
// use dummy workload group.
488
0
Status ExecEnv::_create_internal_workload_group() {
489
0
    LOG(INFO) << "begin create internal workload group.";
490
491
0
    RETURN_IF_ERROR(_workload_group_manager->create_internal_wg());
492
0
    return Status::OK();
493
0
}
494
495
2
void ExecEnv::_init_runtime_filter_timer_queue() {
496
2
    _runtime_filter_timer_queue = new doris::RuntimeFilterTimerQueue();
497
2
    _runtime_filter_timer_queue->run();
498
2
}
499
500
0
// Parses config::file_cache_path into `cache_paths` and creates a block file
// cache for each distinct path through FileCacheFactory.
//
// Process termination (LOG(FATAL)/LOG_FATAL followed by exit(-1)):
//   - file cache is disabled while running in cloud mode;
//   - file_cache_each_block_size is not positive, exceeds s3_write_buffer_size,
//     or does not evenly divide s3_write_buffer_size.
// Throws Exception(Status::FatalError) when:
//   - file_cache_path cannot be parsed;
//   - a cache fails to initialize and config::ignore_broken_disk is false
//     (with ignore_broken_disk set, the failure is only logged).
// Duplicate paths are skipped with a warning.
void ExecEnv::init_file_cache_factory(std::vector<doris::CachePath>& cache_paths) {
    // Load file cache before starting up daemon threads to make sure StorageEngine is ready.
    if (!config::enable_file_cache) {
        if (config::is_cloud_mode()) {
            LOG(FATAL) << "Cloud mode requires to enable file cache, plz set "
                          "config::enable_file_cache "
                          "= true";
            exit(-1);
        }
        return;
    }
    // Reject a non-positive block size before the modulo below, which would
    // otherwise divide by zero.
    if (config::file_cache_each_block_size <= 0 ||
        config::file_cache_each_block_size > config::s3_write_buffer_size ||
        config::s3_write_buffer_size % config::file_cache_each_block_size != 0) {
        LOG_FATAL(
                "The config file_cache_each_block_size {} must be positive, less than or equal "
                "to config s3_write_buffer_size {} and config::s3_write_buffer_size % "
                "config::file_cache_each_block_size must be zero",
                config::file_cache_each_block_size, config::s3_write_buffer_size);
        exit(-1);
    }
    Status rest = doris::parse_conf_cache_paths(doris::config::file_cache_path, cache_paths);
    if (!rest) {
        throw Exception(
                Status::FatalError("parse config file cache path failed, path={}, reason={}",
                                   doris::config::file_cache_path, rest.msg()));
    }

    // Paths already handed to the cache factory; later duplicates are skipped.
    std::unordered_set<std::string> cache_path_set;
    for (auto& cache_path : cache_paths) {
        if (cache_path_set.find(cache_path.path) != cache_path_set.end()) {
            LOG(WARNING) << fmt::format("cache path {} is duplicate", cache_path.path);
            continue;
        }

        doris::Status cache_status = doris::io::FileCacheFactory::instance()->create_file_cache(
                cache_path.path, cache_path.init_settings());
        if (!cache_status.ok()) {
            if (!doris::config::ignore_broken_disk) {
                throw Exception(
                        Status::FatalError("failed to init file cache, err: {}", cache_status));
            }
            LOG(WARNING) << "failed to init file cache, err: " << cache_status;
        }
        cache_path_set.emplace(cache_path.path);
    }
}
547
548
0
Status ExecEnv::init_mem_env() {
549
0
    bool is_percent = false;
550
0
    std::stringstream ss;
551
    // 1. init mem tracker
552
0
    _process_profile = ProcessProfile::create_global_instance();
553
0
    _heap_profiler = HeapProfiler::create_global_instance();
554
0
    init_mem_tracker();
555
0
    thread_context()->thread_mem_tracker_mgr->init();
556
557
0
    if (!BitUtil::IsPowerOf2(config::min_buffer_size)) {
558
0
        ss << "Config min_buffer_size must be a power-of-two: " << config::min_buffer_size;
559
0
        return Status::InternalError(ss.str());
560
0
    }
561
562
0
    _id_manager = new IdManager();
563
0
    _cache_manager = CacheManager::create_global_instance();
564
565
0
    int64_t storage_cache_limit =
566
0
            ParseUtil::parse_mem_spec(config::storage_page_cache_limit, MemInfo::mem_limit(),
567
0
                                      MemInfo::physical_mem(), &is_percent);
568
0
    while (!is_percent && storage_cache_limit > MemInfo::mem_limit() / 2) {
569
0
        storage_cache_limit = storage_cache_limit / 2;
570
0
    }
571
0
    int32_t index_percentage = config::index_page_cache_percentage;
572
0
    int32_t num_shards = config::storage_page_cache_shard_size;
573
0
    if ((num_shards & (num_shards - 1)) != 0) {
574
0
        int old_num_shards = num_shards;
575
0
        num_shards = cast_set<int>(BitUtil::RoundUpToPowerOfTwo(num_shards));
576
0
        LOG(WARNING) << "num_shards should be power of two, but got " << old_num_shards
577
0
                     << ". Rounded up to " << num_shards
578
0
                     << ". Please modify the 'storage_page_cache_shard_size' parameter in your "
579
0
                        "conf file to be a power of two for better performance.";
580
0
    }
581
0
    if (storage_cache_limit < num_shards * 2) {
582
0
        LOG(WARNING) << "storage_cache_limit(" << storage_cache_limit << ") less than num_shards("
583
0
                     << num_shards
584
0
                     << ") * 2, cache capacity will be 0, continuing to use "
585
0
                        "cache will only have negative effects, will be disabled.";
586
0
    }
587
0
    int64_t pk_storage_page_cache_limit =
588
0
            ParseUtil::parse_mem_spec(config::pk_storage_page_cache_limit, MemInfo::mem_limit(),
589
0
                                      MemInfo::physical_mem(), &is_percent);
590
0
    while (!is_percent && pk_storage_page_cache_limit > MemInfo::mem_limit() / 2) {
591
0
        pk_storage_page_cache_limit = storage_cache_limit / 2;
592
0
    }
593
0
    _storage_page_cache = StoragePageCache::create_global_cache(
594
0
            storage_cache_limit, index_percentage, pk_storage_page_cache_limit, num_shards);
595
0
    LOG(INFO) << "Storage page cache memory limit: "
596
0
              << PrettyPrinter::print(storage_cache_limit, TUnit::BYTES)
597
0
              << ", origin config value: " << config::storage_page_cache_limit;
598
599
    // Init ANN index IVF list cache (dedicated cache for IVF on disk)
600
0
    {
601
0
        int64_t ann_cache_limit = ParseUtil::parse_mem_spec(config::ann_index_ivf_list_cache_limit,
602
0
                                                            MemInfo::mem_limit(),
603
0
                                                            MemInfo::physical_mem(), &is_percent);
604
0
        while (!is_percent && ann_cache_limit > MemInfo::mem_limit() / 2) {
605
0
            ann_cache_limit = ann_cache_limit / 2;
606
0
        }
607
0
        _ann_index_ivf_list_cache = AnnIndexIVFListCache::create_global_cache(ann_cache_limit);
608
0
        LOG(INFO) << "ANN index IVF list cache memory limit: "
609
0
                  << PrettyPrinter::print(ann_cache_limit, TUnit::BYTES)
610
0
                  << ", origin config value: " << config::ann_index_ivf_list_cache_limit;
611
0
    }
612
613
    // Init row cache
614
0
    int64_t row_cache_mem_limit =
615
0
            ParseUtil::parse_mem_spec(config::row_cache_mem_limit, MemInfo::mem_limit(),
616
0
                                      MemInfo::physical_mem(), &is_percent);
617
0
    while (!is_percent && row_cache_mem_limit > MemInfo::mem_limit() / 2) {
618
        // Reason same as buffer_pool_limit
619
0
        row_cache_mem_limit = row_cache_mem_limit / 2;
620
0
    }
621
0
    _row_cache = RowCache::create_global_cache(row_cache_mem_limit);
622
0
    LOG(INFO) << "Row cache memory limit: "
623
0
              << PrettyPrinter::print(row_cache_mem_limit, TUnit::BYTES)
624
0
              << ", origin config value: " << config::row_cache_mem_limit;
625
626
0
    uint64_t fd_number = config::min_file_descriptor_number;
627
0
    struct rlimit l;
628
0
    int ret = getrlimit(RLIMIT_NOFILE, &l);
629
0
    if (ret != 0) {
630
0
        LOG(WARNING) << "call getrlimit() failed. errno=" << strerror(errno)
631
0
                     << ", use default configuration instead.";
632
0
    } else {
633
0
        fd_number = static_cast<uint64_t>(l.rlim_cur);
634
0
    }
635
#ifdef __APPLE__
636
    // On macOS, rlim_cur can be RLIM_INFINITY (INT64_MAX), which causes
637
    // fd_number / 100 * percentage to overflow and crash cast_set<uint32_t>.
638
    // Linux kernels cap this via fs.nr_open (default 1M), so only macOS needs this.
639
    {
640
        constexpr uint64_t max_fd = UINT32_MAX >> 2;
641
        if (fd_number > max_fd) {
642
            fd_number = max_fd;
643
        }
644
    }
645
#endif
646
647
0
    int64_t segment_cache_capacity = 0;
648
0
    if (config::is_cloud_mode()) {
649
        // when in cloud mode, segment cache hold no system FD
650
        // thus the FD num limit makes no sense
651
        // cloud mode use FDCache to control FD
652
0
        segment_cache_capacity = UINT32_MAX;
653
0
    } else {
654
        // SegmentLoader caches segments in rowset granularity. So the size of
655
        // opened files will greater than segment_cache_capacity.
656
0
        segment_cache_capacity = config::segment_cache_capacity;
657
0
        int64_t segment_cache_fd_limit = fd_number / 100 * config::segment_cache_fd_percentage;
658
0
        if (segment_cache_capacity < 0 || segment_cache_capacity > segment_cache_fd_limit) {
659
0
            segment_cache_capacity = segment_cache_fd_limit;
660
0
        }
661
0
    }
662
663
0
    int64_t segment_cache_mem_limit =
664
0
            MemInfo::mem_limit() / 100 * config::segment_cache_memory_percentage;
665
666
0
    _segment_loader = new SegmentLoader(segment_cache_mem_limit, segment_cache_capacity);
667
0
    LOG(INFO) << "segment_cache_capacity <= fd_number * 1 / 5, fd_number: " << fd_number
668
0
              << " segment_cache_capacity: " << segment_cache_capacity
669
0
              << " min_segment_cache_mem_limit " << segment_cache_mem_limit;
670
671
0
    size_t block_file_cache_fd_cache_size =
672
0
            std::min((uint64_t)config::file_cache_max_file_reader_cache_size, fd_number / 3);
673
0
    LOG(INFO) << "max file reader cache size is: " << block_file_cache_fd_cache_size
674
0
              << ", resource hard limit is: " << fd_number
675
0
              << ", config file_cache_max_file_reader_cache_size is: "
676
0
              << config::file_cache_max_file_reader_cache_size;
677
0
    config::file_cache_max_file_reader_cache_size = block_file_cache_fd_cache_size;
678
679
0
    _file_meta_cache = new FileMetaCache(config::max_external_file_meta_cache_num);
680
681
0
    _lookup_connection_cache =
682
0
            LookupConnectionCache::create_global_instance(config::lookup_connection_cache_capacity);
683
684
    // use memory limit
685
0
    int64_t inverted_index_cache_limit =
686
0
            ParseUtil::parse_mem_spec(config::inverted_index_searcher_cache_limit,
687
0
                                      MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent);
688
0
    while (!is_percent && inverted_index_cache_limit > MemInfo::mem_limit() / 2) {
689
        // Reason same as buffer_pool_limit
690
0
        inverted_index_cache_limit = inverted_index_cache_limit / 2;
691
0
    }
692
0
    _inverted_index_searcher_cache =
693
0
            InvertedIndexSearcherCache::create_global_instance(inverted_index_cache_limit, 256);
694
0
    LOG(INFO) << "Inverted index searcher cache memory limit: "
695
0
              << PrettyPrinter::print(inverted_index_cache_limit, TUnit::BYTES)
696
0
              << ", origin config value: " << config::inverted_index_searcher_cache_limit;
697
698
    // use memory limit
699
0
    int64_t inverted_index_query_cache_limit =
700
0
            ParseUtil::parse_mem_spec(config::inverted_index_query_cache_limit,
701
0
                                      MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent);
702
0
    while (!is_percent && inverted_index_query_cache_limit > MemInfo::mem_limit() / 2) {
703
        // Reason same as buffer_pool_limit
704
0
        inverted_index_query_cache_limit = inverted_index_query_cache_limit / 2;
705
0
    }
706
0
    _inverted_index_query_cache = InvertedIndexQueryCache::create_global_cache(
707
0
            inverted_index_query_cache_limit, config::inverted_index_query_cache_shards);
708
0
    LOG(INFO) << "Inverted index query match cache memory limit: "
709
0
              << PrettyPrinter::print(inverted_index_query_cache_limit, TUnit::BYTES)
710
0
              << ", origin config value: " << config::inverted_index_query_cache_limit;
711
712
    // ANN index topn result cache
713
0
    int64_t ann_topn_cache_limit =
714
0
            ParseUtil::parse_mem_spec(config::ann_index_result_cache_limit, MemInfo::mem_limit(),
715
0
                                      MemInfo::physical_mem(), &is_percent);
716
0
    _ann_index_result_cache =
717
0
            segment_v2::AnnIndexResultCache::create_global_cache(ann_topn_cache_limit);
718
0
    LOG(INFO) << "ANN index topn result cache memory limit: "
719
0
              << PrettyPrinter::print(ann_topn_cache_limit, TUnit::BYTES)
720
0
              << ", origin config value: " << config::ann_index_result_cache_limit;
721
722
    // use memory limit
723
0
    int64_t condition_cache_limit = config::condition_cache_limit * 1024L * 1024L;
724
0
    _condition_cache = ConditionCache::create_global_cache(condition_cache_limit);
725
0
    LOG(INFO) << "Condition cache memory limit: "
726
0
              << PrettyPrinter::print(condition_cache_limit, TUnit::BYTES)
727
0
              << ", origin config value: " << config::condition_cache_limit;
728
729
    // Initialize encoding info resolver
730
0
    _encoding_info_resolver = new segment_v2::EncodingInfoResolver();
731
732
    // init orc memory pool
733
0
    _orc_memory_pool = new doris::ORCMemoryPool();
734
0
    _arrow_memory_pool = new doris::ArrowMemoryPool();
735
736
0
    _query_cache = QueryCache::create_global_cache(config::query_cache_size * 1024L * 1024L);
737
0
    LOG(INFO) << "query cache memory limit: " << config::query_cache_size << "MB";
738
739
    // The default delete bitmap cache is set to 100MB,
740
    // which can be insufficient and cause performance issues when the amount of user data is large.
741
    // To mitigate the problem of an inadequate cache,
742
    // we will take the larger of 0.5% of the total memory and 100MB as the delete bitmap cache size.
743
0
    int64_t delete_bitmap_agg_cache_cache_limit =
744
0
            ParseUtil::parse_mem_spec(config::delete_bitmap_dynamic_agg_cache_limit,
745
0
                                      MemInfo::mem_limit(), MemInfo::physical_mem(), &is_percent);
746
0
    _delete_bitmap_agg_cache = DeleteBitmapAggCache::create_instance(std::max(
747
0
            delete_bitmap_agg_cache_cache_limit, config::delete_bitmap_agg_cache_capacity));
748
749
0
    return Status::OK();
750
0
}
751
752
1
void ExecEnv::init_mem_tracker() {
753
1
    mem_tracker_limiter_pool.resize(MEM_TRACKER_GROUP_NUM,
754
1
                                    TrackerLimiterGroup()); // before all mem tracker init.
755
1
    _s_tracking_memory = true;
756
1
    _orphan_mem_tracker =
757
1
            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "Orphan");
758
1
    _brpc_iobuf_block_memory_tracker =
759
1
            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "IOBufBlockMemory");
760
1
    _segcompaction_mem_tracker =
761
1
            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::COMPACTION, "SegCompaction");
762
1
    _tablets_no_cache_mem_tracker = MemTrackerLimiter::create_shared(
763
1
            MemTrackerLimiter::Type::METADATA, "Tablets(not in TabletSchemaCache)");
764
1
    _segments_no_cache_mem_tracker = MemTrackerLimiter::create_shared(
765
1
            MemTrackerLimiter::Type::METADATA, "Segments(not in SegmentCache)");
766
1
    _rowsets_no_cache_mem_tracker =
767
1
            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::METADATA, "Rowsets");
768
1
    _point_query_executor_mem_tracker =
769
1
            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "PointQueryExecutor");
770
1
    _query_cache_mem_tracker =
771
1
            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::CACHE, "QueryCache");
772
1
    _block_compression_mem_tracker =
773
1
            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "BlockCompression");
774
1
    _rowid_storage_reader_tracker =
775
1
            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "RowIdStorageReader");
776
1
    _subcolumns_tree_tracker =
777
1
            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "SubcolumnsTree");
778
1
    _s3_file_buffer_tracker =
779
1
            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL, "S3FileBuffer");
780
1
    _stream_load_pipe_tracker =
781
1
            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD, "StreamLoadPipe");
782
1
    _parquet_meta_tracker =
783
1
            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::METADATA, "ParquetMeta");
784
1
}
785
786
0
// Ensures every configured storage path is bound to the current deploy mode.
//
// The expected mode is "cloud" when config::is_cloud_mode() is true and
// "local" otherwise. For each entry of _store_paths, the marker file
// "<path>/<DEPLOY_MODE_PREFIX>" is inspected:
//  * if it exists and is non-empty, its content must equal the expected mode;
//    otherwise an InternalError is returned and the mode switch is refused;
//  * if it does not exist, it is created containing the expected mode, and a
//    warning is logged when "<path>/<CLUSTER_ID_PREFIX>" already exists.
// Filesystem errors are propagated to the caller via RETURN_IF_ERROR.
Status ExecEnv::_check_deploy_mode() {
    // The expected mode does not depend on the store path, so compute it once.
    std::string expected_mode = doris::config::is_cloud_mode() ? "cloud" : "local";
    // Bind by const reference: iterating by value copied every store path entry.
    for (const auto& store_path : _store_paths) {
        auto deploy_mode_path = fmt::format("{}/{}", store_path.path, DEPLOY_MODE_PREFIX);
        bool exists = false;
        RETURN_IF_ERROR(io::global_local_filesystem()->exists(deploy_mode_path, &exists));
        if (exists) {
            // Check that the recorded mode matches the configured one.
            io::FileReaderSPtr reader;
            RETURN_IF_ERROR(io::global_local_filesystem()->open_file(deploy_mode_path, &reader));
            size_t fsize = reader->size();
            if (fsize > 0) {
                std::string actual_mode;
                actual_mode.resize(fsize, '\0');
                size_t bytes_read = 0;
                RETURN_IF_ERROR(reader->read_at(0, {actual_mode.data(), fsize}, &bytes_read));
                DCHECK_EQ(fsize, bytes_read);
                // DCHECK is compiled out in release builds; drop any unread tail so a
                // short read is not compared (or reported) with trailing NUL padding.
                actual_mode.resize(bytes_read);
                if (expected_mode != actual_mode) {
                    return Status::InternalError(
                            "You can't switch deploy mode from {} to {}, "
                            "maybe you need to check be.conf\n",
                            actual_mode.c_str(), expected_mode.c_str());
                }
                LOG(INFO) << "The current deployment mode is " << expected_mode << ".";
            }
        } else {
            // No marker yet: record the current mode for subsequent starts.
            io::FileWriterPtr file_writer;
            RETURN_IF_ERROR(
                    io::global_local_filesystem()->create_file(deploy_mode_path, &file_writer));
            RETURN_IF_ERROR(file_writer->append(expected_mode));
            RETURN_IF_ERROR(file_writer->close());
            LOG(INFO) << "The file deploy_mode doesn't exist, create it.";
            auto cluster_id_path = fmt::format("{}/{}", store_path.path, CLUSTER_ID_PREFIX);
            RETURN_IF_ERROR(io::global_local_filesystem()->exists(cluster_id_path, &exists));
            if (exists) {
                // The two fragments were previously joined without a separating space.
                LOG(WARNING) << "This may be an upgrade from old version, "
                             << "or the deploy_mode file has been manually deleted";
            }
        }
    }
    return Status::OK();
}
828
829
#ifdef BE_TEST
// Test-only hooks for unit tests. Each set_* call moves the given object into
// the corresponding ExecEnv member, releasing any previously installed
// instance. Each clear_* call resets that member.

void ExecEnv::set_new_load_stream_mgr(std::unique_ptr<NewLoadStreamMgr>&& new_load_stream_mgr) {
    _new_load_stream_mgr = std::move(new_load_stream_mgr);
}

void ExecEnv::clear_new_load_stream_mgr() {
    _new_load_stream_mgr.reset();
}

void ExecEnv::set_stream_load_executor(std::unique_ptr<StreamLoadExecutor>&& stream_load_executor) {
    _stream_load_executor = std::move(stream_load_executor);
}

void ExecEnv::clear_stream_load_executor() {
    _stream_load_executor.reset();
}

void ExecEnv::set_wal_mgr(std::unique_ptr<WalManager>&& wm) {
    _wal_manager = std::move(wm);
}

void ExecEnv::clear_wal_mgr() {
    _wal_manager.reset();
}
#endif
853
// TODO(zhiqiang): Need refactor all thread pool. Each thread pool must have a Stop method.
854
// We need to stop all threads before releasing resource.
855
12
void ExecEnv::destroy() {
856
    //Only destroy once after init
857
12
    if (!ready()) {
858
8
        return;
859
8
    }
860
    // Memory barrier to prevent other threads from accessing destructed resources
861
4
    _s_ready = false;
862
863
4
    SAFE_STOP(_wal_manager);
864
4
    _wal_manager.reset();
865
4
    SAFE_STOP(_load_channel_mgr);
866
4
    SAFE_STOP(_broker_mgr);
867
4
    SAFE_STOP(_load_path_mgr);
868
4
    SAFE_STOP(_result_mgr);
869
4
    SAFE_STOP(_group_commit_mgr);
870
    // _routine_load_task_executor should be stopped before _new_load_stream_mgr.
871
4
    SAFE_STOP(_routine_load_task_executor);
872
4
    SAFE_STOP(_stream_load_recorder_manager);
873
    // stop workload scheduler
874
4
    SAFE_STOP(_workload_sched_mgr);
875
    // stop pipline step 2, cgroup execution
876
4
    SAFE_STOP(_workload_group_manager);
877
878
4
    SAFE_STOP(_external_scan_context_mgr);
879
4
    SAFE_STOP(_fragment_mgr);
880
4
    SAFE_STOP(_runtime_filter_timer_queue);
881
    // NewLoadStreamMgr should be destoried before storage_engine & after fragment_mgr stopped.
882
4
    _load_stream_mgr.reset();
883
4
    _new_load_stream_mgr.reset();
884
4
    _stream_load_executor.reset();
885
4
    _memtable_memory_limiter.reset();
886
4
    _delta_writer_v2_pool.reset();
887
4
    _load_stream_map_pool.reset();
888
4
    SAFE_STOP(_write_cooldown_meta_executors);
889
890
    // _id_manager must be destoried before tablet schema cache
891
4
    SAFE_DELETE(_id_manager);
892
893
    // Stop cluster info background worker before storage engine is destroyed
894
4
    if (config::is_cloud_mode() && _cluster_info) {
895
0
        static_cast<CloudClusterInfo*>(_cluster_info)->stop_bg_worker();
896
0
    }
897
898
    // StorageEngine must be destoried before _cache_manager destory.
899
4
    SAFE_STOP(_storage_engine);
900
4
    _storage_engine.reset();
901
902
4
    SAFE_STOP(_spill_file_mgr);
903
4
    if (_runtime_query_statistics_mgr) {
904
0
        _runtime_query_statistics_mgr->stop_report_thread();
905
0
    }
906
4
    SAFE_SHUTDOWN(_buffered_reader_prefetch_thread_pool);
907
4
    SAFE_SHUTDOWN(_segment_prefetch_thread_pool);
908
4
    SAFE_SHUTDOWN(_s3_file_upload_thread_pool);
909
4
    SAFE_SHUTDOWN(_lazy_release_obj_pool);
910
4
    SAFE_SHUTDOWN(_non_block_close_thread_pool);
911
4
    SAFE_SHUTDOWN(_s3_file_system_thread_pool);
912
4
    SAFE_SHUTDOWN(_peer_race_s3_thread_pool);
913
4
    SAFE_SHUTDOWN(_send_batch_thread_pool);
914
4
    SAFE_SHUTDOWN(_udf_close_workers_thread_pool);
915
4
    SAFE_SHUTDOWN(_send_table_stats_thread_pool);
916
917
4
    SAFE_DELETE(_load_channel_mgr);
918
919
4
    SAFE_DELETE(_inverted_index_query_cache);
920
4
    SAFE_DELETE(_inverted_index_searcher_cache);
921
4
    SAFE_DELETE(_condition_cache);
922
4
    SAFE_DELETE(_encoding_info_resolver);
923
4
    SAFE_DELETE(_lookup_connection_cache);
924
4
    SAFE_DELETE(_segment_loader);
925
4
    SAFE_DELETE(_row_cache);
926
4
    SAFE_DELETE(_query_cache);
927
4
    SAFE_DELETE(_delete_bitmap_agg_cache);
928
929
    // Free resource after threads are stopped.
930
    // Some threads are still running, like threads created by _new_load_stream_mgr ...
931
4
    SAFE_DELETE(_tablet_schema_cache);
932
4
    SAFE_DELETE(_tablet_column_object_pool);
933
934
    // _storage_page_cache must be destoried before _cache_manager
935
4
    SAFE_DELETE(_ann_index_ivf_list_cache);
936
4
    SAFE_DELETE(_storage_page_cache);
937
938
4
    SAFE_DELETE(_small_file_mgr);
939
4
    SAFE_DELETE(_broker_mgr);
940
4
    SAFE_DELETE(_load_path_mgr);
941
4
    SAFE_DELETE(_result_mgr);
942
4
    SAFE_DELETE(_file_meta_cache);
943
4
    SAFE_DELETE(_group_commit_mgr);
944
4
    SAFE_DELETE(_cdc_client_mgr);
945
4
    SAFE_DELETE(_routine_load_task_executor);
946
4
    SAFE_DELETE(_stream_load_recorder_manager);
947
    // _stream_load_executor
948
4
    SAFE_DELETE(_function_client_cache);
949
4
    SAFE_DELETE(_streaming_client_cache);
950
4
    SAFE_DELETE(_internal_client_cache);
951
952
4
    SAFE_DELETE(_bfd_parser);
953
4
    SAFE_DELETE(_result_cache);
954
4
    SAFE_DELETE(_vstream_mgr);
955
    // When _vstream_mgr is deconstructed, it will try call query context's dctor and will
956
    // access spill stream mgr, so spill stream mgr should be deconstructed after data stream manager
957
4
    SAFE_DELETE(_spill_file_mgr);
958
4
    SAFE_DELETE(_fragment_mgr);
959
4
    SAFE_DELETE(_workload_sched_mgr);
960
4
    SAFE_DELETE(_workload_group_manager);
961
4
    SAFE_DELETE(_file_cache_factory);
962
4
    SAFE_DELETE(_runtime_filter_timer_queue);
963
4
    SAFE_DELETE(_dict_factory);
964
    // TODO(zhiqiang): Maybe we should call shutdown before release thread pool?
965
4
    _lazy_release_obj_pool.reset(nullptr);
966
4
    _non_block_close_thread_pool.reset(nullptr);
967
4
    _s3_file_system_thread_pool.reset(nullptr);
968
4
    _peer_race_s3_thread_pool.reset(nullptr);
969
4
    _send_table_stats_thread_pool.reset(nullptr);
970
4
    _buffered_reader_prefetch_thread_pool.reset(nullptr);
971
4
    _segment_prefetch_thread_pool.reset(nullptr);
972
4
    _s3_file_upload_thread_pool.reset(nullptr);
973
4
    _send_batch_thread_pool.reset(nullptr);
974
4
    _udf_close_workers_thread_pool.reset(nullptr);
975
4
    _write_cooldown_meta_executors.reset(nullptr);
976
977
4
    SAFE_DELETE(_broker_client_cache);
978
4
    SAFE_DELETE(_frontend_client_cache);
979
4
    SAFE_DELETE(_backend_client_cache);
980
4
    SAFE_DELETE(_result_queue_mgr);
981
982
4
    SAFE_DELETE(_external_scan_context_mgr);
983
4
    SAFE_DELETE(_user_function_cache);
984
985
    // cache_manager must be destoried after all cache.
986
    // https://github.com/apache/doris/issues/24082#issuecomment-1712544039
987
4
    SAFE_DELETE(_cache_manager);
988
4
    _file_cache_open_fd_cache.reset(nullptr);
989
990
    // _heartbeat_flags must be destoried after staroge engine
991
4
    SAFE_DELETE(_heartbeat_flags);
992
993
    // Master Info is a thrift object, it could be the last one to deconstruct.
994
    // Master info should be deconstruct later than fragment manager, because fragment will
995
    // access cluster_info.backend_id to access some info. If there is a running query and master
996
    // info is deconstructed then BE process will core at coordinator back method in fragment mgr.
997
4
    SAFE_DELETE(_cluster_info);
998
999
    // NOTE: runtime query statistics mgr could be visited by query and daemon thread
1000
    // so it should be created before all query begin and deleted after all query and daemon thread stoppped
1001
4
    SAFE_DELETE(_runtime_query_statistics_mgr);
1002
1003
4
    SAFE_DELETE(_arrow_memory_pool);
1004
1005
4
    SAFE_DELETE(_orc_memory_pool);
1006
1007
    // dns cache is a global instance and need to be released at last
1008
4
    SAFE_DELETE(_dns_cache);
1009
4
    SAFE_DELETE(_kerberos_ticket_mgr);
1010
4
    SAFE_DELETE(_hdfs_mgr);
1011
    // PackedFileManager is a singleton, just stop its background thread
1012
4
    if (_packed_file_manager) {
1013
0
        _packed_file_manager->stop_background_manager();
1014
0
        _packed_file_manager = nullptr;
1015
0
    }
1016
1017
4
    SAFE_DELETE(_process_profile);
1018
4
    SAFE_DELETE(_heap_profiler);
1019
1020
4
    SAFE_DELETE(_index_policy_mgr);
1021
4
    SAFE_DELETE(_warmup_download_rate_limiter);
1022
1023
4
    _s_tracking_memory = false;
1024
1025
4
    clear_storage_resource();
1026
4
    PythonServerManager::instance().shutdown();
1027
4
    LOG(INFO) << "Doris exec envorinment is destoried.";
1028
4
}
1029
1030
} // namespace doris
1031
1032
namespace doris::config {
1033
namespace {
1034
1035
0
void refresh_ms_rpc_rate_limiters() {
1036
0
    auto* services = ExecEnv::GetInstance()->ms_rpc_rate_limit_services();
1037
0
    if (services != nullptr) {
1038
0
        services->reset_host_level_rate_limiters();
1039
0
    }
1040
0
}
1041
1042
0
void refresh_ms_backpressure_throttle_params() {
1043
0
    auto* services = ExecEnv::GetInstance()->ms_rpc_rate_limit_services();
1044
0
    if (services != nullptr) {
1045
0
        services->update_backpressure_throttle_params(ms_backpressure_upgrade_top_k,
1046
0
                                                      ms_backpressure_throttle_ratio,
1047
0
                                                      ms_rpc_table_qps_limit_floor);
1048
0
    }
1049
0
}
1050
1051
0
void refresh_ms_backpressure_coordinator_params() {
1052
0
    auto* services = ExecEnv::GetInstance()->ms_rpc_rate_limit_services();
1053
0
    if (services != nullptr) {
1054
0
        services->update_backpressure_coordinator_params(ms_backpressure_upgrade_interval_ms,
1055
0
                                                         ms_backpressure_downgrade_interval_ms);
1056
0
    }
1057
0
}
1058
1059
} // namespace
1060
1061
// Callback to update warmup download rate limiter when config changes is registered
1062
DEFINE_ON_UPDATE(file_cache_warmup_download_rate_limit_bytes_per_second,
1063
                 [](int64_t old_val, int64_t new_val) {
1064
                     auto* rate_limiter = ExecEnv::GetInstance()->warmup_download_rate_limiter();
1065
                     if (rate_limiter != nullptr && new_val != old_val) {
1066
                         // Reset rate limiter with new rate limit value
1067
                         // When new_val <= 0, pass 0 to disable rate limiting
1068
                         int64_t rate = new_val > 0 ? new_val : 0;
1069
                         rate_limiter->reset(rate, rate, 0);
1070
                         if (rate > 0) {
1071
                             LOG(INFO) << "Warmup download rate limiter updated from " << old_val
1072
                                       << " to " << new_val << " bytes/s";
1073
                         } else {
1074
                             LOG(INFO) << "Warmup download rate limiter disabled";
1075
                         }
1076
                     }
1077
                 });
1078
1079
DEFINE_ON_UPDATE(ms_rpc_qps_default, [](int32_t old_val, int32_t new_val) {
1080
    if (old_val != new_val) {
1081
        refresh_ms_rpc_rate_limiters();
1082
    }
1083
});
1084
1085
#define DEFINE_MS_RPC_QPS_ON_UPDATE(enum_name, config_suffix, display_name)             \
1086
    DEFINE_ON_UPDATE(ms_rpc_qps_##config_suffix, [](int32_t old_val, int32_t new_val) { \
1087
        if (old_val != new_val) {                                                       \
1088
            refresh_ms_rpc_rate_limiters();                                             \
1089
        }                                                                               \
1090
    });
1091
META_SERVICE_RPC_TYPES(DEFINE_MS_RPC_QPS_ON_UPDATE)
1092
#undef DEFINE_MS_RPC_QPS_ON_UPDATE
1093
1094
DEFINE_ON_UPDATE(ms_backpressure_upgrade_interval_ms, [](int32_t old_val, int32_t new_val) {
1095
    if (old_val != new_val) {
1096
        refresh_ms_backpressure_coordinator_params();
1097
    }
1098
});
1099
DEFINE_ON_UPDATE(ms_backpressure_downgrade_interval_ms, [](int32_t old_val, int32_t new_val) {
1100
    if (old_val != new_val) {
1101
        refresh_ms_backpressure_coordinator_params();
1102
    }
1103
});
1104
DEFINE_ON_UPDATE(ms_backpressure_upgrade_top_k, [](int32_t old_val, int32_t new_val) {
1105
    if (old_val != new_val) {
1106
        refresh_ms_backpressure_throttle_params();
1107
    }
1108
});
1109
DEFINE_ON_UPDATE(ms_backpressure_throttle_ratio, [](double old_val, double new_val) {
1110
    if (old_val != new_val) {
1111
        refresh_ms_backpressure_throttle_params();
1112
    }
1113
});
1114
DEFINE_ON_UPDATE(ms_rpc_table_qps_limit_floor, [](double old_val, double new_val) {
1115
    if (old_val != new_val) {
1116
        refresh_ms_backpressure_throttle_params();
1117
    }
1118
});
1119
} // namespace doris::config