Coverage Report

Created: 2026-03-14 20:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_warm_up_manager.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <bthread/countdown_event.h>
#include <gen_cpp/BackendService.h>

#include <array>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "common/status.h"
#include "util/threadpool.h"
34
35
namespace doris {
36
37
// Where the data for a warm-up job is fetched from.
// NOTE(review): BE presumably means "from a peer backend" (JobMeta carries
// be_ip / brpc_port) and S3 "from remote object storage" — confirm in the .cpp.
enum class DownloadType { BE, S3 };
41
42
struct JobMeta {
43
0
    JobMeta() = default;
44
    JobMeta(const TJobMeta& meta);
45
    DownloadType download_type;
46
    std::string be_ip;
47
    int32_t brpc_port;
48
    std::vector<int64_t> tablet_ids;
49
};
50
51
// manager for
52
// table warm up
53
// cluster warm up
54
// balance peer addr cache
55
class CloudWarmUpManager {
56
public:
57
    explicit CloudWarmUpManager(CloudStorageEngine& engine);
58
    ~CloudWarmUpManager();
59
    // Set the job id if the id is zero
60
    Status check_and_set_job_id(int64_t job_id);
61
62
    // Set the batch id to record download progress
63
    Status check_and_set_batch_id(int64_t job_id, int64_t batch_id, bool* retry = nullptr);
64
65
    // Add the dowload job
66
    void add_job(const std::vector<TJobMeta>& job_metas);
67
68
#ifdef BE_TEST
69
    void consumer_job();
70
#endif
71
72
    // Get the job state tuple<cur_job_id, cur_batch_id, pending_job_metas_size, _finish_job_size>
73
    std::tuple<int64_t, int64_t, int64_t, int64_t> get_current_job_state();
74
75
    // Cancel the job
76
    Status clear_job(int64_t job_id);
77
78
    Status set_event(int64_t job_id, TWarmUpEventType::type event, bool clear = false);
79
80
    // If `sync_wait_timeout_ms` <= 0, the function will send the warm-up RPC
81
    // and return immediately without waiting for the warm-up to complete.
82
    // If `sync_wait_timeout_ms` > 0, the function will wait for the warm-up
83
    // to finish or until the specified timeout (in milliseconds) is reached.
84
    //
85
    // @param rs_meta Metadata of the rowset to be warmed up.
86
    // @param sync_wait_timeout_ms Timeout in milliseconds to wait for the warm-up
87
    //                              to complete. Non-positive value means no waiting.
88
    void warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_timeout_ms = -1);
89
90
    void recycle_cache(int64_t tablet_id, const std::vector<RecycledRowsets>& rowsets);
91
92
    // Balance warm up cache management methods
93
    void record_balanced_tablet(int64_t tablet_id, const std::string& host, int32_t brpc_port);
94
    std::optional<std::pair<std::string, int32_t>> get_balanced_tablet_info(int64_t tablet_id);
95
    void remove_balanced_tablet(int64_t tablet_id);
96
    void remove_balanced_tablets(const std::vector<int64_t>& tablet_ids);
97
    bool is_balanced_tablet_expired(const std::chrono::system_clock::time_point& ctime) const;
98
    std::unordered_map<int64_t, std::pair<std::string, int32_t>> get_all_balanced_tablets() const;
99
100
private:
101
    void schedule_remove_balanced_tablet(int64_t tablet_id);
102
    static void clean_up_expired_mappings(void* arg);
103
    void handle_jobs();
104
105
    Status _do_warm_up_rowset(RowsetMeta& rs_meta, std::vector<TReplicaInfo>& replicas,
106
                              int64_t sync_wait_timeout_ms, bool skip_existence_check);
107
108
    std::vector<TReplicaInfo> get_replica_info(int64_t tablet_id, bool bypass_cache,
109
                                               bool& cache_hit);
110
111
    void _warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_timeout_ms);
112
    void _recycle_cache(int64_t tablet_id, const std::vector<RecycledRowsets>& rowsets);
113
114
    void submit_download_tasks(io::Path path, int64_t file_size, io::FileSystemSPtr file_system,
115
                               int64_t expiration_time,
116
                               std::shared_ptr<bthread::CountdownEvent> wait, bool is_index = false,
117
                               std::function<void(Status)> done_cb = nullptr);
118
    std::mutex _mtx;
119
    std::condition_variable _cond;
120
    int64_t _cur_job_id {0};
121
    int64_t _cur_batch_id {-1};
122
    std::deque<std::shared_ptr<JobMeta>> _pending_job_metas;
123
    std::vector<std::shared_ptr<JobMeta>> _finish_job;
124
    std::thread _download_thread;
125
    bool _closed {false};
126
    // the attribute for compile in ut
127
    [[maybe_unused]] CloudStorageEngine& _engine;
128
129
    // timestamp, info
130
    using CacheEntry = std::pair<std::chrono::steady_clock::time_point, TReplicaInfo>;
131
    // tablet_id -> entry
132
    using Cache = std::unordered_map<int64_t, CacheEntry>;
133
    // job_id -> cache
134
    std::unordered_map<int64_t, Cache> _tablet_replica_cache;
135
    std::unique_ptr<ThreadPool> _thread_pool;
136
    std::unique_ptr<ThreadPoolToken> _thread_pool_token;
137
138
    // Sharded lock for better performance
139
    static constexpr size_t SHARD_COUNT = 10240;
140
    struct Shard {
141
        mutable std::mutex mtx;
142
        std::unordered_map<int64_t, JobMeta> tablets;
143
    };
144
    std::array<Shard, SHARD_COUNT> _balanced_tablets_shards;
145
146
    // Helper methods for shard operations
147
0
    size_t get_shard_index(int64_t tablet_id) const {
148
0
        return std::hash<int64_t> {}(tablet_id) % SHARD_COUNT;
149
0
    }
150
0
    Shard& get_shard(int64_t tablet_id) {
151
0
        return _balanced_tablets_shards[get_shard_index(tablet_id)];
152
0
    }
153
};
154
155
} // namespace doris