be/src/cloud/cloud_internal_service.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "cloud/cloud_internal_service.h" |
19 | | |
20 | | #include <brpc/controller.h> |
21 | | #include <bthread/countdown_event.h> |
22 | | #include <butil/iobuf.h> |
23 | | #include <fmt/format.h> |
24 | | |
25 | | #include <algorithm> |
26 | | #include <chrono> |
27 | | #include <limits> |
28 | | #include <list> |
29 | | #include <memory> |
30 | | #include <optional> |
31 | | #include <thread> |
32 | | #include <unordered_map> |
33 | | #include <vector> |
34 | | |
35 | | #include "cloud/cloud_storage_engine.h" |
36 | | #include "cloud/cloud_tablet.h" |
37 | | #include "cloud/cloud_tablet_mgr.h" |
38 | | #include "cloud/cloud_warm_up_manager.h" |
39 | | #include "cloud/cloud_warmup_metrics.h" |
40 | | #include "cloud/config.h" |
41 | | #include "io/cache/block_file_cache.h" |
42 | | #include "io/cache/block_file_cache_downloader.h" |
43 | | #include "io/cache/block_file_cache_factory.h" |
44 | | #include "io/fs/path.h" |
45 | | #include "runtime/thread_context.h" |
46 | | #include "runtime/workload_management/io_throttle.h" |
47 | | #include "storage/storage_policy.h" |
48 | | #include "util/async_io.h" |
49 | | #include "util/bvar_windowed_adder.h" |
50 | | #include "util/debug_points.h" |
51 | | |
52 | | namespace doris { |
53 | | #include "common/compile_check_avoid_begin.h" |
54 | | |
// ---- bvar metrics for the "file cache get by peer" server path ----
// Request-level counters.
bvar::Adder<uint64_t> g_file_cache_get_by_peer_num("file_cache_get_by_peer_num");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_blocks_num("file_cache_get_by_peer_blocks_num");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_success_num("file_cache_get_by_peer_success_num");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_failed_num("file_cache_get_by_peer_failed_num");
// Latencies below are fed with microsecond values by the code in this file.
bvar::LatencyRecorder g_file_cache_get_by_peer_server_latency(
        "file_cache_get_by_peer_server_latency");
bvar::LatencyRecorder g_file_cache_get_by_peer_read_cache_file_latency(
        "file_cache_get_by_peer_read_cache_file_latency");
// Work-pool queueing: offer failures, queue timeouts and time spent waiting in the queue.
bvar::Adder<uint64_t> g_file_cache_get_by_peer_offer_failed_num(
        "file_cache_get_by_peer_offer_failed_num");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_queue_timeout_num(
        "file_cache_get_by_peer_queue_timeout_num");
bvar::LatencyRecorder g_file_cache_get_by_peer_queue_wait_latency(
        "file_cache_get_by_peer_queue_wait_latency");
// Per-request breakdown recorded while handling a cache-block request.
bvar::LatencyRecorder g_file_cache_get_by_peer_handle_cache_block_req_latency(
        "file_cache_get_by_peer_handle_cache_block_req_latency");
bvar::LatencyRecorder g_file_cache_get_by_peer_get_cache_latency(
        "file_cache_get_by_peer_get_cache_latency");
bvar::LatencyRecorder g_file_cache_get_by_peer_get_or_set_latency(
        "file_cache_get_by_peer_get_or_set_latency");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_get_or_set_calls(
        "file_cache_get_by_peer_get_or_set_calls");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_get_or_set_blocks_total(
        "file_cache_get_by_peer_get_or_set_blocks_total");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_request_blocks_total(
        "file_cache_get_by_peer_request_blocks_total");
// NOTE: a LatencyRecorder is used here to record a block count per RPC, not a duration.
bvar::LatencyRecorder g_file_cache_get_by_peer_request_blocks_per_rpc(
        "file_cache_get_by_peer_request_blocks_per_rpc");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_response_blocks_total(
        "file_cache_get_by_peer_response_blocks_total");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_response_bytes_total(
        "file_cache_get_by_peer_response_bytes_total");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_not_downloaded_block_num(
        "file_cache_get_by_peer_not_downloaded_block_num");
bvar::LatencyRecorder g_file_cache_get_by_peer_read_file_block_total_latency(
        "file_cache_get_by_peer_read_file_block_total_latency");
bvar::LatencyRecorder g_file_cache_get_by_peer_set_response_data_latency(
        "file_cache_get_by_peer_set_response_data_latency");
// Response transport: payload carried as attachment vs. inside the protobuf message.
bvar::Adder<uint64_t> g_file_cache_get_by_peer_attachment_response_num(
        "file_cache_get_by_peer_attachment_response_num");
bvar::Adder<uint64_t> g_file_cache_get_by_peer_pb_response_num(
        "file_cache_get_by_peer_pb_response_num");

// ---- bvar metrics for server-side pull-through fills (see trigger_peer_server_fill) ----
bvar::Adder<int64_t> g_peer_server_fill_requested("peer_server_fill_requested");
bvar::Adder<int64_t> g_peer_server_fill_success("peer_server_fill_success");
bvar::Adder<int64_t> g_peer_server_fill_timeout("peer_server_fill_timeout");
bvar::Adder<int64_t> g_peer_server_fill_rejected("peer_server_fill_rejected");
bvar::LatencyRecorder g_peer_server_fill_latency("peer_server_fill_latency");
bvar::LatencyRecorder g_cloud_internal_service_get_file_cache_meta_by_tablet_id_latency(
        "cloud_internal_service_get_file_cache_meta_by_tablet_id_latency");

// Concurrency guard for server-side S3 pull-through fills.
// Number of fills currently in flight; compared against
// config::max_concurrent_peer_server_fills in trigger_peer_server_fill.
static std::atomic<int32_t> g_active_server_fills {0};
// Exposes the in-flight fill count as a bvar by sampling the atomic on read.
bvar::PassiveStatus<int32_t> g_peer_active_fills(
        "peer_active_fills",
        [](void*) { return g_active_server_fills.load(std::memory_order_relaxed); }, nullptr);
111 | | |
// Forwards `exec_env` to the PInternalService base and initializes `_engine` from `engine`.
CloudInternalServiceImpl::CloudInternalServiceImpl(CloudStorageEngine& engine, ExecEnv* exec_env)
        : PInternalService(exec_env), _engine(engine) {}

// Nothing to release beyond what the members and base class clean up themselves.
CloudInternalServiceImpl::~CloudInternalServiceImpl() = default;
116 | | |
117 | | void CloudInternalServiceImpl::alter_vault_sync(google::protobuf::RpcController* controller, |
118 | | const doris::PAlterVaultSyncRequest* request, |
119 | | PAlterVaultSyncResponse* response, |
120 | 0 | google::protobuf::Closure* done) { |
121 | 0 | LOG(INFO) << "alter be to sync vault info from Meta Service"; |
122 | | // If the vaults containing hdfs vault then it would try to create hdfs connection using jni |
123 | | // which would acuiqre one thread local jniEnv. But bthread context can't guarantee that the brpc |
124 | | // worker thread wouldn't do bthread switch between worker threads. |
125 | 0 | bool ret = _heavy_work_pool.try_offer([this, done]() { |
126 | 0 | brpc::ClosureGuard closure_guard(done); |
127 | 0 | _engine.sync_storage_vault(); |
128 | 0 | }); |
129 | 0 | if (!ret) { |
130 | 0 | brpc::ClosureGuard closure_guard(done); |
131 | 0 | LOG(WARNING) << "fail to offer alter_vault_sync request to the work pool, pool=" |
132 | 0 | << _heavy_work_pool.get_info(); |
133 | 0 | } |
134 | 0 | } |
135 | | |
136 | 0 | FileCacheType cache_type_to_pb(io::FileCacheType type) { |
137 | 0 | switch (type) { |
138 | 0 | case io::FileCacheType::TTL: |
139 | 0 | return FileCacheType::TTL; |
140 | 0 | case io::FileCacheType::INDEX: |
141 | 0 | return FileCacheType::INDEX; |
142 | 0 | case io::FileCacheType::NORMAL: |
143 | 0 | return FileCacheType::NORMAL; |
144 | 0 | default: |
145 | 0 | DCHECK(false); |
146 | 0 | } |
147 | 0 | return FileCacheType::NORMAL; |
148 | 0 | } |
149 | | |
// Returns the system (wall) clock reading as microseconds since the clock's epoch.
static int64_t current_unix_time_us() {
    using std::chrono::microseconds;
    const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
    return std::chrono::duration_cast<microseconds>(since_epoch).count();
}
155 | | |
// Computes `end_unix_ts_us - start_unix_ts_us`, or std::nullopt when the pair is unusable.
//
// The start timestamp is produced by the caller BE: mixed-version callers may omit it
// (non-positive value), and system clocks on different BEs are not guaranteed to be ordered
// (end earlier than start). Both cases yield std::nullopt.
static std::optional<int64_t> warm_up_rowset_cross_host_latency_us(int64_t start_unix_ts_us,
                                                                   int64_t end_unix_ts_us) {
    const bool start_present = start_unix_ts_us > 0;
    const bool clocks_ordered = end_unix_ts_us >= start_unix_ts_us;
    if (start_present && clocks_ordered) {
        return end_unix_ts_us - start_unix_ts_us;
    }
    return std::nullopt;
}
165 | | |
166 | | static void add_file_cache_block_meta_to_response( |
167 | | PGetFileCacheMetaResponse* resp, int64_t tablet_id, const std::string& rowset_id, |
168 | | int32_t segment_id, const std::string& file_name, |
169 | | const std::tuple<int64_t, int64_t, io::FileCacheType, int64_t>& tuple, |
170 | 0 | const RowsetSharedPtr& rowset, bool is_index) { |
171 | 0 | FileCacheBlockMeta* meta = resp->add_file_cache_block_metas(); |
172 | 0 | meta->set_tablet_id(tablet_id); |
173 | 0 | meta->set_rowset_id(rowset_id); |
174 | 0 | meta->set_segment_id(segment_id); |
175 | 0 | meta->set_file_name(file_name); |
176 | |
|
177 | 0 | if (!is_index) { |
178 | | // .dat |
179 | 0 | meta->set_file_size(rowset->rowset_meta()->segment_file_size(segment_id)); |
180 | 0 | meta->set_file_type(doris::FileType::SEGMENT_FILE); |
181 | 0 | } else { |
182 | | // .idx |
183 | 0 | const auto& idx_file_info = rowset->rowset_meta()->inverted_index_file_info(segment_id); |
184 | 0 | meta->set_file_size(idx_file_info.has_index_size() ? idx_file_info.index_size() : -1); |
185 | 0 | meta->set_file_type(doris::FileType::INVERTED_INDEX_FILE); |
186 | 0 | } |
187 | |
|
188 | 0 | meta->set_offset(std::get<0>(tuple)); |
189 | 0 | meta->set_size(std::get<1>(tuple)); |
190 | 0 | meta->set_cache_type(cache_type_to_pb(std::get<2>(tuple))); |
191 | 0 | meta->set_expiration_time(std::get<3>(tuple)); |
192 | 0 | } |
193 | | |
194 | | static void process_segment_file_cache_meta(PGetFileCacheMetaResponse* resp, |
195 | | const RowsetSharedPtr& rowset, int64_t tablet_id, |
196 | | const std::string& rowset_id, int32_t segment_id, |
197 | 0 | bool is_index) { |
198 | 0 | const char* extension = is_index ? ".idx" : ".dat"; |
199 | 0 | std::string file_name = fmt::format("{}_{}{}", rowset_id, segment_id, extension); |
200 | 0 | auto cache_key = io::BlockFileCache::hash(file_name); |
201 | 0 | auto* cache = io::FileCacheFactory::instance()->get_by_path(cache_key); |
202 | 0 | if (!cache) return; |
203 | 0 | auto segments_meta = cache->get_hot_blocks_meta(cache_key); |
204 | 0 | for (const auto& tuple : segments_meta) { |
205 | 0 | add_file_cache_block_meta_to_response(resp, tablet_id, rowset_id, segment_id, file_name, |
206 | 0 | tuple, rowset, is_index); |
207 | 0 | } |
208 | 0 | } |
209 | | |
210 | | void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id( |
211 | | google::protobuf::RpcController* controller [[maybe_unused]], |
212 | | const PGetFileCacheMetaRequest* request, PGetFileCacheMetaResponse* response, |
213 | 0 | google::protobuf::Closure* done) { |
214 | 0 | brpc::ClosureGuard closure_guard(done); |
215 | 0 | if (!config::enable_file_cache) { |
216 | 0 | LOG_WARNING("try to access tablet file cache meta, but file cache not enabled"); |
217 | 0 | return; |
218 | 0 | } |
219 | 0 | auto begin_ts = std::chrono::duration_cast<std::chrono::microseconds>( |
220 | 0 | std::chrono::steady_clock::now().time_since_epoch()) |
221 | 0 | .count(); |
222 | 0 | std::ostringstream tablet_ids_stream; |
223 | 0 | int count = 0; |
224 | 0 | for (const auto& tablet_id : request->tablet_ids()) { |
225 | 0 | tablet_ids_stream << tablet_id << ", "; |
226 | 0 | count++; |
227 | 0 | if (count >= 10) { |
228 | 0 | break; |
229 | 0 | } |
230 | 0 | } |
231 | 0 | LOG(INFO) << "warm up get meta from this be, tablets num=" << request->tablet_ids().size() |
232 | 0 | << ", first 10 tablet_ids=[ " << tablet_ids_stream.str() << " ]"; |
233 | 0 | for (const auto& tablet_id : request->tablet_ids()) { |
234 | 0 | auto res = _engine.tablet_mgr().get_tablet(tablet_id); |
235 | 0 | if (!res.has_value()) { |
236 | 0 | LOG(ERROR) << "failed to get tablet: " << tablet_id |
237 | 0 | << " err msg: " << res.error().msg(); |
238 | 0 | continue; |
239 | 0 | } |
240 | 0 | CloudTabletSPtr tablet = std::move(res.value()); |
241 | 0 | auto st = tablet->sync_rowsets(); |
242 | 0 | if (!st) { |
243 | | // just log failed, try it best |
244 | 0 | LOG(WARNING) << "failed to sync rowsets: " << tablet_id |
245 | 0 | << " err msg: " << st.to_string(); |
246 | 0 | } |
247 | 0 | auto rowsets = tablet->get_snapshot_rowset(); |
248 | |
|
249 | 0 | for (const RowsetSharedPtr& rowset : rowsets) { |
250 | 0 | std::string rowset_id = rowset->rowset_id().to_string(); |
251 | 0 | for (int32_t segment_id = 0; segment_id < rowset->num_segments(); ++segment_id) { |
252 | 0 | process_segment_file_cache_meta(response, rowset, tablet_id, rowset_id, segment_id, |
253 | 0 | false); |
254 | 0 | process_segment_file_cache_meta(response, rowset, tablet_id, rowset_id, segment_id, |
255 | 0 | true); |
256 | 0 | } |
257 | 0 | } |
258 | 0 | } |
259 | 0 | auto end_ts = std::chrono::duration_cast<std::chrono::microseconds>( |
260 | 0 | std::chrono::steady_clock::now().time_since_epoch()) |
261 | 0 | .count(); |
262 | 0 | g_cloud_internal_service_get_file_cache_meta_by_tablet_id_latency << (end_ts - begin_ts); |
263 | 0 | LOG(INFO) << "get file cache meta by tablet ids = [ " << tablet_ids_stream.str() << " ] took " |
264 | 0 | << end_ts - begin_ts << " us"; |
265 | 0 | VLOG_DEBUG << "get file cache meta by tablet id request=" << request->DebugString() |
266 | 0 | << ", response=" << response->DebugString(); |
267 | 0 | } |
268 | | |
269 | | namespace { |
270 | | // Helper functions for fetch_peer_data |
// Microseconds elapsed on the steady clock since `start` (negative if `start` is in the future).
inline int64_t elapsed_us(std::chrono::steady_clock::time_point start) {
    const auto delta = std::chrono::steady_clock::now() - start;
    return std::chrono::duration_cast<std::chrono::microseconds>(delta).count();
}
276 | | |
277 | 1 | int64_t get_rowset_meta_tablet_id_from_request(const PFetchPeerDataRequest* request) { |
278 | 1 | return request->has_rowset_meta() && request->rowset_meta().has_tablet_id() |
279 | 1 | ? request->rowset_meta().tablet_id() |
280 | 1 | : -1; |
281 | 1 | } |
282 | | |
283 | 1 | std::string get_rowset_meta_resource_id_from_request(const PFetchPeerDataRequest* request) { |
284 | 1 | if (request->has_rowset_meta() && request->rowset_meta().has_resource_id()) { |
285 | 1 | return request->rowset_meta().resource_id(); |
286 | 1 | } |
287 | 0 | return ""; |
288 | 1 | } |
289 | | |
290 | 7 | std::string get_peer_cache_filename(std::string_view path) { |
291 | 7 | return io::Path(std::string(path)).filename().native(); |
292 | 7 | } |
293 | | |
294 | | std::string format_peer_request_context(const PFetchPeerDataRequest* request, |
295 | 0 | const io::UInt128Wrapper& hash, size_t file_size) { |
296 | 0 | const std::string file_size_str = |
297 | 0 | request->has_file_size() ? std::to_string(request->file_size()) : "unknown"; |
298 | 0 | return fmt::format( |
299 | 0 | "type={}, path={}, cache_hash={}, request_fill={}, fill_tablet_id={}, " |
300 | 0 | "fill_remote_path={}, fill_resource_id={}, file_size={}, resolved_file_size={}, " |
301 | 0 | "cache_req_count={}, support_attachment={}", |
302 | 0 | request->type(), request->path(), hash.to_string(), |
303 | 0 | request->has_request_cache_fill() && request->request_cache_fill(), |
304 | 0 | get_rowset_meta_tablet_id_from_request(request), request->path(), |
305 | 0 | get_rowset_meta_resource_id_from_request(request), file_size_str, |
306 | 0 | file_size == std::numeric_limits<size_t>::max() ? std::string("unknown") |
307 | 0 | : std::to_string(file_size), |
308 | 0 | request->cache_req_size(), |
309 | 0 | request->has_support_attachment() && request->support_attachment()); |
310 | 0 | } |
311 | | |
312 | | std::string format_peer_cache_block_context(const PFetchPeerDataRequest* request, |
313 | | const CacheBlockReqest& cb_req, |
314 | | const io::FileBlockSPtr& fb, |
315 | | const io::UInt128Wrapper& hash, size_t file_size, |
316 | 0 | bool do_fill) { |
317 | 0 | return fmt::format("{}, req_block=[offset={}, size={}], do_fill={}, block={}, cache_file={}", |
318 | 0 | format_peer_request_context(request, hash, file_size), cb_req.block_offset(), |
319 | 0 | cb_req.block_size(), do_fill, fb->get_info_for_log(), fb->get_cache_file()); |
320 | 0 | } |
321 | | |
322 | | std::string format_peer_fill_context(const io::FileBlockSPtr& fb, int64_t fill_tablet_id, |
323 | | const std::string& filename, const std::string& resource_id, |
324 | | const std::string& remote_path, int64_t file_size, |
325 | 1 | int64_t offset, int64_t size, int32_t timeout_ms) { |
326 | 1 | return fmt::format( |
327 | 1 | "tablet_id={}, filename={}, resource_id={}, remote_path={}, file_size={}, " |
328 | 1 | "request_range=[offset={}, size={}], timeout_ms={}, block={}, cache_file={}", |
329 | 1 | fill_tablet_id, filename, resource_id.empty() ? "<unknown>" : resource_id, |
330 | 1 | remote_path.empty() ? "<unknown>" : remote_path, file_size, offset, size, timeout_ms, |
331 | 1 | fb->get_info_for_log(), fb->get_cache_file()); |
332 | 1 | } |
333 | | |
334 | 1 | bool wait_for_file_block_state(const io::FileBlockSPtr& fb, int32_t timeout_ms) { |
335 | 1 | const auto deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(timeout_ms); |
336 | 4 | while (true) { |
337 | 4 | const auto state = fb->state(); |
338 | 4 | if (state == io::FileBlock::State::DOWNLOADED || |
339 | 4 | state == io::FileBlock::State::SKIP_CACHE || state == io::FileBlock::State::EMPTY) { |
340 | 1 | return true; |
341 | 1 | } |
342 | 3 | if (std::chrono::steady_clock::now() >= deadline) { |
343 | 0 | return false; |
344 | 0 | } |
345 | 3 | fb->wait(); |
346 | 3 | } |
347 | 1 | } |
348 | | |
349 | 0 | Status handle_peer_file_range_request(const std::string& path, PFetchPeerDataResponse* response) { |
350 | | // Legacy path: PEER_FILE_RANGE still returns payload via protobuf bytes. |
351 | | // Keep this for compatibility until range path is migrated to attachment mode. |
352 | | // Read specific range [file_offset, file_offset+file_size) across cached blocks |
353 | 0 | auto datas = |
354 | 0 | io::FileCacheFactory::instance()->get_cache_data_by_path(get_peer_cache_filename(path)); |
355 | 0 | for (auto& cb : datas) { |
356 | 0 | *(response->add_datas()) = std::move(cb); |
357 | 0 | } |
358 | 0 | return Status::OK(); |
359 | 0 | } |
360 | | |
361 | 0 | void set_error_response(PFetchPeerDataResponse* response, const std::string& error_msg) { |
362 | 0 | response->mutable_status()->add_error_msgs(error_msg); |
363 | 0 | response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR); |
364 | 0 | } |
365 | | |
366 | 1 | void set_too_many_tasks_response(PFetchPeerDataResponse* response, const std::string& error_msg) { |
367 | 1 | response->mutable_status()->add_error_msgs(error_msg); |
368 | 1 | response->mutable_status()->set_status_code(TStatusCode::TOO_MANY_TASKS); |
369 | 1 | } |
370 | | |
371 | | bool try_reject_if_queue_timed_out(std::chrono::steady_clock::time_point enqueue_ts, |
372 | 2 | PFetchPeerDataResponse* response) { |
373 | 2 | auto wait_us = elapsed_us(enqueue_ts); |
374 | 2 | g_file_cache_get_by_peer_queue_wait_latency << wait_us; |
375 | 2 | auto wait_ms = wait_us / 1000; |
376 | 2 | if (wait_ms <= config::peer_fetch_queue_timeout_ms) { |
377 | 1 | return false; |
378 | 1 | } |
379 | | |
380 | 1 | const std::string msg = fmt::format("fetch peer data queue timeout, wait_ms={}, timeout_ms={}", |
381 | 1 | wait_ms, config::peer_fetch_queue_timeout_ms); |
382 | 1 | g_file_cache_get_by_peer_queue_timeout_num << 1; |
383 | 1 | set_too_many_tasks_response(response, msg); |
384 | 1 | return true; |
385 | 2 | } |
386 | | |
// Reads one cached file block and publishes it through `output` and, optionally,
// `response_attachment`.
//
// - `output` always receives the block offset and the number of bytes that will be read.
// - When `response_attachment` is non-null the payload is read into that IOBuf and `output`
//   carries metadata only; otherwise the payload is read into a string that is moved into
//   `output->data`.
// Returns InternalError when `file_size` is smaller than the block offset or when the
// attachment read yields fewer bytes than expected; otherwise returns the status of the
// underlying block read (OK without reading when there is nothing to read).
Status read_file_block(const std::shared_ptr<io::FileBlock>& file_block, size_t file_size,
                       doris::CacheBlockPB* output, butil::IOBuf* response_attachment) {
    auto total_start = std::chrono::steady_clock::now();
    int64_t set_data_us = 0;
    // Reports the total latency on every exit path, and the set_data latency only when the
    // protobuf-payload branch measured a non-zero value.
    Defer report {[&]() {
        g_file_cache_get_by_peer_read_file_block_total_latency << elapsed_us(total_start);
        if (set_data_us > 0) {
            g_file_cache_get_by_peer_set_response_data_latency << set_data_us;
        }
    }};
    // ATTN: calculate the rightmost boundary value of the block, due to inaccurate current block meta information.
    // see CachedRemoteFileReader::read_at_impl for more details.
    // Ensure file_size >= file_block->offset() to avoid underflow
    if (file_size < file_block->offset()) {
        LOG(WARNING) << "file_size (" << file_size << ") < file_block->offset("
                     << file_block->offset() << ")";
        return Status::InternalError<false>("file_size less than block offset");
    }
    // Clamp the read to the end of the file: the block range may extend past file_size.
    size_t read_size = std::min(static_cast<size_t>(file_size - file_block->offset()),
                                file_block->range().size());
    output->set_block_offset(static_cast<int64_t>(file_block->offset()));
    output->set_block_size(static_cast<int64_t>(read_size));
    if (read_size == 0) {
        // Nothing to read; only the metadata set above is returned.
        return Status::OK();
    }

    Status read_st = Status::OK();
    // Attachment payload mode: protobuf carries metadata only, payload goes to attachment.
    // This allows FS cache to use a file-descriptor->IOBuf path directly.
    if (response_attachment != nullptr) {
        size_t bytes_read = 0;
        auto begin_read_file_ts = std::chrono::duration_cast<std::chrono::microseconds>(
                                          std::chrono::steady_clock::now().time_since_epoch())
                                          .count();
        SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
        read_st = file_block->read_to_iobuf(response_attachment, /*read_offset=*/0, read_size,
                                            &bytes_read);
        auto end_read_file_ts = std::chrono::duration_cast<std::chrono::microseconds>(
                                        std::chrono::steady_clock::now().time_since_epoch())
                                        .count();
        g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts - begin_read_file_ts);

        if (read_st.ok()) {
            // A successful but short read is treated as an error.
            if (bytes_read != read_size) {
                return Status::InternalError<false>(
                        "peer cache read short data, expected={}, actual={}", read_size,
                        bytes_read);
            }
            g_file_cache_get_by_peer_response_bytes_total << bytes_read;
            return Status::OK();
        }
    } else {
        // Protobuf payload mode: read into a local buffer, then move it into `output`.
        std::string data;
        data.resize(read_size);
        auto begin_read_file_ts = std::chrono::duration_cast<std::chrono::microseconds>(
                                          std::chrono::steady_clock::now().time_since_epoch())
                                          .count();
        SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->s3_file_buffer_tracker());
        Slice slice(data.data(), data.size());
        read_st = file_block->read(slice, /*read_offset=*/0);
        auto end_read_file_ts = std::chrono::duration_cast<std::chrono::microseconds>(
                                        std::chrono::steady_clock::now().time_since_epoch())
                                        .count();
        g_file_cache_get_by_peer_read_cache_file_latency << (end_read_file_ts - begin_read_file_ts);

        if (read_st.ok()) {
            auto set_data_start = std::chrono::steady_clock::now();
            output->set_data(std::move(data));
            set_data_us = elapsed_us(set_data_start);
            g_file_cache_get_by_peer_response_bytes_total << read_size;
            return Status::OK();
        }
    }

    // Shared failure path for both payload modes: the read itself returned a non-OK status.
    g_file_cache_get_by_peer_failed_num << 1;
    LOG(WARNING) << "read cache block failed, file_size=" << file_size
                 << ", block=" << file_block->get_info_for_log()
                 << ", cache_file=" << file_block->get_cache_file() << ", err=" << read_st;
    return read_st;
}
467 | | |
468 | | // Trigger S3 -> local cache fill for the given file block. |
469 | | // Returns OK when the block is DOWNLOADED after the fill. |
470 | | // Returns TOO_MANY_TASKS when the fill slot is exhausted (server healthy but overloaded): |
471 | | // client should not rotate or evict, just fall back to S3 and retry same candidate later. |
472 | | // Returns NOT_FOUND for soft misses (tablet not found, fill incomplete, timeout): |
473 | | // client should rotate the candidate to try a different CG next time. |
474 | | // The peer uses request.path as the full remote path. tablet_id/filename are kept for logging. |
475 | | Status trigger_peer_server_fill(io::FileBlockSPtr& fb, int64_t fill_tablet_id, |
476 | | const std::string& filename, const std::string& resource_id, |
477 | | const std::string& remote_path, int64_t file_size, int64_t offset, |
478 | 1 | int64_t size, int32_t timeout_ms) { |
479 | 1 | g_peer_server_fill_requested << 1; |
480 | | |
481 | | // Concurrency guard: atomically reserve a fill slot. |
482 | | // Excess requests are rejected so the client falls back to its own S3 read. |
483 | | // Return NOT_FOUND so the client rotates the candidate instead of evicting it. |
484 | 1 | if (g_active_server_fills.fetch_add(1, std::memory_order_relaxed) >= |
485 | 1 | config::max_concurrent_peer_server_fills) { |
486 | 0 | g_active_server_fills.fetch_sub(1, std::memory_order_relaxed); |
487 | 0 | g_peer_server_fill_rejected << 1; |
488 | 0 | VLOG_DEBUG << "trigger_peer_server_fill: rejected (concurrency limit " |
489 | 0 | << config::max_concurrent_peer_server_fills << "), tablet_id=" << fill_tablet_id; |
490 | | // TOO_MANY_TASKS: server is healthy but overloaded. Client must not rotate or evict; |
491 | | // just fall back to S3 for this request and retry the same candidate next time. |
492 | 0 | return Status::Error<ErrorCode::TOO_MANY_TASKS, false>("fill slot exhausted"); |
493 | 0 | } |
494 | | // RAII decrement: runs on every return path below. |
495 | 1 | Defer fill_guard {[]() { g_active_server_fills.fetch_sub(1, std::memory_order_relaxed); }}; |
496 | | |
497 | 1 | if (remote_path.empty() || resource_id.empty()) { |
498 | 0 | const std::string ctx = |
499 | 0 | format_peer_fill_context(fb, fill_tablet_id, filename, resource_id, remote_path, |
500 | 0 | file_size, offset, size, timeout_ms); |
501 | 0 | LOG(WARNING) << "trigger_peer_server_fill: missing remote_path or resource_id, " << ctx; |
502 | 0 | g_peer_server_fill_rejected << 1; |
503 | 0 | return Status::NotFound<false>("fill: missing remote_path or resource_id, {}", ctx); |
504 | 0 | } |
505 | 1 | auto storage_resource = doris::get_storage_resource(resource_id); |
506 | 1 | if (!storage_resource.has_value()) { |
507 | 1 | const std::string ctx = |
508 | 1 | format_peer_fill_context(fb, fill_tablet_id, filename, resource_id, remote_path, |
509 | 1 | file_size, offset, size, timeout_ms); |
510 | 1 | LOG(WARNING) << "trigger_peer_server_fill: storage resource not found, " << ctx; |
511 | 1 | g_peer_server_fill_rejected << 1; |
512 | 1 | return Status::NotFound<false>("fill: storage resource not found, {}", ctx); |
513 | 1 | } |
514 | 0 | auto fs = storage_resource->first.fs; |
515 | |
|
516 | 0 | const auto initial_state = fb->state(); |
517 | 0 | if (initial_state == io::FileBlock::State::DOWNLOADING) { |
518 | | // Another thread already owns the block downloader. Wait up to the request timeout instead |
519 | | // of the shorter per-wait timeout in FileBlock::wait(). |
520 | 0 | [[maybe_unused]] const bool completed = wait_for_file_block_state(fb, timeout_ms); |
521 | 0 | const std::string ctx = |
522 | 0 | format_peer_fill_context(fb, fill_tablet_id, filename, resource_id, remote_path, |
523 | 0 | file_size, offset, size, timeout_ms); |
524 | 0 | return fb->state() == io::FileBlock::State::DOWNLOADED |
525 | 0 | ? Status::OK() |
526 | 0 | : Status::NotFound<false>("fill: concurrent download incomplete, {}", ctx); |
527 | 0 | } |
528 | 0 | if (initial_state != io::FileBlock::State::EMPTY) { |
529 | 0 | const std::string ctx = |
530 | 0 | format_peer_fill_context(fb, fill_tablet_id, filename, resource_id, remote_path, |
531 | 0 | file_size, offset, size, timeout_ms); |
532 | 0 | return initial_state == io::FileBlock::State::DOWNLOADED |
533 | 0 | ? Status::OK() |
534 | 0 | : Status::NotFound<false>("fill: unexpected initial block state, {}", ctx); |
535 | 0 | } |
536 | | |
537 | 0 | auto fill_start = std::chrono::steady_clock::now(); |
538 | 0 | auto fill_done = std::make_shared<bthread::CountdownEvent>(1); |
539 | 0 | auto fill_status = std::make_shared<Status>(Status::OK()); |
540 | 0 | io::DownloadFileMeta download_meta { |
541 | 0 | .path = remote_path, |
542 | 0 | .file_size = file_size, |
543 | 0 | .offset = offset, |
544 | 0 | .download_size = size, |
545 | 0 | .file_system = fs, |
546 | 0 | .ctx = {.is_dryrun = config::enable_reader_dryrun_when_download_file_cache, |
547 | | // Pull-through fill must go straight to remote storage. If this download |
548 | | // re-enters peer race, the original block can remain DOWNLOADING for the |
549 | | // duration of nested peer retries and timeouts. |
550 | 0 | .is_warmup = false, |
551 | 0 | .bypass_peer_read = true}, |
552 | 0 | .download_done = |
553 | 0 | [fill_done, fill_status](Status st) { |
554 | 0 | *fill_status = std::move(st); |
555 | 0 | fill_done->signal(); |
556 | 0 | }, |
557 | 0 | .tablet_id = fill_tablet_id, |
558 | 0 | }; |
559 | |
|
560 | 0 | io::DownloadTask task(std::move(download_meta)); |
561 | 0 | ExecEnv::GetInstance() |
562 | 0 | ->storage_engine() |
563 | 0 | .to_cloud() |
564 | 0 | .file_cache_block_downloader() |
565 | 0 | .submit_download_task(std::move(task)); |
566 | |
|
567 | 0 | const timespec due_time = butil::milliseconds_from_now(timeout_ms); |
568 | 0 | const bool timed_out = fill_done->timed_wait(due_time) != 0; |
569 | |
|
570 | 0 | int64_t fill_ms = std::chrono::duration_cast<std::chrono::milliseconds>( |
571 | 0 | std::chrono::steady_clock::now() - fill_start) |
572 | 0 | .count(); |
573 | 0 | g_peer_server_fill_latency << fill_ms * 1000; // LatencyRecorder takes microseconds |
574 | |
|
575 | 0 | if (!timed_out && fill_status->ok() && fb->state() == io::FileBlock::State::DOWNLOADING) { |
576 | 0 | const int32_t settle_timeout_ms = |
577 | 0 | std::max<int32_t>(1, timeout_ms - static_cast<int32_t>(fill_ms)); |
578 | 0 | [[maybe_unused]] const bool settled = wait_for_file_block_state(fb, settle_timeout_ms); |
579 | 0 | } |
580 | |
|
581 | 0 | auto final_state = fb->state(); |
582 | 0 | if (final_state == io::FileBlock::State::DOWNLOADED) { |
583 | 0 | g_peer_server_fill_success << 1; |
584 | 0 | return Status::OK(); |
585 | 0 | } |
586 | 0 | if (timed_out) { |
587 | 0 | LOG(WARNING) << "trigger_peer_server_fill: fill timeout, elapsed_ms=" << fill_ms << ", " |
588 | 0 | << format_peer_fill_context(fb, fill_tablet_id, filename, resource_id, |
589 | 0 | remote_path, file_size, offset, size, timeout_ms); |
590 | 0 | g_peer_server_fill_timeout << 1; |
591 | 0 | } else if (!fill_status->ok()) { |
592 | 0 | LOG(WARNING) << "trigger_peer_server_fill: fill failed, elapsed_ms=" << fill_ms |
593 | 0 | << ", status=" << fill_status->to_string() << ", " |
594 | 0 | << format_peer_fill_context(fb, fill_tablet_id, filename, resource_id, |
595 | 0 | remote_path, file_size, offset, size, timeout_ms); |
596 | 0 | } |
597 | | // Any non-DOWNLOADED outcome is a soft failure: the server is otherwise healthy so the |
598 | | // client should rotate the candidate rather than evict it. |
599 | 0 | return Status::NotFound<false>( |
600 | 0 | "fill: block not downloaded, {}", |
601 | 0 | format_peer_fill_context(fb, fill_tablet_id, filename, resource_id, remote_path, |
602 | 0 | file_size, offset, size, timeout_ms)); |
603 | 0 | } |
604 | | |
605 | | Status handle_peer_file_cache_block_request(const PFetchPeerDataRequest* request, |
606 | | PFetchPeerDataResponse* response, |
607 | 7 | brpc::Controller* cntl) { |
608 | 7 | auto handle_start = std::chrono::steady_clock::now(); |
609 | 7 | const uint64_t request_blocks = request->cache_req_size(); |
610 | 7 | uint64_t response_blocks = 0; |
611 | 7 | uint64_t get_or_set_calls = 0; |
612 | 7 | uint64_t get_or_set_blocks = 0; |
613 | 7 | int64_t get_cache_us = 0; |
614 | 7 | Defer report {[&]() { |
615 | 7 | g_file_cache_get_by_peer_handle_cache_block_req_latency << elapsed_us(handle_start); |
616 | 7 | g_file_cache_get_by_peer_get_cache_latency << get_cache_us; |
617 | 7 | g_file_cache_get_by_peer_get_or_set_calls << get_or_set_calls; |
618 | 7 | g_file_cache_get_by_peer_get_or_set_blocks_total << get_or_set_blocks; |
619 | 7 | g_file_cache_get_by_peer_request_blocks_total << request_blocks; |
620 | 7 | g_file_cache_get_by_peer_request_blocks_per_rpc << request_blocks; |
621 | 7 | g_file_cache_get_by_peer_response_blocks_total << response_blocks; |
622 | 7 | }}; |
623 | 7 | const auto& path = request->path(); |
624 | 7 | const auto cache_key_path = get_peer_cache_filename(path); |
625 | 7 | auto hash = io::BlockFileCache::hash(cache_key_path); |
626 | 7 | auto get_cache_start = std::chrono::steady_clock::now(); |
627 | 7 | auto* cache = io::FileCacheFactory::instance()->get_by_path(hash); |
628 | 7 | get_cache_us = elapsed_us(get_cache_start); |
629 | 7 | if (cache == nullptr) { |
630 | 0 | g_file_cache_get_by_peer_failed_num << 1; |
631 | 0 | set_error_response(response, "can't get file cache instance"); |
632 | 0 | return Status::InternalError<false>("can't get file cache instance"); |
633 | 0 | } |
634 | | |
635 | 7 | io::CacheContext ctx {}; |
636 | 7 | io::ReadStatistics local_stats; |
637 | 7 | ctx.stats = &local_stats; |
638 | 7 | const size_t file_size = |
639 | 7 | request->has_file_size() |
640 | 7 | ? static_cast<size_t>(std::max<int64_t>(0, request->file_size())) |
641 | 7 | : std::numeric_limits<size_t>::max(); |
642 | | // Enable attachment mode only when client advertises support. |
643 | | // This keeps mixed-version rolling upgrades safe. |
644 | 7 | const bool use_attachment = |
645 | 7 | cntl != nullptr && request->has_support_attachment() && request->support_attachment(); |
646 | 7 | response->set_data_in_attachment(use_attachment); |
647 | 7 | if (use_attachment) { |
648 | 4 | g_file_cache_get_by_peer_attachment_response_num << 1; |
649 | 4 | } else { |
650 | 3 | g_file_cache_get_by_peer_pb_response_num << 1; |
651 | 3 | } |
652 | | |
653 | 7 | const bool do_fill = request->has_request_cache_fill() && request->request_cache_fill() && |
654 | 7 | config::enable_peer_server_cache_fill; |
655 | | |
656 | 9 | for (const auto& cb_req : request->cache_req()) { |
657 | 9 | size_t offset = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_offset())); |
658 | 9 | size_t size = static_cast<size_t>(std::max<int64_t>(0, cb_req.block_size())); |
659 | 9 | if (offset >= file_size) { |
660 | 0 | continue; |
661 | 0 | } |
662 | | // Clip tail requests before get_or_set so peer reads do not synthesize EMPTY blocks past |
663 | | // EOF and then fail the whole RPC. |
664 | 9 | size = std::min(size, file_size - offset); |
665 | 9 | if (size == 0) { |
666 | 0 | continue; |
667 | 0 | } |
668 | 9 | DBUG_EXECUTE_IF( |
669 | 9 | "CloudInternalServiceImpl::handle_peer_file_cache_block_request_hold_before_get_or_" |
670 | 9 | "set", |
671 | 9 | { |
672 | 9 | int sleep_ms = dp->param<int>("sleep_ms", 300); |
673 | 9 | bthread_usleep(sleep_ms * 1000); |
674 | 9 | }); |
675 | 9 | auto get_or_set_start = std::chrono::steady_clock::now(); |
676 | 9 | auto holder = cache->get_or_set(hash, offset, size, ctx); |
677 | 9 | g_file_cache_get_by_peer_get_or_set_latency << elapsed_us(get_or_set_start); |
678 | 9 | ++get_or_set_calls; |
679 | 9 | get_or_set_blocks += holder.file_blocks.size(); |
680 | | |
681 | 10 | for (auto& fb : holder.file_blocks) { |
682 | 10 | auto fb_state = fb->state(); |
683 | 10 | if (fb_state == io::FileBlock::State::DOWNLOADING) { |
684 | 1 | if (do_fill) { |
685 | | // Only peer fill requests should wait longer here. Plain peer-cache reads keep |
686 | | // the short wait semantics so they can fail fast and let the client race S3. |
687 | 1 | [[maybe_unused]] const bool completed = wait_for_file_block_state( |
688 | 1 | fb, config::peer_server_cache_fill_timeout_ms); |
689 | 1 | fb_state = fb->state(); |
690 | 1 | } else { |
691 | | // Wait for in-progress download to complete using the normal short timeout. |
692 | 0 | fb_state = fb->wait(); |
693 | 0 | } |
694 | 1 | } |
695 | 10 | if (fb_state == io::FileBlock::State::EMPTY) { |
696 | 1 | if (!do_fill) { |
697 | 0 | const std::string msg = |
698 | 0 | fmt::format("cache block not downloaded, {}", |
699 | 0 | format_peer_cache_block_context(request, cb_req, fb, hash, |
700 | 0 | file_size, do_fill)); |
701 | 0 | g_file_cache_get_by_peer_failed_num << 1; |
702 | 0 | g_file_cache_get_by_peer_not_downloaded_block_num << 1; |
703 | 0 | LOG(WARNING) << msg; |
704 | | // Use NOT_FOUND so the client can distinguish "block not cached" |
705 | | // from an actual RPC/server error. On NOT_FOUND the client rotates |
706 | | // the candidate to the end of its list (trying another CG next time) |
707 | | // rather than incrementing the RPC-failure eviction counter. |
708 | 0 | response->mutable_status()->add_error_msgs(msg); |
709 | 0 | response->mutable_status()->set_status_code(TStatusCode::NOT_FOUND); |
710 | 0 | return Status::NotFound<false>(msg); |
711 | 0 | } |
712 | | // Server-side fill: request.path already carries the full remote path. |
713 | 1 | auto fill_st = trigger_peer_server_fill( |
714 | 1 | fb, get_rowset_meta_tablet_id_from_request(request), cache_key_path, |
715 | 1 | get_rowset_meta_resource_id_from_request(request), path, |
716 | 1 | request->has_file_size() ? request->file_size() : -1, |
717 | 1 | static_cast<int64_t>(fb->range().left), |
718 | 1 | static_cast<int64_t>(fb->range().size()), |
719 | 1 | config::peer_server_cache_fill_timeout_ms); |
720 | 1 | if (!fill_st.ok()) { |
721 | 1 | g_file_cache_get_by_peer_failed_num << 1; |
722 | 1 | g_file_cache_get_by_peer_not_downloaded_block_num << 1; |
723 | 1 | if (fill_st.is<ErrorCode::TOO_MANY_TASKS>()) { |
724 | | // Server slot exhausted: healthy but overloaded. Client must not rotate |
725 | | // or evict — just fall back to S3 and retry same candidate next time. |
726 | 0 | response->mutable_status()->add_error_msgs(std::string(fill_st.msg())); |
727 | 0 | response->mutable_status()->set_status_code(TStatusCode::TOO_MANY_TASKS); |
728 | 1 | } else if (fill_st.is<ErrorCode::NOT_FOUND>()) { |
729 | | // Soft miss (fill incomplete, timeout, unexpected state) — client rotates, |
730 | | // not evicts. |
731 | 1 | response->mutable_status()->add_error_msgs(std::string(fill_st.msg())); |
732 | 1 | response->mutable_status()->set_status_code(TStatusCode::NOT_FOUND); |
733 | 1 | } else { |
734 | 0 | LOG(WARNING) << "cache block fill failed, status=" << fill_st << ", " |
735 | 0 | << format_peer_cache_block_context(request, cb_req, fb, hash, |
736 | 0 | file_size, do_fill); |
737 | 0 | set_error_response(response, "cache block not ready"); |
738 | 0 | } |
739 | 1 | return fill_st; |
740 | 1 | } |
741 | 0 | fb_state = io::FileBlock::State::DOWNLOADED; |
742 | 0 | } |
743 | 9 | if (fb_state != io::FileBlock::State::DOWNLOADED) { |
744 | | // A concurrent download was in progress (DOWNLOADING at request time) but its |
745 | | // wait() returned a non-DOWNLOADED state (e.g., timed-out while still |
746 | | // DOWNLOADING, or some other non-EMPTY intermediate state). The server is |
747 | | // healthy; the block just isn't available yet. Return NOT_FOUND so the client |
748 | | // rotates the candidate instead of evicting it. |
749 | 0 | const std::string msg = |
750 | 0 | fmt::format("cache block not ready after wait, {}", |
751 | 0 | format_peer_cache_block_context(request, cb_req, fb, hash, |
752 | 0 | file_size, do_fill)); |
753 | 0 | g_file_cache_get_by_peer_failed_num << 1; |
754 | 0 | g_file_cache_get_by_peer_not_downloaded_block_num << 1; |
755 | 0 | LOG(WARNING) << msg; |
756 | 0 | response->mutable_status()->add_error_msgs(msg); |
757 | 0 | response->mutable_status()->set_status_code(TStatusCode::NOT_FOUND); |
758 | 0 | return Status::NotFound<false>(msg); |
759 | 0 | } |
760 | | |
761 | 9 | g_file_cache_get_by_peer_blocks_num << 1; |
762 | 9 | doris::CacheBlockPB* out = response->add_datas(); |
763 | | // In attachment mode, metadata order must match attachment append order because the |
764 | | // client consumes attachment payload sequentially using resp.datas() order. |
765 | 9 | Status read_status = read_file_block( |
766 | 9 | fb, file_size, out, use_attachment ? &cntl->response_attachment() : nullptr); |
767 | 9 | if (!read_status.ok()) { |
768 | 0 | set_error_response(response, "read cache file error"); |
769 | 0 | return read_status; |
770 | 0 | } |
771 | 9 | ++response_blocks; |
772 | 9 | } |
773 | 9 | } |
774 | | |
775 | 6 | return Status::OK(); |
776 | 7 | } |
777 | | } // namespace |
778 | | |
779 | | #ifdef BE_TEST |
780 | | Status test_handle_peer_file_cache_block_request(const PFetchPeerDataRequest* request, |
781 | | PFetchPeerDataResponse* response, |
782 | 7 | brpc::Controller* cntl) { |
783 | 7 | return handle_peer_file_cache_block_request(request, response, cntl); |
784 | 7 | } |
785 | | |
786 | | bool test_try_reject_if_queue_timed_out(std::chrono::steady_clock::time_point enqueue_ts, |
787 | 2 | PFetchPeerDataResponse* response) { |
788 | 2 | return try_reject_if_queue_timed_out(enqueue_ts, response); |
789 | 2 | } |
790 | | #endif |
791 | | |
792 | | void CloudInternalServiceImpl::fetch_peer_data(google::protobuf::RpcController* controller, |
793 | | const PFetchPeerDataRequest* request, |
794 | | PFetchPeerDataResponse* response, |
795 | 0 | google::protobuf::Closure* done) { |
796 | 0 | auto enqueue_ts = std::chrono::steady_clock::now(); |
797 | | // Lifetime: cntl is owned by brpc framework and valid until done->Run() is called. |
798 | | // The ClosureGuard inside the lambda ensures done->Run() happens after all cntl usage, |
799 | | // so capturing the raw pointer by value is safe. |
800 | 0 | auto* cntl = static_cast<brpc::Controller*>(controller); |
801 | 0 | bool ret = _peer_fetch_pool.try_offer([request, response, done, enqueue_ts, cntl]() { |
802 | 0 | brpc::ClosureGuard closure_guard(done); |
803 | 0 | g_file_cache_get_by_peer_num << 1; |
804 | 0 | if (try_reject_if_queue_timed_out(enqueue_ts, response)) { |
805 | 0 | return; |
806 | 0 | } |
807 | | |
808 | 0 | if (!config::enable_file_cache) { |
809 | 0 | LOG_WARNING("try to access file cache data, but file cache not enabled"); |
810 | 0 | return; |
811 | 0 | } |
812 | | |
813 | 0 | auto begin_ts = std::chrono::duration_cast<std::chrono::microseconds>( |
814 | 0 | std::chrono::steady_clock::now().time_since_epoch()) |
815 | 0 | .count(); |
816 | |
|
817 | 0 | const auto type = request->type(); |
818 | 0 | const auto& path = request->path(); |
819 | 0 | response->mutable_status()->set_status_code(TStatusCode::OK); |
820 | |
|
821 | 0 | Status status = Status::OK(); |
822 | 0 | if (type == PFetchPeerDataRequest_Type_PEER_FILE_RANGE) { |
823 | 0 | status = handle_peer_file_range_request(path, response); |
824 | 0 | } else if (type == PFetchPeerDataRequest_Type_PEER_FILE_CACHE_BLOCK) { |
825 | 0 | status = handle_peer_file_cache_block_request(request, response, cntl); |
826 | 0 | } |
827 | |
|
828 | 0 | if (!status.ok()) { |
829 | 0 | const std::string msg = |
830 | 0 | "fetch peer data failed: " + status.to_string() + ", " + |
831 | 0 | format_peer_request_context( |
832 | 0 | request, io::BlockFileCache::hash(get_peer_cache_filename(path)), |
833 | 0 | request->has_file_size() ? static_cast<size_t>(std::max<int64_t>( |
834 | 0 | 0, request->file_size())) |
835 | 0 | : std::numeric_limits<size_t>::max()); |
836 | 0 | if (status.is<ErrorCode::NOT_FOUND>() || status.is<ErrorCode::TOO_MANY_TASKS>()) { |
837 | 0 | VLOG_DEBUG << msg; |
838 | 0 | } else { |
839 | 0 | LOG(WARNING) << msg; |
840 | 0 | } |
841 | 0 | auto* resp_status = response->mutable_status(); |
842 | 0 | if (resp_status->status_code() == TStatusCode::OK) { |
843 | 0 | set_error_response(response, status.to_string()); |
844 | 0 | } else if (resp_status->error_msgs().empty()) { |
845 | 0 | resp_status->add_error_msgs(status.to_string()); |
846 | 0 | } |
847 | 0 | } |
848 | |
|
849 | 0 | DBUG_EXECUTE_IF("CloudInternalServiceImpl::fetch_peer_data_slower", { |
850 | 0 | int st_us = dp->param<int>("sleep", 1000); |
851 | 0 | LOG_WARNING("CloudInternalServiceImpl::fetch_peer_data_slower").tag("sleep", st_us); |
852 | 0 | bthread_usleep(st_us); |
853 | 0 | }); |
854 | |
|
855 | 0 | auto end_ts = std::chrono::duration_cast<std::chrono::microseconds>( |
856 | 0 | std::chrono::steady_clock::now().time_since_epoch()) |
857 | 0 | .count(); |
858 | | // Latency covers every completed callback (including failures) so the |
859 | | // server-side fail-fast paths still show up in the latency histogram. |
860 | | // success_num must only count actual OK results, otherwise dedup |
861 | | // TOO_MANY_TASKS / NOT_FOUND / handler errors all fall through here |
862 | | // and the success rate is meaningless. Use file_cache_get_by_peer_num |
863 | | // for the total completed-callback count. |
864 | 0 | g_file_cache_get_by_peer_server_latency << (end_ts - begin_ts); |
865 | 0 | if (status.ok()) { |
866 | 0 | g_file_cache_get_by_peer_success_num << 1; |
867 | 0 | } |
868 | |
|
869 | 0 | VLOG_DEBUG << "fetch cache request=" << request->DebugString() |
870 | 0 | << ", response=" << response->DebugString(); |
871 | 0 | }); |
872 | |
|
873 | 0 | if (!ret) { |
874 | 0 | g_file_cache_get_by_peer_offer_failed_num << 1; |
875 | 0 | brpc::ClosureGuard closure_guard(done); |
876 | 0 | const std::string msg = fmt::format( |
877 | 0 | "fail to offer fetch peer data request to the peer fetch work pool, pool={}", |
878 | 0 | _peer_fetch_pool.get_info()); |
879 | 0 | set_too_many_tasks_response(response, msg); |
880 | 0 | LOG(WARNING) << msg; |
881 | 0 | } |
882 | 0 | } |
883 | | |
884 | | bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_submitted_segment_num( |
885 | | "file_cache_event_driven_warm_up_submitted_segment_num"); |
886 | | bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_finished_segment_num( |
887 | | "file_cache_event_driven_warm_up_finished_segment_num"); |
888 | | bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_failed_segment_num( |
889 | | "file_cache_event_driven_warm_up_failed_segment_num"); |
890 | | bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_submitted_segment_size( |
891 | | "file_cache_event_driven_warm_up_submitted_segment_size"); |
892 | | bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_finished_segment_size( |
893 | | "file_cache_event_driven_warm_up_finished_segment_size"); |
894 | | bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_failed_segment_size( |
895 | | "file_cache_event_driven_warm_up_failed_segment_size"); |
896 | | bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_submitted_index_num( |
897 | | "file_cache_event_driven_warm_up_submitted_index_num"); |
898 | | bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_finished_index_num( |
899 | | "file_cache_event_driven_warm_up_finished_index_num"); |
900 | | bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_failed_index_num( |
901 | | "file_cache_event_driven_warm_up_failed_index_num"); |
902 | | bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_submitted_index_size( |
903 | | "file_cache_event_driven_warm_up_submitted_index_size"); |
904 | | bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_finished_index_size( |
905 | | "file_cache_event_driven_warm_up_finished_index_size"); |
906 | | bvar::Adder<uint64_t> g_file_cache_event_driven_warm_up_failed_index_size( |
907 | | "file_cache_event_driven_warm_up_failed_index_size"); |
908 | | bvar::Status<int64_t> g_file_cache_warm_up_rowset_last_handle_unix_ts( |
909 | | "file_cache_warm_up_rowset_last_handle_unix_ts", 0); |
910 | | bvar::Status<int64_t> g_file_cache_warm_up_rowset_last_finish_unix_ts( |
911 | | "file_cache_warm_up_rowset_last_finish_unix_ts", 0); |
912 | | bvar::LatencyRecorder g_file_cache_warm_up_rowset_latency("file_cache_warm_up_rowset_latency"); |
913 | | bvar::LatencyRecorder g_file_cache_warm_up_rowset_request_to_handle_latency( |
914 | | "file_cache_warm_up_rowset_request_to_handle_latency"); |
915 | | bvar::LatencyRecorder g_file_cache_warm_up_rowset_handle_to_finish_latency( |
916 | | "file_cache_warm_up_rowset_handle_to_finish_latency"); |
917 | | bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_slow_count( |
918 | | "file_cache_warm_up_rowset_slow_count"); |
919 | | bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_request_to_handle_slow_count( |
920 | | "file_cache_warm_up_rowset_request_to_handle_slow_count"); |
921 | | bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_handle_to_finish_slow_count( |
922 | | "file_cache_warm_up_rowset_handle_to_finish_slow_count"); |
923 | | bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_wait_for_compaction_num( |
924 | | "file_cache_warm_up_rowset_wait_for_compaction_num"); |
925 | | bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_wait_for_compaction_timeout_num( |
926 | | "file_cache_warm_up_rowset_wait_for_compaction_timeout_num"); |
927 | | |
928 | | // Per-job windowed metrics for target BE |
929 | | // bvar::Window enforces MAX_SECONDS_LIMIT = 3600, so the longest window is 1h. |
930 | | static constexpr int WINDOW_5M = 300; |
931 | | static constexpr int WINDOW_30M = 1800; |
932 | | static constexpr int WINDOW_1H = 3600; |
933 | | |
934 | | MBvarWindowedAdder g_warmup_ed_finish_segment_num("warmup_ed_finish_segment_num", {"job_id"}, |
935 | | {WINDOW_5M, WINDOW_30M, WINDOW_1H}, false); |
936 | | MBvarWindowedAdder g_warmup_ed_finish_segment_size("warmup_ed_finish_segment_size", {"job_id"}, |
937 | | {WINDOW_5M, WINDOW_30M, WINDOW_1H}, false); |
938 | | MBvarWindowedAdder g_warmup_ed_finish_index_num("warmup_ed_finish_index_num", {"job_id"}, |
939 | | {WINDOW_5M, WINDOW_30M, WINDOW_1H}, false); |
940 | | MBvarWindowedAdder g_warmup_ed_finish_index_size("warmup_ed_finish_index_size", {"job_id"}, |
941 | | {WINDOW_5M, WINDOW_30M, WINDOW_1H}, false); |
942 | | MBvarWindowedAdder g_warmup_ed_fail_segment_num("warmup_ed_fail_segment_num", {"job_id"}, |
943 | | {WINDOW_5M, WINDOW_30M, WINDOW_1H}, false); |
944 | | MBvarWindowedAdder g_warmup_ed_fail_segment_size("warmup_ed_fail_segment_size", {"job_id"}, |
945 | | {WINDOW_5M, WINDOW_30M, WINDOW_1H}, false); |
946 | | MBvarWindowedAdder g_warmup_ed_fail_index_num("warmup_ed_fail_index_num", {"job_id"}, |
947 | | {WINDOW_5M, WINDOW_30M, WINDOW_1H}, false); |
948 | | MBvarWindowedAdder g_warmup_ed_fail_index_size("warmup_ed_fail_index_size", {"job_id"}, |
949 | | {WINDOW_5M, WINDOW_30M, WINDOW_1H}, false); |
950 | | bvar::MultiDimension<bvar::Status<int64_t>> g_warmup_ed_last_finish_ts({"job_id"}); |
951 | | |
952 | 0 | void update_warmup_ed_last_finish_ts(const std::string& job_id_str) { |
953 | 0 | auto* finish_ts = g_warmup_ed_last_finish_ts.get_stats(std::list<std::string> {job_id_str}); |
954 | 0 | if (finish_ts) { |
955 | 0 | finish_ts->set_value(std::chrono::duration_cast<std::chrono::milliseconds>( |
956 | 0 | std::chrono::system_clock::now().time_since_epoch()) |
957 | 0 | .count()); |
958 | 0 | } |
959 | 0 | } |
960 | | |
961 | 0 | void record_warmup_ed_finish_segment(const std::string& job_id_str, int64_t segment_size) { |
962 | 0 | g_warmup_ed_finish_segment_num.put({job_id_str}, 1); |
963 | 0 | g_warmup_ed_finish_segment_size.put({job_id_str}, segment_size); |
964 | 0 | update_warmup_ed_last_finish_ts(job_id_str); |
965 | 0 | } |
966 | | |
967 | 0 | void record_warmup_ed_finish_index(const std::string& job_id_str, int64_t idx_size) { |
968 | 0 | g_warmup_ed_finish_index_num.put({job_id_str}, 1); |
969 | 0 | g_warmup_ed_finish_index_size.put({job_id_str}, idx_size); |
970 | 0 | update_warmup_ed_last_finish_ts(job_id_str); |
971 | 0 | } |
972 | | |
973 | 0 | void record_warmup_ed_fail_segment(const std::string& job_id_str, int64_t segment_size) { |
974 | 0 | g_warmup_ed_fail_segment_num.put({job_id_str}, 1); |
975 | 0 | g_warmup_ed_fail_segment_size.put({job_id_str}, segment_size); |
976 | 0 | } |
977 | | |
978 | 0 | void record_warmup_ed_fail_index(const std::string& job_id_str, int64_t idx_size) { |
979 | 0 | g_warmup_ed_fail_index_num.put({job_id_str}, 1); |
980 | 0 | g_warmup_ed_fail_index_size.put({job_id_str}, idx_size); |
981 | 0 | } |
982 | | |
983 | | void record_warmup_ed_skipped_rowset_as_finished(RowsetMeta& rs_meta, |
984 | 0 | const std::string& job_id_str) { |
985 | 0 | auto schema_ptr = rs_meta.tablet_schema(); |
986 | 0 | bool has_inverted_index = schema_ptr->has_inverted_index() || schema_ptr->has_ann_index(); |
987 | 0 | auto idx_version = schema_ptr->get_inverted_index_storage_format(); |
988 | 0 | for (int64_t segment_id = 0; segment_id < rs_meta.num_segments(); segment_id++) { |
989 | 0 | record_warmup_ed_finish_segment(job_id_str, rs_meta.segment_file_size(segment_id)); |
990 | |
|
991 | 0 | if (!has_inverted_index) { |
992 | 0 | continue; |
993 | 0 | } |
994 | 0 | auto&& inverted_index_info = rs_meta.inverted_index_file_info(segment_id); |
995 | 0 | if (idx_version == InvertedIndexStorageFormatPB::V1) { |
996 | 0 | std::unordered_map<int64_t, int64_t> index_size_map; |
997 | 0 | for (const auto& info : inverted_index_info.index_info()) { |
998 | 0 | if (info.index_file_size() != -1) { |
999 | 0 | index_size_map[info.index_id()] = info.index_file_size(); |
1000 | 0 | } else { |
1001 | 0 | VLOG_DEBUG << "Invalid index_file_size for segment_id " << segment_id |
1002 | 0 | << ", index_id " << info.index_id(); |
1003 | 0 | } |
1004 | 0 | } |
1005 | 0 | for (const auto& index : schema_ptr->inverted_indexes()) { |
1006 | 0 | record_warmup_ed_finish_index(job_id_str, index_size_map[index->index_id()]); |
1007 | 0 | } |
1008 | 0 | } else { // InvertedIndexStorageFormatPB::V2 |
1009 | 0 | int64_t idx_size = 0; |
1010 | 0 | if (inverted_index_info.has_index_size()) { |
1011 | 0 | idx_size = inverted_index_info.index_size(); |
1012 | 0 | } else { |
1013 | 0 | VLOG_DEBUG << "index_size is not set for segment " << segment_id; |
1014 | 0 | } |
1015 | 0 | record_warmup_ed_finish_index(job_id_str, idx_size); |
1016 | 0 | } |
1017 | 0 | } |
1018 | 0 | } |
1019 | | |
1020 | | void handle_segment_download_done(Status st, int64_t tablet_id, const RowsetId& rowset_id, |
1021 | | int64_t segment_id, std::shared_ptr<CloudTablet> tablet, |
1022 | | std::shared_ptr<bthread::CountdownEvent> wait, Version version, |
1023 | | int64_t segment_size, int64_t request_ts, int64_t handle_ts, |
1024 | 0 | std::string job_id_str, int64_t upstream_trigger_ts_ms) { |
1025 | 0 | DBUG_EXECUTE_IF("CloudInternalServiceImpl::warm_up_rowset.download_segment", { |
1026 | 0 | auto sleep_time = dp->param<int>("sleep", 3); |
1027 | 0 | LOG_INFO("[verbose] block download for rowset={}, version={}, sleep={}", |
1028 | 0 | rowset_id.to_string(), version.to_string(), sleep_time); |
1029 | 0 | std::this_thread::sleep_for(std::chrono::seconds(sleep_time)); |
1030 | 0 | }); |
1031 | 0 | DBUG_EXECUTE_IF( |
1032 | 0 | "CloudInternalServiceImpl::warm_up_rowset.download_segment.inject_" |
1033 | 0 | "error", |
1034 | 0 | { |
1035 | 0 | st = Status::InternalError("injected error"); |
1036 | 0 | LOG_INFO("[verbose] inject error, tablet={}, rowset={}, st={}", tablet_id, |
1037 | 0 | rowset_id.to_string(), st.to_string()); |
1038 | 0 | }); |
1039 | 0 | if (st.ok()) { |
1040 | 0 | g_file_cache_event_driven_warm_up_finished_segment_num << 1; |
1041 | 0 | g_file_cache_event_driven_warm_up_finished_segment_size << segment_size; |
1042 | 0 | record_warmup_ed_finish_segment(job_id_str, segment_size); |
1043 | 0 | int64_t now_ts = current_unix_time_us(); |
1044 | 0 | g_file_cache_warm_up_rowset_last_finish_unix_ts.set_value(now_ts); |
1045 | 0 | auto rowset_latency_us = warm_up_rowset_cross_host_latency_us(request_ts, now_ts); |
1046 | 0 | if (rowset_latency_us.has_value()) { |
1047 | 0 | g_file_cache_warm_up_rowset_latency << *rowset_latency_us; |
1048 | 0 | } |
1049 | 0 | g_file_cache_warm_up_rowset_handle_to_finish_latency << (now_ts - handle_ts); |
1050 | 0 | if (rowset_latency_us.has_value() && |
1051 | 0 | *rowset_latency_us > config::warm_up_rowset_slow_log_ms * 1000) { |
1052 | 0 | g_file_cache_warm_up_rowset_slow_count << 1; |
1053 | 0 | LOG(INFO) << "warm up rowset took " << *rowset_latency_us |
1054 | 0 | << " us, tablet_id: " << tablet_id << ", rowset_id: " << rowset_id.to_string() |
1055 | 0 | << ", segment_id: " << segment_id; |
1056 | 0 | } |
1057 | 0 | if (now_ts - handle_ts > config::warm_up_rowset_slow_log_ms * 1000) { |
1058 | 0 | g_file_cache_warm_up_rowset_handle_to_finish_slow_count << 1; |
1059 | 0 | LOG(INFO) << "warm up rowset (handle to finish) took " << now_ts - handle_ts |
1060 | 0 | << " us, tablet_id: " << tablet_id << ", rowset_id: " << rowset_id.to_string() |
1061 | 0 | << ", segment_id: " << segment_id; |
1062 | 0 | } |
1063 | 0 | } else { |
1064 | 0 | g_file_cache_event_driven_warm_up_failed_segment_num << 1; |
1065 | 0 | g_file_cache_event_driven_warm_up_failed_segment_size << segment_size; |
1066 | 0 | record_warmup_ed_fail_segment(job_id_str, segment_size); |
1067 | 0 | LOG(WARNING) << "download segment failed, tablet_id: " << tablet_id |
1068 | 0 | << " rowset_id: " << rowset_id.to_string() << ", error: " << st; |
1069 | 0 | } |
1070 | 0 | if (tablet->complete_rowset_segment_warmup(WarmUpTriggerSource::EVENT_DRIVEN, rowset_id, st, 1, |
1071 | 0 | 0) |
1072 | 0 | .trigger_source == WarmUpTriggerSource::EVENT_DRIVEN) { |
1073 | 0 | VLOG_DEBUG << "warmup rowset " << version.to_string() << "(" << rowset_id.to_string() |
1074 | 0 | << ") completed"; |
1075 | 0 | } |
1076 | 0 | g_warmup_ed_downstream_progress_tracker.record_task_done(job_id_str, upstream_trigger_ts_ms); |
1077 | 0 | if (wait) { |
1078 | 0 | wait->signal(); |
1079 | 0 | } |
1080 | 0 | } |
1081 | | |
1082 | | void handle_inverted_index_download_done(Status st, int64_t tablet_id, const RowsetId& rowset_id, |
1083 | | int64_t segment_id, std::string index_path, |
1084 | | std::shared_ptr<CloudTablet> tablet, |
1085 | | std::shared_ptr<bthread::CountdownEvent> wait, |
1086 | | Version version, uint64_t idx_size, int64_t request_ts, |
1087 | | int64_t handle_ts, std::string job_id_str, |
1088 | 0 | int64_t upstream_trigger_ts_ms) { |
1089 | 0 | DBUG_EXECUTE_IF("CloudInternalServiceImpl::warm_up_rowset.download_inverted_idx", { |
1090 | 0 | auto sleep_time = dp->param<int>("sleep", 3); |
1091 | 0 | LOG_INFO( |
1092 | 0 | "[verbose] block download for rowset={}, inverted index " |
1093 | 0 | "file={}, sleep={}", |
1094 | 0 | rowset_id.to_string(), index_path, sleep_time); |
1095 | 0 | std::this_thread::sleep_for(std::chrono::seconds(sleep_time)); |
1096 | 0 | }); |
1097 | 0 | if (st.ok()) { |
1098 | 0 | g_file_cache_event_driven_warm_up_finished_index_num << 1; |
1099 | 0 | g_file_cache_event_driven_warm_up_finished_index_size << idx_size; |
1100 | 0 | record_warmup_ed_finish_index(job_id_str, static_cast<int64_t>(idx_size)); |
1101 | 0 | int64_t now_ts = current_unix_time_us(); |
1102 | 0 | g_file_cache_warm_up_rowset_last_finish_unix_ts.set_value(now_ts); |
1103 | 0 | auto rowset_latency_us = warm_up_rowset_cross_host_latency_us(request_ts, now_ts); |
1104 | 0 | if (rowset_latency_us.has_value()) { |
1105 | 0 | g_file_cache_warm_up_rowset_latency << *rowset_latency_us; |
1106 | 0 | } |
1107 | 0 | g_file_cache_warm_up_rowset_handle_to_finish_latency << (now_ts - handle_ts); |
1108 | 0 | if (rowset_latency_us.has_value() && |
1109 | 0 | *rowset_latency_us > config::warm_up_rowset_slow_log_ms * 1000) { |
1110 | 0 | g_file_cache_warm_up_rowset_slow_count << 1; |
1111 | 0 | LOG(INFO) << "warm up rowset took " << *rowset_latency_us |
1112 | 0 | << " us, tablet_id: " << tablet_id << ", rowset_id: " << rowset_id.to_string() |
1113 | 0 | << ", segment_id: " << segment_id; |
1114 | 0 | } |
1115 | 0 | if (now_ts - handle_ts > config::warm_up_rowset_slow_log_ms * 1000) { |
1116 | 0 | g_file_cache_warm_up_rowset_handle_to_finish_slow_count << 1; |
1117 | 0 | LOG(INFO) << "warm up rowset (handle to finish) took " << now_ts - handle_ts |
1118 | 0 | << " us, tablet_id: " << tablet_id << ", rowset_id: " << rowset_id.to_string() |
1119 | 0 | << ", segment_id: " << segment_id; |
1120 | 0 | } |
1121 | 0 | } else { |
1122 | 0 | g_file_cache_event_driven_warm_up_failed_index_num << 1; |
1123 | 0 | g_file_cache_event_driven_warm_up_failed_index_size << idx_size; |
1124 | 0 | record_warmup_ed_fail_index(job_id_str, static_cast<int64_t>(idx_size)); |
1125 | 0 | LOG(WARNING) << "download inverted index failed, tablet_id: " << tablet_id |
1126 | 0 | << " rowset_id: " << rowset_id << ", error: " << st; |
1127 | 0 | } |
1128 | 0 | if (tablet->complete_rowset_segment_warmup(WarmUpTriggerSource::EVENT_DRIVEN, rowset_id, st, 0, |
1129 | 0 | 1) |
1130 | 0 | .trigger_source == WarmUpTriggerSource::EVENT_DRIVEN) { |
1131 | 0 | VLOG_DEBUG << "warmup rowset " << version.to_string() << "(" << rowset_id.to_string() |
1132 | 0 | << ") completed"; |
1133 | 0 | } |
1134 | 0 | g_warmup_ed_downstream_progress_tracker.record_task_done(job_id_str, upstream_trigger_ts_ms); |
1135 | 0 | if (wait) { |
1136 | 0 | wait->signal(); |
1137 | 0 | } |
1138 | 0 | } |
1139 | | |
1140 | | void CloudInternalServiceImpl::warm_up_rowset(google::protobuf::RpcController* controller |
1141 | | [[maybe_unused]], |
1142 | | const PWarmUpRowsetRequest* request, |
1143 | | PWarmUpRowsetResponse* response, |
1144 | 0 | google::protobuf::Closure* done) { |
1145 | 0 | brpc::ClosureGuard closure_guard(done); |
1146 | 0 | std::shared_ptr<bthread::CountdownEvent> wait = nullptr; |
1147 | 0 | timespec due_time; |
1148 | 0 | if (request->has_sync_wait_timeout_ms() && request->sync_wait_timeout_ms() > 0) { |
1149 | 0 | g_file_cache_warm_up_rowset_wait_for_compaction_num << 1; |
1150 | 0 | wait = std::make_shared<bthread::CountdownEvent>(0); |
1151 | 0 | VLOG_DEBUG << "sync_wait_timeout: " << request->sync_wait_timeout_ms() << " ms"; |
1152 | 0 | due_time = butil::milliseconds_from_now(request->sync_wait_timeout_ms()); |
1153 | 0 | } |
1154 | | |
1155 | | // Extract job_id from request (0 if not set, for backward compatibility) |
1156 | 0 | std::string job_id_str = std::to_string(request->has_job_id() ? request->job_id() : 0); |
1157 | 0 | int64_t upstream_trigger_ts_ms = |
1158 | 0 | request->has_upstream_trigger_ts_ms() ? request->upstream_trigger_ts_ms() : 0; |
1159 | |
|
1160 | 0 | for (auto& rs_meta_pb : request->rowset_metas()) { |
1161 | 0 | RowsetMeta rs_meta; |
1162 | 0 | rs_meta.init_from_pb(rs_meta_pb); |
1163 | 0 | auto storage_resource = rs_meta.remote_storage_resource(); |
1164 | 0 | if (!storage_resource) { |
1165 | 0 | LOG(WARNING) << storage_resource.error(); |
1166 | 0 | continue; |
1167 | 0 | } |
1168 | 0 | int64_t tablet_id = rs_meta.tablet_id(); |
1169 | 0 | auto rowset_id = rs_meta.rowset_id(); |
1170 | 0 | bool local_only = !(request->has_skip_existence_check() && request->skip_existence_check()); |
1171 | 0 | auto res = _engine.tablet_mgr().get_tablet(tablet_id, /* warmup_data = */ false, |
1172 | 0 | /* sync_delete_bitmap = */ true, |
1173 | 0 | /* sync_stats = */ nullptr, |
1174 | 0 | /* local_only = */ local_only); |
1175 | 0 | if (!res.has_value()) { |
1176 | 0 | LOG_WARNING("Warm up error ").tag("tablet_id", tablet_id).error(res.error()); |
1177 | 0 | if (res.error().msg().find("local_only=true") != std::string::npos || |
1178 | 0 | res.error().msg().find("force_use_only_cached=true") != std::string::npos) { |
1179 | 0 | res.error().set_code(ErrorCode::TABLE_NOT_FOUND); |
1180 | 0 | } |
1181 | 0 | res.error().to_protobuf(response->mutable_status()); |
1182 | 0 | continue; |
1183 | 0 | } |
1184 | 0 | auto tablet = res.value(); |
1185 | 0 | auto tablet_meta = tablet->tablet_meta(); |
1186 | |
|
1187 | 0 | int64_t handle_ts = current_unix_time_us(); |
1188 | 0 | g_file_cache_warm_up_rowset_last_handle_unix_ts.set_value(handle_ts); |
1189 | 0 | int64_t request_ts = request->has_unix_ts_us() ? request->unix_ts_us() : 0; |
1190 | 0 | auto request_to_handle_latency_us = |
1191 | 0 | warm_up_rowset_cross_host_latency_us(request_ts, handle_ts); |
1192 | 0 | if (request_to_handle_latency_us.has_value()) { |
1193 | 0 | g_file_cache_warm_up_rowset_request_to_handle_latency << *request_to_handle_latency_us; |
1194 | 0 | } |
1195 | 0 | if (request_to_handle_latency_us.has_value() && |
1196 | 0 | *request_to_handle_latency_us > config::warm_up_rowset_slow_log_ms * 1000) { |
1197 | 0 | g_file_cache_warm_up_rowset_request_to_handle_slow_count << 1; |
1198 | 0 | LOG(INFO) << "warm up rowset (request to handle) took " << *request_to_handle_latency_us |
1199 | 0 | << " us, tablet_id: " << rs_meta.tablet_id() |
1200 | 0 | << ", rowset_id: " << rowset_id.to_string(); |
1201 | 0 | } |
1202 | 0 | int64_t expiration_time = tablet_meta->ttl_seconds(); |
1203 | |
|
1204 | 0 | if (!tablet->add_rowset_warmup_state(rs_meta, WarmUpTriggerSource::EVENT_DRIVEN)) { |
1205 | 0 | LOG(INFO) << "found duplicate warmup task for rowset " << rowset_id.to_string() |
1206 | 0 | << ", skip it"; |
1207 | 0 | g_warmup_ed_downstream_progress_tracker.record_task_done(job_id_str, |
1208 | 0 | upstream_trigger_ts_ms); |
1209 | 0 | record_warmup_ed_skipped_rowset_as_finished(rs_meta, job_id_str); |
1210 | 0 | continue; |
1211 | 0 | } |
1212 | 0 | if (rs_meta.num_segments() == 0) { |
1213 | 0 | g_warmup_ed_downstream_progress_tracker.record_task_done(job_id_str, |
1214 | 0 | upstream_trigger_ts_ms); |
1215 | 0 | } |
1216 | |
|
1217 | 0 | for (int64_t segment_id = 0; segment_id < rs_meta.num_segments(); segment_id++) { |
1218 | 0 | if (!config::file_cache_enable_only_warm_up_idx) { |
1219 | 0 | auto segment_size = rs_meta.segment_file_size(segment_id); |
1220 | | |
1221 | | // Use rs_meta.fs() instead of storage_resource.value()->fs to support packed files. |
1222 | | // PackedFileSystem wrapper in rs_meta.fs() handles the index_map lookup and |
1223 | | // reads from the correct packed file. |
1224 | 0 | io::DownloadFileMeta download_meta { |
1225 | 0 | .path = storage_resource.value()->remote_segment_path(rs_meta, segment_id), |
1226 | 0 | .file_size = segment_size, |
1227 | 0 | .offset = 0, |
1228 | 0 | .download_size = segment_size, |
1229 | 0 | .file_system = rs_meta.fs(), |
1230 | 0 | .ctx = {.is_index_data = false, |
1231 | 0 | .expiration_time = expiration_time, |
1232 | 0 | .is_dryrun = config::enable_reader_dryrun_when_download_file_cache, |
1233 | 0 | .is_warmup = true}, |
1234 | 0 | .download_done = |
1235 | 0 | [=, version = rs_meta.version()](Status st) { |
1236 | 0 | handle_segment_download_done( |
1237 | 0 | st, tablet_id, rowset_id, segment_id, tablet, wait, |
1238 | 0 | version, segment_size, request_ts, handle_ts, |
1239 | 0 | job_id_str, upstream_trigger_ts_ms); |
1240 | 0 | }, |
1241 | 0 | .tablet_id = tablet_id}; |
1242 | |
|
1243 | 0 | g_file_cache_event_driven_warm_up_submitted_segment_num << 1; |
1244 | 0 | g_file_cache_event_driven_warm_up_submitted_segment_size << segment_size; |
1245 | 0 | if (wait) { |
1246 | 0 | wait->add_count(); |
1247 | 0 | } |
1248 | 0 | g_warmup_ed_downstream_progress_tracker.record_task_submit(job_id_str, |
1249 | 0 | upstream_trigger_ts_ms); |
1250 | |
|
1251 | 0 | _engine.file_cache_block_downloader().submit_download_task(download_meta); |
1252 | 0 | } |
1253 | | |
1254 | | // Use rs_meta.fs() to support packed files for inverted index download. |
1255 | 0 | auto download_inverted_index = [&, tablet, job_id_str](std::string index_path, |
1256 | 0 | uint64_t idx_size) { |
1257 | 0 | io::DownloadFileMeta download_meta { |
1258 | 0 | .path = io::Path(index_path), |
1259 | 0 | .file_size = static_cast<int64_t>(idx_size), |
1260 | 0 | .file_system = rs_meta.fs(), |
1261 | 0 | .ctx = {.is_index_data = false, // DORIS-20877 |
1262 | 0 | .expiration_time = expiration_time, |
1263 | 0 | .is_dryrun = config::enable_reader_dryrun_when_download_file_cache, |
1264 | 0 | .is_warmup = true}, |
1265 | 0 | .download_done = |
1266 | 0 | [=, version = rs_meta.version()](Status st) { |
1267 | 0 | handle_inverted_index_download_done( |
1268 | 0 | st, tablet_id, rowset_id, segment_id, index_path, |
1269 | 0 | tablet, wait, version, idx_size, request_ts, handle_ts, |
1270 | 0 | job_id_str, upstream_trigger_ts_ms); |
1271 | 0 | }, |
1272 | 0 | .tablet_id = tablet_id, |
1273 | 0 | }; |
1274 | 0 | g_file_cache_event_driven_warm_up_submitted_index_num << 1; |
1275 | 0 | g_file_cache_event_driven_warm_up_submitted_index_size << idx_size; |
1276 | 0 | tablet->update_rowset_warmup_state_inverted_idx_num( |
1277 | 0 | WarmUpTriggerSource::EVENT_DRIVEN, rowset_id, 1); |
1278 | 0 | if (wait) { |
1279 | 0 | wait->add_count(); |
1280 | 0 | } |
1281 | 0 | g_warmup_ed_downstream_progress_tracker.record_task_submit(job_id_str, |
1282 | 0 | upstream_trigger_ts_ms); |
1283 | 0 | _engine.file_cache_block_downloader().submit_download_task(download_meta); |
1284 | 0 | }; |
1285 | | |
1286 | | // inverted index |
1287 | 0 | auto schema_ptr = rs_meta.tablet_schema(); |
1288 | 0 | auto idx_version = schema_ptr->get_inverted_index_storage_format(); |
1289 | |
|
1290 | 0 | if (schema_ptr->has_inverted_index() || schema_ptr->has_ann_index()) { |
1291 | 0 | if (idx_version == InvertedIndexStorageFormatPB::V1) { |
1292 | 0 | auto&& inverted_index_info = rs_meta.inverted_index_file_info(segment_id); |
1293 | 0 | std::unordered_map<int64_t, int64_t> index_size_map; |
1294 | 0 | for (const auto& info : inverted_index_info.index_info()) { |
1295 | 0 | if (info.index_file_size() != -1) { |
1296 | 0 | index_size_map[info.index_id()] = info.index_file_size(); |
1297 | 0 | } else { |
1298 | 0 | VLOG_DEBUG << "Invalid index_file_size for segment_id " << segment_id |
1299 | 0 | << ", index_id " << info.index_id(); |
1300 | 0 | } |
1301 | 0 | } |
1302 | 0 | for (const auto& index : schema_ptr->inverted_indexes()) { |
1303 | 0 | auto idx_path = storage_resource.value()->remote_idx_v1_path( |
1304 | 0 | rs_meta, segment_id, index->index_id(), index->get_index_suffix()); |
1305 | 0 | download_inverted_index(idx_path, index_size_map[index->index_id()]); |
1306 | 0 | } |
1307 | 0 | } else { // InvertedIndexStorageFormatPB::V2 |
1308 | 0 | auto&& inverted_index_info = rs_meta.inverted_index_file_info(segment_id); |
1309 | 0 | int64_t idx_size = 0; |
1310 | 0 | if (inverted_index_info.has_index_size()) { |
1311 | 0 | idx_size = inverted_index_info.index_size(); |
1312 | 0 | } else { |
1313 | 0 | VLOG_DEBUG << "index_size is not set for segment " << segment_id; |
1314 | 0 | } |
1315 | 0 | auto idx_path = |
1316 | 0 | storage_resource.value()->remote_idx_v2_path(rs_meta, segment_id); |
1317 | 0 | download_inverted_index(idx_path, idx_size); |
1318 | 0 | } |
1319 | 0 | } |
1320 | 0 | } |
1321 | 0 | } |
1322 | 0 | if (wait && wait->timed_wait(due_time)) { |
1323 | 0 | g_file_cache_warm_up_rowset_wait_for_compaction_timeout_num << 1; |
1324 | 0 | LOG_WARNING("the time spent warming up {} rowsets exceeded {} ms", |
1325 | 0 | request->rowset_metas().size(), request->sync_wait_timeout_ms()); |
1326 | 0 | } |
1327 | 0 | } |
1328 | | |
// Counter bumped by one in recycle_cache() each time a segment file's cache entry has been
// handed to remove_if_cached_async(). The removal is requested asynchronously, so despite the
// "finished" in the name this counts submitted removals; completion is not observable here.
bvar::Adder<uint64_t> g_file_cache_recycle_cache_finished_segment_num(
        "file_cache_recycle_cache_finished_segment_num");
// Same as above, but bumped once per inverted index file name processed by recycle_cache().
bvar::Adder<uint64_t> g_file_cache_recycle_cache_finished_index_num(
        "file_cache_recycle_cache_finished_index_num");
1333 | | |
1334 | | void CloudInternalServiceImpl::recycle_cache(google::protobuf::RpcController* controller |
1335 | | [[maybe_unused]], |
1336 | | const PRecycleCacheRequest* request, |
1337 | | PRecycleCacheResponse* response, |
1338 | 0 | google::protobuf::Closure* done) { |
1339 | 0 | brpc::ClosureGuard closure_guard(done); |
1340 | |
|
1341 | 0 | if (!config::enable_file_cache) { |
1342 | 0 | return; |
1343 | 0 | } |
1344 | 0 | for (const auto& meta : request->cache_metas()) { |
1345 | 0 | for (int64_t segment_id = 0; segment_id < meta.num_segments(); segment_id++) { |
1346 | 0 | auto file_key = Segment::file_cache_key(meta.rowset_id(), segment_id); |
1347 | 0 | auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key); |
1348 | 0 | file_cache->remove_if_cached_async(file_key); |
1349 | 0 | g_file_cache_recycle_cache_finished_segment_num << 1; |
1350 | 0 | } |
1351 | | |
1352 | | // inverted index |
1353 | 0 | for (const auto& file_name : meta.index_file_names()) { |
1354 | 0 | auto file_key = io::BlockFileCache::hash(file_name); |
1355 | 0 | auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key); |
1356 | 0 | file_cache->remove_if_cached_async(file_key); |
1357 | 0 | g_file_cache_recycle_cache_finished_index_num << 1; |
1358 | 0 | } |
1359 | 0 | } |
1360 | 0 | } |
1361 | | |
1362 | | #include "common/compile_check_avoid_end.h" |
1363 | | } // namespace doris |