Coverage Report

Created: 2026-07-02 13:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/cache/cached_remote_file_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/cache/cached_remote_file_reader.h"
19
20
#include <brpc/controller.h>
21
#include <fmt/format.h>
22
#include <gen_cpp/Types_types.h>
23
#include <gen_cpp/internal_service.pb.h>
24
#include <glog/logging.h>
25
26
#include <algorithm>
27
#include <atomic>
28
#include <condition_variable>
29
#include <cstring>
30
#include <functional>
31
#include <list>
32
#include <memory>
33
#include <mutex>
34
#include <thread>
35
#include <vector>
36
37
#include "cloud/cloud_warm_up_manager.h"
38
#include "cloud/config.h"
39
#include "common/compiler_util.h" // IWYU pragma: keep
40
#include "common/config.h"
41
#include "cpp/sync_point.h"
42
#include "cpp/token_bucket_rate_limiter.h"
43
#include "io/cache/block_file_cache.h"
44
#include "io/cache/block_file_cache_factory.h"
45
#include "io/cache/block_file_cache_profile.h"
46
#include "io/cache/file_block.h"
47
#include "io/cache/file_cache_common.h"
48
#include "io/cache/peer_file_cache_reader.h"
49
#include "io/fs/file_reader.h"
50
#include "io/fs/local_file_system.h"
51
#include "io/io_common.h"
52
#include "runtime/exec_env.h"
53
#include "runtime/runtime_profile.h"
54
#include "runtime/thread_context.h"
55
#include "runtime/workload_management/io_throttle.h"
56
#include "service/backend_options.h"
57
#include "util/bit_util.h"
58
#include "util/brpc_client_cache.h" // BrpcClientCache
59
#include "util/client_cache.h"
60
#include "util/concurrency_stats.h"
61
#include "util/debug_points.h"
62
63
namespace doris::io {
64
65
// ---------------------------------------------------------------------------------------
// Process-wide bvar metrics for CachedRemoteFileReader.
// Definition order is preserved on purpose: the 1-minute windows below refer to adders
// that are defined above them.
// ---------------------------------------------------------------------------------------

// One tick per read issued to the remote reader (S3) / to a peer backend.
bvar::Adder<uint64_t> s3_read_counter("cached_remote_reader_s3_read");
bvar::Adder<uint64_t> peer_read_counter("cached_remote_reader_peer_read");

// Skip-cache accounting (not referenced by the read path shown in this file section).
bvar::LatencyRecorder g_skip_cache_num("cached_remote_reader_skip_cache_num");
bvar::Adder<uint64_t> g_skip_cache_sum("cached_remote_reader_skip_cache_sum");

// Bytes that would have been read from local cache files but were skipped in dry-run mode.
bvar::Adder<uint64_t> g_skip_local_cache_io_sum_bytes(
        "cached_remote_reader_skip_local_cache_io_sum_bytes");

// Direct path: reads served from the reader's own map of downloaded blocks.
// "whole" = the request was fully satisfied, "partial" = the indirect path had to finish it.
bvar::Adder<uint64_t> g_read_cache_direct_whole_num("cached_remote_reader_cache_direct_whole_num");
bvar::Adder<uint64_t> g_read_cache_direct_partial_num(
        "cached_remote_reader_cache_direct_partial_num");

// Indirect path: reads that go through BlockFileCache::get_or_set.
bvar::Adder<uint64_t> g_read_cache_indirect_num("cached_remote_reader_cache_indirect_num");

// Byte counters for the direct path.
bvar::Adder<uint64_t> g_read_cache_direct_whole_bytes(
        "cached_remote_reader_cache_direct_whole_bytes");
bvar::Adder<uint64_t> g_read_cache_direct_partial_bytes(
        "cached_remote_reader_cache_direct_partial_bytes");

// Byte counters for the indirect path: bytes actually copied/read vs. total bytes returned.
bvar::Adder<uint64_t> g_read_cache_indirect_bytes("cached_remote_reader_cache_indirect_bytes");
bvar::Adder<uint64_t> g_read_cache_indirect_total_bytes(
        "cached_remote_reader_cache_indirect_total_bytes");

// Times a DOWNLOADED block's file was found missing (NOT_FOUND) and self-heal was triggered.
bvar::Adder<uint64_t> g_read_cache_self_heal_on_not_found(
        "cached_remote_reader_self_heal_on_not_found");

// 60-second sliding windows over the two indirect byte counters above.
bvar::Window<bvar::Adder<uint64_t>> g_read_cache_indirect_bytes_1min_window(
        "cached_remote_reader_indirect_bytes_1min_window", &g_read_cache_indirect_bytes, 60);
bvar::Window<bvar::Adder<uint64_t>> g_read_cache_indirect_total_bytes_1min_window(
        "cached_remote_reader_indirect_total_bytes_1min_window", &g_read_cache_indirect_total_bytes,
        60);

// Peer reads abandoned because no peer host/port could be resolved.
bvar::Adder<uint64_t> g_failed_get_peer_addr_counter(
        "cached_remote_reader_failed_get_peer_addr_counter");
91
92
// Wraps `remote_file_reader` with a block file cache and picks the cache instance.
//
// Doris tables key the cache by the file name only. When
// config::enable_read_cache_file_directly is on, the blocks already cached under that key are
// loaded into _cache_file_readers up front so that reads can be served directly.
//
// Other (external) tables key the cache by "<path>:<mtime>". They honour
// opts.cache_base_path (the session variable file_cache_base_path) when it is set. If no cache
// exists at that path, they fall back to choosing a cache from the hash.
CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader,
                                               const FileReaderOptions& opts)
        : _tablet_id(opts.tablet_id), _remote_file_reader(std::move(remote_file_reader)) {
    // A Doris table must always come with a positive tablet id.
    DCHECK(!opts.is_doris_table || _tablet_id > 0);
    _is_doris_table = opts.is_doris_table;

    if (_is_doris_table) {
        _cache_hash = BlockFileCache::hash(path().filename().native());
        _cache = FileCacheFactory::instance()->get_by_path(_cache_hash);
        if (config::enable_read_cache_file_directly) {
            // Designed for and tested with Doris tables; external tables need extra tests.
            _cache_file_readers = _cache->get_blocks_by_key(_cache_hash);
        }
        return;
    }

    // External table: path + modification time form the cache key.
    _cache_hash = BlockFileCache::hash(fmt::format("{}:{}", path().native(), opts.mtime));
    if (opts.cache_base_path.empty()) {
        // No base path requested by the session: choose a cache from the hash.
        _cache = FileCacheFactory::instance()->get_by_path(_cache_hash);
        return;
    }
    _cache = FileCacheFactory::instance()->get_by_path(opts.cache_base_path);
    if (_cache == nullptr) {
        LOG(WARNING) << "Can't get cache from base path: " << opts.cache_base_path
                     << ", using random instead.";
        _cache = FileCacheFactory::instance()->get_by_path(_cache_hash);
    }
}
122
123
717k
// Records a downloaded block in _cache_file_readers, keyed by the block's offset, so later
// reads can use it directly. It also marks the block as owned by this reader.
// Only active for Doris tables with config::enable_read_cache_file_directly; otherwise a no-op.
// emplace() leaves an existing entry for the same offset in place.
void CachedRemoteFileReader::_insert_file_reader(FileBlockSPtr file_block) {
    const bool direct_read_enabled = _is_doris_table && config::enable_read_cache_file_directly;
    if (!direct_read_enabled) {
        return;
    }
    std::lock_guard guard(_mtx);
    DCHECK(file_block->state() == FileBlock::State::DOWNLOADED);
    file_block->_owned_by_cached_reader = true;
    const auto block_offset = file_block->offset();
    _cache_file_readers.emplace(block_offset, std::move(file_block));
}
131
132
7.14k
// Clears the _owned_by_cached_reader flag (set in _insert_file_reader) on every block held for
// direct reads, then closes the remote reader.
CachedRemoteFileReader::~CachedRemoteFileReader() {
    for (const auto& offset_and_block : _cache_file_readers) {
        offset_and_block.second->_owned_by_cached_reader = false;
    }
    // A destructor has no way to report failure, so the close status is dropped deliberately.
    static_cast<void>(close());
}
138
139
8.16k
// Closes the wrapped remote reader and forwards its status.
// This does not touch _cache_file_readers.
Status CachedRemoteFileReader::close() {
    auto& reader = _remote_file_reader;
    return reader->close();
}
142
143
std::pair<size_t, size_t> CachedRemoteFileReader::s_align_size(size_t offset, size_t read_size,
144
718k
                                                               size_t length) {
145
718k
    size_t left = offset;
146
718k
    size_t right = offset + read_size - 1;
147
718k
    size_t align_left =
148
718k
            (left / config::file_cache_each_block_size) * config::file_cache_each_block_size;
149
718k
    size_t align_right =
150
718k
            (right / config::file_cache_each_block_size + 1) * config::file_cache_each_block_size;
151
718k
    align_right = align_right < length ? align_right : length;
152
718k
    size_t align_size = align_right - align_left;
153
718k
    if (align_size < config::file_cache_each_block_size && align_left != 0) {
154
4
        align_size += config::file_cache_each_block_size;
155
4
        align_left -= config::file_cache_each_block_size;
156
4
    }
157
718k
    return std::make_pair(align_left, align_size);
158
718k
}
159
160
namespace {
161
// Execute S3 read
162
Status execute_s3_read(size_t empty_start, size_t& size, std::unique_ptr<char[]>& buffer,
163
                       ReadStatistics& stats, const IOContext* io_ctx,
164
5.14k
                       FileReaderSPtr remote_file_reader) {
165
5.14k
    s3_read_counter << 1;
166
5.14k
    SCOPED_RAW_TIMER(&stats.remote_read_timer);
167
5.14k
    stats.from_peer_cache = false;
168
5.14k
    return remote_file_reader->read_at(empty_start, Slice(buffer.get(), size), &size, io_ctx);
169
5.14k
}
170
171
// Get peer connection info from tablet_id
172
std::pair<std::string, int> get_peer_connection_info(int64_t tablet_id,
173
0
                                                     const std::string& file_path) {
174
0
    std::string host = "";
175
0
    int port = 0;
176
177
0
    DCHECK(tablet_id > 0);
178
0
    auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
179
0
    if (auto tablet_info = manager.get_balanced_tablet_info(tablet_id)) {
180
0
        host = tablet_info->first;
181
0
        port = tablet_info->second;
182
0
    } else {
183
0
        LOG_EVERY_N(WARNING, 100) << "get peer connection info not found"
184
0
                                  << ", tablet_id=" << tablet_id << ", file_path=" << file_path;
185
0
    }
186
187
0
    DBUG_EXECUTE_IF("PeerFileCacheReader::_fetch_from_peer_cache_blocks", {
188
0
        host = dp->param<std::string>("host", "127.0.0.1");
189
0
        port = dp->param("port", 9060);
190
0
        LOG_WARNING("debug point PeerFileCacheReader::_fetch_from_peer_cache_blocks")
191
0
                .tag("host", host)
192
0
                .tag("port", port);
193
0
    });
194
195
0
    return {host, port};
196
0
}
197
198
// Execute peer read with fallback to S3
199
// file_size is the size of the file
200
// used to calculate the rightmost boundary value of the block, due to inaccurate current block meta information.
201
Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks, size_t empty_start,
202
                         size_t& size, std::unique_ptr<char[]>& buffer,
203
                         const std::string& file_path, size_t file_size, bool is_doris_table,
204
0
                         int64_t tablet_id, ReadStatistics& stats, const IOContext* io_ctx) {
205
0
    auto [host, port] = get_peer_connection_info(tablet_id, file_path);
206
0
    VLOG_DEBUG << "PeerFileCacheReader read from peer, host=" << host << ", port=" << port
207
0
               << ", file_path=" << file_path;
208
209
0
    if (host.empty() || port == 0) {
210
0
        g_failed_get_peer_addr_counter << 1;
211
0
        VLOG_DEBUG << "PeerFileCacheReader host or port is empty"
212
0
                   << ", host=" << host << ", port=" << port << ", file_path=" << file_path;
213
0
        return Status::InternalError<false>("host or port is empty");
214
0
    }
215
0
    SCOPED_RAW_TIMER(&stats.peer_read_timer);
216
0
    peer_read_counter << 1;
217
0
    PeerFileCacheReader peer_reader(file_path, is_doris_table, host, port);
218
0
    auto st = peer_reader.fetch_blocks(empty_blocks, empty_start, Slice(buffer.get(), size), &size,
219
0
                                       file_size, io_ctx);
220
0
    if (!st.ok()) {
221
0
        VLOG_DEBUG << "PeerFileCacheReader read from peer failed"
222
0
                   << ", host=" << host << ", port=" << port << ", error=" << st.msg();
223
0
    }
224
0
    stats.from_peer_cache = true;
225
0
    return st;
226
0
}
227
228
} // anonymous namespace
229
230
// Fills `buffer` with `size` bytes starting at `empty_start` for the blocks in `empty_blocks`.
// The source is chosen as follows:
//   * The debug point CachedRemoteFileReader.read_at_impl.change_type can force "s3", or any
//     other value for a peer read.
//   * A peer read is tried first only in cloud mode, for Doris tables, for non-warmup reads,
//     and when config::enable_cache_read_from_peer is set. If it fails, S3 is used instead.
//   * Everything else reads straight from the remote reader.
// `size` ends up holding the byte count reported by the source that served the read.
Status CachedRemoteFileReader::_execute_remote_read(const std::vector<FileBlockSPtr>& empty_blocks,
                                                    size_t empty_start, size_t& size,
                                                    std::unique_ptr<char[]>& buffer,
                                                    ReadStatistics& stats,
                                                    const IOContext* io_ctx) {
    DBUG_EXECUTE_IF("CachedRemoteFileReader.read_at_impl.change_type", {
        std::string read_type = dp->param<std::string>("type", "s3");
        LOG_WARNING("CachedRemoteFileReader.read_at_impl.change_type")
                .tag("path", path().native())
                .tag("off", empty_start)
                .tag("size", size)
                .tag("type", read_type);
        if (read_type != "s3") {
            return execute_peer_read(empty_blocks, empty_start, size, buffer, path().native(),
                                     this->size(), _is_doris_table, _tablet_id, stats, io_ctx);
        }
        return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader);
    });

    const bool try_peer_first = doris::config::is_cloud_mode() && _is_doris_table &&
                                !io_ctx->is_warmup && doris::config::enable_cache_read_from_peer;
    if (!try_peer_first) {
        return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader);
    }

    // Peer first, S3 as fallback (peer timeout is 5 seconds).
    // TODO(dx): here peer and s3 reader need to get data in parallel, and take the one that is correct and returns first
    // fetch_blocks may overwrite `size`, so keep the requested size for the S3 fallback.
    const size_t requested_size = size;
    Status peer_status =
            execute_peer_read(empty_blocks, empty_start, size, buffer, path().native(),
                              this->size(), _is_doris_table, _tablet_id, stats, io_ctx);
    if (peer_status.ok()) {
        return peer_status;
    }
    size = requested_size;
    return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader);
}
273
274
// Reads up to result.size bytes at `offset` (clamped to the file size) through the block file
// cache and stores the number of bytes produced in *bytes_read.
//
// 1. Direct path (Doris tables with config::enable_read_cache_file_directly): consume the
//    downloaded blocks remembered in _cache_file_readers, starting at the block that may
//    contain `offset`. It stops at the first gap, the first failed block read, or when the
//    request is satisfied.
// 2. Indirect path for the rest of the request:
//    a. Align the remainder to cache blocks (s_align_size) and call
//       BlockFileCache::get_or_set.
//    b. Fetch the EMPTY blocks this thread became downloader for, plus any SKIP_CACHE blocks,
//       in one remote read (peer or S3).
//    c. Write those blocks back to the cache (SKIP_CACHE excluded) and copy them into `result`.
//    d. Read the remaining blocks from local cache files. DOWNLOADING blocks are waited on.
//       If the local read fails, the block is read directly from the remote reader.
//
// If a DOWNLOADED block's file is reported NOT_FOUND, _cache->remove_if_cached_async(_cache_hash)
// is scheduled after the read (self-heal).
//
// Dry-run mode (io_ctx->is_dryrun) skips local cache reads, the per-block remote fallback and
// the copies into `result`, but still accounts the bytes.
// File cache statistics are published from a scope-exit hook, only for successful,
// non-dry-run reads.
Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                            const IOContext* io_ctx) {
    size_t already_read = 0;
    SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().cached_remote_reader_read_at);

    const IOContext default_io_ctx;
    if (io_ctx == nullptr) {
        io_ctx = &default_io_ctx;
    }
    DCHECK(io_ctx);
    const bool is_dryrun = io_ctx->is_dryrun;
    DCHECK(!closed());
    if (offset > size()) {
        return Status::InvalidArgument(
                fmt::format("offset exceeds file size(offset: {}, file size: {}, path: {})", offset,
                            size(), path().native()));
    }
    size_t bytes_req = result.size;
    bytes_req = std::min(bytes_req, size() - offset);
    if (UNLIKELY(bytes_req == 0)) {
        *bytes_read = 0;
        return Status::OK();
    }

    ReadStatistics stats;
    stats.bytes_read += bytes_req;
    bool read_success = false;
    MonotonicStopWatch read_at_sw;
    read_at_sw.start();
    // Scope-exit hook. It first emits optional cache-miss diagnostics. Then, for successful
    // non-dry-run reads only, it publishes `stats` to the global file cache metrics
    // (non-warmup reads) and to io_ctx->file_cache_stats (query profile).
    auto defer_func = [&](int*) {
        if (config::print_stack_when_cache_miss) {
            if (io_ctx->file_cache_stats == nullptr && !stats.hit_cache && !io_ctx->is_warmup) {
                LOG_INFO("[verbose] {}", Status::InternalError<true>("not hit cache"));
            }
        }
        if (!stats.hit_cache && config::read_cluster_cache_opt_verbose_log) {
            LOG_INFO(
                    "[verbose] not hit cache, path: {}, offset: {}, size: {}, cost: {} ms, warmup: "
                    "{}",
                    path().native(), offset, bytes_req, read_at_sw.elapsed_time_milliseconds(),
                    io_ctx->is_warmup);
        }
        if (is_dryrun) {
            return;
        }
        if (!read_success) {
            return;
        }
        const auto file_cache_read_type =
                io_ctx->is_inverted_index
                        ? FileCacheReadType::INVERTED_INDEX
                        : (io_ctx->is_index_data ? FileCacheReadType::SEGMENT_FOOTER_INDEX
                                                 : FileCacheReadType::DATA);
        if (!io_ctx->is_warmup) {
            // update stats increment in this reading procedure for file cache metrics
            FileCacheStatistics fcache_stats_increment;
            _update_stats(stats, &fcache_stats_increment, file_cache_read_type);
            io::FileCacheMetrics::instance().update(&fcache_stats_increment);
        }
        if (io_ctx->file_cache_stats) {
            // update stats in io_ctx, for query profile
            _update_stats(stats, io_ctx->file_cache_stats, file_cache_read_type);
        }
    };
    std::unique_ptr<int, decltype(defer_func)> defer((int*)0x01, std::move(defer_func));

    if (_is_doris_table && config::enable_read_cache_file_directly) {
        // Direct path: read from blocks this reader already knows are downloaded.
        SCOPED_RAW_TIMER(&stats.read_cache_file_directly_timer);
        size_t need_read_size = bytes_req;
        std::shared_lock lock(_mtx);
        if (!_cache_file_readers.empty()) {
            // upper_bound gives the first block starting after `offset`; step back to the last
            // block starting at or before it, which is the only one that can contain `offset`.
            auto iter = _cache_file_readers.upper_bound(offset);
            if (iter != _cache_file_readers.begin()) {
                iter--;
            }
            size_t cur_offset = offset;
            while (need_read_size != 0 && iter != _cache_file_readers.end()) {
                // Stop at the first gap: this block does not contain cur_offset.
                if (iter->second->offset() > cur_offset ||
                    iter->second->range().right < cur_offset) {
                    break;
                }
                size_t file_offset = cur_offset - iter->second->offset();
                size_t reserve_bytes =
                        std::min(need_read_size, iter->second->range().size() - file_offset);
                if (is_dryrun) [[unlikely]] {
                    g_skip_local_cache_io_sum_bytes << reserve_bytes;
                } else {
                    SCOPED_RAW_TIMER(&stats.local_read_timer);
                    if (!iter->second
                                 ->read(Slice(result.data + (cur_offset - offset), reserve_bytes),
                                        file_offset)
                                 .ok()) { //TODO: maybe read failed because block evict, should handle error
                        break;
                    }
                    stats.bytes_read_from_local += reserve_bytes;
                }
                _cache->add_need_update_lru_block(iter->second);
                need_read_size -= reserve_bytes;
                cur_offset += reserve_bytes;
                already_read += reserve_bytes;
                iter++;
            }
            if (need_read_size == 0) {
                *bytes_read = bytes_req;
                stats.hit_cache = true;
                g_read_cache_direct_whole_num << 1;
                g_read_cache_direct_whole_bytes << bytes_req;
                read_success = true;
                return Status::OK();
            } else {
                g_read_cache_direct_partial_num << 1;
                g_read_cache_direct_partial_bytes << already_read;
            }
        }
    }

    // Indirect path: whatever the direct path did not cover goes through get_or_set.
    g_read_cache_indirect_num << 1;
    size_t indirect_read_bytes = 0;
    auto [align_left, align_size] =
            s_align_size(offset + already_read, bytes_req - already_read, size());
    CacheContext cache_context(io_ctx);
    cache_context.stats = &stats;
    cache_context.tablet_id = _tablet_id;
    MonotonicStopWatch sw;
    sw.start();

    ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set->increment();
    FileBlocksHolder holder =
            _cache->get_or_set(_cache_hash, align_left, align_size, cache_context);
    ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set->decrement();

    stats.cache_get_or_set_timer += sw.elapsed_time();

    // Blocks to fetch remotely: EMPTY blocks this thread became downloader for, plus
    // SKIP_CACHE blocks (fetched but never written back to the cache).
    std::vector<FileBlockSPtr> empty_blocks;
    for (auto& block : holder.file_blocks) {
        switch (block->state()) {
        case FileBlock::State::EMPTY:
            VLOG_DEBUG << fmt::format("Block EMPTY path={} hash={}:{}:{} offset={} cache_path={}",
                                      path().native(), _cache_hash.to_string(), _cache_hash.high(),
                                      _cache_hash.low(), block->offset(), block->get_cache_file());
            block->get_or_set_downloader();
            if (block->is_downloader()) {
                empty_blocks.push_back(block);
                TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::EMPTY");
            }
            stats.hit_cache = false;
            break;
        case FileBlock::State::SKIP_CACHE:
            VLOG_DEBUG << fmt::format(
                    "Block SKIP_CACHE path={} hash={}:{}:{} offset={} cache_path={}",
                    path().native(), _cache_hash.to_string(), _cache_hash.high(), _cache_hash.low(),
                    block->offset(), block->get_cache_file());
            empty_blocks.push_back(block);
            stats.hit_cache = false;
            stats.skip_cache = true;
            break;
        case FileBlock::State::DOWNLOADING:
            stats.hit_cache = false;
            break;
        case FileBlock::State::DOWNLOADED:
            _insert_file_reader(block);
            break;
        }
    }

    // [empty_start, empty_end] is the inclusive byte range fetched remotely. The values are
    // meaningful only when empty_blocks is non-empty.
    size_t empty_start = 0;
    size_t empty_end = 0;
    if (!empty_blocks.empty()) {
        empty_start = empty_blocks.front()->range().left;
        empty_end = empty_blocks.back()->range().right;
        size_t fetch_size = empty_end - empty_start + 1;
        std::unique_ptr<char[]> buffer(new char[fetch_size]);

        // Apply rate limiting for warmup download tasks (node level).
        // Rate limiting is applied before remote read to limit both S3 read and local cache write.
        if (io_ctx->is_warmup) {
            auto* rate_limiter = ExecEnv::GetInstance()->warmup_download_rate_limiter();
            if (rate_limiter != nullptr) {
                rate_limiter->add(fetch_size);
            }
        }

        // Determine read type and execute the remote read (may update fetch_size).
        RETURN_IF_ERROR(_execute_remote_read(empty_blocks, empty_start, fetch_size, buffer, stats,
                                             io_ctx));
        bool empty_blocks_from_peer_cache = stats.from_peer_cache;

        {
            // Write the fetched bytes back to each cacheable block.
            SCOPED_CONCURRENCY_COUNT(
                    ConcurrencyStatsManager::instance().cached_remote_reader_write_back);
            for (auto& block : empty_blocks) {
                if (block->state() == FileBlock::State::SKIP_CACHE) {
                    continue;
                }
                SCOPED_RAW_TIMER(&stats.local_write_timer);
                char* cur_ptr = buffer.get() + block->range().left - empty_start;
                size_t block_size = block->range().size();
                Status st = block->append(Slice(cur_ptr, block_size));
                if (st.ok()) {
                    st = block->finalize();
                }
                if (!st.ok()) {
                    LOG_EVERY_N(WARNING, 100)
                            << "Write data to file cache failed. err=" << st.msg();
                } else {
                    _insert_file_reader(block);
                }
                stats.bytes_write_into_file_cache += block_size;
            }
        }
        // Copy the overlap between the fetched range and the still-unread request from memory.
        size_t right_offset = offset + bytes_req - 1;
        if (empty_start <= right_offset && empty_end >= offset + already_read && !is_dryrun) {
            size_t copy_left_offset = std::max(offset + already_read, empty_start);
            size_t copy_right_offset = std::min(right_offset, empty_end);
            char* dst = result.data + (copy_left_offset - offset);
            char* src = buffer.get() + (copy_left_offset - empty_start);
            size_t copy_size = copy_right_offset - copy_left_offset + 1;
            memcpy(dst, src, copy_size);
            indirect_read_bytes += copy_size;
            if (empty_blocks_from_peer_cache) {
                stats.bytes_read_from_peer += copy_size;
            } else {
                stats.bytes_read_from_remote += copy_size;
            }
        }
    }

    // Walk the blocks in order. Each block either was covered by the remote fetch above or is
    // read from its local cache file now.
    size_t current_offset = offset + already_read;
    size_t end_offset = offset + bytes_req - 1;
    bool need_self_heal = false;
    *bytes_read = already_read;
    for (auto& block : holder.file_blocks) {
        if (current_offset > end_offset) {
            break;
        }
        size_t left = block->range().left;
        size_t right = block->range().right;
        if (right < current_offset) {
            continue;
        }
        size_t read_size =
                end_offset > right ? right - current_offset + 1 : end_offset - current_offset + 1;
        // Blocks inside the remotely fetched range were already copied above.
        // The empty_blocks check is required: empty_start and empty_end default to 0, which
        // would otherwise match a genuine block [0, 0] (a 1-byte file). That block would then
        // be counted as read without its data ever reaching `result`.
        if (!empty_blocks.empty() && empty_start <= left && right <= empty_end) {
            *bytes_read += read_size;
            current_offset = right + 1;
            continue;
        }
        FileBlock::State block_state = block->state();
        int64_t wait_time = 0;
        static int64_t max_wait_time = 10;
        TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::max_wait_time", &max_wait_time);
        if (block_state != FileBlock::State::DOWNLOADED) {
            // Another downloader owns this block: wait (bounded by max_wait_time rounds).
            SCOPED_CONCURRENCY_COUNT(
                    ConcurrencyStatsManager::instance().cached_remote_reader_blocking);
            do {
                SCOPED_RAW_TIMER(&stats.remote_wait_timer);
                SCOPED_RAW_TIMER(&stats.remote_read_timer);
                TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::DOWNLOADING");
                block_state = block->wait();
                if (block_state != FileBlock::State::DOWNLOADING) {
                    break;
                }
            } while (++wait_time < max_wait_time);
        }
        if (wait_time == max_wait_time) [[unlikely]] {
            LOG_WARNING("Waiting too long for the download to complete");
        }
        {
            Status st;
            /*
             * If block_state == EMPTY, the thread reads the data from remote.
             * If block_state == DOWNLOADED, when the cache file is deleted by the other process,
             * the thread reads the data from remote too.
             */
            if (block_state == FileBlock::State::DOWNLOADED) {
                if (is_dryrun) [[unlikely]] {
                    g_skip_local_cache_io_sum_bytes << read_size;
                } else {
                    size_t file_offset = current_offset - left;
                    SCOPED_RAW_TIMER(&stats.local_read_timer);
                    SCOPED_CONCURRENCY_COUNT(
                            ConcurrencyStatsManager::instance().cached_remote_reader_local_read);
                    st = block->read(Slice(result.data + (current_offset - offset), read_size),
                                     file_offset);
                    if (st.ok()) {
                        indirect_read_bytes += read_size;
                        stats.bytes_read_from_local += read_size;
                    }
                }
            }
            if (!st || block_state != FileBlock::State::DOWNLOADED) {
                if (is_dryrun) [[unlikely]] {
                    // dryrun mode uses a null buffer, skip actual remote IO
                } else {
                    if (block_state == FileBlock::State::DOWNLOADED &&
                        st.is<ErrorCode::NOT_FOUND>()) {
                        need_self_heal = true;
                        g_read_cache_self_heal_on_not_found << 1;
                        LOG_EVERY_N(WARNING, 100)
                                << "Cache block file is missing, will self-heal by clearing cache "
                                   "hash. "
                                << "path=" << path().native()
                                << ", hash=" << _cache_hash.to_string() << ", offset=" << left
                                << ", err=" << st.msg();
                    }
                    LOG(WARNING) << "Read data failed from file cache downloaded by others. err="
                                 << st.msg() << ", block state=" << block_state;
                    size_t nest_bytes_read {0};
                    stats.hit_cache = false;
                    stats.from_peer_cache = false;
                    s3_read_counter << 1;
                    SCOPED_RAW_TIMER(&stats.remote_read_timer);
                    RETURN_IF_ERROR(_remote_file_reader->read_at(
                            current_offset,
                            Slice(result.data + (current_offset - offset), read_size),
                            &nest_bytes_read));
                    indirect_read_bytes += read_size;
                    DCHECK(nest_bytes_read == read_size);
                    stats.bytes_read_from_remote += nest_bytes_read;
                }
            }
        }
        *bytes_read += read_size;
        current_offset = right + 1;
    }
    if (need_self_heal && _cache != nullptr) {
        _cache->remove_if_cached_async(_cache_hash);
    }
    g_read_cache_indirect_bytes << indirect_read_bytes;
    g_read_cache_indirect_total_bytes << *bytes_read;

    DCHECK(*bytes_read == bytes_req);
    if (!is_dryrun) {
        DCHECK_EQ(stats.bytes_read_from_local + stats.bytes_read_from_remote +
                          stats.bytes_read_from_peer,
                  bytes_req);
    }
    read_success = true;
    return Status::OK();
}
619
620
void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats,
621
                                           FileCacheStatistics* statis,
622
1.42M
                                           FileCacheReadType read_type) const {
623
1.42M
    if (statis == nullptr) {
624
0
        return;
625
0
    }
626
1.42M
    if (read_stats.bytes_read_from_local > 0) {
627
1.41M
        statis->num_local_io_total++;
628
1.41M
        statis->bytes_read_from_local += read_stats.bytes_read_from_local;
629
1.41M
    }
630
1.42M
    if (read_stats.bytes_read_from_remote > 0) {
631
10.2k
        statis->num_remote_io_total++;
632
10.2k
        statis->bytes_read_from_remote += read_stats.bytes_read_from_remote;
633
10.2k
        statis->remote_io_timer += read_stats.remote_read_timer;
634
10.2k
    }
635
1.42M
    if (read_stats.bytes_read_from_peer > 0) {
636
0
        statis->num_peer_io_total++;
637
0
        statis->bytes_read_from_peer += read_stats.bytes_read_from_peer;
638
0
        statis->peer_io_timer += read_stats.peer_read_timer;
639
0
    }
640
1.42M
    statis->remote_wait_timer += read_stats.remote_wait_timer;
641
1.42M
    statis->local_io_timer += read_stats.local_read_timer;
642
1.42M
    statis->num_skip_cache_io_total += read_stats.skip_cache;
643
1.42M
    statis->bytes_write_into_cache += read_stats.bytes_write_into_file_cache;
644
1.42M
    statis->write_cache_io_timer += read_stats.local_write_timer;
645
646
1.42M
    statis->read_cache_file_directly_timer += read_stats.read_cache_file_directly_timer;
647
1.42M
    statis->cache_get_or_set_timer += read_stats.cache_get_or_set_timer;
648
1.42M
    statis->lock_wait_timer += read_stats.lock_wait_timer;
649
1.42M
    statis->get_timer += read_stats.get_timer;
650
1.42M
    statis->set_timer += read_stats.set_timer;
651
652
1.42M
    auto update_index_stats = [&](int64_t& num_local_io_total, int64_t& num_remote_io_total,
653
1.42M
                                  int64_t& num_peer_io_total, int64_t& bytes_read_from_local,
654
1.42M
                                  int64_t& bytes_read_from_remote, int64_t& bytes_read_from_peer,
655
1.42M
                                  int64_t& local_io_timer, int64_t& remote_io_timer,
656
1.42M
                                  int64_t& peer_io_timer) {
657
2
        if (read_stats.bytes_read_from_local > 0) {
658
0
            num_local_io_total++;
659
0
            bytes_read_from_local += read_stats.bytes_read_from_local;
660
0
        }
661
2
        if (read_stats.bytes_read_from_remote > 0) {
662
2
            num_remote_io_total++;
663
2
            bytes_read_from_remote += read_stats.bytes_read_from_remote;
664
2
            remote_io_timer += read_stats.remote_read_timer;
665
2
        }
666
2
        if (read_stats.bytes_read_from_peer > 0) {
667
0
            num_peer_io_total++;
668
0
            bytes_read_from_peer += read_stats.bytes_read_from_peer;
669
0
            peer_io_timer += read_stats.peer_read_timer;
670
0
        }
671
2
        local_io_timer += read_stats.local_read_timer;
672
2
    };
673
674
1.42M
    switch (read_type) {
675
1.42M
    case FileCacheReadType::DATA:
676
1.42M
        break;
677
0
    case FileCacheReadType::INVERTED_INDEX:
678
0
        update_index_stats(
679
0
                statis->inverted_index_num_local_io_total,
680
0
                statis->inverted_index_num_remote_io_total,
681
0
                statis->inverted_index_num_peer_io_total,
682
0
                statis->inverted_index_bytes_read_from_local,
683
0
                statis->inverted_index_bytes_read_from_remote,
684
0
                statis->inverted_index_bytes_read_from_peer, statis->inverted_index_local_io_timer,
685
0
                statis->inverted_index_remote_io_timer, statis->inverted_index_peer_io_timer);
686
0
        break;
687
2
    case FileCacheReadType::SEGMENT_FOOTER_INDEX:
688
2
        update_index_stats(statis->segment_footer_index_num_local_io_total,
689
2
                           statis->segment_footer_index_num_remote_io_total,
690
2
                           statis->segment_footer_index_num_peer_io_total,
691
2
                           statis->segment_footer_index_bytes_read_from_local,
692
2
                           statis->segment_footer_index_bytes_read_from_remote,
693
2
                           statis->segment_footer_index_bytes_read_from_peer,
694
2
                           statis->segment_footer_index_local_io_timer,
695
2
                           statis->segment_footer_index_remote_io_timer,
696
2
                           statis->segment_footer_index_peer_io_timer);
697
2
        break;
698
1.42M
    }
699
700
1.42M
    g_skip_cache_sum << read_stats.skip_cache;
701
1.42M
}
702
703
0
void CachedRemoteFileReader::prefetch_range(size_t offset, size_t size, const IOContext* io_ctx) {
704
0
    if (offset >= this->size() || size == 0) {
705
0
        return;
706
0
    }
707
708
0
    size = std::min(size, this->size() - offset);
709
710
0
    ThreadPool* pool = ExecEnv::GetInstance()->segment_prefetch_thread_pool();
711
0
    if (pool == nullptr) {
712
0
        return;
713
0
    }
714
715
0
    IOContext dryrun_ctx;
716
0
    if (io_ctx != nullptr) {
717
0
        dryrun_ctx = *io_ctx;
718
0
    }
719
0
    dryrun_ctx.is_dryrun = true;
720
0
    dryrun_ctx.query_id = nullptr;
721
0
    dryrun_ctx.file_cache_stats = nullptr;
722
0
    dryrun_ctx.file_reader_stats = nullptr;
723
724
0
    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log)
725
0
            << fmt::format("[verbose] Submitting prefetch task for offset={} size={}, file={}",
726
0
                           offset, size, path().filename().native());
727
0
    std::weak_ptr<CachedRemoteFileReader> weak_this = shared_from_this();
728
0
    auto st = pool->submit_func([weak_this, offset, size, dryrun_ctx]() {
729
0
        auto self = weak_this.lock();
730
0
        if (self == nullptr) {
731
0
            return;
732
0
        }
733
0
        size_t bytes_read;
734
0
        Slice dummy_buffer((char*)nullptr, size);
735
0
        (void)self->read_at_impl(offset, dummy_buffer, &bytes_read, &dryrun_ctx);
736
0
        LOG_IF(INFO, config::enable_segment_prefetch_verbose_log)
737
0
                << fmt::format("[verbose] Prefetch task completed for offset={} size={}, file={}",
738
0
                               offset, size, self->path().filename().native());
739
0
    });
740
741
0
    if (!st.ok()) {
742
0
        VLOG_DEBUG << "Failed to submit prefetch task for offset=" << offset << " size=" << size
743
0
                   << " error=" << st.to_string();
744
0
    }
745
0
}
746
747
} // namespace doris::io