Coverage Report

Created: 2026-07-02 17:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_internal_service.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_internal_service.h"
19
20
#include <brpc/controller.h>
21
#include <bthread/countdown_event.h>
22
#include <butil/iobuf.h>
23
#include <fmt/format.h>
24
25
#include <algorithm>
26
#include <chrono>
27
#include <limits>
28
#include <list>
29
#include <memory>
30
#include <optional>
31
#include <thread>
32
#include <unordered_map>
33
#include <vector>
34
35
#include "cloud/cloud_storage_engine.h"
36
#include "cloud/cloud_tablet.h"
37
#include "cloud/cloud_tablet_mgr.h"
38
#include "cloud/cloud_warm_up_manager.h"
39
#include "cloud/cloud_warmup_metrics.h"
40
#include "cloud/config.h"
41
#include "io/cache/block_file_cache.h"
42
#include "io/cache/block_file_cache_downloader.h"
43
#include "io/cache/block_file_cache_factory.h"
44
#include "io/fs/path.h"
45
#include "runtime/thread_context.h"
46
#include "runtime/workload_management/io_throttle.h"
47
#include "storage/storage_policy.h"
48
#include "util/async_io.h"
49
#include "util/bvar_windowed_adder.h"
50
#include "util/debug_points.h"
51
52
namespace doris {
53
#include "common/compile_check_avoid_begin.h"
54
55
// Metrics for serving file-cache blocks to peer BEs. Among the ones visible in this file,
// read_file_block() updates the read/response/failed metrics and
// try_reject_if_queue_timed_out() updates the queue wait/timeout metrics.
bvar::Adder<uint64_t> g_file_cache_get_by_peer_num("file_cache_get_by_peer_num");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_blocks_num("file_cache_get_by_peer_blocks_num");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_success_num("file_cache_get_by_peer_success_num");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_failed_num("file_cache_get_by_peer_failed_num");
bvar::LatencyRecorder g_file_cache_get_by_peer_server_latency(
        "file_cache_get_by_peer_server_latency");
bvar::LatencyRecorder g_file_cache_get_by_peer_read_cache_file_latency(
        "file_cache_get_by_peer_read_cache_file_latency");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_offer_failed_num(
        "file_cache_get_by_peer_offer_failed_num");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_queue_timeout_num(
        "file_cache_get_by_peer_queue_timeout_num");
bvar::LatencyRecorder g_file_cache_get_by_peer_queue_wait_latency(
        "file_cache_get_by_peer_queue_wait_latency");
bvar::LatencyRecorder g_file_cache_get_by_peer_handle_cache_block_req_latency(
        "file_cache_get_by_peer_handle_cache_block_req_latency");
bvar::LatencyRecorder g_file_cache_get_by_peer_get_cache_latency(
        "file_cache_get_by_peer_get_cache_latency");
bvar::LatencyRecorder g_file_cache_get_by_peer_get_or_set_latency(
        "file_cache_get_by_peer_get_or_set_latency");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_get_or_set_calls(
        "file_cache_get_by_peer_get_or_set_calls");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_get_or_set_blocks_total(
        "file_cache_get_by_peer_get_or_set_blocks_total");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_request_blocks_total(
        "file_cache_get_by_peer_request_blocks_total");
bvar::LatencyRecorder g_file_cache_get_by_peer_request_blocks_per_rpc(
        "file_cache_get_by_peer_request_blocks_per_rpc");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_response_blocks_total(
        "file_cache_get_by_peer_response_blocks_total");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_response_bytes_total(
        "file_cache_get_by_peer_response_bytes_total");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_not_downloaded_block_num(
        "file_cache_get_by_peer_not_downloaded_block_num");
bvar::LatencyRecorder g_file_cache_get_by_peer_read_file_block_total_latency(
        "file_cache_get_by_peer_read_file_block_total_latency");
bvar::LatencyRecorder g_file_cache_get_by_peer_set_response_data_latency(
        "file_cache_get_by_peer_set_response_data_latency");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_attachment_response_num(
        "file_cache_get_by_peer_attachment_response_num");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_pb_response_num(
        "file_cache_get_by_peer_pb_response_num");

// Server-side fill metrics. trigger_peer_server_fill() updates the "requested" and "rejected"
// counters; presumably it also updates the remaining ones (TODO confirm).
bvar::Adder<int64_t> g_peer_server_fill_requested("peer_server_fill_requested");
bvar::Adder<int64_t> g_peer_server_fill_success("peer_server_fill_success");
bvar::Adder<int64_t> g_peer_server_fill_timeout("peer_server_fill_timeout");
bvar::Adder<int64_t> g_peer_server_fill_rejected("peer_server_fill_rejected");
bvar::LatencyRecorder g_peer_server_fill_latency("peer_server_fill_latency");
// Wall time of get_file_cache_meta_by_tablet_id(), recorded in microseconds.
bvar::LatencyRecorder g_cloud_internal_service_get_file_cache_meta_by_tablet_id_latency(
        "cloud_internal_service_get_file_cache_meta_by_tablet_id_latency");

// Concurrency guard for server-side S3 pull-through fills.
// Counts fills currently in flight: trigger_peer_server_fill() reserves a slot with fetch_add and
// releases it with fetch_sub.
static std::atomic<int32_t> g_active_server_fills {0};
// Publishes the in-flight fill count as the "peer_active_fills" bvar, read lazily on demand.
bvar::PassiveStatus<int32_t> g_peer_active_fills(
        "peer_active_fills",
        [](void*) { return g_active_server_fills.load(std::memory_order_relaxed); }, nullptr);
111
112
// Builds the cloud-mode internal RPC service on top of PInternalService and binds `_engine` to
// the given cloud storage engine, which the handlers below use (vault sync, tablet lookup).
CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine, ExecEnv* exec_env)
        : PInternalService(exec_env), _engine(engine) {}

// Nothing to release beyond what member destructors already handle.
CloudInternalServiceImpl::~CloudInternalServiceImpl() = default;
116
117
void CloudInternalServiceImpl::alter_vault_sync(google::protobuf::RpcController* controller,
118
                                                const doris::PAlterVaultSyncRequest* request,
119
                                                PAlterVaultSyncResponse* response,
120
0
                                                google::protobuf::Closure* done) {
121
0
    LOG(INFO) << "alter be to sync vault info from Meta Service";
122
    // If the vaults containing hdfs vault then it would try to create hdfs connection using jni
123
    // which would acuiqre one thread local jniEnv. But bthread context can't guarantee that the brpc
124
    // worker thread wouldn't do bthread switch between worker threads.
125
0
    bool ret = _heavy_work_pool.try_offer([this, done]() {
126
0
        brpc::ClosureGuard closure_guard(done);
127
0
        _engine.sync_storage_vault();
128
0
    });
129
0
    if (!ret) {
130
0
        brpc::ClosureGuard closure_guard(done);
131
0
        LOG(WARNING) << "fail to offer alter_vault_sync request to the work pool, pool="
132
0
                     << _heavy_work_pool.get_info();
133
0
    }
134
0
}
135
136
0
// Converts an in-memory io::FileCacheType into the protobuf FileCacheType.
// TTL, INDEX and NORMAL map one-to-one. Any other value fails a DCHECK and falls back to NORMAL.
FileCacheType cache_type_to_pb(io::FileCacheType type) {
    FileCacheType pb_type = FileCacheType::NORMAL;
    switch (type) {
    case io::FileCacheType::TTL:
        pb_type = FileCacheType::TTL;
        break;
    case io::FileCacheType::INDEX:
        pb_type = FileCacheType::INDEX;
        break;
    case io::FileCacheType::NORMAL:
        pb_type = FileCacheType::NORMAL;
        break;
    default:
        DCHECK(false);
        break;
    }
    return pb_type;
}
149
150
0
// Wall-clock (system_clock) time since the Unix epoch, truncated to whole microseconds.
static int64_t current_unix_time_us() {
    using std::chrono::microseconds;
    using std::chrono::system_clock;
    const auto now_in_us = std::chrono::time_point_cast<microseconds>(system_clock::now());
    return now_in_us.time_since_epoch().count();
}
155
156
// Latency between a start timestamp taken on the caller BE and an end timestamp taken locally.
// Returns std::nullopt when the start timestamp is missing (<= 0; mixed-version callers may omit
// it) or when the end precedes the start (clocks across BEs are not guaranteed to be ordered).
static std::optional<int64_t> warm_up_rowset_cross_host_latency_us(int64_t start_unix_ts_us,
                                                                   int64_t end_unix_ts_us) {
    const bool start_present = start_unix_ts_us > 0;
    const bool clocks_ordered = end_unix_ts_us >= start_unix_ts_us;
    if (start_present && clocks_ordered) {
        return end_unix_ts_us - start_unix_ts_us;
    }
    return std::nullopt;
}
165
166
static void add_file_cache_block_meta_to_response(
167
        PGetFileCacheMetaResponse* resp, int64_t tablet_id, const std::string& rowset_id,
168
        int32_t segment_id, const std::string& file_name,
169
        const std::tuple<int64_t, int64_t, io::FileCacheType, int64_t>& tuple,
170
0
        const RowsetSharedPtr& rowset, bool is_index) {
171
0
    FileCacheBlockMeta* meta = resp->add_file_cache_block_metas();
172
0
    meta->set_tablet_id(tablet_id);
173
0
    meta->set_rowset_id(rowset_id);
174
0
    meta->set_segment_id(segment_id);
175
0
    meta->set_file_name(file_name);
176
177
0
    if (!is_index) {
178
        // .dat
179
0
        meta->set_file_size(rowset->rowset_meta()->segment_file_size(segment_id));
180
0
        meta->set_file_type(doris::FileType::SEGMENT_FILE);
181
0
    } else {
182
        // .idx
183
0
        const auto& idx_file_info = rowset->rowset_meta()->inverted_index_file_info(segment_id);
184
0
        meta->set_file_size(idx_file_info.has_index_size() ? idx_file_info.index_size() : -1);
185
0
        meta->set_file_type(doris::FileType::INVERTED_INDEX_FILE);
186
0
    }
187
188
0
    meta->set_offset(std::get<0>(tuple));
189
0
    meta->set_size(std::get<1>(tuple));
190
0
    meta->set_cache_type(cache_type_to_pb(std::get<2>(tuple)));
191
0
    meta->set_expiration_time(std::get<3>(tuple));
192
0
}
193
194
static void process_segment_file_cache_meta(PGetFileCacheMetaResponse* resp,
195
                                            const RowsetSharedPtr& rowset, int64_t tablet_id,
196
                                            const std::string& rowset_id, int32_t segment_id,
197
0
                                            bool is_index) {
198
0
    const char* extension = is_index ? ".idx" : ".dat";
199
0
    std::string file_name = fmt::format("{}_{}{}", rowset_id, segment_id, extension);
200
0
    auto cache_key = io::BlockFileCache::hash(file_name);
201
0
    auto* cache = io::FileCacheFactory::instance()->get_by_path(cache_key);
202
0
    if (!cache) return;
203
0
    auto segments_meta = cache->get_hot_blocks_meta(cache_key);
204
0
    for (const auto& tuple : segments_meta) {
205
0
        add_file_cache_block_meta_to_response(resp, tablet_id, rowset_id, segment_id, file_name,
206
0
                                              tuple, rowset, is_index);
207
0
    }
208
0
}
209
210
void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id(
211
        google::protobuf::RpcController* controller [[maybe_unused]],
212
        const PGetFileCacheMetaRequest* request, PGetFileCacheMetaResponse* response,
213
0
        google::protobuf::Closure* done) {
214
0
    brpc::ClosureGuard closure_guard(done);
215
0
    if (!config::enable_file_cache) {
216
0
        LOG_WARNING("try to access tablet file cache meta, but file cache not enabled");
217
0
        return;
218
0
    }
219
0
    auto begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
220
0
                            std::chrono::steady_clock::now().time_since_epoch())
221
0
                            .count();
222
0
    std::ostringstream tablet_ids_stream;
223
0
    int count = 0;
224
0
    for (const auto& tablet_id : request->tablet_ids()) {
225
0
        tablet_ids_stream << tablet_id << ", ";
226
0
        count++;
227
0
        if (count >= 10) {
228
0
            break;
229
0
        }
230
0
    }
231
0
    LOG(INFO) << "warm up get meta from this be, tablets num=" << request->tablet_ids().size()
232
0
              << ", first 10 tablet_ids=[ " << tablet_ids_stream.str() << " ]";
233
0
    for (const auto& tablet_id : request->tablet_ids()) {
234
0
        auto res = _engine.tablet_mgr().get_tablet(tablet_id);
235
0
        if (!res.has_value()) {
236
0
            LOG(ERROR) << "failed to get tablet: " << tablet_id
237
0
                       << " err msg: " << res.error().msg();
238
0
            continue;
239
0
        }
240
0
        CloudTabletSPtr tablet = std::move(res.value());
241
0
        auto st = tablet->sync_rowsets();
242
0
        if (!st) {
243
            // just log failed, try it best
244
0
            LOG(WARNING) << "failed to sync rowsets: " << tablet_id
245
0
                         << " err msg: " << st.to_string();
246
0
        }
247
0
        auto rowsets = tablet->get_snapshot_rowset();
248
249
0
        for (const RowsetSharedPtr& rowset : rowsets) {
250
0
            std::string rowset_id = rowset->rowset_id().to_string();
251
0
            for (int32_t segment_id = 0; segment_id < rowset->num_segments(); ++segment_id) {
252
0
                process_segment_file_cache_meta(response, rowset, tablet_id, rowset_id, segment_id,
253
0
                                                false);
254
0
                process_segment_file_cache_meta(response, rowset, tablet_id, rowset_id, segment_id,
255
0
                                                true);
256
0
            }
257
0
        }
258
0
    }
259
0
    auto end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
260
0
                          std::chrono::steady_clock::now().time_since_epoch())
261
0
                          .count();
262
0
    g_cloud_internal_service_get_file_cache_meta_by_tablet_id_latency << (end_ts - begin_ts);
263
0
    LOG(INFO) << "get file cache meta by tablet ids = [ " << tablet_ids_stream.str() << " ] took "
264
0
              << end_ts - begin_ts << " us";
265
0
    VLOG_DEBUG << "get file cache meta by tablet id request=" << request->DebugString()
266
0
               << ", response=" << response->DebugString();
267
0
}
268
269
namespace {
270
// Helper functions for fetch_peer_data
271
36
// Microseconds elapsed on the steady clock since `start` (truncated toward zero).
inline int64_t elapsed_us(std::chrono::steady_clock::time_point start) {
    const auto delta = std::chrono::steady_clock::now() - start;
    return std::chrono::duration_cast<std::chrono::microseconds>(delta).count();
}
276
277
1
int64_t get_rowset_meta_tablet_id_from_request(const PFetchPeerDataRequest* request) {
278
1
    return request->has_rowset_meta() && request->rowset_meta().has_tablet_id()
279
1
                   ? request->rowset_meta().tablet_id()
280
1
                   : -1;
281
1
}
282
283
1
std::string get_rowset_meta_resource_id_from_request(const PFetchPeerDataRequest* request) {
284
1
    if (request->has_rowset_meta() && request->rowset_meta().has_resource_id()) {
285
1
        return request->rowset_meta().resource_id();
286
1
    }
287
0
    return "";
288
1
}
289
290
7
std::string get_peer_cache_filename(std::string_view path) {
291
7
    return io::Path(std::string(path)).filename().native();
292
7
}
293
294
std::string format_peer_request_context(const PFetchPeerDataRequest* request,
295
0
                                        const io::UInt128Wrapper& hash, size_t file_size) {
296
0
    const std::string file_size_str =
297
0
            request->has_file_size() ? std::to_string(request->file_size()) : "unknown";
298
0
    return fmt::format(
299
0
            "type={}, path={}, cache_hash={}, request_fill={}, fill_tablet_id={}, "
300
0
            "fill_remote_path={}, fill_resource_id={}, file_size={}, resolved_file_size={}, "
301
0
            "cache_req_count={}, support_attachment={}",
302
0
            request->type(), request->path(), hash.to_string(),
303
0
            request->has_request_cache_fill() && request->request_cache_fill(),
304
0
            get_rowset_meta_tablet_id_from_request(request), request->path(),
305
0
            get_rowset_meta_resource_id_from_request(request), file_size_str,
306
0
            file_size == std::numeric_limits<size_t>::max() ? std::string("unknown")
307
0
                                                            : std::to_string(file_size),
308
0
            request->cache_req_size(),
309
0
            request->has_support_attachment() && request->support_attachment());
310
0
}
311
312
std::string format_peer_cache_block_context(const PFetchPeerDataRequest* request,
313
                                            const CacheBlockReqest& cb_req,
314
                                            const io::FileBlockSPtr& fb,
315
                                            const io::UInt128Wrapper& hash, size_t file_size,
316
0
                                            bool do_fill) {
317
0
    return fmt::format("{}, req_block=[offset={}, size={}], do_fill={}, block={}, cache_file={}",
318
0
                       format_peer_request_context(request, hash, file_size), cb_req.block_offset(),
319
0
                       cb_req.block_size(), do_fill, fb->get_info_for_log(), fb->get_cache_file());
320
0
}
321
322
// Builds a log line describing a server-side fill attempt. An empty resource id or remote path is
// printed as "<unknown>".
std::string format_peer_fill_context(const io::FileBlockSPtr& fb, int64_t fill_tablet_id,
                                     const std::string& filename, const std::string& resource_id,
                                     const std::string& remote_path, int64_t file_size,
                                     int64_t offset, int64_t size, int32_t timeout_ms) {
    const std::string shown_resource_id = resource_id.empty() ? "<unknown>" : resource_id;
    const std::string shown_remote_path = remote_path.empty() ? "<unknown>" : remote_path;
    return fmt::format(
            "tablet_id={}, filename={}, resource_id={}, remote_path={}, file_size={}, "
            "request_range=[offset={}, size={}], timeout_ms={}, block={}, cache_file={}",
            fill_tablet_id, filename, shown_resource_id, shown_remote_path, file_size, offset,
            size, timeout_ms, fb->get_info_for_log(), fb->get_cache_file());
}
333
334
1
// Waits until `fb` reaches DOWNLOADED, SKIP_CACHE or EMPTY, or until `timeout_ms` has elapsed.
//
// Returns true when one of those states is observed; the caller must still inspect the state,
// since EMPTY also ends the wait. Returns false once the deadline has passed. The deadline is
// only checked between fb->wait() calls, so a single wait may overshoot it.
bool wait_for_file_block_state(const io::FileBlockSPtr& fb, int32_t timeout_ms) {
    const auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms);
    const auto is_settled = [](io::FileBlock::State block_state) {
        return block_state == io::FileBlock::State::DOWNLOADED ||
               block_state == io::FileBlock::State::SKIP_CACHE ||
               block_state == io::FileBlock::State::EMPTY;
    };
    for (;;) {
        if (is_settled(fb->state())) {
            return true;
        }
        if (std::chrono::steady_clock::now() >= deadline) {
            return false;
        }
        fb->wait();
    }
}
348
349
0
// Legacy PEER_FILE_RANGE handler: the payload is returned as protobuf bytes (not as an
// attachment). Kept for compatibility until the range path is migrated to attachment mode.
//
// Every cached data entry found for the file name of `path` is moved into `response->datas`.
// NOTE(review): despite the "range" name, no offset/size filtering happens here; it returns
// whatever get_cache_data_by_path() yields for the file. Always returns OK.
Status handle_peer_file_range_request(const std::string& path, PFetchPeerDataResponse* response) {
    auto cached_entries =
            io::FileCacheFactory::instance()->get_cache_data_by_path(get_peer_cache_filename(path));
    for (auto& entry : cached_entries) {
        *response->add_datas() = std::move(entry);
    }
    return Status::OK();
}
360
361
0
// Marks `response` as failed with INTERNAL_ERROR and appends `error_msg` to its error messages.
void set_error_response(PFetchPeerDataResponse* response, const std::string& error_msg) {
    auto* status = response->mutable_status();
    status->add_error_msgs(error_msg);
    status->set_status_code(TStatusCode::INTERNAL_ERROR);
}
365
366
1
// Marks `response` as rejected with TOO_MANY_TASKS (server overloaded) and appends `error_msg`.
void set_too_many_tasks_response(PFetchPeerDataResponse* response, const std::string& error_msg) {
    auto* status = response->mutable_status();
    status->add_error_msgs(error_msg);
    status->set_status_code(TStatusCode::TOO_MANY_TASKS);
}
370
371
bool try_reject_if_queue_timed_out(std::chrono::steady_clock::time_point enqueue_ts,
372
2
                                   PFetchPeerDataResponse* response) {
373
2
    auto wait_us = elapsed_us(enqueue_ts);
374
2
    g_file_cache_get_by_peer_queue_wait_latency << wait_us;
375
2
    auto wait_ms = wait_us / 1000;
376
2
    if (wait_ms <= config::peer_fetch_queue_timeout_ms) {
377
1
        return false;
378
1
    }
379
380
1
    const std::string msg = fmt::format("fetch peer data queue timeout, wait_ms={}, timeout_ms={}",
381
1
                                        wait_ms, config::peer_fetch_queue_timeout_ms);
382
1
    g_file_cache_get_by_peer_queue_timeout_num << 1;
383
1
    set_too_many_tasks_response(response, msg);
384
1
    return true;
385
2
}
386
387
// Copies the cached bytes of `file_block` into a peer fetch response.
//
// The readable length is clamped to `file_size`, because the block meta may extend past the
// real end of the file (see CachedRemoteFileReader::read_at_impl). `output` always receives the
// block offset and the clamped size. The payload is delivered in one of two modes:
//   * attachment mode (`response_attachment != nullptr`): protobuf carries metadata only and the
//     bytes are appended to `response_attachment`, which lets the file cache use an
//     fd -> IOBuf read path;
//   * protobuf mode (`response_attachment == nullptr`): the bytes go into `output->data`.
//
// Returns InternalError when `file_size` is below the block offset or when fewer bytes than
// expected were read; otherwise it returns the status of the underlying block read. A failed
// read adds no bytes for this block to `response_attachment`. Read failures are counted in
// g_file_cache_get_by_peer_failed_num and logged.
Status read_file_block(const std::shared_ptr<io::FileBlock>& file_block, size_t file_size,
                       doris::CacheBlockPB* output, butil::IOBuf* response_attachment) {
    auto total_start = std::chrono::steady_clock::now();
    int64_t set_data_us = 0;
    // Report latency metrics on every return path.
    Defer report {[&]() {
        g_file_cache_get_by_peer_read_file_block_total_latency << elapsed_us(total_start);
        if (set_data_us > 0) {
            g_file_cache_get_by_peer_set_response_data_latency << set_data_us;
        }
    }};
    // ATTN: calculate the rightmost boundary value of the block, due to inaccurate current block
    // meta information. See CachedRemoteFileReader::read_at_impl for more details.
    // Ensure file_size >= file_block->offset() to avoid unsigned underflow below.
    if (file_size < file_block->offset()) {
        LOG(WARNING) << "file_size (" << file_size << ") < file_block->offset("
                     << file_block->offset() << ")";
        return Status::InternalError<false>("file_size less than block offset");
    }
    size_t read_size = std::min(static_cast<size_t>(file_size - file_block->offset()),
                                file_block->range().size());
    output->set_block_offset(static_cast<int64_t>(file_block->offset()));
    output->set_block_size(static_cast<int64_t>(read_size));
    if (read_size == 0) {
        return Status::OK();
    }

    Status read_st = Status::OK();
    if (response_attachment != nullptr) {
        // Attachment payload mode: protobuf carries metadata only, payload goes to attachment.
        auto read_start = std::chrono::steady_clock::now();
        SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
        // Stage the payload locally and publish it only after a complete read. The attachment
        // may already hold payloads of earlier blocks of this response, so a failed or short
        // read must not leave partial bytes in it.
        butil::IOBuf block_payload;
        size_t bytes_read = 0;
        read_st = file_block->read_to_iobuf(&block_payload, /*read_offset=*/0, read_size,
                                            &bytes_read);
        g_file_cache_get_by_peer_read_cache_file_latency << elapsed_us(read_start);

        if (read_st.ok() && bytes_read != read_size) {
            read_st = Status::InternalError<false>(
                    "peer cache read short data, expected={}, actual={}", read_size, bytes_read);
        }
        if (read_st.ok()) {
            // Zero-copy append: shares block_payload's underlying blocks by reference.
            response_attachment->append(block_payload);
            g_file_cache_get_by_peer_response_bytes_total << bytes_read;
            return Status::OK();
        }
    } else {
        // Legacy protobuf mode: copy the bytes into output->data.
        std::string data;
        data.resize(read_size);
        auto read_start = std::chrono::steady_clock::now();
        SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
        Slice slice(data.data(), data.size());
        read_st = file_block->read(slice, /*read_offset=*/0);
        g_file_cache_get_by_peer_read_cache_file_latency << elapsed_us(read_start);

        if (read_st.ok()) {
            auto set_data_start = std::chrono::steady_clock::now();
            output->set_data(std::move(data));
            set_data_us = elapsed_us(set_data_start);
            g_file_cache_get_by_peer_response_bytes_total << read_size;
            return Status::OK();
        }
    }

    // Common failure path for both modes (including a short read in attachment mode).
    g_file_cache_get_by_peer_failed_num << 1;
    LOG(WARNING) << "read cache block failed, file_size=" << file_size
                 << ", block=" << file_block->get_info_for_log()
                 << ", cache_file=" << file_block->get_cache_file() << ", err=" << read_st;
    return read_st;
}
467
468
// Trigger S3 -> local cache fill for the given file block.
469
// Returns OK when the block is DOWNLOADED after the fill.
470
// Returns TOO_MANY_TASKS when the fill slot is exhausted (server healthy but overloaded):
471
//   client should not rotate or evict, just fall back to S3 and retry same candidate later.
472
// Returns NOT_FOUND for soft misses (tablet not found, fill incomplete, timeout):
473
//   client should rotate the candidate to try a different CG next time.
474
// The peer uses request.path as the full remote path. tablet_id/filename are kept for logging.
475
Status trigger_peer_server_fill(io::FileBlockSPtr& fb, int64_t fill_tablet_id,
476
                                const std::string& filename, const std::string& resource_id,
477
                                const std::string& remote_path, int64_t file_size, int64_t offset,
478
1
                                int64_t size, int32_t timeout_ms) {
479
1
    g_peer_server_fill_requested << 1;
480
481
    // Concurrency guard: atomically reserve a fill slot.
482
    // Excess requests are rejected so the client falls back to its own S3 read.
483
    // Return NOT_FOUND so the client rotates the candidate instead of evicting it.
484
1
    if (g_active_server_fills.fetch_add(1, std::memory_order_relaxed) >=
485
1
        config::max_concurrent_peer_server_fills) {
486
0
        g_active_server_fills.fetch_sub(1, std::memory_order_relaxed);
487
0
        g_peer_server_fill_rejected << 1;
488
0
        VLOG_DEBUG << "trigger_peer_server_fill: rejected (concurrency limit "
489
0
                   << config::max_concurrent_peer_server_fills << "), tablet_id=" << fill_tablet_id;
490
        // TOO_MANY_TASKS: server is healthy but overloaded. Client must not rotate or evict;
491
        // just fall back to S3 for this request and retry the same candidate next time.
492
0
        return Status::Error<ErrorCode::TOO_MANY_TASKS, false>("fill slot exhausted");
493
0
    }
494
    // RAII decrement: runs on every return path below.
495
1
    Defer fill_guard {[]() { g_active_server_fills.fetch_sub(1, std::memory_order_relaxed); }};
496
497
1
    if (remote_path.empty() || resource_id.empty()) {
498
0
        const std::string ctx =
499
0
                format_peer_fill_context(fb, fill_tablet_id, filename, resource_id, remote_path,
500
0
                                         file_size, offset, size, timeout_ms);
501
0
        LOG(WARNING) << "trigger_peer_server_fill: missing remote_path or resource_id, " << ctx;
502
0
        g_peer_server_fill_rejected << 1;
503
0
        return Status::NotFound<false>("fill: missing remote_path or resource_id, {}", ctx);
504
0
    }
505
1
    auto storage_resource = doris::get_storage_resource(resource_id);
506
1
    if (!storage_resource.has_value()) {
507
1
        const std::string ctx =
508
1
                format_peer_fill_context(fb, fill_tablet_id, filename, resource_id, remote_path,
509
1
                                         file_size, offset, size, timeout_ms);
510
1
        LOG(WARNING) << "trigger_peer_server_fill: storage resource not found, " << ctx;
511
1
        g_peer_server_fill_rejected << 1;
512
1
        return Status::NotFound<false>("fill: storage resource not found, {}", ctx);
513
1
    }
514
0
    auto fs = storage_resource->first.fs;
515
516
0
    const auto initial_state = fb->state();
517
0
    if (initial_state == io::FileBlock::State::DOWNLOADING) {
518
        // Another thread already owns the block downloader. Wait up to the request timeout instead
519
        // of the shorter per-wait timeout in FileBlock::wait().
520
0
        [[maybe_unused]] const bool completed = wait_for_file_block_state(fb, timeout_ms);
521
0
        const std::string ctx =
522
0
                format_peer_fill_context(fb, fill_tablet_id, filename, resource_id, remote_path,
523
0
                                         file_size, offset, size, timeout_ms);
524
0
        return fb->state() == io::FileBlock::State::DOWNLOADED
525
0
                       ? Status::OK()
526
0
                       : Status::NotFound<false>("fill: concurrent download incomplete, {}", ctx);
527
0
    }
528
0
    if (initial_state != io::FileBlock::State::EMPTY) {
529
0
        const std::string ctx =
530
0
                format_peer_fill_context(fb, fill_tablet_id, filename, resource_id, remote_path,
531
0
                                         file_size, offset, size, timeout_ms);
532
0
        return initial_state == io::FileBlock::State::DOWNLOADED
533
0
                       ? Status::OK()
534
0
                       : Status::NotFound<false>("fill: unexpected initial block state, {}", ctx);
535
0
    }
536
537
0
    auto fill_start = std::chrono::steady_clock::now();
538
0
    auto fill_done = std::make_shared<bthread::CountdownEvent>(1);
539
0
    auto fill_status = std::make_shared<Status>(Status::OK());
540
0
    io::DownloadFileMeta download_meta {
541
0
            .path = remote_path,
542
0
            .file_size = file_size,
543
0
            .offset = offset,
544
0
            .download_size = size,
545
0
            .file_system = fs,
546
0
            .ctx = {.is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
547
                    // Pull-through fill must go straight to remote storage. If this download
548
                    // re-enters peer race, the original block can remain DOWNLOADING for the
549
                    // duration of nested peer retries and timeouts.
550
0
                    .is_warmup = false,
551
0
                    .bypass_peer_read = true},
552
0
            .download_done =
553
0
                    [fill_done, fill_status](Status st) {
554
0
                        *fill_status = std::move(st);
555
0
                        fill_done->signal();
556
0
                    },
557
0
            .tablet_id = fill_tablet_id,
558
0
    };
559
560
0
    io::DownloadTask task(std::move(download_meta));
561
0
    ExecEnv::GetInstance()
562
0
            ->storage_engine()
563
0
            .to_cloud()
564
0
            .file_cache_block_downloader()
565
0
            .submit_download_task(std::move(task));
566
567
0
    const timespec due_time = butil::milliseconds_from_now(timeout_ms);
568
0
    const bool timed_out = fill_done->timed_wait(due_time) != 0;
569
570
0
    int64_t fill_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
571
0
                              std::chrono::steady_clock::now() - fill_start)
572
0
                              .count();
573
0
    g_peer_server_fill_latency << fill_ms * 1000; // LatencyRecorder takes microseconds
574
575
0
    if (!timed_out && fill_status->ok() && fb->state() == io::FileBlock::State::DOWNLOADING) {
576
0
        const int32_t settle_timeout_ms =
577
0
                std::max<int32_t>(1, timeout_ms - static_cast<int32_t>(fill_ms));
578
0
        [[maybe_unused]] const bool settled = wait_for_file_block_state(fb, settle_timeout_ms);
579
0
    }
580
581
0
    auto final_state = fb->state();
582
0
    if (final_state == io::FileBlock::State::DOWNLOADED) {
583
0
        g_peer_server_fill_success << 1;
584
0
        return Status::OK();
585
0
    }
586
0
    if (timed_out) {
587
0
        LOG(WARNING) << "trigger_peer_server_fill: fill timeout, elapsed_ms=" << fill_ms << ", "
588
0
                     << format_peer_fill_context(fb, fill_tablet_id, filename, resource_id,
589
0
                                                 remote_path, file_size, offset, size, timeout_ms);
590
0
        g_peer_server_fill_timeout << 1;
591
0
    } else if (!fill_status->ok()) {
592
0
        LOG(WARNING) << "trigger_peer_server_fill: fill failed, elapsed_ms=" << fill_ms
593
0
                     << ", status=" << fill_status->to_string() << ", "
594
0
                     << format_peer_fill_context(fb, fill_tablet_id, filename, resource_id,
595
0
                                                 remote_path, file_size, offset, size, timeout_ms);
596
0
    }
597
    // Any non-DOWNLOADED outcome is a soft failure: the server is otherwise healthy so the
598
    // client should rotate the candidate rather than evict it.
599
0
    return Status::NotFound<false>(
600
0
            "fill: block not downloaded, {}",
601
0
            format_peer_fill_context(fb, fill_tablet_id, filename, resource_id, remote_path,
602
0
                                     file_size, offset, size, timeout_ms));
603
0
}
604
605
// Serves one PEER_FILE_CACHE_BLOCK request against the local file cache.
//
// For each requested range (clipped to the advertised file size, when present) the covering
// cache blocks are looked up or reserved with get_or_set(). Every block must end up DOWNLOADED;
// its bytes are then copied into the response, either inline or appended to the brpc response
// attachment when the client advertised support_attachment. When request_cache_fill is set and
// config::enable_peer_server_cache_fill is on, blocks that are still EMPTY are first filled from
// remote storage via trigger_peer_server_fill().
//
// Status codes written into the response:
//   * NOT_FOUND      - block not downloaded / not ready after wait / soft fill miss.
//   * TOO_MANY_TASKS - server-side fill slot exhausted.
//   * other failures - reported through set_error_response().
// Per-request metrics are flushed by the `report` Defer on every return path.
Status handle_peer_file_cache_block_request(const PFetchPeerDataRequest* request,
                                            PFetchPeerDataResponse* response,
                                            brpc::Controller* cntl) {
    auto handle_start = std::chrono::steady_clock::now();
    const uint64_t request_blocks = request->cache_req_size();
    uint64_t response_blocks = 0;
    uint64_t get_or_set_calls = 0;
    uint64_t get_or_set_blocks = 0;
    int64_t get_cache_us = 0;
    Defer report {[&]() {
        g_file_cache_get_by_peer_handle_cache_block_req_latency << elapsed_us(handle_start);
        g_file_cache_get_by_peer_get_cache_latency << get_cache_us;
        g_file_cache_get_by_peer_get_or_set_calls << get_or_set_calls;
        g_file_cache_get_by_peer_get_or_set_blocks_total << get_or_set_blocks;
        g_file_cache_get_by_peer_request_blocks_total << request_blocks;
        g_file_cache_get_by_peer_request_blocks_per_rpc << request_blocks;
        g_file_cache_get_by_peer_response_blocks_total << response_blocks;
    }};

    const auto& path = request->path();
    const auto cache_key_path = get_peer_cache_filename(path);
    auto hash = io::BlockFileCache::hash(cache_key_path);

    auto get_cache_start = std::chrono::steady_clock::now();
    auto* cache = io::FileCacheFactory::instance()->get_by_path(hash);
    get_cache_us = elapsed_us(get_cache_start);
    if (!cache) {
        g_file_cache_get_by_peer_failed_num << 1;
        set_error_response(response, "can't get file cache instance");
        return Status::InternalError<false>("can't get file cache instance");
    }

    io::ReadStatistics local_stats;
    io::CacheContext ctx {};
    ctx.stats = &local_stats;

    // Without a file_size in the request no EOF clipping is possible.
    size_t file_size = std::numeric_limits<size_t>::max();
    if (request->has_file_size()) {
        file_size = static_cast<size_t>(std::max<int64_t>(0, request->file_size()));
    }

    // Attachment mode is opt-in by the client, which keeps mixed-version rolling upgrades safe.
    bool use_attachment = false;
    if (cntl != nullptr && request->has_support_attachment()) {
        use_attachment = request->support_attachment();
    }
    response->set_data_in_attachment(use_attachment);
    if (!use_attachment) {
        g_file_cache_get_by_peer_pb_response_num << 1;
    } else {
        g_file_cache_get_by_peer_attachment_response_num << 1;
    }

    // Server-side fill needs both the per-request opt-in and the BE config switch.
    const bool do_fill = config::enable_peer_server_cache_fill &&
                         request->has_request_cache_fill() && request->request_cache_fill();

    for (const auto& cb_req : request->cache_req()) {
        const auto offset = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_offset()));
        if (offset >= file_size) {
            continue;
        }
        // Clip tail requests before get_or_set so peer reads do not synthesize EMPTY blocks past
        // EOF and then fail the whole RPC.
        const auto size = std::min(static_cast<size_t>(std::max<int64_t>(0, cb_req.block_size())),
                                   file_size - offset);
        if (size == 0) {
            continue;
        }
        DBUG_EXECUTE_IF(
                "CloudInternalServiceImpl::handle_peer_file_cache_block_request_hold_before_get_or_"
                "set",
                {
                    int sleep_ms = dp->param<int>("sleep_ms", 300);
                    bthread_usleep(sleep_ms * 1000);
                });
        auto get_or_set_start = std::chrono::steady_clock::now();
        auto holder = cache->get_or_set(hash, offset, size, ctx);
        g_file_cache_get_by_peer_get_or_set_latency << elapsed_us(get_or_set_start);
        ++get_or_set_calls;
        get_or_set_blocks += holder.file_blocks.size();

        // Soft miss for a block that is not readable. NOT_FOUND (rather than an RPC/server
        // error) lets the client rotate the candidate to the end of its list (trying another CG
        // next time) instead of incrementing the RPC-failure eviction counter.
        auto reply_block_unavailable = [&](const char* reason, io::FileBlockSPtr& block) {
            const std::string msg = fmt::format(
                    "{}, {}", reason,
                    format_peer_cache_block_context(request, cb_req, block, hash, file_size,
                                                    do_fill));
            g_file_cache_get_by_peer_failed_num << 1;
            g_file_cache_get_by_peer_not_downloaded_block_num << 1;
            LOG(WARNING) << msg;
            response->mutable_status()->add_error_msgs(msg);
            response->mutable_status()->set_status_code(TStatusCode::NOT_FOUND);
            return Status::NotFound<false>(msg);
        };

        for (auto& fb : holder.file_blocks) {
            auto fb_state = fb->state();
            if (fb_state == io::FileBlock::State::DOWNLOADING) {
                if (!do_fill) {
                    // Plain peer reads keep the short FileBlock::wait() timeout so they fail fast
                    // and let the client race S3.
                    fb_state = fb->wait();
                } else {
                    // Only fill requests wait up to the (longer) fill timeout.
                    [[maybe_unused]] const bool completed = wait_for_file_block_state(
                            fb, config::peer_server_cache_fill_timeout_ms);
                    fb_state = fb->state();
                }
            }

            if (fb_state == io::FileBlock::State::EMPTY) {
                if (!do_fill) {
                    return reply_block_unavailable("cache block not downloaded", fb);
                }
                // Server-side fill: request.path already carries the full remote path.
                Status fill_st = trigger_peer_server_fill(
                        fb, get_rowset_meta_tablet_id_from_request(request), cache_key_path,
                        get_rowset_meta_resource_id_from_request(request), path,
                        request->has_file_size() ? request->file_size() : -1,
                        static_cast<int64_t>(fb->range().left),
                        static_cast<int64_t>(fb->range().size()),
                        config::peer_server_cache_fill_timeout_ms);
                if (!fill_st.ok()) {
                    g_file_cache_get_by_peer_failed_num << 1;
                    g_file_cache_get_by_peer_not_downloaded_block_num << 1;
                    // TOO_MANY_TASKS: healthy but overloaded, the client keeps the candidate and
                    // falls back to S3. NOT_FOUND: soft miss, the client rotates (no eviction).
                    const bool overloaded = fill_st.is<ErrorCode::TOO_MANY_TASKS>();
                    if (overloaded || fill_st.is<ErrorCode::NOT_FOUND>()) {
                        auto* pb_status = response->mutable_status();
                        pb_status->add_error_msgs(std::string(fill_st.msg()));
                        pb_status->set_status_code(overloaded ? TStatusCode::TOO_MANY_TASKS
                                                              : TStatusCode::NOT_FOUND);
                    } else {
                        LOG(WARNING) << "cache block fill failed, status=" << fill_st << ", "
                                     << format_peer_cache_block_context(request, cb_req, fb, hash,
                                                                        file_size, do_fill);
                        set_error_response(response, "cache block not ready");
                    }
                    return fill_st;
                }
                fb_state = io::FileBlock::State::DOWNLOADED;
            }

            if (fb_state != io::FileBlock::State::DOWNLOADED) {
                // The block was DOWNLOADING at request time but the wait ended in another state
                // (e.g. still DOWNLOADING after the timeout). The server is healthy, so this is
                // reported as a soft miss.
                return reply_block_unavailable("cache block not ready after wait", fb);
            }

            g_file_cache_get_by_peer_blocks_num << 1;
            doris::CacheBlockPB* out = response->add_datas();
            // In attachment mode, metadata order must match attachment append order because the
            // client consumes attachment payload sequentially using resp.datas() order.
            butil::IOBuf* attachment = use_attachment ? &cntl->response_attachment() : nullptr;
            if (Status read_status = read_file_block(fb, file_size, out, attachment);
                !read_status.ok()) {
                set_error_response(response, "read cache file error");
                return read_status;
            }
            ++response_blocks;
        }
    }

    return Status::OK();
}
777
} // namespace
778
779
#ifdef BE_TEST
// Test-only hooks: expose the file-local request helpers above to unit tests.

// Forwards to handle_peer_file_cache_block_request() unchanged.
Status test_handle_peer_file_cache_block_request(const PFetchPeerDataRequest* request,
                                                 PFetchPeerDataResponse* response,
                                                 brpc::Controller* cntl) {
    Status result = handle_peer_file_cache_block_request(request, response, cntl);
    return result;
}

// Forwards to try_reject_if_queue_timed_out() unchanged.
bool test_try_reject_if_queue_timed_out(std::chrono::steady_clock::time_point enqueue_ts,
                                        PFetchPeerDataResponse* response) {
    const bool rejected = try_reject_if_queue_timed_out(enqueue_ts, response);
    return rejected;
}
#endif
791
792
void CloudInternalServiceImpl::fetch_peer_data(google::protobuf::RpcController* controller,
793
                                               const PFetchPeerDataRequest* request,
794
                                               PFetchPeerDataResponse* response,
795
0
                                               google::protobuf::Closure* done) {
796
0
    auto enqueue_ts = std::chrono::steady_clock::now();
797
    // Lifetime: cntl is owned by brpc framework and valid until done->Run() is called.
798
    // The ClosureGuard inside the lambda ensures done->Run() happens after all cntl usage,
799
    // so capturing the raw pointer by value is safe.
800
0
    auto* cntl = static_cast<brpc::Controller*>(controller);
801
0
    bool ret = _peer_fetch_pool.try_offer([request, response, done, enqueue_ts, cntl]() {
802
0
        brpc::ClosureGuard closure_guard(done);
803
0
        g_file_cache_get_by_peer_num << 1;
804
0
        if (try_reject_if_queue_timed_out(enqueue_ts, response)) {
805
0
            return;
806
0
        }
807
808
0
        if (!config::enable_file_cache) {
809
0
            LOG_WARNING("try to access file cache data, but file cache not enabled");
810
0
            return;
811
0
        }
812
813
0
        auto begin_ts = std::chrono::duration_cast<std::chrono::microseconds>(
814
0
                                std::chrono::steady_clock::now().time_since_epoch())
815
0
                                .count();
816
817
0
        const auto type = request->type();
818
0
        const auto& path = request->path();
819
0
        response->mutable_status()->set_status_code(TStatusCode::OK);
820
821
0
        Status status = Status::OK();
822
0
        if (type == PFetchPeerDataRequest_Type_PEER_FILE_RANGE) {
823
0
            status = handle_peer_file_range_request(path, response);
824
0
        } else if (type == PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK) {
825
0
            status = handle_peer_file_cache_block_request(request, response, cntl);
826
0
        }
827
828
0
        if (!status.ok()) {
829
0
            const std::string msg =
830
0
                    "fetch peer data failed: " + status.to_string() + ", " +
831
0
                    format_peer_request_context(
832
0
                            request, io::BlockFileCache::hash(get_peer_cache_filename(path)),
833
0
                            request->has_file_size() ? static_cast<size_t>(std::max<int64_t>(
834
0
                                                               0, request->file_size()))
835
0
                                                     : std::numeric_limits<size_t>::max());
836
0
            if (status.is<ErrorCode::NOT_FOUND>() || status.is<ErrorCode::TOO_MANY_TASKS>()) {
837
0
                VLOG_DEBUG << msg;
838
0
            } else {
839
0
                LOG(WARNING) << msg;
840
0
            }
841
0
            auto* resp_status = response->mutable_status();
842
0
            if (resp_status->status_code() == TStatusCode::OK) {
843
0
                set_error_response(response, status.to_string());
844
0
            } else if (resp_status->error_msgs().empty()) {
845
0
                resp_status->add_error_msgs(status.to_string());
846
0
            }
847
0
        }
848
849
0
        DBUG_EXECUTE_IF("CloudInternalServiceImpl::fetch_peer_data_slower", {
850
0
            int st_us = dp->param<int>("sleep", 1000);
851
0
            LOG_WARNING("CloudInternalServiceImpl::fetch_peer_data_slower").tag("sleep", st_us);
852
0
            bthread_usleep(st_us);
853
0
        });
854
855
0
        auto end_ts = std::chrono::duration_cast<std::chrono::microseconds>(
856
0
                              std::chrono::steady_clock::now().time_since_epoch())
857
0
                              .count();
858
        // Latency covers every completed callback (including failures) so the
859
        // server-side fail-fast paths still show up in the latency histogram.
860
        // success_num must only count actual OK results, otherwise dedup
861
        // TOO_MANY_TASKS / NOT_FOUND / handler errors all fall through here
862
        // and the success rate is meaningless. Use file_cache_get_by_peer_num
863
        // for the total completed-callback count.
864
0
        g_file_cache_get_by_peer_server_latency << (end_ts - begin_ts);
865
0
        if (status.ok()) {
866
0
            g_file_cache_get_by_peer_success_num << 1;
867
0
        }
868
869
0
        VLOG_DEBUG << "fetch cache request=" << request->DebugString()
870
0
                   << ", response=" << response->DebugString();
871
0
    });
872
873
0
    if (!ret) {
874
0
        g_file_cache_get_by_peer_offer_failed_num << 1;
875
0
        brpc::ClosureGuard closure_guard(done);
876
0
        const std::string msg = fmt::format(
877
0
                "fail to offer fetch peer data request to the peer fetch work pool, pool={}",
878
0
                _peer_fetch_pool.get_info());
879
0
        set_too_many_tasks_response(response, msg);
880
0
        LOG(WARNING) << msg;
881
0
    }
882
0
}
883
884
bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_submitted_segment_num(
885
        "file_cache_event_driven_warm_up_submitted_segment_num");
886
bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_finished_segment_num(
887
        "file_cache_event_driven_warm_up_finished_segment_num");
888
bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_failed_segment_num(
889
        "file_cache_event_driven_warm_up_failed_segment_num");
890
bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_submitted_segment_size(
891
        "file_cache_event_driven_warm_up_submitted_segment_size");
892
bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_finished_segment_size(
893
        "file_cache_event_driven_warm_up_finished_segment_size");
894
bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_failed_segment_size(
895
        "file_cache_event_driven_warm_up_failed_segment_size");
896
bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_submitted_index_num(
897
        "file_cache_event_driven_warm_up_submitted_index_num");
898
bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_finished_index_num(
899
        "file_cache_event_driven_warm_up_finished_index_num");
900
bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_failed_index_num(
901
        "file_cache_event_driven_warm_up_failed_index_num");
902
bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_submitted_index_size(
903
        "file_cache_event_driven_warm_up_submitted_index_size");
904
bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_finished_index_size(
905
        "file_cache_event_driven_warm_up_finished_index_size");
906
bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_failed_index_size(
907
        "file_cache_event_driven_warm_up_failed_index_size");
908
bvar::Status<int64_t> g_file_cache_warm_up_rowset_last_handle_unix_ts(
909
        "file_cache_warm_up_rowset_last_handle_unix_ts", 0);
910
bvar::Status<int64_t> g_file_cache_warm_up_rowset_last_finish_unix_ts(
911
        "file_cache_warm_up_rowset_last_finish_unix_ts", 0);
912
bvar::LatencyRecorder g_file_cache_warm_up_rowset_latency("file_cache_warm_up_rowset_latency");
913
bvar::LatencyRecorder g_file_cache_warm_up_rowset_request_to_handle_latency(
914
        "file_cache_warm_up_rowset_request_to_handle_latency");
915
bvar::LatencyRecorder g_file_cache_warm_up_rowset_handle_to_finish_latency(
916
        "file_cache_warm_up_rowset_handle_to_finish_latency");
917
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_slow_count(
918
        "file_cache_warm_up_rowset_slow_count");
919
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_request_to_handle_slow_count(
920
        "file_cache_warm_up_rowset_request_to_handle_slow_count");
921
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_handle_to_finish_slow_count(
922
        "file_cache_warm_up_rowset_handle_to_finish_slow_count");
923
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_wait_for_compaction_num(
924
        "file_cache_warm_up_rowset_wait_for_compaction_num");
925
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_wait_for_compaction_timeout_num(
926
        "file_cache_warm_up_rowset_wait_for_compaction_timeout_num");
927
928
// Per-job windowed metrics for target BE
929
// bvar::Window enforces MAX_SECONDS_LIMIT = 3600, so the longest window is 1h.
930
static constexpr int WINDOW_5M = 300;
931
static constexpr int WINDOW_30M = 1800;
932
static constexpr int WINDOW_1H = 3600;
933
934
MBvarWindowedAdder g_warmup_ed_finish_segment_num("warmup_ed_finish_segment_num", {"job_id"},
935
                                                  {WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
936
MBvarWindowedAdder g_warmup_ed_finish_segment_size("warmup_ed_finish_segment_size", {"job_id"},
937
                                                   {WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
938
MBvarWindowedAdder g_warmup_ed_finish_index_num("warmup_ed_finish_index_num", {"job_id"},
939
                                                {WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
940
MBvarWindowedAdder g_warmup_ed_finish_index_size("warmup_ed_finish_index_size", {"job_id"},
941
                                                 {WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
942
MBvarWindowedAdder g_warmup_ed_fail_segment_num("warmup_ed_fail_segment_num", {"job_id"},
943
                                                {WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
944
MBvarWindowedAdder g_warmup_ed_fail_segment_size("warmup_ed_fail_segment_size", {"job_id"},
945
                                                 {WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
946
MBvarWindowedAdder g_warmup_ed_fail_index_num("warmup_ed_fail_index_num", {"job_id"},
947
                                              {WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
948
MBvarWindowedAdder g_warmup_ed_fail_index_size("warmup_ed_fail_index_size", {"job_id"},
949
                                               {WINDOW_5M, WINDOW_30M, WINDOW_1H}, false);
950
bvar::MultiDimension<bvar::Status<int64_t>> g_warmup_ed_last_finish_ts({"job_id"});
951
952
0
void update_warmup_ed_last_finish_ts(const std::string& job_id_str) {
953
0
    auto* finish_ts = g_warmup_ed_last_finish_ts.get_stats(std::list<std::string> {job_id_str});
954
0
    if (finish_ts) {
955
0
        finish_ts->set_value(std::chrono::duration_cast<std::chrono::milliseconds>(
956
0
                                     std::chrono::system_clock::now().time_since_epoch())
957
0
                                     .count());
958
0
    }
959
0
}
960
961
0
void record_warmup_ed_finish_segment(const std::string& job_id_str, int64_t segment_size) {
962
0
    g_warmup_ed_finish_segment_num.put({job_id_str}, 1);
963
0
    g_warmup_ed_finish_segment_size.put({job_id_str}, segment_size);
964
0
    update_warmup_ed_last_finish_ts(job_id_str);
965
0
}
966
967
0
void record_warmup_ed_finish_index(const std::string& job_id_str, int64_t idx_size) {
968
0
    g_warmup_ed_finish_index_num.put({job_id_str}, 1);
969
0
    g_warmup_ed_finish_index_size.put({job_id_str}, idx_size);
970
0
    update_warmup_ed_last_finish_ts(job_id_str);
971
0
}
972
973
0
void record_warmup_ed_fail_segment(const std::string& job_id_str, int64_t segment_size) {
974
0
    g_warmup_ed_fail_segment_num.put({job_id_str}, 1);
975
0
    g_warmup_ed_fail_segment_size.put({job_id_str}, segment_size);
976
0
}
977
978
0
void record_warmup_ed_fail_index(const std::string& job_id_str, int64_t idx_size) {
979
0
    g_warmup_ed_fail_index_num.put({job_id_str}, 1);
980
0
    g_warmup_ed_fail_index_size.put({job_id_str}, idx_size);
981
0
}
982
983
void record_warmup_ed_skipped_rowset_as_finished(RowsetMeta& rs_meta,
984
0
                                                 const std::string& job_id_str) {
985
0
    auto schema_ptr = rs_meta.tablet_schema();
986
0
    bool has_inverted_index = schema_ptr->has_inverted_index() || schema_ptr->has_ann_index();
987
0
    auto idx_version = schema_ptr->get_inverted_index_storage_format();
988
0
    for (int64_t segment_id = 0; segment_id < rs_meta.num_segments(); segment_id++) {
989
0
        record_warmup_ed_finish_segment(job_id_str, rs_meta.segment_file_size(segment_id));
990
991
0
        if (!has_inverted_index) {
992
0
            continue;
993
0
        }
994
0
        auto&& inverted_index_info = rs_meta.inverted_index_file_info(segment_id);
995
0
        if (idx_version == InvertedIndexStorageFormatPB::V1) {
996
0
            std::unordered_map<int64_t, int64_t> index_size_map;
997
0
            for (const auto& info : inverted_index_info.index_info()) {
998
0
                if (info.index_file_size() != -1) {
999
0
                    index_size_map[info.index_id()] = info.index_file_size();
1000
0
                } else {
1001
0
                    VLOG_DEBUG << "Invalid index_file_size for segment_id " << segment_id
1002
0
                               << ", index_id " << info.index_id();
1003
0
                }
1004
0
            }
1005
0
            for (const auto& index : schema_ptr->inverted_indexes()) {
1006
0
                record_warmup_ed_finish_index(job_id_str, index_size_map[index->index_id()]);
1007
0
            }
1008
0
        } else { // InvertedIndexStorageFormatPB::V2
1009
0
            int64_t idx_size = 0;
1010
0
            if (inverted_index_info.has_index_size()) {
1011
0
                idx_size = inverted_index_info.index_size();
1012
0
            } else {
1013
0
                VLOG_DEBUG << "index_size is not set for segment " << segment_id;
1014
0
            }
1015
0
            record_warmup_ed_finish_index(job_id_str, idx_size);
1016
0
        }
1017
0
    }
1018
0
}
1019
1020
void handle_segment_download_done(Status st, int64_t tablet_id, const RowsetId& rowset_id,
1021
                                  int64_t segment_id, std::shared_ptr<CloudTablet> tablet,
1022
                                  std::shared_ptr<bthread::CountdownEvent> wait, Version version,
1023
                                  int64_t segment_size, int64_t request_ts, int64_t handle_ts,
1024
0
                                  std::string job_id_str, int64_t upstream_trigger_ts_ms) {
1025
0
    DBUG_EXECUTE_IF("CloudInternalServiceImpl::warm_up_rowset.download_segment", {
1026
0
        auto sleep_time = dp->param<int>("sleep", 3);
1027
0
        LOG_INFO("[verbose] block download for rowset={}, version={}, sleep={}",
1028
0
                 rowset_id.to_string(), version.to_string(), sleep_time);
1029
0
        std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
1030
0
    });
1031
0
    DBUG_EXECUTE_IF(
1032
0
            "CloudInternalServiceImpl::warm_up_rowset.download_segment.inject_"
1033
0
            "error",
1034
0
            {
1035
0
                st = Status::InternalError("injected error");
1036
0
                LOG_INFO("[verbose] inject error, tablet={}, rowset={}, st={}", tablet_id,
1037
0
                         rowset_id.to_string(), st.to_string());
1038
0
            });
1039
0
    if (st.ok()) {
1040
0
        g_file_cache_event_driven_warm_up_finished_segment_num << 1;
1041
0
        g_file_cache_event_driven_warm_up_finished_segment_size << segment_size;
1042
0
        record_warmup_ed_finish_segment(job_id_str, segment_size);
1043
0
        int64_t now_ts = current_unix_time_us();
1044
0
        g_file_cache_warm_up_rowset_last_finish_unix_ts.set_value(now_ts);
1045
0
        auto rowset_latency_us = warm_up_rowset_cross_host_latency_us(request_ts, now_ts);
1046
0
        if (rowset_latency_us.has_value()) {
1047
0
            g_file_cache_warm_up_rowset_latency << *rowset_latency_us;
1048
0
        }
1049
0
        g_file_cache_warm_up_rowset_handle_to_finish_latency << (now_ts - handle_ts);
1050
0
        if (rowset_latency_us.has_value() &&
1051
0
            *rowset_latency_us > config::warm_up_rowset_slow_log_ms * 1000) {
1052
0
            g_file_cache_warm_up_rowset_slow_count << 1;
1053
0
            LOG(INFO) << "warm up rowset took " << *rowset_latency_us
1054
0
                      << " us, tablet_id: " << tablet_id << ", rowset_id: " << rowset_id.to_string()
1055
0
                      << ", segment_id: " << segment_id;
1056
0
        }
1057
0
        if (now_ts - handle_ts > config::warm_up_rowset_slow_log_ms * 1000) {
1058
0
            g_file_cache_warm_up_rowset_handle_to_finish_slow_count << 1;
1059
0
            LOG(INFO) << "warm up rowset (handle to finish) took " << now_ts - handle_ts
1060
0
                      << " us, tablet_id: " << tablet_id << ", rowset_id: " << rowset_id.to_string()
1061
0
                      << ", segment_id: " << segment_id;
1062
0
        }
1063
0
    } else {
1064
0
        g_file_cache_event_driven_warm_up_failed_segment_num << 1;
1065
0
        g_file_cache_event_driven_warm_up_failed_segment_size << segment_size;
1066
0
        record_warmup_ed_fail_segment(job_id_str, segment_size);
1067
0
        LOG(WARNING) << "download segment failed, tablet_id: " << tablet_id
1068
0
                     << " rowset_id: " << rowset_id.to_string() << ", error: " << st;
1069
0
    }
1070
0
    if (tablet->complete_rowset_segment_warmup(WarmUpTriggerSource::EVENT_DRIVEN, rowset_id, st, 1,
1071
0
                                               0)
1072
0
                .trigger_source == WarmUpTriggerSource::EVENT_DRIVEN) {
1073
0
        VLOG_DEBUG << "warmup rowset " << version.to_string() << "(" << rowset_id.to_string()
1074
0
                   << ") completed";
1075
0
    }
1076
0
    g_warmup_ed_downstream_progress_tracker.record_task_done(job_id_str, upstream_trigger_ts_ms);
1077
0
    if (wait) {
1078
0
        wait->signal();
1079
0
    }
1080
0
}
1081
1082
void handle_inverted_index_download_done(Status st, int64_t tablet_id, const RowsetId& rowset_id,
1083
                                         int64_t segment_id, std::string index_path,
1084
                                         std::shared_ptr<CloudTablet> tablet,
1085
                                         std::shared_ptr<bthread::CountdownEvent> wait,
1086
                                         Version version, uint64_t idx_size, int64_t request_ts,
1087
                                         int64_t handle_ts, std::string job_id_str,
1088
0
                                         int64_t upstream_trigger_ts_ms) {
1089
0
    DBUG_EXECUTE_IF("CloudInternalServiceImpl::warm_up_rowset.download_inverted_idx", {
1090
0
        auto sleep_time = dp->param<int>("sleep", 3);
1091
0
        LOG_INFO(
1092
0
                "[verbose] block download for rowset={}, inverted index "
1093
0
                "file={}, sleep={}",
1094
0
                rowset_id.to_string(), index_path, sleep_time);
1095
0
        std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
1096
0
    });
1097
0
    if (st.ok()) {
1098
0
        g_file_cache_event_driven_warm_up_finished_index_num << 1;
1099
0
        g_file_cache_event_driven_warm_up_finished_index_size << idx_size;
1100
0
        record_warmup_ed_finish_index(job_id_str, static_cast<int64_t>(idx_size));
1101
0
        int64_t now_ts = current_unix_time_us();
1102
0
        g_file_cache_warm_up_rowset_last_finish_unix_ts.set_value(now_ts);
1103
0
        auto rowset_latency_us = warm_up_rowset_cross_host_latency_us(request_ts, now_ts);
1104
0
        if (rowset_latency_us.has_value()) {
1105
0
            g_file_cache_warm_up_rowset_latency << *rowset_latency_us;
1106
0
        }
1107
0
        g_file_cache_warm_up_rowset_handle_to_finish_latency << (now_ts - handle_ts);
1108
0
        if (rowset_latency_us.has_value() &&
1109
0
            *rowset_latency_us > config::warm_up_rowset_slow_log_ms * 1000) {
1110
0
            g_file_cache_warm_up_rowset_slow_count << 1;
1111
0
            LOG(INFO) << "warm up rowset took " << *rowset_latency_us
1112
0
                      << " us, tablet_id: " << tablet_id << ", rowset_id: " << rowset_id.to_string()
1113
0
                      << ", segment_id: " << segment_id;
1114
0
        }
1115
0
        if (now_ts - handle_ts > config::warm_up_rowset_slow_log_ms * 1000) {
1116
0
            g_file_cache_warm_up_rowset_handle_to_finish_slow_count << 1;
1117
0
            LOG(INFO) << "warm up rowset (handle to finish) took " << now_ts - handle_ts
1118
0
                      << " us, tablet_id: " << tablet_id << ", rowset_id: " << rowset_id.to_string()
1119
0
                      << ", segment_id: " << segment_id;
1120
0
        }
1121
0
    } else {
1122
0
        g_file_cache_event_driven_warm_up_failed_index_num << 1;
1123
0
        g_file_cache_event_driven_warm_up_failed_index_size << idx_size;
1124
0
        record_warmup_ed_fail_index(job_id_str, static_cast<int64_t>(idx_size));
1125
0
        LOG(WARNING) << "download inverted index failed, tablet_id: " << tablet_id
1126
0
                     << " rowset_id: " << rowset_id << ", error: " << st;
1127
0
    }
1128
0
    if (tablet->complete_rowset_segment_warmup(WarmUpTriggerSource::EVENT_DRIVEN, rowset_id, st, 0,
1129
0
                                               1)
1130
0
                .trigger_source == WarmUpTriggerSource::EVENT_DRIVEN) {
1131
0
        VLOG_DEBUG << "warmup rowset " << version.to_string() << "(" << rowset_id.to_string()
1132
0
                   << ") completed";
1133
0
    }
1134
0
    g_warmup_ed_downstream_progress_tracker.record_task_done(job_id_str, upstream_trigger_ts_ms);
1135
0
    if (wait) {
1136
0
        wait->signal();
1137
0
    }
1138
0
}
1139
1140
void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* controller
1141
                                              [[maybe_unused]],
1142
                                              const PWarmUpRowsetRequest* request,
1143
                                              PWarmUpRowsetResponse* response,
1144
0
                                              google::protobuf::Closure* done) {
1145
0
    brpc::ClosureGuard closure_guard(done);
1146
0
    std::shared_ptr<bthread::CountdownEvent> wait = nullptr;
1147
0
    timespec due_time;
1148
0
    if (request->has_sync_wait_timeout_ms() && request->sync_wait_timeout_ms() > 0) {
1149
0
        g_file_cache_warm_up_rowset_wait_for_compaction_num << 1;
1150
0
        wait = std::make_shared<bthread::CountdownEvent>(0);
1151
0
        VLOG_DEBUG << "sync_wait_timeout: " << request->sync_wait_timeout_ms() << " ms";
1152
0
        due_time = butil::milliseconds_from_now(request->sync_wait_timeout_ms());
1153
0
    }
1154
1155
    // Extract job_id from request (0 if not set, for backward compatibility)
1156
0
    std::string job_id_str = std::to_string(request->has_job_id() ? request->job_id() : 0);
1157
0
    int64_t upstream_trigger_ts_ms =
1158
0
            request->has_upstream_trigger_ts_ms() ? request->upstream_trigger_ts_ms() : 0;
1159
1160
0
    for (auto& rs_meta_pb : request->rowset_metas()) {
1161
0
        RowsetMeta rs_meta;
1162
0
        rs_meta.init_from_pb(rs_meta_pb);
1163
0
        auto storage_resource = rs_meta.remote_storage_resource();
1164
0
        if (!storage_resource) {
1165
0
            LOG(WARNING) << storage_resource.error();
1166
0
            continue;
1167
0
        }
1168
0
        int64_t tablet_id = rs_meta.tablet_id();
1169
0
        auto rowset_id = rs_meta.rowset_id();
1170
0
        bool local_only = !(request->has_skip_existence_check() && request->skip_existence_check());
1171
0
        auto res = _engine.tablet_mgr().get_tablet(tablet_id, /* warmup_data = */ false,
1172
0
                                                   /* sync_delete_bitmap = */ true,
1173
0
                                                   /* sync_stats = */ nullptr,
1174
0
                                                   /* local_only = */ local_only);
1175
0
        if (!res.has_value()) {
1176
0
            LOG_WARNING("Warm up error ").tag("tablet_id", tablet_id).error(res.error());
1177
0
            if (res.error().msg().find("local_only=true") != std::string::npos ||
1178
0
                res.error().msg().find("force_use_only_cached=true") != std::string::npos) {
1179
0
                res.error().set_code(ErrorCode::TABLE_NOT_FOUND);
1180
0
            }
1181
0
            res.error().to_protobuf(response->mutable_status());
1182
0
            continue;
1183
0
        }
1184
0
        auto tablet = res.value();
1185
0
        auto tablet_meta = tablet->tablet_meta();
1186
1187
0
        int64_t handle_ts = current_unix_time_us();
1188
0
        g_file_cache_warm_up_rowset_last_handle_unix_ts.set_value(handle_ts);
1189
0
        int64_t request_ts = request->has_unix_ts_us() ? request->unix_ts_us() : 0;
1190
0
        auto request_to_handle_latency_us =
1191
0
                warm_up_rowset_cross_host_latency_us(request_ts, handle_ts);
1192
0
        if (request_to_handle_latency_us.has_value()) {
1193
0
            g_file_cache_warm_up_rowset_request_to_handle_latency << *request_to_handle_latency_us;
1194
0
        }
1195
0
        if (request_to_handle_latency_us.has_value() &&
1196
0
            *request_to_handle_latency_us > config::warm_up_rowset_slow_log_ms * 1000) {
1197
0
            g_file_cache_warm_up_rowset_request_to_handle_slow_count << 1;
1198
0
            LOG(INFO) << "warm up rowset (request to handle) took " << *request_to_handle_latency_us
1199
0
                      << " us, tablet_id: " << rs_meta.tablet_id()
1200
0
                      << ", rowset_id: " << rowset_id.to_string();
1201
0
        }
1202
0
        int64_t expiration_time = tablet_meta->ttl_seconds();
1203
1204
0
        if (!tablet->add_rowset_warmup_state(rs_meta, WarmUpTriggerSource::EVENT_DRIVEN)) {
1205
0
            LOG(INFO) << "found duplicate warmup task for rowset " << rowset_id.to_string()
1206
0
                      << ", skip it";
1207
0
            g_warmup_ed_downstream_progress_tracker.record_task_done(job_id_str,
1208
0
                                                                     upstream_trigger_ts_ms);
1209
0
            record_warmup_ed_skipped_rowset_as_finished(rs_meta, job_id_str);
1210
0
            continue;
1211
0
        }
1212
0
        if (rs_meta.num_segments() == 0) {
1213
0
            g_warmup_ed_downstream_progress_tracker.record_task_done(job_id_str,
1214
0
                                                                     upstream_trigger_ts_ms);
1215
0
        }
1216
1217
0
        for (int64_t segment_id = 0; segment_id < rs_meta.num_segments(); segment_id++) {
1218
0
            if (!config::file_cache_enable_only_warm_up_idx) {
1219
0
                auto segment_size = rs_meta.segment_file_size(segment_id);
1220
1221
                // Use rs_meta.fs() instead of storage_resource.value()->fs to support packed files.
1222
                // PackedFileSystem wrapper in rs_meta.fs() handles the index_map lookup and
1223
                // reads from the correct packed file.
1224
0
                io::DownloadFileMeta download_meta {
1225
0
                        .path = storage_resource.value()->remote_segment_path(rs_meta, segment_id),
1226
0
                        .file_size = segment_size,
1227
0
                        .offset = 0,
1228
0
                        .download_size = segment_size,
1229
0
                        .file_system = rs_meta.fs(),
1230
0
                        .ctx = {.is_index_data = false,
1231
0
                                .expiration_time = expiration_time,
1232
0
                                .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
1233
0
                                .is_warmup = true},
1234
0
                        .download_done =
1235
0
                                [=, version = rs_meta.version()](Status st) {
1236
0
                                    handle_segment_download_done(
1237
0
                                            st, tablet_id, rowset_id, segment_id, tablet, wait,
1238
0
                                            version, segment_size, request_ts, handle_ts,
1239
0
                                            job_id_str, upstream_trigger_ts_ms);
1240
0
                                },
1241
0
                        .tablet_id = tablet_id};
1242
1243
0
                g_file_cache_event_driven_warm_up_submitted_segment_num << 1;
1244
0
                g_file_cache_event_driven_warm_up_submitted_segment_size << segment_size;
1245
0
                if (wait) {
1246
0
                    wait->add_count();
1247
0
                }
1248
0
                g_warmup_ed_downstream_progress_tracker.record_task_submit(job_id_str,
1249
0
                                                                           upstream_trigger_ts_ms);
1250
1251
0
                _engine.file_cache_block_downloader().submit_download_task(download_meta);
1252
0
            }
1253
1254
            // Use rs_meta.fs() to support packed files for inverted index download.
1255
0
            auto download_inverted_index = [&, tablet, job_id_str](std::string index_path,
1256
0
                                                                   uint64_t idx_size) {
1257
0
                io::DownloadFileMeta download_meta {
1258
0
                        .path = io::Path(index_path),
1259
0
                        .file_size = static_cast<int64_t>(idx_size),
1260
0
                        .file_system = rs_meta.fs(),
1261
0
                        .ctx = {.is_index_data = false, // DORIS-20877
1262
0
                                .expiration_time = expiration_time,
1263
0
                                .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
1264
0
                                .is_warmup = true},
1265
0
                        .download_done =
1266
0
                                [=, version = rs_meta.version()](Status st) {
1267
0
                                    handle_inverted_index_download_done(
1268
0
                                            st, tablet_id, rowset_id, segment_id, index_path,
1269
0
                                            tablet, wait, version, idx_size, request_ts, handle_ts,
1270
0
                                            job_id_str, upstream_trigger_ts_ms);
1271
0
                                },
1272
0
                        .tablet_id = tablet_id,
1273
0
                };
1274
0
                g_file_cache_event_driven_warm_up_submitted_index_num << 1;
1275
0
                g_file_cache_event_driven_warm_up_submitted_index_size << idx_size;
1276
0
                tablet->update_rowset_warmup_state_inverted_idx_num(
1277
0
                        WarmUpTriggerSource::EVENT_DRIVEN, rowset_id, 1);
1278
0
                if (wait) {
1279
0
                    wait->add_count();
1280
0
                }
1281
0
                g_warmup_ed_downstream_progress_tracker.record_task_submit(job_id_str,
1282
0
                                                                           upstream_trigger_ts_ms);
1283
0
                _engine.file_cache_block_downloader().submit_download_task(download_meta);
1284
0
            };
1285
1286
            // inverted index
1287
0
            auto schema_ptr = rs_meta.tablet_schema();
1288
0
            auto idx_version = schema_ptr->get_inverted_index_storage_format();
1289
1290
0
            if (schema_ptr->has_inverted_index() || schema_ptr->has_ann_index()) {
1291
0
                if (idx_version == InvertedIndexStorageFormatPB::V1) {
1292
0
                    auto&& inverted_index_info = rs_meta.inverted_index_file_info(segment_id);
1293
0
                    std::unordered_map<int64_t, int64_t> index_size_map;
1294
0
                    for (const auto& info : inverted_index_info.index_info()) {
1295
0
                        if (info.index_file_size() != -1) {
1296
0
                            index_size_map[info.index_id()] = info.index_file_size();
1297
0
                        } else {
1298
0
                            VLOG_DEBUG << "Invalid index_file_size for segment_id " << segment_id
1299
0
                                       << ", index_id " << info.index_id();
1300
0
                        }
1301
0
                    }
1302
0
                    for (const auto& index : schema_ptr->inverted_indexes()) {
1303
0
                        auto idx_path = storage_resource.value()->remote_idx_v1_path(
1304
0
                                rs_meta, segment_id, index->index_id(), index->get_index_suffix());
1305
0
                        download_inverted_index(idx_path, index_size_map[index->index_id()]);
1306
0
                    }
1307
0
                } else { // InvertedIndexStorageFormatPB::V2
1308
0
                    auto&& inverted_index_info = rs_meta.inverted_index_file_info(segment_id);
1309
0
                    int64_t idx_size = 0;
1310
0
                    if (inverted_index_info.has_index_size()) {
1311
0
                        idx_size = inverted_index_info.index_size();
1312
0
                    } else {
1313
0
                        VLOG_DEBUG << "index_size is not set for segment " << segment_id;
1314
0
                    }
1315
0
                    auto idx_path =
1316
0
                            storage_resource.value()->remote_idx_v2_path(rs_meta, segment_id);
1317
0
                    download_inverted_index(idx_path, idx_size);
1318
0
                }
1319
0
            }
1320
0
        }
1321
0
    }
1322
0
    if (wait && wait->timed_wait(due_time)) {
1323
0
        g_file_cache_warm_up_rowset_wait_for_compaction_timeout_num << 1;
1324
0
        LOG_WARNING("the time spent warming up {} rowsets exceeded {} ms",
1325
0
                    request->rowset_metas().size(), request->sync_wait_timeout_ms());
1326
0
    }
1327
0
}
1328
1329
// Running totals of cache keys for which recycle_cache() issued an async removal,
// split into segment files and inverted-index files.
bvar::Adder<uint64_t> g_file_cache_recycle_cache_finished_segment_num(
        "file_cache_recycle_cache_finished_segment_num");
bvar::Adder<uint64_t> g_file_cache_recycle_cache_finished_index_num(
        "file_cache_recycle_cache_finished_index_num");
1333
1334
void CloudInternalServiceImpl::recycle_cache(google::protobuf::RpcController* controller
1335
                                             [[maybe_unused]],
1336
                                             const PRecycleCacheRequest* request,
1337
                                             PRecycleCacheResponse* response,
1338
0
                                             google::protobuf::Closure* done) {
1339
0
    brpc::ClosureGuard closure_guard(done);
1340
1341
0
    if (!config::enable_file_cache) {
1342
0
        return;
1343
0
    }
1344
0
    for (const auto& meta : request->cache_metas()) {
1345
0
        for (int64_t segment_id = 0; segment_id < meta.num_segments(); segment_id++) {
1346
0
            auto file_key = Segment::file_cache_key(meta.rowset_id(), segment_id);
1347
0
            auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
1348
0
            file_cache->remove_if_cached_async(file_key);
1349
0
            g_file_cache_recycle_cache_finished_segment_num << 1;
1350
0
        }
1351
1352
        // inverted index
1353
0
        for (const auto& file_name : meta.index_file_names()) {
1354
0
            auto file_key = io::BlockFileCache::hash(file_name);
1355
0
            auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
1356
0
            file_cache->remove_if_cached_async(file_key);
1357
0
            g_file_cache_recycle_cache_finished_index_num << 1;
1358
0
        }
1359
0
    }
1360
0
}
1361
1362
#include "common/compile_check_avoid_end.h"
1363
} // namespace doris