Coverage Report

Created: 2026-04-07 21:36

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/compaction_task_tracker.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <atomic>
21
#include <cstdint>
22
#include <deque>
23
#include <shared_mutex>
24
#include <string>
25
#include <unordered_map>
26
#include <vector>
27
28
namespace doris {
29
30
// Standardized compaction type enum covering base / cumulative / full.
// Uses a one-byte underlying type (uint8_t); the numeric value of every
// enumerator is spelled out explicitly.
31
enum class CompactionProfileType : uint8_t {
32
    BASE = 0,
33
    CUMULATIVE = 1,
34
    FULL = 2,
35
};
36
// Returns the textual name of `type`. Only declared in this header; the
// definition is elsewhere.
// NOTE(review): the lifetime of the returned pointer and the result for an
// out-of-range value are not visible from here -- confirm in the definition.
37
const char* to_string(CompactionProfileType type);
38
39
// Task lifecycle status.
// Per the CompactionTaskTracker member comments, PENDING and RUNNING tasks are
// the "active" ones, while FINISHED and FAILED tasks are the "completed" ones.
// Uses a one-byte underlying type (uint8_t) with explicit numeric values.
40
enum class CompactionTaskStatus : uint8_t {
41
    PENDING = 0,
42
    RUNNING = 1,
43
    FINISHED = 2,
44
    FAILED = 3,
45
};
46
// Returns the textual name of `status`. Only declared in this header; the
// definition is elsewhere.
// NOTE(review): the lifetime of the returned pointer and the result for an
// out-of-range value are not visible from here -- confirm in the definition.
47
const char* to_string(CompactionTaskStatus status);
48
49
// How the compaction was triggered.
// Recorded in CompactionTaskInfo::trigger_method, whose default is AUTO.
// Uses a one-byte underlying type (uint8_t) with explicit numeric values.
50
enum class TriggerMethod : uint8_t {
51
    AUTO = 0,
52
    MANUAL = 1,
53
    LOAD_TRIGGERED = 2,
54
};
55
// Returns the textual name of `method`. Only declared in this header; the
// definition is elsewhere.
// NOTE(review): the lifetime of the returned pointer and the result for an
// out-of-range value are not visible from here -- confirm in the definition.
56
const char* to_string(TriggerMethod method);
57
58
// Incremental info when transitioning from PENDING to RUNNING.
// This is the `stats` argument of CompactionTaskTracker::update_to_running().
// Each field has a same-named counterpart in CompactionTaskInfo and is
// default-initialized to 0 / false.
59
struct RunningStats {
60
    int64_t start_time_ms {0}; // task execution start time, in milliseconds
61
    bool is_vertical {false};  // whether vertical merge is used
62
    int64_t permits {0};       // compaction permits used
63
};
64
65
// Result info collected when a task completes or fails.
// This is the `stats` argument of CompactionTaskTracker::complete() and
// CompactionTaskTracker::fail(), and of the private _apply_completion() helper.
// Every field is default-initialized (0 / empty string) and has a same-named
// counterpart in CompactionTaskInfo.
66
struct CompletionStats {
67
    // Input stats backfill: local compaction populates these in build_basic_info()
68
    // which runs inside execute_compact_impl(), after register_task().
69
    std::string input_version_range;
70
    int64_t input_rowsets_count {0};
71
    int64_t input_row_num {0};
72
    int64_t input_data_size {0};
73
    int64_t input_index_size {0};
74
    int64_t input_total_size {0};
75
    int64_t input_segments_num {0};
    // End-of-task timestamp, in milliseconds.
76
    int64_t end_time_ms {0};
77
    int64_t merged_rows {0};
78
    int64_t filtered_rows {0};
79
    int64_t output_rows {0}; // _stats.output_rows (Merger statistics)
    // Output rowset figures. The same-named CompactionTaskInfo fields are
    // documented as coming from _output_rowset (num_rows(), data_disk_size(), ...).
80
    int64_t output_row_num {0};
81
    int64_t output_data_size {0};
82
    int64_t output_index_size {0}; // _output_rowset->index_disk_size()
83
    int64_t output_total_size {0}; // _output_rowset->total_disk_size()
84
    int64_t output_segments_num {0};
85
    std::string output_version;
86
    int64_t merge_latency_ms {0}; // _merge_rowsets_latency_timer (converted to ms)
    // IO and memory figures gathered for the finished task.
87
    int64_t bytes_read_from_local {0};
88
    int64_t bytes_read_from_remote {0};
89
    int64_t peak_memory_bytes {0};
90
};
91
92
// Unified metadata describing a compaction task across its full lifecycle.
// This is the record type stored by CompactionTaskTracker (both in its active
// map and in its completed queue) and returned by its query interfaces.
// Every field has a default member initializer, so a default-constructed
// object has all numeric fields 0, all strings empty, type BASE, status
// PENDING and trigger AUTO.
// NOTE(review): start_time_ms / is_vertical / permits share names with
// RunningStats, and the input/output/IO/memory fields share names with
// CompletionStats; presumably the tracker copies them over in
// update_to_running() and complete()/fail() -- the implementation is not
// visible in this header.
93
struct CompactionTaskInfo {
94
    // ===== Identity =====
95
    int64_t compaction_id {0}; // unique task ID assigned by Tracker
96
    int64_t backend_id {0};    // BE node ID
97
    int64_t table_id {0};      // table ID
98
    int64_t partition_id {0};  // partition ID
99
    int64_t tablet_id {0};     // tablet ID
100
101
    // ===== Task attributes =====
102
    CompactionProfileType compaction_type {CompactionProfileType::BASE};
103
    CompactionTaskStatus status {CompactionTaskStatus::PENDING};
104
    TriggerMethod trigger_method {TriggerMethod::AUTO};
105
    int64_t compaction_score {0}; // tablet compaction score at register time
106
107
    // ===== Timestamps =====
    // All three are in milliseconds; a value of 0 means "not reached yet".
108
    int64_t scheduled_time_ms {0}; // task registration time
109
    int64_t start_time_ms {0};     // task execution start time (0 while PENDING)
110
    int64_t end_time_ms {0};       // task end time (0 while not completed)
111
112
    // ===== Input statistics (available after prepare_compact) =====
    // CompletionStats carries the same fields; its comment says local
    // compaction backfills them after register_task().
113
    int64_t input_rowsets_count {0};
114
    int64_t input_row_num {0};
115
    int64_t input_data_size {0};  // bytes, corresponds to _input_rowsets_data_size
116
    int64_t input_index_size {0}; // bytes, corresponds to _input_rowsets_index_size
117
    int64_t input_total_size {0}; // bytes, = data + index
118
    int64_t input_segments_num {0};
119
    std::string input_version_range; // e.g. "[0-5]"
120
121
    // ===== Output statistics (written at complete/fail) =====
122
    int64_t merged_rows {0};
123
    int64_t filtered_rows {0};
124
    int64_t output_rows {0};       // Merger output rows (_stats.output_rows; 0 for ordered path)
125
    int64_t output_row_num {0};    // from _output_rowset->num_rows()
126
    int64_t output_data_size {0};  // bytes, from _output_rowset->data_disk_size()
127
    int64_t output_index_size {0}; // bytes, from _output_rowset->index_disk_size()
128
    int64_t output_total_size {0}; // bytes, from _output_rowset->total_disk_size()
129
    int64_t output_segments_num {0};
130
    std::string output_version; // e.g. "[0-5]"
131
132
    // ===== Merge performance =====
133
    int64_t merge_latency_ms {0}; // merge rowsets latency (ms; 0 for ordered path)
134
135
    // ===== IO statistics (written at complete/fail) =====
136
    int64_t bytes_read_from_local {0};
137
    int64_t bytes_read_from_remote {0};
138
139
    // ===== Resources =====
140
    int64_t peak_memory_bytes {0}; // peak memory usage (bytes)
141
    bool is_vertical {false};      // whether vertical merge is used
142
    int64_t permits {0};           // compaction permits used
143
144
    // ===== Vertical compaction progress =====
    // NOTE(review): presumably fed by CompactionTaskTracker::update_progress(),
    // whose parameters are named total_groups / completed_groups -- confirm.
145
    int64_t vertical_total_groups {0}; // total column groups (0 for horizontal)
146
    int64_t vertical_completed_groups {
147
            0}; // completed column groups (updated in real-time during RUNNING)
148
149
    // ===== Error =====
150
    std::string status_msg; // failure message (empty on success)
151
};
152
153
// Global singleton managing compaction task lifecycle.
154
// Receives push reports from compaction entries and execution layer,
155
// provides pull query interfaces for system table and HTTP API.
// The constructor is private, so outside code can only reach the tracker
// through instance(). Task records live in two containers: an unordered_map of
// active tasks keyed by compaction_id and a deque of completed tasks.
156
class CompactionTaskTracker {
157
public:
    // Accessor for the tracker object. Only declared here.
    // NOTE(review): how and when the instance is created, and whether the
    // returned pointer can ever be null, is not visible in this header.
158
    static CompactionTaskTracker* instance();
159
160
    // ID allocation: globally unique monotonically increasing, restarts from 1 after BE restart.
    // fetch_add returns the value held *before* the increment and _next_id is
    // initialized to 1, so the first ID handed out is 1. The relaxed memory
    // order makes the increment itself atomic but does not order it against
    // any other memory access; it does not take _mutex.
161
528
    int64_t next_compaction_id() { return _next_id.fetch_add(1, std::memory_order_relaxed); }
162
163
    // ===== Push interfaces: lifecycle management (write lock) =====
164
    // All push interfaces are no-op when enable_compaction_task_tracker=false.
    // Every function below except register_task identifies its task by the
    // compaction_id obtained from next_compaction_id().
    // NOTE(review): behavior for an unknown compaction_id is not visible here.
    //
    // register_task: hands a task record to the tracker; `info` is taken by value.
165
    void register_task(CompactionTaskInfo info);
    // update_to_running: reports the PENDING -> RUNNING transition with its
    // start time, vertical flag and permits.
166
    void update_to_running(int64_t compaction_id, const RunningStats& stats);
    // update_progress: reports vertical-compaction progress as
    // (total column groups, completed column groups).
167
    void update_progress(int64_t compaction_id, int64_t total_groups, int64_t completed_groups);
    // complete: reports that the task finished, with its result statistics.
168
    void complete(int64_t compaction_id, const CompletionStats& stats);
    // fail: reports that the task failed, with its result statistics and a
    // failure message.
169
    void fail(int64_t compaction_id, const CompletionStats& stats, const std::string& msg);
    // remove_task: NOTE(review): presumably drops the task without recording
    // it as completed -- confirm against the implementation.
170
    void remove_task(int64_t compaction_id);
171
172
    // ===== Pull interfaces: queries (read lock) =====
173
    // For system table: returns full snapshot copy of _active_tasks + _completed_tasks.
174
    std::vector<CompactionTaskInfo> get_all_tasks() const;
175
176
    // For HTTP API: iterates _completed_tasks only, returns filtered subset copy.
    // NOTE(review): the defaults (tablet_id = 0, top_n = 0, compaction_type = "",
    // success_filter = -1) presumably each mean "do not filter / no limit", and
    // success_filter is an int rather than a bool so it can carry that third
    // state -- the exact encoding is not visible here; confirm in the
    // implementation.
177
    std::vector<CompactionTaskInfo> get_completed_tasks(int64_t tablet_id = 0, int64_t top_n = 0,
178
                                                        const std::string& compaction_type = "",
179
                                                        int success_filter = -1) const;
180
181
    // Test only: clear all active and completed tasks.
182
    void clear_for_test();
183
184
private:
    // Private so that the tracker cannot be constructed by outside code.
185
1
    CompactionTaskTracker() = default;
186
    // Helper taking a task record and completion statistics.
    // NOTE(review): presumably copies `stats` into `info` for both complete()
    // and fail() -- confirm against the implementation.
187
    void _apply_completion(CompactionTaskInfo& info, const CompletionStats& stats);
    // NOTE(review): the `_locked` suffix suggests the caller must already hold
    // _mutex, and the name suggests it enforces the size cap described on
    // _completed_tasks -- confirm against the implementation.
188
    void _trim_completed_locked();
189
    // Source of compaction IDs; starts at 1 and is only touched atomically.
190
    std::atomic<int64_t> _next_id {1};
191
    // Reader/writer lock used by the push (write lock) and pull (read lock)
    // interfaces; `mutable` so the const query methods can acquire it.
192
    mutable std::shared_mutex _mutex;
193
194
    // Active tasks (PENDING + RUNNING), indexed by compaction_id.
195
    // Removed on complete/fail and moved to _completed_tasks.
196
    std::unordered_map<int64_t, CompactionTaskInfo> _active_tasks;
197
198
    // Completed tasks (FINISHED + FAILED), kept in FIFO order in a std::deque.
199
    // Oldest records are evicted when exceeding compaction_task_tracker_max_records.
200
    std::deque<CompactionTaskInfo> _completed_tasks;
201
};
202
203
} // namespace doris