be/src/io/fs/buffered_reader.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <stddef.h> |
21 | | #include <stdint.h> |
22 | | |
23 | | #include <condition_variable> |
24 | | #include <memory> |
25 | | #include <mutex> |
26 | | #include <string> |
27 | | #include <utility> |
28 | | #include <vector> |
29 | | |
30 | | #include "common/status.h" |
31 | | #include "core/custom_allocator.h" |
32 | | #include "core/typeid_cast.h" |
33 | | #include "io/cache/cached_remote_file_reader.h" |
34 | | #include "io/file_factory.h" |
35 | | #include "io/fs/broker_file_reader.h" |
36 | | #include "io/fs/file_reader.h" |
37 | | #include "io/fs/path.h" |
38 | | #include "io/fs/s3_file_reader.h" |
39 | | #include "io/io_common.h" |
40 | | #include "runtime/runtime_profile.h" |
41 | | #include "storage/olap_define.h" |
42 | | #include "util/slice.h" |
43 | | namespace doris { |
44 | | |
45 | | #include "common/compile_check_begin.h" |
46 | | |
47 | | namespace io { |
48 | | |
49 | | class FileSystem; |
50 | | |
// A half-open byte range [start_offset, end_offset) describing a region of a
// file to prefetch.
struct PrefetchRange {
    size_t start_offset;
    size_t end_offset;

    PrefetchRange(size_t start_offset, size_t end_offset)
            : start_offset(start_offset), end_offset(end_offset) {}

    PrefetchRange() : start_offset(0), end_offset(0) {}

    bool operator==(const PrefetchRange& other) const {
        return (start_offset == other.start_offset) && (end_offset == other.end_offset);
    }

    bool operator!=(const PrefetchRange& other) const { return !(*this == other); }

    // Returns the smallest range covering both *this and other, regardless of
    // order or overlap.
    // Bug fix: the previous implementation returned
    // {min(start_offset, other.end_offset), max(start_offset, other.end_offset)},
    // which ignored this->end_offset and other.start_offset and produced a wrong
    // hull whenever the two ranges were disjoint or unordered.
    PrefetchRange span(const PrefetchRange& other) const {
        return {std::min(start_offset, other.start_offset),
                std::max(end_offset, other.end_offset)};
    }

    // Returns the range from this range's start to other's end. Only meaningful
    // when `other` does not end before *this starts (sequential ranges).
    PrefetchRange seq_span(const PrefetchRange& other) const {
        return {start_offset, other.end_offset};
    }

    // Merge adjacent/near ranges into fewer, larger ones to reduce IO requests.
    // @param seq_ranges              ranges sorted by start offset (required)
    // @param max_merge_distance_bytes  two neighbors merge only if the gap between
    //                                them is at most this many bytes
    // @param once_max_read_bytes     a merged range never exceeds this size
    static std::vector<PrefetchRange> merge_adjacent_seq_ranges(
            const std::vector<PrefetchRange>& seq_ranges, int64_t max_merge_distance_bytes,
            int64_t once_max_read_bytes) {
        if (seq_ranges.empty()) {
            return {};
        }
        // Greedily grow `last` while the merged range stays within the size cap
        // and the neighbor is close enough; otherwise flush and start over.
        std::vector<PrefetchRange> result;
        PrefetchRange last = seq_ranges.front();
        for (size_t i = 1; i < seq_ranges.size(); ++i) {
            PrefetchRange current = seq_ranges[i];
            PrefetchRange merged = last.seq_span(current);
            if (merged.end_offset <= once_max_read_bytes + merged.start_offset &&
                last.end_offset + max_merge_distance_bytes >= current.start_offset) {
                last = merged;
            } else {
                result.push_back(last);
                last = current;
            }
        }
        result.push_back(last);
        return result;
    }
};
98 | | |
// Strategy interface: maps a requested read offset to the prefetch range that
// should be loaded to serve it.
class RangeFinder {
public:
    virtual ~RangeFinder() = default;
    // Locate the range to use for `desired_offset` and store it in `result_range`.
    virtual Status get_range_for(int64_t desired_offset, io::PrefetchRange& result_range) = 0;
    // Size in bytes of the largest range this finder can hand out.
    virtual size_t get_max_range_size() const = 0;
};
105 | | |
106 | | class LinearProbeRangeFinder : public RangeFinder { |
107 | | public: |
108 | 104 | LinearProbeRangeFinder(std::vector<io::PrefetchRange>&& ranges) : _ranges(std::move(ranges)) {} |
109 | | |
110 | | Status get_range_for(int64_t desired_offset, io::PrefetchRange& result_range) override; |
111 | | |
112 | 104 | size_t get_max_range_size() const override { |
113 | 104 | size_t max_range_size = 0; |
114 | 107 | for (const auto& range : _ranges) { |
115 | 107 | max_range_size = std::max(max_range_size, range.end_offset - range.start_offset); |
116 | 107 | } |
117 | 104 | return max_range_size; |
118 | 104 | } |
119 | | |
120 | 104 | ~LinearProbeRangeFinder() override = default; |
121 | | |
122 | | private: |
123 | | std::vector<io::PrefetchRange> _ranges; |
124 | | size_t index {0}; |
125 | | }; |
126 | | |
127 | | /** |
128 | | * The reader provides a solution to read one range at a time. You can customize RangeFinder to meet your scenario. |
129 | | * For me, since there will be tiny stripes when reading orc files, in order to reduce the requests to hdfs, |
130 | | * I first merge the access to the orc files to be read (of course there is a problem of read amplification, |
131 | | * but in my scenario, compared with reading hdfs multiple times, it is faster to read more data on hdfs at one time), |
132 | | * and then because the actual reading of orc files is in order from front to back, I provide LinearProbeRangeFinder. |
133 | | */ |
class RangeCacheFileReader : public io::FileReader {
    // Raw counters recorded while serving reads; see the comment at the bottom
    // of the class for what each corresponding profile counter means.
    struct RangeCacheReaderStatistics {
        int64_t request_io = 0;
        int64_t request_bytes = 0;
        int64_t request_time = 0;
        int64_t read_to_cache_time = 0;
        int64_t cache_refresh_count = 0;
        int64_t read_to_cache_bytes = 0;
    };

public:
    // @param profile      runtime profile to register counters in (may be null)
    // @param inner_reader reader that actually fetches the data
    // @param range_finder maps a requested offset to the range to cache
    RangeCacheFileReader(RuntimeProfile* profile, io::FileReaderSPtr inner_reader,
                         std::shared_ptr<RangeFinder> range_finder);

    ~RangeCacheFileReader() override = default;

    // Only flips the closed flag; the inner reader is not closed here.
    Status close() override {
        if (!_closed) {
            _closed = true;
        }
        return Status::OK();
    }

    const io::Path& path() const override { return _inner_reader->path(); }

    size_t size() const override { return _size; }

    bool closed() const override { return _closed; }

    int64_t mtime() const override { return _inner_reader->mtime(); }

protected:
    // Serves the read, consulting _range_finder/_inner_reader and the cache
    // (implementation in the .cpp).
    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                        const IOContext* io_ctx) override;

    void _collect_profile_before_close() override;

private:
    RuntimeProfile* _profile = nullptr;
    io::FileReaderSPtr _inner_reader;            // the reader that actually fetches bytes
    std::shared_ptr<RangeFinder> _range_finder;

    OwnedSlice _cache;                  // presumably holds the currently cached range — see .cpp
    int64_t _current_start_offset = -1; // start of the cached range; -1 = nothing cached

    size_t _size;          // size of the underlying file
    bool _closed = false;

    RuntimeProfile::Counter* _request_io = nullptr;
    RuntimeProfile::Counter* _request_bytes = nullptr;
    RuntimeProfile::Counter* _request_time = nullptr;
    RuntimeProfile::Counter* _read_to_cache_time = nullptr;
    RuntimeProfile::Counter* _cache_refresh_count = nullptr;
    RuntimeProfile::Counter* _read_to_cache_bytes = nullptr;
    RangeCacheReaderStatistics _cache_statistics;
    /**
     * `RangeCacheFileReader`:
     * 1. `CacheRefreshCount`: how many IOs are merged
     * 2. `ReadToCacheBytes`: how much data is actually read after merging
     * 3. `ReadToCacheTime`: how long it takes to read data after merging
     * 4. `RequestBytes`: how many bytes does the apache-orc library actually need to read the orc file
     * 5. `RequestIO`: how many times the apache-orc library calls this read interface
     * 6. `RequestTime`: how long it takes the apache-orc library to call this read interface
     *
     * It should be noted that `RangeCacheFileReader` is a wrapper of the reader that actually reads data,such as
     * the hdfs reader, so strictly speaking, `CacheRefreshCount` is not equal to how many IOs are initiated to hdfs,
     * because each time the hdfs reader is requested, the hdfs reader may not be able to read all the data at once.
     */
};
203 | | |
204 | | /** |
205 | | * A FileReader that efficiently supports random access format like parquet and orc. |
206 | | * In order to merge small IO in parquet and orc, the random access ranges should be generated |
207 | | * when creating the reader. The random access ranges is a list of ranges that order by offset. |
208 | | * The range in random access ranges should be reading sequentially, can be skipped, but can't be |
209 | | * read repeatedly. When calling read_at, if the start offset located in random access ranges, the |
210 | | * slice size should not span two ranges. |
211 | | * |
212 | | * For example, in parquet, the random access ranges is the column offsets in a row group. |
213 | | * |
214 | | * When reading at offset, if [offset, offset + 8MB) contains many random access ranges, the reader |
215 | | * will read data in [offset, offset + 8MB) as a whole, and copy the data in random access ranges |
216 | | * into small buffers(name as box, default 1MB, 128MB in total). A box can be occupied by many ranges, |
217 | | * and use a reference counter to record how many ranges are cached in the box. If reference counter |
218 | | * equals zero, the box can be release or reused by other ranges. When there is no empty box for a new |
219 | | * read operation, the read operation will do directly. |
220 | | */ |
class MergeRangeFileReader : public io::FileReader {
public:
    // Raw counters accumulated during reads; flushed into the profile counters
    // in _collect_profile_before_close().
    struct Statistics {
        int64_t copy_time = 0;
        int64_t read_time = 0;
        int64_t request_io = 0;
        int64_t merged_io = 0;
        int64_t request_bytes = 0;
        int64_t merged_bytes = 0;
    };

    // Bookkeeping for one random-access range whose bytes are cached in boxes.
    // ref_box / box_start_offset / box_end_offset are parallel vectors: entry i
    // says box ref_box[i] holds this range's bytes at
    // [box_start_offset[i], box_end_offset[i]) within that box.
    struct RangeCachedData {
        size_t start_offset;
        size_t end_offset;
        std::vector<int16_t> ref_box;
        std::vector<uint32_t> box_start_offset;
        std::vector<uint32_t> box_end_offset;
        bool has_read = false;

        RangeCachedData(size_t start_offset, size_t end_offset)
                : start_offset(start_offset), end_offset(end_offset) {}

        RangeCachedData() : start_offset(0), end_offset(0) {}

        // True when no cached bytes remain for this range.
        bool empty() const { return start_offset == end_offset; }

        // True when `offset` lies inside the cached interval [start_offset, end_offset).
        bool contains(size_t offset) const { return start_offset <= offset && offset < end_offset; }

        void reset() {
            start_offset = 0;
            end_offset = 0;
            ref_box.clear();
            box_start_offset.clear();
            box_end_offset.clear();
        }

        // Drop the last box referenced by this range, shrinking end_offset by the
        // released size. Returns the released box index, or -1 if nothing is cached.
        int16_t release_last_box() {
            // we can only release the last referenced box to ensure sequential read in range
            if (!empty()) {
                int16_t last_box_ref = ref_box.back();
                uint32_t released_size = box_end_offset.back() - box_start_offset.back();
                ref_box.pop_back();
                box_start_offset.pop_back();
                box_end_offset.pop_back();
                end_offset -= released_size;
                if (empty()) {
                    reset();
                }
                return last_box_ref;
            }
            return -1;
        }
    };

    static constexpr size_t TOTAL_BUFFER_SIZE = 128 * 1024 * 1024;  // 128MB
    static constexpr size_t READ_SLICE_SIZE = 8 * 1024 * 1024;      // 8MB
    static constexpr size_t BOX_SIZE = 1 * 1024 * 1024;             // 1MB
    static constexpr size_t SMALL_IO = 2 * 1024 * 1024;             // 2MB
    static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 128

    // @param profile               runtime profile for the MergedSmallIO counters (may be null)
    // @param reader                underlying reader merged IOs are issued to
    // @param random_access_ranges  ranges (ordered by offset) that reads will follow
    // @param merge_read_slice_size size of one merged read; negative means "use READ_SLICE_SIZE"
    MergeRangeFileReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
                         const std::vector<PrefetchRange>& random_access_ranges,
                         int64_t merge_read_slice_size = READ_SLICE_SIZE)
            : _profile(profile),
              _reader(std::move(reader)),
              _random_access_ranges(random_access_ranges) {
        _range_cached_data.resize(random_access_ranges.size());
        _size = _reader->size();
        _remaining = TOTAL_BUFFER_SIZE;
        // Object storage (S3/OSS) gets a larger per-IO threshold below.
        _is_oss = typeid_cast<io::S3FileReader*>(_reader.get()) != nullptr;
        _max_amplified_ratio = config::max_amplified_read_ratio;
        // Equivalent min size of each IO that can reach the maximum storage speed limit:
        // 1MB for oss, 8KB for hdfs
        _equivalent_io_size =
                _is_oss ? config::merged_oss_min_io_size : config::merged_hdfs_min_io_size;

        _merged_read_slice_size = merge_read_slice_size;

        // A negative slice size falls back to the default.
        if (_merged_read_slice_size < 0) {
            _merged_read_slice_size = READ_SLICE_SIZE;
        }

        if (_profile != nullptr) {
            const char* random_profile = "MergedSmallIO";
            ADD_TIMER_WITH_LEVEL(_profile, random_profile, 1);
            _copy_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "CopyTime", random_profile, 1);
            _read_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ReadTime", random_profile, 1);
            _request_io = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestIO", TUnit::UNIT,
                                                       random_profile, 1);
            _merged_io = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "MergedIO", TUnit::UNIT,
                                                      random_profile, 1);
            _request_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestBytes", TUnit::BYTES,
                                                          random_profile, 1);
            _merged_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "MergedBytes", TUnit::BYTES,
                                                         random_profile, 1);
        }
    }

    ~MergeRangeFileReader() override = default;

    // Only flips the closed flag; the underlying reader is not closed here.
    Status close() override {
        if (!_closed) {
            _closed = true;
        }
        return Status::OK();
    }

    const io::Path& path() const override { return _reader->path(); }

    size_t size() const override { return _size; }

    bool closed() const override { return _closed; }

    int64_t mtime() const override { return _reader->mtime(); }

    // for test only
    size_t buffer_remaining() const { return _remaining; }

    // for test only
    const std::vector<RangeCachedData>& range_cached_data() const { return _range_cached_data; }

    // for test only
    const std::vector<int16_t>& box_reference() const { return _box_ref; }

    // for test only
    const Statistics& statistics() const { return _statistics; }

protected:
    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                        const IOContext* io_ctx) override;

    // Flush the accumulated statistics into the profile counters.
    // NOTE(review): the inner reader's profile is only collected when _profile is
    // non-null — confirm that skipping it for the null-profile case is intended.
    void _collect_profile_before_close() override {
        if (_profile != nullptr) {
            COUNTER_UPDATE(_copy_time, _statistics.copy_time);
            COUNTER_UPDATE(_read_time, _statistics.read_time);
            COUNTER_UPDATE(_request_io, _statistics.request_io);
            COUNTER_UPDATE(_merged_io, _statistics.merged_io);
            COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
            COUNTER_UPDATE(_merged_bytes, _statistics.merged_bytes);
            if (_reader != nullptr) {
                _reader->collect_profile_before_close();
            }
        }
    }

private:
    RuntimeProfile::Counter* _copy_time = nullptr;
    RuntimeProfile::Counter* _read_time = nullptr;
    RuntimeProfile::Counter* _request_io = nullptr;
    RuntimeProfile::Counter* _merged_io = nullptr;
    RuntimeProfile::Counter* _request_bytes = nullptr;
    RuntimeProfile::Counter* _merged_bytes = nullptr;

    // Internal helpers, implemented in the .cpp:
    // find which random-access range covers [start_offset, end_offset)
    int _search_read_range(size_t start_offset, size_t end_offset);
    // release the boxes still referenced by the cached data
    void _clean_cached_data(RangeCachedData& cached_data);
    // copy cached bytes for `offset` out of the boxes into `result`
    void _read_in_box(RangeCachedData& cached_data, size_t offset, Slice result,
                      size_t* bytes_read);
    // perform a merged read and distribute the bytes into boxes for range_index
    Status _fill_box(int range_index, size_t start_offset, size_t to_read, size_t* bytes_read,
                     const IOContext* io_ctx);
    // decrease a box's reference count
    void _dec_box_ref(int16_t box_index);

    RuntimeProfile* _profile = nullptr;
    io::FileReaderSPtr _reader;
    const std::vector<PrefetchRange> _random_access_ranges; // ranges reads will follow
    std::vector<RangeCachedData> _range_cached_data;        // one entry per range above
    size_t _size;      // size of the underlying file
    bool _closed = false;
    size_t _remaining; // unused buffer budget, out of TOTAL_BUFFER_SIZE

    std::unique_ptr<OwnedSlice> _read_slice; // scratch buffer for merged reads
    std::vector<OwnedSlice> _boxes;          // the BOX_SIZE cache boxes
    int16_t _last_box_ref = -1;              // presumably the box currently being filled — confirm in .cpp
    uint32_t _last_box_usage = 0;            // presumably bytes used in that box — confirm in .cpp
    std::vector<int16_t> _box_ref;           // per-box reference counter (see class comment above)
    bool _is_oss;
    double _max_amplified_ratio;
    size_t _equivalent_io_size;
    int64_t _merged_read_slice_size;

    Statistics _statistics;
};
402 | | |
403 | | /** |
404 | | * Create a file reader suitable for accessing scenarios: |
405 | | * 1. When file size < config::in_memory_file_size, create InMemoryFileReader file reader |
406 | | * 2. When reading sequential file(csv/json), create PrefetchBufferedReader |
407 | | * 3. When reading random access file(parquet/orc), create normal file reader |
408 | | */ |
class DelegateReader {
public:
    enum AccessMode { SEQUENTIAL, RANDOM };

    // Overload taking a raw IOContext pointer; the caller must keep *io_ctx
    // alive for the created reader's lifetime.
    static Result<io::FileReaderSPtr> create_file_reader(
            RuntimeProfile* profile, const FileSystemProperties& system_properties,
            const FileDescription& file_description, const io::FileReaderOptions& reader_options,
            AccessMode access_mode = SEQUENTIAL, const IOContext* io_ctx = nullptr,
            const PrefetchRange file_range = PrefetchRange(0, 0));

    // Overload sharing ownership of the IOContext, so the created reader can
    // keep it alive on its own.
    static Result<io::FileReaderSPtr> create_file_reader(
            RuntimeProfile* profile, const FileSystemProperties& system_properties,
            const FileDescription& file_description, const io::FileReaderOptions& reader_options,
            AccessMode access_mode, std::shared_ptr<const IOContext> io_ctx,
            const PrefetchRange file_range = PrefetchRange(0, 0));
};
425 | | |
class PrefetchBufferedReader;
// One prefetch slot: a fixed-size buffer plus the synchronization that lets a
// background prefetch task fill it while readers wait on _prefetched.
struct PrefetchBuffer : std::enable_shared_from_this<PrefetchBuffer>, public ProfileCollector {
    // Buffer lifecycle states; transitions are implemented in the .cpp
    // (reset_offset / prefetch_buffer / close).
    enum class BufferStatus { RESET, PENDING, PREFETCHED, CLOSED };

    PrefetchBuffer(const PrefetchRange file_range, size_t buffer_size, size_t whole_buffer_size,
                   io::FileReader* reader, std::shared_ptr<const IOContext> io_ctx,
                   std::function<void(PrefetchBuffer&)> sync_profile)
            : _file_range(file_range),
              _size(buffer_size),
              _whole_buffer_size(whole_buffer_size),
              _reader(reader),
              _io_ctx_holder(std::move(io_ctx)),
              _io_ctx(_io_ctx_holder.get()),
              _buf(new char[buffer_size]),
              _sync_profile(std::move(sync_profile)) {}

    // Move constructor. Note it does NOT carry over _len, _buffer_status,
    // _prefetch_status, _exceed, or _statis — the moved-to buffer starts fresh.
    PrefetchBuffer(PrefetchBuffer&& other)
            : _offset(other._offset),
              _file_range(other._file_range),
              _random_access_ranges(other._random_access_ranges),
              _size(other._size),
              _whole_buffer_size(other._whole_buffer_size),
              _reader(other._reader),
              _io_ctx_holder(std::move(other._io_ctx_holder)),
              _io_ctx(_io_ctx_holder.get()),
              _buf(std::move(other._buf)),
              _sync_profile(std::move(other._sync_profile)) {}

    ~PrefetchBuffer() = default;

    size_t _offset {0}; // file offset the buffered data starts at
    // [start_offset, end_offset) is the range that can be prefetched.
    // Notice that the reader can read out of [start_offset, end_offset), because FE does not align the file
    // according to the format when splitting it.
    const PrefetchRange _file_range;
    const std::vector<PrefetchRange>* _random_access_ranges = nullptr;
    size_t _size {0};          // capacity of _buf in bytes (set from buffer_size)
    size_t _len {0};           // valid bytes currently in _buf — confirm against .cpp
    size_t _whole_buffer_size; // combined capacity of all sibling buffers
    io::FileReader* _reader = nullptr; // not owned
    std::shared_ptr<const IOContext> _io_ctx_holder; // keeps *_io_ctx alive
    const IOContext* _io_ctx = nullptr;
    std::unique_ptr<char[]> _buf;
    BufferStatus _buffer_status {BufferStatus::RESET};
    std::mutex _lock;                    // guards buffer state across reader/prefetch task
    std::condition_variable _prefetched; // presumably signaled when the prefetch completes
    Status _prefetch_status {Status::OK()};
    std::atomic_bool _exceed = false;
    std::function<void(PrefetchBuffer&)> _sync_profile; // flushes _statis into the owner's profile
    // Raw counters for this buffer, synced out via _sync_profile.
    struct Statistics {
        int64_t copy_time {0};
        int64_t read_time {0};
        int64_t prefetch_request_io {0};
        int64_t prefetch_request_bytes {0};
        int64_t request_io {0};
        int64_t request_bytes {0};
    };
    Statistics _statis;

    // @brief: reset the start offset of this buffer to offset
    // @param: the new start offset for this buffer
    void reset_offset(size_t offset);
    // @brief: start to fetch the content between [_offset, _offset + _size)
    void prefetch_buffer();
    // @brief: used by BufferedReader to read the prefetched data
    // @param[off] read start address
    // @param[buf] buffer to put the actual content
    // @param[buf_len] maximum len trying to read
    // @param[bytes_read] actual bytes read
    Status read_buffer(size_t off, const char* buf, size_t buf_len, size_t* bytes_read);
    // @brief: shut down the buffer until the prior prefetching task is done
    void close();
    // @brief: to detect whether this buffer contains off
    // @param[off] detect offset
    bool inline contains(size_t off) const { return _offset <= off && off < _offset + _size; }

    void set_random_access_ranges(const std::vector<PrefetchRange>* random_access_ranges) {
        _random_access_ranges = random_access_ranges;
    }

    // binary search the last prefetch buffer that larger or include the offset
    int search_read_range(size_t off) const;

    size_t merge_small_ranges(size_t off, int range_index) const;

    // No per-read profile work; everything is flushed on close.
    void _collect_profile_at_runtime() override {}

    void _collect_profile_before_close() override;
};
515 | | |
516 | | constexpr int64_t s_max_pre_buffer_size = 4 * 1024 * 1024; // 4MB |
517 | | |
518 | | /** |
519 | | * A buffered reader that prefetch data in the daemon thread pool. |
520 | | * |
521 | | * file_range is the range that the file is read. |
522 | | * random_access_ranges are the column ranges in format, like orc and parquet. |
523 | | * |
524 | | * When random_access_ranges is empty: |
525 | | * The data is prefetched sequentially until the underlying buffers(4 * 4M as default) are full. |
526 | | * When a buffer is read out, it will fetch data backward in daemon, so the underlying reader should be |
527 | | * thread-safe, and the access mode of data needs to be sequential. |
528 | | * |
529 | | * When random_access_ranges is not empty: |
530 | | * The data is prefetched order by the random_access_ranges. If some adjacent ranges is small, the underlying reader |
531 | | * will merge them. |
532 | | */ |
class PrefetchBufferedReader final : public io::FileReader {
public:
    // @param profile     runtime profile the buffers report counters to (may be null)
    // @param reader      underlying reader; must be thread-safe (see class comment)
    // @param file_range  range of the file this split will read
    // @param io_ctx      IO context shared with the prefetch tasks
    // @param buffer_size total prefetch budget; negative means "use the default"
    PrefetchBufferedReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
                           PrefetchRange file_range,
                           std::shared_ptr<const IOContext> io_ctx = nullptr,
                           int64_t buffer_size = -1L);
    ~PrefetchBufferedReader() override;

    Status close() override;

    const io::Path& path() const override { return _reader->path(); }

    size_t size() const override { return _size; }

    bool closed() const override { return _closed; }

    int64_t mtime() const override { return _reader->mtime(); }

    // Propagate the format's column ranges to every underlying PrefetchBuffer so
    // prefetching follows the ranges instead of reading purely sequentially.
    void set_random_access_ranges(const std::vector<PrefetchRange>* random_access_ranges) {
        _random_access_ranges = random_access_ranges;
        for (auto& _pre_buffer : _pre_buffers) {
            _pre_buffer->set_random_access_ranges(random_access_ranges);
        }
    }

protected:
    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                        const IOContext* io_ctx) override;

    void _collect_profile_before_close() override;

private:
    Status _close_internal();
    // Index (within _pre_buffers) of the buffer responsible for `position`.
    size_t get_buffer_pos(int64_t position) const {
        return (position % _whole_pre_buffer_size) / s_max_pre_buffer_size;
    }
    // Align `position` down to the start of its s_max_pre_buffer_size-sized slot.
    size_t get_buffer_offset(int64_t position) const {
        return (position / s_max_pre_buffer_size) * s_max_pre_buffer_size;
    }
    // Re-seed every buffer to cover consecutive slots starting at `position`.
    void reset_all_buffer(size_t position) {
        // NOTE(review): `i` is signed while _pre_buffers.size() is unsigned; harmless
        // for the tiny buffer count here, but the comparison mixes signedness.
        for (int64_t i = 0; i < _pre_buffers.size(); i++) {
            int64_t cur_pos = position + i * s_max_pre_buffer_size;
            size_t cur_buf_pos = get_buffer_pos(cur_pos);
            // reset would do all the prefetch work
            _pre_buffers[cur_buf_pos]->reset_offset(get_buffer_offset(cur_pos));
        }
    }

    io::FileReaderSPtr _reader;
    PrefetchRange _file_range;
    const std::vector<PrefetchRange>* _random_access_ranges = nullptr;
    std::shared_ptr<const IOContext> _io_ctx_holder; // keeps *_io_ctx alive
    const IOContext* _io_ctx = nullptr;
    std::vector<std::shared_ptr<PrefetchBuffer>> _pre_buffers;
    int64_t _whole_pre_buffer_size; // total bytes across all buffers
    bool _initialized = false;
    bool _closed = false;
    size_t _size; // size of the underlying file
};
592 | | |
593 | | /** |
594 | | * A file reader that read the whole file into memory. |
595 | | * When a file is small(<8MB), InMemoryFileReader can effectively reduce the number of file accesses |
596 | | * and greatly improve the access speed of small files. |
597 | | */ |
class InMemoryFileReader final : public io::FileReader {
public:
    // Wraps `reader`; the file's content is served from an in-memory copy.
    InMemoryFileReader(io::FileReaderSPtr reader);

    ~InMemoryFileReader() override;

    Status close() override;

    const io::Path& path() const override { return _reader->path(); }

    size_t size() const override { return _size; }

    bool closed() const override { return _closed; }

    int64_t mtime() const override { return _reader->mtime(); }

protected:
    // Serves reads from _data (population logic lives in the .cpp).
    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                        const IOContext* io_ctx) override;

    void _collect_profile_before_close() override;

private:
    Status _close_internal();
    io::FileReaderSPtr _reader;    // source reader the file is loaded from
    std::unique_ptr<char[]> _data; // whole-file buffer; presumably lazily filled — confirm in .cpp
    size_t _size;                  // file size in bytes
    bool _closed = false;
};
627 | | |
628 | | /** |
629 | | * Load all the needed data in underlying buffer, so the caller does not need to prepare the data container. |
630 | | */ |
class BufferedStreamReader {
public:
    /**
     * Return the address of underlying buffer that locates the start of data between [offset, offset + bytes_to_read)
     * @param buf the buffer address to save the start address of data
     * @param offset start offset to read in stream
     * @param bytes_to_read bytes to read
     * @param io_ctx IO context forwarded to the underlying reader
     */
    virtual Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t bytes_to_read,
                              const IOContext* io_ctx) = 0;
    /**
     * Save the data address to slice.data, and the slice.size is the bytes to read.
     */
    virtual Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) = 0;
    virtual ~BufferedStreamReader() = default;
    // return the file path
    virtual std::string path() = 0;

    // Last-modified time of the underlying file.
    virtual int64_t mtime() const = 0;
};
651 | | |
class BufferedFileStreamReader : public BufferedStreamReader, public ProfileCollector {
public:
    // @param file         reader of the underlying file
    // @param offset       first byte of the window this stream may read
    // @param length       number of readable bytes starting at offset
    // @param max_buf_size upper bound for the internal buffer
    BufferedFileStreamReader(io::FileReaderSPtr file, uint64_t offset, uint64_t length,
                             size_t max_buf_size);
    ~BufferedFileStreamReader() override = default;

    Status read_bytes(const uint8_t** buf, uint64_t offset, const size_t bytes_to_read,
                      const IOContext* io_ctx) override;
    Status read_bytes(Slice& slice, uint64_t offset, const IOContext* io_ctx) override;
    std::string path() override { return _file->path(); }

    int64_t mtime() const override { return _file->mtime(); }

protected:
    // Forward profile collection to the wrapped file reader.
    void _collect_profile_before_close() override {
        if (_file != nullptr) {
            _file->collect_profile_before_close();
        }
    }

private:
    DorisUniqueBufferPtr<uint8_t> _buf; // internal read buffer
    io::FileReaderSPtr _file;
    uint64_t _file_start_offset; // presumably window start (ctor `offset`) — confirm in .cpp
    uint64_t _file_end_offset;   // presumably window end (exclusive) — confirm in .cpp

    uint64_t _buf_start_offset = 0; // file offset of the first buffered byte
    uint64_t _buf_end_offset = 0;   // file offset one past the last buffered byte
    size_t _buf_size = 0;           // current allocated size of _buf
    size_t _max_buf_size;
};
683 | | |
684 | | } // namespace io |
685 | | |
686 | | #include "common/compile_check_end.h" |
687 | | |
688 | | } // namespace doris |