Coverage Report

Created: 2025-06-13 21:21

/root/doris/cloud/src/recycler/checker.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "recycler/checker.h"
19
20
#include <aws/s3/S3Client.h>
21
#include <aws/s3/model/ListObjectsV2Request.h>
22
#include <butil/endpoint.h>
23
#include <butil/strings/string_split.h>
24
#include <fmt/core.h>
25
#include <gen_cpp/cloud.pb.h>
26
#include <gen_cpp/olap_file.pb.h>
27
#include <glog/logging.h>
28
29
#include <chrono>
30
#include <cstdint>
31
#include <memory>
32
#include <mutex>
33
#include <sstream>
34
#include <string_view>
35
#include <unordered_set>
36
#include <vector>
37
38
#include "common/bvars.h"
39
#include "common/config.h"
40
#include "common/encryption_util.h"
41
#include "common/logging.h"
42
#include "common/util.h"
43
#include "cpp/sync_point.h"
44
#include "meta-service/keys.h"
45
#include "meta-service/txn_kv.h"
46
#include "meta-service/txn_kv_error.h"
47
#include "recycler/hdfs_accessor.h"
48
#include "recycler/s3_accessor.h"
49
#include "recycler/storage_vault_accessor.h"
50
#ifdef UNIT_TEST
51
#include "../test/mock_accessor.h"
52
#endif
53
#include "recycler/util.h"
54
55
namespace doris::cloud {
56
namespace config {
57
extern int32_t brpc_listen_port;
58
extern int32_t scan_instances_interval_seconds;
59
extern int32_t recycle_job_lease_expired_ms;
60
extern int32_t recycle_concurrency;
61
extern std::vector<std::string> recycle_whitelist;
62
extern std::vector<std::string> recycle_blacklist;
63
extern bool enable_inverted_check;
64
} // namespace config
65
66
5
/// Creates a checker backed by `txn_kv` and records this process's identity
/// as "<local ip>:<brpc listen port>" in `ip_port_`.
Checker::Checker(std::shared_ptr<TxnKv> txn_kv) : txn_kv_(std::move(txn_kv)) {
    // Assemble the endpoint string piece by piece, then hand it over.
    std::string endpoint(butil::my_ip_cstr());
    endpoint += ":";
    endpoint += std::to_string(config::brpc_listen_port);
    ip_port_ = std::move(endpoint);
}
69
70
5
/// Makes sure the checker is stopped before the object goes away.
Checker::~Checker() {
    if (stopped()) {
        return; // stop() was already called explicitly
    }
    stop();
}
75
76
4
int Checker::start() {
77
4
    DCHECK(txn_kv_);
78
4
    instance_filter_.reset(config::recycle_whitelist, config::recycle_blacklist);
79
80
    // launch instance scanner
81
4
    auto scanner_func = [this]() {
82
4
        std::this_thread::sleep_for(
83
4
                std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds));
84
7
        while (!stopped()) {
85
3
            std::vector<InstanceInfoPB> instances;
86
3
            get_all_instances(txn_kv_.get(), instances);
87
3
            LOG(INFO) << "Checker get instances: " << [&instances] {
88
3
                std::stringstream ss;
89
30
                for (auto& i : instances) ss << ' ' << i.instance_id();
90
3
                return ss.str();
91
3
            }();
92
3
            if (!instances.empty()) {
93
                // enqueue instances
94
3
                std::lock_guard lock(mtx_);
95
30
                for (auto& instance : instances) {
96
30
                    if (instance_filter_.filter_out(instance.instance_id())) continue;
97
30
                    if (instance.status() == InstanceInfoPB::DELETED) continue;
98
30
                    using namespace std::chrono;
99
30
                    auto enqueue_time_s =
100
30
                            duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
101
30
                    auto [_, success] =
102
30
                            pending_instance_map_.insert({instance.instance_id(), enqueue_time_s});
103
                    // skip instance already in pending queue
104
30
                    if (success) {
105
30
                        pending_instance_queue_.push_back(std::move(instance));
106
30
                    }
107
30
                }
108
3
                pending_instance_cond_.notify_all();
109
3
            }
110
3
            {
111
3
                std::unique_lock lock(mtx_);
112
3
                notifier_.wait_for(lock,
113
3
                                   std::chrono::seconds(config::scan_instances_interval_seconds),
114
6
                                   [&]() { return stopped(); });
115
3
            }
116
3
        }
117
4
    };
118
4
    workers_.emplace_back(scanner_func);
119
    // Launch lease thread
120
4
    workers_.emplace_back([this] { lease_check_jobs(); });
121
    // Launch inspect thread
122
4
    workers_.emplace_back([this] { inspect_instance_check_interval(); });
123
124
    // launch check workers
125
8
    auto checker_func = [this]() {
126
38
        while (!stopped()) {
127
            // fetch instance to check
128
36
            InstanceInfoPB instance;
129
36
            long enqueue_time_s = 0;
130
36
            {
131
36
                std::unique_lock lock(mtx_);
132
48
                pending_instance_cond_.wait(lock, [&]() -> bool {
133
48
                    return !pending_instance_queue_.empty() || stopped();
134
48
                });
135
36
                if (stopped()) {
136
6
                    return;
137
6
                }
138
30
                instance = std::move(pending_instance_queue_.front());
139
30
                pending_instance_queue_.pop_front();
140
30
                enqueue_time_s = pending_instance_map_[instance.instance_id()];
141
30
                pending_instance_map_.erase(instance.instance_id());
142
30
            }
143
0
            const auto& instance_id = instance.instance_id();
144
30
            {
145
30
                std::lock_guard lock(mtx_);
146
                // skip instance in recycling
147
30
                if (working_instance_map_.count(instance_id)) continue;
148
30
            }
149
30
            auto checker = std::make_shared<InstanceChecker>(txn_kv_, instance.instance_id());
150
30
            if (checker->init(instance) != 0) {
151
0
                LOG(WARNING) << "failed to init instance checker, instance_id="
152
0
                             << instance.instance_id();
153
0
                continue;
154
0
            }
155
30
            std::string check_job_key;
156
30
            job_check_key({instance.instance_id()}, &check_job_key);
157
30
            int ret = prepare_instance_recycle_job(txn_kv_.get(), check_job_key,
158
30
                                                   instance.instance_id(), ip_port_,
159
30
                                                   config::check_object_interval_seconds * 1000);
160
30
            if (ret != 0) { // Prepare failed
161
20
                continue;
162
20
            } else {
163
10
                std::lock_guard lock(mtx_);
164
10
                working_instance_map_.emplace(instance_id, checker);
165
10
            }
166
10
            if (stopped()) return;
167
10
            using namespace std::chrono;
168
10
            auto ctime_ms =
169
10
                    duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
170
10
            g_bvar_checker_enqueue_cost_s.put(instance_id, ctime_ms / 1000 - enqueue_time_s);
171
172
10
            bool success {true};
173
174
10
            if (int ret = checker->do_check(); ret != 0) {
175
0
                success = false;
176
0
            }
177
178
10
            if (config::enable_inverted_check) {
179
0
                if (int ret = checker->do_inverted_check(); ret != 0) {
180
0
                    success = false;
181
0
                }
182
0
            }
183
184
10
            if (config::enable_delete_bitmap_inverted_check) {
185
0
                if (int ret = checker->do_delete_bitmap_inverted_check(); ret != 0) {
186
0
                    success = false;
187
0
                }
188
0
            }
189
190
10
            if (config::enable_delete_bitmap_storage_optimize_check) {
191
0
                if (int ret = checker->do_delete_bitmap_storage_optimize_check(); ret != 0) {
192
0
                    success = false;
193
0
                }
194
0
            }
195
196
10
            if (config::enable_mow_compaction_key_check) {
197
0
                if (int ret = checker->do_mow_compaction_key_check(); ret != 0) {
198
0
                    success = false;
199
0
                }
200
0
            }
201
202
10
            if (config::enable_delete_bitmap_storage_optimize_v2_check) {
203
0
                if (int ret = checker->do_delete_bitmap_storage_optimize_check(2 /*version*/);
204
0
                    ret != 0) {
205
0
                    success = false;
206
0
                }
207
0
            }
208
209
            // If instance checker has been aborted, don't finish this job
210
10
            if (!checker->stopped()) {
211
10
                finish_instance_recycle_job(txn_kv_.get(), check_job_key, instance.instance_id(),
212
10
                                            ip_port_, success, ctime_ms);
213
10
            }
214
10
            {
215
10
                std::lock_guard lock(mtx_);
216
10
                working_instance_map_.erase(instance.instance_id());
217
10
            }
218
10
        }
219
8
    };
220
4
    int num_threads = config::recycle_concurrency; // FIXME: use a new config entry?
221
12
    for (int i = 0; i < num_threads; ++i) {
222
8
        workers_.emplace_back(checker_func);
223
8
    }
224
4
    return 0;
225
4
}
226
227
5
void Checker::stop() {
228
5
    stopped_ = true;
229
5
    notifier_.notify_all();
230
5
    pending_instance_cond_.notify_all();
231
5
    {
232
5
        std::lock_guard lock(mtx_);
233
5
        for (auto& [_, checker] : working_instance_map_) {
234
0
            checker->stop();
235
0
        }
236
5
    }
237
20
    for (auto& w : workers_) {
238
20
        if (w.joinable()) w.join();
239
20
    }
240
5
}
241
242
4
void Checker::lease_check_jobs() {
243
54
    while (!stopped()) {
244
50
        std::vector<std::string> instances;
245
50
        instances.reserve(working_instance_map_.size());
246
50
        {
247
50
            std::lock_guard lock(mtx_);
248
50
            for (auto& [id, _] : working_instance_map_) {
249
30
                instances.push_back(id);
250
30
            }
251
50
        }
252
50
        for (auto& i : instances) {
253
30
            std::string check_job_key;
254
30
            job_check_key({i}, &check_job_key);
255
30
            int ret = lease_instance_recycle_job(txn_kv_.get(), check_job_key, i, ip_port_);
256
30
            if (ret == 1) {
257
0
                std::lock_guard lock(mtx_);
258
0
                if (auto it = working_instance_map_.find(i); it != working_instance_map_.end()) {
259
0
                    it->second->stop();
260
0
                }
261
0
            }
262
30
        }
263
50
        {
264
50
            std::unique_lock lock(mtx_);
265
50
            notifier_.wait_for(lock,
266
50
                               std::chrono::milliseconds(config::recycle_job_lease_expired_ms / 3),
267
100
                               [&]() { return stopped(); });
268
50
        }
269
50
    }
270
4
}
271
272
0
#define LOG_CHECK_INTERVAL_ALARM LOG(WARNING) << "Err for check interval: "
273
34
void Checker::do_inspect(const InstanceInfoPB& instance) {
274
34
    std::string check_job_key = job_check_key({instance.instance_id()});
275
34
    std::unique_ptr<Transaction> txn;
276
34
    std::string val;
277
34
    TxnErrorCode err = txn_kv_->create_txn(&txn);
278
34
    if (err != TxnErrorCode::TXN_OK) {
279
0
        LOG_CHECK_INTERVAL_ALARM << "failed to create txn";
280
0
        return;
281
0
    }
282
34
    err = txn->get(check_job_key, &val);
283
34
    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
284
0
        LOG_CHECK_INTERVAL_ALARM << "failed to get kv, err=" << err
285
0
                                 << " key=" << hex(check_job_key);
286
0
        return;
287
0
    }
288
34
    auto checker = InstanceChecker(txn_kv_, instance.instance_id());
289
34
    if (checker.init(instance) != 0) {
290
0
        LOG_CHECK_INTERVAL_ALARM << "failed to init instance checker, instance_id="
291
0
                                 << instance.instance_id();
292
0
        return;
293
0
    }
294
295
34
    int64_t bucket_lifecycle_days = 0;
296
34
    if (checker.get_bucket_lifecycle(&bucket_lifecycle_days) != 0) {
297
0
        LOG_CHECK_INTERVAL_ALARM << "failed to get bucket lifecycle, instance_id="
298
0
                                 << instance.instance_id();
299
0
        return;
300
0
    }
301
34
    DCHECK(bucket_lifecycle_days > 0);
302
303
34
    if (bucket_lifecycle_days == INT64_MAX) {
304
        // No s3 bucket (may all accessors are HdfsAccessor), skip inspect
305
34
        return;
306
34
    }
307
308
0
    int64_t last_ctime_ms = -1;
309
0
    auto job_status = JobRecyclePB::IDLE;
310
0
    auto has_last_ctime = [&]() {
311
0
        JobRecyclePB job_info;
312
0
        if (!job_info.ParseFromString(val)) {
313
0
            LOG_CHECK_INTERVAL_ALARM << "failed to parse JobRecyclePB, key=" << hex(check_job_key);
314
0
        }
315
0
        DCHECK(job_info.instance_id() == instance.instance_id());
316
0
        if (!job_info.has_last_ctime_ms()) return false;
317
0
        last_ctime_ms = job_info.last_ctime_ms();
318
0
        job_status = job_info.status();
319
0
        g_bvar_checker_last_success_time_ms.put(instance.instance_id(),
320
0
                                                job_info.last_success_time_ms());
321
0
        return true;
322
0
    };
323
0
    using namespace std::chrono;
324
0
    auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
325
0
    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND || !has_last_ctime()) {
326
        // Use instance's ctime for instances that do not have job's last ctime
327
0
        last_ctime_ms = instance.ctime();
328
0
    }
329
0
    DCHECK(now - last_ctime_ms >= 0);
330
0
    int64_t expiration_ms =
331
0
            bucket_lifecycle_days > config::reserved_buffer_days
332
0
                    ? (bucket_lifecycle_days - config::reserved_buffer_days) * 86400000
333
0
                    : bucket_lifecycle_days * 86400000;
334
0
    TEST_SYNC_POINT_CALLBACK("Checker:do_inspect", &last_ctime_ms);
335
0
    if (now - last_ctime_ms >= expiration_ms) {
336
0
        LOG_CHECK_INTERVAL_ALARM << "check risks, instance_id: " << instance.instance_id()
337
0
                                 << " last_ctime_ms: " << last_ctime_ms
338
0
                                 << " job_status: " << job_status
339
0
                                 << " bucket_lifecycle_days: " << bucket_lifecycle_days
340
0
                                 << " reserved_buffer_days: " << config::reserved_buffer_days
341
0
                                 << " expiration_ms: " << expiration_ms;
342
0
    }
343
0
}
344
#undef LOG_CHECK_INTERVAL_ALARM
345
4
void Checker::inspect_instance_check_interval() {
346
7
    while (!stopped()) {
347
3
        LOG(INFO) << "start to inspect instance check interval";
348
3
        std::vector<InstanceInfoPB> instances;
349
3
        get_all_instances(txn_kv_.get(), instances);
350
30
        for (const auto& instance : instances) {
351
30
            if (instance_filter_.filter_out(instance.instance_id())) continue;
352
30
            if (stopped()) return;
353
30
            if (instance.status() == InstanceInfoPB::DELETED) continue;
354
30
            do_inspect(instance);
355
30
        }
356
3
        {
357
3
            std::unique_lock lock(mtx_);
358
3
            notifier_.wait_for(lock, std::chrono::seconds(config::scan_instances_interval_seconds),
359
6
                               [&]() { return stopped(); });
360
3
        }
361
3
    }
362
4
}
363
364
// return 0 for success get a key, 1 for key not found, negative for error
365
8
int key_exist(TxnKv* txn_kv, std::string_view key) {
366
8
    std::unique_ptr<Transaction> txn;
367
8
    TxnErrorCode err = txn_kv->create_txn(&txn);
368
8
    if (err != TxnErrorCode::TXN_OK) {
369
0
        LOG(WARNING) << "failed to init txn, err=" << err;
370
0
        return -1;
371
0
    }
372
8
    std::string val;
373
8
    switch (txn->get(key, &val)) {
374
8
    case TxnErrorCode::TXN_OK:
375
8
        return 0;
376
0
    case TxnErrorCode::TXN_KEY_NOT_FOUND:
377
0
        return 1;
378
0
    default:
379
0
        return -1;
380
8
    }
381
8
}
382
383
/// Binds the checker to `instance_id` and the shared KV store.
/// Accessors are not created here; call init() before running any check.
InstanceChecker::InstanceChecker(std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id)
        : txn_kv_(std::move(txn_kv)), instance_id_(instance_id) {
    // Nothing else to set up.
}
385
386
74
int InstanceChecker::init(const InstanceInfoPB& instance) {
387
74
    int ret = init_obj_store_accessors(instance);
388
74
    if (ret != 0) {
389
0
        return ret;
390
0
    }
391
392
74
    return init_storage_vault_accessors(instance);
393
74
}
394
395
74
int InstanceChecker::init_obj_store_accessors(const InstanceInfoPB& instance) {
396
74
    for (const auto& obj_info : instance.obj_info()) {
397
74
#ifdef UNIT_TEST
398
74
        auto accessor = std::make_shared<MockAccessor>();
399
#else
400
        auto s3_conf = S3Conf::from_obj_store_info(obj_info);
401
        if (!s3_conf) {
402
            LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_;
403
            return -1;
404
        }
405
406
        std::shared_ptr<S3Accessor> accessor;
407
        int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
408
        if (ret != 0) {
409
            LOG(WARNING) << "failed to init object accessor. instance_id=" << instance_id_
410
                         << " resource_id=" << obj_info.id();
411
            return ret;
412
        }
413
#endif
414
415
74
        accessor_map_.emplace(obj_info.id(), std::move(accessor));
416
74
    }
417
418
74
    return 0;
419
74
}
420
421
74
int InstanceChecker::init_storage_vault_accessors(const InstanceInfoPB& instance) {
422
74
    if (instance.resource_ids().empty()) {
423
74
        return 0;
424
74
    }
425
426
0
    FullRangeGetIteratorOptions opts(txn_kv_);
427
0
    opts.prefetch = true;
428
0
    auto it = txn_kv_->full_range_get(storage_vault_key({instance_id_, ""}),
429
0
                                      storage_vault_key({instance_id_, "\xff"}), std::move(opts));
430
431
0
    for (auto kv = it->next(); kv.has_value(); kv = it->next()) {
432
0
        auto [k, v] = *kv;
433
0
        StorageVaultPB vault;
434
0
        if (!vault.ParseFromArray(v.data(), v.size())) {
435
0
            LOG(WARNING) << "malformed storage vault, unable to deserialize key=" << hex(k);
436
0
            return -1;
437
0
        }
438
439
0
        if (vault.has_hdfs_info()) {
440
0
            auto accessor = std::make_shared<HdfsAccessor>(vault.hdfs_info());
441
0
            int ret = accessor->init();
442
0
            if (ret != 0) {
443
0
                LOG(WARNING) << "failed to init hdfs accessor. instance_id=" << instance_id_
444
0
                             << " resource_id=" << vault.id() << " name=" << vault.name();
445
0
                return ret;
446
0
            }
447
448
0
            accessor_map_.emplace(vault.id(), std::move(accessor));
449
0
        } else if (vault.has_obj_info()) {
450
0
#ifdef UNIT_TEST
451
0
            auto accessor = std::make_shared<MockAccessor>();
452
#else
453
            auto s3_conf = S3Conf::from_obj_store_info(vault.obj_info());
454
            if (!s3_conf) {
455
                LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_;
456
                return -1;
457
            }
458
459
            std::shared_ptr<S3Accessor> accessor;
460
            int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
461
            if (ret != 0) {
462
                LOG(WARNING) << "failed to init s3 accessor. instance_id=" << instance_id_
463
                             << " resource_id=" << vault.id() << " name=" << vault.name();
464
                return ret;
465
            }
466
#endif
467
468
0
            accessor_map_.emplace(vault.id(), std::move(accessor));
469
0
        }
470
0
    }
471
472
0
    if (!it->is_valid()) {
473
0
        LOG_WARNING("failed to get storage vault kv");
474
0
        return -1;
475
0
    }
476
0
    return 0;
477
0
}
478
479
12
int InstanceChecker::do_check() {
480
12
    TEST_SYNC_POINT("InstanceChecker.do_check");
481
12
    LOG(INFO) << "begin to check instance objects instance_id=" << instance_id_;
482
12
    int check_ret = 0;
483
12
    long num_scanned = 0;
484
12
    long num_scanned_with_segment = 0;
485
12
    long num_rowset_loss = 0;
486
12
    long instance_volume = 0;
487
12
    using namespace std::chrono;
488
12
    auto start_time = steady_clock::now();
489
12
    std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
490
12
        auto cost = duration<float>(steady_clock::now() - start_time).count();
491
12
        LOG(INFO) << "check instance objects finished, cost=" << cost
492
12
                  << "s. instance_id=" << instance_id_ << " num_scanned=" << num_scanned
493
12
                  << " num_scanned_with_segment=" << num_scanned_with_segment
494
12
                  << " num_rowset_loss=" << num_rowset_loss
495
12
                  << " instance_volume=" << instance_volume;
496
12
        g_bvar_checker_num_scanned.put(instance_id_, num_scanned);
497
12
        g_bvar_checker_num_scanned_with_segment.put(instance_id_, num_scanned_with_segment);
498
12
        g_bvar_checker_num_check_failed.put(instance_id_, num_rowset_loss);
499
12
        g_bvar_checker_check_cost_s.put(instance_id_, static_cast<long>(cost));
500
        // FIXME(plat1ko): What if some list operation failed?
501
12
        g_bvar_checker_instance_volume.put(instance_id_, instance_volume);
502
12
    });
503
504
12
    struct TabletFiles {
505
12
        int64_t tablet_id {0};
506
12
        std::unordered_set<std::string> files;
507
12
    };
508
12
    TabletFiles tablet_files_cache;
509
510
12
    auto check_rowset_objects = [&, this](const doris::RowsetMetaCloudPB& rs_meta,
511
4.00k
                                          std::string_view key) {
512
4.00k
        if (rs_meta.num_segments() == 0) {
513
0
            return;
514
0
        }
515
516
4.00k
        ++num_scanned_with_segment;
517
4.00k
        if (tablet_files_cache.tablet_id != rs_meta.tablet_id()) {
518
400
            long tablet_volume = 0;
519
            // Clear cache
520
400
            tablet_files_cache.tablet_id = 0;
521
400
            tablet_files_cache.files.clear();
522
            // Get all file paths under this tablet directory
523
400
            auto find_it = accessor_map_.find(rs_meta.resource_id());
524
400
            if (find_it == accessor_map_.end()) {
525
0
                LOG_WARNING("resource id not found in accessor map")
526
0
                        .tag("resource_id", rs_meta.resource_id())
527
0
                        .tag("tablet_id", rs_meta.tablet_id())
528
0
                        .tag("rowset_id", rs_meta.rowset_id_v2());
529
0
                check_ret = -1;
530
0
                return;
531
0
            }
532
533
400
            std::unique_ptr<ListIterator> list_iter;
534
400
            int ret = find_it->second->list_directory(tablet_path_prefix(rs_meta.tablet_id()),
535
400
                                                      &list_iter);
536
400
            if (ret != 0) { // No need to log, because S3Accessor has logged this error
537
0
                check_ret = -1;
538
0
                return;
539
0
            }
540
541
18.3k
            for (auto file = list_iter->next(); file.has_value(); file = list_iter->next()) {
542
17.9k
                tablet_files_cache.files.insert(std::move(file->path));
543
17.9k
                tablet_volume += file->size;
544
17.9k
            }
545
400
            tablet_files_cache.tablet_id = rs_meta.tablet_id();
546
400
            instance_volume += tablet_volume;
547
400
        }
548
549
4.00k
        bool data_loss = false;
550
16.0k
        for (int i = 0; i < rs_meta.num_segments(); ++i) {
551
12.0k
            auto path = segment_path(rs_meta.tablet_id(), rs_meta.rowset_id_v2(), i);
552
12.0k
            if (tablet_files_cache.files.contains(path)) {
553
11.9k
                continue;
554
11.9k
            }
555
556
8
            if (1 == key_exist(txn_kv_.get(), key)) {
557
                // Rowset has been deleted instead of data loss
558
0
                break;
559
0
            }
560
8
            data_loss = true;
561
8
            TEST_SYNC_POINT_CALLBACK("InstanceChecker.do_check1", &path);
562
8
            LOG(WARNING) << "object not exist, path=" << path << " key=" << hex(key);
563
8
        }
564
565
4.00k
        if (data_loss) {
566
7
            ++num_rowset_loss;
567
7
        }
568
4.00k
    };
569
570
    // scan visible rowsets
571
12
    auto start_key = meta_rowset_key({instance_id_, 0, 0});
572
12
    auto end_key = meta_rowset_key({instance_id_, INT64_MAX, 0});
573
574
12
    std::unique_ptr<RangeGetIterator> it;
575
12
    do {
576
12
        std::unique_ptr<Transaction> txn;
577
12
        TxnErrorCode err = txn_kv_->create_txn(&txn);
578
12
        if (err != TxnErrorCode::TXN_OK) {
579
0
            LOG(WARNING) << "failed to init txn, err=" << err;
580
0
            return -1;
581
0
        }
582
583
12
        err = txn->get(start_key, end_key, &it);
584
12
        if (err != TxnErrorCode::TXN_OK) {
585
0
            LOG(WARNING) << "internal error, failed to get rowset meta, err=" << err;
586
0
            return -1;
587
0
        }
588
12
        num_scanned += it->size();
589
590
4.01k
        while (it->has_next() && !stopped()) {
591
4.00k
            auto [k, v] = it->next();
592
4.00k
            if (!it->has_next()) start_key = k;
593
594
4.00k
            doris::RowsetMetaCloudPB rs_meta;
595
4.00k
            if (!rs_meta.ParseFromArray(v.data(), v.size())) {
596
0
                ++num_rowset_loss;
597
0
                LOG(WARNING) << "malformed rowset meta. key=" << hex(k) << " val=" << hex(v);
598
0
                continue;
599
0
            }
600
4.00k
            check_rowset_objects(rs_meta, k);
601
4.00k
        }
602
12
        start_key.push_back('\x00'); // Update to next smallest key for iteration
603
12
    } while (it->more() && !stopped());
604
605
12
    return num_rowset_loss > 0 ? 1 : check_ret;
606
12
}
607
608
34
int InstanceChecker::get_bucket_lifecycle(int64_t* lifecycle_days) {
609
    // If there are multiple buckets, return the minimum lifecycle.
610
34
    int64_t min_lifecycle_days = INT64_MAX;
611
34
    int64_t tmp_liefcycle_days = 0;
612
34
    for (const auto& [id, accessor] : accessor_map_) {
613
34
        if (accessor->type() != AccessorType::S3) {
614
34
            continue;
615
34
        }
616
617
0
        auto* s3_accessor = static_cast<S3Accessor*>(accessor.get());
618
619
0
        if (s3_accessor->check_versioning() != 0) {
620
0
            return -1;
621
0
        }
622
623
0
        if (s3_accessor->get_life_cycle(&tmp_liefcycle_days) != 0) {
624
0
            return -1;
625
0
        }
626
627
0
        if (tmp_liefcycle_days < min_lifecycle_days) {
628
0
            min_lifecycle_days = tmp_liefcycle_days;
629
0
        }
630
0
    }
631
34
    *lifecycle_days = min_lifecycle_days;
632
34
    return 0;
633
34
}
634
635
1
int InstanceChecker::do_inverted_check() {
636
1
    if (accessor_map_.size() > 1) {
637
0
        LOG(INFO) << "currently not support inverted check for multi accessor. instance_id="
638
0
                  << instance_id_;
639
0
        return 0;
640
0
    }
641
642
1
    LOG(INFO) << "begin to inverted check objects instance_id=" << instance_id_;
643
1
    int check_ret = 0;
644
1
    long num_scanned = 0;
645
1
    long num_file_leak = 0;
646
1
    using namespace std::chrono;
647
1
    auto start_time = steady_clock::now();
648
1
    std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
649
1
        g_bvar_inverted_checker_num_scanned.put(instance_id_, num_scanned);
650
1
        g_bvar_inverted_checker_num_check_failed.put(instance_id_, num_file_leak);
651
1
        auto cost = duration<float>(steady_clock::now() - start_time).count();
652
1
        LOG(INFO) << "inverted check instance objects finished, cost=" << cost
653
1
                  << "s. instance_id=" << instance_id_ << " num_scanned=" << num_scanned
654
1
                  << " num_file_leak=" << num_file_leak;
655
1
    });
656
657
1
    struct TabletRowsets {
658
1
        int64_t tablet_id {0};
659
1
        std::unordered_set<std::string> rowset_ids;
660
1
    };
661
1
    TabletRowsets tablet_rowsets_cache;
662
663
    // Return 0 if check success, return 1 if file is garbage data, negative if error occurred
664
1
    auto check_segment_file = [&](const std::string& obj_key) {
665
0
        std::vector<std::string> str;
666
0
        butil::SplitString(obj_key, '/', &str);
667
        // data/{tablet_id}/{rowset_id}_{seg_num}.dat
668
0
        if (str.size() < 3) {
669
0
            return -1;
670
0
        }
671
672
0
        int64_t tablet_id = atol(str[1].c_str());
673
0
        if (tablet_id <= 0) {
674
0
            LOG(WARNING) << "failed to parse tablet_id, key=" << obj_key;
675
0
            return -1;
676
0
        }
677
678
0
        std::string rowset_id;
679
0
        if (auto pos = str.back().find('_'); pos != std::string::npos) {
680
0
            rowset_id = str.back().substr(0, pos);
681
0
        } else {
682
0
            LOG(WARNING) << "failed to parse rowset_id, key=" << obj_key;
683
0
            return -1;
684
0
        }
685
686
0
        if (tablet_rowsets_cache.tablet_id == tablet_id) {
687
0
            if (tablet_rowsets_cache.rowset_ids.contains(rowset_id)) {
688
0
                return 0;
689
0
            } else {
690
0
                LOG(WARNING) << "rowset not exists, key=" << obj_key;
691
0
                return -1;
692
0
            }
693
0
        }
694
        // Get all rowset id of this tablet
695
0
        tablet_rowsets_cache.tablet_id = tablet_id;
696
0
        tablet_rowsets_cache.rowset_ids.clear();
697
0
        std::unique_ptr<Transaction> txn;
698
0
        TxnErrorCode err = txn_kv_->create_txn(&txn);
699
0
        if (err != TxnErrorCode::TXN_OK) {
700
0
            LOG(WARNING) << "failed to create txn";
701
0
            return -1;
702
0
        }
703
0
        std::unique_ptr<RangeGetIterator> it;
704
0
        auto begin = meta_rowset_key({instance_id_, tablet_id, 0});
705
0
        auto end = meta_rowset_key({instance_id_, tablet_id, INT64_MAX});
706
0
        do {
707
0
            TxnErrorCode err = txn->get(begin, end, &it);
708
0
            if (err != TxnErrorCode::TXN_OK) {
709
0
                LOG(WARNING) << "failed to get rowset kv, err=" << err;
710
0
                return -1;
711
0
            }
712
0
            if (!it->has_next()) {
713
0
                break;
714
0
            }
715
0
            while (it->has_next()) {
716
                // recycle corresponding resources
717
0
                auto [k, v] = it->next();
718
0
                doris::RowsetMetaCloudPB rowset;
719
0
                if (!rowset.ParseFromArray(v.data(), v.size())) {
720
0
                    LOG(WARNING) << "malformed rowset meta value, key=" << hex(k);
721
0
                    return -1;
722
0
                }
723
0
                tablet_rowsets_cache.rowset_ids.insert(rowset.rowset_id_v2());
724
0
                if (!it->has_next()) {
725
0
                    begin = k;
726
0
                    begin.push_back('\x00'); // Update to next smallest key for iteration
727
0
                    break;
728
0
                }
729
0
            }
730
0
        } while (it->more() && !stopped());
731
732
0
        if (!tablet_rowsets_cache.rowset_ids.contains(rowset_id)) {
733
            // Garbage data leak
734
0
            LOG(WARNING) << "rowset should be recycled, key=" << obj_key;
735
0
            return 1;
736
0
        }
737
738
0
        return 0;
739
0
    };
740
741
    // TODO(Xiaocc): Currently we haven't implemented one generator-like s3 accessor list function
742
    // so we choose to skip here.
743
1
    TEST_SYNC_POINT_RETURN_WITH_VALUE("InstanceChecker::do_inverted_check", (int)0);
744
745
0
    for (auto& [_, accessor] : accessor_map_) {
746
0
        std::unique_ptr<ListIterator> list_iter;
747
0
        int ret = accessor->list_directory("data", &list_iter);
748
0
        if (ret != 0) {
749
0
            return -1;
750
0
        }
751
752
0
        for (auto file = list_iter->next(); file.has_value(); file = list_iter->next()) {
753
0
            ++num_scanned;
754
0
            int ret = check_segment_file(file->path);
755
0
            if (ret != 0) {
756
0
                LOG(WARNING) << "failed to check segment file, uri=" << accessor->uri()
757
0
                             << " path=" << file->path;
758
0
                if (ret == 1) {
759
0
                    ++num_file_leak;
760
0
                } else {
761
0
                    check_ret = -1;
762
0
                }
763
0
            }
764
0
        }
765
766
0
        if (!list_iter->is_valid()) {
767
0
            LOG(WARNING) << "failed to list data directory. uri=" << accessor->uri();
768
0
            return -1;
769
0
        }
770
0
    }
771
0
    return num_file_leak > 0 ? 1 : check_ret;
772
0
}
773
774
5
int InstanceChecker::traverse_mow_tablet(const std::function<int(int64_t)>& check_func) {
775
5
    std::unique_ptr<RangeGetIterator> it;
776
5
    auto begin = meta_rowset_key({instance_id_, 0, 0});
777
5
    auto end = meta_rowset_key({instance_id_, std::numeric_limits<int64_t>::max(), 0});
778
79
    do {
779
79
        std::unique_ptr<Transaction> txn;
780
79
        TxnErrorCode err = txn_kv_->create_txn(&txn);
781
79
        if (err != TxnErrorCode::TXN_OK) {
782
0
            LOG(WARNING) << "failed to create txn";
783
0
            return -1;
784
0
        }
785
79
        err = txn->get(begin, end, &it, false, 1);
786
79
        if (err != TxnErrorCode::TXN_OK) {
787
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
788
0
            return -1;
789
0
        }
790
79
        if (!it->has_next()) {
791
5
            break;
792
5
        }
793
148
        while (it->has_next() && !stopped()) {
794
74
            auto [k, v] = it->next();
795
74
            std::string_view k1 = k;
796
74
            k1.remove_prefix(1);
797
74
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
798
74
            decode_key(&k1, &out);
799
            // 0x01 "meta" ${instance_id} "rowset" ${tablet_id} ${version} -> RowsetMetaCloudPB
800
74
            auto tablet_id = std::get<int64_t>(std::get<0>(out[3]));
801
802
74
            if (!it->has_next()) {
803
                // Update to next smallest key for iteration
804
                // scan for next tablet in this instance
805
74
                begin = meta_rowset_key({instance_id_, tablet_id + 1, 0});
806
74
            }
807
808
74
            TabletMetaCloudPB tablet_meta;
809
74
            int ret = get_tablet_meta(txn_kv_.get(), instance_id_, tablet_id, tablet_meta);
810
74
            if (ret < 0) {
811
0
                LOG(WARNING) << fmt::format(
812
0
                        "failed to get_tablet_meta in do_delete_bitmap_integrity_check(), "
813
0
                        "instance_id={}, tablet_id={}",
814
0
                        instance_id_, tablet_id);
815
0
                return ret;
816
0
            }
817
818
74
            if (tablet_meta.enable_unique_key_merge_on_write()) {
819
                // only check merge-on-write table
820
54
                int ret = check_func(tablet_id);
821
54
                if (ret < 0) {
822
                    // return immediately when encounter unexpected error,
823
                    // otherwise, we continue to check the next tablet
824
0
                    return ret;
825
0
                }
826
54
            }
827
74
        }
828
74
    } while (it->more() && !stopped());
829
5
    return 0;
830
5
}
831
832
int InstanceChecker::traverse_rowset_delete_bitmaps(
833
        int64_t tablet_id, std::string rowset_id,
834
60
        const std::function<int(int64_t, std::string_view, int64_t, int64_t)>& callback) {
835
60
    std::unique_ptr<RangeGetIterator> it;
836
60
    auto begin = meta_delete_bitmap_key({instance_id_, tablet_id, rowset_id, 0, 0});
837
60
    auto end = meta_delete_bitmap_key({instance_id_, tablet_id, rowset_id,
838
60
                                       std::numeric_limits<int64_t>::max(),
839
60
                                       std::numeric_limits<int64_t>::max()});
840
60
    do {
841
60
        std::unique_ptr<Transaction> txn;
842
60
        TxnErrorCode err = txn_kv_->create_txn(&txn);
843
60
        if (err != TxnErrorCode::TXN_OK) {
844
0
            LOG(WARNING) << "failed to create txn";
845
0
            return -1;
846
0
        }
847
60
        err = txn->get(begin, end, &it);
848
60
        if (err != TxnErrorCode::TXN_OK) {
849
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
850
0
            return -1;
851
0
        }
852
60
        if (!it->has_next()) {
853
0
            break;
854
0
        }
855
150
        while (it->has_next() && !stopped()) {
856
150
            auto [k, v] = it->next();
857
150
            std::string_view k1 = k;
858
150
            k1.remove_prefix(1);
859
150
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
860
150
            decode_key(&k1, &out);
861
            // 0x01 "meta" ${instance_id} "delete_bitmap" ${tablet_id} ${rowset_id} ${version} ${segment_id} -> roaringbitmap
862
150
            auto version = std::get<std::int64_t>(std::get<0>(out[5]));
863
150
            auto segment_id = std::get<std::int64_t>(std::get<0>(out[6]));
864
865
150
            int ret = callback(tablet_id, rowset_id, version, segment_id);
866
150
            if (ret != 0) {
867
10
                return ret;
868
10
            }
869
870
140
            if (!it->has_next()) {
871
50
                begin = k;
872
50
                begin.push_back('\x00'); // Update to next smallest key for iteration
873
50
                break;
874
50
            }
875
140
        }
876
60
    } while (it->more() && !stopped());
877
878
50
    return 0;
879
60
}
880
881
int InstanceChecker::collect_tablet_rowsets(
882
74
        int64_t tablet_id, const std::function<void(const doris::RowsetMetaCloudPB&)>& collect_cb) {
883
74
    std::unique_ptr<Transaction> txn;
884
74
    TxnErrorCode err = txn_kv_->create_txn(&txn);
885
74
    if (err != TxnErrorCode::TXN_OK) {
886
0
        LOG(WARNING) << "failed to create txn";
887
0
        return -1;
888
0
    }
889
74
    std::unique_ptr<RangeGetIterator> it;
890
74
    auto begin = meta_rowset_key({instance_id_, tablet_id, 0});
891
74
    auto end = meta_rowset_key({instance_id_, tablet_id + 1, 0});
892
893
74
    int64_t rowsets_num {0};
894
74
    do {
895
74
        TxnErrorCode err = txn->get(begin, end, &it);
896
74
        if (err != TxnErrorCode::TXN_OK) {
897
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
898
0
            return -1;
899
0
        }
900
74
        if (!it->has_next()) {
901
0
            break;
902
0
        }
903
564
        while (it->has_next() && !stopped()) {
904
564
            auto [k, v] = it->next();
905
564
            doris::RowsetMetaCloudPB rowset;
906
564
            if (!rowset.ParseFromArray(v.data(), v.size())) {
907
0
                LOG(WARNING) << "malformed rowset meta value, key=" << hex(k);
908
0
                return -1;
909
0
            }
910
911
564
            ++rowsets_num;
912
564
            collect_cb(rowset);
913
914
564
            if (!it->has_next()) {
915
74
                begin = k;
916
74
                begin.push_back('\x00'); // Update to next smallest key for iteration
917
74
                break;
918
74
            }
919
564
        }
920
74
    } while (it->more() && !stopped());
921
922
74
    LOG(INFO) << fmt::format(
923
74
            "[delete bitmap checker] successfully collect rowsets for instance_id={}, "
924
74
            "tablet_id={}, rowsets_num={}",
925
74
            instance_id_, tablet_id, rowsets_num);
926
74
    return 0;
927
74
}
928
929
2
int InstanceChecker::do_delete_bitmap_inverted_check() {
930
2
    LOG(INFO) << fmt::format(
931
2
            "[delete bitmap checker] begin to do_delete_bitmap_inverted_check for instance_id={}",
932
2
            instance_id_);
933
934
    // number of delete bitmap keys being scanned
935
2
    int64_t total_delete_bitmap_keys {0};
936
    // number of delete bitmaps which belongs to non mow tablet
937
2
    int64_t abnormal_delete_bitmaps {0};
938
    // number of delete bitmaps which doesn't have corresponding rowset in MS
939
2
    int64_t leaked_delete_bitmaps {0};
940
941
2
    auto start_time = std::chrono::steady_clock::now();
942
2
    std::unique_ptr<int, std::function<void(int*)>> defer_log_statistics((int*)0x01, [&](int*) {
943
2
        g_bvar_inverted_checker_leaked_delete_bitmaps.put(instance_id_, leaked_delete_bitmaps);
944
2
        g_bvar_inverted_checker_abnormal_delete_bitmaps.put(instance_id_, abnormal_delete_bitmaps);
945
2
        g_bvar_inverted_checker_delete_bitmaps_scanned.put(instance_id_, total_delete_bitmap_keys);
946
947
2
        auto cost = std::chrono::duration_cast<std::chrono::milliseconds>(
948
2
                            std::chrono::steady_clock::now() - start_time)
949
2
                            .count();
950
2
        if (leaked_delete_bitmaps > 0 || abnormal_delete_bitmaps > 0) {
951
1
            LOG(WARNING) << fmt::format(
952
1
                    "[delete bitmap check fails] delete bitmap inverted check for instance_id={}, "
953
1
                    "cost={} ms, total_delete_bitmap_keys={}, leaked_delete_bitmaps={}, "
954
1
                    "abnormal_delete_bitmaps={}",
955
1
                    instance_id_, cost, total_delete_bitmap_keys, leaked_delete_bitmaps,
956
1
                    abnormal_delete_bitmaps);
957
1
        } else {
958
1
            LOG(INFO) << fmt::format(
959
1
                    "[delete bitmap checker] delete bitmap inverted check for instance_id={}, "
960
1
                    "passed. cost={} ms, total_delete_bitmap_keys={}",
961
1
                    instance_id_, cost, total_delete_bitmap_keys);
962
1
        }
963
2
    });
964
965
2
    struct TabletsRowsetsCache {
966
2
        int64_t tablet_id {-1};
967
2
        bool enable_merge_on_write {false};
968
2
        std::unordered_set<std::string> rowsets {};
969
2
        std::unordered_set<std::string> pending_delete_bitmaps {};
970
2
    } tablet_rowsets_cache {};
971
972
2
    std::unique_ptr<RangeGetIterator> it;
973
2
    auto begin = meta_delete_bitmap_key({instance_id_, 0, "", 0, 0});
974
2
    auto end =
975
2
            meta_delete_bitmap_key({instance_id_, std::numeric_limits<int64_t>::max(), "", 0, 0});
976
2
    do {
977
2
        std::unique_ptr<Transaction> txn;
978
2
        TxnErrorCode err = txn_kv_->create_txn(&txn);
979
2
        if (err != TxnErrorCode::TXN_OK) {
980
0
            LOG(WARNING) << "failed to create txn";
981
0
            return -1;
982
0
        }
983
2
        err = txn->get(begin, end, &it);
984
2
        if (err != TxnErrorCode::TXN_OK) {
985
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
986
0
            return -1;
987
0
        }
988
2
        if (!it->has_next()) {
989
0
            break;
990
0
        }
991
502
        while (it->has_next() && !stopped()) {
992
500
            auto [k, v] = it->next();
993
500
            std::string_view k1 = k;
994
500
            k1.remove_prefix(1);
995
500
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
996
500
            decode_key(&k1, &out);
997
            // 0x01 "meta" ${instance_id} "delete_bitmap" ${tablet_id} ${rowset_id} ${version} ${segment_id} -> roaringbitmap
998
500
            auto tablet_id = std::get<int64_t>(std::get<0>(out[3]));
999
500
            auto rowset_id = std::get<std::string>(std::get<0>(out[4]));
1000
500
            auto version = std::get<std::int64_t>(std::get<0>(out[5]));
1001
500
            auto segment_id = std::get<std::int64_t>(std::get<0>(out[6]));
1002
1003
500
            ++total_delete_bitmap_keys;
1004
1005
500
            if (!it->has_next()) {
1006
2
                begin = k;
1007
2
                begin.push_back('\x00'); // Update to next smallest key for iteration
1008
2
            }
1009
1010
500
            if (tablet_rowsets_cache.tablet_id == -1 ||
1011
500
                tablet_rowsets_cache.tablet_id != tablet_id) {
1012
30
                TabletMetaCloudPB tablet_meta;
1013
30
                int ret = get_tablet_meta(txn_kv_.get(), instance_id_, tablet_id, tablet_meta);
1014
30
                if (ret < 0) {
1015
0
                    LOG(WARNING) << fmt::format(
1016
0
                            "[delete bitmap checker] failed to get_tablet_meta in "
1017
0
                            "do_delete_bitmap_inverted_check(), instance_id={}, tablet_id={}",
1018
0
                            instance_id_, tablet_id);
1019
0
                    return ret;
1020
0
                }
1021
1022
30
                tablet_rowsets_cache.tablet_id = tablet_id;
1023
30
                tablet_rowsets_cache.enable_merge_on_write =
1024
30
                        tablet_meta.enable_unique_key_merge_on_write();
1025
30
                tablet_rowsets_cache.rowsets.clear();
1026
30
                tablet_rowsets_cache.pending_delete_bitmaps.clear();
1027
1028
30
                if (tablet_rowsets_cache.enable_merge_on_write) {
1029
                    // only collect rowsets for merge-on-write tablet
1030
20
                    auto collect_cb =
1031
199
                            [&tablet_rowsets_cache](const doris::RowsetMetaCloudPB& rowset) {
1032
199
                                tablet_rowsets_cache.rowsets.insert(rowset.rowset_id_v2());
1033
199
                            };
1034
20
                    ret = collect_tablet_rowsets(tablet_id, collect_cb);
1035
20
                    if (ret < 0) {
1036
0
                        return ret;
1037
0
                    }
1038
                    // get pending delete bitmaps
1039
20
                    ret = get_pending_delete_bitmap_keys(
1040
20
                            tablet_id, tablet_rowsets_cache.pending_delete_bitmaps);
1041
20
                    if (ret < 0) {
1042
0
                        return ret;
1043
0
                    }
1044
20
                }
1045
30
            }
1046
500
            DCHECK_EQ(tablet_id, tablet_rowsets_cache.tablet_id);
1047
1048
500
            if (!tablet_rowsets_cache.enable_merge_on_write) {
1049
                // clang-format off
1050
40
                TEST_SYNC_POINT_CALLBACK(
1051
40
                        "InstanceChecker::do_delete_bitmap_inverted_check.get_abnormal_delete_bitmap",
1052
40
                        &tablet_id, &rowset_id, &version, &segment_id);
1053
                // clang-format on
1054
40
                ++abnormal_delete_bitmaps;
1055
                // log an error and continue to check the next delete bitmap
1056
40
                LOG(WARNING) << fmt::format(
1057
40
                        "[delete bitmap check fails] find a delete bitmap belongs to tablet "
1058
40
                        "which is not a merge-on-write table! instance_id={}, tablet_id={}, "
1059
40
                        "version={}, segment_id={}",
1060
40
                        instance_id_, tablet_id, version, segment_id);
1061
40
                continue;
1062
40
            }
1063
1064
460
            if (!tablet_rowsets_cache.rowsets.contains(rowset_id) &&
1065
460
                !tablet_rowsets_cache.pending_delete_bitmaps.contains(std::string(k))) {
1066
170
                TEST_SYNC_POINT_CALLBACK(
1067
170
                        "InstanceChecker::do_delete_bitmap_inverted_check.get_leaked_delete_bitmap",
1068
170
                        &tablet_id, &rowset_id, &version, &segment_id);
1069
170
                ++leaked_delete_bitmaps;
1070
                // log an error and continue to check the next delete bitmap
1071
170
                LOG(WARNING) << fmt::format(
1072
170
                        "[delete bitmap check fails] can't find corresponding rowset for delete "
1073
170
                        "bitmap instance_id={}, tablet_id={}, rowset_id={}, version={}, "
1074
170
                        "segment_id={}",
1075
170
                        instance_id_, tablet_id, rowset_id, version, segment_id);
1076
170
            }
1077
460
        }
1078
2
    } while (it->more() && !stopped());
1079
1080
2
    return (leaked_delete_bitmaps > 0 || abnormal_delete_bitmaps > 0) ? 1 : 0;
1081
2
}
1082
1083
25
int InstanceChecker::check_delete_bitmap_storage_optimize(int64_t tablet_id) {
1084
25
    using Version = std::pair<int64_t, int64_t>;
1085
25
    struct RowsetDigest {
1086
25
        std::string rowset_id;
1087
25
        Version version;
1088
25
        doris::SegmentsOverlapPB segments_overlap;
1089
1090
300
        bool operator<(const RowsetDigest& other) const {
1091
300
            return version.first < other.version.first;
1092
300
        }
1093
1094
115
        bool produced_by_compaction() const {
1095
115
            return (version.first < version.second) ||
1096
115
                   ((version.first == version.second) && segments_overlap == NONOVERLAPPING);
1097
115
        }
1098
25
    };
1099
1100
    // number of rowsets which may have problems
1101
25
    int64_t abnormal_rowsets_num {0};
1102
1103
25
    std::vector<RowsetDigest> tablet_rowsets {};
1104
    // Get all visible rowsets of this tablet
1105
175
    auto collect_cb = [&tablet_rowsets](const doris::RowsetMetaCloudPB& rowset) {
1106
175
        if (rowset.start_version() == 0 && rowset.end_version() == 1) {
1107
            // ignore dummy rowset [0-1]
1108
0
            return;
1109
0
        }
1110
175
        tablet_rowsets.emplace_back(
1111
175
                rowset.rowset_id_v2(),
1112
175
                std::make_pair<int64_t, int64_t>(rowset.start_version(), rowset.end_version()),
1113
175
                rowset.segments_overlap_pb());
1114
175
    };
1115
25
    if (int ret = collect_tablet_rowsets(tablet_id, collect_cb); ret != 0) {
1116
0
        return ret;
1117
0
    }
1118
1119
25
    std::sort(tablet_rowsets.begin(), tablet_rowsets.end());
1120
1121
    // find right-most rowset which is produced by compaction
1122
25
    auto it = std::find_if(
1123
25
            tablet_rowsets.crbegin(), tablet_rowsets.crend(),
1124
115
            [](const RowsetDigest& rowset) { return rowset.produced_by_compaction(); });
1125
25
    if (it == tablet_rowsets.crend()) {
1126
5
        LOG(INFO) << fmt::format(
1127
5
                "[delete bitmap checker] skip to check delete bitmap storage optimize for "
1128
5
                "tablet_id={} because it doesn't have compacted rowsets.",
1129
5
                tablet_id);
1130
5
        return 0;
1131
5
    }
1132
1133
20
    int64_t start_version = it->version.first;
1134
20
    int64_t pre_min_version = it->version.second;
1135
1136
    // after BE sweeping stale rowsets, all rowsets in this tablet before
1137
    // should not have delete bitmaps with versions lower than `pre_min_version`
1138
20
    if (config::delete_bitmap_storage_optimize_check_version_gap > 0) {
1139
0
        pre_min_version -= config::delete_bitmap_storage_optimize_check_version_gap;
1140
0
        if (pre_min_version <= 1) {
1141
0
            LOG(INFO) << fmt::format(
1142
0
                    "[delete bitmap checker] skip to check delete bitmap storage optimize for "
1143
0
                    "tablet_id={} because pre_min_version is too small.",
1144
0
                    tablet_id);
1145
0
            return 0;
1146
0
        }
1147
0
    }
1148
1149
20
    auto check_func = [pre_min_version, instance_id = instance_id_](
1150
20
                              int64_t tablet_id, std::string_view rowset_id, int64_t version,
1151
150
                              int64_t segment_id) -> int {
1152
150
        if (version < pre_min_version) {
1153
10
            LOG(WARNING) << fmt::format(
1154
10
                    "[delete bitmap check fails] delete bitmap storage optimize check fail for "
1155
10
                    "instance_id={}, tablet_id={}, rowset_id={}, found delete bitmap with "
1156
10
                    "version={} < pre_min_version={}",
1157
10
                    instance_id, tablet_id, rowset_id, version, pre_min_version);
1158
10
            return 1;
1159
10
        }
1160
140
        return 0;
1161
150
    };
1162
1163
135
    for (const auto& rowset : tablet_rowsets) {
1164
        // check for all rowsets before the max compacted rowset
1165
135
        if (rowset.version.second < start_version) {
1166
60
            auto rowset_id = rowset.rowset_id;
1167
60
            int ret = traverse_rowset_delete_bitmaps(tablet_id, rowset_id, check_func);
1168
60
            if (ret < 0) {
1169
0
                return ret;
1170
0
            }
1171
1172
60
            if (ret != 0) {
1173
10
                ++abnormal_rowsets_num;
1174
10
                TEST_SYNC_POINT_CALLBACK(
1175
10
                        "InstanceChecker::check_delete_bitmap_storage_optimize.get_abnormal_rowset",
1176
10
                        &tablet_id, &rowset_id);
1177
10
            }
1178
60
        }
1179
135
    }
1180
1181
20
    LOG(INFO) << fmt::format(
1182
20
            "[delete bitmap checker] finish check delete bitmap storage optimize for "
1183
20
            "instance_id={}, tablet_id={}, rowsets_num={}, abnormal_rowsets_num={}, "
1184
20
            "pre_min_version={}",
1185
20
            instance_id_, tablet_id, tablet_rowsets.size(), abnormal_rowsets_num, pre_min_version);
1186
1187
20
    return (abnormal_rowsets_num > 1 ? 1 : 0);
1188
20
}
1189
1190
int InstanceChecker::get_pending_delete_bitmap_keys(
1191
49
        int64_t tablet_id, std::unordered_set<std::string>& pending_delete_bitmaps) {
1192
49
    std::unique_ptr<Transaction> txn;
1193
49
    TxnErrorCode err = txn_kv_->create_txn(&txn);
1194
49
    if (err != TxnErrorCode::TXN_OK) {
1195
0
        LOG(WARNING) << "failed to create txn";
1196
0
        return -1;
1197
0
    }
1198
49
    std::string pending_key = meta_pending_delete_bitmap_key({instance_id_, tablet_id});
1199
49
    std::string pending_val;
1200
49
    err = txn->get(pending_key, &pending_val);
1201
49
    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
1202
0
        LOG(WARNING) << "failed to get pending delete bitmap kv, err=" << err;
1203
0
        return -1;
1204
0
    }
1205
49
    if (err == TxnErrorCode::TXN_OK) {
1206
2
        PendingDeleteBitmapPB pending_info;
1207
2
        if (!pending_info.ParseFromString(pending_val)) [[unlikely]] {
1208
0
            LOG(WARNING) << "failed to parse PendingDeleteBitmapPB, tablet=" << tablet_id;
1209
0
            return -1;
1210
0
        }
1211
12
        for (auto& delete_bitmap_key : pending_info.delete_bitmap_keys()) {
1212
12
            pending_delete_bitmaps.emplace(std::string(delete_bitmap_key));
1213
12
        }
1214
2
    }
1215
49
    return 0;
1216
49
}
1217
1218
int InstanceChecker::check_delete_bitmap_storage_optimize_v2(
1219
29
        int64_t tablet_id, int64_t& rowsets_with_useless_delete_bitmap_version) {
1220
    // end_version: create_time
1221
29
    std::map<int64_t, int64_t> tablet_rowsets_map {};
1222
    // rowset_id: {start_version, end_version}
1223
29
    std::map<std::string, std::pair<int64_t, int64_t>> rowset_version_map;
1224
    // Get all visible rowsets of this tablet
1225
190
    auto collect_cb = [&](const doris::RowsetMetaCloudPB& rowset) {
1226
190
        if (rowset.start_version() == 0 && rowset.end_version() == 1) {
1227
            // ignore dummy rowset [0-1]
1228
0
            return;
1229
0
        }
1230
190
        tablet_rowsets_map[rowset.end_version()] = rowset.creation_time();
1231
190
        rowset_version_map[rowset.rowset_id_v2()] =
1232
190
                std::make_pair(rowset.start_version(), rowset.end_version());
1233
190
    };
1234
29
    if (int ret = collect_tablet_rowsets(tablet_id, collect_cb); ret != 0) {
1235
0
        return ret;
1236
0
    }
1237
1238
29
    std::unordered_set<std::string> pending_delete_bitmaps;
1239
29
    if (auto ret = get_pending_delete_bitmap_keys(tablet_id, pending_delete_bitmaps); ret < 0) {
1240
0
        return ret;
1241
0
    }
1242
1243
29
    std::unique_ptr<RangeGetIterator> it;
1244
29
    auto begin = meta_delete_bitmap_key({instance_id_, tablet_id, "", 0, 0});
1245
29
    auto end = meta_delete_bitmap_key({instance_id_, tablet_id + 1, "", 0, 0});
1246
29
    std::string last_rowset_id = "";
1247
29
    int64_t last_version = 0;
1248
29
    int64_t last_failed_version = 0;
1249
29
    std::vector<int64_t> failed_versions;
1250
29
    auto print_failed_versions = [&]() {
1251
4
        TEST_SYNC_POINT_CALLBACK(
1252
4
                "InstanceChecker::check_delete_bitmap_storage_optimize_v2.get_abnormal_"
1253
4
                "rowset",
1254
4
                &tablet_id, &last_rowset_id);
1255
4
        rowsets_with_useless_delete_bitmap_version++;
1256
        // some versions are continuous, such as [8, 9, 10, 11, 13, 17, 18]
1257
        // print as [8-11, 13, 17-18]
1258
4
        int64_t last_start_version = -1;
1259
4
        int64_t last_end_version = -1;
1260
4
        std::stringstream ss;
1261
4
        ss << "[";
1262
9
        for (int64_t version : failed_versions) {
1263
9
            if (last_start_version == -1) {
1264
4
                last_start_version = version;
1265
4
                last_end_version = version;
1266
4
                continue;
1267
4
            }
1268
5
            if (last_end_version + 1 == version) {
1269
2
                last_end_version = version;
1270
3
            } else {
1271
3
                if (last_start_version == last_end_version) {
1272
3
                    ss << last_start_version << ", ";
1273
3
                } else {
1274
0
                    ss << last_start_version << "-" << last_end_version << ", ";
1275
0
                }
1276
3
                last_start_version = version;
1277
3
                last_end_version = version;
1278
3
            }
1279
5
        }
1280
4
        if (last_start_version == last_end_version) {
1281
3
            ss << last_start_version;
1282
3
        } else {
1283
1
            ss << last_start_version << "-" << last_end_version;
1284
1
        }
1285
4
        ss << "]";
1286
4
        std::stringstream version_str;
1287
4
        auto it = rowset_version_map.find(last_rowset_id);
1288
4
        if (it != rowset_version_map.end()) {
1289
4
            version_str << "[" << it->second.first << "-" << it->second.second << "]";
1290
4
        }
1291
4
        LOG(WARNING) << fmt::format(
1292
4
                "[delete bitmap check fails] delete bitmap storage optimize v2 check fail "
1293
4
                "for instance_id={}, tablet_id={}, rowset_id={}, version={} found delete "
1294
4
                "bitmap with versions={}, size={}",
1295
4
                instance_id_, tablet_id, last_rowset_id, version_str.str(), ss.str(),
1296
4
                failed_versions.size());
1297
4
    };
1298
29
    using namespace std::chrono;
1299
29
    int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
1300
29
    do {
1301
29
        std::unique_ptr<Transaction> txn;
1302
29
        TxnErrorCode err = txn_kv_->create_txn(&txn);
1303
29
        if (err != TxnErrorCode::TXN_OK) {
1304
0
            LOG(WARNING) << "failed to create txn";
1305
0
            return -1;
1306
0
        }
1307
29
        err = txn->get(begin, end, &it);
1308
29
        if (err != TxnErrorCode::TXN_OK) {
1309
0
            LOG(WARNING) << "failed to get delete bitmap kv, err=" << err;
1310
0
            return -1;
1311
0
        }
1312
29
        if (!it->has_next()) {
1313
0
            break;
1314
0
        }
1315
722
        while (it->has_next() && !stopped()) {
1316
693
            auto [k, v] = it->next();
1317
693
            std::string_view k1 = k;
1318
693
            k1.remove_prefix(1);
1319
693
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1320
693
            decode_key(&k1, &out);
1321
            // 0x01 "meta" ${instance_id} "delete_bitmap" ${tablet_id} ${rowset_id} ${version} ${segment_id} -> roaringbitmap
1322
693
            auto rowset_id = std::get<std::string>(std::get<0>(out[4]));
1323
693
            auto version = std::get<std::int64_t>(std::get<0>(out[5]));
1324
693
            if (!it->has_next()) {
1325
29
                begin = k;
1326
29
                begin.push_back('\x00'); // Update to next smallest key for iteration
1327
29
            }
1328
693
            if (rowset_id == last_rowset_id && version == last_version) {
1329
                // skip the same rowset and version
1330
133
                continue;
1331
133
            }
1332
560
            if (rowset_id != last_rowset_id && !failed_versions.empty()) {
1333
3
                print_failed_versions();
1334
3
                last_failed_version = 0;
1335
3
                failed_versions.clear();
1336
3
            }
1337
560
            last_rowset_id = rowset_id;
1338
560
            last_version = version;
1339
560
            if (tablet_rowsets_map.find(version) != tablet_rowsets_map.end()) {
1340
539
                continue;
1341
539
            }
1342
21
            if (rowset_version_map.find(rowset_id) == rowset_version_map.end()) {
1343
                // checked in do_delete_bitmap_inverted_check
1344
1
                continue;
1345
1
            }
1346
20
            if (pending_delete_bitmaps.contains(std::string(k))) {
1347
3
                continue;
1348
3
            }
1349
            // there may be an interval in this situation:
1350
            // 1. finish compaction job; 2. checker; 3. finish agg and remove delete bitmap to ms
1351
17
            auto rowset_it = tablet_rowsets_map.upper_bound(version);
1352
17
            if (rowset_it == tablet_rowsets_map.end()) {
1353
1
                if (version != last_failed_version) {
1354
1
                    failed_versions.push_back(version);
1355
1
                }
1356
1
                last_failed_version = version;
1357
1
                continue;
1358
1
            }
1359
16
            if (rowset_it->second + config::delete_bitmap_storage_optimize_v2_check_skip_seconds >=
1360
16
                now) {
1361
8
                continue;
1362
8
            }
1363
8
            if (version != last_failed_version) {
1364
8
                failed_versions.push_back(version);
1365
8
            }
1366
8
            last_failed_version = version;
1367
8
        }
1368
29
    } while (it->more() && !stopped());
1369
29
    if (!failed_versions.empty()) {
1370
1
        print_failed_versions();
1371
1
    }
1372
29
    LOG(INFO) << fmt::format(
1373
29
            "[delete bitmap checker] finish check delete bitmap storage optimize v2 for "
1374
29
            "instance_id={}, tablet_id={}, rowsets_num={}, "
1375
29
            "rowsets_with_useless_delete_bitmap_version={}",
1376
29
            instance_id_, tablet_id, tablet_rowsets_map.size(),
1377
29
            rowsets_with_useless_delete_bitmap_version);
1378
29
    return (rowsets_with_useless_delete_bitmap_version > 1 ? 1 : 0);
1379
29
}
1380
1381
5
int InstanceChecker::do_delete_bitmap_storage_optimize_check(int version) {
1382
5
    int64_t total_tablets_num {0};
1383
5
    int64_t failed_tablets_num {0};
1384
1385
    // for v2 check
1386
5
    int64_t max_rowsets_with_useless_delete_bitmap_version = 0;
1387
5
    int64_t tablet_id_with_max_rowsets_with_useless_delete_bitmap_version = 0;
1388
1389
    // check that for every visible rowset, there exists at least delete one bitmap in MS
1390
54
    int ret = traverse_mow_tablet([&](int64_t tablet_id) {
1391
54
        ++total_tablets_num;
1392
54
        int64_t rowsets_with_useless_delete_bitmap_version = 0;
1393
54
        int res = 0;
1394
54
        if (version == 1) {
1395
25
            res = check_delete_bitmap_storage_optimize(tablet_id);
1396
29
        } else if (version == 2) {
1397
29
            res = check_delete_bitmap_storage_optimize_v2(
1398
29
                    tablet_id, rowsets_with_useless_delete_bitmap_version);
1399
29
            if (rowsets_with_useless_delete_bitmap_version >
1400
29
                max_rowsets_with_useless_delete_bitmap_version) {
1401
1
                max_rowsets_with_useless_delete_bitmap_version =
1402
1
                        rowsets_with_useless_delete_bitmap_version;
1403
1
                tablet_id_with_max_rowsets_with_useless_delete_bitmap_version = tablet_id;
1404
1
            }
1405
29
        } else {
1406
0
            return -1;
1407
0
        }
1408
54
        failed_tablets_num += (res != 0);
1409
54
        return res;
1410
54
    });
1411
1412
5
    if (ret < 0) {
1413
0
        return ret;
1414
0
    }
1415
1416
5
    if (version == 2) {
1417
3
        g_bvar_max_rowsets_with_useless_delete_bitmap_version.put(
1418
3
                instance_id_, max_rowsets_with_useless_delete_bitmap_version);
1419
3
    }
1420
1421
5
    std::stringstream ss;
1422
5
    ss << "[delete bitmap checker] check delete bitmap storage optimize v" << version
1423
5
       << " for instance_id=" << instance_id_ << ", total_tablets_num=" << total_tablets_num
1424
5
       << ", failed_tablets_num=" << failed_tablets_num;
1425
5
    if (version == 2) {
1426
3
        ss << ". max_rowsets_with_useless_delete_bitmap_version="
1427
3
           << max_rowsets_with_useless_delete_bitmap_version
1428
3
           << ", tablet_id=" << tablet_id_with_max_rowsets_with_useless_delete_bitmap_version;
1429
3
    }
1430
5
    LOG(INFO) << ss.str();
1431
1432
5
    return (failed_tablets_num > 0) ? 1 : 0;
1433
5
}
1434
1435
3
int InstanceChecker::do_mow_compaction_key_check() {
1436
3
    std::unique_ptr<RangeGetIterator> it;
1437
3
    std::string begin = mow_tablet_compaction_key({instance_id_, 0, 0});
1438
3
    std::string end = mow_tablet_compaction_key({instance_id_, INT64_MAX, 0});
1439
3
    MowTabletCompactionPB mow_tablet_compaction;
1440
3
    do {
1441
3
        std::unique_ptr<Transaction> txn;
1442
3
        TxnErrorCode err = txn_kv_->create_txn(&txn);
1443
3
        if (err != TxnErrorCode::TXN_OK) {
1444
0
            LOG(WARNING) << "failed to create txn";
1445
0
            return -1;
1446
0
        }
1447
3
        err = txn->get(begin, end, &it);
1448
3
        if (err != TxnErrorCode::TXN_OK) {
1449
0
            LOG(WARNING) << "failed to get mow tablet compaction key, err=" << err;
1450
0
            return -1;
1451
0
        }
1452
3
        int64_t now = duration_cast<std::chrono::seconds>(
1453
3
                              std::chrono::system_clock::now().time_since_epoch())
1454
3
                              .count();
1455
3
        while (it->has_next() && !stopped()) {
1456
2
            auto [k, v] = it->next();
1457
2
            std::string_view k1 = k;
1458
2
            k1.remove_prefix(1);
1459
2
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1460
2
            decode_key(&k1, &out);
1461
            // 0x01 "meta" ${instance_id} "mow_tablet_comp" ${table_id} ${initiator}
1462
2
            auto table_id = std::get<int64_t>(std::get<0>(out[3]));
1463
2
            auto initiator = std::get<int64_t>(std::get<0>(out[4]));
1464
2
            if (!mow_tablet_compaction.ParseFromArray(v.data(), v.size())) [[unlikely]] {
1465
0
                LOG(WARNING) << "failed to parse MowTabletCompactionPB";
1466
0
                return -1;
1467
0
            }
1468
2
            int64_t expiration = mow_tablet_compaction.expiration();
1469
            //check compaction key failed should meet both following two condition:
1470
            //1.compaction key is expired
1471
            //2.table lock key is not found or key is not expired
1472
2
            if (expiration < now - config::compaction_key_check_expiration_diff_seconds) {
1473
2
                std::string lock_key =
1474
2
                        meta_delete_bitmap_update_lock_key({instance_id_, table_id, -1});
1475
2
                std::string lock_val;
1476
2
                err = txn->get(lock_key, &lock_val);
1477
2
                std::string reason = "";
1478
2
                if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
1479
0
                    reason = "table lock key not found";
1480
1481
2
                } else {
1482
2
                    DeleteBitmapUpdateLockPB lock_info;
1483
2
                    if (!lock_info.ParseFromString(lock_val)) [[unlikely]] {
1484
0
                        LOG(WARNING) << "failed to parse DeleteBitmapUpdateLockPB";
1485
0
                        return -1;
1486
0
                    }
1487
2
                    if (lock_info.expiration() > now || lock_info.lock_id() != -1) {
1488
2
                        reason = "table lock is not expired,lock_id=" +
1489
2
                                 std::to_string(lock_info.lock_id());
1490
2
                    }
1491
2
                }
1492
2
                if (reason != "") {
1493
2
                    LOG(WARNING) << fmt::format(
1494
2
                            "[compaction key check fails] compaction key check fail for "
1495
2
                            "instance_id={}, table_id={}, initiator={}, expiration={}, now={}, "
1496
2
                            "reason={}",
1497
2
                            instance_id_, table_id, initiator, expiration, now, reason);
1498
2
                    return -1;
1499
2
                }
1500
2
            }
1501
2
        }
1502
1
        begin = it->next_begin_key(); // Update to next smallest key for iteration
1503
1
    } while (it->more() && !stopped());
1504
1
    return 0;
1505
3
}
1506
1507
} // namespace doris::cloud