Coverage Report

Created: 2026-07-02 15:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_warm_up_manager.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_warm_up_manager.h"
19
20
#include <bthread/condition_variable.h>
21
#include <bthread/mutex.h>
22
#include <bthread/unstable.h>
23
#include <butil/time.h>
24
#include <bvar/bvar.h>
25
#include <bvar/reducer.h>
26
27
#include <algorithm>
28
#include <chrono>
29
#include <cstddef>
30
#include <list>
31
#include <string>
32
#include <tuple>
33
#include <vector>
34
35
#include "bthread/mutex.h"
36
#include "bvar/bvar.h"
37
#include "cloud/cloud_tablet.h"
38
#include "cloud/cloud_tablet_mgr.h"
39
#include "cloud/config.h"
40
#include "common/cast_set.h"
41
#include "common/check.h"
42
#include "common/config.h"
43
#include "common/logging.h"
44
#include "cpp/sync_point.h"
45
#include "io/cache/block_file_cache_downloader.h"
46
#include "runtime/exec_env.h"
47
#include "service/backend_options.h"
48
#include "storage/index/inverted/inverted_index_desc.h"
49
#include "storage/rowset/beta_rowset.h"
50
#include "storage/tablet/tablet.h"
51
#include "util/brpc_client_cache.h" // BrpcClientCache
52
#include "util/bvar_windowed_adder.h"
53
#include "util/client_cache.h"
54
#include "util/defer_op.h"
55
#include "util/stack_util.h"
56
#include "util/thrift_rpc_helper.h"
57
#include "util/time.h"
58
59
namespace doris {
60
61
// Peer candidate management statistics
62
bvar::Adder<uint64_t> g_peer_candidate_cache_hit("peer_candidate_cache_hit");
63
bvar::Adder<uint64_t> g_peer_candidate_cache_miss("peer_candidate_cache_miss");
64
bvar::Adder<uint64_t> g_peer_lazy_fetch_total("peer_lazy_fetch_total");
65
bvar::Adder<uint64_t> g_peer_lazy_fetch_success("peer_lazy_fetch_success");
66
bvar::Adder<uint64_t> g_peer_lazy_fetch_failed("peer_lazy_fetch_failed");
67
bvar::LatencyRecorder g_peer_lazy_fetch_latency("peer_lazy_fetch_latency");
68
bvar::Adder<uint64_t> g_peer_rpc_failure_eviction("peer_rpc_failure_eviction");
69
bvar::Adder<uint64_t> g_peer_candidate_expiry_eviction("peer_candidate_expiry_eviction");
70
bvar::Adder<uint64_t> g_peer_candidate_rotate("peer_candidate_rotate");
71
bvar::Adder<uint64_t> g_peer_tablet_cooldown_entered("peer_tablet_cooldown_entered");
72
bvar::Adder<uint64_t> g_peer_tablet_cooldown_skipped("peer_tablet_cooldown_skipped");
73
74
bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_skipped_rowset_num(
75
        "file_cache_event_driven_warm_up_skipped_rowset_num");
76
bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_requested_segment_size(
77
        "file_cache_event_driven_warm_up_requested_segment_size");
78
bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_requested_segment_num(
79
        "file_cache_event_driven_warm_up_requested_segment_num");
80
bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_requested_index_size(
81
        "file_cache_event_driven_warm_up_requested_index_size");
82
bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_requested_index_num(
83
        "file_cache_event_driven_warm_up_requested_index_num");
84
bvar::Adder<uint64_t> g_file_cache_once_or_periodic_warm_up_submitted_tablet_num(
85
        "file_cache_once_or_periodic_warm_up_submitted_tablet_num");
86
bvar::Adder<uint64_t> g_file_cache_once_or_periodic_warm_up_finished_tablet_num(
87
        "file_cache_once_or_periodic_warm_up_finished_tablet_num");
88
bvar::Adder<uint64_t> g_file_cache_once_or_periodic_warm_up_submitted_segment_size(
89
        "file_cache_once_or_periodic_warm_up_submitted_segment_size");
90
bvar::Adder<uint64_t> g_file_cache_once_or_periodic_warm_up_submitted_segment_num(
91
        "file_cache_once_or_periodic_warm_up_submitted_segment_num");
92
bvar::Adder<uint64_t> g_file_cache_once_or_periodic_warm_up_submitted_index_size(
93
        "file_cache_once_or_periodic_warm_up_submitted_index_size");
94
bvar::Adder<uint64_t> g_file_cache_once_or_periodic_warm_up_submitted_index_num(
95
        "file_cache_once_or_periodic_warm_up_submitted_index_num");
96
bvar::Adder<uint64_t> g_file_cache_once_or_periodic_warm_up_finished_segment_size(
97
        "file_cache_once_or_periodic_warm_up_finished_segment_size");
98
bvar::Adder<uint64_t> g_file_cache_once_or_periodic_warm_up_finished_segment_num(
99
        "file_cache_once_or_periodic_warm_up_finished_segment_num");
100
bvar::Adder<uint64_t> g_file_cache_once_or_periodic_warm_up_finished_index_size(
101
        "file_cache_once_or_periodic_warm_up_finished_index_size");
102
bvar::Adder<uint64_t> g_file_cache_once_or_periodic_warm_up_finished_index_num(
103
        "file_cache_once_or_periodic_warm_up_finished_index_num");
104
bvar::Adder<uint64_t> g_file_cache_recycle_cache_requested_segment_num(
105
        "file_cache_recycle_cache_requested_segment_num");
106
bvar::Adder<uint64_t> g_file_cache_recycle_cache_requested_index_num(
107
        "file_cache_recycle_cache_requested_index_num");
108
bvar::Status<int64_t> g_file_cache_warm_up_rowset_last_call_unix_ts(
109
        "file_cache_warm_up_rowset_last_call_unix_ts", 0);
110
bvar::Adder<uint64_t> file_cache_warm_up_failed_task_num("file_cache_warm_up", "failed_task_num");
111
bvar::Adder<int64_t> g_balance_tablet_be_mapping_size("balance_tablet_be_mapping_size");
112
113
bvar::LatencyRecorder g_file_cache_warm_up_rowset_wait_for_compaction_latency(
114
        "file_cache_warm_up_rowset_wait_for_compaction_latency");
115
116
// Per-job windowed metrics for source BE
117
// bvar::Window enforces MAX_SECONDS_LIMIT = 3600, so the longest window is 1h.
118
static constexpr int WINDOW_5M = 300;
119
static constexpr int WINDOW_30M = 1800;
120
static constexpr int WINDOW_1H = 3600;
121
122
MBvarWindowedAdder g_warmup_ed_requested_segment_num("warmup_ed_requested_segment_num", {"job_id"},
123
                                                     {WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
124
MBvarWindowedAdder g_warmup_ed_requested_segment_size("warmup_ed_requested_segment_size",
125
                                                      {"job_id"},
126
                                                      {WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
127
MBvarWindowedAdder g_warmup_ed_requested_index_num("warmup_ed_requested_index_num", {"job_id"},
128
                                                   {WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
129
MBvarWindowedAdder g_warmup_ed_requested_index_size("warmup_ed_requested_index_size", {"job_id"},
130
                                                    {WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
131
bvar::MultiDimension<bvar::Status<int64_t>> g_warmup_ed_last_trigger_ts({"job_id"});
132
133
57
CloudWarmUpManager::CloudWarmUpManager(CloudStorageEngine& engine) : _engine(engine) {
    // Fixed-size worker pool: min and max are both warm_up_manager_thread_pool_size.
    const auto pool_size = config::warm_up_manager_thread_pool_size;
    ThreadPoolBuilder builder("CloudWarmUpManagerThreadPool");
    builder.set_min_threads(pool_size).set_max_threads(pool_size);
    Status build_status = builder.build(&_thread_pool);
    DORIS_CHECK(build_status.ok()) << build_status;

    // All async warm-up work is funnelled through one CONCURRENT token on that pool.
    _thread_pool_token = _thread_pool->new_token(ThreadPool::ExecutionMode::CONCURRENT);
    DORIS_CHECK(_thread_pool_token != nullptr);

    // Background threads are started last, once the pool and token exist:
    // one runs handle_jobs(), the other runs run_cleanup_loop().
    _download_thread = std::thread(&CloudWarmUpManager::handle_jobs, this);
    _cleanup_thread = std::thread(&CloudWarmUpManager::run_cleanup_loop, this);
}
144
145
56
CloudWarmUpManager::~CloudWarmUpManager() {
    // Publish the shutdown flag while holding both mutexes, so the wait predicates guarded
    // by _mtx (handle_jobs) and by _cleanup_mtx (cleanup loop) both observe it.
    {
        std::lock_guard jobs_guard(_mtx);
        std::lock_guard<std::mutex> cleanup_guard(_cleanup_mtx);
        _closed = true;
    }
    _cond.notify_all();
    _cleanup_cond.notify_all();

    // Join the download thread first, then the cleanup thread.
    for (std::thread* worker : {&_download_thread, &_cleanup_thread}) {
        if (worker->joinable()) {
            worker->join();
        }
    }

    // Tear down the token before the pool that owns it.
    _thread_pool_token->shutdown();
    _thread_pool_token.reset();
    _thread_pool->shutdown();
    _thread_pool.reset();

    for (auto& shard : _balanced_tablets_shards) {
        std::lock_guard<bthread::Mutex> shard_guard(shard.mtx);
        shard.tablets.clear();
    }
}
171
172
0
// Returns the tablet's current (non-stale) rowset metas keyed by rowset id string.
// If two rowsets report the same id, the first one visited is kept.
std::unordered_map<std::string, RowsetMetaSharedPtr> snapshot_rs_metas(BaseTablet* tablet) {
    std::unordered_map<std::string, RowsetMetaSharedPtr> metas_by_id;
    tablet->traverse_rowsets(
            [&metas_by_id](const RowsetSharedPtr& rowset) {
                const auto& meta = rowset->rowset_meta();
                metas_by_id.emplace(meta->rowset_id().to_string(), meta);
            },
            /*include_stale=*/false);
    return metas_by_id;
}
181
182
void CloudWarmUpManager::submit_download_tasks(io::Path path, int64_t file_size,
183
                                               io::FileSystemSPtr file_system,
184
                                               int64_t expiration_time,
185
                                               std::shared_ptr<bthread::CountdownEvent> wait,
186
                                               bool is_index, std::function<void(Status)> done_cb,
187
0
                                               int64_t tablet_id) {
188
0
    VLOG_DEBUG << "submit warm up task for file: " << path << ", file_size: " << file_size
189
0
               << ", expiration_time: " << expiration_time
190
0
               << ", is_index: " << (is_index ? "true" : "false");
191
0
    if (file_size < 0) {
192
0
        auto st = file_system->file_size(path, &file_size);
193
0
        if (!st.ok()) [[unlikely]] {
194
0
            LOG(WARNING) << "get file size failed: " << path;
195
0
            file_cache_warm_up_failed_task_num << 1;
196
0
            return;
197
0
        }
198
0
    }
199
0
    if (is_index) {
200
0
        g_file_cache_once_or_periodic_warm_up_submitted_index_num << 1;
201
0
        g_file_cache_once_or_periodic_warm_up_submitted_index_size << file_size;
202
0
    } else {
203
0
        g_file_cache_once_or_periodic_warm_up_submitted_segment_num << 1;
204
0
        g_file_cache_once_or_periodic_warm_up_submitted_segment_size << file_size;
205
0
    }
206
207
0
    const int64_t chunk_size = 10 * 1024 * 1024; // 10MB
208
0
    int64_t offset = 0;
209
0
    int64_t remaining_size = file_size;
210
211
0
    while (remaining_size > 0) {
212
0
        int64_t current_chunk_size = std::min(chunk_size, remaining_size);
213
0
        wait->add_count();
214
215
0
        _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
216
0
                .path = path,
217
0
                .file_size = file_size,
218
0
                .offset = offset,
219
0
                .download_size = current_chunk_size,
220
0
                .file_system = file_system,
221
0
                .ctx = {.expiration_time = expiration_time,
222
0
                        .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
223
0
                        .is_warmup = true},
224
0
                .download_done =
225
0
                        [=, done_cb = std::move(done_cb)](Status st) {
226
0
                            if (done_cb) done_cb(st);
227
0
                            if (!st) {
228
0
                                LOG_WARNING("Warm up error ").error(st);
229
0
                            } else if (is_index) {
230
0
                                g_file_cache_once_or_periodic_warm_up_finished_index_num
231
0
                                        << (offset == 0 ? 1 : 0);
232
0
                                g_file_cache_once_or_periodic_warm_up_finished_index_size
233
0
                                        << current_chunk_size;
234
0
                            } else {
235
0
                                g_file_cache_once_or_periodic_warm_up_finished_segment_num
236
0
                                        << (offset == 0 ? 1 : 0);
237
0
                                g_file_cache_once_or_periodic_warm_up_finished_segment_size
238
0
                                        << current_chunk_size;
239
0
                            }
240
0
                            wait->signal();
241
0
                        },
242
0
                .tablet_id = tablet_id,
243
0
        });
244
245
0
        offset += current_chunk_size;
246
0
        remaining_size -= current_chunk_size;
247
0
    }
248
0
}
249
250
1
void CloudWarmUpManager::handle_jobs() {
251
1
#ifndef BE_TEST
252
1
    constexpr int WAIT_TIME_SECONDS = 600;
253
2
    while (true) {
254
1
        std::shared_ptr<JobMeta> cur_job = nullptr;
255
1
        {
256
1
            std::unique_lock lock(_mtx);
257
2
            while (!_closed && _pending_job_metas.empty()) {
258
1
                _cond.wait(lock);
259
1
            }
260
1
            if (_closed) break;
261
1
            if (!_pending_job_metas.empty()) {
262
0
                cur_job = _pending_job_metas.front();
263
0
            }
264
1
        }
265
266
1
        if (!cur_job) {
267
0
            LOG_WARNING("Warm up job is null");
268
0
            continue;
269
0
        }
270
271
1
        std::shared_ptr<bthread::CountdownEvent> wait =
272
1
                std::make_shared<bthread::CountdownEvent>(0);
273
274
1
        for (int64_t tablet_id : cur_job->tablet_ids) {
275
0
            VLOG_DEBUG << "Warm up tablet " << tablet_id << " stack: " << get_stack_trace();
276
0
            if (_cur_job_id == 0) { // The job is canceled
277
0
                break;
278
0
            }
279
0
            auto res = _engine.tablet_mgr().get_tablet(tablet_id);
280
0
            if (!res.has_value()) {
281
0
                LOG_WARNING("Warm up error ").tag("tablet_id", tablet_id).error(res.error());
282
0
                continue;
283
0
            }
284
0
            auto tablet = res.value();
285
0
            auto st = tablet->sync_rowsets();
286
0
            if (!st) {
287
0
                LOG_WARNING("Warm up error ").tag("tablet_id", tablet_id).error(st);
288
0
                continue;
289
0
            }
290
291
0
            auto tablet_meta = tablet->tablet_meta();
292
0
            auto rs_metas = snapshot_rs_metas(tablet.get());
293
0
            for (auto& [_, rs] : rs_metas) {
294
0
                auto storage_resource = rs->remote_storage_resource();
295
0
                if (!storage_resource) {
296
0
                    LOG(WARNING) << storage_resource.error();
297
0
                    continue;
298
0
                }
299
300
0
                int64_t expiration_time = tablet_meta->ttl_seconds();
301
0
                if (!tablet->add_rowset_warmup_state(*rs, WarmUpTriggerSource::JOB)) {
302
0
                    LOG(INFO) << "found duplicate warmup task for rowset " << rs->rowset_id()
303
0
                              << ", skip it";
304
0
                    continue;
305
0
                }
306
0
                for (int64_t seg_id = 0; seg_id < rs->num_segments(); seg_id++) {
307
                    // 1st. download segment files
308
                    // Use rs->fs() instead of storage_resource.value()->fs to support packed
309
                    // files. PackedFileSystem wrapper in RowsetMeta::fs() handles the index_map
310
                    // lookup and reads from the correct packed file.
311
0
                    if (!config::file_cache_enable_only_warm_up_idx) {
312
0
                        submit_download_tasks(
313
0
                                storage_resource.value()->remote_segment_path(*rs, seg_id),
314
0
                                rs->segment_file_size(cast_set<int>(seg_id)), rs->fs(),
315
0
                                expiration_time, wait, false,
316
0
                                [tablet, rs, seg_id](Status st) {
317
0
                                    VLOG_DEBUG << "warmup rowset " << rs->version() << " segment "
318
0
                                               << seg_id << " completed";
319
0
                                    if (tablet->complete_rowset_segment_warmup(
320
0
                                                      WarmUpTriggerSource::JOB, rs->rowset_id(), st,
321
0
                                                      1, 0)
322
0
                                                .trigger_source == WarmUpTriggerSource::JOB) {
323
0
                                        VLOG_DEBUG << "warmup rowset " << rs->version()
324
0
                                                   << " completed";
325
0
                                    }
326
0
                                },
327
0
                                tablet_id);
328
0
                    }
329
330
                    // 2nd. download inverted index files
331
0
                    int64_t file_size = -1;
332
0
                    auto schema_ptr = rs->tablet_schema();
333
0
                    auto idx_version = schema_ptr->get_inverted_index_storage_format();
334
0
                    const auto& idx_file_info = rs->inverted_index_file_info(cast_set<int>(seg_id));
335
0
                    if (idx_version == InvertedIndexStorageFormatPB::V1) {
336
0
                        auto&& inverted_index_info =
337
0
                                rs->inverted_index_file_info(cast_set<int>(seg_id));
338
0
                        std::unordered_map<int64_t, int64_t> index_size_map;
339
0
                        for (const auto& info : inverted_index_info.index_info()) {
340
0
                            if (info.index_file_size() != -1) {
341
0
                                index_size_map[info.index_id()] = info.index_file_size();
342
0
                            } else {
343
0
                                VLOG_DEBUG << "Invalid index_file_size for segment_id " << seg_id
344
0
                                           << ", index_id " << info.index_id();
345
0
                            }
346
0
                        }
347
0
                        for (const auto& index : schema_ptr->inverted_indexes()) {
348
0
                            auto idx_path = storage_resource.value()->remote_idx_v1_path(
349
0
                                    *rs, seg_id, index->index_id(), index->get_index_suffix());
350
0
                            if (idx_file_info.index_info_size() > 0) {
351
0
                                for (const auto& idx_info : idx_file_info.index_info()) {
352
0
                                    if (index->index_id() == idx_info.index_id() &&
353
0
                                        index->get_index_suffix() == idx_info.index_suffix()) {
354
0
                                        file_size = idx_info.index_file_size();
355
0
                                        break;
356
0
                                    }
357
0
                                }
358
0
                            }
359
0
                            tablet->update_rowset_warmup_state_inverted_idx_num(
360
0
                                    WarmUpTriggerSource::JOB, rs->rowset_id(), 1);
361
0
                            submit_download_tasks(
362
0
                                    idx_path, file_size, rs->fs(), expiration_time, wait, true,
363
0
                                    [=](Status st) {
364
0
                                        VLOG_DEBUG << "warmup rowset " << rs->version()
365
0
                                                   << " segment " << seg_id
366
0
                                                   << "inverted idx:" << idx_path << " completed";
367
0
                                        if (tablet->complete_rowset_segment_warmup(
368
0
                                                          WarmUpTriggerSource::JOB, rs->rowset_id(),
369
0
                                                          st, 0, 1)
370
0
                                                    .trigger_source == WarmUpTriggerSource::JOB) {
371
0
                                            VLOG_DEBUG << "warmup rowset " << rs->version()
372
0
                                                       << " completed";
373
0
                                        }
374
0
                                    },
375
0
                                    tablet_id);
376
0
                        }
377
0
                    } else {
378
0
                        if (schema_ptr->has_inverted_index() || schema_ptr->has_ann_index()) {
379
0
                            auto idx_path =
380
0
                                    storage_resource.value()->remote_idx_v2_path(*rs, seg_id);
381
0
                            file_size = idx_file_info.has_index_size() ? idx_file_info.index_size()
382
0
                                                                       : -1;
383
0
                            tablet->update_rowset_warmup_state_inverted_idx_num(
384
0
                                    WarmUpTriggerSource::JOB, rs->rowset_id(), 1);
385
0
                            submit_download_tasks(
386
0
                                    idx_path, file_size, rs->fs(), expiration_time, wait, true,
387
0
                                    [=](Status st) {
388
0
                                        VLOG_DEBUG << "warmup rowset " << rs->version()
389
0
                                                   << " segment " << seg_id
390
0
                                                   << "inverted idx:" << idx_path << " completed";
391
0
                                        if (tablet->complete_rowset_segment_warmup(
392
0
                                                          WarmUpTriggerSource::JOB, rs->rowset_id(),
393
0
                                                          st, 0, 1)
394
0
                                                    .trigger_source == WarmUpTriggerSource::JOB) {
395
0
                                            VLOG_DEBUG << "warmup rowset " << rs->version()
396
0
                                                       << " completed";
397
0
                                        }
398
0
                                    },
399
0
                                    tablet_id);
400
0
                        }
401
0
                    }
402
0
                }
403
0
            }
404
0
            g_file_cache_once_or_periodic_warm_up_finished_tablet_num << 1;
405
0
        }
406
407
1
        timespec time;
408
1
        time.tv_sec = UnixSeconds() + WAIT_TIME_SECONDS;
409
1
        if (wait->timed_wait(time)) {
410
0
            LOG_WARNING("Warm up {} tablets take a long time", cur_job->tablet_ids.size());
411
0
        }
412
1
        {
413
1
            std::unique_lock lock(_mtx);
414
1
            _finish_job.push_back(cur_job);
415
            // _pending_job_metas may be cleared by a CLEAR_JOB request
416
            // so we need to check it again.
417
1
            if (!_pending_job_metas.empty()) {
418
                // We can not call pop_front before the job is finished,
419
                // because GET_CURRENT_JOB_STATE_AND_LEASE is relying on the pending job size.
420
0
                _pending_job_metas.pop_front();
421
0
            }
422
1
        }
423
1
    }
424
1
#endif
425
1
}
426
427
JobMeta::JobMeta(const TJobMeta& meta)
        : be_ip(meta.be_ip), brpc_port(meta.brpc_port), tablet_ids(meta.tablet_ids) {
    // Translate the thrift download type into the internal enum. Any value other than
    // BE or S3 leaves download_type untouched.
    if (meta.download_type == TDownloadType::BE) {
        download_type = DownloadType::BE;
    } else if (meta.download_type == TDownloadType::S3) {
        download_type = DownloadType::S3;
    }
}
438
439
0
// Claims the manager for `job_id` if no job is active (id 0). Succeeds when `job_id` is
// (now) the active job; otherwise reports which job is running.
Status CloudWarmUpManager::check_and_set_job_id(int64_t job_id) {
    std::lock_guard lock(_mtx);
    if (_cur_job_id == 0) {
        _cur_job_id = job_id;
        return Status::OK();
    }
    if (_cur_job_id == job_id) {
        return Status::OK();
    }
    return Status::InternalError("The job {} is running", _cur_job_id);
}
450
451
0
// Validates that `job_id` may submit `batch_id`:
//  - fails if a different job is active;
//  - claims the manager for `job_id` when idle;
//  - sets *retry = true (and succeeds) when `batch_id` is already the current batch;
//  - otherwise accepts the new batch only once the pending queue has drained.
// *retry is only ever set to true here; the caller is expected to initialize it.
Status CloudWarmUpManager::check_and_set_batch_id(int64_t job_id, int64_t batch_id, bool* retry) {
    std::lock_guard lock(_mtx);
    const bool another_job_active = _cur_job_id != 0 && _cur_job_id != job_id;
    if (another_job_active) {
        return Status::InternalError("The job {} is not current job, current job is {}", job_id,
                                     _cur_job_id);
    }
    if (_cur_job_id == 0) {
        _cur_job_id = job_id;
    }
    if (_cur_batch_id == batch_id) {
        *retry = true;
        return Status::OK();
    }
    if (!_pending_job_metas.empty()) {
        return Status::InternalError("The batch {} is not finish", _cur_batch_id);
    }
    _cur_batch_id = batch_id;
    return Status::OK();
}
473
474
0
void CloudWarmUpManager::add_job(const std::vector<TJobMeta>& job_metas) {
475
0
    {
476
0
        std::lock_guard lock(_mtx);
477
0
        std::for_each(job_metas.begin(), job_metas.end(), [this](const TJobMeta& meta) {
478
0
            _pending_job_metas.emplace_back(std::make_shared<JobMeta>(meta));
479
0
            g_file_cache_once_or_periodic_warm_up_submitted_tablet_num << meta.tablet_ids.size();
480
0
        });
481
0
    }
482
0
    _cond.notify_all();
483
0
}
484
485
#ifdef BE_TEST
486
void CloudWarmUpManager::consumer_job() {
487
    {
488
        std::unique_lock lock(_mtx);
489
        _finish_job.push_back(_pending_job_metas.front());
490
        _pending_job_metas.pop_front();
491
    }
492
}
493
494
#endif
495
496
0
std::tuple<int64_t, int64_t, int64_t, int64_t> CloudWarmUpManager::get_current_job_state() {
497
0
    std::lock_guard lock(_mtx);
498
0
    return std::make_tuple(_cur_job_id, _cur_batch_id, _pending_job_metas.size(),
499
0
                           _finish_job.size());
500
0
}
501
502
0
// Resets the manager to idle (no job, batch -1, empty queues) if `job_id` is the
// active job; otherwise returns an error naming the active job.
Status CloudWarmUpManager::clear_job(int64_t job_id) {
    std::lock_guard lock(_mtx);
    if (job_id != _cur_job_id) {
        return Status::InternalError("The job {} is not current job, current job is {}", job_id,
                                     _cur_job_id);
    }
    _cur_job_id = 0;
    _cur_batch_id = -1;
    _pending_job_metas.clear();
    _finish_job.clear();
    return Status::OK();
}
516
517
// Registers, updates or removes an event-driven (LOAD) warm-up job.
//  - clear == true: drop the job's replica cache and table filter.
//  - new job: create an empty replica cache; table_ids == nullptr means "all tables",
//    otherwise only the given (possibly empty) table set is warmed up.
//  - existing job with table_ids: replace its table filter.
// Any event other than LOAD is rejected.
Status CloudWarmUpManager::set_event(int64_t job_id, TWarmUpEventType::type event, bool clear,
                                     const std::vector<int64_t>* table_ids) {
    DBUG_EXECUTE_IF("CloudWarmUpManager.set_event.ignore_all", {
        LOG(INFO) << "Ignore set_event request, job_id=" << job_id << ", event=" << event
                  << ", clear=" << clear;
        return Status::OK();
    });
    std::lock_guard lock(_mtx);
    if (event != TWarmUpEventType::type::LOAD) {
        return Status::InternalError("The event {} is not supported yet", event);
    }

    if (clear) {
        _tablet_replica_cache.erase(job_id);
        _event_driven_filters.erase(job_id);
        LOG(INFO) << "Clear event driven sync, job_id=" << job_id << ", event=" << event;
        return Status::OK();
    }

    const bool job_is_new = !_tablet_replica_cache.contains(job_id);
    if (job_is_new) {
        // Default-construct the job's (empty) replica cache entry.
        static_cast<void>(_tablet_replica_cache[job_id]);
        if (table_ids == nullptr) {
            // cluster-level: no filter, warm up all tables
            _event_driven_filters[job_id] = std::nullopt;
            LOG(INFO) << "Set event driven sync, job_id=" << job_id << ", event=" << event;
        } else {
            // table-level filter: an empty set means every matched table was deleted,
            // so nothing is warmed up.
            _event_driven_filters[job_id] =
                    std::unordered_set<int64_t>(table_ids->begin(), table_ids->end());
            LOG(INFO) << "Set event driven sync with table filter, job_id=" << job_id
                      << ", event=" << event << ", table_ids_size=" << table_ids->size();
        }
    } else if (table_ids != nullptr) {
        // Existing job: replace its table filter (may be empty).
        _event_driven_filters[job_id] =
                std::unordered_set<int64_t>(table_ids->begin(), table_ids->end());
        LOG(INFO) << "Updated table filter for event driven sync, job_id=" << job_id
                  << ", table_ids_size=" << table_ids->size();
    }
    return Status::OK();
}
557
558
std::vector<JobReplicaInfo> CloudWarmUpManager::get_replica_info(int64_t tablet_id,
559
                                                                 int64_t table_id,
560
                                                                 bool bypass_cache,
561
200k
                                                                 bool& cache_hit) {
562
200k
    std::vector<JobReplicaInfo> replicas;
563
200k
    std::vector<int64_t> cancelled_jobs;
564
200k
    std::lock_guard<std::mutex> lock(_mtx);
565
200k
    cache_hit = false;
566
200k
    for (auto& [job_id, cache] : _tablet_replica_cache) {
567
        // Check table-level filter: skip this job if table_id doesn't match
568
        // table_id == 0 means the caller doesn't have table context (e.g., recycle_cache),
569
        // so skip filtering
570
4
        if (table_id != 0) {
571
2
            auto filter_it = _event_driven_filters.find(job_id);
572
2
            if (filter_it != _event_driven_filters.end() && filter_it->second.has_value()) {
573
2
                if (filter_it->second->find(table_id) == filter_it->second->end()) {
574
1
                    VLOG_DEBUG << "get_replica_info: table_id=" << table_id
575
0
                               << " not in filter for job_id=" << job_id << ", skipping";
576
1
                    continue;
577
1
                }
578
2
            }
579
2
        }
580
581
3
        if (!bypass_cache) {
582
3
            auto it = cache.find(tablet_id);
583
3
            if (it != cache.end()) {
584
                // check ttl expire
585
3
                auto now = std::chrono::steady_clock::now();
586
3
                auto sec = std::chrono::duration_cast<std::chrono::seconds>(now - it->second.first);
587
3
                if (sec.count() < config::warmup_tablet_replica_info_cache_ttl_sec) {
588
3
                    replicas.push_back(JobReplicaInfo {job_id, it->second.second});
589
3
                    VLOG_DEBUG << "get_replica_info: cache hit, tablet_id=" << tablet_id
590
0
                               << ", job_id=" << job_id;
591
3
                    cache_hit = true;
592
3
                    continue;
593
3
                } else {
594
0
                    VLOG_DEBUG << "get_replica_info: cache expired, tablet_id=" << tablet_id
595
0
                               << ", job_id=" << job_id;
596
0
                    cache.erase(it);
597
0
                }
598
3
            }
599
0
            VLOG_DEBUG << "get_replica_info: cache miss, tablet_id=" << tablet_id
600
0
                       << ", job_id=" << job_id;
601
0
        }
602
603
0
        if (!cache_hit) {
604
            // We are trying to save one retry by refresh all the remaining caches
605
0
            bypass_cache = true;
606
0
        }
607
0
        ClusterInfo* cluster_info = ExecEnv::GetInstance()->cluster_info();
608
0
        if (cluster_info == nullptr) {
609
0
            LOG(WARNING) << "get_replica_info: have not get FE Master heartbeat yet, job_id="
610
0
                         << job_id;
611
0
            continue;
612
0
        }
613
0
        TNetworkAddress master_addr = cluster_info->master_fe_addr;
614
0
        if (master_addr.hostname == "" || master_addr.port == 0) {
615
0
            LOG(WARNING) << "get_replica_info: have not get FE Master heartbeat yet, job_id="
616
0
                         << job_id;
617
0
            continue;
618
0
        }
619
620
0
        TGetTabletReplicaInfosRequest request;
621
0
        TGetTabletReplicaInfosResult result;
622
0
        request.warm_up_job_id = job_id;
623
0
        request.__isset.warm_up_job_id = true;
624
0
        request.tablet_ids.emplace_back(tablet_id);
625
0
        Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
626
0
                master_addr.hostname, master_addr.port,
627
0
                [&request, &result](FrontendServiceConnection& client) {
628
0
                    client->getTabletReplicaInfos(result, request);
629
0
                });
630
631
0
        if (!rpc_st.ok()) {
632
0
            LOG(WARNING) << "get_replica_info: rpc failed error=" << rpc_st
633
0
                         << ", tablet id=" << tablet_id << ", job_id=" << job_id;
634
0
            continue;
635
0
        }
636
637
0
        auto st = Status::create<false>(result.status);
638
0
        if (!st.ok()) {
639
0
            if (st.is<ErrorCode::CANCELLED>()) {
640
0
                LOG(INFO) << "get_replica_info: warm up job cancelled, tablet_id=" << tablet_id
641
0
                          << ", job_id=" << job_id;
642
0
                cancelled_jobs.push_back(job_id);
643
0
            } else {
644
0
                LOG(WARNING) << "get_replica_info: failed status=" << st
645
0
                             << ", tablet id=" << tablet_id << ", job_id=" << job_id;
646
0
            }
647
0
            continue;
648
0
        }
649
0
        VLOG_DEBUG << "get_replica_info: got " << result.tablet_replica_infos.size()
650
0
                   << " tablets, tablet id=" << tablet_id << ", job_id=" << job_id;
651
652
0
        for (const auto& it : result.tablet_replica_infos) {
653
0
            auto tid = it.first;
654
0
            VLOG_DEBUG << "get_replica_info: got " << it.second.size()
655
0
                       << " replica_infos, tablet id=" << tid << ", job_id=" << job_id;
656
0
            for (const auto& replica : it.second) {
657
0
                cache[tid] = std::make_pair(std::chrono::steady_clock::now(), replica);
658
0
                replicas.push_back(JobReplicaInfo {job_id, replica});
659
0
                LOG(INFO) << "get_replica_info: cache add, tablet_id=" << tid
660
0
                          << ", job_id=" << job_id;
661
0
            }
662
0
        }
663
0
    }
664
200k
    for (auto job_id : cancelled_jobs) {
665
0
        LOG(INFO) << "get_replica_info: erasing cancelled job, job_id=" << job_id;
666
0
        _tablet_replica_cache.erase(job_id);
667
0
    }
668
18.4E
    VLOG_DEBUG << "get_replica_info: return " << replicas.size()
669
18.4E
               << " replicas, tablet id=" << tablet_id;
670
200k
    return replicas;
671
200k
}
672
673
void CloudWarmUpManager::warm_up_rowset(RowsetMeta& rs_meta, int64_t table_id,
674
194k
                                        int64_t sync_wait_timeout_ms) {
675
194k
    if (sync_wait_timeout_ms <= 0) {
676
194k
        auto rs_meta_pb = std::make_shared<RowsetMetaPB>(rs_meta.get_rowset_pb());
677
194k
        auto st = _thread_pool_token->submit_func([this, rs_meta_pb, table_id,
678
195k
                                                   sync_wait_timeout_ms]() {
679
195k
            RowsetMeta async_rs_meta;
680
195k
            bool init_succeed = async_rs_meta.init_from_pb(*rs_meta_pb);
681
195k
            TEST_SYNC_POINT_CALLBACK("CloudWarmUpManager::warm_up_rowset.async_init_from_pb",
682
195k
                                     &init_succeed);
683
195k
            if (!init_succeed) {
684
1
                LOG(WARNING) << "Failed to init rowset meta when warming up rowset asynchronously";
685
1
                return;
686
1
            }
687
195k
            _warm_up_rowset(async_rs_meta, table_id, sync_wait_timeout_ms);
688
195k
        });
689
194k
        if (!st.ok()) {
690
0
            LOG(WARNING) << "Failed to submit warm up rowset task: " << st;
691
0
            file_cache_warm_up_failed_task_num << 1;
692
0
        }
693
194k
        return;
694
194k
    }
695
696
19
    bthread::Mutex mu;
697
19
    bthread::ConditionVariable cv;
698
19
    bool finished = false;
699
19
    std::unique_lock<bthread::Mutex> lock(mu);
700
19
    auto st = _thread_pool_token->submit_func([&, this]() {
701
1
        _warm_up_rowset(rs_meta, table_id, sync_wait_timeout_ms);
702
1
        std::unique_lock<bthread::Mutex> l(mu);
703
1
        finished = true;
704
1
        cv.notify_one();
705
1
    });
706
19
    if (!st.ok()) {
707
0
        LOG(WARNING) << "Failed to submit warm up rowset task: " << st;
708
0
        file_cache_warm_up_failed_task_num << 1;
709
19
    } else {
710
21
        while (!finished) {
711
2
            TEST_SYNC_POINT_CALLBACK("CloudWarmUpManager::warm_up_rowset.before_wait", &cv);
712
2
            cv.wait(lock);
713
2
        }
714
19
    }
715
19
}
716
717
// Resolves the replicas that should receive `rs_meta` and sends them a warm-up
// request via _do_warm_up_rowset().
//
// The first attempt passes `false` as get_replica_info's third argument. If that
// attempt used cached replica info (`from_cache`) and failed with
// TABLE_NOT_FOUND, the replica info is fetched again with `true` (presumably
// forcing a refresh — confirm against get_replica_info). The request is then
// retried once with the existence check skipped. Failures are logged, not
// returned.
void CloudWarmUpManager::_warm_up_rowset(RowsetMeta& rs_meta, int64_t table_id,
                                         int64_t sync_wait_timeout_ms) {
    TEST_SYNC_POINT_CALLBACK("CloudWarmUpManager::_warm_up_rowset.enter", &rs_meta,
                             &sync_wait_timeout_ms);

    bool from_cache = false;
    auto targets = get_replica_info(rs_meta.tablet_id(), table_id, false, from_cache);
    if (targets.empty()) {
        VLOG_DEBUG << "There is no need to warmup tablet=" << rs_meta.tablet_id()
                   << ", skipping rowset=" << rs_meta.rowset_id().to_string();
        g_file_cache_event_driven_warm_up_skipped_rowset_num << 1;
        return;
    }

    // Replica info that was not served from the cache lets the peer skip its
    // existence check.
    Status result = _do_warm_up_rowset(rs_meta, table_id, targets, sync_wait_timeout_ms,
                                       /*skip_existence_check=*/!from_cache);

    const bool stale_cache_suspected =
            from_cache && !result.ok() && result.is<ErrorCode::TABLE_NOT_FOUND>();
    if (stale_cache_suspected) {
        targets = get_replica_info(rs_meta.tablet_id(), table_id, true, from_cache);
        result = _do_warm_up_rowset(rs_meta, table_id, targets, sync_wait_timeout_ms,
                                    /*skip_existence_check=*/true);
    }

    if (result.ok()) {
        return;
    }
    LOG(WARNING) << "Failed to warm up rowset, tablet_id=" << rs_meta.tablet_id()
                 << ", rowset_id=" << rs_meta.rowset_id().to_string() << ", status=" << result;
}
739
740
// Folds per-replica warm-up failures into a single Status.
//
// Returns OK when `failures` is empty. Otherwise returns an error whose code is
// TABLE_NOT_FOUND if any failure carried that code (the caller _warm_up_rowset
// retries on it), and the first failure's code otherwise. The message lists
// every failure reason, joined by "; ".
Status CloudWarmUpManager::_build_warm_up_rowset_result(
        const std::vector<WarmUpRowsetFailure>& failures, size_t replica_count, int64_t tablet_id,
        int64_t table_id, const std::string& rowset_id) {
    if (failures.empty()) {
        return Status::OK();
    }

    const bool saw_table_not_found =
            std::any_of(failures.begin(), failures.end(), [](const WarmUpRowsetFailure& f) {
                return f.code == ErrorCode::TABLE_NOT_FOUND;
            });
    int code = failures.front().code;
    if (saw_table_not_found) {
        code = ErrorCode::TABLE_NOT_FOUND;
    }

    std::string joined_reasons;
    const char* separator = "";
    for (const auto& failure : failures) {
        joined_reasons.append(separator);
        joined_reasons.append(failure.reason);
        separator = "; ";
    }

    return Status::Error(code,
                         "warm up rowset failed on {}/{} replicas, tablet_id={}, table_id={}, "
                         "rowset_id={}, failures=[{}]",
                         failures.size(), replica_count, tablet_id, table_id, rowset_id,
                         joined_reasons);
}
765
766
// Sends a PWarmUpRowsetRequest for `rs_meta` to every replica in `replicas`.
//
// For each replica this function:
//   1. resolves the hostname through the DNS cache when it is not already an IP;
//   2. creates an uncached brpc stub;
//   3. bumps the event-driven warm-up "requested" metrics;
//   4. records the job's trigger timestamp;
//   5. issues the RPC synchronously.
// When `sync_wait_timeout_ms` is positive, the RPC timeout is
// `sync_wait_timeout_ms + 1000` ms and the observed latency is recorded.
//
// A failure on one replica (DNS, stub creation, RPC error, or a non-OK status in
// the response) does not stop the loop. All failures are aggregated by
// _build_warm_up_rowset_result(), so the result is OK only when every replica
// succeeded (or `replicas` is empty).
Status CloudWarmUpManager::_do_warm_up_rowset(RowsetMeta& rs_meta, int64_t table_id,
                                              std::vector<JobReplicaInfo>& replicas,
                                              int64_t sync_wait_timeout_ms,
                                              bool skip_existence_check) {
    auto tablet_id = rs_meta.tablet_id();
    int64_t now_ts = std::chrono::duration_cast<std::chrono::microseconds>(
                             std::chrono::system_clock::now().time_since_epoch())
                             .count();
    g_file_cache_warm_up_rowset_last_call_unix_ts.set_value(now_ts);

    std::vector<WarmUpRowsetFailure> failures;
    auto add_failure = [&failures](const JobReplicaInfo& info, const std::string& target,
                                   const Status& st) {
        failures.push_back(WarmUpRowsetFailure {
                .code = st.code(),
                .reason = "job_id=" + std::to_string(info.job_id) +
                          ", backend_id=" + std::to_string(info.replica.backend_id) +
                          ", target=" + target + ", status=" + st.to_string_no_stack()});
    };

    // Accounts, globally and per job, the segments (and their index files) that a
    // request asks the peer to warm up. Index sizes are added only when known:
    // V1 counts one index per entry of index_info(); otherwise (V2) one per segment.
    auto record_requested_metrics = [&rs_meta](const std::string& job_id_str) {
        auto schema_ptr = rs_meta.tablet_schema();
        auto idx_version = schema_ptr->get_inverted_index_storage_format();
        for (int64_t segment_id = 0; segment_id < rs_meta.num_segments(); segment_id++) {
            auto seg_size = rs_meta.segment_file_size(cast_set<int>(segment_id));

            g_file_cache_event_driven_warm_up_requested_segment_num << 1;
            g_warmup_ed_requested_segment_num.put({job_id_str}, 1);

            g_file_cache_event_driven_warm_up_requested_segment_size << seg_size;
            g_warmup_ed_requested_segment_size.put({job_id_str}, seg_size);

            if (!schema_ptr->has_inverted_index() && !schema_ptr->has_ann_index()) {
                continue;
            }
            auto&& inverted_index_info =
                    rs_meta.inverted_index_file_info(cast_set<int>(segment_id));
            if (idx_version == InvertedIndexStorageFormatPB::V1) {
                if (inverted_index_info.index_info().empty()) {
                    VLOG_DEBUG << "No index info available for segment " << segment_id;
                    continue;
                }
                for (const auto& idx_info : inverted_index_info.index_info()) {
                    g_file_cache_event_driven_warm_up_requested_index_num << 1;
                    g_warmup_ed_requested_index_num.put({job_id_str}, 1);

                    if (idx_info.index_file_size() != -1) {
                        g_file_cache_event_driven_warm_up_requested_index_size
                                << idx_info.index_file_size();
                        g_warmup_ed_requested_index_size.put({job_id_str},
                                                             idx_info.index_file_size());
                    } else {
                        VLOG_DEBUG << "Invalid index_file_size for segment_id " << segment_id
                                   << ", index_id " << idx_info.index_id();
                    }
                }
            } else { // InvertedIndexStorageFormatPB::V2
                g_file_cache_event_driven_warm_up_requested_index_num << 1;
                g_warmup_ed_requested_index_num.put({job_id_str}, 1);

                if (inverted_index_info.has_index_size()) {
                    g_file_cache_event_driven_warm_up_requested_index_size
                            << inverted_index_info.index_size();
                    g_warmup_ed_requested_index_size.put({job_id_str},
                                                         inverted_index_info.index_size());
                } else {
                    VLOG_DEBUG << "index_size is not set for segment " << segment_id;
                }
            }
        }
    };

    // The rowset payload is identical for every replica; only the job id and the
    // trigger timestamp differ. Build the request (including the rowset-meta copy)
    // once and patch it per replica. Reuse is safe because every RPC below is
    // issued synchronously (done == nullptr).
    PWarmUpRowsetRequest request;
    request.add_rowset_metas()->CopyFrom(rs_meta.get_rowset_pb());
    request.set_unix_ts_us(now_ts);
    request.set_sync_wait_timeout_ms(sync_wait_timeout_ms);
    request.set_skip_existence_check(skip_existence_check);

    for (auto& info : replicas) {
        std::string job_id_str = std::to_string(info.job_id);
        std::string target = get_host_port(info.replica.host, info.replica.brpc_port);
        int64_t trigger_ts_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
                                        std::chrono::system_clock::now().time_since_epoch())
                                        .count();
        request.set_job_id(info.job_id);
        request.set_upstream_trigger_ts_ms(trigger_ts_ms);

        // Resolve the replica hostname to an IP when a DNS cache is available.
        std::string host = info.replica.host;
        auto dns_cache = ExecEnv::GetInstance()->dns_cache();
        if (dns_cache == nullptr) {
            LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
        } else if (!is_valid_ip(info.replica.host)) {
            Status status = dns_cache->get(info.replica.host, &host);
            if (!status.ok()) {
                LOG(WARNING) << "failed to get ip from host " << info.replica.host << ": "
                             << status.to_string();
                add_failure(info, target, status);
                continue;
            }
        }
        std::string brpc_addr = get_host_port(host, info.replica.brpc_port);
        std::shared_ptr<PBackendService_Stub> brpc_stub =
                ExecEnv::GetInstance()->brpc_internal_client_cache()->get_new_client_no_cache(
                        brpc_addr);
        if (!brpc_stub) {
            add_failure(info, target, Status::RpcError("Address {} is wrong", brpc_addr));
            continue;
        }

        record_requested_metrics(job_id_str);

        // Update last trigger timestamp
        auto* trigger_ts =
                g_warmup_ed_last_trigger_ts.get_stats(std::list<std::string> {job_id_str});
        if (trigger_ts) {
            trigger_ts->set_value(trigger_ts_ms);
        }

        brpc::Controller cntl;
        if (sync_wait_timeout_ms > 0) {
            // Give the RPC one extra second on top of the requested sync wait.
            cntl.set_timeout_ms(sync_wait_timeout_ms + 1000);
        }
        PWarmUpRowsetResponse response;
        MonotonicStopWatch watch;
        watch.start();
        brpc_stub->warm_up_rowset(&cntl, &request, &response, nullptr);
        if (cntl.Failed()) {
            LOG_WARNING("warm up rowset {} for tablet {} failed, rpc error: {}",
                        rs_meta.rowset_id().to_string(), tablet_id, cntl.ErrorText());
            add_failure(info, target, Status::RpcError(cntl.ErrorText()));
            continue;
        }
        if (sync_wait_timeout_ms > 0) {
            auto cost_us = watch.elapsed_time_microseconds();
            VLOG_DEBUG << "warm up rowset wait for compaction: " << cost_us << " us";
            if (cost_us / 1000 > sync_wait_timeout_ms) {
                LOG_WARNING(
                        "Warm up rowset {} for tablet {} wait for compaction timeout, takes {} ms",
                        rs_meta.rowset_id().to_string(), tablet_id, cost_us / 1000);
            }
            g_file_cache_warm_up_rowset_wait_for_compaction_latency << cost_us;
        }
        auto status = Status::create<false>(response.status());
        if (response.has_status() && !status.ok()) {
            LOG(INFO) << "warm_up_rowset failed, tablet_id=" << rs_meta.tablet_id()
                      << ", rowset_id=" << rs_meta.rowset_id().to_string()
                      << ", target=" << info.replica.host
                      << ", skip_existence_check=" << skip_existence_check
                      << ", status=" << status;
            add_failure(info, target, status);
        }
    }
    return _build_warm_up_rowset_result(failures, replicas.size(), tablet_id, table_id,
                                        rs_meta.rowset_id().to_string());
}
920
921
void CloudWarmUpManager::recycle_cache(int64_t tablet_id,
922
5.54k
                                       const std::vector<RecycledRowsets>& rowsets) {
923
5.54k
    bthread::Mutex mu;
924
5.54k
    bthread::ConditionVariable cv;
925
5.54k
    std::unique_lock<bthread::Mutex> lock(mu);
926
5.54k
    auto st = _thread_pool_token->submit_func([&, this]() {
927
5.54k
        std::unique_lock<bthread::Mutex> l(mu);
928
5.54k
        _recycle_cache(tablet_id, rowsets);
929
5.54k
        cv.notify_one();
930
5.54k
    });
931
5.54k
    if (!st.ok()) {
932
0
        LOG(WARNING) << "Failed to submit recycle cache task, tablet_id=" << tablet_id
933
0
                     << ", error=" << st;
934
5.54k
    } else {
935
5.54k
        cv.wait(lock);
936
5.54k
    }
937
5.54k
}
938
939
void CloudWarmUpManager::_recycle_cache(int64_t tablet_id,
940
5.54k
                                        const std::vector<RecycledRowsets>& rowsets) {
941
5.54k
    LOG(INFO) << "recycle_cache: tablet_id=" << tablet_id << ", num_rowsets=" << rowsets.size();
942
5.54k
    bool cache_hit = false;
943
5.54k
    auto replicas = get_replica_info(tablet_id, /*table_id=*/0, false, cache_hit);
944
5.54k
    if (replicas.empty()) {
945
5.54k
        return;
946
5.54k
    }
947
948
0
    PRecycleCacheRequest request;
949
0
    for (const auto& rowset : rowsets) {
950
0
        RecycleCacheMeta* meta = request.add_cache_metas();
951
0
        meta->set_tablet_id(tablet_id);
952
0
        meta->set_rowset_id(rowset.rowset_id.to_string());
953
0
        meta->set_num_segments(rowset.num_segments);
954
0
        for (const auto& name : rowset.index_file_names) {
955
0
            meta->add_index_file_names(name);
956
0
        }
957
0
        g_file_cache_recycle_cache_requested_segment_num << rowset.num_segments;
958
0
        g_file_cache_recycle_cache_requested_index_num << rowset.index_file_names.size();
959
0
    }
960
0
    auto dns_cache = ExecEnv::GetInstance()->dns_cache();
961
0
    for (auto& replica : replicas) {
962
        // send sync request
963
0
        std::string host = replica.replica.host;
964
0
        if (dns_cache == nullptr) {
965
0
            LOG(WARNING) << "DNS cache is not initialized, skipping hostname resolve";
966
0
        } else if (!is_valid_ip(replica.replica.host)) {
967
0
            Status status = dns_cache->get(replica.replica.host, &host);
968
0
            if (!status.ok()) {
969
0
                LOG(WARNING) << "failed to get ip from host " << replica.replica.host << ": "
970
0
                             << status.to_string();
971
0
                return;
972
0
            }
973
0
        }
974
0
        std::string brpc_addr = get_host_port(host, replica.replica.brpc_port);
975
0
        Status st = Status::OK();
976
0
        std::shared_ptr<PBackendService_Stub> brpc_stub =
977
0
                ExecEnv::GetInstance()->brpc_internal_client_cache()->get_new_client_no_cache(
978
0
                        brpc_addr);
979
0
        if (!brpc_stub) {
980
0
            st = Status::RpcError("Address {} is wrong", brpc_addr);
981
0
            continue;
982
0
        }
983
0
        brpc::Controller cntl;
984
0
        PRecycleCacheResponse response;
985
0
        brpc_stub->recycle_cache(&cntl, &request, &response, nullptr);
986
0
    }
987
0
}
988
989
// Balance warm up cache management methods implementation
990
void CloudWarmUpManager::record_balanced_tablet(int64_t tablet_id, const std::string& host,
991
                                                int32_t brpc_port,
992
55
                                                const std::string& compute_group_id) {
993
55
    int64_t now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
994
55
                             std::chrono::system_clock::now().time_since_epoch())
995
55
                             .count();
996
997
55
    PeerCandidate candidate;
998
55
    candidate.host = host;
999
55
    candidate.brpc_port = brpc_port;
1000
55
    candidate.compute_group_id = compute_group_id;
1001
55
    candidate.last_access_time_ms = now_ms;
1002
55
    candidate.consecutive_rpc_failures = 0;
1003
1004
55
    auto& shard = get_shard(tablet_id);
1005
55
    std::unique_lock<bthread::Mutex> lock(shard.mtx);
1006
1007
55
    auto [it, inserted] = shard.tablets.try_emplace(tablet_id);
1008
55
    if (inserted) {
1009
        // Only increment the gauge counter on first insertion.
1010
38
        g_balance_tablet_be_mapping_size << 1;
1011
38
    }
1012
1013
55
    auto& cands = it->second.candidates;
1014
    // Warmup rebalance: a tablet has at most one warm-up peer (the current rebalance source).
1015
    // Upsert: replace existing same-CG entry if present, otherwise prepend.
1016
55
    auto same_cg_it = std::find_if(cands.begin(), cands.end(), [&](const PeerCandidate& c) {
1017
19
        return c.compute_group_id == compute_group_id;
1018
19
    });
1019
1020
55
    if (same_cg_it != cands.end()) {
1021
        // Update in-place, preserve position (already at or near front from prior insert).
1022
2
        same_cg_it->host = std::move(candidate.host);
1023
2
        same_cg_it->brpc_port = candidate.brpc_port;
1024
2
        same_cg_it->last_access_time_ms = candidate.last_access_time_ms;
1025
2
        same_cg_it->consecutive_rpc_failures = 0;
1026
53
    } else {
1027
        // New CG entry: insert at front (warmup has highest priority).
1028
53
        cands.insert(cands.begin(), std::move(candidate));
1029
53
    }
1030
1031
55
    VLOG_DEBUG << "Recorded balanced warm up cache tablet: tablet_id=" << tablet_id
1032
0
               << ", host=" << host << ":" << brpc_port
1033
0
               << ", compute_group_id=" << compute_group_id;
1034
55
}
1035
1036
3
void CloudWarmUpManager::remove_balanced_tablet(int64_t tablet_id) {
1037
3
    auto& shard = get_shard(tablet_id);
1038
3
    std::unique_lock<bthread::Mutex> lock(shard.mtx);
1039
3
    auto it = shard.tablets.find(tablet_id);
1040
3
    if (it != shard.tablets.end()) {
1041
2
        shard.tablets.erase(it);
1042
2
        g_balance_tablet_be_mapping_size << -1;
1043
2
        VLOG_DEBUG << "Removed balanced warm up cache tablet by timer, tablet_id=" << tablet_id;
1044
2
    }
1045
3
}
1046
1047
0
void CloudWarmUpManager::remove_balanced_tablets(const std::vector<int64_t>& tablet_ids) {
1048
    // Group tablet_ids by shard to minimize lock contention
1049
0
    std::array<std::vector<int64_t>, SHARD_COUNT> shard_groups;
1050
0
    for (int64_t tablet_id : tablet_ids) {
1051
0
        shard_groups[get_shard_index(tablet_id)].push_back(tablet_id);
1052
0
    }
1053
1054
    // Process each shard
1055
0
    for (size_t i = 0; i < SHARD_COUNT; ++i) {
1056
0
        if (shard_groups[i].empty()) continue;
1057
1058
0
        auto& shard = _balanced_tablets_shards[i];
1059
0
        std::unique_lock<bthread::Mutex> lock(shard.mtx);
1060
0
        for (int64_t tablet_id : shard_groups[i]) {
1061
0
            auto it = shard.tablets.find(tablet_id);
1062
0
            if (it != shard.tablets.end()) {
1063
0
                shard.tablets.erase(it);
1064
0
                g_balance_tablet_be_mapping_size << -1;
1065
0
                VLOG_DEBUG << "Removed balanced warm up cache tablet: tablet_id=" << tablet_id;
1066
0
            }
1067
0
        }
1068
0
    }
1069
0
}
1070
1071
// Cleanup loop: runs on a dedicated pthread, wakes up periodically to evict
1072
// expired peer candidates and empty tablet entries.
1073
57
void CloudWarmUpManager::run_cleanup_loop() {
1074
59
    while (true) {
1075
58
        {
1076
58
            std::unique_lock<std::mutex> lock(_cleanup_mtx);
1077
58
            _cleanup_cond.wait_for(lock,
1078
58
                                   std::chrono::seconds(config::peer_candidate_cleanup_interval_s),
1079
80
                                   [this]() { return _closed; });
1080
58
            if (_closed) break;
1081
58
        }
1082
1083
2
        int64_t now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
1084
2
                                 std::chrono::system_clock::now().time_since_epoch())
1085
2
                                 .count();
1086
2
        int64_t expiry_ms = config::peer_candidate_expiry_s * 1000LL;
1087
1088
10.2k
        for (auto& shard : _balanced_tablets_shards) {
1089
10.2k
            std::unique_lock<bthread::Mutex> lock(shard.mtx);
1090
10.2k
            auto tablet_it = shard.tablets.begin();
1091
10.2k
            while (tablet_it != shard.tablets.end()) {
1092
0
                auto& tpc = tablet_it->second;
1093
                // Remove expired candidates
1094
0
                auto& cands = tpc.candidates;
1095
0
                size_t cands_before = cands.size();
1096
0
                cands.erase(std::remove_if(cands.begin(), cands.end(),
1097
0
                                           [&](const PeerCandidate& c) {
1098
0
                                               return (now_ms - c.last_access_time_ms) >= expiry_ms;
1099
0
                                           }),
1100
0
                            cands.end());
1101
0
                size_t removed = cands_before - cands.size();
1102
0
                if (removed > 0) {
1103
0
                    g_peer_candidate_expiry_eviction << removed;
1104
0
                }
1105
                // Remove the tablet entry if no candidates remain
1106
0
                if (cands.empty()) {
1107
0
                    tablet_it = shard.tablets.erase(tablet_it);
1108
0
                    g_balance_tablet_be_mapping_size << -1;
1109
0
                } else {
1110
0
                    ++tablet_it;
1111
0
                }
1112
0
            }
1113
10.2k
        }
1114
2
    }
1115
57
}
1116
1117
// fetch_candidates_from_fe: lazy fetch path — appends candidates to the end
1118
// (lower priority than warmup-inserted ones).  Uses singleflight to avoid
1119
// duplicate concurrent RPCs for the same tablet.
1120
1
void CloudWarmUpManager::fetch_candidates_from_fe(int64_t tablet_id) {
1121
    // --- singleflight check ---
1122
1
    {
1123
1
        auto& shard = get_shard(tablet_id);
1124
1
        std::unique_lock<bthread::Mutex> lock(shard.mtx);
1125
1
        auto it = shard.tablets.find(tablet_id);
1126
1
        if (it != shard.tablets.end() && it->second.fetching_from_fe) {
1127
0
            return; // another fetch is already in flight
1128
0
        }
1129
        // Increment gauge when we create a genuinely new tablet entry
1130
1
        if (it == shard.tablets.end()) {
1131
1
            g_balance_tablet_be_mapping_size << 1;
1132
1
        }
1133
        // Mark as fetching (creates entry if not present).
1134
1
        shard.tablets[tablet_id].fetching_from_fe = true;
1135
1
    }
1136
1137
    // Use Defer to absolutely guarantee we reset the fetching flag on return
1138
1
    Defer defer_fetching_reset {[this, tablet_id]() {
1139
1
        auto& shard = get_shard(tablet_id);
1140
1
        std::unique_lock<bthread::Mutex> lock(shard.mtx);
1141
1
        auto it = shard.tablets.find(tablet_id);
1142
1
        if (it != shard.tablets.end()) {
1143
1
            it->second.fetching_from_fe = false;
1144
1
        }
1145
1
    }};
1146
1147
    // --- RPC to FE (without warm_up_job_id) ---
1148
1
    ClusterInfo* cluster_info = ExecEnv::GetInstance()->cluster_info();
1149
1
    if (cluster_info == nullptr) {
1150
0
        LOG(WARNING) << "fetch_candidates_from_fe: have not got FE Master heartbeat yet"
1151
0
                     << ", tablet_id=" << tablet_id;
1152
0
        return;
1153
0
    }
1154
1
    TNetworkAddress master_addr = cluster_info->master_fe_addr;
1155
1
    if (master_addr.hostname.empty() || master_addr.port == 0) {
1156
1
        LOG(WARNING) << "fetch_candidates_from_fe: FE master address unknown"
1157
1
                     << ", tablet_id=" << tablet_id;
1158
1
        return;
1159
1
    }
1160
1161
0
    TGetTabletReplicaInfosRequest request;
1162
0
    TGetTabletReplicaInfosResult result;
1163
    // No warm_up_job_id — lazy fetch path
1164
0
    request.tablet_ids.emplace_back(tablet_id);
1165
1166
0
    g_peer_lazy_fetch_total << 1;
1167
0
    const auto rpc_start = std::chrono::steady_clock::now();
1168
0
    Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>(
1169
0
            master_addr.hostname, master_addr.port,
1170
0
            [&request, &result](FrontendServiceConnection& client) {
1171
0
                client->getTabletReplicaInfos(result, request);
1172
0
            });
1173
0
    g_peer_lazy_fetch_latency << std::chrono::duration_cast<std::chrono::microseconds>(
1174
0
                                         std::chrono::steady_clock::now() - rpc_start)
1175
0
                                         .count();
1176
1177
0
    if (!rpc_st.ok()) {
1178
0
        LOG(WARNING) << "fetch_candidates_from_fe: rpc failed, tablet_id=" << tablet_id
1179
0
                     << ", error=" << rpc_st;
1180
0
        g_peer_lazy_fetch_failed << 1;
1181
0
        return;
1182
0
    }
1183
1184
0
    auto st = Status::create<false>(result.status);
1185
0
    if (!st.ok()) {
1186
0
        LOG(WARNING) << "fetch_candidates_from_fe: FE returned error, tablet_id=" << tablet_id
1187
0
                     << ", status=" << st;
1188
0
        g_peer_lazy_fetch_failed << 1;
1189
0
        return;
1190
0
    }
1191
1192
0
    int64_t now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
1193
0
                             std::chrono::system_clock::now().time_since_epoch())
1194
0
                             .count();
1195
1196
    // Parse the results OUTSIDE the lock
1197
0
    std::vector<PeerCandidate> new_candidates;
1198
0
    const std::string& self_host = BackendOptions::get_localhost();
1199
0
    const int32_t self_brpc_port = config::brpc_port;
1200
1201
0
    auto it_res = result.tablet_replica_infos.find(tablet_id);
1202
0
    if (it_res != result.tablet_replica_infos.end()) {
1203
0
        const auto& replicas = it_res->second;
1204
        // Pre-allocate memory since we know the upper bound of candidates
1205
0
        new_candidates.reserve(replicas.size());
1206
1207
0
        for (const auto& replica : replicas) {
1208
            // Skip self: a BE must not peer-read from its own file cache
1209
0
            if (replica.host == self_host && replica.brpc_port == self_brpc_port) {
1210
0
                VLOG_DEBUG << "fetch_candidates_from_fe: skipping self candidate " << replica.host
1211
0
                           << ":" << replica.brpc_port << " for tablet_id=" << tablet_id;
1212
0
                continue;
1213
0
            }
1214
1215
0
            PeerCandidate& candidate = new_candidates.emplace_back();
1216
0
            candidate.host = replica.host;
1217
0
            candidate.brpc_port = replica.brpc_port;
1218
0
            if (replica.__isset.cloud_compute_group_id) {
1219
0
                candidate.compute_group_id = replica.cloud_compute_group_id;
1220
0
            }
1221
0
            candidate.last_access_time_ms = now_ms;
1222
0
            candidate.consecutive_rpc_failures = 0;
1223
0
        }
1224
0
    }
1225
1226
0
    g_peer_lazy_fetch_success << 1;
1227
1228
    // --- Merge results back into shard ---
1229
    // Acquire lock only to append to the candidates vector
1230
0
    {
1231
0
        auto& shard = get_shard(tablet_id);
1232
0
        std::unique_lock<bthread::Mutex> lock(shard.mtx);
1233
0
        auto it = shard.tablets.find(tablet_id);
1234
        // Safely check if tablet is still there
1235
0
        if (it != shard.tablets.end()) {
1236
0
            auto& tpc = it->second;
1237
0
            tpc.candidates.insert(tpc.candidates.end(),
1238
0
                                  std::make_move_iterator(new_candidates.begin()),
1239
0
                                  std::make_move_iterator(new_candidates.end()));
1240
0
            LOG(INFO) << "fetch_candidates_from_fe: tablet_id=" << tablet_id << " got "
1241
0
                      << tpc.candidates.size() << " total candidates from FE";
1242
0
            VLOG_DEBUG << "fetch_candidates_from_fe: added " << new_candidates.size()
1243
0
                       << " candidates for tablet_id=" << tablet_id;
1244
0
        }
1245
0
    }
1246
0
}
1247
1248
60
std::vector<PeerCandidate> CloudWarmUpManager::get_peer_candidates(int64_t tablet_id) {
1249
60
    auto& shard = get_shard(tablet_id);
1250
60
    std::unique_lock<bthread::Mutex> lock(shard.mtx);
1251
60
    auto it = shard.tablets.find(tablet_id);
1252
60
    if (it == shard.tablets.end()) {
1253
5
        g_peer_candidate_cache_miss << 1;
1254
5
        return {};
1255
5
    }
1256
    // Update last_access_time_ms for all candidates to keep them alive
1257
55
    int64_t now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
1258
55
                             std::chrono::system_clock::now().time_since_epoch())
1259
55
                             .count();
1260
85
    for (auto& c : it->second.candidates) {
1261
85
        c.last_access_time_ms = now_ms;
1262
85
    }
1263
55
    auto& tpc = it->second;
1264
    // Cooldown check: if this tablet is in cooldown, return empty to skip peer.
1265
55
    if (tpc.cooldown_until_ms > 0 && now_ms < tpc.cooldown_until_ms) {
1266
2
        g_peer_tablet_cooldown_skipped << 1;
1267
2
        return {};
1268
2
    }
1269
    // Cooldown expired — reset for next cycle.
1270
53
    if (tpc.cooldown_until_ms > 0) {
1271
1
        tpc.cooldown_until_ms = 0;
1272
1
        tpc.consecutive_all_miss = 0;
1273
1
    }
1274
53
    auto result = tpc.candidates;
1275
53
    if (result.empty()) {
1276
0
        g_peer_candidate_cache_miss << 1;
1277
53
    } else {
1278
53
        g_peer_candidate_cache_hit << 1;
1279
        // Apply compute group affinity: if a previous read succeeded from a particular
1280
        // compute group, move its candidates to the front so the next read tries it first.
1281
        // stable_partition preserves relative order within each group.
1282
        //
1283
        // Example:
1284
        // Candidates: [A(CG1), B(CG2), C(CG1), D(CG3)]
1285
        // pref = "CG1"
1286
        // After stable_partition: [A(CG1), C(CG1), B(CG2), D(CG3)]
1287
        // (A remains before C, and B remains before D)
1288
53
        if (!tpc.last_successful_compute_group_id.empty()) {
1289
11
            const std::string& pref = tpc.last_successful_compute_group_id;
1290
18
            std::stable_partition(result.begin(), result.end(), [&pref](const PeerCandidate& c) {
1291
18
                return c.compute_group_id == pref;
1292
18
            });
1293
11
        }
1294
53
    }
1295
53
    return result;
1296
55
}
1297
1298
void CloudWarmUpManager::update_peer_candidate_on_success(int64_t tablet_id,
1299
17
                                                          const std::string& compute_group_id) {
1300
17
    auto& shard = get_shard(tablet_id);
1301
17
    std::unique_lock<bthread::Mutex> lock(shard.mtx);
1302
17
    auto it = shard.tablets.find(tablet_id);
1303
17
    if (it == shard.tablets.end()) {
1304
1
        return;
1305
1
    }
1306
16
    it->second.last_successful_compute_group_id = compute_group_id;
1307
16
    it->second.consecutive_all_miss = 0;
1308
16
    it->second.cooldown_until_ms = 0;
1309
16
}
1310
1311
void CloudWarmUpManager::update_peer_candidate_on_rpc_failure(int64_t tablet_id,
1312
                                                              const std::string& host,
1313
12
                                                              int32_t brpc_port) {
1314
12
    auto& shard = get_shard(tablet_id);
1315
12
    std::unique_lock<bthread::Mutex> lock(shard.mtx);
1316
12
    auto it = shard.tablets.find(tablet_id);
1317
12
    if (it == shard.tablets.end()) {
1318
1
        return;
1319
1
    }
1320
11
    auto& cands = it->second.candidates;
1321
12
    for (auto cit = cands.begin(); cit != cands.end(); ++cit) {
1322
11
        if (cit->host == host && cit->brpc_port == brpc_port) {
1323
10
            ++cit->consecutive_rpc_failures;
1324
10
            if (cit->consecutive_rpc_failures >= config::peer_rpc_failure_eviction_threshold) {
1325
2
                LOG(INFO) << "Evicting peer candidate due to consecutive RPC failures"
1326
2
                          << ", tablet_id=" << tablet_id << ", host=" << host << ":" << brpc_port
1327
2
                          << ", failures=" << cit->consecutive_rpc_failures;
1328
2
                g_peer_rpc_failure_eviction << 1;
1329
2
                cands.erase(cit);
1330
                // If all candidates have been evicted, remove the tablet entry
1331
                // entirely so that the gauge stays accurate.
1332
2
                if (cands.empty()) {
1333
1
                    shard.tablets.erase(it);
1334
1
                    g_balance_tablet_be_mapping_size << -1;
1335
1
                }
1336
2
            }
1337
10
            break;
1338
10
        }
1339
11
    }
1340
11
}
1341
1342
void CloudWarmUpManager::rotate_peer_candidate_on_cache_miss(int64_t tablet_id,
1343
                                                             const std::string& host,
1344
15
                                                             int32_t brpc_port) {
1345
15
    auto& shard = get_shard(tablet_id);
1346
15
    std::unique_lock<bthread::Mutex> lock(shard.mtx);
1347
15
    auto it = shard.tablets.find(tablet_id);
1348
15
    if (it == shard.tablets.end()) {
1349
1
        return;
1350
1
    }
1351
14
    auto& cands = it->second.candidates;
1352
16
    auto cit = std::find_if(cands.begin(), cands.end(), [&](const PeerCandidate& c) {
1353
16
        return c.host == host && c.brpc_port == brpc_port;
1354
16
    });
1355
14
    if (cit != cands.end() && std::next(cit) != cands.end()) {
1356
        // Move this candidate to the end so the next read tries a different one.
1357
        // This ensures that if the first N candidates are all cache-miss, the system
1358
        // gradually converges to whichever compute group actually has the data.
1359
        //
1360
        // Example:
1361
        // cands: [B, C, D],  cit points to B (front, cache miss)
1362
        // std::rotate(B, C, end) → [C, D, B]
1363
        // Next read tries C first instead of B.
1364
        //
1365
        // Also clear affinity if the rotated candidate belongs to the currently preferred
1366
        // compute group.  Without this, get_peer_candidates() would stable_partition that
1367
        // CG back to the front on the very next call — completely undoing the rotate.
1368
10
        if (it->second.last_successful_compute_group_id == cit->compute_group_id) {
1369
1
            it->second.last_successful_compute_group_id.clear();
1370
1
        }
1371
10
        std::rotate(cit, std::next(cit), cands.end());
1372
10
    }
1373
    // Always count the metric when the candidate is found, even if it is the
1374
    // last (or only) element where rotation is a no-op.
1375
14
    if (cit != cands.end()) {
1376
13
        g_peer_candidate_rotate << 1;
1377
13
    }
1378
14
}
1379
1380
10
bool CloudWarmUpManager::is_peer_cooldown(int64_t tablet_id) const {
1381
10
    const auto& shard = get_shard(tablet_id);
1382
10
    std::unique_lock<bthread::Mutex> lock(shard.mtx);
1383
10
    auto it = shard.tablets.find(tablet_id);
1384
10
    if (it == shard.tablets.end()) {
1385
2
        return false;
1386
2
    }
1387
8
    if (it->second.cooldown_until_ms <= 0) {
1388
3
        return false;
1389
3
    }
1390
5
    int64_t now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
1391
5
                             std::chrono::system_clock::now().time_since_epoch())
1392
5
                             .count();
1393
5
    return now_ms < it->second.cooldown_until_ms;
1394
8
}
1395
1396
19
void CloudWarmUpManager::record_peer_all_miss(int64_t tablet_id) {
1397
19
    auto& shard = get_shard(tablet_id);
1398
19
    std::unique_lock<bthread::Mutex> lock(shard.mtx);
1399
19
    auto it = shard.tablets.find(tablet_id);
1400
19
    if (it == shard.tablets.end()) {
1401
2
        return;
1402
2
    }
1403
17
    auto& tpc = it->second;
1404
17
    tpc.consecutive_all_miss++;
1405
17
    if (tpc.consecutive_all_miss >= config::peer_all_miss_cooldown_threshold) {
1406
4
        int64_t now_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
1407
4
                                 std::chrono::system_clock::now().time_since_epoch())
1408
4
                                 .count();
1409
4
        tpc.cooldown_until_ms = now_ms + config::peer_all_miss_cooldown_duration_s * 1000;
1410
4
        g_peer_tablet_cooldown_entered << 1;
1411
4
        LOG(INFO) << "Peer read cooldown entered for tablet_id=" << tablet_id << " after "
1412
4
                  << tpc.consecutive_all_miss << " consecutive all-miss races"
1413
4
                  << ", cooldown_duration_s=" << config::peer_all_miss_cooldown_duration_s;
1414
4
    }
1415
17
}
1416
1417
std::optional<TabletPeerCandidates> CloudWarmUpManager::get_tablet_peer_info(
1418
0
        int64_t tablet_id) const {
1419
0
    const auto& shard = get_shard(tablet_id);
1420
0
    std::unique_lock<bthread::Mutex> lock(shard.mtx);
1421
0
    auto it = shard.tablets.find(tablet_id);
1422
0
    if (it == shard.tablets.end()) {
1423
0
        return std::nullopt;
1424
0
    }
1425
0
    return it->second; // copy under lock
1426
0
}
1427
1428
std::vector<std::pair<int64_t, TabletPeerCandidates>> CloudWarmUpManager::get_all_peer_info(
1429
0
        int64_t limit) const {
1430
0
    std::vector<std::pair<int64_t, TabletPeerCandidates>> result;
1431
0
    for (size_t i = 0; i < SHARD_COUNT; ++i) {
1432
0
        const auto& shard = _balanced_tablets_shards[i];
1433
0
        std::unique_lock<bthread::Mutex> lock(shard.mtx);
1434
0
        for (const auto& [tid, tpc] : shard.tablets) {
1435
0
            result.emplace_back(tid, tpc);
1436
0
            if (limit > 0 && static_cast<int64_t>(result.size()) >= limit) {
1437
0
                return result;
1438
0
            }
1439
0
        }
1440
0
    }
1441
0
    return result;
1442
0
}
1443
1444
void CloudWarmUpManager::set_tablet_peer_candidates(int64_t tablet_id,
1445
1
                                                    TabletPeerCandidates candidates) {
1446
1
    auto& shard = get_shard(tablet_id);
1447
1
    std::unique_lock<bthread::Mutex> lock(shard.mtx);
1448
1
    auto [it, inserted] = shard.tablets.insert_or_assign(tablet_id, std::move(candidates));
1449
1
    if (inserted) {
1450
1
        g_balance_tablet_be_mapping_size << 1;
1451
1
    }
1452
1
}
1453
1454
} // namespace doris