Coverage Report

Created: 2026-03-14 20:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/storage_engine.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <butil/macros.h>
21
#include <bvar/bvar.h>
22
#include <gen_cpp/Types_types.h>
23
#include <gen_cpp/internal_service.pb.h>
24
#include <gen_cpp/olap_file.pb.h>
25
26
#include <atomic>
27
#include <condition_variable>
28
#include <cstdint>
29
#include <ctime>
30
#include <map>
31
#include <memory>
32
#include <mutex>
33
#include <set>
34
#include <shared_mutex>
35
#include <string>
36
#include <unordered_map>
37
#include <unordered_set>
38
#include <vector>
39
40
#include "agent/task_worker_pool.h"
41
#include "common/config.h"
42
#include "common/status.h"
43
#include "runtime/heartbeat_flags.h"
44
#include "storage/compaction/compaction_permit_limiter.h"
45
#include "storage/delete/calc_delete_bitmap_executor.h"
46
#include "storage/olap_common.h"
47
#include "storage/options.h"
48
#include "storage/rowset/pending_rowset_helper.h"
49
#include "storage/rowset/rowset_fwd.h"
50
#include "storage/segment/segment.h"
51
#include "storage/tablet/tablet_fwd.h"
52
#include "storage/task/index_builder.h"
53
#include "util/countdown_latch.h"
54
55
namespace doris {
56
57
class DataDir;
58
class EngineTask;
59
class MemTableFlushExecutor;
60
class SegcompactionWorker;
61
class BaseCompaction;
62
class CumulativeCompaction;
63
class SingleReplicaCompaction;
64
class CumulativeCompactionPolicy;
65
class StreamLoadRecorder;
66
class TCloneReq;
67
class TCreateTabletReq;
68
class TabletManager;
69
class Thread;
70
class ThreadPool;
71
class TxnManager;
72
class ReportWorker;
73
class CreateTabletRRIdxCache;
74
struct DirInfo;
75
class SnapshotManager;
76
class WorkloadGroup;
77
78
using SegCompactionCandidates = std::vector<segment_v2::SegmentSharedPtr>;
79
using SegCompactionCandidatesSharedPtr = std::shared_ptr<SegCompactionCandidates>;
80
using CumuCompactionPolicyTable =
81
        std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>;
82
83
class StorageEngine;
84
class CloudStorageEngine;
85
86
extern bvar::Status<int64_t> g_max_rowsets_with_useless_delete_bitmap;
87
extern bvar::Status<int64_t> g_max_rowsets_with_useless_delete_bitmap_version;
88
89
// StorageEngine singleton to manage all Table pointers.
90
// Providing add/drop/get operations.
91
// StorageEngine instance doesn't own the Table resources, just hold the pointer,
92
// allocation/deallocation must be done outside.
93
class BaseStorageEngine {
94
protected:
95
    enum Type : uint8_t {
96
        LOCAL, // Shared-nothing integrated compute and storage architecture
97
        CLOUD, // Separating compute and storage architecture
98
    };
99
    Type _type;
100
101
public:
102
    BaseStorageEngine(Type type, const UniqueId& backend_uid);
103
    virtual ~BaseStorageEngine();
104
105
    StorageEngine& to_local();
106
    CloudStorageEngine& to_cloud();
107
108
    virtual Status open() = 0;
109
    virtual void stop() = 0;
110
    virtual bool stopped() = 0;
111
112
    // Start all background threads. This should be called after the env is ready.
113
    virtual Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) = 0;
114
115
    /* Parameters:
116
     * - tablet_id: the id of tablet to get
117
     * - sync_stats: the stats of sync rowset
118
     * - force_use_only_cached: whether only use cached tablet meta
119
     * - cache_on_miss: whether cache the tablet meta when missing in cache
120
     */
121
    virtual Result<BaseTabletSPtr> get_tablet(int64_t tablet_id,
122
                                              SyncRowsetStats* sync_stats = nullptr,
123
                                              bool force_use_only_cached = false,
124
                                              bool cache_on_miss = true) = 0;
125
126
    virtual Status get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta,
127
                                   bool force_use_only_cached = false) = 0;
128
129
    void register_report_listener(ReportWorker* listener);
130
    void deregister_report_listener(ReportWorker* listener);
131
    void notify_listeners();
132
    bool notify_listener(std::string_view name);
133
134
0
    // Stores the given HeartbeatFlags pointer in _heartbeat_flags.
    // Only the raw pointer is recorded; the pointed-to object is not copied here.
    void set_heartbeat_flags(HeartbeatFlags* heartbeat_flags) {
135
0
        _heartbeat_flags = heartbeat_flags;
136
0
    }
137
    virtual Status set_cluster_id(int32_t cluster_id) = 0;
138
0
    // Returns the current value of _effective_cluster_id (its in-class default is -1).
    int32_t effective_cluster_id() const { return _effective_cluster_id; }
139
140
    RowsetId next_rowset_id();
141
142
43
    // Returns a non-owning pointer to the executor held by the _memtable_flush_executor
    // unique_ptr; the result is nullptr while that unique_ptr is empty.
    MemTableFlushExecutor* memtable_flush_executor() { return _memtable_flush_executor.get(); }
143
28
    // Returns a non-owning pointer to the executor held by the
    // _calc_delete_bitmap_executor unique_ptr (nullptr while it is empty).
    CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() {
144
28
        return _calc_delete_bitmap_executor.get();
145
28
    }
146
147
8
    // Returns a non-owning pointer to the executor held by the
    // _calc_delete_bitmap_executor_for_load unique_ptr (nullptr while it is empty).
    // This is a separate member from _calc_delete_bitmap_executor.
    CalcDeleteBitmapExecutor* calc_delete_bitmap_executor_for_load() {
148
8
        return _calc_delete_bitmap_executor_for_load.get();
149
8
    }
150
151
    void add_quering_rowset(RowsetSharedPtr rs);
152
153
    RowsetSharedPtr get_quering_rowset(RowsetId rs_id);
154
155
    int64_t memory_limitation_bytes_per_thread_for_schema_change() const;
156
157
0
    // Returns _disk_num (its in-class default is -1).
    int get_disk_num() { return _disk_num; }
158
159
    Status init_stream_load_recorder(const std::string& stream_load_record_path);
160
161
0
    // Returns a const reference to the _stream_load_recorder shared_ptr member.
    // No new shared owner is created by the call itself; the reference is to the member.
    const std::shared_ptr<StreamLoadRecorder>& get_stream_load_recorder() {
162
0
        return _stream_load_recorder;
163
0
    }
164
165
protected:
166
    void _evict_querying_rowset();
167
    void _evict_quring_rowset_thread_callback();
168
    bool _should_delay_large_task();
169
170
    int32_t _effective_cluster_id = -1;
171
    HeartbeatFlags* _heartbeat_flags = nullptr;
172
173
    // For task, tablet and disk report
174
    std::mutex _report_mtx;
175
    std::vector<ReportWorker*> _report_listeners;
176
177
    std::unique_ptr<RowsetIdGenerator> _rowset_id_generator;
178
    std::unique_ptr<MemTableFlushExecutor> _memtable_flush_executor;
179
    std::unique_ptr<CalcDeleteBitmapExecutor> _calc_delete_bitmap_executor;
180
    std::unique_ptr<CalcDeleteBitmapExecutor> _calc_delete_bitmap_executor_for_load;
181
    CountDownLatch _stop_background_threads_latch;
182
183
    // Hold references to querying rowsets
184
    std::mutex _quering_rowsets_mutex;
185
    std::unordered_map<RowsetId, RowsetSharedPtr> _querying_rowsets;
186
    std::shared_ptr<Thread> _evict_quering_rowset_thread;
187
188
    int64_t _memory_limitation_bytes_for_schema_change;
189
190
    int _disk_num {-1};
191
192
    std::shared_ptr<StreamLoadRecorder> _stream_load_recorder;
193
194
    std::shared_ptr<bvar::Status<size_t>> _tablet_max_delete_bitmap_score_metrics;
195
    std::shared_ptr<bvar::Status<size_t>> _tablet_max_base_rowset_delete_bitmap_score_metrics;
196
197
    std::unique_ptr<ThreadPool> _base_compaction_thread_pool;
198
    std::unique_ptr<ThreadPool> _cumu_compaction_thread_pool;
199
    int _cumu_compaction_thread_pool_used_threads {0};
200
    int _cumu_compaction_thread_pool_small_tasks_running {0};
201
};
202
203
// Keeps, per DataDir, sets of tablets in three separate registries: one for
// cumulative, one for base and one for full compaction.
// NOTE(review): all member functions are only declared here; how and when
// _tablet_submitted_compaction_mutex is taken must be confirmed in the implementation.
class CompactionSubmitRegistry {
204
    // Set of tablets kept for one DataDir.
    using TabletSet = std::unordered_set<TabletSharedPtr>;
205
    // Ordered map from a DataDir pointer to its tablet set.
    using Registry = std::map<DataDir*, TabletSet>;
206
207
public:
208
338
    CompactionSubmitRegistry() = default;
209
    // Move constructor, defined out of line. Because a move constructor is
    // user-declared, the implicit copy operations are deleted.
    CompactionSubmitRegistry(CompactionSubmitRegistry&& r);
210
211
    // create a snapshot for current registry, operations to the snapshot can be lock-free.
212
    CompactionSubmitRegistry create_snapshot();
213
214
    // NOTE(review): presumably re-initializes the registries for the given stores — confirm.
    void reset(const std::vector<DataDir*>& stores);
215
216
    // NOTE(review): presumably the number of tablets registered for `dir` under
    // `compaction_type` — confirm against the implementation.
    uint32_t count_executing_compaction(DataDir* dir, CompactionType compaction_type);
217
    // NOTE(review): presumably the combined cumulative + base count for `dir` — confirm.
    uint32_t count_executing_cumu_and_base(DataDir* dir);
218
219
    bool has_compaction_task(DataDir* dir, CompactionType compaction_type);
220
221
    // NOTE(review): the meaning of the bool result is not visible in this header — confirm.
    bool insert(TabletSharedPtr tablet, CompactionType compaction_type);
222
223
    // `wakeup_cb` is a caller-supplied callback; when it is invoked is decided by the
    // out-of-line implementation.
    void remove(TabletSharedPtr tablet, CompactionType compaction_type,
224
                std::function<void()> wakeup_cb);
225
226
    // Writes its output through `result`.
    void jsonfy_compaction_status(std::string* result);
227
228
    // `disk_max_score` is an output parameter.
    // NOTE(review): selection criteria live in the implementation — confirm there.
    std::vector<TabletSharedPtr> pick_topn_tablets_for_compaction(
229
            TabletManager* tablet_mgr, DataDir* data_dir, CompactionType compaction_type,
230
            const CumuCompactionPolicyTable& cumu_compaction_policies, uint32_t* disk_max_score);
231
232
private:
233
    // Returns a reference to a tablet set for `dir`; presumably picks one of the three
    // registries below based on `compaction_type` — confirm.
    TabletSet& _get_tablet_set(DataDir* dir, CompactionType compaction_type);
234
235
    std::mutex _tablet_submitted_compaction_mutex;
236
    // One registry per compaction kind.
    Registry _tablet_submitted_cumu_compaction;
237
    Registry _tablet_submitted_base_compaction;
238
    Registry _tablet_submitted_full_compaction;
239
};
240
241
class StorageEngine final : public BaseStorageEngine {
242
public:
243
    StorageEngine(const EngineOptions& options);
244
    ~StorageEngine() override;
245
246
    Status open() override;
247
248
    Status create_tablet(const TCreateTabletReq& request, RuntimeProfile* profile);
249
250
    /* Parameters:
251
     * - tablet_id: the id of tablet to get
252
     * - sync_stats: the stats of sync rowset
253
     * - force_use_only_cached: whether only use cached tablet meta
254
     * - cache_on_miss: whether cache the tablet meta when missing in cache
255
     */
256
    Result<BaseTabletSPtr> get_tablet(int64_t tablet_id, SyncRowsetStats* sync_stats = nullptr,
257
                                      bool force_use_only_cached = false,
258
                                      bool cache_on_miss = true) override;
259
260
    Status get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta,
261
                           bool force_use_only_cached = false) override;
262
263
    void clear_transaction_task(const TTransactionId transaction_id);
264
    void clear_transaction_task(const TTransactionId transaction_id,
265
                                const std::vector<TPartitionId>& partition_ids);
266
267
    std::vector<DataDir*> get_stores(bool include_unused = false);
268
269
    // get all info of root_path
270
    Status get_all_data_dir_info(std::vector<DataDirInfo>* data_dir_infos, bool need_update);
271
272
    static int64_t get_file_or_directory_size(const std::string& file_path);
273
274
    // Get the root paths for creating a tablet. The returned root paths should be in round-robin order,
275
    // to avoid deploying all tablets on a single disk.
276
    std::vector<DataDir*> get_stores_for_create_tablet(int64_t partition_id,
277
                                                       TStorageMedium::type storage_medium);
278
279
    DataDir* get_store(const std::string& path);
280
281
0
    // Returns the cached _available_storage_medium_type_count.
    // NOTE(review): that member has no in-class initializer; presumably it is set by
    // _update_storage_medium_type_count() before this is read — confirm.
    uint32_t available_storage_medium_type_count() const {
282
0
        return _available_storage_medium_type_count;
283
0
    }
284
285
    Status set_cluster_id(int32_t cluster_id) override;
286
287
    void start_delete_unused_rowset();
288
    void add_unused_rowset(RowsetSharedPtr rowset);
289
    using DeleteBitmapKeyRanges =
290
            std::vector<std::tuple<DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>>;
291
    void add_unused_delete_bitmap_key_ranges(int64_t tablet_id,
292
                                             const std::vector<RowsetId>& rowsets,
293
                                             const DeleteBitmapKeyRanges& key_ranges);
294
295
    // Obtain shard path for new tablet.
296
    //
297
    // @param [out] shard_path choose an available root_path to clone new tablet
298
    // @return error code
299
    Status obtain_shard_path(TStorageMedium::type storage_medium, int64_t path_hash,
300
                             std::string* shared_path, DataDir** store, int64_t partition_id);
301
302
    // Load new tablet to make it effective.
303
    //
304
    // @param [in] shard_path specify the shard path of the new tablet
305
    // @param [in] request specify new tablet info
306
    // @param [in] restore whether we're restoring a tablet from trash
307
    // @return OK if load tablet success
308
    Status load_header(const std::string& shard_path, const TCloneReq& request,
309
                       bool restore = false);
310
311
2.28k
    // Returns a non-owning pointer to the TabletManager held by _tablet_manager (a unique_ptr).
    TabletManager* tablet_manager() { return _tablet_manager.get(); }
312
146
    // Returns a non-owning pointer to the TxnManager held by _txn_manager (a unique_ptr).
    TxnManager* txn_manager() { return _txn_manager.get(); }
313
10
    // Returns a non-owning pointer to the SnapshotManager held by _snapshot_mgr (a unique_ptr).
    SnapshotManager* snapshot_mgr() { return _snapshot_mgr.get(); }
314
    // Rowset garbage collection helpers
315
    bool check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id);
316
557
    // Returns a mutable reference to the _pending_local_rowsets member.
    PendingRowsetSet& pending_local_rowsets() { return _pending_local_rowsets; }
317
5
    // Returns a mutable reference to the _pending_remote_rowsets member.
    PendingRowsetSet& pending_remote_rowsets() { return _pending_remote_rowsets; }
318
    PendingRowsetGuard add_pending_rowset(const RowsetWriterContext& ctx);
319
320
0
    // Returns the rowset type to use by default.
    // If heartbeat flags have been set and they report
    // is_set_default_rowset_type_to_beta(), BETA_ROWSET is returned; otherwise the
    // value stored in _default_rowset_type is returned.
    RowsetTypePB default_rowset_type() const {
321
0
        // The null check covers the case where set_heartbeat_flags() was never given a pointer.
        if (_heartbeat_flags != nullptr && _heartbeat_flags->is_set_default_rowset_type_to_beta()) {
322
0
            return BETA_ROWSET;
323
0
        }
324
0
        return _default_rowset_type;
325
0
    }
326
327
    Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) override;
328
329
    // clear trash and snapshot file
330
    // option: update disk usage after sweep
331
    Status start_trash_sweep(double* usage, bool ignore_guard = false);
332
333
    // stop() must be called before the storage engine is destructed
334
    void stop() override;
335
336
    void get_tablet_rowset_versions(const PGetTabletVersionsRequest* request,
337
                                    PGetTabletVersionsResponse* response);
338
339
    bool get_peer_replica_info(int64_t tablet_id, TReplicaInfo* replica, std::string* token);
340
341
    bool get_peers_replica_backends(int64_t tablet_id, std::vector<TBackend>* backends);
342
343
    bool should_fetch_from_peer(int64_t tablet_id);
344
345
0
    // Returns a const reference to the inherited _stream_load_recorder member.
    // NOTE(review): BaseStorageEngine declares a get_stream_load_recorder() with the same
    // signature and body; this one hides it and looks redundant — confirm before removing.
    const std::shared_ptr<StreamLoadRecorder>& get_stream_load_recorder() {
346
0
        return _stream_load_recorder;
347
0
    }
348
349
    void get_compaction_status_json(std::string* result);
350
351
    Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type,
352
                                  bool force, bool eager = true);
353
    Status submit_seg_compaction_task(std::shared_ptr<SegcompactionWorker> worker,
354
                                      SegCompactionCandidatesSharedPtr segments);
355
356
0
    // Returns a non-owning pointer to the pool held by _tablet_publish_txn_thread_pool
    // (a unique_ptr); nullptr while that unique_ptr is empty.
    ThreadPool* tablet_publish_txn_thread_pool() { return _tablet_publish_txn_thread_pool.get(); }
357
8.83k
    // Returns the current value of the atomic _stopped flag (initialized to false).
    bool stopped() override { return _stopped; }
358
359
    Status process_index_change_task(const TAlterInvertedIndexReq& reqest);
360
361
    void gc_binlogs(const std::unordered_map<int64_t, int64_t>& gc_tablet_infos);
362
363
    void add_async_publish_task(int64_t partition_id, int64_t tablet_id, int64_t publish_version,
364
                                int64_t transaction_id, bool is_recover);
365
    int64_t get_pending_publish_min_version(int64_t tablet_id);
366
367
    bool add_broken_path(std::string path);
368
    bool remove_broken_path(std::string path);
369
370
7
    // Returns a copy of _broken_paths (returned by value).
    // NOTE(review): the copy is made without acquiring _broken_paths_mutex in this
    // function; confirm how writers of _broken_paths synchronize with callers of this getter.
    std::set<std::string> get_broken_paths() { return _broken_paths; }
371
372
    Status submit_clone_task(Tablet* tablet, int64_t version);
373
374
    std::unordered_map<int64_t, std::unique_ptr<TaskWorkerPoolIf>>* workers;
375
376
12
    // Returns _compaction_num_per_round (its in-class default is 1).
    int64_t get_compaction_num_per_round() const { return _compaction_num_per_round; }
377
378
private:
379
    // Instance should be inited from `static open()`
380
    // MUST NOT be called in other circumstances.
381
    Status _open();
382
383
    Status _init_store_map();
384
385
    void _update_storage_medium_type_count();
386
387
    // Some check methods
388
    Status _check_file_descriptor_number();
389
    Status _check_all_root_path_cluster_id();
390
    Status _judge_and_update_effective_cluster_id(int32_t cluster_id);
391
392
    void _exit_if_too_many_disks_are_failed();
393
394
    void _clean_unused_txns();
395
396
    void _clean_unused_rowset_metas();
397
398
    void _clean_unused_binlog_metas();
399
400
    void _clean_unused_delete_bitmap();
401
402
    void _clean_unused_pending_publish_info();
403
404
    void _clean_unused_partial_update_info();
405
406
    Status _do_sweep(const std::string& scan_root, const time_t& local_tm_now,
407
                     const int32_t expire);
408
409
    // All these xxx_callback() functions are for Background threads
410
    // unused rowset monitor thread
411
    void _unused_rowset_monitor_thread_callback();
412
413
    // garbage sweep thread process function. clear snapshot and trash folder
414
    void _garbage_sweeper_thread_callback();
415
416
    // delete tablet with io error process function
417
    void _disk_stat_monitor_thread_callback();
418
419
    // path gc process function
420
    void _path_gc_thread_callback(DataDir* data_dir);
421
422
    void _tablet_path_check_callback();
423
424
    void _tablet_checkpoint_callback(const std::vector<DataDir*>& data_dirs);
425
426
    // parse the default rowset type config to RowsetTypePB
427
    void _parse_default_rowset_type();
428
429
    // Disk status monitoring. Watches the unused flag of each root_path.
430
    // When the unused flag is detected, the corresponding table information is removed from memory; the data on disk is not touched.
431
    // When a disk's status is unusable but the unused flag is not detected, the data needs to be
432
    // reloaded from that root_path.
433
    void _start_disk_stat_monitor();
434
435
    void _compaction_tasks_producer_callback();
436
437
    void _update_replica_infos_callback();
438
439
    std::vector<TabletSharedPtr> _generate_compaction_tasks(CompactionType compaction_type,
440
                                                            std::vector<DataDir*>& data_dirs,
441
                                                            bool check_score);
442
    void _update_cumulative_compaction_policy();
443
444
    void _pop_tablet_from_submitted_compaction(TabletSharedPtr tablet,
445
                                               CompactionType compaction_type);
446
447
    Status _submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type,
448
                                   bool force);
449
450
    void _handle_compaction(TabletSharedPtr tablet, std::shared_ptr<CompactionMixin> compaction,
451
                            CompactionType compaction_type, int64_t permits, bool force);
452
453
    Status _submit_single_replica_compaction_task(TabletSharedPtr tablet,
454
                                                  CompactionType compaction_type);
455
456
    void _adjust_compaction_thread_num();
457
458
    void _cooldown_tasks_producer_callback();
459
    void _remove_unused_remote_files_callback();
460
    void do_remove_unused_remote_files();
461
    void _cold_data_compaction_producer_callback();
462
    void _handle_cold_data_compaction(TabletSharedPtr tablet);
463
    void _follow_cooldown_meta(TabletSharedPtr tablet);
464
465
    Status _handle_seg_compaction(std::shared_ptr<SegcompactionWorker> worker,
466
                                  SegCompactionCandidatesSharedPtr segments,
467
                                  uint64_t submission_time);
468
469
    Status _handle_index_change(IndexBuilderSharedPtr index_builder);
470
471
    void _gc_binlogs(int64_t tablet_id, int64_t version);
472
473
    void _async_publish_callback();
474
475
    void _process_async_publish();
476
477
    Status _persist_broken_paths();
478
479
    bool _increase_low_priority_task_nums(DataDir* dir);
480
481
    void _decrease_low_priority_task_nums(DataDir* dir);
482
483
    void _get_candidate_stores(TStorageMedium::type storage_medium,
484
                               std::vector<DirInfo>& dir_infos);
485
486
    int _get_and_set_next_disk_index(int64_t partition_id, TStorageMedium::type storage_medium);
487
488
    int32_t _auto_get_interval_by_disk_capacity(DataDir* data_dir);
489
490
    void _check_tablet_delete_bitmap_score_callback();
491
492
private:
493
    EngineOptions _options;
494
    std::mutex _store_lock;
495
    std::mutex _trash_sweep_lock;
496
    std::map<std::string, std::unique_ptr<DataDir>> _store_map;
497
    std::set<std::string> _broken_paths;
498
    std::mutex _broken_paths_mutex;
499
500
    uint32_t _available_storage_medium_type_count;
501
502
    bool _is_all_cluster_id_exist;
503
504
    std::atomic_bool _stopped {false};
505
506
    std::mutex _gc_mutex;
507
    std::unordered_map<RowsetId, RowsetSharedPtr> _unused_rowsets;
508
    // tablet_id, unused_rowsets, [start_version, end_version]
509
    std::vector<std::tuple<int64_t, std::vector<RowsetId>, DeleteBitmapKeyRanges>>
510
            _unused_delete_bitmap;
511
    PendingRowsetSet _pending_local_rowsets;
512
    PendingRowsetSet _pending_remote_rowsets;
513
514
    std::shared_ptr<Thread> _unused_rowset_monitor_thread;
515
    // thread to monitor snapshot expiry
516
    std::shared_ptr<Thread> _garbage_sweeper_thread;
517
    // thread to monitor disk stat
518
    std::shared_ptr<Thread> _disk_stat_monitor_thread;
519
    // thread to produce both base and cumulative compaction tasks
520
    std::shared_ptr<Thread> _compaction_tasks_producer_thread;
521
    std::shared_ptr<Thread> _update_replica_infos_thread;
522
    std::shared_ptr<Thread> _cache_clean_thread;
523
    // threads to clean all file descriptor not actively in use
524
    std::vector<std::shared_ptr<Thread>> _path_gc_threads;
525
    // thread to produce tablet checkpoint tasks
526
    std::shared_ptr<Thread> _tablet_checkpoint_tasks_producer_thread;
527
    // thread to check tablet path
528
    std::shared_ptr<Thread> _tablet_path_check_thread;
529
    // thread to clean tablet lookup cache
530
    std::shared_ptr<Thread> _lookup_cache_clean_thread;
531
532
    std::mutex _engine_task_mutex;
533
534
    std::unique_ptr<TabletManager> _tablet_manager;
535
    std::unique_ptr<TxnManager> _txn_manager;
536
537
    // Used to control the migration from segment_v1 to segment_v2, can be deleted in future.
538
    // Type of new loaded data
539
    RowsetTypePB _default_rowset_type;
540
541
    std::unique_ptr<ThreadPool> _single_replica_compaction_thread_pool;
542
543
    std::unique_ptr<ThreadPool> _seg_compaction_thread_pool;
544
    std::unique_ptr<ThreadPool> _cold_data_compaction_thread_pool;
545
546
    std::unique_ptr<ThreadPool> _tablet_publish_txn_thread_pool;
547
548
    std::unique_ptr<ThreadPool> _tablet_meta_checkpoint_thread_pool;
549
550
    CompactionPermitLimiter _permit_limiter;
551
552
    CompactionSubmitRegistry _compaction_submit_registry;
553
554
    std::mutex _low_priority_task_nums_mutex;
555
    std::unordered_map<DataDir*, int32_t> _low_priority_task_nums;
556
557
    std::mutex _peer_replica_infos_mutex;
558
    // key: tabletId
559
    std::unordered_map<int64_t, TReplicaInfo> _peer_replica_infos;
560
    std::string _token;
561
562
    std::atomic<int32_t> _wakeup_producer_flag {0};
563
564
    std::mutex _compaction_producer_sleep_mutex;
565
    std::condition_variable _compaction_producer_sleep_cv;
566
567
    // we use an unordered_map to store the shared pointers of all cumulative compaction policies
568
    CumuCompactionPolicyTable _cumulative_compaction_policies;
569
570
    std::shared_ptr<Thread> _cooldown_tasks_producer_thread;
571
    std::shared_ptr<Thread> _remove_unused_remote_files_thread;
572
    std::shared_ptr<Thread> _cold_data_compaction_producer_thread;
573
574
    std::shared_ptr<Thread> _cache_file_cleaner_tasks_producer_thread;
575
576
    std::unique_ptr<PriorityThreadPool> _cooldown_thread_pool;
577
578
    std::mutex _running_cooldown_mutex;
579
    std::unordered_set<int64_t> _running_cooldown_tablets;
580
581
    std::mutex _cold_compaction_tablet_submitted_mtx;
582
    std::unordered_set<int64_t> _cold_compaction_tablet_submitted;
583
584
    std::mutex _cumu_compaction_delay_mtx;
585
586
    // tablet_id, publish_version, transaction_id, partition_id
587
    std::map<int64_t, std::map<int64_t, std::pair<int64_t, int64_t>>> _async_publish_tasks;
588
    // async publish for discontinuous versions of merge_on_write table
589
    std::shared_ptr<Thread> _async_publish_thread;
590
    std::shared_mutex _async_publish_lock;
591
592
    std::atomic<bool> _need_clean_trash {false};
593
594
    // next index for create tablet
595
    std::map<TStorageMedium::type, int> _last_use_index;
596
597
    std::unique_ptr<CreateTabletRRIdxCache> _create_tablet_idx_lru_cache;
598
599
    std::unique_ptr<SnapshotManager> _snapshot_mgr;
600
601
    // thread to check tablet delete bitmap count tasks
602
    std::shared_ptr<Thread> _check_delete_bitmap_score_thread;
603
604
    int64_t _last_get_peers_replica_backends_time_ms {0};
605
606
    int64_t _compaction_num_per_round {1};
607
};
608
609
// LRU cache used for round-robin disk selection when creating tablets.
610
// key: "<partition_id>-<medium>", as produced by get_key()
611
// value: index
612
class CreateTabletRRIdxCache : public LRUCachePolicy {
613
public:
614
    // Builds the cache key by joining partition_id and medium with a '-' delimiter.
615
73
    static std::string get_key(int64_t partition_id, TStorageMedium::type medium) {
616
73
        return fmt::format("{}-{}", partition_id, medium);
617
73
    }
618
619
    // Returns the index stored for `key`, or -1 when the key is not found in the LRU cache.
620
    int get_index(const std::string& key);
621
622
    // Stores `next_idx` for `key` (defined out of line).
    void set_index(const std::string& key, int next_idx);
623
624
    // Value type kept in the cache; idx defaults to 0.
    class CacheValue : public LRUCacheValueBase {
625
    public:
626
        int idx = 0;
627
    };
628
629
    // `explicit` prevents an accidental implicit conversion from a bare size_t to a cache.
    // The base policy is configured as a NUMBER-type LRU with a 30-minute stale sweep
    // time, a single shard, element count capacity 0, pruning enabled and LRU-K disabled.
    explicit CreateTabletRRIdxCache(size_t capacity)
630
338
            : LRUCachePolicy(CachePolicy::CacheType::CREATE_TABLET_RR_IDX_CACHE, capacity,
631
338
                             LRUCacheType::NUMBER,
632
338
                             /*stale_sweep_time_s*/ 30 * 60, /*num shards*/ 1,
633
338
                             /*element count capacity */ 0,
634
338
                             /*enable prune*/ true, /*is lru-k*/ false) {}
635
};
636
637
// Candidate data directory together with the values used to rank it.
struct DirInfo {
638
    // Non-owning pointer to the directory. Defaults to nullptr so that a
    // default-constructed DirInfo never carries an indeterminate pointer;
    // it must be set to a valid DataDir before operator< is used.
    DataDir* data_dir = nullptr;
639
640
    double usage = 0;
641
    int available_level = 0;
642
643
1
    // Orders by available_level ascending; entries with equal available_level are
    // ordered by the path_hash() of their data_dir. `usage` does not take part in
    // the ordering. Both data_dir pointers are dereferenced on the tie-break path.
    bool operator<(const DirInfo& other) const {
644
1
        if (available_level != other.available_level) {
645
0
            return available_level < other.available_level;
646
0
        }
647
1
        return data_dir->path_hash() < other.data_dir->path_hash();
648
1
    }
649
};
650
651
} // namespace doris