be/src/io/fs/buffered_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/fs/buffered_reader.h" |
19 | | |
20 | | #include <bvar/reducer.h> |
21 | | #include <bvar/window.h> |
22 | | #include <string.h> |
23 | | |
24 | | #include <algorithm> |
25 | | #include <chrono> |
26 | | #include <cstdint> |
27 | | #include <memory> |
28 | | |
29 | | #include "common/cast_set.h" |
30 | | #include "common/compiler_util.h" // IWYU pragma: keep |
31 | | #include "common/config.h" |
32 | | #include "common/status.h" |
33 | | #include "core/custom_allocator.h" |
34 | | #include "runtime/exec_env.h" |
35 | | #include "runtime/runtime_profile.h" |
36 | | #include "runtime/thread_context.h" |
37 | | #include "runtime/workload_management/io_throttle.h" |
38 | | #include "util/slice.h" |
39 | | #include "util/threadpool.h" |
40 | | namespace doris { |
41 | | |
42 | | #include "common/compile_check_begin.h" |
43 | | |
44 | | namespace io { |
45 | | struct IOContext; |
46 | | |
// Process-wide bvar metrics: g_bytes_downloaded accumulates every byte pulled in
// by prefetch buffers, and the PerSecond wrapper exposes a 60-second moving
// download rate under "buffered_reader.bytes_downloaded_per_second".
bvar::Adder<uint64_t> g_bytes_downloaded("buffered_reader", "bytes_downloaded");
bvar::PerSecond<bvar::Adder<uint64_t>> g_bytes_downloaded_per_second("buffered_reader",
                                                                     "bytes_downloaded_per_second",
                                                                     &g_bytes_downloaded, 60);
52 | | |
// Serve a read of [offset, offset + result.size) against a reader whose future
// accesses were pre-declared as random-access ranges.  Strategy, in order:
//   1. if the request hits no declared range, fall back to a plain read;
//   2. copy out whatever is already cached in the boxes for this range;
//   3. for the remainder: read directly when it is big (>= SMALL_IO) or box
//      memory is exhausted, otherwise merge it with the following small ranges
//      into one larger IO, bounded by the configured read amplification.
// @param offset     file position to read from
// @param result     destination buffer; result.size is the requested length
// @param bytes_read out: bytes actually placed into result
// @param io_ctx     forwarded to the underlying reader
Status MergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                          const IOContext* io_ctx) {
    _statistics.request_io++;
    *bytes_read = 0;
    if (result.size == 0) {
        return Status::OK();
    }
    const int range_index = _search_read_range(offset, offset + result.size);
    if (range_index < 0) {
        // Request is outside every declared range: pass through to the raw reader.
        SCOPED_RAW_TIMER(&_statistics.read_time);
        Status st = _reader->read_at(offset, result, bytes_read, io_ctx);
        _statistics.merged_io++;
        _statistics.request_bytes += *bytes_read;
        _statistics.merged_bytes += *bytes_read;
        return st;
    }
    if (offset + result.size > _random_access_ranges[range_index].end_offset) {
        // return _reader->read_at(offset, result, bytes_read, io_ctx);
        return Status::IOError("Range in RandomAccessReader should be read sequentially");
    }

    size_t has_read = 0;
    RangeCachedData& cached_data = _range_cached_data[range_index];
    cached_data.has_read = true;
    if (cached_data.contains(offset)) {
        // has cached data in box
        _read_in_box(cached_data, offset, result, &has_read);
        _statistics.request_bytes += has_read;
        if (has_read == result.size) {
            // all data is read in cache
            *bytes_read = has_read;
            return Status::OK();
        }
    } else if (!cached_data.empty()) {
        // the data in range may be skipped or ignored: drop the stale cache so its
        // boxes can be reused.
        for (int16_t box_index : cached_data.ref_box) {
            _dec_box_ref(box_index);
        }
        cached_data.reset();
    }

    // Remaining bytes the cache could not provide.
    size_t to_read = result.size - has_read;
    if (to_read >= SMALL_IO || to_read >= _remaining) {
        // Large IO, or not enough free box memory to stage a merged read: read the
        // rest straight into the caller's buffer.
        SCOPED_RAW_TIMER(&_statistics.read_time);
        size_t read_size = 0;
        RETURN_IF_ERROR(_reader->read_at(offset + has_read, Slice(result.data + has_read, to_read),
                                         &read_size, io_ctx));
        *bytes_read = has_read + read_size;
        _statistics.merged_io++;
        _statistics.request_bytes += read_size;
        _statistics.merged_bytes += read_size;
        return Status::OK();
    }

    // merge small IO
    // Build the candidate list of slices a merged read would cover, alternating
    // range content (is_content == true) and gaps between ranges (false).
    size_t merge_start = offset + has_read;
    const size_t merge_end = merge_start + _merged_read_slice_size;
    // <slice_size, is_content>
    std::vector<std::pair<size_t, bool>> merged_slice;
    size_t content_size = 0;
    size_t hollow_size = 0;
    if (merge_start > _random_access_ranges[range_index].end_offset) {
        return Status::IOError("Fail to merge small IO");
    }
    int merge_index = range_index;
    while (merge_start < merge_end && merge_index < _random_access_ranges.size()) {
        // Never stage more content than the free box memory can hold.
        size_t content_max = _remaining - content_size;
        if (content_max == 0) {
            break;
        }
        if (merge_index != range_index && _range_cached_data[merge_index].has_read) {
            // don't read or merge twice
            break;
        }
        if (_random_access_ranges[merge_index].end_offset > merge_end) {
            // Current range extends past the merge window: take what fits and stop.
            size_t add_content = std::min(merge_end - merge_start, content_max);
            content_size += add_content;
            merge_start += add_content;
            merged_slice.emplace_back(add_content, true);
            break;
        }
        size_t add_content =
                std::min(_random_access_ranges[merge_index].end_offset - merge_start, content_max);
        content_size += add_content;
        merge_start += add_content;
        merged_slice.emplace_back(add_content, true);
        if (merge_start != _random_access_ranges[merge_index].end_offset) {
            break;
        }
        if (merge_index < _random_access_ranges.size() - 1 && merge_start < merge_end) {
            size_t gap = _random_access_ranges[merge_index + 1].start_offset -
                         _random_access_ranges[merge_index].end_offset;
            if ((content_size + hollow_size) > SMALL_IO && gap >= SMALL_IO) {
                // too large gap
                break;
            }
            if (gap < merge_end - merge_start && content_size < _remaining &&
                !_range_cached_data[merge_index + 1].has_read) {
                hollow_size += gap;
                merge_start = _random_access_ranges[merge_index + 1].start_offset;
                merged_slice.emplace_back(gap, false);
            } else {
                // there's no enough memory to read hollow data
                break;
            }
        }
        merge_index++;
    }
    content_size = 0;
    hollow_size = 0;
    std::vector<std::pair<double, size_t>> ratio_and_size;
    // Calculate the read amplified ratio for each merge operation and the size of the merged data.
    // Find the largest size of the merged data whose amplified ratio is less than config::max_amplified_read_ratio
    for (const std::pair<size_t, bool>& slice : merged_slice) {
        if (slice.second) {
            content_size += slice.first;
            if (slice.first > 0) {
                ratio_and_size.emplace_back((double)hollow_size / (double)content_size,
                                            content_size + hollow_size);
            }
        } else {
            hollow_size += slice.first;
        }
    }
    size_t best_merged_size = 0;
    for (int i = 0; i < ratio_and_size.size(); ++i) {
        const std::pair<double, size_t>& rs = ratio_and_size[i];
        // Average IO size if the merged read were split back into (i + 1) reads.
        size_t equivalent_size = rs.second / (i + 1);
        if (rs.second > best_merged_size) {
            if (rs.first <= _max_amplified_ratio ||
                (_max_amplified_ratio < 1 && equivalent_size <= _equivalent_io_size)) {
                best_merged_size = rs.second;
            }
        }
    }

    if (best_merged_size == to_read) {
        // read directly to avoid copy operation
        SCOPED_RAW_TIMER(&_statistics.read_time);
        size_t read_size = 0;
        RETURN_IF_ERROR(_reader->read_at(offset + has_read, Slice(result.data + has_read, to_read),
                                         &read_size, io_ctx));
        *bytes_read = has_read + read_size;
        _statistics.merged_io++;
        _statistics.request_bytes += read_size;
        _statistics.merged_bytes += read_size;
        return Status::OK();
    }

    // Stage the merged read into the boxes, then serve the caller from the cache.
    merge_start = offset + has_read;
    size_t merge_read_size = 0;
    RETURN_IF_ERROR(
            _fill_box(range_index, merge_start, best_merged_size, &merge_read_size, io_ctx));
    if (cached_data.start_offset != merge_start) {
        return Status::IOError("Wrong start offset in merged IO");
    }

    // read from cached data
    size_t box_read_size = 0;
    _read_in_box(cached_data, merge_start, Slice(result.data + has_read, to_read), &box_read_size);
    *bytes_read = has_read + box_read_size;
    _statistics.request_bytes += box_read_size;
    if (*bytes_read < result.size && box_read_size < merge_read_size) {
        return Status::IOError("Can't read enough bytes in merged IO");
    }
    return Status::OK();
}
220 | | |
221 | 1.31k | int MergeRangeFileReader::_search_read_range(size_t start_offset, size_t end_offset) { |
222 | 1.31k | if (_random_access_ranges.empty()) { |
223 | 0 | return -1; |
224 | 0 | } |
225 | 1.31k | int left = 0, right = cast_set<int>(_random_access_ranges.size()) - 1; |
226 | 5.60k | do { |
227 | 5.60k | int mid = left + (right - left) / 2; |
228 | 5.60k | const PrefetchRange& range = _random_access_ranges[mid]; |
229 | 5.60k | if (range.start_offset <= start_offset && start_offset < range.end_offset) { |
230 | 1.31k | if (range.start_offset <= end_offset && end_offset <= range.end_offset) { |
231 | 1.31k | return mid; |
232 | 1.31k | } else { |
233 | 0 | return -1; |
234 | 0 | } |
235 | 4.28k | } else if (range.start_offset > start_offset) { |
236 | 2.04k | right = mid - 1; |
237 | 2.24k | } else { |
238 | 2.24k | left = mid + 1; |
239 | 2.24k | } |
240 | 5.60k | } while (left <= right); |
241 | 0 | return -1; |
242 | 1.31k | } |
243 | | |
244 | 602 | void MergeRangeFileReader::_clean_cached_data(RangeCachedData& cached_data) { |
245 | 602 | if (!cached_data.empty()) { |
246 | 0 | for (int i = 0; i < cached_data.ref_box.size(); ++i) { |
247 | 0 | DCHECK_GT(cached_data.box_end_offset[i], cached_data.box_start_offset[i]); |
248 | 0 | int16_t box_index = cached_data.ref_box[i]; |
249 | 0 | DCHECK_GT(_box_ref[box_index], 0); |
250 | 0 | _box_ref[box_index]--; |
251 | 0 | } |
252 | 0 | } |
253 | 602 | cached_data.reset(); |
254 | 602 | } |
255 | | |
256 | 680 | void MergeRangeFileReader::_dec_box_ref(int16_t box_index) { |
257 | 680 | if (--_box_ref[box_index] == 0) { |
258 | 151 | _remaining += BOX_SIZE; |
259 | 151 | } |
260 | 680 | if (box_index == _last_box_ref) { |
261 | 44 | _last_box_ref = -1; |
262 | 44 | _last_box_usage = 0; |
263 | 44 | } |
264 | 680 | } |
265 | | |
266 | | void MergeRangeFileReader::_read_in_box(RangeCachedData& cached_data, size_t offset, Slice result, |
267 | 1.31k | size_t* bytes_read) { |
268 | 1.31k | SCOPED_RAW_TIMER(&_statistics.copy_time); |
269 | 1.31k | auto handle_in_box = [&](size_t remaining, char* copy_out) { |
270 | 1.31k | size_t to_handle = remaining; |
271 | 1.31k | int cleaned_box = 0; |
272 | 2.70k | for (int i = 0; i < cached_data.ref_box.size() && remaining > 0; ++i) { |
273 | 1.38k | int16_t box_index = cached_data.ref_box[i]; |
274 | 1.38k | size_t box_to_handle = std::min(remaining, (size_t)(cached_data.box_end_offset[i] - |
275 | 1.38k | cached_data.box_start_offset[i])); |
276 | 1.38k | if (copy_out != nullptr) { |
277 | 1.38k | } |
278 | 1.38k | if (copy_out != nullptr) { |
279 | 1.38k | memcpy(copy_out + to_handle - remaining, |
280 | 1.38k | _boxes[box_index].data() + cached_data.box_start_offset[i], box_to_handle); |
281 | 1.38k | } |
282 | 1.38k | remaining -= box_to_handle; |
283 | 1.38k | cached_data.box_start_offset[i] += box_to_handle; |
284 | 1.38k | if (cached_data.box_start_offset[i] == cached_data.box_end_offset[i]) { |
285 | 680 | cleaned_box++; |
286 | 680 | _dec_box_ref(box_index); |
287 | 680 | } |
288 | 1.38k | } |
289 | 1.31k | DCHECK_EQ(remaining, 0); |
290 | 1.31k | if (cleaned_box > 0) { |
291 | 642 | cached_data.ref_box.erase(cached_data.ref_box.begin(), |
292 | 642 | cached_data.ref_box.begin() + cleaned_box); |
293 | 642 | cached_data.box_start_offset.erase(cached_data.box_start_offset.begin(), |
294 | 642 | cached_data.box_start_offset.begin() + cleaned_box); |
295 | 642 | cached_data.box_end_offset.erase(cached_data.box_end_offset.begin(), |
296 | 642 | cached_data.box_end_offset.begin() + cleaned_box); |
297 | 642 | } |
298 | 1.31k | cached_data.start_offset += to_handle; |
299 | 1.31k | if (cached_data.start_offset == cached_data.end_offset) { |
300 | 602 | _clean_cached_data(cached_data); |
301 | 602 | } |
302 | 1.31k | }; |
303 | | |
304 | 1.31k | if (offset > cached_data.start_offset) { |
305 | | // the data in range may be skipped |
306 | 0 | size_t to_skip = offset - cached_data.start_offset; |
307 | 0 | handle_in_box(to_skip, nullptr); |
308 | 0 | } |
309 | | |
310 | 1.31k | size_t to_read = std::min(cached_data.end_offset - cached_data.start_offset, result.size); |
311 | 1.31k | handle_in_box(to_read, result.data); |
312 | 1.31k | *bytes_read = to_read; |
313 | 1.31k | } |
314 | | |
// Execute one merged IO of `to_read` bytes at `start_offset` into the shared
// read slice, then scatter the content bytes into fixed-size boxes — skipping
// the hollow gaps between ranges — and record, per range, which box slices now
// cache its data.
// @param range_index  first range covered by the merged read
// @param start_offset file offset where the merged read begins
// @param to_read      merged read length (content + gaps)
// @param bytes_read   out: bytes actually returned by the underlying reader
// @param io_ctx       forwarded to the underlying reader
Status MergeRangeFileReader::_fill_box(int range_index, size_t start_offset, size_t to_read,
                                       size_t* bytes_read, const IOContext* io_ctx) {
    // Lazily allocate the staging slice shared by all merged reads.
    if (!_read_slice) {
        _read_slice = std::make_unique<OwnedSlice>(_merged_read_slice_size);
    }

    *bytes_read = 0;
    {
        SCOPED_RAW_TIMER(&_statistics.read_time);
        RETURN_IF_ERROR(_reader->read_at(start_offset, Slice(_read_slice->data(), to_read),
                                         bytes_read, io_ctx));
        _statistics.merged_io++;
        _statistics.merged_bytes += *bytes_read;
    }

    SCOPED_RAW_TIMER(&_statistics.copy_time);
    size_t copy_start = start_offset;
    const size_t copy_end = start_offset + *bytes_read;
    // copy data into small boxes
    // tuple(box_index, box_start_offset, file_start_offset, file_end_offset)
    std::vector<std::tuple<int16_t, uint32_t, size_t, size_t>> filled_boxes;

    // Copy bytes from the staging slice into box `fill_box_ref`, starting at the
    // box's current usage, up to `box_copy_end` in file-offset terms. Updates the
    // last-box hint, the box refcount, and _remaining.
    auto fill_box = [&](int16_t fill_box_ref, uint32_t box_usage, size_t box_copy_end) {
        size_t copy_size = std::min(box_copy_end - copy_start, BOX_SIZE - box_usage);
        memcpy(_boxes[fill_box_ref].data() + box_usage,
               _read_slice->data() + copy_start - start_offset, copy_size);
        filled_boxes.emplace_back(fill_box_ref, box_usage, copy_start, copy_start + copy_size);
        copy_start += copy_size;
        _last_box_ref = fill_box_ref;
        _last_box_usage = box_usage + cast_set<int>(copy_size);
        _box_ref[fill_box_ref]++;
        // A previously unused box leaves the free pool when first written.
        if (box_usage == 0) {
            _remaining -= BOX_SIZE;
        }
    };

    for (int fill_range_index = range_index;
         fill_range_index < _random_access_ranges.size() && copy_start < copy_end;
         ++fill_range_index) {
        RangeCachedData& fill_range_cache = _range_cached_data[fill_range_index];
        DCHECK(fill_range_cache.empty());
        fill_range_cache.reset();
        const PrefetchRange& fill_range = _random_access_ranges[fill_range_index];
        if (fill_range.start_offset > copy_start) {
            // don't copy hollow data
            size_t hollow_size = fill_range.start_offset - copy_start;
            DCHECK_GT(copy_end - copy_start, hollow_size);
            copy_start += hollow_size;
        }

        const size_t range_copy_end = std::min(copy_end, fill_range.end_offset);
        // reuse the remaining capacity of last box
        if (_last_box_ref >= 0 && _last_box_usage < BOX_SIZE) {
            fill_box(_last_box_ref, _last_box_usage, range_copy_end);
        }
        // reuse the former released box
        for (int16_t i = 0; i < _boxes.size() && copy_start < range_copy_end; ++i) {
            if (_box_ref[i] == 0) {
                fill_box(i, 0, range_copy_end);
            }
        }
        // apply for new box to copy data
        while (copy_start < range_copy_end && _boxes.size() < NUM_BOX) {
            _boxes.emplace_back(BOX_SIZE);
            _box_ref.emplace_back(0);
            fill_box(cast_set<int16_t>(_boxes.size()) - 1, 0, range_copy_end);
        }
        DCHECK_EQ(copy_start, range_copy_end);

        // Record which box slices now back this range's cached data.
        if (!filled_boxes.empty()) {
            fill_range_cache.start_offset = std::get<2>(filled_boxes[0]);
            fill_range_cache.end_offset = std::get<3>(filled_boxes.back());
            for (auto& tuple : filled_boxes) {
                fill_range_cache.ref_box.emplace_back(std::get<0>(tuple));
                fill_range_cache.box_start_offset.emplace_back(std::get<1>(tuple));
                fill_range_cache.box_end_offset.emplace_back(
                        std::get<1>(tuple) + std::get<3>(tuple) - std::get<2>(tuple));
            }
            filled_boxes.clear();
        }
    }
    return Status::OK();
}
398 | | |
// there exists occasions where the buffer is already closed but
// some prior tasks are still queued in thread pool, so we have to check whether
// the buffer is closed each time the condition variable is notified.
//
// Repoint this buffer at `offset` and, if the offset is still inside the file
// range, enqueue a new prefetch task. Waits (bounded) for any in-flight
// prefetch to finish before flipping the state to RESET; a timeout is recorded
// in _prefetch_status for the next reader to observe.
void PrefetchBuffer::reset_offset(size_t offset) {
    {
        std::unique_lock lck {_lock};
        // Wait until the producer is not writing into _buf (status != PENDING).
        if (!_prefetched.wait_for(
                    lck, std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
                    [this]() { return _buffer_status != BufferStatus::PENDING; })) {
            _prefetch_status = Status::TimedOut("time out when reset prefetch buffer");
            return;
        }
        if (UNLIKELY(_buffer_status == BufferStatus::CLOSED)) {
            _prefetched.notify_all();
            return;
        }
        _buffer_status = BufferStatus::RESET;
        _offset = offset;
        _prefetched.notify_all();
    }
    // Past the end of the readable range: mark the buffer exhausted and do not
    // schedule another prefetch.
    if (UNLIKELY(offset >= _file_range.end_offset)) {
        _len = 0;
        _exceed = true;
        return;
    } else {
        _exceed = false;
    }
    // Hold a shared_ptr in the task so the buffer outlives queued work.
    _prefetch_status = ExecEnv::GetInstance()->buffered_reader_prefetch_thread_pool()->submit_func(
            [buffer_ptr = shared_from_this()]() { buffer_ptr->prefetch_buffer(); });
}
429 | | |
// only this function would run concurrently in another thread
//
// Producer side of the buffer: waits for the RESET (or CLOSED) state, performs
// one read of up to _size bytes at _offset (possibly shrunk/merged around the
// declared random-access ranges), then publishes PREFETCHED. Errors and
// timeouts are parked in _prefetch_status for read_buffer() to surface.
void PrefetchBuffer::prefetch_buffer() {
    {
        std::unique_lock lck {_lock};
        if (!_prefetched.wait_for(
                    lck, std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
                    [this]() {
                        return _buffer_status == BufferStatus::RESET ||
                               _buffer_status == BufferStatus::CLOSED;
                    })) {
            _prefetch_status = Status::TimedOut("time out when invoking prefetch buffer");
            return;
        }
        // in case buffer is already closed
        if (UNLIKELY(_buffer_status == BufferStatus::CLOSED)) {
            _prefetched.notify_all();
            return;
        }
        _buffer_status = BufferStatus::PENDING;
        _prefetched.notify_all();
    }

    // Decide how much to read: the whole window when no declared range covers
    // _offset, otherwise a size merged across the following small ranges.
    int read_range_index = search_read_range(_offset);
    size_t buf_size;
    if (read_range_index == -1) {
        buf_size =
                _file_range.end_offset - _offset > _size ? _size : _file_range.end_offset - _offset;
    } else {
        buf_size = merge_small_ranges(_offset, read_range_index);
    }

    _len = 0;
    Status s;

    {
        SCOPED_RAW_TIMER(&_statis.read_time);
        s = _reader->read_at(_offset, Slice {_buf.get(), buf_size}, &_len, _io_ctx);
    }
    if (UNLIKELY(s.ok() && buf_size != _len)) {
        // This indicates that the data size returned by S3 object storage is smaller than what we requested,
        // which seems to be a violation of the S3 protocol since our request range was valid.
        // We currently consider this situation a bug and will treat this task as a failure.
        s = Status::InternalError("Data size returned by S3 is smaller than requested");
        LOG(WARNING) << "Data size returned by S3 is smaller than requested" << _reader->path()
                     << " request bytes " << buf_size << " returned size " << _len;
    }
    g_bytes_downloaded << _len;
    _statis.prefetch_request_io += 1;
    _statis.prefetch_request_bytes += _len;
    // Re-acquire the lock to publish the result; state must still be PENDING.
    std::unique_lock lck {_lock};
    if (!_prefetched.wait_for(lck,
                              std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
                              [this]() { return _buffer_status == BufferStatus::PENDING; })) {
        _prefetch_status = Status::TimedOut("time out when invoking prefetch buffer");
        return;
    }
    if (!s.ok() && _offset < _reader->size()) {
        // We should print the error msg since this buffer might not be accessed by the consumer
        // which would result in the status being missed
        LOG_WARNING("prefetch path {} failed, offset {}, error {}", _reader->path().native(),
                    _offset, s.to_string());
        _prefetch_status = std::move(s);
    }
    _buffer_status = BufferStatus::PREFETCHED;
    _prefetched.notify_all();
    // eof would come up with len == 0, it would be handled by read_buffer
}
497 | | |
// Binary search over the (sorted) declared ranges. Returns the index of the
// range containing `off`; if `off` falls in a gap between ranges, returns the
// index of the first range starting after `off` (note `right = mid`, not
// `mid - 1`, which makes the loop converge on that successor); returns -1 when
// there are no ranges or no range starts after `off`.
int PrefetchBuffer::search_read_range(size_t off) const {
    if (_random_access_ranges == nullptr || _random_access_ranges->empty()) {
        return -1;
    }
    const std::vector<PrefetchRange>& random_access_ranges = *_random_access_ranges;
    int left = 0, right = cast_set<int>(random_access_ranges.size()) - 1;
    do {
        int mid = left + (right - left) / 2;
        const PrefetchRange& range = random_access_ranges[mid];
        if (range.start_offset <= off && range.end_offset > off) {
            return mid;
        } else if (range.start_offset > off) {
            right = mid;
        } else {
            left = mid + 1;
        }
    } while (left < right);
    // Gap case: `right` is the candidate successor range; accept it only if it
    // really starts after `off`.
    if (random_access_ranges[right].start_offset > off) {
        return right;
    } else {
        return -1;
    }
}
521 | | |
// Starting from `range_index`, walk consecutive declared ranges and compute how
// many bytes (content plus small gaps) a single prefetch starting at `off`
// should cover, capped at the buffer window _size. A gap is absorbed only if it
// fits in the remaining budget. Returns the number of bytes to read; returns
// _size unchanged when no range list was provided.
size_t PrefetchBuffer::merge_small_ranges(size_t off, int range_index) const {
    if (_random_access_ranges == nullptr || _random_access_ranges->empty()) {
        return _size;
    }
    int64_t remaining = _size;
    const std::vector<PrefetchRange>& random_access_ranges = *_random_access_ranges;
    while (remaining > 0 && range_index < random_access_ranges.size()) {
        const PrefetchRange& range = random_access_ranges[range_index];
        if (range.start_offset <= off && range.end_offset > off) {
            // `off` is inside this range: consume it up to its end.
            remaining -= range.end_offset - off;
            off = range.end_offset;
            range_index++;
        } else if (range.start_offset > off) {
            // merge small range
            size_t hollow = range.start_offset - off;
            // NOTE(review): size_t vs int64_t comparison — benign while
            // remaining > 0 (loop guard), but worth confirming.
            if (hollow < remaining) {
                remaining -= hollow;
                off = range.start_offset;
            } else {
                break;
            }
        } else {
            DCHECK(false);
        }
    }
    // Clamp: an overshoot past the budget, or no progress at all, means "read
    // the whole window".
    if (remaining < 0 || remaining == _size) {
        remaining = 0;
    }
    return _size - remaining;
}
552 | | |
// Consumer side of the buffer: copy up to buf_len bytes at `off` out of the
// prefetched window into `out`. Reads beyond the configured file range go
// straight to the underlying reader; a miss (off outside the current window)
// re-aims the buffer with reset_offset and retries recursively. When the
// window is fully consumed the buffer is advanced by _whole_buffer_size.
Status PrefetchBuffer::read_buffer(size_t off, const char* out, size_t buf_len,
                                   size_t* bytes_read) {
    if (UNLIKELY(off >= _file_range.end_offset)) {
        // Reader can read out of [start_offset, end_offset) by synchronous method.
        return _reader->read_at(off, Slice {out, buf_len}, bytes_read, _io_ctx);
    }
    if (_exceed) {
        // Buffer was parked past EOF: re-aim at the window containing `off`.
        reset_offset((off / _size) * _size);
        return read_buffer(off, out, buf_len, bytes_read);
    }
    auto start = std::chrono::steady_clock::now();
    // The baseline time is calculated by dividing the size of each buffer by MB/s.
    // If it exceeds this value, it is considered a slow I/O operation.
    constexpr auto read_time_baseline = std::chrono::seconds(s_max_pre_buffer_size / 1024 / 1024);
    {
        std::unique_lock lck {_lock};
        // buffer must be prefetched or it's closed
        if (!_prefetched.wait_for(
                    lck, std::chrono::milliseconds(config::buffered_reader_read_timeout_ms),
                    [this]() {
                        return _buffer_status == BufferStatus::PREFETCHED ||
                               _buffer_status == BufferStatus::CLOSED;
                    })) {
            _prefetch_status = Status::TimedOut("time out when read prefetch buffer");
            return _prefetch_status;
        }
        if (UNLIKELY(BufferStatus::CLOSED == _buffer_status)) {
            return Status::OK();
        }
    }
    auto duration = std::chrono::duration_cast<std::chrono::seconds>(
            std::chrono::steady_clock::now() - start);
    if (duration > read_time_baseline) [[unlikely]] {
        LOG_WARNING("The prefetch io is too slow");
    }
    RETURN_IF_ERROR(_prefetch_status);
    // there is only parquet would do not sequence read
    // it would read the end of the file first
    if (UNLIKELY(!contains(off))) {
        reset_offset((off / _size) * _size);
        return read_buffer(off, out, buf_len, bytes_read);
    }
    // EOF (or the prefetched window ends before `off`): nothing to copy; the
    // caller observes *bytes_read == 0.
    if (UNLIKELY(0 == _len || _offset + _len < off)) {
        return Status::OK();
    }

    {
        LIMIT_REMOTE_SCAN_IO(bytes_read);
        // [0]: maximum len trying to read, [1] maximum length buffer can provide, [2] actual len buffer has
        size_t read_len = std::min({buf_len, _offset + _size - off, _offset + _len - off});
        {
            SCOPED_RAW_TIMER(&_statis.copy_time);
            memcpy((void*)out, _buf.get() + (off - _offset), read_len);
        }
        *bytes_read = read_len;
        _statis.request_io += 1;
        _statis.request_bytes += read_len;
    }
    // Window drained: asynchronously advance to the next stripe for this buffer.
    if (off + *bytes_read == _offset + _len) {
        reset_offset(_offset + _whole_buffer_size);
    }
    return Status::OK();
}
616 | | |
617 | 176 | void PrefetchBuffer::close() { |
618 | 176 | std::unique_lock lck {_lock}; |
619 | | // in case _reader still tries to write to the buf after we close the buffer |
620 | 176 | if (!_prefetched.wait_for(lck, |
621 | 176 | std::chrono::milliseconds(config::buffered_reader_read_timeout_ms), |
622 | 177 | [this]() { return _buffer_status != BufferStatus::PENDING; })) { |
623 | 0 | _prefetch_status = Status::TimedOut("time out when close prefetch buffer"); |
624 | 0 | return; |
625 | 0 | } |
626 | 176 | _buffer_status = BufferStatus::CLOSED; |
627 | 176 | _prefetched.notify_all(); |
628 | 176 | } |
629 | | |
630 | 72 | void PrefetchBuffer::_collect_profile_before_close() { |
631 | 72 | if (_sync_profile != nullptr) { |
632 | 72 | _sync_profile(*this); |
633 | 72 | } |
634 | 72 | } |
635 | | |
// buffered reader
//
// Wraps `reader` with a ring of prefetch buffers covering `file_range`.
// @param profile     optional runtime profile; when set, per-buffer statistics
//                    are aggregated into child counters on close
// @param reader      the underlying file reader (ownership taken)
// @param file_range  the byte range this reader will serve; clamped to file size
// @param io_ctx      optional IO context; a default one is created when null
// @param buffer_size total prefetch memory; -1 means use
//                    config::remote_storage_read_buffer_mb
PrefetchBufferedReader::PrefetchBufferedReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
                                               PrefetchRange file_range,
                                               std::shared_ptr<const IOContext> io_ctx,
                                               int64_t buffer_size)
        : _reader(std::move(reader)), _file_range(file_range), _io_ctx_holder(std::move(io_ctx)) {
    if (_io_ctx_holder == nullptr) {
        _io_ctx_holder = std::make_shared<IOContext>();
    }
    _io_ctx = _io_ctx_holder.get();
    if (buffer_size == -1L) {
        buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024;
    }
    _size = _reader->size();
    _whole_pre_buffer_size = buffer_size;
    _file_range.end_offset = std::min(_file_range.end_offset, _size);
    // Split the total budget into fixed-size buffers.
    // NOTE(review): integer division — a buffer_size that is not a multiple of
    // s_max_pre_buffer_size silently drops the remainder; confirm intended.
    int buffer_num = buffer_size > s_max_pre_buffer_size
                             ? cast_set<int>(buffer_size) / cast_set<int>(s_max_pre_buffer_size)
                             : 1;
    std::function<void(PrefetchBuffer&)> sync_buffer = nullptr;
    if (profile != nullptr) {
        const char* prefetch_buffered_reader = "PrefetchBufferedReader";
        // Creates the parent timer node under which the child counters hang.
        ADD_TIMER(profile, prefetch_buffered_reader);
        auto copy_time = ADD_CHILD_TIMER(profile, "CopyTime", prefetch_buffered_reader);
        auto read_time = ADD_CHILD_TIMER(profile, "ReadTime", prefetch_buffered_reader);
        auto prefetch_request_io =
                ADD_CHILD_COUNTER(profile, "PreRequestIO", TUnit::UNIT, prefetch_buffered_reader);
        auto prefetch_request_bytes = ADD_CHILD_COUNTER(profile, "PreRequestBytes", TUnit::BYTES,
                                                        prefetch_buffered_reader);
        auto request_io =
                ADD_CHILD_COUNTER(profile, "RequestIO", TUnit::UNIT, prefetch_buffered_reader);
        auto request_bytes =
                ADD_CHILD_COUNTER(profile, "RequestBytes", TUnit::BYTES, prefetch_buffered_reader);
        // Invoked by each buffer when it closes; counters are captured by copy.
        sync_buffer = [=](PrefetchBuffer& buf) {
            COUNTER_UPDATE(copy_time, buf._statis.copy_time);
            COUNTER_UPDATE(read_time, buf._statis.read_time);
            COUNTER_UPDATE(prefetch_request_io, buf._statis.prefetch_request_io);
            COUNTER_UPDATE(prefetch_request_bytes, buf._statis.prefetch_request_bytes);
            COUNTER_UPDATE(request_io, buf._statis.request_io);
            COUNTER_UPDATE(request_bytes, buf._statis.request_bytes);
        };
    }
    // set the _cur_offset of this reader as same as the inner reader's,
    // to make sure the buffer reader will start to read at right position.
    for (int i = 0; i < buffer_num; i++) {
        _pre_buffers.emplace_back(std::make_shared<PrefetchBuffer>(
                _file_range, s_max_pre_buffer_size, _whole_pre_buffer_size, _reader.get(),
                _io_ctx_holder, sync_buffer));
    }
}
686 | | |
// Ensures buffers and the wrapped reader are released even when close() was
// never called explicitly. The returned Status is intentionally discarded:
// a destructor has no way to propagate it.
PrefetchBufferedReader::~PrefetchBufferedReader() {
    /// Better not to call virtual functions in a destructor, so the
    /// non-virtual _close_internal() is used instead of close().
    static_cast<void>(_close_internal());
}
691 | | |
692 | | Status PrefetchBufferedReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, |
693 | 104 | const IOContext* io_ctx) { |
694 | 104 | if (!_initialized) { |
695 | 44 | reset_all_buffer(offset); |
696 | 44 | _initialized = true; |
697 | 44 | } |
698 | 104 | if (UNLIKELY(result.get_size() == 0 || offset >= size())) { |
699 | 5 | *bytes_read = 0; |
700 | 5 | return Status::OK(); |
701 | 5 | } |
702 | 99 | size_t nbytes = result.get_size(); |
703 | 99 | int actual_bytes_read = 0; |
704 | 230 | while (actual_bytes_read < nbytes && offset < size()) { |
705 | 131 | size_t read_num = 0; |
706 | 131 | auto buffer_pos = get_buffer_pos(offset); |
707 | 131 | RETURN_IF_ERROR( |
708 | 131 | _pre_buffers[buffer_pos]->read_buffer(offset, result.get_data() + actual_bytes_read, |
709 | 131 | nbytes - actual_bytes_read, &read_num)); |
710 | 131 | actual_bytes_read += read_num; |
711 | 131 | offset += read_num; |
712 | 131 | } |
713 | 99 | *bytes_read = actual_bytes_read; |
714 | 99 | return Status::OK(); |
715 | 99 | } |
716 | | |
// Public close: delegates to the idempotent _close_internal() so the
// destructor can share the same shutdown path without a virtual call.
Status PrefetchBufferedReader::close() {
    return _close_internal();
}
720 | | |
721 | 50 | Status PrefetchBufferedReader::_close_internal() { |
722 | 50 | if (!_closed) { |
723 | 44 | _closed = true; |
724 | 44 | std::for_each(_pre_buffers.begin(), _pre_buffers.end(), |
725 | 176 | [](std::shared_ptr<PrefetchBuffer>& buffer) { buffer->close(); }); |
726 | 44 | return _reader->close(); |
727 | 44 | } |
728 | | |
729 | 6 | return Status::OK(); |
730 | 50 | } |
731 | | |
732 | 18 | void PrefetchBufferedReader::_collect_profile_before_close() { |
733 | 18 | std::for_each(_pre_buffers.begin(), _pre_buffers.end(), |
734 | 72 | [](std::shared_ptr<PrefetchBuffer>& buffer) { |
735 | 72 | buffer->collect_profile_before_close(); |
736 | 72 | }); |
737 | 18 | if (_reader != nullptr) { |
738 | 18 | _reader->collect_profile_before_close(); |
739 | 18 | } |
740 | 18 | } |
741 | | |
742 | | // InMemoryFileReader |
// Wraps a reader and caches its size; the file content itself is loaded
// lazily on the first read (see read_at_impl).
InMemoryFileReader::InMemoryFileReader(io::FileReaderSPtr reader) : _reader(std::move(reader)) {
    // Assigned in the body (not the init list) so it is safe regardless of
    // the declaration order of _size and _reader in the class.
    _size = _reader->size();
}
746 | | |
// Best-effort close on destruction; the Status cannot be propagated from a
// destructor, so it is discarded explicitly.
InMemoryFileReader::~InMemoryFileReader() {
    static_cast<void>(_close_internal());
}
750 | | |
// Public close: forwards to the idempotent _close_internal(), shared with
// the destructor.
Status InMemoryFileReader::close() {
    return _close_internal();
}
754 | | |
755 | 1.12k | Status InMemoryFileReader::_close_internal() { |
756 | 1.12k | if (!_closed) { |
757 | 867 | _closed = true; |
758 | 867 | return _reader->close(); |
759 | 867 | } |
760 | 259 | return Status::OK(); |
761 | 1.12k | } |
762 | | |
763 | | Status InMemoryFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, |
764 | 2.58k | const IOContext* io_ctx) { |
765 | 2.58k | if (_data == nullptr) { |
766 | 865 | _data = std::make_unique_for_overwrite<char[]>(_size); |
767 | | |
768 | 865 | size_t file_size = 0; |
769 | 865 | RETURN_IF_ERROR(_reader->read_at(0, Slice(_data.get(), _size), &file_size, io_ctx)); |
770 | 865 | DCHECK_EQ(file_size, _size); |
771 | 865 | } |
772 | 2.58k | if (UNLIKELY(offset > _size)) { |
773 | 0 | return Status::IOError("Out of bounds access"); |
774 | 0 | } |
775 | 2.58k | *bytes_read = std::min(result.size, _size - offset); |
776 | 2.58k | memcpy(result.data, _data.get() + offset, *bytes_read); |
777 | 2.58k | return Status::OK(); |
778 | 2.58k | } |
779 | | |
// Delegates profile collection to the wrapped reader; this wrapper keeps no
// counters of its own.
void InMemoryFileReader::_collect_profile_before_close() {
    if (_reader != nullptr) {
        _reader->collect_profile_before_close();
    }
}
785 | | |
786 | | // BufferedFileStreamReader |
787 | | BufferedFileStreamReader::BufferedFileStreamReader(io::FileReaderSPtr file, uint64_t offset, |
788 | | uint64_t length, size_t max_buf_size) |
789 | 1.13k | : _file(file), |
790 | 1.13k | _file_start_offset(offset), |
791 | 1.13k | _file_end_offset(offset + length), |
792 | 1.13k | _max_buf_size(max_buf_size) {} |
793 | | |
// Returns in *buf a pointer into the internal sliding buffer covering
// [offset, offset + bytes_to_read) of the file window. The buffer is grown
// and/or slid forward as needed; overlapping bytes already buffered are
// reused instead of being read again. The returned pointer is only valid
// until the next read_bytes call (the buffer may be reallocated or slid).
Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset,
                                            const size_t bytes_to_read, const IOContext* io_ctx) {
    if (offset < _file_start_offset || offset >= _file_end_offset ||
        offset + bytes_to_read > _file_end_offset) {
        return Status::IOError(
                "Out-of-bounds Access: offset={}, bytes_to_read={}, file_start={}, "
                "file_end={}",
                offset, bytes_to_read, _file_start_offset, _file_end_offset);
    }
    int64_t end_offset = offset + bytes_to_read;
    // Fast path: the requested span is already fully buffered.
    if (_buf_start_offset <= offset && _buf_end_offset >= end_offset) {
        *buf = _buf.get() + offset - _buf_start_offset;
        return Status::OK();
    }
    size_t buf_size = std::max(_max_buf_size, bytes_to_read);
    if (_buf_size < buf_size) {
        // Grow the buffer; salvage the tail of the old buffer that overlaps
        // the request so it does not have to be re-read.
        auto new_buf = make_unique_buffer<uint8_t>(buf_size);
        if (offset >= _buf_start_offset && offset < _buf_end_offset) {
            memcpy(new_buf.get(), _buf.get() + offset - _buf_start_offset,
                   _buf_end_offset - offset);
        }
        _buf = std::move(new_buf);
        _buf_size = buf_size;
    } else if (offset > _buf_start_offset && offset < _buf_end_offset) {
        // Same buffer: slide the overlapping tail to the front. Source and
        // destination may overlap, hence memmove.
        memmove(_buf.get(), _buf.get() + offset - _buf_start_offset, _buf_end_offset - offset);
    }
    if (offset < _buf_start_offset || offset >= _buf_end_offset) {
        // No overlap was kept: the buffer restarts empty at `offset`.
        _buf_end_offset = offset;
    }
    _buf_start_offset = offset;
    int64_t buf_remaining = _buf_end_offset - _buf_start_offset;
    // Fill the remaining buffer capacity, but never past the file window end.
    int64_t to_read = std::min(_buf_size - buf_remaining, _file_end_offset - _buf_end_offset);
    int64_t has_read = 0;
    while (has_read < to_read) {
        size_t loop_read = 0;
        Slice result(_buf.get() + buf_remaining + has_read, to_read - has_read);
        RETURN_IF_ERROR(_file->read_at(_buf_end_offset + has_read, result, &loop_read, io_ctx));
        if (loop_read == 0) {
            // Premature EOF — reported as Corruption below.
            break;
        }
        has_read += loop_read;
    }
    if (has_read != to_read) {
        return Status::Corruption("Try to read {} bytes, but received {} bytes", to_read, has_read);
    }
    _buf_end_offset += to_read;
    *buf = _buf.get();
    return Status::OK();
}
843 | | |
844 | | Status BufferedFileStreamReader::read_bytes(Slice& slice, uint64_t offset, |
845 | 2.01k | const IOContext* io_ctx) { |
846 | 2.01k | return read_bytes((const uint8_t**)&slice.data, offset, slice.size, io_ctx); |
847 | 2.01k | } |
848 | | |
849 | | Result<io::FileReaderSPtr> DelegateReader::create_file_reader( |
850 | | RuntimeProfile* profile, const FileSystemProperties& system_properties, |
851 | | const FileDescription& file_description, const io::FileReaderOptions& reader_options, |
852 | 713 | AccessMode access_mode, const IOContext* io_ctx, const PrefetchRange file_range) { |
853 | 713 | std::shared_ptr<const IOContext> io_ctx_holder; |
854 | 713 | if (io_ctx != nullptr) { |
855 | | // Old API: best-effort safety by copying the IOContext onto the heap. |
856 | 671 | io_ctx_holder = std::make_shared<IOContext>(*io_ctx); |
857 | 671 | } |
858 | 713 | return create_file_reader(profile, system_properties, file_description, reader_options, |
859 | 713 | access_mode, std::move(io_ctx_holder), file_range); |
860 | 713 | } |
861 | | |
// Builds the raw reader via FileFactory, then wraps it based on file size,
// concrete reader type, and the requested access mode:
//   - small S3 files      -> InMemoryFileReader (whole file cached in RAM)
//   - sequential + S3     -> PrefetchBufferedReader
//   - everything else     -> the raw reader unchanged
Result<io::FileReaderSPtr> DelegateReader::create_file_reader(
        RuntimeProfile* profile, const FileSystemProperties& system_properties,
        const FileDescription& file_description, const io::FileReaderOptions& reader_options,
        AccessMode access_mode, std::shared_ptr<const IOContext> io_ctx,
        const PrefetchRange file_range) {
    // Guarantee a non-null context for the wrappers below.
    if (io_ctx == nullptr) {
        io_ctx = std::make_shared<IOContext>();
    }
    return FileFactory::create_file_reader(system_properties, file_description, reader_options,
                                           profile)
            .transform([&](auto&& reader) -> io::FileReaderSPtr {
                // Small S3 files are pulled fully into memory up front.
                if (reader->size() < config::in_memory_file_size &&
                    typeid_cast<io::S3FileReader*>(reader.get())) {
                    return std::make_shared<InMemoryFileReader>(std::move(reader));
                }

                if (access_mode == AccessMode::SEQUENTIAL) {
                    // Prefetching reads the wrapped reader concurrently (see the
                    // comment below), so only readers known to tolerate that are
                    // eligible: a bare S3 reader, or an S3 reader behind the
                    // remote-file cache.
                    bool is_thread_safe = false;
                    if (typeid_cast<io::S3FileReader*>(reader.get())) {
                        is_thread_safe = true;
                    } else if (auto* cached_reader =
                                       typeid_cast<io::CachedRemoteFileReader*>(reader.get());
                               cached_reader &&
                               typeid_cast<io::S3FileReader*>(cached_reader->get_remote_reader())) {
                        is_thread_safe = true;
                    }
                    if (is_thread_safe) {
                        // PrefetchBufferedReader needs thread-safe reader to prefetch data concurrently.
                        return std::make_shared<io::PrefetchBufferedReader>(
                                profile, std::move(reader), file_range, io_ctx);
                    }
                }

                return reader;
            });
}
898 | | |
899 | | Status LinearProbeRangeFinder::get_range_for(int64_t desired_offset, |
900 | 6 | io::PrefetchRange& result_range) { |
901 | 9 | while (index < _ranges.size()) { |
902 | 9 | io::PrefetchRange& range = _ranges[index]; |
903 | 9 | if (range.end_offset > desired_offset) { |
904 | 6 | if (range.start_offset > desired_offset) [[unlikely]] { |
905 | 0 | return Status::InvalidArgument("Invalid desiredOffset"); |
906 | 0 | } |
907 | 6 | result_range = range; |
908 | 6 | return Status::OK(); |
909 | 6 | } |
910 | 3 | ++index; |
911 | 3 | } |
912 | 0 | return Status::InvalidArgument("Invalid desiredOffset"); |
913 | 6 | } |
914 | | |
// Wraps `inner_reader` with a one-range cache sized to hold the largest
// range the finder can return (minimum 4096 bytes), and registers profile
// counters when a profile is supplied.
RangeCacheFileReader::RangeCacheFileReader(RuntimeProfile* profile, io::FileReaderSPtr inner_reader,
                                           std::shared_ptr<RangeFinder> range_finder)
        : _profile(profile),
          _inner_reader(std::move(inner_reader)),
          _range_finder(std::move(range_finder)) {
    _size = _inner_reader->size();
    // The cache must fit any single range; 4096 is the floor.
    uint64_t max_cache_size =
            std::max((uint64_t)4096, (uint64_t)_range_finder->get_max_range_size());
    _cache = OwnedSlice(max_cache_size);

    // Counters are registered only when profiling is enabled; the raw
    // statistics are always accumulated and flushed in
    // _collect_profile_before_close().
    if (_profile != nullptr) {
        const char* random_profile = "RangeCacheFileReader";
        ADD_TIMER_WITH_LEVEL(_profile, random_profile, 1);
        _request_io =
                ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestIO", TUnit::UNIT, random_profile, 1);
        _request_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "RequestBytes", TUnit::BYTES,
                                                      random_profile, 1);
        _request_time = ADD_CHILD_TIMER_WITH_LEVEL(_profile, "RequestTime", random_profile, 1);
        _read_to_cache_time =
                ADD_CHILD_TIMER_WITH_LEVEL(_profile, "ReadToCacheTime", random_profile, 1);
        _cache_refresh_count = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "CacheRefreshCount",
                                                            TUnit::UNIT, random_profile, 1);
        _read_to_cache_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(_profile, "ReadToCacheBytes",
                                                            TUnit::BYTES, random_profile, 1);
    }
}
941 | | |
// Serves the read from a single cached range: when the offset falls outside
// the currently cached range, the whole containing range is first read from
// the inner reader into _cache, then the requested bytes are copied out.
// Assumes the range finder returns a range fully covering the request —
// TODO confirm: otherwise the memcpy below would read past the cached data.
Status RangeCacheFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                          const IOContext* io_ctx) {
    auto request_size = result.size;

    _cache_statistics.request_io++;
    _cache_statistics.request_bytes += request_size;
    SCOPED_RAW_TIMER(&_cache_statistics.request_time);

    PrefetchRange range;
    // NOTE(review): relies on Status's boolean conversion (true == OK); the
    // finder's own error text is discarded and replaced below.
    if (_range_finder->get_range_for(offset, range)) [[likely]] {
        if (_current_start_offset != range.start_offset) { // need read new range to cache.
            auto range_size = range.end_offset - range.start_offset;

            _cache_statistics.cache_refresh_count++;
            _cache_statistics.read_to_cache_bytes += range_size;
            SCOPED_RAW_TIMER(&_cache_statistics.read_to_cache_time);

            // *bytes_read is used as scratch here; it is overwritten with the
            // actual request size before returning.
            Slice cache_slice = {_cache.data(), range_size};
            RETURN_IF_ERROR(
                    _inner_reader->read_at(range.start_offset, cache_slice, bytes_read, io_ctx));

            if (*bytes_read != range_size) [[unlikely]] {
                return Status::InternalError(
                        "RangeCacheFileReader use inner reader read bytes {} not eq expect size {}",
                        *bytes_read, range_size);
            }

            // Mark the cache as holding this range only after a full read.
            _current_start_offset = range.start_offset;
        }

        int64_t buffer_offset = offset - _current_start_offset;
        memcpy(result.data, _cache.data() + buffer_offset, request_size);
        *bytes_read = request_size;

        return Status::OK();
    } else {
        return Status::InternalError("RangeCacheFileReader read not in Ranges. Offset = {}",
                                     offset);
        // RETURN_IF_ERROR(_inner_reader->read_at(offset, result , bytes_read, io_ctx));
        // return Status::OK();
        // think return error is ok,otherwise it will cover up the error.
    }
}
985 | | |
986 | 101 | void RangeCacheFileReader::_collect_profile_before_close() { |
987 | 101 | if (_profile != nullptr) { |
988 | 101 | COUNTER_UPDATE(_request_io, _cache_statistics.request_io); |
989 | 101 | COUNTER_UPDATE(_request_bytes, _cache_statistics.request_bytes); |
990 | 101 | COUNTER_UPDATE(_request_time, _cache_statistics.request_time); |
991 | 101 | COUNTER_UPDATE(_read_to_cache_time, _cache_statistics.read_to_cache_time); |
992 | 101 | COUNTER_UPDATE(_cache_refresh_count, _cache_statistics.cache_refresh_count); |
993 | 101 | COUNTER_UPDATE(_read_to_cache_bytes, _cache_statistics.read_to_cache_bytes); |
994 | 101 | if (_inner_reader != nullptr) { |
995 | 101 | _inner_reader->collect_profile_before_close(); |
996 | 101 | } |
997 | 101 | } |
998 | 101 | } |
999 | | |
1000 | | } // namespace io |
1001 | | |
1002 | | #include "common/compile_check_end.h" |
1003 | | |
1004 | | } // namespace doris |