Coverage Report

Created: 2026-06-09 13:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/compaction_task_tracker.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/compaction_task_tracker.h"
19
20
#include "common/config.h"
21
#include "common/logging.h"
22
23
namespace doris {
24
25
17.4k
const char* to_string(CompactionProfileType type) {
26
17.4k
    switch (type) {
27
41
    case CompactionProfileType::BASE:
28
41
        return "base";
29
17.3k
    case CompactionProfileType::CUMULATIVE:
30
17.3k
        return "cumulative";
31
45
    case CompactionProfileType::FULL:
32
45
        return "full";
33
0
    case CompactionProfileType::BINLOG:
34
0
        return "binlog";
35
17.4k
    }
36
0
    return "unknown";
37
17.4k
}
38
39
12.1k
const char* to_string(CompactionTaskStatus status) {
40
12.1k
    switch (status) {
41
0
    case CompactionTaskStatus::PENDING:
42
0
        return "PENDING";
43
24
    case CompactionTaskStatus::RUNNING:
44
24
        return "RUNNING";
45
12.0k
    case CompactionTaskStatus::FINISHED:
46
12.0k
        return "FINISHED";
47
72
    case CompactionTaskStatus::FAILED:
48
72
        return "FAILED";
49
12.1k
    }
50
0
    return "UNKNOWN";
51
12.1k
}
52
53
15.6k
const char* to_string(TriggerMethod method) {
54
15.6k
    switch (method) {
55
13.6k
    case TriggerMethod::AUTO:
56
13.6k
        return "AUTO";
57
1.99k
    case TriggerMethod::MANUAL:
58
1.99k
        return "MANUAL";
59
0
    case TriggerMethod::LOAD_TRIGGERED:
60
0
        return "LOAD_TRIGGERED";
61
15.6k
    }
62
0
    return "UNKNOWN";
63
15.6k
}
64
65
354k
CompactionTaskTracker* CompactionTaskTracker::instance() {
66
354k
    static CompactionTaskTracker s_instance;
67
354k
    return &s_instance;
68
354k
}
69
70
9.05k
void CompactionTaskTracker::register_task(CompactionTaskInfo info) {
71
9.05k
    if (!config::enable_compaction_task_tracker) {
72
1
        return;
73
1
    }
74
9.05k
    std::unique_lock wlock(_mutex);
75
9.05k
    _active_tasks[info.compaction_id] = std::move(info);
76
9.05k
}
77
78
8.70k
void CompactionTaskTracker::update_to_running(int64_t compaction_id, const RunningStats& stats) {
79
8.70k
    if (!config::enable_compaction_task_tracker) {
80
0
        return;
81
0
    }
82
8.70k
    std::unique_lock wlock(_mutex);
83
8.70k
    auto it = _active_tasks.find(compaction_id);
84
8.70k
    if (it != _active_tasks.end()) {
85
8.70k
        auto& task = it->second;
86
8.70k
        task.status = CompactionTaskStatus::RUNNING;
87
8.70k
        task.start_time_ms = stats.start_time_ms;
88
8.70k
        task.is_vertical = stats.is_vertical;
89
8.70k
        task.permits = stats.permits;
90
8.70k
    }
91
8.70k
}
92
93
void CompactionTaskTracker::update_progress(int64_t compaction_id, int64_t total_groups,
94
32.5k
                                            int64_t completed_groups) {
95
32.5k
    if (!config::enable_compaction_task_tracker) {
96
0
        return;
97
0
    }
98
32.5k
    std::unique_lock wlock(_mutex);
99
32.5k
    auto it = _active_tasks.find(compaction_id);
100
32.5k
    if (it != _active_tasks.end()) {
101
30.1k
        auto& task = it->second;
102
30.1k
        task.vertical_total_groups = total_groups;
103
30.1k
        task.vertical_completed_groups = completed_groups;
104
30.1k
    }
105
32.5k
}
106
107
8.50k
void CompactionTaskTracker::complete(int64_t compaction_id, const CompletionStats& stats) {
108
8.50k
    if (!config::enable_compaction_task_tracker) {
109
0
        return;
110
0
    }
111
8.50k
    std::unique_lock wlock(_mutex);
112
8.50k
    auto it = _active_tasks.find(compaction_id);
113
8.50k
    if (it == _active_tasks.end()) {
114
1
        LOG(WARNING) << "compaction_id " << compaction_id << " not found in active_tasks, skip";
115
1
        return;
116
1
    }
117
118
    // Extract the task from active map.
119
8.50k
    auto node = _active_tasks.extract(it);
120
8.50k
    CompactionTaskInfo& info = node.mapped();
121
8.50k
    info.status = CompactionTaskStatus::FINISHED;
122
8.50k
    _apply_completion(info, stats);
123
124
8.52k
    if (config::compaction_task_tracker_max_records > 0) {
125
8.52k
        _completed_tasks.push_back(std::move(info));
126
8.52k
        _trim_completed_locked();
127
8.52k
    }
128
8.50k
}
129
130
void CompactionTaskTracker::fail(int64_t compaction_id, const CompletionStats& stats,
131
188
                                 const std::string& msg) {
132
188
    if (!config::enable_compaction_task_tracker) {
133
0
        return;
134
0
    }
135
188
    std::unique_lock wlock(_mutex);
136
188
    auto it = _active_tasks.find(compaction_id);
137
188
    if (it == _active_tasks.end()) {
138
1
        LOG(WARNING) << "compaction_id " << compaction_id << " not found in active_tasks, skip";
139
1
        return;
140
1
    }
141
142
    // Extract the task from active map.
143
187
    auto node = _active_tasks.extract(it);
144
187
    CompactionTaskInfo& info = node.mapped();
145
187
    info.status = CompactionTaskStatus::FAILED;
146
187
    info.status_msg = msg;
147
187
    _apply_completion(info, stats);
148
149
187
    if (config::compaction_task_tracker_max_records > 0) {
150
187
        _completed_tasks.push_back(std::move(info));
151
187
        _trim_completed_locked();
152
187
    }
153
187
}
154
155
9.01k
void CompactionTaskTracker::remove_task(int64_t compaction_id) {
156
9.01k
    if (!config::enable_compaction_task_tracker) {
157
0
        return;
158
0
    }
159
9.01k
    std::unique_lock wlock(_mutex);
160
9.01k
    _active_tasks.erase(compaction_id); // idempotent: no-op if already removed
161
9.01k
}
162
163
void CompactionTaskTracker::_apply_completion(CompactionTaskInfo& info,
164
8.71k
                                              const CompletionStats& stats) {
165
8.71k
    info.end_time_ms = stats.end_time_ms;
166
8.71k
    info.merged_rows = stats.merged_rows;
167
8.71k
    info.filtered_rows = stats.filtered_rows;
168
8.71k
    info.output_rows = stats.output_rows;
169
8.71k
    info.output_row_num = stats.output_row_num;
170
8.71k
    info.output_data_size = stats.output_data_size;
171
8.71k
    info.output_index_size = stats.output_index_size;
172
8.71k
    info.output_total_size = stats.output_total_size;
173
8.71k
    info.output_segments_num = stats.output_segments_num;
174
8.71k
    info.output_version = stats.output_version;
175
8.71k
    info.is_ordered_data_compaction = stats.is_ordered_data_compaction;
176
8.71k
    info.merge_latency_ms = stats.merge_latency_ms;
177
8.71k
    info.bytes_read_from_local = stats.bytes_read_from_local;
178
8.71k
    info.bytes_read_from_remote = stats.bytes_read_from_remote;
179
8.71k
    info.peak_memory_bytes = stats.peak_memory_bytes;
180
    // Backfill input stats if they were 0 at register time.
181
    // Local compaction populates _input_rowsets_data_size etc. in build_basic_info()
182
    // which runs inside execute_compact_impl(), after register_task().
183
8.71k
    if (info.input_version_range.empty() && !stats.input_version_range.empty()) {
184
0
        info.input_version_range = stats.input_version_range;
185
0
    }
186
8.71k
    if (info.input_rowsets_count == 0 && stats.input_rowsets_count > 0) {
187
0
        info.input_rowsets_count = stats.input_rowsets_count;
188
0
    }
189
8.71k
    if (info.input_row_num == 0 && stats.input_row_num > 0) {
190
1.76k
        info.input_row_num = stats.input_row_num;
191
1.76k
    }
192
8.71k
    if (info.input_data_size == 0 && stats.input_data_size > 0) {
193
1.76k
        info.input_data_size = stats.input_data_size;
194
1.76k
    }
195
8.71k
    if (info.input_index_size == 0 && stats.input_index_size > 0) {
196
2
        info.input_index_size = stats.input_index_size;
197
2
    }
198
8.71k
    if (info.input_total_size == 0 && stats.input_total_size > 0) {
199
1.76k
        info.input_total_size = stats.input_total_size;
200
1.76k
    }
201
8.71k
    if (info.input_segments_num == 0 && stats.input_segments_num > 0) {
202
1.76k
        info.input_segments_num = stats.input_segments_num;
203
1.76k
    }
204
8.71k
}
205
206
8.71k
void CompactionTaskTracker::_trim_completed_locked() {
207
8.71k
    int32_t max = config::compaction_task_tracker_max_records;
208
8.71k
    if (max <= 0) {
209
0
        _completed_tasks.clear();
210
0
        return;
211
0
    }
212
8.72k
    while (static_cast<int32_t>(_completed_tasks.size()) > max) {
213
5
        _completed_tasks.pop_front();
214
5
    }
215
8.71k
}
216
217
434
std::vector<CompactionTaskInfo> CompactionTaskTracker::get_all_tasks() const {
218
434
    std::shared_lock rlock(_mutex);
219
434
    std::vector<CompactionTaskInfo> result;
220
434
    result.reserve(_active_tasks.size() + _completed_tasks.size());
221
2.42k
    for (const auto& [id, info] : _active_tasks) {
222
2.42k
        result.push_back(info);
223
2.42k
    }
224
88.6k
    for (const auto& info : _completed_tasks) {
225
88.6k
        result.push_back(info);
226
88.6k
    }
227
434
    return result;
228
434
}
229
230
std::vector<CompactionTaskInfo> CompactionTaskTracker::get_completed_tasks(
231
        int64_t tablet_id, int64_t top_n, const std::string& compaction_type,
232
19
        int success_filter) const {
233
19
    int32_t max = config::compaction_task_tracker_max_records;
234
19
    if (max <= 0) {
235
0
        return {};
236
0
    }
237
238
19
    std::shared_lock rlock(_mutex);
239
19
    std::vector<CompactionTaskInfo> result;
240
19
    int32_t count = 0;
241
    // Iterate in reverse order (newest first).
242
7.30k
    for (auto it = _completed_tasks.rbegin(); it != _completed_tasks.rend(); ++it) {
243
7.29k
        if (count >= max) {
244
0
            break;
245
0
        }
246
7.29k
        count++;
247
7.29k
        const auto& record = *it;
248
7.29k
        if (tablet_id != 0 && record.tablet_id != tablet_id) {
249
3.70k
            continue;
250
3.70k
        }
251
3.58k
        if (!compaction_type.empty() && compaction_type != to_string(record.compaction_type)) {
252
15
            continue;
253
15
        }
254
3.57k
        if (success_filter == 1 && record.status != CompactionTaskStatus::FINISHED) {
255
1
            continue;
256
1
        }
257
3.56k
        if (success_filter == 0 && record.status != CompactionTaskStatus::FAILED) {
258
4
            continue;
259
4
        }
260
3.56k
        result.push_back(record);
261
3.56k
        if (top_n > 0 && static_cast<int64_t>(result.size()) >= top_n) {
262
4
            break;
263
4
        }
264
3.56k
    }
265
19
    return result;
266
19
}
267
268
15
void CompactionTaskTracker::clear_for_test() {
269
15
    std::unique_lock wlock(_mutex);
270
15
    _active_tasks.clear();
271
15
    _completed_tasks.clear();
272
15
}
273
274
} // namespace doris