be/src/cloud/cloud_tablet_mgr.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "cloud/cloud_tablet_mgr.h" |
19 | | |
20 | | #include <bthread/countdown_event.h> |
21 | | |
22 | | #include <chrono> |
23 | | |
24 | | #include "cloud/cloud_cluster_info.h" |
25 | | #include "cloud/cloud_meta_mgr.h" |
26 | | #include "cloud/cloud_storage_engine.h" |
27 | | #include "cloud/cloud_tablet.h" |
28 | | #include "cloud/config.h" |
29 | | #include "common/status.h" |
30 | | #include "cpp/sync_point.h" |
31 | | #include "runtime/memory/cache_policy.h" |
32 | | #include "util/debug_points.h" |
33 | | #include "util/lru_cache.h" |
34 | | #include "util/stack_util.h" |
35 | | |
36 | | namespace doris { |
37 | | uint64_t g_tablet_report_inactive_duration_ms = 0; |
38 | | bvar::Adder<uint64_t> g_base_compaction_not_frozen_tablet_num( |
39 | | "base_compaction_not_frozen_tablet_num"); |
40 | | bvar::Adder<uint64_t> g_cumu_compaction_not_frozen_tablet_num( |
41 | | "cumu_compaction_not_frozen_tablet_num"); |
42 | | namespace { |
43 | | |
44 | | // port from |
45 | | // https://github.com/golang/groupcache/blob/master/singleflight/singleflight.go |
46 | | template <typename Key, typename Val> |
47 | | class SingleFlight { |
48 | | public: |
49 | 9 | SingleFlight() = default; |
50 | | |
51 | | SingleFlight(const SingleFlight&) = delete; |
52 | | void operator=(const SingleFlight&) = delete; |
53 | | |
54 | | using Loader = std::function<Val(const Key&)>; |
55 | | |
56 | | // Do executes and returns the results of the given function, making |
57 | | // sure that only one execution is in-flight for a given key at a |
58 | | // time. If a duplicate comes in, the duplicate caller waits for the |
59 | | // original to complete and receives the same results. |
60 | 128k | Val load(const Key& key, Loader loader) { |
61 | 128k | std::unique_lock lock(_call_map_mtx); |
62 | | |
63 | 128k | auto it = _call_map.find(key); |
64 | 128k | if (it != _call_map.end()) { |
65 | 6 | auto call = it->second; |
66 | 6 | lock.unlock(); |
67 | 6 | if (int ec = call->event.wait(); ec != 0) { |
68 | 0 | throw std::system_error(std::error_code(ec, std::system_category()), |
69 | 0 | "CountdownEvent wait failed"); |
70 | 0 | } |
71 | 6 | return call->val; |
72 | 6 | } |
73 | 128k | auto call = std::make_shared<Call>(); |
74 | 128k | _call_map.emplace(key, call); |
75 | 128k | lock.unlock(); |
76 | | |
77 | 128k | call->val = loader(key); |
78 | 128k | call->event.signal(); |
79 | | |
80 | 128k | lock.lock(); |
81 | 128k | _call_map.erase(key); |
82 | 128k | lock.unlock(); |
83 | | |
84 | 128k | return call->val; |
85 | 128k | } |
86 | | |
87 | | private: |
88 | | // `Call` is an in-flight or completed `load` call |
89 | | struct Call { |
90 | | bthread::CountdownEvent event; |
91 | | Val val; |
92 | | }; |
93 | | |
94 | | std::mutex _call_map_mtx; |
95 | | std::unordered_map<Key, std::shared_ptr<Call>> _call_map; |
96 | | }; |
97 | | |
98 | | // tablet_id -> load tablet function |
99 | | SingleFlight<int64_t, Result<std::shared_ptr<CloudTablet>>> s_singleflight_load_tablet; |
100 | | |
101 | | } // namespace |
102 | | |
103 | | // tablet_id -> cached tablet |
104 | | // This map owns all cached tablets. The lifetime of tablet can be longer than the LRU handle. |
105 | | // It's also used for scenarios where users want to access the tablet by `tablet_id` without changing the LRU order. |
106 | | // TODO(plat1ko): multi shard to increase concurrency |
107 | | class CloudTabletMgr::TabletMap { |
108 | | public: |
109 | 127k | void put(std::shared_ptr<CloudTablet> tablet) { |
110 | 127k | std::lock_guard lock(_mtx); |
111 | 127k | _map[tablet->tablet_id()] = std::move(tablet); |
112 | 127k | } |
113 | | |
114 | 28.4k | void erase(CloudTablet* tablet) { |
115 | 28.4k | std::lock_guard lock(_mtx); |
116 | 28.4k | auto it = _map.find(tablet->tablet_id()); |
117 | | // According to the implementation of `LRUCache`, `deleter` may be called after a tablet |
118 | | // with same tablet id insert into cache and `TabletMap`. So we MUST check if the tablet |
119 | | // instance to be erased is the same one in the map. |
120 | 28.6k | if (it != _map.end() && it->second.get() == tablet) { |
121 | 28.6k | _map.erase(it); |
122 | 28.6k | } |
123 | 28.4k | } |
124 | | |
125 | 0 | std::shared_ptr<CloudTablet> get(int64_t tablet_id) { |
126 | 0 | std::lock_guard lock(_mtx); |
127 | 0 | if (auto it = _map.find(tablet_id); it != _map.end()) { |
128 | 0 | return it->second; |
129 | 0 | } |
130 | 0 | return nullptr; |
131 | 0 | } |
132 | | |
133 | 18.5k | size_t size() { return _map.size(); } |
134 | | |
135 | 18.5k | void traverse(std::function<void(const std::shared_ptr<CloudTablet>&)> visitor) { |
136 | 18.5k | std::lock_guard lock(_mtx); |
137 | 690M | for (auto& [_, tablet] : _map) { |
138 | 690M | visitor(tablet); |
139 | 690M | } |
140 | 18.5k | } |
141 | | |
142 | | private: |
143 | | std::mutex _mtx; |
144 | | std::unordered_map<int64_t, std::shared_ptr<CloudTablet>> _map; |
145 | | }; |
146 | | |
147 | | // TODO(plat1ko): Prune cache |
148 | | CloudTabletMgr::CloudTabletMgr(CloudStorageEngine& engine) |
149 | 135 | : _engine(engine), |
150 | 135 | _tablet_map(std::make_unique<TabletMap>()), |
151 | 135 | _cache(std::make_unique<LRUCachePolicy>( |
152 | 135 | CachePolicy::CacheType::CLOUD_TABLET_CACHE, config::tablet_cache_capacity, |
153 | 135 | LRUCacheType::NUMBER, /*sweep time*/ 0, config::tablet_cache_shards, |
154 | 135 | /*element_count_capacity*/ 0, /*enable_prune*/ false, |
155 | 135 | /*is_lru_k*/ false)) {} |
156 | | |
157 | 134 | CloudTabletMgr::~CloudTabletMgr() = default; |
158 | | |
159 | 3.23M | void set_tablet_access_time_ms(CloudTablet* tablet) { |
160 | 3.23M | using namespace std::chrono; |
161 | 3.23M | int64_t now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
162 | 3.23M | tablet->last_access_time_ms = now; |
163 | 3.23M | } |
164 | | |
165 | | Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_id, bool warmup_data, |
166 | | bool sync_delete_bitmap, |
167 | | SyncRowsetStats* sync_stats, |
168 | | bool force_use_only_cached, |
169 | 1.60M | bool cache_on_miss) { |
170 | 1.60M | DBUG_EXECUTE_IF("CloudTabletMgr::get_tablet.block", DBUG_BLOCK); |
171 | | // LRU value type. `Value`'s lifetime MUST NOT be longer than `CloudTabletMgr` |
172 | 1.60M | class Value : public LRUCacheValueBase { |
173 | 1.60M | public: |
174 | 1.60M | Value(const std::shared_ptr<CloudTablet>& tablet, TabletMap& tablet_map) |
175 | 1.60M | : tablet(tablet), tablet_map(tablet_map) {} |
176 | 1.60M | ~Value() override { tablet_map.erase(tablet.get()); } |
177 | | |
178 | | // FIXME(plat1ko): The ownership of tablet seems to belong to 'TabletMap', while `Value` |
179 | | // only requires a reference. |
180 | 1.60M | std::shared_ptr<CloudTablet> tablet; |
181 | 1.60M | TabletMap& tablet_map; |
182 | 1.60M | }; |
183 | | |
184 | 18.4E | VLOG_DEBUG << "get_tablet tablet_id=" << tablet_id << " stack: " << get_stack_trace(); |
185 | | |
186 | 1.60M | auto tablet_id_str = std::to_string(tablet_id); |
187 | 1.60M | CacheKey key(tablet_id_str); |
188 | 1.60M | auto* handle = _cache->lookup(key); |
189 | | |
190 | 1.60M | if (handle == nullptr) { |
191 | 128k | if (force_use_only_cached) { |
192 | 0 | LOG(INFO) << "tablet=" << tablet_id |
193 | 0 | << "does not exists in local tablet cache, because param " |
194 | 0 | "force_use_only_cached=true, " |
195 | 0 | "treat it as an error"; |
196 | 0 | return ResultError(Status::InternalError( |
197 | 0 | "tablet={} does not exists in local tablet cache, because param " |
198 | 0 | "force_use_only_cached=true, " |
199 | 0 | "treat it as an error", |
200 | 0 | tablet_id)); |
201 | 0 | } |
202 | 128k | TEST_SYNC_POINT("CloudTabletMgr::get_tablet.not_found_in_cache"); |
203 | 128k | if (sync_stats) { |
204 | 17.6k | ++sync_stats->tablet_meta_cache_miss; |
205 | 17.6k | } |
206 | | // Insert into cache and tablet_map inside SingleFlight lambda to ensure |
207 | | // only the leader caller does this. Moving these outside the lambda causes |
208 | | // a race condition: when multiple concurrent callers share the same CloudTablet* |
209 | | // from SingleFlight, each creates a competing LRU cache entry. Delayed Value |
210 | | // destructors then erase the tablet_map entry (the raw pointer safety check |
211 | | // passes since all callers share the same pointer), and the tablet permanently |
212 | | // disappears from tablet_map. Subsequent get_tablet() calls hit the LRU cache |
213 | | // directly (cache hit path) which never re-inserts into tablet_map, making the |
214 | | // tablet invisible to the compaction scheduler. |
215 | 128k | auto load_tablet = [this, &key, warmup_data, sync_delete_bitmap, sync_stats, cache_on_miss]( |
216 | 128k | int64_t tablet_id) -> Result<std::shared_ptr<CloudTablet>> { |
217 | 128k | TabletMetaSharedPtr tablet_meta; |
218 | 128k | auto start = std::chrono::steady_clock::now(); |
219 | 128k | auto st = _engine.meta_mgr().get_tablet_meta(tablet_id, &tablet_meta); |
220 | 128k | auto end = std::chrono::steady_clock::now(); |
221 | 128k | if (sync_stats) { |
222 | 17.6k | sync_stats->get_remote_tablet_meta_rpc_ns += |
223 | 17.6k | std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count(); |
224 | 17.6k | } |
225 | 128k | if (!st.ok()) { |
226 | 110 | LOG(WARNING) << "failed to tablet " << tablet_id << ": " << st; |
227 | 110 | return ResultError(st); |
228 | 110 | } |
229 | | |
230 | 128k | auto tablet = std::make_shared<CloudTablet>(_engine, std::move(tablet_meta)); |
231 | | // MUST sync stats to let compaction scheduler work correctly |
232 | 128k | SyncOptions options; |
233 | 128k | options.warmup_delta_data = warmup_data; |
234 | 128k | options.sync_delete_bitmap = sync_delete_bitmap; |
235 | 128k | st = _engine.meta_mgr().sync_tablet_rowsets(tablet.get(), options, sync_stats); |
236 | 128k | if (!st.ok()) { |
237 | 0 | LOG(WARNING) << "failed to sync tablet " << tablet_id << ": " << st; |
238 | 0 | return ResultError(st); |
239 | 0 | } |
240 | | |
241 | 128k | if (!cache_on_miss) { |
242 | 0 | set_tablet_access_time_ms(tablet.get()); |
243 | 0 | return tablet; |
244 | 0 | } |
245 | | |
246 | 128k | auto value = std::make_unique<Value>(tablet, *_tablet_map); |
247 | 128k | auto* insert_handle = _cache->insert(key, value.release(), 1, sizeof(CloudTablet), |
248 | 128k | CachePriority::NORMAL); |
249 | 128k | auto ret = std::shared_ptr<CloudTablet>(tablet.get(), |
250 | 128k | [this, insert_handle](CloudTablet* tablet_ptr) { |
251 | 128k | set_tablet_access_time_ms(tablet_ptr); |
252 | 128k | _cache->release(insert_handle); |
253 | 128k | }); |
254 | 128k | _tablet_map->put(std::move(tablet)); |
255 | 128k | return ret; |
256 | 128k | }; |
257 | | |
258 | 128k | auto load_result = s_singleflight_load_tablet.load(tablet_id, std::move(load_tablet)); |
259 | 128k | if (!load_result.has_value()) { |
260 | 110 | return ResultError(Status::InternalError("failed to get tablet {}, msg={}", tablet_id, |
261 | 110 | load_result.error())); |
262 | 110 | } |
263 | 128k | auto tablet = load_result.value(); |
264 | 128k | set_tablet_access_time_ms(tablet.get()); |
265 | 128k | return tablet; |
266 | 128k | } |
267 | 1.48M | if (sync_stats) { |
268 | 1.22M | ++sync_stats->tablet_meta_cache_hit; |
269 | 1.22M | } |
270 | 1.48M | CloudTablet* tablet_raw_ptr = reinterpret_cast<Value*>(_cache->value(handle))->tablet.get(); |
271 | 1.48M | set_tablet_access_time_ms(tablet_raw_ptr); |
272 | 1.49M | auto tablet = std::shared_ptr<CloudTablet>(tablet_raw_ptr, [this, handle](CloudTablet* tablet) { |
273 | 1.49M | set_tablet_access_time_ms(tablet); |
274 | 1.49M | _cache->release(handle); |
275 | 1.49M | }); |
276 | 1.48M | return tablet; |
277 | 1.60M | } |
278 | | |
279 | 0 | bool CloudTabletMgr::peek_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta) { |
280 | 0 | if (tablet_meta == nullptr) { |
281 | 0 | return false; |
282 | 0 | } |
283 | 0 | auto tablet = _tablet_map->get(tablet_id); |
284 | 0 | if (!tablet) { |
285 | 0 | return false; |
286 | 0 | } |
287 | 0 | *tablet_meta = tablet->tablet_meta(); |
288 | 0 | return true; |
289 | 0 | } |
290 | | |
291 | 315 | void CloudTabletMgr::erase_tablet(int64_t tablet_id) { |
292 | 315 | auto tablet_id_str = std::to_string(tablet_id); |
293 | 315 | CacheKey key(tablet_id_str.data(), tablet_id_str.size()); |
294 | 315 | _cache->erase(key); |
295 | 315 | } |
296 | | |
297 | 14 | void CloudTabletMgr::vacuum_stale_rowsets(const CountDownLatch& stop_latch) { |
298 | 14 | LOG_INFO("begin to vacuum stale rowsets"); |
299 | 14 | std::vector<std::shared_ptr<CloudTablet>> tablets_to_vacuum; |
300 | 14 | tablets_to_vacuum.reserve(_tablet_map->size()); |
301 | 831k | _tablet_map->traverse([&tablets_to_vacuum](auto&& t) { |
302 | 831k | if (t->has_stale_rowsets()) { |
303 | 9.51k | tablets_to_vacuum.push_back(t); |
304 | 9.51k | } |
305 | 831k | }); |
306 | 14 | int num_vacuumed = 0; |
307 | 9.51k | for (auto& t : tablets_to_vacuum) { |
308 | 9.51k | if (stop_latch.count() <= 0) { |
309 | 0 | break; |
310 | 0 | } |
311 | | |
312 | 9.51k | num_vacuumed += t->delete_expired_stale_rowsets(); |
313 | 9.51k | } |
314 | 14 | LOG_INFO("finish vacuum stale rowsets") |
315 | 14 | .tag("num_vacuumed", num_vacuumed) |
316 | 14 | .tag("num_tablets", tablets_to_vacuum.size()); |
317 | | |
318 | 14 | { |
319 | 14 | LOG_INFO("begin to remove unused rowsets"); |
320 | 14 | std::vector<std::shared_ptr<CloudTablet>> tablets_to_remove_unused_rowsets; |
321 | 14 | tablets_to_remove_unused_rowsets.reserve(_tablet_map->size()); |
322 | 831k | _tablet_map->traverse([&tablets_to_remove_unused_rowsets](auto&& t) { |
323 | 831k | if (t->need_remove_unused_rowsets()) { |
324 | 4.96k | tablets_to_remove_unused_rowsets.push_back(t); |
325 | 4.96k | } |
326 | 831k | }); |
327 | 4.96k | for (auto& t : tablets_to_remove_unused_rowsets) { |
328 | 4.96k | t->remove_unused_rowsets(); |
329 | 4.96k | } |
330 | 14 | LOG_INFO("finish remove unused rowsets") |
331 | 14 | .tag("num_tablets", tablets_to_remove_unused_rowsets.size()); |
332 | 14 | if (config::enable_check_agg_and_remove_pre_rowsets_delete_bitmap) { |
333 | 0 | int64_t max_useless_rowset_count = 0; |
334 | 0 | int64_t tablet_id_with_max_useless_rowset_count = 0; |
335 | 0 | int64_t max_useless_rowset_version_count = 0; |
336 | 0 | int64_t tablet_id_with_max_useless_rowset_version_count = 0; |
337 | 0 | OlapStopWatch watch; |
338 | 0 | _tablet_map->traverse([&](auto&& tablet) { |
339 | 0 | int64_t useless_rowset_count = 0; |
340 | 0 | int64_t useless_rowset_version_count = 0; |
341 | 0 | tablet->check_agg_delete_bitmap_for_stale_rowsets(useless_rowset_count, |
342 | 0 | useless_rowset_version_count); |
343 | 0 | if (useless_rowset_count > max_useless_rowset_count) { |
344 | 0 | max_useless_rowset_count = useless_rowset_count; |
345 | 0 | tablet_id_with_max_useless_rowset_count = tablet->tablet_id(); |
346 | 0 | } |
347 | 0 | if (useless_rowset_version_count > max_useless_rowset_version_count) { |
348 | 0 | max_useless_rowset_version_count = useless_rowset_version_count; |
349 | 0 | tablet_id_with_max_useless_rowset_version_count = tablet->tablet_id(); |
350 | 0 | } |
351 | 0 | }); |
352 | 0 | g_max_rowsets_with_useless_delete_bitmap.set_value(max_useless_rowset_count); |
353 | 0 | g_max_rowsets_with_useless_delete_bitmap_version.set_value( |
354 | 0 | max_useless_rowset_version_count); |
355 | 0 | LOG(INFO) << "finish check_agg_delete_bitmap_for_stale_rowsets, cost(us)=" |
356 | 0 | << watch.get_elapse_time_us() |
357 | 0 | << ". max useless rowset count=" << max_useless_rowset_count |
358 | 0 | << ", tablet_id=" << tablet_id_with_max_useless_rowset_count |
359 | 0 | << ", max useless rowset version count=" << max_useless_rowset_version_count |
360 | 0 | << ", tablet_id=" << tablet_id_with_max_useless_rowset_version_count; |
361 | 0 | } |
362 | 14 | } |
363 | 14 | } |
364 | | |
365 | 18.5k | std::vector<std::weak_ptr<CloudTablet>> CloudTabletMgr::get_weak_tablets() { |
366 | 18.5k | std::vector<std::weak_ptr<CloudTablet>> weak_tablets; |
367 | 18.5k | weak_tablets.reserve(_tablet_map->size()); |
368 | 689M | _tablet_map->traverse([&weak_tablets](auto& t) { weak_tablets.push_back(t); }); |
369 | 18.5k | return weak_tablets; |
370 | 18.5k | } |
371 | | |
372 | 6 | void CloudTabletMgr::sync_tablets(const CountDownLatch& stop_latch) { |
373 | 6 | LOG_INFO("begin to sync tablets"); |
374 | 6 | int64_t last_sync_time_bound = ::time(nullptr) - config::tablet_sync_interval_s; |
375 | | |
376 | 6 | auto weak_tablets = get_weak_tablets(); |
377 | | |
378 | | // sort by last_sync_time |
379 | 1.10M | static auto cmp = [](const auto& a, const auto& b) { return a.first < b.first; }; |
380 | 6 | std::multiset<std::pair<int64_t, std::weak_ptr<CloudTablet>>, decltype(cmp)> |
381 | 6 | sync_time_tablet_set(cmp); |
382 | | |
383 | 335k | for (auto& weak_tablet : weak_tablets) { |
384 | 335k | if (auto tablet = weak_tablet.lock()) { |
385 | 335k | int64_t last_sync_time = tablet->last_sync_time_s; |
386 | 335k | if (last_sync_time <= last_sync_time_bound) { |
387 | 57.0k | sync_time_tablet_set.emplace(last_sync_time, weak_tablet); |
388 | 57.0k | } |
389 | 335k | } |
390 | 335k | } |
391 | | |
392 | 6 | int num_sync = 0; |
393 | 57.0k | for (auto&& [_, weak_tablet] : sync_time_tablet_set) { |
394 | 57.0k | if (stop_latch.count() <= 0) { |
395 | 0 | break; |
396 | 0 | } |
397 | | |
398 | 57.0k | if (auto tablet = weak_tablet.lock()) { |
399 | 57.0k | if (tablet->last_sync_time_s > last_sync_time_bound) { |
400 | 0 | continue; |
401 | 0 | } |
402 | | |
403 | 57.0k | ++num_sync; |
404 | 57.0k | auto st = tablet->sync_meta(); |
405 | 57.0k | if (!st) { |
406 | 0 | LOG_WARNING("failed to sync tablet meta {}", tablet->tablet_id()).error(st); |
407 | 0 | if (st.is<ErrorCode::NOT_FOUND>()) { |
408 | 0 | continue; |
409 | 0 | } |
410 | 0 | } |
411 | 57.0k | SyncOptions options; |
412 | 57.0k | options.query_version = -1; |
413 | 57.0k | options.merge_schema = true; |
414 | 57.0k | st = tablet->sync_rowsets(options); |
415 | 57.0k | if (!st) { |
416 | 0 | LOG_WARNING("failed to sync tablet rowsets {}", tablet->tablet_id()).error(st); |
417 | 0 | } |
418 | 57.0k | } |
419 | 57.0k | } |
420 | 6 | LOG_INFO("finish sync tablets").tag("num_sync", num_sync); |
421 | 6 | } |
422 | | |
423 | | Status CloudTabletMgr::get_topn_tablets_to_compact( |
424 | | int n, CompactionType compaction_type, const std::function<bool(CloudTablet*)>& filter_out, |
425 | 18.4k | std::vector<std::shared_ptr<CloudTablet>>* tablets, int64_t* max_score) { |
426 | 18.4k | DCHECK(compaction_type == CompactionType::BASE_COMPACTION || |
427 | 18.4k | compaction_type == CompactionType::CUMULATIVE_COMPACTION); |
428 | 18.4k | *max_score = 0; |
429 | 18.4k | int64_t max_score_tablet_id = 0; |
430 | | // clang-format off |
431 | 683M | auto score = [compaction_type](CloudTablet* t) { |
432 | 683M | return compaction_type == CompactionType::BASE_COMPACTION ? t->get_cloud_base_compaction_score() |
433 | 683M | : compaction_type == CompactionType::CUMULATIVE_COMPACTION ? t->get_cloud_cumu_compaction_score() |
434 | 614M | : 0; |
435 | 683M | }; |
436 | | |
437 | 18.4k | using namespace std::chrono; |
438 | 18.4k | auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
439 | 544M | auto skip = [now, compaction_type](CloudTablet* t) { |
440 | 544M | auto* cloud_cluster_info = static_cast<CloudClusterInfo*>(ExecEnv::GetInstance()->cluster_info()); |
441 | | |
442 | 544M | if (config::enable_standby_passive_compaction && cloud_cluster_info->is_in_standby()) { |
443 | 0 | if (t->fetch_add_approximate_num_rowsets(0) < config::max_tablet_version_num * config::standby_compaction_version_ratio) { |
444 | 0 | return true; |
445 | 0 | } |
446 | 0 | } |
447 | | |
448 | | // Compaction read-write separation: skip tablets that should be compacted by other clusters. |
449 | | // Placed after standby check so standby invariants (version count threshold) are preserved. |
450 | 544M | if (cloud_cluster_info->should_skip_compaction(t)) { |
451 | 0 | return true; |
452 | 0 | } |
453 | | |
454 | 544M | int32_t max_version_config = t->max_version_config(); |
455 | 544M | if (compaction_type == CompactionType::BASE_COMPACTION) { |
456 | 67.8M | bool is_recent_failure = now - t->last_base_compaction_failure_time() < config::min_compaction_failure_interval_ms; |
457 | 67.8M | bool is_frozen = (now - t->last_load_time_ms > config::compaction_load_max_freeze_interval_s * 1000 |
458 | 67.8M | && now - t->last_base_compaction_success_time_ms < config::base_compaction_freeze_interval_s * 1000 |
459 | 67.8M | && t->fetch_add_approximate_num_rowsets(0) < max_version_config / 2); |
460 | 67.8M | g_base_compaction_not_frozen_tablet_num << !is_frozen; |
461 | 67.8M | return is_recent_failure || is_frozen; |
462 | 67.8M | } |
463 | | |
464 | | // If tablet has too many rowsets but not be compacted for a long time, compaction should be performed |
465 | | // regardless of whether there is a load job recently. |
466 | 476M | bool is_recent_failure = now - t->last_cumu_compaction_failure_time() < config::min_compaction_failure_interval_ms; |
467 | 476M | bool is_recent_no_suitable_version = now - t->last_cumu_no_suitable_version_ms < config::min_compaction_failure_interval_ms; |
468 | 476M | bool is_frozen = (now - t->last_load_time_ms > config::compaction_load_max_freeze_interval_s * 1000 |
469 | 476M | && now - t->last_cumu_compaction_success_time_ms < config::cumu_compaction_interval_s * 1000 |
470 | 476M | && t->fetch_add_approximate_num_rowsets(0) < max_version_config / 2); |
471 | 476M | g_cumu_compaction_not_frozen_tablet_num << !is_frozen; |
472 | 476M | return is_recent_failure || is_recent_no_suitable_version || is_frozen; |
473 | 544M | }; |
474 | | // We don't schedule tablets that are disabled for compaction |
475 | 549M | auto disable = [](CloudTablet* t) { return t->tablet_meta()->tablet_schema()->disable_auto_compaction(); }; |
476 | | |
477 | 18.4k | auto [num_filtered, num_disabled, num_skipped] = std::make_tuple(0, 0, 0); |
478 | | |
479 | 18.4k | auto weak_tablets = get_weak_tablets(); |
480 | 18.4k | std::vector<std::pair<std::shared_ptr<CloudTablet>, int64_t>> buf; |
481 | 18.4k | buf.reserve(n + 1); |
482 | 683M | for (auto& weak_tablet : weak_tablets) { |
483 | 683M | auto t = weak_tablet.lock(); |
484 | 683M | if (t == nullptr) { continue; } |
485 | | |
486 | 683M | int64_t s = score(t.get()); |
487 | 683M | if (s <= 0) { continue; } |
488 | 549M | if (s > *max_score) { |
489 | 106k | max_score_tablet_id = t->tablet_id(); |
490 | 106k | *max_score = s; |
491 | 106k | } |
492 | | |
493 | 549M | if (filter_out(t.get())) { ++num_filtered; continue; } |
494 | 549M | if (disable(t.get())) { ++num_disabled; continue; } |
495 | 544M | if (skip(t.get())) { ++num_skipped; continue; } |
496 | | |
497 | 540M | buf.emplace_back(std::move(t), s); |
498 | 6.83G | std::sort(buf.begin(), buf.end(), [](auto& a, auto& b) { return a.second > b.second; }); |
499 | 540M | if (buf.size() > n) { buf.pop_back(); } |
500 | 540M | } |
501 | | |
502 | 18.4k | LOG_EVERY_N(INFO, 1000) << "get_topn_compaction_score, n=" << n << " type=" << compaction_type |
503 | 20 | << " num_tablets=" << weak_tablets.size() << " num_skipped=" << num_skipped |
504 | 20 | << " num_disabled=" << num_disabled << " num_filtered=" << num_filtered |
505 | 20 | << " max_score=" << *max_score << " max_score_tablet=" << max_score_tablet_id |
506 | 102 | << " tablets=[" << [&buf] { std::stringstream ss; for (auto& i : buf) ss << i.first->tablet_id() << ":" << i.second << ","; return ss.str(); }() << "]" |
507 | 18.4k | ; |
508 | | // clang-format on |
509 | | |
510 | 18.4k | tablets->clear(); |
511 | 18.4k | tablets->reserve(n + 1); |
512 | 110k | for (auto& [t, _] : buf) { |
513 | 110k | tablets->emplace_back(std::move(t)); |
514 | 110k | } |
515 | | |
516 | 18.4k | return Status::OK(); |
517 | 18.4k | } |
518 | | |
519 | | void CloudTabletMgr::build_all_report_tablets_info(std::map<TTabletId, TTablet>* tablets_info, |
520 | 80 | uint64_t* tablet_num) { |
521 | 80 | DCHECK(tablets_info != nullptr); |
522 | 80 | VLOG_NOTICE << "begin to build all report cloud tablets info"; |
523 | | |
524 | 80 | HistogramStat tablet_version_num_hist; |
525 | | |
526 | 4.50M | auto handler = [&](const std::weak_ptr<CloudTablet>& tablet_wk) { |
527 | 4.50M | auto tablet = tablet_wk.lock(); |
528 | 4.50M | if (!tablet) return; |
529 | 4.50M | (*tablet_num)++; |
530 | 4.50M | TTabletInfo tablet_info; |
531 | 4.50M | tablet->build_tablet_report_info(&tablet_info); |
532 | 4.50M | using namespace std::chrono; |
533 | 4.50M | int64_t now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count(); |
534 | 4.50M | if (now - g_tablet_report_inactive_duration_ms < tablet->last_access_time_ms) { |
535 | | // the tablet is still being accessed and used in recently, so not report it |
536 | 4.50M | return; |
537 | 4.50M | } |
538 | 0 | auto& t_tablet = (*tablets_info)[tablet->tablet_id()]; |
539 | | // On the cloud, a specific BE has only one tablet replica; |
540 | | // there are no multiple replicas for a specific BE. |
541 | | // This is only to reuse the non-cloud report protocol. |
542 | 0 | tablet_version_num_hist.add(tablet_info.total_version_count); |
543 | 0 | t_tablet.tablet_infos.emplace_back(std::move(tablet_info)); |
544 | 0 | }; |
545 | | |
546 | 80 | auto weak_tablets = get_weak_tablets(); |
547 | 80 | std::for_each(weak_tablets.begin(), weak_tablets.end(), handler); |
548 | | |
549 | 80 | DorisMetrics::instance()->tablet_version_num_distribution->set_histogram( |
550 | 80 | tablet_version_num_hist); |
551 | 80 | LOG(INFO) << "success to build all cloud report tablets info. all_tablet_count=" << *tablet_num |
552 | 80 | << " exceed drop time limit count=" << tablets_info->size(); |
553 | 80 | } |
554 | | |
555 | 0 | void CloudTabletMgr::get_tablet_info(int64_t num_tablets, std::vector<TabletInfo>* tablets_info) { |
556 | 0 | auto weak_tablets = get_weak_tablets(); |
557 | 0 | for (auto& weak_tablet : weak_tablets) { |
558 | 0 | auto tablet = weak_tablet.lock(); |
559 | 0 | if (tablet == nullptr) { |
560 | 0 | continue; |
561 | 0 | } |
562 | 0 | if (tablets_info->size() >= num_tablets) { |
563 | 0 | return; |
564 | 0 | } |
565 | 0 | tablets_info->push_back(tablet->get_tablet_info()); |
566 | 0 | } |
567 | 0 | } |
568 | | |
569 | | void CloudTabletMgr::get_topn_tablet_delete_bitmap_score( |
570 | 14 | uint64_t* max_delete_bitmap_score, uint64_t* max_base_rowset_delete_bitmap_score) { |
571 | 14 | int64_t max_delete_bitmap_score_tablet_id = 0; |
572 | 14 | OlapStopWatch watch; |
573 | 14 | uint64_t total_delete_map_count = 0; |
574 | 14 | int64_t max_base_rowset_delete_bitmap_score_tablet_id = 0; |
575 | 14 | int n = config::check_tablet_delete_bitmap_score_top_n; |
576 | 14 | std::vector<std::pair<std::shared_ptr<CloudTablet>, int64_t>> buf; |
577 | 14 | buf.reserve(n + 1); |
578 | 829k | auto handler = [&](const std::weak_ptr<CloudTablet>& tablet_wk) { |
579 | 829k | auto t = tablet_wk.lock(); |
580 | 829k | if (!t) return; |
581 | 829k | uint64_t delete_bitmap_count = |
582 | 829k | t.get()->tablet_meta()->delete_bitmap().get_delete_bitmap_count(); |
583 | 829k | total_delete_map_count += delete_bitmap_count; |
584 | 829k | if (delete_bitmap_count > *max_delete_bitmap_score) { |
585 | 79 | max_delete_bitmap_score_tablet_id = t->tablet_id(); |
586 | 79 | *max_delete_bitmap_score = delete_bitmap_count; |
587 | 79 | } |
588 | 829k | buf.emplace_back(std::move(t), delete_bitmap_count); |
589 | 16.5M | std::sort(buf.begin(), buf.end(), [](auto& a, auto& b) { return a.second > b.second; }); |
590 | 829k | if (buf.size() > n) { |
591 | 829k | buf.pop_back(); |
592 | 829k | } |
593 | 829k | }; |
594 | 14 | auto weak_tablets = get_weak_tablets(); |
595 | 14 | std::for_each(weak_tablets.begin(), weak_tablets.end(), handler); |
596 | 140 | for (auto& [t, _] : buf) { |
597 | 140 | t->get_base_rowset_delete_bitmap_count(max_base_rowset_delete_bitmap_score, |
598 | 140 | &max_base_rowset_delete_bitmap_score_tablet_id); |
599 | 140 | } |
600 | 14 | std::stringstream ss; |
601 | 140 | for (auto& i : buf) { |
602 | 140 | ss << i.first->tablet_id() << ": " << i.second << ", "; |
603 | 140 | } |
604 | 14 | LOG(INFO) << "get_topn_tablet_delete_bitmap_score, n=" << n |
605 | 14 | << ", tablet size=" << weak_tablets.size() |
606 | 14 | << ", total_delete_map_count=" << total_delete_map_count |
607 | 14 | << ", cost(us)=" << watch.get_elapse_time_us() |
608 | 14 | << ", max_delete_bitmap_score=" << *max_delete_bitmap_score |
609 | 14 | << ", max_delete_bitmap_score_tablet_id=" << max_delete_bitmap_score_tablet_id |
610 | 14 | << ", max_base_rowset_delete_bitmap_score=" << *max_base_rowset_delete_bitmap_score |
611 | 14 | << ", max_base_rowset_delete_bitmap_score_tablet_id=" |
612 | 14 | << max_base_rowset_delete_bitmap_score_tablet_id << ", tablets=[" << ss.str() << "]"; |
613 | 14 | } |
614 | | |
615 | 2 | std::vector<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_all_tablet() { |
616 | 2 | std::vector<std::shared_ptr<CloudTablet>> tablets; |
617 | 2 | tablets.reserve(_tablet_map->size()); |
618 | 12.4k | _tablet_map->traverse([&tablets](auto& t) { tablets.push_back(t); }); |
619 | 2 | return tablets; |
620 | 2 | } |
621 | | |
622 | 2 | void CloudTabletMgr::put_tablet_for_UT(std::shared_ptr<CloudTablet> tablet) { |
623 | 2 | _tablet_map->put(tablet); |
624 | 2 | } |
625 | | |
626 | | } // namespace doris |