Coverage Report

Created: 2026-04-01 14:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/storage_engine.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <butil/macros.h>
21
#include <bvar/bvar.h>
22
#include <gen_cpp/Types_types.h>
23
#include <gen_cpp/internal_service.pb.h>
24
#include <gen_cpp/olap_file.pb.h>
25
26
#include <atomic>
27
#include <condition_variable>
28
#include <cstdint>
29
#include <ctime>
30
#include <map>
31
#include <memory>
32
#include <mutex>
33
#include <set>
34
#include <shared_mutex>
35
#include <string>
36
#include <unordered_map>
37
#include <unordered_set>
38
#include <vector>
39
40
#include "agent/task_worker_pool.h"
41
#include "common/config.h"
42
#include "common/status.h"
43
#include "runtime/heartbeat_flags.h"
44
#include "storage/adaptive_thread_pool_controller.h"
45
#include "storage/compaction/compaction_permit_limiter.h"
46
#include "storage/delete/calc_delete_bitmap_executor.h"
47
#include "storage/olap_common.h"
48
#include "storage/options.h"
49
#include "storage/rowset/pending_rowset_helper.h"
50
#include "storage/rowset/rowset_fwd.h"
51
#include "storage/segment/segment.h"
52
#include "storage/tablet/tablet_fwd.h"
53
#include "storage/task/index_builder.h"
54
#include "util/countdown_latch.h"
55
56
namespace doris {
57
58
class DataDir;
59
class EngineTask;
60
class MemTableFlushExecutor;
61
class SegcompactionWorker;
62
class BaseCompaction;
63
class CumulativeCompaction;
64
class SingleReplicaCompaction;
65
class CumulativeCompactionPolicy;
66
class StreamLoadRecorder;
67
class TCloneReq;
68
class TCreateTabletReq;
69
class TabletManager;
70
class Thread;
71
class ThreadPool;
72
class TxnManager;
73
class ReportWorker;
74
class CreateTabletRRIdxCache;
75
struct DirInfo;
76
class SnapshotManager;
77
class WorkloadGroup;
78
79
using SegCompactionCandidates = std::vector<segment_v2::SegmentSharedPtr>;
80
using SegCompactionCandidatesSharedPtr = std::shared_ptr<SegCompactionCandidates>;
81
using CumuCompactionPolicyTable =
82
        std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>;
83
84
class StorageEngine;
85
class CloudStorageEngine;
86
87
extern bvar::Status<int64_t> g_max_rowsets_with_useless_delete_bitmap;
88
extern bvar::Status<int64_t> g_max_rowsets_with_useless_delete_bitmap_version;
89
90
// StorageEngine singleton to manage all Table pointers.
91
// Providing add/drop/get operations.
92
// StorageEngine instance doesn't own the Table resources, just hold the pointer,
93
// allocation/deallocation must be done outside.
94
class BaseStorageEngine {
95
protected:
96
    enum Type : uint8_t {
97
        LOCAL, // Shared-nothing integrated compute and storage architecture
98
        CLOUD, // Separating compute and storage architecture
99
    };
100
    Type _type;
101
102
public:
103
    BaseStorageEngine(Type type, const UniqueId& backend_uid);
104
    virtual ~BaseStorageEngine();
105
106
    StorageEngine& to_local();
107
    CloudStorageEngine& to_cloud();
108
109
    virtual Status open() = 0;
110
    virtual void stop() = 0;
111
    virtual bool stopped() = 0;
112
113
    // start all background threads. This should be called after env is ready.
114
    virtual Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) = 0;
115
116
    /* Parameters:
117
     * - tablet_id: the id of tablet to get
118
     * - sync_stats: the stats of sync rowset
119
     * - force_use_only_cached: whether only use cached tablet meta
120
     * - cache_on_miss: whether cache the tablet meta when missing in cache
121
     */
122
    virtual Result<BaseTabletSPtr> get_tablet(int64_t tablet_id,
123
                                              SyncRowsetStats* sync_stats = nullptr,
124
                                              bool force_use_only_cached = false,
125
                                              bool cache_on_miss = true) = 0;
126
127
    virtual Status get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta,
128
                                   bool force_use_only_cached = false) = 0;
129
130
    void register_report_listener(ReportWorker* listener);
131
    void deregister_report_listener(ReportWorker* listener);
132
    void notify_listeners();
133
    bool notify_listener(std::string_view name);
134
135
7
    // Stores the given pointer in _heartbeat_flags; only the pointer is copied.
    void set_heartbeat_flags(HeartbeatFlags* heartbeat_flags) {
        this->_heartbeat_flags = heartbeat_flags;
    }
138
    virtual Status set_cluster_id(int32_t cluster_id) = 0;
139
7
    int32_t effective_cluster_id() const { return _effective_cluster_id; }
140
141
    RowsetId next_rowset_id();
142
143
174k
    // Non-owning pointer to the object held by the _memtable_flush_executor
    // unique_ptr; null while that unique_ptr is empty.
    MemTableFlushExecutor* memtable_flush_executor() {
        auto* executor = _memtable_flush_executor.get();
        return executor;
    }
144
52
    // Address of the embedded _adaptive_thread_controller member.
    AdaptiveThreadPoolController* adaptive_thread_controller() {
        auto* controller = &_adaptive_thread_controller;
        return controller;
    }
147
185k
    // Non-owning pointer to the object held by _calc_delete_bitmap_executor;
    // null while that unique_ptr is empty.
    CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() {
        auto* executor = _calc_delete_bitmap_executor.get();
        return executor;
    }
150
151
57.0k
    // Non-owning pointer to the object held by _calc_delete_bitmap_executor_for_load;
    // null while that unique_ptr is empty.
    CalcDeleteBitmapExecutor* calc_delete_bitmap_executor_for_load() {
        auto* executor = _calc_delete_bitmap_executor_for_load.get();
        return executor;
    }
154
155
    void add_quering_rowset(RowsetSharedPtr rs);
156
157
    RowsetSharedPtr get_quering_rowset(RowsetId rs_id);
158
159
    int64_t memory_limitation_bytes_per_thread_for_schema_change() const;
160
161
5.43k
    int get_disk_num() { return _disk_num; }
162
163
    Status init_stream_load_recorder(const std::string& stream_load_record_path);
164
165
2.27k
    const std::shared_ptr<StreamLoadRecorder>& get_stream_load_recorder() {
166
2.27k
        return _stream_load_recorder;
167
2.27k
    }
168
169
protected:
170
    void _start_adaptive_thread_controller();
171
    void _evict_querying_rowset();
172
    void _evict_quring_rowset_thread_callback();
173
    bool _should_delay_large_task();
174
175
    int32_t _effective_cluster_id = -1;
176
    HeartbeatFlags* _heartbeat_flags = nullptr;
177
178
    // For task, tablet and disk report
179
    std::mutex _report_mtx;
180
    std::vector<ReportWorker*> _report_listeners;
181
182
    std::unique_ptr<RowsetIdGenerator> _rowset_id_generator;
183
    std::unique_ptr<MemTableFlushExecutor> _memtable_flush_executor;
184
    AdaptiveThreadPoolController _adaptive_thread_controller;
185
    std::unique_ptr<CalcDeleteBitmapExecutor> _calc_delete_bitmap_executor;
186
    std::unique_ptr<CalcDeleteBitmapExecutor> _calc_delete_bitmap_executor_for_load;
187
    CountDownLatch _stop_background_threads_latch;
188
189
    // Hold reference of querying rowsets
190
    std::mutex _quering_rowsets_mutex;
191
    std::unordered_map<RowsetId, RowsetSharedPtr> _querying_rowsets;
192
    std::shared_ptr<Thread> _evict_quering_rowset_thread;
193
194
    int64_t _memory_limitation_bytes_for_schema_change;
195
196
    int _disk_num {-1};
197
198
    std::shared_ptr<StreamLoadRecorder> _stream_load_recorder;
199
200
    std::shared_ptr<bvar::Status<size_t>> _tablet_max_delete_bitmap_score_metrics;
201
    std::shared_ptr<bvar::Status<size_t>> _tablet_max_base_rowset_delete_bitmap_score_metrics;
202
203
    std::unique_ptr<ThreadPool> _base_compaction_thread_pool;
204
    std::unique_ptr<ThreadPool> _cumu_compaction_thread_pool;
205
    int _cumu_compaction_thread_pool_used_threads {0};
206
    int _cumu_compaction_thread_pool_small_tasks_running {0};
207
};
208
209
// Holds three registries (cumulative, base and full compaction), each mapping a
// DataDir* to a set of tablets, plus one mutex declared next to them.
// NOTE(review): presumably _tablet_submitted_compaction_mutex guards all three
// registries -- confirm against the .cpp.
class CompactionSubmitRegistry {
210
    // Set of tablets kept per data directory.
    using TabletSet = std::unordered_set<TabletSharedPtr>;
211
    // One registry: data directory -> tablets.
    using Registry = std::map<DataDir*, TabletSet>;
212
213
public:
214
8.60k
    CompactionSubmitRegistry() = default;
215
    // Move constructor; defined out of line.
    CompactionSubmitRegistry(CompactionSubmitRegistry&& r);
216
217
    // create a snapshot for current registry, operations to the snapshot can be lock-free.
218
    CompactionSubmitRegistry create_snapshot();
219
220
    void reset(const std::vector<DataDir*>& stores);
221
222
    uint32_t count_executing_compaction(DataDir* dir, CompactionType compaction_type);
223
    uint32_t count_executing_cumu_and_base(DataDir* dir);
224
225
    bool has_compaction_task(DataDir* dir, CompactionType compaction_type);
226
227
    // NOTE(review): the bool presumably reports whether the tablet was newly
    // inserted -- confirm against the definition.
    bool insert(TabletSharedPtr tablet, CompactionType compaction_type);
228
229
    // NOTE(review): std::function is used here, but <functional> is not among the
    // headers this file includes directly -- consider adding it.
    void remove(TabletSharedPtr tablet, CompactionType compaction_type,
230
                std::function<void()> wakeup_cb);
231
232
    // Writes its output into *result.
    void jsonfy_compaction_status(std::string* result);
233
234
    // disk_max_score is an out-parameter (passed by pointer).
    std::vector<TabletSharedPtr> pick_topn_tablets_for_compaction(
235
            TabletManager* tablet_mgr, DataDir* data_dir, CompactionType compaction_type,
236
            const CumuCompactionPolicyTable& cumu_compaction_policies, uint32_t* disk_max_score);
237
238
private:
239
    // NOTE(review): presumably selects one of the three registries below by
    // compaction_type and returns the set stored for dir -- confirm.
    TabletSet& _get_tablet_set(DataDir* dir, CompactionType compaction_type);
240
241
    std::mutex _tablet_submitted_compaction_mutex;
242
    Registry _tablet_submitted_cumu_compaction;
243
    Registry _tablet_submitted_base_compaction;
244
    Registry _tablet_submitted_full_compaction;
245
};
246
247
class StorageEngine final : public BaseStorageEngine {
248
public:
249
    StorageEngine(const EngineOptions& options);
250
    ~StorageEngine() override;
251
252
    Status open() override;
253
254
    Status create_tablet(const TCreateTabletReq& request, RuntimeProfile* profile);
255
256
    /* Parameters:
257
     * - tablet_id: the id of tablet to get
258
     * - sync_stats: the stats of sync rowset
259
     * - force_use_only_cached: whether only use cached tablet meta
260
     * - cache_on_miss: whether cache the tablet meta when missing in cache
261
     */
262
    Result<BaseTabletSPtr> get_tablet(int64_t tablet_id, SyncRowsetStats* sync_stats = nullptr,
263
                                      bool force_use_only_cached = false,
264
                                      bool cache_on_miss = true) override;
265
266
    Status get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta,
267
                           bool force_use_only_cached = false) override;
268
269
    void clear_transaction_task(const TTransactionId transaction_id);
270
    void clear_transaction_task(const TTransactionId transaction_id,
271
                                const std::vector<TPartitionId>& partition_ids);
272
273
    std::vector<DataDir*> get_stores(bool include_unused = false);
274
275
    // get all info of root_path
276
    Status get_all_data_dir_info(std::vector<DataDirInfo>* data_dir_infos, bool need_update);
277
278
    static int64_t get_file_or_directory_size(const std::string& file_path);
279
280
    // get root path for creating tablet. The returned vector of root path should be round robin,
281
    // so that not all tablets are deployed on one disk.
282
    std::vector<DataDir*> get_stores_for_create_tablet(int64_t partition_id,
283
                                                       TStorageMedium::type storage_medium);
284
285
    DataDir* get_store(const std::string& path);
286
287
0
    uint32_t available_storage_medium_type_count() const {
288
0
        return _available_storage_medium_type_count;
289
0
    }
290
291
    Status set_cluster_id(int32_t cluster_id) override;
292
293
    void start_delete_unused_rowset();
294
    void add_unused_rowset(RowsetSharedPtr rowset);
295
    using DeleteBitmapKeyRanges =
296
            std::vector<std::tuple<DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>>;
297
    void add_unused_delete_bitmap_key_ranges(int64_t tablet_id,
298
                                             const std::vector<RowsetId>& rowsets,
299
                                             const DeleteBitmapKeyRanges& key_ranges);
300
301
    // Obtain shard path for new tablet.
302
    //
303
    // @param [out] shard_path choose an available root_path to clone new tablet
304
    // @return error code
305
    Status obtain_shard_path(TStorageMedium::type storage_medium, int64_t path_hash,
306
                             std::string* shared_path, DataDir** store, int64_t partition_id);
307
308
    // Load new tablet to make it effective.
309
    //
310
    // @param [in] root_path specify root path of new tablet
311
    // @param [in] request specify new tablet info
312
    // @param [in] restore whether we're restoring a tablet from trash
313
    // @return OK if load tablet success
314
    Status load_header(const std::string& shard_path, const TCloneReq& request,
315
                       bool restore = false);
316
317
1.33M
    // Non-owning pointer to the object held by the _tablet_manager unique_ptr;
    // null while that unique_ptr is empty.
    TabletManager* tablet_manager() {
        auto* manager = _tablet_manager.get();
        return manager;
    }
318
88.0k
    // Non-owning pointer to the object held by the _txn_manager unique_ptr;
    // null while that unique_ptr is empty.
    TxnManager* txn_manager() {
        auto* manager = _txn_manager.get();
        return manager;
    }
319
822
    // Non-owning pointer to the object held by the _snapshot_mgr unique_ptr;
    // null while that unique_ptr is empty.
    SnapshotManager* snapshot_mgr() {
        auto* manager = _snapshot_mgr.get();
        return manager;
    }
320
    // Rowset garbage collection helpers
321
    bool check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id);
322
99.0k
    // Mutable reference to the _pending_local_rowsets member.
    PendingRowsetSet& pending_local_rowsets() {
        auto& pending = _pending_local_rowsets;
        return pending;
    }
323
5
    // Mutable reference to the _pending_remote_rowsets member.
    PendingRowsetSet& pending_remote_rowsets() {
        auto& pending = _pending_remote_rowsets;
        return pending;
    }
324
    PendingRowsetGuard add_pending_rowset(const RowsetWriterContext& ctx);
325
326
0
    // Returns BETA_ROWSET when heartbeat flags are present and report
    // is_set_default_rowset_type_to_beta(); otherwise returns _default_rowset_type.
    RowsetTypePB default_rowset_type() const {
        // The null check comes first so the flag object is only queried when present.
        const bool force_beta = _heartbeat_flags != nullptr &&
                                _heartbeat_flags->is_set_default_rowset_type_to_beta();
        return force_beta ? BETA_ROWSET : _default_rowset_type;
    }
332
333
    Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) override;
334
335
    // clear trash and snapshot file
336
    // option: update disk usage after sweep
337
    Status start_trash_sweep(double* usage, bool ignore_guard = false);
338
339
    // Must call stop() before storage_engine is destructed
340
    void stop() override;
341
342
    void get_tablet_rowset_versions(const PGetTabletVersionsRequest* request,
343
                                    PGetTabletVersionsResponse* response);
344
345
    bool get_peer_replica_info(int64_t tablet_id, TReplicaInfo* replica, std::string* token);
346
347
    bool get_peers_replica_backends(int64_t tablet_id, std::vector<TBackend>* backends);
348
349
    bool should_fetch_from_peer(int64_t tablet_id);
350
351
71
    const std::shared_ptr<StreamLoadRecorder>& get_stream_load_recorder() {
352
71
        return _stream_load_recorder;
353
71
    }
354
355
    void get_compaction_status_json(std::string* result);
356
357
    Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type,
358
                                  bool force, bool eager = true);
359
    Status submit_seg_compaction_task(std::shared_ptr<SegcompactionWorker> worker,
360
                                      SegCompactionCandidatesSharedPtr segments);
361
362
2.44k
    // Non-owning pointer to the pool held by _tablet_publish_txn_thread_pool;
    // null while that unique_ptr is empty.
    ThreadPool* tablet_publish_txn_thread_pool() {
        auto* pool = _tablet_publish_txn_thread_pool.get();
        return pool;
    }
363
14.4k
    // Reads the atomic _stopped flag.
    bool stopped() override {
        return _stopped.load();
    }
364
365
    Status process_index_change_task(const TAlterInvertedIndexReq& reqest);
366
367
    void gc_binlogs(const std::unordered_map<int64_t, int64_t>& gc_tablet_infos);
368
369
    void add_async_publish_task(int64_t partition_id, int64_t tablet_id, int64_t publish_version,
370
                                int64_t transaction_id, bool is_recover);
371
    int64_t get_pending_publish_min_version(int64_t tablet_id);
372
373
    bool add_broken_path(std::string path);
374
    bool remove_broken_path(std::string path);
375
376
159
    std::set<std::string> get_broken_paths() { return _broken_paths; }
377
378
    Status submit_clone_task(Tablet* tablet, int64_t version);
379
380
    std::unordered_map<int64_t, std::unique_ptr<TaskWorkerPoolIf>>* workers;
381
382
9.92k
    int64_t get_compaction_num_per_round() const { return _compaction_num_per_round; }
383
384
private:
385
    // Instance should be inited from `static open()`
386
    // MUST NOT be called in other circumstances.
387
    Status _open();
388
389
    Status _init_store_map();
390
391
    void _update_storage_medium_type_count();
392
393
    // Some check methods
394
    Status _check_file_descriptor_number();
395
    Status _check_all_root_path_cluster_id();
396
    Status _judge_and_update_effective_cluster_id(int32_t cluster_id);
397
398
    void _exit_if_too_many_disks_are_failed();
399
400
    void _clean_unused_txns();
401
402
    void _clean_unused_rowset_metas();
403
404
    void _clean_unused_binlog_metas();
405
406
    void _clean_unused_delete_bitmap();
407
408
    void _clean_unused_pending_publish_info();
409
410
    void _clean_unused_partial_update_info();
411
412
    Status _do_sweep(const std::string& scan_root, const time_t& local_tm_now,
413
                     const int32_t expire);
414
415
    // All these xxx_callback() functions are for Background threads
416
    // unused rowset monitor thread
417
    void _unused_rowset_monitor_thread_callback();
418
419
    // garbage sweep thread process function. clear snapshot and trash folder
420
    void _garbage_sweeper_thread_callback();
421
422
    // delete tablet with io error process function
423
    void _disk_stat_monitor_thread_callback();
424
425
    // path gc process function
426
    void _path_gc_thread_callback(DataDir* data_dir);
427
428
    void _tablet_path_check_callback();
429
430
    void _tablet_checkpoint_callback(const std::vector<DataDir*>& data_dirs);
431
432
    // parse the default rowset type config to RowsetTypePB
433
    void _parse_default_rowset_type();
434
435
    // Disk status monitoring. Monitors the unused flag of each root_path.
436
    // When the unused mark is detected, the corresponding table information is deleted from the memory, and the disk data does not move.
437
    // When the disk status is unusable, but the unused flag is not detected, you need to download it from root_path
438
    // Reload the data.
439
    void _start_disk_stat_monitor();
440
441
    void _compaction_tasks_producer_callback();
442
443
    void _update_replica_infos_callback();
444
445
    std::vector<TabletSharedPtr> _generate_compaction_tasks(CompactionType compaction_type,
446
                                                            std::vector<DataDir*>& data_dirs,
447
                                                            bool check_score);
448
    void _update_cumulative_compaction_policy();
449
450
    void _pop_tablet_from_submitted_compaction(TabletSharedPtr tablet,
451
                                               CompactionType compaction_type);
452
453
    Status _submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type,
454
                                   bool force);
455
456
    void _handle_compaction(TabletSharedPtr tablet, std::shared_ptr<CompactionMixin> compaction,
457
                            CompactionType compaction_type, int64_t permits, bool force);
458
459
    Status _submit_single_replica_compaction_task(TabletSharedPtr tablet,
460
                                                  CompactionType compaction_type);
461
462
    void _adjust_compaction_thread_num();
463
464
    void _cooldown_tasks_producer_callback();
465
    void _remove_unused_remote_files_callback();
466
    void do_remove_unused_remote_files();
467
    void _cold_data_compaction_producer_callback();
468
    void _handle_cold_data_compaction(TabletSharedPtr tablet);
469
    void _follow_cooldown_meta(TabletSharedPtr tablet);
470
471
    Status _handle_seg_compaction(std::shared_ptr<SegcompactionWorker> worker,
472
                                  SegCompactionCandidatesSharedPtr segments,
473
                                  uint64_t submission_time);
474
475
    Status _handle_index_change(IndexBuilderSharedPtr index_builder);
476
477
    void _gc_binlogs(int64_t tablet_id, int64_t version);
478
479
    void _async_publish_callback();
480
481
    void _process_async_publish();
482
483
    Status _persist_broken_paths();
484
485
    bool _increase_low_priority_task_nums(DataDir* dir);
486
487
    void _decrease_low_priority_task_nums(DataDir* dir);
488
489
    void _get_candidate_stores(TStorageMedium::type storage_medium,
490
                               std::vector<DirInfo>& dir_infos);
491
492
    int _get_and_set_next_disk_index(int64_t partition_id, TStorageMedium::type storage_medium);
493
494
    int32_t _auto_get_interval_by_disk_capacity(DataDir* data_dir);
495
496
    void _check_tablet_delete_bitmap_score_callback();
497
498
private:
499
    EngineOptions _options;
500
    std::mutex _store_lock;
501
    std::mutex _trash_sweep_lock;
502
    std::map<std::string, std::unique_ptr<DataDir>> _store_map;
503
    std::set<std::string> _broken_paths;
504
    std::mutex _broken_paths_mutex;
505
506
    uint32_t _available_storage_medium_type_count;
507
508
    bool _is_all_cluster_id_exist;
509
510
    std::atomic_bool _stopped {false};
511
512
    std::mutex _gc_mutex;
513
    std::unordered_map<RowsetId, RowsetSharedPtr> _unused_rowsets;
514
    // tablet_id, unused_rowsets, [start_version, end_version]
515
    std::vector<std::tuple<int64_t, std::vector<RowsetId>, DeleteBitmapKeyRanges>>
516
            _unused_delete_bitmap;
517
    PendingRowsetSet _pending_local_rowsets;
518
    PendingRowsetSet _pending_remote_rowsets;
519
520
    std::shared_ptr<Thread> _unused_rowset_monitor_thread;
521
    // thread to monitor snapshot expiry
522
    std::shared_ptr<Thread> _garbage_sweeper_thread;
523
    // thread to monitor disk stat
524
    std::shared_ptr<Thread> _disk_stat_monitor_thread;
525
    // thread to produce both base and cumulative compaction tasks
526
    std::shared_ptr<Thread> _compaction_tasks_producer_thread;
527
    std::shared_ptr<Thread> _update_replica_infos_thread;
528
    std::shared_ptr<Thread> _cache_clean_thread;
529
    // threads to clean all file descriptor not actively in use
530
    std::vector<std::shared_ptr<Thread>> _path_gc_threads;
531
    // thread to produce tablet checkpoint tasks
532
    std::shared_ptr<Thread> _tablet_checkpoint_tasks_producer_thread;
533
    // thread to check tablet path
534
    std::shared_ptr<Thread> _tablet_path_check_thread;
535
    // thread to clean tablet lookup cache
536
    std::shared_ptr<Thread> _lookup_cache_clean_thread;
537
538
    std::mutex _engine_task_mutex;
539
540
    std::unique_ptr<TabletManager> _tablet_manager;
541
    std::unique_ptr<TxnManager> _txn_manager;
542
543
    // Used to control the migration from segment_v1 to segment_v2, can be deleted in future.
544
    // Type of new loaded data
545
    RowsetTypePB _default_rowset_type;
546
547
    std::unique_ptr<ThreadPool> _single_replica_compaction_thread_pool;
548
549
    std::unique_ptr<ThreadPool> _seg_compaction_thread_pool;
550
    std::unique_ptr<ThreadPool> _cold_data_compaction_thread_pool;
551
552
    std::unique_ptr<ThreadPool> _tablet_publish_txn_thread_pool;
553
554
    std::unique_ptr<ThreadPool> _tablet_meta_checkpoint_thread_pool;
555
556
    CompactionPermitLimiter _permit_limiter;
557
558
    CompactionSubmitRegistry _compaction_submit_registry;
559
560
    std::mutex _low_priority_task_nums_mutex;
561
    std::unordered_map<DataDir*, int32_t> _low_priority_task_nums;
562
563
    std::mutex _peer_replica_infos_mutex;
564
    // key: tabletId
565
    std::unordered_map<int64_t, TReplicaInfo> _peer_replica_infos;
566
    std::string _token;
567
568
    std::atomic<int32_t> _wakeup_producer_flag {0};
569
570
    std::mutex _compaction_producer_sleep_mutex;
571
    std::condition_variable _compaction_producer_sleep_cv;
572
573
    // we use unordered_map to store all cumulative compaction policy shared ptr
574
    CumuCompactionPolicyTable _cumulative_compaction_policies;
575
576
    std::shared_ptr<Thread> _cooldown_tasks_producer_thread;
577
    std::shared_ptr<Thread> _remove_unused_remote_files_thread;
578
    std::shared_ptr<Thread> _cold_data_compaction_producer_thread;
579
580
    std::shared_ptr<Thread> _cache_file_cleaner_tasks_producer_thread;
581
582
    std::unique_ptr<PriorityThreadPool> _cooldown_thread_pool;
583
584
    std::mutex _running_cooldown_mutex;
585
    std::unordered_set<int64_t> _running_cooldown_tablets;
586
587
    std::mutex _cold_compaction_tablet_submitted_mtx;
588
    std::unordered_set<int64_t> _cold_compaction_tablet_submitted;
589
590
    std::mutex _cumu_compaction_delay_mtx;
591
592
    // tablet_id, publish_version, transaction_id, partition_id
593
    std::map<int64_t, std::map<int64_t, std::pair<int64_t, int64_t>>> _async_publish_tasks;
594
    // async publish for discontinuous versions of merge_on_write table
595
    std::shared_ptr<Thread> _async_publish_thread;
596
    std::shared_mutex _async_publish_lock;
597
598
    std::atomic<bool> _need_clean_trash {false};
599
600
    // next index for create tablet
601
    std::map<TStorageMedium::type, int> _last_use_index;
602
603
    std::unique_ptr<CreateTabletRRIdxCache> _create_tablet_idx_lru_cache;
604
605
    std::unique_ptr<SnapshotManager> _snapshot_mgr;
606
607
    // thread to check tablet delete bitmap count tasks
608
    std::shared_ptr<Thread> _check_delete_bitmap_score_thread;
609
610
    int64_t _last_get_peers_replica_backends_time_ms {0};
611
612
    int64_t _compaction_num_per_round {1};
613
};
614
615
// lru cache for create tablet round robin in disks
616
// key: partitionId_medium
617
// value: index
618
class CreateTabletRRIdxCache : public LRUCachePolicy {
619
public:
620
    // get key, delimiter with DELIMITER '-'
621
6.30k
    static std::string get_key(int64_t partition_id, TStorageMedium::type medium) {
622
6.30k
        return fmt::format("{}-{}", partition_id, medium);
623
6.30k
    }
624
625
    // -1 not found key in lru
626
    int get_index(const std::string& key);
627
628
    void set_index(const std::string& key, int next_idx);
629
630
    class CacheValue : public LRUCacheValueBase {
631
    public:
632
        int idx = 0;
633
    };
634
635
    CreateTabletRRIdxCache(size_t capacity)
636
348
            : LRUCachePolicy(CachePolicy::CacheType::CREATE_TABLET_RR_IDX_CACHE, capacity,
637
348
                             LRUCacheType::NUMBER,
638
348
                             /*stale_sweep_time_s*/ 30 * 60, /*num shards*/ 1,
639
348
                             /*element count capacity */ 0,
640
348
                             /*enable prune*/ true, /*is lru-k*/ false) {}
641
};
642
643
struct DirInfo {
644
    DataDir* data_dir;
645
646
    double usage = 0;
647
    int available_level = 0;
648
649
2
    bool operator<(const DirInfo& other) const {
650
2
        if (available_level != other.available_level) {
651
0
            return available_level < other.available_level;
652
0
        }
653
2
        return data_dir->path_hash() < other.data_dir->path_hash();
654
2
    }
655
};
656
657
} // namespace doris