Coverage Report

Created: 2026-06-09 07:29

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_warm_up_manager.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <bthread/countdown_event.h>
#include <bthread/mutex.h>
#include <gen_cpp/BackendService.h>

#include <array>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "common/status.h"
#include "util/threadpool.h"
37
38
namespace doris {
39
40
struct RecycledRowsets;
41
42
// Source kind a warm-up job downloads from (see JobMeta::download_type).
// Enumerator order is significant: BE is the first (zero) value, S3 the second.
enum class DownloadType { BE, S3 };
46
47
struct JobMeta {
48
    JobMeta() = default;
49
    JobMeta(const TJobMeta& meta);
50
    DownloadType download_type;
51
    std::string be_ip;
52
    int32_t brpc_port;
53
    std::vector<int64_t> tablet_ids;
54
};
55
56
// One backend that may serve a cross compute group peer read for a tablet.
// Plain aggregate: brace-initialization follows the member order below.
struct PeerCandidate {
    std::string host;                     // peer backend host
    int32_t brpc_port = 0;                // peer brpc port
    std::string compute_group_id;         // compute group the peer belongs to
    int64_t last_access_time_ms = 0;      // ms since epoch, used for expiry
    int32_t consecutive_rpc_failures = 0; // count of back-to-back RPC failures to this peer
};
64
65
// Holds all peer candidates for a single tablet
66
struct TabletPeerCandidates {
67
    // candidates[0] is the highest priority (warmup-inserted candidates go to front)
68
    std::vector<PeerCandidate> candidates;
69
    // last successful compute group used for this tablet
70
    std::string last_successful_compute_group_id;
71
    // singleflight guard: true while an async fetch from FE is in progress
72
    bool fetching_from_fe {false};
73
    // Cooldown: consecutive all-miss count and cooldown deadline.
74
    // When all candidates miss N times in a row, temporarily skip peer for this tablet.
75
    int32_t consecutive_all_miss {0};
76
    int64_t cooldown_until_ms {0}; // ms since epoch; 0 = not in cooldown
77
};
78
79
// manager for
80
// table warm up
81
// cluster warm up
82
// balance peer addr cache
83
class CloudWarmUpManager {
84
public:
85
    explicit CloudWarmUpManager(CloudStorageEngine& engine);
86
    ~CloudWarmUpManager();
87
    // Set the job id if the id is zero
88
    Status check_and_set_job_id(int64_t job_id);
89
90
    // Set the batch id to record download progress
91
    Status check_and_set_batch_id(int64_t job_id, int64_t batch_id, bool* retry = nullptr);
92
93
    // Add the dowload job
94
    void add_job(const std::vector<TJobMeta>& job_metas);
95
96
#ifdef BE_TEST
97
    void consumer_job();
98
#endif
99
100
    // Get the job state tuple<cur_job_id, cur_batch_id, pending_job_metas_size, _finish_job_size>
101
    std::tuple<int64_t, int64_t, int64_t, int64_t> get_current_job_state();
102
103
    // Cancel the job
104
    Status clear_job(int64_t job_id);
105
106
    Status set_event(int64_t job_id, TWarmUpEventType::type event, bool clear = false);
107
108
    // If `sync_wait_timeout_ms` <= 0, the function will send the warm-up RPC
109
    // and return immediately without waiting for the warm-up to complete.
110
    // If `sync_wait_timeout_ms` > 0, the function will wait for the warm-up
111
    // to finish or until the specified timeout (in milliseconds) is reached.
112
    //
113
    // @param rs_meta Metadata of the rowset to be warmed up.
114
    // @param sync_wait_timeout_ms Timeout in milliseconds to wait for the warm-up
115
    //                              to complete. Non-positive value means no waiting.
116
    void warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_timeout_ms = -1);
117
118
    void recycle_cache(int64_t tablet_id, const std::vector<RecycledRowsets>& rowsets);
119
120
    // Balance warm up cache management methods
121
    // compute_group_id defaults to "" for backward compatibility
122
    void record_balanced_tablet(int64_t tablet_id, const std::string& host, int32_t brpc_port,
123
                                const std::string& compute_group_id = "");
124
    void remove_balanced_tablet(int64_t tablet_id);
125
    void remove_balanced_tablets(const std::vector<int64_t>& tablet_ids);
126
127
    // Cross compute group peer read candidate management
128
    void fetch_candidates_from_fe(int64_t tablet_id);
129
    std::vector<PeerCandidate> get_peer_candidates(int64_t tablet_id);
130
    void update_peer_candidate_on_success(int64_t tablet_id, const std::string& compute_group_id);
131
    void update_peer_candidate_on_rpc_failure(int64_t tablet_id, const std::string& host,
132
                                              int32_t brpc_port);
133
    // Rotate a cache-miss candidate to the end of the list so next read tries a different one.
134
    void rotate_peer_candidate_on_cache_miss(int64_t tablet_id, const std::string& host,
135
                                             int32_t brpc_port);
136
    // Record that all candidates missed for this tablet in a single race.
137
    // After consecutive_all_miss reaches threshold, sets a cooldown period during which
138
    // get_peer_candidates returns empty to avoid wasting peer RPCs.
139
    void record_peer_all_miss(int64_t tablet_id);
140
    // Check if peer read is in cooldown for this tablet (all candidates repeatedly missed).
141
    bool is_peer_cooldown(int64_t tablet_id) const;
142
143
    // ---- HTTP debug/admin API ----
144
    // Read-only snapshot of TabletPeerCandidates for a single tablet.
145
    // Returns nullopt if the tablet has no peer candidates.
146
    std::optional<TabletPeerCandidates> get_tablet_peer_info(int64_t tablet_id) const;
147
    // Snapshot of all tablets with peer candidates. If limit > 0, returns at most that many.
148
    std::vector<std::pair<int64_t, TabletPeerCandidates>> get_all_peer_info(
149
            int64_t limit = 0) const;
150
    // Force-set the full TabletPeerCandidates for a tablet (admin override).
151
    void set_tablet_peer_candidates(int64_t tablet_id, TabletPeerCandidates candidates);
152
153
private:
154
    void schedule_remove_balanced_tablet(int64_t tablet_id);
155
    static void clean_up_expired_mappings(void* arg);
156
    void handle_jobs();
157
    void run_cleanup_loop();
158
159
    Status _do_warm_up_rowset(RowsetMeta& rs_meta, std::vector<TReplicaInfo>& replicas,
160
                              int64_t sync_wait_timeout_ms, bool skip_existence_check);
161
162
    std::vector<TReplicaInfo> get_replica_info(int64_t tablet_id, bool bypass_cache,
163
                                               bool& cache_hit);
164
165
    void _warm_up_rowset(RowsetMeta& rs_meta, int64_t sync_wait_timeout_ms);
166
    void _recycle_cache(int64_t tablet_id, const std::vector<RecycledRowsets>& rowsets);
167
168
    void submit_download_tasks(io::Path path, int64_t file_size, io::FileSystemSPtr file_system,
169
                               int64_t expiration_time,
170
                               std::shared_ptr<bthread::CountdownEvent> wait, bool is_index = false,
171
                               std::function<void(Status)> done_cb = nullptr,
172
                               int64_t tablet_id = -1);
173
    std::mutex _mtx;
174
    std::condition_variable _cond;
175
    int64_t _cur_job_id {0};
176
    int64_t _cur_batch_id {-1};
177
    std::deque<std::shared_ptr<JobMeta>> _pending_job_metas;
178
    std::vector<std::shared_ptr<JobMeta>> _finish_job;
179
    std::thread _download_thread;
180
    bool _closed {false};
181
    // the attribute for compile in ut
182
    [[maybe_unused]] CloudStorageEngine& _engine;
183
184
    // timestamp, info
185
    using CacheEntry = std::pair<std::chrono::steady_clock::time_point, TReplicaInfo>;
186
    // tablet_id -> entry
187
    using Cache = std::unordered_map<int64_t, CacheEntry>;
188
    // job_id -> cache
189
    std::unordered_map<int64_t, Cache> _tablet_replica_cache;
190
    std::unique_ptr<ThreadPool> _thread_pool;
191
    std::unique_ptr<ThreadPoolToken> _thread_pool_token;
192
193
    // Sharded lock for better performance
194
    // bthread::Mutex is used because peer read path runs in bthread context
195
    static constexpr size_t SHARD_COUNT = 10240;
196
    struct Shard {
197
        mutable bthread::Mutex mtx;
198
        std::unordered_map<int64_t, TabletPeerCandidates> tablets;
199
    };
200
    std::array<Shard, SHARD_COUNT> _balanced_tablets_shards;
201
202
    // Helper methods for shard operations
203
184
    size_t get_shard_index(int64_t tablet_id) const {
204
184
        return std::hash<int64_t> {}(tablet_id) % SHARD_COUNT;
205
184
    }
206
174
    Shard& get_shard(int64_t tablet_id) {
207
174
        return _balanced_tablets_shards[get_shard_index(tablet_id)];
208
174
    }
209
210
10
    const Shard& get_shard(int64_t tablet_id) const {
211
10
        return _balanced_tablets_shards[get_shard_index(tablet_id)];
212
10
    }
213
214
    // Cleanup thread and its synchronization primitives
215
    std::thread _cleanup_thread;
216
    std::mutex _cleanup_mtx;
217
    std::condition_variable _cleanup_cond;
218
};
219
220
} // namespace doris