be/src/storage/compaction_task_tracker.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <atomic> |
21 | | #include <cstdint> |
22 | | #include <deque> |
23 | | #include <shared_mutex> |
24 | | #include <string> |
25 | | #include <unordered_map> |
26 | | #include <vector> |
27 | | |
28 | | namespace doris { |
29 | | |
30 | | // Standardized compaction type enum covering base / cumulative / full / binlog. |
31 | | enum class CompactionProfileType : uint8_t { |
32 | | BASE = 0, |
33 | | CUMULATIVE = 1, |
34 | | FULL = 2, |
35 | | BINLOG = 3, |
36 | | }; |
37 | | |
38 | | const char* to_string(CompactionProfileType type); |
39 | | |
40 | | // Task lifecycle status. PENDING and RUNNING tasks are active; FINISHED and FAILED tasks are completed. |
41 | | enum class CompactionTaskStatus : uint8_t { |
42 | | PENDING = 0, |
43 | | RUNNING = 1, |
44 | | FINISHED = 2, |
45 | | FAILED = 3, |
46 | | }; |
47 | | |
48 | | const char* to_string(CompactionTaskStatus status); |
49 | | |
50 | | // How the compaction was triggered (recorded in CompactionTaskInfo::trigger_method). |
51 | | enum class TriggerMethod : uint8_t { |
52 | | AUTO = 0, |
53 | | MANUAL = 1, |
54 | | LOAD_TRIGGERED = 2, |
55 | | }; |
56 | | |
57 | | const char* to_string(TriggerMethod method); |
58 | | |
59 | | // Incremental info supplied when a task transitions from PENDING to RUNNING (argument of CompactionTaskTracker::update_to_running()). |
60 | | struct RunningStats { |
61 | | int64_t start_time_ms {0}; |
62 | | bool is_vertical {false}; |
63 | | int64_t permits {0}; |
64 | | }; |
65 | | |
66 | | // Result info collected when a task completes or fails (argument of CompactionTaskTracker::complete() and fail()). |
67 | | struct CompletionStats { |
68 | | // Input stats backfill: local compaction populates these in build_basic_info(), |
69 | | // which runs inside execute_compact_impl(), after register_task(), so they are supplied again here. |
70 | | std::string input_version_range; |
71 | | int64_t input_rowsets_count {0}; |
72 | | int64_t input_row_num {0}; |
73 | | int64_t input_data_size {0}; |
74 | | int64_t input_index_size {0}; |
75 | | int64_t input_total_size {0}; |
76 | | int64_t input_segments_num {0}; |
77 | | int64_t end_time_ms {0}; |
78 | | int64_t merged_rows {0}; |
79 | | int64_t filtered_rows {0}; |
80 | | int64_t output_rows {0}; // _stats.output_rows (Merger statistics; 0 for ordered path) |
81 | | int64_t output_row_num {0}; |
82 | | int64_t output_data_size {0}; |
83 | | int64_t output_index_size {0}; // bytes, from _output_rowset->index_disk_size() |
84 | | int64_t output_total_size {0}; // bytes, from _output_rowset->total_disk_size() |
85 | | int64_t output_segments_num {0}; |
86 | | std::string output_version; |
87 | | bool is_ordered_data_compaction {false}; |
88 | | int64_t merge_latency_ms {0}; // _merge_rowsets_latency_timer (converted to ms; 0 for ordered path) |
89 | | int64_t bytes_read_from_local {0}; |
90 | | int64_t bytes_read_from_remote {0}; |
91 | | int64_t peak_memory_bytes {0}; |
92 | | }; |
93 | | |
94 | | // Unified metadata describing a compaction task across its full lifecycle; the record type stored and returned by CompactionTaskTracker. |
95 | | struct CompactionTaskInfo { |
96 | | // ===== Identity ===== |
97 | | int64_t compaction_id {0}; // unique task ID assigned by Tracker (see CompactionTaskTracker::next_compaction_id()) |
98 | | int64_t backend_id {0}; // BE node ID |
99 | | int64_t table_id {0}; // table ID |
100 | | int64_t partition_id {0}; // partition ID |
101 | | int64_t tablet_id {0}; // tablet ID |
102 | | |
103 | | // ===== Task attributes ===== |
104 | | CompactionProfileType compaction_type {CompactionProfileType::BASE}; |
105 | | CompactionTaskStatus status {CompactionTaskStatus::PENDING}; |
106 | | TriggerMethod trigger_method {TriggerMethod::AUTO}; |
107 | | int64_t compaction_score {0}; // tablet compaction score at register time |
108 | | |
109 | | // ===== Timestamps ===== |
110 | | int64_t scheduled_time_ms {0}; // task registration time |
111 | | int64_t start_time_ms {0}; // task execution start time (0 while PENDING) |
112 | | int64_t end_time_ms {0}; // task end time (0 while not completed) |
113 | | |
114 | | // ===== Input statistics (available after prepare_compact; see the backfill note on CompletionStats) ===== |
115 | | int64_t input_rowsets_count {0}; |
116 | | int64_t input_row_num {0}; |
117 | | int64_t input_data_size {0}; // bytes, corresponds to _input_rowsets_data_size |
118 | | int64_t input_index_size {0}; // bytes, corresponds to _input_rowsets_index_size |
119 | | int64_t input_total_size {0}; // bytes, = data + index |
120 | | int64_t input_segments_num {0}; |
121 | | std::string input_version_range; // e.g. "[0-5]" |
122 | | |
123 | | // ===== Output statistics (written at complete/fail) ===== |
124 | | int64_t merged_rows {0}; |
125 | | int64_t filtered_rows {0}; |
126 | | int64_t output_rows {0}; // Merger output rows (_stats.output_rows; 0 for ordered path) |
127 | | int64_t output_row_num {0}; // from _output_rowset->num_rows() |
128 | | int64_t output_data_size {0}; // bytes, from _output_rowset->data_disk_size() |
129 | | int64_t output_index_size {0}; // bytes, from _output_rowset->index_disk_size() |
130 | | int64_t output_total_size {0}; // bytes, from _output_rowset->total_disk_size() |
131 | | int64_t output_segments_num {0}; |
132 | | std::string output_version; // e.g. "[0-5]" |
133 | | bool is_ordered_data_compaction { |
134 | | false}; // true for the ordered/meta-link compaction path, including the binlog quick path |
135 | | |
136 | | // ===== Merge performance ===== |
137 | | int64_t merge_latency_ms {0}; // merge rowsets latency (ms; 0 for ordered path) |
138 | | |
139 | | // ===== IO statistics (written at complete/fail) ===== |
140 | | int64_t bytes_read_from_local {0}; |
141 | | int64_t bytes_read_from_remote {0}; |
142 | | |
143 | | // ===== Resources ===== |
144 | | int64_t peak_memory_bytes {0}; // peak memory usage (bytes) |
145 | | bool is_vertical {false}; // whether vertical merge is used |
146 | | int64_t permits {0}; // compaction permits used |
147 | | |
148 | | // ===== Vertical compaction progress ===== |
149 | | int64_t vertical_total_groups {0}; // total column groups (0 for horizontal) |
150 | | int64_t vertical_completed_groups { |
151 | | 0}; // completed column groups (updated in real-time during RUNNING) |
152 | | |
153 | | // ===== Error ===== |
154 | | std::string status_msg; // failure message (empty on success) |
155 | | }; |
156 | | |
157 | | // Global singleton managing compaction task lifecycle. |
158 | | // Receives push reports from compaction entries and the execution layer, and |
159 | | // provides pull query interfaces for the system table and the HTTP API. |
160 | | class CompactionTaskTracker { |
161 | | public: |
162 | | static CompactionTaskTracker* instance(); |
163 | | |
164 | | // ID allocation: monotonically increasing and unique within one BE process lifetime; restarts from 1 after BE restart. |
165 | 343k | int64_t next_compaction_id() { return _next_id.fetch_add(1, std::memory_order_relaxed); } |
166 | | |
167 | | // ===== Push interfaces: lifecycle management (write lock) ===== |
168 | | // All push interfaces are no-op when enable_compaction_task_tracker=false. |
169 | | void register_task(CompactionTaskInfo info); |
170 | | void update_to_running(int64_t compaction_id, const RunningStats& stats); |
171 | | void update_progress(int64_t compaction_id, int64_t total_groups, int64_t completed_groups); |
172 | | void complete(int64_t compaction_id, const CompletionStats& stats); |
173 | | void fail(int64_t compaction_id, const CompletionStats& stats, const std::string& msg); |
174 | | void remove_task(int64_t compaction_id); |
175 | | |
176 | | // ===== Pull interfaces: queries (read lock) ===== |
177 | | // For system table: returns a full snapshot copy of _active_tasks + _completed_tasks. |
178 | | std::vector<CompactionTaskInfo> get_all_tasks() const; |
179 | | |
180 | | // For HTTP API: iterates _completed_tasks only, returns a filtered subset copy. NOTE(review): the default arguments presumably mean no filtering; confirm in the .cpp. |
181 | | std::vector<CompactionTaskInfo> get_completed_tasks(int64_t tablet_id = 0, int64_t top_n = 0, |
182 | | const std::string& compaction_type = "", |
183 | | int success_filter = -1) const; |
184 | | |
185 | | // Test only: clear all active and completed tasks. |
186 | | void clear_for_test(); |
187 | | |
188 | | private: |
189 | 8 | CompactionTaskTracker() = default; |
190 | | |
191 | | void _apply_completion(CompactionTaskInfo& info, const CompletionStats& stats); |
192 | | void _trim_completed_locked(); |
193 | | |
194 | | std::atomic<int64_t> _next_id {1}; |
195 | | |
196 | | mutable std::shared_mutex _mutex; |
197 | | |
198 | | // Active tasks (PENDING + RUNNING), indexed by compaction_id. |
199 | | // Removed on complete/fail and moved to _completed_tasks. |
200 | | std::unordered_map<int64_t, CompactionTaskInfo> _active_tasks; |
201 | | |
202 | | // Completed tasks (FINISHED + FAILED), kept as a FIFO queue (std::deque). |
203 | | // Oldest records are evicted when exceeding compaction_task_tracker_max_records. |
204 | | std::deque<CompactionTaskInfo> _completed_tasks; |
205 | | }; |
206 | | |
207 | | } // namespace doris |