Coverage Report

Created: 2026-06-03 03:56

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/compaction_task_tracker.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <atomic>
21
#include <cstdint>
22
#include <deque>
23
#include <shared_mutex>
24
#include <string>
25
#include <unordered_map>
26
#include <vector>
27
28
namespace doris {
29
30
// Standardized compaction type reported by the compaction task tracker.
// Covers base, cumulative, full and binlog compactions.
enum class CompactionProfileType : uint8_t {
    BASE = 0,
    CUMULATIVE = 1,
    FULL = 2,
    BINLOG = 3,
};

// Returns a string name for `type`; the definition is not part of this header.
const char* to_string(CompactionProfileType type);
39
40
// Lifecycle state of a tracked compaction task.
// PENDING and RUNNING tasks are "active"; FINISHED and FAILED tasks are "completed".
enum class CompactionTaskStatus : uint8_t {
    PENDING = 0,  // registered, execution has not started
    RUNNING = 1,  // currently executing
    FINISHED = 2, // completed successfully
    FAILED = 3,   // completed with an error
};

// Returns a string name for `status`; the definition is not part of this header.
const char* to_string(CompactionTaskStatus status);
49
50
// What initiated a compaction task.
enum class TriggerMethod : uint8_t {
    AUTO = 0,           // NOTE(review): presumably the background scheduler — confirm
    MANUAL = 1,         // NOTE(review): presumably an explicit user/API request — confirm
    LOAD_TRIGGERED = 2, // NOTE(review): presumably kicked off by a data load — confirm
};

// Returns a string name for `method`; the definition is not part of this header.
const char* to_string(TriggerMethod method);
58
59
// Fields that become known when a task moves from PENDING to RUNNING.
// Passed to CompactionTaskTracker::update_to_running().
struct RunningStats {
    int64_t start_time_ms = 0; // execution start time
    bool is_vertical = false;  // whether vertical merge is used
    int64_t permits = 0;       // compaction permits used
};
65
66
// Result fields gathered when a task finishes or fails; passed to
// CompactionTaskTracker::complete() / fail().
//
// The input_* fields are backfilled here: local compaction computes them in
// build_basic_info(), which runs inside execute_compact_impl() after register_task()
// has already been called.
struct CompletionStats {
    // --- Input side (backfilled) ---
    std::string input_version_range;
    int64_t input_rowsets_count = 0;
    int64_t input_row_num = 0;
    int64_t input_data_size = 0;
    int64_t input_index_size = 0;
    int64_t input_total_size = 0;
    int64_t input_segments_num = 0;

    // --- Timing ---
    int64_t end_time_ms = 0;

    // --- Merge / output side ---
    int64_t merged_rows = 0;
    int64_t filtered_rows = 0;
    int64_t output_rows = 0; // Merger statistics (_stats.output_rows)
    int64_t output_row_num = 0;
    int64_t output_data_size = 0;
    int64_t output_index_size = 0; // _output_rowset->index_disk_size()
    int64_t output_total_size = 0; // _output_rowset->total_disk_size()
    int64_t output_segments_num = 0;
    std::string output_version;
    bool is_ordered_data_compaction = false;
    int64_t merge_latency_ms = 0; // _merge_rowsets_latency_timer, converted to ms

    // --- IO and memory ---
    int64_t bytes_read_from_local = 0;
    int64_t bytes_read_from_remote = 0;
    int64_t peak_memory_bytes = 0;
};
93
94
// Unified metadata describing a compaction task across its full lifecycle.
95
struct CompactionTaskInfo {
96
    // ===== Identity =====
97
    int64_t compaction_id {0}; // unique task ID assigned by Tracker
98
    int64_t backend_id {0};    // BE node ID
99
    int64_t table_id {0};      // table ID
100
    int64_t partition_id {0};  // partition ID
101
    int64_t tablet_id {0};     // tablet ID
102
103
    // ===== Task attributes =====
104
    CompactionProfileType compaction_type {CompactionProfileType::BASE};
105
    CompactionTaskStatus status {CompactionTaskStatus::PENDING};
106
    TriggerMethod trigger_method {TriggerMethod::AUTO};
107
    int64_t compaction_score {0}; // tablet compaction score at register time
108
109
    // ===== Timestamps =====
110
    int64_t scheduled_time_ms {0}; // task registration time
111
    int64_t start_time_ms {0};     // task execution start time (0 while PENDING)
112
    int64_t end_time_ms {0};       // task end time (0 while not completed)
113
114
    // ===== Input statistics (available after prepare_compact) =====
115
    int64_t input_rowsets_count {0};
116
    int64_t input_row_num {0};
117
    int64_t input_data_size {0};  // bytes, corresponds to _input_rowsets_data_size
118
    int64_t input_index_size {0}; // bytes, corresponds to _input_rowsets_index_size
119
    int64_t input_total_size {0}; // bytes, = data + index
120
    int64_t input_segments_num {0};
121
    std::string input_version_range; // e.g. "[0-5]"
122
123
    // ===== Output statistics (written at complete/fail) =====
124
    int64_t merged_rows {0};
125
    int64_t filtered_rows {0};
126
    int64_t output_rows {0};       // Merger output rows (_stats.output_rows; 0 for ordered path)
127
    int64_t output_row_num {0};    // from _output_rowset->num_rows()
128
    int64_t output_data_size {0};  // bytes, from _output_rowset->data_disk_size()
129
    int64_t output_index_size {0}; // bytes, from _output_rowset->index_disk_size()
130
    int64_t output_total_size {0}; // bytes, from _output_rowset->total_disk_size()
131
    int64_t output_segments_num {0};
132
    std::string output_version; // e.g. "[0-5]"
133
    bool is_ordered_data_compaction {
134
            false}; // ordered/meta-link compaction path, including binlog quick path
135
136
    // ===== Merge performance =====
137
    int64_t merge_latency_ms {0}; // merge rowsets latency (ms; 0 for ordered path)
138
139
    // ===== IO statistics (written at complete/fail) =====
140
    int64_t bytes_read_from_local {0};
141
    int64_t bytes_read_from_remote {0};
142
143
    // ===== Resources =====
144
    int64_t peak_memory_bytes {0}; // peak memory usage (bytes)
145
    bool is_vertical {false};      // whether vertical merge is used
146
    int64_t permits {0};           // compaction permits used
147
148
    // ===== Vertical compaction progress =====
149
    int64_t vertical_total_groups {0}; // total column groups (0 for horizontal)
150
    int64_t vertical_completed_groups {
151
            0}; // completed column groups (updated in real-time during RUNNING)
152
153
    // ===== Error =====
154
    std::string status_msg; // failure message (empty on success)
155
};
156
157
// Global singleton managing compaction task lifecycle.
158
// Receives push reports from compaction entries and execution layer,
159
// provides pull query interfaces for system table and HTTP API.
160
class CompactionTaskTracker {
161
public:
162
    static CompactionTaskTracker* instance();
163
164
    // ID allocation: globally unique monotonically increasing, restarts from 1 after BE restart.
165
343k
    int64_t next_compaction_id() { return _next_id.fetch_add(1, std::memory_order_relaxed); }
166
167
    // ===== Push interfaces: lifecycle management (write lock) =====
168
    // All push interfaces are no-op when enable_compaction_task_tracker=false.
169
    void register_task(CompactionTaskInfo info);
170
    void update_to_running(int64_t compaction_id, const RunningStats& stats);
171
    void update_progress(int64_t compaction_id, int64_t total_groups, int64_t completed_groups);
172
    void complete(int64_t compaction_id, const CompletionStats& stats);
173
    void fail(int64_t compaction_id, const CompletionStats& stats, const std::string& msg);
174
    void remove_task(int64_t compaction_id);
175
176
    // ===== Pull interfaces: queries (read lock) =====
177
    // For system table: returns full snapshot copy of _active_tasks + _completed_tasks.
178
    std::vector<CompactionTaskInfo> get_all_tasks() const;
179
180
    // For HTTP API: iterates _completed_tasks only, returns filtered subset copy.
181
    std::vector<CompactionTaskInfo> get_completed_tasks(int64_t tablet_id = 0, int64_t top_n = 0,
182
                                                        const std::string& compaction_type = "",
183
                                                        int success_filter = -1) const;
184
185
    // Test only: clear all active and completed tasks.
186
    void clear_for_test();
187
188
private:
189
8
    CompactionTaskTracker() = default;
190
191
    void _apply_completion(CompactionTaskInfo& info, const CompletionStats& stats);
192
    void _trim_completed_locked();
193
194
    std::atomic<int64_t> _next_id {1};
195
196
    mutable std::shared_mutex _mutex;
197
198
    // Active tasks (PENDING + RUNNING), indexed by compaction_id.
199
    // Removed on complete/fail and moved to _completed_tasks.
200
    std::unordered_map<int64_t, CompactionTaskInfo> _active_tasks;
201
202
    // Completed tasks (FINISHED + FAILED), FIFO ring buffer.
203
    // Oldest records are evicted when exceeding compaction_task_tracker_max_records.
204
    std::deque<CompactionTaskInfo> _completed_tasks;
205
};
206
207
} // namespace doris