Coverage Report

Created: 2026-06-04 13:48

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/compaction/binlog_compaction_policy.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/compaction/binlog_compaction_policy.h"
19
20
#include <string>
21
22
#include "common/config.h"
23
#include "common/logging.h"
24
#include "storage/rowset/rowset.h"
25
#include "storage/tablet/tablet.h"
26
#include "storage/tablet/tablet_meta.h"
27
#include "util/time.h"
28
29
namespace doris {
30
31
int BinlogCompactionPolicy::pick_input_rowsets(
32
        Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets,
33
0
        int8_t compaction_level, std::vector<RowsetSharedPtr>* input_rowsets) const {
34
    // 1) Filter rowsets by `compaction_level`
35
0
    std::vector<RowsetSharedPtr> level_rowsets;
36
0
    level_rowsets.reserve(candidate_rowsets.size());
37
0
    for (const auto& rs : candidate_rowsets) {
38
0
        if (!rs->is_local() || rs->rowset_meta()->compaction_level() != compaction_level) {
39
0
            continue;
40
0
        }
41
0
        if (!level_rowsets.empty() &&
42
0
            rs->start_version() != level_rowsets.back()->end_version() + 1) {
43
0
            LOG(WARNING) << "rowset is non-continuous in the same compaction_level of binlog "
44
0
                            "compaction. tablet="
45
0
                         << tablet->tablet_id()
46
0
                         << ", compaction_level=" << std::to_string(compaction_level)
47
0
                         << ", prev_version=" << level_rowsets.back()->version()
48
0
                         << ", next_version=" << rs->version();
49
0
            return 0;
50
0
        }
51
0
        level_rowsets.push_back(rs);
52
0
    }
53
54
    // 2) Split `level_rowsets` into `full_enough_rowsets` and `remaining_rowsets`.
55
    //    - L0/L1: only physical rewrite.
56
    //    - LMax: Base([0-x]) + prefix ENOUGH rowsets are candidates for quick compact.
57
0
    std::vector<RowsetSharedPtr> full_enough_rowsets;
58
0
    std::vector<RowsetSharedPtr> remaining_rowsets;
59
0
    full_enough_rowsets.reserve(level_rowsets.size());
60
0
    remaining_rowsets.reserve(level_rowsets.size());
61
62
0
    bool find_base = false;
63
0
    int full_enough_size = 0;
64
0
    size_t idx = 0;
65
0
    if (compaction_level == kBinlogCompactionMaxLevel - 1) {
66
0
        if (!level_rowsets.empty() && level_rowsets[0]->start_version() == 0) {
67
0
            find_base = true;
68
0
            full_enough_rowsets.push_back(level_rowsets[0]);
69
0
            ++full_enough_size;
70
0
            idx = 1;
71
0
            for (; idx < level_rowsets.size(); ++idx) {
72
0
                const auto& rs = level_rowsets[idx];
73
0
                int64_t rs_score = rs->rowset_meta()->get_compaction_score();
74
0
                if (rs->data_disk_size() >=
75
0
                            config::binlog_compaction_goal_size_mbytes * 1024 * 1024 ||
76
0
                    rs_score >= config::binlog_compaction_file_count_threshold) {
77
0
                    full_enough_rowsets.push_back(rs);
78
0
                    ++full_enough_size;
79
0
                    continue;
80
0
                }
81
0
                break;
82
0
            }
83
0
            for (; idx < level_rowsets.size(); ++idx) {
84
0
                remaining_rowsets.push_back(level_rowsets[idx]);
85
0
            }
86
0
        } else {
87
0
            remaining_rowsets = level_rowsets;
88
0
        }
89
0
    } else {
90
0
        remaining_rowsets = level_rowsets;
91
0
    }
92
93
    // 3) Pick rowsets from `remaining_rowsets` (physical rewrite path).
94
0
    std::vector<RowsetSharedPtr> picked_rowsets;
95
0
    picked_rowsets.reserve(remaining_rowsets.size());
96
0
    int transient_size = 0;
97
0
    int64_t total_size = 0;
98
0
    int64_t compaction_score = 0;
99
0
    const int64_t max_binlog_permits = (config::total_permits_for_compaction_score *
100
0
                                                config::binlog_compaction_permits_percent +
101
0
                                        99) /
102
0
                                       100;
103
0
    for (const auto& rs : remaining_rowsets) {
104
0
        int64_t rs_score = rs->rowset_meta()->get_compaction_score();
105
0
        if (transient_size >= config::binlog_level_compaction_max_deltas ||
106
0
            compaction_score + rs_score > max_binlog_permits) {
107
0
            break;
108
0
        }
109
0
        picked_rowsets.push_back(rs);
110
0
        ++transient_size;
111
0
        total_size += rs->data_disk_size();
112
0
        compaction_score += rs_score;
113
0
    }
114
115
    // 4) Trigger check
116
    //    - L0/L1: only physical rewrite if trigger is met.
117
    //    - LMax: if physical rewrite trigger is NOT met, try quick compact; if both met, compare score.
118
0
    bool can_do_binlog_compaction = false;
119
0
    if (total_size >= config::binlog_compaction_goal_size_mbytes * 1024 * 1024 ||
120
0
        compaction_score >= config::binlog_compaction_file_count_threshold) {
121
0
        can_do_binlog_compaction = true;
122
0
    } else if ((UnixMillis() - tablet->last_binlog_compaction_success_time(compaction_level)) /
123
0
                       1000 >=
124
0
               config::binlog_compaction_time_threshold_seconds) {
125
0
        can_do_binlog_compaction = true;
126
0
    }
127
128
    // 5) LMax quick compact vs physical rewrite.
129
0
    if (compaction_level == kBinlogCompactionMaxLevel - 1 && find_base && full_enough_size > 1) {
130
0
        int64_t quick_merge_score = 1;
131
0
        for (int i = 1; i < full_enough_size; ++i) {
132
0
            quick_merge_score += full_enough_rowsets[i]->rowset_meta()->get_compaction_score();
133
0
        }
134
135
0
        if (!can_do_binlog_compaction || quick_merge_score > compaction_score) {
136
0
            input_rowsets->swap(full_enough_rowsets);
137
0
            return full_enough_size;
138
0
        }
139
0
    }
140
141
0
    if (transient_size < 2) {
142
0
        can_do_binlog_compaction = false;
143
0
    }
144
145
0
    if (can_do_binlog_compaction) {
146
0
        input_rowsets->swap(picked_rowsets);
147
0
        return transient_size;
148
0
    }
149
150
0
    input_rowsets->clear();
151
0
    return 0;
152
0
}
153
154
uint32_t BinlogCompactionPolicy::calc_binlog_compaction_level_score(Tablet* tablet,
155
0
                                                                    int8_t level) const {
156
0
    uint32_t score = 0;
157
    // Binlog tiered compaction score (L0..LMax)
158
    //
159
    //   L0/L1/... : | rowsets ... |
160
    //   LMax      : | Base([0-x]) | remaining rowsets ... |
161
    //
162
    // Base only performs meta-only merge (merge rowset meta), so get_compaction_score()
163
    // treats Base score as 1.
164
0
    for (const auto& [_, rs_meta] : tablet->tablet_meta()->all_row_binlog_rs_metas()) {
165
0
        if (!rs_meta->is_local() || rs_meta->compaction_level() != level) {
166
0
            continue;
167
0
        }
168
0
        score += rs_meta->get_compaction_score();
169
0
    }
170
0
    return score;
171
0
}
172
173
uint32_t BinlogCompactionPolicy::calc_binlog_compaction_score(
174
0
        Tablet* tablet, int8_t* prefer_compaction_level) const {
175
0
    uint32_t max_score = 0;
176
0
    int8_t max_level = -1;
177
0
    for (int8_t level = 0; level < kBinlogCompactionMaxLevel; ++level) {
178
0
        uint32_t score = calc_binlog_compaction_level_score(tablet, level);
179
0
        if (score > max_score) {
180
0
            max_score = score;
181
0
            max_level = level;
182
0
        }
183
0
    }
184
0
    if (prefer_compaction_level != nullptr) {
185
0
        *prefer_compaction_level = max_level;
186
0
    }
187
0
    return max_score;
188
0
}
189
190
void BinlogCompactionPolicy::update_compaction_level(
191
        Tablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets,
192
0
        RowsetSharedPtr output_rowset) {
193
0
    if (tablet->tablet_state() != TABLET_RUNNING || output_rowset->num_segments() == 0) {
194
0
        return;
195
0
    }
196
197
0
    int64_t first_level = 0;
198
0
    for (size_t i = 0; i < input_rowsets.size(); ++i) {
199
0
        int64_t cur_level = input_rowsets[i]->rowset_meta()->compaction_level();
200
0
        if (i == 0) {
201
0
            first_level = cur_level;
202
0
            continue;
203
0
        }
204
205
0
        DCHECK_EQ(first_level, cur_level)
206
0
                << "Compaction level mismatch, first_level: " << first_level
207
0
                << ", cur_level: " << cur_level << ", tablet: " << tablet->tablet_id();
208
0
        if (first_level != cur_level) {
209
0
            LOG(WARNING) << "Failed to check compaction level, first_level: " << first_level
210
0
                         << ", cur_level: " << cur_level << ", tablet: " << tablet->tablet_id();
211
0
        }
212
213
0
        DCHECK_EQ(input_rowsets[i]->start_version(), input_rowsets[i - 1]->end_version() + 1)
214
0
                << "Binlog compaction input rowsets are non-continuous, prev_version: "
215
0
                << input_rowsets[i - 1]->version()
216
0
                << ", cur_version: " << input_rowsets[i]->version()
217
0
                << ", tablet: " << tablet->tablet_id();
218
0
        if (input_rowsets[i]->start_version() != input_rowsets[i - 1]->end_version() + 1) {
219
0
            LOG(WARNING) << "Failed to check binlog compaction input rowsets continuity, "
220
0
                         << "prev_version=" << input_rowsets[i - 1]->version()
221
0
                         << ", cur_version=" << input_rowsets[i]->version()
222
0
                         << ", tablet=" << tablet->tablet_id();
223
0
        }
224
0
    }
225
226
0
    if (first_level == kBinlogCompactionMaxLevel - 1) {
227
        // level max do not update compaction level
228
0
        output_rowset->rowset_meta()->set_compaction_level(first_level);
229
0
        return;
230
0
    }
231
232
0
    output_rowset->rowset_meta()->set_compaction_level(first_level + 1);
233
0
}
234
235
} // namespace doris