be/src/io/cache/cached_remote_file_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/cache/cached_remote_file_reader.h" |
19 | | |
20 | | #include <brpc/controller.h> |
21 | | #include <fmt/format.h> |
22 | | #include <gen_cpp/Types_types.h> |
23 | | #include <gen_cpp/internal_service.pb.h> |
24 | | #include <glog/logging.h> |
25 | | |
26 | | #include <algorithm> |
27 | | #include <atomic> |
28 | | #include <condition_variable> |
29 | | #include <cstring> |
30 | | #include <functional> |
31 | | #include <list> |
32 | | #include <memory> |
33 | | #include <mutex> |
34 | | #include <thread> |
35 | | #include <vector> |
36 | | |
37 | | #include "cloud/cloud_warm_up_manager.h" |
38 | | #include "cloud/config.h" |
39 | | #include "common/compiler_util.h" // IWYU pragma: keep |
40 | | #include "common/config.h" |
41 | | #include "common/metrics/doris_metrics.h" |
42 | | #include "cpp/s3_rate_limiter.h" |
43 | | #include "cpp/sync_point.h" |
44 | | #include "io/cache/block_file_cache.h" |
45 | | #include "io/cache/block_file_cache_factory.h" |
46 | | #include "io/cache/block_file_cache_profile.h" |
47 | | #include "io/cache/file_block.h" |
48 | | #include "io/cache/file_cache_common.h" |
49 | | #include "io/cache/peer_file_cache_reader.h" |
50 | | #include "io/fs/file_reader.h" |
51 | | #include "io/fs/local_file_system.h" |
52 | | #include "io/io_common.h" |
53 | | #include "runtime/exec_env.h" |
54 | | #include "runtime/runtime_profile.h" |
55 | | #include "runtime/thread_context.h" |
56 | | #include "runtime/workload_management/io_throttle.h" |
57 | | #include "service/backend_options.h" |
58 | | #include "storage/storage_policy.h" |
59 | | #include "util/bit_util.h" |
60 | | #include "util/brpc_client_cache.h" // BrpcClientCache |
61 | | #include "util/client_cache.h" |
62 | | #include "util/concurrency_stats.h" |
63 | | #include "util/debug_points.h" |
64 | | |
65 | | namespace doris::io { |
66 | | |
// Process-wide bvar metrics exported by the cached remote reader.
// Source-of-read counters: one increment per backing read issued.
bvar::Adder<uint64_t> s3_read_counter("cached_remote_reader_s3_read");
bvar::Adder<uint64_t> peer_read_counter("cached_remote_reader_peer_read");
// Cache-skip accounting (blocks in SKIP_CACHE state / dry-run skipped IO bytes).
bvar::LatencyRecorder g_skip_cache_num("cached_remote_reader_skip_cache_num");
bvar::Adder<uint64_t> g_skip_cache_sum("cached_remote_reader_skip_cache_sum");
bvar::Adder<uint64_t> g_skip_local_cache_io_sum_bytes(
        "cached_remote_reader_skip_local_cache_io_sum_bytes");
// "Direct" = served entirely from _cache_file_readers without get_or_set;
// "indirect" = went through the block-holder path (cache and/or remote).
bvar::Adder<uint64_t> g_read_cache_direct_whole_num("cached_remote_reader_cache_direct_whole_num");
bvar::Adder<uint64_t> g_read_cache_direct_partial_num(
        "cached_remote_reader_cache_direct_partial_num");
bvar::Adder<uint64_t> g_read_cache_indirect_num("cached_remote_reader_cache_indirect_num");
bvar::Adder<uint64_t> g_read_cache_direct_whole_bytes(
        "cached_remote_reader_cache_direct_whole_bytes");
bvar::Adder<uint64_t> g_read_cache_direct_partial_bytes(
        "cached_remote_reader_cache_direct_partial_bytes");
bvar::Adder<uint64_t> g_read_cache_indirect_bytes("cached_remote_reader_cache_indirect_bytes");
bvar::Adder<uint64_t> g_read_cache_indirect_total_bytes(
        "cached_remote_reader_cache_indirect_total_bytes");
// Counts self-heal triggers when a DOWNLOADED block's backing file is missing.
bvar::Adder<uint64_t> g_read_cache_self_heal_on_not_found(
        "cached_remote_reader_self_heal_on_not_found");
// 1-minute sliding windows over the indirect byte counters above.
bvar::Window<bvar::Adder<uint64_t>> g_read_cache_indirect_bytes_1min_window(
        "cached_remote_reader_indirect_bytes_1min_window", &g_read_cache_indirect_bytes, 60);
bvar::Window<bvar::Adder<uint64_t>> g_read_cache_indirect_total_bytes_1min_window(
        "cached_remote_reader_indirect_total_bytes_1min_window", &g_read_cache_indirect_total_bytes,
        60);
// Incremented when no peer host/port could be resolved for a peer-cache read.
bvar::Adder<uint64_t> g_failed_get_peer_addr_counter(
        "cached_remote_reader_failed_get_peer_addr_counter");
93 | | |
94 | | CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader, |
95 | | const FileReaderOptions& opts) |
96 | 2.01k | : _remote_file_reader(std::move(remote_file_reader)) { |
97 | 2.01k | _is_doris_table = opts.is_doris_table; |
98 | 2.01k | if (_is_doris_table) { |
99 | 2.01k | _cache_hash = BlockFileCache::hash(path().filename().native()); |
100 | 2.01k | _cache = FileCacheFactory::instance()->get_by_path(_cache_hash); |
101 | 2.01k | if (config::enable_read_cache_file_directly) { |
102 | | // this is designed for and test in doris table, external table need extra tests |
103 | 2.00k | _cache_file_readers = _cache->get_blocks_by_key(_cache_hash); |
104 | 2.00k | } |
105 | 2.01k | } else { |
106 | | // Use path and modification time to build cache key |
107 | 3 | std::string unique_path = fmt::format("{}:{}", path().native(), opts.mtime); |
108 | 3 | _cache_hash = BlockFileCache::hash(unique_path); |
109 | 3 | if (opts.cache_base_path.empty()) { |
110 | | // if cache path is not specified by session variable, chose randomly. |
111 | 1 | _cache = FileCacheFactory::instance()->get_by_path(_cache_hash); |
112 | 2 | } else { |
113 | | // from query session variable: file_cache_base_path |
114 | 2 | _cache = FileCacheFactory::instance()->get_by_path(opts.cache_base_path); |
115 | 2 | if (_cache == nullptr) { |
116 | 1 | LOG(WARNING) << "Can't get cache from base path: " << opts.cache_base_path |
117 | 1 | << ", using random instead."; |
118 | 1 | _cache = FileCacheFactory::instance()->get_by_path(_cache_hash); |
119 | 1 | } |
120 | 2 | } |
121 | 3 | } |
122 | 2.01k | } |
123 | | |
124 | 66 | void CachedRemoteFileReader::_insert_file_reader(FileBlockSPtr file_block) { |
125 | 66 | if (_is_doris_table && config::enable_read_cache_file_directly) { |
126 | 44 | std::lock_guard lock(_mtx); |
127 | 44 | DCHECK(file_block->state() == FileBlock::State::DOWNLOADED); |
128 | 44 | file_block->_owned_by_cached_reader = true; |
129 | 44 | _cache_file_readers.emplace(file_block->offset(), std::move(file_block)); |
130 | 44 | } |
131 | 66 | } |
132 | | |
133 | 2.01k | CachedRemoteFileReader::~CachedRemoteFileReader() { |
134 | 2.01k | for (auto& it : _cache_file_readers) { |
135 | 963 | it.second->_owned_by_cached_reader = false; |
136 | 963 | } |
137 | 2.01k | static_cast<void>(close()); |
138 | 2.01k | } |
139 | | |
140 | 3.03k | Status CachedRemoteFileReader::close() { |
141 | 3.03k | return _remote_file_reader->close(); |
142 | 3.03k | } |
143 | | |
144 | | std::pair<size_t, size_t> CachedRemoteFileReader::s_align_size(size_t offset, size_t read_size, |
145 | 2.34k | size_t length) { |
146 | 2.34k | size_t left = offset; |
147 | 2.34k | size_t right = offset + read_size - 1; |
148 | 2.34k | size_t align_left = |
149 | 2.34k | (left / config::file_cache_each_block_size) * config::file_cache_each_block_size; |
150 | 2.34k | size_t align_right = |
151 | 2.34k | (right / config::file_cache_each_block_size + 1) * config::file_cache_each_block_size; |
152 | 2.34k | align_right = align_right < length ? align_right : length; |
153 | 2.34k | size_t align_size = align_right - align_left; |
154 | 2.34k | if (align_size < config::file_cache_each_block_size && align_left != 0) { |
155 | 4 | align_size += config::file_cache_each_block_size; |
156 | 4 | align_left -= config::file_cache_each_block_size; |
157 | 4 | } |
158 | 2.34k | return std::make_pair(align_left, align_size); |
159 | 2.34k | } |
160 | | |
161 | | namespace { |
162 | 0 | std::optional<int64_t> extract_tablet_id(const std::string& file_path) { |
163 | 0 | return StorageResource::parse_tablet_id_from_path(file_path); |
164 | 0 | } |
165 | | |
166 | | // Get peer connection info from tablet_id |
167 | 0 | std::pair<std::string, int> get_peer_connection_info(const std::string& file_path) { |
168 | 0 | std::string host = ""; |
169 | 0 | int port = 0; |
170 | | |
171 | | // Try to get tablet_id from actual path and lookup tablet info |
172 | 0 | if (auto tablet_id = extract_tablet_id(file_path)) { |
173 | 0 | auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager(); |
174 | 0 | if (auto tablet_info = manager.get_balanced_tablet_info(*tablet_id)) { |
175 | 0 | host = tablet_info->first; |
176 | 0 | port = tablet_info->second; |
177 | 0 | } else { |
178 | 0 | VLOG_DEBUG << "get peer connection info not found" |
179 | 0 | << ", tablet_id=" << *tablet_id << ", file_path=" << file_path; |
180 | 0 | } |
181 | 0 | } else { |
182 | 0 | VLOG_DEBUG << "parse tablet id from path failed" |
183 | 0 | << "tablet_id=null, file_path=" << file_path; |
184 | 0 | } |
185 | |
|
186 | 0 | DBUG_EXECUTE_IF("PeerFileCacheReader::_fetch_from_peer_cache_blocks", { |
187 | 0 | host = dp->param<std::string>("host", "127.0.0.1"); |
188 | 0 | port = dp->param("port", 9060); |
189 | 0 | LOG_WARNING("debug point PeerFileCacheReader::_fetch_from_peer_cache_blocks") |
190 | 0 | .tag("host", host) |
191 | 0 | .tag("port", port); |
192 | 0 | }); |
193 | |
|
194 | 0 | return {host, port}; |
195 | 0 | } |
196 | | |
197 | | // Execute peer read with fallback to S3 |
198 | | // file_size is the size of the file |
199 | | // used to calculate the rightmost boundary value of the block, due to inaccurate current block meta information. |
200 | | Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks, size_t empty_start, |
201 | | size_t& size, std::unique_ptr<char[]>& buffer, |
202 | | const std::string& file_path, size_t file_size, bool is_doris_table, |
203 | 0 | ReadStatistics& stats, const IOContext* io_ctx) { |
204 | 0 | auto [host, port] = get_peer_connection_info(file_path); |
205 | 0 | VLOG_DEBUG << "PeerFileCacheReader read from peer, host=" << host << ", port=" << port |
206 | 0 | << ", file_path=" << file_path; |
207 | |
|
208 | 0 | if (host.empty() || port == 0) { |
209 | 0 | g_failed_get_peer_addr_counter << 1; |
210 | 0 | VLOG_DEBUG << "PeerFileCacheReader host or port is empty" |
211 | 0 | << ", host=" << host << ", port=" << port << ", file_path=" << file_path; |
212 | 0 | return Status::InternalError<false>("host or port is empty"); |
213 | 0 | } |
214 | 0 | SCOPED_RAW_TIMER(&stats.peer_read_timer); |
215 | 0 | peer_read_counter << 1; |
216 | 0 | PeerFileCacheReader peer_reader(file_path, is_doris_table, host, port); |
217 | 0 | auto st = peer_reader.fetch_blocks(empty_blocks, empty_start, Slice(buffer.get(), size), &size, |
218 | 0 | file_size, io_ctx); |
219 | 0 | if (!st.ok()) { |
220 | 0 | VLOG_DEBUG << "PeerFileCacheReader read from peer failed" |
221 | 0 | << ", host=" << host << ", port=" << port << ", error=" << st.msg(); |
222 | 0 | } |
223 | 0 | stats.from_peer_cache = true; |
224 | 0 | return st; |
225 | 0 | } |
226 | | |
227 | | // Execute S3 read |
228 | | Status execute_s3_read(size_t empty_start, size_t& size, std::unique_ptr<char[]>& buffer, |
229 | | ReadStatistics& stats, const IOContext* io_ctx, |
230 | 1.32k | FileReaderSPtr remote_file_reader) { |
231 | 1.32k | s3_read_counter << 1; |
232 | 1.32k | SCOPED_RAW_TIMER(&stats.remote_read_timer); |
233 | 1.32k | stats.from_peer_cache = false; |
234 | 1.32k | return remote_file_reader->read_at(empty_start, Slice(buffer.get(), size), &size, io_ctx); |
235 | 1.32k | } |
236 | | |
237 | | } // anonymous namespace |
238 | | |
239 | | Status CachedRemoteFileReader::_execute_remote_read(const std::vector<FileBlockSPtr>& empty_blocks, |
240 | | size_t empty_start, size_t& size, |
241 | | std::unique_ptr<char[]>& buffer, |
242 | | ReadStatistics& stats, |
243 | 1.32k | const IOContext* io_ctx) { |
244 | 1.32k | DBUG_EXECUTE_IF("CachedRemoteFileReader.read_at_impl.change_type", { |
245 | | // Determine read type from debug point or default to S3 |
246 | 1.32k | std::string read_type = "s3"; |
247 | 1.32k | read_type = dp->param<std::string>("type", "s3"); |
248 | 1.32k | LOG_WARNING("CachedRemoteFileReader.read_at_impl.change_type") |
249 | 1.32k | .tag("path", path().native()) |
250 | 1.32k | .tag("off", empty_start) |
251 | 1.32k | .tag("size", size) |
252 | 1.32k | .tag("type", read_type); |
253 | | // Execute appropriate read strategy |
254 | 1.32k | if (read_type == "s3") { |
255 | 1.32k | return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader); |
256 | 1.32k | } else { |
257 | 1.32k | return execute_peer_read(empty_blocks, empty_start, size, buffer, path().native(), |
258 | 1.32k | this->size(), _is_doris_table, stats, io_ctx); |
259 | 1.32k | } |
260 | 1.32k | }); |
261 | | |
262 | 1.32k | if (!doris::config::is_cloud_mode() || !_is_doris_table || io_ctx->is_warmup || |
263 | 1.32k | !doris::config::enable_cache_read_from_peer) { |
264 | 1.32k | return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader); |
265 | 1.32k | } else { |
266 | | // first try peer read, if peer failed, fallback to S3 |
267 | | // peer timeout is 5 seconds |
268 | | // TODO(dx): here peer and s3 reader need to get data in parallel, and take the one that is correct and returns first |
269 | | // ATTN: Save original size before peer read, as it may be modified by fetch_blocks, read peer ref size |
270 | 0 | size_t original_size = size; |
271 | 0 | auto st = execute_peer_read(empty_blocks, empty_start, size, buffer, path().native(), |
272 | 0 | this->size(), _is_doris_table, stats, io_ctx); |
273 | 0 | if (!st.ok()) { |
274 | | // Restore original size for S3 fallback, as peer read may have modified it |
275 | 0 | size = original_size; |
276 | | // Fallback to S3 |
277 | 0 | return execute_s3_read(empty_start, size, buffer, stats, io_ctx, _remote_file_reader); |
278 | 0 | } |
279 | 0 | return st; |
280 | 0 | } |
281 | 1.32k | } |
282 | | |
// Read [offset, offset + result.size) from the file, serving bytes from the
// local file cache when possible and fetching missing ranges from a peer cache
// or the remote store. On success *bytes_read equals the (clamped) request.
// In dry-run mode (io_ctx->is_dryrun) the cache is populated but no bytes are
// copied into `result` and no stats are published.
Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                                            const IOContext* io_ctx) {
    size_t already_read = 0;
    SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().cached_remote_reader_read_at);

    const bool is_dryrun = io_ctx->is_dryrun;
    DCHECK(!closed());
    DCHECK(io_ctx);
    if (offset > size()) {
        return Status::InvalidArgument(
                fmt::format("offset exceeds file size(offset: {}, file size: {}, path: {})", offset,
                            size(), path().native()));
    }
    // Clamp the request to the file tail; zero-length reads succeed trivially.
    size_t bytes_req = result.size;
    bytes_req = std::min(bytes_req, size() - offset);
    if (UNLIKELY(bytes_req == 0)) {
        *bytes_read = 0;
        return Status::OK();
    }

    ReadStatistics stats;
    stats.bytes_read += bytes_req;
    MonotonicStopWatch read_at_sw;
    read_at_sw.start();
    // Deferred stats publication: runs on every exit path of this function.
    auto defer_func = [&](int*) {
        if (config::print_stack_when_cache_miss) {
            if (io_ctx->file_cache_stats == nullptr && !stats.hit_cache && !io_ctx->is_warmup) {
                LOG_INFO("[verbose] {}", Status::InternalError<true>("not hit cache"));
            }
        }
        if (!stats.hit_cache && config::read_cluster_cache_opt_verbose_log) {
            LOG_INFO(
                    "[verbose] not hit cache, path: {}, offset: {}, size: {}, cost: {} ms, warmup: "
                    "{}",
                    path().native(), offset, bytes_req, read_at_sw.elapsed_time_milliseconds(),
                    io_ctx->is_warmup);
        }
        // Dry-run (prefetch) reads do not contribute to metrics or profiles.
        if (is_dryrun) {
            return;
        }
        // update stats increment in this reading procedure for file cache metrics
        FileCacheStatistics fcache_stats_increment;
        _update_stats(stats, &fcache_stats_increment, io_ctx->is_inverted_index);
        io::FileCacheMetrics::instance().update(&fcache_stats_increment);
        if (io_ctx->file_cache_stats) {
            // update stats in io_ctx, for query profile
            _update_stats(stats, io_ctx->file_cache_stats, io_ctx->is_inverted_index);
        }
    };
    // RAII trigger for defer_func; the pointer value (0x01) is a dummy.
    std::unique_ptr<int, decltype(defer_func)> defer((int*)0x01, std::move(defer_func));

    // Fast path: serve directly from blocks this reader already tracks,
    // without going through the cache's get_or_set.
    if (_is_doris_table && config::enable_read_cache_file_directly) {
        // read directly
        SCOPED_RAW_TIMER(&stats.read_cache_file_directly_timer);
        size_t need_read_size = bytes_req;
        std::shared_lock lock(_mtx);
        if (!_cache_file_readers.empty()) {
            // find the last offset > offset.
            auto iter = _cache_file_readers.upper_bound(offset);
            if (iter != _cache_file_readers.begin()) {
                iter--;
            }
            size_t cur_offset = offset;
            // Walk contiguous tracked blocks, copying until a gap is hit or the
            // request is satisfied.
            while (need_read_size != 0 && iter != _cache_file_readers.end()) {
                if (iter->second->offset() > cur_offset ||
                    iter->second->range().right < cur_offset) {
                    break;
                }
                size_t file_offset = cur_offset - iter->second->offset();
                size_t reserve_bytes =
                        std::min(need_read_size, iter->second->range().size() - file_offset);
                if (is_dryrun) [[unlikely]] {
                    // Dry run: account the IO that would have happened, no copy.
                    g_skip_local_cache_io_sum_bytes << reserve_bytes;
                } else {
                    SCOPED_RAW_TIMER(&stats.local_read_timer);
                    if (!iter->second
                                 ->read(Slice(result.data + (cur_offset - offset), reserve_bytes),
                                        file_offset)
                                 .ok()) { //TODO: maybe read failed because block evict, should handle error
                        break;
                    }
                }
                _cache->add_need_update_lru_block(iter->second);
                need_read_size -= reserve_bytes;
                cur_offset += reserve_bytes;
                already_read += reserve_bytes;
                iter++;
            }
            if (need_read_size == 0) {
                // Entire request served from tracked blocks.
                *bytes_read = bytes_req;
                stats.hit_cache = true;
                g_read_cache_direct_whole_num << 1;
                g_read_cache_direct_whole_bytes << bytes_req;
                return Status::OK();
            } else {
                g_read_cache_direct_partial_num << 1;
                g_read_cache_direct_partial_bytes << already_read;
            }
        }
    }
    // read from cache or remote
    g_read_cache_indirect_num << 1;
    size_t indirect_read_bytes = 0;
    // Align the remaining (unserved) range to cache block boundaries.
    auto [align_left, align_size] =
            s_align_size(offset + already_read, bytes_req - already_read, size());
    CacheContext cache_context(io_ctx);
    cache_context.stats = &stats;
    auto tablet_id = get_tablet_id(path().string());
    cache_context.tablet_id = tablet_id.value_or(0);
    MonotonicStopWatch sw;
    sw.start();

    ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set->increment();
    FileBlocksHolder holder =
            _cache->get_or_set(_cache_hash, align_left, align_size, cache_context);
    ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set->decrement();

    stats.cache_get_or_set_timer += sw.elapsed_time();
    // Classify each block: EMPTY blocks we become downloader for (and
    // SKIP_CACHE blocks) must be fetched from the backing store.
    std::vector<FileBlockSPtr> empty_blocks;
    for (auto& block : holder.file_blocks) {
        switch (block->state()) {
        case FileBlock::State::EMPTY:
            VLOG_DEBUG << fmt::format("Block EMPTY path={} hash={}:{}:{} offset={} cache_path={}",
                                      path().native(), _cache_hash.to_string(), _cache_hash.high(),
                                      _cache_hash.low(), block->offset(), block->get_cache_file());
            block->get_or_set_downloader();
            if (block->is_downloader()) {
                empty_blocks.push_back(block);
                TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::EMPTY");
            }
            stats.hit_cache = false;
            break;
        case FileBlock::State::SKIP_CACHE:
            VLOG_DEBUG << fmt::format(
                    "Block SKIP_CACHE path={} hash={}:{}:{} offset={} cache_path={}",
                    path().native(), _cache_hash.to_string(), _cache_hash.high(), _cache_hash.low(),
                    block->offset(), block->get_cache_file());
            empty_blocks.push_back(block);
            stats.hit_cache = false;
            stats.skip_cache = true;
            break;
        case FileBlock::State::DOWNLOADING:
            // Another thread is downloading; we will wait for it below.
            stats.hit_cache = false;
            break;
        case FileBlock::State::DOWNLOADED:
            _insert_file_reader(block);
            break;
        }
    }
    size_t empty_start = 0;
    size_t empty_end = 0;
    if (!empty_blocks.empty()) {
        // Fetch all missing blocks as one contiguous remote read.
        empty_start = empty_blocks.front()->range().left;
        empty_end = empty_blocks.back()->range().right;
        size_t size = empty_end - empty_start + 1;
        std::unique_ptr<char[]> buffer(new char[size]);

        // Apply rate limiting for warmup download tasks (node level)
        // Rate limiting is applied before remote read to limit both S3 read and local cache write
        if (io_ctx->is_warmup) {
            auto* rate_limiter = ExecEnv::GetInstance()->warmup_download_rate_limiter();
            if (rate_limiter != nullptr) {
                rate_limiter->add(size);
            }
        }

        // Determine read type and execute remote read
        RETURN_IF_ERROR(
                _execute_remote_read(empty_blocks, empty_start, size, buffer, stats, io_ctx));

        // Write fetched data back into the cache (skipping SKIP_CACHE blocks).
        {
            SCOPED_CONCURRENCY_COUNT(
                    ConcurrencyStatsManager::instance().cached_remote_reader_write_back);
            for (auto& block : empty_blocks) {
                if (block->state() == FileBlock::State::SKIP_CACHE) {
                    continue;
                }
                SCOPED_RAW_TIMER(&stats.local_write_timer);
                char* cur_ptr = buffer.get() + block->range().left - empty_start;
                size_t block_size = block->range().size();
                Status st = block->append(Slice(cur_ptr, block_size));
                if (st.ok()) {
                    st = block->finalize();
                }
                if (!st.ok()) {
                    LOG_EVERY_N(WARNING, 100)
                            << "Write data to file cache failed. err=" << st.msg();
                } else {
                    _insert_file_reader(block);
                }
                stats.bytes_write_into_file_cache += block_size;
            }
        }
        // copy from memory directly
        size_t right_offset = offset + bytes_req - 1;
        if (empty_start <= right_offset && empty_end >= offset + already_read && !is_dryrun) {
            // Copy the overlap between the fetched range and the caller's
            // request straight from the staging buffer.
            size_t copy_left_offset = std::max(offset + already_read, empty_start);
            size_t copy_right_offset = std::min(right_offset, empty_end);
            char* dst = result.data + (copy_left_offset - offset);
            char* src = buffer.get() + (copy_left_offset - empty_start);
            size_t copy_size = copy_right_offset - copy_left_offset + 1;
            memcpy(dst, src, copy_size);
            indirect_read_bytes += copy_size;
        }
    }

    // Second pass: account blocks already served from the staging buffer, read
    // DOWNLOADED blocks from cache, and fall back to remote on failure.
    size_t current_offset = offset;
    size_t end_offset = offset + bytes_req - 1;
    bool need_self_heal = false;
    *bytes_read = 0;
    for (auto& block : holder.file_blocks) {
        if (current_offset > end_offset) {
            break;
        }
        size_t left = block->range().left;
        size_t right = block->range().right;
        if (right < offset) {
            // Alignment padding before the requested range.
            continue;
        }
        size_t read_size =
                end_offset > right ? right - current_offset + 1 : end_offset - current_offset + 1;
        if (empty_start <= left && right <= empty_end) {
            // Already satisfied by the staging-buffer memcpy above.
            *bytes_read += read_size;
            current_offset = right + 1;
            continue;
        }
        FileBlock::State block_state = block->state();
        int64_t wait_time = 0;
        static int64_t max_wait_time = 10;
        TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::max_wait_time", &max_wait_time);
        if (block_state != FileBlock::State::DOWNLOADED) {
            // Wait (bounded) for another thread's download to finish.
            SCOPED_CONCURRENCY_COUNT(
                    ConcurrencyStatsManager::instance().cached_remote_reader_blocking);
            do {
                SCOPED_RAW_TIMER(&stats.remote_wait_timer);
                SCOPED_RAW_TIMER(&stats.remote_read_timer);
                TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::DOWNLOADING");
                block_state = block->wait();
                if (block_state != FileBlock::State::DOWNLOADING) {
                    break;
                }
            } while (++wait_time < max_wait_time);
        }
        if (wait_time == max_wait_time) [[unlikely]] {
            LOG_WARNING("Waiting too long for the download to complete");
        }
        {
            Status st;
            /*
             * If block_state == EMPTY, the thread reads the data from remote.
             * If block_state == DOWNLOADED, when the cache file is deleted by the other process,
             * the thread reads the data from remote too.
             */
            if (block_state == FileBlock::State::DOWNLOADED) {
                if (is_dryrun) [[unlikely]] {
                    g_skip_local_cache_io_sum_bytes << read_size;
                } else {
                    size_t file_offset = current_offset - left;
                    SCOPED_RAW_TIMER(&stats.local_read_timer);
                    SCOPED_CONCURRENCY_COUNT(
                            ConcurrencyStatsManager::instance().cached_remote_reader_local_read);
                    st = block->read(Slice(result.data + (current_offset - offset), read_size),
                                     file_offset);
                    indirect_read_bytes += read_size;
                }
            }
            if (!st || block_state != FileBlock::State::DOWNLOADED) {
                if (is_dryrun) [[unlikely]] {
                    // dryrun mode uses a null buffer, skip actual remote IO
                } else {
                    // A DOWNLOADED block whose file is NOT_FOUND means the cache
                    // file vanished; schedule self-heal after the loop.
                    if (block_state == FileBlock::State::DOWNLOADED &&
                        st.is<ErrorCode::NOT_FOUND>()) {
                        need_self_heal = true;
                        g_read_cache_self_heal_on_not_found << 1;
                        LOG_EVERY_N(WARNING, 100)
                                << "Cache block file is missing, will self-heal by clearing cache "
                                   "hash. "
                                << "path=" << path().native()
                                << ", hash=" << _cache_hash.to_string() << ", offset=" << left
                                << ", err=" << st.msg();
                    }
                    LOG(WARNING) << "Read data failed from file cache downloaded by others. err="
                                 << st.msg() << ", block state=" << block_state;
                    size_t nest_bytes_read {0};
                    stats.hit_cache = false;
                    stats.from_peer_cache = false;
                    s3_read_counter << 1;
                    SCOPED_RAW_TIMER(&stats.remote_read_timer);
                    // Last resort: read this block's range straight from remote.
                    RETURN_IF_ERROR(_remote_file_reader->read_at(
                            current_offset,
                            Slice(result.data + (current_offset - offset), read_size),
                            &nest_bytes_read));
                    indirect_read_bytes += read_size;
                    DCHECK(nest_bytes_read == read_size);
                }
            }
        }
        *bytes_read += read_size;
        current_offset = right + 1;
    }
    if (need_self_heal && _cache != nullptr) {
        // Drop all cached blocks for this file so they get re-downloaded.
        _cache->remove_if_cached_async(_cache_hash);
    }
    g_read_cache_indirect_bytes << indirect_read_bytes;
    g_read_cache_indirect_total_bytes << *bytes_read;

    DCHECK(*bytes_read == bytes_req);
    return Status::OK();
}
591 | | |
592 | | void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats, |
593 | | FileCacheStatistics* statis, |
594 | 9.82k | bool is_inverted_index) const { |
595 | 9.82k | if (statis == nullptr) { |
596 | 0 | return; |
597 | 0 | } |
598 | 9.82k | if (read_stats.hit_cache) { |
599 | 8.47k | statis->num_local_io_total++; |
600 | 8.47k | statis->bytes_read_from_local += read_stats.bytes_read; |
601 | 8.47k | } else { |
602 | 1.35k | if (read_stats.from_peer_cache) { |
603 | 0 | statis->num_peer_io_total++; |
604 | 0 | statis->bytes_read_from_peer += read_stats.bytes_read; |
605 | 0 | statis->peer_io_timer += read_stats.peer_read_timer; |
606 | 1.35k | } else { |
607 | 1.35k | statis->num_remote_io_total++; |
608 | 1.35k | statis->bytes_read_from_remote += read_stats.bytes_read; |
609 | 1.35k | statis->remote_io_timer += read_stats.remote_read_timer; |
610 | 1.35k | } |
611 | 1.35k | } |
612 | 9.82k | statis->remote_wait_timer += read_stats.remote_wait_timer; |
613 | 9.82k | statis->local_io_timer += read_stats.local_read_timer; |
614 | 9.82k | statis->num_skip_cache_io_total += read_stats.skip_cache; |
615 | 9.82k | statis->bytes_write_into_cache += read_stats.bytes_write_into_file_cache; |
616 | 9.82k | statis->write_cache_io_timer += read_stats.local_write_timer; |
617 | | |
618 | 9.82k | statis->read_cache_file_directly_timer += read_stats.read_cache_file_directly_timer; |
619 | 9.82k | statis->cache_get_or_set_timer += read_stats.cache_get_or_set_timer; |
620 | 9.82k | statis->lock_wait_timer += read_stats.lock_wait_timer; |
621 | 9.82k | statis->get_timer += read_stats.get_timer; |
622 | 9.82k | statis->set_timer += read_stats.set_timer; |
623 | | |
624 | 9.82k | if (is_inverted_index) { |
625 | 0 | if (read_stats.hit_cache) { |
626 | 0 | statis->inverted_index_num_local_io_total++; |
627 | 0 | statis->inverted_index_bytes_read_from_local += read_stats.bytes_read; |
628 | 0 | } else { |
629 | 0 | if (read_stats.from_peer_cache) { |
630 | 0 | statis->inverted_index_num_peer_io_total++; |
631 | 0 | statis->inverted_index_bytes_read_from_peer += read_stats.bytes_read; |
632 | 0 | statis->inverted_index_peer_io_timer += read_stats.peer_read_timer; |
633 | 0 | } else { |
634 | 0 | statis->inverted_index_num_remote_io_total++; |
635 | 0 | statis->inverted_index_bytes_read_from_remote += read_stats.bytes_read; |
636 | 0 | statis->inverted_index_remote_io_timer += read_stats.remote_read_timer; |
637 | 0 | } |
638 | 0 | } |
639 | 0 | statis->inverted_index_local_io_timer += read_stats.local_read_timer; |
640 | 0 | } |
641 | | |
642 | 9.82k | g_skip_cache_sum << read_stats.skip_cache; |
643 | 9.82k | } |
644 | | |
645 | 0 | void CachedRemoteFileReader::prefetch_range(size_t offset, size_t size, const IOContext* io_ctx) { |
646 | 0 | if (offset >= this->size() || size == 0) { |
647 | 0 | return; |
648 | 0 | } |
649 | | |
650 | 0 | size = std::min(size, this->size() - offset); |
651 | |
|
652 | 0 | ThreadPool* pool = ExecEnv::GetInstance()->segment_prefetch_thread_pool(); |
653 | 0 | if (pool == nullptr) { |
654 | 0 | return; |
655 | 0 | } |
656 | | |
657 | 0 | IOContext dryrun_ctx; |
658 | 0 | if (io_ctx != nullptr) { |
659 | 0 | dryrun_ctx = *io_ctx; |
660 | 0 | } |
661 | 0 | dryrun_ctx.is_dryrun = true; |
662 | 0 | dryrun_ctx.query_id = nullptr; |
663 | 0 | dryrun_ctx.file_cache_stats = nullptr; |
664 | 0 | dryrun_ctx.file_reader_stats = nullptr; |
665 | |
|
666 | 0 | LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) |
667 | 0 | << fmt::format("[verbose] Submitting prefetch task for offset={} size={}, file={}", |
668 | 0 | offset, size, path().filename().native()); |
669 | 0 | std::weak_ptr<CachedRemoteFileReader> weak_this = shared_from_this(); |
670 | 0 | auto st = pool->submit_func([weak_this, offset, size, dryrun_ctx]() { |
671 | 0 | auto self = weak_this.lock(); |
672 | 0 | if (self == nullptr) { |
673 | 0 | return; |
674 | 0 | } |
675 | 0 | size_t bytes_read; |
676 | 0 | Slice dummy_buffer((char*)nullptr, size); |
677 | 0 | (void)self->read_at_impl(offset, dummy_buffer, &bytes_read, &dryrun_ctx); |
678 | 0 | LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) |
679 | 0 | << fmt::format("[verbose] Prefetch task completed for offset={} size={}, file={}", |
680 | 0 | offset, size, self->path().filename().native()); |
681 | 0 | }); |
682 | |
|
683 | 0 | if (!st.ok()) { |
684 | 0 | VLOG_DEBUG << "Failed to submit prefetch task for offset=" << offset << " size=" << size |
685 | 0 | << " error=" << st.to_string(); |
686 | 0 | } |
687 | 0 | } |
688 | | |
689 | | } // namespace doris::io |