Coverage Report

Created: 2026-04-10 12:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/buffered_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <stddef.h>
21
#include <stdint.h>
22
23
#include <condition_variable>
24
#include <memory>
25
#include <mutex>
26
#include <string>
27
#include <utility>
28
#include <vector>
29
30
#include "common/status.h"
31
#include "core/custom_allocator.h"
32
#include "core/pod_array.h"
33
#include "core/typeid_cast.h"
34
#include "io/cache/cached_remote_file_reader.h"
35
#include "io/file_factory.h"
36
#include "io/fs/broker_file_reader.h"
37
#include "io/fs/file_reader.h"
38
#include "io/fs/path.h"
39
#include "io/fs/s3_file_reader.h"
40
#include "io/io_common.h"
41
#include "runtime/runtime_profile.h"
42
#include "storage/olap_define.h"
43
#include "util/slice.h"
44
namespace doris {
45
46
namespace io {
47
48
class FileSystem;

/// A half-open byte range [start_offset, end_offset) inside a file.
struct PrefetchRange {
    size_t start_offset;
    size_t end_offset;

    PrefetchRange(size_t start_offset, size_t end_offset)
            : start_offset(start_offset), end_offset(end_offset) {}

    PrefetchRange() : start_offset(0), end_offset(0) {}

    bool operator==(const PrefetchRange& other) const {
        return (start_offset == other.start_offset) && (end_offset == other.end_offset);
    }

    bool operator!=(const PrefetchRange& other) const { return !(*this == other); }

    /// Smallest range covering both `*this` and `other`, independent of their relative order:
    /// it starts at the smaller of the two starts and ends at the larger of the two ends.
    PrefetchRange span(const PrefetchRange& other) const {
        return {std::min(start_offset, other.start_offset), std::max(end_offset, other.end_offset)};
    }

    /// Range from this range's start to `other`'s end. Only meaningful when `other` does not
    /// start before `*this` (i.e. the ranges are visited in sorted order).
    PrefetchRange seq_span(const PrefetchRange& other) const {
        return {start_offset, other.end_offset};
    }

    /// Merges neighbouring ranges of a sorted list.
    ///
    /// Two consecutive ranges are merged when the merged range is at most `once_max_read_bytes`
    /// long and the gap between them is at most `max_merge_distance_bytes`. Otherwise the current
    /// merged range is emitted and a new one starts.
    ///
    /// @param seq_ranges ranges sorted by offset (required: seq_span() assumes it)
    /// @param max_merge_distance_bytes largest gap that may be bridged by a merge
    /// @param once_max_read_bytes largest length a merged range may have
    /// @return the merged ranges, in input order; empty when `seq_ranges` is empty
    ///
    /// Both limits are combined with size_t offsets, so they take part in unsigned arithmetic
    /// and are expected to be non-negative.
    static std::vector<PrefetchRange> merge_adjacent_seq_ranges(
            const std::vector<PrefetchRange>& seq_ranges, int64_t max_merge_distance_bytes,
            int64_t once_max_read_bytes) {
        if (seq_ranges.empty()) {
            return {};
        }
        std::vector<PrefetchRange> result;
        PrefetchRange last = seq_ranges.front();
        for (size_t i = 1; i < seq_ranges.size(); ++i) {
            const PrefetchRange& current = seq_ranges[i];
            PrefetchRange merged = last.seq_span(current);
            if (merged.end_offset <= once_max_read_bytes + merged.start_offset &&
                last.end_offset + max_merge_distance_bytes >= current.start_offset) {
                last = merged;
            } else {
                result.push_back(last);
                last = current;
            }
        }
        result.push_back(last);
        return result;
    }
};
97
98
class RangeFinder {
99
public:
100
23.3k
    virtual ~RangeFinder() = default;
101
    virtual Status get_range_for(int64_t desired_offset, io::PrefetchRange& result_range) = 0;
102
    virtual size_t get_max_range_size() const = 0;
103
};
104
105
class LinearProbeRangeFinder : public RangeFinder {
106
public:
107
23.1k
    LinearProbeRangeFinder(std::vector<io::PrefetchRange>&& ranges) : _ranges(std::move(ranges)) {}
108
109
    Status get_range_for(int64_t desired_offset, io::PrefetchRange& result_range) override;
110
111
23.2k
    size_t get_max_range_size() const override {
112
23.2k
        size_t max_range_size = 0;
113
25.0k
        for (const auto& range : _ranges) {
114
25.0k
            max_range_size = std::max(max_range_size, range.end_offset - range.start_offset);
115
25.0k
        }
116
23.2k
        return max_range_size;
117
23.2k
    }
118
119
23.3k
    ~LinearProbeRangeFinder() override = default;
120
121
private:
122
    std::vector<io::PrefetchRange> _ranges;
123
    size_t index {0};
124
};
125
126
/**
127
 * The reader provides a solution to read one range at a time. You can customize RangeFinder to meet your scenario.
128
 * For me, since there will be tiny stripes when reading orc files, in order to reduce the requests to hdfs,
129
 * I first merge the access to the orc files to be read (of course there is a problem of read amplification,
130
 * but in my scenario, compared with reading hdfs multiple times, it is faster to read more data on hdfs at one time),
131
 * and then because the actual reading of orc files is in order from front to back, I provide LinearProbeRangeFinder.
132
 */
133
class RangeCacheFileReader : public io::FileReader {
134
    struct RangeCacheReaderStatistics {
135
        int64_t request_io = 0;
136
        int64_t request_bytes = 0;
137
        int64_t request_time = 0;
138
        int64_t read_to_cache_time = 0;
139
        int64_t cache_refresh_count = 0;
140
        int64_t read_to_cache_bytes = 0;
141
    };
142
143
public:
144
    RangeCacheFileReader(RuntimeProfile* profile, io::FileReaderSPtr inner_reader,
145
                         std::shared_ptr<RangeFinder> range_finder);
146
147
23.3k
    ~RangeCacheFileReader() override = default;
148
149
3
    Status close() override {
150
3
        if (!_closed) {
151
3
            _closed = true;
152
3
        }
153
3
        return Status::OK();
154
3
    }
155
156
0
    const io::Path& path() const override { return _inner_reader->path(); }
157
158
0
    size_t size() const override { return _size; }
159
160
0
    bool closed() const override { return _closed; }
161
162
0
    int64_t mtime() const override { return _inner_reader->mtime(); }
163
164
protected:
165
    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
166
                        const IOContext* io_ctx) override;
167
168
    void _collect_profile_before_close() override;
169
170
private:
171
    RuntimeProfile* _profile = nullptr;
172
    io::FileReaderSPtr _inner_reader;
173
    std::shared_ptr<RangeFinder> _range_finder;
174
175
    OwnedSlice _cache;
176
    int64_t _current_start_offset = -1;
177
178
    size_t _size;
179
    bool _closed = false;
180
181
    RuntimeProfile::Counter* _request_io = nullptr;
182
    RuntimeProfile::Counter* _request_bytes = nullptr;
183
    RuntimeProfile::Counter* _request_time = nullptr;
184
    RuntimeProfile::Counter* _read_to_cache_time = nullptr;
185
    RuntimeProfile::Counter* _cache_refresh_count = nullptr;
186
    RuntimeProfile::Counter* _read_to_cache_bytes = nullptr;
187
    RangeCacheReaderStatistics _cache_statistics;
188
    /**
189
     * `RangeCacheFileReader`:
190
     *   1. `CacheRefreshCount`: how many IOs are merged
191
     *   2. `ReadToCacheBytes`: how much data is actually read after merging
192
     *   3. `ReadToCacheTime`: how long it takes to read data after merging
193
     *   4. `RequestBytes`: how many bytes does the apache-orc library actually need to read the orc file
194
     *   5. `RequestIO`: how many times the apache-orc library calls this read interface
195
     *   6. `RequestTime`: how long it takes the apache-orc library to call this read interface
196
     *
197
     *   It should be noted that `RangeCacheFileReader` is a wrapper of the reader that actually reads data,such as
198
     *   the hdfs reader, so strictly speaking, `CacheRefreshCount` is not equal to how many IOs are initiated to hdfs,
199
     *  because each time the hdfs reader is requested, the hdfs reader may not be able to read all the data at once.
200
     */
201
};
202
203
/**
204
 * A FileReader that efficiently supports random access format like parquet and orc.
205
 * In order to merge small IO in parquet and orc, the random access ranges should be generated
206
 * when creating the reader. The random access ranges is a list of ranges that order by offset.
207
 * The range in random access ranges should be reading sequentially, can be skipped, but can't be
208
 * read repeatedly. When calling read_at, if the start offset located in random access ranges, the
209
 * slice size should not span two ranges.
210
 *
211
 * For example, in parquet, the random access ranges is the column offsets in a row group.
212
 *
213
 * When reading at offset, if [offset, offset + 8MB) contains many random access ranges, the reader
214
 * will read data in [offset, offset + 8MB) as a whole, and copy the data in random access ranges
215
 * into small buffers(name as box, default 1MB, 128MB in total). A box can be occupied by many ranges,
216
 * and use a reference counter to record how many ranges are cached in the box. If reference counter
217
 * equals zero, the box can be release or reused by other ranges. When there is no empty box for a new
218
 * read operation, the read operation will do directly.
219
 */
220
class MergeRangeFileReader : public io::FileReader {
221
public:
222
    struct Statistics {
223
        int64_t copy_time = 0;
224
        int64_t read_time = 0;
225
        int64_t request_io = 0;
226
        int64_t merged_io = 0;
227
        int64_t request_bytes = 0;
228
        int64_t merged_bytes = 0;
229
    };
230
231
    struct RangeCachedData {
232
        size_t start_offset;
233
        size_t end_offset;
234
        std::vector<int16_t> ref_box;
235
        std::vector<uint32_t> box_start_offset;
236
        std::vector<uint32_t> box_end_offset;
237
        bool has_read = false;
238
239
        RangeCachedData(size_t start_offset, size_t end_offset)
240
0
                : start_offset(start_offset), end_offset(end_offset) {}
241
242
100k
        RangeCachedData() : start_offset(0), end_offset(0) {}
243
244
207k
        bool empty() const { return start_offset == end_offset; }
245
246
1.09M
        bool contains(size_t offset) const { return start_offset <= offset && offset < end_offset; }
247
248
190k
        void reset() {
249
190k
            start_offset = 0;
250
190k
            end_offset = 0;
251
190k
            ref_box.clear();
252
190k
            box_start_offset.clear();
253
190k
            box_end_offset.clear();
254
190k
        }
255
256
0
        int16_t release_last_box() {
257
0
            // we can only release the last referenced box to ensure sequential read in range
258
0
            if (!empty()) {
259
0
                int16_t last_box_ref = ref_box.back();
260
0
                uint32_t released_size = box_end_offset.back() - box_start_offset.back();
261
0
                ref_box.pop_back();
262
0
                box_start_offset.pop_back();
263
0
                box_end_offset.pop_back();
264
0
                end_offset -= released_size;
265
0
                if (empty()) {
266
0
                    reset();
267
0
                }
268
0
                return last_box_ref;
269
0
            }
270
0
            return -1;
271
0
        }
272
    };
273
274
    static constexpr size_t TOTAL_BUFFER_SIZE = 128 * 1024 * 1024;  // 128MB
275
    static constexpr size_t READ_SLICE_SIZE = 8 * 1024 * 1024;      // 8MB
276
    static constexpr size_t BOX_SIZE = 1 * 1024 * 1024;             // 1MB
277
    static constexpr size_t SMALL_IO = 2 * 1024 * 1024;             // 2MB
278
    static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 128
279
280
    MergeRangeFileReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
281
                         const std::vector<PrefetchRange>& random_access_ranges,
282
                         int64_t merge_read_slice_size = READ_SLICE_SIZE)
283
16.7k
            : _profile(profile),
284
16.7k
              _reader(std::move(reader)),
285
16.7k
              _random_access_ranges(random_access_ranges) {
286
16.7k
        _range_cached_data.resize(random_access_ranges.size());
287
16.7k
        _size = _reader->size();
288
16.7k
        _remaining = TOTAL_BUFFER_SIZE;
289
16.7k
        _is_oss = typeid_cast<io::S3FileReader*>(_reader.get()) != nullptr;
290
16.7k
        _max_amplified_ratio = config::max_amplified_read_ratio;
291
        // Equivalent min size of each IO that can reach the maximum storage speed limit:
292
        // 1MB for oss, 8KB for hdfs
293
16.7k
        _equivalent_io_size =
294
16.7k
                _is_oss ? config::merged_oss_min_io_size : config::merged_hdfs_min_io_size;
295
296
16.7k
        _merged_read_slice_size = merge_read_slice_size;
297
298
16.7k
        if (_merged_read_slice_size < 0) {
299
12
            _merged_read_slice_size = READ_SLICE_SIZE;
300
12
        }
301
302
16.7k
        if (_profile != nullptr) {
303
16.7k
            const char* random_profile = "MergedSmallIO";
304
16.7k
            ADD_TIMER_WITH_LEVEL(_profile, random_profile, 1);
305
16.7k
            _copy_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "CopyTime", random_profile, 1);
306
16.7k
            _read_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ReadTime", random_profile, 1);
307
16.7k
            _request_io = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestIO", TUnit::UNIT,
308
16.7k
                                                       random_profile, 1);
309
16.7k
            _merged_io = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "MergedIO", TUnit::UNIT,
310
16.7k
                                                      random_profile, 1);
311
16.7k
            _request_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestBytes", TUnit::BYTES,
312
16.7k
                                                          random_profile, 1);
313
16.7k
            _merged_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "MergedBytes", TUnit::BYTES,
314
16.7k
                                                         random_profile, 1);
315
16.7k
        }
316
16.7k
    }
317
318
16.8k
    ~MergeRangeFileReader() override = default;
319
320
0
    Status close() override {
321
0
        if (!_closed) {
322
0
            _closed = true;
323
0
        }
324
0
        return Status::OK();
325
0
    }
326
327
100k
    const io::Path& path() const override { return _reader->path(); }
328
329
0
    size_t size() const override { return _size; }
330
331
0
    bool closed() const override { return _closed; }
332
333
100k
    int64_t mtime() const override { return _reader->mtime(); }
334
335
    // for test only
336
    size_t buffer_remaining() const { return _remaining; }
337
338
    // for test only
339
    const std::vector<RangeCachedData>& range_cached_data() const { return _range_cached_data; }
340
341
    // for test only
342
    const std::vector<int16_t>& box_reference() const { return _box_ref; }
343
344
    // for test only
345
    const Statistics& statistics() const { return _statistics; }
346
347
protected:
348
    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
349
                        const IOContext* io_ctx) override;
350
351
16.6k
    void _collect_profile_before_close() override {
352
16.6k
        if (_profile != nullptr) {
353
16.6k
            COUNTER_UPDATE(_copy_time, _statistics.copy_time);
354
16.6k
            COUNTER_UPDATE(_read_time, _statistics.read_time);
355
16.6k
            COUNTER_UPDATE(_request_io, _statistics.request_io);
356
16.6k
            COUNTER_UPDATE(_merged_io, _statistics.merged_io);
357
16.6k
            COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
358
16.6k
            COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes);
359
16.6k
            if (_reader != nullptr) {
360
16.6k
                _reader->collect_profile_before_close();
361
16.6k
            }
362
16.6k
        }
363
16.6k
    }
364
365
private:
366
    RuntimeProfile::Counter* _copy_time = nullptr;
367
    RuntimeProfile::Counter* _read_time = nullptr;
368
    RuntimeProfile::Counter* _request_io = nullptr;
369
    RuntimeProfile::Counter* _merged_io = nullptr;
370
    RuntimeProfile::Counter* _request_bytes = nullptr;
371
    RuntimeProfile::Counter* _merged_bytes = nullptr;
372
373
    int _search_read_range(size_t start_offset, size_t end_offset);
374
    void _clean_cached_data(RangeCachedData& cached_data);
375
    void _read_in_box(RangeCachedData& cached_data, size_t offset, Slice result,
376
                      size_t* bytes_read);
377
    Status _fill_box(int range_index, size_t start_offset, size_t to_read, size_t* bytes_read,
378
                     const IOContext* io_ctx);
379
    void _dec_box_ref(int16_t box_index);
380
381
    RuntimeProfile* _profile = nullptr;
382
    io::FileReaderSPtr _reader;
383
    const std::vector<PrefetchRange> _random_access_ranges;
384
    std::vector<RangeCachedData> _range_cached_data;
385
    size_t _size;
386
    bool _closed = false;
387
    size_t _remaining;
388
389
    std::unique_ptr<OwnedSlice> _read_slice;
390
    std::vector<OwnedSlice> _boxes;
391
    int16_t _last_box_ref = -1;
392
    uint32_t _last_box_usage = 0;
393
    std::vector<int16_t> _box_ref;
394
    bool _is_oss;
395
    double _max_amplified_ratio;
396
    size_t _equivalent_io_size;
397
    int64_t _merged_read_slice_size;
398
399
    Statistics _statistics;
400
};
401
402
/**
403
 * Create a file reader suitable for accessing scenarios:
404
 * 1. When file size < config::in_memory_file_size, create InMemoryFileReader file reader
405
 * 2. When reading sequential file(csv/json), create PrefetchBufferedReader
406
 * 3. When reading random access file(parquet/orc), create normal file reader
407
 */
408
class DelegateReader {
409
public:
410
    enum AccessMode { SEQUENTIAL, RANDOM };
411
412
    static Result<io::FileReaderSPtr> create_file_reader(
413
            RuntimeProfile* profile, const FileSystemProperties& system_properties,
414
            const FileDescription& file_description, const io::FileReaderOptions& reader_options,
415
            AccessMode access_mode = SEQUENTIAL, const IOContext* io_ctx = nullptr,
416
            const PrefetchRange file_range = PrefetchRange(0, 0));
417
418
    static Result<io::FileReaderSPtr> create_file_reader(
419
            RuntimeProfile* profile, const FileSystemProperties& system_properties,
420
            const FileDescription& file_description, const io::FileReaderOptions& reader_options,
421
            AccessMode access_mode, std::shared_ptr<const IOContext> io_ctx,
422
            const PrefetchRange file_range = PrefetchRange(0, 0));
423
};
424
425
class PrefetchBufferedReader;
426
struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer>, public ProfileCollector {
427
    enum class BufferStatus { RESET, PENDING, PREFETCHED, CLOSED };
428
429
    PrefetchBuffer(const PrefetchRange file_range, size_t buffer_size, size_t whole_buffer_size,
430
                   io::FileReader* reader, std::shared_ptr<const IOContext> io_ctx,
431
                   std::function<void(PrefetchBuffer&)> sync_profile)
432
240
            : _file_range(file_range),
433
240
              _size(buffer_size),
434
240
              _whole_buffer_size(whole_buffer_size),
435
240
              _reader(reader),
436
240
              _io_ctx_holder(std::move(io_ctx)),
437
240
              _io_ctx(_io_ctx_holder.get()),
438
240
              _sync_profile(std::move(sync_profile)) {}
439
440
    PrefetchBuffer(PrefetchBuffer&& other)
441
            : _offset(other._offset),
442
              _file_range(other._file_range),
443
              _random_access_ranges(other._random_access_ranges),
444
              _size(other._size),
445
              _whole_buffer_size(other._whole_buffer_size),
446
              _reader(other._reader),
447
              _io_ctx_holder(std::move(other._io_ctx_holder)),
448
              _io_ctx(_io_ctx_holder.get()),
449
              _buf(std::move(other._buf)),
450
0
              _sync_profile(std::move(other._sync_profile)) {}
451
452
240
    ~PrefetchBuffer() = default;
453
454
    size_t _offset {0};
455
    // [start_offset, end_offset) is the range that can be prefetched.
456
    // Notice that the reader can read out of [start_offset, end_offset), because FE does not align the file
457
    // according to the format when splitting it.
458
    const PrefetchRange _file_range;
459
    const std::vector<PrefetchRange>* _random_access_ranges = nullptr;
460
    size_t _size {0};
461
    size_t _len {0};
462
    size_t _whole_buffer_size;
463
    io::FileReader* _reader = nullptr;
464
    std::shared_ptr<const IOContext> _io_ctx_holder;
465
    const IOContext* _io_ctx = nullptr;
466
    PODArray<char> _buf;
467
    BufferStatus _buffer_status {BufferStatus::RESET};
468
    std::mutex _lock;
469
    std::condition_variable _prefetched;
470
    Status _prefetch_status {Status::OK()};
471
    std::atomic_bool _exceed = false;
472
    std::function<void(PrefetchBuffer&)> _sync_profile;
473
    struct Statistics {
474
        int64_t copy_time {0};
475
        int64_t read_time {0};
476
        int64_t prefetch_request_io {0};
477
        int64_t prefetch_request_bytes {0};
478
        int64_t request_io {0};
479
        int64_t request_bytes {0};
480
    };
481
    Statistics _statis;
482
483
    // @brief: reset the start offset of this buffer to offset
484
    // @param: the new start offset for this buffer
485
    void reset_offset(size_t offset);
486
    // @brief: start to fetch the content between [_offset, _offset + _size)
487
    void prefetch_buffer();
488
    // @brief: used by BufferedReader to read the prefetched data
489
    // @param[off] read start address
490
    // @param[buf] buffer to put the actual content
491
    // @param[buf_len] maximum len trying to read
492
    // @param[bytes_read] actual bytes read
493
    Status read_buffer(size_t off, const char* buf, size_t buf_len, size_t* bytes_read);
494
    // @brief: shut down the buffer until the prior prefetching task is done
495
    void close();
496
    // @brief: to detect whether this buffer contains off
497
    // @param[off] detect offset
498
251
    bool inline contains(size_t off) const { return _offset <= off && off < _offset + _size; }
499
500
0
    void set_random_access_ranges(const std::vector<PrefetchRange>* random_access_ranges) {
501
0
        _random_access_ranges = random_access_ranges;
502
0
    }
503
504
    // binary search the last prefetch buffer that larger or include the offset
505
    int search_read_range(size_t off) const;
506
507
    size_t merge_small_ranges(size_t off, int range_index) const;
508
509
0
    void _collect_profile_at_runtime() override {}
510
511
    void _collect_profile_before_close() override;
512
};
513
514
// Size of each prefetch buffer slot used by PrefetchBufferedReader (4MB). Declared `inline` so
// this header-defined constant has a single definition across translation units.
inline constexpr int64_t s_max_pre_buffer_size = 4 * 1024 * 1024; // 4MB
515
516
/**
517
 * A buffered reader that prefetch data in the daemon thread pool.
518
 *
519
 * file_range is the range that the file is read.
520
 * random_access_ranges are the column ranges in format, like orc and parquet.
521
 *
522
 * When random_access_ranges is empty:
523
 * The data is prefetched sequentially until the underlying buffers(4 * 4M as default) are full.
524
 * When a buffer is read out, it will fetch data backward in daemon, so the underlying reader should be
525
 * thread-safe, and the access mode of data needs to be sequential.
526
 *
527
 * When random_access_ranges is not empty:
528
 * The data is prefetched order by the random_access_ranges. If some adjacent ranges is small, the underlying reader
529
 * will merge them.
530
 */
531
class PrefetchBufferedReader final : public io::FileReader {
532
public:
533
    PrefetchBufferedReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
534
                           PrefetchRange file_range,
535
                           std::shared_ptr<const IOContext> io_ctx = nullptr,
536
                           int64_t buffer_size = -1L);
537
    ~PrefetchBufferedReader() override;
538
539
    Status close() override;
540
541
0
    const io::Path& path() const override { return _reader->path(); }
542
543
530
    size_t size() const override { return _size; }
544
545
0
    bool closed() const override { return _closed; }
546
547
0
    int64_t mtime() const override { return _reader->mtime(); }
548
549
0
    void set_random_access_ranges(const std::vector<PrefetchRange>* random_access_ranges) {
550
0
        _random_access_ranges = random_access_ranges;
551
0
        for (auto& _pre_buffer : _pre_buffers) {
552
0
            _pre_buffer->set_random_access_ranges(random_access_ranges);
553
0
        }
554
0
    }
555
556
protected:
557
    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
558
                        const IOContext* io_ctx) override;
559
560
    void _collect_profile_before_close() override;
561
562
private:
563
    Status _close_internal();
564
491
    size_t get_buffer_pos(int64_t position) const {
565
491
        return (position % _whole_pre_buffer_size) / s_max_pre_buffer_size;
566
491
    }
567
240
    size_t get_buffer_offset(int64_t position) const {
568
240
        return (position / s_max_pre_buffer_size) * s_max_pre_buffer_size;
569
240
    }
570
60
    void reset_all_buffer(size_t position) {
571
300
        for (int64_t i = 0; i < _pre_buffers.size(); i++) {
572
240
            int64_t cur_pos = position + i * s_max_pre_buffer_size;
573
240
            size_t cur_buf_pos = get_buffer_pos(cur_pos);
574
            // reset would do all the prefetch work
575
240
            _pre_buffers[cur_buf_pos]->reset_offset(get_buffer_offset(cur_pos));
576
240
        }
577
60
    }
578
579
    io::FileReaderSPtr _reader;
580
    PrefetchRange _file_range;
581
    const std::vector<PrefetchRange>* _random_access_ranges = nullptr;
582
    std::shared_ptr<const IOContext> _io_ctx_holder;
583
    const IOContext* _io_ctx = nullptr;
584
    std::vector<std::shared_ptr<PrefetchBuffer>> _pre_buffers;
585
    int64_t _whole_pre_buffer_size;
586
    bool _initialized = false;
587
    bool _closed = false;
588
    size_t _size;
589
};
590
591
/**
592
 * A file reader that read the whole file into memory.
593
 * When a file is small(<8MB), InMemoryFileReader can effectively reduce the number of file accesses
594
 * and greatly improve the access speed of small files.
595
 */
596
class InMemoryFileReader final : public io::FileReader {
597
public:
598
    InMemoryFileReader(io::FileReaderSPtr reader);
599
600
    ~InMemoryFileReader() override;
601
602
    Status close() override;
603
604
105k
    const io::Path& path() const override { return _reader->path(); }
605
606
65.1k
    size_t size() const override { return _size; }
607
608
0
    bool closed() const override { return _closed; }
609
610
73.6k
    int64_t mtime() const override { return _reader->mtime(); }
611
612
protected:
613
    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
614
                        const IOContext* io_ctx) override;
615
616
    void _collect_profile_before_close() override;
617
618
private:
619
    Status _close_internal();
620
    io::FileReaderSPtr _reader;
621
    std::unique_ptr<char[]> _data;
622
    size_t _size;
623
    bool _closed = false;
624
};
625
626
/**
627
 * Load all the needed data in underlying buffer, so the caller does not need to prepare the data container.
628
 */
629
class BufferedStreamReader {
630
public:
631
    /**
632
     * Return the address of underlying buffer that locates the start of data between [offset, offset + bytes_to_read)
633
     * @param buf the buffer address to save the start address of data
634
     * @param offset start offset ot read in stream
635
     * @param bytes_to_read bytes to read
636
     */
637
    virtual Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t bytes_to_read,
638
                              const IOContext* io_ctx) = 0;
639
    /**
640
     * Save the data address to slice.data, and the slice.size is the bytes to read.
641
     */
642
    virtual Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) = 0;
643
174k
    virtual ~BufferedStreamReader() = default;
644
    // return the file path
645
    virtual std::string path() = 0;
646
647
    virtual int64_t mtime() const = 0;
648
};
649
650
class BufferedFileStreamReader : public BufferedStreamReader, public ProfileCollector {
651
public:
652
    BufferedFileStreamReader(io::FileReaderSPtr file, uint64_t offset, uint64_t length,
653
                             size_t max_buf_size);
654
174k
    ~BufferedFileStreamReader() override = default;
655
656
    Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t bytes_to_read,
657
                      const IOContext* io_ctx) override;
658
    Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) override;
659
174k
    std::string path() override { return _file->path(); }
660
661
174k
    int64_t mtime() const override { return _file->mtime(); }
662
663
protected:
664
0
    void _collect_profile_before_close() override {
665
0
        if (_file != nullptr) {
666
0
            _file->collect_profile_before_close();
667
0
        }
668
0
    }
669
670
private:
671
    DorisUniqueBufferPtr<uint8_t> _buf;
672
    io::FileReaderSPtr _file;
673
    uint64_t _file_start_offset;
674
    uint64_t _file_end_offset;
675
676
    uint64_t _buf_start_offset = 0;
677
    uint64_t _buf_end_offset = 0;
678
    size_t _buf_size = 0;
679
    size_t _max_buf_size;
680
};
681
682
} // namespace io
683
684
} // namespace doris