Coverage Report

Created: 2026-04-10 05:08

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/compaction_task_tracker.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/compaction_task_tracker.h"
19
20
#include "common/config.h"
21
#include "common/logging.h"
22
23
namespace doris {
24
25
17.9k
const char* to_string(CompactionProfileType type) {
26
17.9k
    switch (type) {
27
83
    case CompactionProfileType::BASE:
28
83
        return "base";
29
17.8k
    case CompactionProfileType::CUMULATIVE:
30
17.8k
        return "cumulative";
31
45
    case CompactionProfileType::FULL:
32
45
        return "full";
33
17.9k
    }
34
0
    return "unknown";
35
17.9k
}
36
37
12.4k
const char* to_string(CompactionTaskStatus status) {
38
12.4k
    switch (status) {
39
0
    case CompactionTaskStatus::PENDING:
40
0
        return "PENDING";
41
4
    case CompactionTaskStatus::RUNNING:
42
4
        return "RUNNING";
43
12.3k
    case CompactionTaskStatus::FINISHED:
44
12.3k
        return "FINISHED";
45
78
    case CompactionTaskStatus::FAILED:
46
78
        return "FAILED";
47
12.4k
    }
48
0
    return "UNKNOWN";
49
12.4k
}
50
51
16.1k
const char* to_string(TriggerMethod method) {
52
16.1k
    switch (method) {
53
14.1k
    case TriggerMethod::AUTO:
54
14.1k
        return "AUTO";
55
1.92k
    case TriggerMethod::MANUAL:
56
1.92k
        return "MANUAL";
57
0
    case TriggerMethod::LOAD_TRIGGERED:
58
0
        return "LOAD_TRIGGERED";
59
16.1k
    }
60
0
    return "UNKNOWN";
61
16.1k
}
62
63
272k
CompactionTaskTracker* CompactionTaskTracker::instance() {
64
272k
    static CompactionTaskTracker s_instance;
65
272k
    return &s_instance;
66
272k
}
67
68
7.22k
void CompactionTaskTracker::register_task(CompactionTaskInfo info) {
69
7.22k
    if (!config::enable_compaction_task_tracker) {
70
1
        return;
71
1
    }
72
7.22k
    std::unique_lock wlock(_mutex);
73
7.22k
    _active_tasks[info.compaction_id] = std::move(info);
74
7.22k
}
75
76
6.72k
void CompactionTaskTracker::update_to_running(int64_t compaction_id, const RunningStats& stats) {
77
6.72k
    if (!config::enable_compaction_task_tracker) {
78
0
        return;
79
0
    }
80
6.72k
    std::unique_lock wlock(_mutex);
81
6.72k
    auto it = _active_tasks.find(compaction_id);
82
6.72k
    if (it != _active_tasks.end()) {
83
6.72k
        auto& task = it->second;
84
6.72k
        task.status = CompactionTaskStatus::RUNNING;
85
6.72k
        task.start_time_ms = stats.start_time_ms;
86
6.72k
        task.is_vertical = stats.is_vertical;
87
6.72k
        task.permits = stats.permits;
88
6.72k
    }
89
6.72k
}
90
91
void CompactionTaskTracker::update_progress(int64_t compaction_id, int64_t total_groups,
92
27.1k
                                            int64_t completed_groups) {
93
27.1k
    if (!config::enable_compaction_task_tracker) {
94
0
        return;
95
0
    }
96
27.1k
    std::unique_lock wlock(_mutex);
97
27.1k
    auto it = _active_tasks.find(compaction_id);
98
27.1k
    if (it != _active_tasks.end()) {
99
24.4k
        auto& task = it->second;
100
24.4k
        task.vertical_total_groups = total_groups;
101
24.4k
        task.vertical_completed_groups = completed_groups;
102
24.4k
    }
103
27.1k
}
104
105
6.52k
void CompactionTaskTracker::complete(int64_t compaction_id, const CompletionStats& stats) {
106
6.52k
    if (!config::enable_compaction_task_tracker) {
107
0
        return;
108
0
    }
109
6.52k
    std::unique_lock wlock(_mutex);
110
6.52k
    auto it = _active_tasks.find(compaction_id);
111
6.52k
    if (it == _active_tasks.end()) {
112
1
        LOG(WARNING) << "compaction_id " << compaction_id << " not found in active_tasks, skip";
113
1
        return;
114
1
    }
115
116
    // Extract the task from active map.
117
6.52k
    auto node = _active_tasks.extract(it);
118
6.52k
    CompactionTaskInfo& info = node.mapped();
119
6.52k
    info.status = CompactionTaskStatus::FINISHED;
120
6.52k
    _apply_completion(info, stats);
121
122
6.53k
    if (config::compaction_task_tracker_max_records > 0) {
123
6.53k
        _completed_tasks.push_back(std::move(info));
124
6.53k
        _trim_completed_locked();
125
6.53k
    }
126
6.52k
}
127
128
void CompactionTaskTracker::fail(int64_t compaction_id, const CompletionStats& stats,
129
195
                                 const std::string& msg) {
130
195
    if (!config::enable_compaction_task_tracker) {
131
0
        return;
132
0
    }
133
195
    std::unique_lock wlock(_mutex);
134
195
    auto it = _active_tasks.find(compaction_id);
135
195
    if (it == _active_tasks.end()) {
136
1
        LOG(WARNING) << "compaction_id " << compaction_id << " not found in active_tasks, skip";
137
1
        return;
138
1
    }
139
140
    // Extract the task from active map.
141
194
    auto node = _active_tasks.extract(it);
142
194
    CompactionTaskInfo& info = node.mapped();
143
194
    info.status = CompactionTaskStatus::FAILED;
144
194
    info.status_msg = msg;
145
194
    _apply_completion(info, stats);
146
147
194
    if (config::compaction_task_tracker_max_records > 0) {
148
194
        _completed_tasks.push_back(std::move(info));
149
194
        _trim_completed_locked();
150
194
    }
151
194
}
152
153
7.18k
void CompactionTaskTracker::remove_task(int64_t compaction_id) {
154
7.18k
    if (!config::enable_compaction_task_tracker) {
155
0
        return;
156
0
    }
157
7.18k
    std::unique_lock wlock(_mutex);
158
7.18k
    _active_tasks.erase(compaction_id); // idempotent: no-op if already removed
159
7.18k
}
160
161
void CompactionTaskTracker::_apply_completion(CompactionTaskInfo& info,
162
6.73k
                                              const CompletionStats& stats) {
163
6.73k
    info.end_time_ms = stats.end_time_ms;
164
6.73k
    info.merged_rows = stats.merged_rows;
165
6.73k
    info.filtered_rows = stats.filtered_rows;
166
6.73k
    info.output_rows = stats.output_rows;
167
6.73k
    info.output_row_num = stats.output_row_num;
168
6.73k
    info.output_data_size = stats.output_data_size;
169
6.73k
    info.output_index_size = stats.output_index_size;
170
6.73k
    info.output_total_size = stats.output_total_size;
171
6.73k
    info.output_segments_num = stats.output_segments_num;
172
6.73k
    info.output_version = stats.output_version;
173
6.73k
    info.merge_latency_ms = stats.merge_latency_ms;
174
6.73k
    info.bytes_read_from_local = stats.bytes_read_from_local;
175
6.73k
    info.bytes_read_from_remote = stats.bytes_read_from_remote;
176
6.73k
    info.peak_memory_bytes = stats.peak_memory_bytes;
177
    // Backfill input stats if they were 0 at register time.
178
    // Local compaction populates _input_rowsets_data_size etc. in build_basic_info()
179
    // which runs inside execute_compact_impl(), after register_task().
180
6.73k
    if (info.input_version_range.empty() && !stats.input_version_range.empty()) {
181
0
        info.input_version_range = stats.input_version_range;
182
0
    }
183
6.73k
    if (info.input_rowsets_count == 0 && stats.input_rowsets_count > 0) {
184
0
        info.input_rowsets_count = stats.input_rowsets_count;
185
0
    }
186
6.73k
    if (info.input_row_num == 0 && stats.input_row_num > 0) {
187
42
        info.input_row_num = stats.input_row_num;
188
42
    }
189
6.73k
    if (info.input_data_size == 0 && stats.input_data_size > 0) {
190
42
        info.input_data_size = stats.input_data_size;
191
42
    }
192
6.73k
    if (info.input_index_size == 0 && stats.input_index_size > 0) {
193
0
        info.input_index_size = stats.input_index_size;
194
0
    }
195
6.73k
    if (info.input_total_size == 0 && stats.input_total_size > 0) {
196
42
        info.input_total_size = stats.input_total_size;
197
42
    }
198
6.73k
    if (info.input_segments_num == 0 && stats.input_segments_num > 0) {
199
42
        info.input_segments_num = stats.input_segments_num;
200
42
    }
201
6.73k
}
202
203
6.73k
void CompactionTaskTracker::_trim_completed_locked() {
204
6.73k
    int32_t max = config::compaction_task_tracker_max_records;
205
6.73k
    if (max <= 0) {
206
0
        _completed_tasks.clear();
207
0
        return;
208
0
    }
209
6.73k
    while (static_cast<int32_t>(_completed_tasks.size()) > max) {
210
5
        _completed_tasks.pop_front();
211
5
    }
212
6.73k
}
213
214
431
std::vector<CompactionTaskInfo> CompactionTaskTracker::get_all_tasks() const {
215
431
    std::shared_lock rlock(_mutex);
216
431
    std::vector<CompactionTaskInfo> result;
217
431
    result.reserve(_active_tasks.size() + _completed_tasks.size());
218
2.31k
    for (const auto& [id, info] : _active_tasks) {
219
2.31k
        result.push_back(info);
220
2.31k
    }
221
89.6k
    for (const auto& info : _completed_tasks) {
222
89.6k
        result.push_back(info);
223
89.6k
    }
224
431
    return result;
225
431
}
226
227
std::vector<CompactionTaskInfo> CompactionTaskTracker::get_completed_tasks(
228
        int64_t tablet_id, int64_t top_n, const std::string& compaction_type,
229
19
        int success_filter) const {
230
19
    int32_t max = config::compaction_task_tracker_max_records;
231
19
    if (max <= 0) {
232
0
        return {};
233
0
    }
234
235
19
    std::shared_lock rlock(_mutex);
236
19
    std::vector<CompactionTaskInfo> result;
237
19
    int32_t count = 0;
238
    // Iterate in reverse order (newest first).
239
7.55k
    for (auto it = _completed_tasks.rbegin(); it != _completed_tasks.rend(); ++it) {
240
7.54k
        if (count >= max) {
241
0
            break;
242
0
        }
243
7.54k
        count++;
244
7.54k
        const auto& record = *it;
245
7.54k
        if (tablet_id != 0 && record.tablet_id != tablet_id) {
246
3.84k
            continue;
247
3.84k
        }
248
3.69k
        if (!compaction_type.empty() && compaction_type != to_string(record.compaction_type)) {
249
16
            continue;
250
16
        }
251
3.67k
        if (success_filter == 1 && record.status != CompactionTaskStatus::FINISHED) {
252
1
            continue;
253
1
        }
254
3.67k
        if (success_filter == 0 && record.status != CompactionTaskStatus::FAILED) {
255
4
            continue;
256
4
        }
257
3.67k
        result.push_back(record);
258
3.67k
        if (top_n > 0 && static_cast<int64_t>(result.size()) >= top_n) {
259
4
            break;
260
4
        }
261
3.67k
    }
262
19
    return result;
263
19
}
264
265
15
void CompactionTaskTracker::clear_for_test() {
266
15
    std::unique_lock wlock(_mutex);
267
15
    _active_tasks.clear();
268
15
    _completed_tasks.clear();
269
15
}
270
271
} // namespace doris