Coverage Report

Created: 2025-12-26 16:18

/root/doris/cloud/src/recycler/checker.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "recycler/checker.h"
19
20
#include <aws/s3/S3Client.h>
21
#include <aws/s3/model/ListObjectsV2Request.h>
22
#include <butil/endpoint.h>
23
#include <butil/strings/string_split.h>
24
#include <fmt/core.h>
25
#include <gen_cpp/cloud.pb.h>
26
#include <gen_cpp/olap_file.pb.h>
27
#include <glog/logging.h>
28
29
#include <algorithm>
30
#include <chrono>
31
#include <cstdint>
32
#include <functional>
33
#include <memory>
34
#include <mutex>
35
#include <numeric>
36
#include <sstream>
37
#include <string_view>
38
#include <unordered_set>
39
#include <vector>
40
41
#include "common/bvars.h"
42
#include "common/config.h"
43
#include "common/defer.h"
44
#include "common/encryption_util.h"
45
#include "common/logging.h"
46
#include "common/util.h"
47
#include "cpp/sync_point.h"
48
#include "meta-service/meta_service.h"
49
#include "meta-service/meta_service_schema.h"
50
#include "meta-service/meta_service_tablet_stats.h"
51
#include "meta-store/blob_message.h"
52
#include "meta-store/keys.h"
53
#include "meta-store/txn_kv.h"
54
#include "snapshot/snapshot_manager.h"
55
#ifdef ENABLE_HDFS_STORAGE_VAULT
56
#include "recycler/hdfs_accessor.h"
57
#endif
58
#include "recycler/s3_accessor.h"
59
#include "recycler/storage_vault_accessor.h"
60
#ifdef UNIT_TEST
61
#include "../test/mock_accessor.h"
62
#endif
63
#include "recycler/util.h"
64
65
namespace doris::cloud {
66
// Configuration values referenced by the checker. They are only declared here
// (`extern`); their definitions are not part of this file.
namespace config {
67
extern int32_t brpc_listen_port;
68
extern int32_t scan_instances_interval_seconds;
69
extern int32_t recycle_job_lease_expired_ms;
70
extern int32_t recycle_concurrency;
71
extern std::vector<std::string> recycle_whitelist;
72
extern std::vector<std::string> recycle_blacklist;
73
extern bool enable_inverted_check;
74
} // namespace config
75
76
using namespace std::chrono;
77
78
5
// Takes ownership of the KV store handle and records this process's
// "<ip>:<brpc_listen_port>" string in ip_port_.
Checker::Checker(std::shared_ptr<TxnKv> txn_kv) : txn_kv_(std::move(txn_kv)) {
    std::string endpoint(butil::my_ip_cstr());
    endpoint += ":";
    endpoint += std::to_string(config::brpc_listen_port);
    ip_port_ = std::move(endpoint);
}
81
82
5
// Stops the checker on destruction unless it has already been stopped.
Checker::~Checker() {
    if (stopped()) {
        return;
    }
    stop();
}
87
88
4
int Checker::start() {
89
4
    DCHECK(txn_kv_);
90
4
    instance_filter_.reset(config::recycle_whitelist, config::recycle_blacklist);
91
92
    // launch instance scanner
93
4
    auto scanner_func = [this]() {
94
4
        std::this_thread::sleep_for(
95
4
                std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds));
96
8
        while (!stopped()) {
97
4
            std::vector<InstanceInfoPB> instances;
98
4
            get_all_instances(txn_kv_.get(), instances);
99
4
            LOG(INFO) << "Checker get instances: " << [&instances] {
100
4
                std::stringstream ss;
101
30
                for (auto& i : instances) ss << ' ' << i.instance_id();
102
4
                return ss.str();
103
4
            }();
104
4
            if (!instances.empty()) {
105
                // enqueue instances
106
3
                std::lock_guard lock(mtx_);
107
30
                for (auto& instance : instances) {
108
30
                    if (instance_filter_.filter_out(instance.instance_id())) continue;
109
30
                    if (instance.status() == InstanceInfoPB::DELETED) continue;
110
30
                    using namespace std::chrono;
111
30
                    auto enqueue_time_s =
112
30
                            duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
113
30
                    auto [_, success] =
114
30
                            pending_instance_map_.insert({instance.instance_id(), enqueue_time_s});
115
                    // skip instance already in pending queue
116
30
                    if (success) {
117
30
                        pending_instance_queue_.push_back(std::move(instance));
118
30
                    }
119
30
                }
120
3
                pending_instance_cond_.notify_all();
121
3
            }
122
4
            {
123
4
                std::unique_lock lock(mtx_);
124
4
                notifier_.wait_for(lock,
125
4
                                   std::chrono::seconds(config::scan_instances_interval_seconds),
126
7
                                   [&]() { return stopped(); });
127
4
            }
128
4
        }
129
4
    };
130
4
    workers_.emplace_back(scanner_func);
131
    // Launch lease thread
132
4
    workers_.emplace_back([this] { lease_check_jobs(); });
133
    // Launch inspect thread
134
4
    workers_.emplace_back([this] { inspect_instance_check_interval(); });
135
136
    // launch check workers
137
8
    auto checker_func = [this]() {
138
38
        while (!stopped()) {
139
            // fetch instance to check
140
36
            InstanceInfoPB instance;
141
36
            long enqueue_time_s = 0;
142
36
            {
143
36
                std::unique_lock lock(mtx_);
144
48
                pending_instance_cond_.wait(lock, [&]() -> bool {
145
48
                    return !pending_instance_queue_.empty() || stopped();
146
48
                });
147
36
                if (stopped()) {
148
6
                    return;
149
6
                }
150
30
                instance = std::move(pending_instance_queue_.front());
151
30
                pending_instance_queue_.pop_front();
152
30
                enqueue_time_s = pending_instance_map_[instance.instance_id()];
153
30
                pending_instance_map_.erase(instance.instance_id());
154
30
            }
155
0
            const auto& instance_id = instance.instance_id();
156
30
            {
157
30
                std::lock_guard lock(mtx_);
158
                // skip instance in recycling
159
30
                if (working_instance_map_.count(instance_id)) continue;
160
30
            }
161
30
            auto checker = std::make_shared<InstanceChecker>(txn_kv_, instance.instance_id());
162
30
            if (checker->init(instance) != 0) {
163
0
                LOG(WARNING) << "failed to init instance checker, instance_id="
164
0
                             << instance.instance_id();
165
0
                continue;
166
0
            }
167
30
            std::string check_job_key;
168
30
            job_check_key({instance.instance_id()}, &check_job_key);
169
30
            int ret = prepare_instance_recycle_job(txn_kv_.get(), check_job_key,
170
30
                                                   instance.instance_id(), ip_port_,
171
30
                                                   config::check_object_interval_seconds * 1000);
172
30
            if (ret != 0) { // Prepare failed
173
20
                continue;
174
20
            } else {
175
10
                std::lock_guard lock(mtx_);
176
10
                working_instance_map_.emplace(instance_id, checker);
177
10
            }
178
10
            if (stopped()) return;
179
10
            using namespace std::chrono;
180
10
            auto ctime_ms =
181
10
                    duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
182
10
            g_bvar_checker_enqueue_cost_s.put(instance_id, ctime_ms / 1000 - enqueue_time_s);
183
184
10
            bool success {true};
185
186
10
            if (int ret = checker->do_check(); ret != 0) {
187
0
                success = false;
188
0
            }
189
190
10
            if (config::enable_inverted_check) {
191
0
                if (int ret = checker->do_inverted_check(); ret != 0) {
192
0
                    success = false;
193
0
                }
194
0
            }
195
196
10
            if (config::enable_delete_bitmap_inverted_check) {
197
0
                if (int ret = checker->do_delete_bitmap_inverted_check(); ret != 0) {
198
0
                    success = false;
199
0
                }
200
0
            }
201
202
10
            if (config::enable_mow_job_key_check) {
203
0
                if (int ret = checker->do_mow_job_key_check(); ret != 0) {
204
0
                    success = false;
205
0
                }
206
0
            }
207
208
10
            if (config::enable_tablet_stats_key_check) {
209
0
                if (int ret = checker->do_tablet_stats_key_check(); ret != 0) {
210
0
                    success = false;
211
0
                }
212
0
            }
213
214
10
            if (config::enable_restore_job_check) {
215
0
                if (int ret = checker->do_restore_job_check(); ret != 0) {
216
0
                    success = false;
217
0
                }
218
0
            }
219
220
10
            if (config::enable_txn_key_check) {
221
0
                if (int ret = checker->do_txn_key_check(); ret != 0) {
222
0
                    success = false;
223
0
                }
224
0
            }
225
226
10
            if (config::enable_meta_rowset_key_check) {
227
0
                if (int ret = checker->do_meta_rowset_key_check(); ret != 0) {
228
0
                    success = false;
229
0
                }
230
0
            }
231
232
10
            if (config::enable_delete_bitmap_storage_optimize_v2_check) {
233
0
                if (int ret = checker->do_delete_bitmap_storage_optimize_check(2 /*version*/);
234
0
                    ret != 0) {
235
0
                    success = false;
236
0
                }
237
0
            }
238
239
10
            if (config::enable_version_key_check) {
240
0
                if (int ret = checker->do_version_key_check(); ret != 0) {
241
0
                    success = false;
242
0
                }
243
0
            }
244
245
10
            if (config::enable_snapshot_check) {
246
0
                if (int ret = checker->do_snapshots_check(); ret != 0) {
247
0
                    success = false;
248
0
                }
249
0
            }
250
251
            // If instance checker has been aborted, don't finish this job
252
10
            if (!checker->stopped()) {
253
10
                finish_instance_recycle_job(txn_kv_.get(), check_job_key, instance.instance_id(),
254
10
                                            ip_port_, success, ctime_ms);
255
10
            }
256
10
            {
257
10
                std::lock_guard lock(mtx_);
258
10
                working_instance_map_.erase(instance.instance_id());
259
10
            }
260
10
        }
261
8
    };
262
4
    int num_threads = config::recycle_concurrency; // FIXME: use a new config entry?
263
12
    for (int i = 0; i < num_threads; ++i) {
264
8
        workers_.emplace_back(checker_func);
265
8
    }
266
4
    return 0;
267
4
}
268
269
5
void Checker::stop() {
270
5
    stopped_ = true;
271
5
    notifier_.notify_all();
272
5
    pending_instance_cond_.notify_all();
273
5
    {
274
5
        std::lock_guard lock(mtx_);
275
5
        for (auto& [_, checker] : working_instance_map_) {
276
0
            checker->stop();
277
0
        }
278
5
    }
279
20
    for (auto& w : workers_) {
280
20
        if (w.joinable()) w.join();
281
20
    }
282
5
}
283
284
4
void Checker::lease_check_jobs() {
285
54
    while (!stopped()) {
286
50
        std::vector<std::string> instances;
287
50
        instances.reserve(working_instance_map_.size());
288
50
        {
289
50
            std::lock_guard lock(mtx_);
290
50
            for (auto& [id, _] : working_instance_map_) {
291
30
                instances.push_back(id);
292
30
            }
293
50
        }
294
50
        for (auto& i : instances) {
295
30
            std::string check_job_key;
296
30
            job_check_key({i}, &check_job_key);
297
30
            int ret = lease_instance_recycle_job(txn_kv_.get(), check_job_key, i, ip_port_);
298
30
            if (ret == 1) {
299
0
                std::lock_guard lock(mtx_);
300
0
                if (auto it = working_instance_map_.find(i); it != working_instance_map_.end()) {
301
0
                    it->second->stop();
302
0
                }
303
0
            }
304
30
        }
305
50
        {
306
50
            std::unique_lock lock(mtx_);
307
50
            notifier_.wait_for(lock,
308
50
                               std::chrono::milliseconds(config::recycle_job_lease_expired_ms / 3),
309
101
                               [&]() { return stopped(); });
310
50
        }
311
50
    }
312
4
}
313
0
#define LOG_CHECK_INTERVAL_ALARM LOG(WARNING) << "Err for check interval: "
314
34
void Checker::do_inspect(const InstanceInfoPB& instance) {
315
34
    std::string check_job_key = job_check_key({instance.instance_id()});
316
34
    std::unique_ptr<Transaction> txn;
317
34
    std::string val;
318
34
    TxnErrorCode err = txn_kv_->create_txn(&txn);
319
34
    if (err != TxnErrorCode::TXN_OK) {
320
0
        LOG_CHECK_INTERVAL_ALARM << "failed to create txn";
321
0
        return;
322
0
    }
323
34
    err = txn->get(check_job_key, &val);
324
34
    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
325
0
        LOG_CHECK_INTERVAL_ALARM << "failed to get kv, err=" << err
326
0
                                 << " key=" << hex(check_job_key);
327
0
        return;
328
0
    }
329
34
    auto checker = InstanceChecker(txn_kv_, instance.instance_id());
330
34
    if (checker.init(instance) != 0) {
331
0
        LOG_CHECK_INTERVAL_ALARM << "failed to init instance checker, instance_id="
332
0
                                 << instance.instance_id();
333
0
        return;
334
0
    }
335
336
34
    int64_t bucket_lifecycle_days = 0;
337
34
    if (checker.get_bucket_lifecycle(&bucket_lifecycle_days) != 0) {
338
0
        LOG_CHECK_INTERVAL_ALARM << "failed to get bucket lifecycle, instance_id="
339
0
                                 << instance.instance_id();
340
0
        return;
341
0
    }
342
34
    DCHECK(bucket_lifecycle_days > 0);
343
344
34
    if (bucket_lifecycle_days == INT64_MAX) {
345
        // No s3 bucket (may all accessors are HdfsAccessor), skip inspect
346
34
        return;
347
34
    }
348
349
0
    int64_t last_ctime_ms = -1;
350
0
    auto job_status = JobRecyclePB::IDLE;
351
0
    auto has_last_ctime = [&]() {
352
0
        JobRecyclePB job_info;
353
0
        if (!job_info.ParseFromString(val)) {
354
0
            LOG_CHECK_INTERVAL_ALARM << "failed to parse JobRecyclePB, key=" << hex(check_job_key);
355
0
        }
356
0
        DCHECK(job_info.instance_id() == instance.instance_id());
357
0
        if (!job_info.has_last_ctime_ms()) return false;
358
0
        last_ctime_ms = job_info.last_ctime_ms();
359
0
        job_status = job_info.status();
360
0
        g_bvar_checker_last_success_time_ms.put(instance.instance_id(),
361
0
                                                job_info.last_success_time_ms());
362
0
        return true;
363
0
    };
364
0
    using namespace std::chrono;
365
0
    auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
366
0
    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND || !has_last_ctime()) {
367
        // Use instance's ctime for instances that do not have job's last ctime
368
0
        last_ctime_ms = instance.ctime();
369
0
    }
370
0
    DCHECK(now - last_ctime_ms >= 0);
371
0
    int64_t expiration_ms =
372
0
            bucket_lifecycle_days > config::reserved_buffer_days
373
0
                    ? (bucket_lifecycle_days - config::reserved_buffer_days) * 86400000
374
0
                    : bucket_lifecycle_days * 86400000;
375
0
    TEST_SYNC_POINT_CALLBACK("Checker:do_inspect", &last_ctime_ms);
376
0
    if (now - last_ctime_ms >= expiration_ms) {
377
0
        LOG_CHECK_INTERVAL_ALARM << "check risks, instance_id: " << instance.instance_id()
378
0
                                 << " last_ctime_ms: " << last_ctime_ms
379
0
                                 << " job_status: " << job_status
380
0
                                 << " bucket_lifecycle_days: " << bucket_lifecycle_days
381
0
                                 << " reserved_buffer_days: " << config::reserved_buffer_days
382
0
                                 << " expiration_ms: " << expiration_ms;
383
0
    }
384
0
}
385
#undef LOG_CHECK_INTERVAL_ALARM
386
4
void Checker::inspect_instance_check_interval() {
387
8
    while (!stopped()) {
388
4
        LOG(INFO) << "start to inspect instance check interval";
389
4
        std::vector<InstanceInfoPB> instances;
390
4
        get_all_instances(txn_kv_.get(), instances);
391
30
        for (const auto& instance : instances) {
392
30
            if (instance_filter_.filter_out(instance.instance_id())) continue;
393
30
            if (stopped()) return;
394
30
            if (instance.status() == InstanceInfoPB::DELETED) continue;
395
30
            do_inspect(instance);
396
30
        }
397
4
        {
398
4
            std::unique_lock lock(mtx_);
399
4
            notifier_.wait_for(lock, std::chrono::seconds(config::scan_instances_interval_seconds),
400
7
                               [&]() { return stopped(); });
401
4
        }
402
4
    }
403
4
}
404
405
// return 0 for success get a key, 1 for key not found, negative for error
406
25
int key_exist(TxnKv* txn_kv, std::string_view key) {
407
25
    std::unique_ptr<Transaction> txn;
408
25
    TxnErrorCode err = txn_kv->create_txn(&txn);
409
25
    if (err != TxnErrorCode::TXN_OK) {
410
0
        LOG(WARNING) << "failed to init txn, err=" << err;
411
0
        return -1;
412
0
    }
413
25
    std::string val;
414
25
    switch (txn->get(key, &val)) {
415
22
    case TxnErrorCode::TXN_OK:
416
22
        return 0;
417
3
    case TxnErrorCode::TXN_KEY_NOT_FOUND:
418
3
        return 1;
419
0
    default:
420
0
        return -1;
421
25
    }
422
25
}
423
424
// Creates a checker for the instance `instance_id` on top of `txn_kv`, and a
// SnapshotManager that shares the same KV store.
InstanceChecker::InstanceChecker(std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id)
        : txn_kv_(txn_kv), instance_id_(instance_id) {
    // txn_kv_ holds its own copy by now, so the parameter can be moved away.
    auto manager = std::make_shared<SnapshotManager>(std::move(txn_kv));
    snapshot_manager_ = std::move(manager);
}
428
429
97
int InstanceChecker::init(const InstanceInfoPB& instance) {
430
97
    int ret = init_obj_store_accessors(instance);
431
97
    if (ret != 0) {
432
0
        return ret;
433
0
    }
434
435
97
    return init_storage_vault_accessors(instance);
436
97
}
437
438
97
int InstanceChecker::init_obj_store_accessors(const InstanceInfoPB& instance) {
439
97
    for (const auto& obj_info : instance.obj_info()) {
440
97
#ifdef UNIT_TEST
441
97
        auto accessor = std::make_shared<MockAccessor>();
442
#else
443
        auto s3_conf = S3Conf::from_obj_store_info(obj_info);
444
        if (!s3_conf) {
445
            LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_;
446
            return -1;
447
        }
448
449
        std::shared_ptr<S3Accessor> accessor;
450
        int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
451
        if (ret != 0) {
452
            LOG(WARNING) << "failed to init object accessor. instance_id=" << instance_id_
453
                         << " resource_id=" << obj_info.id();
454
            return ret;
455
        }
456
#endif
457
458
97
        accessor_map_.emplace(obj_info.id(), std::move(accessor));
459
97
    }
460
461
97
    return 0;
462
97
}
463
464
97
int InstanceChecker::init_storage_vault_accessors(const InstanceInfoPB& instance) {
465
97
    if (instance.resource_ids().empty()) {
466
97
        return 0;
467
97
    }
468
469
0
    FullRangeGetOptions opts(txn_kv_);
470
0
    opts.prefetch = true;
471
0
    auto it = txn_kv_->full_range_get(storage_vault_key({instance_id_, ""}),
472
0
                                      storage_vault_key({instance_id_, "\xff"}), std::move(opts));
473
474
0
    for (auto kv = it->next(); kv.has_value(); kv = it->next()) {
475
0
        auto [k, v] = *kv;
476
0
        StorageVaultPB vault;
477
0
        if (!vault.ParseFromArray(v.data(), v.size())) {
478
0
            LOG(WARNING) << "malformed storage vault, unable to deserialize key=" << hex(k);
479
0
            return -1;
480
0
        }
481
0
        TEST_SYNC_POINT_CALLBACK("InstanceRecycler::init_storage_vault_accessors.mock_vault",
482
0
                                 &accessor_map_, &vault);
483
0
        if (vault.has_hdfs_info()) {
484
0
#ifdef ENABLE_HDFS_STORAGE_VAULT
485
0
            auto accessor = std::make_shared<HdfsAccessor>(vault.hdfs_info());
486
0
            int ret = accessor->init();
487
0
            if (ret != 0) {
488
0
                LOG(WARNING) << "failed to init hdfs accessor. instance_id=" << instance_id_
489
0
                             << " resource_id=" << vault.id() << " name=" << vault.name();
490
0
                return ret;
491
0
            }
492
493
0
            accessor_map_.emplace(vault.id(), std::move(accessor));
494
#else
495
            LOG(ERROR) << "HDFS is disabled (via the ENABLE_HDFS_STORAGE_VAULT build option), "
496
                       << "but HDFS storage vaults were detected";
497
#endif
498
0
        } else if (vault.has_obj_info()) {
499
0
#ifdef UNIT_TEST
500
0
            auto accessor = std::make_shared<MockAccessor>();
501
#else
502
            auto s3_conf = S3Conf::from_obj_store_info(vault.obj_info());
503
            if (!s3_conf) {
504
                LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_;
505
                return -1;
506
            }
507
508
            std::shared_ptr<S3Accessor> accessor;
509
            int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
510
            if (ret != 0) {
511
                LOG(WARNING) << "failed to init s3 accessor. instance_id=" << instance_id_
512
                             << " resource_id=" << vault.id() << " name=" << vault.name();
513
                return ret;
514
            }
515
#endif
516
517
0
            accessor_map_.emplace(vault.id(), std::move(accessor));
518
0
        }
519
0
    }
520
521
0
    if (!it->is_valid()) {
522
0
        LOG_WARNING("failed to get storage vault kv");
523
0
        return -1;
524
0
    }
525
0
    return 0;
526
0
}
527
528
16
int InstanceChecker::do_check() {
529
16
    TEST_SYNC_POINT("InstanceChecker.do_check");
530
16
    LOG(INFO) << "begin to check instance objects instance_id=" << instance_id_;
531
16
    int check_ret = 0;
532
16
    long num_scanned = 0;
533
16
    long num_scanned_with_segment = 0;
534
16
    long num_rowset_loss = 0;
535
16
    long instance_volume = 0;
536
16
    using namespace std::chrono;
537
16
    auto start_time = steady_clock::now();
538
16
    DORIS_CLOUD_DEFER {
539
16
        auto cost = duration<float>(steady_clock::now() - start_time).count();
540
16
        LOG(INFO) << "check instance objects finished, cost=" << cost
541
16
                  << "s. instance_id=" << instance_id_ << " num_scanned=" << num_scanned
542
16
                  << " num_scanned_with_segment=" << num_scanned_with_segment
543
16
                  << " num_rowset_loss=" << num_rowset_loss
544
16
                  << " instance_volume=" << instance_volume;
545
16
        g_bvar_checker_num_scanned.put(instance_id_, num_scanned);
546
16
        g_bvar_checker_num_scanned_with_segment.put(instance_id_, num_scanned_with_segment);
547
16
        g_bvar_checker_num_check_failed.put(instance_id_, num_rowset_loss);
548
16
        g_bvar_checker_check_cost_s.put(instance_id_, static_cast<long>(cost));
549
        // FIXME(plat1ko): What if some list operation failed?
550
16
        g_bvar_checker_instance_volume.put(instance_id_, instance_volume);
551
16
    };
552
553
16
    struct TabletFiles {
554
16
        int64_t tablet_id {0};
555
16
        std::unordered_set<std::string> files;
556
16
    };
557
16
    TabletFiles tablet_files_cache;
558
559
4.05k
    auto check_rowset_objects = [&, this](doris::RowsetMetaCloudPB& rs_meta, std::string_view key) {
560
4.05k
        if (rs_meta.num_segments() == 0) {
561
0
            return;
562
0
        }
563
564
4.05k
        bool data_loss = false;
565
4.05k
        bool segment_file_loss = false;
566
4.05k
        bool index_file_loss = false;
567
568
4.05k
        DORIS_CLOUD_DEFER {
569
4.05k
            if (data_loss) {
570
32
                LOG(INFO) << "segment file is" << (segment_file_loss ? "" : " not") << " loss, "
571
32
                          << "index file is" << (index_file_loss ? "" : " not") << " loss, "
572
32
                          << "rowset.tablet_id = " << rs_meta.tablet_id();
573
32
                num_rowset_loss++;
574
32
            }
575
4.05k
        };
576
577
4.05k
        ++num_scanned_with_segment;
578
4.05k
        if (tablet_files_cache.tablet_id != rs_meta.tablet_id()) {
579
454
            long tablet_volume = 0;
580
            // Clear cache
581
454
            tablet_files_cache.tablet_id = 0;
582
454
            tablet_files_cache.files.clear();
583
            // Get all file paths under this tablet directory
584
454
            auto find_it = accessor_map_.find(rs_meta.resource_id());
585
454
            if (find_it == accessor_map_.end()) {
586
0
                LOG_WARNING("resource id not found in accessor map")
587
0
                        .tag("resource_id", rs_meta.resource_id())
588
0
                        .tag("tablet_id", rs_meta.tablet_id())
589
0
                        .tag("rowset_id", rs_meta.rowset_id_v2());
590
0
                check_ret = -1;
591
0
                return;
592
0
            }
593
594
454
            std::unique_ptr<ListIterator> list_iter;
595
454
            int ret = find_it->second->list_directory(tablet_path_prefix(rs_meta.tablet_id()),
596
454
                                                      &list_iter);
597
454
            if (ret != 0) { // No need to log, because S3Accessor has logged this error
598
0
                check_ret = -1;
599
0
                return;
600
0
            }
601
602
18.5k
            for (auto file = list_iter->next(); file.has_value(); file = list_iter->next()) {
603
18.0k
                tablet_files_cache.files.insert(std::move(file->path));
604
18.0k
                tablet_volume += file->size;
605
18.0k
            }
606
454
            tablet_files_cache.tablet_id = rs_meta.tablet_id();
607
454
            instance_volume += tablet_volume;
608
454
        }
609
610
16.1k
        for (int i = 0; i < rs_meta.num_segments(); ++i) {
611
12.0k
            auto path = segment_path(rs_meta.tablet_id(), rs_meta.rowset_id_v2(), i);
612
613
12.0k
            if (tablet_files_cache.files.contains(path)) {
614
12.0k
                continue;
615
12.0k
            }
616
617
13
            if (1 == key_exist(txn_kv_.get(), key)) {
618
                // Rowset has been deleted instead of data loss
619
0
                break;
620
0
            }
621
13
            data_loss = true;
622
13
            segment_file_loss = true;
623
13
            TEST_SYNC_POINT_CALLBACK("InstanceChecker.do_check1", &path);
624
13
            LOG(WARNING) << "object not exist, path=" << path
625
13
                         << ", rs_meta=" << rs_meta.ShortDebugString() << " key=" << hex(key);
626
13
        }
627
628
4.05k
        std::unique_ptr<Transaction> txn;
629
4.05k
        TxnErrorCode err = txn_kv_->create_txn(&txn);
630
4.05k
        if (err != TxnErrorCode::TXN_OK) {
631
0
            LOG(WARNING) << "failed to init txn, err=" << err;
632
0
            check_ret = -1;
633
0
            return;
634
0
        }
635
636
4.05k
        TabletIndexPB tablet_index;
637
4.05k
        if (get_tablet_idx(txn_kv_.get(), instance_id_, rs_meta.tablet_id(), tablet_index) == -1) {
638
0
            LOG(WARNING) << "failed to get tablet index, tablet_id= " << rs_meta.tablet_id();
639
0
            check_ret = -1;
640
0
            return;
641
0
        }
642
643
4.05k
        auto tablet_schema_key =
644
4.05k
                meta_schema_key({instance_id_, tablet_index.index_id(), rs_meta.schema_version()});
645
4.05k
        ValueBuf tablet_schema_val;
646
4.05k
        err = cloud::blob_get(txn.get(), tablet_schema_key, &tablet_schema_val);
647
648
4.05k
        if (err != TxnErrorCode::TXN_OK) {
649
2.00k
            check_ret = -1;
650
2.00k
            LOG(WARNING) << "failed to get schema, err=" << err;
651
2.00k
            return;
652
2.00k
        }
653
654
2.05k
        auto* schema = rs_meta.mutable_tablet_schema();
655
2.05k
        if (!parse_schema_value(tablet_schema_val, schema)) {
656
0
            LOG(WARNING) << "malformed schema value, key=" << hex(tablet_schema_key);
657
0
            return;
658
0
        }
659
660
2.05k
        std::vector<std::pair<int64_t, std::string>> index_ids;
661
2.05k
        for (const auto& i : rs_meta.tablet_schema().index()) {
662
2.05k
            if (i.has_index_type() && i.index_type() == IndexType::INVERTED) {
663
2.05k
                index_ids.emplace_back(i.index_id(), i.index_suffix_name());
664
2.05k
            }
665
2.05k
        }
666
2.05k
        if (!index_ids.empty()) {
667
8.10k
            for (int i = 0; i < rs_meta.num_segments(); ++i) {
668
6.05k
                std::vector<std::string> index_path_v;
669
6.05k
                if (rs_meta.tablet_schema().inverted_index_storage_format() ==
670
6.05k
                    InvertedIndexStorageFormatPB::V1) {
671
6.01k
                    for (const auto& index_id : index_ids) {
672
6.01k
                        LOG(INFO) << "check inverted index, tablet_id=" << rs_meta.tablet_id()
673
6.01k
                                  << " rowset_id=" << rs_meta.rowset_id_v2() << " segment_id=" << i
674
6.01k
                                  << " index_id=" << index_id.first
675
6.01k
                                  << " index_suffix_name=" << index_id.second;
676
6.01k
                        index_path_v.emplace_back(
677
6.01k
                                inverted_index_path_v1(rs_meta.tablet_id(), rs_meta.rowset_id_v2(),
678
6.01k
                                                       i, index_id.first, index_id.second));
679
6.01k
                    }
680
6.01k
                } else {
681
40
                    index_path_v.emplace_back(
682
40
                            inverted_index_path_v2(rs_meta.tablet_id(), rs_meta.rowset_id_v2(), i));
683
40
                }
684
685
6.05k
                if (std::ranges::all_of(index_path_v, [&](const auto& idx_file_path) {
686
6.05k
                        if (!tablet_files_cache.files.contains(idx_file_path)) {
687
23
                            LOG(INFO) << "loss index file: " << idx_file_path;
688
23
                            return false;
689
23
                        }
690
6.03k
                        return true;
691
6.05k
                    })) {
692
6.03k
                    continue;
693
6.03k
                }
694
23
                index_file_loss = true;
695
23
                data_loss = true;
696
23
            }
697
2.05k
        }
698
2.05k
    };
699
700
    // scan visible rowsets
701
16
    auto start_key = meta_rowset_key({instance_id_, 0, 0});
702
16
    auto end_key = meta_rowset_key({instance_id_, INT64_MAX, 0});
703
704
16
    std::unique_ptr<RangeGetIterator> it;
705
16
    do {
706
16
        std::unique_ptr<Transaction> txn;
707
16
        TxnErrorCode err = txn_kv_->create_txn(&txn);
708
16
        if (err != TxnErrorCode::TXN_OK) {
709
0
            LOG(WARNING) << "failed to init txn, err=" << err;
710
0
            return -1;
711
0
        }
712
713
16
        err = txn->get(start_key, end_key, &it);
714
16
        if (err != TxnErrorCode::TXN_OK) {
715
0
            LOG(WARNING) << "internal error, failed to get rowset meta, err=" << err;
716
0
            return -1;
717
0
        }
718
16
        num_scanned += it->size();
719
720
4.07k
        while (it->has_next() && !stopped()) {
721
4.05k
            auto [k, v] = it->next();
722
4.05k
            if (!it->has_next()) {
723
6
                start_key = k;
724
6
            }
725
726
4.05k
            doris::RowsetMetaCloudPB rs_meta;
727
4.05k
            if (!rs_meta.ParseFromArray(v.data(), v.size())) {
728
0
                ++num_rowset_loss;
729
0
                LOG(WARNING) << "malformed rowset meta. key=" << hex(k) << " val=" << hex(v);
730
0
                continue;
731
0
            }
732
4.05k
            check_rowset_objects(rs_meta, k);
733
4.05k
        }
734
16
        start_key.push_back('\x00'); // Update to next smallest key for iteration
735
16
    } while (it->more() && !stopped());
736
737
16
    return num_rowset_loss > 0 ? 1 : check_ret;
738
16
}
739
740
34
int InstanceChecker::get_bucket_lifecycle(int64_t* lifecycle_days) {
741
    // If there are multiple buckets, return the minimum lifecycle.
742
34
    int64_t min_lifecycle_days = INT64_MAX;
743
34
    int64_t tmp_liefcycle_days = 0;
744
34
    for (const auto& [id, accessor] : accessor_map_) {
745
34
        if (accessor->type() != AccessorType::S3) {
746
34
            continue;
747
34
        }
748
749
0
        auto* s3_accessor = static_cast<S3Accessor*>(accessor.get());
750
751
0
        if (s3_accessor->check_versioning() != 0) {
752
0
            return -1;
753
0
        }
754
755
0
        if (s3_accessor->get_life_cycle(&tmp_liefcycle_days) != 0) {
756
0
            return -1;
757
0
        }
758
759
0
        if (tmp_liefcycle_days < min_lifecycle_days) {
760
0
            min_lifecycle_days = tmp_liefcycle_days;
761
0
        }
762
0
    }
763
34
    *lifecycle_days = min_lifecycle_days;
764
34
    return 0;
765
34
}
766
767
5
int InstanceChecker::do_inverted_check() {
768
5
    if (accessor_map_.size() > 1) {
769
0
        LOG(INFO) << "currently not support inverted check for multi accessor. instance_id="
770
0
                  << instance_id_;
771
0
        return 0;
772
0
    }
773
774
5
    LOG(INFO) << "begin to inverted check objects instance_id=" << instance_id_;
775
5
    int check_ret = 0;
776
5
    long num_scanned = 0;
777
5
    long num_file_leak = 0;
778
5
    using namespace std::chrono;
779
5
    auto start_time = steady_clock::now();
780
5
    DORIS_CLOUD_DEFER {
781
5
        g_bvar_inverted_checker_num_scanned.put(instance_id_, num_scanned);
782
5
        g_bvar_inverted_checker_num_check_failed.put(instance_id_, num_file_leak);
783
5
        auto cost = duration<float>(steady_clock::now() - start_time).count();
784
5
        LOG(INFO) << "inverted check instance objects finished, cost=" << cost
785
5
                  << "s. instance_id=" << instance_id_ << " num_scanned=" << num_scanned
786
5
                  << " num_file_leak=" << num_file_leak;
787
5
    };
788
789
5
    struct TabletRowsets {
790
5
        int64_t tablet_id {0};
791
5
        std::unordered_set<std::string> rowset_ids;
792
5
    };
793
5
    TabletRowsets tablet_rowsets_cache;
794
795
5
    RowsetIndexesFormatV1 rowset_index_cache_v1;
796
5
    RowsetIndexesFormatV2 rowset_index_cache_v2;
797
798
    // Return 0 if check success, return 1 if file is garbage data, negative if error occurred
799
108
    auto check_segment_file = [&](const std::string& obj_key) {
800
108
        std::vector<std::string> str;
801
108
        butil::SplitString(obj_key, '/', &str);
802
        // data/{tablet_id}/{rowset_id}_{seg_num}.dat
803
108
        if (str.size() < 3) {
804
            // clang-format off
805
0
            LOG(WARNING) << "split obj_key error, str.size() should be less than 3,"
806
0
                         << " value = " << str.size();
807
            // clang-format on
808
0
            return -1;
809
0
        }
810
811
108
        int64_t tablet_id = atol(str[1].c_str());
812
108
        if (tablet_id <= 0) {
813
0
            LOG(WARNING) << "failed to parse tablet_id, key=" << obj_key;
814
0
            return -1;
815
0
        }
816
817
108
        if (!str[2].ends_with(".dat")) {
818
            // skip check not segment file
819
54
            return 0;
820
54
        }
821
822
54
        std::string rowset_id;
823
54
        if (auto pos = str.back().find('_'); pos != std::string::npos) {
824
54
            rowset_id = str.back().substr(0, pos);
825
54
        } else {
826
0
            LOG(WARNING) << "failed to parse rowset_id, key=" << obj_key;
827
0
            return -1;
828
0
        }
829
830
54
        if (tablet_rowsets_cache.tablet_id == tablet_id) {
831
7
            if (tablet_rowsets_cache.rowset_ids.contains(rowset_id)) {
832
2
                return 0;
833
5
            } else {
834
5
                LOG(WARNING) << "rowset not exists, key=" << obj_key;
835
5
                return -1;
836
5
            }
837
7
        }
838
        // Get all rowset id of this tablet
839
47
        tablet_rowsets_cache.tablet_id = tablet_id;
840
47
        tablet_rowsets_cache.rowset_ids.clear();
841
47
        std::unique_ptr<Transaction> txn;
842
47
        TxnErrorCode err = txn_kv_->create_txn(&txn);
843
47
        if (err != TxnErrorCode::TXN_OK) {
844
0
            LOG(WARNING) << "failed to create txn";
845
0
            return -1;
846
0
        }
847
47
        std::unique_ptr<RangeGetIterator> it;
848
47
        auto begin = meta_rowset_key({instance_id_, tablet_id, 0});
849
47
        auto end = meta_rowset_key({instance_id_, tablet_id, INT64_MAX});
850
47
        do {
851
47
            TxnErrorCode err = txn->get(begin, end, &it);
852
47
            if (err != TxnErrorCode::TXN_OK) {
853
0
                LOG(WARNING) << "failed to get rowset kv, err=" << err;
854
0
                return -1;
855
0
            }
856
47
            if (!it->has_next()) {
857
10
                break;
858
10
            }
859
37
            while (it->has_next()) {
860
                // recycle corresponding resources
861
37
                auto [k, v] = it->next();
862
37
                doris::RowsetMetaCloudPB rowset;
863
37
                if (!rowset.ParseFromArray(v.data(), v.size())) {
864
0
                    LOG(WARNING) << "malformed rowset meta value, key=" << hex(k);
865
0
                    return -1;
866
0
                }
867
37
                tablet_rowsets_cache.rowset_ids.insert(rowset.rowset_id_v2());
868
37
                if (!it->has_next()) {
869
37
                    begin = k;
870
37
                    begin.push_back('\x00'); // Update to next smallest key for iteration
871
37
                    break;
872
37
                }
873
37
            }
874
37
        } while (it->more() && !stopped());
875
876
47
        if (!tablet_rowsets_cache.rowset_ids.contains(rowset_id)) {
877
            // Garbage data leak
878
12
            LOG(WARNING) << "rowset should be recycled, key=" << obj_key;
879
12
            return 1;
880
12
        }
881
882
35
        return 0;
883
47
    };
884
885
108
    auto check_inverted_index_file = [&](const std::string& obj_key) {
886
108
        std::vector<std::string> str;
887
108
        butil::SplitString(obj_key, '/', &str);
888
        // format v1: data/{tablet_id}/{rowset_id}_{seg_num}_{idx_id}{idx_suffix}.idx
889
        // format v2: data/{tablet_id}/{rowset_id}_{seg_num}.idx
890
108
        if (str.size() < 3) {
891
            // clang-format off
892
0
            LOG(WARNING) << "split obj_key error, str.size() should be less than 3,"
893
0
                         << " value = " << str.size();
894
            // clang-format on
895
0
            return -1;
896
0
        }
897
898
108
        int64_t tablet_id = atol(str[1].c_str());
899
108
        if (tablet_id <= 0) {
900
0
            LOG(WARNING) << "failed to parse tablet_id, key=" << obj_key;
901
0
            return -1;
902
0
        }
903
904
        // v1: {rowset_id}_{seg_num}_{idx_id}{idx_suffix}.idx
905
        // v2: {rowset_id}_{seg_num}.idx
906
108
        std::string rowset_info = str.back();
907
908
108
        if (!rowset_info.ends_with(".idx")) {
909
54
            return 0; // Not an index file
910
54
        }
911
912
54
        InvertedIndexStorageFormatPB inverted_index_storage_format =
913
54
                std::count(rowset_info.begin(), rowset_info.end(), '_') > 1
914
54
                        ? InvertedIndexStorageFormatPB::V1
915
54
                        : InvertedIndexStorageFormatPB::V2;
916
917
54
        size_t pos = rowset_info.find_last_of('_');
918
54
        if (pos == std::string::npos || pos + 1 >= str.back().size() - 4) {
919
0
            LOG(WARNING) << "Invalid index_id format, key=" << obj_key;
920
0
            return -1;
921
0
        }
922
54
        if (inverted_index_storage_format == InvertedIndexStorageFormatPB::V1) {
923
14
            return check_inverted_index_file_storage_format_v1(tablet_id, obj_key, rowset_info,
924
14
                                                               rowset_index_cache_v1);
925
40
        } else {
926
40
            return check_inverted_index_file_storage_format_v2(tablet_id, obj_key, rowset_info,
927
40
                                                               rowset_index_cache_v2);
928
40
        }
929
54
    };
930
    // so we choose to skip here.
931
5
    TEST_SYNC_POINT_RETURN_WITH_VALUE("InstanceChecker::do_inverted_check", (int)0);
932
933
3
    for (auto& [_, accessor] : accessor_map_) {
934
3
        std::unique_ptr<ListIterator> list_iter;
935
3
        int ret = accessor->list_directory("data", &list_iter);
936
3
        if (ret != 0) {
937
0
            return -1;
938
0
        }
939
940
111
        for (auto file = list_iter->next(); file.has_value(); file = list_iter->next()) {
941
108
            ++num_scanned;
942
108
            int ret = check_segment_file(file->path);
943
108
            if (ret != 0) {
944
17
                LOG(WARNING) << "failed to check segment file, uri=" << accessor->uri()
945
17
                             << " path=" << file->path;
946
17
                if (ret == 1) {
947
12
                    ++num_file_leak;
948
12
                } else {
949
5
                    check_ret = -1;
950
5
                }
951
17
            }
952
108
            ret = check_inverted_index_file(file->path);
953
108
            if (ret != 0) {
954
13
                LOG(WARNING) << "failed to check index file, uri=" << accessor->uri()
955
13
                             << " path=" << file->path;
956
13
                if (ret == 1) {
957
13
                    ++num_file_leak;
958
13
                } else {
959
0
                    check_ret = -1;
960
0
                }
961
13
            }
962
108
        }
963
964
3
        if (!list_iter->is_valid()) {
965
0
            LOG(WARNING) << "failed to list data directory. uri=" << accessor->uri();
966
0
            return -1;
967
0
        }
968
3
    }
969
3
    return num_file_leak > 0 ? 1 : check_ret;
970
3
}
971
972
3
int InstanceChecker::traverse_mow_tablet(const std::function<int(int64_t, bool)>& check_func) {
973
3
    std::unique_ptr<RangeGetIterator> it;
974
3
    auto begin = meta_rowset_key({instance_id_, 0, 0});
975
3
    auto end = meta_rowset_key({instance_id_, std::numeric_limits<int64_t>::max(), 0});
976
43
    do {
977
43
        std::unique_ptr<Transaction> txn;
978
43
        TxnErrorCode err = txn_kv_->create_txn(&txn);
979
43
        if (err != TxnErrorCode::TXN_OK) {
980
0
            LOG(WARNING) << "failed to create txn";
981
0
            return -1;
982
0
        }
983
43
        err = txn->get(begin, end, &it, false, 1);
984
43
        if (err != TxnErrorCode::TXN_OK) {
985
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
986
0
            return -1;
987
0
        }
988
43
        if (!it->has_next()) {
989
3
            break;
990
3
        }
991
80
        while (it->has_next() && !stopped()) {
992
40
            auto [k, v] = it->next();
993
40
            std::string_view k1 = k;
994
40
            k1.remove_prefix(1);
995
40
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
996
40
            decode_key(&k1, &out);
997
            // 0x01 "meta" ${instance_id} "rowset" ${tablet_id} ${version} -> RowsetMetaCloudPB
998
40
            auto tablet_id = std::get<int64_t>(std::get<0>(out[3]));
999
1000
40
            if (!it->has_next()) {
1001
                // Update to next smallest key for iteration
1002
                // scan for next tablet in this instance
1003
40
                begin = meta_rowset_key({instance_id_, tablet_id + 1, 0});
1004
40
            }
1005
1006
40
            TabletMetaCloudPB tablet_meta;
1007
40
            int ret = get_tablet_meta(txn_kv_.get(), instance_id_, tablet_id, tablet_meta);
1008
40
            if (ret < 0) {
1009
0
                LOG(WARNING) << fmt::format(
1010
0
                        "failed to get_tablet_meta in do_delete_bitmap_integrity_check(), "
1011
0
                        "instance_id={}, tablet_id={}",
1012
0
                        instance_id_, tablet_id);
1013
0
                return ret;
1014
0
            }
1015
1016
40
            if (tablet_meta.enable_unique_key_merge_on_write()) {
1017
                // only check merge-on-write table
1018
30
                bool has_sequence_col = tablet_meta.schema().has_sequence_col_idx() &&
1019
30
                                        tablet_meta.schema().sequence_col_idx() != -1;
1020
30
                int ret = check_func(tablet_id, has_sequence_col);
1021
30
                if (ret < 0) {
1022
                    // return immediately when encounter unexpected error,
1023
                    // otherwise, we continue to check the next tablet
1024
0
                    return ret;
1025
0
                }
1026
30
            }
1027
40
        }
1028
40
    } while (it->more() && !stopped());
1029
3
    return 0;
1030
3
}
1031
1032
int InstanceChecker::traverse_rowset_delete_bitmaps(
1033
        int64_t tablet_id, std::string rowset_id,
1034
0
        const std::function<int(int64_t, std::string_view, int64_t, int64_t)>& callback) {
1035
0
    std::unique_ptr<RangeGetIterator> it;
1036
0
    auto begin = meta_delete_bitmap_key({instance_id_, tablet_id, rowset_id, 0, 0});
1037
0
    auto end = meta_delete_bitmap_key({instance_id_, tablet_id, rowset_id,
1038
0
                                       std::numeric_limits<int64_t>::max(),
1039
0
                                       std::numeric_limits<int64_t>::max()});
1040
0
    do {
1041
0
        std::unique_ptr<Transaction> txn;
1042
0
        TxnErrorCode err = txn_kv_->create_txn(&txn);
1043
0
        if (err != TxnErrorCode::TXN_OK) {
1044
0
            LOG(WARNING) << "failed to create txn";
1045
0
            return -1;
1046
0
        }
1047
0
        err = txn->get(begin, end, &it);
1048
0
        if (err != TxnErrorCode::TXN_OK) {
1049
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
1050
0
            return -1;
1051
0
        }
1052
0
        if (!it->has_next()) {
1053
0
            break;
1054
0
        }
1055
0
        while (it->has_next() && !stopped()) {
1056
0
            auto [k, v] = it->next();
1057
0
            std::string_view k1 = k;
1058
0
            k1.remove_prefix(1);
1059
0
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1060
0
            decode_key(&k1, &out);
1061
            // 0x01 "meta" ${instance_id} "delete_bitmap" ${tablet_id} ${rowset_id} ${version} ${segment_id} -> roaringbitmap
1062
0
            auto version = std::get<std::int64_t>(std::get<0>(out[5]));
1063
0
            auto segment_id = std::get<std::int64_t>(std::get<0>(out[6]));
1064
1065
0
            int ret = callback(tablet_id, rowset_id, version, segment_id);
1066
0
            if (ret != 0) {
1067
0
                return ret;
1068
0
            }
1069
1070
0
            if (!it->has_next()) {
1071
0
                begin = k;
1072
0
                begin.push_back('\x00'); // Update to next smallest key for iteration
1073
0
                break;
1074
0
            }
1075
0
        }
1076
0
    } while (it->more() && !stopped());
1077
1078
0
    return 0;
1079
0
}
1080
1081
int InstanceChecker::collect_tablet_rowsets(
1082
50
        int64_t tablet_id, const std::function<void(const doris::RowsetMetaCloudPB&)>& collect_cb) {
1083
50
    std::unique_ptr<Transaction> txn;
1084
50
    TxnErrorCode err = txn_kv_->create_txn(&txn);
1085
50
    if (err != TxnErrorCode::TXN_OK) {
1086
0
        LOG(WARNING) << "failed to create txn";
1087
0
        return -1;
1088
0
    }
1089
50
    std::unique_ptr<RangeGetIterator> it;
1090
50
    auto begin = meta_rowset_key({instance_id_, tablet_id, 0});
1091
50
    auto end = meta_rowset_key({instance_id_, tablet_id + 1, 0});
1092
1093
50
    int64_t rowsets_num {0};
1094
50
    do {
1095
50
        TxnErrorCode err = txn->get(begin, end, &it);
1096
50
        if (err != TxnErrorCode::TXN_OK) {
1097
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
1098
0
            return -1;
1099
0
        }
1100
50
        if (!it->has_next()) {
1101
0
            break;
1102
0
        }
1103
394
        while (it->has_next() && !stopped()) {
1104
394
            auto [k, v] = it->next();
1105
394
            doris::RowsetMetaCloudPB rowset;
1106
394
            if (!rowset.ParseFromArray(v.data(), v.size())) {
1107
0
                LOG(WARNING) << "malformed rowset meta value, key=" << hex(k);
1108
0
                return -1;
1109
0
            }
1110
1111
394
            ++rowsets_num;
1112
394
            collect_cb(rowset);
1113
1114
394
            if (!it->has_next()) {
1115
50
                begin = k;
1116
50
                begin.push_back('\x00'); // Update to next smallest key for iteration
1117
50
                break;
1118
50
            }
1119
394
        }
1120
50
    } while (it->more() && !stopped());
1121
1122
50
    LOG(INFO) << fmt::format(
1123
50
            "[delete bitmap checker] successfully collect rowsets for instance_id={}, "
1124
50
            "tablet_id={}, rowsets_num={}",
1125
50
            instance_id_, tablet_id, rowsets_num);
1126
50
    return 0;
1127
50
}
1128
1129
2
int InstanceChecker::do_delete_bitmap_inverted_check() {
1130
2
    LOG(INFO) << fmt::format(
1131
2
            "[delete bitmap checker] begin to do_delete_bitmap_inverted_check for instance_id={}",
1132
2
            instance_id_);
1133
1134
    // number of delete bitmap keys being scanned
1135
2
    int64_t total_delete_bitmap_keys {0};
1136
    // number of delete bitmaps which belongs to non mow tablet
1137
2
    int64_t abnormal_delete_bitmaps {0};
1138
    // number of delete bitmaps which doesn't have corresponding rowset in MS
1139
2
    int64_t leaked_delete_bitmaps {0};
1140
1141
2
    auto start_time = std::chrono::steady_clock::now();
1142
2
    DORIS_CLOUD_DEFER {
1143
2
        g_bvar_inverted_checker_leaked_delete_bitmaps.put(instance_id_, leaked_delete_bitmaps);
1144
2
        g_bvar_inverted_checker_abnormal_delete_bitmaps.put(instance_id_, abnormal_delete_bitmaps);
1145
2
        g_bvar_inverted_checker_delete_bitmaps_scanned.put(instance_id_, total_delete_bitmap_keys);
1146
1147
2
        auto cost = std::chrono::duration_cast<std::chrono::milliseconds>(
1148
2
                            std::chrono::steady_clock::now() - start_time)
1149
2
                            .count();
1150
2
        if (leaked_delete_bitmaps > 0 || abnormal_delete_bitmaps > 0) {
1151
1
            LOG(WARNING) << fmt::format(
1152
1
                    "[delete bitmap check fails] delete bitmap inverted check for instance_id={}, "
1153
1
                    "cost={} ms, total_delete_bitmap_keys={}, leaked_delete_bitmaps={}, "
1154
1
                    "abnormal_delete_bitmaps={}",
1155
1
                    instance_id_, cost, total_delete_bitmap_keys, leaked_delete_bitmaps,
1156
1
                    abnormal_delete_bitmaps);
1157
1
        } else {
1158
1
            LOG(INFO) << fmt::format(
1159
1
                    "[delete bitmap checker] delete bitmap inverted check for instance_id={}, "
1160
1
                    "passed. cost={} ms, total_delete_bitmap_keys={}",
1161
1
                    instance_id_, cost, total_delete_bitmap_keys);
1162
1
        }
1163
2
    };
1164
1165
2
    struct TabletsRowsetsCache {
1166
2
        int64_t tablet_id {-1};
1167
2
        bool enable_merge_on_write {false};
1168
2
        std::unordered_set<std::string> rowsets {};
1169
2
        std::unordered_set<std::string> pending_delete_bitmaps {};
1170
2
    } tablet_rowsets_cache {};
1171
1172
2
    std::unique_ptr<RangeGetIterator> it;
1173
2
    auto begin = meta_delete_bitmap_key({instance_id_, 0, "", 0, 0});
1174
2
    auto end =
1175
2
            meta_delete_bitmap_key({instance_id_, std::numeric_limits<int64_t>::max(), "", 0, 0});
1176
2
    do {
1177
2
        std::unique_ptr<Transaction> txn;
1178
2
        TxnErrorCode err = txn_kv_->create_txn(&txn);
1179
2
        if (err != TxnErrorCode::TXN_OK) {
1180
0
            LOG(WARNING) << "failed to create txn";
1181
0
            return -1;
1182
0
        }
1183
2
        err = txn->get(begin, end, &it);
1184
2
        if (err != TxnErrorCode::TXN_OK) {
1185
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
1186
0
            return -1;
1187
0
        }
1188
2
        if (!it->has_next()) {
1189
0
            break;
1190
0
        }
1191
502
        while (it->has_next() && !stopped()) {
1192
500
            auto [k, v] = it->next();
1193
500
            std::string_view k1 = k;
1194
500
            k1.remove_prefix(1);
1195
500
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1196
500
            decode_key(&k1, &out);
1197
            // 0x01 "meta" ${instance_id} "delete_bitmap" ${tablet_id} ${rowset_id} ${version} ${segment_id} -> roaringbitmap
1198
500
            auto tablet_id = std::get<int64_t>(std::get<0>(out[3]));
1199
500
            auto rowset_id = std::get<std::string>(std::get<0>(out[4]));
1200
500
            auto version = std::get<std::int64_t>(std::get<0>(out[5]));
1201
500
            auto segment_id = std::get<std::int64_t>(std::get<0>(out[6]));
1202
1203
500
            ++total_delete_bitmap_keys;
1204
1205
500
            if (!it->has_next()) {
1206
2
                begin = k;
1207
2
                begin.push_back('\x00'); // Update to next smallest key for iteration
1208
2
            }
1209
1210
500
            if (tablet_rowsets_cache.tablet_id == -1 ||
1211
500
                tablet_rowsets_cache.tablet_id != tablet_id) {
1212
30
                TabletMetaCloudPB tablet_meta;
1213
30
                int ret = get_tablet_meta(txn_kv_.get(), instance_id_, tablet_id, tablet_meta);
1214
30
                if (ret < 0) {
1215
0
                    LOG(WARNING) << fmt::format(
1216
0
                            "[delete bitmap checker] failed to get_tablet_meta in "
1217
0
                            "do_delete_bitmap_inverted_check(), instance_id={}, tablet_id={}",
1218
0
                            instance_id_, tablet_id);
1219
0
                    return ret;
1220
0
                }
1221
1222
30
                tablet_rowsets_cache.tablet_id = tablet_id;
1223
30
                tablet_rowsets_cache.enable_merge_on_write =
1224
30
                        tablet_meta.enable_unique_key_merge_on_write();
1225
30
                tablet_rowsets_cache.rowsets.clear();
1226
30
                tablet_rowsets_cache.pending_delete_bitmaps.clear();
1227
1228
30
                if (tablet_rowsets_cache.enable_merge_on_write) {
1229
                    // only collect rowsets for merge-on-write tablet
1230
20
                    auto collect_cb =
1231
199
                            [&tablet_rowsets_cache](const doris::RowsetMetaCloudPB& rowset) {
1232
199
                                tablet_rowsets_cache.rowsets.insert(rowset.rowset_id_v2());
1233
199
                            };
1234
20
                    ret = collect_tablet_rowsets(tablet_id, collect_cb);
1235
20
                    if (ret < 0) {
1236
0
                        return ret;
1237
0
                    }
1238
                    // get pending delete bitmaps
1239
20
                    ret = get_pending_delete_bitmap_keys(
1240
20
                            tablet_id, tablet_rowsets_cache.pending_delete_bitmaps);
1241
20
                    if (ret < 0) {
1242
0
                        return ret;
1243
0
                    }
1244
20
                }
1245
30
            }
1246
500
            DCHECK_EQ(tablet_id, tablet_rowsets_cache.tablet_id);
1247
1248
500
            if (!tablet_rowsets_cache.enable_merge_on_write) {
1249
                // clang-format off
1250
40
                TEST_SYNC_POINT_CALLBACK(
1251
40
                        "InstanceChecker::do_delete_bitmap_inverted_check.get_abnormal_delete_bitmap",
1252
40
                        &tablet_id, &rowset_id, &version, &segment_id);
1253
                // clang-format on
1254
40
                ++abnormal_delete_bitmaps;
1255
                // log an error and continue to check the next delete bitmap
1256
40
                LOG(WARNING) << fmt::format(
1257
40
                        "[delete bitmap check fails] find a delete bitmap belongs to tablet "
1258
40
                        "which is not a merge-on-write table! instance_id={}, tablet_id={}, "
1259
40
                        "version={}, segment_id={}",
1260
40
                        instance_id_, tablet_id, version, segment_id);
1261
40
                continue;
1262
40
            }
1263
1264
460
            if (!tablet_rowsets_cache.rowsets.contains(rowset_id) &&
1265
460
                !tablet_rowsets_cache.pending_delete_bitmaps.contains(std::string(k))) {
1266
170
                TEST_SYNC_POINT_CALLBACK(
1267
170
                        "InstanceChecker::do_delete_bitmap_inverted_check.get_leaked_delete_bitmap",
1268
170
                        &tablet_id, &rowset_id, &version, &segment_id);
1269
170
                ++leaked_delete_bitmaps;
1270
                // log an error and continue to check the next delete bitmap
1271
170
                LOG(WARNING) << fmt::format(
1272
170
                        "[delete bitmap check fails] can't find corresponding rowset for delete "
1273
170
                        "bitmap instance_id={}, tablet_id={}, rowset_id={}, version={}, "
1274
170
                        "segment_id={}",
1275
170
                        instance_id_, tablet_id, rowset_id, version, segment_id);
1276
170
            }
1277
460
        }
1278
2
    } while (it->more() && !stopped());
1279
1280
2
    return (leaked_delete_bitmaps > 0 || abnormal_delete_bitmaps > 0) ? 1 : 0;
1281
2
}
1282
1283
int InstanceChecker::get_pending_delete_bitmap_keys(
1284
50
        int64_t tablet_id, std::unordered_set<std::string>& pending_delete_bitmaps) {
1285
50
    std::unique_ptr<Transaction> txn;
1286
50
    TxnErrorCode err = txn_kv_->create_txn(&txn);
1287
50
    if (err != TxnErrorCode::TXN_OK) {
1288
0
        LOG(WARNING) << "failed to create txn";
1289
0
        return -1;
1290
0
    }
1291
50
    std::string pending_key = meta_pending_delete_bitmap_key({instance_id_, tablet_id});
1292
50
    std::string pending_val;
1293
50
    err = txn->get(pending_key, &pending_val);
1294
50
    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
1295
0
        LOG(WARNING) << "failed to get pending delete bitmap kv, err=" << err;
1296
0
        return -1;
1297
0
    }
1298
50
    if (err == TxnErrorCode::TXN_OK) {
1299
2
        PendingDeleteBitmapPB pending_info;
1300
2
        if (!pending_info.ParseFromString(pending_val)) [[unlikely]] {
1301
0
            LOG(WARNING) << "failed to parse PendingDeleteBitmapPB, tablet=" << tablet_id;
1302
0
            return -1;
1303
0
        }
1304
12
        for (auto& delete_bitmap_key : pending_info.delete_bitmap_keys()) {
1305
12
            pending_delete_bitmaps.emplace(std::string(delete_bitmap_key));
1306
12
        }
1307
2
    }
1308
50
    return 0;
1309
50
}
1310
1311
int InstanceChecker::check_inverted_index_file_storage_format_v1(
1312
        int64_t tablet_id, const std::string& file_path, const std::string& rowset_info,
1313
14
        RowsetIndexesFormatV1& rowset_index_cache_v1) {
1314
    // format v1: data/{tablet_id}/{rowset_id}_{seg_num}_{idx_id}{idx_suffix}.idx
1315
14
    std::string rowset_id;
1316
14
    int64_t segment_id;
1317
14
    std::string index_id_with_suffix_name;
1318
    // {rowset_id}_{seg_num}_{idx_id}{idx_suffix}.idx
1319
14
    std::vector<std::string> str;
1320
14
    butil::SplitString(rowset_info.substr(0, rowset_info.size() - 4), '_', &str);
1321
14
    if (str.size() < 3) {
1322
0
        LOG(WARNING) << "Split rowset info with '_' error, str size < 3, rowset_info = "
1323
0
                     << rowset_info;
1324
0
        return -1;
1325
0
    }
1326
14
    rowset_id = str[0];
1327
14
    segment_id = std::atoll(str[1].c_str());
1328
14
    index_id_with_suffix_name = str[2];
1329
1330
14
    if (rowset_index_cache_v1.rowset_id == rowset_id) {
1331
0
        if (rowset_index_cache_v1.segment_ids.contains(segment_id)) {
1332
0
            if (auto it = rowset_index_cache_v1.index_ids.find(index_id_with_suffix_name);
1333
0
                it == rowset_index_cache_v1.index_ids.end()) {
1334
                // clang-format off
1335
0
                LOG(WARNING) << fmt::format("index_id with suffix name not found, rowset_info = {}, obj_key = {}", rowset_info, file_path);
1336
                // clang-format on
1337
0
                return -1;
1338
0
            }
1339
0
        } else {
1340
            // clang-format off
1341
0
            LOG(WARNING) << fmt::format("segment id not found, rowset_info = {}, obj_key = {}", rowset_info, file_path);
1342
            // clang-format on
1343
0
            return -1;
1344
0
        }
1345
0
    }
1346
1347
14
    rowset_index_cache_v1.rowset_id = rowset_id;
1348
14
    rowset_index_cache_v1.segment_ids.clear();
1349
14
    rowset_index_cache_v1.index_ids.clear();
1350
1351
14
    std::unique_ptr<Transaction> txn;
1352
14
    TxnErrorCode err = txn_kv_->create_txn(&txn);
1353
14
    if (err != TxnErrorCode::TXN_OK) {
1354
0
        LOG(WARNING) << "failed to create txn";
1355
0
        return -1;
1356
0
    }
1357
14
    std::unique_ptr<RangeGetIterator> it;
1358
14
    auto begin = meta_rowset_key({instance_id_, tablet_id, 0});
1359
14
    auto end = meta_rowset_key({instance_id_, tablet_id, INT64_MAX});
1360
14
    do {
1361
14
        TxnErrorCode err = txn->get(begin, end, &it);
1362
14
        if (err != TxnErrorCode::TXN_OK) {
1363
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
1364
0
            return -1;
1365
0
        }
1366
14
        if (!it->has_next()) {
1367
8
            break;
1368
8
        }
1369
6
        while (it->has_next()) {
1370
            // recycle corresponding resources
1371
6
            auto [k, v] = it->next();
1372
6
            doris::RowsetMetaCloudPB rs_meta;
1373
6
            if (!rs_meta.ParseFromArray(v.data(), v.size())) {
1374
0
                LOG(WARNING) << "malformed rowset meta value, key=" << hex(k);
1375
0
                return -1;
1376
0
            }
1377
1378
6
            TabletIndexPB tablet_index;
1379
6
            if (get_tablet_idx(txn_kv_.get(), instance_id_, rs_meta.tablet_id(), tablet_index) ==
1380
6
                -1) {
1381
0
                LOG(WARNING) << "failedt to get tablet index, tablet_id= " << rs_meta.tablet_id();
1382
0
                return -1;
1383
0
            }
1384
1385
6
            auto tablet_schema_key = meta_schema_key(
1386
6
                    {instance_id_, tablet_index.index_id(), rs_meta.schema_version()});
1387
6
            ValueBuf tablet_schema_val;
1388
6
            err = cloud::blob_get(txn.get(), tablet_schema_key, &tablet_schema_val);
1389
1390
6
            if (err != TxnErrorCode::TXN_OK) {
1391
0
                LOG(WARNING) << "failed to get schema, err=" << err;
1392
0
                return -1;
1393
0
            }
1394
1395
6
            auto* schema = rs_meta.mutable_tablet_schema();
1396
6
            if (!parse_schema_value(tablet_schema_val, schema)) {
1397
0
                LOG(WARNING) << "malformed schema value, key=" << hex(tablet_schema_key);
1398
0
                return -1;
1399
0
            }
1400
1401
12
            for (size_t i = 0; i < rs_meta.num_segments(); i++) {
1402
6
                rowset_index_cache_v1.segment_ids.insert(i);
1403
6
            }
1404
1405
6
            for (const auto& i : rs_meta.tablet_schema().index()) {
1406
6
                if (i.has_index_type() && i.index_type() == IndexType::INVERTED) {
1407
6
                    LOG(INFO) << fmt::format(
1408
6
                            "record index info, index_id: {}, index_suffix_name: {}", i.index_id(),
1409
6
                            i.index_suffix_name());
1410
6
                    rowset_index_cache_v1.index_ids.insert(
1411
6
                            fmt::format("{}{}", i.index_id(), i.index_suffix_name()));
1412
6
                }
1413
6
            }
1414
1415
6
            if (!it->has_next()) {
1416
6
                begin = k;
1417
6
                begin.push_back('\x00'); // Update to next smallest key for iteration
1418
6
                break;
1419
6
            }
1420
6
        }
1421
6
    } while (it->more() && !stopped());
1422
1423
14
    if (!rowset_index_cache_v1.segment_ids.contains(segment_id)) {
1424
        // Garbage data leak
1425
        // clang-format off
1426
8
        LOG(WARNING) << "rowset_index_cache_v1.segment_ids don't contains segment_id, rowset should be recycled,"
1427
8
                     << " key = " << file_path
1428
8
                     << " segment_id = " << segment_id;
1429
        // clang-format on
1430
8
        return 1;
1431
8
    }
1432
1433
6
    if (!rowset_index_cache_v1.index_ids.contains(index_id_with_suffix_name)) {
1434
        // Garbage data leak
1435
        // clang-format off
1436
0
        LOG(WARNING) << "rowset_index_cache_v1.index_ids don't contains index_id_with_suffix_name,"
1437
0
                     << " rowset with inde meta should be recycled, key=" << file_path
1438
0
                     << " index_id_with_suffix_name=" << index_id_with_suffix_name;
1439
        // clang-format on
1440
0
        return 1;
1441
0
    }
1442
1443
6
    return 0;
1444
6
}
1445
1446
int InstanceChecker::check_inverted_index_file_storage_format_v2(
1447
        int64_t tablet_id, const std::string& file_path, const std::string& rowset_info,
1448
40
        RowsetIndexesFormatV2& rowset_index_cache_v2) {
1449
40
    std::string rowset_id;
1450
40
    int64_t segment_id;
1451
    // {rowset_id}_{seg_num}.idx
1452
40
    std::vector<std::string> str;
1453
40
    butil::SplitString(rowset_info.substr(0, rowset_info.size() - 4), '_', &str);
1454
40
    if (str.size() < 2) {
1455
        // clang-format off
1456
0
        LOG(WARNING) << "Split rowset info with '_' error, str size < 2, rowset_info = " << rowset_info;
1457
        // clang-format on
1458
0
        return -1;
1459
0
    }
1460
40
    rowset_id = str[0];
1461
40
    segment_id = std::atoll(str[1].c_str());
1462
1463
40
    if (rowset_index_cache_v2.rowset_id == rowset_id) {
1464
0
        if (!rowset_index_cache_v2.segment_ids.contains(segment_id)) {
1465
            // clang-format off
1466
0
            LOG(WARNING) << fmt::format("index file not found, rowset_info = {}, obj_key = {}", rowset_info, file_path);
1467
            // clang-format on
1468
0
            return -1;
1469
0
        }
1470
0
    }
1471
1472
40
    rowset_index_cache_v2.rowset_id = rowset_id;
1473
40
    rowset_index_cache_v2.segment_ids.clear();
1474
1475
40
    std::unique_ptr<Transaction> txn;
1476
40
    TxnErrorCode err = txn_kv_->create_txn(&txn);
1477
40
    if (err != TxnErrorCode::TXN_OK) {
1478
0
        LOG(WARNING) << "failed to create txn";
1479
0
        return -1;
1480
0
    }
1481
40
    std::unique_ptr<RangeGetIterator> it;
1482
40
    auto begin = meta_rowset_key({instance_id_, tablet_id, 0});
1483
40
    auto end = meta_rowset_key({instance_id_, tablet_id, INT64_MAX});
1484
40
    do {
1485
40
        TxnErrorCode err = txn->get(begin, end, &it);
1486
40
        if (err != TxnErrorCode::TXN_OK) {
1487
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
1488
0
            return -1;
1489
0
        }
1490
40
        if (!it->has_next()) {
1491
5
            break;
1492
5
        }
1493
35
        while (it->has_next()) {
1494
            // recycle corresponding resources
1495
35
            auto [k, v] = it->next();
1496
35
            doris::RowsetMetaCloudPB rs_meta;
1497
35
            if (!rs_meta.ParseFromArray(v.data(), v.size())) {
1498
0
                LOG(WARNING) << "malformed rowset meta value, key=" << hex(k);
1499
0
                return -1;
1500
0
            }
1501
1502
70
            for (size_t i = 0; i < rs_meta.num_segments(); i++) {
1503
35
                rowset_index_cache_v2.segment_ids.insert(i);
1504
35
            }
1505
1506
35
            if (!it->has_next()) {
1507
35
                begin = k;
1508
35
                begin.push_back('\x00'); // Update to next smallest key for iteration
1509
35
                break;
1510
35
            }
1511
35
        }
1512
35
    } while (it->more() && !stopped());
1513
1514
40
    if (!rowset_index_cache_v2.segment_ids.contains(segment_id)) {
1515
        // Garbage data leak
1516
5
        LOG(WARNING) << "rowset with index meta should be recycled, key=" << file_path;
1517
5
        return 1;
1518
5
    }
1519
1520
35
    return 0;
1521
40
}
1522
1523
int InstanceChecker::check_delete_bitmap_storage_optimize_v2(
1524
        int64_t tablet_id, bool has_sequence_col,
1525
30
        int64_t& rowsets_with_useless_delete_bitmap_version) {
1526
    // end_version: create_time
1527
30
    std::map<int64_t, int64_t> tablet_rowsets_map {};
1528
    // rowset_id: {start_version, end_version}
1529
30
    std::map<std::string, std::pair<int64_t, int64_t>> rowset_version_map;
1530
    // Get all visible rowsets of this tablet
1531
195
    auto collect_cb = [&](const doris::RowsetMetaCloudPB& rowset) {
1532
195
        if (rowset.start_version() == 0 && rowset.end_version() == 1) {
1533
            // ignore dummy rowset [0-1]
1534
0
            return;
1535
0
        }
1536
195
        tablet_rowsets_map[rowset.end_version()] = rowset.creation_time();
1537
195
        rowset_version_map[rowset.rowset_id_v2()] =
1538
195
                std::make_pair(rowset.start_version(), rowset.end_version());
1539
195
    };
1540
30
    if (int ret = collect_tablet_rowsets(tablet_id, collect_cb); ret != 0) {
1541
0
        return ret;
1542
0
    }
1543
1544
30
    std::unordered_set<std::string> pending_delete_bitmaps;
1545
30
    if (auto ret = get_pending_delete_bitmap_keys(tablet_id, pending_delete_bitmaps); ret < 0) {
1546
0
        return ret;
1547
0
    }
1548
1549
30
    std::unique_ptr<RangeGetIterator> it;
1550
30
    auto begin = meta_delete_bitmap_key({instance_id_, tablet_id, "", 0, 0});
1551
30
    auto end = meta_delete_bitmap_key({instance_id_, tablet_id + 1, "", 0, 0});
1552
30
    std::string last_rowset_id = "";
1553
30
    int64_t last_version = 0;
1554
30
    int64_t last_failed_version = 0;
1555
30
    std::vector<int64_t> failed_versions;
1556
30
    auto print_failed_versions = [&]() {
1557
4
        TEST_SYNC_POINT_CALLBACK(
1558
4
                "InstanceChecker::check_delete_bitmap_storage_optimize_v2.get_abnormal_"
1559
4
                "rowset",
1560
4
                &tablet_id, &last_rowset_id);
1561
4
        rowsets_with_useless_delete_bitmap_version++;
1562
        // some versions are continuous, such as [8, 9, 10, 11, 13, 17, 18]
1563
        // print as [8-11, 13, 17-18]
1564
4
        int64_t last_start_version = -1;
1565
4
        int64_t last_end_version = -1;
1566
4
        std::stringstream ss;
1567
4
        ss << "[";
1568
9
        for (int64_t version : failed_versions) {
1569
9
            if (last_start_version == -1) {
1570
4
                last_start_version = version;
1571
4
                last_end_version = version;
1572
4
                continue;
1573
4
            }
1574
5
            if (last_end_version + 1 == version) {
1575
2
                last_end_version = version;
1576
3
            } else {
1577
3
                if (last_start_version == last_end_version) {
1578
3
                    ss << last_start_version << ", ";
1579
3
                } else {
1580
0
                    ss << last_start_version << "-" << last_end_version << ", ";
1581
0
                }
1582
3
                last_start_version = version;
1583
3
                last_end_version = version;
1584
3
            }
1585
5
        }
1586
4
        if (last_start_version == last_end_version) {
1587
3
            ss << last_start_version;
1588
3
        } else {
1589
1
            ss << last_start_version << "-" << last_end_version;
1590
1
        }
1591
4
        ss << "]";
1592
4
        std::stringstream version_str;
1593
4
        auto it = rowset_version_map.find(last_rowset_id);
1594
4
        if (it != rowset_version_map.end()) {
1595
4
            version_str << "[" << it->second.first << "-" << it->second.second << "]";
1596
4
        }
1597
4
        LOG(WARNING) << fmt::format(
1598
4
                "[delete bitmap check fails] delete bitmap storage optimize v2 check fail "
1599
4
                "for instance_id={}, tablet_id={}, rowset_id={}, version={} found delete "
1600
4
                "bitmap with versions={}, size={}",
1601
4
                instance_id_, tablet_id, last_rowset_id, version_str.str(), ss.str(),
1602
4
                failed_versions.size());
1603
4
    };
1604
30
    using namespace std::chrono;
1605
30
    int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
1606
30
    do {
1607
30
        std::unique_ptr<Transaction> txn;
1608
30
        TxnErrorCode err = txn_kv_->create_txn(&txn);
1609
30
        if (err != TxnErrorCode::TXN_OK) {
1610
0
            LOG(WARNING) << "failed to create txn";
1611
0
            return -1;
1612
0
        }
1613
30
        err = txn->get(begin, end, &it);
1614
30
        if (err != TxnErrorCode::TXN_OK) {
1615
0
            LOG(WARNING) << "failed to get delete bitmap kv, err=" << err;
1616
0
            return -1;
1617
0
        }
1618
30
        if (!it->has_next()) {
1619
0
            break;
1620
0
        }
1621
771
        while (it->has_next() && !stopped()) {
1622
741
            auto [k, v] = it->next();
1623
741
            std::string_view k1 = k;
1624
741
            k1.remove_prefix(1);
1625
741
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1626
741
            decode_key(&k1, &out);
1627
            // 0x01 "meta" ${instance_id} "delete_bitmap" ${tablet_id} ${rowset_id} ${version} ${segment_id} -> roaringbitmap
1628
741
            auto rowset_id = std::get<std::string>(std::get<0>(out[4]));
1629
741
            auto version = std::get<std::int64_t>(std::get<0>(out[5]));
1630
741
            if (!it->has_next()) {
1631
30
                begin = k;
1632
30
                begin.push_back('\x00'); // Update to next smallest key for iteration
1633
30
            }
1634
741
            if (rowset_id == last_rowset_id && version == last_version) {
1635
                // skip the same rowset and version
1636
167
                continue;
1637
167
            }
1638
574
            if (rowset_id != last_rowset_id && !failed_versions.empty()) {
1639
3
                print_failed_versions();
1640
3
                last_failed_version = 0;
1641
3
                failed_versions.clear();
1642
3
            }
1643
574
            last_rowset_id = rowset_id;
1644
574
            last_version = version;
1645
574
            if (tablet_rowsets_map.find(version) != tablet_rowsets_map.end()) {
1646
548
                continue;
1647
548
            }
1648
26
            auto version_it = rowset_version_map.find(rowset_id);
1649
26
            if (version_it == rowset_version_map.end()) {
1650
                // checked in do_delete_bitmap_inverted_check
1651
1
                continue;
1652
1
            }
1653
25
            if (pending_delete_bitmaps.contains(std::string(k))) {
1654
3
                continue;
1655
3
            }
1656
22
            if (has_sequence_col && version >= version_it->second.first &&
1657
22
                version <= version_it->second.second) {
1658
5
                continue;
1659
5
            }
1660
            // there may be an interval in this situation:
1661
            // 1. finish compaction job; 2. checker; 3. finish agg and remove delete bitmap to ms
1662
17
            auto rowset_it = tablet_rowsets_map.upper_bound(version);
1663
17
            if (rowset_it == tablet_rowsets_map.end()) {
1664
1
                if (version != last_failed_version) {
1665
1
                    failed_versions.push_back(version);
1666
1
                }
1667
1
                last_failed_version = version;
1668
1
                continue;
1669
1
            }
1670
16
            if (rowset_it->second + config::delete_bitmap_storage_optimize_v2_check_skip_seconds >=
1671
16
                now) {
1672
8
                continue;
1673
8
            }
1674
8
            if (version != last_failed_version) {
1675
8
                failed_versions.push_back(version);
1676
8
            }
1677
8
            last_failed_version = version;
1678
8
        }
1679
30
    } while (it->more() && !stopped());
1680
30
    if (!failed_versions.empty()) {
1681
1
        print_failed_versions();
1682
1
    }
1683
30
    LOG(INFO) << fmt::format(
1684
30
            "[delete bitmap checker] finish check delete bitmap storage optimize v2 for "
1685
30
            "instance_id={}, tablet_id={}, rowsets_num={}, "
1686
30
            "rowsets_with_useless_delete_bitmap_version={}",
1687
30
            instance_id_, tablet_id, tablet_rowsets_map.size(),
1688
30
            rowsets_with_useless_delete_bitmap_version);
1689
30
    return (rowsets_with_useless_delete_bitmap_version > 1 ? 1 : 0);
1690
30
}
1691
1692
3
int InstanceChecker::do_delete_bitmap_storage_optimize_check(int version) {
1693
3
    if (version != 2) {
1694
0
        return -1;
1695
0
    }
1696
3
    int64_t total_tablets_num {0};
1697
3
    int64_t failed_tablets_num {0};
1698
1699
    // for v2 check
1700
3
    int64_t max_rowsets_with_useless_delete_bitmap_version = 0;
1701
3
    int64_t tablet_id_with_max_rowsets_with_useless_delete_bitmap_version = 0;
1702
1703
    // check that for every visible rowset, there exists at least delete one bitmap in MS
1704
30
    int ret = traverse_mow_tablet([&](int64_t tablet_id, bool has_sequence_col) {
1705
30
        ++total_tablets_num;
1706
30
        int64_t rowsets_with_useless_delete_bitmap_version = 0;
1707
30
        int res = check_delete_bitmap_storage_optimize_v2(
1708
30
                tablet_id, has_sequence_col, rowsets_with_useless_delete_bitmap_version);
1709
30
        if (rowsets_with_useless_delete_bitmap_version >
1710
30
            max_rowsets_with_useless_delete_bitmap_version) {
1711
1
            max_rowsets_with_useless_delete_bitmap_version =
1712
1
                    rowsets_with_useless_delete_bitmap_version;
1713
1
            tablet_id_with_max_rowsets_with_useless_delete_bitmap_version = tablet_id;
1714
1
        }
1715
30
        failed_tablets_num += (res != 0);
1716
30
        return res;
1717
30
    });
1718
1719
3
    if (ret < 0) {
1720
0
        return ret;
1721
0
    }
1722
1723
3
    g_bvar_max_rowsets_with_useless_delete_bitmap_version.put(
1724
3
            instance_id_, max_rowsets_with_useless_delete_bitmap_version);
1725
1726
3
    std::stringstream ss;
1727
3
    ss << "[delete bitmap checker] check delete bitmap storage optimize v" << version
1728
3
       << " for instance_id=" << instance_id_ << ", total_tablets_num=" << total_tablets_num
1729
3
       << ", failed_tablets_num=" << failed_tablets_num
1730
3
       << ". max_rowsets_with_useless_delete_bitmap_version="
1731
3
       << max_rowsets_with_useless_delete_bitmap_version
1732
3
       << ", tablet_id=" << tablet_id_with_max_rowsets_with_useless_delete_bitmap_version;
1733
3
    LOG(INFO) << ss.str();
1734
1735
3
    return (failed_tablets_num > 0) ? 1 : 0;
1736
3
}
1737
1738
3
int InstanceChecker::do_mow_job_key_check() {
1739
3
    std::unique_ptr<RangeGetIterator> it;
1740
3
    std::string begin = mow_tablet_job_key({instance_id_, 0, 0});
1741
3
    std::string end = mow_tablet_job_key({instance_id_, INT64_MAX, 0});
1742
3
    MowTabletJobPB mow_tablet_job;
1743
3
    do {
1744
3
        std::unique_ptr<Transaction> txn;
1745
3
        TxnErrorCode err = txn_kv_->create_txn(&txn);
1746
3
        if (err != TxnErrorCode::TXN_OK) {
1747
0
            LOG(WARNING) << "failed to create txn";
1748
0
            return -1;
1749
0
        }
1750
3
        err = txn->get(begin, end, &it);
1751
3
        if (err != TxnErrorCode::TXN_OK) {
1752
0
            LOG(WARNING) << "failed to get mow tablet job key, err=" << err;
1753
0
            return -1;
1754
0
        }
1755
3
        int64_t now = duration_cast<std::chrono::seconds>(
1756
3
                              std::chrono::system_clock::now().time_since_epoch())
1757
3
                              .count();
1758
3
        while (it->has_next() && !stopped()) {
1759
2
            auto [k, v] = it->next();
1760
2
            std::string_view k1 = k;
1761
2
            k1.remove_prefix(1);
1762
2
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1763
2
            decode_key(&k1, &out);
1764
            // 0x01 "meta" ${instance_id} "mow_tablet_job" ${table_id} ${initiator}
1765
2
            auto table_id = std::get<int64_t>(std::get<0>(out[3]));
1766
2
            auto initiator = std::get<int64_t>(std::get<0>(out[4]));
1767
2
            if (!mow_tablet_job.ParseFromArray(v.data(), v.size())) [[unlikely]] {
1768
0
                LOG(WARNING) << "failed to parse MowTabletJobPB";
1769
0
                return -1;
1770
0
            }
1771
2
            int64_t expiration = mow_tablet_job.expiration();
1772
            // check job key failed should meet both following two condition:
1773
            // 1. job key is expired
1774
            // 2. table lock key is not found or key is not expired
1775
2
            if (expiration < now - config::mow_job_key_check_expiration_diff_seconds) {
1776
2
                std::string lock_key =
1777
2
                        meta_delete_bitmap_update_lock_key({instance_id_, table_id, -1});
1778
2
                std::string lock_val;
1779
2
                err = txn->get(lock_key, &lock_val);
1780
2
                std::string reason = "";
1781
2
                if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
1782
0
                    reason = "table lock key not found";
1783
1784
2
                } else {
1785
2
                    DeleteBitmapUpdateLockPB lock_info;
1786
2
                    if (!lock_info.ParseFromString(lock_val)) [[unlikely]] {
1787
0
                        LOG(WARNING) << "failed to parse DeleteBitmapUpdateLockPB";
1788
0
                        return -1;
1789
0
                    }
1790
2
                    if (lock_info.expiration() > now || lock_info.lock_id() != -1) {
1791
2
                        reason = "table lock is not expired,lock_id=" +
1792
2
                                 std::to_string(lock_info.lock_id());
1793
2
                    }
1794
2
                }
1795
2
                if (reason != "") {
1796
2
                    LOG(WARNING) << fmt::format(
1797
2
                            "[compaction key check fails] mow job key check fail for "
1798
2
                            "instance_id={}, table_id={}, initiator={}, expiration={}, now={}, "
1799
2
                            "reason={}",
1800
2
                            instance_id_, table_id, initiator, expiration, now, reason);
1801
2
                    return -1;
1802
2
                }
1803
2
            }
1804
2
        }
1805
1
        begin = it->next_begin_key(); // Update to next smallest key for iteration
1806
1
    } while (it->more() && !stopped());
1807
1
    return 0;
1808
3
}
1809
4
int InstanceChecker::do_tablet_stats_key_check() {
1810
4
    int ret = 0;
1811
1812
4
    int64_t nums_leak = 0;
1813
4
    int64_t nums_loss = 0;
1814
4
    int64_t nums_scanned = 0;
1815
4
    int64_t nums_abnormal = 0;
1816
1817
4
    std::string begin = meta_tablet_key({instance_id_, 0, 0, 0, 0});
1818
4
    std::string end = meta_tablet_key({instance_id_, INT64_MAX, 0, 0, 0});
1819
    // inverted check tablet exists
1820
4
    LOG(INFO) << "begin inverted check stats_tablet_key";
1821
4
    ret = scan_and_handle_kv(begin, end, [&](std::string_view key, std::string_view value) {
1822
4
        int ret = check_stats_tablet_key_exists(key, value);
1823
4
        nums_scanned++;
1824
4
        if (ret == 1) {
1825
1
            nums_loss++;
1826
1
        }
1827
4
        return ret;
1828
4
    });
1829
4
    if (ret == -1) {
1830
0
        LOG(WARNING) << "failed to inverted check if stats tablet key exists";
1831
0
        return -1;
1832
4
    } else if (ret == 1) {
1833
1
        LOG(WARNING) << "stats_tablet_key loss, nums_scanned=" << nums_scanned
1834
1
                     << ", nums_loss=" << nums_loss;
1835
1
        return 1;
1836
1
    }
1837
3
    LOG(INFO) << "finish inverted check stats_tablet_key, nums_scanned=" << nums_scanned
1838
3
              << ", nums_loss=" << nums_loss;
1839
1840
3
    begin = stats_tablet_key({instance_id_, 0, 0, 0, 0});
1841
3
    end = stats_tablet_key({instance_id_, INT64_MAX, 0, 0, 0});
1842
3
    nums_scanned = 0;
1843
    // check tablet exists
1844
3
    LOG(INFO) << "begin check stats_tablet_key leaked";
1845
4
    ret = scan_and_handle_kv(begin, end, [&](std::string_view key, std::string_view value) {
1846
4
        int ret = check_stats_tablet_key_leaked(key, value);
1847
4
        nums_scanned++;
1848
4
        if (ret == 1) {
1849
1
            nums_leak++;
1850
1
        }
1851
4
        return ret;
1852
4
    });
1853
3
    if (ret == -1) {
1854
0
        LOG(WARNING) << "failed to check if stats tablet key exists";
1855
0
        return -1;
1856
3
    } else if (ret == 1) {
1857
1
        LOG(WARNING) << "stats_tablet_key leaked, nums_scanned=" << nums_scanned
1858
1
                     << ", nums_leak=" << nums_leak;
1859
1
        return 1;
1860
1
    }
1861
2
    LOG(INFO) << "finish check stats_tablet_key leaked, nums_scanned=" << nums_scanned
1862
2
              << ", nums_leak=" << nums_leak;
1863
1864
2
    begin = stats_tablet_key({instance_id_, 0, 0, 0, 0});
1865
2
    end = stats_tablet_key({instance_id_, INT64_MAX, 0, 0, 0});
1866
2
    nums_scanned = 0;
1867
    // check if key is normal
1868
2
    LOG(INFO) << "begin check stats_tablet_key abnormal";
1869
2
    ret = scan_and_handle_kv(begin, end, [&](std::string_view key, std::string_view value) {
1870
2
        int ret = check_stats_tablet_key(key, value);
1871
2
        nums_scanned++;
1872
2
        if (ret == 1) {
1873
1
            nums_abnormal++;
1874
1
        }
1875
2
        return ret;
1876
2
    });
1877
2
    if (ret == -1) {
1878
0
        LOG(WARNING) << "failed to check if stats tablet key exists";
1879
0
        return -1;
1880
2
    } else if (ret == 1) {
1881
1
        LOG(WARNING) << "stats_tablet_key abnormal, nums_scanned=" << nums_scanned
1882
1
                     << ", nums_abnormal=" << nums_abnormal;
1883
1
        return 1;
1884
1
    }
1885
1
    LOG(INFO) << "finish check stats_tablet_key, nums_scanned=" << nums_scanned
1886
1
              << ", nums_abnormal=" << nums_abnormal;
1887
1
    return 0;
1888
2
}
1889
1890
4
int InstanceChecker::check_stats_tablet_key_exists(std::string_view key, std::string_view value) {
1891
4
    std::string_view k1 = key;
1892
4
    k1.remove_prefix(1);
1893
4
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1894
4
    decode_key(&k1, &out);
1895
    // 0x01 "meta" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id}
1896
4
    auto table_id = std::get<int64_t>(std::get<0>(out[3]));
1897
4
    auto index_id = std::get<int64_t>(std::get<0>(out[4]));
1898
4
    auto partition_id = std::get<int64_t>(std::get<0>(out[5]));
1899
4
    auto tablet_id = std::get<int64_t>(std::get<0>(out[6]));
1900
4
    std::string tablet_stats_key =
1901
4
            stats_tablet_key({instance_id_, table_id, index_id, partition_id, tablet_id});
1902
4
    int ret = key_exist(txn_kv_.get(), tablet_stats_key);
1903
4
    if (ret == 1) {
1904
        // clang-format off
1905
1
        LOG(WARNING) << "stats tablet key's tablet key loss,"
1906
1
                    << " stats tablet key=" << hex(tablet_stats_key)
1907
1
                    << " meta tablet key=" << hex(key);
1908
        // clang-format on
1909
1
        return 1;
1910
3
    } else if (ret == -1) {
1911
0
        LOG(WARNING) << "failed to check key exists, key=" << hex(tablet_stats_key);
1912
0
        return -1;
1913
0
    }
1914
3
    LOG(INFO) << "check stats_tablet_key_exists ok, key=" << hex(key);
1915
3
    return 0;
1916
4
}
1917
1918
4
int InstanceChecker::check_stats_tablet_key_leaked(std::string_view key, std::string_view value) {
1919
4
    std::string_view k1 = key;
1920
4
    k1.remove_prefix(1);
1921
4
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1922
4
    decode_key(&k1, &out);
1923
    // 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id}
1924
4
    auto table_id = std::get<int64_t>(std::get<0>(out[3]));
1925
4
    auto index_id = std::get<int64_t>(std::get<0>(out[4]));
1926
4
    auto partition_id = std::get<int64_t>(std::get<0>(out[5]));
1927
4
    auto tablet_id = std::get<int64_t>(std::get<0>(out[6]));
1928
4
    std::string tablet_key =
1929
4
            meta_tablet_key({instance_id_, table_id, index_id, partition_id, tablet_id});
1930
4
    int ret = key_exist(txn_kv_.get(), tablet_key);
1931
4
    if (ret == 1) {
1932
        // clang-format off
1933
1
        LOG(WARNING) << "stats tablet key's tablet key leak,"
1934
1
                    << " stats tablet key=" << hex(key)
1935
1
                    << " meta tablet key=" << hex(tablet_key);
1936
        // clang-format on
1937
1
        return 1;
1938
3
    } else if (ret == -1) {
1939
0
        LOG(WARNING) << "failed to check key exists, key=" << hex(tablet_key);
1940
0
        return -1;
1941
0
    }
1942
3
    LOG(INFO) << "check stats_tablet_key_leaked ok, key=" << hex(key);
1943
3
    return 0;
1944
4
}
1945
1946
2
int InstanceChecker::check_stats_tablet_key(std::string_view key, std::string_view value) {
1947
2
    TabletStatsPB tablet_stats_pb;
1948
2
    std::string_view k1 = key;
1949
2
    k1.remove_prefix(1);
1950
2
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1951
2
    decode_key(&k1, &out);
1952
    // 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id}
1953
2
    auto tablet_id = std::get<int64_t>(std::get<0>(out[6]));
1954
2
    std::unique_ptr<Transaction> txn;
1955
2
    TxnErrorCode err = txn_kv_->create_txn(&txn);
1956
2
    if (err != TxnErrorCode::TXN_OK) {
1957
0
        LOG_WARNING("failed to recycle tablet ")
1958
0
                .tag("tablet id", tablet_id)
1959
0
                .tag("instance_id", instance_id_)
1960
0
                .tag("reason", "failed to create txn");
1961
0
        return -1;
1962
0
    }
1963
2
    std::string tablet_idx_key = meta_tablet_idx_key({instance_id_, tablet_id});
1964
2
    std::string tablet_idx_val;
1965
2
    TabletIndexPB tablet_idx;
1966
2
    err = txn->get(tablet_idx_key, &tablet_idx_val);
1967
2
    if (err != TxnErrorCode::TXN_OK) {
1968
        // clang-format off
1969
0
        LOG(WARNING) << "failed to get tablet index key,"
1970
0
                        << " key=" << hex(tablet_idx_key)
1971
0
                        << " code=" << err;
1972
        // clang-format on
1973
0
        return -1;
1974
0
    }
1975
2
    tablet_idx.ParseFromString(tablet_idx_val);
1976
2
    MetaServiceCode code = MetaServiceCode::OK;
1977
2
    std::string msg;
1978
2
    internal_get_tablet_stats(code, msg, txn.get(), instance_id_, tablet_idx, tablet_stats_pb);
1979
2
    if (code != MetaServiceCode::OK) {
1980
        // clang-format off
1981
0
        LOG(WARNING) << "failed to get tablet stats,"
1982
0
                        << " code=" << code 
1983
0
                        << " msg=" << msg;
1984
        // clang-format on
1985
0
        return -1;
1986
0
    }
1987
1988
2
    GetRowsetResponse resp;
1989
    // get rowsets in tablet
1990
2
    internal_get_rowset(txn.get(), 0, std::numeric_limits<int64_t>::max() - 1, instance_id_,
1991
2
                        tablet_id, code, msg, &resp);
1992
2
    if (code != MetaServiceCode::OK) {
1993
0
        LOG_WARNING("failed to get rowsets of tablet when check stats tablet key")
1994
0
                .tag("tablet id", tablet_id)
1995
0
                .tag("msg", msg)
1996
0
                .tag("code", code)
1997
0
                .tag("instance id", instance_id_);
1998
0
        return -1;
1999
0
    }
2000
2
    int64_t num_rows = 0;
2001
2
    int64_t num_rowsets = 0;
2002
2
    int64_t num_segments = 0;
2003
2
    int64_t total_data_size = 0;
2004
2
    for (const auto& rs_meta : resp.rowset_meta()) {
2005
2
        num_rows += rs_meta.num_rows();
2006
2
        num_rowsets++;
2007
2
        num_segments += rs_meta.num_segments();
2008
2
        total_data_size += rs_meta.total_disk_size();
2009
2
    }
2010
2
    int ret = 0;
2011
2
    if (tablet_stats_pb.data_size() != total_data_size) {
2012
1
        ret = 1;
2013
        // clang-format off
2014
1
        LOG(WARNING) << " tablet_stats_pb's data size is not same with all rowset total data size,"
2015
1
                        << " tablet_stats_pb's data size=" << tablet_stats_pb.data_size()
2016
1
                        << " all rowset total data size=" << total_data_size
2017
1
                        << " stats tablet meta=" << tablet_stats_pb.ShortDebugString();
2018
        // clang-format on
2019
1
    } else if (tablet_stats_pb.num_rows() != num_rows) {
2020
0
        ret = 1;
2021
        // clang-format off
2022
0
        LOG(WARNING) << " tablet_stats_pb's num_rows is not same with all rowset total num_rows,"
2023
0
                        << " tablet_stats_pb's num_rows=" << tablet_stats_pb.num_rows()
2024
0
                        << " all rowset total num_rows=" << num_rows
2025
0
                        << " stats tablet meta=" << tablet_stats_pb.ShortDebugString();
2026
        // clang-format on
2027
1
    } else if (tablet_stats_pb.num_rowsets() != num_rowsets) {
2028
0
        ret = 1;
2029
        // clang-format off
2030
0
        LOG(WARNING) << " tablet_stats_pb's num_rowsets is not same with all rowset nums,"
2031
0
                        << " tablet_stats_pb's num_rowsets=" << tablet_stats_pb.num_rowsets()
2032
0
                        << " all rowset nums=" << num_rowsets
2033
0
                        << " stats tablet meta=" << tablet_stats_pb.ShortDebugString();
2034
        // clang-format on
2035
1
    } else if (tablet_stats_pb.num_segments() != num_segments) {
2036
0
        ret = 1;
2037
        // clang-format off
2038
0
        LOG(WARNING) << " tablet_stats_pb's num_segments is not same with all rowset total num_segments,"
2039
0
                        << " tablet_stats_pb's num_segments=" << tablet_stats_pb.num_segments()
2040
0
                        << " all rowset total num_segments=" << num_segments
2041
0
                        << " stats tablet meta=" << tablet_stats_pb.ShortDebugString();
2042
        // clang-format on
2043
0
    }
2044
2045
2
    return ret;
2046
2
}
2047
2048
int InstanceChecker::scan_and_handle_kv(
2049
        std::string& start_key, const std::string& end_key,
2050
12
        std::function<int(std::string_view, std::string_view)> handle_kv) {
2051
12
    std::unique_ptr<Transaction> txn;
2052
12
    int ret = 0;
2053
12
    TxnErrorCode err = txn_kv_->create_txn(&txn);
2054
12
    if (err != TxnErrorCode::TXN_OK) {
2055
0
        LOG(WARNING) << "failed to init txn";
2056
0
        return -1;
2057
0
    }
2058
12
    std::unique_ptr<RangeGetIterator> it;
2059
12
    int limit = 10000;
2060
12
    TEST_SYNC_POINT_CALLBACK("InstanceChecker:scan_and_handle_kv:limit", &limit);
2061
13
    do {
2062
13
        err = txn->get(start_key, end_key, &it, false, limit);
2063
13
        TEST_SYNC_POINT_CALLBACK("InstanceChecker:scan_and_handle_kv:get_err", &err);
2064
13
        if (err == TxnErrorCode::TXN_TOO_OLD) {
2065
1
            LOG(WARNING) << "failed to get range kv, err=txn too old, "
2066
1
                         << " now fallback to non snapshot scan";
2067
1
            err = txn_kv_->create_txn(&txn);
2068
1
            if (err == TxnErrorCode::TXN_OK) {
2069
1
                err = txn->get(start_key, end_key, &it);
2070
1
            }
2071
1
        }
2072
13
        if (err != TxnErrorCode::TXN_OK) {
2073
0
            LOG(WARNING) << "internal error, failed to get range kv, err=" << err;
2074
0
            return -1;
2075
0
        }
2076
2077
229
        while (it->has_next() && !stopped()) {
2078
216
            auto [k, v] = it->next();
2079
2080
216
            int handle_ret = handle_kv(k, v);
2081
216
            if (handle_ret == -1) {
2082
0
                return -1;
2083
216
            } else {
2084
216
                ret = std::max(ret, handle_ret);
2085
216
            }
2086
216
            if (!it->has_next()) {
2087
13
                start_key = k;
2088
13
            }
2089
216
        }
2090
13
        start_key = it->next_begin_key();
2091
13
    } while (it->more() && !stopped());
2092
12
    return ret;
2093
12
}
2094
2095
2
int InstanceChecker::do_version_key_check() {
2096
2
    std::unique_ptr<RangeGetIterator> it;
2097
2
    std::string begin = table_version_key({instance_id_, 0, 0});
2098
2
    std::string end = table_version_key({instance_id_, INT64_MAX, 0});
2099
2
    bool check_res = true;
2100
2
    do {
2101
2
        std::unique_ptr<Transaction> txn;
2102
2
        TxnErrorCode err = txn_kv_->create_txn(&txn);
2103
2
        if (err != TxnErrorCode::TXN_OK) {
2104
0
            LOG(WARNING) << "failed to create txn";
2105
0
            return -1;
2106
0
        }
2107
2
        err = txn->get(begin, end, &it);
2108
2
        if (err != TxnErrorCode::TXN_OK) {
2109
0
            LOG(WARNING) << "failed to get mow tablet job key, err=" << err;
2110
0
            return -1;
2111
0
        }
2112
4
        while (it->has_next() && !stopped()) {
2113
2
            auto [k, v] = it->next();
2114
2
            std::string_view k1 = k;
2115
2
            k1.remove_prefix(1);
2116
2
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
2117
2
            decode_key(&k1, &out);
2118
2
            int64_t table_version = -1;
2119
            // 0x01 "version" ${instance_id} "table" ${db_id} ${tbl_id}
2120
2
            if (!txn->decode_atomic_int(v, &table_version)) {
2121
0
                LOG(WARNING) << "malformed table version value";
2122
0
                return -1;
2123
0
            }
2124
2
            auto table_id = std::get<int64_t>(std::get<0>(out[4]));
2125
2
            auto db_id = std::get<int64_t>(std::get<0>(out[3]));
2126
2
            std::string partition_version_key_begin =
2127
2
                    partition_version_key({instance_id_, db_id, table_id, 0});
2128
2
            std::string partition_version_key_end =
2129
2
                    partition_version_key({instance_id_, db_id, table_id, INT64_MAX});
2130
2
            VersionPB partition_version_pb;
2131
2132
2
            do {
2133
2
                std::unique_ptr<Transaction> txn;
2134
2
                TxnErrorCode err = txn_kv_->create_txn(&txn);
2135
2
                if (err != TxnErrorCode::TXN_OK) {
2136
0
                    LOG(WARNING) << "failed to create txn";
2137
0
                    return -1;
2138
0
                }
2139
2
                err = txn->get(partition_version_key_begin, partition_version_key_end, &it);
2140
2
                if (err != TxnErrorCode::TXN_OK) {
2141
0
                    LOG(WARNING) << "failed to get mow tablet job key, err=" << err;
2142
0
                    return -1;
2143
0
                }
2144
13
                while (it->has_next() && !stopped()) {
2145
11
                    auto [k, v] = it->next();
2146
                    // 0x01 "version" ${instance_id} "partition" ${db_id} ${tbl_id} ${partition_id}
2147
11
                    std::string_view k1 = k;
2148
11
                    k1.remove_prefix(1);
2149
11
                    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
2150
11
                    decode_key(&k1, &out);
2151
11
                    if (!partition_version_pb.ParseFromArray(v.data(), v.size())) [[unlikely]] {
2152
0
                        LOG(WARNING) << "failed to parse partition VersionPB";
2153
0
                        return -1;
2154
0
                    }
2155
11
                    auto partition_id = std::get<int64_t>(std::get<0>(out[5]));
2156
11
                    int64_t partition_version = partition_version_pb.version();
2157
11
                    if (table_version < partition_version) {
2158
3
                        check_res = false;
2159
3
                        LOG(WARNING)
2160
3
                                << "table version is less than partition version,"
2161
3
                                << " table_id: " << table_id << "tablet_version: " << table_version
2162
3
                                << " partition_id: " << partition_id
2163
3
                                << " partition_version: " << partition_version;
2164
3
                    }
2165
11
                }
2166
2
                partition_version_key_begin = it->next_begin_key();
2167
2
            } while (it->more() && !stopped());
2168
2
        }
2169
2
        begin = it->next_begin_key(); // Update to next smallest key for iteration
2170
2
    } while (it->more() && !stopped());
2171
2
    return check_res ? 0 : -1;
2172
2
}
2173
2174
1
int InstanceChecker::do_restore_job_check() {
2175
1
    int64_t num_prepared = 0;
2176
1
    int64_t num_committed = 0;
2177
1
    int64_t num_dropped = 0;
2178
1
    int64_t num_completed = 0;
2179
1
    int64_t num_recycling = 0;
2180
1
    int64_t num_cost_many_time = 0;
2181
1
    const int64_t COST_MANY_THRESHOLD = 3600;
2182
2183
1
    using namespace std::chrono;
2184
1
    auto start_time = steady_clock::now();
2185
1
    DORIS_CLOUD_DEFER {
2186
1
        g_bvar_checker_restore_job_prepared_state.put(instance_id_, num_prepared);
2187
1
        g_bvar_checker_restore_job_committed_state.put(instance_id_, num_committed);
2188
1
        g_bvar_checker_restore_job_dropped_state.put(instance_id_, num_dropped);
2189
1
        g_bvar_checker_restore_job_completed_state.put(instance_id_, num_completed);
2190
1
        g_bvar_checker_restore_job_recycling_state.put(instance_id_, num_recycling);
2191
1
        g_bvar_checker_restore_job_cost_many_time.put(instance_id_, num_cost_many_time);
2192
1
        auto cost_ms =
2193
1
                duration_cast<std::chrono::milliseconds>(steady_clock::now() - start_time).count();
2194
1
        LOG(INFO) << "check instance restore jobs finished, cost=" << cost_ms
2195
1
                  << "ms. instance_id=" << instance_id_ << " num_prepared=" << num_prepared
2196
1
                  << " num_committed=" << num_committed << " num_dropped=" << num_dropped
2197
1
                  << " num_completed=" << num_completed << " num_recycling=" << num_recycling
2198
1
                  << " num_cost_many_time=" << num_cost_many_time;
2199
1
    };
2200
2201
1
    LOG_INFO("begin to check restore jobs").tag("instance_id", instance_id_);
2202
2203
1
    JobRestoreTabletKeyInfo restore_job_key_info0 {instance_id_, 0};
2204
1
    JobRestoreTabletKeyInfo restore_job_key_info1 {instance_id_, INT64_MAX};
2205
1
    std::string begin;
2206
1
    std::string end;
2207
1
    job_restore_tablet_key(restore_job_key_info0, &begin);
2208
1
    job_restore_tablet_key(restore_job_key_info1, &end);
2209
1
    std::unique_ptr<RangeGetIterator> it;
2210
1
    do {
2211
1
        std::unique_ptr<Transaction> txn;
2212
1
        TxnErrorCode err = txn_kv_->create_txn(&txn);
2213
1
        if (err != TxnErrorCode::TXN_OK) {
2214
0
            LOG(WARNING) << "failed to create txn";
2215
0
            return -1;
2216
0
        }
2217
1
        err = txn->get(begin, end, &it);
2218
1
        if (err != TxnErrorCode::TXN_OK) {
2219
0
            LOG(WARNING) << "failed to get mow tablet job key, err=" << err;
2220
0
            return -1;
2221
0
        }
2222
2223
1
        if (!it->has_next()) {
2224
0
            break;
2225
0
        }
2226
3
        while (it->has_next()) {
2227
3
            auto [k, v] = it->next();
2228
3
            RestoreJobCloudPB restore_job_pb;
2229
3
            if (!restore_job_pb.ParseFromArray(v.data(), v.size())) {
2230
0
                LOG_WARNING("malformed restore job value").tag("key", hex(k));
2231
0
                return -1;
2232
0
            }
2233
2234
3
            switch (restore_job_pb.state()) {
2235
1
            case RestoreJobCloudPB::PREPARED:
2236
1
                ++num_prepared;
2237
1
                break;
2238
1
            case RestoreJobCloudPB::COMMITTED:
2239
1
                ++num_committed;
2240
1
                break;
2241
0
            case RestoreJobCloudPB::DROPPED:
2242
0
                ++num_dropped;
2243
0
                break;
2244
1
            case RestoreJobCloudPB::COMPLETED:
2245
1
                ++num_completed;
2246
1
                break;
2247
0
            case RestoreJobCloudPB::RECYCLING:
2248
0
                ++num_recycling;
2249
0
                break;
2250
0
            default:
2251
0
                break;
2252
3
            }
2253
2254
3
            int64_t current_time = ::time(nullptr);
2255
3
            if ((restore_job_pb.state() == RestoreJobCloudPB::PREPARED ||
2256
3
                 restore_job_pb.state() == RestoreJobCloudPB::COMMITTED) &&
2257
3
                current_time > restore_job_pb.ctime_s() + COST_MANY_THRESHOLD) {
2258
                // restore job run more than 1 hour
2259
1
                ++num_cost_many_time;
2260
1
                LOG_WARNING("restore job cost too many time")
2261
1
                        .tag("key", hex(k))
2262
1
                        .tag("tablet_id", restore_job_pb.tablet_id())
2263
1
                        .tag("state", restore_job_pb.state())
2264
1
                        .tag("ctime_s", restore_job_pb.ctime_s())
2265
1
                        .tag("mtime_s", restore_job_pb.mtime_s());
2266
1
            }
2267
2268
3
            if (!it->has_next()) {
2269
1
                begin = k;
2270
1
                begin.push_back('\x00'); // Update to next smallest key for iteration
2271
1
                break;
2272
1
            }
2273
3
        }
2274
1
    } while (it->more() && !stopped());
2275
1
    return 0;
2276
1
}
2277
2278
3
int InstanceChecker::check_txn_info_key(std::string_view key, std::string_view value) {
2279
3
    std::unordered_map<int64_t, std::string> txn_info_;
2280
3
    TxnLabelPB txn_label_pb;
2281
2282
6
    auto handle_check_txn_label_key = [&](std::string_view key, std::string_view value) -> int {
2283
6
        TxnInfoPB txn_info_pb;
2284
6
        std::string_view k1 = key;
2285
6
        k1.remove_prefix(1);
2286
6
        std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
2287
6
        decode_key(&k1, &out);
2288
        // 0x01 "txn" ${instance_id} "txn_info" ${db_id} ${txn_id}
2289
6
        if (!txn_info_pb.ParseFromArray(value.data(), value.size())) {
2290
0
            LOG(WARNING) << "failed to parse TxnInfoPB";
2291
0
            return -1;
2292
0
        }
2293
6
        auto txn_id = std::get<int64_t>(std::get<0>(out[4]));
2294
6
        auto it = txn_info_.find(txn_id);
2295
6
        if (it == txn_info_.end()) {
2296
0
            return 0;
2297
6
        } else {
2298
6
            if (it->second != txn_info_pb.label()) {
2299
1
                LOG(WARNING) << "txn_info_pb's txn_label not same with txn_label_pb's txn_label,"
2300
1
                             << " txn_info_pb's txn_label: " << txn_info_pb.label()
2301
1
                             << " txn_label_pb meta: " << txn_label_pb.ShortDebugString();
2302
1
                return 1;
2303
1
            }
2304
6
        }
2305
5
        return 0;
2306
6
    };
2307
3
    std::string_view k1 = key;
2308
3
    k1.remove_prefix(1);
2309
3
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
2310
3
    decode_key(&k1, &out);
2311
    // 0x01 "txn" ${instance_id} "txn_label" ${db_id} ${label}
2312
3
    if (!txn_label_pb.ParseFromArray(value.data(), value.size() - VERSION_STAMP_LEN)) {
2313
1
        LOG(WARNING) << "failed to parse TxnLabelPB";
2314
1
        return -1;
2315
1
    }
2316
2
    auto db_id = std::get<int64_t>(std::get<0>(out[3]));
2317
2
    auto label = std::get<std::string>(std::get<0>(out[4]));
2318
    // txn_id -> txn_label
2319
6
    for (const auto& txn_id : txn_label_pb.txn_ids()) {
2320
6
        txn_info_.insert({txn_id, label});
2321
6
    }
2322
2
    std::string txn_info_key_begin = txn_info_key({instance_id_, db_id, 0});
2323
2
    std::string txn_info_key_end = txn_info_key({instance_id_, db_id, INT64_MAX});
2324
2
    return scan_and_handle_kv(txn_info_key_begin, txn_info_key_end,
2325
6
                              [&](std::string_view k, std::string_view v) -> int {
2326
6
                                  return handle_check_txn_label_key(k, v);
2327
6
                              });
2328
3
}
2329
2330
6
int InstanceChecker::check_txn_label_key(std::string_view key, std::string_view value) {
2331
6
    TxnInfoPB txn_info_pb;
2332
6
    std::string_view k1 = key;
2333
6
    k1.remove_prefix(1);
2334
6
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
2335
6
    decode_key(&k1, &out);
2336
    // 0x01 "txn" ${instance_id} "txn_info" ${db_id} ${txn_id}
2337
6
    if (!txn_info_pb.ParseFromArray(value.data(), value.size())) {
2338
1
        LOG(WARNING) << "failed to parse TxnInfoPB";
2339
1
        return -1;
2340
1
    }
2341
5
    auto txn_id = std::get<int64_t>(std::get<0>(out[4]));
2342
5
    auto db_id = std::get<int64_t>(std::get<0>(out[3]));
2343
5
    auto label = txn_info_pb.label();
2344
5
    std::string txn_label = txn_label_key({instance_id_, db_id, label});
2345
5
    std::string txn_label_val;
2346
5
    TxnLabelPB txn_label_pb;
2347
5
    std::unique_ptr<Transaction> txn;
2348
5
    TxnErrorCode err = txn_kv_->create_txn(&txn);
2349
5
    if (err != TxnErrorCode::TXN_OK) {
2350
0
        LOG(WARNING) << "failed to init txn";
2351
0
        return -1;
2352
0
    }
2353
5
    if (txn->get(txn_label, &txn_label_val) != TxnErrorCode::TXN_OK) {
2354
1
        LOG(WARNING) << "failed to get txn label key, key=" << hex(txn_label);
2355
1
        return -1;
2356
1
    }
2357
4
    txn_label_pb.ParseFromString(txn_label_val);
2358
4
    auto txn_ids = txn_label_pb.txn_ids();
2359
4
    if (!std::count(txn_ids.begin(), txn_ids.end(), txn_id)) {
2360
        // clang-format off txn_info_pb
2361
1
        LOG(WARNING) << "txn_info_pb's txn_id not found in txn_label_pb info,"
2362
1
                     << " txn_id: " << txn_id
2363
1
                     << " txn_label_pb meta: " << txn_label_pb.ShortDebugString();
2364
        // clang-format on
2365
1
        return 1;
2366
1
    }
2367
3
    return 0;
2368
4
}
2369
2370
4
int InstanceChecker::check_txn_index_key(std::string_view key, std::string_view value) {
2371
4
    TxnInfoPB txn_info_pb;
2372
4
    std::string_view k1 = key;
2373
4
    k1.remove_prefix(1);
2374
4
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
2375
4
    decode_key(&k1, &out);
2376
    // 0x01 "txn" ${instance_id} "txn_info" ${db_id} ${txn_id}
2377
4
    if (!txn_info_pb.ParseFromArray(value.data(), value.size())) {
2378
1
        LOG(WARNING) << "failed to parse TxnInfoPB";
2379
1
        return -1;
2380
1
    }
2381
3
    auto txn_id = std::get<int64_t>(std::get<0>(out[4]));
2382
3
    auto db_id = std::get<int64_t>(std::get<0>(out[3]));
2383
    /// get tablet id
2384
3
    std::string txn_index = txn_index_key({instance_id_, txn_id});
2385
3
    std::string txn_index_val;
2386
3
    TxnIndexPB txn_index_pb;
2387
3
    std::unique_ptr<Transaction> txn;
2388
3
    TxnErrorCode err = txn_kv_->create_txn(&txn);
2389
3
    if (err != TxnErrorCode::TXN_OK) {
2390
0
        LOG(WARNING) << "failed to init txn";
2391
0
        return -1;
2392
0
    }
2393
3
    if (txn->get(txn_index, &txn_index_val) != TxnErrorCode::TXN_OK) {
2394
1
        LOG(WARNING) << "failed to get txn label key, key=" << hex(txn_index);
2395
1
        return -1;
2396
1
    }
2397
2
    txn_index_pb.ParseFromString(txn_index_val);
2398
2
    if (txn_index_pb.tablet_index().db_id() != db_id) {
2399
        // clang-format off txn_info_pb
2400
1
        LOG(WARNING) << "txn_index_pb's db_id not same with txn_info_pb's db_id,"
2401
1
                     << " txn_index_pb meta: " << txn_index_pb.ShortDebugString()
2402
1
                     << " txn_info_pb meta: " << txn_info_pb.ShortDebugString();
2403
        // clang-format on
2404
1
        return 1;
2405
1
    }
2406
1
    return 0;
2407
2
}
2408
2409
3
int InstanceChecker::check_txn_running_key(std::string_view key, std::string_view value) {
2410
3
    TxnRunningPB txn_running_pb;
2411
3
    int64_t current_time =
2412
3
            duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
2413
3
    if (!txn_running_pb.ParseFromArray(value.data(), value.size())) {
2414
1
        LOG(WARNING) << "failed to parse TxnRunningPB";
2415
1
        return -1;
2416
1
    }
2417
2
    if (txn_running_pb.timeout_time() <= current_time) {
2418
1
        LOG(WARNING) << "txn_running_pb.timeout_time() is less than current_time,"
2419
1
                     << " but txn_running_key exists, "
2420
1
                     << " txn_running_pb meta: " << txn_running_pb.ShortDebugString();
2421
1
        return 1;
2422
1
    }
2423
1
    return 0;
2424
2
}
2425
2426
0
int InstanceChecker::do_txn_key_check() {
2427
0
    int ret = 0;
2428
2429
    // check txn info key depend on txn label key
2430
0
    std::string begin = txn_label_key({instance_id_, 0, ""});
2431
0
    std::string end = txn_label_key({instance_id_, INT64_MAX, ""});
2432
0
    int64_t num_scanned = 0;
2433
0
    int64_t num_abnormal = 0;
2434
0
    LOG(INFO) << "begin check txn_label_key and txn_info_key";
2435
0
    ret = scan_and_handle_kv(begin, end, [&, this](std::string_view k, std::string_view v) -> int {
2436
0
        num_scanned++;
2437
0
        int ret = check_txn_info_key(k, v);
2438
0
        if (ret == 1) {
2439
0
            num_abnormal++;
2440
0
        }
2441
0
        return ret;
2442
0
    });
2443
2444
0
    if (ret == 1) {
2445
0
        LOG(WARNING) << "failed to check txn_info_key depending on txn_label_key, num_scanned="
2446
0
                     << num_scanned << ", num_abnormal=" << num_abnormal;
2447
0
        return 1;
2448
0
    } else if (ret == -1) {
2449
0
        LOG(WARNING) << "failed to check txn label key and txn info key";
2450
0
        return -1;
2451
0
    }
2452
2453
    // check txn label key depend on txn info key
2454
0
    begin = txn_info_key({instance_id_, 0, 0});
2455
0
    end = txn_info_key({instance_id_, INT64_MAX, 0});
2456
0
    num_scanned = 0;
2457
0
    num_abnormal = 0;
2458
0
    LOG(INFO) << "begin check txn_label_key and txn_info_key";
2459
0
    ret = scan_and_handle_kv(begin, end, [&, this](std::string_view k, std::string_view v) -> int {
2460
0
        num_scanned++;
2461
0
        int ret = check_txn_label_key(k, v);
2462
0
        if (ret == 1) {
2463
0
            num_abnormal++;
2464
0
        }
2465
0
        return ret;
2466
0
    });
2467
0
    if (ret == 1) {
2468
0
        LOG(WARNING) << "failed to check txn_label_key depending on txn_info_key, num_scanned="
2469
0
                     << num_scanned << ", num_abnormal=" << num_abnormal;
2470
0
        return 1;
2471
0
    } else if (ret == -1) {
2472
0
        LOG(WARNING) << "failed to inverted check txn label key and txn info key";
2473
0
        return -1;
2474
0
    }
2475
0
    LOG(INFO) << "finish check txn_label_key and txn_info_key, num_scanned=" << num_scanned
2476
0
              << ", num_abnormal=" << num_abnormal;
2477
2478
    // check txn index key depend on txn info key
2479
0
    begin = txn_info_key({instance_id_, 0, 0});
2480
0
    end = txn_info_key({instance_id_, INT64_MAX, 0});
2481
0
    num_scanned = 0;
2482
0
    num_abnormal = 0;
2483
0
    LOG(INFO) << "begin check txn_index_key and txn_info_key";
2484
0
    ret = scan_and_handle_kv(begin, end, [&, this](std::string_view k, std::string_view v) -> int {
2485
0
        num_scanned++;
2486
0
        int ret = check_txn_index_key(k, v);
2487
0
        if (ret == 1) {
2488
0
            num_abnormal++;
2489
0
        }
2490
0
        return ret;
2491
0
    });
2492
0
    if (ret == 1) {
2493
0
        LOG(WARNING) << "failed to check txn_idx_key depending on txn_info_key, num_scanned="
2494
0
                     << num_scanned << ", num_abnormal=" << num_abnormal;
2495
0
        return 1;
2496
0
    } else if (ret == -1) {
2497
0
        LOG(WARNING) << "failed to check txn index key";
2498
0
        return -1;
2499
0
    }
2500
0
    LOG(INFO) << "finish check txn_index_key and txn_info_key, num_scanned=" << num_scanned
2501
0
              << ", num_abnormal=" << num_abnormal;
2502
2503
    // check txn running key
2504
0
    begin = txn_running_key({instance_id_, 0, 0});
2505
0
    end = txn_running_key({instance_id_, INT64_MAX, 0});
2506
0
    num_scanned = 0;
2507
0
    num_abnormal = 0;
2508
0
    LOG(INFO) << "begin check txn_running_key";
2509
0
    ret = scan_and_handle_kv(begin, end, [&, this](std::string_view k, std::string_view v) -> int {
2510
0
        num_scanned++;
2511
0
        int ret = check_txn_running_key(k, v);
2512
0
        if (ret == 1) {
2513
0
            num_abnormal++;
2514
0
        }
2515
0
        return ret;
2516
0
    });
2517
0
    if (ret == 1) {
2518
0
        LOG(WARNING) << "failed to check txn_running_key, num_scanned=" << num_scanned
2519
0
                     << ", num_abnormal=" << num_abnormal;
2520
0
        return 1;
2521
0
    } else if (ret == -1) {
2522
0
        LOG(WARNING) << "failed to check txn running key";
2523
0
        return -1;
2524
0
    }
2525
0
    LOG(INFO) << "finish check txn_running_key, num_scanned=" << num_scanned
2526
0
              << ", num_abnormal=" << num_abnormal;
2527
0
    return 0;
2528
0
}
2529
2530
2
int InstanceChecker::check_meta_tmp_rowset_key(std::string_view key, std::string_view value) {
2531
2
    TxnInfoPB txn_info_pb;
2532
2
    std::string_view k1 = key;
2533
2
    k1.remove_prefix(1);
2534
2
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
2535
2
    decode_key(&k1, &out);
2536
    // 0x01 "txn" ${instance_id} "txn_info" ${db_id} ${txn_id}
2537
2
    if (!txn_info_pb.ParseFromArray(value.data(), value.size())) {
2538
0
        LOG(WARNING) << "failed to parse TxnInfoPB";
2539
0
        return -1;
2540
0
    }
2541
    /// get tablet id
2542
2
    auto txn_id = std::get<int64_t>(std::get<0>(out[4]));
2543
2
    std::string txn_index = txn_index_key({instance_id_, txn_id});
2544
2
    std::string txn_index_val;
2545
2
    TxnIndexPB txn_index_pb;
2546
2
    std::unique_ptr<Transaction> txn;
2547
2
    TxnErrorCode err = txn_kv_->create_txn(&txn);
2548
2
    if (err != TxnErrorCode::TXN_OK) {
2549
0
        LOG(WARNING) << "failed to init txn";
2550
0
        return -1;
2551
0
    }
2552
2
    if (txn->get(txn_index, &txn_index_val) != TxnErrorCode::TXN_OK) {
2553
0
        LOG(WARNING) << "failed to get txn index key, key=" << txn_index;
2554
0
        return -1;
2555
0
    }
2556
2
    txn_index_pb.ParseFromString(txn_index_val);
2557
2
    auto tablet_id = txn_index_pb.tablet_index().tablet_id();
2558
2
    std::string meta_tmp_rowset_key = meta_rowset_tmp_key({instance_id_, txn_id, tablet_id});
2559
2
    int is_key_exist = key_exist(txn_kv_.get(), meta_tmp_rowset_key);
2560
2
    if (is_key_exist == 1) {
2561
0
        if (txn_info_pb.status() != TxnStatusPB::TXN_STATUS_VISIBLE) {
2562
            // clang-format off
2563
0
            LOG(INFO) << "meta tmp rowset key not exist but txn status != TXN_STATUS_VISIBLE"
2564
0
                        << "meta tmp rowset key=" << meta_tmp_rowset_key
2565
0
                        << "txn_info=" << txn_info_pb.ShortDebugString();
2566
            // clang-format on
2567
0
            return 1;
2568
0
        }
2569
2
    } else if (is_key_exist == 0) {
2570
2
        if (txn_info_pb.status() != TxnStatusPB::TXN_STATUS_PREPARED) {
2571
            // clang-format off
2572
1
            LOG(INFO) << "meta tmp rowset key exist but txn status != TXN_STATUS_PREPARED"
2573
1
                        << "meta tmp rowset key=" << meta_tmp_rowset_key
2574
1
                        << "txn_info=" << txn_info_pb.ShortDebugString();
2575
            // clang-format on
2576
1
            return 1;
2577
1
        }
2578
2
    } else {
2579
0
        LOG(WARNING) << "failed to get key, key=" << meta_tmp_rowset_key;
2580
0
        return -1;
2581
0
    }
2582
1
    return 0;
2583
2
}
2584
2585
2
int InstanceChecker::check_meta_rowset_key(std::string_view key, std::string_view value) {
2586
2
    RowsetMetaCloudPB meta_rowset_pb;
2587
2
    if (!meta_rowset_pb.ParseFromArray(value.data(), value.size())) {
2588
0
        LOG(WARNING) << "failed to parse RowsetMetaCloudPB";
2589
0
        return -1;
2590
0
    }
2591
2
    std::string tablet_index_key = meta_tablet_idx_key({instance_id_, meta_rowset_pb.tablet_id()});
2592
2
    if (key_exist(txn_kv_.get(), tablet_index_key) == 1) {
2593
1
        LOG(WARNING) << "rowset's tablet id not found in fdb"
2594
1
                     << "tablet_index_key: " << tablet_index_key
2595
1
                     << "rowset meta: " << meta_rowset_pb.ShortDebugString();
2596
1
        return 1;
2597
1
    }
2598
1
    return 0;
2599
2
}
2600
2601
0
int InstanceChecker::do_meta_rowset_key_check() {
2602
0
    int ret = 0;
2603
2604
0
    std::string begin = meta_rowset_key({instance_id_, 0, 0});
2605
0
    std::string end = meta_rowset_key({instance_id_, INT64_MAX, 0});
2606
0
    int64_t num_scanned = 0;
2607
0
    int64_t num_loss = 0;
2608
2609
0
    ret = scan_and_handle_kv(begin, end, [&](std::string_view k, std::string_view v) {
2610
0
        num_scanned++;
2611
0
        int ret = check_meta_rowset_key(k, v);
2612
0
        if (ret == 1) {
2613
0
            num_loss++;
2614
0
        }
2615
0
        return ret;
2616
0
    });
2617
0
    if (ret == -1) {
2618
0
        LOG(WARNING) << "failed to check meta rowset key,";
2619
0
        return -1;
2620
0
    } else if (ret == 1) {
2621
0
        LOG(WARNING) << "meta rowset key may be loss, num_scanned=" << num_scanned
2622
0
                     << ", num_loss=" << num_loss;
2623
0
    }
2624
0
    LOG(INFO) << "meta rowset key check finish, num_scanned=" << num_scanned
2625
0
              << ", num_loss=" << num_loss;
2626
2627
0
    begin = txn_info_key({instance_id_, 0, 0});
2628
0
    end = txn_info_key({instance_id_, INT64_MAX, 0});
2629
0
    num_scanned = 0;
2630
0
    num_loss = 0;
2631
2632
0
    ret = scan_and_handle_kv(begin, end, [&](std::string_view k, std::string_view v) {
2633
0
        num_scanned++;
2634
0
        int ret = check_meta_tmp_rowset_key(k, v);
2635
0
        if (ret == 1) {
2636
0
            num_loss++;
2637
0
        }
2638
0
        return ret;
2639
0
    });
2640
0
    if (ret == -1) {
2641
0
        LOG(WARNING) << "failed to check tmp meta rowset key";
2642
0
        return -1;
2643
0
    } else if (ret == 1) {
2644
0
        LOG(WARNING) << "meta tmp rowset key may be loss, num_scanned=" << num_scanned
2645
0
                     << ", num_loss=" << num_loss;
2646
0
    }
2647
0
    LOG(INFO) << "meta tmp rowset key check finish, num_scanned=" << num_scanned
2648
0
              << ", num_loss=" << num_loss;
2649
2650
0
    return ret;
2651
0
}
2652
2653
0
// Looks up the accessor registered under `id` in accessor_map_.
// Returns a non-owning pointer to it, or nullptr when no entry has that id.
StorageVaultAccessor* InstanceChecker::get_accessor(const std::string& id) {
    if (auto entry = accessor_map_.find(id); entry != accessor_map_.end()) {
        return entry->second.get();
    }
    return nullptr;
}
2660
2661
0
// Appends a non-owning pointer to every accessor held in accessor_map_ to
// `*accessors`. Existing elements of `*accessors` are kept.
void InstanceChecker::get_all_accessor(std::vector<StorageVaultAccessor*>* accessors) {
    for (auto entry = accessor_map_.begin(); entry != accessor_map_.end(); ++entry) {
        StorageVaultAccessor* raw_accessor = entry->second.get();
        accessors->push_back(raw_accessor);
    }
}
2666
} // namespace doris::cloud