Coverage Report

Created: 2026-04-10 05:02

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/buffered_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/buffered_reader.h"
19
20
#include <bvar/reducer.h>
21
#include <bvar/window.h>
22
#include <string.h>
23
24
#include <algorithm>
25
#include <chrono>
26
#include <cstdint>
27
#include <memory>
28
29
#include "common/cast_set.h"
30
#include "common/compiler_util.h" // IWYU pragma: keep
31
#include "common/config.h"
32
#include "common/status.h"
33
#include "core/custom_allocator.h"
34
#include "runtime/exec_env.h"
35
#include "runtime/runtime_profile.h"
36
#include "runtime/thread_context.h"
37
#include "runtime/workload_management/io_throttle.h"
38
#include "util/slice.h"
39
#include "util/threadpool.h"
40
namespace doris {
41
42
#include "common/compile_check_begin.h"
43
44
namespace io {
45
struct IOContext;
46
47
// add bvar to capture the download bytes per second by buffered reader
48
bvar::Adder<uint64_t> g_bytes_downloaded("buffered_reader", "bytes_downloaded");
49
bvar::PerSecond<bvar::Adder<uint64_t>> g_bytes_downloaded_per_second("buffered_reader",
50
                                                                     "bytes_downloaded_per_second",
51
                                                                     &g_bytes_downloaded, 60);
52
53
Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
54
1.32k
                                          const IOContext* io_ctx) {
55
1.32k
    _statistics.request_io++;
56
1.32k
    *bytes_read = 0;
57
1.32k
    if (result.size == 0) {
58
0
        return Status::OK();
59
0
    }
60
1.32k
    const int range_index = _search_read_range(offset, offset + result.size);
61
1.32k
    if (range_index < 0) {
62
0
        SCOPED_RAW_TIMER(&_statistics.read_time);
63
0
        Status st = _reader->read_at(offset, result, bytes_read, io_ctx);
64
0
        _statistics.merged_io++;
65
0
        _statistics.request_bytes += *bytes_read;
66
0
        _statistics.merged_bytes += *bytes_read;
67
0
        return st;
68
0
    }
69
1.32k
    if (offset + result.size > _random_access_ranges[range_index].end_offset) {
70
        // return _reader->read_at(offset, result, bytes_read, io_ctx);
71
0
        return Status::IOError("Range in RandomAccessReader should be read sequentially");
72
0
    }
73
74
1.32k
    size_t has_read = 0;
75
1.32k
    RangeCachedData& cached_data = _range_cached_data[range_index];
76
1.32k
    cached_data.has_read = true;
77
1.32k
    if (cached_data.contains(offset)) {
78
        // has cached data in box
79
1.26k
        _read_in_box(cached_data, offset, result, &has_read);
80
1.26k
        _statistics.request_bytes += has_read;
81
1.26k
        if (has_read == result.size) {
82
            // all data is read in cache
83
1.26k
            *bytes_read = has_read;
84
1.26k
            return Status::OK();
85
1.26k
        }
86
1.26k
    } else if (!cached_data.empty()) {
87
        // the data in range may be skipped or ignored
88
0
        for (int16_t box_index : cached_data.ref_box) {
89
0
            _dec_box_ref(box_index);
90
0
        }
91
0
        cached_data.reset();
92
0
    }
93
94
66
    size_t to_read = result.size - has_read;
95
66
    if (to_read >= SMALL_IO || to_read >= _remaining) {
96
0
        SCOPED_RAW_TIMER(&_statistics.read_time);
97
0
        size_t read_size = 0;
98
0
        RETURN_IF_ERROR(_reader->read_at(offset + has_read, Slice(result.data + has_read, to_read),
99
0
                                         &read_size, io_ctx));
100
0
        *bytes_read = has_read + read_size;
101
0
        _statistics.merged_io++;
102
0
        _statistics.request_bytes += read_size;
103
0
        _statistics.merged_bytes += read_size;
104
0
        return Status::OK();
105
0
    }
106
107
    // merge small IO
108
66
    size_t merge_start = offset + has_read;
109
66
    const size_t merge_end = merge_start + _merged_read_slice_size;
110
    // <slice_size, is_content>
111
66
    std::vector<std::pair<size_t, bool>> merged_slice;
112
66
    size_t content_size = 0;
113
66
    size_t hollow_size = 0;
114
66
    if (merge_start > _random_access_ranges[range_index].end_offset) {
115
0
        return Status::IOError("Fail to merge small IO");
116
0
    }
117
66
    int merge_index = range_index;
118
666
    while (merge_start < merge_end && merge_index < _random_access_ranges.size()) {
119
617
        size_t content_max = _remaining - content_size;
120
617
        if (content_max == 0) {
121
0
            break;
122
0
        }
123
617
        if (merge_index != range_index && _range_cached_data[merge_index].has_read) {
124
            // don't read or merge twice
125
0
            break;
126
0
        }
127
617
        if (_random_access_ranges[merge_index].end_offset > merge_end) {
128
1
            size_t add_content = std::min(merge_end - merge_start, content_max);
129
1
            content_size += add_content;
130
1
            merge_start += add_content;
131
1
            merged_slice.emplace_back(add_content, true);
132
1
            break;
133
1
        }
134
616
        size_t add_content =
135
616
                std::min(_random_access_ranges[merge_index].end_offset - merge_start, content_max);
136
616
        content_size += add_content;
137
616
        merge_start += add_content;
138
616
        merged_slice.emplace_back(add_content, true);
139
616
        if (merge_start != _random_access_ranges[merge_index].end_offset) {
140
0
            break;
141
0
        }
142
616
        if (merge_index < _random_access_ranges.size() - 1 && merge_start < merge_end) {
143
567
            size_t gap = _random_access_ranges[merge_index + 1].start_offset -
144
567
                         _random_access_ranges[merge_index].end_offset;
145
567
            if ((content_size + hollow_size) > SMALL_IO && gap >= SMALL_IO) {
146
                // too large gap
147
0
                break;
148
0
            }
149
567
            if (gap < merge_end - merge_start && content_size < _remaining &&
150
567
                !_range_cached_data[merge_index + 1].has_read) {
151
551
                hollow_size += gap;
152
551
                merge_start = _random_access_ranges[merge_index + 1].start_offset;
153
551
                merged_slice.emplace_back(gap, false);
154
551
            } else {
155
                // there's no enough memory to read hollow data
156
16
                break;
157
16
            }
158
567
        }
159
600
        merge_index++;
160
600
    }
161
66
    content_size = 0;
162
66
    hollow_size = 0;
163
66
    std::vector<std::pair<double, size_t>> ratio_and_size;
164
    // Calculate the read amplified ratio for each merge operation and the size of the merged data.
165
    // Find the largest size of the merged data whose amplified ratio is less than config::max_amplified_read_ratio
166
1.16k
    for (const std::pair<size_t, bool>& slice : merged_slice) {
167
1.16k
        if (slice.second) {
168
617
            content_size += slice.first;
169
617
            if (slice.first > 0) {
170
617
                ratio_and_size.emplace_back((double)hollow_size / (double)content_size,
171
617
                                            content_size + hollow_size);
172
617
            }
173
617
        } else {
174
551
            hollow_size += slice.first;
175
551
        }
176
1.16k
    }
177
66
    size_t best_merged_size = 0;
178
683
    for (int i = 0; i < ratio_and_size.size(); ++i) {
179
617
        const std::pair<double, size_t>& rs = ratio_and_size[i];
180
617
        size_t equivalent_size = rs.second / (i + 1);
181
617
        if (rs.second > best_merged_size) {
182
617
            if (rs.first <= _max_amplified_ratio ||
183
617
                (_max_amplified_ratio < 1 && equivalent_size <= _equivalent_io_size)) {
184
617
                best_merged_size = rs.second;
185
617
            }
186
617
        }
187
617
    }
188
189
66
    if (best_merged_size == to_read) {
190
        // read directly to avoid copy operation
191
5
        SCOPED_RAW_TIMER(&_statistics.read_time);
192
5
        size_t read_size = 0;
193
5
        RETURN_IF_ERROR(_reader->read_at(offset + has_read, Slice(result.data + has_read, to_read),
194
5
                                         &read_size, io_ctx));
195
5
        *bytes_read = has_read + read_size;
196
5
        _statistics.merged_io++;
197
5
        _statistics.request_bytes += read_size;
198
5
        _statistics.merged_bytes += read_size;
199
5
        return Status::OK();
200
5
    }
201
202
61
    merge_start = offset + has_read;
203
61
    size_t merge_read_size = 0;
204
61
    RETURN_IF_ERROR(
205
61
            _fill_box(range_index, merge_start, best_merged_size, &merge_read_size, io_ctx));
206
61
    if (cached_data.start_offset != merge_start) {
207
0
        return Status::IOError("Wrong start offset in merged IO");
208
0
    }
209
210
    // read from cached data
211
61
    size_t box_read_size = 0;
212
61
    _read_in_box(cached_data, merge_start, Slice(result.data + has_read, to_read), &box_read_size);
213
61
    *bytes_read = has_read + box_read_size;
214
61
    _statistics.request_bytes += box_read_size;
215
61
    if (*bytes_read < result.size && box_read_size < merge_read_size) {
216
0
        return Status::IOError("Can't read enough bytes in merged IO");
217
0
    }
218
61
    return Status::OK();
219
61
}
220
221
1.32k
int MergeRangeFileReader::_search_read_range(size_t start_offset, size_t end_offset) {
222
1.32k
    if (_random_access_ranges.empty()) {
223
0
        return -1;
224
0
    }
225
1.32k
    int left = 0, right = cast_set<int>(_random_access_ranges.size()) - 1;
226
5.62k
    do {
227
5.62k
        int mid = left + (right - left) / 2;
228
5.62k
        const PrefetchRange& range = _random_access_ranges[mid];
229
5.62k
        if (range.start_offset <= start_offset && start_offset < range.end_offset) {
230
1.32k
            if (range.start_offset <= end_offset && end_offset <= range.end_offset) {
231
1.32k
                return mid;
232
1.32k
            } else {
233
0
                return -1;
234
0
            }
235
4.29k
        } else if (range.start_offset > start_offset) {
236
2.04k
            right = mid - 1;
237
2.25k
        } else {
238
2.25k
            left = mid + 1;
239
2.25k
        }
240
5.62k
    } while (left <= right);
241
0
    return -1;
242
1.32k
}
243
244
608
void MergeRangeFileReader::_clean_cached_data(RangeCachedData& cached_data) {
245
608
    if (!cached_data.empty()) {
246
0
        for (int i = 0; i < cached_data.ref_box.size(); ++i) {
247
0
            DCHECK_GT(cached_data.box_end_offset[i], cached_data.box_start_offset[i]);
248
0
            int16_t box_index = cached_data.ref_box[i];
249
0
            DCHECK_GT(_box_ref[box_index], 0);
250
0
            _box_ref[box_index]--;
251
0
        }
252
0
    }
253
608
    cached_data.reset();
254
608
}
255
256
686
void MergeRangeFileReader::_dec_box_ref(int16_t box_index) {
257
686
    if (--_box_ref[box_index] == 0) {
258
154
        _remaining += BOX_SIZE;
259
154
    }
260
686
    if (box_index == _last_box_ref) {
261
47
        _last_box_ref = -1;
262
47
        _last_box_usage = 0;
263
47
    }
264
686
}
265
266
void MergeRangeFileReader::_read_in_box(RangeCachedData& cached_data, size_t offset, Slice result,
267
1.32k
                                        size_t* bytes_read) {
268
1.32k
    SCOPED_RAW_TIMER(&_statistics.copy_time);
269
1.32k
    auto handle_in_box = [&](size_t remaining, char* copy_out) {
270
1.32k
        size_t to_handle = remaining;
271
1.32k
        int cleaned_box = 0;
272
2.72k
        for (int i = 0; i < cached_data.ref_box.size() && remaining > 0; ++i) {
273
1.39k
            int16_t box_index = cached_data.ref_box[i];
274
1.39k
            size_t box_to_handle = std::min(remaining, (size_t)(cached_data.box_end_offset[i] -
275
1.39k
                                                                cached_data.box_start_offset[i]));
276
1.39k
            if (copy_out != nullptr) {
277
1.39k
            }
278
1.39k
            if (copy_out != nullptr) {
279
1.39k
                memcpy(copy_out + to_handle - remaining,
280
1.39k
                       _boxes[box_index].data() + cached_data.box_start_offset[i], box_to_handle);
281
1.39k
            }
282
1.39k
            remaining -= box_to_handle;
283
1.39k
            cached_data.box_start_offset[i] += box_to_handle;
284
1.39k
            if (cached_data.box_start_offset[i] == cached_data.box_end_offset[i]) {
285
686
                cleaned_box++;
286
686
                _dec_box_ref(box_index);
287
686
            }
288
1.39k
        }
289
1.32k
        DCHECK_EQ(remaining, 0);
290
1.32k
        if (cleaned_box > 0) {
291
648
            cached_data.ref_box.erase(cached_data.ref_box.begin(),
292
648
                                      cached_data.ref_box.begin() + cleaned_box);
293
648
            cached_data.box_start_offset.erase(cached_data.box_start_offset.begin(),
294
648
                                               cached_data.box_start_offset.begin() + cleaned_box);
295
648
            cached_data.box_end_offset.erase(cached_data.box_end_offset.begin(),
296
648
                                             cached_data.box_end_offset.begin() + cleaned_box);
297
648
        }
298
1.32k
        cached_data.start_offset += to_handle;
299
1.32k
        if (cached_data.start_offset == cached_data.end_offset) {
300
608
            _clean_cached_data(cached_data);
301
608
        }
302
1.32k
    };
303
304
1.32k
    if (offset > cached_data.start_offset) {
305
        // the data in range may be skipped
306
1
        size_t to_skip = offset - cached_data.start_offset;
307
1
        handle_in_box(to_skip, nullptr);
308
1
    }
309
310
1.32k
    size_t to_read = std::min(cached_data.end_offset - cached_data.start_offset, result.size);
311
1.32k
    handle_in_box(to_read, result.data);
312
1.32k
    *bytes_read = to_read;
313
1.32k
}
314
315
Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, size_t to_read,
316
61
                                       size_t* bytes_read, const IOContext* io_ctx) {
317
61
    if (!_read_slice) {
318
45
        _read_slice = std::make_unique<OwnedSlice>(_merged_read_slice_size);
319
45
    }
320
321
61
    *bytes_read = 0;
322
61
    {
323
61
        SCOPED_RAW_TIMER(&_statistics.read_time);
324
61
        RETURN_IF_ERROR(_reader->read_at(start_offset, Slice(_read_slice->data(), to_read),
325
61
                                         bytes_read, io_ctx));
326
61
        _statistics.merged_io++;
327
61
        _statistics.merged_bytes += *bytes_read;
328
61
    }
329
330
61
    SCOPED_RAW_TIMER(&_statistics.copy_time);
331
61
    size_t copy_start = start_offset;
332
61
    const size_t copy_end = start_offset + *bytes_read;
333
    // copy data into small boxes
334
    // tuple(box_index, box_start_offset, file_start_offset, file_end_offset)
335
61
    std::vector<std::tuple<int16_t, uint32_t, size_t, size_t>> filled_boxes;
336
337
690
    auto fill_box = [&](int16_t fill_box_ref, uint32_t box_usage, size_t box_copy_end) {
338
690
        size_t copy_size = std::min(box_copy_end - copy_start, BOX_SIZE - box_usage);
339
690
        memcpy(_boxes[fill_box_ref].data() + box_usage,
340
690
               _read_slice->data() + copy_start - start_offset, copy_size);
341
690
        filled_boxes.emplace_back(fill_box_ref, box_usage, copy_start, copy_start + copy_size);
342
690
        copy_start += copy_size;
343
690
        _last_box_ref = fill_box_ref;
344
690
        _last_box_usage = box_usage + cast_set<int>(copy_size);
345
690
        _box_ref[fill_box_ref]++;
346
690
        if (box_usage == 0) {
347
155
            _remaining -= BOX_SIZE;
348
155
        }
349
690
    };
350
351
61
    for (int fill_range_index = range_index;
352
673
         fill_range_index < _random_access_ranges.size() && copy_start < copy_end;
353
612
         ++fill_range_index) {
354
612
        RangeCachedData& fill_range_cache = _range_cached_data[fill_range_index];
355
612
        DCHECK(fill_range_cache.empty());
356
612
        fill_range_cache.reset();
357
612
        const PrefetchRange& fill_range = _random_access_ranges[fill_range_index];
358
612
        if (fill_range.start_offset > copy_start) {
359
            // don't copy hollow data
360
180
            size_t hollow_size = fill_range.start_offset - copy_start;
361
180
            DCHECK_GT(copy_end - copy_start, hollow_size);
362
180
            copy_start += hollow_size;
363
180
        }
364
365
612
        const size_t range_copy_end = std::min(copy_end, fill_range.end_offset);
366
        // reuse the remaining capacity of last box
367
612
        if (_last_box_ref >= 0 && _last_box_usage < BOX_SIZE) {
368
535
            fill_box(_last_box_ref, _last_box_usage, range_copy_end);
369
535
        }
370
        // reuse the former released box
371
1.95k
        for (int16_t i = 0; i < _boxes.size() && copy_start < range_copy_end; ++i) {
372
1.33k
            if (_box_ref[i] == 0) {
373
8
                fill_box(i, 0, range_copy_end);
374
8
            }
375
1.33k
        }
376
        // apply for new box to copy data
377
759
        while (copy_start < range_copy_end && _boxes.size() < NUM_BOX) {
378
147
            _boxes.emplace_back(BOX_SIZE);
379
147
            _box_ref.emplace_back(0);
380
147
            fill_box(cast_set<int16_t>(_boxes.size()) - 1, 0, range_copy_end);
381
147
        }
382
612
        DCHECK_EQ(copy_start, range_copy_end);
383
384
612
        if (!filled_boxes.empty()) {
385
612
            fill_range_cache.start_offset = std::get<2>(filled_boxes[0]);
386
612
            fill_range_cache.end_offset = std::get<3>(filled_boxes.back());
387
690
            for (auto& tuple : filled_boxes) {
388
690
                fill_range_cache.ref_box.emplace_back(std::get<0>(tuple));
389
690
                fill_range_cache.box_start_offset.emplace_back(std::get<1>(tuple));
390
690
                fill_range_cache.box_end_offset.emplace_back(
391
690
                        std::get<1>(tuple) + std::get<3>(tuple) - std::get<2>(tuple));
392
690
            }
393
612
            filled_boxes.clear();
394
612
        }
395
612
    }
396
61
    return Status::OK();
397
61
}
398
399
// there exists occasions where the buffer is already closed but
400
// some prior tasks are still queued in thread pool, so we have to check whether
401
// the buffer is closed each time the condition variable is notified.
402
267
void PrefetchBuffer::reset_offset(size_t offset) {
403
267
    {
404
267
        std::unique_lock lck {_lock};
405
267
        if (!_prefetched.wait_for(
406
267
                    lck, std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
407
267
                    [this]() { return _buffer_status != BufferStatus::PENDING; })) {
408
0
            _prefetch_status = Status::TimedOut("time out when reset prefetch buffer");
409
0
            return;
410
0
        }
411
267
        if (UNLIKELY(_buffer_status == BufferStatus::CLOSED)) {
412
0
            _prefetched.notify_all();
413
0
            return;
414
0
        }
415
267
        _buffer_status = BufferStatus::RESET;
416
267
        _offset = offset;
417
267
        _prefetched.notify_all();
418
267
    }
419
267
    if (UNLIKELY(offset >= _file_range.end_offset)) {
420
167
        _len = 0;
421
167
        _exceed = true;
422
167
        return;
423
167
    } else {
424
100
        _exceed = false;
425
100
    }
426
    // Lazy-allocate the backing buffer in the calling (query) thread, which has a
427
    // MemTrackerLimiter attached. The prefetch thread pool threads are "Orphan" threads
428
    // without a tracker, so allocation must not happen there.
429
100
    if (_buf.empty()) {
430
84
        _buf.resize(_size);
431
84
    }
432
100
    _prefetch_status = ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func(
433
100
            [buffer_ptr = shared_from_this()]() { buffer_ptr->prefetch_buffer(); });
434
100
}
435
436
// only this function would run concurrently in another thread
437
100
void PrefetchBuffer::prefetch_buffer() {
438
100
    {
439
100
        std::unique_lock lck {_lock};
440
100
        if (!_prefetched.wait_for(
441
100
                    lck, std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
442
100
                    [this]() {
443
100
                        return _buffer_status == BufferStatus::RESET ||
444
100
                               _buffer_status == BufferStatus::CLOSED;
445
100
                    })) {
446
0
            _prefetch_status = Status::TimedOut("time out when invoking prefetch buffer");
447
0
            return;
448
0
        }
449
        // in case buffer is already closed
450
100
        if (UNLIKELY(_buffer_status == BufferStatus::CLOSED)) {
451
0
            _prefetched.notify_all();
452
0
            return;
453
0
        }
454
100
        _buffer_status = BufferStatus::PENDING;
455
100
        _prefetched.notify_all();
456
100
    }
457
458
0
    int read_range_index = search_read_range(_offset);
459
100
    size_t buf_size;
460
100
    if (read_range_index == -1) {
461
100
        buf_size =
462
100
                _file_range.end_offset - _offset > _size ? _size : _file_range.end_offset - _offset;
463
100
    } else {
464
0
        buf_size = merge_small_ranges(_offset, read_range_index);
465
0
    }
466
467
100
    _len = 0;
468
100
    Status s;
469
470
100
    {
471
100
        SCOPED_RAW_TIMER(&_statis.read_time);
472
100
        s = _reader->read_at(_offset, Slice {_buf.data(), buf_size}, &_len, _io_ctx);
473
100
    }
474
100
    if (UNLIKELY(s.ok() && buf_size != _len)) {
475
        // This indicates that the data size returned by S3 object storage is smaller than what we requested,
476
        // which seems to be a violation of the S3 protocol since our request range was valid.
477
        // We currently consider this situation a bug and will treat this task as a failure.
478
0
        s = Status::InternalError("Data size returned by S3 is smaller than requested");
479
0
        LOG(WARNING) << "Data size returned by S3 is smaller than requested" << _reader->path()
480
0
                     << " request bytes " << buf_size << " returned size " << _len;
481
0
    }
482
100
    g_bytes_downloaded << _len;
483
100
    _statis.prefetch_request_io += 1;
484
100
    _statis.prefetch_request_bytes += _len;
485
100
    std::unique_lock lck {_lock};
486
100
    if (!_prefetched.wait_for(lck,
487
100
                              std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
488
100
                              [this]() { return _buffer_status == BufferStatus::PENDING; })) {
489
0
        _prefetch_status = Status::TimedOut("time out when invoking prefetch buffer");
490
0
        return;
491
0
    }
492
100
    if (!s.ok() && _offset < _reader->size()) {
493
        // We should print the error msg since this buffer might not be accessed by the consumer
494
        // which would result in the status being missed
495
0
        LOG_WARNING("prefetch path {} failed, offset {}, error {}", _reader->path().native(),
496
0
                    _offset, s.to_string());
497
0
        _prefetch_status = std::move(s);
498
0
    }
499
100
    _buffer_status = BufferStatus::PREFETCHED;
500
100
    _prefetched.notify_all();
501
    // eof would come up with len == 0, it would be handled by read_buffer
502
100
}
503
504
100
int PrefetchBuffer::search_read_range(size_t off) const {
505
100
    if (_random_access_ranges == nullptr || _random_access_ranges->empty()) {
506
100
        return -1;
507
100
    }
508
0
    const std::vector<PrefetchRange>& random_access_ranges = *_random_access_ranges;
509
0
    int left = 0, right = cast_set<int>(random_access_ranges.size()) - 1;
510
0
    do {
511
0
        int mid = left + (right - left) / 2;
512
0
        const PrefetchRange& range = random_access_ranges[mid];
513
0
        if (range.start_offset <= off && range.end_offset > off) {
514
0
            return mid;
515
0
        } else if (range.start_offset > off) {
516
0
            right = mid;
517
0
        } else {
518
0
            left = mid + 1;
519
0
        }
520
0
    } while (left < right);
521
0
    if (random_access_ranges[right].start_offset > off) {
522
0
        return right;
523
0
    } else {
524
0
        return -1;
525
0
    }
526
0
}
527
528
0
size_t PrefetchBuffer::merge_small_ranges(size_t off, int range_index) const {
529
0
    if (_random_access_ranges == nullptr || _random_access_ranges->empty()) {
530
0
        return _size;
531
0
    }
532
0
    int64_t remaining = _size;
533
0
    const std::vector<PrefetchRange>& random_access_ranges = *_random_access_ranges;
534
0
    while (remaining > 0 && range_index < random_access_ranges.size()) {
535
0
        const PrefetchRange& range = random_access_ranges[range_index];
536
0
        if (range.start_offset <= off && range.end_offset > off) {
537
0
            remaining -= range.end_offset - off;
538
0
            off = range.end_offset;
539
0
            range_index++;
540
0
        } else if (range.start_offset > off) {
541
            // merge small range
542
0
            size_t hollow = range.start_offset - off;
543
0
            if (hollow < remaining) {
544
0
                remaining -= hollow;
545
0
                off = range.start_offset;
546
0
            } else {
547
0
                break;
548
0
            }
549
0
        } else {
550
0
            DCHECK(false);
551
0
        }
552
0
    }
553
0
    if (remaining < 0 || remaining == _size) {
554
0
        remaining = 0;
555
0
    }
556
0
    return _size - remaining;
557
0
}
558
559
Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
560
131
                                   size_t* bytes_read) {
561
131
    if (UNLIKELY(off >= _file_range.end_offset)) {
562
        // Reader can read out of [start_offset, end_offset) by synchronous method.
563
0
        return _reader->read_at(off, Slice {out, buf_len}, bytes_read, _io_ctx);
564
0
    }
565
131
    if (_exceed) {
566
0
        reset_offset((off / _size) * _size);
567
0
        return read_buffer(off, out, buf_len, bytes_read);
568
0
    }
569
131
    {
570
131
        std::unique_lock lck {_lock};
571
        // buffer must be prefetched or it's closed
572
131
        if (!_prefetched.wait_for(
573
131
                    lck, std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
574
193
                    [this]() {
575
193
                        return _buffer_status == BufferStatus::PREFETCHED ||
576
193
                               _buffer_status == BufferStatus::CLOSED;
577
193
                    })) {
578
0
            _prefetch_status = Status::TimedOut("time out when read prefetch buffer");
579
0
            return _prefetch_status;
580
0
        }
581
131
        if (UNLIKELY(BufferStatus::CLOSED == _buffer_status)) {
582
0
            return Status::OK();
583
0
        }
584
131
    }
585
131
    RETURN_IF_ERROR(_prefetch_status);
586
    // there is only parquet would do not sequence read
587
    // it would read the end of the file first
588
131
    if (UNLIKELY(!contains(off))) {
589
0
        reset_offset((off / _size) * _size);
590
0
        return read_buffer(off, out, buf_len, bytes_read);
591
0
    }
592
131
    if (UNLIKELY(0 == _len || _offset + _len < off)) {
593
0
        return Status::OK();
594
0
    }
595
596
131
    {
597
131
        LIMIT_REMOTE_SCAN_IO(bytes_read);
598
        // [0]: maximum len trying to read, [1] maximum length buffer can provide, [2] actual len buffer has
599
131
        size_t read_len = std::min({buf_len, _offset + _size - off, _offset + _len - off});
600
131
        {
601
131
            SCOPED_RAW_TIMER(&_statis.copy_time);
602
131
            memcpy((void*)out, _buf.data() + (off - _offset), read_len);
603
131
        }
604
131
        *bytes_read = read_len;
605
131
        _statis.request_io += 1;
606
131
        _statis.request_bytes += read_len;
607
131
    }
608
131
    if (off + *bytes_read == _offset + _len) {
609
91
        reset_offset(_offset + _whole_buffer_size);
610
91
    }
611
131
    return Status::OK();
612
131
}
613
614
176
void PrefetchBuffer::close() {
615
176
    std::unique_lock lck {_lock};
616
    // in case _reader still tries to write to the buf after we close the buffer
617
176
    if (!_prefetched.wait_for(lck,
618
176
                              std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
619
176
                              [this]() { return _buffer_status != BufferStatus::PENDING; })) {
620
0
        _prefetch_status = Status::TimedOut("time out when close prefetch buffer");
621
0
        return;
622
0
    }
623
176
    _buffer_status = BufferStatus::CLOSED;
624
176
    _prefetched.notify_all();
625
    // Explicitly release the backing buffer here, in the calling (query) thread which has a
626
    // MemTrackerLimiter. The destructor may run in the thread pool's Orphan thread (when the
627
    // last shared_ptr ref is released after the prefetch lambda completes), so we must not
628
    // rely on ~PODArray() to release memory — that would trigger memory_orphan_check().
629
176
    PODArray<char>().swap(_buf);
630
176
}
631
632
72
void PrefetchBuffer::_collect_profile_before_close() {
633
72
    if (_sync_profile != nullptr) {
634
72
        _sync_profile(*this);
635
72
    }
636
72
}
637
638
// buffered reader
639
// Builds a prefetching reader over `reader`: the requested buffer budget
// (default: remote_storage_read_buffer_mb) is split into buffer_num chunks of
// at most s_max_pre_buffer_size bytes, each backed by a PrefetchBuffer that
// shares ownership of the IOContext.
PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
                                               PrefetchRange file_range,
                                               std::shared_ptr<const IOContext> io_ctx,
                                               int64_t buffer_size)
        : _reader(std::move(reader)), _file_range(file_range), _io_ctx_holder(std::move(io_ctx)) {
    // Always have a live IOContext so _io_ctx never dangles.
    if (_io_ctx_holder == nullptr) {
        _io_ctx_holder = std::make_shared<IOContext>();
    }
    _io_ctx = _io_ctx_holder.get();
    // -1 means "use the configured default budget".
    if (buffer_size == -1L) {
        buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024;
    }
    _size = _reader->size();
    _whole_pre_buffer_size = buffer_size;
    // Never prefetch past the end of the underlying file.
    _file_range.end_offset = std::min(_file_range.end_offset, _size);
    int buffer_num = buffer_size > s_max_pre_buffer_size
                             ? cast_set<int>(buffer_size) / cast_set<int>(s_max_pre_buffer_size)
                             : 1;
    // Callback each PrefetchBuffer uses to publish its statistics into the
    // query profile; stays null when no profile was supplied.
    std::function<void(PrefetchBuffer&)> sync_buffer = nullptr;
    if (profile != nullptr) {
        const char* prefetch_buffered_reader = "PrefetchBufferedReader";
        ADD_TIMER(profile, prefetch_buffered_reader);
        auto copy_time = ADD_CHILD_TIMER(profile, "CopyTime", prefetch_buffered_reader);
        auto read_time = ADD_CHILD_TIMER(profile, "ReadTime", prefetch_buffered_reader);
        auto prefetch_request_io =
                ADD_CHILD_COUNTER(profile, "PreRequestIO", TUnit::UNIT, prefetch_buffered_reader);
        auto prefetch_request_bytes = ADD_CHILD_COUNTER(profile, "PreRequestBytes", TUnit::BYTES,
                                                        prefetch_buffered_reader);
        auto request_io =
                ADD_CHILD_COUNTER(profile, "RequestIO", TUnit::UNIT, prefetch_buffered_reader);
        auto request_bytes =
                ADD_CHILD_COUNTER(profile, "RequestBytes", TUnit::BYTES, prefetch_buffered_reader);
        // Capture the counter pointers by copy: the lambda outlives this
        // constructor (it is invoked when each buffer collects its profile).
        sync_buffer = [=](PrefetchBuffer& buf) {
            COUNTER_UPDATE(copy_time, buf._statis.copy_time);
            COUNTER_UPDATE(read_time, buf._statis.read_time);
            COUNTER_UPDATE(prefetch_request_io, buf._statis.prefetch_request_io);
            COUNTER_UPDATE(prefetch_request_bytes, buf._statis.prefetch_request_bytes);
            COUNTER_UPDATE(request_io, buf._statis.request_io);
            COUNTER_UPDATE(request_bytes, buf._statis.request_bytes);
        };
    }
    // set the _cur_offset of this reader as same as the inner reader's,
    // to make sure the buffer reader will start to read at right position.
    for (int i = 0; i < buffer_num; i++) {
        _pre_buffers.emplace_back(std::make_shared<PrefetchBuffer>(
                _file_range, s_max_pre_buffer_size, _whole_pre_buffer_size, _reader.get(),
                _io_ctx_holder, sync_buffer));
    }
}
688
689
44
PrefetchBufferedReader::~PrefetchBufferedReader() {
    // Close via the non-virtual helper: calling the virtual close() from a
    // destructor is unsafe. The returned status is deliberately discarded —
    // there is no caller left to handle it.
    auto close_status = _close_internal();
    (void)close_status;
}
693
694
// Reads up to result.get_size() bytes starting at `offset`, pulling data from
// the prefetch buffer that covers each sub-range. On success *bytes_read holds
// the number of bytes actually copied (may be short at end of file).
Status PrefetchBufferedReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                            const IOContext* io_ctx) {
    // Lazily align the prefetch buffers to the first offset actually requested.
    if (!_initialized) {
        reset_all_buffer(offset);
        _initialized = true;
    }
    if (UNLIKELY(result.get_size() == 0 || offset >= size())) {
        *bytes_read = 0;
        return Status::OK();
    }
    size_t nbytes = result.get_size();
    // Fix: use size_t (the original used int) so the comparison with the
    // size_t `nbytes` is not mixed-sign and totals beyond 2 GiB cannot
    // overflow.
    size_t actual_bytes_read = 0;
    while (actual_bytes_read < nbytes && offset < size()) {
        size_t read_num = 0;
        auto buffer_pos = get_buffer_pos(offset);
        // Each iteration services the request from the buffer covering `offset`;
        // assumes read_buffer makes progress (read_num > 0) while offset < size().
        RETURN_IF_ERROR(
                _pre_buffers[buffer_pos]->read_buffer(offset, result.get_data() + actual_bytes_read,
                                                      nbytes - actual_bytes_read, &read_num));
        actual_bytes_read += read_num;
        offset += read_num;
    }
    *bytes_read = actual_bytes_read;
    return Status::OK();
}
718
719
6
// Public close: delegates to the non-virtual helper that the destructor also
// uses, so closing is idempotent across both paths.
Status PrefetchBufferedReader::close() {
    return _close_internal();
}
722
723
50
// Idempotent close: the first call closes every prefetch buffer and then the
// inner reader; subsequent calls are no-ops returning OK.
Status PrefetchBufferedReader::_close_internal() {
    if (_closed) {
        return Status::OK();
    }
    _closed = true;
    for (auto& buffer : _pre_buffers) {
        buffer->close();
    }
    return _reader->close();
}
733
734
18
void PrefetchBufferedReader::_collect_profile_before_close() {
735
18
    std::for_each(_pre_buffers.begin(), _pre_buffers.end(),
736
72
                  [](std::shared_ptr<PrefetchBuffer>& buffer) {
737
72
                      buffer->collect_profile_before_close();
738
72
                  });
739
18
    if (_reader != nullptr) {
740
18
        _reader->collect_profile_before_close();
741
18
    }
742
18
}
743
744
// InMemoryFileReader
745
900
// InMemoryFileReader: caches the inner reader's size up front; the file
// content itself is loaded lazily on the first read (see read_at_impl).
InMemoryFileReader::InMemoryFileReader(io::FileReaderSPtr reader) : _reader(std::move(reader)) {
    _size = _reader->size();
}
748
749
902
InMemoryFileReader::~InMemoryFileReader() {
    // Use the non-virtual helper rather than the virtual close(); the status
    // is intentionally dropped since a destructor cannot report failure.
    auto close_status = _close_internal();
    (void)close_status;
}
752
753
258
// Public close: shares the idempotent helper with the destructor.
Status InMemoryFileReader::close() {
    return _close_internal();
}
756
757
1.16k
// Idempotent close of the wrapped reader; only the first call does work.
Status InMemoryFileReader::_close_internal() {
    if (_closed) {
        return Status::OK();
    }
    _closed = true;
    return _reader->close();
}
764
765
// Serves reads from an in-memory copy of the whole file, loading that copy
// lazily on first access. *bytes_read is clamped to the bytes available past
// `offset`.
Status InMemoryFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                        const IOContext* io_ctx) {
    if (_data == nullptr) {
        // Fix: build into a local buffer and publish it only after a fully
        // successful read. The original assigned _data before reading, so a
        // failed read_at left a non-null _data behind and later calls would
        // serve uninitialized bytes.
        auto data = std::make_unique_for_overwrite<char[]>(_size);
        size_t file_size = 0;
        RETURN_IF_ERROR(_reader->read_at(0, Slice(data.get(), _size), &file_size, io_ctx));
        // Fix: verify the read length in release builds too; the original
        // DCHECK_EQ compiled out and a short read would silently corrupt reads.
        if (UNLIKELY(file_size != _size)) {
            return Status::IOError("InMemoryFileReader read {} bytes, expected file size {}",
                                   file_size, _size);
        }
        _data = std::move(data);
    }
    if (UNLIKELY(offset > _size)) {
        return Status::IOError("Out of bounds access");
    }
    *bytes_read = std::min(result.size, _size - offset);
    memcpy(result.data, _data.get() + offset, *bytes_read);
    return Status::OK();
}
781
782
295
void InMemoryFileReader::_collect_profile_before_close() {
783
295
    if (_reader != nullptr) {
784
295
        _reader->collect_profile_before_close();
785
295
    }
786
295
}
787
788
// BufferedFileStreamReader
789
// BufferedFileStreamReader: wraps `file` with a sliding read buffer of at most
// `max_buf_size` bytes over the byte range [offset, offset + length).
BufferedFileStreamReader::BufferedFileStreamReader(io::FileReaderSPtr file, uint64_t offset,
                                                   uint64_t length, size_t max_buf_size)
        // Fix: move the by-value shared_ptr instead of copying it — the copy
        // cost an unnecessary atomic refcount increment/decrement pair.
        : _file(std::move(file)),
          _file_start_offset(offset),
          _file_end_offset(offset + length),
          _max_buf_size(max_buf_size) {}
795
796
// Returns in *buf a pointer to `bytes_to_read` contiguous bytes starting at
// file offset `offset`, reading through an internal sliding buffer. The
// returned pointer aliases that buffer, so it is only valid until the next
// read_bytes call.
Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset,
                                            const size_t bytes_to_read, const IOContext* io_ctx) {
    // Reject requests outside the readable window [_file_start_offset, _file_end_offset).
    if (offset < _file_start_offset || offset >= _file_end_offset ||
        offset + bytes_to_read > _file_end_offset) {
        return Status::IOError(
                "Out-of-bounds Access: offset={}, bytes_to_read={}, file_start={}, "
                "file_end={}",
                offset, bytes_to_read, _file_start_offset, _file_end_offset);
    }
    int64_t end_offset = offset + bytes_to_read;
    // Fast path: the whole request is already inside the buffered window
    // [_buf_start_offset, _buf_end_offset).
    if (_buf_start_offset <= offset && _buf_end_offset >= end_offset) {
        *buf = _buf.get() + offset - _buf_start_offset;
        return Status::OK();
    }
    // The buffer must fit the request but never shrinks below _max_buf_size.
    size_t buf_size = std::max(_max_buf_size, bytes_to_read);
    if (_buf_size < buf_size) {
        // Grow: allocate a larger buffer and salvage the still-useful tail
        // [offset, _buf_end_offset) of the old one, if the request overlaps it.
        auto new_buf = make_unique_buffer<uint8_t>(buf_size);
        if (offset >= _buf_start_offset && offset < _buf_end_offset) {
            memcpy(new_buf.get(), _buf.get() + offset - _buf_start_offset,
                   _buf_end_offset - offset);
        }
        _buf = std::move(new_buf);
        _buf_size = buf_size;
    } else if (offset > _buf_start_offset && offset < _buf_end_offset) {
        // Same buffer: slide the overlapping tail to the front. Source and
        // destination may overlap, hence memmove rather than memcpy.
        memmove(_buf.get(), _buf.get() + offset - _buf_start_offset, _buf_end_offset - offset);
    }
    // No overlap with the previous window: buffered content is empty and the
    // fill below starts exactly at `offset`.
    if (offset < _buf_start_offset || offset >= _buf_end_offset) {
        _buf_end_offset = offset;
    }
    _buf_start_offset = offset;
    // Bytes already valid at the front of the buffer after the moves above.
    int64_t buf_remaining = _buf_end_offset - _buf_start_offset;
    // Fill the remaining capacity, clipped to the end of the readable range.
    int64_t to_read = std::min(_buf_size - buf_remaining, _file_end_offset - _buf_end_offset);
    int64_t has_read = 0;
    while (has_read < to_read) {
        size_t loop_read = 0;
        Slice result(_buf.get() + buf_remaining + has_read, to_read - has_read);
        RETURN_IF_ERROR(_file->read_at(_buf_end_offset + has_read, result, &loop_read, io_ctx));
        if (loop_read == 0) {
            // No progress from the inner reader; the short-read check below
            // turns this into a Corruption error.
            break;
        }
        has_read += loop_read;
    }
    if (has_read != to_read) {
        return Status::Corruption("Try to read {} bytes, but received {} bytes", to_read, has_read);
    }
    _buf_end_offset += to_read;
    *buf = _buf.get();
    return Status::OK();
}
845
846
// Slice convenience overload: on success the slice's data pointer is
// redirected into the reader's internal buffer.
Status BufferedFileStreamReader::read_bytes(Slice& slice, uint64_t offset,
                                            const IOContext* io_ctx) {
    auto** data_ptr = reinterpret_cast<const uint8_t**>(&slice.data);
    return read_bytes(data_ptr, offset, slice.size, io_ctx);
}
850
851
// Legacy overload taking a raw IOContext pointer. The caller's context is
// copied onto the heap (best-effort lifetime safety) and handed to the
// shared_ptr-based overload, which owns it from there on.
Result<io::FileReaderSPtr> DelegateReader::create_file_reader(
        RuntimeProfile* profile, const FileSystemProperties& system_properties,
        const FileDescription& file_description, const io::FileReaderOptions& reader_options,
        AccessMode access_mode, const IOContext* io_ctx, const PrefetchRange file_range) {
    std::shared_ptr<const IOContext> io_ctx_holder =
            io_ctx == nullptr ? nullptr : std::make_shared<IOContext>(*io_ctx);
    return create_file_reader(profile, system_properties, file_description, reader_options,
                              access_mode, std::move(io_ctx_holder), file_range);
}
863
864
// Creates the underlying file reader and, depending on its concrete type, size
// and the requested access mode, wraps it:
//   - small S3 files                      -> InMemoryFileReader (whole file cached)
//   - sequential access + thread-safe src -> PrefetchBufferedReader
//   - otherwise                           -> the raw reader, unwrapped.
Result<io::FileReaderSPtr> DelegateReader::create_file_reader(
        RuntimeProfile* profile, const FileSystemProperties& system_properties,
        const FileDescription& file_description, const io::FileReaderOptions& reader_options,
        AccessMode access_mode, std::shared_ptr<const IOContext> io_ctx,
        const PrefetchRange file_range) {
    // Guarantee a non-null context for any wrapper that holds on to it.
    if (io_ctx == nullptr) {
        io_ctx = std::make_shared<IOContext>();
    }
    return FileFactory::create_file_reader(system_properties, file_description, reader_options,
                                           profile)
            .transform([&](auto&& reader) -> io::FileReaderSPtr {
                // Whole-file in-memory caching is only applied to S3 readers
                // below the configured size threshold.
                if (reader->size() < config::in_memory_file_size &&
                    typeid_cast<io::S3FileReader*>(reader.get())) {
                    return std::make_shared<InMemoryFileReader>(std::move(reader));
                }

                if (access_mode == AccessMode::SEQUENTIAL) {
                    // Prefetching issues concurrent reads, so the source must
                    // be thread-safe: a plain S3 reader, or a cached remote
                    // reader whose remote side is S3.
                    bool is_thread_safe = false;
                    if (typeid_cast<io::S3FileReader*>(reader.get())) {
                        is_thread_safe = true;
                    } else if (auto* cached_reader =
                                       typeid_cast<io::CachedRemoteFileReader*>(reader.get());
                               cached_reader &&
                               typeid_cast<io::S3FileReader*>(cached_reader->get_remote_reader())) {
                        is_thread_safe = true;
                    }
                    if (is_thread_safe) {
                        // PrefetchBufferedReader needs thread-safe reader to prefetch data concurrently.
                        return std::make_shared<io::PrefetchBufferedReader>(
                                profile, std::move(reader), file_range, io_ctx);
                    }
                }

                return reader;
            });
}
900
901
// Linearly advances the persistent `index` cursor to the first range whose
// end lies beyond desired_offset, and returns that range if it actually
// contains the offset. The cursor never rewinds, so offsets must be probed in
// non-decreasing order.
Status LinearProbeRangeFinder::get_range_for(int64_t desired_offset,
                                             io::PrefetchRange& result_range) {
    for (; index < _ranges.size(); ++index) {
        const io::PrefetchRange& candidate = _ranges[index];
        if (candidate.end_offset <= desired_offset) {
            // Range ends at or before the offset — keep scanning forward.
            continue;
        }
        if (candidate.start_offset > desired_offset) [[unlikely]] {
            // Offset falls in a gap between ranges.
            return Status::InvalidArgument("Invalid desiredOffset");
        }
        result_range = candidate;
        return Status::OK();
    }
    return Status::InvalidArgument("Invalid desiredOffset");
}
916
917
// RangeCacheFileReader: serves reads out of a single cached range supplied by
// `range_finder`. The cache is sized to the largest range the finder can
// report (minimum 4 KiB) and is refilled whenever a read lands in a different
// range (see read_at_impl).
RangeCacheFileReader::RangeCacheFileReader(RuntimeProfile* profile, io::FileReaderSPtr inner_reader,
                                           std::shared_ptr<RangeFinder> range_finder)
        : _profile(profile),
          _inner_reader(std::move(inner_reader)),
          _range_finder(std::move(range_finder)) {
    _size = _inner_reader->size();
    // Never allocate less than 4 KiB, even when all ranges are tiny.
    uint64_t max_cache_size =
            std::max((uint64_t)4096, (uint64_t)_range_finder->get_max_range_size());
    _cache = OwnedSlice(max_cache_size);

    // Profiling is optional; counters stay unset when no profile is supplied.
    if (_profile != nullptr) {
        const char* random_profile = "RangeCacheFileReader";
        ADD_TIMER_WITH_LEVEL(_profile, random_profile, 1);
        _request_io =
                ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestIO", TUnit::UNIT, random_profile, 1);
        _request_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestBytes", TUnit::BYTES,
                                                      random_profile, 1);
        _request_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "RequestTime", random_profile, 1);
        _read_to_cache_time =
                ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ReadToCacheTime", random_profile, 1);
        _cache_refresh_count = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "CacheRefreshCount",
                                                            TUnit::UNIT, random_profile, 1);
        _read_to_cache_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "ReadToCacheBytes",
                                                            TUnit::BYTES, random_profile, 1);
    }
}
943
944
// Serves a read from the cached range covering `offset`, refilling the cache
// from the inner reader when the offset moves to a different range.
// NOTE(review): the final memcpy copies request_size bytes from the cached
// range — this assumes a request never crosses the end of its range
// (offset + request_size <= range.end_offset); confirm against the
// RangeFinder contract.
Status RangeCacheFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                          const IOContext* io_ctx) {
    auto request_size = result.size;

    _cache_statistics.request_io++;
    _cache_statistics.request_bytes += request_size;
    SCOPED_RAW_TIMER(&_cache_statistics.request_time);

    PrefetchRange range;
    // An OK status from the range finder selects the cached-range path.
    if (_range_finder->get_range_for(offset, range)) [[likely]] {
        if (_current_start_offset != range.start_offset) { // need read new range to cache.
            auto range_size = range.end_offset - range.start_offset;

            _cache_statistics.cache_refresh_count++;
            _cache_statistics.read_to_cache_bytes += range_size;
            SCOPED_RAW_TIMER(&_cache_statistics.read_to_cache_time);

            // Pull the whole range into the cache in one inner read.
            Slice cache_slice = {_cache.data(), range_size};
            RETURN_IF_ERROR(
                    _inner_reader->read_at(range.start_offset, cache_slice, bytes_read, io_ctx));

            // A short read would leave the cache partially valid — fail instead.
            if (*bytes_read != range_size) [[unlikely]] {
                return Status::InternalError(
                        "RangeCacheFileReader use inner reader read bytes {} not eq expect size {}",
                        *bytes_read, range_size);
            }

            _current_start_offset = range.start_offset;
        }

        // Copy the requested window out of the cached range.
        int64_t buffer_offset = offset - _current_start_offset;
        memcpy(result.data, _cache.data() + buffer_offset, request_size);
        *bytes_read = request_size;

        return Status::OK();
    } else {
        return Status::InternalError("RangeCacheFileReader read  not in Ranges. Offset = {}",
                                     offset);
        //                RETURN_IF_ERROR(_inner_reader->read_at(offset, result , bytes_read, io_ctx));
        //                return Status::OK();
        // think return error is ok,otherwise it will cover up the error.
    }
}
987
988
101
void RangeCacheFileReader::_collect_profile_before_close() {
989
101
    if (_profile != nullptr) {
990
101
        COUNTER_UPDATE(_request_io, _cache_statistics.request_io);
991
101
        COUNTER_UPDATE(_request_bytes, _cache_statistics.request_bytes);
992
101
        COUNTER_UPDATE(_request_time, _cache_statistics.request_time);
993
101
        COUNTER_UPDATE(_read_to_cache_time, _cache_statistics.read_to_cache_time);
994
101
        COUNTER_UPDATE(_cache_refresh_count, _cache_statistics.cache_refresh_count);
995
101
        COUNTER_UPDATE(_read_to_cache_bytes, _cache_statistics.read_to_cache_bytes);
996
101
        if (_inner_reader != nullptr) {
997
101
            _inner_reader->collect_profile_before_close();
998
101
        }
999
101
    }
1000
101
}
1001
1002
} // namespace io
1003
1004
#include "common/compile_check_end.h"
1005
1006
} // namespace doris