Coverage Report

Created: 2026-07-01 14:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_warm_up_manager.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <bthread/countdown_event.h>
#include <bthread/mutex.h>
#include <gen_cpp/BackendService.h>

#include <array>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "common/status.h"
#include "util/threadpool.h"
39
40
namespace doris {
41
42
// Forward declaration only: this header refers to RecycledRowsets solely through
// references (see recycle_cache()), so the complete type is not required here.
struct RecycledRowsets;
43
44
// Download source of a warm-up job (stored in JobMeta::download_type).
// Values are spelled out explicitly and equal the implicit numbering.
enum class DownloadType {
    BE = 0, // NOTE(review): presumably "download from another backend" (see JobMeta::be_ip) — confirm
    S3 = 1, // NOTE(review): presumably "download directly from object storage" — confirm
};
48
49
// Table filter attached to an event-driven warm-up job.
//   * std::nullopt      -> cluster-level job: no table filter, every table is warmed up.
//   * set holds a value -> table-level job: only tables whose id is in the set are warmed up.
using EventDrivenJobFilter = std::optional<std::unordered_set<int64_t>>;
53
54
struct JobReplicaInfo {
55
    int64_t job_id;
56
    TReplicaInfo replica;
57
};
58
59
struct JobMeta {
60
    JobMeta() = default;
61
    JobMeta(const TJobMeta& meta);
62
    DownloadType download_type;
63
    std::string be_ip;
64
    int32_t brpc_port;
65
    std::vector<int64_t> tablet_ids;
66
};
67
68
// A single peer that may be asked to serve a cross compute group peer read.
struct PeerCandidate {
    std::string host;                     // peer host
    int32_t brpc_port = 0;                // peer brpc port
    std::string compute_group_id;         // compute group the peer belongs to
    int64_t last_access_time_ms = 0;      // ms since epoch; drives expiry of stale candidates
    int32_t consecutive_rpc_failures = 0; // number of RPC failures in a row against this peer
};
76
77
// Holds all peer candidates for a single tablet
78
struct TabletPeerCandidates {
79
    // candidates[0] is the highest priority (warmup-inserted candidates go to front)
80
    std::vector<PeerCandidate> candidates;
81
    // last successful compute group used for this tablet
82
    std::string last_successful_compute_group_id;
83
    // singleflight guard: true while an async fetch from FE is in progress
84
    bool fetching_from_fe {false};
85
    // Cooldown: consecutive all-miss count and cooldown deadline.
86
    // When all candidates miss N times in a row, temporarily skip peer for this tablet.
87
    int32_t consecutive_all_miss {0};
88
    int64_t cooldown_until_ms {0}; // ms since epoch; 0 = not in cooldown
89
};
90
91
// manager for
92
// table warm up
93
// cluster warm up
94
// balance peer addr cache
95
class CloudWarmUpManager {
96
public:
97
    explicit CloudWarmUpManager(CloudStorageEngine& engine);
98
    ~CloudWarmUpManager();
99
    // Set the job id if the id is zero
100
    Status check_and_set_job_id(int64_t job_id);
101
102
    // Set the batch id to record download progress
103
    Status check_and_set_batch_id(int64_t job_id, int64_t batch_id, bool* retry = nullptr);
104
105
    // Add the dowload job
106
    void add_job(const std::vector<TJobMeta>& job_metas);
107
108
#ifdef BE_TEST
109
    void consumer_job();
110
#endif
111
112
    // Get the job state tuple<cur_job_id, cur_batch_id, pending_job_metas_size, _finish_job_size>
113
    std::tuple<int64_t, int64_t, int64_t, int64_t> get_current_job_state();
114
115
    // Cancel the job
116
    Status clear_job(int64_t job_id);
117
118
    Status set_event(int64_t job_id, TWarmUpEventType::type event, bool clear = false,
119
                     const std::vector<int64_t>* table_ids = nullptr);
120
121
    // If `sync_wait_timeout_ms` <= 0, the function will send the warm-up RPC
122
    // and return immediately without waiting for the warm-up to complete.
123
    // If `sync_wait_timeout_ms` > 0, the function will wait for the warm-up
124
    // to finish or until the specified timeout (in milliseconds) is reached.
125
    //
126
    // @param rs_meta Metadata of the rowset to be warmed up.
127
    // @param sync_wait_timeout_ms Timeout in milliseconds to wait for the warm-up
128
    //                              to complete. Non-positive value means no waiting.
129
    void warm_up_rowset(RowsetMeta& rs_meta, int64_t table_id, int64_t sync_wait_timeout_ms = -1);
130
131
    void recycle_cache(int64_t tablet_id, const std::vector<RecycledRowsets>& rowsets);
132
133
    // Balance warm up cache management methods
134
    // compute_group_id defaults to "" for backward compatibility
135
    void record_balanced_tablet(int64_t tablet_id, const std::string& host, int32_t brpc_port,
136
                                const std::string& compute_group_id = "");
137
    void remove_balanced_tablet(int64_t tablet_id);
138
    void remove_balanced_tablets(const std::vector<int64_t>& tablet_ids);
139
140
    // Cross compute group peer read candidate management
141
    void fetch_candidates_from_fe(int64_t tablet_id);
142
    std::vector<PeerCandidate> get_peer_candidates(int64_t tablet_id);
143
    void update_peer_candidate_on_success(int64_t tablet_id, const std::string& compute_group_id);
144
    void update_peer_candidate_on_rpc_failure(int64_t tablet_id, const std::string& host,
145
                                              int32_t brpc_port);
146
    // Rotate a cache-miss candidate to the end of the list so next read tries a different one.
147
    void rotate_peer_candidate_on_cache_miss(int64_t tablet_id, const std::string& host,
148
                                             int32_t brpc_port);
149
    // Record that all candidates missed for this tablet in a single race.
150
    // After consecutive_all_miss reaches threshold, sets a cooldown period during which
151
    // get_peer_candidates returns empty to avoid wasting peer RPCs.
152
    void record_peer_all_miss(int64_t tablet_id);
153
    // Check if peer read is in cooldown for this tablet (all candidates repeatedly missed).
154
    bool is_peer_cooldown(int64_t tablet_id) const;
155
156
    // ---- HTTP debug/admin API ----
157
    // Read-only snapshot of TabletPeerCandidates for a single tablet.
158
    // Returns nullopt if the tablet has no peer candidates.
159
    std::optional<TabletPeerCandidates> get_tablet_peer_info(int64_t tablet_id) const;
160
    // Snapshot of all tablets with peer candidates. If limit > 0, returns at most that many.
161
    std::vector<std::pair<int64_t, TabletPeerCandidates>> get_all_peer_info(
162
            int64_t limit = 0) const;
163
    // Force-set the full TabletPeerCandidates for a tablet (admin override).
164
    void set_tablet_peer_candidates(int64_t tablet_id, TabletPeerCandidates candidates);
165
166
private:
167
    struct WarmUpRowsetFailure {
168
        int code;
169
        std::string reason;
170
    };
171
172
    static Status _build_warm_up_rowset_result(const std::vector<WarmUpRowsetFailure>& failures,
173
                                               size_t replica_count, int64_t tablet_id,
174
                                               int64_t table_id, const std::string& rowset_id);
175
176
    void schedule_remove_balanced_tablet(int64_t tablet_id);
177
    static void clean_up_expired_mappings(void* arg);
178
    void handle_jobs();
179
    void run_cleanup_loop();
180
181
    Status _do_warm_up_rowset(RowsetMeta& rs_meta, int64_t table_id,
182
                              std::vector<JobReplicaInfo>& replicas, int64_t sync_wait_timeout_ms,
183
                              bool skip_existence_check);
184
185
    std::vector<JobReplicaInfo> get_replica_info(int64_t tablet_id, int64_t table_id,
186
                                                 bool bypass_cache, bool& cache_hit);
187
188
    void _warm_up_rowset(RowsetMeta& rs_meta, int64_t table_id, int64_t sync_wait_timeout_ms);
189
    void _recycle_cache(int64_t tablet_id, const std::vector<RecycledRowsets>& rowsets);
190
191
    void submit_download_tasks(io::Path path, int64_t file_size, io::FileSystemSPtr file_system,
192
                               int64_t expiration_time,
193
                               std::shared_ptr<bthread::CountdownEvent> wait, bool is_index = false,
194
                               std::function<void(Status)> done_cb = nullptr,
195
                               int64_t tablet_id = -1);
196
    std::mutex _mtx;
197
    std::condition_variable _cond;
198
    int64_t _cur_job_id {0};
199
    int64_t _cur_batch_id {-1};
200
    std::deque<std::shared_ptr<JobMeta>> _pending_job_metas;
201
    std::vector<std::shared_ptr<JobMeta>> _finish_job;
202
    std::thread _download_thread;
203
    bool _closed {false};
204
    // the attribute for compile in ut
205
    [[maybe_unused]] CloudStorageEngine& _engine;
206
207
    // timestamp, info
208
    using CacheEntry = std::pair<std::chrono::steady_clock::time_point, TReplicaInfo>;
209
    // tablet_id -> entry
210
    using Cache = std::unordered_map<int64_t, CacheEntry>;
211
    // job_id -> cache
212
    std::unordered_map<int64_t, Cache> _tablet_replica_cache;
213
    // job_id -> table filter (nullopt = cluster-level, no filter)
214
    std::unordered_map<int64_t, EventDrivenJobFilter> _event_driven_filters;
215
    std::unique_ptr<ThreadPool> _thread_pool;
216
    std::unique_ptr<ThreadPoolToken> _thread_pool_token;
217
218
    // Sharded lock for better performance
219
    // bthread::Mutex is used because peer read path runs in bthread context
220
    static constexpr size_t SHARD_COUNT = 10240;
221
    struct Shard {
222
        mutable bthread::Mutex mtx;
223
        std::unordered_map<int64_t, TabletPeerCandidates> tablets;
224
    };
225
    std::array<Shard, SHARD_COUNT> _balanced_tablets_shards;
226
227
    // Helper methods for shard operations
228
194
    size_t get_shard_index(int64_t tablet_id) const {
229
194
        return std::hash<int64_t> {}(tablet_id) % SHARD_COUNT;
230
194
    }
231
184
    Shard& get_shard(int64_t tablet_id) {
232
184
        return _balanced_tablets_shards[get_shard_index(tablet_id)];
233
184
    }
234
235
10
    const Shard& get_shard(int64_t tablet_id) const {
236
10
        return _balanced_tablets_shards[get_shard_index(tablet_id)];
237
10
    }
238
239
    // Cleanup thread and its synchronization primitives
240
    std::thread _cleanup_thread;
241
    std::mutex _cleanup_mtx;
242
    std::condition_variable _cleanup_cond;
243
};
244
245
} // namespace doris