Coverage Report

Created: 2026-07-04 14:51

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/cached_remote_file_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/cache/cached_remote_file_reader.h"
19
20
#include <brpc/controller.h>
21
#include <bthread/bthread.h>
22
#include <bthread/condition_variable.h>
23
#include <bthread/mutex.h>
24
#include <fmt/format.h>
25
#include <gen_cpp/Types_types.h>
26
#include <gen_cpp/internal_service.pb.h>
27
#include <glog/logging.h>
28
29
#include <algorithm>
30
#include <atomic>
31
#include <condition_variable>
32
#include <cstring>
33
#include <functional>
34
#include <list>
35
#include <memory>
36
#include <mutex>
37
#include <thread>
38
#include <vector>
39
40
#include "cloud/cloud_cluster_info.h"
41
#include "cloud/cloud_warm_up_manager.h"
42
#include "cloud/config.h"
43
#include "common/compiler_util.h" // IWYU pragma: keep
44
#include "common/config.h"
45
#include "common/metrics/doris_metrics.h"
46
#include "cpp/sync_point.h"
47
#include "io/cache/block_file_cache.h"
48
#include "io/cache/block_file_cache_factory.h"
49
#include "io/cache/block_file_cache_profile.h"
50
#include "io/cache/file_block.h"
51
#include "io/cache/peer_file_cache_reader.h"
52
#include "io/fs/file_reader.h"
53
#include "io/fs/local_file_system.h"
54
#include "io/io_common.h"
55
#include "runtime/exec_env.h"
56
#include "runtime/runtime_profile.h"
57
#include "runtime/thread_context.h"
58
#include "runtime/workload_management/io_throttle.h"
59
#include "service/backend_options.h"
60
#include "util/bit_util.h"
61
#include "util/brpc_client_cache.h" // BrpcClientCache
62
#include "util/bthread_utils.h"
63
#include "util/client_cache.h"
64
#include "util/concurrency_stats.h"
65
#include "util/debug_points.h"
66
#include "util/defer_op.h"
67
68
namespace doris::io {
69
70
// ---------------------------------------------------------------------------
// Where cache misses were served from.
// ---------------------------------------------------------------------------
bvar::Adder<uint64_t> s3_read_counter("cached_remote_reader_s3_read");
bvar::Adder<uint64_t> peer_read_counter("cached_remote_reader_peer_read");

// ---------------------------------------------------------------------------
// Skip-cache accounting.
// ---------------------------------------------------------------------------
bvar::LatencyRecorder g_skip_cache_num("cached_remote_reader_skip_cache_num");
bvar::Adder<uint64_t> g_skip_cache_sum("cached_remote_reader_skip_cache_sum");
bvar::Adder<uint64_t> g_skip_local_cache_io_sum_bytes(
        "cached_remote_reader_skip_local_cache_io_sum_bytes");

// ---------------------------------------------------------------------------
// Direct (whole / partial) vs. indirect cache reads: counts and bytes.
// ---------------------------------------------------------------------------
bvar::Adder<uint64_t> g_read_cache_direct_whole_num("cached_remote_reader_cache_direct_whole_num");
bvar::Adder<uint64_t> g_read_cache_direct_partial_num(
        "cached_remote_reader_cache_direct_partial_num");
bvar::Adder<uint64_t> g_read_cache_indirect_num("cached_remote_reader_cache_indirect_num");
bvar::Adder<uint64_t> g_read_cache_direct_whole_bytes(
        "cached_remote_reader_cache_direct_whole_bytes");
bvar::Adder<uint64_t> g_read_cache_direct_partial_bytes(
        "cached_remote_reader_cache_direct_partial_bytes");
bvar::Adder<uint64_t> g_read_cache_indirect_bytes("cached_remote_reader_cache_indirect_bytes");
bvar::Adder<uint64_t> g_read_cache_indirect_total_bytes(
        "cached_remote_reader_cache_indirect_total_bytes");
bvar::Adder<uint64_t> g_read_cache_self_heal_on_not_found(
        "cached_remote_reader_self_heal_on_not_found");

// One-minute windows over the two indirect byte Adders. They take the Adders' addresses,
// so they stay defined after them (globals in one TU initialise in definition order).
bvar::Window<bvar::Adder<uint64_t>> g_read_cache_indirect_bytes_1min_window(
        "cached_remote_reader_indirect_bytes_1min_window", &g_read_cache_indirect_bytes, 60);
bvar::Window<bvar::Adder<uint64_t>> g_read_cache_indirect_total_bytes_1min_window(
        "cached_remote_reader_indirect_total_bytes_1min_window", &g_read_cache_indirect_total_bytes,
        60);

bvar::Adder<uint64_t> g_failed_get_peer_addr_counter(
        "cached_remote_reader_failed_get_peer_addr_counter");

// Number of peer-vs-S3 races currently in flight, exported through a passive gauge.
static std::atomic<int> g_active_peer_races {0};

// Getter polled by the gauge below; the void* argument is unused.
static int load_active_peer_races(void* /*unused*/) {
    return g_active_peer_races.load(std::memory_order_relaxed);
}

bvar::PassiveStatus<int> g_active_peer_races_bvar("peer_race_active_count",
                                                  load_active_peer_races, nullptr);

// ---------------------------------------------------------------------------
// Cross-CG peer read race statistics.
// ---------------------------------------------------------------------------
bvar::Adder<uint64_t> g_peer_race_peer_win("peer_race_peer_win");
bvar::Adder<uint64_t> g_peer_race_s3_win("peer_race_s3_win");
bvar::Adder<uint64_t> g_peer_race_both_fail("peer_race_both_fail");
bvar::Adder<uint64_t> g_peer_cross_compute_group_read("peer_cross_compute_group_read");
bvar::Adder<uint64_t> g_peer_same_compute_group_read("peer_same_compute_group_read");
bvar::Adder<uint64_t> g_peer_lazy_fetch_triggered("peer_lazy_fetch_triggered");
108
109
CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader,
                                               const FileReaderOptions& opts)
        : _is_doris_table(opts.is_doris_table),
          _tablet_id(opts.tablet_id),
          _storage_resource_id(opts.storage_resource_id),
          _remote_file_reader(std::move(remote_file_reader)) {
    // Doris-table files are expected to carry a positive tablet id.
    DCHECK(!_is_doris_table || _tablet_id > 0);

    if (!_is_doris_table) {
        // External table: the cache key is derived from path + mtime, and the cache
        // directory may come from the query options.
        _init_external_table_cache(opts);
        return;
    }
    _init_doris_table_cache();
}
122
123
7.17k
void CachedRemoteFileReader::_init_doris_table_cache() {
124
7.17k
    _cache_hash = BlockFileCache::hash(path().filename().native());
125
7.17k
    _cache = FileCacheFactory::instance()->get_by_path(_cache_hash);
126
7.17k
    if (_can_read_cache_file_directly()) {
127
        // this is designed for and test in doris table, external table need extra tests
128
16
        _cache_file_readers = _cache->get_blocks_by_key(_cache_hash);
129
16
    }
130
7.17k
}
131
132
8
void CachedRemoteFileReader::_init_external_table_cache(const FileReaderOptions& opts) {
133
    // Use path and modification time to build cache key.
134
8
    std::string unique_path = fmt::format("{}:{}", path().native(), opts.mtime);
135
8
    _cache_hash = BlockFileCache::hash(unique_path);
136
8
    if (opts.cache_base_path.empty()) {
137
        // If cache path is not specified by session variable, choose randomly.
138
1
        _cache = FileCacheFactory::instance()->get_by_path(_cache_hash);
139
1
        return;
140
1
    }
141
142
    // From query session variable: file_cache_base_path.
143
7
    _cache = FileCacheFactory::instance()->get_by_path(opts.cache_base_path);
144
7
    if (_cache != nullptr) {
145
6
        return;
146
6
    }
147
148
7
    LOG(WARNING) << "Can't get cache from base path: " << opts.cache_base_path
149
1
                 << ", using random instead.";
150
1
    _cache = FileCacheFactory::instance()->get_by_path(_cache_hash);
151
1
}
152
153
1.43M
bool CachedRemoteFileReader::_can_read_cache_file_directly() const {
154
1.43M
    return _is_doris_table && config::enable_read_cache_file_directly;
155
1.43M
}
156
157
5.18k
bool CachedRemoteFileReader::_should_read_from_peer(const IOContext* io_ctx) const {
158
5.18k
    return doris::config::is_cloud_mode() && _is_doris_table && _tablet_id > 0 &&
159
5.18k
           !io_ctx->is_warmup && !io_ctx->bypass_peer_read &&
160
5.18k
           doris::config::enable_cache_read_from_peer;
161
5.18k
}
162
163
717k
// Remember a downloaded block in the direct-read map (keyed by block offset), and mark
// it with _owned_by_cached_reader. This is a no-op unless direct cache-file reads are
// enabled for this reader.
void CachedRemoteFileReader::_insert_file_reader(FileBlockSPtr file_block) {
    if (!_can_read_cache_file_directly()) {
        return;
    }
    std::lock_guard lock(_mtx);
    DCHECK(file_block->state() == FileBlock::State::DOWNLOADED);
    const auto block_offset = file_block->offset();
    // try_emplace does not consume file_block if an entry already exists at this offset.
    auto stored = _cache_file_readers.try_emplace(block_offset, std::move(file_block)).first;
    // Flag the block that is actually held in the map. The destructor only clears
    // _owned_by_cached_reader for blocks stored in _cache_file_readers. Flagging a block
    // that was not inserted (another block already at this offset) would leave it set
    // permanently.
    stored->second->_owned_by_cached_reader = true;
}
171
172
7.17k
CachedRemoteFileReader::~CachedRemoteFileReader() {
    // Clear the ownership flag on every block still held in the direct-read map.
    for (const auto& entry : _cache_file_readers) {
        entry.second->_owned_by_cached_reader = false;
    }
    // Destructors cannot report errors; the close status is intentionally dropped.
    static_cast<void>(close());
}
178
179
8.19k
// Closing only delegates to the wrapped remote reader; the cache state is not touched.
Status CachedRemoteFileReader::close() {
    auto status = _remote_file_reader->close();
    return status;
}
182
183
std::pair<size_t, size_t> CachedRemoteFileReader::s_align_size(size_t offset, size_t read_size,
184
718k
                                                               size_t length) {
185
718k
    size_t left = offset;
186
718k
    size_t right = offset + read_size - 1;
187
718k
    size_t align_left =
188
718k
            (left / config::file_cache_each_block_size) * config::file_cache_each_block_size;
189
718k
    size_t align_right =
190
718k
            (right / config::file_cache_each_block_size + 1) * config::file_cache_each_block_size;
191
718k
    align_right = align_right < length ? align_right : length;
192
718k
    size_t align_size = align_right - align_left;
193
718k
    if (align_size < config::file_cache_each_block_size && align_left != 0) {
194
3
        align_size += config::file_cache_each_block_size;
195
3
        align_left -= config::file_cache_each_block_size;
196
3
    }
197
718k
    return std::make_pair(align_left, align_size);
198
718k
}
199
200
namespace {
201
// Per-block layout of a peer fetch, produced by build_peer_fetch_layout().
struct PeerFetchLayout {
    std::vector<size_t> block_offsets; // cumulative start offset of each block
    std::vector<size_t> block_sizes;   // size of each block, clipped to the file length
    size_t total_size{0};              // sum of block_sizes
};
206
207
9
bool is_fill_not_found(const Status& st, bool request_fill) {
208
9
    return request_fill && st.is<ErrorCode::NOT_FOUND>();
209
9
}
210
211
48
bool contains_file_block(const PeerFetchedBlockSet& fetched_blocks, const FileBlockSPtr& block) {
212
48
    return fetched_blocks.contains(block.get());
213
48
}
214
215
5.24k
// Number of bytes of `range` that lie inside a file of `file_size` bytes.
// Returns 0 when the range starts at or beyond the end of the file.
size_t clip_peer_block_size(const FileBlock::Range& range, size_t file_size) {
    return range.left < file_size ? std::min(file_size - range.left, range.size()) : 0;
}
221
222
// Compute, for each block in order, its file-clipped size and the running offset at
// which it starts, plus the grand total.
PeerFetchLayout build_peer_fetch_layout(const std::vector<FileBlockSPtr>& blocks,
                                        size_t file_size) {
    PeerFetchLayout result;
    const size_t count = blocks.size();
    result.block_offsets.reserve(count);
    result.block_sizes.reserve(count);

    size_t cursor = 0;
    for (size_t i = 0; i < count; ++i) {
        const size_t clipped = clip_peer_block_size(blocks[i]->range(), file_size);
        result.block_offsets.push_back(cursor);
        result.block_sizes.push_back(clipped);
        cursor += clipped;
    }
    result.total_size = cursor;
    return result;
}
235
236
// Concatenate the chunk payloads in ascending block_offset order and append them to
// `block` via append_iobuf().
// - `chunks` is sorted in place.
// - *block_size is set to the total payload length (0 when `chunks` is empty).
// - Returns InvalidArgument if block_size is null.
Status write_peer_payloads_into_block(const FileBlockSPtr& block,
                                      std::vector<const PeerFetchChunk*>& chunks,
                                      size_t* block_size) {
    if (!block_size) {
        return Status::InvalidArgument("peer block write requires non-null block_size");
    }
    *block_size = 0;
    if (chunks.empty()) {
        return Status::OK();
    }

    auto by_offset = [](const PeerFetchChunk* a, const PeerFetchChunk* b) {
        return a->block_offset < b->block_offset;
    };
    std::sort(chunks.begin(), chunks.end(), by_offset);

    butil::IOBuf merged;
    size_t total = 0;
    for (const PeerFetchChunk* piece : chunks) {
        total += piece->payload.length();
        merged.append(piece->payload);
    }
    *block_size = total;
    DCHECK(*block_size != 0);
    return block->append_iobuf(merged);
}
258
259
void copy_peer_chunk_to_result(const PeerFetchChunk& chunk, size_t offset, size_t right_offset,
260
                               size_t already_read, Slice result, size_t& indirect_read_bytes,
261
37
                               SourceReadBreakdown& source_read_breakdown) {
262
37
    const size_t payload_size = chunk.payload.length();
263
37
    if (payload_size == 0) {
264
0
        return;
265
0
    }
266
37
    const size_t chunk_left = chunk.block_offset;
267
37
    const size_t chunk_right = chunk_left + payload_size - 1;
268
37
    const size_t copy_left_offset = std::max(offset + already_read, chunk_left);
269
37
    const size_t copy_right_offset = std::min(right_offset, chunk_right);
270
37
    if (copy_left_offset > copy_right_offset) {
271
0
        return;
272
0
    }
273
37
    const size_t copy_offset = copy_left_offset - chunk_left;
274
37
    const size_t copy_size = copy_right_offset - copy_left_offset + 1;
275
37
    char* dst = result.data + (copy_left_offset - offset);
276
37
    chunk.payload.copy_to(dst, copy_size, copy_offset);
277
37
    indirect_read_bytes += copy_size;
278
37
    source_read_breakdown.peer_bytes += copy_size;
279
37
}
280
281
// Execute peer read targeting a specific host:port.
282
Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks,
283
                         PeerFetchResult* peer_result, const std::string& file_path,
284
                         size_t file_size, bool is_doris_table, ReadStatistics& stats,
285
12
                         const IOContext* io_ctx, const std::string& host, int32_t port) {
286
12
    VLOG_DEBUG << "PeerFileCacheReader read from peer, host=" << host << ", port=" << port
287
0
               << ", file_path=" << file_path;
288
289
12
    if (host.empty() || port == 0) {
290
0
        g_failed_get_peer_addr_counter << 1;
291
0
        LOG_EVERY_N(WARNING, 100) << "PeerFileCacheReader host or port is empty"
292
0
                                  << ", host=" << host << ", port=" << port
293
0
                                  << ", file_path=" << file_path;
294
0
        return Status::InternalError<false>("host or port is empty");
295
0
    }
296
12
    SCOPED_RAW_TIMER(&stats.peer_read_timer);
297
12
    peer_read_counter << 1;
298
12
    PeerFileCacheReader peer_reader(file_path, is_doris_table, host, port);
299
    // Serial peer read: source BE has the data from rebalance warm-up; no fill needed.
300
12
    auto st = peer_reader.fetch_blocks(empty_blocks, peer_result, file_size, io_ctx,
301
12
                                       /*request_fill=*/false);
302
12
    if (!st.ok()) {
303
4
        LOG_WARNING("PeerFileCacheReader read from peer failed")
304
4
                .tag("host", host)
305
4
                .tag("port", port)
306
4
                .tag("error", st.msg());
307
4
    }
308
12
    stats.from_peer_cache = st.ok();
309
12
    return st;
310
12
}
311
312
// Execute S3 read
313
Status execute_s3_read(size_t empty_start, size_t& size, std::unique_ptr<char[]>& buffer,
314
                       ReadStatistics& stats, const IOContext* io_ctx,
315
5.15k
                       FileReaderSPtr remote_file_reader) {
316
5.15k
    s3_read_counter << 1;
317
5.15k
    SCOPED_RAW_TIMER(&stats.remote_read_timer);
318
5.15k
    stats.from_peer_cache = false;
319
5.15k
    return remote_file_reader->read_at(empty_start, Slice(buffer.get(), size), &size, io_ctx);
320
5.15k
}
321
322
18
// Shortcut to the CloudWarmUpManager held by the cloud storage engine.
CloudWarmUpManager& get_warm_up_manager() {
    auto* env = ExecEnv::GetInstance();
    return env->storage_engine().to_cloud().cloud_warm_up_manager();
}
325
326
// Shared state for peer-vs-S3 winner race.
327
// Uses bthread primitives — never std::mutex/condition_variable in bthread context.
328
struct RaceState {
329
    bthread::Mutex mtx;
330
    bthread::ConditionVariable cv;
331
    int winner = -1; // 0=peer won, 1=s3 won, -1=undecided, -2=both failed
332
    bool peer_done = false;
333
    bool s3_done = false;
334
    Status peer_status;
335
    Status s3_status;
336
    std::unique_ptr<char[]> s3_buf;
337
    PeerFetchResult peer_res;
338
    std::string peer_winner_cg_id; // compute_group_id of the winning peer candidate
339
    std::string peer_winner_host;  // host of the winning peer candidate
340
    int64_t peer_elapsed_ns = 0;   // wall-clock time of the entire peer path (including retries)
341
    int64_t peer_winner_io_ns = 0; // I/O time of the winning candidate only
342
};
343
344
// Peer race logic: try candidates sequentially until one succeeds or all fail.
345
// NOTE: Do NOT capture io_ctx here — it points into the caller's stack which may be destroyed
346
// when S3 wins the race and the caller returns before this bthread finishes.
347
void run_peer_race(std::shared_ptr<RaceState> race, std::vector<FileBlockSPtr> empty_blocks,
348
                   const std::string& file_path, size_t file_sz, bool is_doris,
349
                   std::shared_ptr<CloudWarmUpManager> manager,
350
                   std::vector<doris::PeerCandidate> candidates, int64_t tablet_id,
351
14
                   std::string resource_id, std::shared_ptr<ResourceContext> parent_resource_ctx) {
352
14
    std::unique_ptr<AttachTask> attach_task;
353
14
    if (parent_resource_ctx != nullptr) {
354
3
        attach_task = std::make_unique<AttachTask>(parent_resource_ctx);
355
3
    }
356
357
14
    bool all_tried = true;
358
14
    MonotonicStopWatch peer_sw;
359
14
    peer_sw.start();
360
361
22
    for (size_t i = 0; i < candidates.size(); ++i) {
362
        // Before issuing the next RPC, check if S3 already won.
363
17
        if (i > 0) {
364
3
            TEST_SYNC_POINT("run_peer_race::between_candidates");
365
3
            std::unique_lock<bthread::Mutex> lk(race->mtx);
366
3
            if (race->winner > 0) {
367
                // S3 already won — stop, but not all candidates were tried.
368
1
                all_tried = false;
369
1
                break;
370
1
            }
371
3
        }
372
373
16
        const auto& cand = candidates[i];
374
16
        peer_read_counter << 1;
375
16
        PeerFileCacheReader peer_reader(file_path, is_doris, cand.host, cand.brpc_port);
376
16
        PeerFetchResult local_peer_res;
377
16
        const bool request_fill =
378
16
                !config::peer_cache_fill_compute_group_id.empty() &&
379
16
                cand.compute_group_id == config::peer_cache_fill_compute_group_id &&
380
16
                !resource_id.empty() && !file_path.empty();
381
16
        MonotonicStopWatch cand_sw;
382
16
        cand_sw.start();
383
16
        auto st = peer_reader.fetch_blocks(empty_blocks, &local_peer_res, file_sz,
384
16
                                           /*ctx=*/nullptr, request_fill, tablet_id, resource_id);
385
16
        if (st.ok()) {
386
7
            manager->update_peer_candidate_on_success(tablet_id, cand.compute_group_id);
387
7
            std::unique_lock<bthread::Mutex> lk(race->mtx);
388
7
            if (race->winner < 0) {
389
7
                race->winner = 0;
390
7
                race->peer_res = std::move(local_peer_res);
391
7
                race->peer_winner_cg_id = cand.compute_group_id;
392
7
                race->peer_winner_host = cand.host;
393
7
                race->peer_elapsed_ns = peer_sw.elapsed_time();
394
7
                race->peer_winner_io_ns = cand_sw.elapsed_time();
395
7
            }
396
7
            race->peer_done = true;
397
7
            race->peer_status = Status::OK();
398
7
            race->cv.notify_all();
399
7
            return;
400
7
        }
401
402
        // Handle per-candidate failure.
403
9
        if (st.template is<ErrorCode::TOO_MANY_TASKS>()) {
404
0
            all_tried = false;
405
0
            break;
406
0
        }
407
9
        if (is_fill_not_found(st, request_fill)) {
408
            // Pull-through fill already told us this designated fill CG could not serve the block
409
            // in time. Do not serially retry additional candidates in the same race; let S3 win
410
            // instead of paying more peer RPC latency.
411
1
            manager->rotate_peer_candidate_on_cache_miss(tablet_id, cand.host, cand.brpc_port);
412
1
            all_tried = false;
413
1
            break;
414
1
        }
415
8
        if (st.template is<ErrorCode::NOT_FOUND>()) {
416
5
            manager->rotate_peer_candidate_on_cache_miss(tablet_id, cand.host, cand.brpc_port);
417
5
        } else {
418
3
            manager->update_peer_candidate_on_rpc_failure(tablet_id, cand.host, cand.brpc_port);
419
3
        }
420
8
    }
421
422
7
    if (all_tried) {
423
5
        manager->record_peer_all_miss(tablet_id);
424
5
    }
425
7
    std::unique_lock<bthread::Mutex> lk(race->mtx);
426
7
    race->peer_done = true;
427
7
    race->peer_status = Status::InternalError<false>("peer: all candidates failed");
428
7
    if (race->winner < 0 && race->s3_done) {
429
0
        race->winner = race->s3_status.ok() ? 1 : -2;
430
0
    }
431
7
    race->cv.notify_all();
432
7
}
433
434
// Apply hedge delay, then submit S3 read to the thread pool (or run inline).
435
void launch_s3_race(std::shared_ptr<RaceState> race, size_t empty_start, size_t span_size,
436
                    const IOContext* io_ctx, FileReaderSPtr remote_reader,
437
                    std::shared_ptr<ResourceContext> parent_resource_ctx,
438
14
                    std::shared_ptr<CachedRemoteFileReader> owner) {
439
    // Raw S3 read body.
440
    // `owner` keeps the CachedRemoteFileReader alive until the S3 task finishes,
441
    // preventing close() from being called on remote_reader while we are still reading.
442
    // Do NOT capture io_ctx: it points into the caller's stack/iterator which may be
443
    // destroyed when the query is cancelled before this background task runs. The S3
444
    // leg of the race is a best-effort background task whose result is discarded if the
445
    // peer wins; passing nullptr is safe because S3FileReader::read_at_impl ignores it.
446
14
    auto do_s3_read = [race, empty_start, span_size, remote_reader, owner]() {
447
8
        (void)owner;
448
8
        auto s3_buf = std::make_unique<char[]>(span_size);
449
8
        size_t read_size = span_size;
450
8
        s3_read_counter << 1;
451
8
        TEST_SYNC_POINT("CachedRemoteFileReader::_execute_winner_race::s3_before_read");
452
8
        auto st = remote_reader->read_at(empty_start, Slice(s3_buf.get(), span_size), &read_size,
453
8
                                         nullptr);
454
8
        std::unique_lock<bthread::Mutex> lk(race->mtx);
455
8
        race->s3_done = true;
456
8
        race->s3_status = st;
457
8
        if (st.ok() && race->winner < 0) {
458
7
            race->winner = 1;
459
7
            race->s3_buf = std::move(s3_buf);
460
7
        }
461
8
        race->cv.notify_all();
462
8
    };
463
464
    // Hedge delay: give peer a head start, but wake up early if peer finishes.
465
    // Uses cv.wait_for() instead of bthread_usleep() so the calling thread is
466
    // unblocked as soon as the peer bthread signals completion, avoiding the
467
    // unconditional 20ms sleep that dominated latency on cache-miss-heavy queries.
468
14
    bool peer_already_won = false;
469
14
    if (config::peer_race_hedge_delay_ms > 0) {
470
13
        std::unique_lock<bthread::Mutex> lk(race->mtx);
471
13
        if (!race->peer_done) {
472
13
            race->cv.wait_for(lk, static_cast<long>(config::peer_race_hedge_delay_ms) * 1000);
473
13
        }
474
13
        peer_already_won = (race->winner == 0);
475
13
        if (peer_already_won) {
476
6
            race->s3_done = true;
477
6
            race->s3_status = Status::InternalError<false>("skipped: peer won during hedge delay");
478
6
        }
479
13
    }
480
481
14
    if (!peer_already_won) {
482
8
        auto s3_fn = [do_s3_read, parent_resource_ctx]() mutable {
483
7
            std::unique_ptr<AttachTask> attach_task;
484
7
            if (parent_resource_ctx != nullptr) {
485
1
                attach_task = std::make_unique<AttachTask>(parent_resource_ctx);
486
1
            }
487
7
            do_s3_read();
488
7
        };
489
8
        auto* s3_pool = ExecEnv::GetInstance()->peer_race_s3_thread_pool();
490
8
        if (s3_pool == nullptr || !s3_pool->submit_func(s3_fn).ok()) {
491
1
            do_s3_read();
492
1
        }
493
8
    }
494
14
}
495
496
// Wait for the race to finish and populate the output accordingly.
497
Status collect_race_result(std::shared_ptr<RaceState> race, size_t span_size,
498
                           std::unique_ptr<char[]>& buffer, PeerFetchResult* peer_result,
499
14
                           ReadStatistics& stats, const IOContext* io_ctx) {
500
14
    {
501
14
        std::unique_lock<bthread::Mutex> lk(race->mtx);
502
21
        while (race->winner < 0 && !(race->peer_done && race->s3_done)) {
503
7
            race->cv.wait(lk);
504
7
        }
505
14
    }
506
14
    g_active_peer_races.fetch_sub(1, std::memory_order_relaxed);
507
508
14
    const std::string self_cg_id =
509
14
            static_cast<CloudClusterInfo*>(ExecEnv::GetInstance()->cluster_info())
510
14
                    ->cloud_compute_group_id();
511
512
14
    if (race->winner == 0) {
513
        // Peer won.
514
7
        if (peer_result != nullptr) {
515
7
            *peer_result = std::move(race->peer_res);
516
7
        }
517
7
        stats.from_peer_cache = true;
518
7
        stats.peer_read_timer += race->peer_elapsed_ns;
519
7
        g_peer_race_peer_win << 1;
520
7
        const bool is_cross_cg =
521
7
                !race->peer_winner_cg_id.empty() && race->peer_winner_cg_id != self_cg_id;
522
7
        if (is_cross_cg) {
523
5
            g_peer_cross_compute_group_read << 1;
524
5
        } else {
525
2
            g_peer_same_compute_group_read << 1;
526
2
        }
527
7
        if (io_ctx != nullptr && io_ctx->file_cache_stats != nullptr) {
528
7
            io_ctx->file_cache_stats->num_peer_race_peer_win++;
529
7
            io_ctx->file_cache_stats->peer_hosts.insert(race->peer_winner_host);
530
7
            if (is_cross_cg) {
531
5
                io_ctx->file_cache_stats->num_cross_cg_peer_io_total++;
532
5
                io_ctx->file_cache_stats->bytes_read_from_cross_cg_peer += span_size;
533
5
                io_ctx->file_cache_stats->cross_cg_peer_io_timer += race->peer_winner_io_ns;
534
5
            } else {
535
2
                io_ctx->file_cache_stats->num_same_cg_peer_io_total++;
536
2
                io_ctx->file_cache_stats->bytes_read_from_same_cg_peer += span_size;
537
2
                io_ctx->file_cache_stats->same_cg_peer_io_timer += race->peer_winner_io_ns;
538
2
            }
539
7
        }
540
7
        return Status::OK();
541
7
    } else if (race->winner == 1) {
542
        // S3 won.
543
7
        buffer = std::move(race->s3_buf);
544
7
        stats.from_peer_cache = false;
545
7
        g_peer_race_s3_win << 1;
546
7
        if (io_ctx != nullptr && io_ctx->file_cache_stats != nullptr) {
547
7
            io_ctx->file_cache_stats->num_peer_race_s3_win++;
548
7
        }
549
7
        return Status::OK();
550
7
    }
551
0
    g_peer_race_both_fail << 1;
552
0
    return Status::InternalError<false>("peer race: both peer and s3 failed");
553
14
}
554
555
} // anonymous namespace
556
557
// Serve [empty_start, empty_start + span_size) straight from S3 into a freshly allocated
// `buffer`. Any peer result is cleared, since the data on this path comes from S3.
Status CachedRemoteFileReader::_execute_s3_fallback(size_t empty_start, size_t span_size,
                                                    std::unique_ptr<char[]>& buffer,
                                                    PeerFetchResult* peer_result,
                                                    ReadStatistics& stats,
                                                    const IOContext* io_ctx) {
    if (peer_result) {
        peer_result->clear();
    }
    // new char[] (rather than make_unique<char[]>) leaves the bytes uninitialised;
    // the S3 read writes into them.
    buffer.reset(new char[span_size]);
    size_t bytes = span_size;
    return execute_s3_read(empty_start, bytes, buffer, stats, io_ctx, _remote_file_reader);
}
569
570
// Non-racing peer read: ask only the first candidate, and fall back to S3 on failure.
// - Candidate bookkeeping (success / cache miss / RPC failure) goes through the warm-up
//   manager, like the race path.
// - On success, cross- vs same-compute-group statistics are recorded.
Status CachedRemoteFileReader::_execute_sequential_peer_read(
        const std::vector<FileBlockSPtr>& empty_blocks, size_t empty_start, size_t span_size,
        std::unique_ptr<char[]>& buffer, PeerFetchResult* peer_result, ReadStatistics& stats,
        const IOContext* io_ctx, const std::vector<doris::PeerCandidate>& candidates,
        int64_t tablet_id) {
    if (candidates.empty()) {
        return _execute_s3_fallback(empty_start, span_size, buffer, peer_result, stats, io_ctx);
    }

    // candidates.front() already reflects last_successful_compute_group_id affinity:
    // get_peer_candidates() applies stable_partition before returning.
    const doris::PeerCandidate& primary = candidates.front();
    auto& manager = get_warm_up_manager();
    PeerFetchResult fetched;
    const int64_t timer_start = stats.peer_read_timer;
    auto st = execute_peer_read(empty_blocks, &fetched, path().native(), this->size(),
                                _is_doris_table, stats, io_ctx, primary.host,
                                primary.brpc_port);

    if (!st.ok()) {
        // Keep affinity / eviction bookkeeping consistent with the race path.
        // TOO_MANY_TASKS: server healthy but overloaded — don't penalize candidate.
        if (!st.is<ErrorCode::TOO_MANY_TASKS>()) {
            if (st.is<ErrorCode::NOT_FOUND>()) {
                manager.rotate_peer_candidate_on_cache_miss(tablet_id, primary.host,
                                                            primary.brpc_port);
            } else {
                manager.update_peer_candidate_on_rpc_failure(tablet_id, primary.host,
                                                             primary.brpc_port);
            }
        }
        return _execute_s3_fallback(empty_start, span_size, buffer, peer_result, stats, io_ctx);
    }

    manager.update_peer_candidate_on_success(tablet_id, primary.compute_group_id);
    if (peer_result != nullptr) {
        *peer_result = std::move(fetched);
    }

    const std::string self_cg_id =
            static_cast<CloudClusterInfo*>(ExecEnv::GetInstance()->cluster_info())
                    ->cloud_compute_group_id();
    const bool is_cross_cg =
            !primary.compute_group_id.empty() && primary.compute_group_id != self_cg_id;
    (is_cross_cg ? g_peer_cross_compute_group_read : g_peer_same_compute_group_read) << 1;

    if (io_ctx != nullptr && io_ctx->file_cache_stats != nullptr) {
        auto& cache_stats = *io_ctx->file_cache_stats;
        const auto peer_io_ns = stats.peer_read_timer - timer_start;
        cache_stats.peer_hosts.insert(primary.host);
        if (is_cross_cg) {
            cache_stats.num_cross_cg_peer_io_total++;
            cache_stats.bytes_read_from_cross_cg_peer += span_size;
            cache_stats.cross_cg_peer_io_timer += peer_io_ns;
        } else {
            cache_stats.num_same_cg_peer_io_total++;
            cache_stats.bytes_read_from_same_cg_peer += span_size;
            cache_stats.same_cg_peer_io_timer += peer_io_ns;
        }
    }
    return st;
}
631
632
Status CachedRemoteFileReader::_execute_remote_read(const std::vector<FileBlockSPtr>& empty_blocks,
633
                                                    size_t empty_start, size_t span_size,
634
                                                    std::unique_ptr<char[]>& buffer,
635
                                                    PeerFetchResult* peer_result,
636
                                                    ReadStatistics& stats,
637
5.18k
                                                    const IOContext* io_ctx) {
638
    // --- Non-peer path: direct S3 ---
639
5.18k
    if (!_should_read_from_peer(io_ctx)) {
640
5.15k
        return _execute_s3_fallback(empty_start, span_size, buffer, peer_result, stats, io_ctx);
641
5.15k
    }
642
643
    // --- UT debug point: injected peer address ---
644
28
    DBUG_EXECUTE_IF("PeerFileCacheReader::_fetch_from_peer_cache_blocks", {
645
28
        std::string dp_host = dp->param<std::string>("host", "127.0.0.1");
646
28
        int32_t dp_port = dp->param("port", 9060);
647
28
        buffer.reset();
648
28
        DCHECK(peer_result != nullptr);
649
28
        peer_result->clear();
650
28
        auto st = execute_peer_read(empty_blocks, peer_result, path().native(), this->size(),
651
28
                                    _is_doris_table, stats, io_ctx, dp_host, dp_port);
652
28
        if (st.ok()) return st;
653
28
        return _execute_s3_fallback(empty_start, span_size, buffer, peer_result, stats, io_ctx);
654
28
    });
655
656
    // --- Resolve tablet and obtain peer candidates ---
657
17
    int64_t tablet_id = _tablet_id;
658
17
    auto& manager = get_warm_up_manager();
659
17
    auto candidates = manager.get_peer_candidates(tablet_id);
660
17
    if (candidates.empty()) {
661
2
        if (!manager.is_peer_cooldown(tablet_id)) {
662
            // Cold miss: trigger background FE fetch and fall back to S3.
663
1
            g_peer_lazy_fetch_triggered << 1;
664
1
            auto manager_ptr =
665
1
                    ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager_ptr();
666
1
            start_bthread([manager_ptr = std::move(manager_ptr), tablet_id]() {
667
1
                manager_ptr->fetch_candidates_from_fe(tablet_id);
668
1
            });
669
1
        }
670
2
        return _execute_s3_fallback(empty_start, span_size, buffer, peer_result, stats, io_ctx);
671
2
    }
672
673
    // --- Dispatch: concurrent race or sequential fallback ---
674
    // Candidates are already sorted by last_successful_compute_group_id affinity
675
    // (stable_partition in get_peer_candidates), so the winner race peer bthread
676
    // naturally tries the most promising candidate first — whether same-CG or cross-CG.
677
15
    if (config::enable_peer_s3_race) {
678
14
        return _execute_winner_race(empty_blocks, empty_start, span_size, buffer, peer_result,
679
14
                                    stats, io_ctx, candidates, tablet_id);
680
14
    }
681
1
    return _execute_sequential_peer_read(empty_blocks, empty_start, span_size, buffer, peer_result,
682
1
                                         stats, io_ctx, candidates, tablet_id);
683
15
}
684
685
Status CachedRemoteFileReader::_execute_winner_race(
686
        const std::vector<FileBlockSPtr>& empty_blocks, size_t empty_start, size_t span_size,
687
        std::unique_ptr<char[]>& buffer, PeerFetchResult* peer_result, ReadStatistics& stats,
688
        const IOContext* io_ctx, const std::vector<doris::PeerCandidate>& candidates,
689
14
        int64_t tablet_id) {
690
    // Reserve a race slot; degrade to sequential if at limit.
691
14
    if (g_active_peer_races.fetch_add(1, std::memory_order_relaxed) >=
692
14
        config::max_concurrent_peer_races) {
693
0
        g_active_peer_races.fetch_sub(1, std::memory_order_relaxed);
694
0
        return _execute_sequential_peer_read(empty_blocks, empty_start, span_size, buffer,
695
0
                                             peer_result, stats, io_ctx, candidates, tablet_id);
696
0
    }
697
698
14
    auto race = std::make_shared<RaceState>();
699
14
    auto manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager_ptr();
700
701
    // Capture context for child threads.
702
14
    const std::string file_path = path().native();
703
14
    const size_t file_sz = this->size();
704
14
    const bool is_doris = _is_doris_table;
705
14
    auto remote_reader = _remote_file_reader;
706
14
    std::shared_ptr<ResourceContext> parent_resource_ctx;
707
14
    auto* parent_thread_context = thread_context();
708
14
    if (parent_thread_context != nullptr && parent_thread_context->is_attach_task()) {
709
3
        parent_resource_ctx = parent_thread_context->resource_ctx();
710
3
    }
711
712
    // Launch peer bthread.
713
14
    start_bthread(
714
14
            [race, empty_blocks = std::move(empty_blocks), file_path, file_sz, is_doris,
715
14
             manager = std::move(manager), candidates = std::move(candidates), tablet_id,
716
14
             resource_id = _storage_resource_id, parent_resource_ctx]() mutable {
717
14
                run_peer_race(race, std::move(empty_blocks), file_path, file_sz, is_doris,
718
14
                              std::move(manager), std::move(candidates), tablet_id,
719
14
                              std::move(resource_id), parent_resource_ctx);
720
14
            },
721
14
            /*init_thread_ctx=*/true);
722
723
    // Launch S3 (with optional hedge delay).
724
    // Pass shared_from_this() so the background S3 task holds a reference to this
725
    // reader, preventing destruction (and close()) until the S3 task completes.
726
14
    launch_s3_race(race, empty_start, span_size, io_ctx, remote_reader, parent_resource_ctx,
727
14
                   shared_from_this());
728
729
    // Collect race result.
730
14
    return collect_race_result(race, span_size, buffer, peer_result, stats, io_ctx);
731
14
}
732
733
bool CachedRemoteFileReader::_try_read_from_cached_files_directly(
734
        size_t offset, Slice result, size_t bytes_req, bool is_dryrun, ReadStatistics& stats,
735
718k
        SourceReadBreakdown& source_read_breakdown, size_t& already_read, size_t* bytes_read) {
736
718k
    if (!_can_read_cache_file_directly()) {
737
717k
        return false;
738
717k
    }
739
740
1.05k
    SCOPED_RAW_TIMER(&stats.read_cache_file_directly_timer);
741
1.05k
    size_t need_read_size = bytes_req;
742
1.05k
    std::shared_lock lock(_mtx);
743
1.05k
    if (_cache_file_readers.empty()) {
744
14
        return false;
745
14
    }
746
747
1.04k
    auto iter = _cache_file_readers.upper_bound(offset);
748
1.04k
    if (iter != _cache_file_readers.begin()) {
749
1.01k
        --iter;
750
1.01k
    }
751
752
1.04k
    size_t current_offset = offset;
753
2.48k
    while (need_read_size != 0 && iter != _cache_file_readers.end()) {
754
1.44k
        if (iter->second->offset() > current_offset ||
755
1.44k
            iter->second->range().right < current_offset) {
756
6
            break;
757
6
        }
758
759
1.43k
        size_t file_offset = current_offset - iter->second->offset();
760
1.43k
        size_t reserve_bytes = std::min(need_read_size, iter->second->range().size() - file_offset);
761
1.43k
        if (is_dryrun) [[unlikely]] {
762
1
            g_skip_local_cache_io_sum_bytes << reserve_bytes;
763
1.43k
        } else {
764
1.43k
            SCOPED_RAW_TIMER(&stats.local_read_timer);
765
1.43k
            if (!iter->second
766
1.43k
                         ->read(Slice(result.data + (current_offset - offset), reserve_bytes),
767
1.43k
                                file_offset)
768
1.43k
                         .ok()) { // TODO: maybe read failed because block evict, should handle error
769
0
                break;
770
0
            }
771
1.43k
            source_read_breakdown.local_bytes += reserve_bytes;
772
1.43k
        }
773
774
1.43k
        _cache->add_need_update_lru_block(iter->second);
775
1.43k
        need_read_size -= reserve_bytes;
776
1.43k
        current_offset += reserve_bytes;
777
1.43k
        already_read += reserve_bytes;
778
1.43k
        ++iter;
779
1.43k
    }
780
781
1.04k
    if (need_read_size == 0) {
782
1.00k
        *bytes_read = bytes_req;
783
1.00k
        stats.hit_cache = true;
784
1.00k
        g_read_cache_direct_whole_num << 1;
785
1.00k
        g_read_cache_direct_whole_bytes << bytes_req;
786
1.00k
        return true;
787
1.00k
    }
788
789
37
    g_read_cache_direct_partial_num << 1;
790
37
    g_read_cache_direct_partial_bytes << already_read;
791
37
    return false;
792
1.04k
}
793
794
std::vector<FileBlockSPtr> CachedRemoteFileReader::_collect_remote_read_blocks(
795
717k
        const FileBlocksHolder& holder, ReadStatistics& stats) {
796
717k
    std::vector<FileBlockSPtr> empty_blocks;
797
717k
    for (auto& block : holder.file_blocks) {
798
717k
        switch (block->state()) {
799
5.24k
        case FileBlock::State::EMPTY:
800
5.24k
            VLOG_DEBUG << fmt::format("Block EMPTY path={} hash={}:{}:{} offset={} cache_path={}",
801
0
                                      path().native(), _cache_hash.to_string(), _cache_hash.high(),
802
0
                                      _cache_hash.low(), block->offset(), block->get_cache_file());
803
5.24k
            block->get_or_set_downloader();
804
5.24k
            if (block->is_downloader()) {
805
5.24k
                empty_blocks.push_back(block);
806
5.24k
                TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::EMPTY");
807
5.24k
            }
808
5.24k
            stats.hit_cache = false;
809
5.24k
            break;
810
3
        case FileBlock::State::SKIP_CACHE:
811
3
            VLOG_DEBUG << fmt::format(
812
0
                    "Block SKIP_CACHE path={} hash={}:{}:{} offset={} cache_path={}",
813
0
                    path().native(), _cache_hash.to_string(), _cache_hash.high(), _cache_hash.low(),
814
0
                    block->offset(), block->get_cache_file());
815
3
            empty_blocks.push_back(block);
816
3
            stats.hit_cache = false;
817
3
            stats.skip_cache = true;
818
3
            break;
819
2
        case FileBlock::State::DOWNLOADING:
820
2
            stats.hit_cache = false;
821
2
            break;
822
712k
        case FileBlock::State::DOWNLOADED:
823
712k
            _insert_file_reader(block);
824
712k
            break;
825
717k
        }
826
717k
    }
827
717k
    return empty_blocks;
828
717k
}
829
830
Status CachedRemoteFileReader::_read_remote_blocks_into_cache(
831
        const std::vector<FileBlockSPtr>& empty_blocks, size_t offset, size_t bytes_req,
832
        size_t already_read, Slice result, bool is_dryrun, ReadStatistics& stats,
833
        SourceReadBreakdown& source_read_breakdown, const IOContext* io_ctx,
834
        size_t& indirect_read_bytes, size_t& empty_start, size_t& empty_end,
835
717k
        PeerFetchedBlockSet& peer_fetched_blocks) {
836
717k
    empty_start = 0;
837
717k
    empty_end = 0;
838
717k
    peer_fetched_blocks.clear();
839
717k
    if (empty_blocks.empty()) {
840
712k
        return Status::OK();
841
712k
    }
842
843
5.18k
    empty_start = empty_blocks.front()->range().left;
844
5.18k
    empty_end = empty_blocks.back()->range().right;
845
5.18k
    const size_t span_read_size = empty_end - empty_start + 1;
846
5.18k
    const auto peer_fetch_layout = build_peer_fetch_layout(empty_blocks, size());
847
5.18k
    std::unique_ptr<char[]> buffer;
848
5.18k
    PeerFetchResult peer_result;
849
850
5.18k
    RETURN_IF_ERROR(_execute_remote_read(empty_blocks, empty_start, span_read_size, buffer,
851
5.18k
                                         &peer_result, stats, io_ctx));
852
853
5.18k
    std::vector<std::vector<const PeerFetchChunk*>> peer_chunks_by_block;
854
5.18k
    if (stats.from_peer_cache) {
855
        // Peer returns sparse payloads; remember the exact sparse blocks that were filled.
856
15
        peer_fetched_blocks.reserve(empty_blocks.size());
857
40
        for (const auto& block : empty_blocks) {
858
40
            peer_fetched_blocks.insert(block.get());
859
40
        }
860
15
        peer_chunks_by_block.resize(empty_blocks.size());
861
40
        for (const auto& chunk : peer_result.chunks) {
862
40
            DCHECK_LT(chunk.block_index, empty_blocks.size());
863
40
            peer_chunks_by_block[chunk.block_index].push_back(&chunk);
864
40
        }
865
15
    }
866
867
5.18k
    SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().cached_remote_reader_write_back);
868
10.4k
    for (size_t idx = 0; idx < empty_blocks.size(); ++idx) {
869
5.24k
        auto& block = empty_blocks[idx];
870
5.24k
        if (block->state() == FileBlock::State::SKIP_CACHE) {
871
3
            continue;
872
3
        }
873
874
5.24k
        SCOPED_RAW_TIMER(&stats.local_write_timer);
875
5.24k
        size_t block_size = block->range().size();
876
5.24k
        Status st;
877
5.24k
        if (stats.from_peer_cache) {
878
40
            block_size = peer_fetch_layout.block_sizes[idx];
879
40
            if (block_size == 0) {
880
0
                continue;
881
0
            }
882
40
            st = write_peer_payloads_into_block(block, peer_chunks_by_block[idx], &block_size);
883
5.20k
        } else {
884
5.20k
            char* current_ptr = buffer.get() + block->range().left - empty_start;
885
5.20k
            st = block->append(Slice(current_ptr, block_size));
886
5.20k
        }
887
5.24k
        if (st.ok()) {
888
5.24k
            st = block->finalize();
889
5.24k
        }
890
5.24k
        if (!st.ok()) {
891
2
            LOG(WARNING) << "write data to file cache failed, source="
892
2
                         << (stats.from_peer_cache ? "peer" : "remote")
893
2
                         << ", path=" << path().native() << ", tablet_id=" << _tablet_id
894
2
                         << ", file_size=" << size() << ", cache_hash=" << _cache_hash.to_string()
895
2
                         << ", write_block_size=" << block_size
896
2
                         << ", block=" << block->get_info_for_log()
897
2
                         << ", cache_file=" << block->get_cache_file() << ", err=" << st;
898
5.24k
        } else {
899
5.24k
            _insert_file_reader(block);
900
5.24k
            stats.bytes_write_into_file_cache += block_size;
901
5.24k
        }
902
5.24k
    }
903
904
5.18k
    const size_t right_offset = offset + bytes_req - 1;
905
5.18k
    if (stats.from_peer_cache) {
906
15
        if (is_dryrun) {
907
1
            return Status::OK();
908
1
        }
909
37
        for (const auto& chunk : peer_result.chunks) {
910
37
            copy_peer_chunk_to_result(chunk, offset, right_offset, already_read, result,
911
37
                                      indirect_read_bytes, source_read_breakdown);
912
37
        }
913
14
        return Status::OK();
914
15
    }
915
916
5.16k
    if (empty_start <= right_offset && empty_end >= offset + already_read && !is_dryrun) {
917
5.16k
        size_t copy_left_offset = std::max(offset + already_read, empty_start);
918
5.16k
        size_t copy_right_offset = std::min(right_offset, empty_end);
919
5.16k
        char* dst = result.data + (copy_left_offset - offset);
920
5.16k
        char* src = buffer.get() + (copy_left_offset - empty_start);
921
5.16k
        size_t copy_size = copy_right_offset - copy_left_offset + 1;
922
5.16k
        memcpy(dst, src, copy_size);
923
5.16k
        indirect_read_bytes += copy_size;
924
5.16k
        source_read_breakdown.remote_bytes += copy_size;
925
5.16k
    }
926
5.16k
    return Status::OK();
927
5.18k
}
928
929
Status CachedRemoteFileReader::_read_remaining_blocks_from_cache(
930
        const FileBlocksHolder& holder, size_t offset, size_t bytes_req, Slice result,
931
        bool is_dryrun, size_t empty_start, size_t empty_end,
932
        const PeerFetchedBlockSet& peer_fetched_blocks, ReadStatistics& stats,
933
        SourceReadBreakdown& source_read_breakdown, size_t& indirect_read_bytes, size_t* bytes_read,
934
717k
        const IOContext* io_ctx) {
935
717k
    size_t current_offset = offset + *bytes_read;
936
717k
    size_t end_offset = offset + bytes_req - 1;
937
717k
    bool need_self_heal = false;
938
717k
    for (auto& block : holder.file_blocks) {
939
717k
        if (current_offset > end_offset) {
940
0
            break;
941
0
        }
942
943
717k
        size_t left = block->range().left;
944
717k
        size_t right = block->range().right;
945
717k
        if (right < offset) {
946
1
            continue;
947
1
        }
948
949
717k
        size_t read_size =
950
717k
                end_offset > right ? right - current_offset + 1 : end_offset - current_offset + 1;
951
717k
        if (!peer_fetched_blocks.empty() && contains_file_block(peer_fetched_blocks, block)) {
952
            // For sparse peer reads, skip only blocks fetched from peer. Other blocks inside the
953
            // enclosing span may still come from local cache.
954
41
            *bytes_read += read_size;
955
41
            current_offset = right + 1;
956
41
            continue;
957
41
        }
958
717k
        if (peer_fetched_blocks.empty() && empty_start <= left && right <= empty_end) {
959
5.20k
            *bytes_read += read_size;
960
5.20k
            current_offset = right + 1;
961
5.20k
            continue;
962
5.20k
        }
963
964
712k
        FileBlock::State block_state = block->state();
965
712k
        int64_t wait_time = 0;
966
712k
        static int64_t max_wait_time = 10;
967
712k
        TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::max_wait_time", &max_wait_time);
968
712k
        if (block_state != FileBlock::State::DOWNLOADED) {
969
3
            SCOPED_CONCURRENCY_COUNT(
970
3
                    ConcurrencyStatsManager::instance().cached_remote_reader_blocking);
971
4
            do {
972
4
                SCOPED_RAW_TIMER(&stats.remote_wait_timer);
973
4
                TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::DOWNLOADING");
974
4
                block_state = block->wait();
975
4
                if (block_state != FileBlock::State::DOWNLOADING) {
976
2
                    break;
977
2
                }
978
4
            } while (++wait_time < max_wait_time);
979
3
        }
980
712k
        if (wait_time == max_wait_time) [[unlikely]] {
981
1
            LOG_WARNING("Waiting too long for the download to complete");
982
1
        }
983
984
712k
        Status st;
985
        /*
986
         * If block_state == EMPTY, the thread reads the data from remote.
987
         * If block_state == DOWNLOADED, when the cache file is deleted by the other process,
988
         * the thread reads the data from remote too.
989
         */
990
712k
        if (block_state == FileBlock::State::DOWNLOADED) {
991
712k
            if (is_dryrun) [[unlikely]] {
992
1
                g_skip_local_cache_io_sum_bytes << read_size;
993
712k
            } else {
994
712k
                size_t file_offset = current_offset - left;
995
712k
                SCOPED_RAW_TIMER(&stats.local_read_timer);
996
712k
                SCOPED_CONCURRENCY_COUNT(
997
712k
                        ConcurrencyStatsManager::instance().cached_remote_reader_local_read);
998
712k
                st = block->read(Slice(result.data + (current_offset - offset), read_size),
999
712k
                                 file_offset);
1000
712k
                indirect_read_bytes += read_size;
1001
712k
                if (st.ok()) {
1002
712k
                    source_read_breakdown.local_bytes += read_size;
1003
712k
                }
1004
712k
            }
1005
712k
            if (block_state == FileBlock::State::DOWNLOADED && st.is<ErrorCode::NOT_FOUND>()) {
1006
1
                need_self_heal = true;
1007
1
                g_read_cache_self_heal_on_not_found << 1;
1008
1
                LOG_EVERY_N(WARNING, 100)
1009
1
                        << "Cache block file is missing, will self-heal by clearing cache hash. "
1010
1
                        << "path=" << path().native() << ", hash=" << _cache_hash.to_string()
1011
1
                        << ", offset=" << left << ", err=" << st.msg();
1012
1
            }
1013
712k
        }
1014
712k
        if (!st || block_state != FileBlock::State::DOWNLOADED) {
1015
4
            if (is_dryrun) [[unlikely]] {
1016
0
                *bytes_read += read_size;
1017
0
                current_offset = right + 1;
1018
0
                continue;
1019
0
            }
1020
4
            LOG(WARNING) << "Read data failed from file cache downloaded by others. err="
1021
4
                         << st.msg() << ", block state=" << block_state;
1022
4
            size_t remote_bytes_read {0};
1023
4
            stats.hit_cache = false;
1024
4
            stats.from_peer_cache = false;
1025
4
            s3_read_counter << 1;
1026
4
            SCOPED_RAW_TIMER(&stats.remote_read_timer);
1027
4
            RETURN_IF_ERROR(_remote_file_reader->read_at(
1028
4
                    current_offset, Slice(result.data + (current_offset - offset), read_size),
1029
4
                    &remote_bytes_read, io_ctx));
1030
3
            indirect_read_bytes += read_size;
1031
3
            source_read_breakdown.remote_bytes += remote_bytes_read;
1032
3
            DCHECK(remote_bytes_read == read_size);
1033
3
        }
1034
1035
712k
        *bytes_read += read_size;
1036
712k
        current_offset = right + 1;
1037
712k
    }
1038
717k
    if (need_self_heal && _cache != nullptr) {
1039
1
        _cache->remove_if_cached_async(_cache_hash);
1040
1
    }
1041
717k
    return Status::OK();
1042
717k
}
1043
1044
Status CachedRemoteFileReader::_read_from_indirect_cache(size_t offset, Slice result,
1045
                                                         size_t bytes_req, size_t already_read,
1046
                                                         bool is_dryrun, size_t* bytes_read,
1047
                                                         ReadStatistics& stats,
1048
                                                         SourceReadBreakdown& source_read_breakdown,
1049
717k
                                                         const IOContext* io_ctx) {
1050
717k
    g_read_cache_indirect_num << 1;
1051
717k
    size_t indirect_read_bytes = 0;
1052
717k
    auto [align_left, align_size] =
1053
717k
            s_align_size(offset + already_read, bytes_req - already_read, size());
1054
717k
    CacheContext cache_context(io_ctx);
1055
717k
    cache_context.stats = &stats;
1056
717k
    MonotonicStopWatch sw;
1057
717k
    sw.start();
1058
717k
    ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set->increment();
1059
717k
    FileBlocksHolder holder =
1060
717k
            _cache->get_or_set(_cache_hash, align_left, align_size, cache_context);
1061
717k
    ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set->decrement();
1062
717k
    stats.cache_get_or_set_timer += sw.elapsed_time();
1063
1064
717k
    auto empty_blocks = _collect_remote_read_blocks(holder, stats);
1065
717k
    size_t empty_start = 0;
1066
717k
    size_t empty_end = 0;
1067
717k
    PeerFetchedBlockSet peer_fetched_blocks;
1068
717k
    RETURN_IF_ERROR(_read_remote_blocks_into_cache(empty_blocks, offset, bytes_req, already_read,
1069
717k
                                                   result, is_dryrun, stats, source_read_breakdown,
1070
717k
                                                   io_ctx, indirect_read_bytes, empty_start,
1071
717k
                                                   empty_end, peer_fetched_blocks));
1072
717k
    *bytes_read = already_read;
1073
717k
    RETURN_IF_ERROR(_read_remaining_blocks_from_cache(holder, offset, bytes_req, result, is_dryrun,
1074
717k
                                                      empty_start, empty_end, peer_fetched_blocks,
1075
717k
                                                      stats, source_read_breakdown,
1076
717k
                                                      indirect_read_bytes, bytes_read, io_ctx));
1077
717k
    g_read_cache_indirect_bytes << indirect_read_bytes;
1078
717k
    g_read_cache_indirect_total_bytes << *bytes_read;
1079
717k
    DCHECK(*bytes_read == bytes_req);
1080
717k
    return Status::OK();
1081
717k
}
1082
1083
Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
1084
718k
                                            const IOContext* io_ctx) {
1085
718k
    SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().cached_remote_reader_read_at);
1086
718k
    IOContext default_io_ctx;
1087
718k
    if (io_ctx == nullptr) {
1088
1
        io_ctx = &default_io_ctx;
1089
1
    }
1090
718k
    DCHECK(io_ctx);
1091
718k
    DCHECK(!closed());
1092
1093
718k
    const bool is_dryrun = io_ctx->is_dryrun;
1094
718k
    if (offset > size()) {
1095
1
        return Status::InvalidArgument(
1096
1
                fmt::format("offset exceeds file size(offset: {}, file size: {}, path: {})", offset,
1097
1
                            size(), path().native()));
1098
1
    }
1099
1100
718k
    size_t bytes_req = std::min(result.size, size() - offset);
1101
718k
    if (UNLIKELY(bytes_req == 0)) {
1102
1
        *bytes_read = 0;
1103
1
        return Status::OK();
1104
1
    }
1105
1106
718k
    ReadStatistics stats;
1107
718k
    SourceReadBreakdown source_read_breakdown;
1108
718k
    Status read_st = Status::OK();
1109
718k
    MonotonicStopWatch read_at_sw;
1110
718k
    read_at_sw.start();
1111
718k
    stats.bytes_read += bytes_req;
1112
718k
    Defer defer {[&]() {
1113
718k
        if (config::print_stack_when_cache_miss) {
1114
0
            if (io_ctx->file_cache_stats == nullptr && !stats.hit_cache && !io_ctx->is_warmup) {
1115
0
                LOG_INFO("[verbose] {}", Status::InternalError<true>("not hit cache"));
1116
0
            }
1117
0
        }
1118
718k
        if (!stats.hit_cache && config::read_cluster_cache_opt_verbose_log) {
1119
0
            LOG_INFO(
1120
0
                    "[verbose] not hit cache, path: {}, offset: {}, size: {}, cost: {} ms, warmup: "
1121
0
                    "{}",
1122
0
                    path().native(), offset, bytes_req, read_at_sw.elapsed_time_milliseconds(),
1123
0
                    io_ctx->is_warmup);
1124
0
        }
1125
718k
        if (read_st.ok() && !is_dryrun) {
1126
            // Only successful reads contribute to query profile and file-cache metrics.
1127
718k
            const auto file_cache_read_type =
1128
718k
                    io_ctx->is_inverted_index
1129
718k
                            ? FileCacheReadType::INVERTED_INDEX
1130
718k
                            : (io_ctx->is_index_data ? FileCacheReadType::SEGMENT_FOOTER_INDEX
1131
718k
                                                     : FileCacheReadType::DATA);
1132
718k
            if (io_ctx->file_cache_stats) {
1133
709k
                _update_stats(stats, source_read_breakdown, io_ctx->file_cache_stats,
1134
709k
                              file_cache_read_type);
1135
709k
            }
1136
718k
            if (!io_ctx->is_warmup) {
1137
718k
                FileCacheStatistics fcache_stats_increment;
1138
718k
                _update_stats(stats, source_read_breakdown, &fcache_stats_increment,
1139
718k
                              file_cache_read_type);
1140
718k
                io::FileCacheMetrics::instance().update(&fcache_stats_increment);
1141
718k
            }
1142
718k
        }
1143
718k
    }};
1144
1145
718k
    size_t already_read = 0;
1146
718k
    if (_try_read_from_cached_files_directly(offset, result, bytes_req, is_dryrun, stats,
1147
718k
                                             source_read_breakdown, already_read, bytes_read)) {
1148
1.00k
        return Status::OK();
1149
1.00k
    }
1150
1151
717k
    read_st = _read_from_indirect_cache(offset, result, bytes_req, already_read, is_dryrun,
1152
717k
                                        bytes_read, stats, source_read_breakdown, io_ctx);
1153
717k
    return read_st;
1154
718k
}
1155
1156
0
void CachedRemoteFileReader::prefetch_range(size_t offset, size_t size, const IOContext* io_ctx) {
1157
0
    if (offset >= this->size() || size == 0) {
1158
0
        return;
1159
0
    }
1160
1161
0
    size = std::min(size, this->size() - offset);
1162
1163
0
    ThreadPool* pool = ExecEnv::GetInstance()->segment_prefetch_thread_pool();
1164
0
    if (pool == nullptr) {
1165
0
        return;
1166
0
    }
1167
1168
0
    IOContext dryrun_ctx;
1169
0
    if (io_ctx != nullptr) {
1170
0
        dryrun_ctx = *io_ctx;
1171
0
    }
1172
0
    dryrun_ctx.is_dryrun = true;
1173
0
    dryrun_ctx.query_id = nullptr;
1174
0
    dryrun_ctx.file_cache_stats = nullptr;
1175
0
    dryrun_ctx.file_reader_stats = nullptr;
1176
1177
0
    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log)
1178
0
            << fmt::format("[verbose] Submitting prefetch task for offset={} size={}, file={}",
1179
0
                           offset, size, path().filename().native());
1180
0
    std::weak_ptr<CachedRemoteFileReader> weak_this = shared_from_this();
1181
0
    auto st = pool->submit_func([weak_this, offset, size, dryrun_ctx]() {
1182
0
        auto self = weak_this.lock();
1183
0
        if (self == nullptr) {
1184
0
            return;
1185
0
        }
1186
0
        size_t bytes_read = 0;
1187
0
        Slice dummy_buffer((char*)nullptr, size);
1188
0
        (void)self->read_at_impl(offset, dummy_buffer, &bytes_read, &dryrun_ctx);
1189
0
        LOG_IF(INFO, config::enable_segment_prefetch_verbose_log)
1190
0
                << fmt::format("[verbose] Prefetch task completed for offset={} size={}, file={}",
1191
0
                               offset, size, self->path().filename().native());
1192
0
    });
1193
1194
0
    if (!st.ok()) {
1195
0
        VLOG_DEBUG << "Failed to submit prefetch task for offset=" << offset << " size=" << size
1196
0
                   << " error=" << st.to_string();
1197
0
    }
1198
0
}
1199
1200
void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats,
1201
                                           const SourceReadBreakdown& source_read_breakdown,
1202
                                           FileCacheStatistics* statis,
1203
1.42M
                                           FileCacheReadType read_type) const {
1204
1.42M
    if (statis == nullptr) {
1205
0
        return;
1206
0
    }
1207
1.42M
    const bool has_source_bytes = source_read_breakdown.local_bytes != 0 ||
1208
1.42M
                                  source_read_breakdown.remote_bytes != 0 ||
1209
1.42M
                                  source_read_breakdown.peer_bytes != 0;
1210
1.42M
    if (has_source_bytes) {
1211
1.42M
        if (source_read_breakdown.local_bytes != 0) {
1212
1.41M
            statis->num_local_io_total++;
1213
1.41M
            statis->bytes_read_from_local += source_read_breakdown.local_bytes;
1214
1.41M
        }
1215
1.42M
        if (source_read_breakdown.peer_bytes != 0 || read_stats.from_peer_cache) {
1216
            // Count peer IO whenever peer was used, even if its fetched blocks were entirely
1217
            // outside the copy range (e.g., backward-aligned prefetch block before
1218
            // offset+already_read).  In that case peer_bytes==0 but the peer RPC did happen
1219
            // and wrote data into the local file cache.
1220
28
            statis->num_peer_io_total++;
1221
28
            statis->bytes_read_from_peer += source_read_breakdown.peer_bytes;
1222
28
            statis->peer_io_timer += read_stats.peer_read_timer;
1223
28
        }
1224
1.42M
        if (source_read_breakdown.remote_bytes != 0) {
1225
10.3k
            statis->num_remote_io_total++;
1226
10.3k
            statis->bytes_read_from_remote += source_read_breakdown.remote_bytes;
1227
10.3k
            statis->remote_io_timer += read_stats.remote_read_timer;
1228
10.3k
        }
1229
18.4E
    } else if (read_stats.hit_cache) {
1230
0
        statis->num_local_io_total++;
1231
0
        statis->bytes_read_from_local += read_stats.bytes_read;
1232
18.4E
    } else if (read_stats.from_peer_cache) {
1233
0
        statis->num_peer_io_total++;
1234
0
        statis->bytes_read_from_peer += read_stats.bytes_read;
1235
0
        statis->peer_io_timer += read_stats.peer_read_timer;
1236
18.4E
    } else {
1237
18.4E
        statis->num_remote_io_total++;
1238
18.4E
        statis->bytes_read_from_remote += read_stats.bytes_read;
1239
18.4E
        statis->remote_io_timer += read_stats.remote_read_timer;
1240
18.4E
    }
1241
1.42M
    statis->remote_wait_timer += read_stats.remote_wait_timer;
1242
1.42M
    statis->local_io_timer += read_stats.local_read_timer;
1243
1.42M
    statis->num_skip_cache_io_total += read_stats.skip_cache;
1244
1.42M
    statis->bytes_write_into_cache += read_stats.bytes_write_into_file_cache;
1245
1.42M
    statis->write_cache_io_timer += read_stats.local_write_timer;
1246
1247
1.42M
    statis->read_cache_file_directly_timer += read_stats.read_cache_file_directly_timer;
1248
1.42M
    statis->cache_get_or_set_timer += read_stats.cache_get_or_set_timer;
1249
1.42M
    statis->lock_wait_timer += read_stats.lock_wait_timer;
1250
1.42M
    statis->get_timer += read_stats.get_timer;
1251
1.42M
    statis->set_timer += read_stats.set_timer;
1252
1253
1.42M
    auto update_index_stats = [&](int64_t& num_local_io_total, int64_t& num_remote_io_total,
1254
1.42M
                                  int64_t& num_peer_io_total, int64_t& bytes_read_from_local,
1255
1.42M
                                  int64_t& bytes_read_from_remote, int64_t& bytes_read_from_peer,
1256
1.42M
                                  int64_t& local_io_timer, int64_t& remote_io_timer,
1257
1.42M
                                  int64_t& peer_io_timer) {
1258
2
        if (has_source_bytes) {
1259
2
            if (source_read_breakdown.local_bytes != 0) {
1260
0
                num_local_io_total++;
1261
0
                bytes_read_from_local += source_read_breakdown.local_bytes;
1262
0
            }
1263
2
            if (source_read_breakdown.peer_bytes != 0 || read_stats.from_peer_cache) {
1264
0
                num_peer_io_total++;
1265
0
                bytes_read_from_peer += source_read_breakdown.peer_bytes;
1266
0
                peer_io_timer += read_stats.peer_read_timer;
1267
0
            }
1268
2
            if (source_read_breakdown.remote_bytes != 0) {
1269
2
                num_remote_io_total++;
1270
2
                bytes_read_from_remote += source_read_breakdown.remote_bytes;
1271
2
                remote_io_timer += read_stats.remote_read_timer;
1272
2
            }
1273
2
        } else if (read_stats.hit_cache) {
1274
0
            num_local_io_total++;
1275
0
            bytes_read_from_local += read_stats.bytes_read;
1276
0
        } else if (read_stats.from_peer_cache) {
1277
0
            num_peer_io_total++;
1278
0
            bytes_read_from_peer += read_stats.bytes_read;
1279
0
            peer_io_timer += read_stats.peer_read_timer;
1280
0
        } else {
1281
0
            num_remote_io_total++;
1282
0
            bytes_read_from_remote += read_stats.bytes_read;
1283
0
            remote_io_timer += read_stats.remote_read_timer;
1284
0
        }
1285
2
        local_io_timer += read_stats.local_read_timer;
1286
2
    };
1287
1288
1.42M
    switch (read_type) {
1289
1.42M
    case FileCacheReadType::DATA:
1290
1.42M
        break;
1291
0
    case FileCacheReadType::INVERTED_INDEX:
1292
0
        update_index_stats(
1293
0
                statis->inverted_index_num_local_io_total,
1294
0
                statis->inverted_index_num_remote_io_total,
1295
0
                statis->inverted_index_num_peer_io_total,
1296
0
                statis->inverted_index_bytes_read_from_local,
1297
0
                statis->inverted_index_bytes_read_from_remote,
1298
0
                statis->inverted_index_bytes_read_from_peer, statis->inverted_index_local_io_timer,
1299
0
                statis->inverted_index_remote_io_timer, statis->inverted_index_peer_io_timer);
1300
0
        break;
1301
2
    case FileCacheReadType::SEGMENT_FOOTER_INDEX:
1302
2
        update_index_stats(statis->segment_footer_index_num_local_io_total,
1303
2
                           statis->segment_footer_index_num_remote_io_total,
1304
2
                           statis->segment_footer_index_num_peer_io_total,
1305
2
                           statis->segment_footer_index_bytes_read_from_local,
1306
2
                           statis->segment_footer_index_bytes_read_from_remote,
1307
2
                           statis->segment_footer_index_bytes_read_from_peer,
1308
2
                           statis->segment_footer_index_local_io_timer,
1309
2
                           statis->segment_footer_index_remote_io_timer,
1310
2
                           statis->segment_footer_index_peer_io_timer);
1311
2
        break;
1312
1.42M
    }
1313
1314
1.42M
    g_skip_cache_sum << read_stats.skip_cache;
1315
1.42M
}
1316
1317
} // namespace doris::io