Coverage Report

Created: 2026-03-12 17:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/buffered_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <stddef.h>
21
#include <stdint.h>
22
23
#include <condition_variable>
24
#include <memory>
25
#include <mutex>
26
#include <string>
27
#include <utility>
28
#include <vector>
29
30
#include "common/status.h"
31
#include "core/custom_allocator.h"
32
#include "core/typeid_cast.h"
33
#include "io/cache/cached_remote_file_reader.h"
34
#include "io/file_factory.h"
35
#include "io/fs/broker_file_reader.h"
36
#include "io/fs/file_reader.h"
37
#include "io/fs/path.h"
38
#include "io/fs/s3_file_reader.h"
39
#include "io/io_common.h"
40
#include "runtime/runtime_profile.h"
41
#include "storage/olap_define.h"
42
#include "util/slice.h"
43
namespace doris {
44
45
#include "common/compile_check_begin.h"
46
47
namespace io {
48
49
class FileSystem;
50
51
/// A byte range inside a file, described by its start and end offsets.
/// Other code in this file treats such ranges as half-open, i.e.
/// [start_offset, end_offset), and computes their size as
/// `end_offset - start_offset`.
struct PrefetchRange {
    size_t start_offset;
    size_t end_offset;

    PrefetchRange(size_t start_offset, size_t end_offset)
            : start_offset(start_offset), end_offset(end_offset) {}

    /// Creates the empty range [0, 0).
    PrefetchRange() : start_offset(0), end_offset(0) {}

    bool operator==(const PrefetchRange& other) const {
        return (start_offset == other.start_offset) && (end_offset == other.end_offset);
    }

    bool operator!=(const PrefetchRange& other) const { return !(*this == other); }

    /// Returns the smallest range that covers both `*this` and `other`,
    /// regardless of their relative order.
    ///
    /// The result starts at the smaller of the two start offsets and ends at
    /// the larger of the two end offsets.
    PrefetchRange span(const PrefetchRange& other) const {
        return {std::min(start_offset, other.start_offset),
                std::max(end_offset, other.end_offset)};
    }

    /// Returns the range from the start of `*this` to the end of `other`.
    /// Intended for the case where `other` follows `*this` in a sorted
    /// sequence; no ordering check is performed.
    PrefetchRange seq_span(const PrefetchRange& other) const {
        return {start_offset, other.end_offset};
    }

    /// Greedily merges neighbouring ranges of a sorted sequence.
    ///
    /// @param seq_ranges               ranges to merge; must already be sorted.
    /// @param max_merge_distance_bytes largest gap allowed between the end of the
    ///                                 accumulated range and the start of the next
    ///                                 one for the two to be merged.
    /// @param once_max_read_bytes      upper bound on the size of a merged range.
    /// @return the merged ranges in their original order; empty if the input is empty.
    static std::vector<PrefetchRange> merge_adjacent_seq_ranges(
            const std::vector<PrefetchRange>& seq_ranges, int64_t max_merge_distance_bytes,
            int64_t once_max_read_bytes) {
        if (seq_ranges.empty()) {
            return {};
        }
        // Merge overlapping / nearby ranges.
        std::vector<PrefetchRange> result;
        PrefetchRange last = seq_ranges.front();
        for (size_t i = 1; i < seq_ranges.size(); ++i) {
            PrefetchRange current = seq_ranges[i];
            PrefetchRange merged = last.seq_span(current);
            // Merge only if the combined range stays within the size budget and
            // the gap to the next range is small enough.
            if (merged.end_offset <= once_max_read_bytes + merged.start_offset &&
                last.end_offset + max_merge_distance_bytes >= current.start_offset) {
                last = merged;
            } else {
                result.push_back(last);
                last = current;
            }
        }
        // Flush the range that was still being accumulated.
        result.push_back(last);
        return result;
    }
};
98
99
class RangeFinder {
100
public:
101
104
    virtual ~RangeFinder() = default;
102
    virtual Status get_range_for(int64_t desired_offset, io::PrefetchRange& result_range) = 0;
103
    virtual size_t get_max_range_size() const = 0;
104
};
105
106
class LinearProbeRangeFinder : public RangeFinder {
107
public:
108
104
    LinearProbeRangeFinder(std::vector<io::PrefetchRange>&& ranges) : _ranges(std::move(ranges)) {}
109
110
    Status get_range_for(int64_t desired_offset, io::PrefetchRange& result_range) override;
111
112
104
    size_t get_max_range_size() const override {
113
104
        size_t max_range_size = 0;
114
107
        for (const auto& range : _ranges) {
115
107
            max_range_size = std::max(max_range_size, range.end_offset - range.start_offset);
116
107
        }
117
104
        return max_range_size;
118
104
    }
119
120
104
    ~LinearProbeRangeFinder() override = default;
121
122
private:
123
    std::vector<io::PrefetchRange> _ranges;
124
    size_t index {0};
125
};
126
127
/**
128
 * The reader provides a solution to read one range at a time. You can customize RangeFinder to meet your scenario.
129
 * For me, since there will be tiny stripes when reading orc files, in order to reduce the requests to hdfs,
130
 * I first merge the access to the orc files to be read (of course there is a problem of read amplification,
131
 * but in my scenario, compared with reading hdfs multiple times, it is faster to read more data on hdfs at one time),
132
 * and then because the actual reading of orc files is in order from front to back, I provide LinearProbeRangeFinder.
133
 */
134
class RangeCacheFileReader : public io::FileReader {
135
    struct RangeCacheReaderStatistics {
136
        int64_t request_io = 0;
137
        int64_t request_bytes = 0;
138
        int64_t request_time = 0;
139
        int64_t read_to_cache_time = 0;
140
        int64_t cache_refresh_count = 0;
141
        int64_t read_to_cache_bytes = 0;
142
    };
143
144
public:
145
    RangeCacheFileReader(RuntimeProfile* profile, io::FileReaderSPtr inner_reader,
146
                         std::shared_ptr<RangeFinder> range_finder);
147
148
104
    ~RangeCacheFileReader() override = default;
149
150
3
    Status close() override {
151
3
        if (!_closed) {
152
3
            _closed = true;
153
3
        }
154
3
        return Status::OK();
155
3
    }
156
157
0
    const io::Path& path() const override { return _inner_reader->path(); }
158
159
0
    size_t size() const override { return _size; }
160
161
0
    bool closed() const override { return _closed; }
162
163
0
    int64_t mtime() const override { return _inner_reader->mtime(); }
164
165
protected:
166
    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
167
                        const IOContext* io_ctx) override;
168
169
    void _collect_profile_before_close() override;
170
171
private:
172
    RuntimeProfile* _profile = nullptr;
173
    io::FileReaderSPtr _inner_reader;
174
    std::shared_ptr<RangeFinder> _range_finder;
175
176
    OwnedSlice _cache;
177
    int64_t _current_start_offset = -1;
178
179
    size_t _size;
180
    bool _closed = false;
181
182
    RuntimeProfile::Counter* _request_io = nullptr;
183
    RuntimeProfile::Counter* _request_bytes = nullptr;
184
    RuntimeProfile::Counter* _request_time = nullptr;
185
    RuntimeProfile::Counter* _read_to_cache_time = nullptr;
186
    RuntimeProfile::Counter* _cache_refresh_count = nullptr;
187
    RuntimeProfile::Counter* _read_to_cache_bytes = nullptr;
188
    RangeCacheReaderStatistics _cache_statistics;
189
    /**
190
     * `RangeCacheFileReader`:
191
     *   1. `CacheRefreshCount`: how many IOs are merged
192
     *   2. `ReadToCacheBytes`: how much data is actually read after merging
193
     *   3. `ReadToCacheTime`: how long it takes to read data after merging
194
     *   4. `RequestBytes`: how many bytes does the apache-orc library actually need to read the orc file
195
     *   5. `RequestIO`: how many times the apache-orc library calls this read interface
196
     *   6. `RequestTime`: how long it takes the apache-orc library to call this read interface
197
     *
198
     *   It should be noted that `RangeCacheFileReader` is a wrapper of the reader that actually reads data,such as
199
     *   the hdfs reader, so strictly speaking, `CacheRefreshCount` is not equal to how many IOs are initiated to hdfs,
200
     *  because each time the hdfs reader is requested, the hdfs reader may not be able to read all the data at once.
201
     */
202
};
203
204
/**
205
 * A FileReader that efficiently supports random access format like parquet and orc.
206
 * In order to merge small IO in parquet and orc, the random access ranges should be generated
207
 * when creating the reader. The random access ranges is a list of ranges that order by offset.
208
 * The range in random access ranges should be reading sequentially, can be skipped, but can't be
209
 * read repeatedly. When calling read_at, if the start offset located in random access ranges, the
210
 * slice size should not span two ranges.
211
 *
212
 * For example, in parquet, the random access ranges is the column offsets in a row group.
213
 *
214
 * When reading at offset, if [offset, offset + 8MB) contains many random access ranges, the reader
215
 * will read data in [offset, offset + 8MB) as a whole, and copy the data in random access ranges
216
 * into small buffers(name as box, default 1MB, 128MB in total). A box can be occupied by many ranges,
217
 * and use a reference counter to record how many ranges are cached in the box. If reference counter
218
 * equals zero, the box can be release or reused by other ranges. When there is no empty box for a new
219
 * read operation, the read operation will do directly.
220
 */
221
class MergeRangeFileReader : public io::FileReader {
222
public:
223
    struct Statistics {
224
        int64_t copy_time = 0;
225
        int64_t read_time = 0;
226
        int64_t request_io = 0;
227
        int64_t merged_io = 0;
228
        int64_t request_bytes = 0;
229
        int64_t merged_bytes = 0;
230
    };
231
232
    struct RangeCachedData {
233
        size_t start_offset;
234
        size_t end_offset;
235
        std::vector<int16_t> ref_box;
236
        std::vector<uint32_t> box_start_offset;
237
        std::vector<uint32_t> box_end_offset;
238
        bool has_read = false;
239
240
        RangeCachedData(size_t start_offset, size_t end_offset)
241
0
                : start_offset(start_offset), end_offset(end_offset) {}
242
243
679
        RangeCachedData() : start_offset(0), end_offset(0) {}
244
245
1.25k
        bool empty() const { return start_offset == end_offset; }
246
247
1.25k
        bool contains(size_t offset) const { return start_offset <= offset && offset < end_offset; }
248
249
1.16k
        void reset() {
250
1.16k
            start_offset = 0;
251
1.16k
            end_offset = 0;
252
1.16k
            ref_box.clear();
253
1.16k
            box_start_offset.clear();
254
1.16k
            box_end_offset.clear();
255
1.16k
        }
256
257
0
        int16_t release_last_box() {
258
0
            // we can only release the last referenced box to ensure sequential read in range
259
0
            if (!empty()) {
260
0
                int16_t last_box_ref = ref_box.back();
261
0
                uint32_t released_size = box_end_offset.back() - box_start_offset.back();
262
0
                ref_box.pop_back();
263
0
                box_start_offset.pop_back();
264
0
                box_end_offset.pop_back();
265
0
                end_offset -= released_size;
266
0
                if (empty()) {
267
0
                    reset();
268
0
                }
269
0
                return last_box_ref;
270
0
            }
271
0
            return -1;
272
0
        }
273
    };
274
275
    static constexpr size_t TOTAL_BUFFER_SIZE = 128 * 1024 * 1024;  // 128MB
276
    static constexpr size_t READ_SLICE_SIZE = 8 * 1024 * 1024;      // 8MB
277
    static constexpr size_t BOX_SIZE = 1 * 1024 * 1024;             // 1MB
278
    static constexpr size_t SMALL_IO = 2 * 1024 * 1024;             // 2MB
279
    static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 128
280
281
    MergeRangeFileReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
282
                         const std::vector<PrefetchRange>& random_access_ranges,
283
                         int64_t merge_read_slice_size = READ_SLICE_SIZE)
284
62
            : _profile(profile),
285
62
              _reader(std::move(reader)),
286
62
              _random_access_ranges(random_access_ranges) {
287
62
        _range_cached_data.resize(random_access_ranges.size());
288
62
        _size = _reader->size();
289
62
        _remaining = TOTAL_BUFFER_SIZE;
290
62
        _is_oss = typeid_cast<io::S3FileReader*>(_reader.get()) != nullptr;
291
62
        _max_amplified_ratio = config::max_amplified_read_ratio;
292
        // Equivalent min size of each IO that can reach the maximum storage speed limit:
293
        // 1MB for oss, 8KB for hdfs
294
62
        _equivalent_io_size =
295
62
                _is_oss ? config::merged_oss_min_io_size : config::merged_hdfs_min_io_size;
296
297
62
        _merged_read_slice_size = merge_read_slice_size;
298
299
62
        if (_merged_read_slice_size < 0) {
300
10
            _merged_read_slice_size = READ_SLICE_SIZE;
301
10
        }
302
303
62
        if (_profile != nullptr) {
304
50
            const char* random_profile = "MergedSmallIO";
305
50
            ADD_TIMER_WITH_LEVEL(_profile, random_profile, 1);
306
50
            _copy_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "CopyTime", random_profile, 1);
307
50
            _read_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ReadTime", random_profile, 1);
308
50
            _request_io = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestIO", TUnit::UNIT,
309
50
                                                       random_profile, 1);
310
50
            _merged_io = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "MergedIO", TUnit::UNIT,
311
50
                                                      random_profile, 1);
312
50
            _request_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestBytes", TUnit::BYTES,
313
50
                                                          random_profile, 1);
314
50
            _merged_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "MergedBytes", TUnit::BYTES,
315
50
                                                         random_profile, 1);
316
50
        }
317
62
    }
318
319
62
    ~MergeRangeFileReader() override = default;
320
321
0
    Status close() override {
322
0
        if (!_closed) {
323
0
            _closed = true;
324
0
        }
325
0
        return Status::OK();
326
0
    }
327
328
642
    const io::Path& path() const override { return _reader->path(); }
329
330
0
    size_t size() const override { return _size; }
331
332
0
    bool closed() const override { return _closed; }
333
334
642
    int64_t mtime() const override { return _reader->mtime(); }
335
336
    // for test only
337
    size_t buffer_remaining() const { return _remaining; }
338
339
    // for test only
340
    const std::vector<RangeCachedData>& range_cached_data() const { return _range_cached_data; }
341
342
    // for test only
343
    const std::vector<int16_t>& box_reference() const { return _box_ref; }
344
345
    // for test only
346
    const Statistics& statistics() const { return _statistics; }
347
348
protected:
349
    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
350
                        const IOContext* io_ctx) override;
351
352
48
    void _collect_profile_before_close() override {
353
48
        if (_profile != nullptr) {
354
48
            COUNTER_UPDATE(_copy_time, _statistics.copy_time);
355
48
            COUNTER_UPDATE(_read_time, _statistics.read_time);
356
48
            COUNTER_UPDATE(_request_io, _statistics.request_io);
357
48
            COUNTER_UPDATE(_merged_io, _statistics.merged_io);
358
48
            COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
359
48
            COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes);
360
48
            if (_reader != nullptr) {
361
48
                _reader->collect_profile_before_close();
362
48
            }
363
48
        }
364
48
    }
365
366
private:
367
    RuntimeProfile::Counter* _copy_time = nullptr;
368
    RuntimeProfile::Counter* _read_time = nullptr;
369
    RuntimeProfile::Counter* _request_io = nullptr;
370
    RuntimeProfile::Counter* _merged_io = nullptr;
371
    RuntimeProfile::Counter* _request_bytes = nullptr;
372
    RuntimeProfile::Counter* _merged_bytes = nullptr;
373
374
    int _search_read_range(size_t start_offset, size_t end_offset);
375
    void _clean_cached_data(RangeCachedData& cached_data);
376
    void _read_in_box(RangeCachedData& cached_data, size_t offset, Slice result,
377
                      size_t* bytes_read);
378
    Status _fill_box(int range_index, size_t start_offset, size_t to_read, size_t* bytes_read,
379
                     const IOContext* io_ctx);
380
    void _dec_box_ref(int16_t box_index);
381
382
    RuntimeProfile* _profile = nullptr;
383
    io::FileReaderSPtr _reader;
384
    const std::vector<PrefetchRange> _random_access_ranges;
385
    std::vector<RangeCachedData> _range_cached_data;
386
    size_t _size;
387
    bool _closed = false;
388
    size_t _remaining;
389
390
    std::unique_ptr<OwnedSlice> _read_slice;
391
    std::vector<OwnedSlice> _boxes;
392
    int16_t _last_box_ref = -1;
393
    uint32_t _last_box_usage = 0;
394
    std::vector<int16_t> _box_ref;
395
    bool _is_oss;
396
    double _max_amplified_ratio;
397
    size_t _equivalent_io_size;
398
    int64_t _merged_read_slice_size;
399
400
    Statistics _statistics;
401
};
402
403
/**
404
 * Create a file reader suitable for accessing scenarios:
405
 * 1. When file size < config::in_memory_file_size, create InMemoryFileReader file reader
406
 * 2. When reading sequential file(csv/json), create PrefetchBufferedReader
407
 * 3. When reading random access file(parquet/orc), create normal file reader
408
 */
409
class DelegateReader {
410
public:
411
    enum AccessMode { SEQUENTIAL, RANDOM };
412
413
    static Result<io::FileReaderSPtr> create_file_reader(
414
            RuntimeProfile* profile, const FileSystemProperties& system_properties,
415
            const FileDescription& file_description, const io::FileReaderOptions& reader_options,
416
            AccessMode access_mode = SEQUENTIAL, const IOContext* io_ctx = nullptr,
417
            const PrefetchRange file_range = PrefetchRange(0, 0));
418
419
    static Result<io::FileReaderSPtr> create_file_reader(
420
            RuntimeProfile* profile, const FileSystemProperties& system_properties,
421
            const FileDescription& file_description, const io::FileReaderOptions& reader_options,
422
            AccessMode access_mode, std::shared_ptr<const IOContext> io_ctx,
423
            const PrefetchRange file_range = PrefetchRange(0, 0));
424
};
425
426
class PrefetchBufferedReader;
427
struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer>, public ProfileCollector {
428
    enum class BufferStatus { RESET, PENDING, PREFETCHED, CLOSED };
429
430
    PrefetchBuffer(const PrefetchRange file_range, size_t buffer_size, size_t whole_buffer_size,
431
                   io::FileReader* reader, std::shared_ptr<const IOContext> io_ctx,
432
                   std::function<void(PrefetchBuffer&)> sync_profile)
433
176
            : _file_range(file_range),
434
176
              _size(buffer_size),
435
176
              _whole_buffer_size(whole_buffer_size),
436
176
              _reader(reader),
437
176
              _io_ctx_holder(std::move(io_ctx)),
438
176
              _io_ctx(_io_ctx_holder.get()),
439
176
              _buf(new char[buffer_size]),
440
176
              _sync_profile(std::move(sync_profile)) {}
441
442
    PrefetchBuffer(PrefetchBuffer&& other)
443
            : _offset(other._offset),
444
              _file_range(other._file_range),
445
              _random_access_ranges(other._random_access_ranges),
446
              _size(other._size),
447
              _whole_buffer_size(other._whole_buffer_size),
448
              _reader(other._reader),
449
              _io_ctx_holder(std::move(other._io_ctx_holder)),
450
              _io_ctx(_io_ctx_holder.get()),
451
              _buf(std::move(other._buf)),
452
0
              _sync_profile(std::move(other._sync_profile)) {}
453
454
176
    ~PrefetchBuffer() = default;
455
456
    size_t _offset {0};
457
    // [start_offset, end_offset) is the range that can be prefetched.
458
    // Notice that the reader can read out of [start_offset, end_offset), because FE does not align the file
459
    // according to the format when splitting it.
460
    const PrefetchRange _file_range;
461
    const std::vector<PrefetchRange>* _random_access_ranges = nullptr;
462
    size_t _size {0};
463
    size_t _len {0};
464
    size_t _whole_buffer_size;
465
    io::FileReader* _reader = nullptr;
466
    std::shared_ptr<const IOContext> _io_ctx_holder;
467
    const IOContext* _io_ctx = nullptr;
468
    std::unique_ptr<char[]> _buf;
469
    BufferStatus _buffer_status {BufferStatus::RESET};
470
    std::mutex _lock;
471
    std::condition_variable _prefetched;
472
    Status _prefetch_status {Status::OK()};
473
    std::atomic_bool _exceed = false;
474
    std::function<void(PrefetchBuffer&)> _sync_profile;
475
    struct Statistics {
476
        int64_t copy_time {0};
477
        int64_t read_time {0};
478
        int64_t prefetch_request_io {0};
479
        int64_t prefetch_request_bytes {0};
480
        int64_t request_io {0};
481
        int64_t request_bytes {0};
482
    };
483
    Statistics _statis;
484
485
    // @brief: reset the start offset of this buffer to offset
486
    // @param: the new start offset for this buffer
487
    void reset_offset(size_t offset);
488
    // @brief: start to fetch the content between [_offset, _offset + _size)
489
    void prefetch_buffer();
490
    // @brief: used by BufferedReader to read the prefetched data
491
    // @param[off] read start address
492
    // @param[buf] buffer to put the actual content
493
    // @param[buf_len] maximum len trying to read
494
    // @param[bytes_read] actual bytes read
495
    Status read_buffer(size_t off, const char* buf, size_t buf_len, size_t* bytes_read);
496
    // @brief: shut down the buffer until the prior prefetching task is done
497
    void close();
498
    // @brief: to detect whether this buffer contains off
499
    // @param[off] detect offset
500
131
    bool inline contains(size_t off) const { return _offset <= off && off < _offset + _size; }
501
502
0
    void set_random_access_ranges(const std::vector<PrefetchRange>* random_access_ranges) {
503
0
        _random_access_ranges = random_access_ranges;
504
0
    }
505
506
    // binary search the last prefetch buffer that larger or include the offset
507
    int search_read_range(size_t off) const;
508
509
    size_t merge_small_ranges(size_t off, int range_index) const;
510
511
0
    void _collect_profile_at_runtime() override {}
512
513
    void _collect_profile_before_close() override;
514
};
515
516
constexpr int64_t s_max_pre_buffer_size = 4 * 1024 * 1024; // 4MB
517
518
/**
519
 * A buffered reader that prefetch data in the daemon thread pool.
520
 *
521
 * file_range is the range that the file is read.
522
 * random_access_ranges are the column ranges in format, like orc and parquet.
523
 *
524
 * When random_access_ranges is empty:
525
 * The data is prefetched sequentially until the underlying buffers(4 * 4M as default) are full.
526
 * When a buffer is read out, it will fetch data backward in daemon, so the underlying reader should be
527
 * thread-safe, and the access mode of data needs to be sequential.
528
 *
529
 * When random_access_ranges is not empty:
530
 * The data is prefetched order by the random_access_ranges. If some adjacent ranges is small, the underlying reader
531
 * will merge them.
532
 */
533
class PrefetchBufferedReader final : public io::FileReader {
534
public:
535
    PrefetchBufferedReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
536
                           PrefetchRange file_range,
537
                           std::shared_ptr<const IOContext> io_ctx = nullptr,
538
                           int64_t buffer_size = -1L);
539
    ~PrefetchBufferedReader() override;
540
541
    Status close() override;
542
543
0
    const io::Path& path() const override { return _reader->path(); }
544
545
282
    size_t size() const override { return _size; }
546
547
0
    bool closed() const override { return _closed; }
548
549
0
    int64_t mtime() const override { return _reader->mtime(); }
550
551
0
    void set_random_access_ranges(const std::vector<PrefetchRange>* random_access_ranges) {
552
0
        _random_access_ranges = random_access_ranges;
553
0
        for (auto& _pre_buffer : _pre_buffers) {
554
0
            _pre_buffer->set_random_access_ranges(random_access_ranges);
555
0
        }
556
0
    }
557
558
protected:
559
    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
560
                        const IOContext* io_ctx) override;
561
562
    void _collect_profile_before_close() override;
563
564
private:
565
    Status _close_internal();
566
307
    size_t get_buffer_pos(int64_t position) const {
567
307
        return (position % _whole_pre_buffer_size) / s_max_pre_buffer_size;
568
307
    }
569
176
    size_t get_buffer_offset(int64_t position) const {
570
176
        return (position / s_max_pre_buffer_size) * s_max_pre_buffer_size;
571
176
    }
572
44
    void reset_all_buffer(size_t position) {
573
220
        for (int64_t i = 0; i < _pre_buffers.size(); i++) {
574
176
            int64_t cur_pos = position + i * s_max_pre_buffer_size;
575
176
            size_t cur_buf_pos = get_buffer_pos(cur_pos);
576
            // reset would do all the prefetch work
577
176
            _pre_buffers[cur_buf_pos]->reset_offset(get_buffer_offset(cur_pos));
578
176
        }
579
44
    }
580
581
    io::FileReaderSPtr _reader;
582
    PrefetchRange _file_range;
583
    const std::vector<PrefetchRange>* _random_access_ranges = nullptr;
584
    std::shared_ptr<const IOContext> _io_ctx_holder;
585
    const IOContext* _io_ctx = nullptr;
586
    std::vector<std::shared_ptr<PrefetchBuffer>> _pre_buffers;
587
    int64_t _whole_pre_buffer_size;
588
    bool _initialized = false;
589
    bool _closed = false;
590
    size_t _size;
591
};
592
593
/**
594
 * A file reader that read the whole file into memory.
595
 * When a file is small(<8MB), InMemoryFileReader can effectively reduce the number of file accesses
596
 * and greatly improve the access speed of small files.
597
 */
598
class InMemoryFileReader final : public io::FileReader {
599
public:
600
    InMemoryFileReader(io::FileReaderSPtr reader);
601
602
    ~InMemoryFileReader() override;
603
604
    Status close() override;
605
606
635
    const io::Path& path() const override { return _reader->path(); }
607
608
1.36k
    size_t size() const override { return _size; }
609
610
0
    bool closed() const override { return _closed; }
611
612
458
    int64_t mtime() const override { return _reader->mtime(); }
613
614
protected:
615
    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
616
                        const IOContext* io_ctx) override;
617
618
    void _collect_profile_before_close() override;
619
620
private:
621
    Status _close_internal();
622
    io::FileReaderSPtr _reader;
623
    std::unique_ptr<char[]> _data;
624
    size_t _size;
625
    bool _closed = false;
626
};
627
628
/**
629
 * Load all the needed data in underlying buffer, so the caller does not need to prepare the data container.
630
 */
631
class BufferedStreamReader {
632
public:
633
    /**
634
     * Return the address of underlying buffer that locates the start of data between [offset, offset + bytes_to_read)
635
     * @param buf the buffer address to save the start address of data
636
     * @param offset start offset ot read in stream
637
     * @param bytes_to_read bytes to read
638
     */
639
    virtual Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t bytes_to_read,
640
                              const IOContext* io_ctx) = 0;
641
    /**
642
     * Save the data address to slice.data, and the slice.size is the bytes to read.
643
     */
644
    virtual Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) = 0;
645
1.14k
    virtual ~BufferedStreamReader() = default;
646
    // return the file path
647
    virtual std::string path() = 0;
648
649
    virtual int64_t mtime() const = 0;
650
};
651
652
class BufferedFileStreamReader : public BufferedStreamReader, public ProfileCollector {
653
public:
654
    BufferedFileStreamReader(io::FileReaderSPtr file, uint64_t offset, uint64_t length,
655
                             size_t max_buf_size);
656
1.13k
    ~BufferedFileStreamReader() override = default;
657
658
    Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t bytes_to_read,
659
                      const IOContext* io_ctx) override;
660
    Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) override;
661
1.13k
    std::string path() override { return _file->path(); }
662
663
1.13k
    int64_t mtime() const override { return _file->mtime(); }
664
665
protected:
666
0
    void _collect_profile_before_close() override {
667
0
        if (_file != nullptr) {
668
0
            _file->collect_profile_before_close();
669
0
        }
670
0
    }
671
672
private:
673
    DorisUniqueBufferPtr<uint8_t> _buf;
674
    io::FileReaderSPtr _file;
675
    uint64_t _file_start_offset;
676
    uint64_t _file_end_offset;
677
678
    uint64_t _buf_start_offset = 0;
679
    uint64_t _buf_end_offset = 0;
680
    size_t _buf_size = 0;
681
    size_t _max_buf_size;
682
};
683
684
} // namespace io
685
686
#include "common/compile_check_end.h"
687
688
} // namespace doris