be/src/storage/compaction/cumulative_compaction_policy.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/compaction/cumulative_compaction_policy.h" |
19 | | |
20 | | #include <algorithm> |
21 | | #include <list> |
22 | | #include <ostream> |
23 | | #include <string> |
24 | | |
25 | | #include "common/config.h" |
26 | | #include "common/logging.h" |
27 | | #include "storage/compaction/cumulative_compaction_time_series_policy.h" |
28 | | #include "storage/olap_common.h" |
29 | | #include "storage/tablet/tablet.h" |
30 | | #include "storage/tablet/tablet_meta.h" |
31 | | #include "util/debug_points.h" |
32 | | #include "util/defer_op.h" |
33 | | |
34 | | namespace doris { |
35 | | #include "common/compile_check_begin.h" |
36 | | |
37 | | SizeBasedCumulativeCompactionPolicy::SizeBasedCumulativeCompactionPolicy( |
38 | | int64_t promotion_size, double promotion_ratio, int64_t promotion_min_size, |
39 | | int64_t promotion_version_count, int64_t compaction_min_size) |
40 | 580 | : _promotion_size(promotion_size), |
41 | 580 | _promotion_ratio(promotion_ratio), |
42 | 580 | _promotion_min_size(promotion_min_size), |
43 | 580 | _promotion_version_count(promotion_version_count), |
44 | 580 | _compaction_min_size(compaction_min_size) {} |
45 | | |
46 | | void SizeBasedCumulativeCompactionPolicy::calculate_cumulative_point( |
47 | | Tablet* tablet, const RowsetMetaMapContainer& all_metas, int64_t current_cumulative_point, |
48 | 69 | int64_t* ret_cumulative_point) { |
49 | 69 | *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT; |
50 | 69 | if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT) { |
51 | | // only calculate the point once. |
52 | | // after that, cumulative point will be updated along with compaction process. |
53 | 21 | return; |
54 | 21 | } |
55 | | // empty return |
56 | 48 | if (all_metas.empty()) { |
57 | 0 | return; |
58 | 0 | } |
59 | | |
60 | 48 | std::list<RowsetMetaSharedPtr> existing_rss; |
61 | 3.05k | for (const auto& [_, rs] : all_metas) { |
62 | 3.05k | existing_rss.emplace_back(rs); |
63 | 3.05k | } |
64 | | |
65 | | // sort the existing rowsets by version in ascending order |
66 | 18.5k | existing_rss.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) { |
67 | | // simple because 2 versions are certainly not overlapping |
68 | 18.5k | return a->version().first < b->version().first; |
69 | 18.5k | }); |
70 | | |
71 | | // calculate promotion size |
72 | 48 | auto base_rowset_meta = existing_rss.begin(); |
73 | | |
74 | 48 | if (tablet->tablet_state() == TABLET_RUNNING) { |
75 | | // check base rowset first version must be zero |
76 | | // for tablet which state is not TABLET_RUNNING, there may not have base version. |
77 | 48 | CHECK((*base_rowset_meta)->start_version() == 0); |
78 | | |
79 | 48 | int64_t promotion_size = 0; |
80 | 48 | _calc_promotion_size(tablet, *base_rowset_meta, &promotion_size); |
81 | | |
82 | 48 | int64_t prev_version = -1; |
83 | 98 | for (const RowsetMetaSharedPtr& rs : existing_rss) { |
84 | 98 | if (rs->version().first > prev_version + 1) { |
85 | | // There is a hole, do not continue |
86 | 0 | break; |
87 | 0 | } |
88 | | |
89 | 98 | bool is_delete = rs->has_delete_predicate(); |
90 | | |
91 | | // break the loop if segments in this rowset is overlapping. |
92 | 98 | if (!is_delete && rs->is_segments_overlapping()) { |
93 | 12 | *ret_cumulative_point = rs->version().first; |
94 | 12 | break; |
95 | 12 | } |
96 | | |
97 | | // check the rowset is whether less than promotion size |
98 | 86 | if (!is_delete && rs->version().first != 0 && rs->total_disk_size() < promotion_size) { |
99 | 35 | *ret_cumulative_point = rs->version().first; |
100 | 35 | break; |
101 | 35 | } |
102 | | |
103 | | // include one situation: When the segment is not deleted, and is singleton delta, and is NONOVERLAPPING, ret_cumulative_point increase |
104 | 51 | prev_version = rs->version().second; |
105 | 51 | *ret_cumulative_point = prev_version + 1; |
106 | 51 | } |
107 | 48 | VLOG_NOTICE |
108 | 0 | << "cumulative compaction size_based policy, calculate cumulative point value = " |
109 | 0 | << *ret_cumulative_point << ", calc promotion size value = " << promotion_size |
110 | 0 | << " tablet = " << tablet->tablet_id(); |
111 | 48 | } else if (tablet->tablet_state() == TABLET_NOTREADY) { |
112 | | // tablet under alter process |
113 | | // we choose version next to the base version as cumulative point |
114 | 0 | for (const RowsetMetaSharedPtr& rs : existing_rss) { |
115 | 0 | if (rs->version().first > 0) { |
116 | 0 | *ret_cumulative_point = rs->version().first; |
117 | 0 | break; |
118 | 0 | } |
119 | 0 | } |
120 | 0 | } |
121 | 48 | } |
122 | | |
123 | | void SizeBasedCumulativeCompactionPolicy::_calc_promotion_size(Tablet* tablet, |
124 | | RowsetMetaSharedPtr base_rowset_meta, |
125 | 79 | int64_t* promotion_size) { |
126 | 79 | int64_t base_size = base_rowset_meta->total_disk_size(); |
127 | 79 | *promotion_size = int64_t(cast_set<double>(base_size) * _promotion_ratio); |
128 | | |
129 | | // promotion_size is between _promotion_size and _promotion_min_size |
130 | 79 | if (*promotion_size >= _promotion_size) { |
131 | 8 | *promotion_size = _promotion_size; |
132 | 71 | } else if (*promotion_size <= _promotion_min_size) { |
133 | 71 | *promotion_size = _promotion_min_size; |
134 | 71 | } |
135 | 79 | _refresh_tablet_promotion_size(tablet, *promotion_size); |
136 | 79 | } |
137 | | |
138 | | void SizeBasedCumulativeCompactionPolicy::_refresh_tablet_promotion_size(Tablet* tablet, |
139 | 79 | int64_t promotion_size) { |
140 | 79 | tablet->set_cumulative_promotion_size(promotion_size); |
141 | 79 | } |
142 | | |
143 | | void SizeBasedCumulativeCompactionPolicy::update_cumulative_point( |
144 | | Tablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets, |
145 | 0 | RowsetSharedPtr output_rowset, Version& last_delete_version) { |
146 | 0 | if (tablet->tablet_state() != TABLET_RUNNING) { |
147 | | // if tablet under alter process, do not update cumulative point |
148 | 0 | return; |
149 | 0 | } |
150 | | // if rowsets have delete version, move to the last directly |
151 | 0 | if (last_delete_version.first != -1) { |
152 | 0 | tablet->set_cumulative_layer_point(output_rowset->end_version() + 1); |
153 | 0 | } else { |
154 | | // if rowsets have no delete version, check output_rowset total disk size |
155 | | // satisfies promotion size. |
156 | 0 | size_t total_size = output_rowset->rowset_meta()->total_disk_size(); |
157 | 0 | if (total_size >= tablet->cumulative_promotion_size()) { |
158 | 0 | tablet->set_cumulative_layer_point(output_rowset->end_version() + 1); |
159 | 0 | } else if (tablet->enable_unique_key_merge_on_write() && |
160 | 0 | output_rowset->end_version() - output_rowset->start_version() > |
161 | 0 | _promotion_version_count) { |
162 | | // for MoW table, if there's too many versions, the delete bitmap will grow to |
163 | | // a very big size, which may cause the tablet meta too big and the `save_meta` |
164 | | // operation too slow. |
165 | | // if the rowset should not promotion according to it's disk size, we should also |
166 | | // consider it's version count here. |
167 | 0 | tablet->set_cumulative_layer_point(output_rowset->end_version() + 1); |
168 | 0 | } |
169 | 0 | } |
170 | 0 | } |
171 | | |
172 | 31 | uint32_t SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) { |
173 | 31 | uint32_t score = 0; |
174 | 31 | bool base_rowset_exist = false; |
175 | 31 | const int64_t point = tablet->cumulative_layer_point(); |
176 | 31 | int64_t promotion_size = 0; |
177 | | |
178 | 31 | std::vector<RowsetMetaSharedPtr> rowset_to_compact; |
179 | 31 | int64_t total_size = 0; |
180 | | |
181 | 31 | RowsetMetaSharedPtr first_meta; |
182 | 31 | int64_t first_version = INT64_MAX; |
183 | | // NOTE: tablet._meta_lock is hold |
184 | 31 | auto& rs_metas = tablet->tablet_meta()->all_rs_metas(); |
185 | | // check the base rowset and collect the rowsets of cumulative part |
186 | 2.26k | for (const auto& [_, rs_meta] : rs_metas) { |
187 | 2.26k | if (rs_meta->start_version() < first_version) { |
188 | 570 | first_version = rs_meta->start_version(); |
189 | 570 | first_meta = rs_meta; |
190 | 570 | } |
191 | | // check base rowset |
192 | 2.26k | if (rs_meta->start_version() == 0) { |
193 | 31 | base_rowset_exist = true; |
194 | 31 | } |
195 | 2.26k | if (rs_meta->end_version() < point || !rs_meta->is_local()) { |
196 | | // all_rs_metas() is not sorted, so we use _continue_ other than _break_ here. |
197 | 31 | continue; |
198 | 2.22k | } else { |
199 | | // collect the rowsets of cumulative part |
200 | 2.22k | total_size += rs_meta->total_disk_size(); |
201 | 2.22k | score += rs_meta->get_compaction_score(); |
202 | 2.22k | rowset_to_compact.push_back(rs_meta); |
203 | 2.22k | } |
204 | 2.26k | } |
205 | | |
206 | 31 | if (first_meta == nullptr) { |
207 | 0 | return 0; |
208 | 0 | } |
209 | | |
210 | | // Use "first"(not base) version to calc promotion size |
211 | | // because some tablet do not have base version(under alter operation) |
212 | 31 | _calc_promotion_size(tablet, first_meta, &promotion_size); |
213 | | |
214 | | // If base version does not exist, but its state is RUNNING. |
215 | | // It is abnormal, do not select it and set *score = 0 |
216 | 31 | if (!base_rowset_exist && tablet->tablet_state() == TABLET_RUNNING) { |
217 | 0 | LOG(WARNING) << "tablet state is running but have no base version"; |
218 | 0 | return 0; |
219 | 0 | } |
220 | | |
221 | | // if total_size is greater than promotion_size, return total score |
222 | 31 | if (total_size >= promotion_size) { |
223 | 0 | return score; |
224 | 0 | } |
225 | | |
226 | | // sort the rowsets of cumulative part |
227 | 31 | std::sort(rowset_to_compact.begin(), rowset_to_compact.end(), RowsetMeta::comparator); |
228 | | |
229 | | // calculate the rowsets to do cumulative compaction |
230 | | // eg: size of rowset_to_compact are: |
231 | | // 128, 16, 16, 16 |
232 | | // we will choose [16,16,16] to compact. |
233 | 31 | for (auto& rs_meta : rowset_to_compact) { |
234 | 31 | int64_t current_level = _level_size(rs_meta->total_disk_size()); |
235 | 31 | int64_t remain_level = _level_size(total_size - rs_meta->total_disk_size()); |
236 | | // if current level less then remain level, score contains current rowset |
237 | | // and process return; otherwise, score does not contains current rowset. |
238 | 31 | if (current_level <= remain_level) { |
239 | 31 | return score; |
240 | 31 | } |
241 | 0 | total_size -= rs_meta->total_disk_size(); |
242 | 0 | score -= rs_meta->get_compaction_score(); |
243 | 0 | } |
244 | 0 | return score; |
245 | 31 | } |
246 | | |
247 | | int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets( |
248 | | Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets, |
249 | | const int64_t max_compaction_score, const int64_t min_compaction_score, |
250 | | std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version, |
251 | 61 | size_t* compaction_score, bool allow_delete) { |
252 | 61 | DBUG_EXECUTE_IF("SizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", { |
253 | 61 | auto target_tablet_id = dp->param<int64_t>("tablet_id", -1); |
254 | 61 | if (target_tablet_id == tablet->tablet_id()) { |
255 | 61 | auto start_version = dp->param<int64_t>("start_version", -1); |
256 | 61 | auto end_version = dp->param<int64_t>("end_version", -1); |
257 | 61 | for (auto& rowset : candidate_rowsets) { |
258 | 61 | if (rowset->start_version() >= start_version && |
259 | 61 | rowset->end_version() <= end_version) { |
260 | 61 | input_rowsets->push_back(rowset); |
261 | 61 | } |
262 | 61 | } |
263 | 61 | } |
264 | 61 | return cast_set<uint32_t>(input_rowsets->size()); |
265 | 61 | }) |
266 | | |
267 | 61 | size_t promotion_size = tablet->cumulative_promotion_size(); |
268 | 61 | auto max_version = tablet->max_version().first; |
269 | 61 | int transient_size = 0; |
270 | 61 | *compaction_score = 0; |
271 | 61 | int64_t total_size = 0; |
272 | | |
273 | | // DEFER: trim input_rowsets from back if score > max_compaction_score |
274 | | // This ensures we don't return more rowsets than allowed by max_compaction_score, |
275 | | // while still collecting enough rowsets to pass min_compaction_score check after level_size removal. |
276 | | // Must be placed after variable initialization and before collection loop. |
277 | 61 | DEFER({ |
278 | | // Keep at least 1 rowset to avoid removing the only rowset (consistent with fallback branch) |
279 | 61 | while (input_rowsets->size() > 1 && |
280 | 61 | *compaction_score > static_cast<size_t>(max_compaction_score)) { |
281 | 61 | auto& last_rowset = input_rowsets->back(); |
282 | 61 | *compaction_score -= last_rowset->rowset_meta()->get_compaction_score(); |
283 | 61 | total_size -= last_rowset->rowset_meta()->total_disk_size(); |
284 | 61 | input_rowsets->pop_back(); |
285 | 61 | } |
286 | 61 | }); |
287 | | |
288 | 3.54k | for (auto& rowset : candidate_rowsets) { |
289 | | // check whether this rowset is delete version |
290 | 3.54k | if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) { |
291 | 3 | *last_delete_version = rowset->version(); |
292 | 3 | if (!input_rowsets->empty()) { |
293 | | // we meet a delete version, and there were other versions before. |
294 | | // we should compact those version before handling them over to base compaction |
295 | 3 | break; |
296 | 3 | } else { |
297 | | // we meet a delete version, and no other versions before, skip it and continue |
298 | 0 | input_rowsets->clear(); |
299 | 0 | *compaction_score = 0; |
300 | 0 | transient_size = 0; |
301 | 0 | continue; |
302 | 0 | } |
303 | 3 | } |
304 | 3.54k | if (tablet->tablet_state() == TABLET_NOTREADY) { |
305 | | // If tablet under alter, keep latest 10 version so that base tablet max version |
306 | | // not merged in new tablet, and then we can copy data from base tablet |
307 | 0 | if (rowset->version().second < max_version - 10) { |
308 | 0 | continue; |
309 | 0 | } |
310 | 0 | } |
311 | | // Removed: max_compaction_score check here |
312 | | // We now collect all candidate rowsets and trim from back at return time via DEFER |
313 | 3.54k | *compaction_score += rowset->rowset_meta()->get_compaction_score(); |
314 | 3.54k | total_size += rowset->rowset_meta()->total_disk_size(); |
315 | | |
316 | 3.54k | transient_size += 1; |
317 | 3.54k | input_rowsets->push_back(rowset); |
318 | 3.54k | } |
319 | 61 | DBUG_EXECUTE_IF("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets", |
320 | 61 | { return transient_size; }) |
321 | | |
322 | 61 | if (total_size >= promotion_size) { |
323 | 37 | return transient_size; |
324 | 37 | } |
325 | | |
326 | | // if there is delete version, do compaction directly |
327 | 24 | if (last_delete_version->first != -1) { |
328 | 1 | if (input_rowsets->size() == 1) { |
329 | 0 | auto rs_meta = input_rowsets->front()->rowset_meta(); |
330 | | // if there is only one rowset and not overlapping, |
331 | | // we do not need to do cumulative compaction |
332 | 0 | if (!rs_meta->is_segments_overlapping()) { |
333 | 0 | input_rowsets->clear(); |
334 | 0 | *compaction_score = 0; |
335 | 0 | } |
336 | 0 | } |
337 | 1 | return transient_size; |
338 | 1 | } |
339 | | |
340 | 23 | auto rs_begin = input_rowsets->begin(); |
341 | 23 | size_t new_compaction_score = *compaction_score; |
342 | 46 | while (rs_begin != input_rowsets->end()) { |
343 | 42 | auto& rs_meta = (*rs_begin)->rowset_meta(); |
344 | 42 | int64_t current_level = _level_size(rs_meta->total_disk_size()); |
345 | 42 | int64_t remain_level = _level_size(total_size - rs_meta->total_disk_size()); |
346 | | // if current level less then remain level, input rowsets contain current rowset |
347 | | // and process return; otherwise, input rowsets do not contain current rowset. |
348 | 42 | if (current_level <= remain_level) { |
349 | 19 | break; |
350 | 19 | } |
351 | 23 | total_size -= rs_meta->total_disk_size(); |
352 | 23 | new_compaction_score -= rs_meta->get_compaction_score(); |
353 | 23 | ++rs_begin; |
354 | 23 | } |
355 | 23 | if (rs_begin == input_rowsets->end() && *compaction_score >= max_compaction_score) { |
356 | | // No suitable level size found in `input_rowsets` but score of `input_rowsets` exceed max compaction score, |
357 | | // which means `input_rowsets` will never change and this tablet will never execute cumulative compaction. |
358 | | // MUST execute compaction on these `input_rowsets` to reduce compaction score. |
359 | 2 | RowsetSharedPtr rs_with_max_score; |
360 | 2 | uint32_t max_score = 1; |
361 | 7 | for (auto& rs : *input_rowsets) { |
362 | 7 | if (rs->rowset_meta()->get_compaction_score() > max_score) { |
363 | 1 | max_score = rs->rowset_meta()->get_compaction_score(); |
364 | 1 | rs_with_max_score = rs; |
365 | 1 | } |
366 | 7 | } |
367 | 2 | if (rs_with_max_score) { |
368 | 1 | input_rowsets->clear(); |
369 | 1 | input_rowsets->push_back(std::move(rs_with_max_score)); |
370 | 1 | *compaction_score = max_score; |
371 | 1 | return transient_size; |
372 | 1 | } |
373 | | // no rowset is OVERLAPPING, return all input rowsets (DEFER will trim to max_compaction_score) |
374 | 1 | return transient_size; |
375 | 2 | } |
376 | 21 | input_rowsets->erase(input_rowsets->begin(), rs_begin); |
377 | 21 | *compaction_score = new_compaction_score; |
378 | | |
379 | 21 | VLOG_CRITICAL << "cumulative compaction size_based policy, compaction_score = " |
380 | 0 | << *compaction_score << ", total_size = " << total_size |
381 | 0 | << ", calc promotion size value = " << promotion_size |
382 | 0 | << ", tablet = " << tablet->tablet_id() << ", input_rowset size " |
383 | 0 | << input_rowsets->size(); |
384 | | |
385 | | // empty return |
386 | 21 | if (input_rowsets->empty()) { |
387 | 2 | return transient_size; |
388 | 2 | } |
389 | | |
390 | | // if we have a sufficient number of segments, we should process the compaction. |
391 | | // otherwise, we check number of segments and total_size whether can do compaction. |
392 | 19 | if (total_size < _compaction_min_size && *compaction_score < min_compaction_score) { |
393 | 4 | input_rowsets->clear(); |
394 | 4 | *compaction_score = 0; |
395 | 15 | } else if (total_size >= _compaction_min_size && input_rowsets->size() == 1) { |
396 | 0 | auto rs_meta = input_rowsets->front()->rowset_meta(); |
397 | | // if there is only one rowset and not overlapping, |
398 | | // we do not need to do compaction |
399 | 0 | if (!rs_meta->is_segments_overlapping()) { |
400 | 0 | input_rowsets->clear(); |
401 | 0 | *compaction_score = 0; |
402 | 0 | } |
403 | 0 | } |
404 | 19 | return transient_size; |
405 | 21 | } |
406 | | |
407 | 150 | int64_t SizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) { |
408 | 150 | if (size < 1024) return 0; |
409 | 70 | int64_t max_level = (int64_t)1 |
410 | 70 | << (sizeof(_promotion_size) * 8 - 1 - __builtin_clzl(_promotion_size / 2)); |
411 | 70 | if (size >= max_level) return max_level; |
412 | 69 | return (int64_t)1 << (sizeof(size) * 8 - 1 - __builtin_clzl(size)); |
413 | 70 | } |
414 | | |
415 | | std::shared_ptr<CumulativeCompactionPolicy> |
416 | | CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy( |
417 | 595 | const std::string_view& compaction_policy) { |
418 | 595 | if (compaction_policy == CUMULATIVE_TIME_SERIES_POLICY) { |
419 | 15 | return std::make_shared<TimeSeriesCumulativeCompactionPolicy>(); |
420 | 580 | } else if (compaction_policy == CUMULATIVE_SIZE_BASED_POLICY) { |
421 | 521 | return std::make_shared<SizeBasedCumulativeCompactionPolicy>(); |
422 | 521 | } |
423 | 59 | return std::make_shared<SizeBasedCumulativeCompactionPolicy>(); |
424 | 595 | } |
425 | | #include "common/compile_check_end.h" |
426 | | } // namespace doris |