Coverage Report

Created: 2026-03-16 13:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_cumulative_compaction_policy.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_cumulative_compaction_policy.h"
19
20
#include <algorithm>
21
#include <list>
22
#include <ostream>
23
#include <string>
24
25
#include "cloud/config.h"
26
#include "common/config.h"
27
#include "common/logging.h"
28
#include "cpp/sync_point.h"
29
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
30
#include "storage/olap_common.h"
31
#include "storage/tablet/tablet.h"
32
#include "storage/tablet/tablet_meta.h"
33
#include "util/defer_op.h"
34
35
namespace doris {
36
#include "common/compile_check_begin.h"
37
38
CloudSizeBasedCumulativeCompactionPolicy::CloudSizeBasedCumulativeCompactionPolicy(
39
        int64_t promotion_size, double promotion_ratio, int64_t promotion_min_size,
40
        int64_t compaction_min_size)
41
142
        : _promotion_size(promotion_size),
42
142
          _promotion_ratio(promotion_ratio),
43
142
          _promotion_min_size(promotion_min_size),
44
142
          _compaction_min_size(compaction_min_size) {}
45
46
257k
int64_t CloudSizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) {
47
257k
    if (size < 1024) return 0;
48
116k
    int64_t max_level = (int64_t)1
49
116k
                        << (sizeof(_promotion_size) * 8 - 1 - __builtin_clzl(_promotion_size / 2));
50
116k
    if (size >= max_level) return max_level;
51
116k
    return (int64_t)1 << (sizeof(size) * 8 - 1 - __builtin_clzl(size));
52
116k
}
53
54
void find_longest_consecutive_empty_rowsets(std::vector<RowsetSharedPtr>* result,
55
110k
                                            const std::vector<RowsetSharedPtr>& candidate_rowsets) {
56
110k
    std::vector<RowsetSharedPtr> current_sequence;
57
110k
    std::vector<RowsetSharedPtr> longest_sequence;
58
59
547k
    for (size_t i = 0; i < candidate_rowsets.size(); ++i) {
60
437k
        auto& rowset = candidate_rowsets[i];
61
62
        // Check if rowset is empty and has no delete predicate
63
437k
        if (rowset->num_segments() == 0 && !rowset->rowset_meta()->has_delete_predicate()) {
64
            // Check if this is consecutive with previous rowset
65
245k
            if (current_sequence.empty() ||
66
245k
                (current_sequence.back()->end_version() == rowset->start_version() - 1)) {
67
245k
                current_sequence.push_back(rowset);
68
245k
            } else {
69
                // Start new sequence if not consecutive
70
0
                if (current_sequence.size() > longest_sequence.size()) {
71
0
                    longest_sequence = current_sequence;
72
0
                }
73
0
                current_sequence.clear();
74
0
                current_sequence.push_back(rowset);
75
0
            }
76
245k
        } else {
77
            // Non-empty rowset, check if we have a sequence to compare
78
192k
            if (current_sequence.size() > longest_sequence.size()) {
79
26.7k
                longest_sequence = current_sequence;
80
26.7k
            }
81
192k
            current_sequence.clear();
82
192k
        }
83
437k
    }
84
85
    // Check final sequence
86
110k
    if (current_sequence.size() > longest_sequence.size()) {
87
53.4k
        longest_sequence = current_sequence;
88
53.4k
    }
89
90
110k
    *result = longest_sequence;
91
110k
}
92
93
int64_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
94
        CloudTablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
95
        const int64_t max_compaction_score, const int64_t min_compaction_score,
96
        std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
97
110k
        size_t* compaction_score, bool allow_delete) {
98
110k
    DBUG_EXECUTE_IF(
99
110k
            "CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", {
100
110k
                auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
101
110k
                if (target_tablet_id == tablet->tablet_id()) {
102
110k
                    auto start_version = dp->param<int64_t>("start_version", -1);
103
110k
                    auto end_version = dp->param<int64_t>("end_version", -1);
104
110k
                    for (auto& rowset : candidate_rowsets) {
105
110k
                        if (rowset->start_version() >= start_version &&
106
110k
                            rowset->end_version() <= end_version) {
107
110k
                            input_rowsets->push_back(rowset);
108
110k
                        }
109
110k
                    }
110
110k
                    LOG_INFO(
111
110k
                            "[CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_"
112
110k
                            "input_rowsets] tablet_id={}, start={}, end={}, "
113
110k
                            "input_rowsets->size()={}",
114
110k
                            target_tablet_id, start_version, end_version, input_rowsets->size());
115
110k
                    return input_rowsets->size();
116
110k
                }
117
110k
            })
118
119
110k
    size_t promotion_size = cloud_promotion_size(tablet);
120
110k
    auto max_version = tablet->max_version().first;
121
110k
    int transient_size = 0;
122
110k
    *compaction_score = 0;
123
110k
    int64_t total_size = 0;
124
110k
    bool skip_trim = false; // Skip trim for Empty Rowset Compaction
125
126
    // DEFER: trim input_rowsets from back if score > max_compaction_score
127
    // This ensures we don't return more rowsets than allowed by max_compaction_score,
128
    // while still collecting enough rowsets to pass min_compaction_score check after level_size removal.
129
    // Must be placed after variable initialization and before collection loop.
130
110k
    DEFER({
131
110k
        if (skip_trim) {
132
110k
            return;
133
110k
        }
134
        // Keep at least 1 rowset to avoid removing the only rowset (consistent with fallback branch)
135
110k
        while (input_rowsets->size() > 1 &&
136
110k
               *compaction_score > static_cast<size_t>(max_compaction_score)) {
137
110k
            auto& last_rowset = input_rowsets->back();
138
110k
            *compaction_score -= last_rowset->rowset_meta()->get_compaction_score();
139
110k
            total_size -= last_rowset->rowset_meta()->total_disk_size();
140
110k
            input_rowsets->pop_back();
141
110k
        }
142
110k
    });
143
144
440k
    for (auto& rowset : candidate_rowsets) {
145
        // check whether this rowset is delete version
146
440k
        if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) {
147
1.19k
            *last_delete_version = rowset->version();
148
1.19k
            if (!input_rowsets->empty()) {
149
                // we meet a delete version, and there were other versions before.
150
                // we should compact those version before handling them over to base compaction
151
747
                break;
152
747
            } else {
153
                // we meet a delete version, and no other versions before, skip it and continue
154
451
                input_rowsets->clear();
155
451
                *compaction_score = 0;
156
451
                transient_size = 0;
157
451
                continue;
158
451
            }
159
1.19k
        }
160
439k
        if (tablet->tablet_state() == TABLET_NOTREADY) {
161
            // If tablet under alter, keep latest 10 version so that base tablet max version
162
            // not merged in new tablet, and then we can copy data from base tablet
163
14
            if (rowset->version().second < max_version - 10) {
164
0
                continue;
165
0
            }
166
14
        }
167
        // Removed: max_compaction_score check here
168
        // We now collect all candidate rowsets and trim from back at return time via DEFER
169
439k
        *compaction_score += rowset->rowset_meta()->get_compaction_score();
170
439k
        total_size += rowset->rowset_meta()->total_disk_size();
171
172
439k
        transient_size += 1;
173
439k
        input_rowsets->push_back(rowset);
174
439k
    }
175
176
110k
    if (total_size >= promotion_size) {
177
0
        return transient_size;
178
0
    }
179
180
    // if there is delete version, do compaction directly
181
110k
    if (last_delete_version->first != -1) {
182
797
        if (input_rowsets->size() == 1) {
183
105
            auto rs_meta = input_rowsets->front()->rowset_meta();
184
            // if there is only one rowset and not overlapping,
185
            // we do not need to do cumulative compaction
186
105
            if (!rs_meta->is_segments_overlapping()) {
187
105
                input_rowsets->clear();
188
105
                *compaction_score = 0;
189
105
            }
190
105
        }
191
797
        return transient_size;
192
797
    }
193
194
    // Check if empty rowset compaction strategy is enabled
195
110k
    if (config::enable_empty_rowset_compaction && !input_rowsets->empty()) {
196
        // Check if input_rowsets contain consecutive empty rowsets that meet criteria
197
110k
        std::vector<RowsetSharedPtr> consecutive_empty_rowsets;
198
110k
        find_longest_consecutive_empty_rowsets(&consecutive_empty_rowsets, *input_rowsets);
199
200
110k
        if (!consecutive_empty_rowsets.empty() &&
201
110k
            consecutive_empty_rowsets.size() >= config::empty_rowset_compaction_min_count &&
202
110k
            static_cast<double>(consecutive_empty_rowsets.size()) /
203
4.04k
                            static_cast<double>(input_rowsets->size()) >=
204
4.04k
                    config::empty_rowset_compaction_min_ratio) {
205
            // Prioritize consecutive empty rowset compaction
206
            // Skip trim: empty rowset compaction has very low cost and the goal is to reduce rowset count
207
3.88k
            *input_rowsets = consecutive_empty_rowsets;
208
3.88k
            *compaction_score = consecutive_empty_rowsets.size();
209
3.88k
            skip_trim = true;
210
3.88k
            return consecutive_empty_rowsets.size();
211
3.88k
        }
212
110k
    }
213
214
106k
    auto rs_begin = input_rowsets->begin();
215
106k
    size_t new_compaction_score = *compaction_score;
216
135k
    while (rs_begin != input_rowsets->end()) {
217
128k
        auto& rs_meta = (*rs_begin)->rowset_meta();
218
128k
        int64_t current_level = _level_size(rs_meta->total_disk_size());
219
128k
        int64_t remain_level = _level_size(total_size - rs_meta->total_disk_size());
220
        // if the current level is less than the remain level, input rowsets contain current rowset
221
        // and process return; otherwise, input rowsets do not contain current rowset.
222
128k
        if (current_level <= remain_level) {
223
99.2k
            break;
224
99.2k
        }
225
29.4k
        total_size -= rs_meta->total_disk_size();
226
29.4k
        new_compaction_score -= rs_meta->get_compaction_score();
227
29.4k
        ++rs_begin;
228
29.4k
    }
229
106k
    if (rs_begin == input_rowsets->end()) { // No suitable level size found in `input_rowsets`
230
6.90k
        if (config::prioritize_query_perf_in_compaction && tablet->keys_type() != DUP_KEYS) {
231
            // While tablet's key type is not `DUP_KEYS`, compacting rowset in such tablets has a significant
232
            // positive impact on queries and reduces space amplification, so we ignore level limitation and
233
            // pick candidate rowsets as input rowsets.
234
1.95k
            return transient_size;
235
4.94k
        } else if (*compaction_score >= max_compaction_score) {
236
            // Score of `input_rowsets` exceed max compaction score, which means `input_rowsets` will never change and
237
            // this tablet will never execute cumulative compaction. MUST execute compaction on these `input_rowsets`
238
            // to reduce compaction score.
239
0
            RowsetSharedPtr rs_with_max_score;
240
0
            uint32_t max_score = 1;
241
0
            for (auto& rs : *input_rowsets) {
242
0
                if (rs->rowset_meta()->get_compaction_score() > max_score) {
243
0
                    max_score = rs->rowset_meta()->get_compaction_score();
244
0
                    rs_with_max_score = rs;
245
0
                }
246
0
            }
247
0
            if (rs_with_max_score) {
248
0
                input_rowsets->clear();
249
0
                input_rowsets->push_back(std::move(rs_with_max_score));
250
0
                *compaction_score = max_score;
251
0
                return transient_size;
252
0
            }
253
            // no rowset is OVERLAPPING, return all input rowsets (DEFER will trim to max_compaction_score)
254
0
            return transient_size;
255
0
        }
256
6.90k
    }
257
104k
    input_rowsets->erase(input_rowsets->begin(), rs_begin);
258
104k
    *compaction_score = new_compaction_score;
259
260
104k
    VLOG_CRITICAL << "cumulative compaction size_based policy, compaction_score = "
261
0
                  << *compaction_score << ", total_size = " << total_size
262
0
                  << ", calc promotion size value = " << promotion_size
263
0
                  << ", tablet = " << tablet->tablet_id() << ", input_rowset size "
264
0
                  << input_rowsets->size();
265
266
    // empty return
267
104k
    if (input_rowsets->empty()) {
268
4.94k
        return transient_size;
269
4.94k
    }
270
271
    // if we have a sufficient number of segments, we should process the compaction.
272
    // otherwise, we check the number of segments and total_size to decide whether compaction can be done.
273
99.2k
    if (total_size < _compaction_min_size && *compaction_score < min_compaction_score) {
274
93.8k
        input_rowsets->clear();
275
93.8k
        *compaction_score = 0;
276
93.8k
    } else if (total_size >= _compaction_min_size && input_rowsets->size() == 1) {
277
0
        auto rs_meta = input_rowsets->front()->rowset_meta();
278
        // if there is only one rowset and not overlapping,
279
        // we do not need to do compaction
280
0
        if (!rs_meta->is_segments_overlapping()) {
281
0
            input_rowsets->clear();
282
0
            *compaction_score = 0;
283
0
        }
284
0
    }
285
99.2k
    return transient_size;
286
104k
}
287
288
119k
int64_t CloudSizeBasedCumulativeCompactionPolicy::cloud_promotion_size(CloudTablet* t) const {
289
119k
    int64_t promotion_size = int64_t(cast_set<double>(t->base_size()) * _promotion_ratio);
290
    // promotion_size is clamped between _promotion_min_size and _promotion_size
291
119k
    return promotion_size > _promotion_size       ? _promotion_size
292
119k
           : promotion_size < _promotion_min_size ? _promotion_min_size
293
18.4E
                                                  : promotion_size;
294
119k
}
295
296
int64_t CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point(
297
        CloudTablet* tablet, const RowsetSharedPtr& output_rowset, Version& last_delete_version,
298
9.60k
        int64_t last_cumulative_point) {
299
9.60k
    TEST_INJECTION_POINT_RETURN_WITH_VALUE("new_cumulative_point", int64_t(0), output_rowset.get(),
300
9.60k
                                           last_cumulative_point);
301
9.60k
    DBUG_EXECUTE_IF("CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point", {
302
9.60k
        auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
303
9.60k
        auto cumu_point = dp->param<int64_t>("cumu_point", -1);
304
9.60k
        if (target_tablet_id == tablet->tablet_id() && cumu_point != -1) {
305
9.60k
            LOG_INFO(
306
9.60k
                    "[CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point] "
307
9.60k
                    "tablet_id={}, cumu_point={}",
308
9.60k
                    target_tablet_id, cumu_point);
309
9.60k
            return cumu_point;
310
9.60k
        }
311
9.60k
    });
312
    // for MoW table, if there are too many versions, the delete bitmap will grow to
313
    // a very big size, which may cause the tablet meta too big and the `save_meta`
314
    // operation too slow.
315
    // if the rowset should not be promoted according to its disk size, we should also
316
    // consider its version count here.
317
9.60k
    bool satisfy_promotion_version = tablet->enable_unique_key_merge_on_write() &&
318
9.60k
                                     output_rowset->end_version() - output_rowset->start_version() >
319
4.07k
                                             config::compaction_promotion_version_count;
320
    // if rowsets have delete version, move to the last directly.
321
    // if rowsets have no delete version, check output_rowset total disk size satisfies promotion size.
322
9.60k
    return (last_delete_version.first != -1 ||
323
9.60k
            output_rowset->total_disk_size() >= cloud_promotion_size(tablet) ||
324
9.60k
            satisfy_promotion_version)
325
9.60k
                   ? output_rowset->end_version() + 1
326
9.60k
                   : last_cumulative_point;
327
9.60k
}
328
329
int64_t CloudTimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
330
        CloudTablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
331
        const int64_t max_compaction_score, const int64_t min_compaction_score,
332
        std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
333
4
        size_t* compaction_score, bool allow_delete) {
334
4
    int64_t last_cumu = tablet->last_cumu_compaction_success_time();
335
4
    return TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
336
4
            tablet, last_cumu, candidate_rowsets, max_compaction_score, min_compaction_score,
337
4
            input_rowsets, last_delete_version, compaction_score, allow_delete);
338
4
}
339
340
int64_t CloudTimeSeriesCumulativeCompactionPolicy::get_compaction_level(
341
        CloudTablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets,
342
3
        RowsetSharedPtr output_rowset) {
343
3
    return TimeSeriesCumulativeCompactionPolicy::get_compaction_level((BaseTablet*)tablet,
344
3
                                                                      input_rowsets, output_rowset);
345
3
}
346
347
int64_t CloudTimeSeriesCumulativeCompactionPolicy::new_cumulative_point(
348
        CloudTablet* tablet, const RowsetSharedPtr& output_rowset, Version& last_delete_version,
349
3
        int64_t last_cumulative_point) {
350
3
    if (tablet->tablet_state() != TABLET_RUNNING || output_rowset->num_segments() == 0) {
351
3
        return last_cumulative_point;
352
3
    }
353
354
0
    if (tablet->tablet_meta()->time_series_compaction_level_threshold() >= 2 &&
355
0
        output_rowset->rowset_meta()->compaction_level() < 2) {
356
0
        return last_cumulative_point;
357
0
    }
358
359
0
    return output_rowset->end_version() + 1;
360
0
}
361
362
#include "common/compile_check_end.h"
363
} // namespace doris