be/src/storage/olap_server.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include <gen_cpp/Types_types.h> |
19 | | #include <gen_cpp/olap_file.pb.h> |
20 | | #include <glog/logging.h> |
21 | | #include <rapidjson/prettywriter.h> |
22 | | #include <rapidjson/stringbuffer.h> |
23 | | #include <stdint.h> |
24 | | #include <sys/types.h> |
25 | | |
26 | | #include <algorithm> |
27 | | #include <atomic> |
28 | | // IWYU pragma: no_include <bits/chrono.h> |
29 | | #include <gen_cpp/FrontendService.h> |
30 | | #include <gen_cpp/internal_service.pb.h> |
31 | | |
32 | | #include <chrono> // IWYU pragma: keep |
33 | | #include <cmath> |
34 | | #include <condition_variable> |
35 | | #include <cstdint> |
36 | | #include <ctime> |
37 | | #include <functional> |
38 | | #include <map> |
39 | | #include <memory> |
40 | | #include <mutex> |
41 | | #include <ostream> |
42 | | #include <random> |
43 | | #include <shared_mutex> |
44 | | #include <string> |
45 | | #include <thread> |
46 | | #include <unordered_set> |
47 | | #include <utility> |
48 | | #include <vector> |
49 | | |
50 | | #include "agent/utils.h" |
51 | | #include "common/config.h" |
52 | | #include "common/logging.h" |
53 | | #include "common/metrics/doris_metrics.h" |
54 | | #include "common/metrics/metrics.h" |
55 | | #include "common/status.h" |
56 | | #include "cpp/sync_point.h" |
57 | | #include "io/fs/file_writer.h" // IWYU pragma: keep |
58 | | #include "io/fs/path.h" |
59 | | #include "load/memtable/memtable_flush_executor.h" |
60 | | #include "runtime/memory/cache_manager.h" |
61 | | #include "runtime/memory/global_memory_arbitrator.h" |
62 | | #include "storage/compaction/cold_data_compaction.h" |
63 | | #include "storage/compaction/compaction_permit_limiter.h" |
64 | | #include "storage/compaction/cumulative_compaction.h" |
65 | | #include "storage/compaction/cumulative_compaction_policy.h" |
66 | | #include "storage/compaction/cumulative_compaction_time_series_policy.h" |
67 | | #include "storage/compaction/single_replica_compaction.h" |
68 | | #include "storage/compaction_task_tracker.h" |
69 | | #include "storage/data_dir.h" |
70 | | #include "storage/olap_common.h" |
71 | | #include "storage/olap_define.h" |
72 | | #include "storage/rowset/segcompaction.h" |
73 | | #include "storage/schema_change/schema_change.h" |
74 | | #include "storage/storage_engine.h" |
75 | | #include "storage/storage_policy.h" |
76 | | #include "storage/tablet/base_tablet.h" |
77 | | #include "storage/tablet/tablet.h" |
78 | | #include "storage/tablet/tablet_manager.h" |
79 | | #include "storage/tablet/tablet_meta.h" |
80 | | #include "storage/tablet/tablet_meta_manager.h" |
81 | | #include "storage/tablet/tablet_schema.h" |
82 | | #include "storage/task/engine_publish_version_task.h" |
83 | | #include "storage/task/index_builder.h" |
84 | | #include "util/client_cache.h" |
85 | | #include "util/countdown_latch.h" |
86 | | #include "util/debug_points.h" |
87 | | #include "util/mem_info.h" |
88 | | #include "util/thread.h" |
89 | | #include "util/threadpool.h" |
90 | | #include "util/thrift_rpc_helper.h" |
91 | | #include "util/time.h" |
92 | | #include "util/uid_util.h" |
93 | | #include "util/work_thread_pool.hpp" |
94 | | |
95 | | using std::string; |
96 | | |
97 | | namespace doris { |
98 | | #include "common/compile_check_begin.h" |
99 | | using io::Path; |
100 | | |
// Number of SCHEMA-CHANGE threads that are currently running.
// NOTE(review): `volatile` gives neither atomicity nor inter-thread ordering;
// if this counter is updated from several threads, std::atomic<uint32_t> would
// be the safer type -- confirm against the code that reads and writes it.
volatile uint32_t g_schema_change_active_threads = 0;
// bvar status values published under the names "cumu_compaction_task_num_per_round"
// and "base_compaction_task_num_per_round"; both start at 0.
bvar::Status<int64_t> g_cumu_compaction_task_num_per_round("cumu_compaction_task_num_per_round", 0);
bvar::Status<int64_t> g_base_compaction_task_num_per_round("base_compaction_task_num_per_round", 0);

// File-local numeric constants; their users are not visible in this part of the file.
// NOTE(review): the names suggest a seed and a prime modulus for hashing or
// pseudo-random selection -- confirm at the use sites.
static const uint64_t DEFAULT_SEED = 104729;
static const uint64_t MOD_PRIME = 7652413;
108 | | |
109 | 0 | CompactionSubmitRegistry::CompactionSubmitRegistry(CompactionSubmitRegistry&& r) { |
110 | 0 | std::swap(_tablet_submitted_cumu_compaction, r._tablet_submitted_cumu_compaction); |
111 | 0 | std::swap(_tablet_submitted_base_compaction, r._tablet_submitted_base_compaction); |
112 | 0 | std::swap(_tablet_submitted_full_compaction, r._tablet_submitted_full_compaction); |
113 | 0 | } |
114 | | |
115 | 8.91k | CompactionSubmitRegistry CompactionSubmitRegistry::create_snapshot() { |
116 | | // full compaction is not engaged in this method |
117 | 8.91k | std::unique_lock<std::mutex> l(_tablet_submitted_compaction_mutex); |
118 | 8.91k | CompactionSubmitRegistry registry; |
119 | 8.91k | registry._tablet_submitted_base_compaction = _tablet_submitted_base_compaction; |
120 | 8.91k | registry._tablet_submitted_cumu_compaction = _tablet_submitted_cumu_compaction; |
121 | 8.91k | return registry; |
122 | 8.91k | } |
123 | | |
124 | 13 | void CompactionSubmitRegistry::reset(const std::vector<DataDir*>& stores) { |
125 | | // full compaction is not engaged in this method |
126 | 13 | for (const auto& store : stores) { |
127 | 10 | _tablet_submitted_cumu_compaction[store] = {}; |
128 | 10 | _tablet_submitted_base_compaction[store] = {}; |
129 | 10 | } |
130 | 13 | } |
131 | | |
132 | | uint32_t CompactionSubmitRegistry::count_executing_compaction(DataDir* dir, |
133 | 20.9k | CompactionType compaction_type) { |
134 | | // non-lock, used in snapshot |
135 | 20.9k | const auto& compaction_tasks = _get_tablet_set(dir, compaction_type); |
136 | 20.9k | return cast_set<uint32_t>(std::count_if( |
137 | 20.9k | compaction_tasks.begin(), compaction_tasks.end(), |
138 | 20.9k | [](const auto& task) { return task->compaction_stage == CompactionStage::EXECUTING; })); |
139 | 20.9k | } |
140 | | |
141 | 10.4k | uint32_t CompactionSubmitRegistry::count_executing_cumu_and_base(DataDir* dir) { |
142 | | // non-lock, used in snapshot |
143 | 10.4k | return count_executing_compaction(dir, CompactionType::BASE_COMPACTION) + |
144 | 10.4k | count_executing_compaction(dir, CompactionType::CUMULATIVE_COMPACTION); |
145 | 10.4k | } |
146 | | |
147 | 20.9k | bool CompactionSubmitRegistry::has_compaction_task(DataDir* dir, CompactionType compaction_type) { |
148 | | // non-lock, used in snapshot |
149 | 20.9k | return !_get_tablet_set(dir, compaction_type).empty(); |
150 | 20.9k | } |
151 | | |
152 | | std::vector<TabletSharedPtr> CompactionSubmitRegistry::pick_topn_tablets_for_compaction( |
153 | | TabletManager* tablet_mgr, DataDir* data_dir, CompactionType compaction_type, |
154 | 10.4k | const CumuCompactionPolicyTable& cumu_compaction_policies, uint32_t* disk_max_score) { |
155 | | // non-lock, used in snapshot |
156 | 10.4k | return tablet_mgr->find_best_tablets_to_compaction(compaction_type, data_dir, |
157 | 10.4k | _get_tablet_set(data_dir, compaction_type), |
158 | 10.4k | disk_max_score, cumu_compaction_policies); |
159 | 10.4k | } |
160 | | |
161 | 196k | bool CompactionSubmitRegistry::insert(TabletSharedPtr tablet, CompactionType compaction_type) { |
162 | 196k | std::unique_lock<std::mutex> l(_tablet_submitted_compaction_mutex); |
163 | 196k | auto& tablet_set = _get_tablet_set(tablet->data_dir(), compaction_type); |
164 | 196k | bool already_exist = !(tablet_set.insert(tablet).second); |
165 | 196k | return already_exist; |
166 | 196k | } |
167 | | |
168 | | void CompactionSubmitRegistry::remove(TabletSharedPtr tablet, CompactionType compaction_type, |
169 | 195k | std::function<void()> wakeup_cb) { |
170 | 195k | std::unique_lock<std::mutex> l(_tablet_submitted_compaction_mutex); |
171 | 195k | auto& tablet_set = _get_tablet_set(tablet->data_dir(), compaction_type); |
172 | 195k | size_t removed = tablet_set.erase(tablet); |
173 | 195k | if (removed == 1) { |
174 | 195k | wakeup_cb(); |
175 | 195k | } |
176 | 195k | } |
177 | | |
178 | | CompactionSubmitRegistry::TabletSet& CompactionSubmitRegistry::_get_tablet_set( |
179 | 444k | DataDir* dir, CompactionType compaction_type) { |
180 | 444k | switch (compaction_type) { |
181 | 97.7k | case CompactionType::BASE_COMPACTION: |
182 | 97.7k | return _tablet_submitted_base_compaction[dir]; |
183 | 346k | case CompactionType::CUMULATIVE_COMPACTION: |
184 | 346k | return _tablet_submitted_cumu_compaction[dir]; |
185 | 0 | case CompactionType::FULL_COMPACTION: |
186 | 0 | return _tablet_submitted_full_compaction[dir]; |
187 | 0 | default: |
188 | 0 | CHECK(false) << "invalid compaction type"; |
189 | 444k | } |
190 | 444k | } |
191 | | |
192 | 8.92k | static int32_t get_cumu_compaction_threads_num(size_t data_dirs_num) { |
193 | 8.92k | int32_t threads_num = config::max_cumu_compaction_threads; |
194 | 8.92k | if (threads_num == -1) { |
195 | 8.92k | int32_t num_cores = doris::CpuInfo::num_cores(); |
196 | 8.92k | threads_num = std::max(cast_set<int32_t>(data_dirs_num), num_cores / 6); |
197 | 8.92k | } |
198 | 8.92k | threads_num = threads_num <= 0 ? 1 : threads_num; |
199 | 8.92k | return threads_num; |
200 | 8.92k | } |
201 | | |
202 | 8.92k | static int32_t get_base_compaction_threads_num(size_t data_dirs_num) { |
203 | 8.92k | int32_t threads_num = config::max_base_compaction_threads; |
204 | 8.92k | if (threads_num == -1) { |
205 | 0 | threads_num = cast_set<int32_t>(data_dirs_num); |
206 | 0 | } |
207 | 8.92k | threads_num = threads_num <= 0 ? 1 : threads_num; |
208 | 8.92k | return threads_num; |
209 | 8.92k | } |
210 | | |
211 | 8.92k | static int32_t get_single_replica_compaction_threads_num(size_t data_dirs_num) { |
212 | 8.92k | int32_t threads_num = config::max_single_replica_compaction_threads; |
213 | 8.92k | if (threads_num == -1) { |
214 | 8.92k | threads_num = cast_set<int32_t>(data_dirs_num); |
215 | 8.92k | } |
216 | 8.92k | threads_num = threads_num <= 0 ? 1 : threads_num; |
217 | 8.92k | return threads_num; |
218 | 8.92k | } |
219 | | |
220 | 6 | Status StorageEngine::start_bg_threads(std::shared_ptr<WorkloadGroup> wg_sptr) { |
221 | 6 | RETURN_IF_ERROR(Thread::create( |
222 | 6 | "StorageEngine", "unused_rowset_monitor_thread", |
223 | 6 | [this]() { this->_unused_rowset_monitor_thread_callback(); }, |
224 | 6 | &_unused_rowset_monitor_thread)); |
225 | 6 | LOG(INFO) << "unused rowset monitor thread started"; |
226 | | |
227 | 6 | RETURN_IF_ERROR(Thread::create( |
228 | 6 | "StorageEngine", "evict_querying_rowset_thread", |
229 | 6 | [this]() { this->_evict_quring_rowset_thread_callback(); }, |
230 | 6 | &_evict_quering_rowset_thread)); |
231 | 6 | LOG(INFO) << "evict quering thread started"; |
232 | | |
233 | | // start thread for monitoring the snapshot and trash folder |
234 | 6 | RETURN_IF_ERROR(Thread::create( |
235 | 6 | "StorageEngine", "garbage_sweeper_thread", |
236 | 6 | [this]() { this->_garbage_sweeper_thread_callback(); }, &_garbage_sweeper_thread)); |
237 | 6 | LOG(INFO) << "garbage sweeper thread started"; |
238 | | |
239 | | // start thread for monitoring the tablet with io error |
240 | 6 | RETURN_IF_ERROR(Thread::create( |
241 | 6 | "StorageEngine", "disk_stat_monitor_thread", |
242 | 6 | [this]() { this->_disk_stat_monitor_thread_callback(); }, &_disk_stat_monitor_thread)); |
243 | 6 | LOG(INFO) << "disk stat monitor thread started"; |
244 | | |
245 | | // convert store map to vector |
246 | 6 | std::vector<DataDir*> data_dirs = get_stores(); |
247 | | |
248 | 6 | auto base_compaction_threads = get_base_compaction_threads_num(data_dirs.size()); |
249 | 6 | auto cumu_compaction_threads = get_cumu_compaction_threads_num(data_dirs.size()); |
250 | 6 | auto single_replica_compaction_threads = |
251 | 6 | get_single_replica_compaction_threads_num(data_dirs.size()); |
252 | | |
253 | 6 | RETURN_IF_ERROR(ThreadPoolBuilder("BaseCompactionTaskThreadPool") |
254 | 6 | .set_min_threads(base_compaction_threads) |
255 | 6 | .set_max_threads(base_compaction_threads) |
256 | 6 | .build(&_base_compaction_thread_pool)); |
257 | 6 | RETURN_IF_ERROR(ThreadPoolBuilder("CumuCompactionTaskThreadPool") |
258 | 6 | .set_min_threads(cumu_compaction_threads) |
259 | 6 | .set_max_threads(cumu_compaction_threads) |
260 | 6 | .build(&_cumu_compaction_thread_pool)); |
261 | 6 | RETURN_IF_ERROR(ThreadPoolBuilder("SingleReplicaCompactionTaskThreadPool") |
262 | 6 | .set_min_threads(single_replica_compaction_threads) |
263 | 6 | .set_max_threads(single_replica_compaction_threads) |
264 | 6 | .build(&_single_replica_compaction_thread_pool)); |
265 | | |
266 | 6 | if (config::enable_segcompaction) { |
267 | 6 | RETURN_IF_ERROR(ThreadPoolBuilder("SegCompactionTaskThreadPool") |
268 | 6 | .set_min_threads(config::segcompaction_num_threads) |
269 | 6 | .set_max_threads(config::segcompaction_num_threads) |
270 | 6 | .build(&_seg_compaction_thread_pool)); |
271 | 6 | } |
272 | 6 | RETURN_IF_ERROR(ThreadPoolBuilder("ColdDataCompactionTaskThreadPool") |
273 | 6 | .set_min_threads(config::cold_data_compaction_thread_num) |
274 | 6 | .set_max_threads(config::cold_data_compaction_thread_num) |
275 | 6 | .build(&_cold_data_compaction_thread_pool)); |
276 | | |
277 | | // compaction tasks producer thread |
278 | 6 | RETURN_IF_ERROR(Thread::create( |
279 | 6 | "StorageEngine", "compaction_tasks_producer_thread", |
280 | 6 | [this]() { this->_compaction_tasks_producer_callback(); }, |
281 | 6 | &_compaction_tasks_producer_thread)); |
282 | 6 | LOG(INFO) << "compaction tasks producer thread started"; |
283 | | |
284 | 6 | RETURN_IF_ERROR(Thread::create( |
285 | 6 | "StorageEngine", "_update_replica_infos_thread", |
286 | 6 | [this]() { this->_update_replica_infos_callback(); }, &_update_replica_infos_thread)); |
287 | 6 | LOG(INFO) << "tablet replicas info update thread started"; |
288 | | |
289 | 6 | int32_t max_checkpoint_thread_num = config::max_meta_checkpoint_threads; |
290 | 6 | if (max_checkpoint_thread_num < 0) { |
291 | 6 | max_checkpoint_thread_num = cast_set<int32_t>(data_dirs.size()); |
292 | 6 | } |
293 | 6 | RETURN_IF_ERROR(ThreadPoolBuilder("TabletMetaCheckpointTaskThreadPool") |
294 | 6 | .set_max_threads(max_checkpoint_thread_num) |
295 | 6 | .build(&_tablet_meta_checkpoint_thread_pool)); |
296 | | |
297 | 6 | RETURN_IF_ERROR(Thread::create( |
298 | 6 | "StorageEngine", "tablet_checkpoint_tasks_producer_thread", |
299 | 6 | [this, data_dirs]() { this->_tablet_checkpoint_callback(data_dirs); }, |
300 | 6 | &_tablet_checkpoint_tasks_producer_thread)); |
301 | 6 | LOG(INFO) << "tablet checkpoint tasks producer thread started"; |
302 | | |
303 | 6 | RETURN_IF_ERROR(Thread::create( |
304 | 6 | "StorageEngine", "tablet_path_check_thread", |
305 | 6 | [this]() { this->_tablet_path_check_callback(); }, &_tablet_path_check_thread)); |
306 | 6 | LOG(INFO) << "tablet path check thread started"; |
307 | | |
308 | | // path scan and gc thread |
309 | 6 | if (config::path_gc_check) { |
310 | 10 | for (auto data_dir : get_stores()) { |
311 | 10 | std::shared_ptr<Thread> path_gc_thread; |
312 | 10 | RETURN_IF_ERROR(Thread::create( |
313 | 10 | "StorageEngine", "path_gc_thread", |
314 | 10 | [this, data_dir]() { this->_path_gc_thread_callback(data_dir); }, |
315 | 10 | &path_gc_thread)); |
316 | 10 | _path_gc_threads.emplace_back(path_gc_thread); |
317 | 10 | } |
318 | 6 | LOG(INFO) << "path gc threads started. number:" << get_stores().size(); |
319 | 6 | } |
320 | | |
321 | 6 | RETURN_IF_ERROR(ThreadPoolBuilder("CooldownTaskThreadPool") |
322 | 6 | .set_min_threads(config::cooldown_thread_num) |
323 | 6 | .set_max_threads(config::cooldown_thread_num) |
324 | 6 | .build(&_cooldown_thread_pool)); |
325 | 6 | LOG(INFO) << "cooldown thread pool started"; |
326 | | |
327 | 6 | RETURN_IF_ERROR(Thread::create( |
328 | 6 | "StorageEngine", "cooldown_tasks_producer_thread", |
329 | 6 | [this]() { this->_cooldown_tasks_producer_callback(); }, |
330 | 6 | &_cooldown_tasks_producer_thread)); |
331 | 6 | LOG(INFO) << "cooldown tasks producer thread started"; |
332 | | |
333 | 6 | RETURN_IF_ERROR(Thread::create( |
334 | 6 | "StorageEngine", "remove_unused_remote_files_thread", |
335 | 6 | [this]() { this->_remove_unused_remote_files_callback(); }, |
336 | 6 | &_remove_unused_remote_files_thread)); |
337 | 6 | LOG(INFO) << "remove unused remote files thread started"; |
338 | | |
339 | 6 | RETURN_IF_ERROR(Thread::create( |
340 | 6 | "StorageEngine", "cold_data_compaction_producer_thread", |
341 | 6 | [this]() { this->_cold_data_compaction_producer_callback(); }, |
342 | 6 | &_cold_data_compaction_producer_thread)); |
343 | 6 | LOG(INFO) << "cold data compaction producer thread started"; |
344 | | |
345 | | // add tablet publish version thread pool |
346 | 6 | RETURN_IF_ERROR(ThreadPoolBuilder("TabletPublishTxnThreadPool") |
347 | 6 | .set_min_threads(config::tablet_publish_txn_max_thread) |
348 | 6 | .set_max_threads(config::tablet_publish_txn_max_thread) |
349 | 6 | .build(&_tablet_publish_txn_thread_pool)); |
350 | | |
351 | 6 | RETURN_IF_ERROR(Thread::create( |
352 | 6 | "StorageEngine", "async_publish_version_thread", |
353 | 6 | [this]() { this->_async_publish_callback(); }, &_async_publish_thread)); |
354 | 6 | LOG(INFO) << "async publish thread started"; |
355 | | |
356 | 6 | RETURN_IF_ERROR(Thread::create( |
357 | 6 | "StorageEngine", "check_tablet_delete_bitmap_score_thread", |
358 | 6 | [this]() { this->_check_tablet_delete_bitmap_score_callback(); }, |
359 | 6 | &_check_delete_bitmap_score_thread)); |
360 | 6 | LOG(INFO) << "check tablet delete bitmap score thread started"; |
361 | | |
362 | 6 | _start_adaptive_thread_controller(); |
363 | | |
364 | 6 | LOG(INFO) << "all storage engine's background threads are started."; |
365 | 6 | return Status::OK(); |
366 | 6 | } |
367 | | |
368 | 6 | void StorageEngine::_garbage_sweeper_thread_callback() { |
369 | 6 | uint32_t max_interval = config::max_garbage_sweep_interval; |
370 | 6 | uint32_t min_interval = config::min_garbage_sweep_interval; |
371 | | |
372 | 6 | if (max_interval < min_interval || min_interval <= 0) { |
373 | 0 | LOG(WARNING) << "garbage sweep interval config is illegal: [max=" << max_interval |
374 | 0 | << " min=" << min_interval << "]."; |
375 | 0 | min_interval = 1; |
376 | 0 | max_interval = max_interval >= min_interval ? max_interval : min_interval; |
377 | 0 | LOG(INFO) << "force reset garbage sweep interval. " |
378 | 0 | << "max_interval=" << max_interval << ", min_interval=" << min_interval; |
379 | 0 | } |
380 | | |
381 | 6 | const double pi = M_PI; |
382 | 6 | double usage = 1.0; |
383 | | // After the program starts, the first round of cleaning starts after min_interval. |
384 | 6 | uint32_t curr_interval = min_interval; |
385 | 56 | do { |
386 | | // Function properties: |
387 | | // when usage < 0.6, ratio close to 1.(interval close to max_interval) |
388 | | // when usage at [0.6, 0.75], ratio is rapidly decreasing from 0.87 to 0.27. |
389 | | // when usage > 0.75, ratio is slowly decreasing. |
390 | | // when usage > 0.8, ratio close to min_interval. |
391 | | // when usage = 0.88, ratio is approximately 0.0057. |
392 | 56 | double ratio = (1.1 * (pi / 2 - std::atan(usage * 100 / 5 - 14)) - 0.28) / pi; |
393 | 56 | ratio = ratio > 0 ? ratio : 0; |
394 | | // TODO(dx): fix it |
395 | 56 | auto curr_interval_not_work = uint32_t(max_interval * ratio); |
396 | 56 | curr_interval_not_work = std::max(curr_interval_not_work, min_interval); |
397 | 56 | curr_interval_not_work = std::min(curr_interval_not_work, max_interval); |
398 | | |
399 | | // start clean trash and update usage. |
400 | 56 | Status res = start_trash_sweep(&usage); |
401 | 56 | if (res.ok() && _need_clean_trash.exchange(false, std::memory_order_relaxed)) { |
402 | 0 | res = start_trash_sweep(&usage, true); |
403 | 0 | } |
404 | | |
405 | 56 | if (!res.ok()) { |
406 | 0 | LOG(WARNING) << "one or more errors occur when sweep trash." |
407 | 0 | << "see previous message for detail. err code=" << res; |
408 | | // do nothing. continue next loop. |
409 | 0 | } |
410 | 56 | } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(curr_interval))); |
411 | 6 | } |
412 | | |
413 | 6 | void StorageEngine::_disk_stat_monitor_thread_callback() { |
414 | 6 | int32_t interval = config::disk_stat_monitor_interval; |
415 | 1.88k | do { |
416 | 1.88k | _start_disk_stat_monitor(); |
417 | | |
418 | 1.88k | interval = config::disk_stat_monitor_interval; |
419 | 1.88k | if (interval <= 0) { |
420 | 0 | LOG(WARNING) << "disk_stat_monitor_interval config is illegal: " << interval |
421 | 0 | << ", force set to 1"; |
422 | 0 | interval = 1; |
423 | 0 | } |
424 | 1.88k | } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); |
425 | 6 | } |
426 | | |
427 | 6 | void StorageEngine::_unused_rowset_monitor_thread_callback() { |
428 | 6 | int32_t interval = config::unused_rowset_monitor_interval; |
429 | 319 | do { |
430 | 319 | start_delete_unused_rowset(); |
431 | | |
432 | 319 | interval = config::unused_rowset_monitor_interval; |
433 | 319 | if (interval <= 0) { |
434 | 0 | LOG(WARNING) << "unused_rowset_monitor_interval config is illegal: " << interval |
435 | 0 | << ", force set to 1"; |
436 | 0 | interval = 1; |
437 | 0 | } |
438 | 319 | } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); |
439 | 6 | } |
440 | | |
441 | 2.10k | int32_t StorageEngine::_auto_get_interval_by_disk_capacity(DataDir* data_dir) { |
442 | 2.10k | double disk_used = data_dir->get_usage(0); |
443 | 2.10k | double remain_used = 1 - disk_used; |
444 | 2.10k | DCHECK(remain_used >= 0 && remain_used <= 1); |
445 | 2.10k | DCHECK(config::path_gc_check_interval_second >= 0); |
446 | 2.10k | int32_t ret = 0; |
447 | 2.10k | if (remain_used > 0.9) { |
448 | | // if config::path_gc_check_interval_second == 24h |
449 | 0 | ret = config::path_gc_check_interval_second; |
450 | 2.10k | } else if (remain_used > 0.7) { |
451 | | // 12h |
452 | 0 | ret = config::path_gc_check_interval_second / 2; |
453 | 2.10k | } else if (remain_used > 0.5) { |
454 | | // 6h |
455 | 0 | ret = config::path_gc_check_interval_second / 4; |
456 | 2.10k | } else if (remain_used > 0.3) { |
457 | | // 4h |
458 | 567 | ret = config::path_gc_check_interval_second / 6; |
459 | 1.53k | } else { |
460 | | // 3h |
461 | 1.53k | ret = config::path_gc_check_interval_second / 8; |
462 | 1.53k | } |
463 | 2.10k | return ret; |
464 | 2.10k | } |
465 | | |
466 | 10 | void StorageEngine::_path_gc_thread_callback(DataDir* data_dir) { |
467 | 10 | LOG(INFO) << "try to start path gc thread!"; |
468 | 10 | time_t last_exec_time = 0; |
469 | 2.10k | do { |
470 | 2.10k | time_t current_time = time(nullptr); |
471 | | |
472 | 2.10k | int32_t interval = _auto_get_interval_by_disk_capacity(data_dir); |
473 | 2.10k | DBUG_EXECUTE_IF("_path_gc_thread_callback.interval.eq.1ms", { |
474 | 2.10k | LOG(INFO) << "debug point change interval eq 1ms"; |
475 | 2.10k | interval = 1; |
476 | 2.10k | while (DebugPoints::instance()->is_enable("_path_gc_thread_callback.always.do")) { |
477 | 2.10k | data_dir->perform_path_gc(); |
478 | 2.10k | std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
479 | 2.10k | } |
480 | 2.10k | }); |
481 | 2.10k | if (interval <= 0) { |
482 | 2.10k | LOG(WARNING) << "path gc thread check interval config is illegal:" << interval |
483 | 2.10k | << " will be forced set to half hour"; |
484 | 2.10k | interval = 1800; // 0.5 hour |
485 | 2.10k | } |
486 | 2.10k | if (current_time - last_exec_time >= interval) { |
487 | 14 | LOG(INFO) << "try to perform path gc! disk remain [" << 1 - data_dir->get_usage(0) |
488 | 14 | << "] internal [" << interval << "]"; |
489 | 14 | data_dir->perform_path_gc(); |
490 | 14 | last_exec_time = time(nullptr); |
491 | 14 | } |
492 | 2.10k | } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(5))); |
493 | 10 | LOG(INFO) << "stop path gc thread!"; |
494 | 10 | } |
495 | | |
496 | 6 | void StorageEngine::_tablet_checkpoint_callback(const std::vector<DataDir*>& data_dirs) { |
497 | 6 | int64_t interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs; |
498 | 19 | do { |
499 | 23 | for (auto data_dir : data_dirs) { |
500 | 23 | LOG(INFO) << "begin to produce tablet meta checkpoint tasks, data_dir=" |
501 | 23 | << data_dir->path(); |
502 | 23 | auto st = _tablet_meta_checkpoint_thread_pool->submit_func( |
503 | 23 | [data_dir, this]() { _tablet_manager->do_tablet_meta_checkpoint(data_dir); }); |
504 | 23 | if (!st.ok()) { |
505 | 0 | LOG(WARNING) << "submit tablet checkpoint tasks failed."; |
506 | 0 | } |
507 | 23 | } |
508 | 19 | interval = config::generate_tablet_meta_checkpoint_tasks_interval_secs; |
509 | 19 | } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); |
510 | 6 | } |
511 | | |
512 | 6 | void StorageEngine::_tablet_path_check_callback() { |
513 | 6 | struct TabletIdComparator { |
514 | 6 | bool operator()(Tablet* a, Tablet* b) { return a->tablet_id() < b->tablet_id(); } |
515 | 6 | }; |
516 | | |
517 | 6 | using TabletQueue = std::priority_queue<Tablet*, std::vector<Tablet*>, TabletIdComparator>; |
518 | | |
519 | 6 | int64_t interval = config::tablet_path_check_interval_seconds; |
520 | 6 | if (interval <= 0) { |
521 | 6 | return; |
522 | 6 | } |
523 | | |
524 | 0 | int64_t last_tablet_id = 0; |
525 | 0 | do { |
526 | 0 | int32_t batch_size = config::tablet_path_check_batch_size; |
527 | 0 | if (batch_size <= 0) { |
528 | 0 | if (_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))) { |
529 | 0 | break; |
530 | 0 | } |
531 | 0 | continue; |
532 | 0 | } |
533 | | |
534 | 0 | LOG(INFO) << "start to check tablet path"; |
535 | |
|
536 | 0 | auto all_tablets = _tablet_manager->get_all_tablet( |
537 | 0 | [](Tablet* t) { return t->is_used() && t->tablet_state() == TABLET_RUNNING; }); |
538 | |
|
539 | 0 | TabletQueue big_id_tablets; |
540 | 0 | TabletQueue small_id_tablets; |
541 | 0 | for (auto tablet : all_tablets) { |
542 | 0 | auto tablet_id = tablet->tablet_id(); |
543 | 0 | TabletQueue* belong_tablets = nullptr; |
544 | 0 | if (tablet_id > last_tablet_id) { |
545 | 0 | if (big_id_tablets.size() < batch_size || |
546 | 0 | big_id_tablets.top()->tablet_id() > tablet_id) { |
547 | 0 | belong_tablets = &big_id_tablets; |
548 | 0 | } |
549 | 0 | } else if (big_id_tablets.size() < batch_size) { |
550 | 0 | if (small_id_tablets.size() < batch_size || |
551 | 0 | small_id_tablets.top()->tablet_id() > tablet_id) { |
552 | 0 | belong_tablets = &small_id_tablets; |
553 | 0 | } |
554 | 0 | } |
555 | 0 | if (belong_tablets != nullptr) { |
556 | 0 | belong_tablets->push(tablet.get()); |
557 | 0 | if (belong_tablets->size() > batch_size) { |
558 | 0 | belong_tablets->pop(); |
559 | 0 | } |
560 | 0 | } |
561 | 0 | } |
562 | |
|
563 | 0 | int32_t need_small_id_tablet_size = |
564 | 0 | batch_size - static_cast<int32_t>(big_id_tablets.size()); |
565 | |
|
566 | 0 | if (!big_id_tablets.empty()) { |
567 | 0 | last_tablet_id = big_id_tablets.top()->tablet_id(); |
568 | 0 | } |
569 | 0 | while (!big_id_tablets.empty()) { |
570 | 0 | big_id_tablets.top()->check_tablet_path_exists(); |
571 | 0 | big_id_tablets.pop(); |
572 | 0 | } |
573 | |
|
574 | 0 | if (!small_id_tablets.empty() && need_small_id_tablet_size > 0) { |
575 | 0 | while (static_cast<int32_t>(small_id_tablets.size()) > need_small_id_tablet_size) { |
576 | 0 | small_id_tablets.pop(); |
577 | 0 | } |
578 | |
|
579 | 0 | last_tablet_id = small_id_tablets.top()->tablet_id(); |
580 | 0 | while (!small_id_tablets.empty()) { |
581 | 0 | small_id_tablets.top()->check_tablet_path_exists(); |
582 | 0 | small_id_tablets.pop(); |
583 | 0 | } |
584 | 0 | } |
585 | |
|
586 | 0 | } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); |
587 | 0 | } |
588 | | |
589 | 8.91k | void StorageEngine::_adjust_compaction_thread_num() { |
590 | 8.91k | TEST_SYNC_POINT_RETURN_WITH_VOID("StorageEngine::_adjust_compaction_thread_num.return_void"); |
591 | 8.91k | auto base_compaction_threads_num = get_base_compaction_threads_num(_store_map.size()); |
592 | 8.91k | if (_base_compaction_thread_pool->max_threads() != base_compaction_threads_num) { |
593 | 0 | int old_max_threads = _base_compaction_thread_pool->max_threads(); |
594 | 0 | Status status = _base_compaction_thread_pool->set_max_threads(base_compaction_threads_num); |
595 | 0 | if (status.ok()) { |
596 | 0 | VLOG_NOTICE << "update base compaction thread pool max_threads from " << old_max_threads |
597 | 0 | << " to " << base_compaction_threads_num; |
598 | 0 | } |
599 | 0 | } |
600 | 8.91k | if (_base_compaction_thread_pool->min_threads() != base_compaction_threads_num) { |
601 | 0 | int old_min_threads = _base_compaction_thread_pool->min_threads(); |
602 | 0 | Status status = _base_compaction_thread_pool->set_min_threads(base_compaction_threads_num); |
603 | 0 | if (status.ok()) { |
604 | 0 | VLOG_NOTICE << "update base compaction thread pool min_threads from " << old_min_threads |
605 | 0 | << " to " << base_compaction_threads_num; |
606 | 0 | } |
607 | 0 | } |
608 | | |
609 | 8.91k | auto cumu_compaction_threads_num = get_cumu_compaction_threads_num(_store_map.size()); |
610 | 8.91k | if (_cumu_compaction_thread_pool->max_threads() != cumu_compaction_threads_num) { |
611 | 0 | int old_max_threads = _cumu_compaction_thread_pool->max_threads(); |
612 | 0 | Status status = _cumu_compaction_thread_pool->set_max_threads(cumu_compaction_threads_num); |
613 | 0 | if (status.ok()) { |
614 | 0 | VLOG_NOTICE << "update cumu compaction thread pool max_threads from " << old_max_threads |
615 | 0 | << " to " << cumu_compaction_threads_num; |
616 | 0 | } |
617 | 0 | } |
618 | 8.91k | if (_cumu_compaction_thread_pool->min_threads() != cumu_compaction_threads_num) { |
619 | 0 | int old_min_threads = _cumu_compaction_thread_pool->min_threads(); |
620 | 0 | Status status = _cumu_compaction_thread_pool->set_min_threads(cumu_compaction_threads_num); |
621 | 0 | if (status.ok()) { |
622 | 0 | VLOG_NOTICE << "update cumu compaction thread pool min_threads from " << old_min_threads |
623 | 0 | << " to " << cumu_compaction_threads_num; |
624 | 0 | } |
625 | 0 | } |
626 | | |
627 | 8.91k | auto single_replica_compaction_threads_num = |
628 | 8.91k | get_single_replica_compaction_threads_num(_store_map.size()); |
629 | 8.91k | if (_single_replica_compaction_thread_pool->max_threads() != |
630 | 8.91k | single_replica_compaction_threads_num) { |
631 | 0 | int old_max_threads = _single_replica_compaction_thread_pool->max_threads(); |
632 | 0 | Status status = _single_replica_compaction_thread_pool->set_max_threads( |
633 | 0 | single_replica_compaction_threads_num); |
634 | 0 | if (status.ok()) { |
635 | 0 | VLOG_NOTICE << "update single replica compaction thread pool max_threads from " |
636 | 0 | << old_max_threads << " to " << single_replica_compaction_threads_num; |
637 | 0 | } |
638 | 0 | } |
639 | 8.91k | if (_single_replica_compaction_thread_pool->min_threads() != |
640 | 8.91k | single_replica_compaction_threads_num) { |
641 | 0 | int old_min_threads = _single_replica_compaction_thread_pool->min_threads(); |
642 | 0 | Status status = _single_replica_compaction_thread_pool->set_min_threads( |
643 | 0 | single_replica_compaction_threads_num); |
644 | 0 | if (status.ok()) { |
645 | 0 | VLOG_NOTICE << "update single replica compaction thread pool min_threads from " |
646 | 0 | << old_min_threads << " to " << single_replica_compaction_threads_num; |
647 | 0 | } |
648 | 0 | } |
649 | 8.91k | } |
650 | | |
651 | 13 | void StorageEngine::_compaction_tasks_producer_callback() { |
652 | 13 | LOG(INFO) << "try to start compaction producer process!"; |
653 | | |
654 | 13 | std::vector<DataDir*> data_dirs = get_stores(); |
655 | 13 | _compaction_submit_registry.reset(data_dirs); |
656 | | |
657 | 13 | int round = 0; |
658 | 13 | CompactionType compaction_type; |
659 | | |
660 | | // Used to record the time when the score metric was last updated. |
661 | | // The update of the score metric is accompanied by the logic of selecting the tablet. |
662 | | // If there is no slot available, the logic of selecting the tablet will be terminated, |
663 | | // which causes the score metric update to be terminated. |
664 | | // In order to avoid this situation, we need to update the score regularly. |
665 | 13 | int64_t last_cumulative_score_update_time = 0; |
666 | 13 | int64_t last_base_score_update_time = 0; |
667 | 13 | static const int64_t check_score_interval_ms = 5000; // 5 secs |
668 | | |
669 | 13 | int64_t interval = config::generate_compaction_tasks_interval_ms; |
670 | 8.92k | do { |
671 | 8.92k | int64_t cur_time = UnixMillis(); |
672 | 8.92k | if (!config::disable_auto_compaction && |
673 | 8.92k | (!config::enable_compaction_pause_on_high_memory || |
674 | 8.92k | !GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE))) { |
675 | 8.92k | _adjust_compaction_thread_num(); |
676 | | |
677 | 8.92k | bool check_score = false; |
678 | 8.92k | if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) { |
679 | 8.03k | compaction_type = CompactionType::CUMULATIVE_COMPACTION; |
680 | 8.03k | round++; |
681 | 8.03k | if (cur_time - last_cumulative_score_update_time >= check_score_interval_ms) { |
682 | 1.47k | check_score = true; |
683 | 1.47k | last_cumulative_score_update_time = cur_time; |
684 | 1.47k | } |
685 | 8.03k | } else { |
686 | 888 | compaction_type = CompactionType::BASE_COMPACTION; |
687 | 888 | round = 0; |
688 | 888 | if (cur_time - last_base_score_update_time >= check_score_interval_ms) { |
689 | 839 | check_score = true; |
690 | 839 | last_base_score_update_time = cur_time; |
691 | 839 | } |
692 | 888 | } |
693 | 8.92k | std::unique_ptr<ThreadPool>& thread_pool = |
694 | 8.92k | (compaction_type == CompactionType::CUMULATIVE_COMPACTION) |
695 | 8.92k | ? _cumu_compaction_thread_pool |
696 | 8.92k | : _base_compaction_thread_pool; |
697 | 8.92k | bvar::Status<int64_t>& g_compaction_task_num_per_round = |
698 | 8.92k | (compaction_type == CompactionType::CUMULATIVE_COMPACTION) |
699 | 8.92k | ? g_cumu_compaction_task_num_per_round |
700 | 8.92k | : g_base_compaction_task_num_per_round; |
701 | 8.92k | if (config::compaction_num_per_round != -1) { |
702 | 0 | _compaction_num_per_round = config::compaction_num_per_round; |
703 | 8.92k | } else if (thread_pool->get_queue_size() == 0) { |
704 | | // If all tasks in the thread pool queue are executed, |
705 | | // double the number of tasks generated each time, |
706 | | // with a maximum of config::max_automatic_compaction_num_per_round tasks per generation. |
707 | 8.81k | if (_compaction_num_per_round < config::max_automatic_compaction_num_per_round) { |
708 | 37 | _compaction_num_per_round *= 2; |
709 | 37 | g_compaction_task_num_per_round.set_value(_compaction_num_per_round); |
710 | 37 | } |
711 | 8.81k | } else if (thread_pool->get_queue_size() > _compaction_num_per_round / 2) { |
712 | | // If all tasks in the thread pool is greater than |
713 | | // half of the tasks submitted in the previous round, |
714 | | // reduce the number of tasks generated each time by half, with a minimum of 1. |
715 | 3 | if (_compaction_num_per_round > 1) { |
716 | 1 | _compaction_num_per_round /= 2; |
717 | 1 | g_compaction_task_num_per_round.set_value(_compaction_num_per_round); |
718 | 1 | } |
719 | 3 | } |
720 | 8.92k | std::vector<TabletSharedPtr> tablets_compaction = |
721 | 8.92k | _generate_compaction_tasks(compaction_type, data_dirs, check_score); |
722 | 8.92k | if (tablets_compaction.size() == 0) { |
723 | 4.29k | std::unique_lock<std::mutex> lock(_compaction_producer_sleep_mutex); |
724 | 4.29k | _wakeup_producer_flag = 0; |
725 | | // It is necessary to wake up the thread on timeout to prevent deadlock |
726 | | // in case of no running compaction task. |
727 | 4.29k | _compaction_producer_sleep_cv.wait_for( |
728 | 4.29k | lock, std::chrono::milliseconds(2000), |
729 | 8.58k | [this] { return _wakeup_producer_flag == 1; }); |
730 | 4.29k | continue; |
731 | 4.29k | } |
732 | | |
733 | 195k | for (const auto& tablet : tablets_compaction) { |
734 | 195k | if (compaction_type == CompactionType::BASE_COMPACTION) { |
735 | 43.1k | tablet->set_last_base_compaction_schedule_time(UnixMillis()); |
736 | 152k | } else if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { |
737 | 152k | tablet->set_last_cumu_compaction_schedule_time(UnixMillis()); |
738 | 152k | } else if (compaction_type == CompactionType::FULL_COMPACTION) { |
739 | 0 | tablet->set_last_full_compaction_schedule_time(UnixMillis()); |
740 | 0 | } |
741 | 195k | Status st = _submit_compaction_task(tablet, compaction_type, false); |
742 | 195k | if (!st.ok()) { |
743 | 0 | LOG(WARNING) << "failed to submit compaction task for tablet: " |
744 | 0 | << tablet->tablet_id() << ", err: " << st; |
745 | 0 | } |
746 | 195k | } |
747 | 4.62k | interval = config::generate_compaction_tasks_interval_ms; |
748 | 4.62k | } else { |
749 | 1 | interval = 5000; // 5s to check disable_auto_compaction |
750 | 1 | } |
751 | | |
752 | | // wait some seconds for ut test |
753 | 4.63k | { |
754 | 4.63k | std ::vector<std ::any> args {}; |
755 | 4.63k | args.emplace_back(1); |
756 | 4.63k | doris ::SyncPoint ::get_instance()->process( |
757 | 4.63k | "StorageEngine::_compaction_tasks_producer_callback", std ::move(args)); |
758 | 4.63k | } |
759 | 4.63k | int64_t end_time = UnixMillis(); |
760 | 4.63k | DorisMetrics::instance()->compaction_producer_callback_a_round_time->set_value(end_time - |
761 | 4.63k | cur_time); |
762 | 8.92k | } while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(interval))); |
763 | 13 | } |
764 | | |
765 | 6 | void StorageEngine::_update_replica_infos_callback() { |
766 | | #ifdef GOOGLE_PROFILER |
767 | | ProfilerRegisterThread(); |
768 | | #endif |
769 | 6 | LOG(INFO) << "start to update replica infos!"; |
770 | | |
771 | 6 | int64_t interval = config::update_replica_infos_interval_seconds; |
772 | 161 | do { |
773 | 1.37M | auto all_tablets = _tablet_manager->get_all_tablet([](Tablet* t) { |
774 | 1.37M | return t->is_used() && t->tablet_state() == TABLET_RUNNING && |
775 | 1.37M | !t->tablet_meta()->tablet_schema()->disable_auto_compaction() && |
776 | 1.37M | t->tablet_meta()->tablet_schema()->enable_single_replica_compaction(); |
777 | 1.37M | }); |
778 | 161 | ClusterInfo* cluster_info = ExecEnv::GetInstance()->cluster_info(); |
779 | 161 | if (cluster_info == nullptr) { |
780 | 0 | LOG(WARNING) << "Have not get FE Master heartbeat yet"; |
781 | 0 | std::this_thread::sleep_for(std::chrono::seconds(2)); |
782 | 0 | continue; |
783 | 0 | } |
784 | 161 | TNetworkAddress master_addr = cluster_info->master_fe_addr; |
785 | 161 | if (master_addr.hostname == "" || master_addr.port == 0) { |
786 | 6 | LOG(WARNING) << "Have not get FE Master heartbeat yet"; |
787 | 6 | std::this_thread::sleep_for(std::chrono::seconds(2)); |
788 | 6 | continue; |
789 | 6 | } |
790 | | |
791 | 155 | int start = 0; |
792 | 155 | int tablet_size = cast_set<int>(all_tablets.size()); |
793 | | // The while loop may take a long time, we should skip it when stop |
794 | 155 | while (start < tablet_size && _stop_background_threads_latch.count() > 0) { |
795 | 0 | int batch_size = std::min(100, tablet_size - start); |
796 | 0 | int end = start + batch_size; |
797 | 0 | TGetTabletReplicaInfosRequest request; |
798 | 0 | TGetTabletReplicaInfosResult result; |
799 | 0 | for (int i = start; i < end; i++) { |
800 | 0 | request.tablet_ids.emplace_back(all_tablets[i]->tablet_id()); |
801 | 0 | } |
802 | 0 | Status rpc_st = ThriftRpcHelper::rpc<FrontendServiceClient>( |
803 | 0 | master_addr.hostname, master_addr.port, |
804 | 0 | [&request, &result](FrontendServiceConnection& client) { |
805 | 0 | client->getTabletReplicaInfos(result, request); |
806 | 0 | }); |
807 | |
|
808 | 0 | if (!rpc_st.ok()) { |
809 | 0 | LOG(WARNING) << "Failed to get tablet replica infos, encounter rpc failure, " |
810 | 0 | "tablet start: " |
811 | 0 | << start << " end: " << end; |
812 | 0 | continue; |
813 | 0 | } |
814 | | |
815 | 0 | std::unique_lock<std::mutex> lock(_peer_replica_infos_mutex); |
816 | 0 | for (const auto& it : result.tablet_replica_infos) { |
817 | 0 | auto tablet_id = it.first; |
818 | 0 | auto tablet = _tablet_manager->get_tablet(tablet_id); |
819 | 0 | if (tablet == nullptr) { |
820 | 0 | VLOG_CRITICAL << "tablet ptr is nullptr"; |
821 | 0 | continue; |
822 | 0 | } |
823 | | |
824 | 0 | VLOG_NOTICE << tablet_id << " tablet has " << it.second.size() << " replicas"; |
825 | 0 | uint64_t min_modulo = MOD_PRIME; |
826 | 0 | TReplicaInfo peer_replica; |
827 | 0 | for (const auto& replica : it.second) { |
828 | 0 | int64_t peer_replica_id = replica.replica_id; |
829 | 0 | uint64_t modulo = HashUtil::hash64(&peer_replica_id, sizeof(peer_replica_id), |
830 | 0 | DEFAULT_SEED) % |
831 | 0 | MOD_PRIME; |
832 | 0 | if (modulo < min_modulo) { |
833 | 0 | peer_replica = replica; |
834 | 0 | min_modulo = modulo; |
835 | 0 | } |
836 | 0 | } |
837 | 0 | VLOG_NOTICE << "tablet " << tablet_id << ", peer replica host is " |
838 | 0 | << peer_replica.host; |
839 | 0 | _peer_replica_infos[tablet_id] = peer_replica; |
840 | 0 | } |
841 | 0 | _token = result.token; |
842 | 0 | VLOG_NOTICE << "get tablet replica infos from fe, size is " << end - start |
843 | 0 | << " token = " << result.token; |
844 | 0 | start = end; |
845 | 0 | } |
846 | 155 | interval = config::update_replica_infos_interval_seconds; |
847 | 161 | } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); |
848 | 6 | } |
849 | | |
850 | | Status StorageEngine::_submit_single_replica_compaction_task(TabletSharedPtr tablet, |
851 | 0 | CompactionType compaction_type) { |
852 | | // For single replica compaction, the local version to be merged is determined based on the version fetched from the peer replica. |
853 | | // Therefore, it is currently not possible to determine whether it should be a base compaction or cumulative compaction. |
854 | | // As a result, the tablet needs to be pushed to both the _tablet_submitted_cumu_compaction and the _tablet_submitted_base_compaction simultaneously. |
855 | 0 | bool already_exist = |
856 | 0 | _compaction_submit_registry.insert(tablet, CompactionType::CUMULATIVE_COMPACTION); |
857 | 0 | if (already_exist) { |
858 | 0 | return Status::AlreadyExist<false>( |
859 | 0 | "compaction task has already been submitted, tablet_id={}", tablet->tablet_id()); |
860 | 0 | } |
861 | | |
862 | 0 | already_exist = _compaction_submit_registry.insert(tablet, CompactionType::BASE_COMPACTION); |
863 | 0 | if (already_exist) { |
864 | 0 | _pop_tablet_from_submitted_compaction(tablet, CompactionType::CUMULATIVE_COMPACTION); |
865 | 0 | return Status::AlreadyExist<false>( |
866 | 0 | "compaction task has already been submitted, tablet_id={}", tablet->tablet_id()); |
867 | 0 | } |
868 | | |
869 | 0 | auto compaction = std::make_shared<SingleReplicaCompaction>(*this, tablet, compaction_type); |
870 | 0 | DorisMetrics::instance()->single_compaction_request_total->increment(1); |
871 | 0 | auto st = compaction->prepare_compact(); |
872 | |
|
873 | 0 | auto clean_single_replica_compaction = [tablet, this]() { |
874 | 0 | _pop_tablet_from_submitted_compaction(tablet, CompactionType::CUMULATIVE_COMPACTION); |
875 | 0 | _pop_tablet_from_submitted_compaction(tablet, CompactionType::BASE_COMPACTION); |
876 | 0 | }; |
877 | |
|
878 | 0 | if (!st.ok()) { |
879 | 0 | clean_single_replica_compaction(); |
880 | 0 | if (!st.is<ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION>()) { |
881 | 0 | LOG(WARNING) << "failed to prepare single replica compaction, tablet_id=" |
882 | 0 | << tablet->tablet_id() << " : " << st; |
883 | 0 | return st; |
884 | 0 | } |
885 | 0 | return Status::OK(); // No suitable version, regard as OK |
886 | 0 | } |
887 | | |
888 | 0 | auto submit_st = _single_replica_compaction_thread_pool->submit_func( |
889 | 0 | [tablet, compaction = std::move(compaction), |
890 | 0 | clean_single_replica_compaction]() mutable { |
891 | 0 | tablet->execute_single_replica_compaction(*compaction); |
892 | 0 | clean_single_replica_compaction(); |
893 | 0 | }); |
894 | 0 | if (!submit_st.ok()) { |
895 | 0 | clean_single_replica_compaction(); |
896 | 0 | return Status::InternalError( |
897 | 0 | "failed to submit single replica compaction task to thread pool, " |
898 | 0 | "tablet_id={}", |
899 | 0 | tablet->tablet_id()); |
900 | 0 | } |
901 | 0 | return Status::OK(); |
902 | 0 | } |
903 | | |
904 | | void StorageEngine::get_tablet_rowset_versions(const PGetTabletVersionsRequest* request, |
905 | 0 | PGetTabletVersionsResponse* response) { |
906 | 0 | TabletSharedPtr tablet = _tablet_manager->get_tablet(request->tablet_id()); |
907 | 0 | if (tablet == nullptr) { |
908 | 0 | response->mutable_status()->set_status_code(TStatusCode::CANCELLED); |
909 | 0 | return; |
910 | 0 | } |
911 | 0 | std::vector<Version> local_versions = tablet->get_all_local_versions(); |
912 | 0 | for (const auto& local_version : local_versions) { |
913 | 0 | auto version = response->add_versions(); |
914 | 0 | version->set_first(local_version.first); |
915 | 0 | version->set_second(local_version.second); |
916 | 0 | } |
917 | 0 | response->mutable_status()->set_status_code(0); |
918 | 0 | } |
919 | | |
920 | | bool need_generate_compaction_tasks(int task_cnt_per_disk, int thread_per_disk, |
921 | 20.9k | CompactionType compaction_type, bool all_base) { |
922 | | // We need to reserve at least one Slot for cumulative compaction. |
923 | | // So when there is only one Slot, we have to judge whether there is a cumulative compaction |
924 | | // in the current submitted tasks. |
925 | | // If so, the last Slot can be assigned to Base compaction, |
926 | | // otherwise, this Slot needs to be reserved for cumulative compaction. |
927 | 20.9k | if (task_cnt_per_disk >= thread_per_disk) { |
928 | | // Return if no available slot |
929 | 144 | return false; |
930 | 20.8k | } else if (task_cnt_per_disk >= thread_per_disk - 1) { |
931 | | // Only one slot left, check if it can be assigned to base compaction task. |
932 | 62 | if (compaction_type == CompactionType::BASE_COMPACTION) { |
933 | 2 | if (all_base) { |
934 | 0 | return false; |
935 | 0 | } |
936 | 2 | } |
937 | 62 | } |
938 | 20.8k | return true; |
939 | 20.9k | } |
940 | | |
941 | 10.4k | int get_concurrent_per_disk(int max_score, int thread_per_disk) { |
942 | 10.4k | if (!config::enable_compaction_priority_scheduling) { |
943 | 0 | return thread_per_disk; |
944 | 0 | } |
945 | | |
946 | 10.4k | double load_average = 0; |
947 | 10.4k | if (DorisMetrics::instance()->system_metrics() != nullptr) { |
948 | 8.42k | load_average = DorisMetrics::instance()->system_metrics()->get_load_average_1_min(); |
949 | 8.42k | } |
950 | 10.4k | int num_cores = doris::CpuInfo::num_cores(); |
951 | 10.4k | bool cpu_usage_high = load_average > num_cores * 0.8; |
952 | | |
953 | 10.4k | auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage(); |
954 | 10.4k | bool memory_usage_high = static_cast<double>(process_memory_usage) > |
955 | 10.4k | static_cast<double>(MemInfo::soft_mem_limit()) * 0.8; |
956 | | |
957 | 10.4k | if (max_score <= config::low_priority_compaction_score_threshold && |
958 | 10.4k | (cpu_usage_high || memory_usage_high)) { |
959 | 4.71k | return config::low_priority_compaction_task_num_per_disk; |
960 | 4.71k | } |
961 | | |
962 | 5.76k | return thread_per_disk; |
963 | 10.4k | } |
964 | | |
965 | 20.9k | int32_t disk_compaction_slot_num(const DataDir& data_dir) { |
966 | 20.9k | return data_dir.is_ssd_disk() ? config::compaction_task_num_per_fast_disk |
967 | 20.9k | : config::compaction_task_num_per_disk; |
968 | 20.9k | } |
969 | | |
970 | | bool has_free_compaction_slot(CompactionSubmitRegistry* registry, DataDir* dir, |
971 | 10.4k | CompactionType compaction_type, uint32_t executing_cnt) { |
972 | 10.4k | int32_t thread_per_disk = disk_compaction_slot_num(*dir); |
973 | 10.4k | return need_generate_compaction_tasks( |
974 | 10.4k | executing_cnt, thread_per_disk, compaction_type, |
975 | 10.4k | !registry->has_compaction_task(dir, CompactionType::CUMULATIVE_COMPACTION)); |
976 | 10.4k | } |
977 | | |
978 | | std::vector<TabletSharedPtr> StorageEngine::_generate_compaction_tasks( |
979 | 8.91k | CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool check_score) { |
980 | 8.91k | TEST_SYNC_POINT_RETURN_WITH_VALUE("olap_server::_generate_compaction_tasks.return_empty", |
981 | 8.91k | std::vector<TabletSharedPtr> {}); |
982 | 8.91k | _update_cumulative_compaction_policy(); |
983 | 8.91k | std::vector<TabletSharedPtr> tablets_compaction; |
984 | 8.91k | uint32_t max_compaction_score = 0; |
985 | | |
986 | 8.91k | std::random_device rd; |
987 | 8.91k | std::mt19937 g(rd()); |
988 | 8.91k | std::shuffle(data_dirs.begin(), data_dirs.end(), g); |
989 | | |
990 | | // Copy _tablet_submitted_xxx_compaction map so that we don't need to hold _tablet_submitted_compaction_mutex |
991 | | // when traversing the data dir |
992 | 8.91k | auto compaction_registry_snapshot = _compaction_submit_registry.create_snapshot(); |
993 | 10.4k | for (auto* data_dir : data_dirs) { |
994 | 10.4k | bool need_pick_tablet = true; |
995 | 10.4k | uint32_t executing_task_num = |
996 | 10.4k | compaction_registry_snapshot.count_executing_cumu_and_base(data_dir); |
997 | 10.4k | need_pick_tablet = has_free_compaction_slot(&compaction_registry_snapshot, data_dir, |
998 | 10.4k | compaction_type, executing_task_num); |
999 | 10.4k | if (!need_pick_tablet && !check_score) { |
1000 | 0 | continue; |
1001 | 0 | } |
1002 | | |
1003 | | // Even if need_pick_tablet is false, we still need to call find_best_tablet_to_compaction(), |
1004 | | // So that we can update the max_compaction_score metric. |
1005 | 10.4k | if (!data_dir->reach_capacity_limit(0)) { |
1006 | 10.4k | uint32_t disk_max_score = 0; |
1007 | 10.4k | auto tablets = compaction_registry_snapshot.pick_topn_tablets_for_compaction( |
1008 | 10.4k | _tablet_manager.get(), data_dir, compaction_type, |
1009 | 10.4k | _cumulative_compaction_policies, &disk_max_score); |
1010 | 10.4k | int concurrent_num = |
1011 | 10.4k | get_concurrent_per_disk(disk_max_score, disk_compaction_slot_num(*data_dir)); |
1012 | 10.4k | need_pick_tablet = need_generate_compaction_tasks( |
1013 | 10.4k | executing_task_num, concurrent_num, compaction_type, |
1014 | 10.4k | !compaction_registry_snapshot.has_compaction_task( |
1015 | 10.4k | data_dir, CompactionType::CUMULATIVE_COMPACTION)); |
1016 | 197k | for (const auto& tablet : tablets) { |
1017 | 197k | if (tablet != nullptr) { |
1018 | 197k | if (need_pick_tablet) { |
1019 | 196k | tablets_compaction.emplace_back(tablet); |
1020 | 196k | } |
1021 | 197k | max_compaction_score = std::max(max_compaction_score, disk_max_score); |
1022 | 197k | } |
1023 | 197k | } |
1024 | 10.4k | } |
1025 | 10.4k | } |
1026 | | |
1027 | 8.91k | if (max_compaction_score > 0) { |
1028 | 4.67k | if (compaction_type == CompactionType::BASE_COMPACTION) { |
1029 | 884 | DorisMetrics::instance()->tablet_base_max_compaction_score->set_value( |
1030 | 884 | max_compaction_score); |
1031 | 3.79k | } else { |
1032 | 3.79k | DorisMetrics::instance()->tablet_cumulative_max_compaction_score->set_value( |
1033 | 3.79k | max_compaction_score); |
1034 | 3.79k | } |
1035 | 4.67k | } |
1036 | 8.91k | return tablets_compaction; |
1037 | 8.91k | } |
1038 | | |
1039 | 8.91k | void StorageEngine::_update_cumulative_compaction_policy() { |
1040 | 8.91k | if (_cumulative_compaction_policies.empty()) { |
1041 | 6 | _cumulative_compaction_policies[CUMULATIVE_SIZE_BASED_POLICY] = |
1042 | 6 | CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy( |
1043 | 6 | CUMULATIVE_SIZE_BASED_POLICY); |
1044 | 6 | _cumulative_compaction_policies[CUMULATIVE_TIME_SERIES_POLICY] = |
1045 | 6 | CumulativeCompactionPolicyFactory::create_cumulative_compaction_policy( |
1046 | 6 | CUMULATIVE_TIME_SERIES_POLICY); |
1047 | 6 | } |
1048 | 8.91k | } |
1049 | | |
1050 | | void StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet, |
1051 | 195k | CompactionType compaction_type) { |
1052 | 195k | _compaction_submit_registry.remove(tablet, compaction_type, [this]() { |
1053 | 195k | std::unique_lock<std::mutex> lock(_compaction_producer_sleep_mutex); |
1054 | 195k | _wakeup_producer_flag = 1; |
1055 | 195k | _compaction_producer_sleep_cv.notify_one(); |
1056 | 195k | }); |
1057 | 195k | } |
1058 | | |
1059 | | Status StorageEngine::_submit_compaction_task(TabletSharedPtr tablet, |
1060 | | CompactionType compaction_type, bool force, |
1061 | 196k | int trigger_method) { |
1062 | 196k | if (tablet->tablet_meta()->tablet_schema()->enable_single_replica_compaction() && |
1063 | 196k | should_fetch_from_peer(tablet->tablet_id())) { |
1064 | 0 | VLOG_CRITICAL << "start to submit single replica compaction task for tablet: " |
1065 | 0 | << tablet->tablet_id(); |
1066 | 0 | Status st = _submit_single_replica_compaction_task(tablet, compaction_type); |
1067 | 0 | if (!st.ok()) { |
1068 | 0 | LOG(WARNING) << "failed to submit single replica compaction task for tablet: " |
1069 | 0 | << tablet->tablet_id() << ", err: " << st; |
1070 | 0 | } |
1071 | |
|
1072 | 0 | return Status::OK(); |
1073 | 0 | } |
1074 | 196k | bool already_exist = _compaction_submit_registry.insert(tablet, compaction_type); |
1075 | 196k | if (already_exist) { |
1076 | 0 | return Status::AlreadyExist<false>( |
1077 | 0 | "compaction task has already been submitted, tablet_id={}, compaction_type={}.", |
1078 | 0 | tablet->tablet_id(), compaction_type); |
1079 | 0 | } |
1080 | 196k | tablet->compaction_stage = CompactionStage::PENDING; |
1081 | 196k | std::shared_ptr<CompactionMixin> compaction; |
1082 | 196k | int64_t permits = 0; |
1083 | 196k | Status st = Tablet::prepare_compaction_and_calculate_permits(compaction_type, tablet, |
1084 | 196k | compaction, permits); |
1085 | 196k | if (st.ok() && permits > 0) { |
1086 | 1.37k | if (!force) { |
1087 | 1.37k | _permit_limiter.request(permits); |
1088 | 1.37k | } |
1089 | | // Register task with CompactionTaskTracker as PENDING |
1090 | 1.37k | auto* tracker = CompactionTaskTracker::instance(); |
1091 | 1.37k | int64_t compaction_id = compaction->compaction_id(); |
1092 | 1.37k | { |
1093 | 1.37k | CompactionTaskInfo info; |
1094 | 1.37k | info.compaction_id = compaction_id; |
1095 | 1.37k | info.tablet_id = tablet->tablet_id(); |
1096 | 1.37k | info.table_id = tablet->get_table_id(); |
1097 | 1.37k | info.partition_id = tablet->partition_id(); |
1098 | 1.37k | info.compaction_type = (compaction_type == CompactionType::BASE_COMPACTION) |
1099 | 1.37k | ? CompactionProfileType::BASE |
1100 | 1.37k | : (compaction_type == CompactionType::CUMULATIVE_COMPACTION) |
1101 | 1.37k | ? CompactionProfileType::CUMULATIVE |
1102 | 1.37k | : CompactionProfileType::FULL; |
1103 | 1.37k | info.status = CompactionTaskStatus::PENDING; |
1104 | 1.37k | info.trigger_method = static_cast<TriggerMethod>(trigger_method); |
1105 | 1.37k | info.scheduled_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>( |
1106 | 1.37k | std::chrono::system_clock::now().time_since_epoch()) |
1107 | 1.37k | .count(); |
1108 | 1.37k | info.permits = permits; |
1109 | 1.37k | info.backend_id = BackendOptions::get_backend_id(); |
1110 | 1.37k | info.compaction_score = tablet->get_real_compaction_score(); |
1111 | 1.37k | info.input_rowsets_count = compaction->input_rowsets_count(); |
1112 | 1.37k | info.input_row_num = compaction->input_row_num_value(); |
1113 | 1.37k | info.input_data_size = compaction->input_rowsets_data_size(); |
1114 | 1.37k | info.input_index_size = compaction->input_rowsets_index_size(); |
1115 | 1.37k | info.input_total_size = compaction->input_rowsets_total_size(); |
1116 | 1.37k | info.input_segments_num = compaction->input_segments_num_value(); |
1117 | 1.37k | info.input_version_range = compaction->input_version_range_str(); |
1118 | 1.37k | info.is_vertical = compaction->is_vertical(); |
1119 | 1.37k | tracker->register_task(std::move(info)); |
1120 | 1.37k | } |
1121 | 1.37k | std::unique_ptr<ThreadPool>& thread_pool = |
1122 | 1.37k | (compaction_type == CompactionType::CUMULATIVE_COMPACTION) |
1123 | 1.37k | ? _cumu_compaction_thread_pool |
1124 | 1.37k | : _base_compaction_thread_pool; |
1125 | 1.37k | VLOG_CRITICAL << "compaction thread pool. type: " |
1126 | 0 | << (compaction_type == CompactionType::CUMULATIVE_COMPACTION ? "CUMU" |
1127 | 0 | : "BASE") |
1128 | 0 | << ", num_threads: " << thread_pool->num_threads() |
1129 | 0 | << ", num_threads_pending_start: " << thread_pool->num_threads_pending_start() |
1130 | 0 | << ", num_active_threads: " << thread_pool->num_active_threads() |
1131 | 0 | << ", max_threads: " << thread_pool->max_threads() |
1132 | 0 | << ", min_threads: " << thread_pool->min_threads() |
1133 | 0 | << ", num_total_queued_tasks: " << thread_pool->get_queue_size(); |
1134 | 1.37k | auto status = thread_pool->submit_func([=, compaction = std::move(compaction), this]() { |
1135 | 1.36k | _handle_compaction(std::move(tablet), std::move(compaction), compaction_type, permits, |
1136 | 1.36k | force, compaction_id); |
1137 | 1.36k | }); |
1138 | 1.37k | if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) [[likely]] { |
1139 | 1.37k | DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value( |
1140 | 1.37k | _cumu_compaction_thread_pool->get_queue_size()); |
1141 | 1.37k | } else if (compaction_type == CompactionType::BASE_COMPACTION) { |
1142 | 2 | DorisMetrics::instance()->base_compaction_task_pending_total->set_value( |
1143 | 2 | _base_compaction_thread_pool->get_queue_size()); |
1144 | 2 | } |
1145 | 1.37k | if (!st.ok()) { |
1146 | | // Cleanup tracker on submit failure |
1147 | 0 | tracker->remove_task(compaction_id); |
1148 | 0 | if (!force) { |
1149 | 0 | _permit_limiter.release(permits); |
1150 | 0 | } |
1151 | 0 | _pop_tablet_from_submitted_compaction(tablet, compaction_type); |
1152 | 0 | tablet->compaction_stage = CompactionStage::NOT_SCHEDULED; |
1153 | 0 | return Status::InternalError( |
1154 | 0 | "failed to submit compaction task to thread pool, " |
1155 | 0 | "tablet_id={}, compaction_type={}.", |
1156 | 0 | tablet->tablet_id(), compaction_type); |
1157 | 0 | } |
1158 | 1.37k | return Status::OK(); |
1159 | 194k | } else { |
1160 | 194k | _pop_tablet_from_submitted_compaction(tablet, compaction_type); |
1161 | 194k | tablet->compaction_stage = CompactionStage::NOT_SCHEDULED; |
1162 | 194k | if (!st.ok()) { |
1163 | 0 | return Status::InternalError( |
1164 | 0 | "failed to prepare compaction task and calculate permits, " |
1165 | 0 | "tablet_id={}, compaction_type={}, " |
1166 | 0 | "permit={}, current_permit={}, status={}", |
1167 | 0 | tablet->tablet_id(), compaction_type, permits, _permit_limiter.usage(), |
1168 | 0 | st.to_string()); |
1169 | 0 | } |
1170 | 194k | return st; |
1171 | 194k | } |
1172 | 196k | } |
1173 | | |
1174 | | void StorageEngine::_handle_compaction(TabletSharedPtr tablet, |
1175 | | std::shared_ptr<CompactionMixin> compaction, |
1176 | | CompactionType compaction_type, int64_t permits, bool force, |
1177 | 1.35k | int64_t compaction_id) { |
1178 | 1.35k | if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) [[likely]] { |
1179 | 1.35k | DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(1); |
1180 | 1.35k | DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value( |
1181 | 1.35k | _cumu_compaction_thread_pool->get_queue_size()); |
1182 | 1.35k | } else if (compaction_type == CompactionType::BASE_COMPACTION) { |
1183 | 2 | DorisMetrics::instance()->base_compaction_task_running_total->increment(1); |
1184 | 2 | DorisMetrics::instance()->base_compaction_task_pending_total->set_value( |
1185 | 2 | _base_compaction_thread_pool->get_queue_size()); |
1186 | 2 | } |
1187 | 1.35k | bool is_large_task = true; |
1188 | 1.36k | Defer defer {[&]() { |
1189 | 1.36k | DBUG_EXECUTE_IF("StorageEngine._submit_compaction_task.sleep", { sleep(5); }) |
1190 | | // Idempotent cleanup: remove task from tracker |
1191 | 1.36k | CompactionTaskTracker::instance()->remove_task(compaction_id); |
1192 | 1.36k | if (!force) { |
1193 | 1.36k | _permit_limiter.release(permits); |
1194 | 1.36k | } |
1195 | 1.36k | _pop_tablet_from_submitted_compaction(tablet, compaction_type); |
1196 | 1.36k | tablet->compaction_stage = CompactionStage::NOT_SCHEDULED; |
1197 | 1.36k | if (compaction_type == CompactionType::CUMULATIVE_COMPACTION) { |
1198 | 1.36k | std::lock_guard<std::mutex> lock(_cumu_compaction_delay_mtx); |
1199 | 1.36k | _cumu_compaction_thread_pool_used_threads--; |
1200 | 1.36k | if (!is_large_task) { |
1201 | 4 | _cumu_compaction_thread_pool_small_tasks_running--; |
1202 | 4 | } |
1203 | 1.36k | DorisMetrics::instance()->cumulative_compaction_task_running_total->increment(-1); |
1204 | 1.36k | DorisMetrics::instance()->cumulative_compaction_task_pending_total->set_value( |
1205 | 1.36k | _cumu_compaction_thread_pool->get_queue_size()); |
1206 | 1.36k | } else if (compaction_type == CompactionType::BASE_COMPACTION) { |
1207 | 2 | DorisMetrics::instance()->base_compaction_task_running_total->increment(-1); |
1208 | 2 | DorisMetrics::instance()->base_compaction_task_pending_total->set_value( |
1209 | 2 | _base_compaction_thread_pool->get_queue_size()); |
1210 | 2 | } |
1211 | 1.36k | }}; |
1212 | 1.35k | do { |
1213 | 1.35k | if (compaction->compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { |
1214 | 1.35k | std::lock_guard<std::mutex> lock(_cumu_compaction_delay_mtx); |
1215 | 1.35k | _cumu_compaction_thread_pool_used_threads++; |
1216 | 1.35k | if (config::large_cumu_compaction_task_min_thread_num > 1 && |
1217 | 1.35k | _cumu_compaction_thread_pool->max_threads() >= |
1218 | 22 | config::large_cumu_compaction_task_min_thread_num) { |
1219 | | // Determine if this is a large task based on configured thresholds |
1220 | 4 | is_large_task = (compaction->calc_input_rowsets_total_size() > |
1221 | 4 | config::large_cumu_compaction_task_bytes_threshold || |
1222 | 4 | compaction->calc_input_rowsets_row_num() > |
1223 | 4 | config::large_cumu_compaction_task_row_num_threshold); |
1224 | | |
1225 | | // Small task. No delay needed |
1226 | 4 | if (!is_large_task) { |
1227 | 4 | _cumu_compaction_thread_pool_small_tasks_running++; |
1228 | 4 | break; |
1229 | 4 | } |
1230 | | // Deal with large task |
1231 | 0 | if (_should_delay_large_task()) { |
1232 | 0 | LOG_WARNING( |
1233 | 0 | "failed to do CumulativeCompaction, cumu thread pool is " |
1234 | 0 | "intensive, delay large task.") |
1235 | 0 | .tag("tablet_id", tablet->tablet_id()) |
1236 | 0 | .tag("input_rows", compaction->calc_input_rowsets_row_num()) |
1237 | 0 | .tag("input_rowsets_total_size", |
1238 | 0 | compaction->calc_input_rowsets_total_size()) |
1239 | 0 | .tag("config::large_cumu_compaction_task_bytes_threshold", |
1240 | 0 | config::large_cumu_compaction_task_bytes_threshold) |
1241 | 0 | .tag("config::large_cumu_compaction_task_row_num_threshold", |
1242 | 0 | config::large_cumu_compaction_task_row_num_threshold) |
1243 | 0 | .tag("remaining threads", _cumu_compaction_thread_pool_used_threads) |
1244 | 0 | .tag("small_tasks_running", |
1245 | 0 | _cumu_compaction_thread_pool_small_tasks_running); |
1246 | | // Delay this task and sleep 5s for this tablet |
1247 | 0 | long now = duration_cast<std::chrono::milliseconds>( |
1248 | 0 | std::chrono::system_clock::now().time_since_epoch()) |
1249 | 0 | .count(); |
1250 | 0 | tablet->set_last_cumu_compaction_failure_time(now); |
1251 | 0 | return; |
1252 | 0 | } |
1253 | 0 | } |
1254 | 1.35k | } |
1255 | 1.35k | } while (false); |
1256 | 1.35k | if (!tablet->can_do_compaction(tablet->data_dir()->path_hash(), compaction_type)) { |
1257 | 0 | LOG(INFO) << "Tablet state has been changed, no need to begin this compaction " |
1258 | 0 | "task, tablet_id=" |
1259 | 0 | << tablet->tablet_id() << ", tablet_state=" << tablet->tablet_state(); |
1260 | 0 | return; |
1261 | 0 | } |
1262 | 1.35k | tablet->compaction_stage = CompactionStage::EXECUTING; |
1263 | | // Update tracker to RUNNING |
1264 | 1.35k | { |
1265 | 1.35k | RunningStats rs; |
1266 | 1.35k | rs.start_time_ms = std::chrono::duration_cast<std::chrono::milliseconds>( |
1267 | 1.35k | std::chrono::system_clock::now().time_since_epoch()) |
1268 | 1.35k | .count(); |
1269 | 1.35k | rs.permits = permits; |
1270 | 1.35k | CompactionTaskTracker::instance()->update_to_running(compaction_id, rs); |
1271 | 1.35k | } |
1272 | 1.35k | TEST_SYNC_POINT_RETURN_WITH_VOID("olap_server::execute_compaction"); |
1273 | 1.35k | tablet->execute_compaction(*compaction); |
1274 | 1.35k | } |
1275 | | |
1276 | | Status StorageEngine::submit_compaction_task(TabletSharedPtr tablet, CompactionType compaction_type, |
1277 | 0 | bool force, bool eager, int trigger_method) { |
1278 | 0 | if (!eager) { |
1279 | 0 | DCHECK(compaction_type == CompactionType::BASE_COMPACTION || |
1280 | 0 | compaction_type == CompactionType::CUMULATIVE_COMPACTION); |
1281 | 0 | auto compaction_registry_snapshot = _compaction_submit_registry.create_snapshot(); |
1282 | 0 | auto stores = get_stores(); |
1283 | |
|
1284 | 0 | bool is_busy = std::none_of( |
1285 | 0 | stores.begin(), stores.end(), |
1286 | 0 | [&compaction_registry_snapshot, compaction_type](auto* data_dir) { |
1287 | 0 | return has_free_compaction_slot( |
1288 | 0 | &compaction_registry_snapshot, data_dir, compaction_type, |
1289 | 0 | compaction_registry_snapshot.count_executing_cumu_and_base(data_dir)); |
1290 | 0 | }); |
1291 | 0 | if (is_busy) { |
1292 | 0 | LOG_EVERY_N(WARNING, 100) |
1293 | 0 | << "Too busy to submit a compaction task, tablet=" << tablet->get_table_id(); |
1294 | 0 | return Status::OK(); |
1295 | 0 | } |
1296 | 0 | } |
1297 | 0 | _update_cumulative_compaction_policy(); |
1298 | | // alter table tableName set ("compaction_policy"="time_series") |
1299 | | // if atler table's compaction policy, we need to modify tablet compaction policy shared ptr |
1300 | 0 | if (tablet->get_cumulative_compaction_policy() == nullptr || |
1301 | 0 | tablet->get_cumulative_compaction_policy()->name() != |
1302 | 0 | tablet->tablet_meta()->compaction_policy()) { |
1303 | 0 | tablet->set_cumulative_compaction_policy( |
1304 | 0 | _cumulative_compaction_policies.at(tablet->tablet_meta()->compaction_policy())); |
1305 | 0 | } |
1306 | 0 | tablet->set_skip_compaction(false); |
1307 | 0 | return _submit_compaction_task(tablet, compaction_type, force, trigger_method); |
1308 | 0 | } |
1309 | | |
1310 | | Status StorageEngine::_handle_seg_compaction(std::shared_ptr<SegcompactionWorker> worker, |
1311 | | SegCompactionCandidatesSharedPtr segments, |
1312 | 11 | uint64_t submission_time) { |
1313 | | // note: be aware that worker->_writer maybe released when the task is cancelled |
1314 | 11 | uint64_t exec_queue_time = GetCurrentTimeMicros() - submission_time; |
1315 | 11 | LOG(INFO) << "segcompaction thread pool queue time(ms): " << exec_queue_time / 1000; |
1316 | 11 | worker->compact_segments(segments); |
1317 | | // return OK here. error will be reported via BetaRowsetWriter::_segcompaction_status |
1318 | 11 | return Status::OK(); |
1319 | 11 | } |
1320 | | |
1321 | | Status StorageEngine::submit_seg_compaction_task(std::shared_ptr<SegcompactionWorker> worker, |
1322 | 11 | SegCompactionCandidatesSharedPtr segments) { |
1323 | 11 | uint64_t submission_time = GetCurrentTimeMicros(); |
1324 | 11 | return _seg_compaction_thread_pool->submit_func([this, worker, segments, submission_time] { |
1325 | 11 | static_cast<void>(_handle_seg_compaction(worker, segments, submission_time)); |
1326 | 11 | }); |
1327 | 11 | } |
1328 | | |
1329 | 0 | Status StorageEngine::process_index_change_task(const TAlterInvertedIndexReq& request) { |
1330 | 0 | auto tablet_id = request.tablet_id; |
1331 | 0 | TabletSharedPtr tablet = _tablet_manager->get_tablet(tablet_id); |
1332 | 0 | DBUG_EXECUTE_IF("StorageEngine::process_index_change_task_tablet_nullptr", |
1333 | 0 | { tablet = nullptr; }) |
1334 | 0 | if (tablet == nullptr) { |
1335 | 0 | LOG(WARNING) << "tablet: " << tablet_id << " not exist"; |
1336 | 0 | return Status::InternalError("tablet not exist, tablet_id={}.", tablet_id); |
1337 | 0 | } |
1338 | | |
1339 | 0 | IndexBuilderSharedPtr index_builder = std::make_shared<IndexBuilder>( |
1340 | 0 | *this, tablet, request.columns, request.alter_inverted_indexes, request.is_drop_op); |
1341 | 0 | RETURN_IF_ERROR(_handle_index_change(index_builder)); |
1342 | 0 | return Status::OK(); |
1343 | 0 | } |
1344 | | |
1345 | 0 | Status StorageEngine::_handle_index_change(IndexBuilderSharedPtr index_builder) { |
1346 | 0 | RETURN_IF_ERROR(index_builder->init()); |
1347 | 0 | RETURN_IF_ERROR(index_builder->do_build_inverted_index()); |
1348 | 0 | return Status::OK(); |
1349 | 0 | } |
1350 | | |
1351 | 6 | void StorageEngine::_cooldown_tasks_producer_callback() { |
1352 | 6 | int64_t interval = config::generate_cooldown_task_interval_sec; |
1353 | | // the cooldown replica may be slow to upload it's meta file, so we should wait |
1354 | | // until it has done uploaded |
1355 | 6 | int64_t skip_failed_interval = interval * 10; |
1356 | 476 | do { |
1357 | | // these tables are ordered by priority desc |
1358 | 476 | std::vector<TabletSharedPtr> tablets; |
1359 | 476 | std::vector<RowsetSharedPtr> rowsets; |
1360 | | // TODO(luwei) : a more efficient way to get cooldown tablets |
1361 | 476 | auto cur_time = time(nullptr); |
1362 | | // we should skip all the tablets which are not running and those pending to do cooldown |
1363 | | // also tablets once failed to do follow cooldown |
1364 | 476 | auto skip_tablet = [this, skip_failed_interval, |
1365 | 3.68M | cur_time](const TabletSharedPtr& tablet) -> bool { |
1366 | 3.68M | bool is_skip = |
1367 | 3.68M | cur_time - tablet->last_failed_follow_cooldown_time() < skip_failed_interval || |
1368 | 3.68M | TABLET_RUNNING != tablet->tablet_state(); |
1369 | 3.68M | if (is_skip) { |
1370 | 0 | return is_skip; |
1371 | 0 | } |
1372 | 3.68M | std::lock_guard<std::mutex> lock(_running_cooldown_mutex); |
1373 | 3.68M | return _running_cooldown_tablets.find(tablet->tablet_id()) != |
1374 | 3.68M | _running_cooldown_tablets.end(); |
1375 | 3.68M | }; |
1376 | 476 | _tablet_manager->get_cooldown_tablets(&tablets, &rowsets, std::move(skip_tablet)); |
1377 | 476 | LOG(INFO) << "cooldown producer get tablet num: " << tablets.size(); |
1378 | 476 | int max_priority = cast_set<int>(tablets.size()); |
1379 | 476 | int index = 0; |
1380 | 476 | for (const auto& tablet : tablets) { |
1381 | 0 | { |
1382 | 0 | std::lock_guard<std::mutex> lock(_running_cooldown_mutex); |
1383 | 0 | _running_cooldown_tablets.insert(tablet->tablet_id()); |
1384 | 0 | } |
1385 | 0 | PriorityThreadPool::Task task; |
1386 | 0 | RowsetSharedPtr rowset = std::move(rowsets[index++]); |
1387 | 0 | task.work_function = [tablet, rowset, task_size = tablets.size(), this]() { |
1388 | 0 | Status st = tablet->cooldown(rowset); |
1389 | 0 | { |
1390 | 0 | std::lock_guard<std::mutex> lock(_running_cooldown_mutex); |
1391 | 0 | _running_cooldown_tablets.erase(tablet->tablet_id()); |
1392 | 0 | } |
1393 | 0 | if (!st.ok()) { |
1394 | 0 | LOG(WARNING) << "failed to cooldown, tablet: " << tablet->tablet_id() |
1395 | 0 | << " err: " << st; |
1396 | 0 | } else { |
1397 | 0 | LOG(INFO) << "succeed to cooldown, tablet: " << tablet->tablet_id() |
1398 | 0 | << " cooldown progress (" |
1399 | 0 | << task_size - _cooldown_thread_pool->get_queue_size() << "/" |
1400 | 0 | << task_size << ")"; |
1401 | 0 | } |
1402 | 0 | }; |
1403 | 0 | task.priority = max_priority--; |
1404 | 0 | bool submited = _cooldown_thread_pool->offer(std::move(task)); |
1405 | |
|
1406 | 0 | if (!submited) { |
1407 | 0 | LOG(INFO) << "failed to submit cooldown task"; |
1408 | 0 | } |
1409 | 0 | } |
1410 | 476 | } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval))); |
1411 | 6 | } |
1412 | | |
1413 | 6 | void StorageEngine::_remove_unused_remote_files_callback() { |
1414 | 22 | while (!_stop_background_threads_latch.wait_for( |
1415 | 22 | std::chrono::seconds(config::remove_unused_remote_files_interval_sec))) { |
1416 | 16 | LOG(INFO) << "begin to remove unused remote files"; |
1417 | 16 | do_remove_unused_remote_files(); |
1418 | 16 | } |
1419 | 6 | } |
1420 | | |
1421 | | static void collect_tablet_unused_remote_files( |
1422 | | Tablet* t, TConfirmUnusedRemoteFilesRequest& req, |
1423 | | std::unordered_map<int64_t, std::pair<StorageResource, std::vector<io::FileInfo>>>& buffer, |
1424 | 7 | int64_t& num_files_in_buffer, PendingRowsetSet& pending_remote_rowsets) { |
1425 | 7 | auto storage_resource = get_resource_by_storage_policy_id(t->storage_policy_id()); |
1426 | 7 | if (!storage_resource) { |
1427 | 0 | LOG(WARNING) << "encounter error when remove unused remote files, tablet_id=" |
1428 | 0 | << t->tablet_id() << " : " << storage_resource.error(); |
1429 | 0 | return; |
1430 | 0 | } |
1431 | | |
1432 | | // TODO(plat1ko): Support path v1 |
1433 | 7 | if (storage_resource->path_version > 0) { |
1434 | 0 | return; |
1435 | 0 | } |
1436 | | |
1437 | 7 | std::vector<io::FileInfo> files; |
1438 | | // FIXME(plat1ko): What if user reset resource in storage policy to another resource? |
1439 | | // Maybe we should also list files in previously uploaded resources. |
1440 | 7 | bool exists = true; |
1441 | 7 | auto st = storage_resource->fs->list(storage_resource->remote_tablet_path(t->tablet_id()), true, |
1442 | 7 | &files, &exists); |
1443 | 7 | if (!st.ok()) { |
1444 | 0 | LOG(WARNING) << "encounter error when remove unused remote files, tablet_id=" |
1445 | 0 | << t->tablet_id() << " : " << st; |
1446 | 0 | return; |
1447 | 0 | } |
1448 | 7 | if (!exists || files.empty()) { |
1449 | 0 | return; |
1450 | 0 | } |
1451 | | // get all cooldowned rowsets |
1452 | 7 | RowsetIdUnorderedSet cooldowned_rowsets; |
1453 | 7 | UniqueId cooldown_meta_id; |
1454 | 7 | { |
1455 | 7 | std::shared_lock rlock(t->get_header_lock()); |
1456 | 14 | for (const auto& [_, rs_meta] : t->tablet_meta()->all_rs_metas()) { |
1457 | 14 | if (!rs_meta->is_local()) { |
1458 | 14 | cooldowned_rowsets.insert(rs_meta->rowset_id()); |
1459 | 14 | } |
1460 | 14 | } |
1461 | 7 | if (cooldowned_rowsets.empty()) { |
1462 | 0 | return; |
1463 | 0 | } |
1464 | 7 | cooldown_meta_id = t->tablet_meta()->cooldown_meta_id(); |
1465 | 7 | } |
1466 | 0 | auto [cooldown_term, cooldown_replica_id] = t->cooldown_conf(); |
1467 | 7 | if (cooldown_replica_id != t->replica_id()) { |
1468 | 0 | return; |
1469 | 0 | } |
1470 | | // {cooldown_replica_id}.{cooldown_term}.meta |
1471 | 7 | std::string remote_meta_path = |
1472 | 7 | cooldown_tablet_meta_filename(cooldown_replica_id, cooldown_term); |
1473 | | // filter out the paths that should be reserved |
1474 | 14 | auto filter = [&](io::FileInfo& info) { |
1475 | 14 | std::string_view filename = info.file_name; |
1476 | 14 | if (filename.ends_with(".meta")) { |
1477 | 7 | return filename == remote_meta_path; |
1478 | 7 | } |
1479 | 7 | auto rowset_id = extract_rowset_id(filename); |
1480 | 7 | if (rowset_id.hi == 0) { |
1481 | 0 | return false; |
1482 | 0 | } |
1483 | 7 | return cooldowned_rowsets.contains(rowset_id) || pending_remote_rowsets.contains(rowset_id); |
1484 | 7 | }; |
1485 | 7 | files.erase(std::remove_if(files.begin(), files.end(), std::move(filter)), files.end()); |
1486 | 7 | if (files.empty()) { |
1487 | 7 | return; |
1488 | 7 | } |
1489 | 0 | files.shrink_to_fit(); |
1490 | 0 | num_files_in_buffer += files.size(); |
1491 | 0 | buffer.insert({t->tablet_id(), {*storage_resource, std::move(files)}}); |
1492 | 0 | auto& info = req.confirm_list.emplace_back(); |
1493 | 0 | info.__set_tablet_id(t->tablet_id()); |
1494 | 0 | info.__set_cooldown_replica_id(cooldown_replica_id); |
1495 | 0 | info.__set_cooldown_meta_id(cooldown_meta_id.to_thrift()); |
1496 | 0 | } |
1497 | | |
1498 | | static void confirm_and_remove_unused_remote_files( |
1499 | | const TConfirmUnusedRemoteFilesRequest& req, |
1500 | | std::unordered_map<int64_t, std::pair<StorageResource, std::vector<io::FileInfo>>>& buffer, |
1501 | 0 | const int64_t num_files_in_buffer) { |
1502 | 0 | TConfirmUnusedRemoteFilesResult result; |
1503 | 0 | LOG(INFO) << "begin to confirm unused remote files. num_tablets=" << buffer.size() |
1504 | 0 | << " num_files=" << num_files_in_buffer; |
1505 | 0 | auto st = MasterServerClient::instance()->confirm_unused_remote_files(req, &result); |
1506 | 0 | if (!st.ok()) { |
1507 | 0 | LOG(WARNING) << st; |
1508 | 0 | return; |
1509 | 0 | } |
1510 | 0 | for (auto id : result.confirmed_tablets) { |
1511 | 0 | if (auto it = buffer.find(id); LIKELY(it != buffer.end())) { |
1512 | 0 | auto& storage_resource = it->second.first; |
1513 | 0 | auto& files = it->second.second; |
1514 | 0 | std::vector<io::Path> paths; |
1515 | 0 | paths.reserve(files.size()); |
1516 | | // delete unused files |
1517 | 0 | LOG(INFO) << "delete unused files. root_path=" << storage_resource.fs->root_path() |
1518 | 0 | << " tablet_id=" << id; |
1519 | 0 | io::Path dir = storage_resource.remote_tablet_path(id); |
1520 | 0 | for (auto& file : files) { |
1521 | 0 | auto file_path = dir / file.file_name; |
1522 | 0 | LOG(INFO) << "delete unused file: " << file_path.native(); |
1523 | 0 | paths.push_back(std::move(file_path)); |
1524 | 0 | } |
1525 | 0 | st = storage_resource.fs->batch_delete(paths); |
1526 | 0 | if (!st.ok()) { |
1527 | 0 | LOG(WARNING) << "failed to delete unused files, tablet_id=" << id << " : " << st; |
1528 | 0 | } |
1529 | 0 | buffer.erase(it); |
1530 | 0 | } |
1531 | 0 | } |
1532 | 0 | } |
1533 | | |
1534 | 16 | void StorageEngine::do_remove_unused_remote_files() { |
1535 | 979k | auto tablets = tablet_manager()->get_all_tablet([](Tablet* t) { |
1536 | 979k | return t->tablet_meta()->cooldown_meta_id().initialized() && t->is_used() && |
1537 | 979k | t->tablet_state() == TABLET_RUNNING && |
1538 | 979k | t->cooldown_conf_unlocked().cooldown_replica_id == t->replica_id(); |
1539 | 979k | }); |
1540 | 16 | TConfirmUnusedRemoteFilesRequest req; |
1541 | 16 | req.__isset.confirm_list = true; |
1542 | | // tablet_id -> [storage_resource, unused_remote_files] |
1543 | 16 | using unused_remote_files_buffer_t = |
1544 | 16 | std::unordered_map<int64_t, std::pair<StorageResource, std::vector<io::FileInfo>>>; |
1545 | 16 | unused_remote_files_buffer_t buffer; |
1546 | 16 | int64_t num_files_in_buffer = 0; |
1547 | | // assume a filename is 0.1KB, buffer size should not larger than 100MB |
1548 | 16 | constexpr int64_t max_files_in_buffer = 1000000; |
1549 | | |
1550 | | // batch confirm to reduce FE's overhead |
1551 | 16 | auto next_confirm_time = std::chrono::steady_clock::now() + |
1552 | 16 | std::chrono::seconds(config::confirm_unused_remote_files_interval_sec); |
1553 | 16 | for (auto& t : tablets) { |
1554 | 7 | if (t.use_count() <= 1 // this means tablet has been dropped |
1555 | 7 | || t->cooldown_conf_unlocked().cooldown_replica_id != t->replica_id() || |
1556 | 7 | t->tablet_state() != TABLET_RUNNING) { |
1557 | 0 | continue; |
1558 | 0 | } |
1559 | 7 | collect_tablet_unused_remote_files(t.get(), req, buffer, num_files_in_buffer, |
1560 | 7 | _pending_remote_rowsets); |
1561 | 7 | if (num_files_in_buffer > 0 && (num_files_in_buffer > max_files_in_buffer || |
1562 | 0 | std::chrono::steady_clock::now() > next_confirm_time)) { |
1563 | 0 | confirm_and_remove_unused_remote_files(req, buffer, num_files_in_buffer); |
1564 | 0 | buffer.clear(); |
1565 | 0 | req.confirm_list.clear(); |
1566 | 0 | num_files_in_buffer = 0; |
1567 | 0 | next_confirm_time = |
1568 | 0 | std::chrono::steady_clock::now() + |
1569 | 0 | std::chrono::seconds(config::confirm_unused_remote_files_interval_sec); |
1570 | 0 | } |
1571 | 7 | } |
1572 | 16 | if (num_files_in_buffer > 0) { |
1573 | 0 | confirm_and_remove_unused_remote_files(req, buffer, num_files_in_buffer); |
1574 | 0 | } |
1575 | 16 | } |
1576 | | |
1577 | 6 | void StorageEngine::_cold_data_compaction_producer_callback() { |
1578 | 26 | while (!_stop_background_threads_latch.wait_for( |
1579 | 26 | std::chrono::seconds(config::cold_data_compaction_interval_sec))) { |
1580 | 20 | if (config::disable_auto_compaction || |
1581 | 20 | GlobalMemoryArbitrator::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) { |
1582 | 0 | continue; |
1583 | 0 | } |
1584 | | |
1585 | 20 | std::unordered_set<int64_t> copied_tablet_submitted; |
1586 | 20 | { |
1587 | 20 | std::lock_guard lock(_cold_compaction_tablet_submitted_mtx); |
1588 | 20 | copied_tablet_submitted = _cold_compaction_tablet_submitted; |
1589 | 20 | } |
1590 | 20 | int64_t n = config::cold_data_compaction_thread_num - copied_tablet_submitted.size(); |
1591 | 20 | if (n <= 0) { |
1592 | 0 | continue; |
1593 | 0 | } |
1594 | 983k | auto tablets = _tablet_manager->get_all_tablet([&copied_tablet_submitted](Tablet* t) { |
1595 | 983k | return t->tablet_meta()->cooldown_meta_id().initialized() && t->is_used() && |
1596 | 983k | t->tablet_state() == TABLET_RUNNING && |
1597 | 983k | !copied_tablet_submitted.contains(t->tablet_id()) && |
1598 | 983k | !t->tablet_meta()->tablet_schema()->disable_auto_compaction(); |
1599 | 983k | }); |
1600 | 20 | std::vector<std::pair<TabletSharedPtr, int64_t>> tablet_to_compact; |
1601 | 20 | tablet_to_compact.reserve(n + 1); |
1602 | 20 | std::vector<std::pair<TabletSharedPtr, int64_t>> tablet_to_follow; |
1603 | 20 | tablet_to_follow.reserve(n + 1); |
1604 | | |
1605 | 20 | for (auto& t : tablets) { |
1606 | 7 | if (t->replica_id() == t->cooldown_conf_unlocked().cooldown_replica_id) { |
1607 | 7 | auto score = t->calc_cold_data_compaction_score(); |
1608 | 7 | if (score < config::cold_data_compaction_score_threshold) { |
1609 | 7 | continue; |
1610 | 7 | } |
1611 | 0 | tablet_to_compact.emplace_back(t, score); |
1612 | 0 | if (tablet_to_compact.size() > n) { |
1613 | 0 | std::sort(tablet_to_compact.begin(), tablet_to_compact.end(), |
1614 | 0 | [](auto& a, auto& b) { return a.second > b.second; }); |
1615 | 0 | tablet_to_compact.pop_back(); |
1616 | 0 | } |
1617 | 0 | continue; |
1618 | 7 | } |
1619 | | // else, need to follow |
1620 | 0 | { |
1621 | 0 | std::lock_guard lock(_running_cooldown_mutex); |
1622 | 0 | if (_running_cooldown_tablets.contains(t->table_id())) { |
1623 | | // already in cooldown queue |
1624 | 0 | continue; |
1625 | 0 | } |
1626 | 0 | } |
1627 | | // TODO(plat1ko): some avoidance strategy if failed to follow |
1628 | 0 | auto score = t->calc_cold_data_compaction_score(); |
1629 | 0 | tablet_to_follow.emplace_back(t, score); |
1630 | |
|
1631 | 0 | if (tablet_to_follow.size() > n) { |
1632 | 0 | std::sort(tablet_to_follow.begin(), tablet_to_follow.end(), |
1633 | 0 | [](auto& a, auto& b) { return a.second > b.second; }); |
1634 | 0 | tablet_to_follow.pop_back(); |
1635 | 0 | } |
1636 | 0 | } |
1637 | | |
1638 | 20 | for (auto& [tablet, score] : tablet_to_compact) { |
1639 | 0 | LOG(INFO) << "submit cold data compaction. tablet_id=" << tablet->tablet_id() |
1640 | 0 | << " score=" << score; |
1641 | 0 | static_cast<void>( |
1642 | 0 | _cold_data_compaction_thread_pool->submit_func([t = std::move(tablet), this]() { |
1643 | 0 | _handle_cold_data_compaction(std::move(t)); |
1644 | 0 | })); |
1645 | 0 | } |
1646 | | |
1647 | 20 | for (auto& [tablet, score] : tablet_to_follow) { |
1648 | 0 | LOG(INFO) << "submit to follow cooldown meta. tablet_id=" << tablet->tablet_id() |
1649 | 0 | << " score=" << score; |
1650 | 0 | static_cast<void>(_cold_data_compaction_thread_pool->submit_func( |
1651 | 0 | [t = std::move(tablet), this]() { _follow_cooldown_meta(std::move(t)); })); |
1652 | 0 | } |
1653 | 20 | } |
1654 | 6 | } |
1655 | | |
1656 | 0 | void StorageEngine::_handle_cold_data_compaction(TabletSharedPtr t) { |
1657 | 0 | auto compaction = std::make_shared<ColdDataCompaction>(*this, t); |
1658 | 0 | { |
1659 | 0 | std::lock_guard lock(_cold_compaction_tablet_submitted_mtx); |
1660 | 0 | _cold_compaction_tablet_submitted.insert(t->tablet_id()); |
1661 | 0 | } |
1662 | 0 | Defer defer {[&] { |
1663 | 0 | std::lock_guard lock(_cold_compaction_tablet_submitted_mtx); |
1664 | 0 | _cold_compaction_tablet_submitted.erase(t->tablet_id()); |
1665 | 0 | }}; |
1666 | 0 | std::unique_lock cold_compaction_lock(t->get_cold_compaction_lock(), std::try_to_lock); |
1667 | 0 | if (!cold_compaction_lock.owns_lock()) { |
1668 | 0 | LOG(WARNING) << "try cold_compaction_lock failed, tablet_id=" << t->tablet_id(); |
1669 | 0 | return; |
1670 | 0 | } |
1671 | 0 | _update_cumulative_compaction_policy(); |
1672 | 0 | if (t->get_cumulative_compaction_policy() == nullptr || |
1673 | 0 | t->get_cumulative_compaction_policy()->name() != t->tablet_meta()->compaction_policy()) { |
1674 | 0 | t->set_cumulative_compaction_policy( |
1675 | 0 | _cumulative_compaction_policies.at(t->tablet_meta()->compaction_policy())); |
1676 | 0 | } |
1677 | |
|
1678 | 0 | auto st = compaction->prepare_compact(); |
1679 | 0 | if (!st.ok()) { |
1680 | 0 | LOG(WARNING) << "failed to prepare cold data compaction. tablet_id=" << t->tablet_id() |
1681 | 0 | << " err=" << st; |
1682 | 0 | return; |
1683 | 0 | } |
1684 | | |
1685 | 0 | st = compaction->execute_compact(); |
1686 | 0 | if (!st.ok()) { |
1687 | 0 | LOG(WARNING) << "failed to execute cold data compaction. tablet_id=" << t->tablet_id() |
1688 | 0 | << " err=" << st; |
1689 | 0 | return; |
1690 | 0 | } |
1691 | 0 | } |
1692 | | |
1693 | 0 | void StorageEngine::_follow_cooldown_meta(TabletSharedPtr t) { |
1694 | 0 | { |
1695 | 0 | std::lock_guard lock(_cold_compaction_tablet_submitted_mtx); |
1696 | 0 | _cold_compaction_tablet_submitted.insert(t->tablet_id()); |
1697 | 0 | } |
1698 | 0 | auto st = t->cooldown(); |
1699 | 0 | { |
1700 | 0 | std::lock_guard lock(_cold_compaction_tablet_submitted_mtx); |
1701 | 0 | _cold_compaction_tablet_submitted.erase(t->tablet_id()); |
1702 | 0 | } |
1703 | 0 | if (!st.ok()) { |
1704 | | // The cooldown of the replica may be relatively slow |
1705 | | // resulting in a short period of time where following cannot be successful |
1706 | 0 | LOG_EVERY_N(WARNING, 5) << "failed to cooldown. tablet_id=" << t->tablet_id() |
1707 | 0 | << " err=" << st; |
1708 | 0 | } |
1709 | 0 | } |
1710 | | |
1711 | | void StorageEngine::add_async_publish_task(int64_t partition_id, int64_t tablet_id, |
1712 | | int64_t publish_version, int64_t transaction_id, |
1713 | 2.05k | bool is_recovery, int64_t commit_tso) { |
1714 | 2.05k | if (!is_recovery) { |
1715 | 2.05k | bool exists = false; |
1716 | 2.05k | { |
1717 | 2.05k | std::shared_lock<std::shared_mutex> rlock(_async_publish_lock); |
1718 | 2.05k | if (auto tablet_iter = _async_publish_tasks.find(tablet_id); |
1719 | 2.05k | tablet_iter != _async_publish_tasks.end()) { |
1720 | 2.05k | if (auto iter = tablet_iter->second.find(publish_version); |
1721 | 2.05k | iter != tablet_iter->second.end()) { |
1722 | 20 | exists = true; |
1723 | 20 | } |
1724 | 2.05k | } |
1725 | 2.05k | } |
1726 | 2.05k | if (exists) { |
1727 | 20 | return; |
1728 | 20 | } |
1729 | 2.03k | TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id); |
1730 | 2.03k | if (tablet == nullptr) { |
1731 | 0 | LOG(INFO) << "tablet may be dropped when add async publish task, tablet_id: " |
1732 | 0 | << tablet_id; |
1733 | 0 | return; |
1734 | 0 | } |
1735 | 2.03k | PendingPublishInfoPB pending_publish_info_pb; |
1736 | 2.03k | pending_publish_info_pb.set_partition_id(partition_id); |
1737 | 2.03k | pending_publish_info_pb.set_transaction_id(transaction_id); |
1738 | 2.03k | pending_publish_info_pb.set_commit_tso(commit_tso); |
1739 | 2.03k | static_cast<void>(TabletMetaManager::save_pending_publish_info( |
1740 | 2.03k | tablet->data_dir(), tablet->tablet_id(), publish_version, |
1741 | 2.03k | pending_publish_info_pb.SerializeAsString())); |
1742 | 2.03k | } |
1743 | 2.05k | LOG(INFO) << "add pending publish task, tablet_id: " << tablet_id |
1744 | 2.03k | << " version: " << publish_version << " txn_id:" << transaction_id |
1745 | 2.03k | << " is_recovery: " << is_recovery; |
1746 | 2.03k | std::unique_lock<std::shared_mutex> wlock(_async_publish_lock); |
1747 | 2.03k | _async_publish_tasks[tablet_id][publish_version] = {transaction_id, partition_id, commit_tso}; |
1748 | 2.03k | } |
1749 | | |
1750 | 3 | int64_t StorageEngine::get_pending_publish_min_version(int64_t tablet_id) { |
1751 | 3 | std::shared_lock<std::shared_mutex> rlock(_async_publish_lock); |
1752 | 3 | auto iter = _async_publish_tasks.find(tablet_id); |
1753 | 3 | if (iter == _async_publish_tasks.end()) { |
1754 | 0 | return INT64_MAX; |
1755 | 0 | } |
1756 | 3 | if (iter->second.empty()) { |
1757 | 0 | return INT64_MAX; |
1758 | 0 | } |
1759 | 3 | return iter->second.begin()->first; |
1760 | 3 | } |
1761 | | |
1762 | 314k | void StorageEngine::_process_async_publish() { |
1763 | | // tablet, publish_version |
1764 | 314k | std::vector<std::pair<TabletSharedPtr, int64_t>> need_removed_tasks; |
1765 | 314k | { |
1766 | 314k | std::unique_lock<std::shared_mutex> wlock(_async_publish_lock); |
1767 | 314k | for (auto tablet_iter = _async_publish_tasks.begin(); |
1768 | 314k | tablet_iter != _async_publish_tasks.end();) { |
1769 | 10 | if (tablet_iter->second.empty()) { |
1770 | 1 | tablet_iter = _async_publish_tasks.erase(tablet_iter); |
1771 | 1 | continue; |
1772 | 1 | } |
1773 | 9 | int64_t tablet_id = tablet_iter->first; |
1774 | 9 | TabletSharedPtr tablet = tablet_manager()->get_tablet(tablet_id); |
1775 | 9 | if (!tablet) { |
1776 | 1 | LOG(WARNING) << "tablet does not exist when async publush, tablet_id: " |
1777 | 1 | << tablet_id; |
1778 | 1 | tablet_iter = _async_publish_tasks.erase(tablet_iter); |
1779 | 1 | continue; |
1780 | 1 | } |
1781 | | |
1782 | 8 | auto task_iter = tablet_iter->second.begin(); |
1783 | 8 | int64_t version = task_iter->first; |
1784 | 8 | int64_t transaction_id = std::get<0>(task_iter->second); |
1785 | 8 | int64_t partition_id = std::get<1>(task_iter->second); |
1786 | 8 | int64_t commit_tso = std::get<2>(task_iter->second); |
1787 | 8 | int64_t max_version = tablet->max_version().second; |
1788 | | |
1789 | 8 | if (version <= max_version) { |
1790 | 6 | need_removed_tasks.emplace_back(tablet, version); |
1791 | 6 | tablet_iter->second.erase(task_iter); |
1792 | 6 | tablet_iter++; |
1793 | 6 | continue; |
1794 | 6 | } |
1795 | 2 | if (version != max_version + 1) { |
1796 | 1 | int32_t max_version_config = tablet->max_version_config(); |
1797 | | // Keep only the most recent versions |
1798 | 31 | while (tablet_iter->second.size() > max_version_config) { |
1799 | 30 | need_removed_tasks.emplace_back(tablet, version); |
1800 | 30 | task_iter = tablet_iter->second.erase(task_iter); |
1801 | 30 | version = task_iter->first; |
1802 | 30 | } |
1803 | 1 | tablet_iter++; |
1804 | 1 | continue; |
1805 | 1 | } |
1806 | | |
1807 | 1 | auto async_publish_task = std::make_shared<AsyncTabletPublishTask>( |
1808 | 1 | *this, tablet, partition_id, transaction_id, version, commit_tso); |
1809 | 1 | static_cast<void>(_tablet_publish_txn_thread_pool->submit_func( |
1810 | 1 | [=]() { async_publish_task->handle(); })); |
1811 | 1 | tablet_iter->second.erase(task_iter); |
1812 | 1 | need_removed_tasks.emplace_back(tablet, version); |
1813 | 1 | tablet_iter++; |
1814 | 1 | } |
1815 | 314k | } |
1816 | 314k | for (auto& [tablet, publish_version] : need_removed_tasks) { |
1817 | 37 | static_cast<void>(TabletMetaManager::remove_pending_publish_info( |
1818 | 37 | tablet->data_dir(), tablet->tablet_id(), publish_version)); |
1819 | 37 | } |
1820 | 314k | } |
1821 | | |
1822 | 6 | void StorageEngine::_async_publish_callback() { |
1823 | 314k | while (!_stop_background_threads_latch.wait_for(std::chrono::milliseconds(30))) { |
1824 | 314k | _process_async_publish(); |
1825 | 314k | } |
1826 | 6 | } |
1827 | | |
1828 | 6 | void StorageEngine::_check_tablet_delete_bitmap_score_callback() { |
1829 | 6 | LOG(INFO) << "try to start check tablet delete bitmap score!"; |
1830 | 34 | while (!_stop_background_threads_latch.wait_for( |
1831 | 34 | std::chrono::seconds(config::check_tablet_delete_bitmap_interval_seconds))) { |
1832 | 28 | if (!config::enable_check_tablet_delete_bitmap_score) { |
1833 | 0 | return; |
1834 | 0 | } |
1835 | 28 | uint64_t max_delete_bitmap_score = 0; |
1836 | 28 | uint64_t max_base_rowset_delete_bitmap_score = 0; |
1837 | 28 | _tablet_manager->get_topn_tablet_delete_bitmap_score(&max_delete_bitmap_score, |
1838 | 28 | &max_base_rowset_delete_bitmap_score); |
1839 | 28 | if (max_delete_bitmap_score > 0) { |
1840 | 16 | _tablet_max_delete_bitmap_score_metrics->set_value(max_delete_bitmap_score); |
1841 | 16 | } |
1842 | 28 | if (max_base_rowset_delete_bitmap_score > 0) { |
1843 | 16 | _tablet_max_base_rowset_delete_bitmap_score_metrics->set_value( |
1844 | 16 | max_base_rowset_delete_bitmap_score); |
1845 | 16 | } |
1846 | 28 | } |
1847 | 6 | } |
1848 | | #include "common/compile_check_end.h" |
1849 | | } // namespace doris |