Coverage Report

Created: 2025-09-10 20:18

/root/doris/cloud/src/recycler/checker.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "recycler/checker.h"
19
20
#include <aws/s3/S3Client.h>
21
#include <aws/s3/model/ListObjectsV2Request.h>
22
#include <butil/endpoint.h>
23
#include <butil/strings/string_split.h>
24
#include <fmt/core.h>
25
#include <gen_cpp/cloud.pb.h>
26
#include <gen_cpp/olap_file.pb.h>
27
#include <glog/logging.h>
28
29
#include <algorithm>
30
#include <chrono>
31
#include <cstdint>
32
#include <memory>
33
#include <mutex>
34
#include <numeric>
35
#include <sstream>
36
#include <string_view>
37
#include <unordered_set>
38
#include <vector>
39
40
#include "common/bvars.h"
41
#include "common/config.h"
42
#include "common/defer.h"
43
#include "common/encryption_util.h"
44
#include "common/logging.h"
45
#include "common/util.h"
46
#include "cpp/sync_point.h"
47
#include "meta-service/meta_service.h"
48
#include "meta-service/meta_service_schema.h"
49
#include "meta-service/meta_service_tablet_stats.h"
50
#include "meta-store/blob_message.h"
51
#include "meta-store/keys.h"
52
#include "meta-store/txn_kv.h"
53
#include "meta-store/txn_kv_error.h"
54
#include "recycler/hdfs_accessor.h"
55
#include "recycler/s3_accessor.h"
56
#include "recycler/storage_vault_accessor.h"
57
#ifdef UNIT_TEST
58
#include "../test/mock_accessor.h"
59
#endif
60
#include "recycler/util.h"
61
62
namespace doris::cloud {
63
namespace config {
64
extern int32_t brpc_listen_port;
65
extern int32_t scan_instances_interval_seconds;
66
extern int32_t recycle_job_lease_expired_ms;
67
extern int32_t recycle_concurrency;
68
extern std::vector<std::string> recycle_whitelist;
69
extern std::vector<std::string> recycle_blacklist;
70
extern bool enable_inverted_check;
71
} // namespace config
72
73
5
// Creates a checker on top of `txn_kv` and records this process's
// "<ip>:<brpc_listen_port>" string, which the job prepare/lease/finish helpers
// in this file receive as the job owner.
Checker::Checker(std::shared_ptr<TxnKv> txn_kv) : txn_kv_(std::move(txn_kv)) {
    std::string endpoint(butil::my_ip_cstr());
    endpoint += ":";
    endpoint += std::to_string(config::brpc_listen_port);
    ip_port_ = std::move(endpoint);
}
76
77
5
// Shuts the checker down on destruction unless stop() has already been called.
Checker::~Checker() {
    if (stopped()) {
        return;
    }
    stop();
}
82
83
4
int Checker::start() {
84
4
    DCHECK(txn_kv_);
85
4
    instance_filter_.reset(config::recycle_whitelist, config::recycle_blacklist);
86
87
    // launch instance scanner
88
4
    auto scanner_func = [this]() {
89
4
        std::this_thread::sleep_for(
90
4
                std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds));
91
8
        while (!stopped()) {
92
4
            std::vector<InstanceInfoPB> instances;
93
4
            get_all_instances(txn_kv_.get(), instances);
94
4
            LOG(INFO) << "Checker get instances: " << [&instances] {
95
4
                std::stringstream ss;
96
30
                for (auto& i : instances) ss << ' ' << i.instance_id();
97
4
                return ss.str();
98
4
            }();
99
4
            if (!instances.empty()) {
100
                // enqueue instances
101
3
                std::lock_guard lock(mtx_);
102
30
                for (auto& instance : instances) {
103
30
                    if (instance_filter_.filter_out(instance.instance_id())) continue;
104
30
                    if (instance.status() == InstanceInfoPB::DELETED) continue;
105
30
                    using namespace std::chrono;
106
30
                    auto enqueue_time_s =
107
30
                            duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
108
30
                    auto [_, success] =
109
30
                            pending_instance_map_.insert({instance.instance_id(), enqueue_time_s});
110
                    // skip instance already in pending queue
111
30
                    if (success) {
112
30
                        pending_instance_queue_.push_back(std::move(instance));
113
30
                    }
114
30
                }
115
3
                pending_instance_cond_.notify_all();
116
3
            }
117
4
            {
118
4
                std::unique_lock lock(mtx_);
119
4
                notifier_.wait_for(lock,
120
4
                                   std::chrono::seconds(config::scan_instances_interval_seconds),
121
7
                                   [&]() { return stopped(); });
122
4
            }
123
4
        }
124
4
    };
125
4
    workers_.emplace_back(scanner_func);
126
    // Launch lease thread
127
4
    workers_.emplace_back([this] { lease_check_jobs(); });
128
    // Launch inspect thread
129
4
    workers_.emplace_back([this] { inspect_instance_check_interval(); });
130
131
    // launch check workers
132
8
    auto checker_func = [this]() {
133
38
        while (!stopped()) {
134
            // fetch instance to check
135
36
            InstanceInfoPB instance;
136
36
            long enqueue_time_s = 0;
137
36
            {
138
36
                std::unique_lock lock(mtx_);
139
48
                pending_instance_cond_.wait(lock, [&]() -> bool {
140
48
                    return !pending_instance_queue_.empty() || stopped();
141
48
                });
142
36
                if (stopped()) {
143
6
                    return;
144
6
                }
145
30
                instance = std::move(pending_instance_queue_.front());
146
30
                pending_instance_queue_.pop_front();
147
30
                enqueue_time_s = pending_instance_map_[instance.instance_id()];
148
30
                pending_instance_map_.erase(instance.instance_id());
149
30
            }
150
0
            const auto& instance_id = instance.instance_id();
151
30
            {
152
30
                std::lock_guard lock(mtx_);
153
                // skip instance in recycling
154
30
                if (working_instance_map_.count(instance_id)) continue;
155
30
            }
156
30
            auto checker = std::make_shared<InstanceChecker>(txn_kv_, instance.instance_id());
157
30
            if (checker->init(instance) != 0) {
158
0
                LOG(WARNING) << "failed to init instance checker, instance_id="
159
0
                             << instance.instance_id();
160
0
                continue;
161
0
            }
162
30
            std::string check_job_key;
163
30
            job_check_key({instance.instance_id()}, &check_job_key);
164
30
            int ret = prepare_instance_recycle_job(txn_kv_.get(), check_job_key,
165
30
                                                   instance.instance_id(), ip_port_,
166
30
                                                   config::check_object_interval_seconds * 1000);
167
30
            if (ret != 0) { // Prepare failed
168
20
                continue;
169
20
            } else {
170
10
                std::lock_guard lock(mtx_);
171
10
                working_instance_map_.emplace(instance_id, checker);
172
10
            }
173
10
            if (stopped()) return;
174
10
            using namespace std::chrono;
175
10
            auto ctime_ms =
176
10
                    duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
177
10
            g_bvar_checker_enqueue_cost_s.put(instance_id, ctime_ms / 1000 - enqueue_time_s);
178
179
10
            bool success {true};
180
181
10
            if (int ret = checker->do_check(); ret != 0) {
182
0
                success = false;
183
0
            }
184
185
10
            if (config::enable_inverted_check) {
186
0
                if (int ret = checker->do_inverted_check(); ret != 0) {
187
0
                    success = false;
188
0
                }
189
0
            }
190
191
10
            if (config::enable_delete_bitmap_inverted_check) {
192
0
                if (int ret = checker->do_delete_bitmap_inverted_check(); ret != 0) {
193
0
                    success = false;
194
0
                }
195
0
            }
196
197
10
            if (config::enable_mow_job_key_check) {
198
0
                if (int ret = checker->do_mow_job_key_check(); ret != 0) {
199
0
                    success = false;
200
0
                }
201
0
            }
202
203
10
            if (config::enable_tablet_stats_key_check) {
204
0
                if (int ret = checker->do_tablet_stats_key_check(); ret != 0) {
205
0
                    success = false;
206
0
                }
207
0
            }
208
209
10
            if (config::enable_restore_job_check) {
210
0
                if (int ret = checker->do_restore_job_check(); ret != 0) {
211
0
                    success = false;
212
0
                }
213
0
            }
214
215
10
            if (config::enable_delete_bitmap_storage_optimize_v2_check) {
216
0
                if (int ret = checker->do_delete_bitmap_storage_optimize_check(2 /*version*/);
217
0
                    ret != 0) {
218
0
                    success = false;
219
0
                }
220
0
            }
221
222
            // If instance checker has been aborted, don't finish this job
223
10
            if (!checker->stopped()) {
224
10
                finish_instance_recycle_job(txn_kv_.get(), check_job_key, instance.instance_id(),
225
10
                                            ip_port_, success, ctime_ms);
226
10
            }
227
10
            {
228
10
                std::lock_guard lock(mtx_);
229
10
                working_instance_map_.erase(instance.instance_id());
230
10
            }
231
10
        }
232
8
    };
233
4
    int num_threads = config::recycle_concurrency; // FIXME: use a new config entry?
234
12
    for (int i = 0; i < num_threads; ++i) {
235
8
        workers_.emplace_back(checker_func);
236
8
    }
237
4
    return 0;
238
4
}
239
240
5
void Checker::stop() {
241
5
    stopped_ = true;
242
5
    notifier_.notify_all();
243
5
    pending_instance_cond_.notify_all();
244
5
    {
245
5
        std::lock_guard lock(mtx_);
246
5
        for (auto& [_, checker] : working_instance_map_) {
247
0
            checker->stop();
248
0
        }
249
5
    }
250
20
    for (auto& w : workers_) {
251
20
        if (w.joinable()) w.join();
252
20
    }
253
5
}
254
255
4
void Checker::lease_check_jobs() {
256
55
    while (!stopped()) {
257
51
        std::vector<std::string> instances;
258
51
        instances.reserve(working_instance_map_.size());
259
51
        {
260
51
            std::lock_guard lock(mtx_);
261
51
            for (auto& [id, _] : working_instance_map_) {
262
30
                instances.push_back(id);
263
30
            }
264
51
        }
265
51
        for (auto& i : instances) {
266
30
            std::string check_job_key;
267
30
            job_check_key({i}, &check_job_key);
268
30
            int ret = lease_instance_recycle_job(txn_kv_.get(), check_job_key, i, ip_port_);
269
30
            if (ret == 1) {
270
0
                std::lock_guard lock(mtx_);
271
0
                if (auto it = working_instance_map_.find(i); it != working_instance_map_.end()) {
272
0
                    it->second->stop();
273
0
                }
274
0
            }
275
30
        }
276
51
        {
277
51
            std::unique_lock lock(mtx_);
278
51
            notifier_.wait_for(lock,
279
51
                               std::chrono::milliseconds(config::recycle_job_lease_expired_ms / 3),
280
100
                               [&]() { return stopped(); });
281
51
        }
282
51
    }
283
4
}
284
285
0
#define LOG_CHECK_INTERVAL_ALARM LOG(WARNING) << "Err for check interval: "
286
34
void Checker::do_inspect(const InstanceInfoPB& instance) {
287
34
    std::string check_job_key = job_check_key({instance.instance_id()});
288
34
    std::unique_ptr<Transaction> txn;
289
34
    std::string val;
290
34
    TxnErrorCode err = txn_kv_->create_txn(&txn);
291
34
    if (err != TxnErrorCode::TXN_OK) {
292
0
        LOG_CHECK_INTERVAL_ALARM << "failed to create txn";
293
0
        return;
294
0
    }
295
34
    err = txn->get(check_job_key, &val);
296
34
    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
297
0
        LOG_CHECK_INTERVAL_ALARM << "failed to get kv, err=" << err
298
0
                                 << " key=" << hex(check_job_key);
299
0
        return;
300
0
    }
301
34
    auto checker = InstanceChecker(txn_kv_, instance.instance_id());
302
34
    if (checker.init(instance) != 0) {
303
0
        LOG_CHECK_INTERVAL_ALARM << "failed to init instance checker, instance_id="
304
0
                                 << instance.instance_id();
305
0
        return;
306
0
    }
307
308
34
    int64_t bucket_lifecycle_days = 0;
309
34
    if (checker.get_bucket_lifecycle(&bucket_lifecycle_days) != 0) {
310
0
        LOG_CHECK_INTERVAL_ALARM << "failed to get bucket lifecycle, instance_id="
311
0
                                 << instance.instance_id();
312
0
        return;
313
0
    }
314
34
    DCHECK(bucket_lifecycle_days > 0);
315
316
34
    if (bucket_lifecycle_days == INT64_MAX) {
317
        // No s3 bucket (may all accessors are HdfsAccessor), skip inspect
318
34
        return;
319
34
    }
320
321
0
    int64_t last_ctime_ms = -1;
322
0
    auto job_status = JobRecyclePB::IDLE;
323
0
    auto has_last_ctime = [&]() {
324
0
        JobRecyclePB job_info;
325
0
        if (!job_info.ParseFromString(val)) {
326
0
            LOG_CHECK_INTERVAL_ALARM << "failed to parse JobRecyclePB, key=" << hex(check_job_key);
327
0
        }
328
0
        DCHECK(job_info.instance_id() == instance.instance_id());
329
0
        if (!job_info.has_last_ctime_ms()) return false;
330
0
        last_ctime_ms = job_info.last_ctime_ms();
331
0
        job_status = job_info.status();
332
0
        g_bvar_checker_last_success_time_ms.put(instance.instance_id(),
333
0
                                                job_info.last_success_time_ms());
334
0
        return true;
335
0
    };
336
0
    using namespace std::chrono;
337
0
    auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
338
0
    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND || !has_last_ctime()) {
339
        // Use instance's ctime for instances that do not have job's last ctime
340
0
        last_ctime_ms = instance.ctime();
341
0
    }
342
0
    DCHECK(now - last_ctime_ms >= 0);
343
0
    int64_t expiration_ms =
344
0
            bucket_lifecycle_days > config::reserved_buffer_days
345
0
                    ? (bucket_lifecycle_days - config::reserved_buffer_days) * 86400000
346
0
                    : bucket_lifecycle_days * 86400000;
347
0
    TEST_SYNC_POINT_CALLBACK("Checker:do_inspect", &last_ctime_ms);
348
0
    if (now - last_ctime_ms >= expiration_ms) {
349
0
        LOG_CHECK_INTERVAL_ALARM << "check risks, instance_id: " << instance.instance_id()
350
0
                                 << " last_ctime_ms: " << last_ctime_ms
351
0
                                 << " job_status: " << job_status
352
0
                                 << " bucket_lifecycle_days: " << bucket_lifecycle_days
353
0
                                 << " reserved_buffer_days: " << config::reserved_buffer_days
354
0
                                 << " expiration_ms: " << expiration_ms;
355
0
    }
356
0
}
357
#undef LOG_CHECK_INTERVAL_ALARM
358
4
void Checker::inspect_instance_check_interval() {
359
7
    while (!stopped()) {
360
3
        LOG(INFO) << "start to inspect instance check interval";
361
3
        std::vector<InstanceInfoPB> instances;
362
3
        get_all_instances(txn_kv_.get(), instances);
363
30
        for (const auto& instance : instances) {
364
30
            if (instance_filter_.filter_out(instance.instance_id())) continue;
365
30
            if (stopped()) return;
366
30
            if (instance.status() == InstanceInfoPB::DELETED) continue;
367
30
            do_inspect(instance);
368
30
        }
369
3
        {
370
3
            std::unique_lock lock(mtx_);
371
3
            notifier_.wait_for(lock, std::chrono::seconds(config::scan_instances_interval_seconds),
372
6
                               [&]() { return stopped(); });
373
3
        }
374
3
    }
375
4
}
376
377
// return 0 for success get a key, 1 for key not found, negative for error
378
19
int key_exist(TxnKv* txn_kv, std::string_view key) {
379
19
    std::unique_ptr<Transaction> txn;
380
19
    TxnErrorCode err = txn_kv->create_txn(&txn);
381
19
    if (err != TxnErrorCode::TXN_OK) {
382
0
        LOG(WARNING) << "failed to init txn, err=" << err;
383
0
        return -1;
384
0
    }
385
19
    std::string val;
386
19
    switch (txn->get(key, &val)) {
387
17
    case TxnErrorCode::TXN_OK:
388
17
        return 0;
389
2
    case TxnErrorCode::TXN_KEY_NOT_FOUND:
390
2
        return 1;
391
0
    default:
392
0
        return -1;
393
19
    }
394
19
}
395
396
// Binds the checker to a KV store handle and an instance id. Storage accessors
// are not created here; init() does that.
InstanceChecker::InstanceChecker(std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id)
        : txn_kv_(std::move(txn_kv)),
          instance_id_(instance_id) {
}
398
399
86
int InstanceChecker::init(const InstanceInfoPB& instance) {
400
86
    int ret = init_obj_store_accessors(instance);
401
86
    if (ret != 0) {
402
0
        return ret;
403
0
    }
404
405
86
    return init_storage_vault_accessors(instance);
406
86
}
407
408
86
int InstanceChecker::init_obj_store_accessors(const InstanceInfoPB& instance) {
409
86
    for (const auto& obj_info : instance.obj_info()) {
410
86
#ifdef UNIT_TEST
411
86
        auto accessor = std::make_shared<MockAccessor>();
412
#else
413
        auto s3_conf = S3Conf::from_obj_store_info(obj_info);
414
        if (!s3_conf) {
415
            LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_;
416
            return -1;
417
        }
418
419
        std::shared_ptr<S3Accessor> accessor;
420
        int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
421
        if (ret != 0) {
422
            LOG(WARNING) << "failed to init object accessor. instance_id=" << instance_id_
423
                         << " resource_id=" << obj_info.id();
424
            return ret;
425
        }
426
#endif
427
428
86
        accessor_map_.emplace(obj_info.id(), std::move(accessor));
429
86
    }
430
431
86
    return 0;
432
86
}
433
434
86
int InstanceChecker::init_storage_vault_accessors(const InstanceInfoPB& instance) {
435
86
    if (instance.resource_ids().empty()) {
436
86
        return 0;
437
86
    }
438
439
0
    FullRangeGetOptions opts(txn_kv_);
440
0
    opts.prefetch = true;
441
0
    auto it = txn_kv_->full_range_get(storage_vault_key({instance_id_, ""}),
442
0
                                      storage_vault_key({instance_id_, "\xff"}), std::move(opts));
443
444
0
    for (auto kv = it->next(); kv.has_value(); kv = it->next()) {
445
0
        auto [k, v] = *kv;
446
0
        StorageVaultPB vault;
447
0
        if (!vault.ParseFromArray(v.data(), v.size())) {
448
0
            LOG(WARNING) << "malformed storage vault, unable to deserialize key=" << hex(k);
449
0
            return -1;
450
0
        }
451
0
        TEST_SYNC_POINT_CALLBACK("InstanceRecycler::init_storage_vault_accessors.mock_vault",
452
0
                                 &accessor_map_, &vault);
453
0
        if (vault.has_hdfs_info()) {
454
0
            auto accessor = std::make_shared<HdfsAccessor>(vault.hdfs_info());
455
0
            int ret = accessor->init();
456
0
            if (ret != 0) {
457
0
                LOG(WARNING) << "failed to init hdfs accessor. instance_id=" << instance_id_
458
0
                             << " resource_id=" << vault.id() << " name=" << vault.name();
459
0
                return ret;
460
0
            }
461
462
0
            accessor_map_.emplace(vault.id(), std::move(accessor));
463
0
        } else if (vault.has_obj_info()) {
464
0
#ifdef UNIT_TEST
465
0
            auto accessor = std::make_shared<MockAccessor>();
466
#else
467
            auto s3_conf = S3Conf::from_obj_store_info(vault.obj_info());
468
            if (!s3_conf) {
469
                LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_;
470
                return -1;
471
            }
472
473
            std::shared_ptr<S3Accessor> accessor;
474
            int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
475
            if (ret != 0) {
476
                LOG(WARNING) << "failed to init s3 accessor. instance_id=" << instance_id_
477
                             << " resource_id=" << vault.id() << " name=" << vault.name();
478
                return ret;
479
            }
480
#endif
481
482
0
            accessor_map_.emplace(vault.id(), std::move(accessor));
483
0
        }
484
0
    }
485
486
0
    if (!it->is_valid()) {
487
0
        LOG_WARNING("failed to get storage vault kv");
488
0
        return -1;
489
0
    }
490
0
    return 0;
491
0
}
492
493
16
int InstanceChecker::do_check() {
494
16
    TEST_SYNC_POINT("InstanceChecker.do_check");
495
16
    LOG(INFO) << "begin to check instance objects instance_id=" << instance_id_;
496
16
    int check_ret = 0;
497
16
    long num_scanned = 0;
498
16
    long num_scanned_with_segment = 0;
499
16
    long num_rowset_loss = 0;
500
16
    long instance_volume = 0;
501
16
    using namespace std::chrono;
502
16
    auto start_time = steady_clock::now();
503
16
    DORIS_CLOUD_DEFER {
504
16
        auto cost = duration<float>(steady_clock::now() - start_time).count();
505
16
        LOG(INFO) << "check instance objects finished, cost=" << cost
506
16
                  << "s. instance_id=" << instance_id_ << " num_scanned=" << num_scanned
507
16
                  << " num_scanned_with_segment=" << num_scanned_with_segment
508
16
                  << " num_rowset_loss=" << num_rowset_loss
509
16
                  << " instance_volume=" << instance_volume;
510
16
        g_bvar_checker_num_scanned.put(instance_id_, num_scanned);
511
16
        g_bvar_checker_num_scanned_with_segment.put(instance_id_, num_scanned_with_segment);
512
16
        g_bvar_checker_num_check_failed.put(instance_id_, num_rowset_loss);
513
16
        g_bvar_checker_check_cost_s.put(instance_id_, static_cast<long>(cost));
514
        // FIXME(plat1ko): What if some list operation failed?
515
16
        g_bvar_checker_instance_volume.put(instance_id_, instance_volume);
516
16
    };
517
518
16
    struct TabletFiles {
519
16
        int64_t tablet_id {0};
520
16
        std::unordered_set<std::string> files;
521
16
    };
522
16
    TabletFiles tablet_files_cache;
523
524
4.05k
    auto check_rowset_objects = [&, this](doris::RowsetMetaCloudPB& rs_meta, std::string_view key) {
525
4.05k
        if (rs_meta.num_segments() == 0) {
526
0
            return;
527
0
        }
528
529
4.05k
        bool data_loss = false;
530
4.05k
        bool segment_file_loss = false;
531
4.05k
        bool index_file_loss = false;
532
533
4.05k
        DORIS_CLOUD_DEFER {
534
4.05k
            if (data_loss) {
535
30
                LOG(INFO) << "segment file is" << (segment_file_loss ? "" : " not") << " loss, "
536
30
                          << "index file is" << (index_file_loss ? "" : " not") << " loss, "
537
30
                          << "rowset.tablet_id = " << rs_meta.tablet_id();
538
30
                num_rowset_loss++;
539
30
            }
540
4.05k
        };
541
542
4.05k
        ++num_scanned_with_segment;
543
4.05k
        if (tablet_files_cache.tablet_id != rs_meta.tablet_id()) {
544
454
            long tablet_volume = 0;
545
            // Clear cache
546
454
            tablet_files_cache.tablet_id = 0;
547
454
            tablet_files_cache.files.clear();
548
            // Get all file paths under this tablet directory
549
454
            auto find_it = accessor_map_.find(rs_meta.resource_id());
550
454
            if (find_it == accessor_map_.end()) {
551
0
                LOG_WARNING("resource id not found in accessor map")
552
0
                        .tag("resource_id", rs_meta.resource_id())
553
0
                        .tag("tablet_id", rs_meta.tablet_id())
554
0
                        .tag("rowset_id", rs_meta.rowset_id_v2());
555
0
                check_ret = -1;
556
0
                return;
557
0
            }
558
559
454
            std::unique_ptr<ListIterator> list_iter;
560
454
            int ret = find_it->second->list_directory(tablet_path_prefix(rs_meta.tablet_id()),
561
454
                                                      &list_iter);
562
454
            if (ret != 0) { // No need to log, because S3Accessor has logged this error
563
0
                check_ret = -1;
564
0
                return;
565
0
            }
566
567
18.5k
            for (auto file = list_iter->next(); file.has_value(); file = list_iter->next()) {
568
18.0k
                tablet_files_cache.files.insert(std::move(file->path));
569
18.0k
                tablet_volume += file->size;
570
18.0k
            }
571
454
            tablet_files_cache.tablet_id = rs_meta.tablet_id();
572
454
            instance_volume += tablet_volume;
573
454
        }
574
575
16.1k
        for (int i = 0; i < rs_meta.num_segments(); ++i) {
576
12.0k
            auto path = segment_path(rs_meta.tablet_id(), rs_meta.rowset_id_v2(), i);
577
578
12.0k
            if (tablet_files_cache.files.contains(path)) {
579
12.0k
                continue;
580
12.0k
            }
581
582
11
            if (1 == key_exist(txn_kv_.get(), key)) {
583
                // Rowset has been deleted instead of data loss
584
0
                break;
585
0
            }
586
11
            data_loss = true;
587
11
            segment_file_loss = true;
588
11
            TEST_SYNC_POINT_CALLBACK("InstanceChecker.do_check1", &path);
589
11
            LOG(WARNING) << "object not exist, path=" << path
590
11
                         << ", rs_meta=" << rs_meta.ShortDebugString() << " key=" << hex(key);
591
11
        }
592
593
4.05k
        std::unique_ptr<Transaction> txn;
594
4.05k
        TxnErrorCode err = txn_kv_->create_txn(&txn);
595
4.05k
        if (err != TxnErrorCode::TXN_OK) {
596
0
            LOG(WARNING) << "failed to init txn, err=" << err;
597
0
            check_ret = -1;
598
0
            return;
599
0
        }
600
601
4.05k
        TabletIndexPB tablet_index;
602
4.05k
        if (get_tablet_idx(txn_kv_.get(), instance_id_, rs_meta.tablet_id(), tablet_index) == -1) {
603
0
            LOG(WARNING) << "failed to get tablet index, tablet_id= " << rs_meta.tablet_id();
604
0
            check_ret = -1;
605
0
            return;
606
0
        }
607
608
4.05k
        auto tablet_schema_key =
609
4.05k
                meta_schema_key({instance_id_, tablet_index.index_id(), rs_meta.schema_version()});
610
4.05k
        ValueBuf tablet_schema_val;
611
4.05k
        err = cloud::blob_get(txn.get(), tablet_schema_key, &tablet_schema_val);
612
613
4.05k
        if (err != TxnErrorCode::TXN_OK) {
614
2.00k
            check_ret = -1;
615
2.00k
            LOG(WARNING) << "failed to get schema, err=" << err;
616
2.00k
            return;
617
2.00k
        }
618
619
2.05k
        auto* schema = rs_meta.mutable_tablet_schema();
620
2.05k
        if (!parse_schema_value(tablet_schema_val, schema)) {
621
0
            LOG(WARNING) << "malformed schema value, key=" << hex(tablet_schema_key);
622
0
            return;
623
0
        }
624
625
2.05k
        std::vector<std::pair<int64_t, std::string>> index_ids;
626
2.05k
        for (const auto& i : rs_meta.tablet_schema().index()) {
627
2.05k
            if (i.has_index_type() && i.index_type() == IndexType::INVERTED) {
628
2.05k
                index_ids.emplace_back(i.index_id(), i.index_suffix_name());
629
2.05k
            }
630
2.05k
        }
631
2.05k
        if (!index_ids.empty()) {
632
8.10k
            for (int i = 0; i < rs_meta.num_segments(); ++i) {
633
6.05k
                std::vector<std::string> index_path_v;
634
6.05k
                if (rs_meta.tablet_schema().inverted_index_storage_format() ==
635
6.05k
                    InvertedIndexStorageFormatPB::V1) {
636
6.01k
                    for (const auto& index_id : index_ids) {
637
6.01k
                        LOG(INFO) << "check inverted index, tablet_id=" << rs_meta.tablet_id()
638
6.01k
                                  << " rowset_id=" << rs_meta.rowset_id_v2() << " segment_id=" << i
639
6.01k
                                  << " index_id=" << index_id.first
640
6.01k
                                  << " index_suffix_name=" << index_id.second;
641
6.01k
                        index_path_v.emplace_back(
642
6.01k
                                inverted_index_path_v1(rs_meta.tablet_id(), rs_meta.rowset_id_v2(),
643
6.01k
                                                       i, index_id.first, index_id.second));
644
6.01k
                    }
645
6.01k
                } else {
646
40
                    index_path_v.emplace_back(
647
40
                            inverted_index_path_v2(rs_meta.tablet_id(), rs_meta.rowset_id_v2(), i));
648
40
                }
649
650
6.05k
                if (std::ranges::all_of(index_path_v, [&](const auto& idx_file_path) {
651
6.05k
                        if (!tablet_files_cache.files.contains(idx_file_path)) {
652
23
                            LOG(INFO) << "loss index file: " << idx_file_path;
653
23
                            return false;
654
23
                        }
655
6.03k
                        return true;
656
6.05k
                    })) {
657
6.03k
                    continue;
658
6.03k
                }
659
23
                index_file_loss = true;
660
23
                data_loss = true;
661
23
            }
662
2.05k
        }
663
2.05k
    };
664
665
    // scan visible rowsets
666
16
    auto start_key = meta_rowset_key({instance_id_, 0, 0});
667
16
    auto end_key = meta_rowset_key({instance_id_, INT64_MAX, 0});
668
669
16
    std::unique_ptr<RangeGetIterator> it;
670
16
    do {
671
16
        std::unique_ptr<Transaction> txn;
672
16
        TxnErrorCode err = txn_kv_->create_txn(&txn);
673
16
        if (err != TxnErrorCode::TXN_OK) {
674
0
            LOG(WARNING) << "failed to init txn, err=" << err;
675
0
            return -1;
676
0
        }
677
678
16
        err = txn->get(start_key, end_key, &it);
679
16
        if (err != TxnErrorCode::TXN_OK) {
680
0
            LOG(WARNING) << "internal error, failed to get rowset meta, err=" << err;
681
0
            return -1;
682
0
        }
683
16
        num_scanned += it->size();
684
685
4.07k
        while (it->has_next() && !stopped()) {
686
4.05k
            auto [k, v] = it->next();
687
4.05k
            if (!it->has_next()) {
688
6
                start_key = k;
689
6
            }
690
691
4.05k
            doris::RowsetMetaCloudPB rs_meta;
692
4.05k
            if (!rs_meta.ParseFromArray(v.data(), v.size())) {
693
0
                ++num_rowset_loss;
694
0
                LOG(WARNING) << "malformed rowset meta. key=" << hex(k) << " val=" << hex(v);
695
0
                continue;
696
0
            }
697
4.05k
            check_rowset_objects(rs_meta, k);
698
4.05k
        }
699
16
        start_key.push_back('\x00'); // Update to next smallest key for iteration
700
16
    } while (it->more() && !stopped());
701
702
16
    return num_rowset_loss > 0 ? 1 : check_ret;
703
16
}
704
705
34
int InstanceChecker::get_bucket_lifecycle(int64_t* lifecycle_days) {
706
    // If there are multiple buckets, return the minimum lifecycle.
707
34
    int64_t min_lifecycle_days = INT64_MAX;
708
34
    int64_t tmp_liefcycle_days = 0;
709
34
    for (const auto& [id, accessor] : accessor_map_) {
710
34
        if (accessor->type() != AccessorType::S3) {
711
34
            continue;
712
34
        }
713
714
0
        auto* s3_accessor = static_cast<S3Accessor*>(accessor.get());
715
716
0
        if (s3_accessor->check_versioning() != 0) {
717
0
            return -1;
718
0
        }
719
720
0
        if (s3_accessor->get_life_cycle(&tmp_liefcycle_days) != 0) {
721
0
            return -1;
722
0
        }
723
724
0
        if (tmp_liefcycle_days < min_lifecycle_days) {
725
0
            min_lifecycle_days = tmp_liefcycle_days;
726
0
        }
727
0
    }
728
34
    *lifecycle_days = min_lifecycle_days;
729
34
    return 0;
730
34
}
731
732
5
int InstanceChecker::do_inverted_check() {
733
5
    if (accessor_map_.size() > 1) {
734
0
        LOG(INFO) << "currently not support inverted check for multi accessor. instance_id="
735
0
                  << instance_id_;
736
0
        return 0;
737
0
    }
738
739
5
    LOG(INFO) << "begin to inverted check objects instance_id=" << instance_id_;
740
5
    int check_ret = 0;
741
5
    long num_scanned = 0;
742
5
    long num_file_leak = 0;
743
5
    using namespace std::chrono;
744
5
    auto start_time = steady_clock::now();
745
5
    DORIS_CLOUD_DEFER {
746
5
        g_bvar_inverted_checker_num_scanned.put(instance_id_, num_scanned);
747
5
        g_bvar_inverted_checker_num_check_failed.put(instance_id_, num_file_leak);
748
5
        auto cost = duration<float>(steady_clock::now() - start_time).count();
749
5
        LOG(INFO) << "inverted check instance objects finished, cost=" << cost
750
5
                  << "s. instance_id=" << instance_id_ << " num_scanned=" << num_scanned
751
5
                  << " num_file_leak=" << num_file_leak;
752
5
    };
753
754
5
    struct TabletRowsets {
755
5
        int64_t tablet_id {0};
756
5
        std::unordered_set<std::string> rowset_ids;
757
5
    };
758
5
    TabletRowsets tablet_rowsets_cache;
759
760
5
    RowsetIndexesFormatV1 rowset_index_cache_v1;
761
5
    RowsetIndexesFormatV2 rowset_index_cache_v2;
762
763
    // Return 0 if check success, return 1 if file is garbage data, negative if error occurred
764
108
    auto check_segment_file = [&](const std::string& obj_key) {
765
108
        std::vector<std::string> str;
766
108
        butil::SplitString(obj_key, '/', &str);
767
        // data/{tablet_id}/{rowset_id}_{seg_num}.dat
768
108
        if (str.size() < 3) {
769
            // clang-format off
770
0
            LOG(WARNING) << "split obj_key error, str.size() should be less than 3,"
771
0
                         << " value = " << str.size();
772
            // clang-format on
773
0
            return -1;
774
0
        }
775
776
108
        int64_t tablet_id = atol(str[1].c_str());
777
108
        if (tablet_id <= 0) {
778
0
            LOG(WARNING) << "failed to parse tablet_id, key=" << obj_key;
779
0
            return -1;
780
0
        }
781
782
108
        if (!str[2].ends_with(".dat")) {
783
            // skip check not segment file
784
54
            return 0;
785
54
        }
786
787
54
        std::string rowset_id;
788
54
        if (auto pos = str.back().find('_'); pos != std::string::npos) {
789
54
            rowset_id = str.back().substr(0, pos);
790
54
        } else {
791
0
            LOG(WARNING) << "failed to parse rowset_id, key=" << obj_key;
792
0
            return -1;
793
0
        }
794
795
54
        if (tablet_rowsets_cache.tablet_id == tablet_id) {
796
7
            if (tablet_rowsets_cache.rowset_ids.contains(rowset_id)) {
797
2
                return 0;
798
5
            } else {
799
5
                LOG(WARNING) << "rowset not exists, key=" << obj_key;
800
5
                return -1;
801
5
            }
802
7
        }
803
        // Get all rowset id of this tablet
804
47
        tablet_rowsets_cache.tablet_id = tablet_id;
805
47
        tablet_rowsets_cache.rowset_ids.clear();
806
47
        std::unique_ptr<Transaction> txn;
807
47
        TxnErrorCode err = txn_kv_->create_txn(&txn);
808
47
        if (err != TxnErrorCode::TXN_OK) {
809
0
            LOG(WARNING) << "failed to create txn";
810
0
            return -1;
811
0
        }
812
47
        std::unique_ptr<RangeGetIterator> it;
813
47
        auto begin = meta_rowset_key({instance_id_, tablet_id, 0});
814
47
        auto end = meta_rowset_key({instance_id_, tablet_id, INT64_MAX});
815
47
        do {
816
47
            TxnErrorCode err = txn->get(begin, end, &it);
817
47
            if (err != TxnErrorCode::TXN_OK) {
818
0
                LOG(WARNING) << "failed to get rowset kv, err=" << err;
819
0
                return -1;
820
0
            }
821
47
            if (!it->has_next()) {
822
10
                break;
823
10
            }
824
37
            while (it->has_next()) {
825
                // recycle corresponding resources
826
37
                auto [k, v] = it->next();
827
37
                doris::RowsetMetaCloudPB rowset;
828
37
                if (!rowset.ParseFromArray(v.data(), v.size())) {
829
0
                    LOG(WARNING) << "malformed rowset meta value, key=" << hex(k);
830
0
                    return -1;
831
0
                }
832
37
                tablet_rowsets_cache.rowset_ids.insert(rowset.rowset_id_v2());
833
37
                if (!it->has_next()) {
834
37
                    begin = k;
835
37
                    begin.push_back('\x00'); // Update to next smallest key for iteration
836
37
                    break;
837
37
                }
838
37
            }
839
37
        } while (it->more() && !stopped());
840
841
47
        if (!tablet_rowsets_cache.rowset_ids.contains(rowset_id)) {
842
            // Garbage data leak
843
12
            LOG(WARNING) << "rowset should be recycled, key=" << obj_key;
844
12
            return 1;
845
12
        }
846
847
35
        return 0;
848
47
    };
849
850
108
    auto check_inverted_index_file = [&](const std::string& obj_key) {
851
108
        std::vector<std::string> str;
852
108
        butil::SplitString(obj_key, '/', &str);
853
        // format v1: data/{tablet_id}/{rowset_id}_{seg_num}_{idx_id}{idx_suffix}.idx
854
        // format v2: data/{tablet_id}/{rowset_id}_{seg_num}.idx
855
108
        if (str.size() < 3) {
856
            // clang-format off
857
0
            LOG(WARNING) << "split obj_key error, str.size() should be less than 3,"
858
0
                         << " value = " << str.size();
859
            // clang-format on
860
0
            return -1;
861
0
        }
862
863
108
        int64_t tablet_id = atol(str[1].c_str());
864
108
        if (tablet_id <= 0) {
865
0
            LOG(WARNING) << "failed to parse tablet_id, key=" << obj_key;
866
0
            return -1;
867
0
        }
868
869
        // v1: {rowset_id}_{seg_num}_{idx_id}{idx_suffix}.idx
870
        // v2: {rowset_id}_{seg_num}.idx
871
108
        std::string rowset_info = str.back();
872
873
108
        if (!rowset_info.ends_with(".idx")) {
874
54
            return 0; // Not an index file
875
54
        }
876
877
54
        InvertedIndexStorageFormatPB inverted_index_storage_format =
878
54
                std::count(rowset_info.begin(), rowset_info.end(), '_') > 1
879
54
                        ? InvertedIndexStorageFormatPB::V1
880
54
                        : InvertedIndexStorageFormatPB::V2;
881
882
54
        size_t pos = rowset_info.find_last_of('_');
883
54
        if (pos == std::string::npos || pos + 1 >= str.back().size() - 4) {
884
0
            LOG(WARNING) << "Invalid index_id format, key=" << obj_key;
885
0
            return -1;
886
0
        }
887
54
        if (inverted_index_storage_format == InvertedIndexStorageFormatPB::V1) {
888
14
            return check_inverted_index_file_storage_format_v1(tablet_id, obj_key, rowset_info,
889
14
                                                               rowset_index_cache_v1);
890
40
        } else {
891
40
            return check_inverted_index_file_storage_format_v2(tablet_id, obj_key, rowset_info,
892
40
                                                               rowset_index_cache_v2);
893
40
        }
894
54
    };
895
    // so we choose to skip here.
896
5
    TEST_SYNC_POINT_RETURN_WITH_VALUE("InstanceChecker::do_inverted_check", (int)0);
897
898
3
    for (auto& [_, accessor] : accessor_map_) {
899
3
        std::unique_ptr<ListIterator> list_iter;
900
3
        int ret = accessor->list_directory("data", &list_iter);
901
3
        if (ret != 0) {
902
0
            return -1;
903
0
        }
904
905
111
        for (auto file = list_iter->next(); file.has_value(); file = list_iter->next()) {
906
108
            ++num_scanned;
907
108
            int ret = check_segment_file(file->path);
908
108
            if (ret != 0) {
909
17
                LOG(WARNING) << "failed to check segment file, uri=" << accessor->uri()
910
17
                             << " path=" << file->path;
911
17
                if (ret == 1) {
912
12
                    ++num_file_leak;
913
12
                } else {
914
5
                    check_ret = -1;
915
5
                }
916
17
            }
917
108
            ret = check_inverted_index_file(file->path);
918
108
            if (ret != 0) {
919
13
                LOG(WARNING) << "failed to check index file, uri=" << accessor->uri()
920
13
                             << " path=" << file->path;
921
13
                if (ret == 1) {
922
13
                    ++num_file_leak;
923
13
                } else {
924
0
                    check_ret = -1;
925
0
                }
926
13
            }
927
108
        }
928
929
3
        if (!list_iter->is_valid()) {
930
0
            LOG(WARNING) << "failed to list data directory. uri=" << accessor->uri();
931
0
            return -1;
932
0
        }
933
3
    }
934
3
    return num_file_leak > 0 ? 1 : check_ret;
935
3
}
936
937
3
int InstanceChecker::traverse_mow_tablet(const std::function<int(int64_t, bool)>& check_func) {
938
3
    std::unique_ptr<RangeGetIterator> it;
939
3
    auto begin = meta_rowset_key({instance_id_, 0, 0});
940
3
    auto end = meta_rowset_key({instance_id_, std::numeric_limits<int64_t>::max(), 0});
941
43
    do {
942
43
        std::unique_ptr<Transaction> txn;
943
43
        TxnErrorCode err = txn_kv_->create_txn(&txn);
944
43
        if (err != TxnErrorCode::TXN_OK) {
945
0
            LOG(WARNING) << "failed to create txn";
946
0
            return -1;
947
0
        }
948
43
        err = txn->get(begin, end, &it, false, 1);
949
43
        if (err != TxnErrorCode::TXN_OK) {
950
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
951
0
            return -1;
952
0
        }
953
43
        if (!it->has_next()) {
954
3
            break;
955
3
        }
956
80
        while (it->has_next() && !stopped()) {
957
40
            auto [k, v] = it->next();
958
40
            std::string_view k1 = k;
959
40
            k1.remove_prefix(1);
960
40
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
961
40
            decode_key(&k1, &out);
962
            // 0x01 "meta" ${instance_id} "rowset" ${tablet_id} ${version} -> RowsetMetaCloudPB
963
40
            auto tablet_id = std::get<int64_t>(std::get<0>(out[3]));
964
965
40
            if (!it->has_next()) {
966
                // Update to next smallest key for iteration
967
                // scan for next tablet in this instance
968
40
                begin = meta_rowset_key({instance_id_, tablet_id + 1, 0});
969
40
            }
970
971
40
            TabletMetaCloudPB tablet_meta;
972
40
            int ret = get_tablet_meta(txn_kv_.get(), instance_id_, tablet_id, tablet_meta);
973
40
            if (ret < 0) {
974
0
                LOG(WARNING) << fmt::format(
975
0
                        "failed to get_tablet_meta in do_delete_bitmap_integrity_check(), "
976
0
                        "instance_id={}, tablet_id={}",
977
0
                        instance_id_, tablet_id);
978
0
                return ret;
979
0
            }
980
981
40
            if (tablet_meta.enable_unique_key_merge_on_write()) {
982
                // only check merge-on-write table
983
30
                bool has_sequence_col = tablet_meta.schema().has_sequence_col_idx() &&
984
30
                                        tablet_meta.schema().sequence_col_idx() != -1;
985
30
                int ret = check_func(tablet_id, has_sequence_col);
986
30
                if (ret < 0) {
987
                    // return immediately when encounter unexpected error,
988
                    // otherwise, we continue to check the next tablet
989
0
                    return ret;
990
0
                }
991
30
            }
992
40
        }
993
40
    } while (it->more() && !stopped());
994
3
    return 0;
995
3
}
996
997
int InstanceChecker::traverse_rowset_delete_bitmaps(
998
        int64_t tablet_id, std::string rowset_id,
999
0
        const std::function<int(int64_t, std::string_view, int64_t, int64_t)>& callback) {
1000
0
    std::unique_ptr<RangeGetIterator> it;
1001
0
    auto begin = meta_delete_bitmap_key({instance_id_, tablet_id, rowset_id, 0, 0});
1002
0
    auto end = meta_delete_bitmap_key({instance_id_, tablet_id, rowset_id,
1003
0
                                       std::numeric_limits<int64_t>::max(),
1004
0
                                       std::numeric_limits<int64_t>::max()});
1005
0
    do {
1006
0
        std::unique_ptr<Transaction> txn;
1007
0
        TxnErrorCode err = txn_kv_->create_txn(&txn);
1008
0
        if (err != TxnErrorCode::TXN_OK) {
1009
0
            LOG(WARNING) << "failed to create txn";
1010
0
            return -1;
1011
0
        }
1012
0
        err = txn->get(begin, end, &it);
1013
0
        if (err != TxnErrorCode::TXN_OK) {
1014
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
1015
0
            return -1;
1016
0
        }
1017
0
        if (!it->has_next()) {
1018
0
            break;
1019
0
        }
1020
0
        while (it->has_next() && !stopped()) {
1021
0
            auto [k, v] = it->next();
1022
0
            std::string_view k1 = k;
1023
0
            k1.remove_prefix(1);
1024
0
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1025
0
            decode_key(&k1, &out);
1026
            // 0x01 "meta" ${instance_id} "delete_bitmap" ${tablet_id} ${rowset_id} ${version} ${segment_id} -> roaringbitmap
1027
0
            auto version = std::get<std::int64_t>(std::get<0>(out[5]));
1028
0
            auto segment_id = std::get<std::int64_t>(std::get<0>(out[6]));
1029
1030
0
            int ret = callback(tablet_id, rowset_id, version, segment_id);
1031
0
            if (ret != 0) {
1032
0
                return ret;
1033
0
            }
1034
1035
0
            if (!it->has_next()) {
1036
0
                begin = k;
1037
0
                begin.push_back('\x00'); // Update to next smallest key for iteration
1038
0
                break;
1039
0
            }
1040
0
        }
1041
0
    } while (it->more() && !stopped());
1042
1043
0
    return 0;
1044
0
}
1045
1046
int InstanceChecker::collect_tablet_rowsets(
1047
50
        int64_t tablet_id, const std::function<void(const doris::RowsetMetaCloudPB&)>& collect_cb) {
1048
50
    std::unique_ptr<Transaction> txn;
1049
50
    TxnErrorCode err = txn_kv_->create_txn(&txn);
1050
50
    if (err != TxnErrorCode::TXN_OK) {
1051
0
        LOG(WARNING) << "failed to create txn";
1052
0
        return -1;
1053
0
    }
1054
50
    std::unique_ptr<RangeGetIterator> it;
1055
50
    auto begin = meta_rowset_key({instance_id_, tablet_id, 0});
1056
50
    auto end = meta_rowset_key({instance_id_, tablet_id + 1, 0});
1057
1058
50
    int64_t rowsets_num {0};
1059
50
    do {
1060
50
        TxnErrorCode err = txn->get(begin, end, &it);
1061
50
        if (err != TxnErrorCode::TXN_OK) {
1062
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
1063
0
            return -1;
1064
0
        }
1065
50
        if (!it->has_next()) {
1066
0
            break;
1067
0
        }
1068
394
        while (it->has_next() && !stopped()) {
1069
394
            auto [k, v] = it->next();
1070
394
            doris::RowsetMetaCloudPB rowset;
1071
394
            if (!rowset.ParseFromArray(v.data(), v.size())) {
1072
0
                LOG(WARNING) << "malformed rowset meta value, key=" << hex(k);
1073
0
                return -1;
1074
0
            }
1075
1076
394
            ++rowsets_num;
1077
394
            collect_cb(rowset);
1078
1079
394
            if (!it->has_next()) {
1080
50
                begin = k;
1081
50
                begin.push_back('\x00'); // Update to next smallest key for iteration
1082
50
                break;
1083
50
            }
1084
394
        }
1085
50
    } while (it->more() && !stopped());
1086
1087
50
    LOG(INFO) << fmt::format(
1088
50
            "[delete bitmap checker] successfully collect rowsets for instance_id={}, "
1089
50
            "tablet_id={}, rowsets_num={}",
1090
50
            instance_id_, tablet_id, rowsets_num);
1091
50
    return 0;
1092
50
}
1093
1094
2
int InstanceChecker::do_delete_bitmap_inverted_check() {
1095
2
    LOG(INFO) << fmt::format(
1096
2
            "[delete bitmap checker] begin to do_delete_bitmap_inverted_check for instance_id={}",
1097
2
            instance_id_);
1098
1099
    // number of delete bitmap keys being scanned
1100
2
    int64_t total_delete_bitmap_keys {0};
1101
    // number of delete bitmaps which belongs to non mow tablet
1102
2
    int64_t abnormal_delete_bitmaps {0};
1103
    // number of delete bitmaps which doesn't have corresponding rowset in MS
1104
2
    int64_t leaked_delete_bitmaps {0};
1105
1106
2
    auto start_time = std::chrono::steady_clock::now();
1107
2
    DORIS_CLOUD_DEFER {
1108
2
        g_bvar_inverted_checker_leaked_delete_bitmaps.put(instance_id_, leaked_delete_bitmaps);
1109
2
        g_bvar_inverted_checker_abnormal_delete_bitmaps.put(instance_id_, abnormal_delete_bitmaps);
1110
2
        g_bvar_inverted_checker_delete_bitmaps_scanned.put(instance_id_, total_delete_bitmap_keys);
1111
1112
2
        auto cost = std::chrono::duration_cast<std::chrono::milliseconds>(
1113
2
                            std::chrono::steady_clock::now() - start_time)
1114
2
                            .count();
1115
2
        if (leaked_delete_bitmaps > 0 || abnormal_delete_bitmaps > 0) {
1116
1
            LOG(WARNING) << fmt::format(
1117
1
                    "[delete bitmap check fails] delete bitmap inverted check for instance_id={}, "
1118
1
                    "cost={} ms, total_delete_bitmap_keys={}, leaked_delete_bitmaps={}, "
1119
1
                    "abnormal_delete_bitmaps={}",
1120
1
                    instance_id_, cost, total_delete_bitmap_keys, leaked_delete_bitmaps,
1121
1
                    abnormal_delete_bitmaps);
1122
1
        } else {
1123
1
            LOG(INFO) << fmt::format(
1124
1
                    "[delete bitmap checker] delete bitmap inverted check for instance_id={}, "
1125
1
                    "passed. cost={} ms, total_delete_bitmap_keys={}",
1126
1
                    instance_id_, cost, total_delete_bitmap_keys);
1127
1
        }
1128
2
    };
1129
1130
2
    struct TabletsRowsetsCache {
1131
2
        int64_t tablet_id {-1};
1132
2
        bool enable_merge_on_write {false};
1133
2
        std::unordered_set<std::string> rowsets {};
1134
2
        std::unordered_set<std::string> pending_delete_bitmaps {};
1135
2
    } tablet_rowsets_cache {};
1136
1137
2
    std::unique_ptr<RangeGetIterator> it;
1138
2
    auto begin = meta_delete_bitmap_key({instance_id_, 0, "", 0, 0});
1139
2
    auto end =
1140
2
            meta_delete_bitmap_key({instance_id_, std::numeric_limits<int64_t>::max(), "", 0, 0});
1141
2
    do {
1142
2
        std::unique_ptr<Transaction> txn;
1143
2
        TxnErrorCode err = txn_kv_->create_txn(&txn);
1144
2
        if (err != TxnErrorCode::TXN_OK) {
1145
0
            LOG(WARNING) << "failed to create txn";
1146
0
            return -1;
1147
0
        }
1148
2
        err = txn->get(begin, end, &it);
1149
2
        if (err != TxnErrorCode::TXN_OK) {
1150
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
1151
0
            return -1;
1152
0
        }
1153
2
        if (!it->has_next()) {
1154
0
            break;
1155
0
        }
1156
502
        while (it->has_next() && !stopped()) {
1157
500
            auto [k, v] = it->next();
1158
500
            std::string_view k1 = k;
1159
500
            k1.remove_prefix(1);
1160
500
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1161
500
            decode_key(&k1, &out);
1162
            // 0x01 "meta" ${instance_id} "delete_bitmap" ${tablet_id} ${rowset_id} ${version} ${segment_id} -> roaringbitmap
1163
500
            auto tablet_id = std::get<int64_t>(std::get<0>(out[3]));
1164
500
            auto rowset_id = std::get<std::string>(std::get<0>(out[4]));
1165
500
            auto version = std::get<std::int64_t>(std::get<0>(out[5]));
1166
500
            auto segment_id = std::get<std::int64_t>(std::get<0>(out[6]));
1167
1168
500
            ++total_delete_bitmap_keys;
1169
1170
500
            if (!it->has_next()) {
1171
2
                begin = k;
1172
2
                begin.push_back('\x00'); // Update to next smallest key for iteration
1173
2
            }
1174
1175
500
            if (tablet_rowsets_cache.tablet_id == -1 ||
1176
500
                tablet_rowsets_cache.tablet_id != tablet_id) {
1177
30
                TabletMetaCloudPB tablet_meta;
1178
30
                int ret = get_tablet_meta(txn_kv_.get(), instance_id_, tablet_id, tablet_meta);
1179
30
                if (ret < 0) {
1180
0
                    LOG(WARNING) << fmt::format(
1181
0
                            "[delete bitmap checker] failed to get_tablet_meta in "
1182
0
                            "do_delete_bitmap_inverted_check(), instance_id={}, tablet_id={}",
1183
0
                            instance_id_, tablet_id);
1184
0
                    return ret;
1185
0
                }
1186
1187
30
                tablet_rowsets_cache.tablet_id = tablet_id;
1188
30
                tablet_rowsets_cache.enable_merge_on_write =
1189
30
                        tablet_meta.enable_unique_key_merge_on_write();
1190
30
                tablet_rowsets_cache.rowsets.clear();
1191
30
                tablet_rowsets_cache.pending_delete_bitmaps.clear();
1192
1193
30
                if (tablet_rowsets_cache.enable_merge_on_write) {
1194
                    // only collect rowsets for merge-on-write tablet
1195
20
                    auto collect_cb =
1196
199
                            [&tablet_rowsets_cache](const doris::RowsetMetaCloudPB& rowset) {
1197
199
                                tablet_rowsets_cache.rowsets.insert(rowset.rowset_id_v2());
1198
199
                            };
1199
20
                    ret = collect_tablet_rowsets(tablet_id, collect_cb);
1200
20
                    if (ret < 0) {
1201
0
                        return ret;
1202
0
                    }
1203
                    // get pending delete bitmaps
1204
20
                    ret = get_pending_delete_bitmap_keys(
1205
20
                            tablet_id, tablet_rowsets_cache.pending_delete_bitmaps);
1206
20
                    if (ret < 0) {
1207
0
                        return ret;
1208
0
                    }
1209
20
                }
1210
30
            }
1211
500
            DCHECK_EQ(tablet_id, tablet_rowsets_cache.tablet_id);
1212
1213
500
            if (!tablet_rowsets_cache.enable_merge_on_write) {
1214
                // clang-format off
1215
40
                TEST_SYNC_POINT_CALLBACK(
1216
40
                        "InstanceChecker::do_delete_bitmap_inverted_check.get_abnormal_delete_bitmap",
1217
40
                        &tablet_id, &rowset_id, &version, &segment_id);
1218
                // clang-format on
1219
40
                ++abnormal_delete_bitmaps;
1220
                // log an error and continue to check the next delete bitmap
1221
40
                LOG(WARNING) << fmt::format(
1222
40
                        "[delete bitmap check fails] find a delete bitmap belongs to tablet "
1223
40
                        "which is not a merge-on-write table! instance_id={}, tablet_id={}, "
1224
40
                        "version={}, segment_id={}",
1225
40
                        instance_id_, tablet_id, version, segment_id);
1226
40
                continue;
1227
40
            }
1228
1229
460
            if (!tablet_rowsets_cache.rowsets.contains(rowset_id) &&
1230
460
                !tablet_rowsets_cache.pending_delete_bitmaps.contains(std::string(k))) {
1231
170
                TEST_SYNC_POINT_CALLBACK(
1232
170
                        "InstanceChecker::do_delete_bitmap_inverted_check.get_leaked_delete_bitmap",
1233
170
                        &tablet_id, &rowset_id, &version, &segment_id);
1234
170
                ++leaked_delete_bitmaps;
1235
                // log an error and continue to check the next delete bitmap
1236
170
                LOG(WARNING) << fmt::format(
1237
170
                        "[delete bitmap check fails] can't find corresponding rowset for delete "
1238
170
                        "bitmap instance_id={}, tablet_id={}, rowset_id={}, version={}, "
1239
170
                        "segment_id={}",
1240
170
                        instance_id_, tablet_id, rowset_id, version, segment_id);
1241
170
            }
1242
460
        }
1243
2
    } while (it->more() && !stopped());
1244
1245
2
    return (leaked_delete_bitmaps > 0 || abnormal_delete_bitmaps > 0) ? 1 : 0;
1246
2
}
1247
1248
int InstanceChecker::get_pending_delete_bitmap_keys(
1249
50
        int64_t tablet_id, std::unordered_set<std::string>& pending_delete_bitmaps) {
1250
50
    std::unique_ptr<Transaction> txn;
1251
50
    TxnErrorCode err = txn_kv_->create_txn(&txn);
1252
50
    if (err != TxnErrorCode::TXN_OK) {
1253
0
        LOG(WARNING) << "failed to create txn";
1254
0
        return -1;
1255
0
    }
1256
50
    std::string pending_key = meta_pending_delete_bitmap_key({instance_id_, tablet_id});
1257
50
    std::string pending_val;
1258
50
    err = txn->get(pending_key, &pending_val);
1259
50
    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
1260
0
        LOG(WARNING) << "failed to get pending delete bitmap kv, err=" << err;
1261
0
        return -1;
1262
0
    }
1263
50
    if (err == TxnErrorCode::TXN_OK) {
1264
2
        PendingDeleteBitmapPB pending_info;
1265
2
        if (!pending_info.ParseFromString(pending_val)) [[unlikely]] {
1266
0
            LOG(WARNING) << "failed to parse PendingDeleteBitmapPB, tablet=" << tablet_id;
1267
0
            return -1;
1268
0
        }
1269
12
        for (auto& delete_bitmap_key : pending_info.delete_bitmap_keys()) {
1270
12
            pending_delete_bitmaps.emplace(std::string(delete_bitmap_key));
1271
12
        }
1272
2
    }
1273
50
    return 0;
1274
50
}
1275
1276
int InstanceChecker::check_inverted_index_file_storage_format_v1(
1277
        int64_t tablet_id, const std::string& file_path, const std::string& rowset_info,
1278
14
        RowsetIndexesFormatV1& rowset_index_cache_v1) {
1279
    // format v1: data/{tablet_id}/{rowset_id}_{seg_num}_{idx_id}{idx_suffix}.idx
1280
14
    std::string rowset_id;
1281
14
    int64_t segment_id;
1282
14
    std::string index_id_with_suffix_name;
1283
    // {rowset_id}_{seg_num}_{idx_id}{idx_suffix}.idx
1284
14
    std::vector<std::string> str;
1285
14
    butil::SplitString(rowset_info.substr(0, rowset_info.size() - 4), '_', &str);
1286
14
    if (str.size() < 3) {
1287
0
        LOG(WARNING) << "Split rowset info with '_' error, str size < 3, rowset_info = "
1288
0
                     << rowset_info;
1289
0
        return -1;
1290
0
    }
1291
14
    rowset_id = str[0];
1292
14
    segment_id = std::atoll(str[1].c_str());
1293
14
    index_id_with_suffix_name = str[2];
1294
1295
14
    if (rowset_index_cache_v1.rowset_id == rowset_id) {
1296
0
        if (rowset_index_cache_v1.segment_ids.contains(segment_id)) {
1297
0
            if (auto it = rowset_index_cache_v1.index_ids.find(index_id_with_suffix_name);
1298
0
                it == rowset_index_cache_v1.index_ids.end()) {
1299
                // clang-format off
1300
0
                LOG(WARNING) << fmt::format("index_id with suffix name not found, rowset_info = {}, obj_key = {}", rowset_info, file_path);
1301
                // clang-format on
1302
0
                return -1;
1303
0
            }
1304
0
        } else {
1305
            // clang-format off
1306
0
            LOG(WARNING) << fmt::format("segment id not found, rowset_info = {}, obj_key = {}", rowset_info, file_path);
1307
            // clang-format on
1308
0
            return -1;
1309
0
        }
1310
0
    }
1311
1312
14
    rowset_index_cache_v1.rowset_id = rowset_id;
1313
14
    rowset_index_cache_v1.segment_ids.clear();
1314
14
    rowset_index_cache_v1.index_ids.clear();
1315
1316
14
    std::unique_ptr<Transaction> txn;
1317
14
    TxnErrorCode err = txn_kv_->create_txn(&txn);
1318
14
    if (err != TxnErrorCode::TXN_OK) {
1319
0
        LOG(WARNING) << "failed to create txn";
1320
0
        return -1;
1321
0
    }
1322
14
    std::unique_ptr<RangeGetIterator> it;
1323
14
    auto begin = meta_rowset_key({instance_id_, tablet_id, 0});
1324
14
    auto end = meta_rowset_key({instance_id_, tablet_id, INT64_MAX});
1325
14
    do {
1326
14
        TxnErrorCode err = txn->get(begin, end, &it);
1327
14
        if (err != TxnErrorCode::TXN_OK) {
1328
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
1329
0
            return -1;
1330
0
        }
1331
14
        if (!it->has_next()) {
1332
8
            break;
1333
8
        }
1334
6
        while (it->has_next()) {
1335
            // recycle corresponding resources
1336
6
            auto [k, v] = it->next();
1337
6
            doris::RowsetMetaCloudPB rs_meta;
1338
6
            if (!rs_meta.ParseFromArray(v.data(), v.size())) {
1339
0
                LOG(WARNING) << "malformed rowset meta value, key=" << hex(k);
1340
0
                return -1;
1341
0
            }
1342
1343
6
            TabletIndexPB tablet_index;
1344
6
            if (get_tablet_idx(txn_kv_.get(), instance_id_, rs_meta.tablet_id(), tablet_index) ==
1345
6
                -1) {
1346
0
                LOG(WARNING) << "failedt to get tablet index, tablet_id= " << rs_meta.tablet_id();
1347
0
                return -1;
1348
0
            }
1349
1350
6
            auto tablet_schema_key = meta_schema_key(
1351
6
                    {instance_id_, tablet_index.index_id(), rs_meta.schema_version()});
1352
6
            ValueBuf tablet_schema_val;
1353
6
            err = cloud::blob_get(txn.get(), tablet_schema_key, &tablet_schema_val);
1354
1355
6
            if (err != TxnErrorCode::TXN_OK) {
1356
0
                LOG(WARNING) << "failed to get schema, err=" << err;
1357
0
                return -1;
1358
0
            }
1359
1360
6
            auto* schema = rs_meta.mutable_tablet_schema();
1361
6
            if (!parse_schema_value(tablet_schema_val, schema)) {
1362
0
                LOG(WARNING) << "malformed schema value, key=" << hex(tablet_schema_key);
1363
0
                return -1;
1364
0
            }
1365
1366
12
            for (size_t i = 0; i < rs_meta.num_segments(); i++) {
1367
6
                rowset_index_cache_v1.segment_ids.insert(i);
1368
6
            }
1369
1370
6
            for (const auto& i : rs_meta.tablet_schema().index()) {
1371
6
                if (i.has_index_type() && i.index_type() == IndexType::INVERTED) {
1372
6
                    LOG(INFO) << fmt::format(
1373
6
                            "record index info, index_id: {}, index_suffix_name: {}", i.index_id(),
1374
6
                            i.index_suffix_name());
1375
6
                    rowset_index_cache_v1.index_ids.insert(
1376
6
                            fmt::format("{}{}", i.index_id(), i.index_suffix_name()));
1377
6
                }
1378
6
            }
1379
1380
6
            if (!it->has_next()) {
1381
6
                begin = k;
1382
6
                begin.push_back('\x00'); // Update to next smallest key for iteration
1383
6
                break;
1384
6
            }
1385
6
        }
1386
6
    } while (it->more() && !stopped());
1387
1388
14
    if (!rowset_index_cache_v1.segment_ids.contains(segment_id)) {
1389
        // Garbage data leak
1390
        // clang-format off
1391
8
        LOG(WARNING) << "rowset_index_cache_v1.segment_ids don't contains segment_id, rowset should be recycled,"
1392
8
                     << " key = " << file_path
1393
8
                     << " segment_id = " << segment_id;
1394
        // clang-format on
1395
8
        return 1;
1396
8
    }
1397
1398
6
    if (!rowset_index_cache_v1.index_ids.contains(index_id_with_suffix_name)) {
1399
        // Garbage data leak
1400
        // clang-format off
1401
0
        LOG(WARNING) << "rowset_index_cache_v1.index_ids don't contains index_id_with_suffix_name,"
1402
0
                     << " rowset with inde meta should be recycled, key=" << file_path
1403
0
                     << " index_id_with_suffix_name=" << index_id_with_suffix_name;
1404
        // clang-format on
1405
0
        return 1;
1406
0
    }
1407
1408
6
    return 0;
1409
6
}
1410
1411
int InstanceChecker::check_inverted_index_file_storage_format_v2(
1412
        int64_t tablet_id, const std::string& file_path, const std::string& rowset_info,
1413
40
        RowsetIndexesFormatV2& rowset_index_cache_v2) {
1414
40
    std::string rowset_id;
1415
40
    int64_t segment_id;
1416
    // {rowset_id}_{seg_num}.idx
1417
40
    std::vector<std::string> str;
1418
40
    butil::SplitString(rowset_info.substr(0, rowset_info.size() - 4), '_', &str);
1419
40
    if (str.size() < 2) {
1420
        // clang-format off
1421
0
        LOG(WARNING) << "Split rowset info with '_' error, str size < 2, rowset_info = " << rowset_info;
1422
        // clang-format on
1423
0
        return -1;
1424
0
    }
1425
40
    rowset_id = str[0];
1426
40
    segment_id = std::atoll(str[1].c_str());
1427
1428
40
    if (rowset_index_cache_v2.rowset_id == rowset_id) {
1429
0
        if (!rowset_index_cache_v2.segment_ids.contains(segment_id)) {
1430
            // clang-format off
1431
0
            LOG(WARNING) << fmt::format("index file not found, rowset_info = {}, obj_key = {}", rowset_info, file_path);
1432
            // clang-format on
1433
0
            return -1;
1434
0
        }
1435
0
    }
1436
1437
40
    rowset_index_cache_v2.rowset_id = rowset_id;
1438
40
    rowset_index_cache_v2.segment_ids.clear();
1439
1440
40
    std::unique_ptr<Transaction> txn;
1441
40
    TxnErrorCode err = txn_kv_->create_txn(&txn);
1442
40
    if (err != TxnErrorCode::TXN_OK) {
1443
0
        LOG(WARNING) << "failed to create txn";
1444
0
        return -1;
1445
0
    }
1446
40
    std::unique_ptr<RangeGetIterator> it;
1447
40
    auto begin = meta_rowset_key({instance_id_, tablet_id, 0});
1448
40
    auto end = meta_rowset_key({instance_id_, tablet_id, INT64_MAX});
1449
40
    do {
1450
40
        TxnErrorCode err = txn->get(begin, end, &it);
1451
40
        if (err != TxnErrorCode::TXN_OK) {
1452
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
1453
0
            return -1;
1454
0
        }
1455
40
        if (!it->has_next()) {
1456
5
            break;
1457
5
        }
1458
35
        while (it->has_next()) {
1459
            // recycle corresponding resources
1460
35
            auto [k, v] = it->next();
1461
35
            doris::RowsetMetaCloudPB rs_meta;
1462
35
            if (!rs_meta.ParseFromArray(v.data(), v.size())) {
1463
0
                LOG(WARNING) << "malformed rowset meta value, key=" << hex(k);
1464
0
                return -1;
1465
0
            }
1466
1467
70
            for (size_t i = 0; i < rs_meta.num_segments(); i++) {
1468
35
                rowset_index_cache_v2.segment_ids.insert(i);
1469
35
            }
1470
1471
35
            if (!it->has_next()) {
1472
35
                begin = k;
1473
35
                begin.push_back('\x00'); // Update to next smallest key for iteration
1474
35
                break;
1475
35
            }
1476
35
        }
1477
35
    } while (it->more() && !stopped());
1478
1479
40
    if (!rowset_index_cache_v2.segment_ids.contains(segment_id)) {
1480
        // Garbage data leak
1481
5
        LOG(WARNING) << "rowset with index meta should be recycled, key=" << file_path;
1482
5
        return 1;
1483
5
    }
1484
1485
35
    return 0;
1486
40
}
1487
1488
int InstanceChecker::check_delete_bitmap_storage_optimize_v2(
1489
        int64_t tablet_id, bool has_sequence_col,
1490
30
        int64_t& rowsets_with_useless_delete_bitmap_version) {
1491
    // end_version: create_time
1492
30
    std::map<int64_t, int64_t> tablet_rowsets_map {};
1493
    // rowset_id: {start_version, end_version}
1494
30
    std::map<std::string, std::pair<int64_t, int64_t>> rowset_version_map;
1495
    // Get all visible rowsets of this tablet
1496
195
    auto collect_cb = [&](const doris::RowsetMetaCloudPB& rowset) {
1497
195
        if (rowset.start_version() == 0 && rowset.end_version() == 1) {
1498
            // ignore dummy rowset [0-1]
1499
0
            return;
1500
0
        }
1501
195
        tablet_rowsets_map[rowset.end_version()] = rowset.creation_time();
1502
195
        rowset_version_map[rowset.rowset_id_v2()] =
1503
195
                std::make_pair(rowset.start_version(), rowset.end_version());
1504
195
    };
1505
30
    if (int ret = collect_tablet_rowsets(tablet_id, collect_cb); ret != 0) {
1506
0
        return ret;
1507
0
    }
1508
1509
30
    std::unordered_set<std::string> pending_delete_bitmaps;
1510
30
    if (auto ret = get_pending_delete_bitmap_keys(tablet_id, pending_delete_bitmaps); ret < 0) {
1511
0
        return ret;
1512
0
    }
1513
1514
30
    std::unique_ptr<RangeGetIterator> it;
1515
30
    auto begin = meta_delete_bitmap_key({instance_id_, tablet_id, "", 0, 0});
1516
30
    auto end = meta_delete_bitmap_key({instance_id_, tablet_id + 1, "", 0, 0});
1517
30
    std::string last_rowset_id = "";
1518
30
    int64_t last_version = 0;
1519
30
    int64_t last_failed_version = 0;
1520
30
    std::vector<int64_t> failed_versions;
1521
30
    auto print_failed_versions = [&]() {
1522
4
        TEST_SYNC_POINT_CALLBACK(
1523
4
                "InstanceChecker::check_delete_bitmap_storage_optimize_v2.get_abnormal_"
1524
4
                "rowset",
1525
4
                &tablet_id, &last_rowset_id);
1526
4
        rowsets_with_useless_delete_bitmap_version++;
1527
        // some versions are continuous, such as [8, 9, 10, 11, 13, 17, 18]
1528
        // print as [8-11, 13, 17-18]
1529
4
        int64_t last_start_version = -1;
1530
4
        int64_t last_end_version = -1;
1531
4
        std::stringstream ss;
1532
4
        ss << "[";
1533
9
        for (int64_t version : failed_versions) {
1534
9
            if (last_start_version == -1) {
1535
4
                last_start_version = version;
1536
4
                last_end_version = version;
1537
4
                continue;
1538
4
            }
1539
5
            if (last_end_version + 1 == version) {
1540
2
                last_end_version = version;
1541
3
            } else {
1542
3
                if (last_start_version == last_end_version) {
1543
3
                    ss << last_start_version << ", ";
1544
3
                } else {
1545
0
                    ss << last_start_version << "-" << last_end_version << ", ";
1546
0
                }
1547
3
                last_start_version = version;
1548
3
                last_end_version = version;
1549
3
            }
1550
5
        }
1551
4
        if (last_start_version == last_end_version) {
1552
3
            ss << last_start_version;
1553
3
        } else {
1554
1
            ss << last_start_version << "-" << last_end_version;
1555
1
        }
1556
4
        ss << "]";
1557
4
        std::stringstream version_str;
1558
4
        auto it = rowset_version_map.find(last_rowset_id);
1559
4
        if (it != rowset_version_map.end()) {
1560
4
            version_str << "[" << it->second.first << "-" << it->second.second << "]";
1561
4
        }
1562
4
        LOG(WARNING) << fmt::format(
1563
4
                "[delete bitmap check fails] delete bitmap storage optimize v2 check fail "
1564
4
                "for instance_id={}, tablet_id={}, rowset_id={}, version={} found delete "
1565
4
                "bitmap with versions={}, size={}",
1566
4
                instance_id_, tablet_id, last_rowset_id, version_str.str(), ss.str(),
1567
4
                failed_versions.size());
1568
4
    };
1569
30
    using namespace std::chrono;
1570
30
    int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
1571
30
    do {
1572
30
        std::unique_ptr<Transaction> txn;
1573
30
        TxnErrorCode err = txn_kv_->create_txn(&txn);
1574
30
        if (err != TxnErrorCode::TXN_OK) {
1575
0
            LOG(WARNING) << "failed to create txn";
1576
0
            return -1;
1577
0
        }
1578
30
        err = txn->get(begin, end, &it);
1579
30
        if (err != TxnErrorCode::TXN_OK) {
1580
0
            LOG(WARNING) << "failed to get delete bitmap kv, err=" << err;
1581
0
            return -1;
1582
0
        }
1583
30
        if (!it->has_next()) {
1584
0
            break;
1585
0
        }
1586
771
        while (it->has_next() && !stopped()) {
1587
741
            auto [k, v] = it->next();
1588
741
            std::string_view k1 = k;
1589
741
            k1.remove_prefix(1);
1590
741
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1591
741
            decode_key(&k1, &out);
1592
            // 0x01 "meta" ${instance_id} "delete_bitmap" ${tablet_id} ${rowset_id} ${version} ${segment_id} -> roaringbitmap
1593
741
            auto rowset_id = std::get<std::string>(std::get<0>(out[4]));
1594
741
            auto version = std::get<std::int64_t>(std::get<0>(out[5]));
1595
741
            if (!it->has_next()) {
1596
30
                begin = k;
1597
30
                begin.push_back('\x00'); // Update to next smallest key for iteration
1598
30
            }
1599
741
            if (rowset_id == last_rowset_id && version == last_version) {
1600
                // skip the same rowset and version
1601
167
                continue;
1602
167
            }
1603
574
            if (rowset_id != last_rowset_id && !failed_versions.empty()) {
1604
3
                print_failed_versions();
1605
3
                last_failed_version = 0;
1606
3
                failed_versions.clear();
1607
3
            }
1608
574
            last_rowset_id = rowset_id;
1609
574
            last_version = version;
1610
574
            if (tablet_rowsets_map.find(version) != tablet_rowsets_map.end()) {
1611
548
                continue;
1612
548
            }
1613
26
            auto version_it = rowset_version_map.find(rowset_id);
1614
26
            if (version_it == rowset_version_map.end()) {
1615
                // checked in do_delete_bitmap_inverted_check
1616
1
                continue;
1617
1
            }
1618
25
            if (pending_delete_bitmaps.contains(std::string(k))) {
1619
3
                continue;
1620
3
            }
1621
22
            if (has_sequence_col && version >= version_it->second.first &&
1622
22
                version <= version_it->second.second) {
1623
5
                continue;
1624
5
            }
1625
            // there may be an interval in this situation:
1626
            // 1. finish compaction job; 2. checker; 3. finish agg and remove delete bitmap to ms
1627
17
            auto rowset_it = tablet_rowsets_map.upper_bound(version);
1628
17
            if (rowset_it == tablet_rowsets_map.end()) {
1629
1
                if (version != last_failed_version) {
1630
1
                    failed_versions.push_back(version);
1631
1
                }
1632
1
                last_failed_version = version;
1633
1
                continue;
1634
1
            }
1635
16
            if (rowset_it->second + config::delete_bitmap_storage_optimize_v2_check_skip_seconds >=
1636
16
                now) {
1637
8
                continue;
1638
8
            }
1639
8
            if (version != last_failed_version) {
1640
8
                failed_versions.push_back(version);
1641
8
            }
1642
8
            last_failed_version = version;
1643
8
        }
1644
30
    } while (it->more() && !stopped());
1645
30
    if (!failed_versions.empty()) {
1646
1
        print_failed_versions();
1647
1
    }
1648
30
    LOG(INFO) << fmt::format(
1649
30
            "[delete bitmap checker] finish check delete bitmap storage optimize v2 for "
1650
30
            "instance_id={}, tablet_id={}, rowsets_num={}, "
1651
30
            "rowsets_with_useless_delete_bitmap_version={}",
1652
30
            instance_id_, tablet_id, tablet_rowsets_map.size(),
1653
30
            rowsets_with_useless_delete_bitmap_version);
1654
30
    return (rowsets_with_useless_delete_bitmap_version > 1 ? 1 : 0);
1655
30
}
1656
1657
3
int InstanceChecker::do_delete_bitmap_storage_optimize_check(int version) {
1658
3
    if (version != 2) {
1659
0
        return -1;
1660
0
    }
1661
3
    int64_t total_tablets_num {0};
1662
3
    int64_t failed_tablets_num {0};
1663
1664
    // for v2 check
1665
3
    int64_t max_rowsets_with_useless_delete_bitmap_version = 0;
1666
3
    int64_t tablet_id_with_max_rowsets_with_useless_delete_bitmap_version = 0;
1667
1668
    // check that for every visible rowset, there exists at least delete one bitmap in MS
1669
30
    int ret = traverse_mow_tablet([&](int64_t tablet_id, bool has_sequence_col) {
1670
30
        ++total_tablets_num;
1671
30
        int64_t rowsets_with_useless_delete_bitmap_version = 0;
1672
30
        int res = check_delete_bitmap_storage_optimize_v2(
1673
30
                tablet_id, has_sequence_col, rowsets_with_useless_delete_bitmap_version);
1674
30
        if (rowsets_with_useless_delete_bitmap_version >
1675
30
            max_rowsets_with_useless_delete_bitmap_version) {
1676
1
            max_rowsets_with_useless_delete_bitmap_version =
1677
1
                    rowsets_with_useless_delete_bitmap_version;
1678
1
            tablet_id_with_max_rowsets_with_useless_delete_bitmap_version = tablet_id;
1679
1
        }
1680
30
        failed_tablets_num += (res != 0);
1681
30
        return res;
1682
30
    });
1683
1684
3
    if (ret < 0) {
1685
0
        return ret;
1686
0
    }
1687
1688
3
    g_bvar_max_rowsets_with_useless_delete_bitmap_version.put(
1689
3
            instance_id_, max_rowsets_with_useless_delete_bitmap_version);
1690
1691
3
    std::stringstream ss;
1692
3
    ss << "[delete bitmap checker] check delete bitmap storage optimize v" << version
1693
3
       << " for instance_id=" << instance_id_ << ", total_tablets_num=" << total_tablets_num
1694
3
       << ", failed_tablets_num=" << failed_tablets_num
1695
3
       << ". max_rowsets_with_useless_delete_bitmap_version="
1696
3
       << max_rowsets_with_useless_delete_bitmap_version
1697
3
       << ", tablet_id=" << tablet_id_with_max_rowsets_with_useless_delete_bitmap_version;
1698
3
    LOG(INFO) << ss.str();
1699
1700
3
    return (failed_tablets_num > 0) ? 1 : 0;
1701
3
}
1702
1703
3
int InstanceChecker::do_mow_job_key_check() {
1704
3
    std::unique_ptr<RangeGetIterator> it;
1705
3
    std::string begin = mow_tablet_job_key({instance_id_, 0, 0});
1706
3
    std::string end = mow_tablet_job_key({instance_id_, INT64_MAX, 0});
1707
3
    MowTabletJobPB mow_tablet_job;
1708
3
    do {
1709
3
        std::unique_ptr<Transaction> txn;
1710
3
        TxnErrorCode err = txn_kv_->create_txn(&txn);
1711
3
        if (err != TxnErrorCode::TXN_OK) {
1712
0
            LOG(WARNING) << "failed to create txn";
1713
0
            return -1;
1714
0
        }
1715
3
        err = txn->get(begin, end, &it);
1716
3
        if (err != TxnErrorCode::TXN_OK) {
1717
0
            LOG(WARNING) << "failed to get mow tablet job key, err=" << err;
1718
0
            return -1;
1719
0
        }
1720
3
        int64_t now = duration_cast<std::chrono::seconds>(
1721
3
                              std::chrono::system_clock::now().time_since_epoch())
1722
3
                              .count();
1723
3
        while (it->has_next() && !stopped()) {
1724
2
            auto [k, v] = it->next();
1725
2
            std::string_view k1 = k;
1726
2
            k1.remove_prefix(1);
1727
2
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1728
2
            decode_key(&k1, &out);
1729
            // 0x01 "meta" ${instance_id} "mow_tablet_job" ${table_id} ${initiator}
1730
2
            auto table_id = std::get<int64_t>(std::get<0>(out[3]));
1731
2
            auto initiator = std::get<int64_t>(std::get<0>(out[4]));
1732
2
            if (!mow_tablet_job.ParseFromArray(v.data(), v.size())) [[unlikely]] {
1733
0
                LOG(WARNING) << "failed to parse MowTabletJobPB";
1734
0
                return -1;
1735
0
            }
1736
2
            int64_t expiration = mow_tablet_job.expiration();
1737
            // check job key failed should meet both following two condition:
1738
            // 1. job key is expired
1739
            // 2. table lock key is not found or key is not expired
1740
2
            if (expiration < now - config::mow_job_key_check_expiration_diff_seconds) {
1741
2
                std::string lock_key =
1742
2
                        meta_delete_bitmap_update_lock_key({instance_id_, table_id, -1});
1743
2
                std::string lock_val;
1744
2
                err = txn->get(lock_key, &lock_val);
1745
2
                std::string reason = "";
1746
2
                if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
1747
0
                    reason = "table lock key not found";
1748
1749
2
                } else {
1750
2
                    DeleteBitmapUpdateLockPB lock_info;
1751
2
                    if (!lock_info.ParseFromString(lock_val)) [[unlikely]] {
1752
0
                        LOG(WARNING) << "failed to parse DeleteBitmapUpdateLockPB";
1753
0
                        return -1;
1754
0
                    }
1755
2
                    if (lock_info.expiration() > now || lock_info.lock_id() != -1) {
1756
2
                        reason = "table lock is not expired,lock_id=" +
1757
2
                                 std::to_string(lock_info.lock_id());
1758
2
                    }
1759
2
                }
1760
2
                if (reason != "") {
1761
2
                    LOG(WARNING) << fmt::format(
1762
2
                            "[compaction key check fails] mow job key check fail for "
1763
2
                            "instance_id={}, table_id={}, initiator={}, expiration={}, now={}, "
1764
2
                            "reason={}",
1765
2
                            instance_id_, table_id, initiator, expiration, now, reason);
1766
2
                    return -1;
1767
2
                }
1768
2
            }
1769
2
        }
1770
1
        begin = it->next_begin_key(); // Update to next smallest key for iteration
1771
1
    } while (it->more() && !stopped());
1772
1
    return 0;
1773
3
}
1774
1775
4
int InstanceChecker::do_tablet_stats_key_check() {
1776
4
    int ret = 0;
1777
1778
4
    int64_t nums_leak = 0;
1779
4
    int64_t nums_loss = 0;
1780
4
    int64_t nums_scanned = 0;
1781
4
    int64_t nums_abnormal = 0;
1782
1783
4
    std::string begin = meta_tablet_key({instance_id_, 0, 0, 0, 0});
1784
4
    std::string end = meta_tablet_key({instance_id_, INT64_MAX, 0, 0, 0});
1785
    // inverted check tablet exists
1786
4
    LOG(INFO) << "begin inverted check stats_tablet_key";
1787
4
    ret = scan_and_handle_kv(begin, end, [&](std::string_view key, std::string_view value) {
1788
4
        int ret = check_stats_tablet_key_exists(key, value);
1789
4
        nums_scanned++;
1790
4
        if (ret == 1) {
1791
1
            nums_loss++;
1792
1
        }
1793
4
        return ret;
1794
4
    });
1795
4
    if (ret == -1) {
1796
0
        LOG(WARNING) << "failed to inverted check if stats tablet key exists";
1797
0
        return -1;
1798
4
    } else if (ret == 1) {
1799
1
        LOG(WARNING) << "stats_tablet_key loss, nums_scanned=" << nums_scanned
1800
1
                     << ", nums_loss=" << nums_loss;
1801
1
        return 1;
1802
1
    }
1803
3
    LOG(INFO) << "finish inverted check stats_tablet_key, nums_scanned=" << nums_scanned
1804
3
              << ", nums_loss=" << nums_loss;
1805
1806
3
    begin = stats_tablet_key({instance_id_, 0, 0, 0, 0});
1807
3
    end = stats_tablet_key({instance_id_, INT64_MAX, 0, 0, 0});
1808
3
    nums_scanned = 0;
1809
    // check tablet exists
1810
3
    LOG(INFO) << "begin check stats_tablet_key leaked";
1811
4
    ret = scan_and_handle_kv(begin, end, [&](std::string_view key, std::string_view value) {
1812
4
        int ret = check_stats_tablet_key_leaked(key, value);
1813
4
        nums_scanned++;
1814
4
        if (ret == 1) {
1815
1
            nums_leak++;
1816
1
        }
1817
4
        return ret;
1818
4
    });
1819
3
    if (ret == -1) {
1820
0
        LOG(WARNING) << "failed to check if stats tablet key exists";
1821
0
        return -1;
1822
3
    } else if (ret == 1) {
1823
1
        LOG(WARNING) << "stats_tablet_key leaked, nums_scanned=" << nums_scanned
1824
1
                     << ", nums_leak=" << nums_leak;
1825
1
        return 1;
1826
1
    }
1827
2
    LOG(INFO) << "finish check stats_tablet_key leaked, nums_scanned=" << nums_scanned
1828
2
              << ", nums_leak=" << nums_leak;
1829
1830
2
    begin = stats_tablet_key({instance_id_, 0, 0, 0, 0});
1831
2
    end = stats_tablet_key({instance_id_, INT64_MAX, 0, 0, 0});
1832
2
    nums_scanned = 0;
1833
    // check if key is normal
1834
2
    LOG(INFO) << "begin check stats_tablet_key abnormal";
1835
2
    ret = scan_and_handle_kv(begin, end, [&](std::string_view key, std::string_view value) {
1836
2
        int ret = check_stats_tablet_key(key, value);
1837
2
        nums_scanned++;
1838
2
        if (ret == 1) {
1839
1
            nums_abnormal++;
1840
1
        }
1841
2
        return ret;
1842
2
    });
1843
2
    if (ret == -1) {
1844
0
        LOG(WARNING) << "failed to check if stats tablet key exists";
1845
0
        return -1;
1846
2
    } else if (ret == 1) {
1847
1
        LOG(WARNING) << "stats_tablet_key abnormal, nums_scanned=" << nums_scanned
1848
1
                     << ", nums_abnormal=" << nums_abnormal;
1849
1
        return 1;
1850
1
    }
1851
1
    LOG(INFO) << "finish check stats_tablet_key, nums_scanned=" << nums_scanned
1852
1
              << ", nums_abnormal=" << nums_abnormal;
1853
1
    return 0;
1854
2
}
1855
1856
4
int InstanceChecker::check_stats_tablet_key_exists(std::string_view key, std::string_view value) {
1857
4
    std::string_view k1 = key;
1858
4
    k1.remove_prefix(1);
1859
4
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1860
4
    decode_key(&k1, &out);
1861
    // 0x01 "meta" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id}
1862
4
    auto table_id = std::get<int64_t>(std::get<0>(out[3]));
1863
4
    auto index_id = std::get<int64_t>(std::get<0>(out[4]));
1864
4
    auto partition_id = std::get<int64_t>(std::get<0>(out[5]));
1865
4
    auto tablet_id = std::get<int64_t>(std::get<0>(out[6]));
1866
4
    std::string tablet_stats_key =
1867
4
            stats_tablet_key({instance_id_, table_id, index_id, partition_id, tablet_id});
1868
4
    int ret = key_exist(txn_kv_.get(), tablet_stats_key);
1869
4
    if (ret == 1) {
1870
        // clang-format off
1871
1
        LOG(WARNING) << "stats tablet key's tablet key loss,"
1872
1
                    << " stats tablet key=" << hex(tablet_stats_key)
1873
1
                    << " meta tablet key=" << hex(key);
1874
        // clang-format on
1875
1
        return 1;
1876
3
    } else if (ret == -1) {
1877
0
        LOG(WARNING) << "failed to check key exists, key=" << hex(tablet_stats_key);
1878
0
        return -1;
1879
0
    }
1880
3
    LOG(INFO) << "check stats_tablet_key_exists ok, key=" << hex(key);
1881
3
    return 0;
1882
4
}
1883
1884
4
int InstanceChecker::check_stats_tablet_key_leaked(std::string_view key, std::string_view value) {
1885
4
    std::string_view k1 = key;
1886
4
    k1.remove_prefix(1);
1887
4
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1888
4
    decode_key(&k1, &out);
1889
    // 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id}
1890
4
    auto table_id = std::get<int64_t>(std::get<0>(out[3]));
1891
4
    auto index_id = std::get<int64_t>(std::get<0>(out[4]));
1892
4
    auto partition_id = std::get<int64_t>(std::get<0>(out[5]));
1893
4
    auto tablet_id = std::get<int64_t>(std::get<0>(out[6]));
1894
4
    std::string tablet_key =
1895
4
            meta_tablet_key({instance_id_, table_id, index_id, partition_id, tablet_id});
1896
4
    int ret = key_exist(txn_kv_.get(), tablet_key);
1897
4
    if (ret == 1) {
1898
        // clang-format off
1899
1
        LOG(WARNING) << "stats tablet key's tablet key leak,"
1900
1
                    << " stats tablet key=" << hex(key)
1901
1
                    << " meta tablet key=" << hex(tablet_key);
1902
        // clang-format on
1903
1
        return 1;
1904
3
    } else if (ret == -1) {
1905
0
        LOG(WARNING) << "failed to check key exists, key=" << hex(tablet_key);
1906
0
        return -1;
1907
0
    }
1908
3
    LOG(INFO) << "check stats_tablet_key_leaked ok, key=" << hex(key);
1909
3
    return 0;
1910
4
}
1911
1912
2
int InstanceChecker::check_stats_tablet_key(std::string_view key, std::string_view value) {
1913
2
    TabletStatsPB tablet_stats_pb;
1914
2
    std::string_view k1 = key;
1915
2
    k1.remove_prefix(1);
1916
2
    std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1917
2
    decode_key(&k1, &out);
1918
    // 0x01 "stats" ${instance_id} "tablet" ${table_id} ${index_id} ${partition_id} ${tablet_id}
1919
2
    auto tablet_id = std::get<int64_t>(std::get<0>(out[6]));
1920
2
    std::unique_ptr<Transaction> txn;
1921
2
    TxnErrorCode err = txn_kv_->create_txn(&txn);
1922
2
    if (err != TxnErrorCode::TXN_OK) {
1923
0
        LOG_WARNING("failed to recycle tablet ")
1924
0
                .tag("tablet id", tablet_id)
1925
0
                .tag("instance_id", instance_id_)
1926
0
                .tag("reason", "failed to create txn");
1927
0
        return -1;
1928
0
    }
1929
2
    std::string tablet_idx_key = meta_tablet_idx_key({instance_id_, tablet_id});
1930
2
    std::string tablet_idx_val;
1931
2
    TabletIndexPB tablet_idx;
1932
2
    err = txn->get(tablet_idx_key, &tablet_idx_val);
1933
2
    if (err != TxnErrorCode::TXN_OK) {
1934
        // clang-format off
1935
0
        LOG(WARNING) << "failed to get tablet index key,"
1936
0
                        << " key=" << hex(tablet_idx_key)
1937
0
                        << " code=" << err;
1938
        // clang-format on
1939
0
        return -1;
1940
0
    }
1941
2
    tablet_idx.ParseFromString(tablet_idx_val);
1942
2
    MetaServiceCode code = MetaServiceCode::OK;
1943
2
    std::string msg;
1944
2
    internal_get_tablet_stats(code, msg, txn.get(), instance_id_, tablet_idx, tablet_stats_pb);
1945
2
    if (code != MetaServiceCode::OK) {
1946
        // clang-format off
1947
0
        LOG(WARNING) << "failed to get tablet stats,"
1948
0
                        << " code=" << code 
1949
0
                        << " msg=" << msg;
1950
        // clang-format on
1951
0
        return -1;
1952
0
    }
1953
1954
2
    GetRowsetResponse resp;
1955
    // get rowsets in tablet
1956
2
    internal_get_rowset(txn.get(), 0, std::numeric_limits<int64_t>::max() - 1, instance_id_,
1957
2
                        tablet_id, code, msg, &resp);
1958
2
    if (code != MetaServiceCode::OK) {
1959
0
        LOG_WARNING("failed to get rowsets of tablet when check stats tablet key")
1960
0
                .tag("tablet id", tablet_id)
1961
0
                .tag("msg", msg)
1962
0
                .tag("code", code)
1963
0
                .tag("instance id", instance_id_);
1964
0
        return -1;
1965
0
    }
1966
2
    int64_t num_rows = 0;
1967
2
    int64_t num_rowsets = 0;
1968
2
    int64_t num_segments = 0;
1969
2
    int64_t total_data_size = 0;
1970
2
    for (const auto& rs_meta : resp.rowset_meta()) {
1971
2
        num_rows += rs_meta.num_rows();
1972
2
        num_rowsets++;
1973
2
        num_segments += rs_meta.num_segments();
1974
2
        total_data_size += rs_meta.total_disk_size();
1975
2
    }
1976
2
    int ret = 0;
1977
2
    if (tablet_stats_pb.data_size() != total_data_size) {
1978
1
        ret = 1;
1979
        // clang-format off
1980
1
        LOG(WARNING) << " tablet_stats_pb's data size is not same with all rowset total data size,"
1981
1
                        << " tablet_stats_pb's data size=" << tablet_stats_pb.data_size()
1982
1
                        << " all rowset total data size=" << total_data_size
1983
1
                        << " stats tablet meta=" << tablet_stats_pb.ShortDebugString();
1984
        // clang-format on
1985
1
    } else if (tablet_stats_pb.num_rows() != num_rows) {
1986
0
        ret = 1;
1987
        // clang-format off
1988
0
        LOG(WARNING) << " tablet_stats_pb's num_rows is not same with all rowset total num_rows,"
1989
0
                        << " tablet_stats_pb's num_rows=" << tablet_stats_pb.num_rows()
1990
0
                        << " all rowset total num_rows=" << num_rows
1991
0
                        << " stats tablet meta=" << tablet_stats_pb.ShortDebugString();
1992
        // clang-format on
1993
1
    } else if (tablet_stats_pb.num_rowsets() != num_rowsets) {
1994
0
        ret = 1;
1995
        // clang-format off
1996
0
        LOG(WARNING) << " tablet_stats_pb's num_rowsets is not same with all rowset nums,"
1997
0
                        << " tablet_stats_pb's num_rowsets=" << tablet_stats_pb.num_rowsets()
1998
0
                        << " all rowset nums=" << num_rowsets
1999
0
                        << " stats tablet meta=" << tablet_stats_pb.ShortDebugString();
2000
        // clang-format on
2001
1
    } else if (tablet_stats_pb.num_segments() != num_segments) {
2002
0
        ret = 1;
2003
        // clang-format off
2004
0
        LOG(WARNING) << " tablet_stats_pb's num_segments is not same with all rowset total num_segments,"
2005
0
                        << " tablet_stats_pb's num_segments=" << tablet_stats_pb.num_segments()
2006
0
                        << " all rowset total num_segments=" << num_segments
2007
0
                        << " stats tablet meta=" << tablet_stats_pb.ShortDebugString();
2008
        // clang-format on
2009
0
    }
2010
2011
2
    return ret;
2012
2
}
2013
2014
int InstanceChecker::scan_and_handle_kv(
2015
        std::string& start_key, const std::string& end_key,
2016
9
        std::function<int(std::string_view, std::string_view)> handle_kv) {
2017
9
    std::unique_ptr<Transaction> txn;
2018
9
    int ret = 0;
2019
9
    TxnErrorCode err = txn_kv_->create_txn(&txn);
2020
9
    if (err != TxnErrorCode::TXN_OK) {
2021
0
        LOG(WARNING) << "failed to init txn";
2022
0
        return -1;
2023
0
    }
2024
9
    std::unique_ptr<RangeGetIterator> it;
2025
9
    do {
2026
9
        err = txn->get(start_key, end_key, &it);
2027
9
        if (err != TxnErrorCode::TXN_OK) {
2028
0
            LOG(WARNING) << "failed to get tablet idx, ret=" << err;
2029
0
            return -1;
2030
0
        }
2031
2032
19
        while (it->has_next() && !stopped()) {
2033
10
            auto [k, v] = it->next();
2034
2035
10
            int handle_ret = handle_kv(k, v);
2036
10
            if (handle_ret == -1) {
2037
0
                return -1;
2038
10
            } else {
2039
10
                ret = std::max(ret, handle_ret);
2040
10
            }
2041
10
            if (!it->has_next()) {
2042
9
                start_key = k;
2043
9
            }
2044
10
        }
2045
9
        start_key = it->next_begin_key();
2046
9
    } while (it->more() && !stopped());
2047
9
    return ret;
2048
9
}
2049
2050
1
int InstanceChecker::do_restore_job_check() {
2051
1
    int64_t num_prepared = 0;
2052
1
    int64_t num_committed = 0;
2053
1
    int64_t num_dropped = 0;
2054
1
    int64_t num_completed = 0;
2055
1
    int64_t num_recycling = 0;
2056
1
    int64_t num_cost_many_time = 0;
2057
1
    const int64_t COST_MANY_THRESHOLD = 3600;
2058
2059
1
    using namespace std::chrono;
2060
1
    auto start_time = steady_clock::now();
2061
1
    DORIS_CLOUD_DEFER {
2062
1
        g_bvar_checker_restore_job_prepared_state.put(instance_id_, num_prepared);
2063
1
        g_bvar_checker_restore_job_committed_state.put(instance_id_, num_committed);
2064
1
        g_bvar_checker_restore_job_dropped_state.put(instance_id_, num_dropped);
2065
1
        g_bvar_checker_restore_job_completed_state.put(instance_id_, num_completed);
2066
1
        g_bvar_checker_restore_job_recycling_state.put(instance_id_, num_recycling);
2067
1
        g_bvar_checker_restore_job_cost_many_time.put(instance_id_, num_cost_many_time);
2068
1
        auto cost_ms =
2069
1
                duration_cast<std::chrono::milliseconds>(steady_clock::now() - start_time).count();
2070
1
        LOG(INFO) << "check instance restore jobs finished, cost=" << cost_ms
2071
1
                  << "ms. instance_id=" << instance_id_ << " num_prepared=" << num_prepared
2072
1
                  << " num_committed=" << num_committed << " num_dropped=" << num_dropped
2073
1
                  << " num_completed=" << num_completed << " num_recycling=" << num_recycling
2074
1
                  << " num_cost_many_time=" << num_cost_many_time;
2075
1
    };
2076
2077
1
    LOG_INFO("begin to check restore jobs").tag("instance_id", instance_id_);
2078
2079
1
    JobRestoreTabletKeyInfo restore_job_key_info0 {instance_id_, 0};
2080
1
    JobRestoreTabletKeyInfo restore_job_key_info1 {instance_id_, INT64_MAX};
2081
1
    std::string begin;
2082
1
    std::string end;
2083
1
    job_restore_tablet_key(restore_job_key_info0, &begin);
2084
1
    job_restore_tablet_key(restore_job_key_info1, &end);
2085
1
    std::unique_ptr<RangeGetIterator> it;
2086
1
    do {
2087
1
        std::unique_ptr<Transaction> txn;
2088
1
        TxnErrorCode err = txn_kv_->create_txn(&txn);
2089
1
        if (err != TxnErrorCode::TXN_OK) {
2090
0
            LOG(WARNING) << "failed to create txn";
2091
0
            return -1;
2092
0
        }
2093
1
        err = txn->get(begin, end, &it);
2094
1
        if (err != TxnErrorCode::TXN_OK) {
2095
0
            LOG(WARNING) << "failed to get mow tablet job key, err=" << err;
2096
0
            return -1;
2097
0
        }
2098
1
        if (!it->has_next()) {
2099
0
            break;
2100
0
        }
2101
3
        while (it->has_next()) {
2102
3
            auto [k, v] = it->next();
2103
3
            RestoreJobCloudPB restore_job_pb;
2104
3
            if (!restore_job_pb.ParseFromArray(v.data(), v.size())) {
2105
0
                LOG_WARNING("malformed restore job value").tag("key", hex(k));
2106
0
                return -1;
2107
0
            }
2108
2109
3
            switch (restore_job_pb.state()) {
2110
1
            case RestoreJobCloudPB::PREPARED:
2111
1
                ++num_prepared;
2112
1
                break;
2113
1
            case RestoreJobCloudPB::COMMITTED:
2114
1
                ++num_committed;
2115
1
                break;
2116
0
            case RestoreJobCloudPB::DROPPED:
2117
0
                ++num_dropped;
2118
0
                break;
2119
1
            case RestoreJobCloudPB::COMPLETED:
2120
1
                ++num_completed;
2121
1
                break;
2122
0
            case RestoreJobCloudPB::RECYCLING:
2123
0
                ++num_recycling;
2124
0
                break;
2125
0
            default:
2126
0
                break;
2127
3
            }
2128
2129
3
            int64_t current_time = ::time(nullptr);
2130
3
            if ((restore_job_pb.state() == RestoreJobCloudPB::PREPARED ||
2131
3
                 restore_job_pb.state() == RestoreJobCloudPB::COMMITTED) &&
2132
3
                current_time > restore_job_pb.ctime_s() + COST_MANY_THRESHOLD) {
2133
                // restore job run more than 1 hour
2134
1
                ++num_cost_many_time;
2135
1
                LOG_WARNING("restore job cost too many time")
2136
1
                        .tag("key", hex(k))
2137
1
                        .tag("tablet_id", restore_job_pb.tablet_id())
2138
1
                        .tag("state", restore_job_pb.state())
2139
1
                        .tag("ctime_s", restore_job_pb.ctime_s())
2140
1
                        .tag("mtime_s", restore_job_pb.mtime_s());
2141
1
            }
2142
2143
3
            if (!it->has_next()) {
2144
1
                begin = k;
2145
1
                begin.push_back('\x00'); // Update to next smallest key for iteration
2146
1
                break;
2147
1
            }
2148
3
        }
2149
1
    } while (it->more() && !stopped());
2150
1
    return 0;
2151
1
}
2152
2153
} // namespace doris::cloud