be/src/cloud/cloud_tablet.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "cloud/cloud_tablet.h" |
19 | | |
20 | | #include <bvar/bvar.h> |
21 | | #include <bvar/latency_recorder.h> |
22 | | #include <gen_cpp/Types_types.h> |
23 | | #include <gen_cpp/olap_file.pb.h> |
24 | | #include <rapidjson/document.h> |
25 | | #include <rapidjson/encodings.h> |
26 | | #include <rapidjson/prettywriter.h> |
27 | | #include <rapidjson/rapidjson.h> |
28 | | #include <rapidjson/stringbuffer.h> |
29 | | |
30 | | #include <algorithm> |
31 | | #include <atomic> |
32 | | #include <chrono> |
33 | | #include <cstdint> |
34 | | #include <memory> |
35 | | #include <ranges> |
36 | | #include <ratio> |
37 | | #include <shared_mutex> |
38 | | #include <unordered_map> |
39 | | #include <vector> |
40 | | |
41 | | #include "cloud/cloud_meta_mgr.h" |
42 | | #include "cloud/cloud_storage_engine.h" |
43 | | #include "cloud/cloud_tablet_mgr.h" |
44 | | #include "cloud/cloud_warm_up_manager.h" |
45 | | #include "cloud/config.h" |
46 | | #include "common/cast_set.h" |
47 | | #include "common/config.h" |
48 | | #include "common/logging.h" |
49 | | #include "cpp/sync_point.h" |
50 | | #include "io/cache/block_file_cache_downloader.h" |
51 | | #include "io/cache/block_file_cache_factory.h" |
52 | | #include "storage/compaction/compaction.h" |
53 | | #include "storage/compaction/cumulative_compaction_time_series_policy.h" |
54 | | #include "storage/index/inverted/inverted_index_desc.h" |
55 | | #include "storage/olap_define.h" |
56 | | #include "storage/rowset/beta_rowset.h" |
57 | | #include "storage/rowset/rowset.h" |
58 | | #include "storage/rowset/rowset_factory.h" |
59 | | #include "storage/rowset/rowset_fwd.h" |
60 | | #include "storage/rowset/rowset_writer.h" |
61 | | #include "storage/storage_policy.h" |
62 | | #include "storage/tablet/base_tablet.h" |
63 | | #include "storage/tablet/tablet_schema.h" |
64 | | #include "storage/txn/txn_manager.h" |
65 | | #include "util/debug_points.h" |
66 | | #include "util/stack_util.h" |
67 | | |
68 | | namespace doris { |
69 | | #include "common/compile_check_begin.h" |
70 | | using namespace ErrorCode; |
71 | | |
72 | | bvar::LatencyRecorder g_cu_compaction_get_delete_bitmap_lock_time_ms( |
73 | | "cu_compaction_get_delete_bitmap_lock_time_ms"); |
74 | | bvar::LatencyRecorder g_base_compaction_get_delete_bitmap_lock_time_ms( |
75 | | "base_compaction_get_delete_bitmap_lock_time_ms"); |
76 | | |
77 | | bvar::Adder<int64_t> g_unused_rowsets_count("unused_rowsets_count"); |
78 | | bvar::Adder<int64_t> g_unused_rowsets_bytes("unused_rowsets_bytes"); |
79 | | |
80 | | bvar::Adder<int64_t> g_capture_prefer_cache_count("capture_prefer_cache_count"); |
81 | | bvar::Adder<int64_t> g_capture_with_freshness_tolerance_count( |
82 | | "capture_with_freshness_tolerance_count"); |
83 | | bvar::Adder<int64_t> g_capture_with_freshness_tolerance_fallback_count( |
84 | | "capture_with_freshness_tolerance_fallback_count"); |
85 | | bvar::Adder<int64_t> g_rowset_warmup_state_missing_count("rowset_warmup_state_missing_count"); |
86 | | bvar::Window<bvar::Adder<int64_t>> g_capture_prefer_cache_count_window( |
87 | | "capture_prefer_cache_count_window", &g_capture_prefer_cache_count, 30); |
88 | | bvar::Window<bvar::Adder<int64_t>> g_capture_with_freshness_tolerance_count_window( |
89 | | "capture_with_freshness_tolerance_count_window", &g_capture_with_freshness_tolerance_count, |
90 | | 30); |
91 | | bvar::Window<bvar::Adder<int64_t>> g_capture_with_freshness_tolerance_fallback_count_window( |
92 | | "capture_with_freshness_tolerance_fallback_count_window", |
93 | | &g_capture_with_freshness_tolerance_fallback_count, 30); |
94 | | |
95 | | static constexpr int LOAD_INITIATOR_ID = -1; |
96 | | |
97 | | bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_size( |
98 | | "file_cache_cloud_tablet_submitted_segment_size"); |
99 | | bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_num( |
100 | | "file_cache_cloud_tablet_submitted_segment_num"); |
101 | | bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_index_size( |
102 | | "file_cache_cloud_tablet_submitted_index_size"); |
103 | | bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_index_num( |
104 | | "file_cache_cloud_tablet_submitted_index_num"); |
105 | | bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_segment_size( |
106 | | "file_cache_cloud_tablet_finished_segment_size"); |
107 | | bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_segment_num( |
108 | | "file_cache_cloud_tablet_finished_segment_num"); |
109 | | bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_index_size( |
110 | | "file_cache_cloud_tablet_finished_index_size"); |
111 | | bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_index_num( |
112 | | "file_cache_cloud_tablet_finished_index_num"); |
113 | | |
114 | | bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_segment_num( |
115 | | "file_cache_recycle_cached_data_segment_num"); |
116 | | bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_segment_size( |
117 | | "file_cache_recycle_cached_data_segment_size"); |
118 | | bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_index_num( |
119 | | "file_cache_recycle_cached_data_index_num"); |
120 | | |
121 | | bvar::Adder<uint64_t> g_file_cache_warm_up_segment_complete_num( |
122 | | "file_cache_warm_up_segment_complete_num"); |
123 | | bvar::Adder<uint64_t> g_file_cache_warm_up_segment_failed_num( |
124 | | "file_cache_warm_up_segment_failed_num"); |
125 | | bvar::Adder<uint64_t> g_file_cache_warm_up_inverted_idx_complete_num( |
126 | | "file_cache_warm_up_inverted_idx_complete_num"); |
127 | | bvar::Adder<uint64_t> g_file_cache_warm_up_inverted_idx_failed_num( |
128 | | "file_cache_warm_up_inverted_idx_failed_num"); |
129 | | bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_complete_num( |
130 | | "file_cache_warm_up_rowset_complete_num"); |
131 | | bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_job_num( |
132 | | "file_cache_warm_up_rowset_triggered_by_job_num"); |
133 | | bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num( |
134 | | "file_cache_warm_up_rowset_triggered_by_sync_rowset_num"); |
135 | | bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_event_driven_num( |
136 | | "file_cache_warm_up_rowset_triggered_by_event_driven_num"); |
137 | | bvar::LatencyRecorder g_file_cache_warm_up_rowset_all_segments_latency( |
138 | | "file_cache_warm_up_rowset_all_segments_latency"); |
139 | | |
140 | | CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta) |
141 | 124k | : BaseTablet(std::move(tablet_meta)), _engine(engine) {} |
142 | | |
143 | 25.3k | CloudTablet::~CloudTablet() = default; |
144 | | |
145 | 0 | bool CloudTablet::exceed_version_limit(int32_t limit) { |
146 | 0 | return _approximate_num_rowsets.load(std::memory_order_relaxed) > limit; |
147 | 0 | } |
148 | | |
149 | 12.5k | std::string CloudTablet::tablet_path() const { |
150 | 12.5k | return ""; |
151 | 12.5k | } |
152 | | |
153 | | Status CloudTablet::capture_rs_readers(const Version& spec_version, |
154 | | std::vector<RowSetSplits>* rs_splits, |
155 | 4.82k | const CaptureRowsetOps& opts) { |
156 | 4.82k | DBUG_EXECUTE_IF("CloudTablet.capture_rs_readers.return.e-230", { |
157 | 4.82k | LOG_WARNING("CloudTablet.capture_rs_readers.return e-230").tag("tablet_id", tablet_id()); |
158 | 4.82k | return Status::Error<false>(-230, "injected error"); |
159 | 4.82k | }); |
160 | 4.82k | std::shared_lock rlock(_meta_lock); |
161 | 4.82k | *rs_splits = DORIS_TRY(capture_rs_readers_unlocked( |
162 | 4.82k | spec_version, CaptureRowsetOps {.skip_missing_versions = opts.skip_missing_versions})); |
163 | 4.82k | return Status::OK(); |
164 | 4.82k | } |
165 | | |
166 | | [[nodiscard]] Result<std::vector<Version>> CloudTablet::capture_consistent_versions_unlocked( |
167 | 1.26M | const Version& version_range, const CaptureRowsetOps& options) const { |
168 | 1.26M | if (options.query_freshness_tolerance_ms > 0) { |
169 | 52 | return capture_versions_with_freshness_tolerance(version_range, options); |
170 | 1.26M | } else if (options.enable_prefer_cached_rowset && !enable_unique_key_merge_on_write()) { |
171 | 28 | return capture_versions_prefer_cache(version_range); |
172 | 28 | } |
173 | 1.26M | return BaseTablet::capture_consistent_versions_unlocked(version_range, options); |
174 | 1.26M | } |
175 | | |
176 | | Result<std::vector<Version>> CloudTablet::capture_versions_prefer_cache( |
177 | 28 | const Version& spec_version) const { |
178 | 28 | g_capture_prefer_cache_count << 1; |
179 | 28 | Versions version_path; |
180 | 28 | std::shared_lock rlock(_meta_lock); |
181 | 28 | auto st = _timestamped_version_tracker.capture_consistent_versions_prefer_cache( |
182 | 28 | spec_version, version_path, |
183 | 226 | [&](int64_t start, int64_t end) { return rowset_is_warmed_up_unlocked(start, end); }); |
184 | 28 | if (!st.ok()) { |
185 | 0 | return ResultError(st); |
186 | 0 | } |
187 | 28 | int64_t path_max_version = version_path.back().second; |
188 | 28 | VLOG_DEBUG << fmt::format( |
189 | 0 | "[verbose] CloudTablet::capture_versions_prefer_cache, capture path: {}, " |
190 | 0 | "tablet_id={}, spec_version={}, path_max_version={}", |
191 | 0 | fmt::join(version_path | std::views::transform([](const auto& version) { |
192 | 0 | return fmt::format("{}", version.to_string()); |
193 | 0 | }), |
194 | 0 | ", "), |
195 | 0 | tablet_id(), spec_version.to_string(), path_max_version); |
196 | 28 | return version_path; |
197 | 28 | } |
198 | | |
199 | 586 | bool CloudTablet::rowset_is_warmed_up_unlocked(int64_t start_version, int64_t end_version) const { |
200 | 586 | if (start_version > end_version) { |
201 | 0 | return false; |
202 | 0 | } |
203 | 586 | Version version {start_version, end_version}; |
204 | 586 | auto it = _rs_version_map.find(version); |
205 | 586 | if (it == _rs_version_map.end()) { |
206 | 156 | it = _stale_rs_version_map.find(version); |
207 | 156 | if (it == _stale_rs_version_map.end()) { |
208 | 0 | LOG_WARNING( |
209 | 0 | "fail to find Rowset in rs_version or stale_rs_version for version. " |
210 | 0 | "tablet={}, version={}", |
211 | 0 | tablet_id(), version.to_string()); |
212 | 0 | return false; |
213 | 0 | } |
214 | 156 | } |
215 | 586 | const auto& rs = it->second; |
216 | 586 | if (rs->visible_timestamp() < _engine.startup_timepoint()) { |
217 | | // We only care about rowsets that are created after the startup time point. For other rowsets, |
218 | | // we assume they are warmed up. |
219 | 32 | return true; |
220 | 32 | } |
221 | 554 | return is_rowset_warmed_up(rs->rowset_id()); |
222 | 586 | }; |
223 | | |
224 | | Result<std::vector<Version>> CloudTablet::capture_versions_with_freshness_tolerance( |
225 | 52 | const Version& spec_version, const CaptureRowsetOps& options) const { |
226 | 52 | g_capture_with_freshness_tolerance_count << 1; |
227 | 52 | using namespace std::chrono; |
228 | 52 | auto query_freshness_tolerance_ms = options.query_freshness_tolerance_ms; |
229 | 52 | auto freshness_limit_tp = system_clock::now() - milliseconds(query_freshness_tolerance_ms); |
230 | | // find a version path where every edge (rowset) has been warmed up |
231 | 52 | Versions version_path; |
232 | 52 | std::shared_lock rlock(_meta_lock); |
233 | 52 | if (enable_unique_key_merge_on_write()) { |
234 | | // For merge-on-write tables, newly generated delete bitmap marks will be on the rowsets which are in the newest layout. |
235 | | // So we can only capture rowsets which are in the newest data layout. Otherwise there may be data correctness issues. |
236 | 26 | RETURN_IF_ERROR_RESULT( |
237 | 26 | _timestamped_version_tracker.capture_consistent_versions_with_validator_mow( |
238 | 26 | spec_version, version_path, [&](int64_t start, int64_t end) { |
239 | 26 | return rowset_is_warmed_up_unlocked(start, end); |
240 | 26 | })); |
241 | 26 | } else { |
242 | 26 | RETURN_IF_ERROR_RESULT( |
243 | 26 | _timestamped_version_tracker.capture_consistent_versions_with_validator( |
244 | 26 | spec_version, version_path, [&](int64_t start, int64_t end) { |
245 | 26 | return rowset_is_warmed_up_unlocked(start, end); |
246 | 26 | })); |
247 | 26 | } |
248 | 52 | int64_t path_max_version = version_path.back().second; |
249 | | // use std::views::concat after C++26 |
250 | 918 | auto check_fn = [this, path_max_version, freshness_limit_tp](const auto& rs_meta) { |
251 | 918 | return _check_rowset_should_be_visible_but_not_warmed_up(rs_meta, path_max_version, |
252 | 918 | freshness_limit_tp); |
253 | 918 | }; |
254 | 52 | bool should_fallback = |
255 | 52 | std::ranges::any_of(std::views::values(_tablet_meta->all_rs_metas()), check_fn) || |
256 | 52 | std::ranges::any_of(std::views::values(_tablet_meta->all_stale_rs_metas()), check_fn); |
257 | 52 | if (should_fallback) { |
258 | 10 | rlock.unlock(); |
259 | 10 | g_capture_with_freshness_tolerance_fallback_count << 1; |
260 | | // if there exists a rowset which satisfies the freshness tolerance and whose start version is larger than the path max version |
261 | | // but which has not been warmed up yet, fall back to capturing rowsets as usual |
262 | 10 | return BaseTablet::capture_consistent_versions_unlocked(spec_version, options); |
263 | 10 | } |
264 | 42 | VLOG_DEBUG << fmt::format( |
265 | 0 | "[verbose] CloudTablet::capture_versions_with_freshness_tolerance, capture path: {}, " |
266 | 0 | "tablet_id={}, spec_version={}, path_max_version={}", |
267 | 0 | fmt::join(version_path | std::views::transform([](const auto& version) { |
268 | 0 | return fmt::format("{}", version.to_string()); |
269 | 0 | }), |
270 | 0 | ", "), |
271 | 0 | tablet_id(), spec_version.to_string(), path_max_version); |
272 | 42 | return version_path; |
273 | 52 | } |
274 | | |
275 | | // There are only two tablet states, RUNNING and NOT_READY, in cloud mode. |
276 | | // This function will erase the tablet from `CloudTabletMgr` when it can't find this tablet in MS. |
277 | 1.31M | Status CloudTablet::sync_rowsets(const SyncOptions& options, SyncRowsetStats* stats) { |
278 | 1.31M | RETURN_IF_ERROR(sync_if_not_running(stats)); |
279 | | |
280 | 1.31M | if (options.query_version > 0) { |
281 | 1.25M | auto lock_start = std::chrono::steady_clock::now(); |
282 | 1.25M | std::shared_lock rlock(_meta_lock); |
283 | 1.25M | if (stats) { |
284 | 1.25M | stats->meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>( |
285 | 1.25M | std::chrono::steady_clock::now() - lock_start) |
286 | 1.25M | .count(); |
287 | 1.25M | } |
288 | 1.26M | if (_max_version >= options.query_version) { |
289 | 1.26M | return Status::OK(); |
290 | 1.26M | } |
291 | 1.25M | } |
292 | | |
293 | | // serially execute sync to reduce unnecessary network overhead |
294 | 50.7k | auto sync_lock_start = std::chrono::steady_clock::now(); |
295 | 50.7k | std::unique_lock lock(_sync_meta_lock); |
296 | 50.7k | if (stats) { |
297 | 2.37k | stats->sync_meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>( |
298 | 2.37k | std::chrono::steady_clock::now() - sync_lock_start) |
299 | 2.37k | .count(); |
300 | 2.37k | } |
301 | 50.7k | if (options.query_version > 0) { |
302 | 2.37k | auto lock_start = std::chrono::steady_clock::now(); |
303 | 2.37k | std::shared_lock rlock(_meta_lock); |
304 | 2.37k | if (stats) { |
305 | 2.37k | stats->meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>( |
306 | 2.37k | std::chrono::steady_clock::now() - lock_start) |
307 | 2.37k | .count(); |
308 | 2.37k | } |
309 | 2.37k | if (_max_version >= options.query_version) { |
310 | 269 | return Status::OK(); |
311 | 269 | } |
312 | 2.37k | } |
313 | | |
314 | 50.5k | auto st = _engine.meta_mgr().sync_tablet_rowsets_unlocked(this, lock, options, stats); |
315 | 50.5k | if (st.is<ErrorCode::NOT_FOUND>()) { |
316 | 0 | clear_cache(); |
317 | 0 | } |
318 | | |
319 | 50.5k | return st; |
320 | 50.7k | } |
321 | | |
322 | | // Sync tablet meta and all rowset metas if the tablet is not running. |
323 | | // This could happen when a BE has not finished a schema change job and another BE has committed that schema change job. |
324 | | // It should be quite a rare situation. |
325 | 1.32M | Status CloudTablet::sync_if_not_running(SyncRowsetStats* stats) { |
326 | 1.32M | if (tablet_state() == TABLET_RUNNING) { |
327 | 1.32M | return Status::OK(); |
328 | 1.32M | } |
329 | | |
330 | | // Serially execute sync to reduce unnecessary network overhead |
331 | 18.4E | auto sync_lock_start = std::chrono::steady_clock::now(); |
332 | 18.4E | std::unique_lock lock(_sync_meta_lock); |
333 | 18.4E | if (stats) { |
334 | 0 | stats->sync_meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>( |
335 | 0 | std::chrono::steady_clock::now() - sync_lock_start) |
336 | 0 | .count(); |
337 | 0 | } |
338 | | |
339 | 18.4E | { |
340 | 18.4E | auto lock_start = std::chrono::steady_clock::now(); |
341 | 18.4E | std::shared_lock rlock(_meta_lock); |
342 | 18.4E | if (stats) { |
343 | 0 | stats->meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>( |
344 | 0 | std::chrono::steady_clock::now() - lock_start) |
345 | 0 | .count(); |
346 | 0 | } |
347 | 18.4E | if (tablet_state() == TABLET_RUNNING) { |
348 | 0 | return Status::OK(); |
349 | 0 | } |
350 | 18.4E | } |
351 | | |
352 | 18.4E | TabletMetaSharedPtr tablet_meta; |
353 | 18.4E | auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta); |
354 | 18.4E | if (!st.ok()) { |
355 | 0 | if (st.is<ErrorCode::NOT_FOUND>()) { |
356 | 0 | clear_cache(); |
357 | 0 | } |
358 | 0 | return st; |
359 | 0 | } |
360 | | |
361 | 18.4E | if (tablet_meta->tablet_state() != TABLET_RUNNING) [[unlikely]] { |
362 | | // MoW may reach here when loading during a schema change |
363 | 101 | return Status::OK(); |
364 | 101 | } |
365 | | |
366 | 18.4E | TimestampedVersionTracker empty_tracker; |
367 | 18.4E | { |
368 | 18.4E | auto lock_start = std::chrono::steady_clock::now(); |
369 | 18.4E | std::lock_guard wlock(_meta_lock); |
370 | 18.4E | if (stats) { |
371 | 0 | stats->meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>( |
372 | 0 | std::chrono::steady_clock::now() - lock_start) |
373 | 0 | .count(); |
374 | 0 | } |
375 | 18.4E | RETURN_IF_ERROR(set_tablet_state(TABLET_RUNNING)); |
376 | 18.4E | _rs_version_map.clear(); |
377 | 18.4E | _stale_rs_version_map.clear(); |
378 | 18.4E | std::swap(_timestamped_version_tracker, empty_tracker); |
379 | 18.4E | _tablet_meta->clear_rowsets(); |
380 | 18.4E | _tablet_meta->clear_stale_rowset(); |
381 | 18.4E | _max_version = -1; |
382 | 18.4E | } |
383 | | |
384 | 0 | st = _engine.meta_mgr().sync_tablet_rowsets_unlocked(this, lock, {}, stats); |
385 | 18.4E | if (st.is<ErrorCode::NOT_FOUND>()) { |
386 | 0 | clear_cache(); |
387 | 0 | } |
388 | 18.4E | return st; |
389 | 18.4E | } |
390 | | |
391 | | void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_overlap, |
392 | | std::unique_lock<std::shared_mutex>& meta_lock, |
393 | 323k | bool warmup_delta_data) { |
394 | 323k | if (to_add.empty()) { |
395 | 4.73k | return; |
396 | 4.73k | } |
397 | | |
398 | 18.4E | VLOG_DEBUG << "add_rowsets tablet_id=" << tablet_id() << " stack: " << get_stack_trace(); |
399 | | |
400 | 319k | if (!version_overlap) { |
401 | 312k | _add_rowsets_directly(to_add, warmup_delta_data); |
402 | 312k | return; |
403 | 312k | } |
404 | | |
405 | | // Filter out existing rowsets |
406 | 6.51k | auto remove_it = |
407 | 11.8k | std::remove_if(to_add.begin(), to_add.end(), [this](const RowsetSharedPtr& rs) { |
408 | 11.8k | if (auto find_it = _rs_version_map.find(rs->version()); |
409 | 11.8k | find_it == _rs_version_map.end()) { |
410 | 10.8k | return false; |
411 | 10.8k | } else if (find_it->second->rowset_id() == rs->rowset_id()) { |
412 | 58 | return true; // Same rowset |
413 | 58 | } |
414 | | |
415 | | // If the version of a rowset in `to_add` is equal to that of a rowset in the tablet but the rowset_id is not equal, |
416 | | // replace the existing rowset with the `to_add` rowset. This may occur when: |
417 | | // 1. schema change converts rowsets which have been double written to new tablet |
418 | | // 2. cumu compaction picks single overlapping input rowset to perform compaction |
419 | | |
420 | | // add the existing rowset to unused_rowsets to remove its delete bitmap and recycle its cached data |
421 | | |
422 | 867 | std::vector<RowsetSharedPtr> unused_rowsets; |
423 | 867 | if (auto find_it = _rs_version_map.find(rs->version()); |
424 | 891 | find_it != _rs_version_map.end()) { |
425 | 891 | if (find_it->second->rowset_id() == rs->rowset_id()) { |
426 | 0 | LOG(WARNING) << "tablet_id=" << tablet_id() |
427 | 0 | << ", rowset_id=" << rs->rowset_id().to_string() |
428 | 0 | << ", existed rowset_id=" |
429 | 0 | << find_it->second->rowset_id().to_string(); |
430 | 0 | DCHECK(find_it->second->rowset_id() != rs->rowset_id()) |
431 | 0 | << "tablet_id=" << tablet_id() |
432 | 0 | << ", rowset_id=" << rs->rowset_id().to_string() |
433 | 0 | << ", existed rowset_id=" |
434 | 0 | << find_it->second->rowset_id().to_string(); |
435 | 0 | } |
436 | 891 | unused_rowsets.push_back(find_it->second); |
437 | 891 | } |
438 | 867 | add_unused_rowsets(unused_rowsets); |
439 | | |
440 | 867 | _tablet_meta->delete_rs_meta_by_version(rs->version(), nullptr); |
441 | 867 | _rs_version_map[rs->version()] = rs; |
442 | 867 | _tablet_meta->add_rowsets_unchecked({rs}); |
443 | 867 | update_base_size(*rs); |
444 | 867 | return true; |
445 | 11.8k | }); |
446 | | |
447 | 6.51k | to_add.erase(remove_it, to_add.end()); |
448 | | |
449 | | // delete rowsets with overlapped version |
450 | 6.51k | std::vector<RowsetSharedPtr> to_add_directly; |
451 | 10.8k | for (auto& to_add_rs : to_add) { |
452 | | // delete rowsets with overlapped version |
453 | 10.8k | std::vector<RowsetSharedPtr> to_delete; |
454 | 10.8k | Version to_add_v = to_add_rs->version(); |
455 | | // if start_version > max_version, we can skip checking overlap here. |
456 | 10.8k | if (to_add_v.first > _max_version) { |
457 | | // if start_version > max_version, we can skip checking overlap here. |
458 | 10.8k | to_add_directly.push_back(to_add_rs); |
459 | 10.8k | } else { |
460 | 40 | to_add_directly.push_back(to_add_rs); |
461 | 256 | for (auto& [v, rs] : _rs_version_map) { |
462 | 256 | if (to_add_v.contains(v)) { |
463 | 0 | to_delete.push_back(rs); |
464 | 0 | } |
465 | 256 | } |
466 | 40 | delete_rowsets(to_delete, meta_lock); |
467 | 40 | } |
468 | 10.8k | } |
469 | | |
470 | 6.51k | _add_rowsets_directly(to_add_directly, warmup_delta_data); |
471 | 6.51k | } |
472 | | |
473 | | void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete, |
474 | 7.27k | std::unique_lock<std::shared_mutex>&) { |
475 | 7.27k | if (to_delete.empty()) { |
476 | 37 | return; |
477 | 37 | } |
478 | 7.23k | std::vector<RowsetMetaSharedPtr> rs_metas; |
479 | 7.23k | rs_metas.reserve(to_delete.size()); |
480 | 7.23k | int64_t now = ::time(nullptr); |
481 | 61.7k | for (auto&& rs : to_delete) { |
482 | 61.7k | rs->rowset_meta()->set_stale_at(now); |
483 | 61.7k | rs_metas.push_back(rs->rowset_meta()); |
484 | 61.7k | _stale_rs_version_map[rs->version()] = rs; |
485 | 61.7k | } |
486 | 7.23k | _timestamped_version_tracker.add_stale_path_version(rs_metas); |
487 | 61.7k | for (auto&& rs : to_delete) { |
488 | 61.7k | _rs_version_map.erase(rs->version()); |
489 | 61.7k | } |
490 | | |
491 | 7.23k | _tablet_meta->modify_rs_metas({}, rs_metas, false); |
492 | 7.23k | } |
493 | | |
494 | 5.54k | uint64_t CloudTablet::delete_expired_stale_rowsets() { |
495 | 5.54k | if (config::enable_mow_verbose_log) { |
496 | 0 | LOG_INFO("begin delete_expired_stale_rowset for tablet={}", tablet_id()); |
497 | 0 | } |
498 | 5.54k | std::vector<RowsetSharedPtr> expired_rowsets; |
499 | | // ATTN: trick. Use stale_rowsets to temporarily increase the reference count of the rowset shared pointers in _stale_rs_version_map, so that the recycle_cached_data function can check whether the reference count is 2. |
500 | 5.54k | std::vector<std::pair<Version, std::vector<RowsetSharedPtr>>> deleted_stale_rowsets; |
501 | 5.54k | int64_t expired_stale_sweep_endtime = |
502 | 5.54k | ::time(nullptr) - config::tablet_rowset_stale_sweep_time_sec; |
503 | 5.54k | { |
504 | 5.54k | std::unique_lock wlock(_meta_lock); |
505 | | |
506 | 5.54k | std::vector<int64_t> path_ids; |
507 | | // capture the path version to delete |
508 | 5.54k | _timestamped_version_tracker.capture_expired_paths(expired_stale_sweep_endtime, &path_ids); |
509 | | |
510 | 5.54k | if (path_ids.empty()) { |
511 | 2.72k | return 0; |
512 | 2.72k | } |
513 | | |
514 | 6.94k | for (int64_t path_id : path_ids) { |
515 | 6.94k | int64_t start_version = -1; |
516 | 6.94k | int64_t end_version = -1; |
517 | 6.94k | std::vector<RowsetSharedPtr> stale_rowsets; |
518 | | // delete stale versions in version graph |
519 | 6.94k | auto version_path = _timestamped_version_tracker.fetch_and_delete_path_by_id(path_id); |
520 | 59.8k | for (auto& v_ts : version_path->timestamped_versions()) { |
521 | 59.8k | auto rs_it = _stale_rs_version_map.find(v_ts->version()); |
522 | 59.8k | if (rs_it != _stale_rs_version_map.end()) { |
523 | 59.8k | expired_rowsets.push_back(rs_it->second); |
524 | 59.8k | stale_rowsets.push_back(rs_it->second); |
525 | 59.8k | VLOG_DEBUG << "erase stale rowset, tablet_id=" << tablet_id() |
526 | 0 | << " rowset_id=" << rs_it->second->rowset_id().to_string() |
527 | 0 | << " version=" << rs_it->first.to_string(); |
528 | 59.8k | _stale_rs_version_map.erase(rs_it); |
529 | 59.8k | } else { |
530 | 0 | LOG(WARNING) << "cannot find stale rowset " << v_ts->version() << " in tablet " |
531 | 0 | << tablet_id(); |
532 | | // clang-format off |
533 | 0 | DCHECK(false) << [this, &wlock]() { wlock.unlock(); std::string json; get_compaction_status(&json); return json; }(); |
534 | | // clang-format on |
535 | 0 | } |
536 | 59.8k | if (start_version < 0) { |
537 | 6.94k | start_version = v_ts->version().first; |
538 | 6.94k | } |
539 | 59.8k | end_version = v_ts->version().second; |
540 | 59.8k | _tablet_meta->delete_stale_rs_meta_by_version(v_ts->version()); |
541 | 59.8k | } |
542 | 6.94k | Version version(start_version, end_version); |
543 | 6.94k | if (!stale_rowsets.empty()) { |
544 | 6.94k | deleted_stale_rowsets.emplace_back(version, std::move(stale_rowsets)); |
545 | 6.94k | } |
546 | 6.94k | } |
547 | 2.81k | _reconstruct_version_tracker_if_necessary(); |
548 | 2.81k | } |
549 | | |
550 | | // if the rowset is not used by any query, we can recycle its cached data early. |
551 | 0 | auto recycled_rowsets = recycle_cached_data(expired_rowsets); |
552 | 2.81k | if (!recycled_rowsets.empty()) { |
553 | 2.81k | auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager(); |
554 | 2.81k | manager.recycle_cache(tablet_id(), recycled_rowsets); |
555 | 2.81k | } |
556 | 2.81k | if (config::enable_mow_verbose_log) { |
557 | 0 | LOG_INFO("finish delete_expired_stale_rowset for tablet={}", tablet_id()); |
558 | 0 | } |
559 | | |
560 | 2.81k | add_unused_rowsets(expired_rowsets); |
561 | 2.81k | if (config::enable_agg_and_remove_pre_rowsets_delete_bitmap && keys_type() == UNIQUE_KEYS && |
562 | 2.81k | enable_unique_key_merge_on_write() && !deleted_stale_rowsets.empty()) { |
563 | | // agg delete bitmap for pre rowsets; record unused delete bitmap key ranges |
564 | 578 | OlapStopWatch watch; |
565 | 3.56k | for (const auto& [version, unused_rowsets] : deleted_stale_rowsets) { |
566 | | // agg delete bitmap for pre rowset |
567 | 3.56k | DeleteBitmapKeyRanges remove_delete_bitmap_key_ranges; |
568 | 3.56k | agg_delete_bitmap_for_stale_rowsets(version, remove_delete_bitmap_key_ranges); |
569 | | // add remove delete bitmap |
570 | 3.56k | if (!remove_delete_bitmap_key_ranges.empty()) { |
571 | 1.24k | std::vector<RowsetId> rowset_ids; |
572 | 14.3k | for (const auto& rs : unused_rowsets) { |
573 | 14.3k | rowset_ids.push_back(rs->rowset_id()); |
574 | 14.3k | } |
575 | 1.24k | std::lock_guard<std::mutex> lock(_gc_mutex); |
576 | 1.24k | _unused_delete_bitmap.push_back( |
577 | 1.24k | std::make_pair(rowset_ids, remove_delete_bitmap_key_ranges)); |
578 | 1.24k | } |
579 | 3.56k | } |
580 | 578 | LOG(INFO) << "agg pre rowsets delete bitmap. tablet_id=" << tablet_id() |
581 | 578 | << ", size=" << deleted_stale_rowsets.size() |
582 | 578 | << ", cost(us)=" << watch.get_elapse_time_us(); |
583 | 578 | } |
584 | 2.81k | return expired_rowsets.size(); |
585 | 5.54k | } |
586 | | |
587 | 800k | bool CloudTablet::need_remove_unused_rowsets() { |
588 | 800k | std::lock_guard<std::mutex> lock(_gc_mutex); |
589 | 800k | return !_unused_rowsets.empty() || !_unused_delete_bitmap.empty(); |
590 | 800k | } |
591 | | |
592 | 3.70k | void CloudTablet::add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets) { |
593 | 3.70k | std::lock_guard<std::mutex> lock(_gc_mutex); |
594 | 60.7k | for (const auto& rowset : rowsets) { |
595 | 60.7k | _unused_rowsets[rowset->rowset_id()] = rowset; |
596 | 60.7k | g_unused_rowsets_bytes << rowset->total_disk_size(); |
597 | 60.7k | } |
598 | 3.70k | g_unused_rowsets_count << rowsets.size(); |
599 | 3.70k | } |
600 | | |
601 | 2.97k | void CloudTablet::remove_unused_rowsets() { |
602 | 2.97k | std::vector<std::shared_ptr<Rowset>> removed_rowsets; |
603 | 2.97k | int64_t removed_delete_bitmap_num = 0; |
604 | 2.97k | OlapStopWatch watch; |
605 | 2.97k | { |
606 | 2.97k | std::lock_guard<std::mutex> lock(_gc_mutex); |
607 | | // 1. remove unused rowsets' cached data and delete bitmaps |
608 | 63.6k | for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) { |
609 | 60.7k | auto& rs = it->second; |
610 | 60.7k | if (rs.use_count() > 1) { |
611 | 0 | LOG(WARNING) << "tablet_id:" << tablet_id() << " rowset: " << rs->rowset_id() |
612 | 0 | << " has " << rs.use_count() << " references, it cannot be removed"; |
613 | 0 | ++it; |
614 | 0 | continue; |
615 | 0 | } |
616 | 60.7k | tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version()); |
617 | 60.7k | _rowset_warm_up_states.erase(rs->rowset_id()); |
618 | 60.7k | rs->clear_cache(); |
619 | 60.7k | g_unused_rowsets_count << -1; |
620 | 60.7k | g_unused_rowsets_bytes << -rs->total_disk_size(); |
621 | 60.7k | removed_rowsets.push_back(std::move(rs)); |
622 | 60.7k | it = _unused_rowsets.erase(it); |
623 | 60.7k | } |
624 | 2.97k | } |
625 | | |
626 | 2.97k | { |
627 | 2.97k | std::vector<RecycledRowsets> recycled_rowsets; |
628 | | |
629 | 60.7k | for (auto& rs : removed_rowsets) { |
630 | 60.7k | auto index_names = rs->get_index_file_names(); |
631 | 60.7k | recycled_rowsets.emplace_back(rs->rowset_id(), rs->num_segments(), index_names); |
632 | 60.7k | int64_t segment_size_sum = 0; |
633 | 84.7k | for (int32_t i = 0; i < rs->num_segments(); i++) { |
634 | 24.0k | segment_size_sum += rs->rowset_meta()->segment_file_size(i); |
635 | 24.0k | } |
636 | 60.7k | g_file_cache_recycle_cached_data_segment_num << rs->num_segments(); |
637 | 60.7k | g_file_cache_recycle_cached_data_segment_size << segment_size_sum; |
638 | 60.7k | g_file_cache_recycle_cached_data_index_num << index_names.size(); |
639 | 60.7k | } |
640 | | |
641 | 2.97k | if (recycled_rowsets.size() > 0) { |
642 | 2.97k | auto& manager = |
643 | 2.97k | ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager(); |
644 | 2.97k | manager.recycle_cache(tablet_id(), recycled_rowsets); |
645 | 2.97k | } |
646 | 2.97k | } |
647 | | |
648 | 2.97k | { |
649 | 2.97k | std::lock_guard<std::mutex> lock(_gc_mutex); |
650 | | // 2. remove delete bitmap of pre rowsets |
651 | 4.21k | for (auto it = _unused_delete_bitmap.begin(); it != _unused_delete_bitmap.end();) { |
652 | 1.24k | auto& rowset_ids = std::get<0>(*it); |
653 | 1.24k | bool find_unused_rowset = false; |
654 | 14.3k | for (const auto& rowset_id : rowset_ids) { |
655 | 14.3k | if (_unused_rowsets.find(rowset_id) != _unused_rowsets.end()) { |
656 | 0 | LOG(INFO) << "can not remove pre rowset delete bitmap because rowset is in use" |
657 | 0 | << ", tablet_id=" << tablet_id() << ", rowset_id=" << rowset_id; |
658 | 0 | find_unused_rowset = true; |
659 | 0 | break; |
660 | 0 | } |
661 | 14.3k | } |
662 | 1.24k | if (find_unused_rowset) { |
663 | 0 | ++it; |
664 | 0 | continue; |
665 | 0 | } |
666 | 1.24k | auto& key_ranges = std::get<1>(*it); |
667 | 1.24k | tablet_meta()->delete_bitmap().remove(key_ranges); |
668 | 1.24k | it = _unused_delete_bitmap.erase(it); |
669 | 1.24k | removed_delete_bitmap_num++; |
670 | | // TODO(kaijie): recycle cache for unused delete bitmap |
671 | 1.24k | } |
672 | 2.97k | } |
673 | | |
674 | 2.97k | LOG(INFO) << "tablet_id=" << tablet_id() << ", unused_rowset size=" << _unused_rowsets.size() |
675 | 2.97k | << ", unused_delete_bitmap size=" << _unused_delete_bitmap.size() |
676 | 2.97k | << ", removed_rowsets_num=" << removed_rowsets.size() |
677 | 2.97k | << ", removed_delete_bitmap_num=" << removed_delete_bitmap_num |
678 | 2.97k | << ", cost(us)=" << watch.get_elapse_time_us(); |
679 | 2.97k | } |
680 | | |
681 | 339k | void CloudTablet::update_base_size(const Rowset& rs) { |
682 | | // Define base rowset as the rowset of version [2-x] |
683 | 339k | if (rs.start_version() == 2) { |
684 | 114k | _base_size = rs.total_disk_size(); |
685 | 114k | } |
686 | 339k | } |
687 | | |
688 | 571 | void CloudTablet::clear_cache() { |
689 | 571 | auto recycled_rowsets = CloudTablet::recycle_cached_data(get_snapshot_rowset(true)); |
690 | 571 | if (!recycled_rowsets.empty()) { |
691 | 571 | auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager(); |
692 | 571 | manager.recycle_cache(tablet_id(), recycled_rowsets); |
693 | 571 | } |
694 | 571 | _engine.tablet_mgr().erase_tablet(tablet_id()); |
695 | 571 | } |
696 | | |
697 | | std::vector<RecycledRowsets> CloudTablet::recycle_cached_data( |
698 | 3.38k | const std::vector<RowsetSharedPtr>& rowsets) { |
699 | 3.38k | std::vector<RecycledRowsets> recycled_rowsets; |
700 | 65.8k | for (const auto& rs : rowsets) { |
701 | | // rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2. |
702 | 65.8k | if (rs.use_count() > 2) { |
703 | 4.58k | LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has " << rs.use_count() |
704 | 4.58k | << " references. File Cache won't be recycled when query is using it."; |
705 | 4.58k | continue; |
706 | 4.58k | } |
707 | 61.3k | rs->clear_cache(); |
708 | 61.3k | auto index_names = rs->get_index_file_names(); |
709 | 61.3k | recycled_rowsets.emplace_back(rs->rowset_id(), rs->num_segments(), index_names); |
710 | | |
711 | 61.3k | int64_t segment_size_sum = 0; |
712 | 85.6k | for (int32_t i = 0; i < rs->num_segments(); i++) { |
713 | 24.3k | segment_size_sum += rs->rowset_meta()->segment_file_size(i); |
714 | 24.3k | } |
715 | 61.3k | g_file_cache_recycle_cached_data_segment_num << rs->num_segments(); |
716 | 61.3k | g_file_cache_recycle_cached_data_segment_size << segment_size_sum; |
717 | 61.3k | g_file_cache_recycle_cached_data_index_num << index_names.size(); |
718 | 61.3k | } |
719 | 3.38k | return recycled_rowsets; |
720 | 3.38k | } |
721 | | |
722 | | void CloudTablet::reset_approximate_stats(int64_t num_rowsets, int64_t num_segments, |
723 | 216k | int64_t num_rows, int64_t data_size) { |
724 | 216k | _approximate_num_segments.store(num_segments, std::memory_order_relaxed); |
725 | 216k | _approximate_num_rows.store(num_rows, std::memory_order_relaxed); |
726 | 216k | _approximate_data_size.store(data_size, std::memory_order_relaxed); |
727 | 216k | int64_t cumu_num_deltas = 0; |
728 | 216k | int64_t cumu_num_rowsets = 0; |
729 | 216k | auto cp = _cumulative_point.load(std::memory_order_relaxed); |
730 | 473k | for (auto& [v, r] : _rs_version_map) { |
731 | 473k | if (v.second < cp) { |
732 | 230k | continue; |
733 | 230k | } |
734 | 243k | cumu_num_deltas += r->is_segments_overlapping() ? r->num_segments() : 1; |
735 | 243k | ++cumu_num_rowsets; |
736 | 243k | } |
737 | | // num_rowsets may be less than the size of _rs_version_map when there are some hole rowsets |
738 | | // in the version map, so we use the max value to ensure that the approximate number |
739 | | // of rowsets is at least the size of _rs_version_map. |
740 | | // Note that this is not the exact number of rowsets, but an approximate number. |
741 | 216k | int64_t approximate_num_rowsets = |
742 | 216k | std::max(num_rowsets, static_cast<int64_t>(_rs_version_map.size())); |
743 | 216k | _approximate_num_rowsets.store(approximate_num_rowsets, std::memory_order_relaxed); |
744 | 216k | _approximate_cumu_num_rowsets.store(cumu_num_rowsets, std::memory_order_relaxed); |
745 | 216k | _approximate_cumu_num_deltas.store(cumu_num_deltas, std::memory_order_relaxed); |
746 | 216k | } |
747 | | |
748 | | Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_rowset_writer( |
749 | 200k | RowsetWriterContext& context, bool vertical) { |
750 | 200k | context.rowset_id = _engine.next_rowset_id(); |
751 | | // FIXME(plat1ko): Seems `tablet_id` and `index_id` has been set repeatedly |
752 | 200k | context.tablet_id = tablet_id(); |
753 | 200k | context.index_id = index_id(); |
754 | 200k | context.partition_id = partition_id(); |
755 | 200k | context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write(); |
756 | 200k | context.encrypt_algorithm = tablet_meta()->encryption_algorithm(); |
757 | 200k | return RowsetFactory::create_rowset_writer(_engine, context, vertical); |
758 | 200k | } |
759 | | |
760 | | // create a rowset writer with rowset_id and seg_id |
761 | | // after writer, merge this transient rowset with original rowset |
762 | | Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_writer( |
763 | | const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> partial_update_info, |
764 | 3.28k | int64_t txn_expiration) { |
765 | 3.28k | if (rowset.rowset_meta_state() != RowsetStatePB::BEGIN_PARTIAL_UPDATE && |
766 | 3.28k | rowset.rowset_meta_state() != RowsetStatePB::COMMITTED) [[unlikely]] { |
767 | 0 | auto msg = fmt::format( |
768 | 0 | "wrong rowset state when create_transient_rowset_writer, rowset state should be " |
769 | 0 | "BEGIN_PARTIAL_UPDATE or COMMITTED, but found {}, rowset_id={}, tablet_id={}", |
770 | 0 | RowsetStatePB_Name(rowset.rowset_meta_state()), rowset.rowset_id().to_string(), |
771 | 0 | tablet_id()); |
772 | | // see `CloudRowsetWriter::build` for detail. |
773 | | // if this is in a retry task, the rowset state may have been changed to RowsetStatePB::COMMITTED |
774 | | // in `RowsetMeta::merge_rowset_meta()` in previous trials. |
775 | 0 | LOG(WARNING) << msg; |
776 | 0 | DCHECK(false) << msg; |
777 | 0 | } |
778 | 3.28k | RowsetWriterContext context; |
779 | 3.28k | context.rowset_state = PREPARED; |
780 | 3.28k | context.segments_overlap = OVERLAPPING; |
781 | | // During a partial update, the extracted columns of a variant should not be included in the tablet schema. |
782 | | // This is because the partial update for a variant needs to ignore the extracted columns. |
783 | | // Otherwise, the schema types in different rowsets might be inconsistent. When performing a partial update, |
784 | | // the complete variant is constructed by reading all the sub-columns of the variant. |
785 | 3.28k | context.tablet_schema = rowset.tablet_schema()->copy_without_variant_extracted_columns(); |
786 | 3.28k | context.newest_write_timestamp = UnixSeconds(); |
787 | 3.28k | context.tablet_id = table_id(); |
788 | 3.28k | context.enable_segcompaction = false; |
789 | 3.28k | context.write_type = DataWriteType::TYPE_DIRECT; |
790 | 3.28k | context.partial_update_info = std::move(partial_update_info); |
791 | 3.28k | context.is_transient_rowset_writer = true; |
792 | 3.28k | context.rowset_id = rowset.rowset_id(); |
793 | 3.28k | context.tablet_id = tablet_id(); |
794 | 3.28k | context.index_id = index_id(); |
795 | 3.28k | context.partition_id = partition_id(); |
796 | 3.28k | context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write(); |
797 | 3.28k | context.txn_expiration = txn_expiration; |
798 | 3.28k | context.encrypt_algorithm = tablet_meta()->encryption_algorithm(); |
799 | | // TODO(liaoxin) enable packed file for transient rowset |
800 | 3.28k | context.allow_packed_file = false; |
801 | | |
802 | 3.28k | auto storage_resource = rowset.rowset_meta()->remote_storage_resource(); |
803 | 3.28k | if (!storage_resource) { |
804 | 0 | return ResultError(std::move(storage_resource.error())); |
805 | 0 | } |
806 | | |
807 | 3.28k | context.storage_resource = *storage_resource.value(); |
808 | | |
809 | 3.28k | return RowsetFactory::create_rowset_writer(_engine, context, false) |
810 | 3.30k | .transform([&](auto&& writer) { |
811 | 3.30k | writer->set_segment_start_id(cast_set<int32_t>(rowset.num_segments())); |
812 | 3.30k | return writer; |
813 | 3.30k | }); |
814 | 3.28k | } |
815 | | |
816 | 80.2M | int64_t CloudTablet::get_cloud_base_compaction_score() const { |
817 | 80.2M | if (_tablet_meta->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY) { |
818 | 36.2k | bool has_delete = false; |
819 | 36.2k | int64_t point = cumulative_layer_point(); |
820 | 36.2k | std::shared_lock<std::shared_mutex> rlock(_meta_lock); |
821 | 363k | for (const auto& [_, rs_meta] : _tablet_meta->all_rs_metas()) { |
822 | 363k | if (rs_meta->start_version() >= point) { |
823 | 327k | continue; |
824 | 327k | } |
825 | 36.2k | if (rs_meta->has_delete_predicate()) { |
826 | 0 | has_delete = true; |
827 | 0 | break; |
828 | 0 | } |
829 | 36.2k | } |
830 | 36.2k | if (!has_delete) { |
831 | 36.2k | return 0; |
832 | 36.2k | } |
833 | 36.2k | } |
834 | | |
835 | 80.2M | return _approximate_num_rowsets.load(std::memory_order_relaxed) - |
836 | 80.2M | _approximate_cumu_num_rowsets.load(std::memory_order_relaxed); |
837 | 80.2M | } |
838 | | |
839 | 719M | int64_t CloudTablet::get_cloud_cumu_compaction_score() const { |
840 | | // TODO(plat1ko): Propose an algorithm that considers tablet's key type, number of delete rowsets, |
841 | | // number of tablet versions simultaneously. |
842 | 719M | return _approximate_cumu_num_deltas.load(std::memory_order_relaxed); |
843 | 719M | } |
844 | | |
845 | | // return a json string to show the compaction status of this tablet |
846 | 2.67k | void CloudTablet::get_compaction_status(std::string* json_result) { |
847 | 2.67k | rapidjson::Document root; |
848 | 2.67k | root.SetObject(); |
849 | | |
850 | 2.67k | rapidjson::Document path_arr; |
851 | 2.67k | path_arr.SetArray(); |
852 | | |
853 | 2.67k | std::vector<RowsetSharedPtr> rowsets; |
854 | 2.67k | std::vector<RowsetSharedPtr> stale_rowsets; |
855 | 2.67k | { |
856 | 2.67k | std::shared_lock rdlock(_meta_lock); |
857 | 2.67k | rowsets.reserve(_rs_version_map.size()); |
858 | 12.8k | for (auto& it : _rs_version_map) { |
859 | 12.8k | rowsets.push_back(it.second); |
860 | 12.8k | } |
861 | 2.67k | stale_rowsets.reserve(_stale_rs_version_map.size()); |
862 | 12.7k | for (auto& it : _stale_rs_version_map) { |
863 | 12.7k | stale_rowsets.push_back(it.second); |
864 | 12.7k | } |
865 | 2.67k | } |
866 | 2.67k | std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator); |
867 | 2.67k | std::sort(stale_rowsets.begin(), stale_rowsets.end(), Rowset::comparator); |
868 | | |
869 | | // get snapshot version path json_doc |
870 | 2.67k | _timestamped_version_tracker.get_stale_version_path_json_doc(path_arr); |
871 | 2.67k | root.AddMember("cumulative point", _cumulative_point.load(), root.GetAllocator()); |
872 | 2.67k | rapidjson::Value cumu_value; |
873 | 2.67k | std::string format_str = ToStringFromUnixMillis(_last_cumu_compaction_failure_millis.load()); |
874 | 2.67k | cumu_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()), |
875 | 2.67k | root.GetAllocator()); |
876 | 2.67k | root.AddMember("last cumulative failure time", cumu_value, root.GetAllocator()); |
877 | 2.67k | rapidjson::Value base_value; |
878 | 2.67k | format_str = ToStringFromUnixMillis(_last_base_compaction_failure_millis.load()); |
879 | 2.67k | base_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()), |
880 | 2.67k | root.GetAllocator()); |
881 | 2.67k | root.AddMember("last base failure time", base_value, root.GetAllocator()); |
882 | 2.67k | rapidjson::Value full_value; |
883 | 2.67k | format_str = ToStringFromUnixMillis(_last_full_compaction_failure_millis.load()); |
884 | 2.67k | full_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()), |
885 | 2.67k | root.GetAllocator()); |
886 | 2.67k | root.AddMember("last full failure time", full_value, root.GetAllocator()); |
887 | 2.67k | rapidjson::Value cumu_success_value; |
888 | 2.67k | format_str = ToStringFromUnixMillis(_last_cumu_compaction_success_millis.load()); |
889 | 2.67k | cumu_success_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()), |
890 | 2.67k | root.GetAllocator()); |
891 | 2.67k | root.AddMember("last cumulative success time", cumu_success_value, root.GetAllocator()); |
892 | 2.67k | rapidjson::Value base_success_value; |
893 | 2.67k | format_str = ToStringFromUnixMillis(_last_base_compaction_success_millis.load()); |
894 | 2.67k | base_success_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()), |
895 | 2.67k | root.GetAllocator()); |
896 | 2.67k | root.AddMember("last base success time", base_success_value, root.GetAllocator()); |
897 | 2.67k | rapidjson::Value full_success_value; |
898 | 2.67k | format_str = ToStringFromUnixMillis(_last_full_compaction_success_millis.load()); |
899 | 2.67k | full_success_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()), |
900 | 2.67k | root.GetAllocator()); |
901 | 2.67k | root.AddMember("last full success time", full_success_value, root.GetAllocator()); |
902 | 2.67k | rapidjson::Value cumu_schedule_value; |
903 | 2.67k | format_str = ToStringFromUnixMillis(_last_cumu_compaction_schedule_millis.load()); |
904 | 2.67k | cumu_schedule_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()), |
905 | 2.67k | root.GetAllocator()); |
906 | 2.67k | root.AddMember("last cumulative schedule time", cumu_schedule_value, root.GetAllocator()); |
907 | 2.67k | rapidjson::Value base_schedule_value; |
908 | 2.67k | format_str = ToStringFromUnixMillis(_last_base_compaction_schedule_millis.load()); |
909 | 2.67k | base_schedule_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()), |
910 | 2.67k | root.GetAllocator()); |
911 | 2.67k | root.AddMember("last base schedule time", base_schedule_value, root.GetAllocator()); |
912 | 2.67k | rapidjson::Value full_schedule_value; |
913 | 2.67k | format_str = ToStringFromUnixMillis(_last_full_compaction_schedule_millis.load()); |
914 | 2.67k | full_schedule_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()), |
915 | 2.67k | root.GetAllocator()); |
916 | 2.67k | root.AddMember("last full schedule time", full_schedule_value, root.GetAllocator()); |
917 | 2.67k | rapidjson::Value cumu_compaction_status_value; |
918 | 2.67k | cumu_compaction_status_value.SetString(_last_cumu_compaction_status.c_str(), |
919 | 2.67k | cast_set<uint>(_last_cumu_compaction_status.length()), |
920 | 2.67k | root.GetAllocator()); |
921 | 2.67k | root.AddMember("last cumulative status", cumu_compaction_status_value, root.GetAllocator()); |
922 | 2.67k | rapidjson::Value base_compaction_status_value; |
923 | 2.67k | base_compaction_status_value.SetString(_last_base_compaction_status.c_str(), |
924 | 2.67k | cast_set<uint>(_last_base_compaction_status.length()), |
925 | 2.67k | root.GetAllocator()); |
926 | 2.67k | root.AddMember("last base status", base_compaction_status_value, root.GetAllocator()); |
927 | 2.67k | rapidjson::Value full_compaction_status_value; |
928 | 2.67k | full_compaction_status_value.SetString(_last_full_compaction_status.c_str(), |
929 | 2.67k | cast_set<uint>(_last_full_compaction_status.length()), |
930 | 2.67k | root.GetAllocator()); |
931 | 2.67k | root.AddMember("last full status", full_compaction_status_value, root.GetAllocator()); |
932 | 2.67k | rapidjson::Value exec_compaction_time; |
933 | 2.67k | std::string num_str {std::to_string(exec_compaction_time_us.load())}; |
934 | 2.67k | exec_compaction_time.SetString(num_str.c_str(), cast_set<uint>(num_str.length()), |
935 | 2.67k | root.GetAllocator()); |
936 | 2.67k | root.AddMember("exec compaction time us", exec_compaction_time, root.GetAllocator()); |
937 | 2.67k | rapidjson::Value local_read_time; |
938 | 2.67k | num_str = std::to_string(local_read_time_us.load()); |
939 | 2.67k | local_read_time.SetString(num_str.c_str(), cast_set<uint>(num_str.length()), |
940 | 2.67k | root.GetAllocator()); |
941 | 2.67k | root.AddMember("compaction local read time us", local_read_time, root.GetAllocator()); |
942 | 2.67k | rapidjson::Value remote_read_time; |
943 | 2.67k | num_str = std::to_string(remote_read_time_us.load()); |
944 | 2.67k | remote_read_time.SetString(num_str.c_str(), cast_set<uint>(num_str.length()), |
945 | 2.67k | root.GetAllocator()); |
946 | 2.67k | root.AddMember("compaction remote read time us", remote_read_time, root.GetAllocator()); |
947 | | |
948 | | // print all rowsets' version as an array |
949 | 2.67k | rapidjson::Document versions_arr; |
950 | 2.67k | rapidjson::Document missing_versions_arr; |
951 | 2.67k | versions_arr.SetArray(); |
952 | 2.67k | missing_versions_arr.SetArray(); |
953 | 2.67k | int64_t last_version = -1; |
954 | 12.8k | for (auto& rowset : rowsets) { |
955 | 12.8k | const Version& ver = rowset->version(); |
956 | 12.8k | if (ver.first != last_version + 1) { |
957 | 0 | rapidjson::Value miss_value; |
958 | 0 | miss_value.SetString(fmt::format("[{}-{}]", last_version + 1, ver.first - 1).c_str(), |
959 | 0 | missing_versions_arr.GetAllocator()); |
960 | 0 | missing_versions_arr.PushBack(miss_value, missing_versions_arr.GetAllocator()); |
961 | 0 | } |
962 | 12.8k | rapidjson::Value value; |
963 | 12.8k | std::string version_str = rowset->get_rowset_info_str(); |
964 | 12.8k | value.SetString(version_str.c_str(), cast_set<uint32_t>(version_str.length()), |
965 | 12.8k | versions_arr.GetAllocator()); |
966 | 12.8k | versions_arr.PushBack(value, versions_arr.GetAllocator()); |
967 | 12.8k | last_version = ver.second; |
968 | 12.8k | } |
969 | 2.67k | root.AddMember("rowsets", versions_arr, root.GetAllocator()); |
970 | 2.67k | root.AddMember("missing_rowsets", missing_versions_arr, root.GetAllocator()); |
971 | | |
972 | | // print all stale rowsets' version as an array |
973 | 2.67k | rapidjson::Document stale_versions_arr; |
974 | 2.67k | stale_versions_arr.SetArray(); |
975 | 12.7k | for (auto& rowset : stale_rowsets) { |
976 | 12.7k | rapidjson::Value value; |
977 | 12.7k | std::string version_str = rowset->get_rowset_info_str(); |
978 | 12.7k | value.SetString(version_str.c_str(), cast_set<uint32_t>(version_str.length()), |
979 | 12.7k | stale_versions_arr.GetAllocator()); |
980 | 12.7k | stale_versions_arr.PushBack(value, stale_versions_arr.GetAllocator()); |
981 | 12.7k | } |
982 | 2.67k | root.AddMember("stale_rowsets", stale_versions_arr, root.GetAllocator()); |
983 | | |
984 | | // add stale version rowsets |
985 | 2.67k | root.AddMember("stale version path", path_arr, root.GetAllocator()); |
986 | | |
987 | | // to json string |
988 | 2.67k | rapidjson::StringBuffer strbuf; |
989 | 2.67k | rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf); |
990 | 2.67k | root.Accept(writer); |
991 | 2.67k | *json_result = std::string(strbuf.GetString()); |
992 | 2.67k | } |
993 | | |
994 | 215k | void CloudTablet::set_cumulative_layer_point(int64_t new_point) { |
995 | 216k | if (new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= _cumulative_point) { |
996 | 216k | _cumulative_point = new_point; |
997 | 216k | return; |
998 | 216k | } |
999 | | // cumulative point should only be reset to -1, or be increased |
1000 | | // FIXME: could happen in currently unresolved race conditions |
1001 | 18.4E | LOG(WARNING) << "Unexpected cumulative point: " << new_point |
1002 | 18.4E | << ", origin: " << _cumulative_point.load(); |
1003 | 18.4E | } |
1004 | | |
1005 | | Status CloudTablet::check_rowset_schema_for_build_index(std::vector<TColumn>& columns, |
1006 | 593 | int schema_version) { |
1007 | 593 | std::map<std::string, TabletColumn> fe_col_map; |
1008 | 3.97k | for (int i = 0; i < columns.size(); i++) { |
1009 | 3.38k | fe_col_map[columns[i].column_name] = TabletColumn(columns[i]); |
1010 | 3.38k | } |
1011 | | |
1012 | 593 | std::shared_lock rlock(_meta_lock); |
1013 | 1.64k | for (const auto& [version, rs] : _rs_version_map) { |
1014 | 1.64k | if (version.first == 0) { |
1015 | 565 | continue; |
1016 | 565 | } |
1017 | | |
1018 | 1.08k | if (rs->tablet_schema()->schema_version() >= schema_version) { |
1019 | 167 | continue; |
1020 | 167 | } |
1021 | | |
1022 | 6.28k | for (auto rs_col : rs->tablet_schema()->columns()) { |
1023 | 6.28k | auto find_ret = fe_col_map.find(rs_col->name()); |
1024 | 6.28k | if (find_ret == fe_col_map.end()) { |
1025 | 8 | return Status::InternalError( |
1026 | 8 | "check rowset meta failed:rowset's col is dropped in FE."); |
1027 | 8 | } |
1028 | | |
1029 | 6.27k | if (rs_col->unique_id() != find_ret->second.unique_id()) { |
1030 | 5 | return Status::InternalError("check rowset meta failed:col id not match."); |
1031 | 5 | } |
1032 | | |
1033 | 6.27k | if (rs_col->type() != find_ret->second.type()) { |
1034 | 2 | return Status::InternalError("check rowset meta failed:col type not match."); |
1035 | 2 | } |
1036 | 6.27k | } |
1037 | 916 | } |
1038 | | |
1039 | 578 | return Status::OK(); |
1040 | 593 | } |
1041 | | |
1042 | | Result<RowsetSharedPtr> CloudTablet::pick_a_rowset_for_index_change(int schema_version, |
1043 | 2.34k | bool& is_base_rowset) { |
1044 | 2.34k | TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudTablet::pick_a_rowset_for_index_change", |
1045 | 2.34k | Result<RowsetSharedPtr>(nullptr)); |
1046 | 2.34k | RowsetSharedPtr ret_rowset = nullptr; |
1047 | 2.34k | std::shared_lock rlock(_meta_lock); |
1048 | 12.9k | for (const auto& [version, rs] : _rs_version_map) { |
1049 | 12.9k | if (version.first == 0) { |
1050 | 2.34k | continue; |
1051 | 2.34k | } |
1052 | 10.5k | if (rs->num_rows() == 0) { |
1053 | 18.4E | VLOG_DEBUG << "[index_change]find empty rs, index change may " |
1054 | 18.4E | "failed, id=" |
1055 | 18.4E | << rs->rowset_id().to_string(); |
1056 | 7.26k | } |
1057 | | |
1058 | 10.5k | if (rs->tablet_schema()->schema_version() >= schema_version) { |
1059 | 4.91k | VLOG_DEBUG << "[index_change] skip rowset " << rs->tablet_schema()->schema_version() |
1060 | 1 | << "," << schema_version; |
1061 | 4.91k | continue; |
1062 | 4.91k | } |
1063 | | |
1064 | 5.66k | if (ret_rowset == nullptr) { |
1065 | 1.78k | ret_rowset = rs; |
1066 | 1.78k | continue; |
1067 | 1.78k | } |
1068 | | |
1069 | 3.88k | if (rs->start_version() > ret_rowset->start_version()) { |
1070 | 318 | ret_rowset = rs; |
1071 | 318 | } |
1072 | 3.88k | } |
1073 | | |
1074 | 2.34k | if (ret_rowset != nullptr) { |
1075 | 1.78k | is_base_rowset = ret_rowset->version().first < _cumulative_point; |
1076 | 1.78k | } |
1077 | | |
1078 | 2.34k | return ret_rowset; |
1079 | 2.34k | } |
1080 | | |
1081 | 3.84k | std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_base_compaction() { |
1082 | 3.84k | std::vector<RowsetSharedPtr> candidate_rowsets; |
1083 | 3.84k | { |
1084 | 3.84k | std::shared_lock rlock(_meta_lock); |
1085 | 19.3k | for (const auto& [version, rs] : _rs_version_map) { |
1086 | 19.3k | if (version.first != 0 && version.first < _cumulative_point && |
1087 | 19.3k | (_alter_version == -1 || version.second <= _alter_version)) { |
1088 | 11.2k | candidate_rowsets.push_back(rs); |
1089 | 11.2k | } |
1090 | 19.3k | } |
1091 | 3.84k | } |
1092 | 3.84k | std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator); |
1093 | 3.84k | return candidate_rowsets; |
1094 | 3.84k | } |
1095 | | |
1096 | 105 | std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_full_compaction() { |
1097 | 105 | std::vector<RowsetSharedPtr> candidate_rowsets; |
1098 | 105 | { |
1099 | 105 | std::shared_lock rlock(_meta_lock); |
1100 | 730 | for (auto& [v, rs] : _rs_version_map) { |
1101 | | // MUST NOT compact rowset [0-1] for some historical reasons (see cloud_schema_change) |
1102 | 730 | if (v.first != 0) { |
1103 | 625 | candidate_rowsets.push_back(rs); |
1104 | 625 | } |
1105 | 730 | } |
1106 | 105 | } |
1107 | 105 | std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator); |
1108 | 105 | return candidate_rowsets; |
1109 | 105 | } |
1110 | | |
1111 | 88 | CalcDeleteBitmapExecutor* CloudTablet::calc_delete_bitmap_executor() { |
1112 | 88 | return _engine.calc_delete_bitmap_executor(); |
1113 | 88 | } |
1114 | | |
1115 | | Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id, |
1116 | | DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer, |
1117 | | const RowsetIdUnorderedSet& cur_rowset_ids, int64_t lock_id, |
1118 | 53.5k | int64_t next_visible_version) { |
1119 | 53.5k | RowsetSharedPtr rowset = txn_info->rowset; |
1120 | 53.5k | int64_t cur_version = rowset->start_version(); |
1121 | | // update delete bitmap info, in order to avoid recalculation when trying again |
1122 | 53.5k | RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info( |
1123 | 53.5k | txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::PREPARE)); |
1124 | | |
1125 | 54.5k | if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update() && |
1126 | 53.5k | rowset_writer->num_rows() > 0) { |
1127 | 20 | DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.update_tmp_rowset.error", { |
1128 | 20 | return Status::InternalError<false>("injected update_tmp_rowset error."); |
1129 | 20 | }); |
1130 | 20 | const auto& rowset_meta = rowset->rowset_meta(); |
1131 | 20 | RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset_meta)); |
1132 | 20 | } |
1133 | | |
1134 | 53.5k | RETURN_IF_ERROR(save_delete_bitmap_to_ms(cur_version, txn_id, delete_bitmap, lock_id, |
1135 | 53.5k | next_visible_version, rowset)); |
1136 | | |
1137 | | // store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason, |
1138 | | // it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do |
1139 | | // delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail |
1140 | 53.5k | RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info( |
1141 | 53.5k | txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED, |
1142 | 53.5k | txn_info->publish_info)); |
1143 | | |
1144 | 53.5k | DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.enable_sleep", { |
1145 | 53.5k | auto sleep_sec = dp->param<int>("sleep", 5); |
1146 | 53.5k | std::this_thread::sleep_for(std::chrono::seconds(sleep_sec)); |
1147 | 53.5k | }); |
1148 | | |
1149 | 53.5k | DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.injected_error", { |
1150 | 53.5k | auto retry = dp->param<bool>("retry", false); |
1151 | 53.5k | auto sleep_sec = dp->param<int>("sleep", 0); |
1152 | 53.5k | std::this_thread::sleep_for(std::chrono::seconds(sleep_sec)); |
1153 | 53.5k | if (retry) { // return DELETE_BITMAP_LOCK_ERROR to let it retry |
1154 | 53.5k | return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>( |
1155 | 53.5k | "injected DELETE_BITMAP_LOCK_ERROR"); |
1156 | 53.5k | } else { |
1157 | 53.5k | return Status::InternalError<false>("injected non-retryable error"); |
1158 | 53.5k | } |
1159 | 53.5k | }); |
1160 | | |
1161 | 53.5k | return Status::OK(); |
1162 | 53.5k | } |
1163 | | |
1164 | | Status CloudTablet::save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id, |
1165 | | DeleteBitmapPtr delete_bitmap, int64_t lock_id, |
1166 | 54.3k | int64_t next_visible_version, RowsetSharedPtr rowset) { |
1167 | 54.3k | DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id()); |
1168 | 54.3k | for (auto iter = delete_bitmap->delete_bitmap.begin(); |
1169 | 280k | iter != delete_bitmap->delete_bitmap.end(); ++iter) { |
1170 | | // skip sentinel mark, which is used for delete bitmap correctness check |
1171 | 226k | if (std::get<1>(iter->first) != DeleteBitmap::INVALID_SEGMENT_ID) { |
1172 | 9.90k | new_delete_bitmap->merge( |
1173 | 9.90k | {std::get<0>(iter->first), std::get<1>(iter->first), cur_version}, |
1174 | 9.90k | iter->second); |
1175 | 9.90k | } |
1176 | 226k | } |
1177 | | // lock_id != -1 means this is in an explict txn |
1178 | 54.3k | bool is_explicit_txn = (lock_id != -1); |
1179 | 54.3k | auto ms_lock_id = !is_explicit_txn ? txn_id : lock_id; |
1180 | 54.3k | std::optional<StorageResource> storage_resource; |
1181 | 54.3k | auto storage_resource_result = rowset->rowset_meta()->remote_storage_resource(); |
1182 | 54.3k | if (storage_resource_result) { |
1183 | 53.9k | storage_resource = *storage_resource_result.value(); |
1184 | 53.9k | } |
1185 | 54.3k | RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap( |
1186 | 54.3k | *this, ms_lock_id, LOAD_INITIATOR_ID, new_delete_bitmap.get(), new_delete_bitmap.get(), |
1187 | 54.3k | rowset->rowset_id().to_string(), storage_resource, |
1188 | 54.3k | config::delete_bitmap_store_write_version, txn_id, is_explicit_txn, |
1189 | 54.3k | next_visible_version)); |
1190 | 54.3k | return Status::OK(); |
1191 | 54.3k | } |
1192 | | |
1193 | 0 | Versions CloudTablet::calc_missed_versions(int64_t spec_version, Versions existing_versions) const { |
1194 | 0 | DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version; |
1195 | | |
1196 | | // sort the existing versions in ascending order |
1197 | 0 | std::sort(existing_versions.begin(), existing_versions.end(), |
1198 | 0 | [](const Version& a, const Version& b) { |
1199 | | // simple because 2 versions are certainly not overlapping |
1200 | 0 | return a.first < b.first; |
1201 | 0 | }); |
1202 | | |
1203 | | // From the first version(=0), find the missing version until spec_version |
1204 | 0 | int64_t last_version = -1; |
1205 | 0 | Versions missed_versions; |
1206 | 0 | for (const Version& version : existing_versions) { |
1207 | 0 | if (version.first > last_version + 1) { |
1208 | | // there is a hole between versions |
1209 | 0 | missed_versions.emplace_back(last_version + 1, std::min(version.first, spec_version)); |
1210 | 0 | } |
1211 | 0 | last_version = version.second; |
1212 | 0 | if (last_version >= spec_version) { |
1213 | 0 | break; |
1214 | 0 | } |
1215 | 0 | } |
1216 | 0 | if (last_version < spec_version) { |
1217 | | // there is a hole between the last version and the specificed version. |
1218 | 0 | missed_versions.emplace_back(last_version + 1, spec_version); |
1219 | 0 | } |
1220 | 0 | return missed_versions; |
1221 | 0 | } |
1222 | | |
// Builds the delete bitmap for the output rowset of a compaction and stores it through the
// meta manager.
//
// The work is done in three steps (see the numbered comments below):
//   1. compute the bitmap from the rowsets visible after a first sync,
//   2. take the delete bitmap update lock, sync again and compute the bitmap for the
//      versions that arrived in between,
//   3. store the result with update_delete_bitmap().
//
// Parameters:
//   input_rowsets / output_rowset   rowsets consumed / produced by the compaction.
//   rowid_conversion                forwarded to calc_compaction_output_rowset_delete_bitmap().
//   compaction_type                 selects which lock-time metric is updated and takes part in
//                                   deciding whether the missed-rows check runs.
//   merged_rows, filtered_rows      compared against the number of missed rows when the
//                                   correctness check is active.
//   initiator                       forwarded to the lock and update calls.
//   output_rowset_delete_bitmap     [out] replaced with a freshly created bitmap, then filled.
//   allow_delete_in_cumu_compaction when true the missed-rows check is skipped.
//   get_delete_bitmap_lock_start_time [out] set to MonotonicMicros() taken right after the
//                                   lock was acquired.
//
// Returns the first error from sync/lock/rowid-conversion checks, or the status of
// update_delete_bitmap().
Status CloudTablet::calc_delete_bitmap_for_compaction(
        const std::vector<RowsetSharedPtr>& input_rowsets, const RowsetSharedPtr& output_rowset,
        const RowIdConversion& rowid_conversion, ReaderType compaction_type, int64_t merged_rows,
        int64_t filtered_rows, int64_t initiator, DeleteBitmapPtr& output_rowset_delete_bitmap,
        bool allow_delete_in_cumu_compaction, int64_t& get_delete_bitmap_lock_start_time) {
    output_rowset_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
    // Only allocated when one of the correctness-check configs is on; a null pointer is
    // passed down otherwise.
    std::unique_ptr<RowLocationSet> missed_rows;
    if ((config::enable_missing_rows_correctness_check ||
         config::enable_mow_compaction_correctness_check_core ||
         config::enable_mow_compaction_correctness_check_fail) &&
        !allow_delete_in_cumu_compaction &&
        (compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION ||
         !config::enable_prune_delete_sign_when_base_compaction)) {
        // also check duplicate key for base compaction when config::enable_prune_delete_sign_when_base_compaction==false
        missed_rows = std::make_unique<RowLocationSet>();
        LOG(INFO) << "RowLocation Set inited succ for tablet:" << tablet_id();
    }

    // Only allocated when the rowid conversion check is enabled and the schema has no
    // cluster keys.
    std::unique_ptr<std::map<RowsetSharedPtr, RowLocationPairList>> location_map;
    if (config::enable_rowid_conversion_correctness_check &&
        tablet_schema()->cluster_key_uids().empty()) {
        location_map = std::make_unique<std::map<RowsetSharedPtr, RowLocationPairList>>();
        LOG(INFO) << "Location Map inited succ for tablet:" << tablet_id();
    }

    // 1. calc delete bitmap for historical data
    RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(this));
    // Max version as seen after the first sync; step 2 continues from version.second.
    Version version = max_version();
    std::size_t missed_rows_size = 0;
    calc_compaction_output_rowset_delete_bitmap(
            input_rowsets, rowid_conversion, 0, version.second + 1, missed_rows.get(),
            location_map.get(), tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());
    if (missed_rows) {
        missed_rows_size = missed_rows->size();
        if (!allow_delete_in_cumu_compaction) {
            if ((compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION ||
                 !config::enable_prune_delete_sign_when_base_compaction) &&
                tablet_state() == TABLET_RUNNING) {
                // A negative sum skips the comparison altogether.
                if (merged_rows + filtered_rows >= 0 &&
                    merged_rows + filtered_rows != missed_rows_size) {
                    std::string err_msg = fmt::format(
                            "cumulative compaction: the merged rows({}), the filtered rows({}) is "
                            "not equal to missed rows({}) in rowid conversion, tablet_id: {}, "
                            "table_id:{}",
                            merged_rows, filtered_rows, missed_rows_size, tablet_id(), table_id());
                    LOG(WARNING) << err_msg;
                    // Severity of the mismatch is config driven: CHECK failure, error
                    // status, or DCHECK only.
                    if (config::enable_mow_compaction_correctness_check_core) {
                        CHECK(false) << err_msg;
                    } else if (config::enable_mow_compaction_correctness_check_fail) {
                        return Status::InternalError<false>(err_msg);
                    } else {
                        DCHECK(false) << err_msg;
                    }
                }
            }
        }
    }
    if (location_map) {
        RETURN_IF_ERROR(check_rowid_conversion(output_rowset, *location_map));
        // Emptied so that the check in step 2 only sees locations recorded by step 2.
        location_map->clear();
    }

    // 2. calc delete bitmap for incremental data
    // t1..t6 are MonotonicMicros() timestamps used for the metrics and the summary log below.
    int64_t t1 = MonotonicMicros();
    RETURN_IF_ERROR(_engine.meta_mgr().get_delete_bitmap_update_lock(
            *this, COMPACTION_DELETE_BITMAP_LOCK_ID, initiator));
    int64_t t2 = MonotonicMicros();
    // Lock acquisition time is reported in milliseconds, per compaction type.
    if (compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION) {
        g_cu_compaction_get_delete_bitmap_lock_time_ms << (t2 - t1) / 1000;
    } else if (compaction_type == ReaderType::READER_BASE_COMPACTION) {
        g_base_compaction_get_delete_bitmap_lock_time_ms << (t2 - t1) / 1000;
    }
    get_delete_bitmap_lock_start_time = t2;
    RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(this));
    int64_t t3 = MonotonicMicros();

    // Same calculation as step 1, now with version.second and UINT64_MAX as the bounds.
    calc_compaction_output_rowset_delete_bitmap(
            input_rowsets, rowid_conversion, version.second, UINT64_MAX, missed_rows.get(),
            location_map.get(), tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());
    int64_t t4 = MonotonicMicros();
    if (location_map) {
        RETURN_IF_ERROR(check_rowid_conversion(output_rowset, *location_map));
    }
    int64_t t5 = MonotonicMicros();

    // 3. store delete bitmap
    // delete_bitmap_v2 stays null unless the configured store version is 2 or 3.
    DeleteBitmapPtr delete_bitmap_v2 = nullptr;
    auto delete_bitmap_size = output_rowset_delete_bitmap->delete_bitmap.size();
    auto store_version = config::delete_bitmap_store_write_version;
    if (store_version == 2 || store_version == 3) {
        // Start from a copy of the freshly calculated bitmap.
        delete_bitmap_v2 = std::make_shared<DeleteBitmap>(*output_rowset_delete_bitmap);
        std::vector<std::pair<RowsetId, int64_t>> retained_rowsets_to_seg_num;
        {
            // Collect (rowset id, segment count) of every rowset that ends before the
            // output rowset starts, under the header read lock.
            std::shared_lock rlock(get_header_lock());
            for (const auto& [rowset_version, rowset_ptr] : rowset_map()) {
                if (rowset_version.second < output_rowset->start_version()) {
                    retained_rowsets_to_seg_num.emplace_back(
                            std::make_pair(rowset_ptr->rowset_id(), rowset_ptr->num_segments()));
                }
            }
        }
        // Add the tablet's existing delete bitmap entries for those rowsets within the
        // output rowset's version range, aggregated or not depending on config.
        if (config::enable_agg_delta_delete_bitmap_for_store_v2) {
            tablet_meta()->delete_bitmap().subset_and_agg(
                    retained_rowsets_to_seg_num, output_rowset->start_version(),
                    output_rowset->end_version(), delete_bitmap_v2.get());
        } else {
            tablet_meta()->delete_bitmap().subset(
                    retained_rowsets_to_seg_num, output_rowset->start_version(),
                    output_rowset->end_version(), delete_bitmap_v2.get());
        }
    }
    // Left empty when the output rowset has no remote storage resource.
    std::optional<StorageResource> storage_resource;
    auto storage_resource_result = output_rowset->rowset_meta()->remote_storage_resource();
    if (storage_resource_result) {
        storage_resource = *storage_resource_result.value();
    }
    auto st = _engine.meta_mgr().update_delete_bitmap(
            *this, -1, initiator, output_rowset_delete_bitmap.get(), delete_bitmap_v2.get(),
            output_rowset->rowset_id().to_string(), storage_resource, store_version);
    int64_t t6 = MonotonicMicros();
    // The summary is logged regardless of whether the store succeeded; `st` is part of it.
    LOG(INFO) << "calc_delete_bitmap_for_compaction, tablet_id=" << tablet_id()
              << ", get lock cost " << (t2 - t1) << " us, sync rowsets cost " << (t3 - t2)
              << " us, calc delete bitmap cost " << (t4 - t3) << " us, check rowid conversion cost "
              << (t5 - t4) << " us, store delete bitmap cost " << (t6 - t5)
              << " us, st=" << st.to_string() << ". store_version=" << store_version
              << ", calculated delete bitmap size=" << delete_bitmap_size
              << ", update delete bitmap size="
              << output_rowset_delete_bitmap->delete_bitmap.size();
    return st;
}
1353 | | |
1354 | | void CloudTablet::agg_delete_bitmap_for_compaction( |
1355 | | int64_t start_version, int64_t end_version, const std::vector<RowsetSharedPtr>& pre_rowsets, |
1356 | | DeleteBitmapPtr& new_delete_bitmap, |
1357 | 3.57k | std::map<std::string, int64_t>& pre_rowset_to_versions) { |
1358 | 10.1k | for (auto& rowset : pre_rowsets) { |
1359 | 15.6k | for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) { |
1360 | 5.47k | auto d = tablet_meta()->delete_bitmap().get_agg_without_cache( |
1361 | 5.47k | {rowset->rowset_id(), seg_id, end_version}, start_version); |
1362 | 5.47k | if (d->isEmpty()) { |
1363 | 4.11k | continue; |
1364 | 4.11k | } |
1365 | 18.4E | VLOG_DEBUG << "agg delete bitmap for tablet_id=" << tablet_id() |
1366 | 18.4E | << ", rowset_id=" << rowset->rowset_id() << ", seg_id=" << seg_id |
1367 | 18.4E | << ", rowset_version=" << rowset->version().to_string() |
1368 | 18.4E | << ". compaction start_version=" << start_version |
1369 | 18.4E | << ", end_version=" << end_version |
1370 | 18.4E | << ". delete_bitmap cardinality=" << d->cardinality(); |
1371 | 1.35k | DeleteBitmap::BitmapKey end_key {rowset->rowset_id(), seg_id, end_version}; |
1372 | 1.35k | new_delete_bitmap->set(end_key, *d); |
1373 | 1.35k | pre_rowset_to_versions[rowset->rowset_id().to_string()] = rowset->version().second; |
1374 | 1.35k | } |
1375 | 10.1k | } |
1376 | 3.57k | } |
1377 | | |
1378 | 59.8k | Status CloudTablet::sync_meta() { |
1379 | 59.8k | if (!config::enable_file_cache) { |
1380 | 2 | return Status::OK(); |
1381 | 2 | } |
1382 | | |
1383 | 59.8k | TabletMetaSharedPtr tablet_meta; |
1384 | 59.8k | auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta); |
1385 | 59.8k | if (!st.ok()) { |
1386 | 0 | if (st.is<ErrorCode::NOT_FOUND>()) { |
1387 | 0 | clear_cache(); |
1388 | 0 | } |
1389 | 0 | return st; |
1390 | 0 | } |
1391 | | |
1392 | 59.8k | auto new_compaction_policy = tablet_meta->compaction_policy(); |
1393 | 59.8k | if (_tablet_meta->compaction_policy() != new_compaction_policy) { |
1394 | 2 | _tablet_meta->set_compaction_policy(new_compaction_policy); |
1395 | 2 | } |
1396 | 59.8k | auto new_time_series_compaction_goal_size_mbytes = |
1397 | 59.8k | tablet_meta->time_series_compaction_goal_size_mbytes(); |
1398 | 59.8k | if (_tablet_meta->time_series_compaction_goal_size_mbytes() != |
1399 | 59.8k | new_time_series_compaction_goal_size_mbytes) { |
1400 | 0 | _tablet_meta->set_time_series_compaction_goal_size_mbytes( |
1401 | 0 | new_time_series_compaction_goal_size_mbytes); |
1402 | 0 | } |
1403 | 59.8k | auto new_time_series_compaction_file_count_threshold = |
1404 | 59.8k | tablet_meta->time_series_compaction_file_count_threshold(); |
1405 | 59.8k | if (_tablet_meta->time_series_compaction_file_count_threshold() != |
1406 | 59.8k | new_time_series_compaction_file_count_threshold) { |
1407 | 0 | _tablet_meta->set_time_series_compaction_file_count_threshold( |
1408 | 0 | new_time_series_compaction_file_count_threshold); |
1409 | 0 | } |
1410 | 59.8k | auto new_time_series_compaction_time_threshold_seconds = |
1411 | 59.8k | tablet_meta->time_series_compaction_time_threshold_seconds(); |
1412 | 59.8k | if (_tablet_meta->time_series_compaction_time_threshold_seconds() != |
1413 | 59.8k | new_time_series_compaction_time_threshold_seconds) { |
1414 | 0 | _tablet_meta->set_time_series_compaction_time_threshold_seconds( |
1415 | 0 | new_time_series_compaction_time_threshold_seconds); |
1416 | 0 | } |
1417 | 59.8k | auto new_time_series_compaction_empty_rowsets_threshold = |
1418 | 59.8k | tablet_meta->time_series_compaction_empty_rowsets_threshold(); |
1419 | 59.8k | if (_tablet_meta->time_series_compaction_empty_rowsets_threshold() != |
1420 | 59.8k | new_time_series_compaction_empty_rowsets_threshold) { |
1421 | 0 | _tablet_meta->set_time_series_compaction_empty_rowsets_threshold( |
1422 | 0 | new_time_series_compaction_empty_rowsets_threshold); |
1423 | 0 | } |
1424 | 59.8k | auto new_time_series_compaction_level_threshold = |
1425 | 59.8k | tablet_meta->time_series_compaction_level_threshold(); |
1426 | 59.8k | if (_tablet_meta->time_series_compaction_level_threshold() != |
1427 | 59.8k | new_time_series_compaction_level_threshold) { |
1428 | 0 | _tablet_meta->set_time_series_compaction_level_threshold( |
1429 | 0 | new_time_series_compaction_level_threshold); |
1430 | 0 | } |
1431 | | // Sync disable_auto_compaction (stored in tablet_schema) |
1432 | 59.8k | auto new_disable_auto_compaction = tablet_meta->tablet_schema()->disable_auto_compaction(); |
1433 | 59.8k | if (_tablet_meta->tablet_schema()->disable_auto_compaction() != new_disable_auto_compaction) { |
1434 | 9 | _tablet_meta->mutable_tablet_schema()->set_disable_auto_compaction( |
1435 | 9 | new_disable_auto_compaction); |
1436 | 9 | } |
1437 | | // Sync vertical_compaction_num_columns_per_group |
1438 | 59.8k | auto new_vertical_compaction_num_columns_per_group = |
1439 | 59.8k | tablet_meta->vertical_compaction_num_columns_per_group(); |
1440 | 59.8k | if (_tablet_meta->vertical_compaction_num_columns_per_group() != |
1441 | 59.8k | new_vertical_compaction_num_columns_per_group) { |
1442 | 0 | _tablet_meta->set_vertical_compaction_num_columns_per_group( |
1443 | 0 | new_vertical_compaction_num_columns_per_group); |
1444 | 0 | } |
1445 | | |
1446 | 59.8k | return Status::OK(); |
1447 | 59.8k | } |
1448 | | |
1449 | 4.59M | void CloudTablet::build_tablet_report_info(TTabletInfo* tablet_info) { |
1450 | 4.59M | std::shared_lock rdlock(_meta_lock); |
1451 | 4.59M | tablet_info->__set_total_version_count(_tablet_meta->version_count()); |
1452 | 4.59M | tablet_info->__set_tablet_id(_tablet_meta->tablet_id()); |
1453 | | // Currently, this information will not be used by the cloud report, |
1454 | | // but it may be used in the future. |
1455 | 4.59M | } |
1456 | | |
1457 | | Status CloudTablet::check_delete_bitmap_cache(int64_t txn_id, |
1458 | 54.7k | DeleteBitmap* expected_delete_bitmap) { |
1459 | 54.7k | DeleteBitmapPtr cached_delete_bitmap; |
1460 | 54.7k | CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud(); |
1461 | 54.7k | Status st = engine.txn_delete_bitmap_cache().get_delete_bitmap( |
1462 | 54.7k | txn_id, tablet_id(), &cached_delete_bitmap, nullptr, nullptr); |
1463 | 54.7k | if (st.ok()) { |
1464 | 54.4k | bool res = (expected_delete_bitmap->cardinality() == cached_delete_bitmap->cardinality()); |
1465 | 54.4k | auto msg = fmt::format( |
1466 | 54.4k | "delete bitmap cache check failed, cur_cardinality={}, cached_cardinality={}" |
1467 | 54.4k | "txn_id={}, tablet_id={}", |
1468 | 54.4k | expected_delete_bitmap->cardinality(), cached_delete_bitmap->cardinality(), txn_id, |
1469 | 54.4k | tablet_id()); |
1470 | 54.4k | if (!res) { |
1471 | 0 | DCHECK(res) << msg; |
1472 | 0 | return Status::InternalError<false>(msg); |
1473 | 0 | } |
1474 | 54.4k | } |
1475 | 54.7k | return Status::OK(); |
1476 | 54.7k | } |
1477 | | |
1478 | 70 | WarmUpState CloudTablet::get_rowset_warmup_state(RowsetId rowset_id) { |
1479 | 70 | std::shared_lock rlock(_meta_lock); |
1480 | 70 | if (!_rowset_warm_up_states.contains(rowset_id)) { |
1481 | 2 | return {.trigger_source = WarmUpTriggerSource::NONE, .progress = WarmUpProgress::NONE}; |
1482 | 2 | } |
1483 | 68 | auto& warmup_info = _rowset_warm_up_states[rowset_id]; |
1484 | 68 | warmup_info.update_state(); |
1485 | 68 | return warmup_info.state; |
1486 | 70 | } |
1487 | | |
1488 | | bool CloudTablet::add_rowset_warmup_state(const RowsetMeta& rowset, WarmUpTriggerSource source, |
1489 | 68 | std::chrono::steady_clock::time_point start_tp) { |
1490 | 68 | std::lock_guard wlock(_meta_lock); |
1491 | 68 | return add_rowset_warmup_state_unlocked(rowset, source, start_tp); |
1492 | 68 | } |
1493 | | |
1494 | | bool CloudTablet::update_rowset_warmup_state_inverted_idx_num(WarmUpTriggerSource source, |
1495 | 10 | RowsetId rowset_id, int64_t delta) { |
1496 | 10 | std::lock_guard wlock(_meta_lock); |
1497 | 10 | return update_rowset_warmup_state_inverted_idx_num_unlocked(source, rowset_id, delta); |
1498 | 10 | } |
1499 | | |
1500 | | bool CloudTablet::update_rowset_warmup_state_inverted_idx_num_unlocked(WarmUpTriggerSource source, |
1501 | | RowsetId rowset_id, |
1502 | 5.14k | int64_t delta) { |
1503 | 5.14k | auto it = _rowset_warm_up_states.find(rowset_id); |
1504 | 5.14k | if (it == _rowset_warm_up_states.end()) { |
1505 | 0 | return false; |
1506 | 0 | } |
1507 | 5.14k | if (it->second.state.trigger_source != source) { |
1508 | | // Only the same trigger source can update the state |
1509 | 4 | return false; |
1510 | 4 | } |
1511 | 5.14k | it->second.num_inverted_idx += delta; |
1512 | 5.14k | return true; |
1513 | 5.14k | } |
1514 | | |
1515 | | bool CloudTablet::add_rowset_warmup_state_unlocked(const RowsetMeta& rowset, |
1516 | | WarmUpTriggerSource source, |
1517 | 52.9k | std::chrono::steady_clock::time_point start_tp) { |
1518 | 52.9k | auto rowset_id = rowset.rowset_id(); |
1519 | | |
1520 | | // Check if rowset already has warmup state |
1521 | 52.9k | if (_rowset_warm_up_states.contains(rowset_id)) { |
1522 | 20 | auto existing_state = _rowset_warm_up_states[rowset_id].state; |
1523 | | |
1524 | | // For job-triggered warmup (one-time and periodic warmup), allow it to proceed |
1525 | | // except when there's already another job-triggered warmup in progress |
1526 | 20 | if (source == WarmUpTriggerSource::JOB) { |
1527 | 10 | if (existing_state.trigger_source == WarmUpTriggerSource::JOB && |
1528 | 10 | existing_state.progress == WarmUpProgress::DOING) { |
1529 | | // Same job type already in progress, skip to avoid duplicate warmup |
1530 | 2 | return false; |
1531 | 2 | } |
1532 | 10 | } else { |
1533 | | // For non-job warmup (EVENT_DRIVEN, SYNC_ROWSET), skip if any warmup exists |
1534 | 10 | return false; |
1535 | 10 | } |
1536 | 20 | } |
1537 | | |
1538 | 52.9k | if (source == WarmUpTriggerSource::JOB) { |
1539 | 18 | g_file_cache_warm_up_rowset_triggered_by_job_num << 1; |
1540 | 52.9k | } else if (source == WarmUpTriggerSource::SYNC_ROWSET) { |
1541 | 52.9k | g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num << 1; |
1542 | 18.4E | } else if (source == WarmUpTriggerSource::EVENT_DRIVEN) { |
1543 | 26 | g_file_cache_warm_up_rowset_triggered_by_event_driven_num << 1; |
1544 | 26 | } |
1545 | 52.9k | _rowset_warm_up_states[rowset_id] = { |
1546 | 52.9k | .state = {.trigger_source = source, |
1547 | 52.9k | .progress = (rowset.num_segments() == 0 ? WarmUpProgress::DONE |
1548 | 52.9k | : WarmUpProgress::DOING)}, |
1549 | 52.9k | .num_segments = rowset.num_segments(), |
1550 | 52.9k | .start_tp = start_tp}; |
1551 | 52.9k | return true; |
1552 | 52.9k | } |
1553 | | |
1554 | 58.1k | void CloudTablet::RowsetWarmUpInfo::update_state() { |
1555 | 58.1k | if (has_finished()) { |
1556 | 52.9k | g_file_cache_warm_up_rowset_complete_num << 1; |
1557 | 52.9k | auto cost = std::chrono::duration_cast<std::chrono::milliseconds>( |
1558 | 52.9k | std::chrono::steady_clock::now() - start_tp) |
1559 | 52.9k | .count(); |
1560 | 52.9k | g_file_cache_warm_up_rowset_all_segments_latency << cost; |
1561 | 52.9k | state.progress = WarmUpProgress::DONE; |
1562 | 52.9k | } |
1563 | 58.1k | } |
1564 | | |
1565 | | WarmUpState CloudTablet::complete_rowset_segment_warmup(WarmUpTriggerSource trigger_source, |
1566 | | RowsetId rowset_id, Status status, |
1567 | | int64_t segment_num, |
1568 | 58.0k | int64_t inverted_idx_num) { |
1569 | 58.0k | std::lock_guard wlock(_meta_lock); |
1570 | 58.0k | auto it = _rowset_warm_up_states.find(rowset_id); |
1571 | 58.0k | if (it == _rowset_warm_up_states.end()) { |
1572 | 2 | return {.trigger_source = WarmUpTriggerSource::NONE, .progress = WarmUpProgress::NONE}; |
1573 | 2 | } |
1574 | 58.0k | auto& warmup_info = it->second; |
1575 | 58.0k | if (warmup_info.state.trigger_source != trigger_source) { |
1576 | | // Only the same trigger source can update the state |
1577 | 4 | return warmup_info.state; |
1578 | 4 | } |
1579 | 18.4E | VLOG_DEBUG << "complete rowset segment warmup for rowset " << rowset_id << ", " << status; |
1580 | 58.0k | if (segment_num > 0) { |
1581 | 52.9k | g_file_cache_warm_up_segment_complete_num << segment_num; |
1582 | 52.9k | if (!status.ok()) { |
1583 | 2 | g_file_cache_warm_up_segment_failed_num << segment_num; |
1584 | 2 | } |
1585 | 52.9k | } |
1586 | 58.0k | if (inverted_idx_num > 0) { |
1587 | 5.14k | g_file_cache_warm_up_inverted_idx_complete_num << inverted_idx_num; |
1588 | 5.14k | if (!status.ok()) { |
1589 | 0 | g_file_cache_warm_up_inverted_idx_failed_num << inverted_idx_num; |
1590 | 0 | } |
1591 | 5.14k | } |
1592 | 58.0k | warmup_info.done(segment_num, inverted_idx_num); |
1593 | 58.0k | return warmup_info.state; |
1594 | 58.0k | } |
1595 | | |
1596 | 564 | bool CloudTablet::is_rowset_warmed_up(const RowsetId& rowset_id) const { |
1597 | 564 | auto it = _rowset_warm_up_states.find(rowset_id); |
1598 | 564 | if (it == _rowset_warm_up_states.end()) { |
1599 | | // The rowset is not in warmup state, which means the rowset has never been warmed up. |
1600 | | // This may happen when the upstream BE tried to warm up rowsets on this BE but this BE |
1601 | | // was restarting so the warmup failed, and _rowset_warm_up_states has no entry for it. |
1602 | | // |
1603 | | // Normally the startup_timepoint check in rowset_is_warmed_up_unlocked() would filter out |
1604 | | // such rowsets (visible_timestamp < startup_timepoint → assumed warmed up). However, |
1605 | | // compaction-produced rowsets have their visible_timestamp set at rowset builder |
1606 | | // initialization time rather than the final transaction commit time on meta-service, |
1607 | | // so their visible_timestamp can be earlier than startup_timepoint, causing the |
1608 | | // startup_timepoint check to NOT filter them out and reaching here with no warmup entry. |
1609 | | // |
1610 | | // If such a rowset is before the cumulative compaction point and base compaction never |
1611 | | // happens, returning false here would cause the version path algorithm to exclude it, |
1612 | | // leading to a persistently low path_max_version. With continuous upstream ingestion, |
1613 | | // the freshness tolerance fallback check would keep triggering, making every query on |
1614 | | // this tablet fall back to reading all data from remote storage. |
1615 | | // |
1616 | | // Returning true (optimistically treating it as warmed up) allows the version path to |
1617 | | // include it. On cache miss the data is transparently read from remote storage per-segment |
1618 | | // and cached locally in 1MB blocks, so the problem self-heals through subsequent queries. |
1619 | 14 | g_rowset_warmup_state_missing_count << 1; |
1620 | 14 | LOG_EVERY_N(WARNING, 100) << fmt::format( |
1621 | 2 | "rowset warmup state missing, considering it as warmed up. tablet_id={}, " |
1622 | 2 | "rowset_id={}", |
1623 | 2 | tablet_id(), rowset_id.to_string()); |
1624 | 14 | return true; |
1625 | 14 | } |
1626 | 550 | return it->second.state.progress == WarmUpProgress::DONE; |
1627 | 564 | } |
1628 | | |
1629 | 1.35k | void CloudTablet::add_warmed_up_rowset(const RowsetId& rowset_id) { |
1630 | 1.35k | _rowset_warm_up_states[rowset_id] = { |
1631 | 1.35k | .state = {.trigger_source = WarmUpTriggerSource::SYNC_ROWSET, |
1632 | 1.35k | .progress = WarmUpProgress::DONE}, |
1633 | 1.35k | .num_segments = 1, |
1634 | 1.35k | .start_tp = std::chrono::steady_clock::now()}; |
1635 | 1.35k | } |
1636 | | |
1637 | 186 | void CloudTablet::add_not_warmed_up_rowset(const RowsetId& rowset_id) { |
1638 | 186 | _rowset_warm_up_states[rowset_id] = { |
1639 | 186 | .state = {.trigger_source = WarmUpTriggerSource::SYNC_ROWSET, |
1640 | 186 | .progress = WarmUpProgress::DOING}, |
1641 | 186 | .num_segments = 1, |
1642 | 186 | .start_tp = std::chrono::steady_clock::now()}; |
1643 | 186 | } |
1644 | | |
1645 | | bool CloudTablet::_check_rowset_should_be_visible_but_not_warmed_up( |
1646 | | const RowsetMetaSharedPtr& rs_meta, int64_t path_max_version, |
1647 | 918 | std::chrono::system_clock::time_point freshness_limit_tp) const { |
1648 | 918 | if (rs_meta->version() == Version {0, 1}) { |
1649 | | // skip rowset[0-1] |
1650 | 48 | return false; |
1651 | 48 | } |
1652 | 870 | bool ret = rs_meta->start_version() > path_max_version && |
1653 | 870 | rs_meta->visible_timestamp() < freshness_limit_tp; |
1654 | 870 | if (ret && config::read_cluster_cache_opt_verbose_log) { |
1655 | 10 | using namespace std::chrono; |
1656 | 10 | std::time_t t1 = system_clock::to_time_t(rs_meta->visible_timestamp()); |
1657 | 10 | std::tm tm1 = *std::localtime(&t1); |
1658 | 10 | std::ostringstream oss1; |
1659 | 10 | oss1 << std::put_time(&tm1, "%Y-%m-%d %H:%M:%S"); |
1660 | | |
1661 | 10 | std::time_t t2 = system_clock::to_time_t(freshness_limit_tp); |
1662 | 10 | std::tm tm2 = *std::localtime(&t2); |
1663 | 10 | std::ostringstream oss2; |
1664 | 10 | oss2 << std::put_time(&tm2, "%Y-%m-%d %H:%M:%S"); |
1665 | 10 | LOG_INFO( |
1666 | 10 | "[verbose] CloudTablet::capture_rs_readers_with_freshness_tolerance, " |
1667 | 10 | "find a rowset which should be visible but not warmed up, tablet_id={}, " |
1668 | 10 | "path_max_version={}, rowset_id={}, version={}, visible_time={}, " |
1669 | 10 | "freshness_limit={}, version_graph={}, rowset_warmup_digest={}", |
1670 | 10 | tablet_id(), path_max_version, rs_meta->rowset_id().to_string(), |
1671 | 10 | rs_meta->version().to_string(), oss1.str(), oss2.str(), |
1672 | 10 | _timestamped_version_tracker.debug_string(), rowset_warmup_digest()); |
1673 | 10 | } |
1674 | 870 | return ret; |
1675 | 918 | } |
1676 | | |
1677 | | void CloudTablet::_submit_segment_download_task(const RowsetSharedPtr& rs, |
1678 | | const StorageResource* storage_resource, int seg_id, |
1679 | | |
1680 | 52.9k | int64_t expiration_time) { |
1681 | | // clang-format off |
1682 | 52.9k | const auto& rowset_meta = rs->rowset_meta(); |
1683 | 52.9k | auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this()); |
1684 | | // Use rowset_meta->fs() instead of storage_resource->fs to support packed file. |
1685 | | // RowsetMeta::fs() wraps the underlying FileSystem with PackedFileSystem when |
1686 | | // packed_slice_locations is not empty, which correctly maps segment file paths |
1687 | | // to their actual locations within packed files. |
1688 | 52.9k | auto file_system = rowset_meta->fs(); |
1689 | 52.9k | if (!file_system) { |
1690 | 0 | LOG(WARNING) << "failed to get file system for tablet_id=" << _tablet_meta->tablet_id() |
1691 | 0 | << ", rowset_id=" << rowset_meta->rowset_id(); |
1692 | 0 | return; |
1693 | 0 | } |
1694 | 52.9k | _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta { |
1695 | 52.9k | .path = storage_resource->remote_segment_path(*rowset_meta, seg_id), |
1696 | 52.9k | .file_size = rs->rowset_meta()->segment_file_size(seg_id), |
1697 | 52.9k | .file_system = file_system, |
1698 | 52.9k | .ctx = { |
1699 | 52.9k | .expiration_time = expiration_time, |
1700 | 52.9k | .is_dryrun = config::enable_reader_dryrun_when_download_file_cache, |
1701 | 52.9k | .is_warmup = true |
1702 | 52.9k | }, |
1703 | 52.9k | .download_done {[=](Status st) { |
1704 | 52.9k | DBUG_EXECUTE_IF("CloudTablet::add_rowsets.download_data.callback.block_compaction_rowset", { |
1705 | 52.9k | if (rs->version().second > rs->version().first) { |
1706 | 52.9k | auto sleep_time = dp->param<int>("sleep", 3); |
1707 | 52.9k | LOG_INFO( |
1708 | 52.9k | "[verbose] block download for rowset={}, " |
1709 | 52.9k | "version={}, sleep={}", |
1710 | 52.9k | rs->rowset_id().to_string(), |
1711 | 52.9k | rs->version().to_string(), sleep_time); |
1712 | 52.9k | std::this_thread::sleep_for( |
1713 | 52.9k | std::chrono::seconds(sleep_time)); |
1714 | 52.9k | } |
1715 | 52.9k | }); |
1716 | 52.9k | self->complete_rowset_segment_warmup(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), st, 1, 0); |
1717 | 52.9k | if (!st) { |
1718 | 0 | LOG_WARNING("add rowset warm up error ").error(st); |
1719 | 0 | } |
1720 | 52.9k | }}, |
1721 | 52.9k | }); |
1722 | | // clang-format on |
1723 | 52.9k | } |
1724 | | |
1725 | | void CloudTablet::_submit_inverted_index_download_task(const RowsetSharedPtr& rs, |
1726 | | const StorageResource* storage_resource, |
1727 | | const io::Path& idx_path, int64_t idx_size, |
1728 | 5.13k | int64_t expiration_time) { |
1729 | | // clang-format off |
1730 | 5.13k | const auto& rowset_meta = rs->rowset_meta(); |
1731 | 5.13k | auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this()); |
1732 | | // Use rowset_meta->fs() instead of storage_resource->fs to support packed file for idx files. |
1733 | 5.13k | auto file_system = rowset_meta->fs(); |
1734 | 5.13k | if (!file_system) { |
1735 | 0 | LOG(WARNING) << "failed to get file system for tablet_id=" << _tablet_meta->tablet_id() |
1736 | 0 | << ", rowset_id=" << rowset_meta->rowset_id(); |
1737 | 0 | return; |
1738 | 0 | } |
1739 | 5.13k | io::DownloadFileMeta meta { |
1740 | 5.13k | .path = idx_path, |
1741 | 5.13k | .file_size = idx_size, |
1742 | 5.13k | .file_system = file_system, |
1743 | 5.13k | .ctx = { |
1744 | 5.13k | .expiration_time = expiration_time, |
1745 | 5.13k | .is_dryrun = config::enable_reader_dryrun_when_download_file_cache, |
1746 | 5.13k | .is_warmup = true |
1747 | 5.13k | }, |
1748 | 5.13k | .download_done {[=](Status st) { |
1749 | 5.13k | DBUG_EXECUTE_IF("CloudTablet::add_rowsets.download_idx.callback.block", { |
1750 | 5.13k | auto sleep_time = dp->param<int>("sleep", 3); |
1751 | 5.13k | LOG_INFO( |
1752 | 5.13k | "[verbose] block download for " |
1753 | 5.13k | "rowset={}, inverted_idx_file={}, " |
1754 | 5.13k | "sleep={}", |
1755 | 5.13k | rs->rowset_id().to_string(), idx_path.string(), sleep_time); |
1756 | 5.13k | std::this_thread::sleep_for(std::chrono::seconds(sleep_time)); |
1757 | 5.13k | }); |
1758 | 5.13k | self->complete_rowset_segment_warmup(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), st, 0, 1); |
1759 | 5.13k | if (!st) { |
1760 | 0 | LOG_WARNING("add rowset warm up error ").error(st); |
1761 | 0 | } |
1762 | 5.13k | }}, |
1763 | 5.13k | }; |
1764 | 5.13k | self->update_rowset_warmup_state_inverted_idx_num_unlocked(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), 1); |
1765 | 5.13k | _engine.file_cache_block_downloader().submit_download_task(std::move(meta)); |
1766 | 5.13k | g_file_cache_cloud_tablet_submitted_index_num << 1; |
1767 | 5.13k | g_file_cache_cloud_tablet_submitted_index_size << idx_size; |
1768 | | // clang-format on |
1769 | 5.13k | } |
1770 | | |
1771 | | void CloudTablet::_add_rowsets_directly(std::vector<RowsetSharedPtr>& rowsets, |
1772 | 319k | bool warmup_delta_data) { |
1773 | | #ifdef BE_TEST |
1774 | | warmup_delta_data = false; |
1775 | | #endif |
1776 | 339k | for (auto& rs : rowsets) { |
1777 | 339k | if (warmup_delta_data) { |
1778 | | // Pre-set encryption algorithm to avoid re-entrant get_tablet() call |
1779 | | // inside RowsetMeta::fs() which causes SingleFlight deadlock when the |
1780 | | // tablet is not yet cached (during initial load_tablet). |
1781 | 178k | rs->rowset_meta()->set_encryption_algorithm(_tablet_meta->encryption_algorithm()); |
1782 | 178k | bool warm_up_state_updated = false; |
1783 | | // Warmup rowset data in background |
1784 | 231k | for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) { |
1785 | 52.9k | const auto& rowset_meta = rs->rowset_meta(); |
1786 | 52.9k | constexpr int64_t interval = 600; // 10 mins |
1787 | | // When BE restart and receive the `load_sync` rpc, it will sync all historical rowsets first time. |
1788 | | // So we need to filter out the old rowsets avoid to download the whole table. |
1789 | 52.9k | if (warmup_delta_data && |
1790 | 52.9k | ::time(nullptr) - rowset_meta->newest_write_timestamp() >= interval) { |
1791 | 0 | continue; |
1792 | 0 | } |
1793 | | |
1794 | 52.9k | auto storage_resource = rowset_meta->remote_storage_resource(); |
1795 | 52.9k | if (!storage_resource) { |
1796 | 0 | LOG(WARNING) << storage_resource.error(); |
1797 | 0 | continue; |
1798 | 0 | } |
1799 | | |
1800 | 52.9k | int64_t expiration_time = _tablet_meta->ttl_seconds(); |
1801 | 52.9k | g_file_cache_cloud_tablet_submitted_segment_num << 1; |
1802 | 52.9k | if (rs->rowset_meta()->segment_file_size(seg_id) > 0) { |
1803 | 52.9k | g_file_cache_cloud_tablet_submitted_segment_size |
1804 | 52.9k | << rs->rowset_meta()->segment_file_size(seg_id); |
1805 | 52.9k | } |
1806 | 52.9k | if (!warm_up_state_updated) { |
1807 | 52.9k | VLOG_DEBUG << "warm up rowset " << rs->version() << "(" << rs->rowset_id() |
1808 | 2 | << ") triggerd by sync rowset"; |
1809 | 52.9k | if (!add_rowset_warmup_state_unlocked(*(rs->rowset_meta()), |
1810 | 52.9k | WarmUpTriggerSource::SYNC_ROWSET)) { |
1811 | 0 | LOG(INFO) << "found duplicate warmup task for rowset " << rs->rowset_id() |
1812 | 0 | << ", skip it"; |
1813 | 0 | break; |
1814 | 0 | } |
1815 | 52.9k | warm_up_state_updated = true; |
1816 | 52.9k | } |
1817 | | |
1818 | 52.9k | if (!config::file_cache_enable_only_warm_up_idx) { |
1819 | 52.9k | _submit_segment_download_task(rs, storage_resource.value(), seg_id, |
1820 | 52.9k | expiration_time); |
1821 | 52.9k | } |
1822 | | |
1823 | 52.9k | auto schema_ptr = rowset_meta->tablet_schema(); |
1824 | 52.9k | auto idx_version = schema_ptr->get_inverted_index_storage_format(); |
1825 | 52.9k | if (idx_version == InvertedIndexStorageFormatPB::V1) { |
1826 | 1.14k | std::unordered_map<int64_t, int64_t> index_size_map; |
1827 | 1.14k | auto&& inverted_index_info = rowset_meta->inverted_index_file_info(seg_id); |
1828 | 1.14k | for (const auto& info : inverted_index_info.index_info()) { |
1829 | 434 | if (info.index_file_size() != -1) { |
1830 | 434 | index_size_map[info.index_id()] = info.index_file_size(); |
1831 | 434 | } else { |
1832 | 0 | VLOG_DEBUG << "Invalid index_file_size for segment_id " << seg_id |
1833 | 0 | << ", index_id " << info.index_id(); |
1834 | 0 | } |
1835 | 434 | } |
1836 | 1.14k | for (const auto& index : schema_ptr->inverted_indexes()) { |
1837 | 434 | auto idx_path = storage_resource.value()->remote_idx_v1_path( |
1838 | 434 | *rowset_meta, seg_id, index->index_id(), index->get_index_suffix()); |
1839 | 434 | _submit_inverted_index_download_task(rs, storage_resource.value(), idx_path, |
1840 | 434 | index_size_map[index->index_id()], |
1841 | 434 | expiration_time); |
1842 | 434 | } |
1843 | 51.7k | } else { |
1844 | 51.7k | if (schema_ptr->has_inverted_index() || schema_ptr->has_ann_index()) { |
1845 | 4.70k | auto&& inverted_index_info = rowset_meta->inverted_index_file_info(seg_id); |
1846 | 4.70k | int64_t idx_size = 0; |
1847 | 4.70k | if (inverted_index_info.has_index_size()) { |
1848 | 4.68k | idx_size = inverted_index_info.index_size(); |
1849 | 4.68k | } else { |
1850 | 19 | VLOG_DEBUG << "index_size is not set for segment " << seg_id; |
1851 | 19 | } |
1852 | 4.70k | auto idx_path = |
1853 | 4.70k | storage_resource.value()->remote_idx_v2_path(*rowset_meta, seg_id); |
1854 | 4.70k | _submit_inverted_index_download_task(rs, storage_resource.value(), idx_path, |
1855 | 4.70k | idx_size, expiration_time); |
1856 | 4.70k | } |
1857 | 51.7k | } |
1858 | 52.9k | } |
1859 | 178k | } |
1860 | 339k | _rs_version_map.emplace(rs->version(), rs); |
1861 | 339k | _timestamped_version_tracker.add_version(rs->version()); |
1862 | 339k | _max_version = std::max(rs->end_version(), _max_version); |
1863 | 339k | update_base_size(*rs); |
1864 | 339k | } |
1865 | 319k | _tablet_meta->add_rowsets_unchecked(rowsets); |
1866 | 319k | } |
1867 | | |
1868 | 925k | void CloudTablet::clear_unused_visible_pending_rowsets() { |
1869 | 925k | int64_t cur_max_version = max_version().second; |
1870 | 925k | int32_t max_version_count = max_version_config(); |
1871 | 925k | int64_t current_time = std::chrono::duration_cast<std::chrono::seconds>( |
1872 | 925k | std::chrono::system_clock::now().time_since_epoch()) |
1873 | 925k | .count(); |
1874 | | |
1875 | 925k | std::unique_lock<std::mutex> wlock(_visible_pending_rs_lock); |
1876 | 1.05M | for (auto it = _visible_pending_rs_map.begin(); it != _visible_pending_rs_map.end();) { |
1877 | 124k | if (int64_t version = it->first, expiration_time = it->second.expiration_time; |
1878 | 124k | version <= cur_max_version || expiration_time < current_time) { |
1879 | 124k | it = _visible_pending_rs_map.erase(it); |
1880 | 124k | } else { |
1881 | 33 | ++it; |
1882 | 33 | } |
1883 | 124k | } |
1884 | | |
1885 | 925k | while (!_visible_pending_rs_map.empty() && _visible_pending_rs_map.size() > max_version_count) { |
1886 | 0 | _visible_pending_rs_map.erase(--_visible_pending_rs_map.end()); |
1887 | 0 | } |
1888 | 925k | } |
1889 | | |
1890 | | void CloudTablet::try_make_committed_rs_visible(int64_t txn_id, int64_t visible_version, |
1891 | 179k | int64_t version_update_time_ms) { |
1892 | 179k | if (enable_unique_key_merge_on_write()) { |
1893 | | // for mow tablet, we get committed rowset from `CloudTxnDeleteBitmapCache` rather than `CommittedRowsetManager` |
1894 | 55.3k | try_make_committed_rs_visible_for_mow(txn_id, visible_version, version_update_time_ms); |
1895 | 55.3k | return; |
1896 | 55.3k | } |
1897 | | |
1898 | 124k | auto& committed_rs_mgr = _engine.committed_rs_mgr(); |
1899 | 124k | auto res = committed_rs_mgr.get_committed_rowset(txn_id, tablet_id()); |
1900 | 124k | if (!res.has_value()) { |
1901 | 0 | return; |
1902 | 0 | } |
1903 | 124k | auto [rowset_meta, expiration_time] = res.value(); |
1904 | 124k | bool is_empty_rowset = (rowset_meta == nullptr); |
1905 | 124k | if (!is_empty_rowset) { |
1906 | 124k | rowset_meta->set_cloud_fields_after_visible(visible_version, version_update_time_ms); |
1907 | 124k | } |
1908 | 124k | { |
1909 | 124k | std::lock_guard<std::mutex> lock(_visible_pending_rs_lock); |
1910 | 124k | _visible_pending_rs_map.emplace( |
1911 | 124k | visible_version, |
1912 | 124k | VisiblePendingRowset {rowset_meta, expiration_time, is_empty_rowset}); |
1913 | 124k | } |
1914 | 124k | apply_visible_pending_rowsets(); |
1915 | 124k | committed_rs_mgr.remove_committed_rowset(txn_id, tablet_id()); |
1916 | 124k | } |
1917 | | |
1918 | | void CloudTablet::try_make_committed_rs_visible_for_mow(int64_t txn_id, int64_t visible_version, |
1919 | 55.3k | int64_t version_update_time_ms) { |
1920 | 55.3k | Defer defer {[&] { |
1921 | 55.3k | _engine.txn_delete_bitmap_cache().remove_unused_tablet_txn_info(txn_id, tablet_id()); |
1922 | 55.3k | }}; |
1923 | 55.3k | auto res = _engine.txn_delete_bitmap_cache().get_rowset_and_delete_bitmap(txn_id, tablet_id()); |
1924 | 55.3k | if (!res.has_value()) { |
1925 | 461 | return; |
1926 | 461 | } |
1927 | 54.8k | auto [rowset, delete_bitmap] = res.value(); |
1928 | 54.8k | bool is_empty_rowset = (rowset == nullptr); |
1929 | 54.8k | { |
1930 | 54.8k | std::unique_lock lock {_sync_meta_lock}; |
1931 | 54.8k | std::unique_lock meta_wlock {_meta_lock}; |
1932 | 54.8k | if (_max_version + 1 != visible_version) { |
1933 | 399 | return; |
1934 | 399 | } |
1935 | 54.4k | if (is_empty_rowset) { |
1936 | 0 | Versions existing_versions; |
1937 | 0 | for (const auto& [_, rs] : tablet_meta()->all_rs_metas()) { |
1938 | 0 | existing_versions.emplace_back(rs->version()); |
1939 | 0 | } |
1940 | 0 | if (existing_versions.empty()) { |
1941 | 0 | return; |
1942 | 0 | } |
1943 | 0 | auto max_version = std::ranges::max(existing_versions, {}, &Version::first); |
1944 | 0 | auto prev_rowset = get_rowset_by_version(max_version); |
1945 | 0 | auto st = _engine.meta_mgr().create_empty_rowset_for_hole( |
1946 | 0 | this, visible_version, prev_rowset->rowset_meta(), &rowset); |
1947 | 0 | if (!st.ok()) { |
1948 | 0 | return; |
1949 | 0 | } |
1950 | 54.4k | } else { |
1951 | 225k | for (const auto& [delete_bitmap_key, bitmap_value] : delete_bitmap->delete_bitmap) { |
1952 | | // skip sentinel mark, which is used for delete bitmap correctness check |
1953 | 225k | if (std::get<1>(delete_bitmap_key) != DeleteBitmap::INVALID_SEGMENT_ID) { |
1954 | 9.63k | tablet_meta()->delete_bitmap().merge( |
1955 | 9.63k | {std::get<0>(delete_bitmap_key), std::get<1>(delete_bitmap_key), |
1956 | 9.63k | visible_version}, |
1957 | 9.63k | bitmap_value); |
1958 | 9.63k | } |
1959 | 225k | } |
1960 | 54.4k | } |
1961 | 54.4k | rowset->rowset_meta()->set_cloud_fields_after_visible(visible_version, |
1962 | 54.4k | version_update_time_ms); |
1963 | 54.4k | add_rowsets({rowset}, false, meta_wlock, true); |
1964 | 54.4k | } |
1965 | 54.4k | LOG(INFO) << "mow added visible pending rowset, txn_id=" << txn_id |
1966 | 54.4k | << ", tablet_id=" << tablet_id() << ", version=" << visible_version |
1967 | 54.4k | << ", rowset_id=" << rowset->rowset_id().to_string(); |
1968 | 54.4k | } |
1969 | | |
1970 | 124k | void CloudTablet::apply_visible_pending_rowsets() { |
1971 | 124k | Defer defer {[&] { clear_unused_visible_pending_rowsets(); }}; |
1972 | | |
1973 | 124k | std::unique_lock lock(_sync_meta_lock); |
1974 | 124k | std::unique_lock<std::shared_mutex> meta_wlock(_meta_lock); |
1975 | 124k | int64_t next_version = _max_version + 1; |
1976 | 124k | std::vector<RowsetSharedPtr> to_add; |
1977 | 124k | std::lock_guard<std::mutex> pending_lock(_visible_pending_rs_lock); |
1978 | 124k | for (auto it = _visible_pending_rs_map.upper_bound(_max_version); |
1979 | 248k | it != _visible_pending_rs_map.end(); ++it) { |
1980 | 123k | int64_t version = it->first; |
1981 | 123k | if (version != next_version) break; |
1982 | | |
1983 | 123k | auto& pending_rs = it->second; |
1984 | 123k | if (pending_rs.is_empty_rowset) { |
1985 | 6 | RowsetSharedPtr prev_rowset {nullptr}; |
1986 | 6 | if (!to_add.empty()) { |
1987 | 2 | prev_rowset = to_add.back(); |
1988 | 4 | } else { |
1989 | 4 | Versions existing_versions; |
1990 | 4 | for (const auto& [_, rs] : tablet_meta()->all_rs_metas()) { |
1991 | 2 | existing_versions.emplace_back(rs->version()); |
1992 | 2 | } |
1993 | 4 | if (existing_versions.empty()) { |
1994 | 2 | break; |
1995 | 2 | } |
1996 | 2 | auto max_version = std::ranges::max(existing_versions, {}, &Version::first); |
1997 | 2 | prev_rowset = get_rowset_by_version(max_version); |
1998 | 2 | } |
1999 | 4 | RowsetSharedPtr rowset; |
2000 | 4 | auto st = _engine.meta_mgr().create_empty_rowset_for_hole( |
2001 | 4 | this, version, prev_rowset->rowset_meta(), &rowset); |
2002 | 4 | if (!st.ok()) { |
2003 | 0 | return; |
2004 | 0 | } |
2005 | 4 | to_add.push_back(std::move(rowset)); |
2006 | 123k | } else { |
2007 | 123k | RowsetSharedPtr rowset; |
2008 | 123k | auto st = RowsetFactory::create_rowset(nullptr, "", pending_rs.rowset_meta, &rowset); |
2009 | 123k | if (!st.ok()) { |
2010 | 0 | LOG(WARNING) << "failed to create rowset from pending rowset meta, tablet_id=" |
2011 | 0 | << tablet_id() << ", version=" << version |
2012 | 0 | << ", rowset_id=" << pending_rs.rowset_meta->rowset_id().to_string() |
2013 | 0 | << ", error=" << st; |
2014 | 0 | break; |
2015 | 0 | } |
2016 | 123k | to_add.push_back(std::move(rowset)); |
2017 | 123k | } |
2018 | 123k | next_version++; |
2019 | 123k | } |
2020 | 124k | if (!to_add.empty()) { |
2021 | 123k | add_rowsets(to_add, false, meta_wlock, true); |
2022 | 123k | LOG_INFO( |
2023 | 123k | "applied_visible_pending_rowsets, tablet_id={}, new_max_version={}, " |
2024 | 123k | "count={}, new_rowsets={}", |
2025 | 123k | tablet_id(), _max_version, to_add.size(), |
2026 | 123k | fmt::join(to_add | std::views::transform([](const RowsetSharedPtr& rs) { |
2027 | 123k | return fmt::format("{}{}", rs->rowset_id().to_string(), |
2028 | 123k | rs->version().to_string()); |
2029 | 123k | }), |
2030 | 123k | ",")); |
2031 | 123k | } |
2032 | 124k | } |
2033 | | |
2034 | | #include "common/compile_check_end.h" |
2035 | | |
2036 | | } // namespace doris |