be/src/storage/compaction_task_tracker.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/compaction_task_tracker.h" |
19 | | |
20 | | #include "common/config.h" |
21 | | #include "common/logging.h" |
22 | | |
23 | | namespace doris { |
24 | | |
25 | 18 | const char* to_string(CompactionProfileType type) { |
26 | 18 | switch (type) { |
27 | 4 | case CompactionProfileType::BASE: |
28 | 4 | return "base"; |
29 | 11 | case CompactionProfileType::CUMULATIVE: |
30 | 11 | return "cumulative"; |
31 | 3 | case CompactionProfileType::FULL: |
32 | 3 | return "full"; |
33 | 18 | } |
34 | 0 | return "unknown"; |
35 | 18 | } |
36 | | |
37 | 0 | const char* to_string(CompactionTaskStatus status) { |
38 | 0 | switch (status) { |
39 | 0 | case CompactionTaskStatus::PENDING: |
40 | 0 | return "PENDING"; |
41 | 0 | case CompactionTaskStatus::RUNNING: |
42 | 0 | return "RUNNING"; |
43 | 0 | case CompactionTaskStatus::FINISHED: |
44 | 0 | return "FINISHED"; |
45 | 0 | case CompactionTaskStatus::FAILED: |
46 | 0 | return "FAILED"; |
47 | 0 | } |
48 | 0 | return "UNKNOWN"; |
49 | 0 | } |
50 | | |
51 | 0 | const char* to_string(TriggerMethod method) { |
52 | 0 | switch (method) { |
53 | 0 | case TriggerMethod::AUTO: |
54 | 0 | return "AUTO"; |
55 | 0 | case TriggerMethod::MANUAL: |
56 | 0 | return "MANUAL"; |
57 | 0 | case TriggerMethod::LOAD_TRIGGERED: |
58 | 0 | return "LOAD_TRIGGERED"; |
59 | 0 | } |
60 | 0 | return "UNKNOWN"; |
61 | 0 | } |
62 | | |
63 | 3.07k | CompactionTaskTracker* CompactionTaskTracker::instance() { |
64 | 3.07k | static CompactionTaskTracker s_instance; |
65 | 3.07k | return &s_instance; |
66 | 3.07k | } |
67 | | |
68 | 450 | void CompactionTaskTracker::register_task(CompactionTaskInfo info) { |
69 | 450 | if (!config::enable_compaction_task_tracker) { |
70 | 1 | return; |
71 | 1 | } |
72 | 449 | std::unique_lock wlock(_mutex); |
73 | 449 | _active_tasks[info.compaction_id] = std::move(info); |
74 | 449 | } |
75 | | |
76 | 412 | void CompactionTaskTracker::update_to_running(int64_t compaction_id, const RunningStats& stats) { |
77 | 412 | if (!config::enable_compaction_task_tracker) { |
78 | 0 | return; |
79 | 0 | } |
80 | 412 | std::unique_lock wlock(_mutex); |
81 | 412 | auto it = _active_tasks.find(compaction_id); |
82 | 413 | if (it != _active_tasks.end()) { |
83 | 413 | auto& task = it->second; |
84 | 413 | task.status = CompactionTaskStatus::RUNNING; |
85 | 413 | task.start_time_ms = stats.start_time_ms; |
86 | 413 | task.is_vertical = stats.is_vertical; |
87 | 413 | task.permits = stats.permits; |
88 | 413 | } |
89 | 412 | } |
90 | | |
91 | | void CompactionTaskTracker::update_progress(int64_t compaction_id, int64_t total_groups, |
92 | 403 | int64_t completed_groups) { |
93 | 403 | if (!config::enable_compaction_task_tracker) { |
94 | 0 | return; |
95 | 0 | } |
96 | 403 | std::unique_lock wlock(_mutex); |
97 | 403 | auto it = _active_tasks.find(compaction_id); |
98 | 404 | if (it != _active_tasks.end()) { |
99 | 404 | auto& task = it->second; |
100 | 404 | task.vertical_total_groups = total_groups; |
101 | 404 | task.vertical_completed_groups = completed_groups; |
102 | 404 | } |
103 | 403 | } |
104 | | |
105 | 286 | void CompactionTaskTracker::complete(int64_t compaction_id, const CompletionStats& stats) { |
106 | 286 | if (!config::enable_compaction_task_tracker) { |
107 | 0 | return; |
108 | 0 | } |
109 | 286 | std::unique_lock wlock(_mutex); |
110 | 286 | auto it = _active_tasks.find(compaction_id); |
111 | 286 | if (it == _active_tasks.end()) { |
112 | 1 | LOG(WARNING) << "compaction_id " << compaction_id << " not found in active_tasks, skip"; |
113 | 1 | return; |
114 | 1 | } |
115 | | |
116 | | // Extract the task from active map. |
117 | 285 | auto node = _active_tasks.extract(it); |
118 | 285 | CompactionTaskInfo& info = node.mapped(); |
119 | 285 | info.status = CompactionTaskStatus::FINISHED; |
120 | 285 | _apply_completion(info, stats); |
121 | | |
122 | 285 | if (config::compaction_task_tracker_max_records > 0) { |
123 | 285 | _completed_tasks.push_back(std::move(info)); |
124 | 285 | _trim_completed_locked(); |
125 | 285 | } |
126 | 285 | } |
127 | | |
128 | | void CompactionTaskTracker::fail(int64_t compaction_id, const CompletionStats& stats, |
129 | 139 | const std::string& msg) { |
130 | 139 | if (!config::enable_compaction_task_tracker) { |
131 | 0 | return; |
132 | 0 | } |
133 | 139 | std::unique_lock wlock(_mutex); |
134 | 139 | auto it = _active_tasks.find(compaction_id); |
135 | 139 | if (it == _active_tasks.end()) { |
136 | 1 | LOG(WARNING) << "compaction_id " << compaction_id << " not found in active_tasks, skip"; |
137 | 1 | return; |
138 | 1 | } |
139 | | |
140 | | // Extract the task from active map. |
141 | 138 | auto node = _active_tasks.extract(it); |
142 | 138 | CompactionTaskInfo& info = node.mapped(); |
143 | 138 | info.status = CompactionTaskStatus::FAILED; |
144 | 138 | info.status_msg = msg; |
145 | 138 | _apply_completion(info, stats); |
146 | | |
147 | 138 | if (config::compaction_task_tracker_max_records > 0) { |
148 | 138 | _completed_tasks.push_back(std::move(info)); |
149 | 138 | _trim_completed_locked(); |
150 | 138 | } |
151 | 138 | } |
152 | | |
153 | 412 | void CompactionTaskTracker::remove_task(int64_t compaction_id) { |
154 | 412 | if (!config::enable_compaction_task_tracker) { |
155 | 0 | return; |
156 | 0 | } |
157 | 412 | std::unique_lock wlock(_mutex); |
158 | 412 | _active_tasks.erase(compaction_id); // idempotent: no-op if already removed |
159 | 412 | } |
160 | | |
161 | | void CompactionTaskTracker::_apply_completion(CompactionTaskInfo& info, |
162 | 423 | const CompletionStats& stats) { |
163 | 423 | info.end_time_ms = stats.end_time_ms; |
164 | 423 | info.merged_rows = stats.merged_rows; |
165 | 423 | info.filtered_rows = stats.filtered_rows; |
166 | 423 | info.output_rows = stats.output_rows; |
167 | 423 | info.output_row_num = stats.output_row_num; |
168 | 423 | info.output_data_size = stats.output_data_size; |
169 | 423 | info.output_index_size = stats.output_index_size; |
170 | 423 | info.output_total_size = stats.output_total_size; |
171 | 423 | info.output_segments_num = stats.output_segments_num; |
172 | 423 | info.output_version = stats.output_version; |
173 | 423 | info.merge_latency_ms = stats.merge_latency_ms; |
174 | 423 | info.bytes_read_from_local = stats.bytes_read_from_local; |
175 | 423 | info.bytes_read_from_remote = stats.bytes_read_from_remote; |
176 | 423 | info.peak_memory_bytes = stats.peak_memory_bytes; |
177 | | // Backfill input stats if they were 0 at register time. |
178 | | // Local compaction populates _input_rowsets_data_size etc. in build_basic_info() |
179 | | // which runs inside execute_compact_impl(), after register_task(). |
180 | 423 | if (info.input_version_range.empty() && !stats.input_version_range.empty()) { |
181 | 0 | info.input_version_range = stats.input_version_range; |
182 | 0 | } |
183 | 423 | if (info.input_rowsets_count == 0 && stats.input_rowsets_count > 0) { |
184 | 0 | info.input_rowsets_count = stats.input_rowsets_count; |
185 | 0 | } |
186 | 423 | if (info.input_row_num == 0 && stats.input_row_num > 0) { |
187 | 0 | info.input_row_num = stats.input_row_num; |
188 | 0 | } |
189 | 423 | if (info.input_data_size == 0 && stats.input_data_size > 0) { |
190 | 0 | info.input_data_size = stats.input_data_size; |
191 | 0 | } |
192 | 423 | if (info.input_index_size == 0 && stats.input_index_size > 0) { |
193 | 0 | info.input_index_size = stats.input_index_size; |
194 | 0 | } |
195 | 423 | if (info.input_total_size == 0 && stats.input_total_size > 0) { |
196 | 0 | info.input_total_size = stats.input_total_size; |
197 | 0 | } |
198 | 423 | if (info.input_segments_num == 0 && stats.input_segments_num > 0) { |
199 | 0 | info.input_segments_num = stats.input_segments_num; |
200 | 0 | } |
201 | 423 | } |
202 | | |
203 | 423 | void CompactionTaskTracker::_trim_completed_locked() { |
204 | 423 | int32_t max = config::compaction_task_tracker_max_records; |
205 | 423 | if (max <= 0) { |
206 | 0 | _completed_tasks.clear(); |
207 | 0 | return; |
208 | 0 | } |
209 | 428 | while (static_cast<int32_t>(_completed_tasks.size()) > max) { |
210 | 5 | _completed_tasks.pop_front(); |
211 | 5 | } |
212 | 423 | } |
213 | | |
214 | 427 | std::vector<CompactionTaskInfo> CompactionTaskTracker::get_all_tasks() const { |
215 | 427 | std::shared_lock rlock(_mutex); |
216 | 427 | std::vector<CompactionTaskInfo> result; |
217 | 427 | result.reserve(_active_tasks.size() + _completed_tasks.size()); |
218 | 2.43k | for (const auto& [id, info] : _active_tasks) { |
219 | 2.43k | result.push_back(info); |
220 | 2.43k | } |
221 | 76.4k | for (const auto& info : _completed_tasks) { |
222 | 76.4k | result.push_back(info); |
223 | 76.4k | } |
224 | 427 | return result; |
225 | 427 | } |
226 | | |
227 | | std::vector<CompactionTaskInfo> CompactionTaskTracker::get_completed_tasks( |
228 | | int64_t tablet_id, int64_t top_n, const std::string& compaction_type, |
229 | 13 | int success_filter) const { |
230 | 13 | int32_t max = config::compaction_task_tracker_max_records; |
231 | 13 | if (max <= 0) { |
232 | 0 | return {}; |
233 | 0 | } |
234 | | |
235 | 13 | std::shared_lock rlock(_mutex); |
236 | 13 | std::vector<CompactionTaskInfo> result; |
237 | 13 | int32_t count = 0; |
238 | | // Iterate in reverse order (newest first). |
239 | 67 | for (auto it = _completed_tasks.rbegin(); it != _completed_tasks.rend(); ++it) { |
240 | 56 | if (count >= max) { |
241 | 0 | break; |
242 | 0 | } |
243 | 56 | count++; |
244 | 56 | const auto& record = *it; |
245 | 56 | if (tablet_id != 0 && record.tablet_id != tablet_id) { |
246 | 7 | continue; |
247 | 7 | } |
248 | 49 | if (!compaction_type.empty() && compaction_type != to_string(record.compaction_type)) { |
249 | 11 | continue; |
250 | 11 | } |
251 | 38 | if (success_filter == 1 && record.status != CompactionTaskStatus::FINISHED) { |
252 | 1 | continue; |
253 | 1 | } |
254 | 37 | if (success_filter == 0 && record.status != CompactionTaskStatus::FAILED) { |
255 | 4 | continue; |
256 | 4 | } |
257 | 33 | result.push_back(record); |
258 | 33 | if (top_n > 0 && static_cast<int64_t>(result.size()) >= top_n) { |
259 | 2 | break; |
260 | 2 | } |
261 | 33 | } |
262 | 13 | return result; |
263 | 13 | } |
264 | | |
265 | 15 | void CompactionTaskTracker::clear_for_test() { |
266 | 15 | std::unique_lock wlock(_mutex); |
267 | 15 | _active_tasks.clear(); |
268 | 15 | _completed_tasks.clear(); |
269 | 15 | } |
270 | | |
271 | | } // namespace doris |