be/src/storage/compaction/binlog_compaction_policy.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/compaction/binlog_compaction_policy.h" |
19 | | |
20 | | #include <string> |
21 | | |
22 | | #include "common/config.h" |
23 | | #include "common/logging.h" |
24 | | #include "storage/rowset/rowset.h" |
25 | | #include "storage/tablet/tablet.h" |
26 | | #include "storage/tablet/tablet_meta.h" |
27 | | #include "util/time.h" |
28 | | |
29 | | namespace doris { |
30 | | |
31 | | int BinlogCompactionPolicy::pick_input_rowsets( |
32 | | Tablet* tablet, const std::vector<RowsetSharedPtr>& candidate_rowsets, |
33 | 0 | int8_t compaction_level, std::vector<RowsetSharedPtr>* input_rowsets) const { |
34 | | // 1) Filter rowsets by `compaction_level` |
35 | 0 | std::vector<RowsetSharedPtr> level_rowsets; |
36 | 0 | level_rowsets.reserve(candidate_rowsets.size()); |
37 | 0 | for (const auto& rs : candidate_rowsets) { |
38 | 0 | if (!rs->is_local() || rs->rowset_meta()->compaction_level() != compaction_level) { |
39 | 0 | continue; |
40 | 0 | } |
41 | 0 | if (!level_rowsets.empty() && |
42 | 0 | rs->start_version() != level_rowsets.back()->end_version() + 1) { |
43 | 0 | LOG(WARNING) << "rowset is non-continuous in the same compaction_level of binlog " |
44 | 0 | "compaction. tablet=" |
45 | 0 | << tablet->tablet_id() |
46 | 0 | << ", compaction_level=" << std::to_string(compaction_level) |
47 | 0 | << ", prev_version=" << level_rowsets.back()->version() |
48 | 0 | << ", next_version=" << rs->version(); |
49 | 0 | return 0; |
50 | 0 | } |
51 | 0 | level_rowsets.push_back(rs); |
52 | 0 | } |
53 | | |
54 | | // 2) Split `level_rowsets` into `full_enough_rowsets` and `remaining_rowsets`. |
55 | | // - L0/L1: only physical rewrite. |
56 | | // - LMax: Base([0-x]) + prefix ENOUGH rowsets are candidates for quick compact. |
57 | 0 | std::vector<RowsetSharedPtr> full_enough_rowsets; |
58 | 0 | std::vector<RowsetSharedPtr> remaining_rowsets; |
59 | 0 | full_enough_rowsets.reserve(level_rowsets.size()); |
60 | 0 | remaining_rowsets.reserve(level_rowsets.size()); |
61 | |
|
62 | 0 | bool find_base = false; |
63 | 0 | int full_enough_size = 0; |
64 | 0 | size_t idx = 0; |
65 | 0 | if (compaction_level == kBinlogCompactionMaxLevel - 1) { |
66 | 0 | if (!level_rowsets.empty() && level_rowsets[0]->start_version() == 0) { |
67 | 0 | find_base = true; |
68 | 0 | full_enough_rowsets.push_back(level_rowsets[0]); |
69 | 0 | ++full_enough_size; |
70 | 0 | idx = 1; |
71 | 0 | for (; idx < level_rowsets.size(); ++idx) { |
72 | 0 | const auto& rs = level_rowsets[idx]; |
73 | 0 | int64_t rs_score = rs->rowset_meta()->get_compaction_score(); |
74 | 0 | if (rs->data_disk_size() >= |
75 | 0 | config::binlog_compaction_goal_size_mbytes * 1024 * 1024 || |
76 | 0 | rs_score >= config::binlog_compaction_file_count_threshold) { |
77 | 0 | full_enough_rowsets.push_back(rs); |
78 | 0 | ++full_enough_size; |
79 | 0 | continue; |
80 | 0 | } |
81 | 0 | break; |
82 | 0 | } |
83 | 0 | for (; idx < level_rowsets.size(); ++idx) { |
84 | 0 | remaining_rowsets.push_back(level_rowsets[idx]); |
85 | 0 | } |
86 | 0 | } else { |
87 | 0 | remaining_rowsets = level_rowsets; |
88 | 0 | } |
89 | 0 | } else { |
90 | 0 | remaining_rowsets = level_rowsets; |
91 | 0 | } |
92 | | |
93 | | // 3) Pick rowsets from `remaining_rowsets` (physical rewrite path). |
94 | 0 | std::vector<RowsetSharedPtr> picked_rowsets; |
95 | 0 | picked_rowsets.reserve(remaining_rowsets.size()); |
96 | 0 | int transient_size = 0; |
97 | 0 | int64_t total_size = 0; |
98 | 0 | int64_t compaction_score = 0; |
99 | 0 | const int64_t max_binlog_permits = (config::total_permits_for_compaction_score * |
100 | 0 | config::binlog_compaction_permits_percent + |
101 | 0 | 99) / |
102 | 0 | 100; |
103 | 0 | for (const auto& rs : remaining_rowsets) { |
104 | 0 | int64_t rs_score = rs->rowset_meta()->get_compaction_score(); |
105 | 0 | if (transient_size >= config::binlog_level_compaction_max_deltas || |
106 | 0 | compaction_score + rs_score > max_binlog_permits) { |
107 | 0 | break; |
108 | 0 | } |
109 | 0 | picked_rowsets.push_back(rs); |
110 | 0 | ++transient_size; |
111 | 0 | total_size += rs->data_disk_size(); |
112 | 0 | compaction_score += rs_score; |
113 | 0 | } |
114 | | |
115 | | // 4) Trigger check |
116 | | // - L0/L1: only physical rewrite if trigger is met. |
117 | | // - LMax: if physical rewrite trigger is NOT met, try quick compact; if both met, compare score. |
118 | 0 | bool can_do_binlog_compaction = false; |
119 | 0 | if (total_size >= config::binlog_compaction_goal_size_mbytes * 1024 * 1024 || |
120 | 0 | compaction_score >= config::binlog_compaction_file_count_threshold) { |
121 | 0 | can_do_binlog_compaction = true; |
122 | 0 | } else if ((UnixMillis() - tablet->last_binlog_compaction_success_time(compaction_level)) / |
123 | 0 | 1000 >= |
124 | 0 | config::binlog_compaction_time_threshold_seconds) { |
125 | 0 | can_do_binlog_compaction = true; |
126 | 0 | } |
127 | | |
128 | | // 5) LMax quick compact vs physical rewrite. |
129 | 0 | if (compaction_level == kBinlogCompactionMaxLevel - 1 && find_base && full_enough_size > 1) { |
130 | 0 | int64_t quick_merge_score = 1; |
131 | 0 | for (int i = 1; i < full_enough_size; ++i) { |
132 | 0 | quick_merge_score += full_enough_rowsets[i]->rowset_meta()->get_compaction_score(); |
133 | 0 | } |
134 | |
|
135 | 0 | if (!can_do_binlog_compaction || quick_merge_score > compaction_score) { |
136 | 0 | input_rowsets->swap(full_enough_rowsets); |
137 | 0 | return full_enough_size; |
138 | 0 | } |
139 | 0 | } |
140 | | |
141 | 0 | if (transient_size < 2) { |
142 | 0 | can_do_binlog_compaction = false; |
143 | 0 | } |
144 | |
|
145 | 0 | if (can_do_binlog_compaction) { |
146 | 0 | input_rowsets->swap(picked_rowsets); |
147 | 0 | return transient_size; |
148 | 0 | } |
149 | | |
150 | 0 | input_rowsets->clear(); |
151 | 0 | return 0; |
152 | 0 | } |
153 | | |
154 | | uint32_t BinlogCompactionPolicy::calc_binlog_compaction_level_score(Tablet* tablet, |
155 | 0 | int8_t level) const { |
156 | 0 | uint32_t score = 0; |
157 | | // Binlog tiered compaction score (L0..LMax) |
158 | | // |
159 | | // L0/L1/... : | rowsets ... | |
160 | | // LMax : | Base([0-x]) | remaining rowsets ... | |
161 | | // |
162 | | // Base only performs meta-only merge (merge rowset meta), so get_compaction_score() |
163 | | // treats Base score as 1. |
164 | 0 | for (const auto& [_, rs_meta] : tablet->tablet_meta()->all_row_binlog_rs_metas()) { |
165 | 0 | if (!rs_meta->is_local() || rs_meta->compaction_level() != level) { |
166 | 0 | continue; |
167 | 0 | } |
168 | 0 | score += rs_meta->get_compaction_score(); |
169 | 0 | } |
170 | 0 | return score; |
171 | 0 | } |
172 | | |
173 | | uint32_t BinlogCompactionPolicy::calc_binlog_compaction_score( |
174 | 0 | Tablet* tablet, int8_t* prefer_compaction_level) const { |
175 | 0 | uint32_t max_score = 0; |
176 | 0 | int8_t max_level = -1; |
177 | 0 | for (int8_t level = 0; level < kBinlogCompactionMaxLevel; ++level) { |
178 | 0 | uint32_t score = calc_binlog_compaction_level_score(tablet, level); |
179 | 0 | if (score > max_score) { |
180 | 0 | max_score = score; |
181 | 0 | max_level = level; |
182 | 0 | } |
183 | 0 | } |
184 | 0 | if (prefer_compaction_level != nullptr) { |
185 | 0 | *prefer_compaction_level = max_level; |
186 | 0 | } |
187 | 0 | return max_score; |
188 | 0 | } |
189 | | |
190 | | void BinlogCompactionPolicy::update_compaction_level( |
191 | | Tablet* tablet, const std::vector<RowsetSharedPtr>& input_rowsets, |
192 | 0 | RowsetSharedPtr output_rowset) { |
193 | 0 | if (tablet->tablet_state() != TABLET_RUNNING || output_rowset->num_segments() == 0) { |
194 | 0 | return; |
195 | 0 | } |
196 | | |
197 | 0 | int64_t first_level = 0; |
198 | 0 | for (size_t i = 0; i < input_rowsets.size(); ++i) { |
199 | 0 | int64_t cur_level = input_rowsets[i]->rowset_meta()->compaction_level(); |
200 | 0 | if (i == 0) { |
201 | 0 | first_level = cur_level; |
202 | 0 | continue; |
203 | 0 | } |
204 | | |
205 | 0 | DCHECK_EQ(first_level, cur_level) |
206 | 0 | << "Compaction level mismatch, first_level: " << first_level |
207 | 0 | << ", cur_level: " << cur_level << ", tablet: " << tablet->tablet_id(); |
208 | 0 | if (first_level != cur_level) { |
209 | 0 | LOG(WARNING) << "Failed to check compaction level, first_level: " << first_level |
210 | 0 | << ", cur_level: " << cur_level << ", tablet: " << tablet->tablet_id(); |
211 | 0 | } |
212 | |
|
213 | 0 | DCHECK_EQ(input_rowsets[i]->start_version(), input_rowsets[i - 1]->end_version() + 1) |
214 | 0 | << "Binlog compaction input rowsets are non-continuous, prev_version: " |
215 | 0 | << input_rowsets[i - 1]->version() |
216 | 0 | << ", cur_version: " << input_rowsets[i]->version() |
217 | 0 | << ", tablet: " << tablet->tablet_id(); |
218 | 0 | if (input_rowsets[i]->start_version() != input_rowsets[i - 1]->end_version() + 1) { |
219 | 0 | LOG(WARNING) << "Failed to check binlog compaction input rowsets continuity, " |
220 | 0 | << "prev_version=" << input_rowsets[i - 1]->version() |
221 | 0 | << ", cur_version=" << input_rowsets[i]->version() |
222 | 0 | << ", tablet=" << tablet->tablet_id(); |
223 | 0 | } |
224 | 0 | } |
225 | |
|
226 | 0 | if (first_level == kBinlogCompactionMaxLevel - 1) { |
227 | | // level max do not update compaction level |
228 | 0 | output_rowset->rowset_meta()->set_compaction_level(first_level); |
229 | 0 | return; |
230 | 0 | } |
231 | | |
232 | 0 | output_rowset->rowset_meta()->set_compaction_level(first_level + 1); |
233 | 0 | } |
234 | | |
235 | | } // namespace doris |