Coverage Report

Created: 2025-07-23 15:33

/root/doris/cloud/src/recycler/checker.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "recycler/checker.h"
19
20
#include <aws/s3/S3Client.h>
21
#include <aws/s3/model/ListObjectsV2Request.h>
22
#include <butil/endpoint.h>
23
#include <butil/strings/string_split.h>
24
#include <fmt/core.h>
25
#include <gen_cpp/cloud.pb.h>
26
#include <gen_cpp/olap_file.pb.h>
27
#include <glog/logging.h>
28
29
#include <algorithm>
30
#include <chrono>
31
#include <cstdint>
32
#include <memory>
33
#include <mutex>
34
#include <numeric>
35
#include <sstream>
36
#include <string_view>
37
#include <unordered_set>
38
#include <vector>
39
40
#include "common/bvars.h"
41
#include "common/config.h"
42
#include "common/encryption_util.h"
43
#include "common/logging.h"
44
#include "common/util.h"
45
#include "cpp/sync_point.h"
46
#include "meta-store/keys.h"
47
#include "meta-store/txn_kv.h"
48
#include "meta-store/txn_kv_error.h"
49
#include "recycler/hdfs_accessor.h"
50
#include "recycler/s3_accessor.h"
51
#include "recycler/storage_vault_accessor.h"
52
#ifdef UNIT_TEST
53
#include "../test/mock_accessor.h"
54
#endif
55
#include "recycler/util.h"
56
57
namespace doris::cloud {
58
namespace config {
// Configuration entries read by the checker. They are only declared here (extern); their
// definitions live in another translation unit.

// Flags
extern bool enable_inverted_check;

// Integer knobs
extern int32_t brpc_listen_port;
extern int32_t recycle_concurrency;
extern int32_t recycle_job_lease_expired_ms;
extern int32_t scan_instances_interval_seconds;

// Instance id filter lists
extern std::vector<std::string> recycle_blacklist;
extern std::vector<std::string> recycle_whitelist;
} // namespace config
67
68
5
Checker::Checker(std::shared_ptr<TxnKv> txn_kv) : txn_kv_(std::move(txn_kv)) {
    // "<local ip>:<brpc port>" identifies this checker process; it is handed to the
    // prepare/lease/finish job helpers when this process works on an instance check job.
    std::string endpoint(butil::my_ip_cstr());
    endpoint.push_back(':');
    endpoint.append(std::to_string(config::brpc_listen_port));
    ip_port_ = std::move(endpoint);
}
71
72
5
Checker::~Checker() {
    // stop() signals and joins every worker thread; skip it when it has already run.
    if (stopped()) {
        return;
    }
    stop();
}
77
78
4
int Checker::start() {
79
4
    DCHECK(txn_kv_);
80
4
    instance_filter_.reset(config::recycle_whitelist, config::recycle_blacklist);
81
82
    // launch instance scanner
83
4
    auto scanner_func = [this]() {
84
4
        std::this_thread::sleep_for(
85
4
                std::chrono::seconds(config::recycler_sleep_before_scheduling_seconds));
86
8
        while (!stopped()) {
87
4
            std::vector<InstanceInfoPB> instances;
88
4
            get_all_instances(txn_kv_.get(), instances);
89
4
            LOG(INFO) << "Checker get instances: " << [&instances] {
90
4
                std::stringstream ss;
91
30
                for (auto& i : instances) ss << ' ' << i.instance_id();
92
4
                return ss.str();
93
4
            }();
94
4
            if (!instances.empty()) {
95
                // enqueue instances
96
3
                std::lock_guard lock(mtx_);
97
30
                for (auto& instance : instances) {
98
30
                    if (instance_filter_.filter_out(instance.instance_id())) continue;
99
30
                    if (instance.status() == InstanceInfoPB::DELETED) continue;
100
30
                    using namespace std::chrono;
101
30
                    auto enqueue_time_s =
102
30
                            duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
103
30
                    auto [_, success] =
104
30
                            pending_instance_map_.insert({instance.instance_id(), enqueue_time_s});
105
                    // skip instance already in pending queue
106
30
                    if (success) {
107
30
                        pending_instance_queue_.push_back(std::move(instance));
108
30
                    }
109
30
                }
110
3
                pending_instance_cond_.notify_all();
111
3
            }
112
4
            {
113
4
                std::unique_lock lock(mtx_);
114
4
                notifier_.wait_for(lock,
115
4
                                   std::chrono::seconds(config::scan_instances_interval_seconds),
116
7
                                   [&]() { return stopped(); });
117
4
            }
118
4
        }
119
4
    };
120
4
    workers_.emplace_back(scanner_func);
121
    // Launch lease thread
122
4
    workers_.emplace_back([this] { lease_check_jobs(); });
123
    // Launch inspect thread
124
4
    workers_.emplace_back([this] { inspect_instance_check_interval(); });
125
126
    // launch check workers
127
8
    auto checker_func = [this]() {
128
38
        while (!stopped()) {
129
            // fetch instance to check
130
37
            InstanceInfoPB instance;
131
37
            long enqueue_time_s = 0;
132
37
            {
133
37
                std::unique_lock lock(mtx_);
134
50
                pending_instance_cond_.wait(lock, [&]() -> bool {
135
50
                    return !pending_instance_queue_.empty() || stopped();
136
50
                });
137
37
                if (stopped()) {
138
7
                    return;
139
7
                }
140
30
                instance = std::move(pending_instance_queue_.front());
141
30
                pending_instance_queue_.pop_front();
142
30
                enqueue_time_s = pending_instance_map_[instance.instance_id()];
143
30
                pending_instance_map_.erase(instance.instance_id());
144
30
            }
145
0
            const auto& instance_id = instance.instance_id();
146
30
            {
147
30
                std::lock_guard lock(mtx_);
148
                // skip instance in recycling
149
30
                if (working_instance_map_.count(instance_id)) continue;
150
30
            }
151
30
            auto checker = std::make_shared<InstanceChecker>(txn_kv_, instance.instance_id());
152
30
            if (checker->init(instance) != 0) {
153
0
                LOG(WARNING) << "failed to init instance checker, instance_id="
154
0
                             << instance.instance_id();
155
0
                continue;
156
0
            }
157
30
            std::string check_job_key;
158
30
            job_check_key({instance.instance_id()}, &check_job_key);
159
30
            int ret = prepare_instance_recycle_job(txn_kv_.get(), check_job_key,
160
30
                                                   instance.instance_id(), ip_port_,
161
30
                                                   config::check_object_interval_seconds * 1000);
162
30
            if (ret != 0) { // Prepare failed
163
20
                continue;
164
20
            } else {
165
10
                std::lock_guard lock(mtx_);
166
10
                working_instance_map_.emplace(instance_id, checker);
167
10
            }
168
10
            if (stopped()) return;
169
10
            using namespace std::chrono;
170
10
            auto ctime_ms =
171
10
                    duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
172
10
            g_bvar_checker_enqueue_cost_s.put(instance_id, ctime_ms / 1000 - enqueue_time_s);
173
174
10
            bool success {true};
175
176
10
            if (int ret = checker->do_check(); ret != 0) {
177
0
                success = false;
178
0
            }
179
180
10
            if (config::enable_inverted_check) {
181
0
                if (int ret = checker->do_inverted_check(); ret != 0) {
182
0
                    success = false;
183
0
                }
184
0
            }
185
186
10
            if (config::enable_delete_bitmap_inverted_check) {
187
0
                if (int ret = checker->do_delete_bitmap_inverted_check(); ret != 0) {
188
0
                    success = false;
189
0
                }
190
0
            }
191
192
10
            if (config::enable_mow_job_key_check) {
193
0
                if (int ret = checker->do_mow_job_key_check(); ret != 0) {
194
0
                    success = false;
195
0
                }
196
0
            }
197
198
10
            if (config::enable_delete_bitmap_storage_optimize_v2_check) {
199
0
                if (int ret = checker->do_delete_bitmap_storage_optimize_check(2 /*version*/);
200
0
                    ret != 0) {
201
0
                    success = false;
202
0
                }
203
0
            }
204
205
            // If instance checker has been aborted, don't finish this job
206
10
            if (!checker->stopped()) {
207
10
                finish_instance_recycle_job(txn_kv_.get(), check_job_key, instance.instance_id(),
208
10
                                            ip_port_, success, ctime_ms);
209
10
            }
210
10
            {
211
10
                std::lock_guard lock(mtx_);
212
10
                working_instance_map_.erase(instance.instance_id());
213
10
            }
214
10
        }
215
8
    };
216
4
    int num_threads = config::recycle_concurrency; // FIXME: use a new config entry?
217
12
    for (int i = 0; i < num_threads; ++i) {
218
8
        workers_.emplace_back(checker_func);
219
8
    }
220
4
    return 0;
221
4
}
222
223
5
void Checker::stop() {
224
5
    stopped_ = true;
225
5
    notifier_.notify_all();
226
5
    pending_instance_cond_.notify_all();
227
5
    {
228
5
        std::lock_guard lock(mtx_);
229
5
        for (auto& [_, checker] : working_instance_map_) {
230
0
            checker->stop();
231
0
        }
232
5
    }
233
20
    for (auto& w : workers_) {
234
20
        if (w.joinable()) w.join();
235
20
    }
236
5
}
237
238
4
void Checker::lease_check_jobs() {
239
55
    while (!stopped()) {
240
51
        std::vector<std::string> instances;
241
51
        instances.reserve(working_instance_map_.size());
242
51
        {
243
51
            std::lock_guard lock(mtx_);
244
51
            for (auto& [id, _] : working_instance_map_) {
245
30
                instances.push_back(id);
246
30
            }
247
51
        }
248
51
        for (auto& i : instances) {
249
30
            std::string check_job_key;
250
30
            job_check_key({i}, &check_job_key);
251
30
            int ret = lease_instance_recycle_job(txn_kv_.get(), check_job_key, i, ip_port_);
252
30
            if (ret == 1) {
253
0
                std::lock_guard lock(mtx_);
254
0
                if (auto it = working_instance_map_.find(i); it != working_instance_map_.end()) {
255
0
                    it->second->stop();
256
0
                }
257
0
            }
258
30
        }
259
51
        {
260
51
            std::unique_lock lock(mtx_);
261
51
            notifier_.wait_for(lock,
262
51
                               std::chrono::milliseconds(config::recycle_job_lease_expired_ms / 3),
263
102
                               [&]() { return stopped(); });
264
51
        }
265
51
    }
266
4
}
267
268
0
#define LOG_CHECK_INTERVAL_ALARM LOG(WARNING) << "Err for check interval: "
269
34
void Checker::do_inspect(const InstanceInfoPB& instance) {
270
34
    std::string check_job_key = job_check_key({instance.instance_id()});
271
34
    std::unique_ptr<Transaction> txn;
272
34
    std::string val;
273
34
    TxnErrorCode err = txn_kv_->create_txn(&txn);
274
34
    if (err != TxnErrorCode::TXN_OK) {
275
0
        LOG_CHECK_INTERVAL_ALARM << "failed to create txn";
276
0
        return;
277
0
    }
278
34
    err = txn->get(check_job_key, &val);
279
34
    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
280
0
        LOG_CHECK_INTERVAL_ALARM << "failed to get kv, err=" << err
281
0
                                 << " key=" << hex(check_job_key);
282
0
        return;
283
0
    }
284
34
    auto checker = InstanceChecker(txn_kv_, instance.instance_id());
285
34
    if (checker.init(instance) != 0) {
286
0
        LOG_CHECK_INTERVAL_ALARM << "failed to init instance checker, instance_id="
287
0
                                 << instance.instance_id();
288
0
        return;
289
0
    }
290
291
34
    int64_t bucket_lifecycle_days = 0;
292
34
    if (checker.get_bucket_lifecycle(&bucket_lifecycle_days) != 0) {
293
0
        LOG_CHECK_INTERVAL_ALARM << "failed to get bucket lifecycle, instance_id="
294
0
                                 << instance.instance_id();
295
0
        return;
296
0
    }
297
34
    DCHECK(bucket_lifecycle_days > 0);
298
299
34
    if (bucket_lifecycle_days == INT64_MAX) {
300
        // No s3 bucket (may all accessors are HdfsAccessor), skip inspect
301
34
        return;
302
34
    }
303
304
0
    int64_t last_ctime_ms = -1;
305
0
    auto job_status = JobRecyclePB::IDLE;
306
0
    auto has_last_ctime = [&]() {
307
0
        JobRecyclePB job_info;
308
0
        if (!job_info.ParseFromString(val)) {
309
0
            LOG_CHECK_INTERVAL_ALARM << "failed to parse JobRecyclePB, key=" << hex(check_job_key);
310
0
        }
311
0
        DCHECK(job_info.instance_id() == instance.instance_id());
312
0
        if (!job_info.has_last_ctime_ms()) return false;
313
0
        last_ctime_ms = job_info.last_ctime_ms();
314
0
        job_status = job_info.status();
315
0
        g_bvar_checker_last_success_time_ms.put(instance.instance_id(),
316
0
                                                job_info.last_success_time_ms());
317
0
        return true;
318
0
    };
319
0
    using namespace std::chrono;
320
0
    auto now = duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
321
0
    if (err == TxnErrorCode::TXN_KEY_NOT_FOUND || !has_last_ctime()) {
322
        // Use instance's ctime for instances that do not have job's last ctime
323
0
        last_ctime_ms = instance.ctime();
324
0
    }
325
0
    DCHECK(now - last_ctime_ms >= 0);
326
0
    int64_t expiration_ms =
327
0
            bucket_lifecycle_days > config::reserved_buffer_days
328
0
                    ? (bucket_lifecycle_days - config::reserved_buffer_days) * 86400000
329
0
                    : bucket_lifecycle_days * 86400000;
330
0
    TEST_SYNC_POINT_CALLBACK("Checker:do_inspect", &last_ctime_ms);
331
0
    if (now - last_ctime_ms >= expiration_ms) {
332
0
        LOG_CHECK_INTERVAL_ALARM << "check risks, instance_id: " << instance.instance_id()
333
0
                                 << " last_ctime_ms: " << last_ctime_ms
334
0
                                 << " job_status: " << job_status
335
0
                                 << " bucket_lifecycle_days: " << bucket_lifecycle_days
336
0
                                 << " reserved_buffer_days: " << config::reserved_buffer_days
337
0
                                 << " expiration_ms: " << expiration_ms;
338
0
    }
339
0
}
340
#undef LOG_CHECK_INTERVAL_ALARM
341
4
void Checker::inspect_instance_check_interval() {
342
8
    while (!stopped()) {
343
4
        LOG(INFO) << "start to inspect instance check interval";
344
4
        std::vector<InstanceInfoPB> instances;
345
4
        get_all_instances(txn_kv_.get(), instances);
346
30
        for (const auto& instance : instances) {
347
30
            if (instance_filter_.filter_out(instance.instance_id())) continue;
348
30
            if (stopped()) return;
349
30
            if (instance.status() == InstanceInfoPB::DELETED) continue;
350
30
            do_inspect(instance);
351
30
        }
352
4
        {
353
4
            std::unique_lock lock(mtx_);
354
4
            notifier_.wait_for(lock, std::chrono::seconds(config::scan_instances_interval_seconds),
355
7
                               [&]() { return stopped(); });
356
4
        }
357
4
    }
358
4
}
359
360
// return 0 for success get a key, 1 for key not found, negative for error
361
5.01k
int key_exist(TxnKv* txn_kv, std::string_view key) {
362
5.01k
    std::unique_ptr<Transaction> txn;
363
5.01k
    TxnErrorCode err = txn_kv->create_txn(&txn);
364
5.01k
    if (err != TxnErrorCode::TXN_OK) {
365
0
        LOG(WARNING) << "failed to init txn, err=" << err;
366
0
        return -1;
367
0
    }
368
5.01k
    std::string val;
369
5.01k
    switch (txn->get(key, &val)) {
370
3.01k
    case TxnErrorCode::TXN_OK:
371
3.01k
        return 0;
372
2.00k
    case TxnErrorCode::TXN_KEY_NOT_FOUND:
373
2.00k
        return 1;
374
0
    default:
375
0
        return -1;
376
5.01k
    }
377
5.01k
}
378
379
InstanceChecker::InstanceChecker(std::shared_ptr<TxnKv> txn_kv, const std::string& instance_id)
380
75
        : txn_kv_(std::move(txn_kv)), instance_id_(instance_id) {}
381
382
75
int InstanceChecker::init(const InstanceInfoPB& instance) {
383
75
    int ret = init_obj_store_accessors(instance);
384
75
    if (ret != 0) {
385
0
        return ret;
386
0
    }
387
388
75
    return init_storage_vault_accessors(instance);
389
75
}
390
391
75
int InstanceChecker::init_obj_store_accessors(const InstanceInfoPB& instance) {
392
75
    for (const auto& obj_info : instance.obj_info()) {
393
75
#ifdef UNIT_TEST
394
75
        auto accessor = std::make_shared<MockAccessor>();
395
#else
396
        auto s3_conf = S3Conf::from_obj_store_info(obj_info);
397
        if (!s3_conf) {
398
            LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_;
399
            return -1;
400
        }
401
402
        std::shared_ptr<S3Accessor> accessor;
403
        int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
404
        if (ret != 0) {
405
            LOG(WARNING) << "failed to init object accessor. instance_id=" << instance_id_
406
                         << " resource_id=" << obj_info.id();
407
            return ret;
408
        }
409
#endif
410
411
75
        accessor_map_.emplace(obj_info.id(), std::move(accessor));
412
75
    }
413
414
75
    return 0;
415
75
}
416
417
75
int InstanceChecker::init_storage_vault_accessors(const InstanceInfoPB& instance) {
418
75
    if (instance.resource_ids().empty()) {
419
75
        return 0;
420
75
    }
421
422
0
    FullRangeGetIteratorOptions opts(txn_kv_);
423
0
    opts.prefetch = true;
424
0
    auto it = txn_kv_->full_range_get(storage_vault_key({instance_id_, ""}),
425
0
                                      storage_vault_key({instance_id_, "\xff"}), std::move(opts));
426
427
0
    for (auto kv = it->next(); kv.has_value(); kv = it->next()) {
428
0
        auto [k, v] = *kv;
429
0
        StorageVaultPB vault;
430
0
        if (!vault.ParseFromArray(v.data(), v.size())) {
431
0
            LOG(WARNING) << "malformed storage vault, unable to deserialize key=" << hex(k);
432
0
            return -1;
433
0
        }
434
0
        TEST_SYNC_POINT_CALLBACK("InstanceRecycler::init_storage_vault_accessors.mock_vault",
435
0
                                 &accessor_map_, &vault);
436
0
        if (vault.has_hdfs_info()) {
437
0
            auto accessor = std::make_shared<HdfsAccessor>(vault.hdfs_info());
438
0
            int ret = accessor->init();
439
0
            if (ret != 0) {
440
0
                LOG(WARNING) << "failed to init hdfs accessor. instance_id=" << instance_id_
441
0
                             << " resource_id=" << vault.id() << " name=" << vault.name();
442
0
                return ret;
443
0
            }
444
445
0
            accessor_map_.emplace(vault.id(), std::move(accessor));
446
0
        } else if (vault.has_obj_info()) {
447
0
#ifdef UNIT_TEST
448
0
            auto accessor = std::make_shared<MockAccessor>();
449
#else
450
            auto s3_conf = S3Conf::from_obj_store_info(vault.obj_info());
451
            if (!s3_conf) {
452
                LOG(WARNING) << "failed to init object accessor, instance_id=" << instance_id_;
453
                return -1;
454
            }
455
456
            std::shared_ptr<S3Accessor> accessor;
457
            int ret = S3Accessor::create(std::move(*s3_conf), &accessor);
458
            if (ret != 0) {
459
                LOG(WARNING) << "failed to init s3 accessor. instance_id=" << instance_id_
460
                             << " resource_id=" << vault.id() << " name=" << vault.name();
461
                return ret;
462
            }
463
#endif
464
465
0
            accessor_map_.emplace(vault.id(), std::move(accessor));
466
0
        }
467
0
    }
468
469
0
    if (!it->is_valid()) {
470
0
        LOG_WARNING("failed to get storage vault kv");
471
0
        return -1;
472
0
    }
473
0
    return 0;
474
0
}
475
476
13
int InstanceChecker::do_check() {
477
13
    TEST_SYNC_POINT("InstanceChecker.do_check");
478
13
    LOG(INFO) << "begin to check instance objects instance_id=" << instance_id_;
479
13
    int check_ret = 0;
480
13
    long num_scanned = 0;
481
13
    long num_scanned_with_segment = 0;
482
13
    long num_rowset_loss = 0;
483
13
    long instance_volume = 0;
484
13
    using namespace std::chrono;
485
13
    auto start_time = steady_clock::now();
486
13
    DORIS_CLOUD_DEFER {
487
13
        auto cost = duration<float>(steady_clock::now() - start_time).count();
488
13
        LOG(INFO) << "check instance objects finished, cost=" << cost
489
13
                  << "s. instance_id=" << instance_id_ << " num_scanned=" << num_scanned
490
13
                  << " num_scanned_with_segment=" << num_scanned_with_segment
491
13
                  << " num_rowset_loss=" << num_rowset_loss
492
13
                  << " instance_volume=" << instance_volume;
493
13
        g_bvar_checker_num_scanned.put(instance_id_, num_scanned);
494
13
        g_bvar_checker_num_scanned_with_segment.put(instance_id_, num_scanned_with_segment);
495
13
        g_bvar_checker_num_check_failed.put(instance_id_, num_rowset_loss);
496
13
        g_bvar_checker_check_cost_s.put(instance_id_, static_cast<long>(cost));
497
        // FIXME(plat1ko): What if some list operation failed?
498
13
        g_bvar_checker_instance_volume.put(instance_id_, instance_volume);
499
13
    };
500
501
13
    struct TabletFiles {
502
13
        int64_t tablet_id {0};
503
13
        std::unordered_set<std::string> files;
504
13
    };
505
13
    TabletFiles tablet_files_cache;
506
507
13
    auto check_rowset_objects = [&, this](const doris::RowsetMetaCloudPB& rs_meta,
508
5.00k
                                          std::string_view key) {
509
5.00k
        if (rs_meta.num_segments() == 0) {
510
0
            return;
511
0
        }
512
513
5.00k
        ++num_scanned_with_segment;
514
5.00k
        if (tablet_files_cache.tablet_id != rs_meta.tablet_id()) {
515
500
            long tablet_volume = 0;
516
            // Clear cache
517
500
            tablet_files_cache.tablet_id = 0;
518
500
            tablet_files_cache.files.clear();
519
            // Get all file paths under this tablet directory
520
500
            auto find_it = accessor_map_.find(rs_meta.resource_id());
521
500
            if (find_it == accessor_map_.end()) {
522
0
                LOG_WARNING("resource id not found in accessor map")
523
0
                        .tag("resource_id", rs_meta.resource_id())
524
0
                        .tag("tablet_id", rs_meta.tablet_id())
525
0
                        .tag("rowset_id", rs_meta.rowset_id_v2());
526
0
                check_ret = -1;
527
0
                return;
528
0
            }
529
530
500
            std::unique_ptr<ListIterator> list_iter;
531
500
            int ret = find_it->second->list_directory(tablet_path_prefix(rs_meta.tablet_id()),
532
500
                                                      &list_iter);
533
500
            if (ret != 0) { // No need to log, because S3Accessor has logged this error
534
0
                check_ret = -1;
535
0
                return;
536
0
            }
537
538
19.4k
            for (auto file = list_iter->next(); file.has_value(); file = list_iter->next()) {
539
18.9k
                tablet_files_cache.files.insert(std::move(file->path));
540
18.9k
                tablet_volume += file->size;
541
18.9k
            }
542
500
            tablet_files_cache.tablet_id = rs_meta.tablet_id();
543
500
            instance_volume += tablet_volume;
544
500
        }
545
546
5.00k
        bool data_loss = false;
547
18.0k
        for (int i = 0; i < rs_meta.num_segments(); ++i) {
548
13.0k
            auto path = segment_path(rs_meta.tablet_id(), rs_meta.rowset_id_v2(), i);
549
550
13.0k
            if (tablet_files_cache.files.contains(path)) {
551
12.9k
                continue;
552
12.9k
            }
553
554
11
            if (1 == key_exist(txn_kv_.get(), key)) {
555
                // Rowset has been deleted instead of data loss
556
0
                break;
557
0
            }
558
11
            data_loss = true;
559
11
            TEST_SYNC_POINT_CALLBACK("InstanceChecker.do_check1", &path);
560
11
            LOG(WARNING) << "object not exist, path=" << path
561
11
                         << ", rs_meta=" << rs_meta.ShortDebugString() << " key=" << hex(key);
562
11
        }
563
564
5.00k
        std::vector<std::pair<int64_t, std::string>> index_ids;
565
5.00k
        for (const auto& i : rs_meta.tablet_schema().index()) {
566
5.00k
            if (i.has_index_type() && i.index_type() == IndexType::INVERTED) {
567
5.00k
                index_ids.emplace_back(i.index_id(), i.index_suffix_name());
568
5.00k
            }
569
5.00k
        }
570
5.00k
        std::string tablet_idx_key = meta_tablet_idx_key({instance_id_, rs_meta.tablet_id()});
571
5.00k
        if (!key_exist(txn_kv_.get(), tablet_idx_key)) {
572
10.0k
            for (int i = 0; i < rs_meta.num_segments(); ++i) {
573
7.00k
                std::vector<std::string> index_path_v;
574
7.00k
                std::vector<std::string> loss_file_path;
575
7.00k
                if (rs_meta.tablet_schema().inverted_index_storage_format() ==
576
7.00k
                    InvertedIndexStorageFormatPB::V1) {
577
9.00k
                    for (const auto& index_id : index_ids) {
578
9.00k
                        LOG(INFO) << "check inverted index, tablet_id=" << rs_meta.tablet_id()
579
9.00k
                                  << " rowset_id=" << rs_meta.rowset_id_v2()
580
9.00k
                                  << " segment_index=" << i << " index_id=" << index_id.first
581
9.00k
                                  << " index_suffix_name=" << index_id.second;
582
9.00k
                        index_path_v.emplace_back(
583
9.00k
                                inverted_index_path_v1(rs_meta.tablet_id(), rs_meta.rowset_id_v2(),
584
9.00k
                                                       i, index_id.first, index_id.second));
585
9.00k
                    }
586
7.00k
                } else {
587
0
                    index_path_v.emplace_back(
588
0
                            inverted_index_path_v2(rs_meta.tablet_id(), rs_meta.rowset_id_v2(), i));
589
0
                }
590
591
7.00k
                if (!index_path_v.empty()) {
592
7.00k
                    if (std::all_of(index_path_v.begin(), index_path_v.end(),
593
7.00k
                                    [&](const auto& idx_file_path) {
594
7.00k
                                        if (!tablet_files_cache.files.contains(idx_file_path)) {
595
1.00k
                                            loss_file_path.emplace_back(idx_file_path);
596
1.00k
                                            return false;
597
1.00k
                                        }
598
6.00k
                                        return true;
599
7.00k
                                    })) {
600
6.00k
                        continue;
601
6.00k
                    }
602
7.00k
                }
603
604
1.00k
                data_loss = true;
605
1.00k
                LOG(WARNING) << "object not exist, path="
606
1.00k
                             << std::accumulate(loss_file_path.begin(), loss_file_path.end(),
607
1.00k
                                                std::string(),
608
1.00k
                                                [](const auto& a, const auto& b) {
609
1.00k
                                                    return a.empty() ? b : a + ", " + b;
610
1.00k
                                                })
611
1.00k
                             << " key=" << hex(tablet_idx_key);
612
1.00k
            }
613
3.00k
        }
614
615
5.00k
        if (data_loss) {
616
1.00k
            ++num_rowset_loss;
617
1.00k
        }
618
5.00k
    };
619
620
    // scan visible rowsets
621
13
    auto start_key = meta_rowset_key({instance_id_, 0, 0});
622
13
    auto end_key = meta_rowset_key({instance_id_, INT64_MAX, 0});
623
624
13
    std::unique_ptr<RangeGetIterator> it;
625
13
    do {
626
13
        std::unique_ptr<Transaction> txn;
627
13
        TxnErrorCode err = txn_kv_->create_txn(&txn);
628
13
        if (err != TxnErrorCode::TXN_OK) {
629
0
            LOG(WARNING) << "failed to init txn, err=" << err;
630
0
            return -1;
631
0
        }
632
633
13
        err = txn->get(start_key, end_key, &it);
634
13
        if (err != TxnErrorCode::TXN_OK) {
635
0
            LOG(WARNING) << "internal error, failed to get rowset meta, err=" << err;
636
0
            return -1;
637
0
        }
638
13
        num_scanned += it->size();
639
640
5.01k
        while (it->has_next() && !stopped()) {
641
5.00k
            auto [k, v] = it->next();
642
5.00k
            if (!it->has_next()) start_key = k;
643
644
5.00k
            doris::RowsetMetaCloudPB rs_meta;
645
5.00k
            if (!rs_meta.ParseFromArray(v.data(), v.size())) {
646
0
                ++num_rowset_loss;
647
0
                LOG(WARNING) << "malformed rowset meta. key=" << hex(k) << " val=" << hex(v);
648
0
                continue;
649
0
            }
650
5.00k
            check_rowset_objects(rs_meta, k);
651
5.00k
        }
652
13
        start_key.push_back('\x00'); // Update to next smallest key for iteration
653
13
    } while (it->more() && !stopped());
654
655
13
    return num_rowset_loss > 0 ? 1 : check_ret;
656
13
}
657
658
34
int InstanceChecker::get_bucket_lifecycle(int64_t* lifecycle_days) {
659
    // If there are multiple buckets, return the minimum lifecycle.
660
34
    int64_t min_lifecycle_days = INT64_MAX;
661
34
    int64_t tmp_liefcycle_days = 0;
662
34
    for (const auto& [id, accessor] : accessor_map_) {
663
34
        if (accessor->type() != AccessorType::S3) {
664
34
            continue;
665
34
        }
666
667
0
        auto* s3_accessor = static_cast<S3Accessor*>(accessor.get());
668
669
0
        if (s3_accessor->check_versioning() != 0) {
670
0
            return -1;
671
0
        }
672
673
0
        if (s3_accessor->get_life_cycle(&tmp_liefcycle_days) != 0) {
674
0
            return -1;
675
0
        }
676
677
0
        if (tmp_liefcycle_days < min_lifecycle_days) {
678
0
            min_lifecycle_days = tmp_liefcycle_days;
679
0
        }
680
0
    }
681
34
    *lifecycle_days = min_lifecycle_days;
682
34
    return 0;
683
34
}
684
685
2
int InstanceChecker::do_inverted_check() {
686
2
    if (accessor_map_.size() > 1) {
687
0
        LOG(INFO) << "currently not support inverted check for multi accessor. instance_id="
688
0
                  << instance_id_;
689
0
        return 0;
690
0
    }
691
692
2
    LOG(INFO) << "begin to inverted check objects instance_id=" << instance_id_;
693
2
    int check_ret = 0;
694
2
    long num_scanned = 0;
695
2
    long num_file_leak = 0;
696
2
    using namespace std::chrono;
697
2
    auto start_time = steady_clock::now();
698
2
    DORIS_CLOUD_DEFER {
699
2
        g_bvar_inverted_checker_num_scanned.put(instance_id_, num_scanned);
700
2
        g_bvar_inverted_checker_num_check_failed.put(instance_id_, num_file_leak);
701
2
        auto cost = duration<float>(steady_clock::now() - start_time).count();
702
2
        LOG(INFO) << "inverted check instance objects finished, cost=" << cost
703
2
                  << "s. instance_id=" << instance_id_ << " num_scanned=" << num_scanned
704
2
                  << " num_file_leak=" << num_file_leak;
705
2
    };
706
707
2
    struct TabletRowsets {
708
2
        int64_t tablet_id {0};
709
2
        std::unordered_set<std::string> rowset_ids;
710
2
    };
711
2
    TabletRowsets tablet_rowsets_cache;
712
713
2
    struct TabletIndexes {
714
2
        int64_t tablet_id {0};
715
2
        std::unordered_set<int64_t> index_ids;
716
2
    };
717
2
    TabletIndexes tablet_indexes_cache;
718
719
    // Return 0 if check success, return 1 if file is garbage data, negative if error occurred
720
3.90k
    auto check_segment_file = [&](const std::string& obj_key) {
721
3.90k
        std::vector<std::string> str;
722
3.90k
        butil::SplitString(obj_key, '/', &str);
723
        // data/{tablet_id}/{rowset_id}_{seg_num}.dat
724
3.90k
        if (str.size() < 3) {
725
0
            return -1;
726
0
        }
727
728
3.90k
        int64_t tablet_id = atol(str[1].c_str());
729
3.90k
        if (tablet_id <= 0) {
730
0
            LOG(WARNING) << "failed to parse tablet_id, key=" << obj_key;
731
0
            return -1;
732
0
        }
733
734
3.90k
        std::string rowset_id;
735
3.90k
        if (auto pos = str.back().find('_'); pos != std::string::npos) {
736
3.90k
            rowset_id = str.back().substr(0, pos);
737
3.90k
        } else {
738
0
            LOG(WARNING) << "failed to parse rowset_id, key=" << obj_key;
739
0
            return -1;
740
0
        }
741
742
3.90k
        if (tablet_rowsets_cache.tablet_id == tablet_id) {
743
3.80k
            if (tablet_rowsets_cache.rowset_ids.contains(rowset_id)) {
744
3.80k
                return 0;
745
3.80k
            } else {
746
0
                LOG(WARNING) << "rowset not exists, key=" << obj_key;
747
0
                return -1;
748
0
            }
749
3.80k
        }
750
        // Get all rowset id of this tablet
751
100
        tablet_rowsets_cache.tablet_id = tablet_id;
752
100
        tablet_rowsets_cache.rowset_ids.clear();
753
100
        std::unique_ptr<Transaction> txn;
754
100
        TxnErrorCode err = txn_kv_->create_txn(&txn);
755
100
        if (err != TxnErrorCode::TXN_OK) {
756
0
            LOG(WARNING) << "failed to create txn";
757
0
            return -1;
758
0
        }
759
100
        std::unique_ptr<RangeGetIterator> it;
760
100
        auto begin = meta_rowset_key({instance_id_, tablet_id, 0});
761
100
        auto end = meta_rowset_key({instance_id_, tablet_id, INT64_MAX});
762
100
        do {
763
100
            TxnErrorCode err = txn->get(begin, end, &it);
764
100
            if (err != TxnErrorCode::TXN_OK) {
765
0
                LOG(WARNING) << "failed to get rowset kv, err=" << err;
766
0
                return -1;
767
0
            }
768
100
            if (!it->has_next()) {
769
0
                break;
770
0
            }
771
1.00k
            while (it->has_next()) {
772
                // recycle corresponding resources
773
1.00k
                auto [k, v] = it->next();
774
1.00k
                doris::RowsetMetaCloudPB rowset;
775
1.00k
                if (!rowset.ParseFromArray(v.data(), v.size())) {
776
0
                    LOG(WARNING) << "malformed rowset meta value, key=" << hex(k);
777
0
                    return -1;
778
0
                }
779
1.00k
                tablet_rowsets_cache.rowset_ids.insert(rowset.rowset_id_v2());
780
1.00k
                if (!it->has_next()) {
781
100
                    begin = k;
782
100
                    begin.push_back('\x00'); // Update to next smallest key for iteration
783
100
                    break;
784
100
                }
785
1.00k
            }
786
100
        } while (it->more() && !stopped());
787
788
100
        if (!tablet_rowsets_cache.rowset_ids.contains(rowset_id)) {
789
            // Garbage data leak
790
0
            LOG(WARNING) << "rowset should be recycled, key=" << obj_key;
791
0
            return 1;
792
0
        }
793
794
100
        return 0;
795
100
    };
796
3.90k
    auto check_inverted_index_file = [&](const std::string& obj_key) {
797
3.90k
        std::vector<std::string> str;
798
3.90k
        butil::SplitString(obj_key, '/', &str);
799
        // data/{tablet_id}/{rowset_id}_{seg_num}_{idx_id}{idx_suffix}.idx
800
3.90k
        if (str.size() < 3) {
801
0
            return -1;
802
0
        }
803
804
3.90k
        int64_t tablet_id = atol(str[1].c_str());
805
3.90k
        if (tablet_id <= 0) {
806
0
            LOG(WARNING) << "failed to parse tablet_id, key=" << obj_key;
807
0
            return -1;
808
0
        }
809
810
3.90k
        if (!str.back().ends_with(".idx")) {
811
1.00k
            return 0; // Not an index file
812
1.00k
        }
813
814
2.90k
        int64_t index_id;
815
816
2.90k
        size_t pos = str.back().find_last_of('_');
817
2.90k
        if (pos == std::string::npos || pos + 1 >= str.back().size() - 4) {
818
0
            LOG(WARNING) << "Invalid index_id format, key=" << obj_key;
819
0
            return -1;
820
0
        }
821
2.90k
        index_id = atol(str.back().substr(pos + 1, str.back().size() - 4).c_str());
822
823
2.90k
        if (tablet_indexes_cache.tablet_id == tablet_id) {
824
2.80k
            if (tablet_indexes_cache.index_ids.contains(index_id)) {
825
1.00k
                return 0;
826
1.80k
            } else {
827
1.80k
                LOG(WARNING) << "index not exists, key=" << obj_key;
828
1.80k
                return -1;
829
1.80k
            }
830
2.80k
        }
831
        // Get all index id of this tablet
832
100
        tablet_indexes_cache.tablet_id = tablet_id;
833
100
        tablet_indexes_cache.index_ids.clear();
834
100
        std::unique_ptr<Transaction> txn;
835
100
        TxnErrorCode err = txn_kv_->create_txn(&txn);
836
100
        if (err != TxnErrorCode::TXN_OK) {
837
0
            LOG(WARNING) << "failed to create txn";
838
0
            return -1;
839
0
        }
840
100
        auto tablet_idx_key = meta_tablet_idx_key({instance_id_, tablet_id});
841
100
        std::string tablet_idx_val;
842
100
        err = txn->get(tablet_idx_key, &tablet_idx_val);
843
100
        if (err != TxnErrorCode::TXN_OK) {
844
0
            LOG(WARNING) << "failed to get tablet idx,"
845
0
                         << " key=" << hex(tablet_idx_key) << " err=" << err;
846
0
            return -1;
847
0
        }
848
849
100
        TabletIndexPB tablet_idx_pb;
850
100
        if (!tablet_idx_pb.ParseFromArray(tablet_idx_val.data(), tablet_idx_val.size())) {
851
0
            LOG(WARNING) << "malformed index meta value, key=" << hex(tablet_idx_key);
852
0
            return -1;
853
0
        }
854
100
        if (!tablet_idx_pb.has_index_id()) {
855
0
            LOG(WARNING) << "tablet index meta does not have index_id, key=" << hex(tablet_idx_key);
856
0
            return -1;
857
0
        }
858
100
        tablet_indexes_cache.index_ids.insert(tablet_idx_pb.index_id());
859
860
100
        if (!tablet_indexes_cache.index_ids.contains(index_id)) {
861
100
            LOG(WARNING) << "index should be recycled, key=" << obj_key;
862
100
            return 1;
863
100
        }
864
865
0
        return 0;
866
100
    };
867
    // so we choose to skip here.
868
2
    TEST_SYNC_POINT_RETURN_WITH_VALUE("InstanceChecker::do_inverted_check", (int)0);
869
870
1
    for (auto& [_, accessor] : accessor_map_) {
871
1
        std::unique_ptr<ListIterator> list_iter;
872
1
        int ret = accessor->list_directory("data", &list_iter);
873
1
        if (ret != 0) {
874
0
            return -1;
875
0
        }
876
877
3.90k
        for (auto file = list_iter->next(); file.has_value(); file = list_iter->next()) {
878
3.90k
            ++num_scanned;
879
3.90k
            int ret = check_segment_file(file->path);
880
3.90k
            if (ret != 0) {
881
0
                LOG(WARNING) << "failed to check segment file, uri=" << accessor->uri()
882
0
                             << " path=" << file->path;
883
0
                if (ret == 1) {
884
0
                    ++num_file_leak;
885
0
                } else {
886
0
                    check_ret = -1;
887
0
                }
888
0
            }
889
3.90k
            ret = check_inverted_index_file(file->path);
890
3.90k
            if (ret != 0) {
891
1.90k
                LOG(WARNING) << "failed to check index file, uri=" << accessor->uri()
892
1.90k
                             << " path=" << file->path;
893
1.90k
                if (ret == 1) {
894
100
                    ++num_file_leak;
895
1.80k
                } else {
896
1.80k
                    check_ret = -1;
897
1.80k
                }
898
1.90k
            }
899
3.90k
        }
900
901
1
        if (!list_iter->is_valid()) {
902
0
            LOG(WARNING) << "failed to list data directory. uri=" << accessor->uri();
903
0
            return -1;
904
0
        }
905
1
    }
906
1
    return num_file_leak > 0 ? 1 : check_ret;
907
1
}
908
909
3
int InstanceChecker::traverse_mow_tablet(const std::function<int(int64_t)>& check_func) {
910
3
    std::unique_ptr<RangeGetIterator> it;
911
3
    auto begin = meta_rowset_key({instance_id_, 0, 0});
912
3
    auto end = meta_rowset_key({instance_id_, std::numeric_limits<int64_t>::max(), 0});
913
42
    do {
914
42
        std::unique_ptr<Transaction> txn;
915
42
        TxnErrorCode err = txn_kv_->create_txn(&txn);
916
42
        if (err != TxnErrorCode::TXN_OK) {
917
0
            LOG(WARNING) << "failed to create txn";
918
0
            return -1;
919
0
        }
920
42
        err = txn->get(begin, end, &it, false, 1);
921
42
        if (err != TxnErrorCode::TXN_OK) {
922
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
923
0
            return -1;
924
0
        }
925
42
        if (!it->has_next()) {
926
3
            break;
927
3
        }
928
78
        while (it->has_next() && !stopped()) {
929
39
            auto [k, v] = it->next();
930
39
            std::string_view k1 = k;
931
39
            k1.remove_prefix(1);
932
39
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
933
39
            decode_key(&k1, &out);
934
            // 0x01 "meta" ${instance_id} "rowset" ${tablet_id} ${version} -> RowsetMetaCloudPB
935
39
            auto tablet_id = std::get<int64_t>(std::get<0>(out[3]));
936
937
39
            if (!it->has_next()) {
938
                // Update to next smallest key for iteration
939
                // scan for next tablet in this instance
940
39
                begin = meta_rowset_key({instance_id_, tablet_id + 1, 0});
941
39
            }
942
943
39
            TabletMetaCloudPB tablet_meta;
944
39
            int ret = get_tablet_meta(txn_kv_.get(), instance_id_, tablet_id, tablet_meta);
945
39
            if (ret < 0) {
946
0
                LOG(WARNING) << fmt::format(
947
0
                        "failed to get_tablet_meta in do_delete_bitmap_integrity_check(), "
948
0
                        "instance_id={}, tablet_id={}",
949
0
                        instance_id_, tablet_id);
950
0
                return ret;
951
0
            }
952
953
39
            if (tablet_meta.enable_unique_key_merge_on_write()) {
954
                // only check merge-on-write table
955
29
                int ret = check_func(tablet_id);
956
29
                if (ret < 0) {
957
                    // return immediately when encounter unexpected error,
958
                    // otherwise, we continue to check the next tablet
959
0
                    return ret;
960
0
                }
961
29
            }
962
39
        }
963
39
    } while (it->more() && !stopped());
964
3
    return 0;
965
3
}
966
967
int InstanceChecker::traverse_rowset_delete_bitmaps(
968
        int64_t tablet_id, std::string rowset_id,
969
0
        const std::function<int(int64_t, std::string_view, int64_t, int64_t)>& callback) {
970
0
    std::unique_ptr<RangeGetIterator> it;
971
0
    auto begin = meta_delete_bitmap_key({instance_id_, tablet_id, rowset_id, 0, 0});
972
0
    auto end = meta_delete_bitmap_key({instance_id_, tablet_id, rowset_id,
973
0
                                       std::numeric_limits<int64_t>::max(),
974
0
                                       std::numeric_limits<int64_t>::max()});
975
0
    do {
976
0
        std::unique_ptr<Transaction> txn;
977
0
        TxnErrorCode err = txn_kv_->create_txn(&txn);
978
0
        if (err != TxnErrorCode::TXN_OK) {
979
0
            LOG(WARNING) << "failed to create txn";
980
0
            return -1;
981
0
        }
982
0
        err = txn->get(begin, end, &it);
983
0
        if (err != TxnErrorCode::TXN_OK) {
984
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
985
0
            return -1;
986
0
        }
987
0
        if (!it->has_next()) {
988
0
            break;
989
0
        }
990
0
        while (it->has_next() && !stopped()) {
991
0
            auto [k, v] = it->next();
992
0
            std::string_view k1 = k;
993
0
            k1.remove_prefix(1);
994
0
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
995
0
            decode_key(&k1, &out);
996
            // 0x01 "meta" ${instance_id} "delete_bitmap" ${tablet_id} ${rowset_id} ${version} ${segment_id} -> roaringbitmap
997
0
            auto version = std::get<std::int64_t>(std::get<0>(out[5]));
998
0
            auto segment_id = std::get<std::int64_t>(std::get<0>(out[6]));
999
1000
0
            int ret = callback(tablet_id, rowset_id, version, segment_id);
1001
0
            if (ret != 0) {
1002
0
                return ret;
1003
0
            }
1004
1005
0
            if (!it->has_next()) {
1006
0
                begin = k;
1007
0
                begin.push_back('\x00'); // Update to next smallest key for iteration
1008
0
                break;
1009
0
            }
1010
0
        }
1011
0
    } while (it->more() && !stopped());
1012
1013
0
    return 0;
1014
0
}
1015
1016
int InstanceChecker::collect_tablet_rowsets(
1017
49
        int64_t tablet_id, const std::function<void(const doris::RowsetMetaCloudPB&)>& collect_cb) {
1018
49
    std::unique_ptr<Transaction> txn;
1019
49
    TxnErrorCode err = txn_kv_->create_txn(&txn);
1020
49
    if (err != TxnErrorCode::TXN_OK) {
1021
0
        LOG(WARNING) << "failed to create txn";
1022
0
        return -1;
1023
0
    }
1024
49
    std::unique_ptr<RangeGetIterator> it;
1025
49
    auto begin = meta_rowset_key({instance_id_, tablet_id, 0});
1026
49
    auto end = meta_rowset_key({instance_id_, tablet_id + 1, 0});
1027
1028
49
    int64_t rowsets_num {0};
1029
49
    do {
1030
49
        TxnErrorCode err = txn->get(begin, end, &it);
1031
49
        if (err != TxnErrorCode::TXN_OK) {
1032
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
1033
0
            return -1;
1034
0
        }
1035
49
        if (!it->has_next()) {
1036
0
            break;
1037
0
        }
1038
389
        while (it->has_next() && !stopped()) {
1039
389
            auto [k, v] = it->next();
1040
389
            doris::RowsetMetaCloudPB rowset;
1041
389
            if (!rowset.ParseFromArray(v.data(), v.size())) {
1042
0
                LOG(WARNING) << "malformed rowset meta value, key=" << hex(k);
1043
0
                return -1;
1044
0
            }
1045
1046
389
            ++rowsets_num;
1047
389
            collect_cb(rowset);
1048
1049
389
            if (!it->has_next()) {
1050
49
                begin = k;
1051
49
                begin.push_back('\x00'); // Update to next smallest key for iteration
1052
49
                break;
1053
49
            }
1054
389
        }
1055
49
    } while (it->more() && !stopped());
1056
1057
49
    LOG(INFO) << fmt::format(
1058
49
            "[delete bitmap checker] successfully collect rowsets for instance_id={}, "
1059
49
            "tablet_id={}, rowsets_num={}",
1060
49
            instance_id_, tablet_id, rowsets_num);
1061
49
    return 0;
1062
49
}
1063
1064
2
int InstanceChecker::do_delete_bitmap_inverted_check() {
1065
2
    LOG(INFO) << fmt::format(
1066
2
            "[delete bitmap checker] begin to do_delete_bitmap_inverted_check for instance_id={}",
1067
2
            instance_id_);
1068
1069
    // number of delete bitmap keys being scanned
1070
2
    int64_t total_delete_bitmap_keys {0};
1071
    // number of delete bitmaps which belongs to non mow tablet
1072
2
    int64_t abnormal_delete_bitmaps {0};
1073
    // number of delete bitmaps which doesn't have corresponding rowset in MS
1074
2
    int64_t leaked_delete_bitmaps {0};
1075
1076
2
    auto start_time = std::chrono::steady_clock::now();
1077
2
    DORIS_CLOUD_DEFER {
1078
2
        g_bvar_inverted_checker_leaked_delete_bitmaps.put(instance_id_, leaked_delete_bitmaps);
1079
2
        g_bvar_inverted_checker_abnormal_delete_bitmaps.put(instance_id_, abnormal_delete_bitmaps);
1080
2
        g_bvar_inverted_checker_delete_bitmaps_scanned.put(instance_id_, total_delete_bitmap_keys);
1081
1082
2
        auto cost = std::chrono::duration_cast<std::chrono::milliseconds>(
1083
2
                            std::chrono::steady_clock::now() - start_time)
1084
2
                            .count();
1085
2
        if (leaked_delete_bitmaps > 0 || abnormal_delete_bitmaps > 0) {
1086
1
            LOG(WARNING) << fmt::format(
1087
1
                    "[delete bitmap check fails] delete bitmap inverted check for instance_id={}, "
1088
1
                    "cost={} ms, total_delete_bitmap_keys={}, leaked_delete_bitmaps={}, "
1089
1
                    "abnormal_delete_bitmaps={}",
1090
1
                    instance_id_, cost, total_delete_bitmap_keys, leaked_delete_bitmaps,
1091
1
                    abnormal_delete_bitmaps);
1092
1
        } else {
1093
1
            LOG(INFO) << fmt::format(
1094
1
                    "[delete bitmap checker] delete bitmap inverted check for instance_id={}, "
1095
1
                    "passed. cost={} ms, total_delete_bitmap_keys={}",
1096
1
                    instance_id_, cost, total_delete_bitmap_keys);
1097
1
        }
1098
2
    };
1099
1100
2
    struct TabletsRowsetsCache {
1101
2
        int64_t tablet_id {-1};
1102
2
        bool enable_merge_on_write {false};
1103
2
        std::unordered_set<std::string> rowsets {};
1104
2
        std::unordered_set<std::string> pending_delete_bitmaps {};
1105
2
    } tablet_rowsets_cache {};
1106
1107
2
    std::unique_ptr<RangeGetIterator> it;
1108
2
    auto begin = meta_delete_bitmap_key({instance_id_, 0, "", 0, 0});
1109
2
    auto end =
1110
2
            meta_delete_bitmap_key({instance_id_, std::numeric_limits<int64_t>::max(), "", 0, 0});
1111
2
    do {
1112
2
        std::unique_ptr<Transaction> txn;
1113
2
        TxnErrorCode err = txn_kv_->create_txn(&txn);
1114
2
        if (err != TxnErrorCode::TXN_OK) {
1115
0
            LOG(WARNING) << "failed to create txn";
1116
0
            return -1;
1117
0
        }
1118
2
        err = txn->get(begin, end, &it);
1119
2
        if (err != TxnErrorCode::TXN_OK) {
1120
0
            LOG(WARNING) << "failed to get rowset kv, err=" << err;
1121
0
            return -1;
1122
0
        }
1123
2
        if (!it->has_next()) {
1124
0
            break;
1125
0
        }
1126
502
        while (it->has_next() && !stopped()) {
1127
500
            auto [k, v] = it->next();
1128
500
            std::string_view k1 = k;
1129
500
            k1.remove_prefix(1);
1130
500
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1131
500
            decode_key(&k1, &out);
1132
            // 0x01 "meta" ${instance_id} "delete_bitmap" ${tablet_id} ${rowset_id} ${version} ${segment_id} -> roaringbitmap
1133
500
            auto tablet_id = std::get<int64_t>(std::get<0>(out[3]));
1134
500
            auto rowset_id = std::get<std::string>(std::get<0>(out[4]));
1135
500
            auto version = std::get<std::int64_t>(std::get<0>(out[5]));
1136
500
            auto segment_id = std::get<std::int64_t>(std::get<0>(out[6]));
1137
1138
500
            ++total_delete_bitmap_keys;
1139
1140
500
            if (!it->has_next()) {
1141
2
                begin = k;
1142
2
                begin.push_back('\x00'); // Update to next smallest key for iteration
1143
2
            }
1144
1145
500
            if (tablet_rowsets_cache.tablet_id == -1 ||
1146
500
                tablet_rowsets_cache.tablet_id != tablet_id) {
1147
30
                TabletMetaCloudPB tablet_meta;
1148
30
                int ret = get_tablet_meta(txn_kv_.get(), instance_id_, tablet_id, tablet_meta);
1149
30
                if (ret < 0) {
1150
0
                    LOG(WARNING) << fmt::format(
1151
0
                            "[delete bitmap checker] failed to get_tablet_meta in "
1152
0
                            "do_delete_bitmap_inverted_check(), instance_id={}, tablet_id={}",
1153
0
                            instance_id_, tablet_id);
1154
0
                    return ret;
1155
0
                }
1156
1157
30
                tablet_rowsets_cache.tablet_id = tablet_id;
1158
30
                tablet_rowsets_cache.enable_merge_on_write =
1159
30
                        tablet_meta.enable_unique_key_merge_on_write();
1160
30
                tablet_rowsets_cache.rowsets.clear();
1161
30
                tablet_rowsets_cache.pending_delete_bitmaps.clear();
1162
1163
30
                if (tablet_rowsets_cache.enable_merge_on_write) {
1164
                    // only collect rowsets for merge-on-write tablet
1165
20
                    auto collect_cb =
1166
199
                            [&tablet_rowsets_cache](const doris::RowsetMetaCloudPB& rowset) {
1167
199
                                tablet_rowsets_cache.rowsets.insert(rowset.rowset_id_v2());
1168
199
                            };
1169
20
                    ret = collect_tablet_rowsets(tablet_id, collect_cb);
1170
20
                    if (ret < 0) {
1171
0
                        return ret;
1172
0
                    }
1173
                    // get pending delete bitmaps
1174
20
                    ret = get_pending_delete_bitmap_keys(
1175
20
                            tablet_id, tablet_rowsets_cache.pending_delete_bitmaps);
1176
20
                    if (ret < 0) {
1177
0
                        return ret;
1178
0
                    }
1179
20
                }
1180
30
            }
1181
500
            DCHECK_EQ(tablet_id, tablet_rowsets_cache.tablet_id);
1182
1183
500
            if (!tablet_rowsets_cache.enable_merge_on_write) {
1184
                // clang-format off
1185
40
                TEST_SYNC_POINT_CALLBACK(
1186
40
                        "InstanceChecker::do_delete_bitmap_inverted_check.get_abnormal_delete_bitmap",
1187
40
                        &tablet_id, &rowset_id, &version, &segment_id);
1188
                // clang-format on
1189
40
                ++abnormal_delete_bitmaps;
1190
                // log an error and continue to check the next delete bitmap
1191
40
                LOG(WARNING) << fmt::format(
1192
40
                        "[delete bitmap check fails] find a delete bitmap belongs to tablet "
1193
40
                        "which is not a merge-on-write table! instance_id={}, tablet_id={}, "
1194
40
                        "version={}, segment_id={}",
1195
40
                        instance_id_, tablet_id, version, segment_id);
1196
40
                continue;
1197
40
            }
1198
1199
460
            if (!tablet_rowsets_cache.rowsets.contains(rowset_id) &&
1200
460
                !tablet_rowsets_cache.pending_delete_bitmaps.contains(std::string(k))) {
1201
170
                TEST_SYNC_POINT_CALLBACK(
1202
170
                        "InstanceChecker::do_delete_bitmap_inverted_check.get_leaked_delete_bitmap",
1203
170
                        &tablet_id, &rowset_id, &version, &segment_id);
1204
170
                ++leaked_delete_bitmaps;
1205
                // log an error and continue to check the next delete bitmap
1206
170
                LOG(WARNING) << fmt::format(
1207
170
                        "[delete bitmap check fails] can't find corresponding rowset for delete "
1208
170
                        "bitmap instance_id={}, tablet_id={}, rowset_id={}, version={}, "
1209
170
                        "segment_id={}",
1210
170
                        instance_id_, tablet_id, rowset_id, version, segment_id);
1211
170
            }
1212
460
        }
1213
2
    } while (it->more() && !stopped());
1214
1215
2
    return (leaked_delete_bitmaps > 0 || abnormal_delete_bitmaps > 0) ? 1 : 0;
1216
2
}
1217
1218
int InstanceChecker::get_pending_delete_bitmap_keys(
1219
49
        int64_t tablet_id, std::unordered_set<std::string>& pending_delete_bitmaps) {
1220
49
    std::unique_ptr<Transaction> txn;
1221
49
    TxnErrorCode err = txn_kv_->create_txn(&txn);
1222
49
    if (err != TxnErrorCode::TXN_OK) {
1223
0
        LOG(WARNING) << "failed to create txn";
1224
0
        return -1;
1225
0
    }
1226
49
    std::string pending_key = meta_pending_delete_bitmap_key({instance_id_, tablet_id});
1227
49
    std::string pending_val;
1228
49
    err = txn->get(pending_key, &pending_val);
1229
49
    if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) {
1230
0
        LOG(WARNING) << "failed to get pending delete bitmap kv, err=" << err;
1231
0
        return -1;
1232
0
    }
1233
49
    if (err == TxnErrorCode::TXN_OK) {
1234
2
        PendingDeleteBitmapPB pending_info;
1235
2
        if (!pending_info.ParseFromString(pending_val)) [[unlikely]] {
1236
0
            LOG(WARNING) << "failed to parse PendingDeleteBitmapPB, tablet=" << tablet_id;
1237
0
            return -1;
1238
0
        }
1239
12
        for (auto& delete_bitmap_key : pending_info.delete_bitmap_keys()) {
1240
12
            pending_delete_bitmaps.emplace(std::string(delete_bitmap_key));
1241
12
        }
1242
2
    }
1243
49
    return 0;
1244
49
}
1245
1246
int InstanceChecker::check_delete_bitmap_storage_optimize_v2(
1247
29
        int64_t tablet_id, int64_t& rowsets_with_useless_delete_bitmap_version) {
1248
    // end_version: create_time
1249
29
    std::map<int64_t, int64_t> tablet_rowsets_map {};
1250
    // rowset_id: {start_version, end_version}
1251
29
    std::map<std::string, std::pair<int64_t, int64_t>> rowset_version_map;
1252
    // Get all visible rowsets of this tablet
1253
190
    auto collect_cb = [&](const doris::RowsetMetaCloudPB& rowset) {
1254
190
        if (rowset.start_version() == 0 && rowset.end_version() == 1) {
1255
            // ignore dummy rowset [0-1]
1256
0
            return;
1257
0
        }
1258
190
        tablet_rowsets_map[rowset.end_version()] = rowset.creation_time();
1259
190
        rowset_version_map[rowset.rowset_id_v2()] =
1260
190
                std::make_pair(rowset.start_version(), rowset.end_version());
1261
190
    };
1262
29
    if (int ret = collect_tablet_rowsets(tablet_id, collect_cb); ret != 0) {
1263
0
        return ret;
1264
0
    }
1265
1266
29
    std::unordered_set<std::string> pending_delete_bitmaps;
1267
29
    if (auto ret = get_pending_delete_bitmap_keys(tablet_id, pending_delete_bitmaps); ret < 0) {
1268
0
        return ret;
1269
0
    }
1270
1271
29
    std::unique_ptr<RangeGetIterator> it;
1272
29
    auto begin = meta_delete_bitmap_key({instance_id_, tablet_id, "", 0, 0});
1273
29
    auto end = meta_delete_bitmap_key({instance_id_, tablet_id + 1, "", 0, 0});
1274
29
    std::string last_rowset_id = "";
1275
29
    int64_t last_version = 0;
1276
29
    int64_t last_failed_version = 0;
1277
29
    std::vector<int64_t> failed_versions;
1278
29
    auto print_failed_versions = [&]() {
1279
4
        TEST_SYNC_POINT_CALLBACK(
1280
4
                "InstanceChecker::check_delete_bitmap_storage_optimize_v2.get_abnormal_"
1281
4
                "rowset",
1282
4
                &tablet_id, &last_rowset_id);
1283
4
        rowsets_with_useless_delete_bitmap_version++;
1284
        // some versions are continuous, such as [8, 9, 10, 11, 13, 17, 18]
1285
        // print as [8-11, 13, 17-18]
1286
4
        int64_t last_start_version = -1;
1287
4
        int64_t last_end_version = -1;
1288
4
        std::stringstream ss;
1289
4
        ss << "[";
1290
9
        for (int64_t version : failed_versions) {
1291
9
            if (last_start_version == -1) {
1292
4
                last_start_version = version;
1293
4
                last_end_version = version;
1294
4
                continue;
1295
4
            }
1296
5
            if (last_end_version + 1 == version) {
1297
2
                last_end_version = version;
1298
3
            } else {
1299
3
                if (last_start_version == last_end_version) {
1300
3
                    ss << last_start_version << ", ";
1301
3
                } else {
1302
0
                    ss << last_start_version << "-" << last_end_version << ", ";
1303
0
                }
1304
3
                last_start_version = version;
1305
3
                last_end_version = version;
1306
3
            }
1307
5
        }
1308
4
        if (last_start_version == last_end_version) {
1309
3
            ss << last_start_version;
1310
3
        } else {
1311
1
            ss << last_start_version << "-" << last_end_version;
1312
1
        }
1313
4
        ss << "]";
1314
4
        std::stringstream version_str;
1315
4
        auto it = rowset_version_map.find(last_rowset_id);
1316
4
        if (it != rowset_version_map.end()) {
1317
4
            version_str << "[" << it->second.first << "-" << it->second.second << "]";
1318
4
        }
1319
4
        LOG(WARNING) << fmt::format(
1320
4
                "[delete bitmap check fails] delete bitmap storage optimize v2 check fail "
1321
4
                "for instance_id={}, tablet_id={}, rowset_id={}, version={} found delete "
1322
4
                "bitmap with versions={}, size={}",
1323
4
                instance_id_, tablet_id, last_rowset_id, version_str.str(), ss.str(),
1324
4
                failed_versions.size());
1325
4
    };
1326
29
    using namespace std::chrono;
1327
29
    int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
1328
29
    do {
1329
29
        std::unique_ptr<Transaction> txn;
1330
29
        TxnErrorCode err = txn_kv_->create_txn(&txn);
1331
29
        if (err != TxnErrorCode::TXN_OK) {
1332
0
            LOG(WARNING) << "failed to create txn";
1333
0
            return -1;
1334
0
        }
1335
29
        err = txn->get(begin, end, &it);
1336
29
        if (err != TxnErrorCode::TXN_OK) {
1337
0
            LOG(WARNING) << "failed to get delete bitmap kv, err=" << err;
1338
0
            return -1;
1339
0
        }
1340
29
        if (!it->has_next()) {
1341
0
            break;
1342
0
        }
1343
722
        while (it->has_next() && !stopped()) {
1344
693
            auto [k, v] = it->next();
1345
693
            std::string_view k1 = k;
1346
693
            k1.remove_prefix(1);
1347
693
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1348
693
            decode_key(&k1, &out);
1349
            // 0x01 "meta" ${instance_id} "delete_bitmap" ${tablet_id} ${rowset_id} ${version} ${segment_id} -> roaringbitmap
1350
693
            auto rowset_id = std::get<std::string>(std::get<0>(out[4]));
1351
693
            auto version = std::get<std::int64_t>(std::get<0>(out[5]));
1352
693
            if (!it->has_next()) {
1353
29
                begin = k;
1354
29
                begin.push_back('\x00'); // Update to next smallest key for iteration
1355
29
            }
1356
693
            if (rowset_id == last_rowset_id && version == last_version) {
1357
                // skip the same rowset and version
1358
133
                continue;
1359
133
            }
1360
560
            if (rowset_id != last_rowset_id && !failed_versions.empty()) {
1361
3
                print_failed_versions();
1362
3
                last_failed_version = 0;
1363
3
                failed_versions.clear();
1364
3
            }
1365
560
            last_rowset_id = rowset_id;
1366
560
            last_version = version;
1367
560
            if (tablet_rowsets_map.find(version) != tablet_rowsets_map.end()) {
1368
539
                continue;
1369
539
            }
1370
21
            if (rowset_version_map.find(rowset_id) == rowset_version_map.end()) {
1371
                // checked in do_delete_bitmap_inverted_check
1372
1
                continue;
1373
1
            }
1374
20
            if (pending_delete_bitmaps.contains(std::string(k))) {
1375
3
                continue;
1376
3
            }
1377
            // there may be an interval in this situation:
1378
            // 1. finish compaction job; 2. checker; 3. finish agg and remove delete bitmap to ms
1379
17
            auto rowset_it = tablet_rowsets_map.upper_bound(version);
1380
17
            if (rowset_it == tablet_rowsets_map.end()) {
1381
1
                if (version != last_failed_version) {
1382
1
                    failed_versions.push_back(version);
1383
1
                }
1384
1
                last_failed_version = version;
1385
1
                continue;
1386
1
            }
1387
16
            if (rowset_it->second + config::delete_bitmap_storage_optimize_v2_check_skip_seconds >=
1388
16
                now) {
1389
8
                continue;
1390
8
            }
1391
8
            if (version != last_failed_version) {
1392
8
                failed_versions.push_back(version);
1393
8
            }
1394
8
            last_failed_version = version;
1395
8
        }
1396
29
    } while (it->more() && !stopped());
1397
29
    if (!failed_versions.empty()) {
1398
1
        print_failed_versions();
1399
1
    }
1400
29
    LOG(INFO) << fmt::format(
1401
29
            "[delete bitmap checker] finish check delete bitmap storage optimize v2 for "
1402
29
            "instance_id={}, tablet_id={}, rowsets_num={}, "
1403
29
            "rowsets_with_useless_delete_bitmap_version={}",
1404
29
            instance_id_, tablet_id, tablet_rowsets_map.size(),
1405
29
            rowsets_with_useless_delete_bitmap_version);
1406
29
    return (rowsets_with_useless_delete_bitmap_version > 1 ? 1 : 0);
1407
29
}
1408
1409
3
int InstanceChecker::do_delete_bitmap_storage_optimize_check(int version) {
1410
3
    if (version != 2) {
1411
0
        return -1;
1412
0
    }
1413
3
    int64_t total_tablets_num {0};
1414
3
    int64_t failed_tablets_num {0};
1415
1416
    // for v2 check
1417
3
    int64_t max_rowsets_with_useless_delete_bitmap_version = 0;
1418
3
    int64_t tablet_id_with_max_rowsets_with_useless_delete_bitmap_version = 0;
1419
1420
    // check that for every visible rowset, there exists at least delete one bitmap in MS
1421
29
    int ret = traverse_mow_tablet([&](int64_t tablet_id) {
1422
29
        ++total_tablets_num;
1423
29
        int64_t rowsets_with_useless_delete_bitmap_version = 0;
1424
29
        int res = check_delete_bitmap_storage_optimize_v2(
1425
29
                tablet_id, rowsets_with_useless_delete_bitmap_version);
1426
29
        if (rowsets_with_useless_delete_bitmap_version >
1427
29
            max_rowsets_with_useless_delete_bitmap_version) {
1428
1
            max_rowsets_with_useless_delete_bitmap_version =
1429
1
                    rowsets_with_useless_delete_bitmap_version;
1430
1
            tablet_id_with_max_rowsets_with_useless_delete_bitmap_version = tablet_id;
1431
1
        }
1432
29
        failed_tablets_num += (res != 0);
1433
29
        return res;
1434
29
    });
1435
1436
3
    if (ret < 0) {
1437
0
        return ret;
1438
0
    }
1439
1440
3
    g_bvar_max_rowsets_with_useless_delete_bitmap_version.put(
1441
3
            instance_id_, max_rowsets_with_useless_delete_bitmap_version);
1442
1443
3
    std::stringstream ss;
1444
3
    ss << "[delete bitmap checker] check delete bitmap storage optimize v" << version
1445
3
       << " for instance_id=" << instance_id_ << ", total_tablets_num=" << total_tablets_num
1446
3
       << ", failed_tablets_num=" << failed_tablets_num
1447
3
       << ". max_rowsets_with_useless_delete_bitmap_version="
1448
3
       << max_rowsets_with_useless_delete_bitmap_version
1449
3
       << ", tablet_id=" << tablet_id_with_max_rowsets_with_useless_delete_bitmap_version;
1450
3
    LOG(INFO) << ss.str();
1451
1452
3
    return (failed_tablets_num > 0) ? 1 : 0;
1453
3
}
1454
1455
3
int InstanceChecker::do_mow_job_key_check() {
1456
3
    std::unique_ptr<RangeGetIterator> it;
1457
3
    std::string begin = mow_tablet_job_key({instance_id_, 0, 0});
1458
3
    std::string end = mow_tablet_job_key({instance_id_, INT64_MAX, 0});
1459
3
    MowTabletJobPB mow_tablet_job;
1460
3
    do {
1461
3
        std::unique_ptr<Transaction> txn;
1462
3
        TxnErrorCode err = txn_kv_->create_txn(&txn);
1463
3
        if (err != TxnErrorCode::TXN_OK) {
1464
0
            LOG(WARNING) << "failed to create txn";
1465
0
            return -1;
1466
0
        }
1467
3
        err = txn->get(begin, end, &it);
1468
3
        if (err != TxnErrorCode::TXN_OK) {
1469
0
            LOG(WARNING) << "failed to get mow tablet job key, err=" << err;
1470
0
            return -1;
1471
0
        }
1472
3
        int64_t now = duration_cast<std::chrono::seconds>(
1473
3
                              std::chrono::system_clock::now().time_since_epoch())
1474
3
                              .count();
1475
3
        while (it->has_next() && !stopped()) {
1476
2
            auto [k, v] = it->next();
1477
2
            std::string_view k1 = k;
1478
2
            k1.remove_prefix(1);
1479
2
            std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out;
1480
2
            decode_key(&k1, &out);
1481
            // 0x01 "meta" ${instance_id} "mow_tablet_job" ${table_id} ${initiator}
1482
2
            auto table_id = std::get<int64_t>(std::get<0>(out[3]));
1483
2
            auto initiator = std::get<int64_t>(std::get<0>(out[4]));
1484
2
            if (!mow_tablet_job.ParseFromArray(v.data(), v.size())) [[unlikely]] {
1485
0
                LOG(WARNING) << "failed to parse MowTabletJobPB";
1486
0
                return -1;
1487
0
            }
1488
2
            int64_t expiration = mow_tablet_job.expiration();
1489
            // check job key failed should meet both following two condition:
1490
            // 1. job key is expired
1491
            // 2. table lock key is not found or key is not expired
1492
2
            if (expiration < now - config::mow_job_key_check_expiration_diff_seconds) {
1493
2
                std::string lock_key =
1494
2
                        meta_delete_bitmap_update_lock_key({instance_id_, table_id, -1});
1495
2
                std::string lock_val;
1496
2
                err = txn->get(lock_key, &lock_val);
1497
2
                std::string reason = "";
1498
2
                if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
1499
0
                    reason = "table lock key not found";
1500
1501
2
                } else {
1502
2
                    DeleteBitmapUpdateLockPB lock_info;
1503
2
                    if (!lock_info.ParseFromString(lock_val)) [[unlikely]] {
1504
0
                        LOG(WARNING) << "failed to parse DeleteBitmapUpdateLockPB";
1505
0
                        return -1;
1506
0
                    }
1507
2
                    if (lock_info.expiration() > now || lock_info.lock_id() != -1) {
1508
2
                        reason = "table lock is not expired,lock_id=" +
1509
2
                                 std::to_string(lock_info.lock_id());
1510
2
                    }
1511
2
                }
1512
2
                if (reason != "") {
1513
2
                    LOG(WARNING) << fmt::format(
1514
2
                            "[compaction key check fails] mow job key check fail for "
1515
2
                            "instance_id={}, table_id={}, initiator={}, expiration={}, now={}, "
1516
2
                            "reason={}",
1517
2
                            instance_id_, table_id, initiator, expiration, now, reason);
1518
2
                    return -1;
1519
2
                }
1520
2
            }
1521
2
        }
1522
1
        begin = it->next_begin_key(); // Update to next smallest key for iteration
1523
1
    } while (it->more() && !stopped());
1524
1
    return 0;
1525
3
}
1526
1527
} // namespace doris::cloud