be/src/io/cache/cached_remote_file_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "io/cache/cached_remote_file_reader.h" |
19 | | |
20 | | #include <brpc/controller.h> |
21 | | #include <bthread/bthread.h> |
22 | | #include <bthread/condition_variable.h> |
23 | | #include <bthread/mutex.h> |
24 | | #include <fmt/format.h> |
25 | | #include <gen_cpp/Types_types.h> |
26 | | #include <gen_cpp/internal_service.pb.h> |
27 | | #include <glog/logging.h> |
28 | | |
29 | | #include <algorithm> |
30 | | #include <atomic> |
31 | | #include <condition_variable> |
32 | | #include <cstring> |
33 | | #include <functional> |
34 | | #include <list> |
35 | | #include <memory> |
36 | | #include <mutex> |
37 | | #include <thread> |
38 | | #include <vector> |
39 | | |
40 | | #include "cloud/cloud_cluster_info.h" |
41 | | #include "cloud/cloud_warm_up_manager.h" |
42 | | #include "cloud/config.h" |
43 | | #include "common/compiler_util.h" // IWYU pragma: keep |
44 | | #include "common/config.h" |
45 | | #include "common/metrics/doris_metrics.h" |
46 | | #include "cpp/sync_point.h" |
47 | | #include "io/cache/block_file_cache.h" |
48 | | #include "io/cache/block_file_cache_factory.h" |
49 | | #include "io/cache/block_file_cache_profile.h" |
50 | | #include "io/cache/file_block.h" |
51 | | #include "io/cache/peer_file_cache_reader.h" |
52 | | #include "io/fs/file_reader.h" |
53 | | #include "io/fs/local_file_system.h" |
54 | | #include "io/io_common.h" |
55 | | #include "runtime/exec_env.h" |
56 | | #include "runtime/runtime_profile.h" |
57 | | #include "runtime/thread_context.h" |
58 | | #include "runtime/workload_management/io_throttle.h" |
59 | | #include "service/backend_options.h" |
60 | | #include "util/bit_util.h" |
61 | | #include "util/brpc_client_cache.h" // BrpcClientCache |
62 | | #include "util/bthread_utils.h" |
63 | | #include "util/client_cache.h" |
64 | | #include "util/concurrency_stats.h" |
65 | | #include "util/debug_points.h" |
66 | | #include "util/defer_op.h" |
67 | | |
68 | | namespace doris::io { |
69 | | |
70 | | bvar::Adder<uint64_t> s3_read_counter("cached_remote_reader_s3_read"); |
71 | | bvar::Adder<uint64_t> peer_read_counter("cached_remote_reader_peer_read"); |
72 | | bvar::LatencyRecorder g_skip_cache_num("cached_remote_reader_skip_cache_num"); |
73 | | bvar::Adder<uint64_t> g_skip_cache_sum("cached_remote_reader_skip_cache_sum"); |
74 | | bvar::Adder<uint64_t> g_skip_local_cache_io_sum_bytes( |
75 | | "cached_remote_reader_skip_local_cache_io_sum_bytes"); |
76 | | bvar::Adder<uint64_t> g_read_cache_direct_whole_num("cached_remote_reader_cache_direct_whole_num"); |
77 | | bvar::Adder<uint64_t> g_read_cache_direct_partial_num( |
78 | | "cached_remote_reader_cache_direct_partial_num"); |
79 | | bvar::Adder<uint64_t> g_read_cache_indirect_num("cached_remote_reader_cache_indirect_num"); |
80 | | bvar::Adder<uint64_t> g_read_cache_direct_whole_bytes( |
81 | | "cached_remote_reader_cache_direct_whole_bytes"); |
82 | | bvar::Adder<uint64_t> g_read_cache_direct_partial_bytes( |
83 | | "cached_remote_reader_cache_direct_partial_bytes"); |
84 | | bvar::Adder<uint64_t> g_read_cache_indirect_bytes("cached_remote_reader_cache_indirect_bytes"); |
85 | | bvar::Adder<uint64_t> g_read_cache_indirect_total_bytes( |
86 | | "cached_remote_reader_cache_indirect_total_bytes"); |
87 | | bvar::Adder<uint64_t> g_read_cache_self_heal_on_not_found( |
88 | | "cached_remote_reader_self_heal_on_not_found"); |
89 | | bvar::Window<bvar::Adder<uint64_t>> g_read_cache_indirect_bytes_1min_window( |
90 | | "cached_remote_reader_indirect_bytes_1min_window", &g_read_cache_indirect_bytes, 60); |
91 | | bvar::Window<bvar::Adder<uint64_t>> g_read_cache_indirect_total_bytes_1min_window( |
92 | | "cached_remote_reader_indirect_total_bytes_1min_window", &g_read_cache_indirect_total_bytes, |
93 | | 60); |
94 | | bvar::Adder<uint64_t> g_failed_get_peer_addr_counter( |
95 | | "cached_remote_reader_failed_get_peer_addr_counter"); |
96 | | |
97 | | static std::atomic<int> g_active_peer_races {0}; |
98 | | bvar::PassiveStatus<int> g_active_peer_races_bvar( |
99 | | "peer_race_active_count", |
100 | 3.27k | [](void*) { return g_active_peer_races.load(std::memory_order_relaxed); }, nullptr); |
101 | | // Cross-CG peer read race statistics |
102 | | bvar::Adder<uint64_t> g_peer_race_peer_win("peer_race_peer_win"); |
103 | | bvar::Adder<uint64_t> g_peer_race_s3_win("peer_race_s3_win"); |
104 | | bvar::Adder<uint64_t> g_peer_race_both_fail("peer_race_both_fail"); |
105 | | bvar::Adder<uint64_t> g_peer_cross_compute_group_read("peer_cross_compute_group_read"); |
106 | | bvar::Adder<uint64_t> g_peer_same_compute_group_read("peer_same_compute_group_read"); |
107 | | bvar::Adder<uint64_t> g_peer_lazy_fetch_triggered("peer_lazy_fetch_triggered"); |
108 | | |
109 | | CachedRemoteFileReader::CachedRemoteFileReader(FileReaderSPtr remote_file_reader, |
110 | | const FileReaderOptions& opts) |
111 | 7.17k | : _is_doris_table(opts.is_doris_table), |
112 | 7.17k | _tablet_id(opts.tablet_id), |
113 | 7.17k | _storage_resource_id(opts.storage_resource_id), |
114 | 7.17k | _remote_file_reader(std::move(remote_file_reader)) { |
115 | 7.17k | DCHECK(!_is_doris_table || _tablet_id > 0); |
116 | 7.17k | if (_is_doris_table) { |
117 | 7.17k | _init_doris_table_cache(); |
118 | 7.17k | } else { |
119 | 6 | _init_external_table_cache(opts); |
120 | 6 | } |
121 | 7.17k | } |
122 | | |
123 | 7.17k | void CachedRemoteFileReader::_init_doris_table_cache() { |
124 | 7.17k | _cache_hash = BlockFileCache::hash(path().filename().native()); |
125 | 7.17k | _cache = FileCacheFactory::instance()->get_by_path(_cache_hash); |
126 | 7.17k | if (_can_read_cache_file_directly()) { |
127 | | // this is designed for and test in doris table, external table need extra tests |
128 | 16 | _cache_file_readers = _cache->get_blocks_by_key(_cache_hash); |
129 | 16 | } |
130 | 7.17k | } |
131 | | |
132 | 8 | void CachedRemoteFileReader::_init_external_table_cache(const FileReaderOptions& opts) { |
133 | | // Use path and modification time to build cache key. |
134 | 8 | std::string unique_path = fmt::format("{}:{}", path().native(), opts.mtime); |
135 | 8 | _cache_hash = BlockFileCache::hash(unique_path); |
136 | 8 | if (opts.cache_base_path.empty()) { |
137 | | // If cache path is not specified by session variable, choose randomly. |
138 | 1 | _cache = FileCacheFactory::instance()->get_by_path(_cache_hash); |
139 | 1 | return; |
140 | 1 | } |
141 | | |
142 | | // From query session variable: file_cache_base_path. |
143 | 7 | _cache = FileCacheFactory::instance()->get_by_path(opts.cache_base_path); |
144 | 7 | if (_cache != nullptr) { |
145 | 6 | return; |
146 | 6 | } |
147 | | |
148 | 7 | LOG(WARNING) << "Can't get cache from base path: " << opts.cache_base_path |
149 | 1 | << ", using random instead."; |
150 | 1 | _cache = FileCacheFactory::instance()->get_by_path(_cache_hash); |
151 | 1 | } |
152 | | |
153 | 1.43M | bool CachedRemoteFileReader::_can_read_cache_file_directly() const { |
154 | 1.43M | return _is_doris_table && config::enable_read_cache_file_directly; |
155 | 1.43M | } |
156 | | |
157 | 5.18k | bool CachedRemoteFileReader::_should_read_from_peer(const IOContext* io_ctx) const { |
158 | 5.18k | return doris::config::is_cloud_mode() && _is_doris_table && _tablet_id > 0 && |
159 | 5.18k | !io_ctx->is_warmup && !io_ctx->bypass_peer_read && |
160 | 5.18k | doris::config::enable_cache_read_from_peer; |
161 | 5.18k | } |
162 | | |
163 | 717k | void CachedRemoteFileReader::_insert_file_reader(FileBlockSPtr file_block) { |
164 | 717k | if (_can_read_cache_file_directly()) { |
165 | 34 | std::lock_guard lock(_mtx); |
166 | 34 | DCHECK(file_block->state() == FileBlock::State::DOWNLOADED); |
167 | 34 | file_block->_owned_by_cached_reader = true; |
168 | 34 | _cache_file_readers.emplace(file_block->offset(), std::move(file_block)); |
169 | 34 | } |
170 | 717k | } |
171 | | |
172 | 7.17k | CachedRemoteFileReader::~CachedRemoteFileReader() { |
173 | 7.17k | for (auto& it : _cache_file_readers) { |
174 | 55 | it.second->_owned_by_cached_reader = false; |
175 | 55 | } |
176 | 7.17k | static_cast<void>(close()); |
177 | 7.17k | } |
178 | | |
179 | 8.19k | Status CachedRemoteFileReader::close() { |
180 | 8.19k | return _remote_file_reader->close(); |
181 | 8.19k | } |
182 | | |
183 | | std::pair<size_t, size_t> CachedRemoteFileReader::s_align_size(size_t offset, size_t read_size, |
184 | 718k | size_t length) { |
185 | 718k | size_t left = offset; |
186 | 718k | size_t right = offset + read_size - 1; |
187 | 718k | size_t align_left = |
188 | 718k | (left / config::file_cache_each_block_size) * config::file_cache_each_block_size; |
189 | 718k | size_t align_right = |
190 | 718k | (right / config::file_cache_each_block_size + 1) * config::file_cache_each_block_size; |
191 | 718k | align_right = align_right < length ? align_right : length; |
192 | 718k | size_t align_size = align_right - align_left; |
193 | 718k | if (align_size < config::file_cache_each_block_size && align_left != 0) { |
194 | 3 | align_size += config::file_cache_each_block_size; |
195 | 3 | align_left -= config::file_cache_each_block_size; |
196 | 3 | } |
197 | 718k | return std::make_pair(align_left, align_size); |
198 | 718k | } |
199 | | |
200 | | namespace { |
// Packing plan for a run of blocks laid out back-to-back in one contiguous span.
// Entry i of both vectors describes the i-th block.
struct PeerFetchLayout {
    // Start of each block inside the span (running sum of the sizes before it).
    std::vector<size_t> block_offsets {};
    // Size of each block after clipping to the file size.
    std::vector<size_t> block_sizes {};
    // Sum of all entries in block_sizes.
    size_t total_size {0};
};
206 | | |
207 | 9 | bool is_fill_not_found(const Status& st, bool request_fill) { |
208 | 9 | return request_fill && st.is<ErrorCode::NOT_FOUND>(); |
209 | 9 | } |
210 | | |
211 | 48 | bool contains_file_block(const PeerFetchedBlockSet& fetched_blocks, const FileBlockSPtr& block) { |
212 | 48 | return fetched_blocks.contains(block.get()); |
213 | 48 | } |
214 | | |
215 | 5.24k | size_t clip_peer_block_size(const FileBlock::Range& range, size_t file_size) { |
216 | 5.24k | if (range.left >= file_size) { |
217 | 0 | return 0; |
218 | 0 | } |
219 | 5.24k | return std::min(file_size - range.left, range.size()); |
220 | 5.24k | } |
221 | | |
222 | | PeerFetchLayout build_peer_fetch_layout(const std::vector<FileBlockSPtr>& blocks, |
223 | 5.18k | size_t file_size) { |
224 | 5.18k | PeerFetchLayout layout; |
225 | 5.18k | layout.block_offsets.reserve(blocks.size()); |
226 | 5.18k | layout.block_sizes.reserve(blocks.size()); |
227 | 5.24k | for (const auto& block : blocks) { |
228 | 5.24k | const size_t block_size = clip_peer_block_size(block->range(), file_size); |
229 | 5.24k | layout.block_offsets.push_back(layout.total_size); |
230 | 5.24k | layout.block_sizes.push_back(block_size); |
231 | 5.24k | layout.total_size += block_size; |
232 | 5.24k | } |
233 | 5.18k | return layout; |
234 | 5.18k | } |
235 | | |
236 | | Status write_peer_payloads_into_block(const FileBlockSPtr& block, |
237 | | std::vector<const PeerFetchChunk*>& chunks, |
238 | 40 | size_t* block_size) { |
239 | 40 | if (block_size == nullptr) { |
240 | 0 | return Status::InvalidArgument("peer block write requires non-null block_size"); |
241 | 0 | } |
242 | 40 | *block_size = 0; |
243 | 40 | if (chunks.empty()) { |
244 | 0 | return Status::OK(); |
245 | 0 | } |
246 | 40 | std::sort(chunks.begin(), chunks.end(), |
247 | 40 | [](const PeerFetchChunk* lhs, const PeerFetchChunk* rhs) { |
248 | 0 | return lhs->block_offset < rhs->block_offset; |
249 | 0 | }); |
250 | 40 | butil::IOBuf payload; |
251 | 40 | for (const auto* chunk : chunks) { |
252 | 40 | *block_size += chunk->payload.length(); |
253 | 40 | payload.append(chunk->payload); |
254 | 40 | } |
255 | 40 | DCHECK(*block_size != 0); |
256 | 40 | return block->append_iobuf(payload); |
257 | 40 | } |
258 | | |
259 | | void copy_peer_chunk_to_result(const PeerFetchChunk& chunk, size_t offset, size_t right_offset, |
260 | | size_t already_read, Slice result, size_t& indirect_read_bytes, |
261 | 37 | SourceReadBreakdown& source_read_breakdown) { |
262 | 37 | const size_t payload_size = chunk.payload.length(); |
263 | 37 | if (payload_size == 0) { |
264 | 0 | return; |
265 | 0 | } |
266 | 37 | const size_t chunk_left = chunk.block_offset; |
267 | 37 | const size_t chunk_right = chunk_left + payload_size - 1; |
268 | 37 | const size_t copy_left_offset = std::max(offset + already_read, chunk_left); |
269 | 37 | const size_t copy_right_offset = std::min(right_offset, chunk_right); |
270 | 37 | if (copy_left_offset > copy_right_offset) { |
271 | 0 | return; |
272 | 0 | } |
273 | 37 | const size_t copy_offset = copy_left_offset - chunk_left; |
274 | 37 | const size_t copy_size = copy_right_offset - copy_left_offset + 1; |
275 | 37 | char* dst = result.data + (copy_left_offset - offset); |
276 | 37 | chunk.payload.copy_to(dst, copy_size, copy_offset); |
277 | 37 | indirect_read_bytes += copy_size; |
278 | 37 | source_read_breakdown.peer_bytes += copy_size; |
279 | 37 | } |
280 | | |
281 | | // Execute peer read targeting a specific host:port. |
282 | | Status execute_peer_read(const std::vector<FileBlockSPtr>& empty_blocks, |
283 | | PeerFetchResult* peer_result, const std::string& file_path, |
284 | | size_t file_size, bool is_doris_table, ReadStatistics& stats, |
285 | 12 | const IOContext* io_ctx, const std::string& host, int32_t port) { |
286 | 12 | VLOG_DEBUG << "PeerFileCacheReader read from peer, host=" << host << ", port=" << port |
287 | 0 | << ", file_path=" << file_path; |
288 | | |
289 | 12 | if (host.empty() || port == 0) { |
290 | 0 | g_failed_get_peer_addr_counter << 1; |
291 | 0 | LOG_EVERY_N(WARNING, 100) << "PeerFileCacheReader host or port is empty" |
292 | 0 | << ", host=" << host << ", port=" << port |
293 | 0 | << ", file_path=" << file_path; |
294 | 0 | return Status::InternalError<false>("host or port is empty"); |
295 | 0 | } |
296 | 12 | SCOPED_RAW_TIMER(&stats.peer_read_timer); |
297 | 12 | peer_read_counter << 1; |
298 | 12 | PeerFileCacheReader peer_reader(file_path, is_doris_table, host, port); |
299 | | // Serial peer read: source BE has the data from rebalance warm-up; no fill needed. |
300 | 12 | auto st = peer_reader.fetch_blocks(empty_blocks, peer_result, file_size, io_ctx, |
301 | 12 | /*request_fill=*/false); |
302 | 12 | if (!st.ok()) { |
303 | 4 | LOG_WARNING("PeerFileCacheReader read from peer failed") |
304 | 4 | .tag("host", host) |
305 | 4 | .tag("port", port) |
306 | 4 | .tag("error", st.msg()); |
307 | 4 | } |
308 | 12 | stats.from_peer_cache = st.ok(); |
309 | 12 | return st; |
310 | 12 | } |
311 | | |
312 | | // Execute S3 read |
313 | | Status execute_s3_read(size_t empty_start, size_t& size, std::unique_ptr<char[]>& buffer, |
314 | | ReadStatistics& stats, const IOContext* io_ctx, |
315 | 5.15k | FileReaderSPtr remote_file_reader) { |
316 | 5.15k | s3_read_counter << 1; |
317 | 5.15k | SCOPED_RAW_TIMER(&stats.remote_read_timer); |
318 | 5.15k | stats.from_peer_cache = false; |
319 | 5.15k | return remote_file_reader->read_at(empty_start, Slice(buffer.get(), size), &size, io_ctx); |
320 | 5.15k | } |
321 | | |
322 | 18 | CloudWarmUpManager& get_warm_up_manager() { |
323 | 18 | return ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager(); |
324 | 18 | } |
325 | | |
326 | | // Shared state for peer-vs-S3 winner race. |
327 | | // Uses bthread primitives — never std::mutex/condition_variable in bthread context. |
328 | | struct RaceState { |
329 | | bthread::Mutex mtx; |
330 | | bthread::ConditionVariable cv; |
331 | | int winner = -1; // 0=peer won, 1=s3 won, -1=undecided, -2=both failed |
332 | | bool peer_done = false; |
333 | | bool s3_done = false; |
334 | | Status peer_status; |
335 | | Status s3_status; |
336 | | std::unique_ptr<char[]> s3_buf; |
337 | | PeerFetchResult peer_res; |
338 | | std::string peer_winner_cg_id; // compute_group_id of the winning peer candidate |
339 | | std::string peer_winner_host; // host of the winning peer candidate |
340 | | int64_t peer_elapsed_ns = 0; // wall-clock time of the entire peer path (including retries) |
341 | | int64_t peer_winner_io_ns = 0; // I/O time of the winning candidate only |
342 | | }; |
343 | | |
344 | | // Peer race logic: try candidates sequentially until one succeeds or all fail. |
345 | | // NOTE: Do NOT capture io_ctx here — it points into the caller's stack which may be destroyed |
346 | | // when S3 wins the race and the caller returns before this bthread finishes. |
347 | | void run_peer_race(std::shared_ptr<RaceState> race, std::vector<FileBlockSPtr> empty_blocks, |
348 | | const std::string& file_path, size_t file_sz, bool is_doris, |
349 | | std::shared_ptr<CloudWarmUpManager> manager, |
350 | | std::vector<doris::PeerCandidate> candidates, int64_t tablet_id, |
351 | 14 | std::string resource_id, std::shared_ptr<ResourceContext> parent_resource_ctx) { |
352 | 14 | std::unique_ptr<AttachTask> attach_task; |
353 | 14 | if (parent_resource_ctx != nullptr) { |
354 | 3 | attach_task = std::make_unique<AttachTask>(parent_resource_ctx); |
355 | 3 | } |
356 | | |
357 | 14 | bool all_tried = true; |
358 | 14 | MonotonicStopWatch peer_sw; |
359 | 14 | peer_sw.start(); |
360 | | |
361 | 22 | for (size_t i = 0; i < candidates.size(); ++i) { |
362 | | // Before issuing the next RPC, check if S3 already won. |
363 | 17 | if (i > 0) { |
364 | 3 | TEST_SYNC_POINT("run_peer_race::between_candidates"); |
365 | 3 | std::unique_lock<bthread::Mutex> lk(race->mtx); |
366 | 3 | if (race->winner > 0) { |
367 | | // S3 already won — stop, but not all candidates were tried. |
368 | 1 | all_tried = false; |
369 | 1 | break; |
370 | 1 | } |
371 | 3 | } |
372 | | |
373 | 16 | const auto& cand = candidates[i]; |
374 | 16 | peer_read_counter << 1; |
375 | 16 | PeerFileCacheReader peer_reader(file_path, is_doris, cand.host, cand.brpc_port); |
376 | 16 | PeerFetchResult local_peer_res; |
377 | 16 | const bool request_fill = |
378 | 16 | !config::peer_cache_fill_compute_group_id.empty() && |
379 | 16 | cand.compute_group_id == config::peer_cache_fill_compute_group_id && |
380 | 16 | !resource_id.empty() && !file_path.empty(); |
381 | 16 | MonotonicStopWatch cand_sw; |
382 | 16 | cand_sw.start(); |
383 | 16 | auto st = peer_reader.fetch_blocks(empty_blocks, &local_peer_res, file_sz, |
384 | 16 | /*ctx=*/nullptr, request_fill, tablet_id, resource_id); |
385 | 16 | if (st.ok()) { |
386 | 7 | manager->update_peer_candidate_on_success(tablet_id, cand.compute_group_id); |
387 | 7 | std::unique_lock<bthread::Mutex> lk(race->mtx); |
388 | 7 | if (race->winner < 0) { |
389 | 7 | race->winner = 0; |
390 | 7 | race->peer_res = std::move(local_peer_res); |
391 | 7 | race->peer_winner_cg_id = cand.compute_group_id; |
392 | 7 | race->peer_winner_host = cand.host; |
393 | 7 | race->peer_elapsed_ns = peer_sw.elapsed_time(); |
394 | 7 | race->peer_winner_io_ns = cand_sw.elapsed_time(); |
395 | 7 | } |
396 | 7 | race->peer_done = true; |
397 | 7 | race->peer_status = Status::OK(); |
398 | 7 | race->cv.notify_all(); |
399 | 7 | return; |
400 | 7 | } |
401 | | |
402 | | // Handle per-candidate failure. |
403 | 9 | if (st.template is<ErrorCode::TOO_MANY_TASKS>()) { |
404 | 0 | all_tried = false; |
405 | 0 | break; |
406 | 0 | } |
407 | 9 | if (is_fill_not_found(st, request_fill)) { |
408 | | // Pull-through fill already told us this designated fill CG could not serve the block |
409 | | // in time. Do not serially retry additional candidates in the same race; let S3 win |
410 | | // instead of paying more peer RPC latency. |
411 | 1 | manager->rotate_peer_candidate_on_cache_miss(tablet_id, cand.host, cand.brpc_port); |
412 | 1 | all_tried = false; |
413 | 1 | break; |
414 | 1 | } |
415 | 8 | if (st.template is<ErrorCode::NOT_FOUND>()) { |
416 | 5 | manager->rotate_peer_candidate_on_cache_miss(tablet_id, cand.host, cand.brpc_port); |
417 | 5 | } else { |
418 | 3 | manager->update_peer_candidate_on_rpc_failure(tablet_id, cand.host, cand.brpc_port); |
419 | 3 | } |
420 | 8 | } |
421 | | |
422 | 7 | if (all_tried) { |
423 | 5 | manager->record_peer_all_miss(tablet_id); |
424 | 5 | } |
425 | 7 | std::unique_lock<bthread::Mutex> lk(race->mtx); |
426 | 7 | race->peer_done = true; |
427 | 7 | race->peer_status = Status::InternalError<false>("peer: all candidates failed"); |
428 | 7 | if (race->winner < 0 && race->s3_done) { |
429 | 0 | race->winner = race->s3_status.ok() ? 1 : -2; |
430 | 0 | } |
431 | 7 | race->cv.notify_all(); |
432 | 7 | } |
433 | | |
434 | | // Apply hedge delay, then submit S3 read to the thread pool (or run inline). |
435 | | void launch_s3_race(std::shared_ptr<RaceState> race, size_t empty_start, size_t span_size, |
436 | | const IOContext* io_ctx, FileReaderSPtr remote_reader, |
437 | | std::shared_ptr<ResourceContext> parent_resource_ctx, |
438 | 14 | std::shared_ptr<CachedRemoteFileReader> owner) { |
439 | | // Raw S3 read body. |
440 | | // `owner` keeps the CachedRemoteFileReader alive until the S3 task finishes, |
441 | | // preventing close() from being called on remote_reader while we are still reading. |
442 | | // Do NOT capture io_ctx: it points into the caller's stack/iterator which may be |
443 | | // destroyed when the query is cancelled before this background task runs. The S3 |
444 | | // leg of the race is a best-effort background task whose result is discarded if the |
445 | | // peer wins; passing nullptr is safe because S3FileReader::read_at_impl ignores it. |
446 | 14 | auto do_s3_read = [race, empty_start, span_size, remote_reader, owner]() { |
447 | 8 | (void)owner; |
448 | 8 | auto s3_buf = std::make_unique<char[]>(span_size); |
449 | 8 | size_t read_size = span_size; |
450 | 8 | s3_read_counter << 1; |
451 | 8 | TEST_SYNC_POINT("CachedRemoteFileReader::_execute_winner_race::s3_before_read"); |
452 | 8 | auto st = remote_reader->read_at(empty_start, Slice(s3_buf.get(), span_size), &read_size, |
453 | 8 | nullptr); |
454 | 8 | std::unique_lock<bthread::Mutex> lk(race->mtx); |
455 | 8 | race->s3_done = true; |
456 | 8 | race->s3_status = st; |
457 | 8 | if (st.ok() && race->winner < 0) { |
458 | 7 | race->winner = 1; |
459 | 7 | race->s3_buf = std::move(s3_buf); |
460 | 7 | } |
461 | 8 | race->cv.notify_all(); |
462 | 8 | }; |
463 | | |
464 | | // Hedge delay: give peer a head start, but wake up early if peer finishes. |
465 | | // Uses cv.wait_for() instead of bthread_usleep() so the calling thread is |
466 | | // unblocked as soon as the peer bthread signals completion, avoiding the |
467 | | // unconditional 20ms sleep that dominated latency on cache-miss-heavy queries. |
468 | 14 | bool peer_already_won = false; |
469 | 14 | if (config::peer_race_hedge_delay_ms > 0) { |
470 | 13 | std::unique_lock<bthread::Mutex> lk(race->mtx); |
471 | 13 | if (!race->peer_done) { |
472 | 13 | race->cv.wait_for(lk, static_cast<long>(config::peer_race_hedge_delay_ms) * 1000); |
473 | 13 | } |
474 | 13 | peer_already_won = (race->winner == 0); |
475 | 13 | if (peer_already_won) { |
476 | 6 | race->s3_done = true; |
477 | 6 | race->s3_status = Status::InternalError<false>("skipped: peer won during hedge delay"); |
478 | 6 | } |
479 | 13 | } |
480 | | |
481 | 14 | if (!peer_already_won) { |
482 | 8 | auto s3_fn = [do_s3_read, parent_resource_ctx]() mutable { |
483 | 7 | std::unique_ptr<AttachTask> attach_task; |
484 | 7 | if (parent_resource_ctx != nullptr) { |
485 | 1 | attach_task = std::make_unique<AttachTask>(parent_resource_ctx); |
486 | 1 | } |
487 | 7 | do_s3_read(); |
488 | 7 | }; |
489 | 8 | auto* s3_pool = ExecEnv::GetInstance()->peer_race_s3_thread_pool(); |
490 | 8 | if (s3_pool == nullptr || !s3_pool->submit_func(s3_fn).ok()) { |
491 | 1 | do_s3_read(); |
492 | 1 | } |
493 | 8 | } |
494 | 14 | } |
495 | | |
496 | | // Wait for the race to finish and populate the output accordingly. |
497 | | Status collect_race_result(std::shared_ptr<RaceState> race, size_t span_size, |
498 | | std::unique_ptr<char[]>& buffer, PeerFetchResult* peer_result, |
499 | 14 | ReadStatistics& stats, const IOContext* io_ctx) { |
500 | 14 | { |
501 | 14 | std::unique_lock<bthread::Mutex> lk(race->mtx); |
502 | 21 | while (race->winner < 0 && !(race->peer_done && race->s3_done)) { |
503 | 7 | race->cv.wait(lk); |
504 | 7 | } |
505 | 14 | } |
506 | 14 | g_active_peer_races.fetch_sub(1, std::memory_order_relaxed); |
507 | | |
508 | 14 | const std::string self_cg_id = |
509 | 14 | static_cast<CloudClusterInfo*>(ExecEnv::GetInstance()->cluster_info()) |
510 | 14 | ->cloud_compute_group_id(); |
511 | | |
512 | 14 | if (race->winner == 0) { |
513 | | // Peer won. |
514 | 7 | if (peer_result != nullptr) { |
515 | 7 | *peer_result = std::move(race->peer_res); |
516 | 7 | } |
517 | 7 | stats.from_peer_cache = true; |
518 | 7 | stats.peer_read_timer += race->peer_elapsed_ns; |
519 | 7 | g_peer_race_peer_win << 1; |
520 | 7 | const bool is_cross_cg = |
521 | 7 | !race->peer_winner_cg_id.empty() && race->peer_winner_cg_id != self_cg_id; |
522 | 7 | if (is_cross_cg) { |
523 | 5 | g_peer_cross_compute_group_read << 1; |
524 | 5 | } else { |
525 | 2 | g_peer_same_compute_group_read << 1; |
526 | 2 | } |
527 | 7 | if (io_ctx != nullptr && io_ctx->file_cache_stats != nullptr) { |
528 | 7 | io_ctx->file_cache_stats->num_peer_race_peer_win++; |
529 | 7 | io_ctx->file_cache_stats->peer_hosts.insert(race->peer_winner_host); |
530 | 7 | if (is_cross_cg) { |
531 | 5 | io_ctx->file_cache_stats->num_cross_cg_peer_io_total++; |
532 | 5 | io_ctx->file_cache_stats->bytes_read_from_cross_cg_peer += span_size; |
533 | 5 | io_ctx->file_cache_stats->cross_cg_peer_io_timer += race->peer_winner_io_ns; |
534 | 5 | } else { |
535 | 2 | io_ctx->file_cache_stats->num_same_cg_peer_io_total++; |
536 | 2 | io_ctx->file_cache_stats->bytes_read_from_same_cg_peer += span_size; |
537 | 2 | io_ctx->file_cache_stats->same_cg_peer_io_timer += race->peer_winner_io_ns; |
538 | 2 | } |
539 | 7 | } |
540 | 7 | return Status::OK(); |
541 | 7 | } else if (race->winner == 1) { |
542 | | // S3 won. |
543 | 7 | buffer = std::move(race->s3_buf); |
544 | 7 | stats.from_peer_cache = false; |
545 | 7 | g_peer_race_s3_win << 1; |
546 | 7 | if (io_ctx != nullptr && io_ctx->file_cache_stats != nullptr) { |
547 | 7 | io_ctx->file_cache_stats->num_peer_race_s3_win++; |
548 | 7 | } |
549 | 7 | return Status::OK(); |
550 | 7 | } |
551 | 0 | g_peer_race_both_fail << 1; |
552 | 0 | return Status::InternalError<false>("peer race: both peer and s3 failed"); |
553 | 14 | } |
554 | | |
555 | | } // anonymous namespace |
556 | | |
557 | | Status CachedRemoteFileReader::_execute_s3_fallback(size_t empty_start, size_t span_size, |
558 | | std::unique_ptr<char[]>& buffer, |
559 | | PeerFetchResult* peer_result, |
560 | | ReadStatistics& stats, |
561 | 5.15k | const IOContext* io_ctx) { |
562 | 5.15k | if (peer_result != nullptr) { |
563 | 5.15k | peer_result->clear(); |
564 | 5.15k | } |
565 | 5.15k | buffer.reset(new char[span_size]); |
566 | 5.15k | size_t read_size = span_size; |
567 | 5.15k | return execute_s3_read(empty_start, read_size, buffer, stats, io_ctx, _remote_file_reader); |
568 | 5.15k | } |
569 | | |
570 | | Status CachedRemoteFileReader::_execute_sequential_peer_read( |
571 | | const std::vector<FileBlockSPtr>& empty_blocks, size_t empty_start, size_t span_size, |
572 | | std::unique_ptr<char[]>& buffer, PeerFetchResult* peer_result, ReadStatistics& stats, |
573 | | const IOContext* io_ctx, const std::vector<doris::PeerCandidate>& candidates, |
574 | 1 | int64_t tablet_id) { |
575 | | // candidates[0] already reflects last_successful_compute_group_id affinity: |
576 | | // get_peer_candidates() applies stable_partition before returning. |
577 | 1 | if (candidates.empty()) { |
578 | 0 | return _execute_s3_fallback(empty_start, span_size, buffer, peer_result, stats, io_ctx); |
579 | 0 | } |
580 | | |
581 | 1 | auto& manager = get_warm_up_manager(); |
582 | 1 | PeerFetchResult serial_res; |
583 | 1 | const int64_t timer_before = stats.peer_read_timer; |
584 | 1 | auto st = execute_peer_read(empty_blocks, &serial_res, path().native(), this->size(), |
585 | 1 | _is_doris_table, stats, io_ctx, candidates[0].host, |
586 | 1 | candidates[0].brpc_port); |
587 | 1 | if (st.ok()) { |
588 | 1 | manager.update_peer_candidate_on_success(tablet_id, candidates[0].compute_group_id); |
589 | 1 | if (peer_result != nullptr) { |
590 | 1 | *peer_result = std::move(serial_res); |
591 | 1 | } |
592 | | // Update profile counters for cross/same CG stats. |
593 | 1 | const std::string self_cg_id = |
594 | 1 | static_cast<CloudClusterInfo*>(ExecEnv::GetInstance()->cluster_info()) |
595 | 1 | ->cloud_compute_group_id(); |
596 | 1 | const bool is_cross_cg = !candidates[0].compute_group_id.empty() && |
597 | 1 | candidates[0].compute_group_id != self_cg_id; |
598 | 1 | if (is_cross_cg) { |
599 | 1 | g_peer_cross_compute_group_read << 1; |
600 | 1 | } else { |
601 | 0 | g_peer_same_compute_group_read << 1; |
602 | 0 | } |
603 | 1 | if (io_ctx != nullptr && io_ctx->file_cache_stats != nullptr) { |
604 | 1 | io_ctx->file_cache_stats->peer_hosts.insert(candidates[0].host); |
605 | 1 | if (is_cross_cg) { |
606 | 1 | io_ctx->file_cache_stats->num_cross_cg_peer_io_total++; |
607 | 1 | io_ctx->file_cache_stats->bytes_read_from_cross_cg_peer += span_size; |
608 | 1 | io_ctx->file_cache_stats->cross_cg_peer_io_timer += |
609 | 1 | stats.peer_read_timer - timer_before; |
610 | 1 | } else { |
611 | 0 | io_ctx->file_cache_stats->num_same_cg_peer_io_total++; |
612 | 0 | io_ctx->file_cache_stats->bytes_read_from_same_cg_peer += span_size; |
613 | 0 | io_ctx->file_cache_stats->same_cg_peer_io_timer += |
614 | 0 | stats.peer_read_timer - timer_before; |
615 | 0 | } |
616 | 1 | } |
617 | 1 | return st; |
618 | 1 | } |
619 | | // Track failure so affinity / eviction logic stays consistent with the race path. |
620 | 0 | if (st.is<ErrorCode::TOO_MANY_TASKS>()) { |
621 | | // Server healthy but overloaded — don't penalize candidate. |
622 | 0 | } else if (st.is<ErrorCode::NOT_FOUND>()) { |
623 | 0 | manager.rotate_peer_candidate_on_cache_miss(tablet_id, candidates[0].host, |
624 | 0 | candidates[0].brpc_port); |
625 | 0 | } else { |
626 | 0 | manager.update_peer_candidate_on_rpc_failure(tablet_id, candidates[0].host, |
627 | 0 | candidates[0].brpc_port); |
628 | 0 | } |
629 | 0 | return _execute_s3_fallback(empty_start, span_size, buffer, peer_result, stats, io_ctx); |
630 | 1 | } |
631 | | |
632 | | Status CachedRemoteFileReader::_execute_remote_read(const std::vector<FileBlockSPtr>& empty_blocks, |
633 | | size_t empty_start, size_t span_size, |
634 | | std::unique_ptr<char[]>& buffer, |
635 | | PeerFetchResult* peer_result, |
636 | | ReadStatistics& stats, |
637 | 5.18k | const IOContext* io_ctx) { |
638 | | // --- Non-peer path: direct S3 --- |
639 | 5.18k | if (!_should_read_from_peer(io_ctx)) { |
640 | 5.15k | return _execute_s3_fallback(empty_start, span_size, buffer, peer_result, stats, io_ctx); |
641 | 5.15k | } |
642 | | |
643 | | // --- UT debug point: injected peer address --- |
644 | 28 | DBUG_EXECUTE_IF("PeerFileCacheReader::_fetch_from_peer_cache_blocks", { |
645 | 28 | std::string dp_host = dp->param<std::string>("host", "127.0.0.1"); |
646 | 28 | int32_t dp_port = dp->param("port", 9060); |
647 | 28 | buffer.reset(); |
648 | 28 | DCHECK(peer_result != nullptr); |
649 | 28 | peer_result->clear(); |
650 | 28 | auto st = execute_peer_read(empty_blocks, peer_result, path().native(), this->size(), |
651 | 28 | _is_doris_table, stats, io_ctx, dp_host, dp_port); |
652 | 28 | if (st.ok()) return st; |
653 | 28 | return _execute_s3_fallback(empty_start, span_size, buffer, peer_result, stats, io_ctx); |
654 | 28 | }); |
655 | | |
656 | | // --- Resolve tablet and obtain peer candidates --- |
657 | 17 | int64_t tablet_id = _tablet_id; |
658 | 17 | auto& manager = get_warm_up_manager(); |
659 | 17 | auto candidates = manager.get_peer_candidates(tablet_id); |
660 | 17 | if (candidates.empty()) { |
661 | 2 | if (!manager.is_peer_cooldown(tablet_id)) { |
662 | | // Cold miss: trigger background FE fetch and fall back to S3. |
663 | 1 | g_peer_lazy_fetch_triggered << 1; |
664 | 1 | auto manager_ptr = |
665 | 1 | ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager_ptr(); |
666 | 1 | start_bthread([manager_ptr = std::move(manager_ptr), tablet_id]() { |
667 | 1 | manager_ptr->fetch_candidates_from_fe(tablet_id); |
668 | 1 | }); |
669 | 1 | } |
670 | 2 | return _execute_s3_fallback(empty_start, span_size, buffer, peer_result, stats, io_ctx); |
671 | 2 | } |
672 | | |
673 | | // --- Dispatch: concurrent race or sequential fallback --- |
674 | | // Candidates are already sorted by last_successful_compute_group_id affinity |
675 | | // (stable_partition in get_peer_candidates), so the winner race peer bthread |
676 | | // naturally tries the most promising candidate first — whether same-CG or cross-CG. |
677 | 15 | if (config::enable_peer_s3_race) { |
678 | 14 | return _execute_winner_race(empty_blocks, empty_start, span_size, buffer, peer_result, |
679 | 14 | stats, io_ctx, candidates, tablet_id); |
680 | 14 | } |
681 | 1 | return _execute_sequential_peer_read(empty_blocks, empty_start, span_size, buffer, peer_result, |
682 | 1 | stats, io_ctx, candidates, tablet_id); |
683 | 15 | } |
684 | | |
685 | | Status CachedRemoteFileReader::_execute_winner_race( |
686 | | const std::vector<FileBlockSPtr>& empty_blocks, size_t empty_start, size_t span_size, |
687 | | std::unique_ptr<char[]>& buffer, PeerFetchResult* peer_result, ReadStatistics& stats, |
688 | | const IOContext* io_ctx, const std::vector<doris::PeerCandidate>& candidates, |
689 | 14 | int64_t tablet_id) { |
690 | | // Reserve a race slot; degrade to sequential if at limit. |
691 | 14 | if (g_active_peer_races.fetch_add(1, std::memory_order_relaxed) >= |
692 | 14 | config::max_concurrent_peer_races) { |
693 | 0 | g_active_peer_races.fetch_sub(1, std::memory_order_relaxed); |
694 | 0 | return _execute_sequential_peer_read(empty_blocks, empty_start, span_size, buffer, |
695 | 0 | peer_result, stats, io_ctx, candidates, tablet_id); |
696 | 0 | } |
697 | | |
698 | 14 | auto race = std::make_shared<RaceState>(); |
699 | 14 | auto manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager_ptr(); |
700 | | |
701 | | // Capture context for child threads. |
702 | 14 | const std::string file_path = path().native(); |
703 | 14 | const size_t file_sz = this->size(); |
704 | 14 | const bool is_doris = _is_doris_table; |
705 | 14 | auto remote_reader = _remote_file_reader; |
706 | 14 | std::shared_ptr<ResourceContext> parent_resource_ctx; |
707 | 14 | auto* parent_thread_context = thread_context(); |
708 | 14 | if (parent_thread_context != nullptr && parent_thread_context->is_attach_task()) { |
709 | 3 | parent_resource_ctx = parent_thread_context->resource_ctx(); |
710 | 3 | } |
711 | | |
712 | | // Launch peer bthread. |
713 | 14 | start_bthread( |
714 | 14 | [race, empty_blocks = std::move(empty_blocks), file_path, file_sz, is_doris, |
715 | 14 | manager = std::move(manager), candidates = std::move(candidates), tablet_id, |
716 | 14 | resource_id = _storage_resource_id, parent_resource_ctx]() mutable { |
717 | 14 | run_peer_race(race, std::move(empty_blocks), file_path, file_sz, is_doris, |
718 | 14 | std::move(manager), std::move(candidates), tablet_id, |
719 | 14 | std::move(resource_id), parent_resource_ctx); |
720 | 14 | }, |
721 | 14 | /*init_thread_ctx=*/true); |
722 | | |
723 | | // Launch S3 (with optional hedge delay). |
724 | | // Pass shared_from_this() so the background S3 task holds a reference to this |
725 | | // reader, preventing destruction (and close()) until the S3 task completes. |
726 | 14 | launch_s3_race(race, empty_start, span_size, io_ctx, remote_reader, parent_resource_ctx, |
727 | 14 | shared_from_this()); |
728 | | |
729 | | // Collect race result. |
730 | 14 | return collect_race_result(race, span_size, buffer, peer_result, stats, io_ctx); |
731 | 14 | } |
732 | | |
733 | | bool CachedRemoteFileReader::_try_read_from_cached_files_directly( |
734 | | size_t offset, Slice result, size_t bytes_req, bool is_dryrun, ReadStatistics& stats, |
735 | 718k | SourceReadBreakdown& source_read_breakdown, size_t& already_read, size_t* bytes_read) { |
736 | 718k | if (!_can_read_cache_file_directly()) { |
737 | 717k | return false; |
738 | 717k | } |
739 | | |
740 | 1.05k | SCOPED_RAW_TIMER(&stats.read_cache_file_directly_timer); |
741 | 1.05k | size_t need_read_size = bytes_req; |
742 | 1.05k | std::shared_lock lock(_mtx); |
743 | 1.05k | if (_cache_file_readers.empty()) { |
744 | 14 | return false; |
745 | 14 | } |
746 | | |
747 | 1.04k | auto iter = _cache_file_readers.upper_bound(offset); |
748 | 1.04k | if (iter != _cache_file_readers.begin()) { |
749 | 1.01k | --iter; |
750 | 1.01k | } |
751 | | |
752 | 1.04k | size_t current_offset = offset; |
753 | 2.48k | while (need_read_size != 0 && iter != _cache_file_readers.end()) { |
754 | 1.44k | if (iter->second->offset() > current_offset || |
755 | 1.44k | iter->second->range().right < current_offset) { |
756 | 6 | break; |
757 | 6 | } |
758 | | |
759 | 1.43k | size_t file_offset = current_offset - iter->second->offset(); |
760 | 1.43k | size_t reserve_bytes = std::min(need_read_size, iter->second->range().size() - file_offset); |
761 | 1.43k | if (is_dryrun) [[unlikely]] { |
762 | 1 | g_skip_local_cache_io_sum_bytes << reserve_bytes; |
763 | 1.43k | } else { |
764 | 1.43k | SCOPED_RAW_TIMER(&stats.local_read_timer); |
765 | 1.43k | if (!iter->second |
766 | 1.43k | ->read(Slice(result.data + (current_offset - offset), reserve_bytes), |
767 | 1.43k | file_offset) |
768 | 1.43k | .ok()) { // TODO: maybe read failed because block evict, should handle error |
769 | 0 | break; |
770 | 0 | } |
771 | 1.43k | source_read_breakdown.local_bytes += reserve_bytes; |
772 | 1.43k | } |
773 | | |
774 | 1.43k | _cache->add_need_update_lru_block(iter->second); |
775 | 1.43k | need_read_size -= reserve_bytes; |
776 | 1.43k | current_offset += reserve_bytes; |
777 | 1.43k | already_read += reserve_bytes; |
778 | 1.43k | ++iter; |
779 | 1.43k | } |
780 | | |
781 | 1.04k | if (need_read_size == 0) { |
782 | 1.00k | *bytes_read = bytes_req; |
783 | 1.00k | stats.hit_cache = true; |
784 | 1.00k | g_read_cache_direct_whole_num << 1; |
785 | 1.00k | g_read_cache_direct_whole_bytes << bytes_req; |
786 | 1.00k | return true; |
787 | 1.00k | } |
788 | | |
789 | 37 | g_read_cache_direct_partial_num << 1; |
790 | 37 | g_read_cache_direct_partial_bytes << already_read; |
791 | 37 | return false; |
792 | 1.04k | } |
793 | | |
794 | | std::vector<FileBlockSPtr> CachedRemoteFileReader::_collect_remote_read_blocks( |
795 | 717k | const FileBlocksHolder& holder, ReadStatistics& stats) { |
796 | 717k | std::vector<FileBlockSPtr> empty_blocks; |
797 | 717k | for (auto& block : holder.file_blocks) { |
798 | 717k | switch (block->state()) { |
799 | 5.24k | case FileBlock::State::EMPTY: |
800 | 5.24k | VLOG_DEBUG << fmt::format("Block EMPTY path={} hash={}:{}:{} offset={} cache_path={}", |
801 | 0 | path().native(), _cache_hash.to_string(), _cache_hash.high(), |
802 | 0 | _cache_hash.low(), block->offset(), block->get_cache_file()); |
803 | 5.24k | block->get_or_set_downloader(); |
804 | 5.24k | if (block->is_downloader()) { |
805 | 5.24k | empty_blocks.push_back(block); |
806 | 5.24k | TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::EMPTY"); |
807 | 5.24k | } |
808 | 5.24k | stats.hit_cache = false; |
809 | 5.24k | break; |
810 | 3 | case FileBlock::State::SKIP_CACHE: |
811 | 3 | VLOG_DEBUG << fmt::format( |
812 | 0 | "Block SKIP_CACHE path={} hash={}:{}:{} offset={} cache_path={}", |
813 | 0 | path().native(), _cache_hash.to_string(), _cache_hash.high(), _cache_hash.low(), |
814 | 0 | block->offset(), block->get_cache_file()); |
815 | 3 | empty_blocks.push_back(block); |
816 | 3 | stats.hit_cache = false; |
817 | 3 | stats.skip_cache = true; |
818 | 3 | break; |
819 | 2 | case FileBlock::State::DOWNLOADING: |
820 | 2 | stats.hit_cache = false; |
821 | 2 | break; |
822 | 712k | case FileBlock::State::DOWNLOADED: |
823 | 712k | _insert_file_reader(block); |
824 | 712k | break; |
825 | 717k | } |
826 | 717k | } |
827 | 717k | return empty_blocks; |
828 | 717k | } |
829 | | |
830 | | Status CachedRemoteFileReader::_read_remote_blocks_into_cache( |
831 | | const std::vector<FileBlockSPtr>& empty_blocks, size_t offset, size_t bytes_req, |
832 | | size_t already_read, Slice result, bool is_dryrun, ReadStatistics& stats, |
833 | | SourceReadBreakdown& source_read_breakdown, const IOContext* io_ctx, |
834 | | size_t& indirect_read_bytes, size_t& empty_start, size_t& empty_end, |
835 | 717k | PeerFetchedBlockSet& peer_fetched_blocks) { |
836 | 717k | empty_start = 0; |
837 | 717k | empty_end = 0; |
838 | 717k | peer_fetched_blocks.clear(); |
839 | 717k | if (empty_blocks.empty()) { |
840 | 712k | return Status::OK(); |
841 | 712k | } |
842 | | |
843 | 5.18k | empty_start = empty_blocks.front()->range().left; |
844 | 5.18k | empty_end = empty_blocks.back()->range().right; |
845 | 5.18k | const size_t span_read_size = empty_end - empty_start + 1; |
846 | 5.18k | const auto peer_fetch_layout = build_peer_fetch_layout(empty_blocks, size()); |
847 | 5.18k | std::unique_ptr<char[]> buffer; |
848 | 5.18k | PeerFetchResult peer_result; |
849 | | |
850 | 5.18k | RETURN_IF_ERROR(_execute_remote_read(empty_blocks, empty_start, span_read_size, buffer, |
851 | 5.18k | &peer_result, stats, io_ctx)); |
852 | | |
853 | 5.18k | std::vector<std::vector<const PeerFetchChunk*>> peer_chunks_by_block; |
854 | 5.18k | if (stats.from_peer_cache) { |
855 | | // Peer returns sparse payloads; remember the exact sparse blocks that were filled. |
856 | 15 | peer_fetched_blocks.reserve(empty_blocks.size()); |
857 | 40 | for (const auto& block : empty_blocks) { |
858 | 40 | peer_fetched_blocks.insert(block.get()); |
859 | 40 | } |
860 | 15 | peer_chunks_by_block.resize(empty_blocks.size()); |
861 | 40 | for (const auto& chunk : peer_result.chunks) { |
862 | 40 | DCHECK_LT(chunk.block_index, empty_blocks.size()); |
863 | 40 | peer_chunks_by_block[chunk.block_index].push_back(&chunk); |
864 | 40 | } |
865 | 15 | } |
866 | | |
867 | 5.18k | SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().cached_remote_reader_write_back); |
868 | 10.4k | for (size_t idx = 0; idx < empty_blocks.size(); ++idx) { |
869 | 5.24k | auto& block = empty_blocks[idx]; |
870 | 5.24k | if (block->state() == FileBlock::State::SKIP_CACHE) { |
871 | 3 | continue; |
872 | 3 | } |
873 | | |
874 | 5.24k | SCOPED_RAW_TIMER(&stats.local_write_timer); |
875 | 5.24k | size_t block_size = block->range().size(); |
876 | 5.24k | Status st; |
877 | 5.24k | if (stats.from_peer_cache) { |
878 | 40 | block_size = peer_fetch_layout.block_sizes[idx]; |
879 | 40 | if (block_size == 0) { |
880 | 0 | continue; |
881 | 0 | } |
882 | 40 | st = write_peer_payloads_into_block(block, peer_chunks_by_block[idx], &block_size); |
883 | 5.20k | } else { |
884 | 5.20k | char* current_ptr = buffer.get() + block->range().left - empty_start; |
885 | 5.20k | st = block->append(Slice(current_ptr, block_size)); |
886 | 5.20k | } |
887 | 5.24k | if (st.ok()) { |
888 | 5.24k | st = block->finalize(); |
889 | 5.24k | } |
890 | 5.24k | if (!st.ok()) { |
891 | 2 | LOG(WARNING) << "write data to file cache failed, source=" |
892 | 2 | << (stats.from_peer_cache ? "peer" : "remote") |
893 | 2 | << ", path=" << path().native() << ", tablet_id=" << _tablet_id |
894 | 2 | << ", file_size=" << size() << ", cache_hash=" << _cache_hash.to_string() |
895 | 2 | << ", write_block_size=" << block_size |
896 | 2 | << ", block=" << block->get_info_for_log() |
897 | 2 | << ", cache_file=" << block->get_cache_file() << ", err=" << st; |
898 | 5.24k | } else { |
899 | 5.24k | _insert_file_reader(block); |
900 | 5.24k | stats.bytes_write_into_file_cache += block_size; |
901 | 5.24k | } |
902 | 5.24k | } |
903 | | |
904 | 5.18k | const size_t right_offset = offset + bytes_req - 1; |
905 | 5.18k | if (stats.from_peer_cache) { |
906 | 15 | if (is_dryrun) { |
907 | 1 | return Status::OK(); |
908 | 1 | } |
909 | 37 | for (const auto& chunk : peer_result.chunks) { |
910 | 37 | copy_peer_chunk_to_result(chunk, offset, right_offset, already_read, result, |
911 | 37 | indirect_read_bytes, source_read_breakdown); |
912 | 37 | } |
913 | 14 | return Status::OK(); |
914 | 15 | } |
915 | | |
916 | 5.16k | if (empty_start <= right_offset && empty_end >= offset + already_read && !is_dryrun) { |
917 | 5.16k | size_t copy_left_offset = std::max(offset + already_read, empty_start); |
918 | 5.16k | size_t copy_right_offset = std::min(right_offset, empty_end); |
919 | 5.16k | char* dst = result.data + (copy_left_offset - offset); |
920 | 5.16k | char* src = buffer.get() + (copy_left_offset - empty_start); |
921 | 5.16k | size_t copy_size = copy_right_offset - copy_left_offset + 1; |
922 | 5.16k | memcpy(dst, src, copy_size); |
923 | 5.16k | indirect_read_bytes += copy_size; |
924 | 5.16k | source_read_breakdown.remote_bytes += copy_size; |
925 | 5.16k | } |
926 | 5.16k | return Status::OK(); |
927 | 5.18k | } |
928 | | |
929 | | Status CachedRemoteFileReader::_read_remaining_blocks_from_cache( |
930 | | const FileBlocksHolder& holder, size_t offset, size_t bytes_req, Slice result, |
931 | | bool is_dryrun, size_t empty_start, size_t empty_end, |
932 | | const PeerFetchedBlockSet& peer_fetched_blocks, ReadStatistics& stats, |
933 | | SourceReadBreakdown& source_read_breakdown, size_t& indirect_read_bytes, size_t* bytes_read, |
934 | 717k | const IOContext* io_ctx) { |
935 | 717k | size_t current_offset = offset + *bytes_read; |
936 | 717k | size_t end_offset = offset + bytes_req - 1; |
937 | 717k | bool need_self_heal = false; |
938 | 717k | for (auto& block : holder.file_blocks) { |
939 | 717k | if (current_offset > end_offset) { |
940 | 0 | break; |
941 | 0 | } |
942 | | |
943 | 717k | size_t left = block->range().left; |
944 | 717k | size_t right = block->range().right; |
945 | 717k | if (right < offset) { |
946 | 1 | continue; |
947 | 1 | } |
948 | | |
949 | 717k | size_t read_size = |
950 | 717k | end_offset > right ? right - current_offset + 1 : end_offset - current_offset + 1; |
951 | 717k | if (!peer_fetched_blocks.empty() && contains_file_block(peer_fetched_blocks, block)) { |
952 | | // For sparse peer reads, skip only blocks fetched from peer. Other blocks inside the |
953 | | // enclosing span may still come from local cache. |
954 | 41 | *bytes_read += read_size; |
955 | 41 | current_offset = right + 1; |
956 | 41 | continue; |
957 | 41 | } |
958 | 717k | if (peer_fetched_blocks.empty() && empty_start <= left && right <= empty_end) { |
959 | 5.20k | *bytes_read += read_size; |
960 | 5.20k | current_offset = right + 1; |
961 | 5.20k | continue; |
962 | 5.20k | } |
963 | | |
964 | 712k | FileBlock::State block_state = block->state(); |
965 | 712k | int64_t wait_time = 0; |
966 | 712k | static int64_t max_wait_time = 10; |
967 | 712k | TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::max_wait_time", &max_wait_time); |
968 | 712k | if (block_state != FileBlock::State::DOWNLOADED) { |
969 | 3 | SCOPED_CONCURRENCY_COUNT( |
970 | 3 | ConcurrencyStatsManager::instance().cached_remote_reader_blocking); |
971 | 4 | do { |
972 | 4 | SCOPED_RAW_TIMER(&stats.remote_wait_timer); |
973 | 4 | TEST_SYNC_POINT_CALLBACK("CachedRemoteFileReader::DOWNLOADING"); |
974 | 4 | block_state = block->wait(); |
975 | 4 | if (block_state != FileBlock::State::DOWNLOADING) { |
976 | 2 | break; |
977 | 2 | } |
978 | 4 | } while (++wait_time < max_wait_time); |
979 | 3 | } |
980 | 712k | if (wait_time == max_wait_time) [[unlikely]] { |
981 | 1 | LOG_WARNING("Waiting too long for the download to complete"); |
982 | 1 | } |
983 | | |
984 | 712k | Status st; |
985 | | /* |
986 | | * If block_state == EMPTY, the thread reads the data from remote. |
987 | | * If block_state == DOWNLOADED, when the cache file is deleted by the other process, |
988 | | * the thread reads the data from remote too. |
989 | | */ |
990 | 712k | if (block_state == FileBlock::State::DOWNLOADED) { |
991 | 712k | if (is_dryrun) [[unlikely]] { |
992 | 1 | g_skip_local_cache_io_sum_bytes << read_size; |
993 | 712k | } else { |
994 | 712k | size_t file_offset = current_offset - left; |
995 | 712k | SCOPED_RAW_TIMER(&stats.local_read_timer); |
996 | 712k | SCOPED_CONCURRENCY_COUNT( |
997 | 712k | ConcurrencyStatsManager::instance().cached_remote_reader_local_read); |
998 | 712k | st = block->read(Slice(result.data + (current_offset - offset), read_size), |
999 | 712k | file_offset); |
1000 | 712k | indirect_read_bytes += read_size; |
1001 | 712k | if (st.ok()) { |
1002 | 712k | source_read_breakdown.local_bytes += read_size; |
1003 | 712k | } |
1004 | 712k | } |
1005 | 712k | if (block_state == FileBlock::State::DOWNLOADED && st.is<ErrorCode::NOT_FOUND>()) { |
1006 | 1 | need_self_heal = true; |
1007 | 1 | g_read_cache_self_heal_on_not_found << 1; |
1008 | 1 | LOG_EVERY_N(WARNING, 100) |
1009 | 1 | << "Cache block file is missing, will self-heal by clearing cache hash. " |
1010 | 1 | << "path=" << path().native() << ", hash=" << _cache_hash.to_string() |
1011 | 1 | << ", offset=" << left << ", err=" << st.msg(); |
1012 | 1 | } |
1013 | 712k | } |
1014 | 712k | if (!st || block_state != FileBlock::State::DOWNLOADED) { |
1015 | 4 | if (is_dryrun) [[unlikely]] { |
1016 | 0 | *bytes_read += read_size; |
1017 | 0 | current_offset = right + 1; |
1018 | 0 | continue; |
1019 | 0 | } |
1020 | 4 | LOG(WARNING) << "Read data failed from file cache downloaded by others. err=" |
1021 | 4 | << st.msg() << ", block state=" << block_state; |
1022 | 4 | size_t remote_bytes_read {0}; |
1023 | 4 | stats.hit_cache = false; |
1024 | 4 | stats.from_peer_cache = false; |
1025 | 4 | s3_read_counter << 1; |
1026 | 4 | SCOPED_RAW_TIMER(&stats.remote_read_timer); |
1027 | 4 | RETURN_IF_ERROR(_remote_file_reader->read_at( |
1028 | 4 | current_offset, Slice(result.data + (current_offset - offset), read_size), |
1029 | 4 | &remote_bytes_read, io_ctx)); |
1030 | 3 | indirect_read_bytes += read_size; |
1031 | 3 | source_read_breakdown.remote_bytes += remote_bytes_read; |
1032 | 3 | DCHECK(remote_bytes_read == read_size); |
1033 | 3 | } |
1034 | | |
1035 | 712k | *bytes_read += read_size; |
1036 | 712k | current_offset = right + 1; |
1037 | 712k | } |
1038 | 717k | if (need_self_heal && _cache != nullptr) { |
1039 | 1 | _cache->remove_if_cached_async(_cache_hash); |
1040 | 1 | } |
1041 | 717k | return Status::OK(); |
1042 | 717k | } |
1043 | | |
1044 | | Status CachedRemoteFileReader::_read_from_indirect_cache(size_t offset, Slice result, |
1045 | | size_t bytes_req, size_t already_read, |
1046 | | bool is_dryrun, size_t* bytes_read, |
1047 | | ReadStatistics& stats, |
1048 | | SourceReadBreakdown& source_read_breakdown, |
1049 | 717k | const IOContext* io_ctx) { |
1050 | 717k | g_read_cache_indirect_num << 1; |
1051 | 717k | size_t indirect_read_bytes = 0; |
1052 | 717k | auto [align_left, align_size] = |
1053 | 717k | s_align_size(offset + already_read, bytes_req - already_read, size()); |
1054 | 717k | CacheContext cache_context(io_ctx); |
1055 | 717k | cache_context.stats = &stats; |
1056 | 717k | MonotonicStopWatch sw; |
1057 | 717k | sw.start(); |
1058 | 717k | ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set->increment(); |
1059 | 717k | FileBlocksHolder holder = |
1060 | 717k | _cache->get_or_set(_cache_hash, align_left, align_size, cache_context); |
1061 | 717k | ConcurrencyStatsManager::instance().cached_remote_reader_get_or_set->decrement(); |
1062 | 717k | stats.cache_get_or_set_timer += sw.elapsed_time(); |
1063 | | |
1064 | 717k | auto empty_blocks = _collect_remote_read_blocks(holder, stats); |
1065 | 717k | size_t empty_start = 0; |
1066 | 717k | size_t empty_end = 0; |
1067 | 717k | PeerFetchedBlockSet peer_fetched_blocks; |
1068 | 717k | RETURN_IF_ERROR(_read_remote_blocks_into_cache(empty_blocks, offset, bytes_req, already_read, |
1069 | 717k | result, is_dryrun, stats, source_read_breakdown, |
1070 | 717k | io_ctx, indirect_read_bytes, empty_start, |
1071 | 717k | empty_end, peer_fetched_blocks)); |
1072 | 717k | *bytes_read = already_read; |
1073 | 717k | RETURN_IF_ERROR(_read_remaining_blocks_from_cache(holder, offset, bytes_req, result, is_dryrun, |
1074 | 717k | empty_start, empty_end, peer_fetched_blocks, |
1075 | 717k | stats, source_read_breakdown, |
1076 | 717k | indirect_read_bytes, bytes_read, io_ctx)); |
1077 | 717k | g_read_cache_indirect_bytes << indirect_read_bytes; |
1078 | 717k | g_read_cache_indirect_total_bytes << *bytes_read; |
1079 | 717k | DCHECK(*bytes_read == bytes_req); |
1080 | 717k | return Status::OK(); |
1081 | 717k | } |
1082 | | |
1083 | | Status CachedRemoteFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read, |
1084 | 718k | const IOContext* io_ctx) { |
1085 | 718k | SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().cached_remote_reader_read_at); |
1086 | 718k | IOContext default_io_ctx; |
1087 | 718k | if (io_ctx == nullptr) { |
1088 | 1 | io_ctx = &default_io_ctx; |
1089 | 1 | } |
1090 | 718k | DCHECK(io_ctx); |
1091 | 718k | DCHECK(!closed()); |
1092 | | |
1093 | 718k | const bool is_dryrun = io_ctx->is_dryrun; |
1094 | 718k | if (offset > size()) { |
1095 | 1 | return Status::InvalidArgument( |
1096 | 1 | fmt::format("offset exceeds file size(offset: {}, file size: {}, path: {})", offset, |
1097 | 1 | size(), path().native())); |
1098 | 1 | } |
1099 | | |
1100 | 718k | size_t bytes_req = std::min(result.size, size() - offset); |
1101 | 718k | if (UNLIKELY(bytes_req == 0)) { |
1102 | 1 | *bytes_read = 0; |
1103 | 1 | return Status::OK(); |
1104 | 1 | } |
1105 | | |
1106 | 718k | ReadStatistics stats; |
1107 | 718k | SourceReadBreakdown source_read_breakdown; |
1108 | 718k | Status read_st = Status::OK(); |
1109 | 718k | MonotonicStopWatch read_at_sw; |
1110 | 718k | read_at_sw.start(); |
1111 | 718k | stats.bytes_read += bytes_req; |
1112 | 718k | Defer defer {[&]() { |
1113 | 718k | if (config::print_stack_when_cache_miss) { |
1114 | 0 | if (io_ctx->file_cache_stats == nullptr && !stats.hit_cache && !io_ctx->is_warmup) { |
1115 | 0 | LOG_INFO("[verbose] {}", Status::InternalError<true>("not hit cache")); |
1116 | 0 | } |
1117 | 0 | } |
1118 | 718k | if (!stats.hit_cache && config::read_cluster_cache_opt_verbose_log) { |
1119 | 0 | LOG_INFO( |
1120 | 0 | "[verbose] not hit cache, path: {}, offset: {}, size: {}, cost: {} ms, warmup: " |
1121 | 0 | "{}", |
1122 | 0 | path().native(), offset, bytes_req, read_at_sw.elapsed_time_milliseconds(), |
1123 | 0 | io_ctx->is_warmup); |
1124 | 0 | } |
1125 | 718k | if (read_st.ok() && !is_dryrun) { |
1126 | | // Only successful reads contribute to query profile and file-cache metrics. |
1127 | 718k | const auto file_cache_read_type = |
1128 | 718k | io_ctx->is_inverted_index |
1129 | 718k | ? FileCacheReadType::INVERTED_INDEX |
1130 | 718k | : (io_ctx->is_index_data ? FileCacheReadType::SEGMENT_FOOTER_INDEX |
1131 | 718k | : FileCacheReadType::DATA); |
1132 | 718k | if (io_ctx->file_cache_stats) { |
1133 | 709k | _update_stats(stats, source_read_breakdown, io_ctx->file_cache_stats, |
1134 | 709k | file_cache_read_type); |
1135 | 709k | } |
1136 | 718k | if (!io_ctx->is_warmup) { |
1137 | 718k | FileCacheStatistics fcache_stats_increment; |
1138 | 718k | _update_stats(stats, source_read_breakdown, &fcache_stats_increment, |
1139 | 718k | file_cache_read_type); |
1140 | 718k | io::FileCacheMetrics::instance().update(&fcache_stats_increment); |
1141 | 718k | } |
1142 | 718k | } |
1143 | 718k | }}; |
1144 | | |
1145 | 718k | size_t already_read = 0; |
1146 | 718k | if (_try_read_from_cached_files_directly(offset, result, bytes_req, is_dryrun, stats, |
1147 | 718k | source_read_breakdown, already_read, bytes_read)) { |
1148 | 1.00k | return Status::OK(); |
1149 | 1.00k | } |
1150 | | |
1151 | 717k | read_st = _read_from_indirect_cache(offset, result, bytes_req, already_read, is_dryrun, |
1152 | 717k | bytes_read, stats, source_read_breakdown, io_ctx); |
1153 | 717k | return read_st; |
1154 | 718k | } |
1155 | | |
1156 | 0 | void CachedRemoteFileReader::prefetch_range(size_t offset, size_t size, const IOContext* io_ctx) { |
1157 | 0 | if (offset >= this->size() || size == 0) { |
1158 | 0 | return; |
1159 | 0 | } |
1160 | | |
1161 | 0 | size = std::min(size, this->size() - offset); |
1162 | |
|
1163 | 0 | ThreadPool* pool = ExecEnv::GetInstance()->segment_prefetch_thread_pool(); |
1164 | 0 | if (pool == nullptr) { |
1165 | 0 | return; |
1166 | 0 | } |
1167 | | |
1168 | 0 | IOContext dryrun_ctx; |
1169 | 0 | if (io_ctx != nullptr) { |
1170 | 0 | dryrun_ctx = *io_ctx; |
1171 | 0 | } |
1172 | 0 | dryrun_ctx.is_dryrun = true; |
1173 | 0 | dryrun_ctx.query_id = nullptr; |
1174 | 0 | dryrun_ctx.file_cache_stats = nullptr; |
1175 | 0 | dryrun_ctx.file_reader_stats = nullptr; |
1176 | |
|
1177 | 0 | LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) |
1178 | 0 | << fmt::format("[verbose] Submitting prefetch task for offset={} size={}, file={}", |
1179 | 0 | offset, size, path().filename().native()); |
1180 | 0 | std::weak_ptr<CachedRemoteFileReader> weak_this = shared_from_this(); |
1181 | 0 | auto st = pool->submit_func([weak_this, offset, size, dryrun_ctx]() { |
1182 | 0 | auto self = weak_this.lock(); |
1183 | 0 | if (self == nullptr) { |
1184 | 0 | return; |
1185 | 0 | } |
1186 | 0 | size_t bytes_read = 0; |
1187 | 0 | Slice dummy_buffer((char*)nullptr, size); |
1188 | 0 | (void)self->read_at_impl(offset, dummy_buffer, &bytes_read, &dryrun_ctx); |
1189 | 0 | LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) |
1190 | 0 | << fmt::format("[verbose] Prefetch task completed for offset={} size={}, file={}", |
1191 | 0 | offset, size, self->path().filename().native()); |
1192 | 0 | }); |
1193 | |
|
1194 | 0 | if (!st.ok()) { |
1195 | 0 | VLOG_DEBUG << "Failed to submit prefetch task for offset=" << offset << " size=" << size |
1196 | 0 | << " error=" << st.to_string(); |
1197 | 0 | } |
1198 | 0 | } |
1199 | | |
1200 | | void CachedRemoteFileReader::_update_stats(const ReadStatistics& read_stats, |
1201 | | const SourceReadBreakdown& source_read_breakdown, |
1202 | | FileCacheStatistics* statis, |
1203 | 1.42M | FileCacheReadType read_type) const { |
1204 | 1.42M | if (statis == nullptr) { |
1205 | 0 | return; |
1206 | 0 | } |
1207 | 1.42M | const bool has_source_bytes = source_read_breakdown.local_bytes != 0 || |
1208 | 1.42M | source_read_breakdown.remote_bytes != 0 || |
1209 | 1.42M | source_read_breakdown.peer_bytes != 0; |
1210 | 1.42M | if (has_source_bytes) { |
1211 | 1.42M | if (source_read_breakdown.local_bytes != 0) { |
1212 | 1.41M | statis->num_local_io_total++; |
1213 | 1.41M | statis->bytes_read_from_local += source_read_breakdown.local_bytes; |
1214 | 1.41M | } |
1215 | 1.42M | if (source_read_breakdown.peer_bytes != 0 || read_stats.from_peer_cache) { |
1216 | | // Count peer IO whenever peer was used, even if its fetched blocks were entirely |
1217 | | // outside the copy range (e.g., backward-aligned prefetch block before |
1218 | | // offset+already_read). In that case peer_bytes==0 but the peer RPC did happen |
1219 | | // and wrote data into the local file cache. |
1220 | 28 | statis->num_peer_io_total++; |
1221 | 28 | statis->bytes_read_from_peer += source_read_breakdown.peer_bytes; |
1222 | 28 | statis->peer_io_timer += read_stats.peer_read_timer; |
1223 | 28 | } |
1224 | 1.42M | if (source_read_breakdown.remote_bytes != 0) { |
1225 | 10.3k | statis->num_remote_io_total++; |
1226 | 10.3k | statis->bytes_read_from_remote += source_read_breakdown.remote_bytes; |
1227 | 10.3k | statis->remote_io_timer += read_stats.remote_read_timer; |
1228 | 10.3k | } |
1229 | 18.4E | } else if (read_stats.hit_cache) { |
1230 | 0 | statis->num_local_io_total++; |
1231 | 0 | statis->bytes_read_from_local += read_stats.bytes_read; |
1232 | 18.4E | } else if (read_stats.from_peer_cache) { |
1233 | 0 | statis->num_peer_io_total++; |
1234 | 0 | statis->bytes_read_from_peer += read_stats.bytes_read; |
1235 | 0 | statis->peer_io_timer += read_stats.peer_read_timer; |
1236 | 18.4E | } else { |
1237 | 18.4E | statis->num_remote_io_total++; |
1238 | 18.4E | statis->bytes_read_from_remote += read_stats.bytes_read; |
1239 | 18.4E | statis->remote_io_timer += read_stats.remote_read_timer; |
1240 | 18.4E | } |
1241 | 1.42M | statis->remote_wait_timer += read_stats.remote_wait_timer; |
1242 | 1.42M | statis->local_io_timer += read_stats.local_read_timer; |
1243 | 1.42M | statis->num_skip_cache_io_total += read_stats.skip_cache; |
1244 | 1.42M | statis->bytes_write_into_cache += read_stats.bytes_write_into_file_cache; |
1245 | 1.42M | statis->write_cache_io_timer += read_stats.local_write_timer; |
1246 | | |
1247 | 1.42M | statis->read_cache_file_directly_timer += read_stats.read_cache_file_directly_timer; |
1248 | 1.42M | statis->cache_get_or_set_timer += read_stats.cache_get_or_set_timer; |
1249 | 1.42M | statis->lock_wait_timer += read_stats.lock_wait_timer; |
1250 | 1.42M | statis->get_timer += read_stats.get_timer; |
1251 | 1.42M | statis->set_timer += read_stats.set_timer; |
1252 | | |
1253 | 1.42M | auto update_index_stats = [&](int64_t& num_local_io_total, int64_t& num_remote_io_total, |
1254 | 1.42M | int64_t& num_peer_io_total, int64_t& bytes_read_from_local, |
1255 | 1.42M | int64_t& bytes_read_from_remote, int64_t& bytes_read_from_peer, |
1256 | 1.42M | int64_t& local_io_timer, int64_t& remote_io_timer, |
1257 | 1.42M | int64_t& peer_io_timer) { |
1258 | 2 | if (has_source_bytes) { |
1259 | 2 | if (source_read_breakdown.local_bytes != 0) { |
1260 | 0 | num_local_io_total++; |
1261 | 0 | bytes_read_from_local += source_read_breakdown.local_bytes; |
1262 | 0 | } |
1263 | 2 | if (source_read_breakdown.peer_bytes != 0 || read_stats.from_peer_cache) { |
1264 | 0 | num_peer_io_total++; |
1265 | 0 | bytes_read_from_peer += source_read_breakdown.peer_bytes; |
1266 | 0 | peer_io_timer += read_stats.peer_read_timer; |
1267 | 0 | } |
1268 | 2 | if (source_read_breakdown.remote_bytes != 0) { |
1269 | 2 | num_remote_io_total++; |
1270 | 2 | bytes_read_from_remote += source_read_breakdown.remote_bytes; |
1271 | 2 | remote_io_timer += read_stats.remote_read_timer; |
1272 | 2 | } |
1273 | 2 | } else if (read_stats.hit_cache) { |
1274 | 0 | num_local_io_total++; |
1275 | 0 | bytes_read_from_local += read_stats.bytes_read; |
1276 | 0 | } else if (read_stats.from_peer_cache) { |
1277 | 0 | num_peer_io_total++; |
1278 | 0 | bytes_read_from_peer += read_stats.bytes_read; |
1279 | 0 | peer_io_timer += read_stats.peer_read_timer; |
1280 | 0 | } else { |
1281 | 0 | num_remote_io_total++; |
1282 | 0 | bytes_read_from_remote += read_stats.bytes_read; |
1283 | 0 | remote_io_timer += read_stats.remote_read_timer; |
1284 | 0 | } |
1285 | 2 | local_io_timer += read_stats.local_read_timer; |
1286 | 2 | }; |
1287 | | |
1288 | 1.42M | switch (read_type) { |
1289 | 1.42M | case FileCacheReadType::DATA: |
1290 | 1.42M | break; |
1291 | 0 | case FileCacheReadType::INVERTED_INDEX: |
1292 | 0 | update_index_stats( |
1293 | 0 | statis->inverted_index_num_local_io_total, |
1294 | 0 | statis->inverted_index_num_remote_io_total, |
1295 | 0 | statis->inverted_index_num_peer_io_total, |
1296 | 0 | statis->inverted_index_bytes_read_from_local, |
1297 | 0 | statis->inverted_index_bytes_read_from_remote, |
1298 | 0 | statis->inverted_index_bytes_read_from_peer, statis->inverted_index_local_io_timer, |
1299 | 0 | statis->inverted_index_remote_io_timer, statis->inverted_index_peer_io_timer); |
1300 | 0 | break; |
1301 | 2 | case FileCacheReadType::SEGMENT_FOOTER_INDEX: |
1302 | 2 | update_index_stats(statis->segment_footer_index_num_local_io_total, |
1303 | 2 | statis->segment_footer_index_num_remote_io_total, |
1304 | 2 | statis->segment_footer_index_num_peer_io_total, |
1305 | 2 | statis->segment_footer_index_bytes_read_from_local, |
1306 | 2 | statis->segment_footer_index_bytes_read_from_remote, |
1307 | 2 | statis->segment_footer_index_bytes_read_from_peer, |
1308 | 2 | statis->segment_footer_index_local_io_timer, |
1309 | 2 | statis->segment_footer_index_remote_io_timer, |
1310 | 2 | statis->segment_footer_index_peer_io_timer); |
1311 | 2 | break; |
1312 | 1.42M | } |
1313 | | |
1314 | 1.42M | g_skip_cache_sum << read_stats.skip_cache; |
1315 | 1.42M | } |
1316 | | |
1317 | | } // namespace doris::io |