Coverage Report

Created: 2026-04-22 07:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_tablet_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_tablet_mgr.h"
19
20
#include <bthread/countdown_event.h>
21
22
#include <chrono>
23
24
#include "cloud/cloud_cluster_info.h"
25
#include "cloud/cloud_meta_mgr.h"
26
#include "cloud/cloud_storage_engine.h"
27
#include "cloud/cloud_tablet.h"
28
#include "cloud/config.h"
29
#include "common/metrics/doris_metrics.h"
30
#include "common/status.h"
31
#include "cpp/lru_cache.h"
32
#include "cpp/sync_point.h"
33
#include "runtime/memory/cache_policy.h"
34
#include "util/debug_points.h"
35
#include "util/stack_util.h"
36
37
namespace doris {
38
uint64_t g_tablet_report_inactive_duration_ms = 0;
39
bvar::Adder<uint64_t> g_base_compaction_not_frozen_tablet_num(
40
        "base_compaction_not_frozen_tablet_num");
41
bvar::Adder<uint64_t> g_cumu_compaction_not_frozen_tablet_num(
42
        "cumu_compaction_not_frozen_tablet_num");
43
namespace {
44
45
// port from
46
// https://github.com/golang/groupcache/blob/master/singleflight/singleflight.go
47
template <typename Key, typename Val>
48
class SingleFlight {
49
public:
50
9
    SingleFlight() = default;
51
52
    SingleFlight(const SingleFlight&) = delete;
53
    void operator=(const SingleFlight&) = delete;
54
55
    using Loader = std::function<Val(const Key&)>;
56
57
    // Do executes and returns the results of the given function, making
58
    // sure that only one execution is in-flight for a given key at a
59
    // time. If a duplicate comes in, the duplicate caller waits for the
60
    // original to complete and receives the same results.
61
118k
    Val load(const Key& key, Loader loader) {
62
118k
        std::unique_lock lock(_call_map_mtx);
63
64
118k
        auto it = _call_map.find(key);
65
118k
        if (it != _call_map.end()) {
66
68
            auto call = it->second;
67
68
            lock.unlock();
68
68
            if (int ec = call->event.wait(); ec != 0) {
69
0
                throw std::system_error(std::error_code(ec, std::system_category()),
70
0
                                        "CountdownEvent wait failed");
71
0
            }
72
68
            return call->val;
73
68
        }
74
118k
        auto call = std::make_shared<Call>();
75
118k
        _call_map.emplace(key, call);
76
118k
        lock.unlock();
77
78
118k
        call->val = loader(key);
79
118k
        call->event.signal();
80
81
118k
        lock.lock();
82
118k
        _call_map.erase(key);
83
118k
        lock.unlock();
84
85
118k
        return call->val;
86
118k
    }
87
88
private:
89
    // `Call` is an in-flight or completed `load` call
90
    struct Call {
91
        bthread::CountdownEvent event;
92
        Val val;
93
    };
94
95
    std::mutex _call_map_mtx;
96
    std::unordered_map<Key, std::shared_ptr<Call>> _call_map;
97
};
98
99
// tablet_id -> load tablet function
100
SingleFlight<int64_t, Result<std::shared_ptr<CloudTablet>>> s_singleflight_load_tablet;
101
102
} // namespace
103
104
// tablet_id -> cached tablet
105
// This map owns all cached tablets. The lifetime of tablet can be longer than the LRU handle.
106
// It's also used for scenarios where users want to access the tablet by `tablet_id` without changing the LRU order.
107
// TODO(plat1ko): multi shard to increase concurrency
108
class CloudTabletMgr::TabletMap {
109
public:
110
116k
    void put(std::shared_ptr<CloudTablet> tablet) {
111
116k
        std::lock_guard lock(_mtx);
112
116k
        _map[tablet->tablet_id()] = std::move(tablet);
113
116k
    }
114
115
18.0k
    void erase(CloudTablet* tablet) {
116
18.0k
        std::lock_guard lock(_mtx);
117
18.0k
        auto it = _map.find(tablet->tablet_id());
118
        // According to the implementation of `LRUCache`, `deleter` may be called after a tablet
119
        // with same tablet id insert into cache and `TabletMap`. So we MUST check if the tablet
120
        // instance to be erased is the same one in the map.
121
18.2k
        if (it != _map.end() && it->second.get() == tablet) {
122
18.2k
            _map.erase(it);
123
18.2k
        }
124
18.0k
    }
125
126
0
    std::shared_ptr<CloudTablet> get(int64_t tablet_id) {
127
0
        std::lock_guard lock(_mtx);
128
0
        if (auto it = _map.find(tablet_id); it != _map.end()) {
129
0
            return it->second;
130
0
        }
131
0
        return nullptr;
132
0
    }
133
134
17.7k
    size_t size() { return _map.size(); }
135
136
17.7k
    void traverse(std::function<void(const std::shared_ptr<CloudTablet>&)> visitor) {
137
17.7k
        std::lock_guard lock(_mtx);
138
554M
        for (auto& [_, tablet] : _map) {
139
554M
            visitor(tablet);
140
554M
        }
141
17.7k
    }
142
143
private:
144
    std::mutex _mtx;
145
    std::unordered_map<int64_t, std::shared_ptr<CloudTablet>> _map;
146
};
147
148
// TODO(plat1ko): Prune cache
149
CloudTabletMgr::CloudTabletMgr(CloudStorageEngine& engine)
150
164
        : _engine(engine),
151
164
          _tablet_map(std::make_unique<TabletMap>()),
152
164
          _cache(std::make_unique<LRUCachePolicy>(
153
164
                  CachePolicy::CacheType::CLOUD_TABLET_CACHE, config::tablet_cache_capacity,
154
164
                  LRUCacheType::NUMBER, /*sweep time*/ 0, config::tablet_cache_shards,
155
164
                  /*element_count_capacity*/ 0, /*enable_prune*/ false,
156
164
                  /*is_lru_k*/ false)) {}
157
158
163
CloudTabletMgr::~CloudTabletMgr() = default;
159
160
3.03M
void set_tablet_access_time_ms(CloudTablet* tablet) {
161
3.03M
    using namespace std::chrono;
162
3.03M
    int64_t now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
163
3.03M
    tablet->last_access_time_ms = now;
164
3.03M
}
165
166
Result<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_tablet(int64_t tablet_id, bool warmup_data,
167
                                                                bool sync_delete_bitmap,
168
                                                                SyncRowsetStats* sync_stats,
169
                                                                bool force_use_only_cached,
170
1.50M
                                                                bool cache_on_miss) {
171
1.50M
    DBUG_EXECUTE_IF("CloudTabletMgr::get_tablet.block", DBUG_BLOCK);
172
    // LRU value type. `Value`'s lifetime MUST NOT be longer than `CloudTabletMgr`
173
1.50M
    class Value : public LRUCacheValueBase {
174
1.50M
    public:
175
1.50M
        Value(const std::shared_ptr<CloudTablet>& tablet, TabletMap& tablet_map)
176
1.50M
                : tablet(tablet), tablet_map(tablet_map) {}
177
1.50M
        ~Value() override { tablet_map.erase(tablet.get()); }
178
179
        // FIXME(plat1ko): The ownership of tablet seems to belong to 'TabletMap', while `Value`
180
        // only requires a reference.
181
1.50M
        std::shared_ptr<CloudTablet> tablet;
182
1.50M
        TabletMap& tablet_map;
183
1.50M
    };
184
185
18.4E
    VLOG_DEBUG << "get_tablet tablet_id=" << tablet_id << " stack: " << get_stack_trace();
186
187
1.50M
    auto tablet_id_str = std::to_string(tablet_id);
188
1.50M
    CacheKey key(tablet_id_str);
189
1.50M
    auto* handle = _cache->lookup(key);
190
191
1.50M
    if (handle == nullptr) {
192
118k
        if (force_use_only_cached) {
193
0
            LOG(INFO) << "tablet=" << tablet_id
194
0
                      << "does not exists in local tablet cache, because param "
195
0
                         "force_use_only_cached=true, "
196
0
                         "treat it as an error";
197
0
            return ResultError(Status::InternalError(
198
0
                    "tablet={} does not exists in local tablet cache, because param "
199
0
                    "force_use_only_cached=true, "
200
0
                    "treat it as an error",
201
0
                    tablet_id));
202
0
        }
203
118k
        TEST_SYNC_POINT("CloudTabletMgr::get_tablet.not_found_in_cache");
204
118k
        if (sync_stats) {
205
12.9k
            ++sync_stats->tablet_meta_cache_miss;
206
12.9k
        }
207
        // Insert into cache and tablet_map inside SingleFlight lambda to ensure
208
        // only the leader caller does this. Moving these outside the lambda causes
209
        // a race condition: when multiple concurrent callers share the same CloudTablet*
210
        // from SingleFlight, each creates a competing LRU cache entry. Delayed Value
211
        // destructors then erase the tablet_map entry (the raw pointer safety check
212
        // passes since all callers share the same pointer), and the tablet permanently
213
        // disappears from tablet_map. Subsequent get_tablet() calls hit the LRU cache
214
        // directly (cache hit path) which never re-inserts into tablet_map, making the
215
        // tablet invisible to the compaction scheduler.
216
118k
        auto load_tablet = [this, &key, warmup_data, sync_delete_bitmap, sync_stats, cache_on_miss](
217
118k
                                   int64_t tablet_id) -> Result<std::shared_ptr<CloudTablet>> {
218
118k
            TabletMetaSharedPtr tablet_meta;
219
118k
            auto start = std::chrono::steady_clock::now();
220
118k
            auto st = _engine.meta_mgr().get_tablet_meta(tablet_id, &tablet_meta);
221
118k
            auto end = std::chrono::steady_clock::now();
222
118k
            if (sync_stats) {
223
12.9k
                sync_stats->get_remote_tablet_meta_rpc_ns +=
224
12.9k
                        std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
225
12.9k
            }
226
118k
            if (!st.ok()) {
227
110
                LOG(WARNING) << "failed to tablet " << tablet_id << ": " << st;
228
110
                return ResultError(st);
229
110
            }
230
231
118k
            auto tablet = std::make_shared<CloudTablet>(_engine, std::move(tablet_meta));
232
            // MUST sync stats to let compaction scheduler work correctly
233
118k
            SyncOptions options;
234
118k
            options.warmup_delta_data = warmup_data;
235
118k
            options.sync_delete_bitmap = sync_delete_bitmap;
236
118k
            st = _engine.meta_mgr().sync_tablet_rowsets(tablet.get(), options, sync_stats);
237
118k
            if (!st.ok()) {
238
0
                LOG(WARNING) << "failed to sync tablet " << tablet_id << ": " << st;
239
0
                return ResultError(st);
240
0
            }
241
242
118k
            if (!cache_on_miss) {
243
0
                set_tablet_access_time_ms(tablet.get());
244
0
                return tablet;
245
0
            }
246
247
118k
            auto value = std::make_unique<Value>(tablet, *_tablet_map);
248
118k
            auto* insert_handle = _cache->insert(key, value.release(), 1, sizeof(CloudTablet),
249
118k
                                                 CachePriority::NORMAL);
250
118k
            auto ret = std::shared_ptr<CloudTablet>(tablet.get(),
251
118k
                                                    [this, insert_handle](CloudTablet* tablet_ptr) {
252
118k
                                                        set_tablet_access_time_ms(tablet_ptr);
253
118k
                                                        _cache->release(insert_handle);
254
118k
                                                    });
255
118k
            _tablet_map->put(std::move(tablet));
256
118k
            return ret;
257
118k
        };
258
259
118k
        auto load_result = s_singleflight_load_tablet.load(tablet_id, std::move(load_tablet));
260
118k
        if (!load_result.has_value()) {
261
110
            return ResultError(Status::InternalError("failed to get tablet {}, msg={}", tablet_id,
262
110
                                                     load_result.error()));
263
110
        }
264
118k
        auto tablet = load_result.value();
265
118k
        set_tablet_access_time_ms(tablet.get());
266
118k
        return tablet;
267
118k
    }
268
1.39M
    if (sync_stats) {
269
989k
        ++sync_stats->tablet_meta_cache_hit;
270
989k
    }
271
1.39M
    CloudTablet* tablet_raw_ptr = reinterpret_cast<Value*>(_cache->value(handle))->tablet.get();
272
1.39M
    set_tablet_access_time_ms(tablet_raw_ptr);
273
1.40M
    auto tablet = std::shared_ptr<CloudTablet>(tablet_raw_ptr, [this, handle](CloudTablet* tablet) {
274
1.40M
        set_tablet_access_time_ms(tablet);
275
1.40M
        _cache->release(handle);
276
1.40M
    });
277
1.39M
    return tablet;
278
1.50M
}
279
280
0
bool CloudTabletMgr::peek_tablet_meta(int64_t tablet_id, TabletMetaSharedPtr* tablet_meta) {
281
0
    if (tablet_meta == nullptr) {
282
0
        return false;
283
0
    }
284
0
    auto tablet = _tablet_map->get(tablet_id);
285
0
    if (!tablet) {
286
0
        return false;
287
0
    }
288
0
    *tablet_meta = tablet->tablet_meta();
289
0
    return true;
290
0
}
291
292
276
void CloudTabletMgr::erase_tablet(int64_t tablet_id) {
293
276
    auto tablet_id_str = std::to_string(tablet_id);
294
276
    CacheKey key(tablet_id_str.data(), tablet_id_str.size());
295
276
    _cache->erase(key);
296
276
}
297
298
11
void CloudTabletMgr::vacuum_stale_rowsets(const CountDownLatch& stop_latch) {
299
11
    LOG_INFO("begin to vacuum stale rowsets");
300
11
    std::vector<std::shared_ptr<CloudTablet>> tablets_to_vacuum;
301
11
    tablets_to_vacuum.reserve(_tablet_map->size());
302
574k
    _tablet_map->traverse([&tablets_to_vacuum](auto&& t) {
303
574k
        if (t->has_stale_rowsets()) {
304
4.46k
            tablets_to_vacuum.push_back(t);
305
4.46k
        }
306
574k
    });
307
11
    int num_vacuumed = 0;
308
4.46k
    for (auto& t : tablets_to_vacuum) {
309
4.46k
        if (stop_latch.count() <= 0) {
310
0
            break;
311
0
        }
312
313
4.46k
        num_vacuumed += t->delete_expired_stale_rowsets();
314
4.46k
    }
315
11
    LOG_INFO("finish vacuum stale rowsets")
316
11
            .tag("num_vacuumed", num_vacuumed)
317
11
            .tag("num_tablets", tablets_to_vacuum.size());
318
319
11
    {
320
11
        LOG_INFO("begin to remove unused rowsets");
321
11
        std::vector<std::shared_ptr<CloudTablet>> tablets_to_remove_unused_rowsets;
322
11
        tablets_to_remove_unused_rowsets.reserve(_tablet_map->size());
323
574k
        _tablet_map->traverse([&tablets_to_remove_unused_rowsets](auto&& t) {
324
574k
            if (t->need_remove_unused_rowsets()) {
325
2.42k
                tablets_to_remove_unused_rowsets.push_back(t);
326
2.42k
            }
327
574k
        });
328
2.42k
        for (auto& t : tablets_to_remove_unused_rowsets) {
329
2.42k
            t->remove_unused_rowsets();
330
2.42k
        }
331
11
        LOG_INFO("finish remove unused rowsets")
332
11
                .tag("num_tablets", tablets_to_remove_unused_rowsets.size());
333
11
        if (config::enable_check_agg_and_remove_pre_rowsets_delete_bitmap) {
334
0
            int64_t max_useless_rowset_count = 0;
335
0
            int64_t tablet_id_with_max_useless_rowset_count = 0;
336
0
            int64_t max_useless_rowset_version_count = 0;
337
0
            int64_t tablet_id_with_max_useless_rowset_version_count = 0;
338
0
            OlapStopWatch watch;
339
0
            _tablet_map->traverse([&](auto&& tablet) {
340
0
                int64_t useless_rowset_count = 0;
341
0
                int64_t useless_rowset_version_count = 0;
342
0
                tablet->check_agg_delete_bitmap_for_stale_rowsets(useless_rowset_count,
343
0
                                                                  useless_rowset_version_count);
344
0
                if (useless_rowset_count > max_useless_rowset_count) {
345
0
                    max_useless_rowset_count = useless_rowset_count;
346
0
                    tablet_id_with_max_useless_rowset_count = tablet->tablet_id();
347
0
                }
348
0
                if (useless_rowset_version_count > max_useless_rowset_version_count) {
349
0
                    max_useless_rowset_version_count = useless_rowset_version_count;
350
0
                    tablet_id_with_max_useless_rowset_version_count = tablet->tablet_id();
351
0
                }
352
0
            });
353
0
            g_max_rowsets_with_useless_delete_bitmap.set_value(max_useless_rowset_count);
354
0
            g_max_rowsets_with_useless_delete_bitmap_version.set_value(
355
0
                    max_useless_rowset_version_count);
356
0
            LOG(INFO) << "finish check_agg_delete_bitmap_for_stale_rowsets, cost(us)="
357
0
                      << watch.get_elapse_time_us()
358
0
                      << ". max useless rowset count=" << max_useless_rowset_count
359
0
                      << ", tablet_id=" << tablet_id_with_max_useless_rowset_count
360
0
                      << ", max useless rowset version count=" << max_useless_rowset_version_count
361
0
                      << ", tablet_id=" << tablet_id_with_max_useless_rowset_version_count;
362
0
        }
363
11
    }
364
11
    {
365
11
        _tablet_map->traverse(
366
574k
                [](auto&& tablet) { tablet->clear_unused_visible_pending_rowsets(); });
367
11
    }
368
11
}
369
370
17.6k
std::vector<std::weak_ptr<CloudTablet>> CloudTabletMgr::get_weak_tablets() {
371
17.6k
    std::vector<std::weak_ptr<CloudTablet>> weak_tablets;
372
17.6k
    weak_tablets.reserve(_tablet_map->size());
373
552M
    _tablet_map->traverse([&weak_tablets](auto& t) { weak_tablets.push_back(t); });
374
17.6k
    return weak_tablets;
375
17.6k
}
376
377
5
void CloudTabletMgr::sync_tablets(const CountDownLatch& stop_latch) {
378
5
    LOG_INFO("begin to sync tablets");
379
5
    int64_t last_sync_time_bound = ::time(nullptr) - config::tablet_sync_interval_s;
380
381
5
    auto weak_tablets = get_weak_tablets();
382
383
    // sort by last_sync_time
384
106k
    static auto cmp = [](const auto& a, const auto& b) { return a.first < b.first; };
385
5
    std::multiset<std::pair<int64_t, std::weak_ptr<CloudTablet>>, decltype(cmp)>
386
5
            sync_time_tablet_set(cmp);
387
388
257k
    for (auto& weak_tablet : weak_tablets) {
389
257k
        if (auto tablet = weak_tablet.lock()) {
390
257k
            int64_t last_sync_time = tablet->last_sync_time_s;
391
257k
            if (last_sync_time <= last_sync_time_bound) {
392
7.05k
                sync_time_tablet_set.emplace(last_sync_time, weak_tablet);
393
7.05k
            }
394
257k
        }
395
257k
    }
396
397
5
    int num_sync = 0;
398
7.05k
    for (auto&& [_, weak_tablet] : sync_time_tablet_set) {
399
7.05k
        if (stop_latch.count() <= 0) {
400
0
            break;
401
0
        }
402
403
7.05k
        if (auto tablet = weak_tablet.lock()) {
404
7.04k
            if (tablet->last_sync_time_s > last_sync_time_bound) {
405
0
                continue;
406
0
            }
407
408
7.04k
            ++num_sync;
409
7.04k
            auto st = tablet->sync_meta();
410
7.04k
            if (!st) {
411
0
                LOG_WARNING("failed to sync tablet meta {}", tablet->tablet_id()).error(st);
412
0
                if (st.is<ErrorCode::NOT_FOUND>()) {
413
0
                    continue;
414
0
                }
415
0
            }
416
7.04k
            SyncOptions options;
417
7.04k
            options.query_version = -1;
418
7.04k
            options.merge_schema = true;
419
7.04k
            st = tablet->sync_rowsets(options);
420
7.04k
            if (!st) {
421
0
                LOG_WARNING("failed to sync tablet rowsets {}", tablet->tablet_id()).error(st);
422
0
            }
423
7.04k
        }
424
7.05k
    }
425
5
    LOG_INFO("finish sync tablets").tag("num_sync", num_sync);
426
5
}
427
428
Status CloudTabletMgr::get_topn_tablets_to_compact(
429
        int n, CompactionType compaction_type, const std::function<bool(CloudTablet*)>& filter_out,
430
17.5k
        std::vector<std::shared_ptr<CloudTablet>>* tablets, int64_t* max_score) {
431
17.5k
    DCHECK(compaction_type == CompactionType::BASE_COMPACTION ||
432
17.5k
           compaction_type == CompactionType::CUMULATIVE_COMPACTION);
433
17.5k
    *max_score = 0;
434
17.5k
    int64_t max_score_tablet_id = 0;
435
    // clang-format off
436
547M
    auto score = [compaction_type](CloudTablet* t) {
437
547M
        return compaction_type == CompactionType::BASE_COMPACTION ? t->get_cloud_base_compaction_score()
438
547M
               : compaction_type == CompactionType::CUMULATIVE_COMPACTION ? t->get_cloud_cumu_compaction_score()
439
492M
               : 0;
440
547M
    };
441
442
17.5k
    using namespace std::chrono;
443
17.5k
    auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
444
256M
    auto skip = [now, compaction_type](CloudTablet* t) {
445
256M
        auto* cloud_cluster_info = static_cast<CloudClusterInfo*>(ExecEnv::GetInstance()->cluster_info());
446
447
256M
        if (config::enable_standby_passive_compaction && cloud_cluster_info->is_in_standby()) {
448
0
            if (t->fetch_add_approximate_num_rowsets(0) < config::max_tablet_version_num * config::standby_compaction_version_ratio) {
449
0
                return true;
450
0
            }
451
0
        }
452
453
        // Compaction read-write separation: skip tablets that should be compacted by other clusters.
454
        // Placed after standby check so standby invariants (version count threshold) are preserved.
455
256M
        if (cloud_cluster_info->should_skip_compaction(t)) {
456
0
            return true;
457
0
        }
458
459
256M
        int32_t max_version_config = t->max_version_config();
460
256M
        if (compaction_type == CompactionType::BASE_COMPACTION) {
461
54.3M
            bool is_recent_failure = now - t->last_base_compaction_failure_time() < config::min_compaction_failure_interval_ms;
462
54.3M
            bool is_frozen = (now - t->last_load_time_ms > config::compaction_load_max_freeze_interval_s * 1000
463
54.3M
                   && now - t->last_base_compaction_success_time_ms < config::base_compaction_freeze_interval_s * 1000
464
54.3M
                   && t->fetch_add_approximate_num_rowsets(0) < max_version_config / 2);
465
54.3M
            g_base_compaction_not_frozen_tablet_num << !is_frozen;
466
54.3M
            return is_recent_failure || is_frozen;
467
54.3M
        }
468
469
        // If tablet has too many rowsets but not be compacted for a long time, compaction should be performed
470
        // regardless of whether there is a load job recently.
471
201M
        bool is_recent_failure = now - t->last_cumu_compaction_failure_time() < config::min_compaction_failure_interval_ms;
472
201M
        bool is_recent_no_suitable_version = now - t->last_cumu_no_suitable_version_ms < config::min_compaction_failure_interval_ms;
473
201M
        bool is_frozen = (now - t->last_load_time_ms > config::compaction_load_max_freeze_interval_s * 1000
474
201M
               && now - t->last_cumu_compaction_success_time_ms < config::cumu_compaction_interval_s * 1000
475
201M
               && t->fetch_add_approximate_num_rowsets(0) < max_version_config / 2);
476
201M
        g_cumu_compaction_not_frozen_tablet_num << !is_frozen;
477
201M
        return is_recent_failure || is_recent_no_suitable_version || is_frozen;
478
256M
    };
479
    // We don't schedule tablets that are disabled for compaction
480
260M
    auto disable = [](CloudTablet* t) { return t->tablet_meta()->tablet_schema()->disable_auto_compaction(); };
481
482
17.5k
    auto [num_filtered, num_disabled, num_skipped] = std::make_tuple(0, 0, 0);
483
484
17.5k
    auto weak_tablets = get_weak_tablets();
485
17.5k
    std::vector<std::pair<std::shared_ptr<CloudTablet>, int64_t>> buf;
486
17.5k
    buf.reserve(n + 1);
487
547M
    for (auto& weak_tablet : weak_tablets) {
488
547M
        auto t = weak_tablet.lock();
489
547M
        if (t == nullptr) { continue; }
490
491
547M
        int64_t s = score(t.get());
492
547M
        if (s <= 0) { continue; }
493
260M
        if (s > *max_score) {
494
94.9k
            max_score_tablet_id = t->tablet_id();
495
94.9k
            *max_score = s;
496
94.9k
        }
497
498
260M
        if (filter_out(t.get())) { ++num_filtered; continue; }
499
260M
        if (disable(t.get())) { ++num_disabled; continue; }
500
256M
        if (skip(t.get())) { ++num_skipped; continue; }
501
502
251M
        buf.emplace_back(std::move(t), s);
503
2.95G
        std::sort(buf.begin(), buf.end(), [](auto& a, auto& b) { return a.second > b.second; });
504
251M
        if (buf.size() > n) { buf.pop_back(); }
505
251M
    }
506
507
17.5k
    LOG_EVERY_N(INFO, 1000) << "get_topn_compaction_score, n=" << n << " type=" << compaction_type
508
19
               << " num_tablets=" << weak_tablets.size() << " num_skipped=" << num_skipped
509
19
               << " num_disabled=" << num_disabled << " num_filtered=" << num_filtered
510
19
               << " max_score=" << *max_score << " max_score_tablet=" << max_score_tablet_id
511
99
               << " tablets=[" << [&buf] { std::stringstream ss; for (auto& i : buf) ss << i.first->tablet_id() << ":" << i.second << ","; return ss.str(); }() << "]"
512
17.5k
               ;
513
    // clang-format on
514
515
17.5k
    tablets->clear();
516
17.5k
    tablets->reserve(n + 1);
517
105k
    for (auto& [t, _] : buf) {
518
105k
        tablets->emplace_back(std::move(t));
519
105k
    }
520
521
17.5k
    return Status::OK();
522
17.5k
}
523
524
void CloudTabletMgr::build_all_report_tablets_info(std::map<TTabletId, TTablet>* tablets_info,
525
73
                                                   uint64_t* tablet_num) {
526
73
    DCHECK(tablets_info != nullptr);
527
73
    VLOG_NOTICE << "begin to build all report cloud tablets info";
528
529
73
    HistogramStat tablet_version_num_hist;
530
531
3.93M
    auto handler = [&](const std::weak_ptr<CloudTablet>& tablet_wk) {
532
3.93M
        auto tablet = tablet_wk.lock();
533
3.93M
        if (!tablet) return;
534
3.93M
        (*tablet_num)++;
535
3.93M
        TTabletInfo tablet_info;
536
3.93M
        tablet->build_tablet_report_info(&tablet_info);
537
3.93M
        using namespace std::chrono;
538
3.93M
        int64_t now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
539
3.93M
        if (now - g_tablet_report_inactive_duration_ms < tablet->last_access_time_ms) {
540
            // the tablet is still being accessed and used in recently, so not report it
541
3.93M
            return;
542
3.93M
        }
543
0
        auto& t_tablet = (*tablets_info)[tablet->tablet_id()];
544
        // On the cloud, a specific BE has only one tablet replica;
545
        // there are no multiple replicas for a specific BE.
546
        // This is only to reuse the non-cloud report protocol.
547
0
        tablet_version_num_hist.add(tablet_info.total_version_count);
548
0
        t_tablet.tablet_infos.emplace_back(std::move(tablet_info));
549
0
    };
550
551
73
    auto weak_tablets = get_weak_tablets();
552
73
    std::for_each(weak_tablets.begin(), weak_tablets.end(), handler);
553
554
73
    DorisMetrics::instance()->tablet_version_num_distribution->set_histogram(
555
73
            tablet_version_num_hist);
556
73
    LOG(INFO) << "success to build all cloud report tablets info. all_tablet_count=" << *tablet_num
557
73
              << " exceed drop time limit count=" << tablets_info->size();
558
73
}
559
560
0
void CloudTabletMgr::get_tablet_info(int64_t num_tablets, std::vector<TabletInfo>* tablets_info) {
561
0
    auto weak_tablets = get_weak_tablets();
562
0
    for (auto& weak_tablet : weak_tablets) {
563
0
        auto tablet = weak_tablet.lock();
564
0
        if (tablet == nullptr) {
565
0
            continue;
566
0
        }
567
0
        if (tablets_info->size() >= num_tablets) {
568
0
            return;
569
0
        }
570
0
        tablets_info->push_back(tablet->get_tablet_info());
571
0
    }
572
0
}
573
574
void CloudTabletMgr::get_topn_tablet_delete_bitmap_score(
575
11
        uint64_t* max_delete_bitmap_score, uint64_t* max_base_rowset_delete_bitmap_score) {
576
11
    int64_t max_delete_bitmap_score_tablet_id = 0;
577
11
    OlapStopWatch watch;
578
11
    uint64_t total_delete_map_count = 0;
579
11
    int64_t max_base_rowset_delete_bitmap_score_tablet_id = 0;
580
11
    int n = config::check_tablet_delete_bitmap_score_top_n;
581
11
    std::vector<std::pair<std::shared_ptr<CloudTablet>, int64_t>> buf;
582
11
    buf.reserve(n + 1);
583
572k
    auto handler = [&](const std::weak_ptr<CloudTablet>& tablet_wk) {
584
572k
        auto t = tablet_wk.lock();
585
572k
        if (!t) return;
586
572k
        uint64_t delete_bitmap_count =
587
572k
                t.get()->tablet_meta()->delete_bitmap().get_delete_bitmap_count();
588
572k
        total_delete_map_count += delete_bitmap_count;
589
572k
        if (delete_bitmap_count > *max_delete_bitmap_score) {
590
65
            max_delete_bitmap_score_tablet_id = t->tablet_id();
591
65
            *max_delete_bitmap_score = delete_bitmap_count;
592
65
        }
593
572k
        buf.emplace_back(std::move(t), delete_bitmap_count);
594
11.4M
        std::sort(buf.begin(), buf.end(), [](auto& a, auto& b) { return a.second > b.second; });
595
572k
        if (buf.size() > n) {
596
572k
            buf.pop_back();
597
572k
        }
598
572k
    };
599
11
    auto weak_tablets = get_weak_tablets();
600
11
    std::for_each(weak_tablets.begin(), weak_tablets.end(), handler);
601
110
    for (auto& [t, _] : buf) {
602
110
        t->get_base_rowset_delete_bitmap_count(max_base_rowset_delete_bitmap_score,
603
110
                                               &max_base_rowset_delete_bitmap_score_tablet_id);
604
110
    }
605
11
    std::stringstream ss;
606
110
    for (auto& i : buf) {
607
110
        ss << i.first->tablet_id() << ": " << i.second << ", ";
608
110
    }
609
11
    LOG(INFO) << "get_topn_tablet_delete_bitmap_score, n=" << n
610
11
              << ", tablet size=" << weak_tablets.size()
611
11
              << ", total_delete_map_count=" << total_delete_map_count
612
11
              << ", cost(us)=" << watch.get_elapse_time_us()
613
11
              << ", max_delete_bitmap_score=" << *max_delete_bitmap_score
614
11
              << ", max_delete_bitmap_score_tablet_id=" << max_delete_bitmap_score_tablet_id
615
11
              << ", max_base_rowset_delete_bitmap_score=" << *max_base_rowset_delete_bitmap_score
616
11
              << ", max_base_rowset_delete_bitmap_score_tablet_id="
617
11
              << max_base_rowset_delete_bitmap_score_tablet_id << ", tablets=[" << ss.str() << "]";
618
11
}
619
620
2
std::vector<std::shared_ptr<CloudTablet>> CloudTabletMgr::get_all_tablet() {
621
2
    std::vector<std::shared_ptr<CloudTablet>> tablets;
622
2
    tablets.reserve(_tablet_map->size());
623
12.3k
    _tablet_map->traverse([&tablets](auto& t) { tablets.push_back(t); });
624
2
    return tablets;
625
2
}
626
627
2
void CloudTabletMgr::put_tablet_for_UT(std::shared_ptr<CloudTablet> tablet) {
628
2
    _tablet_map->put(tablet);
629
2
}
630
631
} // namespace doris