Coverage Report

Created: 2026-01-04 11:13

/root/doris/cloud/src/recycler/checker.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "recycler/checker.h"
19
20
#include <aws/s3/S3Client.h>
21
#include <aws/s3/model/ListObjectsV2Request.h>
22
#include <butil/endpoint.h>
23
#include <butil/strings/string_split.h>
24
#include <fmt/core.h>
25
#include <gen_cpp/cloud.pb.h>
26
#include <gen_cpp/olap_file.pb.h>
27
#include <glog/logging.h>
28
29
#include <algorithm>
30
#include <chrono>
31
#include <cstdint>
32
#include <functional>
33
#include <memory>
34
#include <mutex>
35
#include <numeric>
36
#include <sstream>
37
#include <string_view>
38
#include <unordered_set>
39
#include <vector>
40
41
#include "common/bvars.h"
42
#include "common/config.h"
43
#include "common/defer.h"
44
#include "common/encryption_util.h"
45
#include "common/logging.h"
46
#include "common/util.h"
47
#include "cpp/sync_point.h"
48
#include "meta-service/meta_service.h"
49
#include "meta-service/meta_service_schema.h"
50
#include "meta-service/meta_service_tablet_stats.h"
51
#include "meta-store/blob_message.h"
52
#include "meta-store/keys.h"
53
#include "meta-store/txn_kv.h"
54
#include "meta-store/txn_kv_error.h"
55
#ifdef ENABLE_HDFS_STORAGE_VAULT
56
#include "recycler/hdfs_accessor.h"
57
#endif
58
#include "recycler/s3_accessor.h"
59
#include "recycler/storage_vault_accessor.h"
60
#ifdef UNIT_TEST
61
#include "../test/mock_accessor.h"
62
#endif
63
#include "recycler/util.h"
64
65
namespace doris::cloud {
66
namespace config {
// Config entries read by the checker in this translation unit.
// NOTE(review): some of these may also be declared via "common/config.h"; repeating an extern
// declaration is harmless.

// Identity of this process: combined with the local IP to form `ip_port_`.
extern int32_t brpc_listen_port;

// Scheduling knobs for the instance scanner, the lease thread and the check workers.
extern int32_t scan_instances_interval_seconds;
extern int32_t recycle_job_lease_expired_ms;
extern int32_t recycle_concurrency;

// Lists used to reset `instance_filter_` in Checker::start().
extern std::vector<std::string> recycle_whitelist;
extern std::vector<std::string> recycle_blacklist;

// When true, check workers also run InstanceChecker::do_inverted_check().
extern bool enable_inverted_check;
} // namespace config
75
76
using namespace std::chrono;
77
78
5
// Builds a checker that uses `txn_kv` for all meta-store access. `ip_port_` is
// "<local ip>:<brpc_listen_port>" and is passed as the owner identity when preparing, leasing and
// finishing check jobs.
Checker::Checker(std::shared_ptr<TxnKv> txn_kv) : txn_kv_(std::move(txn_kv)) {
    std::string endpoint(butil::my_ip_cstr());
    endpoint += ':';
    endpoint += std::to_string(config::brpc_listen_port);
    ip_port_ = std::move(endpoint);
}
81
82
5
// Stops (and joins) the background threads unless stop() has already been called.
Checker::~Checker() {
    if (stopped()) {
        return;
    }
    stop();
}
87
88
4
int Checker::start() {
89
4
    DCHECK(txn_kv_);
90
4
    instance_filter_.reset(config::recycle_whitelist, config::recycle_blacklist);
91
92
    // launch instance scanner
93
4
    auto scanner_func = [this]() {
94
4
        std::this_thread::sleep_for(
95
4
                std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds));
96
8
        while (!stopped()) {
97
4
            std::vector<InstanceInfoPB> instances;
98
4
            get_all_instances(txn_kv_.get(), instances);
99
4
            LOG(INFO) << "Checker get instances: " << [&instances] {
100
4
                std::stringstream ss;
101
30
                for (auto& i : instances) ss << ' ' << i.instance_id();
102
4
                return ss.str();
103
4
            }();
104
4
            if (!instances.empty()) {
105
                // enqueue instances
106
3
                std::lock_guard lock(mtx_);
107
30
                for (auto& instance : instances) {
108
30
                    if (instance_filter_.filter_out(instance.instance_id())) continue;
109
30
                    if (instance.status() == InstanceInfoPB::DELETED) continue;
110
30
                    using namespace std::chrono;
111
30
                    auto enqueue_time_s =
112
30
                            duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
113
30
                    auto [_, success] =
114
30
                            pending_instance_map_.insert({instance.instance_id(), enqueue_time_s});
115
                    // skip instance already in pending queue
116
30
                    if (success) {
117
30
                        pending_instance_queue_.push_back(std::move(instance));
118
30
                    }
119
30
                }
120
3
                pending_instance_cond_.notify_all();
121
3
            }
122
4
            {
123
4
                std::unique_lock lock(mtx_);
124
4
                notifier_.wait_for(lock,
125
4
                                   std::chrono::seconds(config::scan_instances_interval_seconds),
126
7
                                   [&]() { return stopped(); });
127
4
            }
128
4
        }
129
4
    };
130
4
    workers_.emplace_back(scanner_func);
131
    // Launch lease thread
132
4
    workers_.emplace_back([this] { lease_check_jobs(); });
133
    // Launch inspect thread
134
4
    workers_.emplace_back([this] { inspect_instance_check_interval(); });
135
136
    // launch check workers
137
8
    auto checker_func = [this]() {
138
38
        while (!stopped()) {
139
            // fetch instance to check
140
36
            InstanceInfoPB instance;
141
36
            long enqueue_time_s = 0;
142
36
            {
143
36
                std::unique_lock lock(mtx_);
144
48
                pending_instance_cond_.wait(lock, [&]() -> bool {
145
48
                    return !pending_instance_queue_.empty() || stopped();
146
48
                });
147
36
                if (stopped()) {
148
6
                    return;
149
6
                }
150
30
                instance = std::move(pending_instance_queue_.front());
151
30
                pending_instance_queue_.pop_front();
152
30
                enqueue_time_s = pending_instance_map_[instance.instance_id()];
153
30
                pending_instance_map_.erase(instance.instance_id());
154
30
            }
155
0
            const auto& instance_id = instance.instance_id();
156
30
            {
157
30
                std::lock_guard lock(mtx_);
158
                // skip instance in recycling
159
30
                if (working_instance_map_.count(instance_id)) continue;
160
30
            }
161
30
            auto checker = std::make_shared<InstanceChecker>(txn_kv_, instance.instance_id());
162
30
            if (checker->init(instance) != 0) {
163
0
                LOG(WARNING) << "failed to init instance checker, instance_id="
164
0
                             << instance.instance_id();
165
0
                continue;
166
0
            }
167
30
            std::string check_job_key;
168
30
            job_check_key({instance.instance_id()}, &check_job_key);
169
30
            int ret = prepare_instance_recycle_job(txn_kv_.get(), check_job_key,
170
30
                                                   instance.instance_id(), ip_port_,
171
30
                                                   config::check_object_interval_seconds * 1000);
172
30
            if (ret != 0) { // Prepare failed
173
20
                continue;
174
20
            } else {
175
10
                std::lock_guard lock(mtx_);
176
10
                working_instance_map_.emplace(instance_id, checker);
177
10
            }
178
10
            if (stopped()) return;
179
10
            using namespace std::chrono;
180
10
            auto ctime_ms =
181
10
                    duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
182
10
            g_bvar_checker_enqueue_cost_s.put(instance_id, ctime_ms / 1000 - enqueue_time_s);
183
184
10
            bool success {true};
185
186
10
            if (int ret = checker->do_check(); ret != 0) {
187
0
                success = false;
188
0
            }
189
190
10
            if (config::enable_inverted_check) {
191
0
                if (int ret = checker->do_inverted_check(); ret != 0) {
192
0
                    success = false;
193
0
                }
194
0
            }
195
196
10
            if (config::enable_delete_bitmap_inverted_check) {
197
0
                if (int ret = checker->do_delete_bitmap_inverted_check(); ret != 0) {
198
0
                    success = false;
199
0
                }
200
0
            }
201
202
10
            if (config::enable_mow_job_key_check) {
203
0
                if (int ret = checker->do_mow_job_key_check(); ret != 0) {
204
0
                    success = false;
205
0
                }
206
0
            }
207
208
10
            if (config::enable_tablet_stats_key_check) {
209
0
                if (int ret = checker->do_tablet_stats_key_check(); ret != 0) {
210
0
                    success = false;
211
0
                }
212
0
            }
213
214
10
            if (config::enable_restore_job_check) {
215
0
                if (int ret = checker->do_restore_job_check(); ret != 0) {
216
0
                    success = false;
217
0
                }
218
0
            }
219
220
10
            if (config::enable_txn_key_check) {
221
0
                if (int ret = checker->do_txn_key_check(); ret != 0) {
222
0
                    success = false;
223
0
                }
224
0
            }
225
226
10
            if (config::enable_meta_rowset_key_check) {
227
0
                if (int ret = checker->do_meta_rowset_key_check(); ret != 0) {
228
0
                    success = false;
229
0
                }
230
0
            }
231
232
10
            if (config::enable_delete_bitmap_storage_optimize_v2_check) {
233
0
                if (int ret = checker->do_delete_bitmap_storage_optimize_check(2 /*version*/);
234
0
                    ret != 0) {
235
0
                    success = false;
236
0
                }
237
0
            }
238
239
            // If instance checker has been aborted, don't finish this job
240
10
            if (!checker->stopped()) {
241
10
                finish_instance_recycle_job(txn_kv_.get(), check_job_key, instance.instance_id(),
242
10
                                            ip_port_, success, ctime_ms);
243
10
            }
244
10
            {
245
10
                std::lock_guard lock(mtx_);
246
10
                working_instance_map_.erase(instance.instance_id());
247
10
            }
248
10
        }
249
8
    };
250
4
    int num_threads = config::recycle_concurrency; // FIXME: use a new config entry?
251
12
    for (int i = 0; i < num_threads; ++i) {
252
8
        workers_.emplace_back(checker_func);
253
8
    }
254
4
    return 0;
255
4
}
256
257
5
void Checker::stop() {
258
5
    stopped_ = true;
259
5
    notifier_.notify_all();
260
5
    pending_instance_cond_.notify_all();
261
5
    {
262
5
        std::lock_guard lock(mtx_);
263
5
        for (auto& [_, checker] : working_instance_map_) {
264
0
            checker->stop();
265
0
        }
266
5
    }
267
20
    for (auto& w : workers_) {
268
20
        if (w.joinable()) w.join();
269
20
    }
270
5
}
271
272
4
void Checker::lease_check_jobs() {
273
55
    while (!stopped()) {
274
51
        std::vector<std::string> instances;
275
51
        instances.reserve(working_instance_map_.size());
276
51
        {
277
51
            std::lock_guard lock(mtx_);
278
51
            for (auto& [id, _] : working_instance_map_) {
279
30
                instances.push_back(id);
280
30
            }
281
51
        }
282
51
        for (auto& i : instances) {
283
30
            std::string check_job_key;
284
30
            job_check_key({i}, &check_job_key);
285
30
            int ret = lease_instance_recycle_job(txn_kv_.get(), check_job_key, i, ip_port_);
286
30
            if (ret == 1) {
287
0
                std::lock_guard lock(mtx_);
288
0
                if (auto it = working_instance_map_.find(i); it != working_instance_map_.end()) {
289
0
                    it->second->stop();
290
0
                }
291
0
            }
292
30
        }
293
51
        {
294
51
            std::unique_lock lock(mtx_);
295
51
            notifier_.wait_for(lock,
296
51
                               std::chrono::milliseconds(config::recycle_job_lease_expired_ms / 3),
297
102
                               [&]() { return stopped(); });
298
51
        }
299
51
    }
300
4
}
301
0
#define LOG_CHECK_INTERVAL_ALARM LOG(WARNING) << "Err for check interval: "
302
34
void Checker::do_inspect(const InstanceInfoPB& instance) {
303
34
    std::string check_job_key = job_check_key({instance.instance_id()});
304
34
    std::unique_ptr<Transaction> txn;
305
34
    std::string val;
306
34
    TxnErrorCode err = txn_kv_->create_txn(&txn);
307
34
    if (err != TxnErrorCode::TXN_OK) {
308
0
        LOG_CHECK_INTERVAL_ALARM << "failed to create txn";
309
0
        return;
310
0
    }
311
34
    err = txn->get(check_job_key, &val);
312
34
    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
313
0
        LOG_CHECK_INTERVAL_ALARM << "failed to get kv, err=" << err
314
0
                                 << " key=" << hex(check_job_key);
315
0
        return;
316
0
    }
317
34
    auto checker = InstanceChecker(txn_kv_, instance.instance_id());
318
34
    if (checker.init(instance) != 0) {
319
0
        LOG_CHECK_INTERVAL_ALARM << "failed to init instance checker, instance_id="
320
0
                                 << instance.instance_id();
321
0
        return;
322
0
    }
323
324
34
    int64_t bucket_lifecycle_days = 0;
325
34
    if (checker.get_bucket_lifecycle(&bucket_lifecycle_days) != 0) {
326
0
        LOG_CHECK_INTERVAL_ALARM << "failed to get bucket lifecycle, instance_id="
327
0
                                 << instance.instance_id();
328
0
        return;
329
0
    }
330
34
    DCHECK(bucket_lifecycle_days > 0);
331
332
34
    if (bucket_lifecycle_days == INT64_MAX) {
333
        // No s3 bucket (may all accessors are HdfsAccessor), skip inspect
334
34
        return;
335
34
    }
336
337
0
    int64_t last_ctime_ms = -1;
338
0
    auto job_status = JobRecyclePB::IDLE;
339
0
    auto has_last_ctime = [&]() {
340
0
        JobRecyclePB job_info;
341
0
        if (!job_info.ParseFromString(val)) {
342
0
            LOG_CHECK_INTERVAL_ALARM << "failed to parse JobRecyclePB, key=" << hex(check_job_key);
343
0
        }
344
0
        DCHECK(job_info.instance_id() == instance.instance_id());
345
0
        if (!job_info.has_last_ctime_ms()) return false;
346
0
        last_ctime_ms = job_info.last_ctime_ms();
347
0
        job_status = job_info.status();
348
0
        g_bvar_checker_last_success_time_ms.put(instance.instance_id(),
349
0
                                                job_info.last_success_time_ms());
350
0
        return true;
351
0
    };
352
0
    using namespace std::chrono;
353
0
    auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
354
0
    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND || !has_last_ctime()) {
355
        // Use instance's ctime for instances that do not have job's last ctime
356
0
        last_ctime_ms = instance.ctime();
357
0
    }
358
0
    DCHECK(now - last_ctime_ms >= 0);
359
0
    int64_t expiration_ms =
360
0
            bucket_lifecycle_days > config::reserved_buffer_days
361
0
                    ? (bucket_lifecycle_days - config::reserved_buffer_days) * 86400000
362
0
                    : bucket_lifecycle_days * 86400000;
363
0
    TEST_SYNC_POINT_CALLBACK("Checker:do_inspect", &last_ctime_ms);
364
0
    if (now - last_ctime_ms >= expiration_ms) {
365
0
        LOG_CHECK_INTERVAL_ALARM << "check risks, instance_id: " << instance.instance_id()
366
0
                                 << " last_ctime_ms: " << last_ctime_ms
367
0
                                 << " job_status: " << job_status
368
0
                                 << " bucket_lifecycle_days: " << bucket_lifecycle_days
369
0
                                 << " reserved_buffer_days: " << config::reserved_buffer_days
370
0
                                 << " expiration_ms: " << expiration_ms;
371
0
    }
372
0
}
373
#undef LOG_CHECK_INTERVAL_ALARM
374
4
void Checker::inspect_instance_check_interval() {
375
8
    while (!stopped()) {
376
4
        LOG(INFO) << "start to inspect instance check interval";
377
4
        std::vector<InstanceInfoPB> instances;
378
4
        get_all_instances(txn_kv_.get(), instances);
379
30
        for (const auto& instance : instances) {
380
30
            if (instance_filter_.filter_out(instance.instance_id())) continue;
381
30
            if (stopped()) return;
382
30
            if (instance.status() == InstanceInfoPB::DELETED) continue;
383
30
            do_inspect(instance);
384
30
        }
385
4
        {
386
4
            std::unique_lock lock(mtx_);
387
4
            notifier_.wait_for(lock, std::chrono::seconds(config::scan_instances_interval_seconds),
388
7
                               [&]() { return stopped(); });
389
4
        }
390
4
    }
391
4
}
392
393
// return 0 for success get a key, 1 for key not found, negative for error
394
22
int key_exist(TxnKv* txn_kv, std::string_view key) {
395
22
    std::unique_ptr<Transaction> txn;
396
22
    TxnErrorCode err = txn_kv->create_txn(&txn);
397
22
    if (err != TxnErrorCode::TXN_OK) {
398
0
        LOG(WARNING) << "failed to init txn, err=" << err;
399
0
        return -1;
400
0
    }
401
22
    std::string val;
402
22
    switch (txn->get(key, &val)) {
403
19
    case TxnErrorCode::TXN_OK:
404
19
        return 0;
405
3
    case TxnErrorCode::TXN_KEY_NOT_FOUND:
406
3
        return 1;
407
0
    default:
408
0
        return -1;
409
22
    }
410
22
}
411
412
InstanceChecker::InstanceChecker(std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id)
413
95
        : txn_kv_(std::move(txn_kv)), instance_id_(instance_id) {}
414
415
95
int InstanceChecker::init(const InstanceInfoPB& instance) {
416
95
    int ret = init_obj_store_accessors(instance);
417
95
    if (ret != 0) {
418
0
        return ret;
419
0
    }
420
421
95
    return init_storage_vault_accessors(instance);
422
95
}
423
424
95
int InstanceChecker::init_obj_store_accessors(const InstanceInfoPB& instance) {
425
95
    for (const auto& obj_info : instance.obj_info()) {
426
95
#ifdef UNIT_TEST
427
95
        auto accessor = std::make_shared<MockAccessor>();
428
#else
429
        auto s3_conf = S3Conf::from_obj_store_info(obj_info);
430
        if (!s3_conf) {
431
            LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_;
432
            return -1;
433
        }
434
435
        std::shared_ptr<S3Accessor> accessor;
436
        int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
437
        if (ret != 0) {
438
            LOG(WARNING) << "failed to init object accessor. instance_id=" << instance_id_
439
                         << " resource_id=" << obj_info.id();
440
            return ret;
441
        }
442
#endif
443
444
95
        accessor_map_.emplace(obj_info.id(), std::move(accessor));
445
95
    }
446
447
95
    return 0;
448
95
}
449
450
95
int InstanceChecker::init_storage_vault_accessors(const InstanceInfoPB& instance) {
451
95
    if (instance.resource_ids().empty()) {
452
95
        return 0;
453
95
    }
454
455
0
    FullRangeGetIteratorOptions opts(txn_kv_);
456
0
    opts.prefetch = true;
457
0
    auto it = txn_kv_->full_range_get(storage_vault_key({instance_id_, ""}),
458
0
                                      storage_vault_key({instance_id_, "\xff"}), std::move(opts));
459
460
0
    for (auto kv = it->next(); kv.has_value(); kv = it->next()) {
461
0
        auto [k, v] = *kv;
462
0
        StorageVaultPB vault;
463
0
        if (!vault.ParseFromArray(v.data(), v.size())) {
464
0
            LOG(WARNING) << "malformed storage vault, unable to deserialize key=" << hex(k);
465
0
            return -1;
466
0
        }
467
0
        TEST_SYNC_POINT_CALLBACK("InstanceRecycler::init_storage_vault_accessors.mock_vault",
468
0
                                 &accessor_map_, &vault);
469
0
        if (vault.has_hdfs_info()) {
470
0
#ifdef ENABLE_HDFS_STORAGE_VAULT
471
0
            auto accessor = std::make_shared<HdfsAccessor>(vault.hdfs_info());
472
0
            int ret = accessor->init();
473
0
            if (ret != 0) {
474
0
                LOG(WARNING) << "failed to init hdfs accessor. instance_id=" << instance_id_
475
0
                             << " resource_id=" << vault.id() << " name=" << vault.name();
476
0
                return ret;
477
0
            }
478
479
0
            accessor_map_.emplace(vault.id(), std::move(accessor));
480
#else
481
            LOG(ERROR) << "HDFS is disabled (via the ENABLE_HDFS_STORAGE_VAULT build option), "
482
                       << "but HDFS storage vaults were detected";
483
#endif
484
0
        } else if (vault.has_obj_info()) {
485
0
#ifdef UNIT_TEST
486
0
            auto accessor = std::make_shared<MockAccessor>();
487
#else
488
            auto s3_conf = S3Conf::from_obj_store_info(vault.obj_info());
489
            if (!s3_conf) {
490
                LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_;
491
                return -1;
492
            }
493
494
            std::shared_ptr<S3Accessor> accessor;
495
            int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
496
            if (ret != 0) {
497
                LOG(WARNING) << "failed to init s3 accessor. instance_id=" << instance_id_
498
                             << " resource_id=" << vault.id() << " name=" << vault.name();
499
                return ret;
500
            }
501
#endif
502
503
0
            accessor_map_.emplace(vault.id(), std::move(accessor));
504
0
        }
505
0
    }
506
507
0
    if (!it->is_valid()) {
508
0
        LOG_WARNING("failed to get storage vault kv");
509
0
        return -1;
510
0
    }
511
0
    return 0;
512
0
}
513
514
16
int InstanceChecker::do_check() {
515
16
    TEST_SYNC_POINT("InstanceChecker.do_check");
516
16
    LOG(INFO) << "begin to check instance objects instance_id=" << instance_id_;
517
16
    int check_ret = 0;
518
16
    long num_scanned = 0;
519
16
    long num_scanned_with_segment = 0;
520
16
    long num_rowset_loss = 0;
521
16
    long instance_volume = 0;
522
16
    using namespace std::chrono;
523
16
    auto start_time = steady_clock::now();
524
16
    DORIS_CLOUD_DEFER {
525
16
        auto cost = duration<float>(steady_clock::now() - start_time).count();
526
16
        LOG(INFO) << "check instance objects finished, cost=" << cost
527
16
                  << "s. instance_id=" << instance_id_ << " num_scanned=" << num_scanned
528
16
                  << " num_scanned_with_segment=" << num_scanned_with_segment
529
16
                  << " num_rowset_loss=" << num_rowset_loss
530
16
                  << " instance_volume=" << instance_volume;
531
16
        g_bvar_checker_num_scanned.put(instance_id_, num_scanned);
532
16
        g_bvar_checker_num_scanned_with_segment.put(instance_id_, num_scanned_with_segment);
533
16
        g_bvar_checker_num_check_failed.put(instance_id_, num_rowset_loss);
534
16
        g_bvar_checker_check_cost_s.put(instance_id_, static_cast<long>(cost));
535
        // FIXME(plat1ko): What if some list operation failed?
536
16
        g_bvar_checker_instance_volume.put(instance_id_, instance_volume);
537
16
    };
538
539
16
    struct TabletFiles {
540
16
        int64_t tablet_id {0};
541
16
        std::unordered_set<std::string> files;
542
16
    };
543
16
    TabletFiles tablet_files_cache;
544
545
4.05k
    auto check_rowset_objects = [&, this](doris::RowsetMetaCloudPB& rs_meta, std::string_view key) {
546
4.05k
        if (rs_meta.num_segments() == 0) {
547
0
            return;
548
0
        }
549
550
4.05k
        bool data_loss = false;
551
4.05k
        bool segment_file_loss = false;
552
4.05k
        bool index_file_loss = false;
553
554
4.05k
        DORIS_CLOUD_DEFER {
555
4.05k
            if (data_loss) {
556
32
                LOG(INFO) << "segment file is" << (segment_file_loss ? "" : " not") << " loss, "
557
32
                          << "index file is" << (index_file_loss ? "" : " not") << " loss, "
558
32
                          << "rowset.tablet_id = " << rs_meta.tablet_id();
559
32
                num_rowset_loss++;
560
32
            }
561
4.05k
        };
562
563
4.05k
        ++num_scanned_with_segment;
564
4.05k
        if (tablet_files_cache.tablet_id != rs_meta.tablet_id()) {
565
454
            long tablet_volume = 0;
566
            // Clear cache
567
454
            tablet_files_cache.tablet_id = 0;
568
454
            tablet_files_cache.files.clear();
569
            // Get all file paths under this tablet directory
570
454
            auto find_it = accessor_map_.find(rs_meta.resource_id());
571
454
            if (find_it == accessor_map_.end()) {
572
0
                LOG_WARNING("resource id not found in accessor map")
573
0
                        .tag("resource_id", rs_meta.resource_id())
574
0
                        .tag("tablet_id", rs_meta.tablet_id())
575
0
                        .tag("rowset_id", rs_meta.rowset_id_v2());
576
0
                check_ret = -1;
577
0
                return;
578
0
            }
579
580
454
            std::unique_ptr<ListIterator> list_iter;
581
454
            int ret = find_it->second->list_directory(tablet_path_prefix(rs_meta.tablet_id()),
582
454
                                                      &list_iter);
583
454
            if (ret != 0) { // No need to log, because S3Accessor has logged this error
584
0
                check_ret = -1;
585
0
                return;
586
0
            }
587
588
18.5k
            for (auto file = list_iter->next(); file.has_value(); file = list_iter->next()) {
589
18.0k
                tablet_files_cache.files.insert(std::move(file->path));
590
18.0k
                tablet_volume += file->size;
591
18.0k
            }
592
454
            tablet_files_cache.tablet_id = rs_meta.tablet_id();
593
454
            instance_volume += tablet_volume;
594
454
        }
595
596
16.1k
        for (int i = 0; i < rs_meta.num_segments(); ++i) {
597
12.0k
            auto path = segment_path(rs_meta.tablet_id(), rs_meta.rowset_id_v2(), i);
598
599
12.0k
            if (tablet_files_cache.files.contains(path)) {
600
12.0k
                continue;
601
12.0k
            }
602
603
10
            if (1 == key_exist(txn_kv_.get(), key)) {
604
                // Rowset has been deleted instead of data loss
605
0
                break;
606
0
            }
607
10
            data_loss = true;
608
10
            segment_file_loss = true;
609
10
            TEST_SYNC_POINT_CALLBACK("InstanceChecker.do_check1", &path);
610
10
            LOG(WARNING) << "object not exist, path=" << path
611
10
                         << ", rs_meta=" << rs_meta.ShortDebugString() << " key=" << hex(key);
612
10
        }
613
614
4.05k
        std::unique_ptr<Transaction> txn;
615
4.05k
        TxnErrorCode err = txn_kv_->create_txn(&txn);
616
4.05k
        if (err != TxnErrorCode::TXN_OK) {
617
0
            LOG(WARNING) << "failed to init txn, err=" << err;
618
0
            check_ret = -1;
619
0
            return;
620
0
        }
621
622
4.05k
        TabletIndexPB tablet_index;
623
4.05k
        if (get_tablet_idx(txn_kv_.get(), instance_id_, rs_meta.tablet_id(), tablet_index) == -1) {
624
0
            LOG(WARNING) << "failed to get tablet index, tablet_id= " << rs_meta.tablet_id();
625
0
            check_ret = -1;
626
0
            return;
627
0
        }
628
629
4.05k
        auto tablet_schema_key =
630
4.05k
                meta_schema_key({instance_id_, tablet_index.index_id(), rs_meta.schema_version()});
631
4.05k
        ValueBuf tablet_schema_val;
632
4.05k
        err = cloud::blob_get(txn.get(), tablet_schema_key, &tablet_schema_val);
633
634
4.05k
        if (err != TxnErrorCode::TXN_OK) {
635
2.00k
            check_ret = -1;
636
2.00k
            LOG(WARNING) << "failed to get schema, err=" << err;
637
2.00k
            return;
638
2.00k
        }
639
640
2.05k
        auto* schema = rs_meta.mutable_tablet_schema();
641
2.05k
        if (!parse_schema_value(tablet_schema_val, schema)) {
642
0
            LOG(WARNING) << "malformed schema value, key=" << hex(tablet_schema_key);
643
0
            return;
644
0
        }
645
646
2.05k
        std::vector<std::pair<int64_t, std::string>> index_ids;
647
2.05k
        for (const auto& i : rs_meta.tablet_schema().index()) {
648
2.05k
            if (i.has_index_type() && i.index_type() == IndexType::INVERTED) {
649
2.05k
                index_ids.emplace_back(i.index_id(), i.index_suffix_name());
650
2.05k
            }
651
2.05k
        }
652
2.05k
        if (!index_ids.empty()) {
653
8.10k
            for (int i = 0; i < rs_meta.num_segments(); ++i) {
654
6.05k
                std::vector<std::string> index_path_v;
655
6.05k
                if (rs_meta.tablet_schema().inverted_index_storage_format() ==
656
6.05k
                    InvertedIndexStorageFormatPB::V1) {
657
6.01k
                    for (const auto& index_id : index_ids) {
658
6.01k
                        LOG(INFO) << "check inverted index, tablet_id=" << rs_meta.tablet_id()
659
6.01k
                                  << " rowset_id=" << rs_meta.rowset_id_v2() << " segment_id=" << i
660
6.01k
                                  << " index_id=" << index_id.first
661
6.01k
                                  << " index_suffix_name=" << index_id.second;
662
6.01k
                        index_path_v.emplace_back(
663
6.01k
                                inverted_index_path_v1(rs_meta.tablet_id(), rs_meta.rowset_id_v2(),
664
6.01k
                                                       i, index_id.first, index_id.second));
665
6.01k
                    }
666
6.01k
                } else {
667
40
                    index_path_v.emplace_back(
668
40
                            inverted_index_path_v2(rs_meta.tablet_id(), rs_meta.rowset_id_v2(), i));
669
40
                }
670
671
6.05k
                if (std::ranges::all_of(index_path_v, [&](const auto& idx_file_path) {
672
6.05k
                        if (!tablet_files_cache.files.contains(idx_file_path)) {
673
23
                            LOG(INFO) << "loss index file: " << idx_file_path;
674
23
                            return false;
675
23
                        }
676
6.03k
                        return true;
677
6.05k
                    })) {
678
6.03k
                    continue;
679
6.03k
                }
680
23
                index_file_loss = true;
681
23
                data_loss = true;
682
23
            }
683
2.05k
        }
684
2.05k
    };
685
686
    // scan visible rowsets
687
16
    auto start_key = meta_rowset_key({instance_id_, 0, 0});
688
16
    auto end_key = meta_rowset_key({instance_id_, INT64_MAX, 0});
689
690
16
    std::unique_ptr<RangeGetIterator> it;
691
16
    do {
692
16
        std::unique_ptr<Transaction> txn;
693
16
        TxnErrorCode err = txn_kv_->create_txn(&txn);
694
16
        if (err != TxnErrorCode::TXN_OK) {
695
0
            LOG(WARNING) << "failed to init txn, err=" << err;
696
0
            return -1;
697
0
        }
698
699
16
        err = txn->get(start_key, end_key, &it);
700
16
        if (err != TxnErrorCode::TXN_OK) {
701
0
            LOG(WARNING) << "internal error, failed to get rowset meta, err=" << err;
702
0
            return -1;
703
0
        }
704
16
        num_scanned += it->size();
705
706
4.07k
        while (it->has_next() && !stopped()) {
707
4.05k
            auto [k, v] = it->next();
708
4.05k
            if (!it->has_next()) {
709
6
                start_key = k;
710
6
            }
711
712
4.05k
            doris::RowsetMetaCloudPB rs_meta;
713
4.05k
            if (!rs_meta.ParseFromArray(v.data(), v.size())) {
714
0
                ++num_rowset_loss;
715
0
                LOG(WARNING) << "malformed rowset meta. key=" << hex(k) << " val=" << hex(v);
716
0
                continue;
717
0
            }
718
4.05k
            check_rowset_objects(rs_meta, k);
719
4.05k
        }
720
16
        start_key.push_back('\x00'); // Update to next smallest key for iteration
721
16
    } while (it->more() && !stopped());
722
723
16
    return num_rowset_loss > 0 ? 1 : check_ret;
724
16
}
725
726
34
int InstanceChecker::get_bucket_lifecycle(int64_t* lifecycle_days) {
727
    // If there are multiple buckets, return the minimum lifecycle.
728
34
    int64_t min_lifecycle_days = INT64_MAX;
729
34
    int64_t tmp_liefcycle_days = 0;
730
34
    for (const auto& [id, accessor] : accessor_map_) {
731
34
        if (accessor->type() != AccessorType::S3) {
732
34
            continue;
733
34
        }
734
735
0
        auto* s3_accessor = static_cast<S3Accessor*>(accessor.get());
736
737
0
        if (s3_accessor->check_versioning() != 0) {
738
0
            return -1;
739
0
        }
740
741
0
        if (s3_accessor->get_life_cycle(&tmp_liefcycle_days) != 0) {
742
0
            return -1;
743
0
        }
744
745
0
        if (tmp_liefcycle_days < min_lifecycle_days) {
746
0
            min_lifecycle_days = tmp_liefcycle_days;
747
0
        }
748
0
    }
749
34
    *lifecycle_days = min_lifecycle_days;
750
34
    return 0;
751
34
}
752
753
5
int InstanceChecker::do_inverted_check() {
754
5
    if (accessor_map_.size() > 1) {
755
0
        LOG(INFO) << "currently not support inverted check for multi accessor. instance_id="
756
0
                  << instance_id_;
757
0
        return 0;
758
0
    }
759
760
5
    LOG(INFO) << "begin to inverted check objects instance_id=" << instance_id_;
761
5
    int check_ret = 0;
762
5
    long num_scanned = 0;
763
5
    long num_file_leak = 0;
764
5
    using namespace std::chrono;
765
5
    auto start_time = steady_clock::now();
766
5
    DORIS_CLOUD_DEFER {
767
5
        g_bvar_inverted_checker_num_scanned.put(instance_id_, num_scanned);
768
5
        g_bvar_inverted_checker_num_check_failed.put(instance_id_, num_file_leak);
769
5
        auto cost = duration<float>(steady_clock::now() - start_time).count();
770
5
        LOG(INFO) << "inverted check instance objects finished, cost=" << cost
771
5
                  << "s. instance_id=" << instance_id_ << " num_scanned=" << num_scanned
772
5
                  << " num_file_leak=" << num_file_leak;
773
5
    };
774
775
5
    struct TabletRowsets {
776
5
        int64_t tablet_id {0};
777
5
        std::unordered_set<std::string> rowset_ids;
778
5
    };
779
5
    TabletRowsets tablet_rowsets_cache;
780
781
5
    RowsetIndexesFormatV1 rowset_index_cache_v1;
782
5
    RowsetIndexesFormatV2 rowset_index_cache_v2;
783
784
    // Return 0 if check success, return 1 if file is garbage data, negative if error occurred
785
108
    auto check_segment_file = [&](const std::string& obj_key) {
786
108
        std::vector<std::string> str;
787
108
        butil::SplitString(obj_key, '/', &str);
788
        // data/{tablet_id}/{rowset_id}_{seg_num}.dat
789
108
        if (str.size() < 3) {
790
            // clang-format off
791
0
            LOG(WARNING) << "split obj_key error, str.size() should be less than 3,"
792
0
                         << " value = " << str.size();
793
            // clang-format on
794
0
            return -1;
795
0
        }
796
797
108
        int64_t tablet_id = atol(str[1].c_str());
798
108
        if (tablet_id <= 0) {
799
0
            LOG(WARNING) << "failed to parse tablet_id, key=" << obj_key;
800
0
            return -1;
801
0
        }
802
803
108
        if (!str[2].ends_with(".dat")) {
804
            // skip check not segment file
805
54
            return 0;
806
54
        }
807
808
54
        std::string rowset_id;
809
54
        if (auto pos = str.back().find('_'); pos != std::string::npos) {
810
54
            rowset_id = str.back().substr(0, pos);
811
54
        } else {
812
0
            LOG(WARNING) << "failed to parse rowset_id, key=" << obj_key;
813
0
            return -1;
814
0
        }
815
816
54
        if (tablet_rowsets_cache.tablet_id == tablet_id) {
817
7
            if (tablet_rowsets_cache.rowset_ids.contains(rowset_id)) {
818
2
                return 0;
819
5
            } else {
820
5
                LOG(WARNING) << "rowset not exists, key=" << obj_key;
821
5
                return -1;
822
5
            }
823
7
        }
824
        // Get all rowset id of this tablet
825
47
        tablet_rowsets_cache.tablet_id = tablet_id;
826
47
        tablet_rowsets_cache.rowset_ids.clear();
827
47
        std::unique_ptr<Transaction> txn;
828
47
        TxnErrorCode err = txn_kv_->create_txn(&txn);
829
47
        if (err != TxnErrorCode::TXN_OK) {
830
0
            LOG(WARNING) << "failed to create txn";
831
0
            return -1;
832
0
        }
833
47
        std::unique_ptr<RangeGetIterator> it;
834
47
        auto begin = meta_rowset_key({instance_id_, tablet_id, 0});
835
47
        auto end = meta_rowset_key({instance_id_, tablet_id, INT64_MAX});
836
47
        do {
837
47
            TxnErrorCode err = txn->get(begin, end, &it);
838
47
            if (err != TxnErrorCode::TXN_OK) {
839
0
                LOG(WARNING) << "failed to get rowset kv, err=" << err;
840
0
                return -1;
841
0
            }
842
47
            if (!it->has_next()) {
843
10
                break;
844
10
            }
845
37
            while (it->has_next()) {
846
                // recycle corresponding resources
847
37
                auto [k, v] = it->next();
848
37
                doris::RowsetMetaCloudPB rowset;
849
37
                if (!rowset.ParseFromArray(v.data(), v.size())) {
850
0
                    LOG(WARNING) << "malformed rowset meta value, key=" << hex(k);
851
0
                    return -1;
852
0
                }
853
37
                tablet_rowsets_cache.rowset_ids.insert(rowset.rowset_id_v2());
854
37
                if (!it->has_next()) {
855
37
                    begin = k;
856
37
                    begin.push_back('\x00'); // Update to next smallest key for iteration
857
37
                    break;
858
37
                }
859
37
            }
860
37
        } while (it->more() && !stopped());
861
862
47
        if (!tablet_rowsets_cache.rowset_ids.contains(rowset_id)) {
863
            // Garbage data leak
864
12
            LOG(WARNING) << "rowset should be recycled, key=" << obj_key;
865
12
            return 1;
866
12
        }
867
868
35
        return 0;
869
47
    };
870
871
108
    auto check_inverted_index_file = [&](const std::string& obj_key) {
872
108
        std::vector<std::string> str;
873
108
        butil::SplitString(obj_key, '/', &str);
874
        // format v1: data/{tablet_id}/{rowset_id}_{seg_num}_{idx_id}{idx_suffix}.idx
875
        // format v2: data/{tablet_id}/{rowset_id}_{seg_num}.idx
876
108
        if (str.size() < 3) {
877
            // clang-format off
878
0
            LOG(WARNING) << "split obj_key error, str.size() should be less than 3,"
879
0
                         << " value = " << str.size();
880
            // clang-format on
881
0
            return -1;
882
0
        }
883
884
108
        int64_t tablet_id = atol(str[1].c_str());
885
108
        if (tablet_id <= 0) {
886
0
            LOG(WARNING) << "failed to parse tablet_id, key=" << obj_key;
887
0
            return -1;
888
0
        }
889
890
        // v1: {rowset_id}_{seg_num}_{idx_id}{idx_suffix}.idx
891
        // v2: {rowset_id}_{seg_num}.idx
892
108
        std::string rowset_info = str.back();
893
894
108
        if (!rowset_info.ends_with(".idx")) {
895
54
            return 0; // Not an index file
896
54
        }
897
898
54
        InvertedIndexStorageFormatPB inverted_index_storage_format =
899
54
                std::count(rowset_info.begin(), rowset_info.end(), '_') > 1
900
54
                        ? InvertedIndexStorageFormatPB::V1
901
54
                        : InvertedIndexStorageFormatPB::V2;
902
903
54
        size_t pos = rowset_info.find_last_of('_');
904
54
        if (pos == std::string::npos || pos + 1 >= str.back().size() - 4) {
905
0
            LOG(WARNING) << "Invalid index_id format, key=" << obj_key;
906
0
            return -1;
907
0
        }
908
54
        if (inverted_index_storage_format == InvertedIndexStorageFormatPB::V1) {
909
14
            return check_inverted_index_file_storage_format_v1(tablet_id, obj_key, rowset_info,
910
14
                                                               rowset_index_cache_v1);
911
40
        } else {
912
40
            return check_inverted_index_file_storage_format_v2(tablet_id, obj_key, rowset_info,
913
40
                                                               rowset_index_cache_v2);
914
40
        }
915
54
    };
916
    // so we choose to skip here.
917
5
    TEST_SYNC_POINT_RETURN_WITH_VALUE("InstanceChecker::do_inverted_check", (int)0);
918
919
3
    for (auto& [_, accessor] : accessor_map_) {
920
3
        std::unique_ptr<ListIterator> list_iter;
921
3
        int ret = accessor->list_directory("data", &list_iter);
922
3
        if (ret != 0) {
923
0
            return -1;
924
0
        }
925
926
111
        for (auto file = list_iter->next(); file.has_value(); file = list_iter->next()) {
927
108
            ++num_scanned;
928
108
            int ret = check_segment_file(file->path);
929
108
            if (ret != 0) {
930
17
                LOG(WARNING) << "failed to check segment file, uri=" << accessor->uri()
931
17
                             << " path=" << file->path;
932
17
                if (ret == 1) {
933
12
                    ++num_file_leak;
934
12
                } else {
935
5
                    check_ret = -1;
936
5
                }
937
17
            }
938
108
            ret = check_inverted_index_file(file->path);
939
108
            if (ret != 0) {
940
13
                LOG(WARNING) << "failed to check index file, uri=" << accessor->uri()
941
13
                             << " path=" << file->path;
942
13
                if (ret == 1) {
943
13
                    ++num_file_leak;
944
13
                } else {
945
0
                    check_ret = -1;
946
0
                }
947
13
            }
948
108
        }
949
950
3
        if (!list_iter->is_valid()) {
951
0
            LOG(WARNING) << "failed to list data directory. uri=" << accessor->uri();
952
0
            return -1;
953
0
        }
954
3
    }
955
3
    return num_file_leak > 0 ? 1 : check_ret;
956
3
}
957
958
3
int InstanceChecker::traverse_mow_tablet(const std::function<int(int64_t, bool)>& check_func) {
959
3
    std::unique_ptr<RangeGetIterator> it;
960
3
    auto begin = meta_rowset_key({instance_id_, 0, 0});
961
3
    auto end = meta_rowset_key({instance_id_, std::numeric_limits<int64_t>::max(), 0});
962
43
    do {
963
43
        std::unique_ptr<Transaction> txn;
964
43
        TxnErrorCode err = txn_kv_->create_txn(&txn);
965
43
        if (err != TxnErrorCode::TXN_OK) {
966
0
            LOG(WARNING) << "failed to create txn";
967
0
            return -1;
968
0
        }
969
43
        err = txn->get(begin, end, &it, false, 1);
970
43
        if (err != TxnErrorCode::TXN_OK) {
971
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
972
0
            return -1;
973
0
        }
974
43
        if (!it->has_next()) {
975
3
            break;
976
3
        }
977
80
        while (it->has_next() && !stopped()) {
978
40
            auto [k, v] = it->next();
979
40
            std::string_view k1 = k;
980
40
            k1.remove_prefix(1);
981
40
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
982
40
            decode_key(&k1, &out);
983
            // 0x01 "meta" ${instance_id} "rowset" ${tablet_id} ${version} -> RowsetMetaCloudPB
984
40
            auto tablet_id = std::get<int64_t>(std::get<0>(out[3]));
985
986
40
            if (!it->has_next()) {
987
                // Update to next smallest key for iteration
988
                // scan for next tablet in this instance
989
40
                begin = meta_rowset_key({instance_id_, tablet_id + 1, 0});
990
40
            }
991
992
40
            TabletMetaCloudPB tablet_meta;
993
40
            int ret = get_tablet_meta(txn_kv_.get(), instance_id_, tablet_id, tablet_meta);
994
40
            if (ret < 0) {
995
0
                LOG(WARNING) << fmt::format(
996
0
                        "failed to get_tablet_meta in do_delete_bitmap_integrity_check(), "
997
0
                        "instance_id={}, tablet_id={}",
998
0
                        instance_id_, tablet_id);
999
0
                return ret;
1000
0
            }
1001
1002
40
            if (tablet_meta.enable_unique_key_merge_on_write()) {
1003
                // only check merge-on-write table
1004
30
                bool has_sequence_col = tablet_meta.schema().has_sequence_col_idx() &&
1005
30
                                        tablet_meta.schema().sequence_col_idx() != -1;
1006
30
                int ret = check_func(tablet_id, has_sequence_col);
1007
30
                if (ret < 0) {
1008
                    // return immediately when encounter unexpected error,
1009
                    // otherwise, we continue to check the next tablet
1010
0
                    return ret;
1011
0
                }
1012
30
            }
1013
40
        }
1014
40
    } while (it->more() && !stopped());
1015
3
    return 0;
1016
3
}
1017
1018
int InstanceChecker::traverse_rowset_delete_bitmaps(
1019
        int64_t tablet_id, std::string rowset_id,
1020
0
        const std::function<int(int64_t, std::string_view, int64_t, int64_t)>& callback) {
1021
0
    std::unique_ptr<RangeGetIterator> it;
1022
0
    auto begin = meta_delete_bitmap_key({instance_id_, tablet_id, rowset_id, 0, 0});
1023
0
    auto end = meta_delete_bitmap_key({instance_id_, tablet_id, rowset_id,
1024
0
                                       std::numeric_limits<int64_t>::max(),
1025
0
                                       std::numeric_limits<int64_t>::max()});
1026
0
    do {
1027
0
        std::unique_ptr<Transaction> txn;
1028
0
        TxnErrorCode err = txn_kv_->create_txn(&txn);
1029
0
        if (err != TxnErrorCode::TXN_OK) {
1030
0
            LOG(WARNING) << "failed to create txn";
1031
0
            return -1;
1032
0
        }
1033
0
        err = txn->get(begin, end, &it);
1034
0
        if (err != TxnErrorCode::TXN_OK) {
1035
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
1036
0
            return -1;
1037
0
        }
1038
0
        if (!it->has_next()) {
1039
0
            break;
1040
0
        }
1041
0
        while (it->has_next() && !stopped()) {
1042
0
            auto [k, v] = it->next();
1043
0
            std::string_view k1 = k;
1044
0
            k1.remove_prefix(1);
1045
0
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1046
0
            decode_key(&k1, &out);
1047
            // 0x01 "meta" ${instance_id} "delete_bitmap" ${tablet_id} ${rowset_id} ${version} ${segment_id} -> roaringbitmap
1048
0
            auto version = std::get<std::int64_t>(std::get<0>(out[5]));
1049
0
            auto segment_id = std::get<std::int64_t>(std::get<0>(out[6]));
1050
1051
0
            int ret = callback(tablet_id, rowset_id, version, segment_id);
1052
0
            if (ret != 0) {
1053
0
                return ret;
1054
0
            }
1055
1056
0
            if (!it->has_next()) {
1057
0
                begin = k;
1058
0
                begin.push_back('\x00'); // Update to next smallest key for iteration
1059
0
                break;
1060
0
            }
1061
0
        }
1062
0
    } while (it->more() && !stopped());
1063
1064
0
    return 0;
1065
0
}
1066
1067
int InstanceChecker::collect_tablet_rowsets(
1068
50
        int64_t tablet_id, const std::function<void(const doris::RowsetMetaCloudPB&)>& collect_cb) {
1069
50
    std::unique_ptr<Transaction> txn;
1070
50
    TxnErrorCode err = txn_kv_->create_txn(&txn);
1071
50
    if (err != TxnErrorCode::TXN_OK) {
1072
0
        LOG(WARNING) << "failed to create txn";
1073
0
        return -1;
1074
0
    }
1075
50
    std::unique_ptr<RangeGetIterator> it;
1076
50
    auto begin = meta_rowset_key({instance_id_, tablet_id, 0});
1077
50
    auto end = meta_rowset_key({instance_id_, tablet_id + 1, 0});
1078
1079
50
    int64_t rowsets_num {0};
1080
50
    do {
1081
50
        TxnErrorCode err = txn->get(begin, end, &it);
1082
50
        if (err != TxnErrorCode::TXN_OK) {
1083
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
1084
0
            return -1;
1085
0
        }
1086
50
        if (!it->has_next()) {
1087
0
            break;
1088
0
        }
1089
394
        while (it->has_next() && !stopped()) {
1090
394
            auto [k, v] = it->next();
1091
394
            doris::RowsetMetaCloudPB rowset;
1092
394
            if (!rowset.ParseFromArray(v.data(), v.size())) {
1093
0
                LOG(WARNING) << "malformed rowset meta value, key=" << hex(k);
1094
0
                return -1;
1095
0
            }
1096
1097
394
            ++rowsets_num;
1098
394
            collect_cb(rowset);
1099
1100
394
            if (!it->has_next()) {
1101
50
                begin = k;
1102
50
                begin.push_back('\x00'); // Update to next smallest key for iteration
1103
50
                break;
1104
50
            }
1105
394
        }
1106
50
    } while (it->more() && !stopped());
1107
1108
50
    LOG(INFO) << fmt::format(
1109
50
            "[delete bitmap checker] successfully collect rowsets for instance_id={}, "
1110
50
            "tablet_id={}, rowsets_num={}",
1111
50
            instance_id_, tablet_id, rowsets_num);
1112
50
    return 0;
1113
50
}
1114
1115
2
int InstanceChecker::do_delete_bitmap_inverted_check() {
1116
2
    LOG(INFO) << fmt::format(
1117
2
            "[delete bitmap checker] begin to do_delete_bitmap_inverted_check for instance_id={}",
1118
2
            instance_id_);
1119
1120
    // number of delete bitmap keys being scanned
1121
2
    int64_t total_delete_bitmap_keys {0};
1122
    // number of delete bitmaps which belongs to non mow tablet
1123
2
    int64_t abnormal_delete_bitmaps {0};
1124
    // number of delete bitmaps which doesn't have corresponding rowset in MS
1125
2
    int64_t leaked_delete_bitmaps {0};
1126
1127
2
    auto start_time = std::chrono::steady_clock::now();
1128
2
    DORIS_CLOUD_DEFER {
1129
2
        g_bvar_inverted_checker_leaked_delete_bitmaps.put(instance_id_, leaked_delete_bitmaps);
1130
2
        g_bvar_inverted_checker_abnormal_delete_bitmaps.put(instance_id_, abnormal_delete_bitmaps);
1131
2
        g_bvar_inverted_checker_delete_bitmaps_scanned.put(instance_id_, total_delete_bitmap_keys);
1132
1133
2
        auto cost = std::chrono::duration_cast<std::chrono::milliseconds>(
1134
2
                            std::chrono::steady_clock::now() - start_time)
1135
2
                            .count();
1136
2
        if (leaked_delete_bitmaps > 0 || abnormal_delete_bitmaps > 0) {
1137
1
            LOG(WARNING) << fmt::format(
1138
1
                    "[delete bitmap check fails] delete bitmap inverted check for instance_id={}, "
1139
1
                    "cost={} ms, total_delete_bitmap_keys={}, leaked_delete_bitmaps={}, "
1140
1
                    "abnormal_delete_bitmaps={}",
1141
1
                    instance_id_, cost, total_delete_bitmap_keys, leaked_delete_bitmaps,
1142
1
                    abnormal_delete_bitmaps);
1143
1
        } else {
1144
1
            LOG(INFO) << fmt::format(
1145
1
                    "[delete bitmap checker] delete bitmap inverted check for instance_id={}, "
1146
1
                    "passed. cost={} ms, total_delete_bitmap_keys={}",
1147
1
                    instance_id_, cost, total_delete_bitmap_keys);
1148
1
        }
1149
2
    };
1150
1151
2
    struct TabletsRowsetsCache {
1152
2
        int64_t tablet_id {-1};
1153
2
        bool enable_merge_on_write {false};
1154
2
        std::unordered_set<std::string> rowsets {};
1155
2
        std::unordered_set<std::string> pending_delete_bitmaps {};
1156
2
    } tablet_rowsets_cache {};
1157
1158
2
    std::unique_ptr<RangeGetIterator> it;
1159
2
    auto begin = meta_delete_bitmap_key({instance_id_, 0, "", 0, 0});
1160
2
    auto end =
1161
2
            meta_delete_bitmap_key({instance_id_, std::numeric_limits<int64_t>::max(), "", 0, 0});
1162
2
    do {
1163
2
        std::unique_ptr<Transaction> txn;
1164
2
        TxnErrorCode err = txn_kv_->create_txn(&txn);
1165
2
        if (err != TxnErrorCode::TXN_OK) {
1166
0
            LOG(WARNING) << "failed to create txn";
1167
0
            return -1;
1168
0
        }
1169
2
        err = txn->get(begin, end, &it);
1170
2
        if (err != TxnErrorCode::TXN_OK) {
1171
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
1172
0
            return -1;
1173
0
        }
1174
2
        if (!it->has_next()) {
1175
0
            break;
1176
0
        }
1177
502
        while (it->has_next() && !stopped()) {
1178
500
            auto [k, v] = it->next();
1179
500
            std::string_view k1 = k;
1180
500
            k1.remove_prefix(1);
1181
500
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1182
500
            decode_key(&k1, &out);
1183
            // 0x01 "meta" ${instance_id} "delete_bitmap" ${tablet_id} ${rowset_id} ${version} ${segment_id} -> roaringbitmap
1184
500
            auto tablet_id = std::get<int64_t>(std::get<0>(out[3]));
1185
500
            auto rowset_id = std::get<std::string>(std::get<0>(out[4]));
1186
500
            auto version = std::get<std::int64_t>(std::get<0>(out[5]));
1187
500
            auto segment_id = std::get<std::int64_t>(std::get<0>(out[6]));
1188
1189
500
            ++total_delete_bitmap_keys;
1190
1191
500
            if (!it->has_next()) {
1192
2
                begin = k;
1193
2
                begin.push_back('\x00'); // Update to next smallest key for iteration
1194
2
            }
1195
1196
500
            if (tablet_rowsets_cache.tablet_id == -1 ||
1197
500
                tablet_rowsets_cache.tablet_id != tablet_id) {
1198
30
                TabletMetaCloudPB tablet_meta;
1199
30
                int ret = get_tablet_meta(txn_kv_.get(), instance_id_, tablet_id, tablet_meta);
1200
30
                if (ret < 0) {
1201
0
                    LOG(WARNING) << fmt::format(
1202
0
                            "[delete bitmap checker] failed to get_tablet_meta in "
1203
0
                            "do_delete_bitmap_inverted_check(), instance_id={}, tablet_id={}",
1204
0
                            instance_id_, tablet_id);
1205
0
                    return ret;
1206
0
                }
1207
1208
30
                tablet_rowsets_cache.tablet_id = tablet_id;
1209
30
                tablet_rowsets_cache.enable_merge_on_write =
1210
30
                        tablet_meta.enable_unique_key_merge_on_write();
1211
30
                tablet_rowsets_cache.rowsets.clear();
1212
30
                tablet_rowsets_cache.pending_delete_bitmaps.clear();
1213
1214
30
                if (tablet_rowsets_cache.enable_merge_on_write) {
1215
                    // only collect rowsets for merge-on-write tablet
1216
20
                    auto collect_cb =
1217
199
                            [&tablet_rowsets_cache](const doris::RowsetMetaCloudPB& rowset) {
1218
199
                                tablet_rowsets_cache.rowsets.insert(rowset.rowset_id_v2());
1219
199
                            };
1220
20
                    ret = collect_tablet_rowsets(tablet_id, collect_cb);
1221
20
                    if (ret < 0) {
1222
0
                        return ret;
1223
0
                    }
1224
                    // get pending delete bitmaps
1225
20
                    ret = get_pending_delete_bitmap_keys(
1226
20
                            tablet_id, tablet_rowsets_cache.pending_delete_bitmaps);
1227
20
                    if (ret < 0) {
1228
0
                        return ret;
1229
0
                    }
1230
20
                }
1231
30
            }
1232
500
            DCHECK_EQ(tablet_id, tablet_rowsets_cache.tablet_id);
1233
1234
500
            if (!tablet_rowsets_cache.enable_merge_on_write) {
1235
                // clang-format off
1236
40
                TEST_SYNC_POINT_CALLBACK(
1237
40
                        "InstanceChecker::do_delete_bitmap_inverted_check.get_abnormal_delete_bitmap",
1238
40
                        &tablet_id, &rowset_id, &version, &segment_id);
1239
                // clang-format on
1240
40
                ++abnormal_delete_bitmaps;
1241
                // log an error and continue to check the next delete bitmap
1242
40
                LOG(WARNING) << fmt::format(
1243
40
                        "[delete bitmap check fails] find a delete bitmap belongs to tablet "
1244
40
                        "which is not a merge-on-write table! instance_id={}, tablet_id={}, "
1245
40
                        "version={}, segment_id={}",
1246
40
                        instance_id_, tablet_id, version, segment_id);
1247
40
                continue;
1248
40
            }
1249
1250
460
            if (!tablet_rowsets_cache.rowsets.contains(rowset_id) &&
1251
460
                !tablet_rowsets_cache.pending_delete_bitmaps.contains(std::string(k))) {
1252
170
                TEST_SYNC_POINT_CALLBACK(
1253
170
                        "InstanceChecker::do_delete_bitmap_inverted_check.get_leaked_delete_bitmap",
1254
170
                        &tablet_id, &rowset_id, &version, &segment_id);
1255
170
                ++leaked_delete_bitmaps;
1256
                // log an error and continue to check the next delete bitmap
1257
170
                LOG(WARNING) << fmt::format(
1258
170
                        "[delete bitmap check fails] can't find corresponding rowset for delete "
1259
170
                        "bitmap instance_id={}, tablet_id={}, rowset_id={}, version={}, "
1260
170
                        "segment_id={}",
1261
170
                        instance_id_, tablet_id, rowset_id, version, segment_id);
1262
170
            }
1263
460
        }
1264
2
    } while (it->more() && !stopped());
1265
1266
2
    return (leaked_delete_bitmaps > 0 || abnormal_delete_bitmaps > 0) ? 1 : 0;
1267
2
}
1268
1269
int InstanceChecker::get_pending_delete_bitmap_keys(
1270
50
        int64_t tablet_id, std::unordered_set<std::string>& pending_delete_bitmaps) {
1271
50
    std::unique_ptr<Transaction> txn;
1272
50
    TxnErrorCode err = txn_kv_->create_txn(&txn);
1273
50
    if (err != TxnErrorCode::TXN_OK) {
1274
0
        LOG(WARNING) << "failed to create txn";
1275
0
        return -1;
1276
0
    }
1277
50
    std::string pending_key = meta_pending_delete_bitmap_key({instance_id_, tablet_id});
1278
50
    std::string pending_val;
1279
50
    err = txn->get(pending_key, &pending_val);
1280
50
    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
1281
0
        LOG(WARNING) << "failed to get pending delete bitmap kv, err=" << err;
1282
0
        return -1;
1283
0
    }
1284
50
    if (err == TxnErrorCode::TXN_OK) {
1285
2
        PendingDeleteBitmapPB pending_info;
1286
2
        if (!pending_info.ParseFromString(pending_val)) [[unlikely]] {
1287
0
            LOG(WARNING) << "failed to parse PendingDeleteBitmapPB, tablet=" << tablet_id;
1288
0
            return -1;
1289
0
        }
1290
12
        for (auto& delete_bitmap_key : pending_info.delete_bitmap_keys()) {
1291
12
            pending_delete_bitmaps.emplace(std::string(delete_bitmap_key));
1292
12
        }
1293
2
    }
1294
50
    return 0;
1295
50
}
1296
1297
int InstanceChecker::check_inverted_index_file_storage_format_v1(
1298
        int64_t tablet_id, const std::string& file_path, const std::string& rowset_info,
1299
14
        RowsetIndexesFormatV1& rowset_index_cache_v1) {
1300
    // format v1: data/{tablet_id}/{rowset_id}_{seg_num}_{idx_id}{idx_suffix}.idx
1301
14
    std::string rowset_id;
1302
14
    int64_t segment_id;
1303
14
    std::string index_id_with_suffix_name;
1304
    // {rowset_id}_{seg_num}_{idx_id}{idx_suffix}.idx
1305
14
    std::vector<std::string> str;
1306
14
    butil::SplitString(rowset_info.substr(0, rowset_info.size() - 4), '_', &str);
1307
14
    if (str.size() < 3) {
1308
0
        LOG(WARNING) << "Split rowset info with '_' error, str size < 3, rowset_info = "
1309
0
                     << rowset_info;
1310
0
        return -1;
1311
0
    }
1312
14
    rowset_id = str[0];
1313
14
    segment_id = std::atoll(str[1].c_str());
1314
14
    index_id_with_suffix_name = str[2];
1315
1316
14
    if (rowset_index_cache_v1.rowset_id == rowset_id) {
1317
0
        if (rowset_index_cache_v1.segment_ids.contains(segment_id)) {
1318
0
            if (auto it = rowset_index_cache_v1.index_ids.find(index_id_with_suffix_name);
1319
0
                it == rowset_index_cache_v1.index_ids.end()) {
1320
                // clang-format off
1321
0
                LOG(WARNING) << fmt::format("index_id with suffix name not found, rowset_info = {}, obj_key = {}", rowset_info, file_path);
1322
                // clang-format on
1323
0
                return -1;
1324
0
            }
1325
0
        } else {
1326
            // clang-format off
1327
0
            LOG(WARNING) << fmt::format("segment id not found, rowset_info = {}, obj_key = {}", rowset_info, file_path);
1328
            // clang-format on
1329
0
            return -1;
1330
0
        }
1331
0
    }
1332
1333
14
    rowset_index_cache_v1.rowset_id = rowset_id;
1334
14
    rowset_index_cache_v1.segment_ids.clear();
1335
14
    rowset_index_cache_v1.index_ids.clear();
1336
1337
14
    std::unique_ptr<Transaction> txn;
1338
14
    TxnErrorCode err = txn_kv_->create_txn(&txn);
1339
14
    if (err != TxnErrorCode::TXN_OK) {
1340
0
        LOG(WARNING) << "failed to create txn";
1341
0
        return -1;
1342
0
    }
1343
14
    std::unique_ptr<RangeGetIterator> it;
1344
14
    auto begin = meta_rowset_key({instance_id_, tablet_id, 0});
1345
14
    auto end = meta_rowset_key({instance_id_, tablet_id, INT64_MAX});
1346
14
    do {
1347
14
        TxnErrorCode err = txn->get(begin, end, &it);
1348
14
        if (err != TxnErrorCode::TXN_OK) {
1349
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
1350
0
            return -1;
1351
0
        }
1352
14
        if (!it->has_next()) {
1353
8
            break;
1354
8
        }
1355
6
        while (it->has_next()) {
1356
            // recycle corresponding resources
1357
6
            auto [k, v] = it->next();
1358
6
            doris::RowsetMetaCloudPB rs_meta;
1359
6
            if (!rs_meta.ParseFromArray(v.data(), v.size())) {
1360
0
                LOG(WARNING) << "malformed rowset meta value, key=" << hex(k);
1361
0
                return -1;
1362
0
            }
1363
1364
6
            TabletIndexPB tablet_index;
1365
6
            if (get_tablet_idx(txn_kv_.get(), instance_id_, rs_meta.tablet_id(), tablet_index) ==
1366
6
                -1) {
1367
0
                LOG(WARNING) << "failedt to get tablet index, tablet_id= " << rs_meta.tablet_id();
1368
0
                return -1;
1369
0
            }
1370
1371
6
            auto tablet_schema_key = meta_schema_key(
1372
6
                    {instance_id_, tablet_index.index_id(), rs_meta.schema_version()});
1373
6
            ValueBuf tablet_schema_val;
1374
6
            err = cloud::blob_get(txn.get(), tablet_schema_key, &tablet_schema_val);
1375
1376
6
            if (err != TxnErrorCode::TXN_OK) {
1377
0
                LOG(WARNING) << "failed to get schema, err=" << err;
1378
0
                return -1;
1379
0
            }
1380
1381
6
            auto* schema = rs_meta.mutable_tablet_schema();
1382
6
            if (!parse_schema_value(tablet_schema_val, schema)) {
1383
0
                LOG(WARNING) << "malformed schema value, key=" << hex(tablet_schema_key);
1384
0
                return -1;
1385
0
            }
1386
1387
12
            for (size_t i = 0; i < rs_meta.num_segments(); i++) {
1388
6
                rowset_index_cache_v1.segment_ids.insert(i);
1389
6
            }
1390
1391
6
            for (const auto& i : rs_meta.tablet_schema().index()) {
1392
6
                if (i.has_index_type() && i.index_type() == IndexType::INVERTED) {
1393
6
                    LOG(INFO) << fmt::format(
1394
6
                            "record index info, index_id: {}, index_suffix_name: {}", i.index_id(),
1395
6
                            i.index_suffix_name());
1396
6
                    rowset_index_cache_v1.index_ids.insert(
1397
6
                            fmt::format("{}{}", i.index_id(), i.index_suffix_name()));
1398
6
                }
1399
6
            }
1400
1401
6
            if (!it->has_next()) {
1402
6
                begin = k;
1403
6
                begin.push_back('\x00'); // Update to next smallest key for iteration
1404
6
                break;
1405
6
            }
1406
6
        }
1407
6
    } while (it->more() && !stopped());
1408
1409
14
    if (!rowset_index_cache_v1.segment_ids.contains(segment_id)) {
1410
        // Garbage data leak
1411
        // clang-format off
1412
8
        LOG(WARNING) << "rowset_index_cache_v1.segment_ids don't contains segment_id, rowset should be recycled,"
1413
8
                     << " key = " << file_path
1414
8
                     << " segment_id = " << segment_id;
1415
        // clang-format on
1416
8
        return 1;
1417
8
    }
1418
1419
6
    if (!rowset_index_cache_v1.index_ids.contains(index_id_with_suffix_name)) {
1420
        // Garbage data leak
1421
        // clang-format off
1422
0
        LOG(WARNING) << "rowset_index_cache_v1.index_ids don't contains index_id_with_suffix_name,"
1423
0
                     << " rowset with inde meta should be recycled, key=" << file_path
1424
0
                     << " index_id_with_suffix_name=" << index_id_with_suffix_name;
1425
        // clang-format on
1426
0
        return 1;
1427
0
    }
1428
1429
6
    return 0;
1430
6
}
1431
1432
int InstanceChecker::check_inverted_index_file_storage_format_v2(
1433
        int64_t tablet_id, const std::string& file_path, const std::string& rowset_info,
1434
40
        RowsetIndexesFormatV2& rowset_index_cache_v2) {
1435
40
    std::string rowset_id;
1436
40
    int64_t segment_id;
1437
    // {rowset_id}_{seg_num}.idx
1438
40
    std::vector<std::string> str;
1439
40
    butil::SplitString(rowset_info.substr(0, rowset_info.size() - 4), '_', &str);
1440
40
    if (str.size() < 2) {
1441
        // clang-format off
1442
0
        LOG(WARNING) << "Split rowset info with '_' error, str size < 2, rowset_info = " << rowset_info;
1443
        // clang-format on
1444
0
        return -1;
1445
0
    }
1446
40
    rowset_id = str[0];
1447
40
    segment_id = std::atoll(str[1].c_str());
1448
1449
40
    if (rowset_index_cache_v2.rowset_id == rowset_id) {
1450
0
        if (!rowset_index_cache_v2.segment_ids.contains(segment_id)) {
1451
            // clang-format off
1452
0
            LOG(WARNING) << fmt::format("index file not found, rowset_info = {}, obj_key = {}", rowset_info, file_path);
1453
            // clang-format on
1454
0
            return -1;
1455
0
        }
1456
0
    }
1457
1458
40
    rowset_index_cache_v2.rowset_id = rowset_id;
1459
40
    rowset_index_cache_v2.segment_ids.clear();
1460
1461
40
    std::unique_ptr<Transaction> txn;
1462
40
    TxnErrorCode err = txn_kv_->create_txn(&txn);
1463
40
    if (err != TxnErrorCode::TXN_OK) {
1464
0
        LOG(WARNING) << "failed to create txn";
1465
0
        return -1;
1466
0
    }
1467
40
    std::unique_ptr<RangeGetIterator> it;
1468
40
    auto begin = meta_rowset_key({instance_id_, tablet_id, 0});
1469
40
    auto end = meta_rowset_key({instance_id_, tablet_id, INT64_MAX});
1470
40
    do {
1471
40
        TxnErrorCode err = txn->get(begin, end, &it);
1472
40
        if (err != TxnErrorCode::TXN_OK) {
1473
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
1474
0
            return -1;
1475
0
        }
1476
40
        if (!it->has_next()) {
1477
5
            break;
1478
5
        }
1479
35
        while (it->has_next()) {
1480
            // recycle corresponding resources
1481
35
            auto [k, v] = it->next();
1482
35
            doris::RowsetMetaCloudPB rs_meta;
1483
35
            if (!rs_meta.ParseFromArray(v.data(), v.size())) {
1484
0
                LOG(WARNING) << "malformed rowset meta value, key=" << hex(k);
1485
0
                return -1;
1486
0
            }
1487
1488
70
            for (size_t i = 0; i < rs_meta.num_segments(); i++) {
1489
35
                rowset_index_cache_v2.segment_ids.insert(i);
1490
35
            }
1491
1492
35
            if (!it->has_next()) {
1493
35
                begin = k;
1494
35
                begin.push_back('\x00'); // Update to next smallest key for iteration
1495
35
                break;
1496
35
            }
1497
35
        }
1498
35
    } while (it->more() && !stopped());
1499
1500
40
    if (!rowset_index_cache_v2.segment_ids.contains(segment_id)) {
1501
        // Garbage data leak
1502
5
        LOG(WARNING) << "rowset with index meta should be recycled, key=" << file_path;
1503
5
        return 1;
1504
5
    }
1505
1506
35
    return 0;
1507
40
}
1508
1509
int InstanceChecker::check_delete_bitmap_storage_optimize_v2(
1510
        int64_t tablet_id, bool has_sequence_col,
1511
30
        int64_t& rowsets_with_useless_delete_bitmap_version) {
1512
    // end_version: create_time
1513
30
    std::map<int64_t, int64_t> tablet_rowsets_map {};
1514
    // rowset_id: {start_version, end_version}
1515
30
    std::map<std::string, std::pair<int64_t, int64_t>> rowset_version_map;
1516
    // Get all visible rowsets of this tablet
1517
195
    auto collect_cb = [&](const doris::RowsetMetaCloudPB& rowset) {
1518
195
        if (rowset.start_version() == 0 && rowset.end_version() == 1) {
1519
            // ignore dummy rowset [0-1]
1520
0
            return;
1521
0
        }
1522
195
        tablet_rowsets_map[rowset.end_version()] = rowset.creation_time();
1523
195
        rowset_version_map[rowset.rowset_id_v2()] =
1524
195
                std::make_pair(rowset.start_version(), rowset.end_version());
1525
195
    };
1526
30
    if (int ret = collect_tablet_rowsets(tablet_id, collect_cb); ret != 0) {
1527
0
        return ret;
1528
0
    }
1529
1530
30
    std::unordered_set<std::string> pending_delete_bitmaps;
1531
30
    if (auto ret = get_pending_delete_bitmap_keys(tablet_id, pending_delete_bitmaps); ret < 0) {
1532
0
        return ret;
1533
0
    }
1534
1535
30
    std::unique_ptr<RangeGetIterator> it;
1536
30
    auto begin = meta_delete_bitmap_key({instance_id_, tablet_id, "", 0, 0});
1537
30
    auto end = meta_delete_bitmap_key({instance_id_, tablet_id + 1, "", 0, 0});
1538
30
    std::string last_rowset_id = "";
1539
30
    int64_t last_version = 0;
1540
30
    int64_t last_failed_version = 0;
1541
30
    std::vector<int64_t> failed_versions;
1542
30
    auto print_failed_versions = [&]() {
1543
4
        TEST_SYNC_POINT_CALLBACK(
1544
4
                "InstanceChecker::check_delete_bitmap_storage_optimize_v2.get_abnormal_"
1545
4
                "rowset",
1546
4
                &tablet_id, &last_rowset_id);
1547
4
        rowsets_with_useless_delete_bitmap_version++;
1548
        // some versions are continuous, such as [8, 9, 10, 11, 13, 17, 18]
1549
        // print as [8-11, 13, 17-18]
1550
4
        int64_t last_start_version = -1;
1551
4
        int64_t last_end_version = -1;
1552
4
        std::stringstream ss;
1553
4
        ss << "[";
1554
9
        for (int64_t version : failed_versions) {
1555
9
            if (last_start_version == -1) {
1556
4
                last_start_version = version;
1557
4
                last_end_version = version;
1558
4
                continue;
1559
4
            }
1560
5
            if (last_end_version + 1 == version) {
1561
2
                last_end_version = version;
1562
3
            } else {
1563
3
                if (last_start_version == last_end_version) {
1564
3
                    ss << last_start_version << ", ";
1565
3
                } else {
1566
0
                    ss << last_start_version << "-" << last_end_version << ", ";
1567
0
                }
1568
3
                last_start_version = version;
1569
3
                last_end_version = version;
1570
3
            }
1571
5
        }
1572
4
        if (last_start_version == last_end_version) {
1573
3
            ss << last_start_version;
1574
3
        } else {
1575
1
            ss << last_start_version << "-" << last_end_version;
1576
1
        }
1577
4
        ss << "]";
1578
4
        std::stringstream version_str;
1579
4
        auto it = rowset_version_map.find(last_rowset_id);
1580
4
        if (it != rowset_version_map.end()) {
1581
4
            version_str << "[" << it->second.first << "-" << it->second.second << "]";
1582
4
        }
1583
4
        LOG(WARNING) << fmt::format(
1584
4
                "[delete bitmap check fails] delete bitmap storage optimize v2 check fail "
1585
4
                "for instance_id={}, tablet_id={}, rowset_id={}, version={} found delete "
1586
4
                "bitmap with versions={}, size={}",
1587
4
                instance_id_, tablet_id, last_rowset_id, version_str.str(), ss.str(),
1588
4
                failed_versions.size());
1589
4
    };
1590
30
    using namespace std::chrono;
1591
30
    int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
1592
30
    do {
1593
30
        std::unique_ptr<Transaction> txn;
1594
30
        TxnErrorCode err = txn_kv_->create_txn(&txn);
1595
30
        if (err != TxnErrorCode::TXN_OK) {
1596
0
            LOG(WARNING) << "failed to create txn";
1597
0
            return -1;
1598
0
        }
1599
30
        err = txn->get(begin, end, &it);
1600
30
        if (err != TxnErrorCode::TXN_OK) {
1601
0
            LOG(WARNING) << "failed to get delete bitmap kv, err=" << err;
1602
0
            return -1;
1603
0
        }
1604
30
        if (!it->has_next()) {
1605
0
            break;
1606
0
        }
1607
771
        while (it->has_next() && !stopped()) {
1608
741
            auto [k, v] = it->next();
1609
741
            std::string_view k1 = k;
1610
741
            k1.remove_prefix(1);
1611
741
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1612
741
            decode_key(&k1, &out);
1613
            // 0x01 "meta" ${instance_id} "delete_bitmap" ${tablet_id} ${rowset_id} ${version} ${segment_id} -> roaringbitmap
1614
741
            auto rowset_id = std::get<std::string>(std::get<0>(out[4]));
1615
741
            auto version = std::get<std::int64_t>(std::get<0>(out[5]));
1616
741
            if (!it->has_next()) {
1617
30
                begin = k;
1618
30
                begin.push_back('\x00'); // Update to next smallest key for iteration
1619
30
            }
1620
741
            if (rowset_id == last_rowset_id && version == last_version) {
1621
                // skip the same rowset and version
1622
167
                continue;
1623
167
            }
1624
574
            if (rowset_id != last_rowset_id && !failed_versions.empty()) {
1625
3
                print_failed_versions();
1626
3
                last_failed_version = 0;
1627
3
                failed_versions.clear();
1628
3
            }
1629
574
            last_rowset_id = rowset_id;
1630
574
            last_version = version;
1631
574
            if (tablet_rowsets_map.find(version) != tablet_rowsets_map.end()) {
1632
548
                continue;
1633
548
            }
1634
26
            auto version_it = rowset_version_map.find(rowset_id);
1635
26
            if (version_it == rowset_version_map.end()) {
1636
                // checked in do_delete_bitmap_inverted_check
1637
1
                continue;
1638
1
            }
1639
25
            if (pending_delete_bitmaps.contains(std::string(k))) {
1640
3
                continue;
1641
3
            }
1642
22
            if (has_sequence_col && version >= version_it->second.first &&
1643
22
                version <= version_it->second.second) {
1644
5
                continue;
1645
5
            }
1646
            // there may be an interval in this situation:
1647
            // 1. finish compaction job; 2. checker; 3. finish agg and remove delete bitmap to ms
1648
17
            auto rowset_it = tablet_rowsets_map.upper_bound(version);
1649
17
            if (rowset_it == tablet_rowsets_map.end()) {
1650
1
                if (version != last_failed_version) {
1651
1
                    failed_versions.push_back(version);
1652
1
                }
1653
1
                last_failed_version = version;
1654
1
                continue;
1655
1
            }
1656
16
            if (rowset_it->second + config::delete_bitmap_storage_optimize_v2_check_skip_seconds >=
1657
16
                now) {
1658
8
                continue;
1659
8
            }
1660
8
            if (version != last_failed_version) {
1661
8
                failed_versions.push_back(version);
1662
8
            }
1663
8
            last_failed_version = version;
1664
8
        }
1665
30
    } while (it->more() && !stopped());
1666
30
    if (!failed_versions.empty()) {
1667
1
        print_failed_versions();
1668
1
    }
1669
30
    LOG(INFO) << fmt::format(
1670
30
            "[delete bitmap checker] finish check delete bitmap storage optimize v2 for "
1671
30
            "instance_id={}, tablet_id={}, rowsets_num={}, "
1672
30
            "rowsets_with_useless_delete_bitmap_version={}",
1673
30
            instance_id_, tablet_id, tablet_rowsets_map.size(),
1674
30
            rowsets_with_useless_delete_bitmap_version);
1675
30
    return (rowsets_with_useless_delete_bitmap_version > 1 ? 1 : 0);
1676
30
}
1677
1678
3
int InstanceChecker::do_delete_bitmap_storage_optimize_check(int version) {
1679
3
    if (version != 2) {
1680
0
        return -1;
1681
0
    }
1682
3
    int64_t total_tablets_num {0};
1683
3
    int64_t failed_tablets_num {0};
1684
1685
    // for v2 check
1686
3
    int64_t max_rowsets_with_useless_delete_bitmap_version = 0;
1687
3
    int64_t tablet_id_with_max_rowsets_with_useless_delete_bitmap_version = 0;
1688
1689
    // check that for every visible rowset, there exists at least delete one bitmap in MS
1690
30
    int ret = traverse_mow_tablet([&](int64_t tablet_id, bool has_sequence_col) {
1691
30
        ++total_tablets_num;
1692
30
        int64_t rowsets_with_useless_delete_bitmap_version = 0;
1693
30
        int res = check_delete_bitmap_storage_optimize_v2(
1694
30
                tablet_id, has_sequence_col, rowsets_with_useless_delete_bitmap_version);
1695
30
        if (rowsets_with_useless_delete_bitmap_version >
1696
30
            max_rowsets_with_useless_delete_bitmap_version) {
1697
1
            max_rowsets_with_useless_delete_bitmap_version =
1698
1
                    rowsets_with_useless_delete_bitmap_version;
1699
1
            tablet_id_with_max_rowsets_with_useless_delete_bitmap_version = tablet_id;
1700
1
        }
1701
30
        failed_tablets_num += (res != 0);
1702
30
        return res;
1703
30
    });
1704
1705
3
    if (ret < 0) {
1706
0
        return ret;
1707
0
    }
1708
1709
3
    g_bvar_max_rowsets_with_useless_delete_bitmap_version.put(
1710
3
            instance_id_, max_rowsets_with_useless_delete_bitmap_version);
1711
1712
3
    std::stringstream ss;
1713
3
    ss << "[delete bitmap checker] check delete bitmap storage optimize v" << version
1714
3
       << " for instance_id=" << instance_id_ << ", total_tablets_num=" << total_tablets_num
1715
3
       << ", failed_tablets_num=" << failed_tablets_num
1716
3
       << ". max_rowsets_with_useless_delete_bitmap_version="
1717
3
       << max_rowsets_with_useless_delete_bitmap_version
1718
3
       << ", tablet_id=" << tablet_id_with_max_rowsets_with_useless_delete_bitmap_version;
1719
3
    LOG(INFO) << ss.str();
1720
1721
3
    return (failed_tablets_num > 0) ? 1 : 0;
1722
3
}
1723
1724
3
int InstanceChecker::do_mow_job_key_check() {
1725
3
    std::unique_ptr<RangeGetIterator> it;
1726
3
    std::string begin = mow_tablet_job_key({instance_id_, 0, 0});
1727
3
    std::string end = mow_tablet_job_key({instance_id_, INT64_MAX, 0});
1728
3
    MowTabletJobPB mow_tablet_job;
1729
3
    do {
1730
3
        std::unique_ptr<Transaction> txn;
1731
3
        TxnErrorCode err = txn_kv_->create_txn(&txn);
1732
3
        if (err != TxnErrorCode::TXN_OK) {
1733
0
            LOG(WARNING) << "failed to create txn";
1734
0
            return -1;
1735
0
        }
1736
3
        err = txn->get(begin, end, &it);
1737
3
        if (err != TxnErrorCode::TXN_OK) {
1738
0
            LOG(WARNING) << "failed to get mow tablet job key, err=" << err;
1739
0
            return -1;
1740
0
        }
1741
3
        int64_t now = duration_cast<std::chrono::seconds>(
1742
3
                              std::chrono::system_clock::now().time_since_epoch())
1743
3
                              .count();
1744
3
        while (it->has_next() && !stopped()) {
1745
2
            auto [k, v] = it->next();
1746
2
            std::string_view k1 = k;
1747
2
            k1.remove_prefix(1);
1748
2
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1749
2
            decode_key(&k1, &out);
1750
            // 0x01 "meta" ${instance_id} "mow_tablet_job" ${table_id} ${initiator}
1751
2
            auto table_id = std::get<int64_t>(std::get<0>(out[3]));
1752
2
            auto initiator = std::get<int64_t>(std::get<0>(out[4]));
1753
2
            if (!mow_tablet_job.ParseFromArray(v.data(), v.size())) [[unlikely]] {
1754
0
                LOG(WARNING) << "failed to parse MowTabletJobPB";
1755
0
                return -1;
1756
0
            }
1757
2
            int64_t expiration = mow_tablet_job.expiration();
1758
            // check job key failed should meet both following two condition:
1759
            // 1. job key is expired
1760
            // 2. table lock key is not found or key is not expired
1761
2
            if (expiration < now - config::mow_job_key_check_expiration_diff_seconds) {
1762
2
                std::string lock_key =
1763
2
                        meta_delete_bitmap_update_lock_key({instance_id_, table_id, -1});
1764
2
                std::string lock_val;
1765
2
                err = txn->get(lock_key, &lock_val);
1766
2
                std::string reason = "";
1767
2
                if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
1768
0
                    reason = "table lock key not found";
1769
1770
2
                } else {
1771
2
                    DeleteBitmapUpdateLockPB lock_info;
1772
2
                    if (!lock_info.ParseFromString(lock_val)) [[unlikely]] {
1773
0
                        LOG(WARNING) << "failed to parse DeleteBitmapUpdateLockPB";
1774
0
                        return -1;
1775
0
                    }
1776
2
                    if (lock_info.expiration() > now || lock_info.lock_id() != -1) {
1777
2
                        reason = "table lock is not expired,lock_id=" +
1778
2
                                 std::to_string(lock_info.lock_id());
1779
2
                    }
1780
2
                }
1781
2
                if (reason != "") {
1782
2
                    LOG(WARNING) << fmt::format(
1783
2
                            "[compaction key check fails] mow job key check fail for "
1784
2
                            "instance_id={}, table_id={}, initiator={}, expiration={}, now={}, "
1785
2
                            "reason={}",
1786
2
                            instance_id_, table_id, initiator, expiration, now, reason);
1787
2
                    return -1;
1788
2
                }
1789
2
            }
1790
2
        }
1791
1
        begin = it->next_begin_key(); // Update to next smallest key for iteration
1792
1
    } while (it->more() && !stopped());
1793
1
    return 0;
1794
3
}
1795
4
int InstanceChecker::do_tablet_stats_key_check() {
1796
4
    int ret = 0;
1797
1798
4
    int64_t nums_leak = 0;
1799
4
    int64_t nums_loss = 0;
1800
4
    int64_t nums_scanned = 0;
1801
4
    int64_t nums_abnormal = 0;
1802
1803
4
    std::string begin = meta_tablet_key({instance_id_, 0, 0, 0, 0});
1804
4
    std::string end = meta_tablet_key({instance_id_, INT64_MAX, 0, 0, 0});
1805
    // inverted check tablet exists
1806
4
    LOG(INFO) << "begin inverted check stats_tablet_key";
1807
4
    ret = scan_and_handle_kv(begin, end, [&](std::string_view key, std::string_view value) {
1808
4
        int ret = check_stats_tablet_key_exists(key, value);
1809
4
        nums_scanned++;
1810
4
        if (ret == 1) {
1811
1
            nums_loss++;
1812
1
        }
1813
4
        return ret;
1814
4
    });
1815
4
    if (ret == -1) {
1816
0
        LOG(WARNING) << "failed to inverted check if stats tablet key exists";
1817
0
        return -1;
1818
4
    } else if (ret == 1) {
1819
1
        LOG(WARNING) << "stats_tablet_key loss, nums_scanned=" << nums_scanned
1820
1
                     << ", nums_loss=" << nums_loss;
1821
1
        return 1;
1822
1
    }
1823
3
    LOG(INFO) << "finish inverted check stats_tablet_key, nums_scanned=" << nums_scanned
1824
3
              << ", nums_loss=" << nums_loss;
1825
1826
3
    begin = stats_tablet_key({instance_id_, 0, 0, 0, 0});
1827
3
    end = stats_tablet_key({instance_id_, INT64_MAX, 0, 0, 0});
1828
3
    nums_scanned = 0;
1829
    // check tablet exists
1830
3
    LOG(INFO) << "begin check stats_tablet_key leaked";
1831
4
    ret = scan_and_handle_kv(begin, end, [&](std::string_view key, std::string_view value) {
1832
4
        int ret = check_stats_tablet_key_leaked(key, value);
1833
4
        nums_scanned++;
1834
4
        if (ret == 1) {
1835
1
            nums_leak++;
1836
1
        }
1837
4
        return ret;
1838
4
    });
1839
3
    if (ret == -1) {
1840
0
        LOG(WARNING) << "failed to check if stats tablet key exists";
1841
0
        return -1;
1842
3
    } else if (ret == 1) {
1843
1
        LOG(WARNING) << "stats_tablet_key leaked, nums_scanned=" << nums_scanned
1844
1
                     << ", nums_leak=" << nums_leak;
1845
1
        return 1;
1846
1
    }
1847
2
    LOG(INFO) << "finish check stats_tablet_key leaked, nums_scanned=" << nums_scanned
1848
2
              << ", nums_leak=" << nums_leak;
1849
1850
2
    begin = stats_tablet_key({instance_id_, 0, 0, 0, 0});
1851
2
    end = stats_tablet_key({instance_id_, INT64_MAX, 0, 0, 0});
1852
2
    nums_scanned = 0;
1853
    // check if key is normal
1854
2
    LOG(INFO) << "begin check stats_tablet_key abnormal";
1855
2
    ret = scan_and_handle_kv(begin, end, [&](std::string_view key, std::string_view value) {
1856
2
        int ret = check_stats_tablet_key(key, value);
1857
2
        nums_scanned++;
1858
2
        if (ret == 1) {
1859
1
            nums_abnormal++;
1860
1
        }
1861
2
        return ret;
1862
2
    });
1863
2
    if (ret == -1) {
1864
0
        LOG(WARNING) << "failed to check if stats tablet key exists";
1865
0
        return -1;
1866
2
    } else if (ret == 1) {
1867
1
        LOG(WARNING) << "stats_tablet_key abnormal, nums_scanned=" << nums_scanned
1868
1
                     << ", nums_abnormal=" << nums_abnormal;
1869
1
        return 1;
1870
1
    }
1871
1
    LOG(INFO) << "finish check stats_tablet_key, nums_scanned=" << nums_scanned
1872
1
              << ", nums_abnormal=" << nums_abnormal;
1873
1
    return 0;
1874
2
}
1875
1876
4
int InstanceChecker::check_stats_tablet_key_exists(std::string_view key, std::string_view value) {
1877
4
    std::string_view k1 = key;
1878
4
    k1.remove_prefix(1);
1879
4
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1880
4
    decode_key(&k1, &out);
1881
    // 0x01 "meta" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id}
1882
4
    auto table_id = std::get<int64_t>(std::get<0>(out[3]));
1883
4
    auto index_id = std::get<int64_t>(std::get<0>(out[4]));
1884
4
    auto partition_id = std::get<int64_t>(std::get<0>(out[5]));
1885
4
    auto tablet_id = std::get<int64_t>(std::get<0>(out[6]));
1886
4
    std::string tablet_stats_key =
1887
4
            stats_tablet_key({instance_id_, table_id, index_id, partition_id, tablet_id});
1888
4
    int ret = key_exist(txn_kv_.get(), tablet_stats_key);
1889
4
    if (ret == 1) {
1890
        // clang-format off
1891
1
        LOG(WARNING) << "stats tablet key's tablet key loss,"
1892
1
                    << " stats tablet key=" << hex(tablet_stats_key)
1893
1
                    << " meta tablet key=" << hex(key);
1894
        // clang-format on
1895
1
        return 1;
1896
3
    } else if (ret == -1) {
1897
0
        LOG(WARNING) << "failed to check key exists, key=" << hex(tablet_stats_key);
1898
0
        return -1;
1899
0
    }
1900
3
    LOG(INFO) << "check stats_tablet_key_exists ok, key=" << hex(key);
1901
3
    return 0;
1902
4
}
1903
1904
4
int InstanceChecker::check_stats_tablet_key_leaked(std::string_view key, std::string_view value) {
1905
4
    std::string_view k1 = key;
1906
4
    k1.remove_prefix(1);
1907
4
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1908
4
    decode_key(&k1, &out);
1909
    // 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id}
1910
4
    auto table_id = std::get<int64_t>(std::get<0>(out[3]));
1911
4
    auto index_id = std::get<int64_t>(std::get<0>(out[4]));
1912
4
    auto partition_id = std::get<int64_t>(std::get<0>(out[5]));
1913
4
    auto tablet_id = std::get<int64_t>(std::get<0>(out[6]));
1914
4
    std::string tablet_key =
1915
4
            meta_tablet_key({instance_id_, table_id, index_id, partition_id, tablet_id});
1916
4
    int ret = key_exist(txn_kv_.get(), tablet_key);
1917
4
    if (ret == 1) {
1918
        // clang-format off
1919
1
        LOG(WARNING) << "stats tablet key's tablet key leak,"
1920
1
                    << " stats tablet key=" << hex(key)
1921
1
                    << " meta tablet key=" << hex(tablet_key);
1922
        // clang-format on
1923
1
        return 1;
1924
3
    } else if (ret == -1) {
1925
0
        LOG(WARNING) << "failed to check key exists, key=" << hex(tablet_key);
1926
0
        return -1;
1927
0
    }
1928
3
    LOG(INFO) << "check stats_tablet_key_leaked ok, key=" << hex(key);
1929
3
    return 0;
1930
4
}
1931
1932
2
int InstanceChecker::check_stats_tablet_key(std::string_view key, std::string_view value) {
1933
2
    TabletStatsPB tablet_stats_pb;
1934
2
    std::string_view k1 = key;
1935
2
    k1.remove_prefix(1);
1936
2
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1937
2
    decode_key(&k1, &out);
1938
    // 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id}
1939
2
    auto tablet_id = std::get<int64_t>(std::get<0>(out[6]));
1940
2
    std::unique_ptr<Transaction> txn;
1941
2
    TxnErrorCode err = txn_kv_->create_txn(&txn);
1942
2
    if (err != TxnErrorCode::TXN_OK) {
1943
0
        LOG_WARNING("failed to recycle tablet ")
1944
0
                .tag("tablet id", tablet_id)
1945
0
                .tag("instance_id", instance_id_)
1946
0
                .tag("reason", "failed to create txn");
1947
0
        return -1;
1948
0
    }
1949
2
    std::string tablet_idx_key = meta_tablet_idx_key({instance_id_, tablet_id});
1950
2
    std::string tablet_idx_val;
1951
2
    TabletIndexPB tablet_idx;
1952
2
    err = txn->get(tablet_idx_key, &tablet_idx_val);
1953
2
    if (err != TxnErrorCode::TXN_OK) {
1954
        // clang-format off
1955
0
        LOG(WARNING) << "failed to get tablet index key,"
1956
0
                        << " key=" << hex(tablet_idx_key)
1957
0
                        << " code=" << err;
1958
        // clang-format on
1959
0
        return -1;
1960
0
    }
1961
2
    tablet_idx.ParseFromString(tablet_idx_val);
1962
2
    MetaServiceCode code = MetaServiceCode::OK;
1963
2
    std::string msg;
1964
2
    internal_get_tablet_stats(code, msg, txn.get(), instance_id_, tablet_idx, tablet_stats_pb);
1965
2
    if (code != MetaServiceCode::OK) {
1966
        // clang-format off
1967
0
        LOG(WARNING) << "failed to get tablet stats,"
1968
0
                        << " code=" << code 
1969
0
                        << " msg=" << msg;
1970
        // clang-format on
1971
0
        return -1;
1972
0
    }
1973
1974
2
    GetRowsetResponse resp;
1975
    // get rowsets in tablet
1976
2
    internal_get_rowset(txn.get(), 0, std::numeric_limits<int64_t>::max() - 1, instance_id_,
1977
2
                        tablet_id, code, msg, &resp);
1978
2
    if (code != MetaServiceCode::OK) {
1979
0
        LOG_WARNING("failed to get rowsets of tablet when check stats tablet key")
1980
0
                .tag("tablet id", tablet_id)
1981
0
                .tag("msg", msg)
1982
0
                .tag("code", code)
1983
0
                .tag("instance id", instance_id_);
1984
0
        return -1;
1985
0
    }
1986
2
    int64_t num_rows = 0;
1987
2
    int64_t num_rowsets = 0;
1988
2
    int64_t num_segments = 0;
1989
2
    int64_t total_data_size = 0;
1990
2
    for (const auto& rs_meta : resp.rowset_meta()) {
1991
2
        num_rows += rs_meta.num_rows();
1992
2
        num_rowsets++;
1993
2
        num_segments += rs_meta.num_segments();
1994
2
        total_data_size += rs_meta.total_disk_size();
1995
2
    }
1996
2
    int ret = 0;
1997
2
    if (tablet_stats_pb.data_size() != total_data_size) {
1998
1
        ret = 1;
1999
        // clang-format off
2000
1
        LOG(WARNING) << " tablet_stats_pb's data size is not same with all rowset total data size,"
2001
1
                        << " tablet_stats_pb's data size=" << tablet_stats_pb.data_size()
2002
1
                        << " all rowset total data size=" << total_data_size
2003
1
                        << " stats tablet meta=" << tablet_stats_pb.ShortDebugString();
2004
        // clang-format on
2005
1
    } else if (tablet_stats_pb.num_rows() != num_rows) {
2006
0
        ret = 1;
2007
        // clang-format off
2008
0
        LOG(WARNING) << " tablet_stats_pb's num_rows is not same with all rowset total num_rows,"
2009
0
                        << " tablet_stats_pb's num_rows=" << tablet_stats_pb.num_rows()
2010
0
                        << " all rowset total num_rows=" << num_rows
2011
0
                        << " stats tablet meta=" << tablet_stats_pb.ShortDebugString();
2012
        // clang-format on
2013
1
    } else if (tablet_stats_pb.num_rowsets() != num_rowsets) {
2014
0
        ret = 1;
2015
        // clang-format off
2016
0
        LOG(WARNING) << " tablet_stats_pb's num_rowsets is not same with all rowset nums,"
2017
0
                        << " tablet_stats_pb's num_rowsets=" << tablet_stats_pb.num_rowsets()
2018
0
                        << " all rowset nums=" << num_rowsets
2019
0
                        << " stats tablet meta=" << tablet_stats_pb.ShortDebugString();
2020
        // clang-format on
2021
1
    } else if (tablet_stats_pb.num_segments() != num_segments) {
2022
0
        ret = 1;
2023
        // clang-format off
2024
0
        LOG(WARNING) << " tablet_stats_pb's num_segments is not same with all rowset total num_segments,"
2025
0
                        << " tablet_stats_pb's num_segments=" << tablet_stats_pb.num_segments()
2026
0
                        << " all rowset total num_segments=" << num_segments
2027
0
                        << " stats tablet meta=" << tablet_stats_pb.ShortDebugString();
2028
        // clang-format on
2029
0
    }
2030
2031
2
    return ret;
2032
2
}
2033
2034
int InstanceChecker::scan_and_handle_kv(
2035
        std::string& start_key, const std::string& end_key,
2036
12
        std::function<int(std::string_view, std::string_view)> handle_kv) {
2037
12
    std::unique_ptr<Transaction> txn;
2038
12
    int ret = 0;
2039
12
    TxnErrorCode err = txn_kv_->create_txn(&txn);
2040
12
    if (err != TxnErrorCode::TXN_OK) {
2041
0
        LOG(WARNING) << "failed to init txn";
2042
0
        return -1;
2043
0
    }
2044
12
    std::unique_ptr<RangeGetIterator> it;
2045
12
    int limit = 10000;
2046
12
    TEST_SYNC_POINT_CALLBACK("InstanceChecker:scan_and_handle_kv:limit", &limit);
2047
13
    do {
2048
13
        err = txn->get(start_key, end_key, &it, false, limit);
2049
13
        TEST_SYNC_POINT_CALLBACK("InstanceChecker:scan_and_handle_kv:get_err", &err);
2050
13
        if (err == TxnErrorCode::TXN_TOO_OLD) {
2051
1
            LOG(WARNING) << "failed to get range kv, err=txn too old, "
2052
1
                         << " now fallback to non snapshot scan";
2053
1
            err = txn_kv_->create_txn(&txn);
2054
1
            if (err == TxnErrorCode::TXN_OK) {
2055
1
                err = txn->get(start_key, end_key, &it);
2056
1
            }
2057
1
        }
2058
13
        if (err != TxnErrorCode::TXN_OK) {
2059
0
            LOG(WARNING) << "internal error, failed to get range kv, err=" << err;
2060
0
            return -1;
2061
0
        }
2062
2063
229
        while (it->has_next() && !stopped()) {
2064
216
            auto [k, v] = it->next();
2065
2066
216
            int handle_ret = handle_kv(k, v);
2067
216
            if (handle_ret == -1) {
2068
0
                return -1;
2069
216
            } else {
2070
216
                ret = std::max(ret, handle_ret);
2071
216
            }
2072
216
            if (!it->has_next()) {
2073
13
                start_key = k;
2074
13
            }
2075
216
        }
2076
13
        start_key = it->next_begin_key();
2077
13
    } while (it->more() && !stopped());
2078
12
    return ret;
2079
12
}
2080
2081
1
int InstanceChecker::do_restore_job_check() {
2082
1
    int64_t num_prepared = 0;
2083
1
    int64_t num_committed = 0;
2084
1
    int64_t num_dropped = 0;
2085
1
    int64_t num_completed = 0;
2086
1
    int64_t num_recycling = 0;
2087
1
    int64_t num_cost_many_time = 0;
2088
1
    const int64_t COST_MANY_THRESHOLD = 3600;
2089
2090
1
    using namespace std::chrono;
2091
1
    auto start_time = steady_clock::now();
2092
1
    DORIS_CLOUD_DEFER {
2093
1
        g_bvar_checker_restore_job_prepared_state.put(instance_id_, num_prepared);
2094
1
        g_bvar_checker_restore_job_committed_state.put(instance_id_, num_committed);
2095
1
        g_bvar_checker_restore_job_dropped_state.put(instance_id_, num_dropped);
2096
1
        g_bvar_checker_restore_job_completed_state.put(instance_id_, num_completed);
2097
1
        g_bvar_checker_restore_job_recycling_state.put(instance_id_, num_recycling);
2098
1
        g_bvar_checker_restore_job_cost_many_time.put(instance_id_, num_cost_many_time);
2099
1
        auto cost_ms =
2100
1
                duration_cast<std::chrono::milliseconds>(steady_clock::now() - start_time).count();
2101
1
        LOG(INFO) << "check instance restore jobs finished, cost=" << cost_ms
2102
1
                  << "ms. instance_id=" << instance_id_ << " num_prepared=" << num_prepared
2103
1
                  << " num_committed=" << num_committed << " num_dropped=" << num_dropped
2104
1
                  << " num_completed=" << num_completed << " num_recycling=" << num_recycling
2105
1
                  << " num_cost_many_time=" << num_cost_many_time;
2106
1
    };
2107
2108
1
    LOG_INFO("begin to check restore jobs").tag("instance_id", instance_id_);
2109
2110
1
    JobRestoreTabletKeyInfo restore_job_key_info0 {instance_id_, 0};
2111
1
    JobRestoreTabletKeyInfo restore_job_key_info1 {instance_id_, INT64_MAX};
2112
1
    std::string begin;
2113
1
    std::string end;
2114
1
    job_restore_tablet_key(restore_job_key_info0, &begin);
2115
1
    job_restore_tablet_key(restore_job_key_info1, &end);
2116
1
    std::unique_ptr<RangeGetIterator> it;
2117
1
    do {
2118
1
        std::unique_ptr<Transaction> txn;
2119
1
        TxnErrorCode err = txn_kv_->create_txn(&txn);
2120
1
        if (err != TxnErrorCode::TXN_OK) {
2121
0
            LOG(WARNING) << "failed to create txn";
2122
0
            return -1;
2123
0
        }
2124
1
        err = txn->get(begin, end, &it);
2125
1
        if (err != TxnErrorCode::TXN_OK) {
2126
0
            LOG(WARNING) << "failed to get mow tablet job key, err=" << err;
2127
0
            return -1;
2128
0
        }
2129
1
        if (!it->has_next()) {
2130
0
            break;
2131
0
        }
2132
3
        while (it->has_next()) {
2133
3
            auto [k, v] = it->next();
2134
3
            RestoreJobCloudPB restore_job_pb;
2135
3
            if (!restore_job_pb.ParseFromArray(v.data(), v.size())) {
2136
0
                LOG_WARNING("malformed restore job value").tag("key", hex(k));
2137
0
                return -1;
2138
0
            }
2139
2140
3
            switch (restore_job_pb.state()) {
2141
1
            case RestoreJobCloudPB::PREPARED:
2142
1
                ++num_prepared;
2143
1
                break;
2144
1
            case RestoreJobCloudPB::COMMITTED:
2145
1
                ++num_committed;
2146
1
                break;
2147
0
            case RestoreJobCloudPB::DROPPED:
2148
0
                ++num_dropped;
2149
0
                break;
2150
1
            case RestoreJobCloudPB::COMPLETED:
2151
1
                ++num_completed;
2152
1
                break;
2153
0
            case RestoreJobCloudPB::RECYCLING:
2154
0
                ++num_recycling;
2155
0
                break;
2156
0
            default:
2157
0
                break;
2158
3
            }
2159
2160
3
            int64_t current_time = ::time(nullptr);
2161
3
            if ((restore_job_pb.state() == RestoreJobCloudPB::PREPARED ||
2162
3
                 restore_job_pb.state() == RestoreJobCloudPB::COMMITTED) &&
2163
3
                current_time > restore_job_pb.ctime_s() + COST_MANY_THRESHOLD) {
2164
                // restore job run more than 1 hour
2165
1
                ++num_cost_many_time;
2166
1
                LOG_WARNING("restore job cost too many time")
2167
1
                        .tag("key", hex(k))
2168
1
                        .tag("tablet_id", restore_job_pb.tablet_id())
2169
1
                        .tag("state", restore_job_pb.state())
2170
1
                        .tag("ctime_s", restore_job_pb.ctime_s())
2171
1
                        .tag("mtime_s", restore_job_pb.mtime_s());
2172
1
            }
2173
2174
3
            if (!it->has_next()) {
2175
1
                begin = k;
2176
1
                begin.push_back('\x00'); // Update to next smallest key for iteration
2177
1
                break;
2178
1
            }
2179
3
        }
2180
1
    } while (it->more() && !stopped());
2181
1
    return 0;
2182
1
}
2183
2184
3
int InstanceChecker::check_txn_info_key(std::string_view key, std::string_view value) {
2185
3
    std::unordered_map<int64_t, std::string> txn_info_;
2186
3
    TxnLabelPB txn_label_pb;
2187
2188
6
    auto handle_check_txn_label_key = [&](std::string_view key, std::string_view value) -> int {
2189
6
        TxnInfoPB txn_info_pb;
2190
6
        std::string_view k1 = key;
2191
6
        k1.remove_prefix(1);
2192
6
        std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
2193
6
        decode_key(&k1, &out);
2194
        // 0x01 "txn" ${instance_id} "txn_info" ${db_id} ${txn_id}
2195
6
        if (!txn_info_pb.ParseFromArray(value.data(), value.size())) {
2196
0
            LOG(WARNING) << "failed to parse TxnInfoPB";
2197
0
            return -1;
2198
0
        }
2199
6
        auto txn_id = std::get<int64_t>(std::get<0>(out[4]));
2200
6
        auto it = txn_info_.find(txn_id);
2201
6
        if (it == txn_info_.end()) {
2202
0
            return 0;
2203
6
        } else {
2204
6
            if (it->second != txn_info_pb.label()) {
2205
1
                LOG(WARNING) << "txn_info_pb's txn_label not same with txn_label_pb's txn_label,"
2206
1
                             << " txn_info_pb's txn_label: " << txn_info_pb.label()
2207
1
                             << " txn_label_pb meta: " << txn_label_pb.ShortDebugString();
2208
1
                return 1;
2209
1
            }
2210
6
        }
2211
5
        return 0;
2212
6
    };
2213
3
    std::string_view k1 = key;
2214
3
    k1.remove_prefix(1);
2215
3
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
2216
3
    decode_key(&k1, &out);
2217
    // 0x01 "txn" ${instance_id} "txn_label" ${db_id} ${label}
2218
3
    if (!txn_label_pb.ParseFromArray(value.data(), value.size() - VERSION_STAMP_LEN)) {
2219
1
        LOG(WARNING) << "failed to parse TxnLabelPB";
2220
1
        return -1;
2221
1
    }
2222
2
    auto db_id = std::get<int64_t>(std::get<0>(out[3]));
2223
2
    auto label = std::get<std::string>(std::get<0>(out[4]));
2224
    // txn_id -> txn_label
2225
6
    for (const auto& txn_id : txn_label_pb.txn_ids()) {
2226
6
        txn_info_.insert({txn_id, label});
2227
6
    }
2228
2
    std::string txn_info_key_begin = txn_info_key({instance_id_, db_id, 0});
2229
2
    std::string txn_info_key_end = txn_info_key({instance_id_, db_id, INT64_MAX});
2230
2
    return scan_and_handle_kv(txn_info_key_begin, txn_info_key_end,
2231
6
                              [&](std::string_view k, std::string_view v) -> int {
2232
6
                                  return handle_check_txn_label_key(k, v);
2233
6
                              });
2234
3
}
2235
2236
6
int InstanceChecker::check_txn_label_key(std::string_view key, std::string_view value) {
2237
6
    TxnInfoPB txn_info_pb;
2238
6
    std::string_view k1 = key;
2239
6
    k1.remove_prefix(1);
2240
6
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
2241
6
    decode_key(&k1, &out);
2242
    // 0x01 "txn" ${instance_id} "txn_info" ${db_id} ${txn_id}
2243
6
    if (!txn_info_pb.ParseFromArray(value.data(), value.size())) {
2244
1
        LOG(WARNING) << "failed to parse TxnInfoPB";
2245
1
        return -1;
2246
1
    }
2247
5
    auto txn_id = std::get<int64_t>(std::get<0>(out[4]));
2248
5
    auto db_id = std::get<int64_t>(std::get<0>(out[3]));
2249
5
    auto label = txn_info_pb.label();
2250
5
    std::string txn_label = txn_label_key({instance_id_, db_id, label});
2251
5
    std::string txn_label_val;
2252
5
    TxnLabelPB txn_label_pb;
2253
5
    std::unique_ptr<Transaction> txn;
2254
5
    TxnErrorCode err = txn_kv_->create_txn(&txn);
2255
5
    if (err != TxnErrorCode::TXN_OK) {
2256
0
        LOG(WARNING) << "failed to init txn";
2257
0
        return -1;
2258
0
    }
2259
5
    if (txn->get(txn_label, &txn_label_val) != TxnErrorCode::TXN_OK) {
2260
1
        LOG(WARNING) << "failed to get txn label key, key=" << hex(txn_label);
2261
1
        return -1;
2262
1
    }
2263
4
    txn_label_pb.ParseFromString(txn_label_val);
2264
4
    auto txn_ids = txn_label_pb.txn_ids();
2265
4
    if (!std::count(txn_ids.begin(), txn_ids.end(), txn_id)) {
2266
        // clang-format off txn_info_pb
2267
1
        LOG(WARNING) << "txn_info_pb's txn_id not found in txn_label_pb info,"
2268
1
                     << " txn_id: " << txn_id
2269
1
                     << " txn_label_pb meta: " << txn_label_pb.ShortDebugString();
2270
        // clang-format on
2271
1
        return 1;
2272
1
    }
2273
3
    return 0;
2274
4
}
2275
2276
4
int InstanceChecker::check_txn_index_key(std::string_view key, std::string_view value) {
2277
4
    TxnInfoPB txn_info_pb;
2278
4
    std::string_view k1 = key;
2279
4
    k1.remove_prefix(1);
2280
4
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
2281
4
    decode_key(&k1, &out);
2282
    // 0x01 "txn" ${instance_id} "txn_info" ${db_id} ${txn_id}
2283
4
    if (!txn_info_pb.ParseFromArray(value.data(), value.size())) {
2284
1
        LOG(WARNING) << "failed to parse TxnInfoPB";
2285
1
        return -1;
2286
1
    }
2287
3
    auto txn_id = std::get<int64_t>(std::get<0>(out[4]));
2288
3
    auto db_id = std::get<int64_t>(std::get<0>(out[3]));
2289
    /// get tablet id
2290
3
    std::string txn_index = txn_index_key({instance_id_, txn_id});
2291
3
    std::string txn_index_val;
2292
3
    TxnIndexPB txn_index_pb;
2293
3
    std::unique_ptr<Transaction> txn;
2294
3
    TxnErrorCode err = txn_kv_->create_txn(&txn);
2295
3
    if (err != TxnErrorCode::TXN_OK) {
2296
0
        LOG(WARNING) << "failed to init txn";
2297
0
        return -1;
2298
0
    }
2299
3
    if (txn->get(txn_index, &txn_index_val) != TxnErrorCode::TXN_OK) {
2300
1
        LOG(WARNING) << "failed to get txn label key, key=" << hex(txn_index);
2301
1
        return -1;
2302
1
    }
2303
2
    txn_index_pb.ParseFromString(txn_index_val);
2304
2
    if (txn_index_pb.tablet_index().db_id() != db_id) {
2305
        // clang-format off txn_info_pb
2306
1
        LOG(WARNING) << "txn_index_pb's db_id not same with txn_info_pb's db_id,"
2307
1
                     << " txn_index_pb meta: " << txn_index_pb.ShortDebugString()
2308
1
                     << " txn_info_pb meta: " << txn_info_pb.ShortDebugString();
2309
        // clang-format on
2310
1
        return 1;
2311
1
    }
2312
1
    return 0;
2313
2
}
2314
2315
3
int InstanceChecker::check_txn_running_key(std::string_view key, std::string_view value) {
2316
3
    TxnRunningPB txn_running_pb;
2317
3
    int64_t current_time =
2318
3
            duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
2319
3
    if (!txn_running_pb.ParseFromArray(value.data(), value.size())) {
2320
1
        LOG(WARNING) << "failed to parse TxnRunningPB";
2321
1
        return -1;
2322
1
    }
2323
2
    if (txn_running_pb.timeout_time() <= current_time) {
2324
1
        LOG(WARNING) << "txn_running_pb.timeout_time() is less than current_time,"
2325
1
                     << " but txn_running_key exists, "
2326
1
                     << " txn_running_pb meta: " << txn_running_pb.ShortDebugString();
2327
1
        return 1;
2328
1
    }
2329
1
    return 0;
2330
2
}
2331
2332
0
int InstanceChecker::do_txn_key_check() {
2333
0
    int ret = 0;
2334
2335
    // check txn info key depend on txn label key
2336
0
    std::string begin = txn_label_key({instance_id_, 0, ""});
2337
0
    std::string end = txn_label_key({instance_id_, INT64_MAX, ""});
2338
0
    int64_t num_scanned = 0;
2339
0
    int64_t num_abnormal = 0;
2340
0
    LOG(INFO) << "begin check txn_label_key and txn_info_key";
2341
0
    ret = scan_and_handle_kv(begin, end, [&, this](std::string_view k, std::string_view v) -> int {
2342
0
        num_scanned++;
2343
0
        int ret = check_txn_info_key(k, v);
2344
0
        if (ret == 1) {
2345
0
            num_abnormal++;
2346
0
        }
2347
0
        return ret;
2348
0
    });
2349
2350
0
    if (ret == 1) {
2351
0
        LOG(WARNING) << "failed to check txn_info_key depending on txn_label_key, num_scanned="
2352
0
                     << num_scanned << ", num_abnormal=" << num_abnormal;
2353
0
        return 1;
2354
0
    } else if (ret == -1) {
2355
0
        LOG(WARNING) << "failed to check txn label key and txn info key";
2356
0
        return -1;
2357
0
    }
2358
2359
    // check txn label key depend on txn info key
2360
0
    begin = txn_info_key({instance_id_, 0, 0});
2361
0
    end = txn_info_key({instance_id_, INT64_MAX, 0});
2362
0
    num_scanned = 0;
2363
0
    num_abnormal = 0;
2364
0
    LOG(INFO) << "begin check txn_label_key and txn_info_key";
2365
0
    ret = scan_and_handle_kv(begin, end, [&, this](std::string_view k, std::string_view v) -> int {
2366
0
        num_scanned++;
2367
0
        int ret = check_txn_label_key(k, v);
2368
0
        if (ret == 1) {
2369
0
            num_abnormal++;
2370
0
        }
2371
0
        return ret;
2372
0
    });
2373
0
    if (ret == 1) {
2374
0
        LOG(WARNING) << "failed to check txn_label_key depending on txn_info_key, num_scanned="
2375
0
                     << num_scanned << ", num_abnormal=" << num_abnormal;
2376
0
        return 1;
2377
0
    } else if (ret == -1) {
2378
0
        LOG(WARNING) << "failed to inverted check txn label key and txn info key";
2379
0
        return -1;
2380
0
    }
2381
0
    LOG(INFO) << "finish check txn_label_key and txn_info_key, num_scanned=" << num_scanned
2382
0
              << ", num_abnormal=" << num_abnormal;
2383
2384
    // check txn index key depend on txn info key
2385
0
    begin = txn_info_key({instance_id_, 0, 0});
2386
0
    end = txn_info_key({instance_id_, INT64_MAX, 0});
2387
0
    num_scanned = 0;
2388
0
    num_abnormal = 0;
2389
0
    LOG(INFO) << "begin check txn_index_key and txn_info_key";
2390
0
    ret = scan_and_handle_kv(begin, end, [&, this](std::string_view k, std::string_view v) -> int {
2391
0
        num_scanned++;
2392
0
        int ret = check_txn_index_key(k, v);
2393
0
        if (ret == 1) {
2394
0
            num_abnormal++;
2395
0
        }
2396
0
        return ret;
2397
0
    });
2398
0
    if (ret == 1) {
2399
0
        LOG(WARNING) << "failed to check txn_idx_key depending on txn_info_key, num_scanned="
2400
0
                     << num_scanned << ", num_abnormal=" << num_abnormal;
2401
0
        return 1;
2402
0
    } else if (ret == -1) {
2403
0
        LOG(WARNING) << "failed to check txn index key";
2404
0
        return -1;
2405
0
    }
2406
0
    LOG(INFO) << "finish check txn_index_key and txn_info_key, num_scanned=" << num_scanned
2407
0
              << ", num_abnormal=" << num_abnormal;
2408
2409
    // check txn running key
2410
0
    begin = txn_running_key({instance_id_, 0, 0});
2411
0
    end = txn_running_key({instance_id_, INT64_MAX, 0});
2412
0
    num_scanned = 0;
2413
0
    num_abnormal = 0;
2414
0
    LOG(INFO) << "begin check txn_running_key";
2415
0
    ret = scan_and_handle_kv(begin, end, [&, this](std::string_view k, std::string_view v) -> int {
2416
0
        num_scanned++;
2417
0
        int ret = check_txn_running_key(k, v);
2418
0
        if (ret == 1) {
2419
0
            num_abnormal++;
2420
0
        }
2421
0
        return ret;
2422
0
    });
2423
0
    if (ret == 1) {
2424
0
        LOG(WARNING) << "failed to check txn_running_key, num_scanned=" << num_scanned
2425
0
                     << ", num_abnormal=" << num_abnormal;
2426
0
        return 1;
2427
0
    } else if (ret == -1) {
2428
0
        LOG(WARNING) << "failed to check txn running key";
2429
0
        return -1;
2430
0
    }
2431
0
    LOG(INFO) << "finish check txn_running_key, num_scanned=" << num_scanned
2432
0
              << ", num_abnormal=" << num_abnormal;
2433
0
    return 0;
2434
0
}
2435
2436
2
int InstanceChecker::check_meta_tmp_rowset_key(std::string_view key, std::string_view value) {
2437
2
    TxnInfoPB txn_info_pb;
2438
2
    std::string_view k1 = key;
2439
2
    k1.remove_prefix(1);
2440
2
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
2441
2
    decode_key(&k1, &out);
2442
    // 0x01 "txn" ${instance_id} "txn_info" ${db_id} ${txn_id}
2443
2
    if (!txn_info_pb.ParseFromArray(value.data(), value.size())) {
2444
0
        LOG(WARNING) << "failed to parse TxnInfoPB";
2445
0
        return -1;
2446
0
    }
2447
    /// get tablet id
2448
2
    auto txn_id = std::get<int64_t>(std::get<0>(out[4]));
2449
2
    std::string txn_index = txn_index_key({instance_id_, txn_id});
2450
2
    std::string txn_index_val;
2451
2
    TxnIndexPB txn_index_pb;
2452
2
    std::unique_ptr<Transaction> txn;
2453
2
    TxnErrorCode err = txn_kv_->create_txn(&txn);
2454
2
    if (err != TxnErrorCode::TXN_OK) {
2455
0
        LOG(WARNING) << "failed to init txn";
2456
0
        return -1;
2457
0
    }
2458
2
    if (txn->get(txn_index, &txn_index_val) != TxnErrorCode::TXN_OK) {
2459
0
        LOG(WARNING) << "failed to get txn index key, key=" << txn_index;
2460
0
        return -1;
2461
0
    }
2462
2
    txn_index_pb.ParseFromString(txn_index_val);
2463
2
    auto tablet_id = txn_index_pb.tablet_index().tablet_id();
2464
2
    std::string meta_tmp_rowset_key = meta_rowset_tmp_key({instance_id_, txn_id, tablet_id});
2465
2
    int is_key_exist = key_exist(txn_kv_.get(), meta_tmp_rowset_key);
2466
2
    if (is_key_exist == 1) {
2467
0
        if (txn_info_pb.status() != TxnStatusPB::TXN_STATUS_VISIBLE) {
2468
            // clang-format off
2469
0
            LOG(INFO) << "meta tmp rowset key not exist but txn status != TXN_STATUS_VISIBLE"
2470
0
                        << "meta tmp rowset key=" << meta_tmp_rowset_key
2471
0
                        << "txn_info=" << txn_info_pb.ShortDebugString();
2472
            // clang-format on
2473
0
            return 1;
2474
0
        }
2475
2
    } else if (is_key_exist == 0) {
2476
2
        if (txn_info_pb.status() != TxnStatusPB::TXN_STATUS_PREPARED) {
2477
            // clang-format off
2478
1
            LOG(INFO) << "meta tmp rowset key exist but txn status != TXN_STATUS_PREPARED"
2479
1
                        << "meta tmp rowset key=" << meta_tmp_rowset_key
2480
1
                        << "txn_info=" << txn_info_pb.ShortDebugString();
2481
            // clang-format on
2482
1
            return 1;
2483
1
        }
2484
2
    } else {
2485
0
        LOG(WARNING) << "failed to get key, key=" << meta_tmp_rowset_key;
2486
0
        return -1;
2487
0
    }
2488
1
    return 0;
2489
2
}
2490
2491
2
int InstanceChecker::check_meta_rowset_key(std::string_view key, std::string_view value) {
2492
2
    RowsetMetaCloudPB meta_rowset_pb;
2493
2
    if (!meta_rowset_pb.ParseFromArray(value.data(), value.size())) {
2494
0
        LOG(WARNING) << "failed to parse RowsetMetaCloudPB";
2495
0
        return -1;
2496
0
    }
2497
2
    std::string tablet_index_key = meta_tablet_idx_key({instance_id_, meta_rowset_pb.tablet_id()});
2498
2
    if (key_exist(txn_kv_.get(), tablet_index_key) == 1) {
2499
1
        LOG(WARNING) << "rowset's tablet id not found in fdb"
2500
1
                     << "tablet_index_key: " << tablet_index_key
2501
1
                     << "rowset meta: " << meta_rowset_pb.ShortDebugString();
2502
1
        return 1;
2503
1
    }
2504
1
    return 0;
2505
2
}
2506
2507
0
int InstanceChecker::do_meta_rowset_key_check() {
2508
0
    int ret = 0;
2509
2510
0
    std::string begin = meta_rowset_key({instance_id_, 0, 0});
2511
0
    std::string end = meta_rowset_key({instance_id_, INT64_MAX, 0});
2512
0
    int64_t num_scanned = 0;
2513
0
    int64_t num_loss = 0;
2514
2515
0
    ret = scan_and_handle_kv(begin, end, [&](std::string_view k, std::string_view v) {
2516
0
        num_scanned++;
2517
0
        int ret = check_meta_rowset_key(k, v);
2518
0
        if (ret == 1) {
2519
0
            num_loss++;
2520
0
        }
2521
0
        return ret;
2522
0
    });
2523
0
    if (ret == -1) {
2524
0
        LOG(WARNING) << "failed to check meta rowset key,";
2525
0
        return -1;
2526
0
    } else if (ret == 1) {
2527
0
        LOG(WARNING) << "meta rowset key may be loss, num_scanned=" << num_scanned
2528
0
                     << ", num_loss=" << num_loss;
2529
0
    }
2530
0
    LOG(INFO) << "meta rowset key check finish, num_scanned=" << num_scanned
2531
0
              << ", num_loss=" << num_loss;
2532
2533
0
    begin = txn_info_key({instance_id_, 0, 0});
2534
0
    end = txn_info_key({instance_id_, INT64_MAX, 0});
2535
0
    num_scanned = 0;
2536
0
    num_loss = 0;
2537
2538
0
    ret = scan_and_handle_kv(begin, end, [&](std::string_view k, std::string_view v) {
2539
0
        num_scanned++;
2540
0
        int ret = check_meta_tmp_rowset_key(k, v);
2541
0
        if (ret == 1) {
2542
0
            num_loss++;
2543
0
        }
2544
0
        return ret;
2545
0
    });
2546
0
    if (ret == -1) {
2547
0
        LOG(WARNING) << "failed to check tmp meta rowset key";
2548
0
        return -1;
2549
0
    } else if (ret == 1) {
2550
0
        LOG(WARNING) << "meta tmp rowset key may be loss, num_scanned=" << num_scanned
2551
0
                     << ", num_loss=" << num_loss;
2552
0
    }
2553
0
    LOG(INFO) << "meta tmp rowset key check finish, num_scanned=" << num_scanned
2554
0
              << ", num_loss=" << num_loss;
2555
2556
0
    return ret;
2557
0
}
2558
2559
} // namespace doris::cloud