Coverage Report

Created: 2026-06-17 03:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_warm_up_manager.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <bthread/countdown_event.h>
21
#include <gen_cpp/BackendService.h>
22
23
#include <condition_variable>
24
#include <cstddef>
25
#include <deque>
26
#include <mutex>
27
#include <string>
28
#include <thread>
29
#include <unordered_set>
30
#include <vector>
31
32
#include "cloud/cloud_storage_engine.h"
33
#include "cloud/cloud_tablet.h"
34
#include "common/status.h"
35
#include "util/threadpool.h"
36
37
namespace doris {
38
39
// Download type recorded in JobMeta::download_type.
// NOTE(review): the enumerator meanings are not shown in this header; presumably
// BE = fetch from a peer backend node and S3 = fetch from object storage — confirm.
enum class DownloadType {
40
    BE,
41
    S3,
42
};
43
44
// Filter for event-driven warmup jobs.
45
//   std::nullopt -> cluster-level job (no table filter, warm up all tables)
46
//   has_value()  -> table-level job (only warm up tables whose id is in the set)
47
// NOTE(review): <optional> is not among the headers this file includes directly;
// it is presumably reached through a project header — confirm.
using EventDrivenJobFilter = std::optional<std::unordered_set<int64_t>>;
48
49
// Couples a job id with a TReplicaInfo. It is the element type of the replica
// lists taken by CloudWarmUpManager::_do_warm_up_rowset and returned by
// CloudWarmUpManager::get_replica_info.
struct JobReplicaInfo {
50
    // Id of the job this entry is associated with.
    int64_t job_id;
51
    // Replica description (TReplicaInfo is declared outside this header).
    TReplicaInfo replica;
52
};
53
54
// Description of one warm-up job entry: a download type, an address
// (be_ip / brpc_port) and a list of tablet ids.
struct JobMeta {
55
0
    // NOTE(review): download_type and brpc_port have no in-class initializers, so
    // plain default-initialization (`JobMeta m;`) leaves them indeterminate —
    // confirm every user assigns them before reading.
    JobMeta() = default;
56
    // Builds a JobMeta from a TJobMeta; only declared here, the body is not in this header.
    // NOTE(review): not marked `explicit`, so a TJobMeta converts implicitly — confirm intended.
    JobMeta(const TJobMeta& meta);
57
    DownloadType download_type;
58
    // NOTE(review): presumably the address of the peer to contact (see DownloadType::BE) — confirm.
    std::string be_ip;
59
    int32_t brpc_port;
60
    // Tablet ids covered by this job entry.
    std::vector<int64_t> tablet_ids;
61
};
62
63
// Manager for:
64
//   - table warm up
65
//   - cluster warm up
66
//   - balance peer addr cache (the "balanced tablet" mappings below)
67
class CloudWarmUpManager {
68
public:
69
    // `engine` is kept as the reference member _engine, so it must outlive this manager.
    explicit CloudWarmUpManager(CloudStorageEngine& engine);
70
    ~CloudWarmUpManager();
71
    // Set the job id if the id is zero
    // NOTE(review): "the id" is presumably _cur_job_id, which starts at 0 — confirm.
72
    Status check_and_set_job_id(int64_t job_id);
73
74
    // Set the batch id to record download progress.
    // `retry` is an optional out-parameter (may be nullptr); its exact meaning is
    // defined by the implementation, which is not visible in this header.
75
    Status check_and_set_batch_id(int64_t job_id, int64_t batch_id, bool* retry = nullptr);
76
77
    // Add the download job
78
    void add_job(const std::vector<TJobMeta>& job_metas);
79
80
#ifdef BE_TEST
81
    // Only declared when BE_TEST is defined.
    void consumer_job();
82
#endif
83
84
    // Get the job state tuple<cur_job_id, cur_batch_id, pending_job_metas_size, _finish_job_size>
85
    std::tuple<int64_t, int64_t, int64_t, int64_t> get_current_job_state();
86
87
    // Cancel the job
88
    Status clear_job(int64_t job_id);
89
90
    // Applies a warm-up event of type `event` for `job_id`.
    // NOTE(review): presumably `clear` removes the job's event registration and
    // `table_ids` (nullable) supplies the table-level EventDrivenJobFilter, with
    // nullptr meaning cluster-level — confirm against the implementation.
    Status set_event(int64_t job_id, TWarmUpEventType::type event, bool clear = false,
91
                     const std::vector<int64_t>* table_ids = nullptr);
92
93
    // If `sync_wait_timeout_ms` <= 0, the function will send the warm-up RPC
94
    // and return immediately without waiting for the warm-up to complete.
95
    // If `sync_wait_timeout_ms` > 0, the function will wait for the warm-up
96
    // to finish or until the specified timeout (in milliseconds) is reached.
97
    //
98
    // @param rs_meta Metadata of the rowset to be warmed up.
    // @param table_id Id of the table associated with the rowset
    //                 (NOTE(review): presumably matched against table-level filters — confirm).
99
    // @param sync_wait_timeout_ms Timeout in milliseconds to wait for the warm-up
100
    //                              to complete. Non-positive value means no waiting.
101
    void warm_up_rowset(RowsetMeta& rs_meta, int64_t table_id, int64_t sync_wait_timeout_ms = -1);
102
103
    // NOTE(review): presumably drops cached data of the given recycled rowsets of
    // `tablet_id` — confirm against the implementation.
    void recycle_cache(int64_t tablet_id, const std::vector<RecycledRowsets>& rowsets);
104
105
    // Balance warm up cache management methods
106
    // Associates `tablet_id` with a (host, brpc_port) pair.
    void record_balanced_tablet(int64_t tablet_id, const std::string& host, int32_t brpc_port);
107
    // Returns the (host, brpc_port) pair for `tablet_id`, or an empty optional
    // (presumably when nothing is recorded for it — confirm).
    std::optional<std::pair<std::string, int32_t>> get_balanced_tablet_info(int64_t tablet_id);
108
    void remove_balanced_tablet(int64_t tablet_id);
109
    // Batch form of remove_balanced_tablet.
    void remove_balanced_tablets(const std::vector<int64_t>& tablet_ids);
110
    // `ctime` is a system_clock time point; the expiry threshold is not visible here.
    bool is_balanced_tablet_expired(const std::chrono::system_clock::time_point& ctime) const;
111
    // Returns a map of tablet_id -> (host, brpc_port), by value.
    std::unordered_map<int64_t, std::pair<std::string, int32_t>> get_all_balanced_tablets() const;
112
113
private:
114
    // One failure record consumed by _build_warm_up_rowset_result.
    struct WarmUpRowsetFailure {
115
        int code;
116
        std::string reason;
117
    };
118
119
    // Builds a Status from the collected failures and the identifying ids.
    // NOTE(review): how `failures.size()` vs `replica_count` maps to OK/error is
    // decided in the implementation — confirm there.
    static Status _build_warm_up_rowset_result(const std::vector<WarmUpRowsetFailure>& failures,
120
                                               size_t replica_count, int64_t tablet_id,
121
                                               int64_t table_id, const std::string& rowset_id);
122
123
    void schedule_remove_balanced_tablet(int64_t tablet_id);
124
    // Static with an opaque `void*` argument.
    // NOTE(review): presumably a timer/task callback receiving the manager — confirm.
    static void clean_up_expired_mappings(void* arg);
125
    // NOTE(review): presumably the body run by _download_thread — confirm.
    void handle_jobs();
126
127
    Status _do_warm_up_rowset(RowsetMeta& rs_meta, int64_t table_id,
128
                              std::vector<JobReplicaInfo>& replicas, int64_t sync_wait_timeout_ms,
129
                              bool skip_existence_check);
130
131
    // Returns the replicas for the tablet; `cache_hit` is an output flag.
    // NOTE(review): presumably `bypass_cache` skips _tablet_replica_cache — confirm.
    std::vector<JobReplicaInfo> get_replica_info(int64_t tablet_id, int64_t table_id,
132
                                                 bool bypass_cache, bool& cache_hit);
133
134
    // Private counterparts of warm_up_rowset / recycle_cache with matching parameters.
    void _warm_up_rowset(RowsetMeta& rs_meta, int64_t table_id, int64_t sync_wait_timeout_ms);
135
    void _recycle_cache(int64_t tablet_id, const std::vector<RecycledRowsets>& rowsets);
136
137
    // Submits download work for the file at `path`.
    // `wait`, `done_cb` and `tablet_id` are optional; `done_cb` defaults to an empty
    // std::function and `tablet_id` to -1.
    // NOTE(review): presumably `wait` is signalled and `done_cb` invoked as tasks
    // complete — confirm against the implementation.
    void submit_download_tasks(io::Path path, int64_t file_size, io::FileSystemSPtr file_system,
138
                               int64_t expiration_time,
139
                               std::shared_ptr<bthread::CountdownEvent> wait, bool is_index = false,
140
                               std::function<void(Status)> done_cb = nullptr,
141
                               int64_t tablet_id = -1);
142
    // NOTE(review): _mtx/_cond presumably guard and signal the job state below — confirm.
    std::mutex _mtx;
143
    std::condition_variable _cond;
144
    // 0 is the initial value (see check_and_set_job_id).
    int64_t _cur_job_id {0};
145
    int64_t _cur_batch_id {-1};
146
    std::deque<std::shared_ptr<JobMeta>> _pending_job_metas;
147
    std::vector<std::shared_ptr<JobMeta>> _finish_job;
148
    std::thread _download_thread;
149
    bool _closed {false};
150
    // the attribute for compile in ut
151
    [[maybe_unused]] CloudStorageEngine& _engine;
152
153
    // timestamp, info
154
    using CacheEntry = std::pair<std::chrono::steady_clock::time_point, TReplicaInfo>;
155
    // tablet_id -> entry
156
    using Cache = std::unordered_map<int64_t, CacheEntry>;
157
    // job_id -> cache
158
    std::unordered_map<int64_t, Cache> _tablet_replica_cache;
159
    // job_id -> table filter (nullopt = cluster-level, no filter)
160
    std::unordered_map<int64_t, EventDrivenJobFilter> _event_driven_filters;
161
    std::unique_ptr<ThreadPool> _thread_pool;
162
    std::unique_ptr<ThreadPoolToken> _thread_pool_token;
163
164
    // Sharded lock for better performance: each shard has its own mutex and map.
165
    static constexpr size_t SHARD_COUNT = 10240;
166
    struct Shard {
167
        mutable std::mutex mtx;
168
        // NOTE(review): key is presumably the tablet id (shards are selected by tablet id) — confirm.
        std::unordered_map<int64_t, JobMeta> tablets;
169
    };
170
    std::array<Shard, SHARD_COUNT> _balanced_tablets_shards;
171
172
    // Helper methods for shard operations
173
0
    // Shard slot for `tablet_id`: std::hash<int64_t> of the id, modulo SHARD_COUNT.
    size_t get_shard_index(int64_t tablet_id) const {
174
0
        return std::hash<int64_t> {}(tablet_id) % SHARD_COUNT;
175
0
    }
176
0
    // Shard that owns `tablet_id`; this helper does not lock the shard's mutex.
    Shard& get_shard(int64_t tablet_id) {
177
0
        return _balanced_tablets_shards[get_shard_index(tablet_id)];
178
0
    }
179
};
180
181
} // namespace doris