Coverage Report

Created: 2026-06-02 18:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/storage_engine.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <butil/macros.h>
21
#include <bvar/bvar.h>
22
#include <gen_cpp/Types_types.h>
23
#include <gen_cpp/internal_service.pb.h>
24
#include <gen_cpp/olap_file.pb.h>
25
26
#include <atomic>
27
#include <condition_variable>
28
#include <cstdint>
29
#include <ctime>
30
#include <map>
31
#include <memory>
32
#include <mutex>
33
#include <set>
34
#include <shared_mutex>
35
#include <string>
36
#include <unordered_map>
37
#include <unordered_set>
38
#include <vector>
39
40
#include "agent/task_worker_pool.h"
41
#include "common/config.h"
42
#include "common/status.h"
43
#include "runtime/heartbeat_flags.h"
44
#include "storage/adaptive_thread_pool_controller.h"
45
#include "storage/compaction/compaction_permit_limiter.h"
46
#include "storage/delete/calc_delete_bitmap_executor.h"
47
#include "storage/olap_common.h"
48
#include "storage/options.h"
49
#include "storage/rowset/pending_rowset_helper.h"
50
#include "storage/rowset/rowset_fwd.h"
51
#include "storage/segment/segment.h"
52
#include "storage/tablet/tablet_fwd.h"
53
#include "storage/task/index_builder.h"
54
#include "util/countdown_latch.h"
55
56
namespace doris {
57
58
class DataDir;
59
class EngineTask;
60
class MemTableFlushExecutor;
61
class SegcompactionWorker;
62
class BaseCompaction;
63
class CumulativeCompaction;
64
class SingleReplicaCompaction;
65
class CumulativeCompactionPolicy;
66
class StreamLoadRecorder;
67
class TCloneReq;
68
class TCreateTabletReq;
69
class TabletManager;
70
class Thread;
71
class ThreadPool;
72
class TxnManager;
73
class ReportWorker;
74
class CreateTabletRRIdxCache;
75
struct DirInfo;
76
class SnapshotManager;
77
class WorkloadGroup;
78
79
using SegCompactionCandidates = std::vector<segment_v2::SegmentSharedPtr>;
80
using SegCompactionCandidatesSharedPtr = std::shared_ptr<SegCompactionCandidates>;
81
using CumuCompactionPolicyTable =
82
        std::unordered_map<std::string_view, std::shared_ptr<CumulativeCompactionPolicy>>;
83
84
class StorageEngine;
85
class CloudStorageEngine;
86
87
extern bvar::Status<int64_t> g_max_rowsets_with_useless_delete_bitmap;
88
extern bvar::Status<int64_t> g_max_rowsets_with_useless_delete_bitmap_version;
89
90
// StorageEngine singleton to manage all Table pointers.
91
// Providing add/drop/get operations.
92
// A StorageEngine instance doesn't own the Table resources; it just holds the pointers,
93
// allocation/deallocation must be done outside.
94
class BaseStorageEngine {
95
protected:
96
    enum Type : uint8_t {
97
        LOCAL, // Shared-nothing integrated compute and storage architecture
98
        CLOUD, // Separating compute and storage architecture
99
    };
100
    Type _type;
101
102
public:
103
    BaseStorageEngine(Type type, const UniqueId& backend_uid);
104
    virtual ~BaseStorageEngine();
105
106
    StorageEngine& to_local();
107
    CloudStorageEngine& to_cloud();
108
109
    virtual Status open() = 0;
110
    virtual void stop() = 0;
111
    virtual bool stopped() = 0;
112
113
    // Start all background threads. This should be called after env is ready.
114
    virtual Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) = 0;
115
116
    /* Parameters:
117
     * - tablet_id: the id of tablet to get
118
     * - sync_stats: the stats of sync rowset
119
     * - force_use_only_cached: whether only use cached tablet meta
120
     * - cache_on_miss: whether cache the tablet meta when missing in cache
121
     */
122
    virtual Result<BaseTabletSPtr> get_tablet(int64_t tablet_id,
123
                                              SyncRowsetStats* sync_stats = nullptr,
124
                                              bool force_use_only_cached = false,
125
                                              bool cache_on_miss = true) = 0;
126
127
    virtual Status get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta,
128
                                   bool force_use_only_cached = false) = 0;
129
130
    void register_report_listener(ReportWorker* listener);
131
    void deregister_report_listener(ReportWorker* listener);
132
    void notify_listeners();
133
    bool notify_listener(std::string_view name);
134
135
0
    void set_heartbeat_flags(HeartbeatFlags* heartbeat_flags) {
136
0
        _heartbeat_flags = heartbeat_flags;
137
0
    }
138
    virtual Status set_cluster_id(int32_t cluster_id) = 0;
139
0
    int32_t effective_cluster_id() const { return _effective_cluster_id; }
140
141
    RowsetId next_rowset_id();
142
143
21
    MemTableFlushExecutor* memtable_flush_executor() { return _memtable_flush_executor.get(); }
144
0
    AdaptiveThreadPoolController* adaptive_thread_controller() {
145
0
        return &_adaptive_thread_controller;
146
0
    }
147
29
    CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() {
148
29
        return _calc_delete_bitmap_executor.get();
149
29
    }
150
151
9
    CalcDeleteBitmapExecutor* calc_delete_bitmap_executor_for_load() {
152
9
        return _calc_delete_bitmap_executor_for_load.get();
153
9
    }
154
155
    void add_quering_rowset(RowsetSharedPtr rs);
156
157
    RowsetSharedPtr get_quering_rowset(RowsetId rs_id);
158
159
    int64_t memory_limitation_bytes_per_thread_for_schema_change() const;
160
161
0
    int get_disk_num() { return _disk_num; }
162
163
    Status init_stream_load_recorder(const std::string& stream_load_record_path);
164
165
0
    const std::shared_ptr<StreamLoadRecorder>& get_stream_load_recorder() {
166
0
        return _stream_load_recorder;
167
0
    }
168
169
protected:
170
    void _start_adaptive_thread_controller();
171
    void _evict_querying_rowset();
172
    void _evict_quring_rowset_thread_callback();
173
    bool _should_delay_large_task();
174
175
    int32_t _effective_cluster_id = -1;
176
    HeartbeatFlags* _heartbeat_flags = nullptr;
177
178
    // For task, tablet and disk report
179
    std::mutex _report_mtx;
180
    std::vector<ReportWorker*> _report_listeners;
181
182
    std::unique_ptr<RowsetIdGenerator> _rowset_id_generator;
183
    std::unique_ptr<MemTableFlushExecutor> _memtable_flush_executor;
184
    AdaptiveThreadPoolController _adaptive_thread_controller;
185
    std::unique_ptr<CalcDeleteBitmapExecutor> _calc_delete_bitmap_executor;
186
    std::unique_ptr<CalcDeleteBitmapExecutor> _calc_delete_bitmap_executor_for_load;
187
    CountDownLatch _stop_background_threads_latch;
188
189
    // Hold references to querying rowsets
190
    std::mutex _quering_rowsets_mutex;
191
    std::unordered_map<RowsetId, RowsetSharedPtr> _querying_rowsets;
192
    std::shared_ptr<Thread> _evict_quering_rowset_thread;
193
194
    int64_t _memory_limitation_bytes_for_schema_change;
195
196
    int _disk_num {-1};
197
198
    std::shared_ptr<StreamLoadRecorder> _stream_load_recorder;
199
200
    std::shared_ptr<bvar::Status<size_t>> _tablet_max_delete_bitmap_score_metrics;
201
    std::shared_ptr<bvar::Status<size_t>> _tablet_max_base_rowset_delete_bitmap_score_metrics;
202
203
    std::unique_ptr<ThreadPool> _base_compaction_thread_pool;
204
    std::unique_ptr<ThreadPool> _cumu_compaction_thread_pool;
205
    int _cumu_compaction_thread_pool_used_threads {0};
206
    int _cumu_compaction_thread_pool_small_tasks_running {0};
207
};
208
209
class CompactionSubmitRegistry {
210
    using TabletSet = std::unordered_set<TabletSharedPtr>;
211
    using Registry = std::map<DataDir*, TabletSet>;
212
213
public:
214
377
    CompactionSubmitRegistry() = default;
215
    CompactionSubmitRegistry(CompactionSubmitRegistry&& r);
216
217
    // create a snapshot for current registry, operations to the snapshot can be lock-free.
218
    CompactionSubmitRegistry create_snapshot();
219
220
    void reset(const std::vector<DataDir*>& stores);
221
222
    uint32_t count_executing_compaction(DataDir* dir, CompactionType compaction_type);
223
    uint32_t count_executing_cumu_and_base(DataDir* dir);
224
225
    bool has_compaction_task(DataDir* dir, CompactionType compaction_type);
226
227
    bool insert(TabletSharedPtr tablet, CompactionType compaction_type);
228
229
    void remove(TabletSharedPtr tablet, CompactionType compaction_type,
230
                std::function<void()> wakeup_cb);
231
232
    void jsonfy_compaction_status(std::string* result);
233
234
    std::vector<TabletCompactionContext> pick_topn_tablets_for_compaction(
235
            TabletManager* tablet_mgr, DataDir* data_dir, CompactionType compaction_type,
236
            const CumuCompactionPolicyTable& cumu_compaction_policies, uint32_t* disk_max_score);
237
238
private:
239
    TabletSet& _get_tablet_set(DataDir* dir, CompactionType compaction_type);
240
241
    std::mutex _tablet_submitted_compaction_mutex;
242
    Registry _tablet_submitted_cumu_compaction;
243
    Registry _tablet_submitted_base_compaction;
244
    Registry _tablet_submitted_full_compaction;
245
    Registry _tablet_submitted_binlog_compaction;
246
};
247
248
class StorageEngine final : public BaseStorageEngine {
249
public:
250
    StorageEngine(const EngineOptions& options);
251
    ~StorageEngine() override;
252
253
    Status open() override;
254
255
    Status create_tablet(const TCreateTabletReq& request, RuntimeProfile* profile);
256
257
    /* Parameters:
258
     * - tablet_id: the id of tablet to get
259
     * - sync_stats: the stats of sync rowset
260
     * - force_use_only_cached: whether only use cached tablet meta
261
     * - cache_on_miss: whether cache the tablet meta when missing in cache
262
     */
263
    Result<BaseTabletSPtr> get_tablet(int64_t tablet_id, SyncRowsetStats* sync_stats = nullptr,
264
                                      bool force_use_only_cached = false,
265
                                      bool cache_on_miss = true) override;
266
267
    Status get_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta,
268
                           bool force_use_only_cached = false) override;
269
270
    void clear_transaction_task(const TTransactionId transaction_id);
271
    void clear_transaction_task(const TTransactionId transaction_id,
272
                                const std::vector<TPartitionId>& partition_ids);
273
274
    std::vector<DataDir*> get_stores(bool include_unused = false);
275
276
    // get all info of root_path
277
    Status get_all_data_dir_info(std::vector<DataDirInfo>* data_dir_infos, bool need_update);
278
279
    static int64_t get_file_or_directory_size(const std::string& file_path);
280
281
    // get root path for creating tablet. The returned vector of root path should be round robin,
282
    // to avoid deploying all the tablets on one disk.
283
    std::vector<DataDir*> get_stores_for_create_tablet(int64_t partition_id,
284
                                                       TStorageMedium::type storage_medium);
285
286
    DataDir* get_store(const std::string& path);
287
288
0
    uint32_t available_storage_medium_type_count() const {
289
0
        return _available_storage_medium_type_count;
290
0
    }
291
292
    Status set_cluster_id(int32_t cluster_id) override;
293
294
    void start_delete_unused_rowset();
295
    void add_unused_rowset(RowsetSharedPtr rowset);
296
    using DeleteBitmapKeyRanges =
297
            std::vector<std::tuple<DeleteBitmap::BitmapKey, DeleteBitmap::BitmapKey>>;
298
    void add_unused_delete_bitmap_key_ranges(int64_t tablet_id,
299
                                             const std::vector<RowsetId>& rowsets,
300
                                             const DeleteBitmapKeyRanges& key_ranges);
301
302
    // Obtain shard path for new tablet.
303
    //
304
    // @param [out] shard_path choose an available root_path to clone new tablet
305
    // @return error code
306
    Status obtain_shard_path(TStorageMedium::type storage_medium, int64_t path_hash,
307
                             std::string* shared_path, DataDir** store, int64_t partition_id);
308
309
    // Load new tablet to make it effective.
310
    //
311
    // @param [in] shard_path specify the shard path of new tablet
312
    // @param [in] request specify new tablet info
313
    // @param [in] restore whether we're restoring a tablet from trash
314
    // @return OK if load tablet success
315
    Status load_header(const std::string& shard_path, const TCloneReq& request,
316
                       bool restore = false);
317
318
2.31k
    TabletManager* tablet_manager() { return _tablet_manager.get(); }
319
154
    TxnManager* txn_manager() { return _txn_manager.get(); }
320
11
    SnapshotManager* snapshot_mgr() { return _snapshot_mgr.get(); }
321
    // Rowset garbage collection helpers
322
    bool check_rowset_id_in_unused_rowsets(const RowsetId& rowset_id);
323
562
    PendingRowsetSet& pending_local_rowsets() { return _pending_local_rowsets; }
324
5
    PendingRowsetSet& pending_remote_rowsets() { return _pending_remote_rowsets; }
325
    PendingRowsetGuard add_pending_rowset(const RowsetWriterContext& ctx);
326
327
0
    RowsetTypePB default_rowset_type() const {
328
0
        if (_heartbeat_flags != nullptr && _heartbeat_flags->is_set_default_rowset_type_to_beta()) {
329
0
            return BETA_ROWSET;
330
0
        }
331
0
        return _default_rowset_type;
332
0
    }
333
334
    Status start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr = nullptr) override;
335
336
    // clear trash and snapshot file
337
    // option: update disk usage after sweep
338
    Status start_trash_sweep(double* usage, bool ignore_guard = false);
339
340
    // Must call stop() before storage_engine is destructed
341
    void stop() override;
342
343
    void get_tablet_rowset_versions(const PGetTabletVersionsRequest* request,
344
                                    PGetTabletVersionsResponse* response);
345
346
    bool get_peer_replica_info(int64_t tablet_id, TReplicaInfo* replica, std::string* token);
347
348
    bool get_peers_replica_backends(int64_t tablet_id, std::vector<TBackend>* backends);
349
350
    bool should_fetch_from_peer(int64_t tablet_id);
351
352
0
    const std::shared_ptr<StreamLoadRecorder>& get_stream_load_recorder() {
353
0
        return _stream_load_recorder;
354
0
    }
355
356
    void get_compaction_status_json(std::string* result);
357
358
    Status submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type,
359
                                  bool force, bool eager = true, int trigger_method = 0);
360
    Status submit_seg_compaction_task(std::shared_ptr<SegcompactionWorker> worker,
361
                                      SegCompactionCandidatesSharedPtr segments);
362
363
0
    ThreadPool* tablet_publish_txn_thread_pool() { return _tablet_publish_txn_thread_pool.get(); }
364
3.21k
    bool stopped() override { return _stopped; }
365
366
    Status process_index_change_task(const TAlterInvertedIndexReq& reqest);
367
368
    void gc_binlogs(const std::unordered_map<int64_t, int64_t>& gc_tablet_infos);
369
370
    void add_async_publish_task(int64_t partition_id, int64_t tablet_id, int64_t publish_version,
371
                                int64_t transaction_id, bool is_recover, int64_t commit_tso);
372
    int64_t get_pending_publish_min_version(int64_t tablet_id);
373
374
    bool add_broken_path(std::string path);
375
    bool remove_broken_path(std::string path);
376
377
11
    std::set<std::string> get_broken_paths() { return _broken_paths; }
378
379
    Status submit_clone_task(Tablet* tablet, int64_t version);
380
381
    std::unordered_map<int64_t, std::unique_ptr<TaskWorkerPoolIf>>* workers;
382
383
12
    int64_t get_compaction_num_per_round() const { return _compaction_num_per_round; }
384
385
private:
386
    // Instance should be inited from `static open()`
387
    // MUST NOT be called in other circumstances.
388
    Status _open();
389
390
    Status _init_store_map();
391
392
    void _update_storage_medium_type_count();
393
394
    // Some check methods
395
    Status _check_file_descriptor_number();
396
    Status _check_all_root_path_cluster_id();
397
    Status _judge_and_update_effective_cluster_id(int32_t cluster_id);
398
399
    void _exit_if_too_many_disks_are_failed();
400
401
    void _clean_unused_txns();
402
403
    void _clean_unused_rowset_metas();
404
405
    void _clean_unused_binlog_metas();
406
407
    void _clean_unused_delete_bitmap();
408
409
    void _clean_unused_pending_publish_info();
410
411
    void _clean_unused_partial_update_info();
412
413
    Status _do_sweep(const std::string& scan_root, const time_t& local_tm_now,
414
                     const int32_t expire);
415
416
    // All these xxx_callback() functions are for Background threads
417
    // unused rowset monitor thread
418
    void _unused_rowset_monitor_thread_callback();
419
420
    // garbage sweep thread process function. clear snapshot and trash folder
421
    void _garbage_sweeper_thread_callback();
422
423
    // delete tablet with io error process function
424
    void _disk_stat_monitor_thread_callback();
425
426
    // path gc process function
427
    void _path_gc_thread_callback(DataDir* data_dir);
428
429
    void _tablet_path_check_callback();
430
431
    void _tablet_checkpoint_callback(const std::vector<DataDir*>& data_dirs);
432
433
    // parse the default rowset type config to RowsetTypePB
434
    void _parse_default_rowset_type();
435
436
    // Disk status monitoring. Monitors the unused flag of each root_path.
437
    // When the unused flag is detected, the corresponding table information is deleted from memory, and the disk data is not moved.
438
    // When the disk status is unusable but the unused flag is not detected, the data needs to be loaded again from root_path,
439
    // i.e. the data is reloaded.
440
    void _start_disk_stat_monitor();
441
442
    void _compaction_tasks_producer_callback();
443
    void _binlog_compaction_tasks_producer_callback();
444
445
    void _update_replica_infos_callback();
446
447
    std::vector<TabletCompactionContext> _generate_compaction_tasks(
448
            CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool check_score);
449
    void _update_cumulative_compaction_policy();
450
    CumuCompactionPolicyTable _snapshot_cumulative_compaction_policy();
451
    std::shared_ptr<CumulativeCompactionPolicy> _get_cumulative_compaction_policy(
452
            std::string_view compaction_policy);
453
454
    void _pop_tablet_from_submitted_compaction(TabletSharedPtr tablet,
455
                                               CompactionType compaction_type);
456
457
    Status _submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type,
458
                                   bool force, int trigger_method = 0,
459
                                   int8_t prefer_compaction_level = -1);
460
461
    void _handle_compaction(TabletSharedPtr tablet, std::shared_ptr<CompactionMixin> compaction,
462
                            CompactionType compaction_type, int64_t permits, bool force,
463
                            int64_t compaction_id = 0);
464
465
    Status _submit_single_replica_compaction_task(TabletSharedPtr tablet,
466
                                                  CompactionType compaction_type);
467
468
    void _adjust_compaction_thread_num();
469
470
    void _cooldown_tasks_producer_callback();
471
    void _remove_unused_remote_files_callback();
472
    void do_remove_unused_remote_files();
473
    void _cold_data_compaction_producer_callback();
474
    void _handle_cold_data_compaction(TabletSharedPtr tablet);
475
    void _follow_cooldown_meta(TabletSharedPtr tablet);
476
477
    Status _handle_seg_compaction(std::shared_ptr<SegcompactionWorker> worker,
478
                                  SegCompactionCandidatesSharedPtr segments,
479
                                  uint64_t submission_time);
480
481
    Status _handle_index_change(IndexBuilderSharedPtr index_builder);
482
483
    void _gc_binlogs(int64_t tablet_id, int64_t version);
484
485
    void _async_publish_callback();
486
487
    void _process_async_publish();
488
489
    Status _persist_broken_paths();
490
491
    bool _increase_low_priority_task_nums(DataDir* dir);
492
493
    void _decrease_low_priority_task_nums(DataDir* dir);
494
495
    void _get_candidate_stores(TStorageMedium::type storage_medium,
496
                               std::vector<DirInfo>& dir_infos);
497
498
    int _get_and_set_next_disk_index(int64_t partition_id, TStorageMedium::type storage_medium);
499
500
    int32_t _auto_get_interval_by_disk_capacity(DataDir* data_dir);
501
502
    void _check_tablet_delete_bitmap_score_callback();
503
504
private:
505
    EngineOptions _options;
506
    std::mutex _store_lock;
507
    std::mutex _trash_sweep_lock;
508
    std::map<std::string, std::unique_ptr<DataDir>> _store_map;
509
    std::set<std::string> _broken_paths;
510
    std::mutex _broken_paths_mutex;
511
512
    uint32_t _available_storage_medium_type_count;
513
514
    bool _is_all_cluster_id_exist;
515
516
    std::atomic_bool _stopped {false};
517
518
    std::mutex _gc_mutex;
519
    std::unordered_map<RowsetId, RowsetSharedPtr> _unused_rowsets;
520
    // tablet_id, unused_rowsets, [start_version, end_version]
521
    std::vector<std::tuple<int64_t, std::vector<RowsetId>, DeleteBitmapKeyRanges>>
522
            _unused_delete_bitmap;
523
    PendingRowsetSet _pending_local_rowsets;
524
    PendingRowsetSet _pending_remote_rowsets;
525
526
    std::shared_ptr<Thread> _unused_rowset_monitor_thread;
527
    // thread to monitor snapshot expiry
528
    std::shared_ptr<Thread> _garbage_sweeper_thread;
529
    // thread to monitor disk stat
530
    std::shared_ptr<Thread> _disk_stat_monitor_thread;
531
    // thread to produce both base and cumulative compaction tasks
532
    std::shared_ptr<Thread> _compaction_tasks_producer_thread;
533
    std::shared_ptr<Thread> _binlog_compaction_tasks_producer_thread;
534
    std::shared_ptr<Thread> _update_replica_infos_thread;
535
    std::shared_ptr<Thread> _cache_clean_thread;
536
    // threads to clean all file descriptor not actively in use
537
    std::vector<std::shared_ptr<Thread>> _path_gc_threads;
538
    // thread to produce tablet checkpoint tasks
539
    std::shared_ptr<Thread> _tablet_checkpoint_tasks_producer_thread;
540
    // thread to check tablet path
541
    std::shared_ptr<Thread> _tablet_path_check_thread;
542
    // thread to clean tablet lookup cache
543
    std::shared_ptr<Thread> _lookup_cache_clean_thread;
544
545
    std::mutex _engine_task_mutex;
546
547
    std::unique_ptr<TabletManager> _tablet_manager;
548
    std::unique_ptr<TxnManager> _txn_manager;
549
550
    // Used to control the migration from segment_v1 to segment_v2, can be deleted in future.
551
    // Type of new loaded data
552
    RowsetTypePB _default_rowset_type;
553
554
    std::unique_ptr<ThreadPool> _single_replica_compaction_thread_pool;
555
    std::unique_ptr<ThreadPool> _binlog_compaction_thread_pool;
556
557
    std::unique_ptr<ThreadPool> _seg_compaction_thread_pool;
558
    std::unique_ptr<ThreadPool> _cold_data_compaction_thread_pool;
559
560
    std::unique_ptr<ThreadPool> _tablet_publish_txn_thread_pool;
561
562
    std::unique_ptr<ThreadPool> _tablet_meta_checkpoint_thread_pool;
563
564
    CompactionPermitLimiter _permit_limiter;
565
566
    CompactionSubmitRegistry _compaction_submit_registry;
567
568
    std::mutex _low_priority_task_nums_mutex;
569
    std::unordered_map<DataDir*, int32_t> _low_priority_task_nums;
570
571
    std::mutex _peer_replica_infos_mutex;
572
    // key: tabletId
573
    std::unordered_map<int64_t, TReplicaInfo> _peer_replica_infos;
574
    std::string _token;
575
576
    std::atomic<int32_t> _wakeup_producer_flag {0};
577
578
    std::mutex _compaction_producer_sleep_mutex;
579
    std::condition_variable _compaction_producer_sleep_cv;
580
581
    // we use unordered_map to store all cumulative compaction policy shared ptrs
582
    std::mutex _cumulative_compaction_policy_mtx;
583
    CumuCompactionPolicyTable _cumulative_compaction_policies;
584
585
    std::shared_ptr<Thread> _cooldown_tasks_producer_thread;
586
    std::shared_ptr<Thread> _remove_unused_remote_files_thread;
587
    std::shared_ptr<Thread> _cold_data_compaction_producer_thread;
588
589
    std::shared_ptr<Thread> _cache_file_cleaner_tasks_producer_thread;
590
591
    std::unique_ptr<PriorityThreadPool> _cooldown_thread_pool;
592
593
    std::mutex _running_cooldown_mutex;
594
    std::unordered_set<int64_t> _running_cooldown_tablets;
595
596
    std::mutex _cold_compaction_tablet_submitted_mtx;
597
    std::unordered_set<int64_t> _cold_compaction_tablet_submitted;
598
599
    std::mutex _cumu_compaction_delay_mtx;
600
601
    // tablet_id, publish_version, transaction_id, partition_id, commit_tso
602
    std::map<int64_t, std::map<int64_t, std::tuple<int64_t, int64_t, int64_t>>>
603
            _async_publish_tasks;
604
    // async publish for discontinuous versions of merge_on_write table
605
    std::shared_ptr<Thread> _async_publish_thread;
606
    std::shared_mutex _async_publish_lock;
607
608
    std::atomic<bool> _need_clean_trash {false};
609
610
    // next index for create tablet
611
    std::map<TStorageMedium::type, int> _last_use_index;
612
613
    std::unique_ptr<CreateTabletRRIdxCache> _create_tablet_idx_lru_cache;
614
615
    std::unique_ptr<SnapshotManager> _snapshot_mgr;
616
617
    // thread to check tablet delete bitmap count tasks
618
    std::shared_ptr<Thread> _check_delete_bitmap_score_thread;
619
620
    int64_t _last_get_peers_replica_backends_time_ms {0};
621
622
    int64_t _compaction_num_per_round {1};
623
};
624
625
// LRU cache for create-tablet round robin across disks
626
// key: partitionId_medium
627
// value: index
628
class CreateTabletRRIdxCache : public LRUCachePolicy {
629
public:
630
    // get key, delimiter with DELIMITER '-'
631
79
    static std::string get_key(int64_t partition_id, TStorageMedium::type medium) {
632
79
        return fmt::format("{}-{}", partition_id, medium);
633
79
    }
634
635
    // returns -1 if the key is not found in the LRU cache
636
    int get_index(const std::string& key);
637
638
    void set_index(const std::string& key, int next_idx);
639
640
    class CacheValue : public LRUCacheValueBase {
641
    public:
642
        int idx = 0;
643
    };
644
645
    CreateTabletRRIdxCache(size_t capacity)
646
377
            : LRUCachePolicy(CachePolicy::CacheType::CREATE_TABLET_RR_IDX_CACHE, capacity,
647
377
                             LRUCacheType::NUMBER,
648
377
                             /*stale_sweep_time_s*/ 30 * 60, /*num shards*/ 1,
649
377
                             /*element count capacity */ 0,
650
377
                             /*enable prune*/ true, /*is lru-k*/ false) {}
651
};
652
653
struct DirInfo {
654
    DataDir* data_dir;
655
656
    double usage = 0;
657
    int available_level = 0;
658
659
1
    bool operator<(const DirInfo& other) const {
660
1
        if (available_level != other.available_level) {
661
0
            return available_level < other.available_level;
662
0
        }
663
1
        return data_dir->path_hash() < other.data_dir->path_hash();
664
1
    }
665
};
666
667
} // namespace doris