be/src/io/cache/cached_remote_file_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/cache/cached_remote_file_reader.h" |
19 | | |
20 | | #include <brpc/controller.h> |
21 | | #include <fmt/format.h> |
22 | | #include <gen_cpp/Types_types.h> |
23 | | #include <gen_cpp/internal_service.pb.h> |
24 | | #include <glog/logging.h> |
25 | | |
26 | | #include <algorithm> |
27 | | #include <atomic> |
28 | | #include <condition_variable> |
29 | | #include <cstring> |
30 | | #include <functional> |
31 | | #include <list> |
32 | | #include <memory> |
33 | | #include <mutex> |
34 | | #include <thread> |
35 | | #include <vector> |
36 | | |
37 | | #include "cloud/cloud_warm_up_manager.h" |
38 | | #include "cloud/config.h" |
39 | | #include "common/compiler_util.h" // IWYU pragma: keep |
40 | | #include "common/config.h" |
41 | | #include "common/metrics/doris_metrics.h" |
42 | | #include "cpp/s3_rate_limiter.h" |
43 | | #include "cpp/sync_point.h" |
44 | | #include "io/cache/block_file_cache.h" |
45 | | #include "io/cache/block_file_cache_factory.h" |
46 | | #include "io/cache/block_file_cache_profile.h" |
47 | | #include "io/cache/file_block.h" |
48 | | #include "io/cache/file_cache_common.h" |
49 | | #include "io/cache/peer_file_cache_reader.h" |
50 | | #include "io/fs/file_reader.h" |
51 | | #include "io/fs/local_file_system.h" |
52 | | #include "io/io_common.h" |
53 | | #include "runtime/exec_env.h" |
54 | | #include "runtime/runtime_profile.h" |
55 | | #include "runtime/thread_context.h" |
56 | | #include "runtime/workload_management/io_throttle.h" |
57 | | #include "service/backend_options.h" |
58 | | #include "storage/storage_policy.h" |
59 | | #include "util/bit_util.h" |
60 | | #include "util/brpc_client_cache.h" // BrpcClientCache |
61 | | #include "util/client_cache.h" |
62 | | #include "util/concurrency_stats.h" |
63 | | #include "util/debug_points.h" |
64 | | |
65 | | namespace doris::io { |
66 | | |
// Process-wide bvar metrics for the cached remote reader.
// Read-path source counters: how often we hit S3 vs a peer BE's cache.
bvar::Adder<uint64_t> s3_read_counter("cached_remote_reader_s3_read");
bvar::Adder<uint64_t> peer_read_counter("cached_remote_reader_peer_read");
// Cache-skip accounting (blocks that bypassed the local file cache).
bvar::LatencyRecorder g_skip_cache_num("cached_remote_reader_skip_cache_num");
bvar::Adder<uint64_t> g_skip_cache_sum("cached_remote_reader_skip_cache_sum");
// Bytes of local-cache IO skipped in dryrun (prefetch) mode.
bvar::Adder<uint64_t> g_skip_local_cache_io_sum_bytes(
        "cached_remote_reader_skip_local_cache_io_sum_bytes");
// Direct-read path: requests fully/partially served straight from the
// per-reader block map (no get_or_set round trip).
bvar::Adder<uint64_t> g_read_cache_direct_whole_num("cached_remote_reader_cache_direct_whole_num");
bvar::Adder<uint64_t> g_read_cache_direct_partial_num(
        "cached_remote_reader_cache_direct_partial_num");
// Indirect path: requests that went through the block cache get_or_set.
bvar::Adder<uint64_t> g_read_cache_indirect_num("cached_remote_reader_cache_indirect_num");
bvar::Adder<uint64_t> g_read_cache_direct_whole_bytes(
        "cached_remote_reader_cache_direct_whole_bytes");
bvar::Adder<uint64_t> g_read_cache_direct_partial_bytes(
        "cached_remote_reader_cache_direct_partial_bytes");
bvar::Adder<uint64_t> g_read_cache_indirect_bytes("cached_remote_reader_cache_indirect_bytes");
bvar::Adder<uint64_t> g_read_cache_indirect_total_bytes(
        "cached_remote_reader_cache_indirect_total_bytes");
// Times the reader cleared a cache hash after a DOWNLOADED block's backing
// file turned out to be missing (NOT_FOUND on read).
bvar::Adder<uint64_t> g_read_cache_self_heal_on_not_found(
        "cached_remote_reader_self_heal_on_not_found");
// 1-minute sliding windows over the indirect byte counters.
bvar::Window<bvar::Adder<uint64_t>> g_read_cache_indirect_bytes_1min_window(
        "cached_remote_reader_indirect_bytes_1min_window", &g_read_cache_indirect_bytes, 60);
bvar::Window<bvar::Adder<uint64_t>> g_read_cache_indirect_total_bytes_1min_window(
        "cached_remote_reader_indirect_total_bytes_1min_window", &g_read_cache_indirect_total_bytes,
        60);
// Peer reads abandoned because no peer host/port could be resolved.
bvar::Adder<uint64_t> g_failed_get_peer_addr_counter(
        "cached_remote_reader_failed_get_peer_addr_counter");
93 | | |
94 | | CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader, |
95 | | const FileReaderOptions& opts) |
96 | 2.01k | : _remote_file_reader(std::move(remote_file_reader)) { |
97 | 2.01k | _is_doris_table = opts.is_doris_table; |
98 | 2.01k | if (_is_doris_table) { |
99 | 2.01k | _cache_hash = BlockFileCache::hash(path().filename().native()); |
100 | 2.01k | _cache = FileCacheFactory::instance()->get_by_path(_cache_hash); |
101 | 2.01k | if (config::enable_read_cache_file_directly) { |
102 | | // this is designed for and test in doris table, external table need extra tests |
103 | 2.00k | _cache_file_readers = _cache->get_blocks_by_key(_cache_hash); |
104 | 2.00k | } |
105 | 2.01k | } else { |
106 | | // Use path and modification time to build cache key |
107 | 5 | std::string unique_path = fmt::format("{}:{}", path().native(), opts.mtime); |
108 | 5 | _cache_hash = BlockFileCache::hash(unique_path); |
109 | 5 | if (opts.cache_base_path.empty()) { |
110 | | // if cache path is not specified by session variable, chose randomly. |
111 | 1 | _cache = FileCacheFactory::instance()->get_by_path(_cache_hash); |
112 | 4 | } else { |
113 | | // from query session variable: file_cache_base_path |
114 | 4 | _cache = FileCacheFactory::instance()->get_by_path(opts.cache_base_path); |
115 | 4 | if (_cache == nullptr) { |
116 | 1 | LOG(WARNING) << "Can't get cache from base path: " << opts.cache_base_path |
117 | 1 | << ", using random instead."; |
118 | 1 | _cache = FileCacheFactory::instance()->get_by_path(_cache_hash); |
119 | 1 | } |
120 | 4 | } |
121 | 5 | } |
122 | 2.01k | } |
123 | | |
124 | 35 | void CachedRemoteFileReader::_insert_file_reader(FileBlockSPtr file_block) { |
125 | 35 | if (_is_doris_table && config::enable_read_cache_file_directly) { |
126 | 13 | std::lock_guard lock(_mtx); |
127 | 13 | DCHECK(file_block->state() == FileBlock::State::DOWNLOADED); |
128 | 13 | file_block->_owned_by_cached_reader = true; |
129 | 13 | _cache_file_readers.emplace(file_block->offset(), std::move(file_block)); |
130 | 13 | } |
131 | 35 | } |
132 | | |
133 | 2.01k | CachedRemoteFileReader::~CachedRemoteFileReader() { |
134 | 2.01k | for (auto& it : _cache_file_readers) { |
135 | 1.03k | it.second->_owned_by_cached_reader = false; |
136 | 1.03k | } |
137 | 2.01k | static_cast<void>(close()); |
138 | 2.01k | } |
139 | | |
140 | 3.03k | Status CachedRemoteFileReader::close() { |
141 | 3.03k | return _remote_file_reader->close(); |
142 | 3.03k | } |
143 | | |
144 | | std::pair<size_t, size_t> CachedRemoteFileReader::s_align_size(size_t offset, size_t read_size, |
145 | 1.03k | size_t length) { |
146 | 1.03k | size_t left = offset; |
147 | 1.03k | size_t right = offset + read_size - 1; |
148 | 1.03k | size_t align_left = |
149 | 1.03k | (left / config::file_cache_each_block_size) * config::file_cache_each_block_size; |
150 | 1.03k | size_t align_right = |
151 | 1.03k | (right / config::file_cache_each_block_size + 1) * config::file_cache_each_block_size; |
152 | 1.03k | align_right = align_right < length ? align_right : length; |
153 | 1.03k | size_t align_size = align_right - align_left; |
154 | 1.03k | if (align_size < config::file_cache_each_block_size && align_left != 0) { |
155 | 4 | align_size += config::file_cache_each_block_size; |
156 | 4 | align_left -= config::file_cache_each_block_size; |
157 | 4 | } |
158 | 1.03k | return std::make_pair(align_left, align_size); |
159 | 1.03k | } |
160 | | |
161 | | namespace { |
162 | 0 | std::optional<int64_t> extract_tablet_id(const std::string& file_path) { |
163 | 0 | return StorageResource::parse_tablet_id_from_path(file_path); |
164 | 0 | } |
165 | | |
166 | | // Get peer connection info from tablet_id |
167 | 0 | std::pair<std::string, int> get_peer_connection_info(const std::string& file_path) { |
168 | 0 | std::string host = ""; |
169 | 0 | int port = 0; |
170 | | |
171 | | // Try to get tablet_id from actual path and lookup tablet info |
172 | 0 | if (auto tablet_id = extract_tablet_id(file_path)) { |
173 | 0 | auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager(); |
174 | 0 | if (auto tablet_info = manager.get_balanced_tablet_info(*tablet_id)) { |
175 | 0 | host = tablet_info->first; |
176 | 0 | port = tablet_info->second; |
177 | 0 | } else { |
178 | 0 | LOG_EVERY_N(WARNING, 100) |
179 | 0 | << "get peer connection info not found" |
180 | 0 | << ", tablet_id=" << *tablet_id << ", file_path=" << file_path; |
181 | 0 | } |
182 | 0 | } else { |
183 | 0 | LOG_EVERY_N(WARNING, 100) << "parse tablet id from path failed" |
184 | 0 | << "tablet_id=null, file_path=" << file_path; |
185 | 0 | } |
186 | |
|
187 | 0 | DBUG_EXECUTE_IF("PeerFileCacheReader::_fetch_from_peer_cache_blocks", { |
188 | 0 | host = dp->param<std::string>("host", "127.0.0.1"); |
189 | 0 | port = dp->param("port", 9060); |
190 | 0 | LOG_WARNING("debug point PeerFileCacheReader::_fetch_from_peer_cache_blocks") |
191 | 0 | .tag("host", host) |
192 | 0 | .tag("port", port); |
193 | 0 | }); |
194 | |
|
195 | 0 | return {host, port}; |
196 | 0 | } |
197 | | |
198 | | // Execute peer read with fallback to S3 |
199 | | // file_size is the size of the file |
200 | | // used to calculate the rightmost boundary value of the block, due to inaccurate current block meta information. |
201 | | Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks, size_t empty_start, |
202 | | size_t& size, std::unique_ptr<char[]>& buffer, |
203 | | const std::string& file_path, size_t file_size, bool is_doris_table, |
204 | 0 | ReadStatistics& stats, const IOContext* io_ctx) { |
205 | 0 | auto [host, port] = get_peer_connection_info(file_path); |
206 | 0 | VLOG_DEBUG << "PeerFileCacheReader read from peer, host=" << host << ", port=" << port |
207 | 0 | << ", file_path=" << file_path; |
208 | |
|
209 | 0 | if (host.empty() || port == 0) { |
210 | 0 | g_failed_get_peer_addr_counter << 1; |
211 | 0 | LOG_EVERY_N(WARNING, 100) << "PeerFileCacheReader host or port is empty" |
212 | 0 | << ", host=" << host << ", port=" << port |
213 | 0 | << ", file_path=" << file_path; |
214 | 0 | return Status::InternalError<false>("host or port is empty"); |
215 | 0 | } |
216 | 0 | SCOPED_RAW_TIMER(&stats.peer_read_timer); |
217 | 0 | peer_read_counter << 1; |
218 | 0 | PeerFileCacheReader peer_reader(file_path, is_doris_table, host, port); |
219 | 0 | auto st = peer_reader.fetch_blocks(empty_blocks, empty_start, Slice(buffer.get(), size), &size, |
220 | 0 | file_size, io_ctx); |
221 | 0 | if (!st.ok()) { |
222 | 0 | LOG_EVERY_N(WARNING, 100) << "PeerFileCacheReader read from peer failed" |
223 | 0 | << ", host=" << host << ", port=" << port |
224 | 0 | << ", error=" << st.msg(); |
225 | 0 | } |
226 | 0 | stats.from_peer_cache = true; |
227 | 0 | return st; |
228 | 0 | } |
229 | | |
230 | | // Execute S3 read |
231 | | Status execute_s3_read(size_t empty_start, size_t& size, std::unique_ptr<char[]>& buffer, |
232 | | ReadStatistics& stats, const IOContext* io_ctx, |
233 | 20 | FileReaderSPtr remote_file_reader) { |
234 | 20 | s3_read_counter << 1; |
235 | 20 | SCOPED_RAW_TIMER(&stats.remote_read_timer); |
236 | 20 | stats.from_peer_cache = false; |
237 | 20 | return remote_file_reader->read_at(empty_start, Slice(buffer.get(), size), &size, io_ctx); |
238 | 20 | } |
239 | | |
240 | | } // anonymous namespace |
241 | | |
242 | | Status CachedRemoteFileReader::_execute_remote_read(const std::vector<FileBlockSPtr>& empty_blocks, |
243 | | size_t empty_start, size_t& size, |
244 | | std::unique_ptr<char[]>& buffer, |
245 | | ReadStatistics& stats, |
246 | 20 | const IOContext* io_ctx) { |
247 | 20 | DBUG_EXECUTE_IF("CachedRemoteFileReader.read_at_impl.change_type", { |
248 | | // Determine read type from debug point or default to S3 |
249 | 20 | std::string read_type = "s3"; |
250 | 20 | read_type = dp->param<std::string>("type", "s3"); |
251 | 20 | LOG_WARNING("CachedRemoteFileReader.read_at_impl.change_type") |
252 | 20 | .tag("path", path().native()) |
253 | 20 | .tag("off", empty_start) |
254 | 20 | .tag("size", size) |
255 | 20 | .tag("type", read_type); |
256 | | // Execute appropriate read strategy |
257 | 20 | if (read_type == "s3") { |
258 | 20 | return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader); |
259 | 20 | } else { |
260 | 20 | return execute_peer_read(empty_blocks, empty_start, size, buffer, path().native(), |
261 | 20 | this->size(), _is_doris_table, stats, io_ctx); |
262 | 20 | } |
263 | 20 | }); |
264 | | |
265 | 20 | if (!doris::config::is_cloud_mode() || !_is_doris_table || io_ctx->is_warmup || |
266 | 20 | !doris::config::enable_cache_read_from_peer) { |
267 | 20 | return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader); |
268 | 20 | } else { |
269 | | // first try peer read, if peer failed, fallback to S3 |
270 | | // peer timeout is 5 seconds |
271 | | // TODO(dx): here peer and s3 reader need to get data in parallel, and take the one that is correct and returns first |
272 | | // ATTN: Save original size before peer read, as it may be modified by fetch_blocks, read peer ref size |
273 | 0 | size_t original_size = size; |
274 | 0 | auto st = execute_peer_read(empty_blocks, empty_start, size, buffer, path().native(), |
275 | 0 | this->size(), _is_doris_table, stats, io_ctx); |
276 | 0 | if (!st.ok()) { |
277 | | // Restore original size for S3 fallback, as peer read may have modified it |
278 | 0 | size = original_size; |
279 | | // Fallback to S3 |
280 | 0 | return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader); |
281 | 0 | } |
282 | 0 | return st; |
283 | 0 | } |
284 | 20 | } |
285 | | |
// Core read path. Serves [offset, offset + result.size) in up to three stages:
//   1. direct read from blocks already pinned in _cache_file_readers (fast path);
//   2. get_or_set on the block cache, remote-fetching any EMPTY/SKIP_CACHE blocks
//      and writing them back into the cache;
//   3. a second pass over the holder's blocks to copy/wait for the remainder.
// In dryrun mode (prefetch) no bytes are copied to `result` and no per-query
// stats are recorded; the cache is still populated.
Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                            const IOContext* io_ctx) {
    size_t already_read = 0;
    SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().cached_remote_reader_read_at);

    const bool is_dryrun = io_ctx->is_dryrun;
    DCHECK(!closed());
    DCHECK(io_ctx);
    if (offset > size()) {
        return Status::InvalidArgument(
                fmt::format("offset exceeds file size(offset: {}, file size: {}, path: {})", offset,
                            size(), path().native()));
    }
    // Clamp the request to the file size; a zero-byte request is a no-op.
    size_t bytes_req = result.size;
    bytes_req = std::min(bytes_req, size() - offset);
    if (UNLIKELY(bytes_req == 0)) {
        *bytes_read = 0;
        return Status::OK();
    }

    ReadStatistics stats;
    stats.bytes_read += bytes_req;
    MonotonicStopWatch read_at_sw;
    read_at_sw.start();
    // Deferred stats flush: runs on every exit path (including the direct-read
    // early return). Dryrun reads skip the flush entirely.
    auto defer_func = [&](int*) {
        if (config::print_stack_when_cache_miss) {
            if (io_ctx->file_cache_stats == nullptr && !stats.hit_cache && !io_ctx->is_warmup) {
                LOG_INFO("[verbose] {}", Status::InternalError<true>("not hit cache"));
            }
        }
        if (!stats.hit_cache && config::read_cluster_cache_opt_verbose_log) {
            LOG_INFO(
                    "[verbose] not hit cache, path: {}, offset: {}, size: {}, cost: {} ms, warmup: "
                    "{}",
                    path().native(), offset, bytes_req, read_at_sw.elapsed_time_milliseconds(),
                    io_ctx->is_warmup);
        }
        if (is_dryrun) {
            return;
        }
        // update stats increment in this reading procedure for file cache metrics
        FileCacheStatistics fcache_stats_increment;
        _update_stats(stats, &fcache_stats_increment, io_ctx->is_inverted_index);
        io::FileCacheMetrics::instance().update(&fcache_stats_increment);
        if (io_ctx->file_cache_stats) {
            // update stats in io_ctx, for query profile
            _update_stats(stats, io_ctx->file_cache_stats, io_ctx->is_inverted_index);
        }
    };
    // Sentinel pointer (0x01) just arms the deleter; never dereferenced.
    std::unique_ptr<int, decltype(defer_func)> defer((int*)0x01, std::move(defer_func));
    if (_is_doris_table && config::enable_read_cache_file_directly) {
        // read directly
        SCOPED_RAW_TIMER(&stats.read_cache_file_directly_timer);
        size_t need_read_size = bytes_req;
        std::shared_lock lock(_mtx);
        if (!_cache_file_readers.empty()) {
            // find the last offset > offset.
            auto iter = _cache_file_readers.upper_bound(offset);
            if (iter != _cache_file_readers.begin()) {
                iter--;
            }
            size_t cur_offset = offset;
            // Walk contiguous cached blocks forward; stop at the first gap.
            while (need_read_size != 0 && iter != _cache_file_readers.end()) {
                if (iter->second->offset() > cur_offset ||
                    iter->second->range().right < cur_offset) {
                    break;
                }
                size_t file_offset = cur_offset - iter->second->offset();
                size_t reserve_bytes =
                        std::min(need_read_size, iter->second->range().size() - file_offset);
                if (is_dryrun) [[unlikely]] {
                    // Prefetch dryrun: the data is already cached, skip the IO.
                    g_skip_local_cache_io_sum_bytes << reserve_bytes;
                } else {
                    SCOPED_RAW_TIMER(&stats.local_read_timer);
                    if (!iter->second
                                 ->read(Slice(result.data + (cur_offset - offset), reserve_bytes),
                                        file_offset)
                                 .ok()) { //TODO: maybe read failed because block evict, should handle error
                        break;
                    }
                }
                // Touch LRU even for dryrun so prefetched blocks stay warm.
                _cache->add_need_update_lru_block(iter->second);
                need_read_size -= reserve_bytes;
                cur_offset += reserve_bytes;
                already_read += reserve_bytes;
                iter++;
            }
            if (need_read_size == 0) {
                // Whole request served from pinned blocks — fast-path return.
                *bytes_read = bytes_req;
                stats.hit_cache = true;
                g_read_cache_direct_whole_num << 1;
                g_read_cache_direct_whole_bytes << bytes_req;
                return Status::OK();
            } else {
                g_read_cache_direct_partial_num << 1;
                g_read_cache_direct_partial_bytes << already_read;
            }
        }
    }
    // read from cache or remote
    g_read_cache_indirect_num << 1;
    size_t indirect_read_bytes = 0;
    // Align the (remaining) request to cache-block boundaries.
    auto [align_left, align_size] =
            s_align_size(offset + already_read, bytes_req - already_read, size());
    CacheContext cache_context(io_ctx);
    cache_context.stats = &stats;
    auto tablet_id = get_tablet_id(path().string());
    cache_context.tablet_id = tablet_id.value_or(0);
    MonotonicStopWatch sw;
    sw.start();

    ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set->increment();
    // Pins every block covering the aligned range for the rest of this call.
    FileBlocksHolder holder =
            _cache->get_or_set(_cache_hash, align_left, align_size, cache_context);
    ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set->decrement();

    stats.cache_get_or_set_timer += sw.elapsed_time();
    // First pass: classify blocks. We only fetch blocks we won the downloader
    // role for (EMPTY) or that the cache refuses to store (SKIP_CACHE).
    std::vector<FileBlockSPtr> empty_blocks;
    for (auto& block : holder.file_blocks) {
        switch (block->state()) {
        case FileBlock::State::EMPTY:
            VLOG_DEBUG << fmt::format("Block EMPTY path={} hash={}:{}:{} offset={} cache_path={}",
                                      path().native(), _cache_hash.to_string(), _cache_hash.high(),
                                      _cache_hash.low(), block->offset(), block->get_cache_file());
            block->get_or_set_downloader();
            if (block->is_downloader()) {
                empty_blocks.push_back(block);
                TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::EMPTY");
            }
            stats.hit_cache = false;
            break;
        case FileBlock::State::SKIP_CACHE:
            VLOG_DEBUG << fmt::format(
                    "Block SKIP_CACHE path={} hash={}:{}:{} offset={} cache_path={}",
                    path().native(), _cache_hash.to_string(), _cache_hash.high(), _cache_hash.low(),
                    block->offset(), block->get_cache_file());
            empty_blocks.push_back(block);
            stats.hit_cache = false;
            stats.skip_cache = true;
            break;
        case FileBlock::State::DOWNLOADING:
            // Another thread is fetching this block; we wait in the second pass.
            stats.hit_cache = false;
            break;
        case FileBlock::State::DOWNLOADED:
            _insert_file_reader(block);
            break;
        }
    }
    size_t empty_start = 0;
    size_t empty_end = 0;
    if (!empty_blocks.empty()) {
        // Fetch the whole [empty_start, empty_end] span in one remote read.
        empty_start = empty_blocks.front()->range().left;
        empty_end = empty_blocks.back()->range().right;
        size_t size = empty_end - empty_start + 1;
        std::unique_ptr<char[]> buffer(new char[size]);

        // Apply rate limiting for warmup download tasks (node level)
        // Rate limiting is applied before remote read to limit both S3 read and local cache write
        if (io_ctx->is_warmup) {
            auto* rate_limiter = ExecEnv::GetInstance()->warmup_download_rate_limiter();
            if (rate_limiter != nullptr) {
                rate_limiter->add(size);
            }
        }

        // Determine read type and execute remote read
        RETURN_IF_ERROR(
                _execute_remote_read(empty_blocks, empty_start, size, buffer, stats, io_ctx));

        {
            // Write the fetched bytes back into the cache, block by block.
            SCOPED_CONCURRENCY_COUNT(
                    ConcurrencyStatsManager::instance().cached_remote_reader_write_back);
            for (auto& block : empty_blocks) {
                if (block->state() == FileBlock::State::SKIP_CACHE) {
                    continue;
                }
                SCOPED_RAW_TIMER(&stats.local_write_timer);
                char* cur_ptr = buffer.get() + block->range().left - empty_start;
                size_t block_size = block->range().size();
                Status st = block->append(Slice(cur_ptr, block_size));
                if (st.ok()) {
                    st = block->finalize();
                }
                if (!st.ok()) {
                    // Cache write failure is non-fatal: the data is still in
                    // `buffer` and is served from there below.
                    LOG_EVERY_N(WARNING, 100)
                            << "Write data to file cache failed. err=" << st.msg();
                } else {
                    _insert_file_reader(block);
                }
                stats.bytes_write_into_file_cache += block_size;
            }
        }
        // copy from memory directly
        size_t right_offset = offset + bytes_req - 1;
        if (empty_start <= right_offset && empty_end >= offset + already_read && !is_dryrun) {
            size_t copy_left_offset = std::max(offset + already_read, empty_start);
            size_t copy_right_offset = std::min(right_offset, empty_end);
            char* dst = result.data + (copy_left_offset - offset);
            char* src = buffer.get() + (copy_left_offset - empty_start);
            size_t copy_size = copy_right_offset - copy_left_offset + 1;
            memcpy(dst, src, copy_size);
            indirect_read_bytes += copy_size;
        }
    }

    // Second pass: account for what the remote fetch already satisfied and
    // read the remaining blocks from the local cache (waiting on concurrent
    // downloaders when necessary).
    size_t current_offset = offset;
    size_t end_offset = offset + bytes_req - 1;
    bool need_self_heal = false;
    *bytes_read = 0;
    for (auto& block : holder.file_blocks) {
        if (current_offset > end_offset) {
            break;
        }
        size_t left = block->range().left;
        size_t right = block->range().right;
        if (right < offset) {
            // Alignment may pull in a block entirely before the request.
            continue;
        }
        size_t read_size =
                end_offset > right ? right - current_offset + 1 : end_offset - current_offset + 1;
        if (empty_start <= left && right <= empty_end) {
            // Already served by the remote fetch + memcpy above.
            *bytes_read += read_size;
            current_offset = right + 1;
            continue;
        }
        FileBlock::State block_state = block->state();
        int64_t wait_time = 0;
        static int64_t max_wait_time = 10;
        TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::max_wait_time", &max_wait_time);
        if (block_state != FileBlock::State::DOWNLOADED) {
            // Another thread is downloading: wait bounded rounds for it.
            SCOPED_CONCURRENCY_COUNT(
                    ConcurrencyStatsManager::instance().cached_remote_reader_blocking);
            do {
                SCOPED_RAW_TIMER(&stats.remote_wait_timer);
                SCOPED_RAW_TIMER(&stats.remote_read_timer);
                TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::DOWNLOADING");
                block_state = block->wait();
                if (block_state != FileBlock::State::DOWNLOADING) {
                    break;
                }
            } while (++wait_time < max_wait_time);
        }
        if (wait_time == max_wait_time) [[unlikely]] {
            LOG_WARNING("Waiting too long for the download to complete");
        }
        {
            Status st;
            /*
             * If block_state == EMPTY, the thread reads the data from remote.
             * If block_state == DOWNLOADED, when the cache file is deleted by the other process,
             * the thread reads the data from remote too.
             */
            if (block_state == FileBlock::State::DOWNLOADED) {
                if (is_dryrun) [[unlikely]] {
                    g_skip_local_cache_io_sum_bytes << read_size;
                } else {
                    size_t file_offset = current_offset - left;
                    SCOPED_RAW_TIMER(&stats.local_read_timer);
                    SCOPED_CONCURRENCY_COUNT(
                            ConcurrencyStatsManager::instance().cached_remote_reader_local_read);
                    st = block->read(Slice(result.data + (current_offset - offset), read_size),
                                     file_offset);
                    indirect_read_bytes += read_size;
                }
            }
            if (!st || block_state != FileBlock::State::DOWNLOADED) {
                if (is_dryrun) [[unlikely]] {
                    // dryrun mode uses a null buffer, skip actual remote IO
                } else {
                    if (block_state == FileBlock::State::DOWNLOADED &&
                        st.is<ErrorCode::NOT_FOUND>()) {
                        // Cache metadata says DOWNLOADED but the file is gone:
                        // schedule removal of the whole hash after the loop.
                        need_self_heal = true;
                        g_read_cache_self_heal_on_not_found << 1;
                        LOG_EVERY_N(WARNING, 100)
                                << "Cache block file is missing, will self-heal by clearing cache "
                                   "hash. "
                                << "path=" << path().native()
                                << ", hash=" << _cache_hash.to_string() << ", offset=" << left
                                << ", err=" << st.msg();
                    }
                    LOG(WARNING) << "Read data failed from file cache downloaded by others. err="
                                 << st.msg() << ", block state=" << block_state;
                    // Last resort: read this slice straight from remote.
                    size_t nest_bytes_read {0};
                    stats.hit_cache = false;
                    stats.from_peer_cache = false;
                    s3_read_counter << 1;
                    SCOPED_RAW_TIMER(&stats.remote_read_timer);
                    RETURN_IF_ERROR(_remote_file_reader->read_at(
                            current_offset,
                            Slice(result.data + (current_offset - offset), read_size),
                            &nest_bytes_read));
                    indirect_read_bytes += read_size;
                    DCHECK(nest_bytes_read == read_size);
                }
            }
        }
        *bytes_read += read_size;
        current_offset = right + 1;
    }
    if (need_self_heal && _cache != nullptr) {
        _cache->remove_if_cached_async(_cache_hash);
    }
    g_read_cache_indirect_bytes << indirect_read_bytes;
    g_read_cache_indirect_total_bytes << *bytes_read;

    DCHECK(*bytes_read == bytes_req);
    return Status::OK();
}
594 | | |
595 | | void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats, |
596 | | FileCacheStatistics* statis, |
597 | 9.16k | bool is_inverted_index) const { |
598 | 9.16k | if (statis == nullptr) { |
599 | 0 | return; |
600 | 0 | } |
601 | 9.16k | if (read_stats.hit_cache) { |
602 | 9.12k | statis->num_local_io_total++; |
603 | 9.12k | statis->bytes_read_from_local += read_stats.bytes_read; |
604 | 9.12k | } else { |
605 | 43 | if (read_stats.from_peer_cache) { |
606 | 0 | statis->num_peer_io_total++; |
607 | 0 | statis->bytes_read_from_peer += read_stats.bytes_read; |
608 | 0 | statis->peer_io_timer += read_stats.peer_read_timer; |
609 | 43 | } else { |
610 | 43 | statis->num_remote_io_total++; |
611 | 43 | statis->bytes_read_from_remote += read_stats.bytes_read; |
612 | 43 | statis->remote_io_timer += read_stats.remote_read_timer; |
613 | 43 | } |
614 | 43 | } |
615 | 9.16k | statis->remote_wait_timer += read_stats.remote_wait_timer; |
616 | 9.16k | statis->local_io_timer += read_stats.local_read_timer; |
617 | 9.16k | statis->num_skip_cache_io_total += read_stats.skip_cache; |
618 | 9.16k | statis->bytes_write_into_cache += read_stats.bytes_write_into_file_cache; |
619 | 9.16k | statis->write_cache_io_timer += read_stats.local_write_timer; |
620 | | |
621 | 9.16k | statis->read_cache_file_directly_timer += read_stats.read_cache_file_directly_timer; |
622 | 9.16k | statis->cache_get_or_set_timer += read_stats.cache_get_or_set_timer; |
623 | 9.16k | statis->lock_wait_timer += read_stats.lock_wait_timer; |
624 | 9.16k | statis->get_timer += read_stats.get_timer; |
625 | 9.16k | statis->set_timer += read_stats.set_timer; |
626 | | |
627 | 9.16k | if (is_inverted_index) { |
628 | 0 | if (read_stats.hit_cache) { |
629 | 0 | statis->inverted_index_num_local_io_total++; |
630 | 0 | statis->inverted_index_bytes_read_from_local += read_stats.bytes_read; |
631 | 0 | } else { |
632 | 0 | if (read_stats.from_peer_cache) { |
633 | 0 | statis->inverted_index_num_peer_io_total++; |
634 | 0 | statis->inverted_index_bytes_read_from_peer += read_stats.bytes_read; |
635 | 0 | statis->inverted_index_peer_io_timer += read_stats.peer_read_timer; |
636 | 0 | } else { |
637 | 0 | statis->inverted_index_num_remote_io_total++; |
638 | 0 | statis->inverted_index_bytes_read_from_remote += read_stats.bytes_read; |
639 | 0 | statis->inverted_index_remote_io_timer += read_stats.remote_read_timer; |
640 | 0 | } |
641 | 0 | } |
642 | 0 | statis->inverted_index_local_io_timer += read_stats.local_read_timer; |
643 | 0 | } |
644 | | |
645 | 9.16k | g_skip_cache_sum << read_stats.skip_cache; |
646 | 9.16k | } |
647 | | |
648 | 0 | void CachedRemoteFileReader::prefetch_range(size_t offset, size_t size, const IOContext* io_ctx) { |
649 | 0 | if (offset >= this->size() || size == 0) { |
650 | 0 | return; |
651 | 0 | } |
652 | | |
653 | 0 | size = std::min(size, this->size() - offset); |
654 | |
|
655 | 0 | ThreadPool* pool = ExecEnv::GetInstance()->segment_prefetch_thread_pool(); |
656 | 0 | if (pool == nullptr) { |
657 | 0 | return; |
658 | 0 | } |
659 | | |
660 | 0 | IOContext dryrun_ctx; |
661 | 0 | if (io_ctx != nullptr) { |
662 | 0 | dryrun_ctx = *io_ctx; |
663 | 0 | } |
664 | 0 | dryrun_ctx.is_dryrun = true; |
665 | 0 | dryrun_ctx.query_id = nullptr; |
666 | 0 | dryrun_ctx.file_cache_stats = nullptr; |
667 | 0 | dryrun_ctx.file_reader_stats = nullptr; |
668 | |
|
669 | 0 | LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) |
670 | 0 | << fmt::format("[verbose] Submitting prefetch task for offset={} size={}, file={}", |
671 | 0 | offset, size, path().filename().native()); |
672 | 0 | std::weak_ptr<CachedRemoteFileReader> weak_this = shared_from_this(); |
673 | 0 | auto st = pool->submit_func([weak_this, offset, size, dryrun_ctx]() { |
674 | 0 | auto self = weak_this.lock(); |
675 | 0 | if (self == nullptr) { |
676 | 0 | return; |
677 | 0 | } |
678 | 0 | size_t bytes_read; |
679 | 0 | Slice dummy_buffer((char*)nullptr, size); |
680 | 0 | (void)self->read_at_impl(offset, dummy_buffer, &bytes_read, &dryrun_ctx); |
681 | 0 | LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) |
682 | 0 | << fmt::format("[verbose] Prefetch task completed for offset={} size={}, file={}", |
683 | 0 | offset, size, self->path().filename().native()); |
684 | 0 | }); |
685 | |
|
686 | 0 | if (!st.ok()) { |
687 | 0 | VLOG_DEBUG << "Failed to submit prefetch task for offset=" << offset << " size=" << size |
688 | 0 | << " error=" << st.to_string(); |
689 | 0 | } |
690 | 0 | } |
691 | | |
692 | | } // namespace doris::io |