Coverage Report

Created: 2026-03-16 19:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/compaction/cumulative_compaction_policy.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/compaction/cumulative_compaction_policy.h"
19
20
#include <algorithm>
21
#include <list>
22
#include <ostream>
23
#include <string>
24
25
#include "common/config.h"
26
#include "common/logging.h"
27
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
28
#include "storage/olap_common.h"
29
#include "storage/tablet/tablet.h"
30
#include "storage/tablet/tablet_meta.h"
31
#include "util/debug_points.h"
32
#include "util/defer_op.h"
33
34
namespace doris {
35
#include "common/compile_check_begin.h"
36
37
// Constructs a size-based cumulative compaction policy.
//
// Every argument is stored unchanged in the member of the same name; nothing is
// validated or derived here.
//   promotion_size          - upper bound applied in _calc_promotion_size(); also
//                             determines the highest level in _level_size()
//   promotion_ratio         - factor applied to the base rowset size
//   promotion_min_size      - lower bound applied in _calc_promotion_size()
//   promotion_version_count - version-span threshold checked in update_cumulative_point()
//   compaction_min_size     - size threshold checked at the end of pick_input_rowsets()
SizeBasedCumulativeCompactionPolicy::SizeBasedCumulativeCompactionPolicy(
        int64_t promotion_size, double promotion_ratio, int64_t promotion_min_size,
        int64_t promotion_version_count, int64_t compaction_min_size)
        : _promotion_size(promotion_size),
          _promotion_ratio(promotion_ratio),
          _promotion_min_size(promotion_min_size),
          _promotion_version_count(promotion_version_count),
          _compaction_min_size(compaction_min_size) {
}
45
46
void SizeBasedCumulativeCompactionPolicy::calculate_cumulative_point(
47
        Tablet* tablet, const RowsetMetaMapContainer& all_metas, int64_t current_cumulative_point,
48
69
        int64_t* ret_cumulative_point) {
49
69
    *ret_cumulative_point = Tablet::K_INVALID_CUMULATIVE_POINT;
50
69
    if (current_cumulative_point != Tablet::K_INVALID_CUMULATIVE_POINT) {
51
        // only calculate the point once.
52
        // after that, cumulative point will be updated along with compaction process.
53
21
        return;
54
21
    }
55
    // empty return
56
48
    if (all_metas.empty()) {
57
0
        return;
58
0
    }
59
60
48
    std::list<RowsetMetaSharedPtr> existing_rss;
61
3.05k
    for (const auto& [_, rs] : all_metas) {
62
3.05k
        existing_rss.emplace_back(rs);
63
3.05k
    }
64
65
    // sort the existing rowsets by version in ascending order
66
18.5k
    existing_rss.sort([](const RowsetMetaSharedPtr& a, const RowsetMetaSharedPtr& b) {
67
        // simple because 2 versions are certainly not overlapping
68
18.5k
        return a->version().first < b->version().first;
69
18.5k
    });
70
71
    // calculate promotion size
72
48
    auto base_rowset_meta = existing_rss.begin();
73
74
48
    if (tablet->tablet_state() == TABLET_RUNNING) {
75
        // check base rowset first version must be zero
76
        // for tablet which state is not TABLET_RUNNING, there may not have base version.
77
48
        CHECK((*base_rowset_meta)->start_version() == 0);
78
79
48
        int64_t promotion_size = 0;
80
48
        _calc_promotion_size(tablet, *base_rowset_meta, &promotion_size);
81
82
48
        int64_t prev_version = -1;
83
98
        for (const RowsetMetaSharedPtr& rs : existing_rss) {
84
98
            if (rs->version().first > prev_version + 1) {
85
                // There is a hole, do not continue
86
0
                break;
87
0
            }
88
89
98
            bool is_delete = rs->has_delete_predicate();
90
91
            // break the loop if segments in this rowset is overlapping.
92
98
            if (!is_delete && rs->is_segments_overlapping()) {
93
12
                *ret_cumulative_point = rs->version().first;
94
12
                break;
95
12
            }
96
97
            // check the rowset is whether less than promotion size
98
86
            if (!is_delete && rs->version().first != 0 && rs->total_disk_size() < promotion_size) {
99
35
                *ret_cumulative_point = rs->version().first;
100
35
                break;
101
35
            }
102
103
            // include one situation: When the segment is not deleted, and is singleton delta, and is NONOVERLAPPING, ret_cumulative_point increase
104
51
            prev_version = rs->version().second;
105
51
            *ret_cumulative_point = prev_version + 1;
106
51
        }
107
48
        VLOG_NOTICE
108
0
                << "cumulative compaction size_based policy, calculate cumulative point value = "
109
0
                << *ret_cumulative_point << ", calc promotion size value = " << promotion_size
110
0
                << " tablet = " << tablet->tablet_id();
111
48
    } else if (tablet->tablet_state() == TABLET_NOTREADY) {
112
        // tablet under alter process
113
        // we choose version next to the base version as cumulative point
114
0
        for (const RowsetMetaSharedPtr& rs : existing_rss) {
115
0
            if (rs->version().first > 0) {
116
0
                *ret_cumulative_point = rs->version().first;
117
0
                break;
118
0
            }
119
0
        }
120
0
    }
121
48
}
122
123
void SizeBasedCumulativeCompactionPolicy::_calc_promotion_size(Tablet* tablet,
124
                                                               RowsetMetaSharedPtr base_rowset_meta,
125
79
                                                               int64_t* promotion_size) {
126
79
    int64_t base_size = base_rowset_meta->total_disk_size();
127
79
    *promotion_size = int64_t(cast_set<double>(base_size) * _promotion_ratio);
128
129
    // promotion_size is between _promotion_size and _promotion_min_size
130
79
    if (*promotion_size >= _promotion_size) {
131
8
        *promotion_size = _promotion_size;
132
71
    } else if (*promotion_size <= _promotion_min_size) {
133
71
        *promotion_size = _promotion_min_size;
134
71
    }
135
79
    _refresh_tablet_promotion_size(tablet, *promotion_size);
136
79
}
137
138
void SizeBasedCumulativeCompactionPolicy::_refresh_tablet_promotion_size(Tablet* tablet,
139
79
                                                                         int64_t promotion_size) {
140
79
    tablet->set_cumulative_promotion_size(promotion_size);
141
79
}
142
143
void SizeBasedCumulativeCompactionPolicy::update_cumulative_point(
144
        Tablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets,
145
0
        RowsetSharedPtr output_rowset, Version& last_delete_version) {
146
0
    if (tablet->tablet_state() != TABLET_RUNNING) {
147
        // if tablet under alter process, do not update cumulative point
148
0
        return;
149
0
    }
150
    // if rowsets have delete version, move to the last directly
151
0
    if (last_delete_version.first != -1) {
152
0
        tablet->set_cumulative_layer_point(output_rowset->end_version() + 1);
153
0
    } else {
154
        // if rowsets have no delete version, check output_rowset total disk size
155
        // satisfies promotion size.
156
0
        size_t total_size = output_rowset->rowset_meta()->total_disk_size();
157
0
        if (total_size >= tablet->cumulative_promotion_size()) {
158
0
            tablet->set_cumulative_layer_point(output_rowset->end_version() + 1);
159
0
        } else if (tablet->enable_unique_key_merge_on_write() &&
160
0
                   output_rowset->end_version() - output_rowset->start_version() >
161
0
                           _promotion_version_count) {
162
            // for MoW table, if there's too many versions, the delete bitmap will grow to
163
            // a very big size, which may cause the tablet meta too big and the `save_meta`
164
            // operation too slow.
165
            // if the rowset should not promotion according to it's disk size, we should also
166
            // consider it's version count here.
167
0
            tablet->set_cumulative_layer_point(output_rowset->end_version() + 1);
168
0
        }
169
0
    }
170
0
}
171
172
31
// Estimates the cumulative compaction score of `tablet`.
//
// Local rowsets whose end version is not below the cumulative layer point form the
// cumulative part; their compaction scores are summed. Returns 0 when the tablet has no
// rowset metas, or when it is TABLET_RUNNING but has no rowset starting at version 0.
// If the cumulative part is at least as large as the promotion size the full sum is
// returned; otherwise leading rowsets are dropped from the sum until a rowset's level is
// not above the level of everything that follows it.
//
// Side effect: the promotion size is recomputed from the rowset with the smallest start
// version and stored on the tablet (see _calc_promotion_size()).
uint32_t SizeBasedCumulativeCompactionPolicy::calc_cumulative_compaction_score(Tablet* tablet) {
    const int64_t point = tablet->cumulative_layer_point();

    uint32_t score = 0;
    int64_t total_size = 0;
    bool has_base_rowset = false;
    int64_t earliest_version = INT64_MAX;
    RowsetMetaSharedPtr earliest_meta;
    std::vector<RowsetMetaSharedPtr> cumulative_metas;

    // NOTE: tablet._meta_lock is held
    auto& rs_metas = tablet->tablet_meta()->all_rs_metas();
    for (const auto& [_, rs_meta] : rs_metas) {
        const auto start = rs_meta->start_version();
        if (start < earliest_version) {
            earliest_version = start;
            earliest_meta = rs_meta;
        }
        has_base_rowset = has_base_rowset || start == 0;

        // all_rs_metas() is not sorted, so non-matching entries are skipped rather than
        // ending the scan.
        if (rs_meta->end_version() >= point && rs_meta->is_local()) {
            total_size += rs_meta->total_disk_size();
            score += rs_meta->get_compaction_score();
            cumulative_metas.push_back(rs_meta);
        }
    }

    if (earliest_meta == nullptr) {
        return 0;
    }

    // Use the "first" (not base) version to calculate the promotion size, because some
    // tablets do not have a base version (under alter operation).
    int64_t promotion_size = 0;
    _calc_promotion_size(tablet, earliest_meta, &promotion_size);

    // A RUNNING tablet without a base version is abnormal: do not select it.
    if (!has_base_rowset && tablet->tablet_state() == TABLET_RUNNING) {
        LOG(WARNING) << "tablet state is running but have no base version";
        return 0;
    }

    // The cumulative part already reaches the promotion size: report the total score.
    if (total_size >= promotion_size) {
        return score;
    }

    std::sort(cumulative_metas.begin(), cumulative_metas.end(), RowsetMeta::comparator);

    // Decide which rowsets would take part in the cumulative compaction.
    // e.g. if the sizes in cumulative_metas are 128, 16, 16, 16
    // then [16, 16, 16] is chosen.
    for (const auto& rs_meta : cumulative_metas) {
        const int64_t rowset_size = rs_meta->total_disk_size();
        const int64_t current_level = _level_size(rowset_size);
        const int64_t remain_level = _level_size(total_size - rowset_size);
        // Once the current level is not above the remaining level, the score keeps this
        // rowset and everything after it; otherwise this rowset is excluded.
        if (current_level <= remain_level) {
            return score;
        }
        total_size -= rowset_size;
        score -= rs_meta->get_compaction_score();
    }
    return score;
}
246
247
int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
248
        Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
249
        const int64_t max_compaction_score, const int64_t min_compaction_score,
250
        std::vector<RowsetSharedPtr>* input_rowsets, Version* last_delete_version,
251
61
        size_t* compaction_score, bool allow_delete) {
252
61
    DBUG_EXECUTE_IF("SizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", {
253
61
        auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
254
61
        if (target_tablet_id == tablet->tablet_id()) {
255
61
            auto start_version = dp->param<int64_t>("start_version", -1);
256
61
            auto end_version = dp->param<int64_t>("end_version", -1);
257
61
            for (auto& rowset : candidate_rowsets) {
258
61
                if (rowset->start_version() >= start_version &&
259
61
                    rowset->end_version() <= end_version) {
260
61
                    input_rowsets->push_back(rowset);
261
61
                }
262
61
            }
263
61
        }
264
61
        return cast_set<uint32_t>(input_rowsets->size());
265
61
    })
266
267
61
    size_t promotion_size = tablet->cumulative_promotion_size();
268
61
    auto max_version = tablet->max_version().first;
269
61
    int transient_size = 0;
270
61
    *compaction_score = 0;
271
61
    int64_t total_size = 0;
272
273
    // DEFER: trim input_rowsets from back if score > max_compaction_score
274
    // This ensures we don't return more rowsets than allowed by max_compaction_score,
275
    // while still collecting enough rowsets to pass min_compaction_score check after level_size removal.
276
    // Must be placed after variable initialization and before collection loop.
277
61
    DEFER({
278
        // Keep at least 1 rowset to avoid removing the only rowset (consistent with fallback branch)
279
61
        while (input_rowsets->size() > 1 &&
280
61
               *compaction_score > static_cast<size_t>(max_compaction_score)) {
281
61
            auto& last_rowset = input_rowsets->back();
282
61
            *compaction_score -= last_rowset->rowset_meta()->get_compaction_score();
283
61
            total_size -= last_rowset->rowset_meta()->total_disk_size();
284
61
            input_rowsets->pop_back();
285
61
        }
286
61
    });
287
288
3.54k
    for (auto& rowset : candidate_rowsets) {
289
        // check whether this rowset is delete version
290
3.54k
        if (!allow_delete && rowset->rowset_meta()->has_delete_predicate()) {
291
3
            *last_delete_version = rowset->version();
292
3
            if (!input_rowsets->empty()) {
293
                // we meet a delete version, and there were other versions before.
294
                // we should compact those version before handling them over to base compaction
295
3
                break;
296
3
            } else {
297
                // we meet a delete version, and no other versions before, skip it and continue
298
0
                input_rowsets->clear();
299
0
                *compaction_score = 0;
300
0
                transient_size = 0;
301
0
                continue;
302
0
            }
303
3
        }
304
3.54k
        if (tablet->tablet_state() == TABLET_NOTREADY) {
305
            // If tablet under alter, keep latest 10 version so that base tablet max version
306
            // not merged in new tablet, and then we can copy data from base tablet
307
0
            if (rowset->version().second < max_version - 10) {
308
0
                continue;
309
0
            }
310
0
        }
311
        // Removed: max_compaction_score check here
312
        // We now collect all candidate rowsets and trim from back at return time via DEFER
313
3.54k
        *compaction_score += rowset->rowset_meta()->get_compaction_score();
314
3.54k
        total_size += rowset->rowset_meta()->total_disk_size();
315
316
3.54k
        transient_size += 1;
317
3.54k
        input_rowsets->push_back(rowset);
318
3.54k
    }
319
61
    DBUG_EXECUTE_IF("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets",
320
61
                    { return transient_size; })
321
322
61
    if (total_size >= promotion_size) {
323
37
        return transient_size;
324
37
    }
325
326
    // if there is delete version, do compaction directly
327
24
    if (last_delete_version->first != -1) {
328
1
        if (input_rowsets->size() == 1) {
329
0
            auto rs_meta = input_rowsets->front()->rowset_meta();
330
            // if there is only one rowset and not overlapping,
331
            // we do not need to do cumulative compaction
332
0
            if (!rs_meta->is_segments_overlapping()) {
333
0
                input_rowsets->clear();
334
0
                *compaction_score = 0;
335
0
            }
336
0
        }
337
1
        return transient_size;
338
1
    }
339
340
23
    auto rs_begin = input_rowsets->begin();
341
23
    size_t new_compaction_score = *compaction_score;
342
46
    while (rs_begin != input_rowsets->end()) {
343
42
        auto& rs_meta = (*rs_begin)->rowset_meta();
344
42
        int64_t current_level = _level_size(rs_meta->total_disk_size());
345
42
        int64_t remain_level = _level_size(total_size - rs_meta->total_disk_size());
346
        // if current level less then remain level, input rowsets contain current rowset
347
        // and process return; otherwise, input rowsets do not contain current rowset.
348
42
        if (current_level <= remain_level) {
349
19
            break;
350
19
        }
351
23
        total_size -= rs_meta->total_disk_size();
352
23
        new_compaction_score -= rs_meta->get_compaction_score();
353
23
        ++rs_begin;
354
23
    }
355
23
    if (rs_begin == input_rowsets->end() && *compaction_score >= max_compaction_score) {
356
        // No suitable level size found in `input_rowsets` but score of `input_rowsets` exceed max compaction score,
357
        // which means `input_rowsets` will never change and this tablet will never execute cumulative compaction.
358
        // MUST execute compaction on these `input_rowsets` to reduce compaction score.
359
2
        RowsetSharedPtr rs_with_max_score;
360
2
        uint32_t max_score = 1;
361
7
        for (auto& rs : *input_rowsets) {
362
7
            if (rs->rowset_meta()->get_compaction_score() > max_score) {
363
1
                max_score = rs->rowset_meta()->get_compaction_score();
364
1
                rs_with_max_score = rs;
365
1
            }
366
7
        }
367
2
        if (rs_with_max_score) {
368
1
            input_rowsets->clear();
369
1
            input_rowsets->push_back(std::move(rs_with_max_score));
370
1
            *compaction_score = max_score;
371
1
            return transient_size;
372
1
        }
373
        // no rowset is OVERLAPPING, return all input rowsets (DEFER will trim to max_compaction_score)
374
1
        return transient_size;
375
2
    }
376
21
    input_rowsets->erase(input_rowsets->begin(), rs_begin);
377
21
    *compaction_score = new_compaction_score;
378
379
21
    VLOG_CRITICAL << "cumulative compaction size_based policy, compaction_score = "
380
0
                  << *compaction_score << ", total_size = " << total_size
381
0
                  << ", calc promotion size value = " << promotion_size
382
0
                  << ", tablet = " << tablet->tablet_id() << ", input_rowset size "
383
0
                  << input_rowsets->size();
384
385
    // empty return
386
21
    if (input_rowsets->empty()) {
387
2
        return transient_size;
388
2
    }
389
390
    // if we have a sufficient number of segments, we should process the compaction.
391
    // otherwise, we check number of segments and total_size whether can do compaction.
392
19
    if (total_size < _compaction_min_size && *compaction_score < min_compaction_score) {
393
4
        input_rowsets->clear();
394
4
        *compaction_score = 0;
395
15
    } else if (total_size >= _compaction_min_size && input_rowsets->size() == 1) {
396
0
        auto rs_meta = input_rowsets->front()->rowset_meta();
397
        // if there is only one rowset and not overlapping,
398
        // we do not need to do compaction
399
0
        if (!rs_meta->is_segments_overlapping()) {
400
0
            input_rowsets->clear();
401
0
            *compaction_score = 0;
402
0
        }
403
0
    }
404
19
    return transient_size;
405
21
}
406
407
150
int64_t SizeBasedCumulativeCompactionPolicy::_level_size(const int64_t size) {
408
150
    if (size < 1024) return 0;
409
70
    int64_t max_level = (int64_t)1
410
70
                        << (sizeof(_promotion_size) * 8 - 1 - __builtin_clzl(_promotion_size / 2));
411
70
    if (size >= max_level) return max_level;
412
69
    return (int64_t)1 << (sizeof(size) * 8 - 1 - __builtin_clzl(size));
413
70
}
414
415
std::shared_ptr<CumulativeCompactionPolicy>
416
CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy(
417
595
        const std::string_view& compaction_policy) {
418
595
    if (compaction_policy == CUMULATIVE_TIME_SERIES_POLICY) {
419
15
        return std::make_shared<TimeSeriesCumulativeCompactionPolicy>();
420
580
    } else if (compaction_policy == CUMULATIVE_SIZE_BASED_POLICY) {
421
521
        return std::make_shared<SizeBasedCumulativeCompactionPolicy>();
422
521
    }
423
59
    return std::make_shared<SizeBasedCumulativeCompactionPolicy>();
424
595
}
425
#include "common/compile_check_end.h"
426
} // namespace doris