be/src/cloud/cloud_cumulative_compaction_policy.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "cloud/cloud_cumulative_compaction_policy.h" |
19 | | |
20 | | #include <algorithm> |
21 | | #include <list> |
22 | | #include <ostream> |
23 | | #include <string> |
24 | | |
25 | | #include "cloud/config.h" |
26 | | #include "common/config.h" |
27 | | #include "common/logging.h" |
28 | | #include "cpp/sync_point.h" |
29 | | #include "storage/compaction/cumulative_compaction_time_series_policy.h" |
30 | | #include "storage/olap_common.h" |
31 | | #include "storage/tablet/tablet.h" |
32 | | #include "storage/tablet/tablet_meta.h" |
33 | | #include "util/defer_op.h" |
34 | | |
35 | | namespace doris { |
36 | | #include "common/compile_check_begin.h" |
37 | | |
// Size-based cumulative compaction policy for cloud mode.
//
// @param promotion_size      hard upper bound (bytes) of the promotion
//                            threshold computed by cloud_promotion_size()
// @param promotion_ratio     fraction of the tablet's base size used to scale
//                            the promotion threshold
// @param promotion_min_size  lower bound (bytes) of the promotion threshold
// @param compaction_min_size minimum total selected size (bytes); in
//                            pick_input_rowsets(), a selection smaller than
//                            this with a low compaction score is discarded
CloudSizeBasedCumulativeCompactionPolicy::CloudSizeBasedCumulativeCompactionPolicy(
        int64_t promotion_size, double promotion_ratio, int64_t promotion_min_size,
        int64_t compaction_min_size)
        : _promotion_size(promotion_size),
          _promotion_ratio(promotion_ratio),
          _promotion_min_size(promotion_min_size),
          _compaction_min_size(compaction_min_size) {}
45 | | |
46 | 257k | int64_t CloudSizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) { |
47 | 257k | if (size < 1024) return 0; |
48 | 116k | int64_t max_level = (int64_t)1 |
49 | 116k | << (sizeof(_promotion_size) * 8 - 1 - __builtin_clzl(_promotion_size / 2)); |
50 | 116k | if (size >= max_level) return max_level; |
51 | 116k | return (int64_t)1 << (sizeof(size) * 8 - 1 - __builtin_clzl(size)); |
52 | 116k | } |
53 | | |
54 | | void find_longest_consecutive_empty_rowsets(std::vector<RowsetSharedPtr>* result, |
55 | 110k | const std::vector<RowsetSharedPtr>& candidate_rowsets) { |
56 | 110k | std::vector<RowsetSharedPtr> current_sequence; |
57 | 110k | std::vector<RowsetSharedPtr> longest_sequence; |
58 | | |
59 | 547k | for (size_t i = 0; i < candidate_rowsets.size(); ++i) { |
60 | 437k | auto& rowset = candidate_rowsets[i]; |
61 | | |
62 | | // Check if rowset is empty and has no delete predicate |
63 | 437k | if (rowset->num_segments() == 0 && !rowset->rowset_meta()->has_delete_predicate()) { |
64 | | // Check if this is consecutive with previous rowset |
65 | 245k | if (current_sequence.empty() || |
66 | 245k | (current_sequence.back()->end_version() == rowset->start_version() - 1)) { |
67 | 245k | current_sequence.push_back(rowset); |
68 | 245k | } else { |
69 | | // Start new sequence if not consecutive |
70 | 0 | if (current_sequence.size() > longest_sequence.size()) { |
71 | 0 | longest_sequence = current_sequence; |
72 | 0 | } |
73 | 0 | current_sequence.clear(); |
74 | 0 | current_sequence.push_back(rowset); |
75 | 0 | } |
76 | 245k | } else { |
77 | | // Non-empty rowset, check if we have a sequence to compare |
78 | 192k | if (current_sequence.size() > longest_sequence.size()) { |
79 | 26.7k | longest_sequence = current_sequence; |
80 | 26.7k | } |
81 | 192k | current_sequence.clear(); |
82 | 192k | } |
83 | 437k | } |
84 | | |
85 | | // Check final sequence |
86 | 110k | if (current_sequence.size() > longest_sequence.size()) { |
87 | 53.4k | longest_sequence = current_sequence; |
88 | 53.4k | } |
89 | | |
90 | 110k | *result = longest_sequence; |
91 | 110k | } |
92 | | |
// Selects the rowsets the next cumulative compaction should merge.
//
// Walks `candidate_rowsets` (version-ordered) accumulating rowsets into
// `input_rowsets` and their score into `*compaction_score`, then applies the
// post-selection rules below:
//   * stop at a delete-predicate version when `allow_delete` is false;
//   * prefer a long run of consecutive empty rowsets when
//     config::enable_empty_rowset_compaction is set;
//   * strip leading rowsets whose level size exceeds the level of the rest;
//   * clear the selection when it is too small/low-score to be worthwhile.
// A DEFER block trims the selection's tail so the final score does not exceed
// `max_compaction_score` (skipped for empty-rowset compaction).
//
// Returns the number of rowsets collected before post-processing (the
// "transient size"), which is not necessarily the final `input_rowsets` size.
int64_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
        CloudTablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
        const int64_t max_compaction_score, const int64_t min_compaction_score,
        std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
        size_t* compaction_score, bool allow_delete) {
    // Debug hook: force a specific [start_version, end_version] selection for a
    // given tablet in tests.
    DBUG_EXECUTE_IF(
            "CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", {
                auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
                if (target_tablet_id == tablet->tablet_id()) {
                    auto start_version = dp->param<int64_t>("start_version", -1);
                    auto end_version = dp->param<int64_t>("end_version", -1);
                    for (auto& rowset : candidate_rowsets) {
                        if (rowset->start_version() >= start_version &&
                            rowset->end_version() <= end_version) {
                            input_rowsets->push_back(rowset);
                        }
                    }
                    LOG_INFO(
                            "[CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_"
                            "input_rowsets] tablet_id={}, start={}, end={}, "
                            "input_rowsets->size()={}",
                            target_tablet_id, start_version, end_version, input_rowsets->size());
                    return input_rowsets->size();
                }
            })

    // NOTE(review): cloud_promotion_size() returns int64_t; assumed
    // non-negative here (it is clamped to _promotion_min_size) — confirm
    // config cannot make it negative before the size_t conversion.
    size_t promotion_size = cloud_promotion_size(tablet);
    auto max_version = tablet->max_version().first;
    int transient_size = 0;
    *compaction_score = 0;
    int64_t total_size = 0;
    bool skip_trim = false; // Skip trim for Empty Rowset Compaction

    // DEFER: trim input_rowsets from back if score > max_compaction_score
    // This ensures we don't return more rowsets than allowed by max_compaction_score,
    // while still collecting enough rowsets to pass min_compaction_score check after level_size removal.
    // Must be placed after variable initialization and before collection loop.
    DEFER({
        if (skip_trim) {
            return;
        }
        // Keep at least 1 rowset to avoid removing the only rowset (consistent with fallback branch)
        while (input_rowsets->size() > 1 &&
               *compaction_score > static_cast<size_t>(max_compaction_score)) {
            auto& last_rowset = input_rowsets->back();
            *compaction_score -= last_rowset->rowset_meta()->get_compaction_score();
            total_size -= last_rowset->rowset_meta()->total_disk_size();
            input_rowsets->pop_back();
        }
    });

    // Collection phase: accumulate candidates in version order.
    for (auto& rowset : candidate_rowsets) {
        // check whether this rowset is delete version
        if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) {
            *last_delete_version = rowset->version();
            if (!input_rowsets->empty()) {
                // we meet a delete version, and there were other versions before.
                // we should compact those version before handling them over to base compaction
                break;
            } else {
                // we meet a delete version, and no other versions before, skip it and continue
                input_rowsets->clear();
                *compaction_score = 0;
                transient_size = 0;
                continue;
            }
        }
        if (tablet->tablet_state() == TABLET_NOTREADY) {
            // If tablet under alter, keep latest 10 version so that base tablet max version
            // not merged in new tablet, and then we can copy data from base tablet
            if (rowset->version().second < max_version - 10) {
                continue;
            }
        }
        // Removed: max_compaction_score check here
        // We now collect all candidate rowsets and trim from back at return time via DEFER
        *compaction_score += rowset->rowset_meta()->get_compaction_score();
        total_size += rowset->rowset_meta()->total_disk_size();

        transient_size += 1;
        input_rowsets->push_back(rowset);
    }

    // Selection already reached promotion size: compact it as-is (DEFER still
    // trims the tail to max_compaction_score).
    if (total_size >= promotion_size) {
        return transient_size;
    }

    // if there is delete version, do compaction directly
    if (last_delete_version->first != -1) {
        if (input_rowsets->size() == 1) {
            auto rs_meta = input_rowsets->front()->rowset_meta();
            // if there is only one rowset and not overlapping,
            // we do not need to do cumulative compaction
            if (!rs_meta->is_segments_overlapping()) {
                input_rowsets->clear();
                *compaction_score = 0;
            }
        }
        return transient_size;
    }

    // Check if empty rowset compaction strategy is enabled
    if (config::enable_empty_rowset_compaction && !input_rowsets->empty()) {
        // Check if input_rowsets contain consecutive empty rowsets that meet criteria
        std::vector<RowsetSharedPtr> consecutive_empty_rowsets;
        find_longest_consecutive_empty_rowsets(&consecutive_empty_rowsets, *input_rowsets);

        // Trigger only when the run is both long enough in absolute count and
        // makes up a large enough fraction of the selection.
        if (!consecutive_empty_rowsets.empty() &&
            consecutive_empty_rowsets.size() >= config::empty_rowset_compaction_min_count &&
            static_cast<double>(consecutive_empty_rowsets.size()) /
                            static_cast<double>(input_rowsets->size()) >=
                    config::empty_rowset_compaction_min_ratio) {
            // Prioritize consecutive empty rowset compaction
            // Skip trim: empty rowset compaction has very low cost and the goal is to reduce rowset count
            *input_rowsets = consecutive_empty_rowsets;
            *compaction_score = consecutive_empty_rowsets.size();
            skip_trim = true;
            return consecutive_empty_rowsets.size();
        }
    }

    // Level pruning: drop leading rowsets that are already "bigger" (by level)
    // than everything that would remain, since merging them again is wasteful.
    auto rs_begin = input_rowsets->begin();
    size_t new_compaction_score = *compaction_score;
    while (rs_begin != input_rowsets->end()) {
        auto& rs_meta = (*rs_begin)->rowset_meta();
        int64_t current_level = _level_size(rs_meta->total_disk_size());
        int64_t remain_level = _level_size(total_size - rs_meta->total_disk_size());
        // if current level less then remain level, input rowsets contain current rowset
        // and process return; otherwise, input rowsets do not contain current rowset.
        if (current_level <= remain_level) {
            break;
        }
        total_size -= rs_meta->total_disk_size();
        new_compaction_score -= rs_meta->get_compaction_score();
        ++rs_begin;
    }
    if (rs_begin == input_rowsets->end()) { // No suitable level size found in `input_rowsets`
        if (config::prioritize_query_perf_in_compaction && tablet->keys_type() != DUP_KEYS) {
            // While tablet's key type is not `DUP_KEYS`, compacting rowset in such tablets has a significant
            // positive impact on queries and reduces space amplification, so we ignore level limitation and
            // pick candidate rowsets as input rowsets.
            return transient_size;
        } else if (*compaction_score >= max_compaction_score) {
            // Score of `input_rowsets` exceed max compaction score, which means `input_rowsets` will never change and
            // this tablet will never execute cumulative compaction. MUST execute compaction on these `input_rowsets`
            // to reduce compaction score.
            RowsetSharedPtr rs_with_max_score;
            uint32_t max_score = 1;
            for (auto& rs : *input_rowsets) {
                if (rs->rowset_meta()->get_compaction_score() > max_score) {
                    max_score = rs->rowset_meta()->get_compaction_score();
                    rs_with_max_score = rs;
                }
            }
            if (rs_with_max_score) {
                input_rowsets->clear();
                input_rowsets->push_back(std::move(rs_with_max_score));
                *compaction_score = max_score;
                return transient_size;
            }
            // no rowset is OVERLAPPING, return all input rowsets (DEFER will trim to max_compaction_score)
            return transient_size;
        }
    }
    input_rowsets->erase(input_rowsets->begin(), rs_begin);
    *compaction_score = new_compaction_score;

    VLOG_CRITICAL << "cumulative compaction size_based policy, compaction_score = "
                  << *compaction_score << ", total_size = " << total_size
                  << ", calc promotion size value = " << promotion_size
                  << ", tablet = " << tablet->tablet_id() << ", input_rowset size "
                  << input_rowsets->size();

    // empty return
    if (input_rowsets->empty()) {
        return transient_size;
    }

    // if we have a sufficient number of segments, we should process the compaction.
    // otherwise, we check number of segments and total_size whether can do compaction.
    if (total_size < _compaction_min_size && *compaction_score < min_compaction_score) {
        input_rowsets->clear();
        *compaction_score = 0;
    } else if (total_size >= _compaction_min_size && input_rowsets->size() == 1) {
        auto rs_meta = input_rowsets->front()->rowset_meta();
        // if there is only one rowset and not overlapping,
        // we do not need to do compaction
        if (!rs_meta->is_segments_overlapping()) {
            input_rowsets->clear();
            *compaction_score = 0;
        }
    }
    return transient_size;
}
287 | | |
288 | 119k | int64_t CloudSizeBasedCumulativeCompactionPolicy::cloud_promotion_size(CloudTablet* t) const { |
289 | 119k | int64_t promotion_size = int64_t(cast_set<double>(t->base_size()) * _promotion_ratio); |
290 | | // promotion_size is between _size_based_promotion_size and _size_based_promotion_min_size |
291 | 119k | return promotion_size > _promotion_size ? _promotion_size |
292 | 119k | : promotion_size < _promotion_min_size ? _promotion_min_size |
293 | 18.4E | : promotion_size; |
294 | 119k | } |
295 | | |
296 | | int64_t CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point( |
297 | | CloudTablet* tablet, const RowsetSharedPtr& output_rowset, Version& last_delete_version, |
298 | 9.60k | int64_t last_cumulative_point) { |
299 | 9.60k | TEST_INJECTION_POINT_RETURN_WITH_VALUE("new_cumulative_point", int64_t(0), output_rowset.get(), |
300 | 9.60k | last_cumulative_point); |
301 | 9.60k | DBUG_EXECUTE_IF("CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point", { |
302 | 9.60k | auto target_tablet_id = dp->param<int64_t>("tablet_id", -1); |
303 | 9.60k | auto cumu_point = dp->param<int64_t>("cumu_point", -1); |
304 | 9.60k | if (target_tablet_id == tablet->tablet_id() && cumu_point != -1) { |
305 | 9.60k | LOG_INFO( |
306 | 9.60k | "[CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point] " |
307 | 9.60k | "tablet_id={}, cumu_point={}", |
308 | 9.60k | target_tablet_id, cumu_point); |
309 | 9.60k | return cumu_point; |
310 | 9.60k | } |
311 | 9.60k | }); |
312 | | // for MoW table, if there's too many versions, the delete bitmap will grow to |
313 | | // a very big size, which may cause the tablet meta too big and the `save_meta` |
314 | | // operation too slow. |
315 | | // if the rowset should not promotion according to it's disk size, we should also |
316 | | // consider it's version count here. |
317 | 9.60k | bool satisfy_promotion_version = tablet->enable_unique_key_merge_on_write() && |
318 | 9.60k | output_rowset->end_version() - output_rowset->start_version() > |
319 | 4.07k | config::compaction_promotion_version_count; |
320 | | // if rowsets have delete version, move to the last directly. |
321 | | // if rowsets have no delete version, check output_rowset total disk size satisfies promotion size. |
322 | 9.60k | return (last_delete_version.first != -1 || |
323 | 9.60k | output_rowset->total_disk_size() >= cloud_promotion_size(tablet) || |
324 | 9.60k | satisfy_promotion_version) |
325 | 9.60k | ? output_rowset->end_version() + 1 |
326 | 9.60k | : last_cumulative_point; |
327 | 9.60k | } |
328 | | |
329 | | int64_t CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( |
330 | | CloudTablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets, |
331 | | const int64_t max_compaction_score, const int64_t min_compaction_score, |
332 | | std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version, |
333 | 4 | size_t* compaction_score, bool allow_delete) { |
334 | 4 | int64_t last_cumu = tablet->last_cumu_compaction_success_time(); |
335 | 4 | return TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets( |
336 | 4 | tablet, last_cumu, candidate_rowsets, max_compaction_score, min_compaction_score, |
337 | 4 | input_rowsets, last_delete_version, compaction_score, allow_delete); |
338 | 4 | } |
339 | | |
340 | | int64_t CloudTimeSeriesCumulativeCompactionPolicy::get_compaction_level( |
341 | | CloudTablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets, |
342 | 3 | RowsetSharedPtr output_rowset) { |
343 | 3 | return TimeSeriesCumulativeCompactionPolicy::get_compaction_level((BaseTablet*)tablet, |
344 | 3 | input_rowsets, output_rowset); |
345 | 3 | } |
346 | | |
347 | | int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_cumulative_point( |
348 | | CloudTablet* tablet, const RowsetSharedPtr& output_rowset, Version& last_delete_version, |
349 | 3 | int64_t last_cumulative_point) { |
350 | 3 | if (tablet->tablet_state() != TABLET_RUNNING || output_rowset->num_segments() == 0) { |
351 | 3 | return last_cumulative_point; |
352 | 3 | } |
353 | | |
354 | 0 | if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2 && |
355 | 0 | output_rowset->rowset_meta()->compaction_level() < 2) { |
356 | 0 | return last_cumulative_point; |
357 | 0 | } |
358 | | |
359 | 0 | return output_rowset->end_version() + 1; |
360 | 0 | } |
361 | | |
362 | | #include "common/compile_check_end.h" |
363 | | } // namespace doris |