Coverage Report

Created: 2025-10-23 15:36

/root/doris/cloud/src/recycler/checker.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "recycler/checker.h"
19
20
#include <aws/s3/S3Client.h>
21
#include <aws/s3/model/ListObjectsV2Request.h>
22
#include <butil/endpoint.h>
23
#include <butil/strings/string_split.h>
24
#include <fmt/core.h>
25
#include <gen_cpp/cloud.pb.h>
26
#include <gen_cpp/olap_file.pb.h>
27
#include <glog/logging.h>
28
29
#include <algorithm>
30
#include <chrono>
31
#include <cstdint>
32
#include <functional>
33
#include <memory>
34
#include <mutex>
35
#include <numeric>
36
#include <sstream>
37
#include <string_view>
38
#include <unordered_set>
39
#include <vector>
40
41
#include "common/bvars.h"
42
#include "common/config.h"
43
#include "common/defer.h"
44
#include "common/encryption_util.h"
45
#include "common/logging.h"
46
#include "common/util.h"
47
#include "cpp/sync_point.h"
48
#include "meta-service/meta_service.h"
49
#include "meta-service/meta_service_schema.h"
50
#include "meta-service/meta_service_tablet_stats.h"
51
#include "meta-store/blob_message.h"
52
#include "meta-store/keys.h"
53
#include "meta-store/txn_kv.h"
54
#include "snapshot/snapshot_manager.h"
55
#ifdef ENABLE_HDFS_STORAGE_VAULT
56
#include "recycler/hdfs_accessor.h"
57
#endif
58
#include "recycler/s3_accessor.h"
59
#include "recycler/storage_vault_accessor.h"
60
#ifdef UNIT_TEST
61
#include "../test/mock_accessor.h"
62
#endif
63
#include "recycler/util.h"
64
65
namespace doris::cloud {
66
// Config entries read by this file. They are only declared here; their
// definitions live elsewhere (presumably alongside common/config.h).
namespace config {
extern int32_t brpc_listen_port;
extern int32_t recycle_concurrency;
extern int32_t recycle_job_lease_expired_ms;
extern int32_t scan_instances_interval_seconds;
extern std::vector<std::string> recycle_blacklist;
extern std::vector<std::string> recycle_whitelist;
extern bool enable_inverted_check;
} // namespace config

using namespace std::chrono;
77
78
5
// Binds the checker to `txn_kv` and records this process's "ip:port" identity,
// which is later passed to the prepare/lease/finish check-job helpers.
Checker::Checker(std::shared_ptr<TxnKv> txn_kv) : txn_kv_(std::move(txn_kv)) {
    std::string endpoint = butil::my_ip_cstr();
    endpoint += ':';
    endpoint += std::to_string(config::brpc_listen_port);
    ip_port_ = std::move(endpoint);
}
81
82
5
// Makes sure all worker threads are stopped and joined before the checker
// goes away; a no-op when stop() has already been called.
Checker::~Checker() {
    if (stopped()) {
        return;
    }
    stop();
}
87
88
4
int Checker::start() {
89
4
    DCHECK(txn_kv_);
90
4
    instance_filter_.reset(config::recycle_whitelist, config::recycle_blacklist);
91
92
    // launch instance scanner
93
4
    auto scanner_func = [this]() {
94
4
        std::this_thread::sleep_for(
95
4
                std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds));
96
8
        while (!stopped()) {
97
4
            std::vector<InstanceInfoPB> instances;
98
4
            get_all_instances(txn_kv_.get(), instances);
99
4
            LOG(INFO) << "Checker get instances: " << [&instances] {
100
4
                std::stringstream ss;
101
30
                for (auto& i : instances) ss << ' ' << i.instance_id();
102
4
                return ss.str();
103
4
            }();
104
4
            if (!instances.empty()) {
105
                // enqueue instances
106
3
                std::lock_guard lock(mtx_);
107
30
                for (auto& instance : instances) {
108
30
                    if (instance_filter_.filter_out(instance.instance_id())) continue;
109
30
                    if (instance.status() == InstanceInfoPB::DELETED) continue;
110
30
                    using namespace std::chrono;
111
30
                    auto enqueue_time_s =
112
30
                            duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
113
30
                    auto [_, success] =
114
30
                            pending_instance_map_.insert({instance.instance_id(), enqueue_time_s});
115
                    // skip instance already in pending queue
116
30
                    if (success) {
117
30
                        pending_instance_queue_.push_back(std::move(instance));
118
30
                    }
119
30
                }
120
3
                pending_instance_cond_.notify_all();
121
3
            }
122
4
            {
123
4
                std::unique_lock lock(mtx_);
124
4
                notifier_.wait_for(lock,
125
4
                                   std::chrono::seconds(config::scan_instances_interval_seconds),
126
7
                                   [&]() { return stopped(); });
127
4
            }
128
4
        }
129
4
    };
130
4
    workers_.emplace_back(scanner_func);
131
    // Launch lease thread
132
4
    workers_.emplace_back([this] { lease_check_jobs(); });
133
    // Launch inspect thread
134
4
    workers_.emplace_back([this] { inspect_instance_check_interval(); });
135
136
    // launch check workers
137
8
    auto checker_func = [this]() {
138
38
        while (!stopped()) {
139
            // fetch instance to check
140
36
            InstanceInfoPB instance;
141
36
            long enqueue_time_s = 0;
142
36
            {
143
36
                std::unique_lock lock(mtx_);
144
48
                pending_instance_cond_.wait(lock, [&]() -> bool {
145
48
                    return !pending_instance_queue_.empty() || stopped();
146
48
                });
147
36
                if (stopped()) {
148
6
                    return;
149
6
                }
150
30
                instance = std::move(pending_instance_queue_.front());
151
30
                pending_instance_queue_.pop_front();
152
30
                enqueue_time_s = pending_instance_map_[instance.instance_id()];
153
30
                pending_instance_map_.erase(instance.instance_id());
154
30
            }
155
0
            const auto& instance_id = instance.instance_id();
156
30
            {
157
30
                std::lock_guard lock(mtx_);
158
                // skip instance in recycling
159
30
                if (working_instance_map_.count(instance_id)) continue;
160
30
            }
161
30
            auto checker = std::make_shared<InstanceChecker>(txn_kv_, instance.instance_id());
162
30
            if (checker->init(instance) != 0) {
163
0
                LOG(WARNING) << "failed to init instance checker, instance_id="
164
0
                             << instance.instance_id();
165
0
                continue;
166
0
            }
167
30
            std::string check_job_key;
168
30
            job_check_key({instance.instance_id()}, &check_job_key);
169
30
            int ret = prepare_instance_recycle_job(txn_kv_.get(), check_job_key,
170
30
                                                   instance.instance_id(), ip_port_,
171
30
                                                   config::check_object_interval_seconds * 1000);
172
30
            if (ret != 0) { // Prepare failed
173
20
                continue;
174
20
            } else {
175
10
                std::lock_guard lock(mtx_);
176
10
                working_instance_map_.emplace(instance_id, checker);
177
10
            }
178
10
            if (stopped()) return;
179
10
            using namespace std::chrono;
180
10
            auto ctime_ms =
181
10
                    duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
182
10
            g_bvar_checker_enqueue_cost_s.put(instance_id, ctime_ms / 1000 - enqueue_time_s);
183
184
10
            bool success {true};
185
186
10
            if (int ret = checker->do_check(); ret != 0) {
187
0
                success = false;
188
0
            }
189
190
10
            if (config::enable_inverted_check) {
191
0
                if (int ret = checker->do_inverted_check(); ret != 0) {
192
0
                    success = false;
193
0
                }
194
0
            }
195
196
10
            if (config::enable_delete_bitmap_inverted_check) {
197
0
                if (int ret = checker->do_delete_bitmap_inverted_check(); ret != 0) {
198
0
                    success = false;
199
0
                }
200
0
            }
201
202
10
            if (config::enable_mow_job_key_check) {
203
0
                if (int ret = checker->do_mow_job_key_check(); ret != 0) {
204
0
                    success = false;
205
0
                }
206
0
            }
207
208
10
            if (config::enable_tablet_stats_key_check) {
209
0
                if (int ret = checker->do_tablet_stats_key_check(); ret != 0) {
210
0
                    success = false;
211
0
                }
212
0
            }
213
214
10
            if (config::enable_restore_job_check) {
215
0
                if (int ret = checker->do_restore_job_check(); ret != 0) {
216
0
                    success = false;
217
0
                }
218
0
            }
219
220
10
            if (config::enable_txn_key_check) {
221
0
                if (int ret = checker->do_txn_key_check(); ret != 0) {
222
0
                    success = false;
223
0
                }
224
0
            }
225
226
10
            if (config::enable_meta_rowset_key_check) {
227
0
                if (int ret = checker->do_meta_rowset_key_check(); ret != 0) {
228
0
                    success = false;
229
0
                }
230
0
            }
231
232
10
            if (config::enable_delete_bitmap_storage_optimize_v2_check) {
233
0
                if (int ret = checker->do_delete_bitmap_storage_optimize_check(2 /*version*/);
234
0
                    ret != 0) {
235
0
                    success = false;
236
0
                }
237
0
            }
238
239
10
            if (config::enable_version_key_check) {
240
0
                if (int ret = checker->do_version_key_check(); ret != 0) {
241
0
                    success = false;
242
0
                }
243
0
            }
244
245
10
            if (config::enable_snapshot_check) {
246
0
                if (int ret = checker->do_snapshots_check(); ret != 0) {
247
0
                    success = false;
248
0
                }
249
0
            }
250
251
10
            if (config::enable_mvcc_meta_key_check) {
252
0
                if (int ret = checker->do_mvcc_meta_key_check(); ret != 0) {
253
0
                    success = false;
254
0
                }
255
0
            }
256
257
            // If instance checker has been aborted, don't finish this job
258
10
            if (!checker->stopped()) {
259
10
                finish_instance_recycle_job(txn_kv_.get(), check_job_key, instance.instance_id(),
260
10
                                            ip_port_, success, ctime_ms);
261
10
            }
262
10
            {
263
10
                std::lock_guard lock(mtx_);
264
10
                working_instance_map_.erase(instance.instance_id());
265
10
            }
266
10
        }
267
8
    };
268
4
    int num_threads = config::recycle_concurrency; // FIXME: use a new config entry?
269
12
    for (int i = 0; i < num_threads; ++i) {
270
8
        workers_.emplace_back(checker_func);
271
8
    }
272
4
    return 0;
273
4
}
274
275
5
void Checker::stop() {
276
5
    stopped_ = true;
277
5
    notifier_.notify_all();
278
5
    pending_instance_cond_.notify_all();
279
5
    {
280
5
        std::lock_guard lock(mtx_);
281
5
        for (auto& [_, checker] : working_instance_map_) {
282
0
            checker->stop();
283
0
        }
284
5
    }
285
20
    for (auto& w : workers_) {
286
20
        if (w.joinable()) w.join();
287
20
    }
288
5
}
289
290
4
void Checker::lease_check_jobs() {
291
55
    while (!stopped()) {
292
51
        std::vector<std::string> instances;
293
51
        instances.reserve(working_instance_map_.size());
294
51
        {
295
51
            std::lock_guard lock(mtx_);
296
51
            for (auto& [id, _] : working_instance_map_) {
297
30
                instances.push_back(id);
298
30
            }
299
51
        }
300
51
        for (auto& i : instances) {
301
30
            std::string check_job_key;
302
30
            job_check_key({i}, &check_job_key);
303
30
            int ret = lease_instance_recycle_job(txn_kv_.get(), check_job_key, i, ip_port_);
304
30
            if (ret == 1) {
305
0
                std::lock_guard lock(mtx_);
306
0
                if (auto it = working_instance_map_.find(i); it != working_instance_map_.end()) {
307
0
                    it->second->stop();
308
0
                }
309
0
            }
310
30
        }
311
51
        {
312
51
            std::unique_lock lock(mtx_);
313
51
            notifier_.wait_for(lock,
314
51
                               std::chrono::milliseconds(config::recycle_job_lease_expired_ms / 3),
315
101
                               [&]() { return stopped(); });
316
51
        }
317
51
    }
318
4
}
319
0
#define LOG_CHECK_INTERVAL_ALARM LOG(WARNING) << "Err for check interval: "
320
34
void Checker::do_inspect(const InstanceInfoPB& instance) {
321
34
    std::string check_job_key = job_check_key({instance.instance_id()});
322
34
    std::unique_ptr<Transaction> txn;
323
34
    std::string val;
324
34
    TxnErrorCode err = txn_kv_->create_txn(&txn);
325
34
    if (err != TxnErrorCode::TXN_OK) {
326
0
        LOG_CHECK_INTERVAL_ALARM << "failed to create txn";
327
0
        return;
328
0
    }
329
34
    err = txn->get(check_job_key, &val);
330
34
    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
331
0
        LOG_CHECK_INTERVAL_ALARM << "failed to get kv, err=" << err
332
0
                                 << " key=" << hex(check_job_key);
333
0
        return;
334
0
    }
335
34
    auto checker = InstanceChecker(txn_kv_, instance.instance_id());
336
34
    if (checker.init(instance) != 0) {
337
0
        LOG_CHECK_INTERVAL_ALARM << "failed to init instance checker, instance_id="
338
0
                                 << instance.instance_id();
339
0
        return;
340
0
    }
341
342
34
    int64_t bucket_lifecycle_days = 0;
343
34
    if (checker.get_bucket_lifecycle(&bucket_lifecycle_days) != 0) {
344
0
        LOG_CHECK_INTERVAL_ALARM << "failed to get bucket lifecycle, instance_id="
345
0
                                 << instance.instance_id();
346
0
        return;
347
0
    }
348
34
    DCHECK(bucket_lifecycle_days > 0);
349
350
34
    if (bucket_lifecycle_days == INT64_MAX) {
351
        // No s3 bucket (may all accessors are HdfsAccessor), skip inspect
352
34
        return;
353
34
    }
354
355
0
    int64_t last_ctime_ms = -1;
356
0
    auto job_status = JobRecyclePB::IDLE;
357
0
    auto has_last_ctime = [&]() {
358
0
        JobRecyclePB job_info;
359
0
        if (!job_info.ParseFromString(val)) {
360
0
            LOG_CHECK_INTERVAL_ALARM << "failed to parse JobRecyclePB, key=" << hex(check_job_key);
361
0
        }
362
0
        DCHECK(job_info.instance_id() == instance.instance_id());
363
0
        if (!job_info.has_last_ctime_ms()) return false;
364
0
        last_ctime_ms = job_info.last_ctime_ms();
365
0
        job_status = job_info.status();
366
0
        g_bvar_checker_last_success_time_ms.put(instance.instance_id(),
367
0
                                                job_info.last_success_time_ms());
368
0
        return true;
369
0
    };
370
0
    using namespace std::chrono;
371
0
    auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
372
0
    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND || !has_last_ctime()) {
373
        // Use instance's ctime for instances that do not have job's last ctime
374
0
        last_ctime_ms = instance.ctime();
375
0
    }
376
0
    DCHECK(now - last_ctime_ms >= 0);
377
0
    int64_t expiration_ms =
378
0
            bucket_lifecycle_days > config::reserved_buffer_days
379
0
                    ? (bucket_lifecycle_days - config::reserved_buffer_days) * 86400000
380
0
                    : bucket_lifecycle_days * 86400000;
381
0
    TEST_SYNC_POINT_CALLBACK("Checker:do_inspect", &last_ctime_ms);
382
0
    if (now - last_ctime_ms >= expiration_ms) {
383
0
        LOG_CHECK_INTERVAL_ALARM << "check risks, instance_id: " << instance.instance_id()
384
0
                                 << " last_ctime_ms: " << last_ctime_ms
385
0
                                 << " job_status: " << job_status
386
0
                                 << " bucket_lifecycle_days: " << bucket_lifecycle_days
387
0
                                 << " reserved_buffer_days: " << config::reserved_buffer_days
388
0
                                 << " expiration_ms: " << expiration_ms;
389
0
    }
390
0
}
391
#undef LOG_CHECK_INTERVAL_ALARM
392
4
void Checker::inspect_instance_check_interval() {
393
7
    while (!stopped()) {
394
3
        LOG(INFO) << "start to inspect instance check interval";
395
3
        std::vector<InstanceInfoPB> instances;
396
3
        get_all_instances(txn_kv_.get(), instances);
397
30
        for (const auto& instance : instances) {
398
30
            if (instance_filter_.filter_out(instance.instance_id())) continue;
399
30
            if (stopped()) return;
400
30
            if (instance.status() == InstanceInfoPB::DELETED) continue;
401
30
            do_inspect(instance);
402
30
        }
403
3
        {
404
3
            std::unique_lock lock(mtx_);
405
3
            notifier_.wait_for(lock, std::chrono::seconds(config::scan_instances_interval_seconds),
406
6
                               [&]() { return stopped(); });
407
3
        }
408
3
    }
409
4
}
410
411
// return 0 for success get a key, 1 for key not found, negative for error
412
24
int key_exist(TxnKv* txn_kv, std::string_view key) {
413
24
    std::unique_ptr<Transaction> txn;
414
24
    TxnErrorCode err = txn_kv->create_txn(&txn);
415
24
    if (err != TxnErrorCode::TXN_OK) {
416
0
        LOG(WARNING) << "failed to init txn, err=" << err;
417
0
        return -1;
418
0
    }
419
24
    std::string val;
420
24
    switch (txn->get(key, &val)) {
421
21
    case TxnErrorCode::TXN_OK:
422
21
        return 0;
423
3
    case TxnErrorCode::TXN_KEY_NOT_FOUND:
424
3
        return 1;
425
0
    default:
426
0
        return -1;
427
24
    }
428
24
}
429
430
// Creates a checker for `instance_id` that reads metadata through `txn_kv`;
// the snapshot manager is built on the same TxnKv handle.
InstanceChecker::InstanceChecker(std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id)
        : txn_kv_(txn_kv), instance_id_(instance_id) {
    // txn_kv_ already holds its own reference, so the parameter can be handed off.
    auto manager = std::make_shared<SnapshotManager>(std::move(txn_kv));
    snapshot_manager_ = std::move(manager);
}
434
435
97
int InstanceChecker::init(const InstanceInfoPB& instance) {
436
97
    int ret = init_obj_store_accessors(instance);
437
97
    if (ret != 0) {
438
0
        return ret;
439
0
    }
440
441
97
    return init_storage_vault_accessors(instance);
442
97
}
443
444
97
int InstanceChecker::init_obj_store_accessors(const InstanceInfoPB& instance) {
445
97
    for (const auto& obj_info : instance.obj_info()) {
446
97
#ifdef UNIT_TEST
447
97
        auto accessor = std::make_shared<MockAccessor>();
448
#else
449
        auto s3_conf = S3Conf::from_obj_store_info(obj_info);
450
        if (!s3_conf) {
451
            LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_;
452
            return -1;
453
        }
454
455
        std::shared_ptr<S3Accessor> accessor;
456
        int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
457
        if (ret != 0) {
458
            LOG(WARNING) << "failed to init object accessor. instance_id=" << instance_id_
459
                         << " resource_id=" << obj_info.id();
460
            return ret;
461
        }
462
#endif
463
464
97
        accessor_map_.emplace(obj_info.id(), std::move(accessor));
465
97
    }
466
467
97
    return 0;
468
97
}
469
470
97
int InstanceChecker::init_storage_vault_accessors(const InstanceInfoPB& instance) {
471
97
    if (instance.resource_ids().empty()) {
472
97
        return 0;
473
97
    }
474
475
0
    FullRangeGetOptions opts(txn_kv_);
476
0
    opts.prefetch = true;
477
0
    auto it = txn_kv_->full_range_get(storage_vault_key({instance_id_, ""}),
478
0
                                      storage_vault_key({instance_id_, "\xff"}), std::move(opts));
479
480
0
    for (auto kv = it->next(); kv.has_value(); kv = it->next()) {
481
0
        auto [k, v] = *kv;
482
0
        StorageVaultPB vault;
483
0
        if (!vault.ParseFromArray(v.data(), v.size())) {
484
0
            LOG(WARNING) << "malformed storage vault, unable to deserialize key=" << hex(k);
485
0
            return -1;
486
0
        }
487
0
        TEST_SYNC_POINT_CALLBACK("InstanceRecycler::init_storage_vault_accessors.mock_vault",
488
0
                                 &accessor_map_, &vault);
489
0
        if (vault.has_hdfs_info()) {
490
0
#ifdef ENABLE_HDFS_STORAGE_VAULT
491
0
            auto accessor = std::make_shared<HdfsAccessor>(vault.hdfs_info());
492
0
            int ret = accessor->init();
493
0
            if (ret != 0) {
494
0
                LOG(WARNING) << "failed to init hdfs accessor. instance_id=" << instance_id_
495
0
                             << " resource_id=" << vault.id() << " name=" << vault.name();
496
0
                return ret;
497
0
            }
498
499
0
            accessor_map_.emplace(vault.id(), std::move(accessor));
500
#else
501
            LOG(ERROR) << "HDFS is disabled (via the ENABLE_HDFS_STORAGE_VAULT build option), "
502
                       << "but HDFS storage vaults were detected";
503
#endif
504
0
        } else if (vault.has_obj_info()) {
505
0
#ifdef UNIT_TEST
506
0
            auto accessor = std::make_shared<MockAccessor>();
507
#else
508
            auto s3_conf = S3Conf::from_obj_store_info(vault.obj_info());
509
            if (!s3_conf) {
510
                LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_;
511
                return -1;
512
            }
513
514
            std::shared_ptr<S3Accessor> accessor;
515
            int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
516
            if (ret != 0) {
517
                LOG(WARNING) << "failed to init s3 accessor. instance_id=" << instance_id_
518
                             << " resource_id=" << vault.id() << " name=" << vault.name();
519
                return ret;
520
            }
521
#endif
522
523
0
            accessor_map_.emplace(vault.id(), std::move(accessor));
524
0
        }
525
0
    }
526
527
0
    if (!it->is_valid()) {
528
0
        LOG_WARNING("failed to get storage vault kv");
529
0
        return -1;
530
0
    }
531
0
    return 0;
532
0
}
533
534
16
int InstanceChecker::do_check() {
535
16
    TEST_SYNC_POINT("InstanceChecker.do_check");
536
16
    LOG(INFO) << "begin to check instance objects instance_id=" << instance_id_;
537
16
    int check_ret = 0;
538
16
    long num_scanned = 0;
539
16
    long num_scanned_with_segment = 0;
540
16
    long num_rowset_loss = 0;
541
16
    long instance_volume = 0;
542
16
    using namespace std::chrono;
543
16
    auto start_time = steady_clock::now();
544
16
    DORIS_CLOUD_DEFER {
545
16
        auto cost = duration<float>(steady_clock::now() - start_time).count();
546
16
        LOG(INFO) << "check instance objects finished, cost=" << cost
547
16
                  << "s. instance_id=" << instance_id_ << " num_scanned=" << num_scanned
548
16
                  << " num_scanned_with_segment=" << num_scanned_with_segment
549
16
                  << " num_rowset_loss=" << num_rowset_loss
550
16
                  << " instance_volume=" << instance_volume;
551
16
        g_bvar_checker_num_scanned.put(instance_id_, num_scanned);
552
16
        g_bvar_checker_num_scanned_with_segment.put(instance_id_, num_scanned_with_segment);
553
16
        g_bvar_checker_num_check_failed.put(instance_id_, num_rowset_loss);
554
16
        g_bvar_checker_check_cost_s.put(instance_id_, static_cast<long>(cost));
555
        // FIXME(plat1ko): What if some list operation failed?
556
16
        g_bvar_checker_instance_volume.put(instance_id_, instance_volume);
557
16
    };
558
559
16
    struct TabletFiles {
560
16
        int64_t tablet_id {0};
561
16
        std::unordered_set<std::string> files;
562
16
    };
563
16
    TabletFiles tablet_files_cache;
564
565
4.05k
    auto check_rowset_objects = [&, this](doris::RowsetMetaCloudPB& rs_meta, std::string_view key) {
566
4.05k
        if (rs_meta.num_segments() == 0) {
567
0
            return;
568
0
        }
569
570
4.05k
        bool data_loss = false;
571
4.05k
        bool segment_file_loss = false;
572
4.05k
        bool index_file_loss = false;
573
574
4.05k
        DORIS_CLOUD_DEFER {
575
4.05k
            if (data_loss) {
576
33
                LOG(INFO) << "segment file is" << (segment_file_loss ? "" : " not") << " loss, "
577
33
                          << "index file is" << (index_file_loss ? "" : " not") << " loss, "
578
33
                          << "rowset.tablet_id = " << rs_meta.tablet_id();
579
33
                num_rowset_loss++;
580
33
            }
581
4.05k
        };
582
583
4.05k
        ++num_scanned_with_segment;
584
4.05k
        if (tablet_files_cache.tablet_id != rs_meta.tablet_id()) {
585
454
            long tablet_volume = 0;
586
            // Clear cache
587
454
            tablet_files_cache.tablet_id = 0;
588
454
            tablet_files_cache.files.clear();
589
            // Get all file paths under this tablet directory
590
454
            auto find_it = accessor_map_.find(rs_meta.resource_id());
591
454
            if (find_it == accessor_map_.end()) {
592
0
                LOG_WARNING("resource id not found in accessor map")
593
0
                        .tag("resource_id", rs_meta.resource_id())
594
0
                        .tag("tablet_id", rs_meta.tablet_id())
595
0
                        .tag("rowset_id", rs_meta.rowset_id_v2());
596
0
                check_ret = -1;
597
0
                return;
598
0
            }
599
600
454
            std::unique_ptr<ListIterator> list_iter;
601
454
            int ret = find_it->second->list_directory(tablet_path_prefix(rs_meta.tablet_id()),
602
454
                                                      &list_iter);
603
454
            if (ret != 0) { // No need to log, because S3Accessor has logged this error
604
0
                check_ret = -1;
605
0
                return;
606
0
            }
607
608
18.5k
            for (auto file = list_iter->next(); file.has_value(); file = list_iter->next()) {
609
18.0k
                tablet_files_cache.files.insert(std::move(file->path));
610
18.0k
                tablet_volume += file->size;
611
18.0k
            }
612
454
            tablet_files_cache.tablet_id = rs_meta.tablet_id();
613
454
            instance_volume += tablet_volume;
614
454
        }
615
616
16.1k
        for (int i = 0; i < rs_meta.num_segments(); ++i) {
617
12.0k
            auto path = segment_path(rs_meta.tablet_id(), rs_meta.rowset_id_v2(), i);
618
619
12.0k
            if (tablet_files_cache.files.contains(path)) {
620
12.0k
                continue;
621
12.0k
            }
622
623
12
            if (1 == key_exist(txn_kv_.get(), key)) {
624
                // Rowset has been deleted instead of data loss
625
0
                break;
626
0
            }
627
12
            data_loss = true;
628
12
            segment_file_loss = true;
629
12
            TEST_SYNC_POINT_CALLBACK("InstanceChecker.do_check1", &path);
630
12
            LOG(WARNING) << "object not exist, path=" << path
631
12
                         << ", rs_meta=" << rs_meta.ShortDebugString() << " key=" << hex(key);
632
12
        }
633
634
4.05k
        std::unique_ptr<Transaction> txn;
635
4.05k
        TxnErrorCode err = txn_kv_->create_txn(&txn);
636
4.05k
        if (err != TxnErrorCode::TXN_OK) {
637
0
            LOG(WARNING) << "failed to init txn, err=" << err;
638
0
            check_ret = -1;
639
0
            return;
640
0
        }
641
642
4.05k
        TabletIndexPB tablet_index;
643
4.05k
        if (get_tablet_idx(txn_kv_.get(), instance_id_, rs_meta.tablet_id(), tablet_index) == -1) {
644
0
            LOG(WARNING) << "failed to get tablet index, tablet_id= " << rs_meta.tablet_id();
645
0
            check_ret = -1;
646
0
            return;
647
0
        }
648
649
4.05k
        auto tablet_schema_key =
650
4.05k
                meta_schema_key({instance_id_, tablet_index.index_id(), rs_meta.schema_version()});
651
4.05k
        ValueBuf tablet_schema_val;
652
4.05k
        err = cloud::blob_get(txn.get(), tablet_schema_key, &tablet_schema_val);
653
654
4.05k
        if (err != TxnErrorCode::TXN_OK) {
655
2.00k
            check_ret = -1;
656
2.00k
            LOG(WARNING) << "failed to get schema, err=" << err;
657
2.00k
            return;
658
2.00k
        }
659
660
2.05k
        auto* schema = rs_meta.mutable_tablet_schema();
661
2.05k
        if (!parse_schema_value(tablet_schema_val, schema)) {
662
0
            LOG(WARNING) << "malformed schema value, key=" << hex(tablet_schema_key);
663
0
            return;
664
0
        }
665
666
2.05k
        std::vector<std::pair<int64_t, std::string>> index_ids;
667
2.05k
        for (const auto& i : rs_meta.tablet_schema().index()) {
668
2.05k
            if (i.has_index_type() && i.index_type() == IndexType::INVERTED) {
669
2.05k
                index_ids.emplace_back(i.index_id(), i.index_suffix_name());
670
2.05k
            }
671
2.05k
        }
672
2.05k
        if (!index_ids.empty()) {
673
8.10k
            for (int i = 0; i < rs_meta.num_segments(); ++i) {
674
6.05k
                std::vector<std::string> index_path_v;
675
6.05k
                if (rs_meta.tablet_schema().inverted_index_storage_format() ==
676
6.05k
                    InvertedIndexStorageFormatPB::V1) {
677
6.01k
                    for (const auto& index_id : index_ids) {
678
6.01k
                        LOG(INFO) << "check inverted index, tablet_id=" << rs_meta.tablet_id()
679
6.01k
                                  << " rowset_id=" << rs_meta.rowset_id_v2() << " segment_id=" << i
680
6.01k
                                  << " index_id=" << index_id.first
681
6.01k
                                  << " index_suffix_name=" << index_id.second;
682
6.01k
                        index_path_v.emplace_back(
683
6.01k
                                inverted_index_path_v1(rs_meta.tablet_id(), rs_meta.rowset_id_v2(),
684
6.01k
                                                       i, index_id.first, index_id.second));
685
6.01k
                    }
686
6.01k
                } else {
687
40
                    index_path_v.emplace_back(
688
40
                            inverted_index_path_v2(rs_meta.tablet_id(), rs_meta.rowset_id_v2(), i));
689
40
                }
690
691
6.05k
                if (std::ranges::all_of(index_path_v, [&](const auto& idx_file_path) {
692
6.05k
                        if (!tablet_files_cache.files.contains(idx_file_path)) {
693
23
                            LOG(INFO) << "loss index file: " << idx_file_path;
694
23
                            return false;
695
23
                        }
696
6.03k
                        return true;
697
6.05k
                    })) {
698
6.03k
                    continue;
699
6.03k
                }
700
23
                index_file_loss = true;
701
23
                data_loss = true;
702
23
            }
703
2.05k
        }
704
2.05k
    };
705
706
    // scan visible rowsets
707
16
    auto start_key = meta_rowset_key({instance_id_, 0, 0});
708
16
    auto end_key = meta_rowset_key({instance_id_, INT64_MAX, 0});
709
710
16
    std::unique_ptr<RangeGetIterator> it;
711
16
    do {
712
16
        std::unique_ptr<Transaction> txn;
713
16
        TxnErrorCode err = txn_kv_->create_txn(&txn);
714
16
        if (err != TxnErrorCode::TXN_OK) {
715
0
            LOG(WARNING) << "failed to init txn, err=" << err;
716
0
            return -1;
717
0
        }
718
719
16
        err = txn->get(start_key, end_key, &it);
720
16
        if (err != TxnErrorCode::TXN_OK) {
721
0
            LOG(WARNING) << "internal error, failed to get rowset meta, err=" << err;
722
0
            return -1;
723
0
        }
724
16
        num_scanned += it->size();
725
726
4.07k
        while (it->has_next() && !stopped()) {
727
4.05k
            auto [k, v] = it->next();
728
4.05k
            if (!it->has_next()) {
729
6
                start_key = k;
730
6
            }
731
732
4.05k
            doris::RowsetMetaCloudPB rs_meta;
733
4.05k
            if (!rs_meta.ParseFromArray(v.data(), v.size())) {
734
0
                ++num_rowset_loss;
735
0
                LOG(WARNING) << "malformed rowset meta. key=" << hex(k) << " val=" << hex(v);
736
0
                continue;
737
0
            }
738
4.05k
            check_rowset_objects(rs_meta, k);
739
4.05k
        }
740
16
        start_key.push_back('\x00'); // Update to next smallest key for iteration
741
16
    } while (it->more() && !stopped());
742
743
16
    return num_rowset_loss > 0 ? 1 : check_ret;
744
16
}
745
746
34
int InstanceChecker::get_bucket_lifecycle(int64_t* lifecycle_days) {
747
    // If there are multiple buckets, return the minimum lifecycle.
748
34
    int64_t min_lifecycle_days = INT64_MAX;
749
34
    int64_t tmp_liefcycle_days = 0;
750
34
    for (const auto& [id, accessor] : accessor_map_) {
751
34
        if (accessor->type() != AccessorType::S3) {
752
34
            continue;
753
34
        }
754
755
0
        auto* s3_accessor = static_cast<S3Accessor*>(accessor.get());
756
757
0
        if (s3_accessor->check_versioning() != 0) {
758
0
            return -1;
759
0
        }
760
761
0
        if (s3_accessor->get_life_cycle(&tmp_liefcycle_days) != 0) {
762
0
            return -1;
763
0
        }
764
765
0
        if (tmp_liefcycle_days < min_lifecycle_days) {
766
0
            min_lifecycle_days = tmp_liefcycle_days;
767
0
        }
768
0
    }
769
34
    *lifecycle_days = min_lifecycle_days;
770
34
    return 0;
771
34
}
772
773
5
int InstanceChecker::do_inverted_check() {
774
5
    if (accessor_map_.size() > 1) {
775
0
        LOG(INFO) << "currently not support inverted check for multi accessor. instance_id="
776
0
                  << instance_id_;
777
0
        return 0;
778
0
    }
779
780
5
    LOG(INFO) << "begin to inverted check objects instance_id=" << instance_id_;
781
5
    int check_ret = 0;
782
5
    long num_scanned = 0;
783
5
    long num_file_leak = 0;
784
5
    using namespace std::chrono;
785
5
    auto start_time = steady_clock::now();
786
5
    DORIS_CLOUD_DEFER {
787
5
        g_bvar_inverted_checker_num_scanned.put(instance_id_, num_scanned);
788
5
        g_bvar_inverted_checker_num_check_failed.put(instance_id_, num_file_leak);
789
5
        auto cost = duration<float>(steady_clock::now() - start_time).count();
790
5
        LOG(INFO) << "inverted check instance objects finished, cost=" << cost
791
5
                  << "s. instance_id=" << instance_id_ << " num_scanned=" << num_scanned
792
5
                  << " num_file_leak=" << num_file_leak;
793
5
    };
794
795
5
    struct TabletRowsets {
796
5
        int64_t tablet_id {0};
797
5
        std::unordered_set<std::string> rowset_ids;
798
5
    };
799
5
    TabletRowsets tablet_rowsets_cache;
800
801
5
    RowsetIndexesFormatV1 rowset_index_cache_v1;
802
5
    RowsetIndexesFormatV2 rowset_index_cache_v2;
803
804
    // Return 0 if check success, return 1 if file is garbage data, negative if error occurred
805
108
    auto check_segment_file = [&](const std::string& obj_key) {
806
108
        std::vector<std::string> str;
807
108
        butil::SplitString(obj_key, '/', &str);
808
        // data/{tablet_id}/{rowset_id}_{seg_num}.dat
809
108
        if (str.size() < 3) {
810
            // clang-format off
811
0
            LOG(WARNING) << "split obj_key error, str.size() should be less than 3,"
812
0
                         << " value = " << str.size();
813
            // clang-format on
814
0
            return -1;
815
0
        }
816
817
108
        int64_t tablet_id = atol(str[1].c_str());
818
108
        if (tablet_id <= 0) {
819
0
            LOG(WARNING) << "failed to parse tablet_id, key=" << obj_key;
820
0
            return -1;
821
0
        }
822
823
108
        if (!str[2].ends_with(".dat")) {
824
            // skip check not segment file
825
54
            return 0;
826
54
        }
827
828
54
        std::string rowset_id;
829
54
        if (auto pos = str.back().find('_'); pos != std::string::npos) {
830
54
            rowset_id = str.back().substr(0, pos);
831
54
        } else {
832
0
            LOG(WARNING) << "failed to parse rowset_id, key=" << obj_key;
833
0
            return -1;
834
0
        }
835
836
54
        if (tablet_rowsets_cache.tablet_id == tablet_id) {
837
7
            if (tablet_rowsets_cache.rowset_ids.contains(rowset_id)) {
838
2
                return 0;
839
5
            } else {
840
5
                LOG(WARNING) << "rowset not exists, key=" << obj_key;
841
5
                return -1;
842
5
            }
843
7
        }
844
        // Get all rowset id of this tablet
845
47
        tablet_rowsets_cache.tablet_id = tablet_id;
846
47
        tablet_rowsets_cache.rowset_ids.clear();
847
47
        std::unique_ptr<Transaction> txn;
848
47
        TxnErrorCode err = txn_kv_->create_txn(&txn);
849
47
        if (err != TxnErrorCode::TXN_OK) {
850
0
            LOG(WARNING) << "failed to create txn";
851
0
            return -1;
852
0
        }
853
47
        std::unique_ptr<RangeGetIterator> it;
854
47
        auto begin = meta_rowset_key({instance_id_, tablet_id, 0});
855
47
        auto end = meta_rowset_key({instance_id_, tablet_id, INT64_MAX});
856
47
        do {
857
47
            TxnErrorCode err = txn->get(begin, end, &it);
858
47
            if (err != TxnErrorCode::TXN_OK) {
859
0
                LOG(WARNING) << "failed to get rowset kv, err=" << err;
860
0
                return -1;
861
0
            }
862
47
            if (!it->has_next()) {
863
10
                break;
864
10
            }
865
37
            while (it->has_next()) {
866
                // recycle corresponding resources
867
37
                auto [k, v] = it->next();
868
37
                doris::RowsetMetaCloudPB rowset;
869
37
                if (!rowset.ParseFromArray(v.data(), v.size())) {
870
0
                    LOG(WARNING) << "malformed rowset meta value, key=" << hex(k);
871
0
                    return -1;
872
0
                }
873
37
                tablet_rowsets_cache.rowset_ids.insert(rowset.rowset_id_v2());
874
37
                if (!it->has_next()) {
875
37
                    begin = k;
876
37
                    begin.push_back('\x00'); // Update to next smallest key for iteration
877
37
                    break;
878
37
                }
879
37
            }
880
37
        } while (it->more() && !stopped());
881
882
47
        if (!tablet_rowsets_cache.rowset_ids.contains(rowset_id)) {
883
            // Garbage data leak
884
12
            LOG(WARNING) << "rowset should be recycled, key=" << obj_key;
885
12
            return 1;
886
12
        }
887
888
35
        return 0;
889
47
    };
890
891
108
    auto check_inverted_index_file = [&](const std::string& obj_key) {
892
108
        std::vector<std::string> str;
893
108
        butil::SplitString(obj_key, '/', &str);
894
        // format v1: data/{tablet_id}/{rowset_id}_{seg_num}_{idx_id}{idx_suffix}.idx
895
        // format v2: data/{tablet_id}/{rowset_id}_{seg_num}.idx
896
108
        if (str.size() < 3) {
897
            // clang-format off
898
0
            LOG(WARNING) << "split obj_key error, str.size() should be less than 3,"
899
0
                         << " value = " << str.size();
900
            // clang-format on
901
0
            return -1;
902
0
        }
903
904
108
        int64_t tablet_id = atol(str[1].c_str());
905
108
        if (tablet_id <= 0) {
906
0
            LOG(WARNING) << "failed to parse tablet_id, key=" << obj_key;
907
0
            return -1;
908
0
        }
909
910
        // v1: {rowset_id}_{seg_num}_{idx_id}{idx_suffix}.idx
911
        // v2: {rowset_id}_{seg_num}.idx
912
108
        std::string rowset_info = str.back();
913
914
108
        if (!rowset_info.ends_with(".idx")) {
915
54
            return 0; // Not an index file
916
54
        }
917
918
54
        InvertedIndexStorageFormatPB inverted_index_storage_format =
919
54
                std::count(rowset_info.begin(), rowset_info.end(), '_') > 1
920
54
                        ? InvertedIndexStorageFormatPB::V1
921
54
                        : InvertedIndexStorageFormatPB::V2;
922
923
54
        size_t pos = rowset_info.find_last_of('_');
924
54
        if (pos == std::string::npos || pos + 1 >= str.back().size() - 4) {
925
0
            LOG(WARNING) << "Invalid index_id format, key=" << obj_key;
926
0
            return -1;
927
0
        }
928
54
        if (inverted_index_storage_format == InvertedIndexStorageFormatPB::V1) {
929
14
            return check_inverted_index_file_storage_format_v1(tablet_id, obj_key, rowset_info,
930
14
                                                               rowset_index_cache_v1);
931
40
        } else {
932
40
            return check_inverted_index_file_storage_format_v2(tablet_id, obj_key, rowset_info,
933
40
                                                               rowset_index_cache_v2);
934
40
        }
935
54
    };
936
    // so we choose to skip here.
937
5
    TEST_SYNC_POINT_RETURN_WITH_VALUE("InstanceChecker::do_inverted_check", (int)0);
938
939
3
    for (auto& [_, accessor] : accessor_map_) {
940
3
        std::unique_ptr<ListIterator> list_iter;
941
3
        int ret = accessor->list_directory("data", &list_iter);
942
3
        if (ret != 0) {
943
0
            return -1;
944
0
        }
945
946
111
        for (auto file = list_iter->next(); file.has_value(); file = list_iter->next()) {
947
108
            ++num_scanned;
948
108
            int ret = check_segment_file(file->path);
949
108
            if (ret != 0) {
950
17
                LOG(WARNING) << "failed to check segment file, uri=" << accessor->uri()
951
17
                             << " path=" << file->path;
952
17
                if (ret == 1) {
953
12
                    ++num_file_leak;
954
12
                } else {
955
5
                    check_ret = -1;
956
5
                }
957
17
            }
958
108
            ret = check_inverted_index_file(file->path);
959
108
            if (ret != 0) {
960
13
                LOG(WARNING) << "failed to check index file, uri=" << accessor->uri()
961
13
                             << " path=" << file->path;
962
13
                if (ret == 1) {
963
13
                    ++num_file_leak;
964
13
                } else {
965
0
                    check_ret = -1;
966
0
                }
967
13
            }
968
108
        }
969
970
3
        if (!list_iter->is_valid()) {
971
0
            LOG(WARNING) << "failed to list data directory. uri=" << accessor->uri();
972
0
            return -1;
973
0
        }
974
3
    }
975
3
    return num_file_leak > 0 ? 1 : check_ret;
976
3
}
977
978
3
int InstanceChecker::traverse_mow_tablet(const std::function<int(int64_t, bool)>& check_func) {
979
3
    std::unique_ptr<RangeGetIterator> it;
980
3
    auto begin = meta_rowset_key({instance_id_, 0, 0});
981
3
    auto end = meta_rowset_key({instance_id_, std::numeric_limits<int64_t>::max(), 0});
982
43
    do {
983
43
        std::unique_ptr<Transaction> txn;
984
43
        TxnErrorCode err = txn_kv_->create_txn(&txn);
985
43
        if (err != TxnErrorCode::TXN_OK) {
986
0
            LOG(WARNING) << "failed to create txn";
987
0
            return -1;
988
0
        }
989
43
        err = txn->get(begin, end, &it, false, 1);
990
43
        if (err != TxnErrorCode::TXN_OK) {
991
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
992
0
            return -1;
993
0
        }
994
43
        if (!it->has_next()) {
995
3
            break;
996
3
        }
997
80
        while (it->has_next() && !stopped()) {
998
40
            auto [k, v] = it->next();
999
40
            std::string_view k1 = k;
1000
40
            k1.remove_prefix(1);
1001
40
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1002
40
            decode_key(&k1, &out);
1003
            // 0x01 "meta" ${instance_id} "rowset" ${tablet_id} ${version} -> RowsetMetaCloudPB
1004
40
            auto tablet_id = std::get<int64_t>(std::get<0>(out[3]));
1005
1006
40
            if (!it->has_next()) {
1007
                // Update to next smallest key for iteration
1008
                // scan for next tablet in this instance
1009
40
                begin = meta_rowset_key({instance_id_, tablet_id + 1, 0});
1010
40
            }
1011
1012
40
            TabletMetaCloudPB tablet_meta;
1013
40
            int ret = get_tablet_meta(txn_kv_.get(), instance_id_, tablet_id, tablet_meta);
1014
40
            if (ret < 0) {
1015
0
                LOG(WARNING) << fmt::format(
1016
0
                        "failed to get_tablet_meta in do_delete_bitmap_integrity_check(), "
1017
0
                        "instance_id={}, tablet_id={}",
1018
0
                        instance_id_, tablet_id);
1019
0
                return ret;
1020
0
            }
1021
1022
40
            if (tablet_meta.enable_unique_key_merge_on_write()) {
1023
                // only check merge-on-write table
1024
30
                bool has_sequence_col = tablet_meta.schema().has_sequence_col_idx() &&
1025
30
                                        tablet_meta.schema().sequence_col_idx() != -1;
1026
30
                int ret = check_func(tablet_id, has_sequence_col);
1027
30
                if (ret < 0) {
1028
                    // return immediately when encounter unexpected error,
1029
                    // otherwise, we continue to check the next tablet
1030
0
                    return ret;
1031
0
                }
1032
30
            }
1033
40
        }
1034
40
    } while (it->more() && !stopped());
1035
3
    return 0;
1036
3
}
1037
1038
int InstanceChecker::traverse_rowset_delete_bitmaps(
1039
        int64_t tablet_id, std::string rowset_id,
1040
0
        const std::function<int(int64_t, std::string_view, int64_t, int64_t)>& callback) {
1041
0
    std::unique_ptr<RangeGetIterator> it;
1042
0
    auto begin = meta_delete_bitmap_key({instance_id_, tablet_id, rowset_id, 0, 0});
1043
0
    auto end = meta_delete_bitmap_key({instance_id_, tablet_id, rowset_id,
1044
0
                                       std::numeric_limits<int64_t>::max(),
1045
0
                                       std::numeric_limits<int64_t>::max()});
1046
0
    do {
1047
0
        std::unique_ptr<Transaction> txn;
1048
0
        TxnErrorCode err = txn_kv_->create_txn(&txn);
1049
0
        if (err != TxnErrorCode::TXN_OK) {
1050
0
            LOG(WARNING) << "failed to create txn";
1051
0
            return -1;
1052
0
        }
1053
0
        err = txn->get(begin, end, &it);
1054
0
        if (err != TxnErrorCode::TXN_OK) {
1055
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
1056
0
            return -1;
1057
0
        }
1058
0
        if (!it->has_next()) {
1059
0
            break;
1060
0
        }
1061
0
        while (it->has_next() && !stopped()) {
1062
0
            auto [k, v] = it->next();
1063
0
            std::string_view k1 = k;
1064
0
            k1.remove_prefix(1);
1065
0
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1066
0
            decode_key(&k1, &out);
1067
            // 0x01 "meta" ${instance_id} "delete_bitmap" ${tablet_id} ${rowset_id} ${version} ${segment_id} -> roaringbitmap
1068
0
            auto version = std::get<std::int64_t>(std::get<0>(out[5]));
1069
0
            auto segment_id = std::get<std::int64_t>(std::get<0>(out[6]));
1070
1071
0
            int ret = callback(tablet_id, rowset_id, version, segment_id);
1072
0
            if (ret != 0) {
1073
0
                return ret;
1074
0
            }
1075
1076
0
            if (!it->has_next()) {
1077
0
                begin = k;
1078
0
                begin.push_back('\x00'); // Update to next smallest key for iteration
1079
0
                break;
1080
0
            }
1081
0
        }
1082
0
    } while (it->more() && !stopped());
1083
1084
0
    return 0;
1085
0
}
1086
1087
int InstanceChecker::collect_tablet_rowsets(
1088
50
        int64_t tablet_id, const std::function<void(const doris::RowsetMetaCloudPB&)>& collect_cb) {
1089
50
    std::unique_ptr<Transaction> txn;
1090
50
    TxnErrorCode err = txn_kv_->create_txn(&txn);
1091
50
    if (err != TxnErrorCode::TXN_OK) {
1092
0
        LOG(WARNING) << "failed to create txn";
1093
0
        return -1;
1094
0
    }
1095
50
    std::unique_ptr<RangeGetIterator> it;
1096
50
    auto begin = meta_rowset_key({instance_id_, tablet_id, 0});
1097
50
    auto end = meta_rowset_key({instance_id_, tablet_id + 1, 0});
1098
1099
50
    int64_t rowsets_num {0};
1100
50
    do {
1101
50
        TxnErrorCode err = txn->get(begin, end, &it);
1102
50
        if (err != TxnErrorCode::TXN_OK) {
1103
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
1104
0
            return -1;
1105
0
        }
1106
50
        if (!it->has_next()) {
1107
0
            break;
1108
0
        }
1109
394
        while (it->has_next() && !stopped()) {
1110
394
            auto [k, v] = it->next();
1111
394
            doris::RowsetMetaCloudPB rowset;
1112
394
            if (!rowset.ParseFromArray(v.data(), v.size())) {
1113
0
                LOG(WARNING) << "malformed rowset meta value, key=" << hex(k);
1114
0
                return -1;
1115
0
            }
1116
1117
394
            ++rowsets_num;
1118
394
            collect_cb(rowset);
1119
1120
394
            if (!it->has_next()) {
1121
50
                begin = k;
1122
50
                begin.push_back('\x00'); // Update to next smallest key for iteration
1123
50
                break;
1124
50
            }
1125
394
        }
1126
50
    } while (it->more() && !stopped());
1127
1128
50
    LOG(INFO) << fmt::format(
1129
50
            "[delete bitmap checker] successfully collect rowsets for instance_id={}, "
1130
50
            "tablet_id={}, rowsets_num={}",
1131
50
            instance_id_, tablet_id, rowsets_num);
1132
50
    return 0;
1133
50
}
1134
1135
2
int InstanceChecker::do_delete_bitmap_inverted_check() {
1136
2
    LOG(INFO) << fmt::format(
1137
2
            "[delete bitmap checker] begin to do_delete_bitmap_inverted_check for instance_id={}",
1138
2
            instance_id_);
1139
1140
    // number of delete bitmap keys being scanned
1141
2
    int64_t total_delete_bitmap_keys {0};
1142
    // number of delete bitmaps which belongs to non mow tablet
1143
2
    int64_t abnormal_delete_bitmaps {0};
1144
    // number of delete bitmaps which doesn't have corresponding rowset in MS
1145
2
    int64_t leaked_delete_bitmaps {0};
1146
1147
2
    auto start_time = std::chrono::steady_clock::now();
1148
2
    DORIS_CLOUD_DEFER {
1149
2
        g_bvar_inverted_checker_leaked_delete_bitmaps.put(instance_id_, leaked_delete_bitmaps);
1150
2
        g_bvar_inverted_checker_abnormal_delete_bitmaps.put(instance_id_, abnormal_delete_bitmaps);
1151
2
        g_bvar_inverted_checker_delete_bitmaps_scanned.put(instance_id_, total_delete_bitmap_keys);
1152
1153
2
        auto cost = std::chrono::duration_cast<std::chrono::milliseconds>(
1154
2
                            std::chrono::steady_clock::now() - start_time)
1155
2
                            .count();
1156
2
        if (leaked_delete_bitmaps > 0 || abnormal_delete_bitmaps > 0) {
1157
1
            LOG(WARNING) << fmt::format(
1158
1
                    "[delete bitmap check fails] delete bitmap inverted check for instance_id={}, "
1159
1
                    "cost={} ms, total_delete_bitmap_keys={}, leaked_delete_bitmaps={}, "
1160
1
                    "abnormal_delete_bitmaps={}",
1161
1
                    instance_id_, cost, total_delete_bitmap_keys, leaked_delete_bitmaps,
1162
1
                    abnormal_delete_bitmaps);
1163
1
        } else {
1164
1
            LOG(INFO) << fmt::format(
1165
1
                    "[delete bitmap checker] delete bitmap inverted check for instance_id={}, "
1166
1
                    "passed. cost={} ms, total_delete_bitmap_keys={}",
1167
1
                    instance_id_, cost, total_delete_bitmap_keys);
1168
1
        }
1169
2
    };
1170
1171
2
    struct TabletsRowsetsCache {
1172
2
        int64_t tablet_id {-1};
1173
2
        bool enable_merge_on_write {false};
1174
2
        std::unordered_set<std::string> rowsets {};
1175
2
        std::unordered_set<std::string> pending_delete_bitmaps {};
1176
2
    } tablet_rowsets_cache {};
1177
1178
2
    std::unique_ptr<RangeGetIterator> it;
1179
2
    auto begin = meta_delete_bitmap_key({instance_id_, 0, "", 0, 0});
1180
2
    auto end =
1181
2
            meta_delete_bitmap_key({instance_id_, std::numeric_limits<int64_t>::max(), "", 0, 0});
1182
2
    do {
1183
2
        std::unique_ptr<Transaction> txn;
1184
2
        TxnErrorCode err = txn_kv_->create_txn(&txn);
1185
2
        if (err != TxnErrorCode::TXN_OK) {
1186
0
            LOG(WARNING) << "failed to create txn";
1187
0
            return -1;
1188
0
        }
1189
2
        err = txn->get(begin, end, &it);
1190
2
        if (err != TxnErrorCode::TXN_OK) {
1191
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
1192
0
            return -1;
1193
0
        }
1194
2
        if (!it->has_next()) {
1195
0
            break;
1196
0
        }
1197
502
        while (it->has_next() && !stopped()) {
1198
500
            auto [k, v] = it->next();
1199
500
            std::string_view k1 = k;
1200
500
            k1.remove_prefix(1);
1201
500
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1202
500
            decode_key(&k1, &out);
1203
            // 0x01 "meta" ${instance_id} "delete_bitmap" ${tablet_id} ${rowset_id} ${version} ${segment_id} -> roaringbitmap
1204
500
            auto tablet_id = std::get<int64_t>(std::get<0>(out[3]));
1205
500
            auto rowset_id = std::get<std::string>(std::get<0>(out[4]));
1206
500
            auto version = std::get<std::int64_t>(std::get<0>(out[5]));
1207
500
            auto segment_id = std::get<std::int64_t>(std::get<0>(out[6]));
1208
1209
500
            ++total_delete_bitmap_keys;
1210
1211
500
            if (!it->has_next()) {
1212
2
                begin = k;
1213
2
                begin.push_back('\x00'); // Update to next smallest key for iteration
1214
2
            }
1215
1216
500
            if (tablet_rowsets_cache.tablet_id == -1 ||
1217
500
                tablet_rowsets_cache.tablet_id != tablet_id) {
1218
30
                TabletMetaCloudPB tablet_meta;
1219
30
                int ret = get_tablet_meta(txn_kv_.get(), instance_id_, tablet_id, tablet_meta);
1220
30
                if (ret < 0) {
1221
0
                    LOG(WARNING) << fmt::format(
1222
0
                            "[delete bitmap checker] failed to get_tablet_meta in "
1223
0
                            "do_delete_bitmap_inverted_check(), instance_id={}, tablet_id={}",
1224
0
                            instance_id_, tablet_id);
1225
0
                    return ret;
1226
0
                }
1227
1228
30
                tablet_rowsets_cache.tablet_id = tablet_id;
1229
30
                tablet_rowsets_cache.enable_merge_on_write =
1230
30
                        tablet_meta.enable_unique_key_merge_on_write();
1231
30
                tablet_rowsets_cache.rowsets.clear();
1232
30
                tablet_rowsets_cache.pending_delete_bitmaps.clear();
1233
1234
30
                if (tablet_rowsets_cache.enable_merge_on_write) {
1235
                    // only collect rowsets for merge-on-write tablet
1236
20
                    auto collect_cb =
1237
199
                            [&tablet_rowsets_cache](const doris::RowsetMetaCloudPB& rowset) {
1238
199
                                tablet_rowsets_cache.rowsets.insert(rowset.rowset_id_v2());
1239
199
                            };
1240
20
                    ret = collect_tablet_rowsets(tablet_id, collect_cb);
1241
20
                    if (ret < 0) {
1242
0
                        return ret;
1243
0
                    }
1244
                    // get pending delete bitmaps
1245
20
                    ret = get_pending_delete_bitmap_keys(
1246
20
                            tablet_id, tablet_rowsets_cache.pending_delete_bitmaps);
1247
20
                    if (ret < 0) {
1248
0
                        return ret;
1249
0
                    }
1250
20
                }
1251
30
            }
1252
500
            DCHECK_EQ(tablet_id, tablet_rowsets_cache.tablet_id);
1253
1254
500
            if (!tablet_rowsets_cache.enable_merge_on_write) {
1255
                // clang-format off
1256
40
                TEST_SYNC_POINT_CALLBACK(
1257
40
                        "InstanceChecker::do_delete_bitmap_inverted_check.get_abnormal_delete_bitmap",
1258
40
                        &tablet_id, &rowset_id, &version, &segment_id);
1259
                // clang-format on
1260
40
                ++abnormal_delete_bitmaps;
1261
                // log an error and continue to check the next delete bitmap
1262
40
                LOG(WARNING) << fmt::format(
1263
40
                        "[delete bitmap check fails] find a delete bitmap belongs to tablet "
1264
40
                        "which is not a merge-on-write table! instance_id={}, tablet_id={}, "
1265
40
                        "version={}, segment_id={}",
1266
40
                        instance_id_, tablet_id, version, segment_id);
1267
40
                continue;
1268
40
            }
1269
1270
460
            if (!tablet_rowsets_cache.rowsets.contains(rowset_id) &&
1271
460
                !tablet_rowsets_cache.pending_delete_bitmaps.contains(std::string(k))) {
1272
170
                TEST_SYNC_POINT_CALLBACK(
1273
170
                        "InstanceChecker::do_delete_bitmap_inverted_check.get_leaked_delete_bitmap",
1274
170
                        &tablet_id, &rowset_id, &version, &segment_id);
1275
170
                ++leaked_delete_bitmaps;
1276
                // log an error and continue to check the next delete bitmap
1277
170
                LOG(WARNING) << fmt::format(
1278
170
                        "[delete bitmap check fails] can't find corresponding rowset for delete "
1279
170
                        "bitmap instance_id={}, tablet_id={}, rowset_id={}, version={}, "
1280
170
                        "segment_id={}",
1281
170
                        instance_id_, tablet_id, rowset_id, version, segment_id);
1282
170
            }
1283
460
        }
1284
2
    } while (it->more() && !stopped());
1285
1286
2
    return (leaked_delete_bitmaps > 0 || abnormal_delete_bitmaps > 0) ? 1 : 0;
1287
2
}
1288
1289
int InstanceChecker::get_pending_delete_bitmap_keys(
1290
50
        int64_t tablet_id, std::unordered_set<std::string>& pending_delete_bitmaps) {
1291
50
    std::unique_ptr<Transaction> txn;
1292
50
    TxnErrorCode err = txn_kv_->create_txn(&txn);
1293
50
    if (err != TxnErrorCode::TXN_OK) {
1294
0
        LOG(WARNING) << "failed to create txn";
1295
0
        return -1;
1296
0
    }
1297
50
    std::string pending_key = meta_pending_delete_bitmap_key({instance_id_, tablet_id});
1298
50
    std::string pending_val;
1299
50
    err = txn->get(pending_key, &pending_val);
1300
50
    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
1301
0
        LOG(WARNING) << "failed to get pending delete bitmap kv, err=" << err;
1302
0
        return -1;
1303
0
    }
1304
50
    if (err == TxnErrorCode::TXN_OK) {
1305
2
        PendingDeleteBitmapPB pending_info;
1306
2
        if (!pending_info.ParseFromString(pending_val)) [[unlikely]] {
1307
0
            LOG(WARNING) << "failed to parse PendingDeleteBitmapPB, tablet=" << tablet_id;
1308
0
            return -1;
1309
0
        }
1310
12
        for (auto& delete_bitmap_key : pending_info.delete_bitmap_keys()) {
1311
12
            pending_delete_bitmaps.emplace(std::string(delete_bitmap_key));
1312
12
        }
1313
2
    }
1314
50
    return 0;
1315
50
}
1316
1317
int InstanceChecker::check_inverted_index_file_storage_format_v1(
1318
        int64_t tablet_id, const std::string& file_path, const std::string& rowset_info,
1319
14
        RowsetIndexesFormatV1& rowset_index_cache_v1) {
1320
    // format v1: data/{tablet_id}/{rowset_id}_{seg_num}_{idx_id}{idx_suffix}.idx
1321
14
    std::string rowset_id;
1322
14
    int64_t segment_id;
1323
14
    std::string index_id_with_suffix_name;
1324
    // {rowset_id}_{seg_num}_{idx_id}{idx_suffix}.idx
1325
14
    std::vector<std::string> str;
1326
14
    butil::SplitString(rowset_info.substr(0, rowset_info.size() - 4), '_', &str);
1327
14
    if (str.size() < 3) {
1328
0
        LOG(WARNING) << "Split rowset info with '_' error, str size < 3, rowset_info = "
1329
0
                     << rowset_info;
1330
0
        return -1;
1331
0
    }
1332
14
    rowset_id = str[0];
1333
14
    segment_id = std::atoll(str[1].c_str());
1334
14
    index_id_with_suffix_name = str[2];
1335
1336
14
    if (rowset_index_cache_v1.rowset_id == rowset_id) {
1337
0
        if (rowset_index_cache_v1.segment_ids.contains(segment_id)) {
1338
0
            if (auto it = rowset_index_cache_v1.index_ids.find(index_id_with_suffix_name);
1339
0
                it == rowset_index_cache_v1.index_ids.end()) {
1340
                // clang-format off
1341
0
                LOG(WARNING) << fmt::format("index_id with suffix name not found, rowset_info = {}, obj_key = {}", rowset_info, file_path);
1342
                // clang-format on
1343
0
                return -1;
1344
0
            }
1345
0
        } else {
1346
            // clang-format off
1347
0
            LOG(WARNING) << fmt::format("segment id not found, rowset_info = {}, obj_key = {}", rowset_info, file_path);
1348
            // clang-format on
1349
0
            return -1;
1350
0
        }
1351
0
    }
1352
1353
14
    rowset_index_cache_v1.rowset_id = rowset_id;
1354
14
    rowset_index_cache_v1.segment_ids.clear();
1355
14
    rowset_index_cache_v1.index_ids.clear();
1356
1357
14
    std::unique_ptr<Transaction> txn;
1358
14
    TxnErrorCode err = txn_kv_->create_txn(&txn);
1359
14
    if (err != TxnErrorCode::TXN_OK) {
1360
0
        LOG(WARNING) << "failed to create txn";
1361
0
        return -1;
1362
0
    }
1363
14
    std::unique_ptr<RangeGetIterator> it;
1364
14
    auto begin = meta_rowset_key({instance_id_, tablet_id, 0});
1365
14
    auto end = meta_rowset_key({instance_id_, tablet_id, INT64_MAX});
1366
14
    do {
1367
14
        TxnErrorCode err = txn->get(begin, end, &it);
1368
14
        if (err != TxnErrorCode::TXN_OK) {
1369
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
1370
0
            return -1;
1371
0
        }
1372
14
        if (!it->has_next()) {
1373
8
            break;
1374
8
        }
1375
6
        while (it->has_next()) {
1376
            // recycle corresponding resources
1377
6
            auto [k, v] = it->next();
1378
6
            doris::RowsetMetaCloudPB rs_meta;
1379
6
            if (!rs_meta.ParseFromArray(v.data(), v.size())) {
1380
0
                LOG(WARNING) << "malformed rowset meta value, key=" << hex(k);
1381
0
                return -1;
1382
0
            }
1383
1384
6
            TabletIndexPB tablet_index;
1385
6
            if (get_tablet_idx(txn_kv_.get(), instance_id_, rs_meta.tablet_id(), tablet_index) ==
1386
6
                -1) {
1387
0
                LOG(WARNING) << "failedt to get tablet index, tablet_id= " << rs_meta.tablet_id();
1388
0
                return -1;
1389
0
            }
1390
1391
6
            auto tablet_schema_key = meta_schema_key(
1392
6
                    {instance_id_, tablet_index.index_id(), rs_meta.schema_version()});
1393
6
            ValueBuf tablet_schema_val;
1394
6
            err = cloud::blob_get(txn.get(), tablet_schema_key, &tablet_schema_val);
1395
1396
6
            if (err != TxnErrorCode::TXN_OK) {
1397
0
                LOG(WARNING) << "failed to get schema, err=" << err;
1398
0
                return -1;
1399
0
            }
1400
1401
6
            auto* schema = rs_meta.mutable_tablet_schema();
1402
6
            if (!parse_schema_value(tablet_schema_val, schema)) {
1403
0
                LOG(WARNING) << "malformed schema value, key=" << hex(tablet_schema_key);
1404
0
                return -1;
1405
0
            }
1406
1407
12
            for (size_t i = 0; i < rs_meta.num_segments(); i++) {
1408
6
                rowset_index_cache_v1.segment_ids.insert(i);
1409
6
            }
1410
1411
6
            for (const auto& i : rs_meta.tablet_schema().index()) {
1412
6
                if (i.has_index_type() && i.index_type() == IndexType::INVERTED) {
1413
6
                    LOG(INFO) << fmt::format(
1414
6
                            "record index info, index_id: {}, index_suffix_name: {}", i.index_id(),
1415
6
                            i.index_suffix_name());
1416
6
                    rowset_index_cache_v1.index_ids.insert(
1417
6
                            fmt::format("{}{}", i.index_id(), i.index_suffix_name()));
1418
6
                }
1419
6
            }
1420
1421
6
            if (!it->has_next()) {
1422
6
                begin = k;
1423
6
                begin.push_back('\x00'); // Update to next smallest key for iteration
1424
6
                break;
1425
6
            }
1426
6
        }
1427
6
    } while (it->more() && !stopped());
1428
1429
14
    if (!rowset_index_cache_v1.segment_ids.contains(segment_id)) {
1430
        // Garbage data leak
1431
        // clang-format off
1432
8
        LOG(WARNING) << "rowset_index_cache_v1.segment_ids don't contains segment_id, rowset should be recycled,"
1433
8
                     << " key = " << file_path
1434
8
                     << " segment_id = " << segment_id;
1435
        // clang-format on
1436
8
        return 1;
1437
8
    }
1438
1439
6
    if (!rowset_index_cache_v1.index_ids.contains(index_id_with_suffix_name)) {
1440
        // Garbage data leak
1441
        // clang-format off
1442
0
        LOG(WARNING) << "rowset_index_cache_v1.index_ids don't contains index_id_with_suffix_name,"
1443
0
                     << " rowset with inde meta should be recycled, key=" << file_path
1444
0
                     << " index_id_with_suffix_name=" << index_id_with_suffix_name;
1445
        // clang-format on
1446
0
        return 1;
1447
0
    }
1448
1449
6
    return 0;
1450
6
}
1451
1452
int InstanceChecker::check_inverted_index_file_storage_format_v2(
1453
        int64_t tablet_id, const std::string& file_path, const std::string& rowset_info,
1454
40
        RowsetIndexesFormatV2& rowset_index_cache_v2) {
1455
40
    std::string rowset_id;
1456
40
    int64_t segment_id;
1457
    // {rowset_id}_{seg_num}.idx
1458
40
    std::vector<std::string> str;
1459
40
    butil::SplitString(rowset_info.substr(0, rowset_info.size() - 4), '_', &str);
1460
40
    if (str.size() < 2) {
1461
        // clang-format off
1462
0
        LOG(WARNING) << "Split rowset info with '_' error, str size < 2, rowset_info = " << rowset_info;
1463
        // clang-format on
1464
0
        return -1;
1465
0
    }
1466
40
    rowset_id = str[0];
1467
40
    segment_id = std::atoll(str[1].c_str());
1468
1469
40
    if (rowset_index_cache_v2.rowset_id == rowset_id) {
1470
0
        if (!rowset_index_cache_v2.segment_ids.contains(segment_id)) {
1471
            // clang-format off
1472
0
            LOG(WARNING) << fmt::format("index file not found, rowset_info = {}, obj_key = {}", rowset_info, file_path);
1473
            // clang-format on
1474
0
            return -1;
1475
0
        }
1476
0
    }
1477
1478
40
    rowset_index_cache_v2.rowset_id = rowset_id;
1479
40
    rowset_index_cache_v2.segment_ids.clear();
1480
1481
40
    std::unique_ptr<Transaction> txn;
1482
40
    TxnErrorCode err = txn_kv_->create_txn(&txn);
1483
40
    if (err != TxnErrorCode::TXN_OK) {
1484
0
        LOG(WARNING) << "failed to create txn";
1485
0
        return -1;
1486
0
    }
1487
40
    std::unique_ptr<RangeGetIterator> it;
1488
40
    auto begin = meta_rowset_key({instance_id_, tablet_id, 0});
1489
40
    auto end = meta_rowset_key({instance_id_, tablet_id, INT64_MAX});
1490
40
    do {
1491
40
        TxnErrorCode err = txn->get(begin, end, &it);
1492
40
        if (err != TxnErrorCode::TXN_OK) {
1493
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
1494
0
            return -1;
1495
0
        }
1496
40
        if (!it->has_next()) {
1497
5
            break;
1498
5
        }
1499
35
        while (it->has_next()) {
1500
            // recycle corresponding resources
1501
35
            auto [k, v] = it->next();
1502
35
            doris::RowsetMetaCloudPB rs_meta;
1503
35
            if (!rs_meta.ParseFromArray(v.data(), v.size())) {
1504
0
                LOG(WARNING) << "malformed rowset meta value, key=" << hex(k);
1505
0
                return -1;
1506
0
            }
1507
1508
70
            for (size_t i = 0; i < rs_meta.num_segments(); i++) {
1509
35
                rowset_index_cache_v2.segment_ids.insert(i);
1510
35
            }
1511
1512
35
            if (!it->has_next()) {
1513
35
                begin = k;
1514
35
                begin.push_back('\x00'); // Update to next smallest key for iteration
1515
35
                break;
1516
35
            }
1517
35
        }
1518
35
    } while (it->more() && !stopped());
1519
1520
40
    if (!rowset_index_cache_v2.segment_ids.contains(segment_id)) {
1521
        // Garbage data leak
1522
5
        LOG(WARNING) << "rowset with index meta should be recycled, key=" << file_path;
1523
5
        return 1;
1524
5
    }
1525
1526
35
    return 0;
1527
40
}
1528
1529
int InstanceChecker::check_delete_bitmap_storage_optimize_v2(
1530
        int64_t tablet_id, bool has_sequence_col,
1531
30
        int64_t& rowsets_with_useless_delete_bitmap_version) {
1532
    // end_version: create_time
1533
30
    std::map<int64_t, int64_t> tablet_rowsets_map {};
1534
    // rowset_id: {start_version, end_version}
1535
30
    std::map<std::string, std::pair<int64_t, int64_t>> rowset_version_map;
1536
    // Get all visible rowsets of this tablet
1537
195
    auto collect_cb = [&](const doris::RowsetMetaCloudPB& rowset) {
1538
195
        if (rowset.start_version() == 0 && rowset.end_version() == 1) {
1539
            // ignore dummy rowset [0-1]
1540
0
            return;
1541
0
        }
1542
195
        tablet_rowsets_map[rowset.end_version()] = rowset.creation_time();
1543
195
        rowset_version_map[rowset.rowset_id_v2()] =
1544
195
                std::make_pair(rowset.start_version(), rowset.end_version());
1545
195
    };
1546
30
    if (int ret = collect_tablet_rowsets(tablet_id, collect_cb); ret != 0) {
1547
0
        return ret;
1548
0
    }
1549
1550
30
    std::unordered_set<std::string> pending_delete_bitmaps;
1551
30
    if (auto ret = get_pending_delete_bitmap_keys(tablet_id, pending_delete_bitmaps); ret < 0) {
1552
0
        return ret;
1553
0
    }
1554
1555
30
    std::unique_ptr<RangeGetIterator> it;
1556
30
    auto begin = meta_delete_bitmap_key({instance_id_, tablet_id, "", 0, 0});
1557
30
    auto end = meta_delete_bitmap_key({instance_id_, tablet_id + 1, "", 0, 0});
1558
30
    std::string last_rowset_id = "";
1559
30
    int64_t last_version = 0;
1560
30
    int64_t last_failed_version = 0;
1561
30
    std::vector<int64_t> failed_versions;
1562
30
    auto print_failed_versions = [&]() {
1563
4
        TEST_SYNC_POINT_CALLBACK(
1564
4
                "InstanceChecker::check_delete_bitmap_storage_optimize_v2.get_abnormal_"
1565
4
                "rowset",
1566
4
                &tablet_id, &last_rowset_id);
1567
4
        rowsets_with_useless_delete_bitmap_version++;
1568
        // some versions are continuous, such as [8, 9, 10, 11, 13, 17, 18]
1569
        // print as [8-11, 13, 17-18]
1570
4
        int64_t last_start_version = -1;
1571
4
        int64_t last_end_version = -1;
1572
4
        std::stringstream ss;
1573
4
        ss << "[";
1574
9
        for (int64_t version : failed_versions) {
1575
9
            if (last_start_version == -1) {
1576
4
                last_start_version = version;
1577
4
                last_end_version = version;
1578
4
                continue;
1579
4
            }
1580
5
            if (last_end_version + 1 == version) {
1581
2
                last_end_version = version;
1582
3
            } else {
1583
3
                if (last_start_version == last_end_version) {
1584
3
                    ss << last_start_version << ", ";
1585
3
                } else {
1586
0
                    ss << last_start_version << "-" << last_end_version << ", ";
1587
0
                }
1588
3
                last_start_version = version;
1589
3
                last_end_version = version;
1590
3
            }
1591
5
        }
1592
4
        if (last_start_version == last_end_version) {
1593
3
            ss << last_start_version;
1594
3
        } else {
1595
1
            ss << last_start_version << "-" << last_end_version;
1596
1
        }
1597
4
        ss << "]";
1598
4
        std::stringstream version_str;
1599
4
        auto it = rowset_version_map.find(last_rowset_id);
1600
4
        if (it != rowset_version_map.end()) {
1601
4
            version_str << "[" << it->second.first << "-" << it->second.second << "]";
1602
4
        }
1603
4
        LOG(WARNING) << fmt::format(
1604
4
                "[delete bitmap check fails] delete bitmap storage optimize v2 check fail "
1605
4
                "for instance_id={}, tablet_id={}, rowset_id={}, version={} found delete "
1606
4
                "bitmap with versions={}, size={}",
1607
4
                instance_id_, tablet_id, last_rowset_id, version_str.str(), ss.str(),
1608
4
                failed_versions.size());
1609
4
    };
1610
30
    using namespace std::chrono;
1611
30
    int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
1612
30
    do {
1613
30
        std::unique_ptr<Transaction> txn;
1614
30
        TxnErrorCode err = txn_kv_->create_txn(&txn);
1615
30
        if (err != TxnErrorCode::TXN_OK) {
1616
0
            LOG(WARNING) << "failed to create txn";
1617
0
            return -1;
1618
0
        }
1619
30
        err = txn->get(begin, end, &it);
1620
30
        if (err != TxnErrorCode::TXN_OK) {
1621
0
            LOG(WARNING) << "failed to get delete bitmap kv, err=" << err;
1622
0
            return -1;
1623
0
        }
1624
30
        if (!it->has_next()) {
1625
0
            break;
1626
0
        }
1627
771
        while (it->has_next() && !stopped()) {
1628
741
            auto [k, v] = it->next();
1629
741
            std::string_view k1 = k;
1630
741
            k1.remove_prefix(1);
1631
741
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1632
741
            decode_key(&k1, &out);
1633
            // 0x01 "meta" ${instance_id} "delete_bitmap" ${tablet_id} ${rowset_id} ${version} ${segment_id} -> roaringbitmap
1634
741
            auto rowset_id = std::get<std::string>(std::get<0>(out[4]));
1635
741
            auto version = std::get<std::int64_t>(std::get<0>(out[5]));
1636
741
            if (!it->has_next()) {
1637
30
                begin = k;
1638
30
                begin.push_back('\x00'); // Update to next smallest key for iteration
1639
30
            }
1640
741
            if (rowset_id == last_rowset_id && version == last_version) {
1641
                // skip the same rowset and version
1642
167
                continue;
1643
167
            }
1644
574
            if (rowset_id != last_rowset_id && !failed_versions.empty()) {
1645
3
                print_failed_versions();
1646
3
                last_failed_version = 0;
1647
3
                failed_versions.clear();
1648
3
            }
1649
574
            last_rowset_id = rowset_id;
1650
574
            last_version = version;
1651
574
            if (tablet_rowsets_map.find(version) != tablet_rowsets_map.end()) {
1652
548
                continue;
1653
548
            }
1654
26
            auto version_it = rowset_version_map.find(rowset_id);
1655
26
            if (version_it == rowset_version_map.end()) {
1656
                // checked in do_delete_bitmap_inverted_check
1657
1
                continue;
1658
1
            }
1659
25
            if (pending_delete_bitmaps.contains(std::string(k))) {
1660
3
                continue;
1661
3
            }
1662
22
            if (has_sequence_col && version >= version_it->second.first &&
1663
22
                version <= version_it->second.second) {
1664
5
                continue;
1665
5
            }
1666
            // there may be an interval in this situation:
1667
            // 1. finish compaction job; 2. checker; 3. finish agg and remove delete bitmap to ms
1668
17
            auto rowset_it = tablet_rowsets_map.upper_bound(version);
1669
17
            if (rowset_it == tablet_rowsets_map.end()) {
1670
1
                if (version != last_failed_version) {
1671
1
                    failed_versions.push_back(version);
1672
1
                }
1673
1
                last_failed_version = version;
1674
1
                continue;
1675
1
            }
1676
16
            if (rowset_it->second + config::delete_bitmap_storage_optimize_v2_check_skip_seconds >=
1677
16
                now) {
1678
8
                continue;
1679
8
            }
1680
8
            if (version != last_failed_version) {
1681
8
                failed_versions.push_back(version);
1682
8
            }
1683
8
            last_failed_version = version;
1684
8
        }
1685
30
    } while (it->more() && !stopped());
1686
30
    if (!failed_versions.empty()) {
1687
1
        print_failed_versions();
1688
1
    }
1689
30
    LOG(INFO) << fmt::format(
1690
30
            "[delete bitmap checker] finish check delete bitmap storage optimize v2 for "
1691
30
            "instance_id={}, tablet_id={}, rowsets_num={}, "
1692
30
            "rowsets_with_useless_delete_bitmap_version={}",
1693
30
            instance_id_, tablet_id, tablet_rowsets_map.size(),
1694
30
            rowsets_with_useless_delete_bitmap_version);
1695
30
    return (rowsets_with_useless_delete_bitmap_version > 1 ? 1 : 0);
1696
30
}
1697
1698
3
int InstanceChecker::do_delete_bitmap_storage_optimize_check(int version) {
1699
3
    if (version != 2) {
1700
0
        return -1;
1701
0
    }
1702
3
    int64_t total_tablets_num {0};
1703
3
    int64_t failed_tablets_num {0};
1704
1705
    // for v2 check
1706
3
    int64_t max_rowsets_with_useless_delete_bitmap_version = 0;
1707
3
    int64_t tablet_id_with_max_rowsets_with_useless_delete_bitmap_version = 0;
1708
1709
    // check that for every visible rowset, there exists at least delete one bitmap in MS
1710
30
    int ret = traverse_mow_tablet([&](int64_t tablet_id, bool has_sequence_col) {
1711
30
        ++total_tablets_num;
1712
30
        int64_t rowsets_with_useless_delete_bitmap_version = 0;
1713
30
        int res = check_delete_bitmap_storage_optimize_v2(
1714
30
                tablet_id, has_sequence_col, rowsets_with_useless_delete_bitmap_version);
1715
30
        if (rowsets_with_useless_delete_bitmap_version >
1716
30
            max_rowsets_with_useless_delete_bitmap_version) {
1717
1
            max_rowsets_with_useless_delete_bitmap_version =
1718
1
                    rowsets_with_useless_delete_bitmap_version;
1719
1
            tablet_id_with_max_rowsets_with_useless_delete_bitmap_version = tablet_id;
1720
1
        }
1721
30
        failed_tablets_num += (res != 0);
1722
30
        return res;
1723
30
    });
1724
1725
3
    if (ret < 0) {
1726
0
        return ret;
1727
0
    }
1728
1729
3
    g_bvar_max_rowsets_with_useless_delete_bitmap_version.put(
1730
3
            instance_id_, max_rowsets_with_useless_delete_bitmap_version);
1731
1732
3
    std::stringstream ss;
1733
3
    ss << "[delete bitmap checker] check delete bitmap storage optimize v" << version
1734
3
       << " for instance_id=" << instance_id_ << ", total_tablets_num=" << total_tablets_num
1735
3
       << ", failed_tablets_num=" << failed_tablets_num
1736
3
       << ". max_rowsets_with_useless_delete_bitmap_version="
1737
3
       << max_rowsets_with_useless_delete_bitmap_version
1738
3
       << ", tablet_id=" << tablet_id_with_max_rowsets_with_useless_delete_bitmap_version;
1739
3
    LOG(INFO) << ss.str();
1740
1741
3
    return (failed_tablets_num > 0) ? 1 : 0;
1742
3
}
1743
1744
3
int InstanceChecker::do_mow_job_key_check() {
1745
3
    std::unique_ptr<RangeGetIterator> it;
1746
3
    std::string begin = mow_tablet_job_key({instance_id_, 0, 0});
1747
3
    std::string end = mow_tablet_job_key({instance_id_, INT64_MAX, 0});
1748
3
    MowTabletJobPB mow_tablet_job;
1749
3
    do {
1750
3
        std::unique_ptr<Transaction> txn;
1751
3
        TxnErrorCode err = txn_kv_->create_txn(&txn);
1752
3
        if (err != TxnErrorCode::TXN_OK) {
1753
0
            LOG(WARNING) << "failed to create txn";
1754
0
            return -1;
1755
0
        }
1756
3
        err = txn->get(begin, end, &it);
1757
3
        if (err != TxnErrorCode::TXN_OK) {
1758
0
            LOG(WARNING) << "failed to get mow tablet job key, err=" << err;
1759
0
            return -1;
1760
0
        }
1761
3
        int64_t now = duration_cast<std::chrono::seconds>(
1762
3
                              std::chrono::system_clock::now().time_since_epoch())
1763
3
                              .count();
1764
3
        while (it->has_next() && !stopped()) {
1765
2
            auto [k, v] = it->next();
1766
2
            std::string_view k1 = k;
1767
2
            k1.remove_prefix(1);
1768
2
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1769
2
            decode_key(&k1, &out);
1770
            // 0x01 "meta" ${instance_id} "mow_tablet_job" ${table_id} ${initiator}
1771
2
            auto table_id = std::get<int64_t>(std::get<0>(out[3]));
1772
2
            auto initiator = std::get<int64_t>(std::get<0>(out[4]));
1773
2
            if (!mow_tablet_job.ParseFromArray(v.data(), v.size())) [[unlikely]] {
1774
0
                LOG(WARNING) << "failed to parse MowTabletJobPB";
1775
0
                return -1;
1776
0
            }
1777
2
            int64_t expiration = mow_tablet_job.expiration();
1778
            // check job key failed should meet both following two condition:
1779
            // 1. job key is expired
1780
            // 2. table lock key is not found or key is not expired
1781
2
            if (expiration < now - config::mow_job_key_check_expiration_diff_seconds) {
1782
2
                std::string lock_key =
1783
2
                        meta_delete_bitmap_update_lock_key({instance_id_, table_id, -1});
1784
2
                std::string lock_val;
1785
2
                err = txn->get(lock_key, &lock_val);
1786
2
                std::string reason = "";
1787
2
                if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
1788
0
                    reason = "table lock key not found";
1789
1790
2
                } else {
1791
2
                    DeleteBitmapUpdateLockPB lock_info;
1792
2
                    if (!lock_info.ParseFromString(lock_val)) [[unlikely]] {
1793
0
                        LOG(WARNING) << "failed to parse DeleteBitmapUpdateLockPB";
1794
0
                        return -1;
1795
0
                    }
1796
2
                    if (lock_info.expiration() > now || lock_info.lock_id() != -1) {
1797
2
                        reason = "table lock is not expired,lock_id=" +
1798
2
                                 std::to_string(lock_info.lock_id());
1799
2
                    }
1800
2
                }
1801
2
                if (reason != "") {
1802
2
                    LOG(WARNING) << fmt::format(
1803
2
                            "[compaction key check fails] mow job key check fail for "
1804
2
                            "instance_id={}, table_id={}, initiator={}, expiration={}, now={}, "
1805
2
                            "reason={}",
1806
2
                            instance_id_, table_id, initiator, expiration, now, reason);
1807
2
                    return -1;
1808
2
                }
1809
2
            }
1810
2
        }
1811
1
        begin = it->next_begin_key(); // Update to next smallest key for iteration
1812
1
    } while (it->more() && !stopped());
1813
1
    return 0;
1814
3
}
1815
4
int InstanceChecker::do_tablet_stats_key_check() {
1816
4
    int ret = 0;
1817
1818
4
    int64_t nums_leak = 0;
1819
4
    int64_t nums_loss = 0;
1820
4
    int64_t nums_scanned = 0;
1821
4
    int64_t nums_abnormal = 0;
1822
1823
4
    std::string begin = meta_tablet_key({instance_id_, 0, 0, 0, 0});
1824
4
    std::string end = meta_tablet_key({instance_id_, INT64_MAX, 0, 0, 0});
1825
    // inverted check tablet exists
1826
4
    LOG(INFO) << "begin inverted check stats_tablet_key";
1827
4
    ret = scan_and_handle_kv(begin, end, [&](std::string_view key, std::string_view value) {
1828
4
        int ret = check_stats_tablet_key_exists(key, value);
1829
4
        nums_scanned++;
1830
4
        if (ret == 1) {
1831
1
            nums_loss++;
1832
1
        }
1833
4
        return ret;
1834
4
    });
1835
4
    if (ret == -1) {
1836
0
        LOG(WARNING) << "failed to inverted check if stats tablet key exists";
1837
0
        return -1;
1838
4
    } else if (ret == 1) {
1839
1
        LOG(WARNING) << "stats_tablet_key loss, nums_scanned=" << nums_scanned
1840
1
                     << ", nums_loss=" << nums_loss;
1841
1
        return 1;
1842
1
    }
1843
3
    LOG(INFO) << "finish inverted check stats_tablet_key, nums_scanned=" << nums_scanned
1844
3
              << ", nums_loss=" << nums_loss;
1845
1846
3
    begin = stats_tablet_key({instance_id_, 0, 0, 0, 0});
1847
3
    end = stats_tablet_key({instance_id_, INT64_MAX, 0, 0, 0});
1848
3
    nums_scanned = 0;
1849
    // check tablet exists
1850
3
    LOG(INFO) << "begin check stats_tablet_key leaked";
1851
4
    ret = scan_and_handle_kv(begin, end, [&](std::string_view key, std::string_view value) {
1852
4
        int ret = check_stats_tablet_key_leaked(key, value);
1853
4
        nums_scanned++;
1854
4
        if (ret == 1) {
1855
1
            nums_leak++;
1856
1
        }
1857
4
        return ret;
1858
4
    });
1859
3
    if (ret == -1) {
1860
0
        LOG(WARNING) << "failed to check if stats tablet key exists";
1861
0
        return -1;
1862
3
    } else if (ret == 1) {
1863
1
        LOG(WARNING) << "stats_tablet_key leaked, nums_scanned=" << nums_scanned
1864
1
                     << ", nums_leak=" << nums_leak;
1865
1
        return 1;
1866
1
    }
1867
2
    LOG(INFO) << "finish check stats_tablet_key leaked, nums_scanned=" << nums_scanned
1868
2
              << ", nums_leak=" << nums_leak;
1869
1870
2
    begin = stats_tablet_key({instance_id_, 0, 0, 0, 0});
1871
2
    end = stats_tablet_key({instance_id_, INT64_MAX, 0, 0, 0});
1872
2
    nums_scanned = 0;
1873
    // check if key is normal
1874
2
    LOG(INFO) << "begin check stats_tablet_key abnormal";
1875
2
    ret = scan_and_handle_kv(begin, end, [&](std::string_view key, std::string_view value) {
1876
2
        int ret = check_stats_tablet_key(key, value);
1877
2
        nums_scanned++;
1878
2
        if (ret == 1) {
1879
1
            nums_abnormal++;
1880
1
        }
1881
2
        return ret;
1882
2
    });
1883
2
    if (ret == -1) {
1884
0
        LOG(WARNING) << "failed to check if stats tablet key exists";
1885
0
        return -1;
1886
2
    } else if (ret == 1) {
1887
1
        LOG(WARNING) << "stats_tablet_key abnormal, nums_scanned=" << nums_scanned
1888
1
                     << ", nums_abnormal=" << nums_abnormal;
1889
1
        return 1;
1890
1
    }
1891
1
    LOG(INFO) << "finish check stats_tablet_key, nums_scanned=" << nums_scanned
1892
1
              << ", nums_abnormal=" << nums_abnormal;
1893
1
    return 0;
1894
2
}
1895
1896
4
int InstanceChecker::check_stats_tablet_key_exists(std::string_view key, std::string_view value) {
1897
4
    std::string_view k1 = key;
1898
4
    k1.remove_prefix(1);
1899
4
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1900
4
    decode_key(&k1, &out);
1901
    // 0x01 "meta" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id}
1902
4
    auto table_id = std::get<int64_t>(std::get<0>(out[3]));
1903
4
    auto index_id = std::get<int64_t>(std::get<0>(out[4]));
1904
4
    auto partition_id = std::get<int64_t>(std::get<0>(out[5]));
1905
4
    auto tablet_id = std::get<int64_t>(std::get<0>(out[6]));
1906
4
    std::string tablet_stats_key =
1907
4
            stats_tablet_key({instance_id_, table_id, index_id, partition_id, tablet_id});
1908
4
    int ret = key_exist(txn_kv_.get(), tablet_stats_key);
1909
4
    if (ret == 1) {
1910
        // clang-format off
1911
1
        LOG(WARNING) << "stats tablet key's tablet key loss,"
1912
1
                    << " stats tablet key=" << hex(tablet_stats_key)
1913
1
                    << " meta tablet key=" << hex(key);
1914
        // clang-format on
1915
1
        return 1;
1916
3
    } else if (ret == -1) {
1917
0
        LOG(WARNING) << "failed to check key exists, key=" << hex(tablet_stats_key);
1918
0
        return -1;
1919
0
    }
1920
3
    LOG(INFO) << "check stats_tablet_key_exists ok, key=" << hex(key);
1921
3
    return 0;
1922
4
}
1923
1924
4
int InstanceChecker::check_stats_tablet_key_leaked(std::string_view key, std::string_view value) {
1925
4
    std::string_view k1 = key;
1926
4
    k1.remove_prefix(1);
1927
4
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1928
4
    decode_key(&k1, &out);
1929
    // 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id}
1930
4
    auto table_id = std::get<int64_t>(std::get<0>(out[3]));
1931
4
    auto index_id = std::get<int64_t>(std::get<0>(out[4]));
1932
4
    auto partition_id = std::get<int64_t>(std::get<0>(out[5]));
1933
4
    auto tablet_id = std::get<int64_t>(std::get<0>(out[6]));
1934
4
    std::string tablet_key =
1935
4
            meta_tablet_key({instance_id_, table_id, index_id, partition_id, tablet_id});
1936
4
    int ret = key_exist(txn_kv_.get(), tablet_key);
1937
4
    if (ret == 1) {
1938
        // clang-format off
1939
1
        LOG(WARNING) << "stats tablet key's tablet key leak,"
1940
1
                    << " stats tablet key=" << hex(key)
1941
1
                    << " meta tablet key=" << hex(tablet_key);
1942
        // clang-format on
1943
1
        return 1;
1944
3
    } else if (ret == -1) {
1945
0
        LOG(WARNING) << "failed to check key exists, key=" << hex(tablet_key);
1946
0
        return -1;
1947
0
    }
1948
3
    LOG(INFO) << "check stats_tablet_key_leaked ok, key=" << hex(key);
1949
3
    return 0;
1950
4
}
1951
1952
2
int InstanceChecker::check_stats_tablet_key(std::string_view key, std::string_view value) {
1953
2
    TabletStatsPB tablet_stats_pb;
1954
2
    std::string_view k1 = key;
1955
2
    k1.remove_prefix(1);
1956
2
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1957
2
    decode_key(&k1, &out);
1958
    // 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id}
1959
2
    auto tablet_id = std::get<int64_t>(std::get<0>(out[6]));
1960
2
    std::unique_ptr<Transaction> txn;
1961
2
    TxnErrorCode err = txn_kv_->create_txn(&txn);
1962
2
    if (err != TxnErrorCode::TXN_OK) {
1963
0
        LOG_WARNING("failed to recycle tablet ")
1964
0
                .tag("tablet id", tablet_id)
1965
0
                .tag("instance_id", instance_id_)
1966
0
                .tag("reason", "failed to create txn");
1967
0
        return -1;
1968
0
    }
1969
2
    std::string tablet_idx_key = meta_tablet_idx_key({instance_id_, tablet_id});
1970
2
    std::string tablet_idx_val;
1971
2
    TabletIndexPB tablet_idx;
1972
2
    err = txn->get(tablet_idx_key, &tablet_idx_val);
1973
2
    if (err != TxnErrorCode::TXN_OK) {
1974
        // clang-format off
1975
0
        LOG(WARNING) << "failed to get tablet index key,"
1976
0
                        << " key=" << hex(tablet_idx_key)
1977
0
                        << " code=" << err;
1978
        // clang-format on
1979
0
        return -1;
1980
0
    }
1981
2
    tablet_idx.ParseFromString(tablet_idx_val);
1982
2
    MetaServiceCode code = MetaServiceCode::OK;
1983
2
    std::string msg;
1984
2
    internal_get_tablet_stats(code, msg, txn.get(), instance_id_, tablet_idx, tablet_stats_pb);
1985
2
    if (code != MetaServiceCode::OK) {
1986
        // clang-format off
1987
0
        LOG(WARNING) << "failed to get tablet stats,"
1988
0
                        << " code=" << code 
1989
0
                        << " msg=" << msg;
1990
        // clang-format on
1991
0
        return -1;
1992
0
    }
1993
1994
2
    GetRowsetResponse resp;
1995
    // get rowsets in tablet
1996
2
    internal_get_rowset(txn.get(), 0, std::numeric_limits<int64_t>::max() - 1, instance_id_,
1997
2
                        tablet_id, code, msg, &resp);
1998
2
    if (code != MetaServiceCode::OK) {
1999
0
        LOG_WARNING("failed to get rowsets of tablet when check stats tablet key")
2000
0
                .tag("tablet id", tablet_id)
2001
0
                .tag("msg", msg)
2002
0
                .tag("code", code)
2003
0
                .tag("instance id", instance_id_);
2004
0
        return -1;
2005
0
    }
2006
2
    int64_t num_rows = 0;
2007
2
    int64_t num_rowsets = 0;
2008
2
    int64_t num_segments = 0;
2009
2
    int64_t total_data_size = 0;
2010
2
    for (const auto& rs_meta : resp.rowset_meta()) {
2011
2
        num_rows += rs_meta.num_rows();
2012
2
        num_rowsets++;
2013
2
        num_segments += rs_meta.num_segments();
2014
2
        total_data_size += rs_meta.total_disk_size();
2015
2
    }
2016
2
    int ret = 0;
2017
2
    if (tablet_stats_pb.data_size() != total_data_size) {
2018
1
        ret = 1;
2019
        // clang-format off
2020
1
        LOG(WARNING) << " tablet_stats_pb's data size is not same with all rowset total data size,"
2021
1
                        << " tablet_stats_pb's data size=" << tablet_stats_pb.data_size()
2022
1
                        << " all rowset total data size=" << total_data_size
2023
1
                        << " stats tablet meta=" << tablet_stats_pb.ShortDebugString();
2024
        // clang-format on
2025
1
    } else if (tablet_stats_pb.num_rows() != num_rows) {
2026
0
        ret = 1;
2027
        // clang-format off
2028
0
        LOG(WARNING) << " tablet_stats_pb's num_rows is not same with all rowset total num_rows,"
2029
0
                        << " tablet_stats_pb's num_rows=" << tablet_stats_pb.num_rows()
2030
0
                        << " all rowset total num_rows=" << num_rows
2031
0
                        << " stats tablet meta=" << tablet_stats_pb.ShortDebugString();
2032
        // clang-format on
2033
1
    } else if (tablet_stats_pb.num_rowsets() != num_rowsets) {
2034
0
        ret = 1;
2035
        // clang-format off
2036
0
        LOG(WARNING) << " tablet_stats_pb's num_rowsets is not same with all rowset nums,"
2037
0
                        << " tablet_stats_pb's num_rowsets=" << tablet_stats_pb.num_rowsets()
2038
0
                        << " all rowset nums=" << num_rowsets
2039
0
                        << " stats tablet meta=" << tablet_stats_pb.ShortDebugString();
2040
        // clang-format on
2041
1
    } else if (tablet_stats_pb.num_segments() != num_segments) {
2042
0
        ret = 1;
2043
        // clang-format off
2044
0
        LOG(WARNING) << " tablet_stats_pb's num_segments is not same with all rowset total num_segments,"
2045
0
                        << " tablet_stats_pb's num_segments=" << tablet_stats_pb.num_segments()
2046
0
                        << " all rowset total num_segments=" << num_segments
2047
0
                        << " stats tablet meta=" << tablet_stats_pb.ShortDebugString();
2048
        // clang-format on
2049
0
    }
2050
2051
2
    return ret;
2052
2
}
2053
2054
int InstanceChecker::scan_and_handle_kv(
2055
        std::string& start_key, const std::string& end_key,
2056
12
        std::function<int(std::string_view, std::string_view)> handle_kv) {
2057
12
    std::unique_ptr<Transaction> txn;
2058
12
    int ret = 0;
2059
12
    TxnErrorCode err = txn_kv_->create_txn(&txn);
2060
12
    if (err != TxnErrorCode::TXN_OK) {
2061
0
        LOG(WARNING) << "failed to init txn";
2062
0
        return -1;
2063
0
    }
2064
12
    std::unique_ptr<RangeGetIterator> it;
2065
12
    int limit = 10000;
2066
12
    TEST_SYNC_POINT_CALLBACK("InstanceChecker:scan_and_handle_kv:limit", &limit);
2067
13
    do {
2068
13
        err = txn->get(start_key, end_key, &it, false, limit);
2069
13
        TEST_SYNC_POINT_CALLBACK("InstanceChecker:scan_and_handle_kv:get_err", &err);
2070
13
        if (err == TxnErrorCode::TXN_TOO_OLD) {
2071
1
            LOG(WARNING) << "failed to get range kv, err=txn too old, "
2072
1
                         << " now fallback to non snapshot scan";
2073
1
            err = txn_kv_->create_txn(&txn);
2074
1
            if (err == TxnErrorCode::TXN_OK) {
2075
1
                err = txn->get(start_key, end_key, &it);
2076
1
            }
2077
1
        }
2078
13
        if (err != TxnErrorCode::TXN_OK) {
2079
0
            LOG(WARNING) << "internal error, failed to get range kv, err=" << err;
2080
0
            return -1;
2081
0
        }
2082
2083
229
        while (it->has_next() && !stopped()) {
2084
216
            auto [k, v] = it->next();
2085
2086
216
            int handle_ret = handle_kv(k, v);
2087
216
            if (handle_ret == -1) {
2088
0
                return -1;
2089
216
            } else {
2090
216
                ret = std::max(ret, handle_ret);
2091
216
            }
2092
216
            if (!it->has_next()) {
2093
13
                start_key = k;
2094
13
            }
2095
216
        }
2096
13
        start_key = it->next_begin_key();
2097
13
    } while (it->more() && !stopped());
2098
12
    return ret;
2099
12
}
2100
2101
2
int InstanceChecker::do_version_key_check() {
2102
2
    std::unique_ptr<RangeGetIterator> it;
2103
2
    std::string begin = table_version_key({instance_id_, 0, 0});
2104
2
    std::string end = table_version_key({instance_id_, INT64_MAX, 0});
2105
2
    bool check_res = true;
2106
2
    do {
2107
2
        std::unique_ptr<Transaction> txn;
2108
2
        TxnErrorCode err = txn_kv_->create_txn(&txn);
2109
2
        if (err != TxnErrorCode::TXN_OK) {
2110
0
            LOG(WARNING) << "failed to create txn";
2111
0
            return -1;
2112
0
        }
2113
2
        err = txn->get(begin, end, &it);
2114
2
        if (err != TxnErrorCode::TXN_OK) {
2115
0
            LOG(WARNING) << "failed to get mow tablet job key, err=" << err;
2116
0
            return -1;
2117
0
        }
2118
4
        while (it->has_next() && !stopped()) {
2119
2
            auto [k, v] = it->next();
2120
2
            std::string_view k1 = k;
2121
2
            k1.remove_prefix(1);
2122
2
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
2123
2
            decode_key(&k1, &out);
2124
2
            int64_t table_version = -1;
2125
            // 0x01 "version" ${instance_id} "table" ${db_id} ${tbl_id}
2126
2
            if (!txn->decode_atomic_int(v, &table_version)) {
2127
0
                LOG(WARNING) << "malformed table version value";
2128
0
                return -1;
2129
0
            }
2130
2
            auto table_id = std::get<int64_t>(std::get<0>(out[4]));
2131
2
            auto db_id = std::get<int64_t>(std::get<0>(out[3]));
2132
2
            std::string partition_version_key_begin =
2133
2
                    partition_version_key({instance_id_, db_id, table_id, 0});
2134
2
            std::string partition_version_key_end =
2135
2
                    partition_version_key({instance_id_, db_id, table_id, INT64_MAX});
2136
2
            VersionPB partition_version_pb;
2137
2138
2
            do {
2139
2
                std::unique_ptr<Transaction> txn;
2140
2
                TxnErrorCode err = txn_kv_->create_txn(&txn);
2141
2
                if (err != TxnErrorCode::TXN_OK) {
2142
0
                    LOG(WARNING) << "failed to create txn";
2143
0
                    return -1;
2144
0
                }
2145
2
                err = txn->get(partition_version_key_begin, partition_version_key_end, &it);
2146
2
                if (err != TxnErrorCode::TXN_OK) {
2147
0
                    LOG(WARNING) << "failed to get mow tablet job key, err=" << err;
2148
0
                    return -1;
2149
0
                }
2150
13
                while (it->has_next() && !stopped()) {
2151
11
                    auto [k, v] = it->next();
2152
                    // 0x01 "version" ${instance_id} "partition" ${db_id} ${tbl_id} ${partition_id}
2153
11
                    std::string_view k1 = k;
2154
11
                    k1.remove_prefix(1);
2155
11
                    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
2156
11
                    decode_key(&k1, &out);
2157
11
                    if (!partition_version_pb.ParseFromArray(v.data(), v.size())) [[unlikely]] {
2158
0
                        LOG(WARNING) << "failed to parse partition VersionPB";
2159
0
                        return -1;
2160
0
                    }
2161
11
                    auto partition_id = std::get<int64_t>(std::get<0>(out[5]));
2162
11
                    int64_t partition_version = partition_version_pb.version();
2163
11
                    if (table_version < partition_version) {
2164
3
                        check_res = false;
2165
3
                        LOG(WARNING)
2166
3
                                << "table version is less than partition version,"
2167
3
                                << " table_id: " << table_id << "tablet_version: " << table_version
2168
3
                                << " partition_id: " << partition_id
2169
3
                                << " partition_version: " << partition_version;
2170
3
                    }
2171
11
                }
2172
2
                partition_version_key_begin = it->next_begin_key();
2173
2
            } while (it->more() && !stopped());
2174
2
        }
2175
2
        begin = it->next_begin_key(); // Update to next smallest key for iteration
2176
2
    } while (it->more() && !stopped());
2177
2
    return check_res ? 0 : -1;
2178
2
}
2179
2180
1
int InstanceChecker::do_restore_job_check() {
2181
1
    int64_t num_prepared = 0;
2182
1
    int64_t num_committed = 0;
2183
1
    int64_t num_dropped = 0;
2184
1
    int64_t num_completed = 0;
2185
1
    int64_t num_recycling = 0;
2186
1
    int64_t num_cost_many_time = 0;
2187
1
    const int64_t COST_MANY_THRESHOLD = 3600;
2188
2189
1
    using namespace std::chrono;
2190
1
    auto start_time = steady_clock::now();
2191
1
    DORIS_CLOUD_DEFER {
2192
1
        g_bvar_checker_restore_job_prepared_state.put(instance_id_, num_prepared);
2193
1
        g_bvar_checker_restore_job_committed_state.put(instance_id_, num_committed);
2194
1
        g_bvar_checker_restore_job_dropped_state.put(instance_id_, num_dropped);
2195
1
        g_bvar_checker_restore_job_completed_state.put(instance_id_, num_completed);
2196
1
        g_bvar_checker_restore_job_recycling_state.put(instance_id_, num_recycling);
2197
1
        g_bvar_checker_restore_job_cost_many_time.put(instance_id_, num_cost_many_time);
2198
1
        auto cost_ms =
2199
1
                duration_cast<std::chrono::milliseconds>(steady_clock::now() - start_time).count();
2200
1
        LOG(INFO) << "check instance restore jobs finished, cost=" << cost_ms
2201
1
                  << "ms. instance_id=" << instance_id_ << " num_prepared=" << num_prepared
2202
1
                  << " num_committed=" << num_committed << " num_dropped=" << num_dropped
2203
1
                  << " num_completed=" << num_completed << " num_recycling=" << num_recycling
2204
1
                  << " num_cost_many_time=" << num_cost_many_time;
2205
1
    };
2206
2207
1
    LOG_INFO("begin to check restore jobs").tag("instance_id", instance_id_);
2208
2209
1
    JobRestoreTabletKeyInfo restore_job_key_info0 {instance_id_, 0};
2210
1
    JobRestoreTabletKeyInfo restore_job_key_info1 {instance_id_, INT64_MAX};
2211
1
    std::string begin;
2212
1
    std::string end;
2213
1
    job_restore_tablet_key(restore_job_key_info0, &begin);
2214
1
    job_restore_tablet_key(restore_job_key_info1, &end);
2215
1
    std::unique_ptr<RangeGetIterator> it;
2216
1
    do {
2217
1
        std::unique_ptr<Transaction> txn;
2218
1
        TxnErrorCode err = txn_kv_->create_txn(&txn);
2219
1
        if (err != TxnErrorCode::TXN_OK) {
2220
0
            LOG(WARNING) << "failed to create txn";
2221
0
            return -1;
2222
0
        }
2223
1
        err = txn->get(begin, end, &it);
2224
1
        if (err != TxnErrorCode::TXN_OK) {
2225
0
            LOG(WARNING) << "failed to get mow tablet job key, err=" << err;
2226
0
            return -1;
2227
0
        }
2228
2229
1
        if (!it->has_next()) {
2230
0
            break;
2231
0
        }
2232
3
        while (it->has_next()) {
2233
3
            auto [k, v] = it->next();
2234
3
            RestoreJobCloudPB restore_job_pb;
2235
3
            if (!restore_job_pb.ParseFromArray(v.data(), v.size())) {
2236
0
                LOG_WARNING("malformed restore job value").tag("key", hex(k));
2237
0
                return -1;
2238
0
            }
2239
2240
3
            switch (restore_job_pb.state()) {
2241
1
            case RestoreJobCloudPB::PREPARED:
2242
1
                ++num_prepared;
2243
1
                break;
2244
1
            case RestoreJobCloudPB::COMMITTED:
2245
1
                ++num_committed;
2246
1
                break;
2247
0
            case RestoreJobCloudPB::DROPPED:
2248
0
                ++num_dropped;
2249
0
                break;
2250
1
            case RestoreJobCloudPB::COMPLETED:
2251
1
                ++num_completed;
2252
1
                break;
2253
0
            case RestoreJobCloudPB::RECYCLING:
2254
0
                ++num_recycling;
2255
0
                break;
2256
0
            default:
2257
0
                break;
2258
3
            }
2259
2260
3
            int64_t current_time = ::time(nullptr);
2261
3
            if ((restore_job_pb.state() == RestoreJobCloudPB::PREPARED ||
2262
3
                 restore_job_pb.state() == RestoreJobCloudPB::COMMITTED) &&
2263
3
                current_time > restore_job_pb.ctime_s() + COST_MANY_THRESHOLD) {
2264
                // restore job run more than 1 hour
2265
1
                ++num_cost_many_time;
2266
1
                LOG_WARNING("restore job cost too many time")
2267
1
                        .tag("key", hex(k))
2268
1
                        .tag("tablet_id", restore_job_pb.tablet_id())
2269
1
                        .tag("state", restore_job_pb.state())
2270
1
                        .tag("ctime_s", restore_job_pb.ctime_s())
2271
1
                        .tag("mtime_s", restore_job_pb.mtime_s());
2272
1
            }
2273
2274
3
            if (!it->has_next()) {
2275
1
                begin = k;
2276
1
                begin.push_back('\x00'); // Update to next smallest key for iteration
2277
1
                break;
2278
1
            }
2279
3
        }
2280
1
    } while (it->more() && !stopped());
2281
1
    return 0;
2282
1
}
2283
2284
3
int InstanceChecker::check_txn_info_key(std::string_view key, std::string_view value) {
2285
3
    std::unordered_map<int64_t, std::string> txn_info_;
2286
3
    TxnLabelPB txn_label_pb;
2287
2288
6
    auto handle_check_txn_label_key = [&](std::string_view key, std::string_view value) -> int {
2289
6
        TxnInfoPB txn_info_pb;
2290
6
        std::string_view k1 = key;
2291
6
        k1.remove_prefix(1);
2292
6
        std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
2293
6
        decode_key(&k1, &out);
2294
        // 0x01 "txn" ${instance_id} "txn_info" ${db_id} ${txn_id}
2295
6
        if (!txn_info_pb.ParseFromArray(value.data(), value.size())) {
2296
0
            LOG(WARNING) << "failed to parse TxnInfoPB";
2297
0
            return -1;
2298
0
        }
2299
6
        auto txn_id = std::get<int64_t>(std::get<0>(out[4]));
2300
6
        auto it = txn_info_.find(txn_id);
2301
6
        if (it == txn_info_.end()) {
2302
0
            return 0;
2303
6
        } else {
2304
6
            if (it->second != txn_info_pb.label()) {
2305
1
                LOG(WARNING) << "txn_info_pb's txn_label not same with txn_label_pb's txn_label,"
2306
1
                             << " txn_info_pb's txn_label: " << txn_info_pb.label()
2307
1
                             << " txn_label_pb meta: " << txn_label_pb.ShortDebugString();
2308
1
                return 1;
2309
1
            }
2310
6
        }
2311
5
        return 0;
2312
6
    };
2313
3
    std::string_view k1 = key;
2314
3
    k1.remove_prefix(1);
2315
3
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
2316
3
    decode_key(&k1, &out);
2317
    // 0x01 "txn" ${instance_id} "txn_label" ${db_id} ${label}
2318
3
    if (!txn_label_pb.ParseFromArray(value.data(), value.size() - VERSION_STAMP_LEN)) {
2319
1
        LOG(WARNING) << "failed to parse TxnLabelPB";
2320
1
        return -1;
2321
1
    }
2322
2
    auto db_id = std::get<int64_t>(std::get<0>(out[3]));
2323
2
    auto label = std::get<std::string>(std::get<0>(out[4]));
2324
    // txn_id -> txn_label
2325
6
    for (const auto& txn_id : txn_label_pb.txn_ids()) {
2326
6
        txn_info_.insert({txn_id, label});
2327
6
    }
2328
2
    std::string txn_info_key_begin = txn_info_key({instance_id_, db_id, 0});
2329
2
    std::string txn_info_key_end = txn_info_key({instance_id_, db_id, INT64_MAX});
2330
2
    return scan_and_handle_kv(txn_info_key_begin, txn_info_key_end,
2331
6
                              [&](std::string_view k, std::string_view v) -> int {
2332
6
                                  return handle_check_txn_label_key(k, v);
2333
6
                              });
2334
3
}
2335
2336
6
int InstanceChecker::check_txn_label_key(std::string_view key, std::string_view value) {
2337
6
    TxnInfoPB txn_info_pb;
2338
6
    std::string_view k1 = key;
2339
6
    k1.remove_prefix(1);
2340
6
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
2341
6
    decode_key(&k1, &out);
2342
    // 0x01 "txn" ${instance_id} "txn_info" ${db_id} ${txn_id}
2343
6
    if (!txn_info_pb.ParseFromArray(value.data(), value.size())) {
2344
1
        LOG(WARNING) << "failed to parse TxnInfoPB";
2345
1
        return -1;
2346
1
    }
2347
5
    auto txn_id = std::get<int64_t>(std::get<0>(out[4]));
2348
5
    auto db_id = std::get<int64_t>(std::get<0>(out[3]));
2349
5
    auto label = txn_info_pb.label();
2350
5
    std::string txn_label = txn_label_key({instance_id_, db_id, label});
2351
5
    std::string txn_label_val;
2352
5
    TxnLabelPB txn_label_pb;
2353
5
    std::unique_ptr<Transaction> txn;
2354
5
    TxnErrorCode err = txn_kv_->create_txn(&txn);
2355
5
    if (err != TxnErrorCode::TXN_OK) {
2356
0
        LOG(WARNING) << "failed to init txn";
2357
0
        return -1;
2358
0
    }
2359
5
    if (txn->get(txn_label, &txn_label_val) != TxnErrorCode::TXN_OK) {
2360
1
        LOG(WARNING) << "failed to get txn label key, key=" << hex(txn_label);
2361
1
        return -1;
2362
1
    }
2363
4
    txn_label_pb.ParseFromString(txn_label_val);
2364
4
    auto txn_ids = txn_label_pb.txn_ids();
2365
4
    if (!std::count(txn_ids.begin(), txn_ids.end(), txn_id)) {
2366
        // clang-format off txn_info_pb
2367
1
        LOG(WARNING) << "txn_info_pb's txn_id not found in txn_label_pb info,"
2368
1
                     << " txn_id: " << txn_id
2369
1
                     << " txn_label_pb meta: " << txn_label_pb.ShortDebugString();
2370
        // clang-format on
2371
1
        return 1;
2372
1
    }
2373
3
    return 0;
2374
4
}
2375
2376
4
int InstanceChecker::check_txn_index_key(std::string_view key, std::string_view value) {
2377
4
    TxnInfoPB txn_info_pb;
2378
4
    std::string_view k1 = key;
2379
4
    k1.remove_prefix(1);
2380
4
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
2381
4
    decode_key(&k1, &out);
2382
    // 0x01 "txn" ${instance_id} "txn_info" ${db_id} ${txn_id}
2383
4
    if (!txn_info_pb.ParseFromArray(value.data(), value.size())) {
2384
1
        LOG(WARNING) << "failed to parse TxnInfoPB";
2385
1
        return -1;
2386
1
    }
2387
3
    auto txn_id = std::get<int64_t>(std::get<0>(out[4]));
2388
3
    auto db_id = std::get<int64_t>(std::get<0>(out[3]));
2389
    /// get tablet id
2390
3
    std::string txn_index = txn_index_key({instance_id_, txn_id});
2391
3
    std::string txn_index_val;
2392
3
    TxnIndexPB txn_index_pb;
2393
3
    std::unique_ptr<Transaction> txn;
2394
3
    TxnErrorCode err = txn_kv_->create_txn(&txn);
2395
3
    if (err != TxnErrorCode::TXN_OK) {
2396
0
        LOG(WARNING) << "failed to init txn";
2397
0
        return -1;
2398
0
    }
2399
3
    if (txn->get(txn_index, &txn_index_val) != TxnErrorCode::TXN_OK) {
2400
1
        LOG(WARNING) << "failed to get txn label key, key=" << hex(txn_index);
2401
1
        return -1;
2402
1
    }
2403
2
    txn_index_pb.ParseFromString(txn_index_val);
2404
2
    if (txn_index_pb.tablet_index().db_id() != db_id) {
2405
        // clang-format off txn_info_pb
2406
1
        LOG(WARNING) << "txn_index_pb's db_id not same with txn_info_pb's db_id,"
2407
1
                     << " txn_index_pb meta: " << txn_index_pb.ShortDebugString()
2408
1
                     << " txn_info_pb meta: " << txn_info_pb.ShortDebugString();
2409
        // clang-format on
2410
1
        return 1;
2411
1
    }
2412
1
    return 0;
2413
2
}
2414
2415
3
int InstanceChecker::check_txn_running_key(std::string_view key, std::string_view value) {
2416
3
    TxnRunningPB txn_running_pb;
2417
3
    int64_t current_time =
2418
3
            duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
2419
3
    if (!txn_running_pb.ParseFromArray(value.data(), value.size())) {
2420
1
        LOG(WARNING) << "failed to parse TxnRunningPB";
2421
1
        return -1;
2422
1
    }
2423
2
    if (txn_running_pb.timeout_time() <= current_time) {
2424
1
        LOG(WARNING) << "txn_running_pb.timeout_time() is less than current_time,"
2425
1
                     << " but txn_running_key exists, "
2426
1
                     << " txn_running_pb meta: " << txn_running_pb.ShortDebugString();
2427
1
        return 1;
2428
1
    }
2429
1
    return 0;
2430
2
}
2431
2432
0
int InstanceChecker::do_txn_key_check() {
2433
0
    int ret = 0;
2434
2435
    // check txn info key depend on txn label key
2436
0
    std::string begin = txn_label_key({instance_id_, 0, ""});
2437
0
    std::string end = txn_label_key({instance_id_, INT64_MAX, ""});
2438
0
    int64_t num_scanned = 0;
2439
0
    int64_t num_abnormal = 0;
2440
0
    LOG(INFO) << "begin check txn_label_key and txn_info_key";
2441
0
    ret = scan_and_handle_kv(begin, end, [&, this](std::string_view k, std::string_view v) -> int {
2442
0
        num_scanned++;
2443
0
        int ret = check_txn_info_key(k, v);
2444
0
        if (ret == 1) {
2445
0
            num_abnormal++;
2446
0
        }
2447
0
        return ret;
2448
0
    });
2449
2450
0
    if (ret == 1) {
2451
0
        LOG(WARNING) << "failed to check txn_info_key depending on txn_label_key, num_scanned="
2452
0
                     << num_scanned << ", num_abnormal=" << num_abnormal;
2453
0
        return 1;
2454
0
    } else if (ret == -1) {
2455
0
        LOG(WARNING) << "failed to check txn label key and txn info key";
2456
0
        return -1;
2457
0
    }
2458
2459
    // check txn label key depend on txn info key
2460
0
    begin = txn_info_key({instance_id_, 0, 0});
2461
0
    end = txn_info_key({instance_id_, INT64_MAX, 0});
2462
0
    num_scanned = 0;
2463
0
    num_abnormal = 0;
2464
0
    LOG(INFO) << "begin check txn_label_key and txn_info_key";
2465
0
    ret = scan_and_handle_kv(begin, end, [&, this](std::string_view k, std::string_view v) -> int {
2466
0
        num_scanned++;
2467
0
        int ret = check_txn_label_key(k, v);
2468
0
        if (ret == 1) {
2469
0
            num_abnormal++;
2470
0
        }
2471
0
        return ret;
2472
0
    });
2473
0
    if (ret == 1) {
2474
0
        LOG(WARNING) << "failed to check txn_label_key depending on txn_info_key, num_scanned="
2475
0
                     << num_scanned << ", num_abnormal=" << num_abnormal;
2476
0
        return 1;
2477
0
    } else if (ret == -1) {
2478
0
        LOG(WARNING) << "failed to inverted check txn label key and txn info key";
2479
0
        return -1;
2480
0
    }
2481
0
    LOG(INFO) << "finish check txn_label_key and txn_info_key, num_scanned=" << num_scanned
2482
0
              << ", num_abnormal=" << num_abnormal;
2483
2484
    // check txn index key depend on txn info key
2485
0
    begin = txn_info_key({instance_id_, 0, 0});
2486
0
    end = txn_info_key({instance_id_, INT64_MAX, 0});
2487
0
    num_scanned = 0;
2488
0
    num_abnormal = 0;
2489
0
    LOG(INFO) << "begin check txn_index_key and txn_info_key";
2490
0
    ret = scan_and_handle_kv(begin, end, [&, this](std::string_view k, std::string_view v) -> int {
2491
0
        num_scanned++;
2492
0
        int ret = check_txn_index_key(k, v);
2493
0
        if (ret == 1) {
2494
0
            num_abnormal++;
2495
0
        }
2496
0
        return ret;
2497
0
    });
2498
0
    if (ret == 1) {
2499
0
        LOG(WARNING) << "failed to check txn_idx_key depending on txn_info_key, num_scanned="
2500
0
                     << num_scanned << ", num_abnormal=" << num_abnormal;
2501
0
        return 1;
2502
0
    } else if (ret == -1) {
2503
0
        LOG(WARNING) << "failed to check txn index key";
2504
0
        return -1;
2505
0
    }
2506
0
    LOG(INFO) << "finish check txn_index_key and txn_info_key, num_scanned=" << num_scanned
2507
0
              << ", num_abnormal=" << num_abnormal;
2508
2509
    // check txn running key
2510
0
    begin = txn_running_key({instance_id_, 0, 0});
2511
0
    end = txn_running_key({instance_id_, INT64_MAX, 0});
2512
0
    num_scanned = 0;
2513
0
    num_abnormal = 0;
2514
0
    LOG(INFO) << "begin check txn_running_key";
2515
0
    ret = scan_and_handle_kv(begin, end, [&, this](std::string_view k, std::string_view v) -> int {
2516
0
        num_scanned++;
2517
0
        int ret = check_txn_running_key(k, v);
2518
0
        if (ret == 1) {
2519
0
            num_abnormal++;
2520
0
        }
2521
0
        return ret;
2522
0
    });
2523
0
    if (ret == 1) {
2524
0
        LOG(WARNING) << "failed to check txn_running_key, num_scanned=" << num_scanned
2525
0
                     << ", num_abnormal=" << num_abnormal;
2526
0
        return 1;
2527
0
    } else if (ret == -1) {
2528
0
        LOG(WARNING) << "failed to check txn running key";
2529
0
        return -1;
2530
0
    }
2531
0
    LOG(INFO) << "finish check txn_running_key, num_scanned=" << num_scanned
2532
0
              << ", num_abnormal=" << num_abnormal;
2533
0
    return 0;
2534
0
}
2535
2536
2
int InstanceChecker::check_meta_tmp_rowset_key(std::string_view key, std::string_view value) {
2537
2
    TxnInfoPB txn_info_pb;
2538
2
    std::string_view k1 = key;
2539
2
    k1.remove_prefix(1);
2540
2
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
2541
2
    decode_key(&k1, &out);
2542
    // 0x01 "txn" ${instance_id} "txn_info" ${db_id} ${txn_id}
2543
2
    if (!txn_info_pb.ParseFromArray(value.data(), value.size())) {
2544
0
        LOG(WARNING) << "failed to parse TxnInfoPB";
2545
0
        return -1;
2546
0
    }
2547
    /// get tablet id
2548
2
    auto txn_id = std::get<int64_t>(std::get<0>(out[4]));
2549
2
    std::string txn_index = txn_index_key({instance_id_, txn_id});
2550
2
    std::string txn_index_val;
2551
2
    TxnIndexPB txn_index_pb;
2552
2
    std::unique_ptr<Transaction> txn;
2553
2
    TxnErrorCode err = txn_kv_->create_txn(&txn);
2554
2
    if (err != TxnErrorCode::TXN_OK) {
2555
0
        LOG(WARNING) << "failed to init txn";
2556
0
        return -1;
2557
0
    }
2558
2
    if (txn->get(txn_index, &txn_index_val) != TxnErrorCode::TXN_OK) {
2559
0
        LOG(WARNING) << "failed to get txn index key, key=" << txn_index;
2560
0
        return -1;
2561
0
    }
2562
2
    txn_index_pb.ParseFromString(txn_index_val);
2563
2
    auto tablet_id = txn_index_pb.tablet_index().tablet_id();
2564
2
    std::string meta_tmp_rowset_key = meta_rowset_tmp_key({instance_id_, txn_id, tablet_id});
2565
2
    int is_key_exist = key_exist(txn_kv_.get(), meta_tmp_rowset_key);
2566
2
    if (is_key_exist == 1) {
2567
0
        if (txn_info_pb.status() != TxnStatusPB::TXN_STATUS_VISIBLE) {
2568
            // clang-format off
2569
0
            LOG(INFO) << "meta tmp rowset key not exist but txn status != TXN_STATUS_VISIBLE"
2570
0
                        << "meta tmp rowset key=" << meta_tmp_rowset_key
2571
0
                        << "txn_info=" << txn_info_pb.ShortDebugString();
2572
            // clang-format on
2573
0
            return 1;
2574
0
        }
2575
2
    } else if (is_key_exist == 0) {
2576
2
        if (txn_info_pb.status() != TxnStatusPB::TXN_STATUS_PREPARED) {
2577
            // clang-format off
2578
1
            LOG(INFO) << "meta tmp rowset key exist but txn status != TXN_STATUS_PREPARED"
2579
1
                        << "meta tmp rowset key=" << meta_tmp_rowset_key
2580
1
                        << "txn_info=" << txn_info_pb.ShortDebugString();
2581
            // clang-format on
2582
1
            return 1;
2583
1
        }
2584
2
    } else {
2585
0
        LOG(WARNING) << "failed to get key, key=" << meta_tmp_rowset_key;
2586
0
        return -1;
2587
0
    }
2588
1
    return 0;
2589
2
}
2590
2591
2
int InstanceChecker::check_meta_rowset_key(std::string_view key, std::string_view value) {
2592
2
    RowsetMetaCloudPB meta_rowset_pb;
2593
2
    if (!meta_rowset_pb.ParseFromArray(value.data(), value.size())) {
2594
0
        LOG(WARNING) << "failed to parse RowsetMetaCloudPB";
2595
0
        return -1;
2596
0
    }
2597
2
    std::string tablet_index_key = meta_tablet_idx_key({instance_id_, meta_rowset_pb.tablet_id()});
2598
2
    if (key_exist(txn_kv_.get(), tablet_index_key) == 1) {
2599
1
        LOG(WARNING) << "rowset's tablet id not found in fdb"
2600
1
                     << "tablet_index_key: " << tablet_index_key
2601
1
                     << "rowset meta: " << meta_rowset_pb.ShortDebugString();
2602
1
        return 1;
2603
1
    }
2604
1
    return 0;
2605
2
}
2606
2607
0
int InstanceChecker::do_meta_rowset_key_check() {
2608
0
    int ret = 0;
2609
2610
0
    std::string begin = meta_rowset_key({instance_id_, 0, 0});
2611
0
    std::string end = meta_rowset_key({instance_id_, INT64_MAX, 0});
2612
0
    int64_t num_scanned = 0;
2613
0
    int64_t num_loss = 0;
2614
2615
0
    ret = scan_and_handle_kv(begin, end, [&](std::string_view k, std::string_view v) {
2616
0
        num_scanned++;
2617
0
        int ret = check_meta_rowset_key(k, v);
2618
0
        if (ret == 1) {
2619
0
            num_loss++;
2620
0
        }
2621
0
        return ret;
2622
0
    });
2623
0
    if (ret == -1) {
2624
0
        LOG(WARNING) << "failed to check meta rowset key,";
2625
0
        return -1;
2626
0
    } else if (ret == 1) {
2627
0
        LOG(WARNING) << "meta rowset key may be loss, num_scanned=" << num_scanned
2628
0
                     << ", num_loss=" << num_loss;
2629
0
    }
2630
0
    LOG(INFO) << "meta rowset key check finish, num_scanned=" << num_scanned
2631
0
              << ", num_loss=" << num_loss;
2632
2633
0
    begin = txn_info_key({instance_id_, 0, 0});
2634
0
    end = txn_info_key({instance_id_, INT64_MAX, 0});
2635
0
    num_scanned = 0;
2636
0
    num_loss = 0;
2637
2638
0
    ret = scan_and_handle_kv(begin, end, [&](std::string_view k, std::string_view v) {
2639
0
        num_scanned++;
2640
0
        int ret = check_meta_tmp_rowset_key(k, v);
2641
0
        if (ret == 1) {
2642
0
            num_loss++;
2643
0
        }
2644
0
        return ret;
2645
0
    });
2646
0
    if (ret == -1) {
2647
0
        LOG(WARNING) << "failed to check tmp meta rowset key";
2648
0
        return -1;
2649
0
    } else if (ret == 1) {
2650
0
        LOG(WARNING) << "meta tmp rowset key may be loss, num_scanned=" << num_scanned
2651
0
                     << ", num_loss=" << num_loss;
2652
0
    }
2653
0
    LOG(INFO) << "meta tmp rowset key check finish, num_scanned=" << num_scanned
2654
0
              << ", num_loss=" << num_loss;
2655
2656
0
    return ret;
2657
0
}
2658
2659
0
// Returns the accessor registered under storage vault `id` in accessor_map_,
// or nullptr if none is registered. The returned pointer is obtained via
// .get(), so the map entry keeps ownership.
StorageVaultAccessor* InstanceChecker::get_accessor(const std::string& id) {
    if (auto found = accessor_map_.find(id); found != accessor_map_.end()) {
        return found->second.get();
    }
    return nullptr;
}
2666
2667
0
// Appends the raw (non-owning, via .get()) pointer of every entry in
// accessor_map_ to `*accessors`, in map iteration order. Existing elements of
// `*accessors` are left in place.
void InstanceChecker::get_all_accessor(std::vector<StorageVaultAccessor*>* accessors) {
    for (const auto& entry : accessor_map_) {
        accessors->push_back(entry.second.get());
    }
}
2672
} // namespace doris::cloud