Coverage Report

Created: 2026-04-14 03:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/buffered_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/buffered_reader.h"
19
20
#include <bvar/reducer.h>
21
#include <bvar/window.h>
22
#include <string.h>
23
24
#include <algorithm>
25
#include <chrono>
26
#include <cstdint>
27
#include <memory>
28
29
#include "common/cast_set.h"
30
#include "common/compiler_util.h" // IWYU pragma: keep
31
#include "common/config.h"
32
#include "common/status.h"
33
#include "core/custom_allocator.h"
34
#include "runtime/exec_env.h"
35
#include "runtime/runtime_profile.h"
36
#include "runtime/thread_context.h"
37
#include "runtime/workload_management/io_throttle.h"
38
#include "util/slice.h"
39
#include "util/threadpool.h"
40
namespace doris {
41
42
namespace io {
43
struct IOContext;
44
45
// add bvar to capture the download bytes per second by buffered reader
46
bvar::Adder<uint64_t> g_bytes_downloaded("buffered_reader", "bytes_downloaded");
47
bvar::PerSecond<bvar::Adder<uint64_t>> g_bytes_downloaded_per_second("buffered_reader",
48
                                                                     "bytes_downloaded_per_second",
49
                                                                     &g_bytes_downloaded, 60);
50
51
Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
52
1.05M
                                          const IOContext* io_ctx) {
53
1.05M
    _statistics.request_io++;
54
1.05M
    *bytes_read = 0;
55
1.05M
    if (result.size == 0) {
56
0
        return Status::OK();
57
0
    }
58
1.05M
    const int range_index = _search_read_range(offset, offset + result.size);
59
1.05M
    if (range_index < 0) {
60
806
        SCOPED_RAW_TIMER(&_statistics.read_time);
61
806
        Status st = _reader->read_at(offset, result, bytes_read, io_ctx);
62
806
        _statistics.merged_io++;
63
806
        _statistics.request_bytes += *bytes_read;
64
806
        _statistics.merged_bytes += *bytes_read;
65
806
        return st;
66
806
    }
67
1.05M
    if (offset + result.size > _random_access_ranges[range_index].end_offset) {
68
        // return _reader->read_at(offset, result, bytes_read, io_ctx);
69
0
        return Status::IOError("Range in RandomAccessReader should be read sequentially");
70
0
    }
71
72
1.05M
    size_t has_read = 0;
73
1.05M
    RangeCachedData& cached_data = _range_cached_data[range_index];
74
1.05M
    cached_data.has_read = true;
75
1.05M
    if (cached_data.contains(offset)) {
76
        // has cached data in box
77
1.03M
        _read_in_box(cached_data, offset, result, &has_read);
78
1.03M
        _statistics.request_bytes += has_read;
79
1.03M
        if (has_read == result.size) {
80
            // all data is read in cache
81
1.03M
            *bytes_read = has_read;
82
1.03M
            return Status::OK();
83
1.03M
        }
84
1.03M
    } else if (!cached_data.empty()) {
85
        // the data in range may be skipped or ignored
86
4
        for (int16_t box_index : cached_data.ref_box) {
87
4
            _dec_box_ref(box_index);
88
4
        }
89
4
        cached_data.reset();
90
4
    }
91
92
16.8k
    size_t to_read = result.size - has_read;
93
16.8k
    if (to_read >= SMALL_IO || to_read >= _remaining) {
94
0
        SCOPED_RAW_TIMER(&_statistics.read_time);
95
0
        size_t read_size = 0;
96
0
        RETURN_IF_ERROR(_reader->read_at(offset + has_read, Slice(result.data + has_read, to_read),
97
0
                                         &read_size, io_ctx));
98
0
        *bytes_read = has_read + read_size;
99
0
        _statistics.merged_io++;
100
0
        _statistics.request_bytes += read_size;
101
0
        _statistics.merged_bytes += read_size;
102
0
        return Status::OK();
103
0
    }
104
105
    // merge small IO
106
16.8k
    size_t merge_start = offset + has_read;
107
16.8k
    const size_t merge_end = merge_start + _merged_read_slice_size;
108
    // <slice_size, is_content>
109
16.8k
    std::vector<std::pair<size_t, bool>> merged_slice;
110
16.8k
    size_t content_size = 0;
111
16.8k
    size_t hollow_size = 0;
112
16.8k
    if (merge_start > _random_access_ranges[range_index].end_offset) {
113
0
        return Status::IOError("Fail to merge small IO");
114
0
    }
115
16.8k
    int merge_index = range_index;
116
116k
    while (merge_start < merge_end && merge_index < _random_access_ranges.size()) {
117
100k
        size_t content_max = _remaining - content_size;
118
100k
        if (content_max == 0) {
119
0
            break;
120
0
        }
121
100k
        if (merge_index != range_index && _range_cached_data[merge_index].has_read) {
122
            // don't read or merge twice
123
0
            break;
124
0
        }
125
100k
        if (_random_access_ranges[merge_index].end_offset > merge_end) {
126
162
            size_t add_content = std::min(merge_end - merge_start, content_max);
127
162
            content_size += add_content;
128
162
            merge_start += add_content;
129
162
            merged_slice.emplace_back(add_content, true);
130
162
            break;
131
162
        }
132
99.8k
        size_t add_content =
133
99.8k
                std::min(_random_access_ranges[merge_index].end_offset - merge_start, content_max);
134
99.8k
        content_size += add_content;
135
99.8k
        merge_start += add_content;
136
99.8k
        merged_slice.emplace_back(add_content, true);
137
99.8k
        if (merge_start != _random_access_ranges[merge_index].end_offset) {
138
0
            break;
139
0
        }
140
99.8k
        if (merge_index < _random_access_ranges.size() - 1 && merge_start < merge_end) {
141
83.3k
            size_t gap = _random_access_ranges[merge_index + 1].start_offset -
142
83.3k
                         _random_access_ranges[merge_index].end_offset;
143
83.3k
            if ((content_size + hollow_size) > SMALL_IO && gap >= SMALL_IO) {
144
                // too large gap
145
40
                break;
146
40
            }
147
83.3k
            if (gap < merge_end - merge_start && content_size < _remaining &&
148
83.3k
                !_range_cached_data[merge_index + 1].has_read) {
149
83.1k
                hollow_size += gap;
150
83.1k
                merge_start = _random_access_ranges[merge_index + 1].start_offset;
151
83.1k
                merged_slice.emplace_back(gap, false);
152
83.1k
            } else {
153
                // there's no enough memory to read hollow data
154
160
                break;
155
160
            }
156
83.3k
        }
157
99.6k
        merge_index++;
158
99.6k
    }
159
16.8k
    content_size = 0;
160
16.8k
    hollow_size = 0;
161
16.8k
    std::vector<std::pair<double, size_t>> ratio_and_size;
162
    // Calculate the read amplified ratio for each merge operation and the size of the merged data.
163
    // Find the largest size of the merged data whose amplified ratio is less than config::max_amplified_read_ratio
164
183k
    for (const std::pair<size_t, bool>& slice : merged_slice) {
165
183k
        if (slice.second) {
166
100k
            content_size += slice.first;
167
100k
            if (slice.first > 0) {
168
100k
                ratio_and_size.emplace_back((double)hollow_size / (double)content_size,
169
100k
                                            content_size + hollow_size);
170
100k
            }
171
100k
        } else {
172
83.1k
            hollow_size += slice.first;
173
83.1k
        }
174
183k
    }
175
16.8k
    size_t best_merged_size = 0;
176
116k
    for (int i = 0; i < ratio_and_size.size(); ++i) {
177
100k
        const std::pair<double, size_t>& rs = ratio_and_size[i];
178
100k
        size_t equivalent_size = rs.second / (i + 1);
179
100k
        if (rs.second > best_merged_size) {
180
100k
            if (rs.first <= _max_amplified_ratio ||
181
100k
                (_max_amplified_ratio < 1 && equivalent_size <= _equivalent_io_size)) {
182
99.7k
                best_merged_size = rs.second;
183
99.7k
            }
184
100k
        }
185
100k
    }
186
187
16.8k
    if (best_merged_size == to_read) {
188
        // read directly to avoid copy operation
189
4.76k
        SCOPED_RAW_TIMER(&_statistics.read_time);
190
4.76k
        size_t read_size = 0;
191
4.76k
        RETURN_IF_ERROR(_reader->read_at(offset + has_read, Slice(result.data + has_read, to_read),
192
4.76k
                                         &read_size, io_ctx));
193
4.76k
        *bytes_read = has_read + read_size;
194
4.76k
        _statistics.merged_io++;
195
4.76k
        _statistics.request_bytes += read_size;
196
4.76k
        _statistics.merged_bytes += read_size;
197
4.76k
        return Status::OK();
198
4.76k
    }
199
200
12.0k
    merge_start = offset + has_read;
201
12.0k
    size_t merge_read_size = 0;
202
12.0k
    RETURN_IF_ERROR(
203
12.0k
            _fill_box(range_index, merge_start, best_merged_size, &merge_read_size, io_ctx));
204
12.0k
    if (cached_data.start_offset != merge_start) {
205
0
        return Status::IOError("Wrong start offset in merged IO");
206
0
    }
207
208
    // read from cached data
209
12.0k
    size_t box_read_size = 0;
210
12.0k
    _read_in_box(cached_data, merge_start, Slice(result.data + has_read, to_read), &box_read_size);
211
12.0k
    *bytes_read = has_read + box_read_size;
212
12.0k
    _statistics.request_bytes += box_read_size;
213
12.0k
    if (*bytes_read < result.size && box_read_size < merge_read_size) {
214
0
        return Status::IOError("Can't read enough bytes in merged IO");
215
0
    }
216
12.0k
    return Status::OK();
217
12.0k
}
218
219
1.05M
int MergeRangeFileReader::_search_read_range(size_t start_offset, size_t end_offset) {
220
1.05M
    if (_random_access_ranges.empty()) {
221
0
        return -1;
222
0
    }
223
1.05M
    int left = 0, right = cast_set<int>(_random_access_ranges.size()) - 1;
224
3.05M
    do {
225
3.05M
        int mid = left + (right - left) / 2;
226
3.05M
        const PrefetchRange& range = _random_access_ranges[mid];
227
3.05M
        if (range.start_offset <= start_offset && start_offset < range.end_offset) {
228
1.05M
            if (range.start_offset <= end_offset && end_offset <= range.end_offset) {
229
1.05M
                return mid;
230
1.05M
            } else {
231
8
                return -1;
232
8
            }
233
1.99M
        } else if (range.start_offset > start_offset) {
234
774k
            right = mid - 1;
235
1.22M
        } else {
236
1.22M
            left = mid + 1;
237
1.22M
        }
238
3.05M
    } while (left <= right);
239
772
    return -1;
240
1.05M
}
241
242
94.1k
void MergeRangeFileReader::_clean_cached_data(RangeCachedData& cached_data) {
243
94.1k
    if (!cached_data.empty()) {
244
0
        for (int i = 0; i < cached_data.ref_box.size(); ++i) {
245
0
            DCHECK_GT(cached_data.box_end_offset[i], cached_data.box_start_offset[i]);
246
0
            int16_t box_index = cached_data.ref_box[i];
247
0
            DCHECK_GT(_box_ref[box_index], 0);
248
0
            _box_ref[box_index]--;
249
0
        }
250
0
    }
251
94.1k
    cached_data.reset();
252
94.1k
}
253
254
96.0k
void MergeRangeFileReader::_dec_box_ref(int16_t box_index) {
255
96.0k
    if (--_box_ref[box_index] == 0) {
256
13.1k
        _remaining += BOX_SIZE;
257
13.1k
    }
258
96.0k
    if (box_index == _last_box_ref) {
259
11.4k
        _last_box_ref = -1;
260
11.4k
        _last_box_usage = 0;
261
11.4k
    }
262
96.0k
}
263
264
void MergeRangeFileReader::_read_in_box(RangeCachedData& cached_data, size_t offset, Slice result,
265
1.04M
                                        size_t* bytes_read) {
266
1.04M
    SCOPED_RAW_TIMER(&_statistics.copy_time);
267
1.05M
    auto handle_in_box = [&](size_t remaining, char* copy_out) {
268
1.05M
        size_t to_handle = remaining;
269
1.05M
        int cleaned_box = 0;
270
2.10M
        for (int i = 0; i < cached_data.ref_box.size() && remaining > 0; ++i) {
271
1.05M
            int16_t box_index = cached_data.ref_box[i];
272
1.05M
            size_t box_to_handle = std::min(remaining, (size_t)(cached_data.box_end_offset[i] -
273
1.05M
                                                                cached_data.box_start_offset[i]));
274
1.05M
            if (copy_out != nullptr) {
275
1.04M
            }
276
1.05M
            if (copy_out != nullptr) {
277
1.04M
                memcpy(copy_out + to_handle - remaining,
278
1.04M
                       _boxes[box_index].data() + cached_data.box_start_offset[i], box_to_handle);
279
1.04M
            }
280
1.05M
            remaining -= box_to_handle;
281
1.05M
            cached_data.box_start_offset[i] += box_to_handle;
282
1.05M
            if (cached_data.box_start_offset[i] == cached_data.box_end_offset[i]) {
283
96.0k
                cleaned_box++;
284
96.0k
                _dec_box_ref(box_index);
285
96.0k
            }
286
1.05M
        }
287
1.05M
        DCHECK_EQ(remaining, 0);
288
1.05M
        if (cleaned_box > 0) {
289
95.9k
            cached_data.ref_box.erase(cached_data.ref_box.begin(),
290
95.9k
                                      cached_data.ref_box.begin() + cleaned_box);
291
95.9k
            cached_data.box_start_offset.erase(cached_data.box_start_offset.begin(),
292
95.9k
                                               cached_data.box_start_offset.begin() + cleaned_box);
293
95.9k
            cached_data.box_end_offset.erase(cached_data.box_end_offset.begin(),
294
95.9k
                                             cached_data.box_end_offset.begin() + cleaned_box);
295
95.9k
        }
296
1.05M
        cached_data.start_offset += to_handle;
297
1.05M
        if (cached_data.start_offset == cached_data.end_offset) {
298
94.1k
            _clean_cached_data(cached_data);
299
94.1k
        }
300
1.05M
    };
301
302
1.04M
    if (offset > cached_data.start_offset) {
303
        // the data in range may be skipped
304
6.93k
        size_t to_skip = offset - cached_data.start_offset;
305
6.93k
        handle_in_box(to_skip, nullptr);
306
6.93k
    }
307
308
1.04M
    size_t to_read = std::min(cached_data.end_offset - cached_data.start_offset, result.size);
309
1.04M
    handle_in_box(to_read, result.data);
310
1.04M
    *bytes_read = to_read;
311
1.04M
}
312
313
Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, size_t to_read,
314
12.1k
                                       size_t* bytes_read, const IOContext* io_ctx) {
315
12.1k
    if (!_read_slice) {
316
11.5k
        _read_slice = std::make_unique<OwnedSlice>(_merged_read_slice_size);
317
11.5k
    }
318
319
12.1k
    *bytes_read = 0;
320
12.1k
    {
321
12.1k
        SCOPED_RAW_TIMER(&_statistics.read_time);
322
12.1k
        RETURN_IF_ERROR(_reader->read_at(start_offset, Slice(_read_slice->data(), to_read),
323
12.1k
                                         bytes_read, io_ctx));
324
12.1k
        _statistics.merged_io++;
325
12.1k
        _statistics.merged_bytes += *bytes_read;
326
12.1k
    }
327
328
12.1k
    SCOPED_RAW_TIMER(&_statistics.copy_time);
329
12.1k
    size_t copy_start = start_offset;
330
12.1k
    const size_t copy_end = start_offset + *bytes_read;
331
    // copy data into small boxes
332
    // tuple(box_index, box_start_offset, file_start_offset, file_end_offset)
333
12.1k
    std::vector<std::tuple<int16_t, uint32_t, size_t, size_t>> filled_boxes;
334
335
96.8k
    auto fill_box = [&](int16_t fill_box_ref, uint32_t box_usage, size_t box_copy_end) {
336
96.8k
        size_t copy_size = std::min(box_copy_end - copy_start, BOX_SIZE - box_usage);
337
96.8k
        memcpy(_boxes[fill_box_ref].data() + box_usage,
338
96.8k
               _read_slice->data() + copy_start - start_offset, copy_size);
339
96.8k
        filled_boxes.emplace_back(fill_box_ref, box_usage, copy_start, copy_start + copy_size);
340
96.8k
        copy_start += copy_size;
341
96.8k
        _last_box_ref = fill_box_ref;
342
96.8k
        _last_box_usage = box_usage + cast_set<int>(copy_size);
343
96.8k
        _box_ref[fill_box_ref]++;
344
96.8k
        if (box_usage == 0) {
345
13.5k
            _remaining -= BOX_SIZE;
346
13.5k
        }
347
96.8k
    };
348
349
12.1k
    for (int fill_range_index = range_index;
350
107k
         fill_range_index < _random_access_ranges.size() && copy_start < copy_end;
351
94.9k
         ++fill_range_index) {
352
94.9k
        RangeCachedData& fill_range_cache = _range_cached_data[fill_range_index];
353
94.9k
        DCHECK(fill_range_cache.empty());
354
94.9k
        fill_range_cache.reset();
355
94.9k
        const PrefetchRange& fill_range = _random_access_ranges[fill_range_index];
356
94.9k
        if (fill_range.start_offset > copy_start) {
357
            // don't copy hollow data
358
46.0k
            size_t hollow_size = fill_range.start_offset - copy_start;
359
46.0k
            DCHECK_GT(copy_end - copy_start, hollow_size);
360
46.0k
            copy_start += hollow_size;
361
46.0k
        }
362
363
94.9k
        const size_t range_copy_end = std::min(copy_end, fill_range.end_offset);
364
        // reuse the remaining capacity of last box
365
94.9k
        if (_last_box_ref >= 0 && _last_box_usage < BOX_SIZE) {
366
83.2k
            fill_box(_last_box_ref, _last_box_usage, range_copy_end);
367
83.2k
        }
368
        // reuse the former released box
369
102k
        for (int16_t i = 0; i < _boxes.size() && copy_start < range_copy_end; ++i) {
370
7.58k
            if (_box_ref[i] == 0) {
371
174
                fill_box(i, 0, range_copy_end);
372
174
            }
373
7.58k
        }
374
        // apply for new box to copy data
375
108k
        while (copy_start < range_copy_end && _boxes.size() < NUM_BOX) {
376
13.4k
            _boxes.emplace_back(BOX_SIZE);
377
13.4k
            _box_ref.emplace_back(0);
378
13.4k
            fill_box(cast_set<int16_t>(_boxes.size()) - 1, 0, range_copy_end);
379
13.4k
        }
380
94.9k
        DCHECK_EQ(copy_start, range_copy_end);
381
382
94.9k
        if (!filled_boxes.empty()) {
383
94.9k
            fill_range_cache.start_offset = std::get<2>(filled_boxes[0]);
384
94.9k
            fill_range_cache.end_offset = std::get<3>(filled_boxes.back());
385
96.8k
            for (auto& tuple : filled_boxes) {
386
96.8k
                fill_range_cache.ref_box.emplace_back(std::get<0>(tuple));
387
96.8k
                fill_range_cache.box_start_offset.emplace_back(std::get<1>(tuple));
388
96.8k
                fill_range_cache.box_end_offset.emplace_back(
389
96.8k
                        std::get<1>(tuple) + std::get<3>(tuple) - std::get<2>(tuple));
390
96.8k
            }
391
94.9k
            filled_boxes.clear();
392
94.9k
        }
393
94.9k
    }
394
12.1k
    return Status::OK();
395
12.1k
}
396
397
// there exists occasions where the buffer is already closed but
398
// some prior tasks are still queued in thread pool, so we have to check whether
399
// the buffer is closed each time the condition variable is notified.
400
127
void PrefetchBuffer::reset_offset(size_t offset) {
401
127
    {
402
127
        std::unique_lock lck {_lock};
403
127
        if (!_prefetched.wait_for(
404
127
                    lck, std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
405
127
                    [this]() { return _buffer_status != BufferStatus::PENDING; })) {
406
0
            _prefetch_status = Status::TimedOut("time out when reset prefetch buffer");
407
0
            return;
408
0
        }
409
127
        if (UNLIKELY(_buffer_status == BufferStatus::CLOSED)) {
410
0
            _prefetched.notify_all();
411
0
            return;
412
0
        }
413
127
        _buffer_status = BufferStatus::RESET;
414
127
        _offset = offset;
415
127
        _prefetched.notify_all();
416
127
    }
417
127
    if (UNLIKELY(offset >= _file_range.end_offset)) {
418
47
        _len = 0;
419
47
        _exceed = true;
420
47
        return;
421
80
    } else {
422
80
        _exceed = false;
423
80
    }
424
    // Lazy-allocate the backing buffer in the calling (query) thread, which has a
425
    // MemTrackerLimiter attached. The prefetch thread pool threads are "Orphan" threads
426
    // without a tracker, so allocation must not happen there.
427
80
    if (_buf.empty()) {
428
68
        _buf.resize(_size);
429
68
    }
430
80
    _prefetch_status = ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func(
431
80
            [buffer_ptr = shared_from_this()]() { buffer_ptr->prefetch_buffer(); });
432
80
}
433
434
// only this function would run concurrently in another thread
435
80
void PrefetchBuffer::prefetch_buffer() {
436
80
    {
437
80
        std::unique_lock lck {_lock};
438
80
        if (!_prefetched.wait_for(
439
80
                    lck, std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
440
80
                    [this]() {
441
80
                        return _buffer_status == BufferStatus::RESET ||
442
80
                               _buffer_status == BufferStatus::CLOSED;
443
80
                    })) {
444
0
            _prefetch_status = Status::TimedOut("time out when invoking prefetch buffer");
445
0
            return;
446
0
        }
447
        // in case buffer is already closed
448
80
        if (UNLIKELY(_buffer_status == BufferStatus::CLOSED)) {
449
0
            _prefetched.notify_all();
450
0
            return;
451
0
        }
452
80
        _buffer_status = BufferStatus::PENDING;
453
80
        _prefetched.notify_all();
454
80
    }
455
456
0
    int read_range_index = search_read_range(_offset);
457
80
    size_t buf_size;
458
80
    if (read_range_index == -1) {
459
80
        buf_size =
460
80
                _file_range.end_offset - _offset > _size ? _size : _file_range.end_offset - _offset;
461
80
    } else {
462
0
        buf_size = merge_small_ranges(_offset, read_range_index);
463
0
    }
464
465
80
    _len = 0;
466
80
    Status s;
467
468
80
    {
469
80
        SCOPED_RAW_TIMER(&_statis.read_time);
470
80
        s = _reader->read_at(_offset, Slice {_buf.data(), buf_size}, &_len, _io_ctx);
471
80
    }
472
80
    if (UNLIKELY(s.ok() && buf_size != _len)) {
473
        // This indicates that the data size returned by S3 object storage is smaller than what we requested,
474
        // which seems to be a violation of the S3 protocol since our request range was valid.
475
        // We currently consider this situation a bug and will treat this task as a failure.
476
0
        s = Status::InternalError("Data size returned by S3 is smaller than requested");
477
0
        LOG(WARNING) << "Data size returned by S3 is smaller than requested" << _reader->path()
478
0
                     << " request bytes " << buf_size << " returned size " << _len;
479
0
    }
480
80
    g_bytes_downloaded << _len;
481
80
    _statis.prefetch_request_io += 1;
482
80
    _statis.prefetch_request_bytes += _len;
483
80
    std::unique_lock lck {_lock};
484
80
    if (!_prefetched.wait_for(lck,
485
80
                              std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
486
80
                              [this]() { return _buffer_status == BufferStatus::PENDING; })) {
487
0
        _prefetch_status = Status::TimedOut("time out when invoking prefetch buffer");
488
0
        return;
489
0
    }
490
80
    if (!s.ok() && _offset < _reader->size()) {
491
        // We should print the error msg since this buffer might not be accessed by the consumer
492
        // which would result in the status being missed
493
0
        LOG_WARNING("prefetch path {} failed, offset {}, error {}", _reader->path().native(),
494
0
                    _offset, s.to_string());
495
0
        _prefetch_status = std::move(s);
496
0
    }
497
80
    _buffer_status = BufferStatus::PREFETCHED;
498
80
    _prefetched.notify_all();
499
    // eof would come up with len == 0, it would be handled by read_buffer
500
80
}
501
502
80
int PrefetchBuffer::search_read_range(size_t off) const {
503
80
    if (_random_access_ranges == nullptr || _random_access_ranges->empty()) {
504
80
        return -1;
505
80
    }
506
0
    const std::vector<PrefetchRange>& random_access_ranges = *_random_access_ranges;
507
0
    int left = 0, right = cast_set<int>(random_access_ranges.size()) - 1;
508
0
    do {
509
0
        int mid = left + (right - left) / 2;
510
0
        const PrefetchRange& range = random_access_ranges[mid];
511
0
        if (range.start_offset <= off && range.end_offset > off) {
512
0
            return mid;
513
0
        } else if (range.start_offset > off) {
514
0
            right = mid;
515
0
        } else {
516
0
            left = mid + 1;
517
0
        }
518
0
    } while (left < right);
519
0
    if (random_access_ranges[right].start_offset > off) {
520
0
        return right;
521
0
    } else {
522
0
        return -1;
523
0
    }
524
0
}
525
526
0
size_t PrefetchBuffer::merge_small_ranges(size_t off, int range_index) const {
527
0
    if (_random_access_ranges == nullptr || _random_access_ranges->empty()) {
528
0
        return _size;
529
0
    }
530
0
    int64_t remaining = _size;
531
0
    const std::vector<PrefetchRange>& random_access_ranges = *_random_access_ranges;
532
0
    while (remaining > 0 && range_index < random_access_ranges.size()) {
533
0
        const PrefetchRange& range = random_access_ranges[range_index];
534
0
        if (range.start_offset <= off && range.end_offset > off) {
535
0
            remaining -= range.end_offset - off;
536
0
            off = range.end_offset;
537
0
            range_index++;
538
0
        } else if (range.start_offset > off) {
539
            // merge small range
540
0
            size_t hollow = range.start_offset - off;
541
0
            if (hollow < remaining) {
542
0
                remaining -= hollow;
543
0
                off = range.start_offset;
544
0
            } else {
545
0
                break;
546
0
            }
547
0
        } else {
548
0
            DCHECK(false);
549
0
        }
550
0
    }
551
0
    if (remaining < 0 || remaining == _size) {
552
0
        remaining = 0;
553
0
    }
554
0
    return _size - remaining;
555
0
}
556
557
Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
558
131
                                   size_t* bytes_read) {
559
131
    if (UNLIKELY(off >= _file_range.end_offset)) {
560
        // Reader can read out of [start_offset, end_offset) by synchronous method.
561
0
        return _reader->read_at(off, Slice {out, buf_len}, bytes_read, _io_ctx);
562
0
    }
563
131
    if (_exceed) {
564
0
        reset_offset((off / _size) * _size);
565
0
        return read_buffer(off, out, buf_len, bytes_read);
566
0
    }
567
131
    {
568
131
        std::unique_lock lck {_lock};
569
        // buffer must be prefetched or it's closed
570
131
        if (!_prefetched.wait_for(
571
131
                    lck, std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
572
155
                    [this]() {
573
155
                        return _buffer_status == BufferStatus::PREFETCHED ||
574
155
                               _buffer_status == BufferStatus::CLOSED;
575
155
                    })) {
576
0
            _prefetch_status = Status::TimedOut("time out when read prefetch buffer");
577
0
            return _prefetch_status;
578
0
        }
579
131
        if (UNLIKELY(BufferStatus::CLOSED == _buffer_status)) {
580
0
            return Status::OK();
581
0
        }
582
131
    }
583
131
    RETURN_IF_ERROR(_prefetch_status);
584
    // there is only parquet would do not sequence read
585
    // it would read the end of the file first
586
131
    if (UNLIKELY(!contains(off))) {
587
0
        reset_offset((off / _size) * _size);
588
0
        return read_buffer(off, out, buf_len, bytes_read);
589
0
    }
590
131
    if (UNLIKELY(0 == _len || _offset + _len < off)) {
591
0
        return Status::OK();
592
0
    }
593
594
131
    {
595
131
        LIMIT_REMOTE_SCAN_IO(bytes_read);
596
        // [0]: maximum len trying to read, [1] maximum length buffer can provide, [2] actual len buffer has
597
131
        size_t read_len = std::min({buf_len, _offset + _size - off, _offset + _len - off});
598
131
        {
599
131
            SCOPED_RAW_TIMER(&_statis.copy_time);
600
131
            memcpy((void*)out, _buf.data() + (off - _offset), read_len);
601
131
        }
602
131
        *bytes_read = read_len;
603
131
        _statis.request_io += 1;
604
131
        _statis.request_bytes += read_len;
605
131
    }
606
131
    if (off + *bytes_read == _offset + _len) {
607
47
        reset_offset(_offset + _whole_buffer_size);
608
47
    }
609
131
    return Status::OK();
610
131
}
611
612
80
void PrefetchBuffer::close() {
613
80
    std::unique_lock lck {_lock};
614
    // in case _reader still tries to write to the buf after we close the buffer
615
80
    if (!_prefetched.wait_for(lck,
616
80
                              std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
617
80
                              [this]() { return _buffer_status != BufferStatus::PENDING; })) {
618
0
        _prefetch_status = Status::TimedOut("time out when close prefetch buffer");
619
0
        return;
620
0
    }
621
80
    _buffer_status = BufferStatus::CLOSED;
622
80
    _prefetched.notify_all();
623
    // Explicitly release the backing buffer here, in the calling (query) thread which has a
624
    // MemTrackerLimiter. The destructor may run in the thread pool's Orphan thread (when the
625
    // last shared_ptr ref is released after the prefetch lambda completes), so we must not
626
    // rely on ~PODArray() to release memory — that would trigger memory_orphan_check().
627
80
    PODArray<char>().swap(_buf);
628
80
}
629
630
0
void PrefetchBuffer::_collect_profile_before_close() {
631
0
    if (_sync_profile != nullptr) {
632
0
        _sync_profile(*this);
633
0
    }
634
0
}
635
636
// buffered reader
637
// Constructs a reader that splits `file_range` across several PrefetchBuffers,
// each at most s_max_pre_buffer_size bytes, so data can be prefetched ahead of
// sequential reads.
// @param profile     optional runtime profile; when non-null, per-buffer
//                    statistics are flushed into it via the sync_buffer lambda.
// @param reader      the underlying (thread-safe) file reader.
// @param file_range  byte range of the file this reader will serve.
// @param io_ctx      shared IO context; a default one is created when null.
// @param buffer_size total prefetch memory; -1 means use the config default.
PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
                                               PrefetchRange file_range,
                                               std::shared_ptr<const IOContext> io_ctx,
                                               int64_t buffer_size)
        : _reader(std::move(reader)), _file_range(file_range), _io_ctx_holder(std::move(io_ctx)) {
    if (_io_ctx_holder == nullptr) {
        // Ensure _io_ctx always points at a valid context, even when the caller
        // did not supply one.
        _io_ctx_holder = std::make_shared<IOContext>();
    }
    _io_ctx = _io_ctx_holder.get();
    if (buffer_size == -1L) {
        // Fall back to the configured prefetch memory budget (MB -> bytes).
        buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024;
    }
    _size = _reader->size();
    _whole_pre_buffer_size = buffer_size;
    // Never prefetch past the end of the file.
    _file_range.end_offset = std::min(_file_range.end_offset, _size);
    // Number of fixed-size buffers the budget is split into (at least one).
    int buffer_num = buffer_size > s_max_pre_buffer_size
                             ? cast_set<int>(buffer_size) / cast_set<int>(s_max_pre_buffer_size)
                             : 1;
    std::function<void(PrefetchBuffer&)> sync_buffer = nullptr;
    if (profile != nullptr) {
        const char* prefetch_buffered_reader = "PrefetchBufferedReader";
        ADD_TIMER(profile, prefetch_buffered_reader);
        auto copy_time = ADD_CHILD_TIMER(profile, "CopyTime", prefetch_buffered_reader);
        auto read_time = ADD_CHILD_TIMER(profile, "ReadTime", prefetch_buffered_reader);
        auto prefetch_request_io =
                ADD_CHILD_COUNTER(profile, "PreRequestIO", TUnit::UNIT, prefetch_buffered_reader);
        auto prefetch_request_bytes = ADD_CHILD_COUNTER(profile, "PreRequestBytes", TUnit::BYTES,
                                                        prefetch_buffered_reader);
        auto request_io =
                ADD_CHILD_COUNTER(profile, "RequestIO", TUnit::UNIT, prefetch_buffered_reader);
        auto request_bytes =
                ADD_CHILD_COUNTER(profile, "RequestBytes", TUnit::BYTES, prefetch_buffered_reader);
        // Capture the counters by copy; each buffer calls this on close to
        // flush its statistics into the profile.
        sync_buffer = [=](PrefetchBuffer& buf) {
            COUNTER_UPDATE(copy_time, buf._statis.copy_time);
            COUNTER_UPDATE(read_time, buf._statis.read_time);
            COUNTER_UPDATE(prefetch_request_io, buf._statis.prefetch_request_io);
            COUNTER_UPDATE(prefetch_request_bytes, buf._statis.prefetch_request_bytes);
            COUNTER_UPDATE(request_io, buf._statis.request_io);
            COUNTER_UPDATE(request_bytes, buf._statis.request_bytes);
        };
    }
    // set the _cur_offset of this reader as same as the inner reader's,
    // to make sure the buffer reader will start to read at right position.
    for (int i = 0; i < buffer_num; i++) {
        _pre_buffers.emplace_back(std::make_shared<PrefetchBuffer>(
                _file_range, s_max_pre_buffer_size, _whole_pre_buffer_size, _reader.get(),
                _io_ctx_holder, sync_buffer));
    }
}
686
687
20
// Safety net: make sure buffers and the inner reader are closed even if the
// caller never called close(). The returned Status is deliberately discarded —
// a destructor has no way to propagate it.
PrefetchBufferedReader::~PrefetchBufferedReader() {
    /// Better not to call virtual functions in a destructor.
    static_cast<void>(_close_internal());
}
691
692
// Reads up to result.get_size() bytes starting at `offset`, assembling the
// result from one or more prefetch buffers.
// @param offset     file offset to read from.
// @param result     destination slice; its size is the requested length.
// @param bytes_read out: number of bytes actually copied (may be short at EOF).
// @param io_ctx     unused here; buffers hold their own IO context.
Status PrefetchBufferedReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                            const IOContext* io_ctx) {
    // Lazily position all prefetch buffers at the first requested offset so
    // prefetching starts exactly where the caller begins reading.
    if (!_initialized) {
        reset_all_buffer(offset);
        _initialized = true;
    }
    // Empty request or read starting at/after EOF: nothing to do.
    if (UNLIKELY(result.get_size() == 0 || offset >= size())) {
        *bytes_read = 0;
        return Status::OK();
    }
    size_t nbytes = result.get_size();
    // Use size_t (not int) for the running total: read_num is size_t, so a
    // signed accumulator caused signed/unsigned comparison against nbytes and
    // could narrow on large reads.
    size_t actual_bytes_read = 0;
    while (actual_bytes_read < nbytes && offset < size()) {
        size_t read_num = 0;
        // Each offset maps to exactly one buffer in the ring.
        auto buffer_pos = get_buffer_pos(offset);
        RETURN_IF_ERROR(
                _pre_buffers[buffer_pos]->read_buffer(offset, result.get_data() + actual_bytes_read,
                                                      nbytes - actual_bytes_read, &read_num));
        actual_bytes_read += read_num;
        offset += read_num;
    }
    *bytes_read = actual_bytes_read;
    return Status::OK();
}
716
717
8
// Public close entry point; delegates to the non-virtual helper so the
// destructor can share the same shutdown path.
Status PrefetchBufferedReader::close() {
    return _close_internal();
}
720
721
28
// Idempotent shutdown: the first call closes every prefetch buffer and the
// inner reader; subsequent calls are no-ops returning OK.
Status PrefetchBufferedReader::_close_internal() {
    if (_closed) {
        return Status::OK();
    }
    _closed = true;
    for (auto& buffer : _pre_buffers) {
        buffer->close();
    }
    return _reader->close();
}
731
732
0
void PrefetchBufferedReader::_collect_profile_before_close() {
733
0
    std::for_each(_pre_buffers.begin(), _pre_buffers.end(),
734
0
                  [](std::shared_ptr<PrefetchBuffer>& buffer) {
735
0
                      buffer->collect_profile_before_close();
736
0
                  });
737
0
    if (_reader != nullptr) {
738
0
        _reader->collect_profile_before_close();
739
0
    }
740
0
}
741
742
// InMemoryFileReader
743
33.5k
// Wraps `reader`; the whole file content is loaded into memory lazily on the
// first read (see read_at_impl).
InMemoryFileReader::InMemoryFileReader(io::FileReaderSPtr reader) : _reader(std::move(reader)) {
    // Cache the file size up front; it determines the in-memory buffer size.
    _size = _reader->size();
}
746
747
33.5k
// Safety net: close the inner reader if close() was never called. The Status
// is discarded — a destructor cannot propagate errors.
InMemoryFileReader::~InMemoryFileReader() {
    static_cast<void>(_close_internal());
}
750
751
202
// Public close entry point; delegates to the non-virtual helper shared with
// the destructor.
Status InMemoryFileReader::close() {
    return _close_internal();
}
754
755
33.7k
// Idempotent shutdown: only the first call closes the inner reader; later
// calls return OK without side effects.
Status InMemoryFileReader::_close_internal() {
    if (_closed) {
        return Status::OK();
    }
    _closed = true;
    return _reader->close();
}
762
763
// Serves reads from an in-memory copy of the whole file, loading it lazily on
// the first call.
// @param offset     file offset to read from (must be <= file size).
// @param result     destination slice.
// @param bytes_read out: bytes copied, clamped to the file tail.
Status InMemoryFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                        const IOContext* io_ctx) {
    if (_data == nullptr) {
        // Load the entire file into a local buffer first and only publish it
        // to _data after the read succeeds. Assigning _data before the read
        // meant a failed read_at left a partially-filled buffer behind, and a
        // retry would treat `_data != nullptr` as "fully cached" and serve
        // uninitialized bytes.
        auto data = std::make_unique_for_overwrite<char[]>(_size);
        size_t file_size = 0;
        RETURN_IF_ERROR(_reader->read_at(0, Slice(data.get(), _size), &file_size, io_ctx));
        DCHECK_EQ(file_size, _size);
        _data = std::move(data);
    }
    if (UNLIKELY(offset > _size)) {
        return Status::IOError("Out of bounds access");
    }
    // Clamp to the file tail; reads beyond EOF return fewer bytes.
    *bytes_read = std::min(result.size, _size - offset);
    memcpy(result.data, _data.get() + offset, *bytes_read);
    return Status::OK();
}
779
780
28.2k
void InMemoryFileReader::_collect_profile_before_close() {
781
28.2k
    if (_reader != nullptr) {
782
28.2k
        _reader->collect_profile_before_close();
783
28.2k
    }
784
28.2k
}
785
786
// BufferedFileStreamReader
787
// Stream reader over `file` restricted to [offset, offset + length), keeping
// at most `max_buf_size` bytes buffered.
// @param file         underlying reader (shared ownership taken).
// @param offset       first readable byte of the window.
// @param length       window length in bytes.
// @param max_buf_size preferred internal buffer capacity.
BufferedFileStreamReader::BufferedFileStreamReader(io::FileReaderSPtr file, uint64_t offset,
                                                   uint64_t length, size_t max_buf_size)
        // Move the by-value shared_ptr instead of copying it: the copy cost an
        // extra atomic refcount increment/decrement per construction.
        : _file(std::move(file)),
          _file_start_offset(offset),
          _file_end_offset(offset + length),
          _max_buf_size(max_buf_size) {}
793
794
// Returns a pointer into the internal buffer covering
// [offset, offset + bytes_to_read), reading from the file as needed.
// The internal buffer slides forward: bytes still useful are kept (copied or
// memmoved to the front) and the remainder is filled from the file.
// @param buf           out: points into the internal buffer; valid until the
//                      next read_bytes call.
// @param offset        absolute file offset requested.
// @param bytes_to_read number of bytes required.
// @param io_ctx        IO context forwarded to the underlying reader.
Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset,
                                            const size_t bytes_to_read, const IOContext* io_ctx) {
    // Reject requests outside the [_file_start_offset, _file_end_offset) window.
    if (offset < _file_start_offset || offset >= _file_end_offset ||
        offset + bytes_to_read > _file_end_offset) {
        return Status::IOError(
                "Out-of-bounds Access: offset={}, bytes_to_read={}, file_start={}, "
                "file_end={}",
                offset, bytes_to_read, _file_start_offset, _file_end_offset);
    }
    int64_t end_offset = offset + bytes_to_read;
    // Fast path: the request is fully inside the currently buffered range.
    if (_buf_start_offset <= offset && _buf_end_offset >= end_offset) {
        *buf = _buf.get() + offset - _buf_start_offset;
        return Status::OK();
    }
    // The buffer must hold at least bytes_to_read bytes, even above _max_buf_size.
    size_t buf_size = std::max(_max_buf_size, bytes_to_read);
    if (_buf_size < buf_size) {
        // Grow: allocate a larger buffer and salvage any already-buffered bytes
        // that start at `offset`.
        auto new_buf = make_unique_buffer<uint8_t>(buf_size);
        if (offset >= _buf_start_offset && offset < _buf_end_offset) {
            memcpy(new_buf.get(), _buf.get() + offset - _buf_start_offset,
                   _buf_end_offset - offset);
        }
        _buf = std::move(new_buf);
        _buf_size = buf_size;
    } else if (offset > _buf_start_offset && offset < _buf_end_offset) {
        // Same buffer: slide the still-useful tail [offset, _buf_end_offset)
        // to the front (ranges may overlap, hence memmove).
        memmove(_buf.get(), _buf.get() + offset - _buf_start_offset, _buf_end_offset - offset);
    }
    if (offset < _buf_start_offset || offset >= _buf_end_offset) {
        // Nothing buffered is reusable; the buffered range becomes empty at `offset`.
        _buf_end_offset = offset;
    }
    _buf_start_offset = offset;
    // Bytes already present at the front of the buffer after the slide.
    int64_t buf_remaining = _buf_end_offset - _buf_start_offset;
    // Fill the rest of the buffer, but never past the end of the file window.
    int64_t to_read = std::min(_buf_size - buf_remaining, _file_end_offset - _buf_end_offset);
    int64_t has_read = 0;
    // Loop because read_at may return fewer bytes than requested per call.
    while (has_read < to_read) {
        size_t loop_read = 0;
        Slice result(_buf.get() + buf_remaining + has_read, to_read - has_read);
        RETURN_IF_ERROR(_file->read_at(_buf_end_offset + has_read, result, &loop_read, io_ctx));
        if (loop_read == 0) {
            // Premature EOF / no progress; bail out and report below.
            break;
        }
        has_read += loop_read;
    }
    if (has_read != to_read) {
        return Status::Corruption("Try to read {} bytes, but received {} bytes", to_read, has_read);
    }
    _buf_end_offset += to_read;
    *buf = _buf.get();
    return Status::OK();
}
843
844
// Convenience overload: points slice.data into the internal buffer for
// slice.size bytes starting at `offset`. The slice does not own the memory;
// it is valid only until the next read_bytes call.
Status BufferedFileStreamReader::read_bytes(Slice& slice, uint64_t offset,
                                            const IOContext* io_ctx) {
    // Use a named cast instead of the original C-style cast: same behavior,
    // but greppable and intent-revealing.
    return read_bytes(reinterpret_cast<const uint8_t**>(&slice.data), offset, slice.size, io_ctx);
}
848
849
// Legacy overload taking a raw IOContext pointer. The context is copied onto
// the heap (best-effort safety) and forwarded to the shared_ptr-based overload.
Result<io::FileReaderSPtr> DelegateReader::create_file_reader(
        RuntimeProfile* profile, const FileSystemProperties& system_properties,
        const FileDescription& file_description, const io::FileReaderOptions& reader_options,
        AccessMode access_mode, const IOContext* io_ctx, const PrefetchRange file_range) {
    std::shared_ptr<const IOContext> ctx_copy;
    if (io_ctx != nullptr) {
        // Old API: best-effort safety by copying the IOContext onto the heap.
        ctx_copy = std::make_shared<IOContext>(*io_ctx);
    }
    return create_file_reader(profile, system_properties, file_description, reader_options,
                              access_mode, std::move(ctx_copy), file_range);
}
861
862
// Creates a file reader and wraps it with a decorator picked from the file's
// size, concrete type, and intended access pattern:
//   * small S3 files            -> InMemoryFileReader (whole file cached).
//   * SEQUENTIAL + thread-safe  -> PrefetchBufferedReader (concurrent prefetch).
//   * otherwise                 -> the plain reader.
Result<io::FileReaderSPtr> DelegateReader::create_file_reader(
        RuntimeProfile* profile, const FileSystemProperties& system_properties,
        const FileDescription& file_description, const io::FileReaderOptions& reader_options,
        AccessMode access_mode, std::shared_ptr<const IOContext> io_ctx,
        const PrefetchRange file_range) {
    if (io_ctx == nullptr) {
        // Guarantee downstream wrappers always receive a non-null context.
        io_ctx = std::make_shared<IOContext>();
    }
    return FileFactory::create_file_reader(system_properties, file_description, reader_options,
                                           profile)
            .transform([&](auto&& reader) -> io::FileReaderSPtr {
                // Small S3 files are cheaper to read once into memory than to
                // pay per-request remote latency.
                if (reader->size() < config::in_memory_file_size &&
                    typeid_cast<io::S3FileReader*>(reader.get())) {
                    return std::make_shared<InMemoryFileReader>(std::move(reader));
                }

                if (access_mode == AccessMode::SEQUENTIAL) {
                    // Prefetching issues concurrent reads, so the underlying
                    // reader must be thread-safe: a bare S3 reader, or a cached
                    // reader whose remote side is S3.
                    bool is_thread_safe = false;
                    if (typeid_cast<io::S3FileReader*>(reader.get())) {
                        is_thread_safe = true;
                    } else if (auto* cached_reader =
                                       typeid_cast<io::CachedRemoteFileReader*>(reader.get());
                               cached_reader &&
                               typeid_cast<io::S3FileReader*>(cached_reader->get_remote_reader())) {
                        is_thread_safe = true;
                    }
                    if (is_thread_safe) {
                        // PrefetchBufferedReader needs thread-safe reader to prefetch data concurrently.
                        return std::make_shared<io::PrefetchBufferedReader>(
                                profile, std::move(reader), file_range, io_ctx);
                    }
                }

                return reader;
            });
}
898
899
// Finds the range containing `desired_offset` by probing forward from the
// last match. `index` is a member that persists across calls, so offsets must
// be queried in non-decreasing order of range position.
// @param desired_offset offset to locate.
// @param result_range   out: the range covering the offset.
// @return InvalidArgument when no range covers the offset.
Status LinearProbeRangeFinder::get_range_for(int64_t desired_offset,
                                             io::PrefetchRange& result_range) {
    for (; index < _ranges.size(); ++index) {
        const io::PrefetchRange& candidate = _ranges[index];
        if (candidate.end_offset <= desired_offset) {
            // Entirely before the offset; keep probing forward.
            continue;
        }
        if (candidate.start_offset > desired_offset) [[unlikely]] {
            // The offset falls in a gap between ranges.
            return Status::InvalidArgument("Invalid desiredOffset");
        }
        result_range = candidate;
        return Status::OK();
    }
    return Status::InvalidArgument("Invalid desiredOffset");
}
914
915
// Reader that caches one whole range (as reported by `range_finder`) at a time,
// so repeated small reads inside the same range hit memory instead of the
// inner reader.
// @param profile      optional runtime profile; counters registered when non-null.
// @param inner_reader the wrapped reader actually performing IO.
// @param range_finder maps an offset to the range that should be cached.
RangeCacheFileReader::RangeCacheFileReader(RuntimeProfile* profile, io::FileReaderSPtr inner_reader,
                                           std::shared_ptr<RangeFinder> range_finder)
        : _profile(profile),
          _inner_reader(std::move(inner_reader)),
          _range_finder(std::move(range_finder)) {
    _size = _inner_reader->size();
    // Cache must hold the largest possible range; 4096 is the floor.
    uint64_t max_cache_size =
            std::max((uint64_t)4096, (uint64_t)_range_finder->get_max_range_size());
    _cache = OwnedSlice(max_cache_size);

    if (_profile != nullptr) {
        const char* random_profile = "RangeCacheFileReader";
        ADD_TIMER_WITH_LEVEL(_profile, random_profile, 1);
        _request_io =
                ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestIO", TUnit::UNIT, random_profile, 1);
        _request_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestBytes", TUnit::BYTES,
                                                      random_profile, 1);
        _request_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "RequestTime", random_profile, 1);
        _read_to_cache_time =
                ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ReadToCacheTime", random_profile, 1);
        _cache_refresh_count = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "CacheRefreshCount",
                                                            TUnit::UNIT, random_profile, 1);
        _read_to_cache_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "ReadToCacheBytes",
                                                            TUnit::BYTES, random_profile, 1);
    }
}
941
942
// Serves a read from the single-range cache, refreshing the cache (one full
// range read from the inner reader) whenever the request falls into a range
// other than the currently cached one.
// NOTE(review): the final memcpy assumes the request never crosses the end of
// the cached range — i.e. callers only issue reads fully contained in one
// range. Verify against the callers / RangeFinder contract.
Status RangeCacheFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                          const IOContext* io_ctx) {
    auto request_size = result.size;

    _cache_statistics.request_io++;
    _cache_statistics.request_bytes += request_size;
    SCOPED_RAW_TIMER(&_cache_statistics.request_time);

    PrefetchRange range;
    // Relies on Status's boolean conversion being true on success — the happy
    // path is the covered one here; confirm against the Status API.
    if (_range_finder->get_range_for(offset, range)) [[likely]] {
        if (_current_start_offset != range.start_offset) { // need read new range to cache.
            auto range_size = range.end_offset - range.start_offset;

            _cache_statistics.cache_refresh_count++;
            _cache_statistics.read_to_cache_bytes += range_size;
            SCOPED_RAW_TIMER(&_cache_statistics.read_to_cache_time);

            // Pull the entire range into the cache in one inner read.
            Slice cache_slice = {_cache.data(), range_size};
            RETURN_IF_ERROR(
                    _inner_reader->read_at(range.start_offset, cache_slice, bytes_read, io_ctx));

            // A short read would leave the cache partially valid; treat it as an error.
            if (*bytes_read != range_size) [[unlikely]] {
                return Status::InternalError(
                        "RangeCacheFileReader use inner reader read bytes {} not eq expect size {}",
                        *bytes_read, range_size);
            }

            _current_start_offset = range.start_offset;
        }

        // Serve the request straight from the cached range.
        int64_t buffer_offset = offset - _current_start_offset;
        memcpy(result.data, _cache.data() + buffer_offset, request_size);
        *bytes_read = request_size;

        return Status::OK();
    } else {
        return Status::InternalError("RangeCacheFileReader read  not in Ranges. Offset = {}",
                                     offset);
        //                RETURN_IF_ERROR(_inner_reader->read_at(offset, result , bytes_read, io_ctx));
        //                return Status::OK();
        // think return error is ok,otherwise it will cover up the error.
    }
}
985
986
23.2k
void RangeCacheFileReader::_collect_profile_before_close() {
987
23.2k
    if (_profile != nullptr) {
988
23.2k
        COUNTER_UPDATE(_request_io, _cache_statistics.request_io);
989
23.2k
        COUNTER_UPDATE(_request_bytes, _cache_statistics.request_bytes);
990
23.2k
        COUNTER_UPDATE(_request_time, _cache_statistics.request_time);
991
23.2k
        COUNTER_UPDATE(_read_to_cache_time, _cache_statistics.read_to_cache_time);
992
23.2k
        COUNTER_UPDATE(_cache_refresh_count, _cache_statistics.cache_refresh_count);
993
23.2k
        COUNTER_UPDATE(_read_to_cache_bytes, _cache_statistics.read_to_cache_bytes);
994
23.2k
        if (_inner_reader != nullptr) {
995
23.2k
            _inner_reader->collect_profile_before_close();
996
23.2k
        }
997
23.2k
    }
998
23.2k
}
999
1000
} // namespace io
1001
1002
} // namespace doris