be/src/storage/compaction_task_tracker.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <atomic> |
21 | | #include <cstdint> |
22 | | #include <deque> |
23 | | #include <shared_mutex> |
24 | | #include <string> |
25 | | #include <unordered_map> |
26 | | #include <vector> |
27 | | |
28 | | namespace doris { |
29 | | |
// Which kind of compaction a task performs: base, cumulative or full.
// Numeric values are pinned explicitly; the enum is stored in one byte.
enum class CompactionProfileType : std::uint8_t {
    BASE = 0,       // base compaction
    CUMULATIVE = 1, // cumulative compaction
    FULL = 2,       // full compaction
};

// Printable name of `type` (declared here, defined out of line).
const char* to_string(CompactionProfileType type);
38 | | |
// Where a compaction task is in its lifecycle. PENDING and RUNNING tasks are
// "active"; FINISHED and FAILED tasks are "completed".
enum class CompactionTaskStatus : std::uint8_t {
    PENDING = 0,  // registered, execution not started yet
    RUNNING = 1,  // executing
    FINISHED = 2, // ended successfully
    FAILED = 3,   // ended with an error
};

// Printable name of `status` (declared here, defined out of line).
const char* to_string(CompactionTaskStatus status);
48 | | |
// What caused a compaction task to be scheduled.
enum class TriggerMethod : std::uint8_t {
    AUTO = 0,           // triggered automatically
    MANUAL = 1,         // triggered manually
    LOAD_TRIGGERED = 2, // triggered by a data load
};

// Printable name of `method` (declared here, defined out of line).
const char* to_string(TriggerMethod method);
57 | | |
// Extra fields reported (via update_to_running()) when a task leaves PENDING
// and starts RUNNING.
struct RunningStats {
    int64_t start_time_ms = 0; // execution start time, in milliseconds
    bool is_vertical = false;  // whether vertical merge is used
    int64_t permits = 0;       // compaction permits used
};
64 | | |
// Results collected when a task ends, either successfully (complete()) or
// with an error (fail()), and folded into that task's CompactionTaskInfo.
//
// The input_* fields are a backfill: local compaction computes them in
// build_basic_info(), which runs inside execute_compact_impl() -- i.e. only
// after register_task() has already recorded the task.
struct CompletionStats {
    // --- Input side (backfilled) ---
    std::string input_version_range;
    int64_t input_rowsets_count = 0;
    int64_t input_row_num = 0;
    int64_t input_data_size = 0;
    int64_t input_index_size = 0;
    int64_t input_total_size = 0;
    int64_t input_segments_num = 0;

    // --- End time ---
    int64_t end_time_ms = 0;

    // --- Merge / output side ---
    int64_t merged_rows = 0;
    int64_t filtered_rows = 0;
    int64_t output_rows = 0; // Merger statistics: _stats.output_rows
    int64_t output_row_num = 0;
    int64_t output_data_size = 0;
    int64_t output_index_size = 0; // _output_rowset->index_disk_size()
    int64_t output_total_size = 0; // _output_rowset->total_disk_size()
    int64_t output_segments_num = 0;
    std::string output_version;
    int64_t merge_latency_ms = 0; // _merge_rowsets_latency_timer, converted to ms

    // --- IO and memory ---
    int64_t bytes_read_from_local = 0;
    int64_t bytes_read_from_remote = 0;
    int64_t peak_memory_bytes = 0;
};
91 | | |
92 | | // Unified metadata describing a compaction task across its full lifecycle. |
93 | | struct CompactionTaskInfo { |
94 | | // ===== Identity ===== |
95 | | int64_t compaction_id {0}; // unique task ID assigned by Tracker |
96 | | int64_t backend_id {0}; // BE node ID |
97 | | int64_t table_id {0}; // table ID |
98 | | int64_t partition_id {0}; // partition ID |
99 | | int64_t tablet_id {0}; // tablet ID |
100 | | |
101 | | // ===== Task attributes ===== |
102 | | CompactionProfileType compaction_type {CompactionProfileType::BASE}; |
103 | | CompactionTaskStatus status {CompactionTaskStatus::PENDING}; |
104 | | TriggerMethod trigger_method {TriggerMethod::AUTO}; |
105 | | int64_t compaction_score {0}; // tablet compaction score at register time |
106 | | |
107 | | // ===== Timestamps ===== |
108 | | int64_t scheduled_time_ms {0}; // task registration time |
109 | | int64_t start_time_ms {0}; // task execution start time (0 while PENDING) |
110 | | int64_t end_time_ms {0}; // task end time (0 while not completed) |
111 | | |
112 | | // ===== Input statistics (available after prepare_compact) ===== |
113 | | int64_t input_rowsets_count {0}; |
114 | | int64_t input_row_num {0}; |
115 | | int64_t input_data_size {0}; // bytes, corresponds to _input_rowsets_data_size |
116 | | int64_t input_index_size {0}; // bytes, corresponds to _input_rowsets_index_size |
117 | | int64_t input_total_size {0}; // bytes, = data + index |
118 | | int64_t input_segments_num {0}; |
119 | | std::string input_version_range; // e.g. "[0-5]" |
120 | | |
121 | | // ===== Output statistics (written at complete/fail) ===== |
122 | | int64_t merged_rows {0}; |
123 | | int64_t filtered_rows {0}; |
124 | | int64_t output_rows {0}; // Merger output rows (_stats.output_rows; 0 for ordered path) |
125 | | int64_t output_row_num {0}; // from _output_rowset->num_rows() |
126 | | int64_t output_data_size {0}; // bytes, from _output_rowset->data_disk_size() |
127 | | int64_t output_index_size {0}; // bytes, from _output_rowset->index_disk_size() |
128 | | int64_t output_total_size {0}; // bytes, from _output_rowset->total_disk_size() |
129 | | int64_t output_segments_num {0}; |
130 | | std::string output_version; // e.g. "[0-5]" |
131 | | |
132 | | // ===== Merge performance ===== |
133 | | int64_t merge_latency_ms {0}; // merge rowsets latency (ms; 0 for ordered path) |
134 | | |
135 | | // ===== IO statistics (written at complete/fail) ===== |
136 | | int64_t bytes_read_from_local {0}; |
137 | | int64_t bytes_read_from_remote {0}; |
138 | | |
139 | | // ===== Resources ===== |
140 | | int64_t peak_memory_bytes {0}; // peak memory usage (bytes) |
141 | | bool is_vertical {false}; // whether vertical merge is used |
142 | | int64_t permits {0}; // compaction permits used |
143 | | |
144 | | // ===== Vertical compaction progress ===== |
145 | | int64_t vertical_total_groups {0}; // total column groups (0 for horizontal) |
146 | | int64_t vertical_completed_groups { |
147 | | 0}; // completed column groups (updated in real-time during RUNNING) |
148 | | |
149 | | // ===== Error ===== |
150 | | std::string status_msg; // failure message (empty on success) |
151 | | }; |
152 | | |
153 | | // Global singleton managing compaction task lifecycle. |
154 | | // Receives push reports from compaction entries and execution layer, |
155 | | // provides pull query interfaces for system table and HTTP API. |
156 | | class CompactionTaskTracker { |
157 | | public: |
158 | | static CompactionTaskTracker* instance(); |
159 | | |
160 | | // ID allocation: globally unique monotonically increasing, restarts from 1 after BE restart. |
161 | 528 | int64_t next_compaction_id() { return _next_id.fetch_add(1, std::memory_order_relaxed); } |
162 | | |
163 | | // ===== Push interfaces: lifecycle management (write lock) ===== |
164 | | // All push interfaces are no-op when enable_compaction_task_tracker=false. |
165 | | void register_task(CompactionTaskInfo info); |
166 | | void update_to_running(int64_t compaction_id, const RunningStats& stats); |
167 | | void update_progress(int64_t compaction_id, int64_t total_groups, int64_t completed_groups); |
168 | | void complete(int64_t compaction_id, const CompletionStats& stats); |
169 | | void fail(int64_t compaction_id, const CompletionStats& stats, const std::string& msg); |
170 | | void remove_task(int64_t compaction_id); |
171 | | |
172 | | // ===== Pull interfaces: queries (read lock) ===== |
173 | | // For system table: returns full snapshot copy of _active_tasks + _completed_tasks. |
174 | | std::vector<CompactionTaskInfo> get_all_tasks() const; |
175 | | |
176 | | // For HTTP API: iterates _completed_tasks only, returns filtered subset copy. |
177 | | std::vector<CompactionTaskInfo> get_completed_tasks(int64_t tablet_id = 0, int64_t top_n = 0, |
178 | | const std::string& compaction_type = "", |
179 | | int success_filter = -1) const; |
180 | | |
181 | | // Test only: clear all active and completed tasks. |
182 | | void clear_for_test(); |
183 | | |
184 | | private: |
185 | 1 | CompactionTaskTracker() = default; |
186 | | |
187 | | void _apply_completion(CompactionTaskInfo& info, const CompletionStats& stats); |
188 | | void _trim_completed_locked(); |
189 | | |
190 | | std::atomic<int64_t> _next_id {1}; |
191 | | |
192 | | mutable std::shared_mutex _mutex; |
193 | | |
194 | | // Active tasks (PENDING + RUNNING), indexed by compaction_id. |
195 | | // Removed on complete/fail and moved to _completed_tasks. |
196 | | std::unordered_map<int64_t, CompactionTaskInfo> _active_tasks; |
197 | | |
198 | | // Completed tasks (FINISHED + FAILED), FIFO ring buffer. |
199 | | // Oldest records are evicted when exceeding compaction_task_tracker_max_records. |
200 | | std::deque<CompactionTaskInfo> _completed_tasks; |
201 | | }; |
202 | | |
203 | | } // namespace doris |