Coverage Report

Created: 2026-06-25 13:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_tablet.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_tablet.h"
19
20
#include <bvar/bvar.h>
21
#include <bvar/latency_recorder.h>
22
#include <gen_cpp/Types_types.h>
23
#include <gen_cpp/olap_file.pb.h>
24
#include <rapidjson/document.h>
25
#include <rapidjson/encodings.h>
26
#include <rapidjson/prettywriter.h>
27
#include <rapidjson/rapidjson.h>
28
#include <rapidjson/stringbuffer.h>
29
30
#include <algorithm>
31
#include <atomic>
32
#include <chrono>
33
#include <cstdint>
34
#include <memory>
35
#include <ranges>
36
#include <ratio>
37
#include <shared_mutex>
38
#include <unordered_map>
39
#include <vector>
40
41
#include "cloud/cloud_meta_mgr.h"
42
#include "cloud/cloud_storage_engine.h"
43
#include "cloud/cloud_tablet_mgr.h"
44
#include "cloud/cloud_warm_up_manager.h"
45
#include "cloud/config.h"
46
#include "common/cast_set.h"
47
#include "common/config.h"
48
#include "common/logging.h"
49
#include "cpp/sync_point.h"
50
#include "io/cache/block_file_cache_downloader.h"
51
#include "io/cache/block_file_cache_factory.h"
52
#include "storage/compaction/compaction.h"
53
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
54
#include "storage/index/inverted/inverted_index_desc.h"
55
#include "storage/olap_define.h"
56
#include "storage/rowset/beta_rowset.h"
57
#include "storage/rowset/rowset.h"
58
#include "storage/rowset/rowset_factory.h"
59
#include "storage/rowset/rowset_fwd.h"
60
#include "storage/rowset/rowset_writer.h"
61
#include "storage/storage_policy.h"
62
#include "storage/tablet/base_tablet.h"
63
#include "storage/tablet/tablet_schema.h"
64
#include "storage/txn/txn_manager.h"
65
#include "util/debug_points.h"
66
#include "util/stack_util.h"
67
68
namespace doris {
69
using namespace ErrorCode;
70
71
// Latency recorders exported under the names given as constructor arguments.
// NOTE(review): the names suggest the time spent obtaining the delete bitmap
// lock for cumulative / base compaction; they are not referenced in the
// visible part of this file -- confirm against their users.
bvar::LatencyRecorder g_cu_compaction_get_delete_bitmap_lock_time_ms(
72
        "cu_compaction_get_delete_bitmap_lock_time_ms");
73
bvar::LatencyRecorder g_base_compaction_get_delete_bitmap_lock_time_ms(
74
        "base_compaction_get_delete_bitmap_lock_time_ms");
75
76
// NOTE(review): presumably the number / total bytes of rowsets waiting in the
// unused-rowsets list; the updates are outside the visible code -- confirm.
bvar::Adder<int64_t> g_unused_rowsets_count("unused_rowsets_count");
77
bvar::Adder<int64_t> g_unused_rowsets_bytes("unused_rowsets_bytes");
78
79
// Incremented once per call of CloudTablet::capture_versions_prefer_cache().
bvar::Adder<int64_t> g_capture_prefer_cache_count("capture_prefer_cache_count");
80
// Incremented once per call of CloudTablet::capture_versions_with_freshness_tolerance().
bvar::Adder<int64_t> g_capture_with_freshness_tolerance_count(
81
        "capture_with_freshness_tolerance_count");
82
// Incremented when capture_versions_with_freshness_tolerance() falls back to the
// BaseTablet capture path.
bvar::Adder<int64_t> g_capture_with_freshness_tolerance_fallback_count(
83
        "capture_with_freshness_tolerance_fallback_count");
84
// NOTE(review): not updated in the visible part of this file -- confirm usage.
bvar::Adder<int64_t> g_rowset_warmup_state_missing_count("rowset_warmup_state_missing_count");
85
// Windowed views over the adders above. The trailing 30 is the window size
// argument of bvar::Window (presumably seconds -- confirm against bvar docs).
bvar::Window<bvar::Adder<int64_t>> g_capture_prefer_cache_count_window(
86
        "capture_prefer_cache_count_window", &g_capture_prefer_cache_count, 30);
87
bvar::Window<bvar::Adder<int64_t>> g_capture_with_freshness_tolerance_count_window(
88
        "capture_with_freshness_tolerance_count_window", &g_capture_with_freshness_tolerance_count,
89
        30);
90
bvar::Window<bvar::Adder<int64_t>> g_capture_with_freshness_tolerance_fallback_count_window(
91
        "capture_with_freshness_tolerance_fallback_count_window",
92
        &g_capture_with_freshness_tolerance_fallback_count, 30);
93
94
// NOTE(review): file-local constant; presumably the initiator id used for
// load-triggered requests. It is not referenced in the visible code -- confirm.
static constexpr int LOAD_INITIATOR_ID = -1;
95
96
namespace {
97
98
bool is_schema_change_output_rowset(const RowsetSharedPtr& rowset,
99
3.24k
                                    const std::vector<RowsetSharedPtr>& output_rowsets) {
100
5.70k
    return std::ranges::any_of(output_rowsets, [&rowset](const RowsetSharedPtr& output_rowset) {
101
5.70k
        return output_rowset->rowset_id() == rowset->rowset_id();
102
5.70k
    });
103
3.24k
}
104
105
} // namespace
106
107
// File cache metrics exported through bvar under the names given as
// constructor arguments. None of them is updated in the visible part of this
// file, so the groupings below are inferred from the names only.
// NOTE(review): "submitted"/"finished" presumably count segment and index
// download work handed to / completed by the file cache downloader -- confirm.
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_size(
108
        "file_cache_cloud_tablet_submitted_segment_size");
109
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_num(
110
        "file_cache_cloud_tablet_submitted_segment_num");
111
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_index_size(
112
        "file_cache_cloud_tablet_submitted_index_size");
113
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_index_num(
114
        "file_cache_cloud_tablet_submitted_index_num");
115
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_segment_size(
116
        "file_cache_cloud_tablet_finished_segment_size");
117
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_segment_num(
118
        "file_cache_cloud_tablet_finished_segment_num");
119
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_index_size(
120
        "file_cache_cloud_tablet_finished_index_size");
121
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_index_num(
122
        "file_cache_cloud_tablet_finished_index_num");
123
124
// NOTE(review): presumably counts cached data dropped when rowsets are
// recycled -- confirm against the recycle path.
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_segment_num(
125
        "file_cache_recycle_cached_data_segment_num");
126
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_segment_size(
127
        "file_cache_recycle_cached_data_segment_size");
128
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_index_num(
129
        "file_cache_recycle_cached_data_index_num");
130
131
// NOTE(review): presumably warm-up outcome counters per segment, inverted
// index and rowset, plus per-trigger rowset counters -- confirm.
bvar::Adder<uint64_t> g_file_cache_warm_up_segment_complete_num(
132
        "file_cache_warm_up_segment_complete_num");
133
bvar::Adder<uint64_t> g_file_cache_warm_up_segment_failed_num(
134
        "file_cache_warm_up_segment_failed_num");
135
bvar::Adder<uint64_t> g_file_cache_warm_up_inverted_idx_complete_num(
136
        "file_cache_warm_up_inverted_idx_complete_num");
137
bvar::Adder<uint64_t> g_file_cache_warm_up_inverted_idx_failed_num(
138
        "file_cache_warm_up_inverted_idx_failed_num");
139
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_complete_num(
140
        "file_cache_warm_up_rowset_complete_num");
141
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_job_num(
142
        "file_cache_warm_up_rowset_triggered_by_job_num");
143
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num(
144
        "file_cache_warm_up_rowset_triggered_by_sync_rowset_num");
145
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_event_driven_num(
146
        "file_cache_warm_up_rowset_triggered_by_event_driven_num");
147
// NOTE(review): the latency unit is not visible here -- confirm at the call site.
bvar::LatencyRecorder g_file_cache_warm_up_rowset_all_segments_latency(
148
        "file_cache_warm_up_rowset_all_segments_latency");
149
150
// Constructs a cloud tablet: `tablet_meta` is moved into the BaseTablet
// sub-object and `engine` is stored in `_engine`.
CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
        : BaseTablet(std::move(tablet_meta)),
          _engine(engine) {
}
152
153
16.0k
// Compiler-generated destructor, defined out of line in this translation unit.
CloudTablet::~CloudTablet() = default;
154
155
0
bool CloudTablet::exceed_version_limit(int32_t limit) {
156
0
    return _approximate_num_rowsets.load(std::memory_order_relaxed) > limit;
157
0
}
158
159
175k
// Always returns an empty string for a cloud tablet.
std::string CloudTablet::tablet_path() const {
    return std::string();
}
162
163
// Captures rowset readers for `spec_version` under a shared `_meta_lock` and
// stores them into `*rs_splits`.
//
// Only `opts.skip_missing_versions` is forwarded to the unlocked helper; every
// other capture option keeps its default value.
// Returns the error produced by capture_rs_readers_unlocked() on failure, or
// the injected -230 error when the debug point is active.
Status CloudTablet::capture_rs_readers(const Version& spec_version,
                                       std::vector<RowSetSplits>* rs_splits,
                                       const CaptureRowsetOps& opts) {
    // Fault injection hook.
    DBUG_EXECUTE_IF("CloudTablet.capture_rs_readers.return.e-230", {
        LOG_WARNING("CloudTablet.capture_rs_readers.return e-230").tag("tablet_id", tablet_id());
        return Status::Error<false>(-230, "injected error");
    });

    std::shared_lock meta_read_guard(_meta_lock);
    const CaptureRowsetOps forwarded_opts {.skip_missing_versions = opts.skip_missing_versions};
    *rs_splits = DORIS_TRY(capture_rs_readers_unlocked(spec_version, forwarded_opts));
    return Status::OK();
}
175
176
// Chooses the version-capture strategy for `version_range`:
//   1. a positive `query_freshness_tolerance_ms` selects the freshness-tolerant
//      capture;
//   2. otherwise `enable_prefer_cached_rowset` on a tablet without unique-key
//      merge-on-write selects the cache-preferring capture;
//   3. everything else is delegated to BaseTablet.
[[nodiscard]] Result<std::vector<Version>> CloudTablet::capture_consistent_versions_unlocked(
        const Version& version_range, const CaptureRowsetOps& options) const {
    if (options.query_freshness_tolerance_ms > 0) {
        return capture_versions_with_freshness_tolerance(version_range, options);
    }

    const bool prefer_cached_path =
            options.enable_prefer_cached_rowset && !enable_unique_key_merge_on_write();
    if (prefer_cached_path) {
        return capture_versions_prefer_cache(version_range);
    }

    return BaseTablet::capture_consistent_versions_unlocked(version_range, options);
}
185
186
// Captures a consistent version path for `spec_version`, asking the version
// tracker to prefer rowsets for which rowset_is_warmed_up_unlocked() is true.
// Bumps `g_capture_prefer_cache_count` on every call and returns the tracker's
// error unchanged when the capture fails.
Result<std::vector<Version>> CloudTablet::capture_versions_prefer_cache(
        const Version& spec_version) const {
    g_capture_prefer_cache_count << 1;

    // The caller (capture_consistent_versions_unlocked) already holds the
    // shared `_meta_lock`, so it must NOT be taken again here: the lock is
    // writer-preferring, and a recursive shared acquisition self-deadlocks if
    // a writer queues up between the outer and the inner acquisition.
    Versions version_path;
    auto capture_st = _timestamped_version_tracker.capture_consistent_versions_prefer_cache(
            spec_version, version_path, [&](int64_t first, int64_t last) {
                return rowset_is_warmed_up_unlocked(first, last);
            });
    if (!capture_st.ok()) {
        return ResultError(capture_st);
    }

    // NOTE(review): assumes a successful capture never yields an empty path.
    const int64_t path_max_version = version_path.back().second;
    VLOG_DEBUG << fmt::format(
            "[verbose] CloudTablet::capture_versions_prefer_cache, capture path: {}, "
            "tablet_id={}, spec_version={}, path_max_version={}",
            fmt::join(version_path | std::views::transform([](const auto& version) {
                          return fmt::format("{}", version.to_string());
                      }),
                      ", "),
            tablet_id(), spec_version.to_string(), path_max_version);
    return version_path;
}
211
212
356
bool CloudTablet::rowset_is_warmed_up_unlocked(int64_t start_version, int64_t end_version) const {
213
356
    if (start_version > end_version) {
214
0
        return false;
215
0
    }
216
356
    Version version {start_version, end_version};
217
356
    auto it = _rs_version_map.find(version);
218
356
    if (it == _rs_version_map.end()) {
219
78
        it = _stale_rs_version_map.find(version);
220
78
        if (it == _stale_rs_version_map.end()) {
221
0
            LOG_WARNING(
222
0
                    "fail to find Rowset in rs_version or stale_rs_version for version. "
223
0
                    "tablet={}, version={}",
224
0
                    tablet_id(), version.to_string());
225
0
            return false;
226
0
        }
227
78
    }
228
356
    const auto& rs = it->second;
229
356
    if (rs->visible_timestamp() < _engine.startup_timepoint()) {
230
        // We only care about rowsets that are created after startup time point. For other rowsets,
231
        // we assume they are warmed up.
232
19
        return true;
233
19
    }
234
337
    return is_rowset_warmed_up(rs->rowset_id());
235
356
};
236
237
// Captures a version path for `spec_version` in which every rowset is warmed
// up, tolerating data that is at most `options.query_freshness_tolerance_ms`
// old.
//
// If a rowset exists for which _check_rowset_should_be_visible_but_not_warmed_up()
// is true (checked over both the active and the stale rowset metas), the call
// falls back to BaseTablet::capture_consistent_versions_unlocked(). The counters
// `g_capture_with_freshness_tolerance_count` and
// `g_capture_with_freshness_tolerance_fallback_count` track calls and fallbacks.
Result<std::vector<Version>> CloudTablet::capture_versions_with_freshness_tolerance(
        const Version& spec_version, const CaptureRowsetOps& options) const {
    g_capture_with_freshness_tolerance_count << 1;

    const auto tolerance_ms = options.query_freshness_tolerance_ms;
    const auto freshness_limit_tp =
            std::chrono::system_clock::now() - std::chrono::milliseconds(tolerance_ms);

    // Find a version path where every edge (rowset) has been warmed up.
    //
    // The caller (capture_consistent_versions_unlocked) already holds the
    // shared `_meta_lock`, so it must NOT be taken again here: the lock is
    // writer-preferring, and a recursive shared acquisition self-deadlocks if
    // a writer queues up between the outer and the inner acquisition.
    Versions version_path;
    const bool is_mow = enable_unique_key_merge_on_write();
    if (is_mow) {
        // For merge-on-write tables, newly generated delete bitmap marks land
        // on the rowsets of the newest layout, so only rowsets in the newest
        // data layout may be captured; otherwise data correctness is at risk.
        RETURN_IF_ERROR_RESULT(
                _timestamped_version_tracker.capture_consistent_versions_with_validator_mow(
                        spec_version, version_path, [&](int64_t first, int64_t last) {
                            return rowset_is_warmed_up_unlocked(first, last);
                        }));
    } else {
        RETURN_IF_ERROR_RESULT(
                _timestamped_version_tracker.capture_consistent_versions_with_validator(
                        spec_version, version_path, [&](int64_t first, int64_t last) {
                            return rowset_is_warmed_up_unlocked(first, last);
                        }));
    }

    // NOTE(review): assumes a successful capture never yields an empty path.
    const int64_t path_max_version = version_path.back().second;

    // use std::views::concat after C++26
    auto check_fn = [this, path_max_version, freshness_limit_tp](const auto& rs_meta) {
        return _check_rowset_should_be_visible_but_not_warmed_up(rs_meta, path_max_version,
                                                                 freshness_limit_tp);
    };
    const bool should_fallback =
            std::ranges::any_of(std::views::values(_tablet_meta->all_rs_metas()), check_fn) ||
            std::ranges::any_of(std::views::values(_tablet_meta->all_stale_rs_metas()), check_fn);

    if (!should_fallback) {
        VLOG_DEBUG << fmt::format(
                "[verbose] CloudTablet::capture_versions_with_freshness_tolerance, capture path: {}, "
                "tablet_id={}, spec_version={}, path_max_version={}",
                fmt::join(version_path | std::views::transform([](const auto& version) {
                              return fmt::format("{}", version.to_string());
                          }),
                          ", "),
                tablet_id(), spec_version.to_string(), path_max_version);
        return version_path;
    }

    // There is a rowset that satisfies the freshness tolerance and starts after
    // the path's max version but has not been warmed up yet: capture rowsets
    // the usual way. The outer caller still holds the shared `_meta_lock`, so
    // the base unlocked fallback runs under that lock.
    g_capture_with_freshness_tolerance_fallback_count << 1;
    return BaseTablet::capture_consistent_versions_unlocked(spec_version, options);
}
291
292
// There are only two tablet_states RUNNING and NOT_READY in cloud mode
293
// This function will erase the tablet from `CloudTabletMgr` when it can't find this tablet in MS.
294
1.07M
Status CloudTablet::sync_rowsets(const SyncOptions& options, SyncRowsetStats* stats) {
295
1.07M
    RETURN_IF_ERROR(sync_if_not_running(stats));
296
297
1.07M
    if (options.query_version > 0) {
298
1.00M
        DBUG_EXECUTE_IF("CloudTablet::sync_rowsets.stale_local_max_for_query_version", {
299
1.00M
            auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
300
1.00M
            auto stale_version = dp->param<int64_t>("version", -1);
301
1.00M
            if (target_tablet_id == tablet_id() && stale_version >= 0) {
302
1.00M
                std::unique_lock wlock(_meta_lock);
303
1.00M
                LOG(INFO) << "override cloud tablet local max_version for query_version sync"
304
1.00M
                          << ", tablet_id=" << tablet_id() << ", old_max_version=" << _max_version
305
1.00M
                          << ", stale_version=" << stale_version
306
1.00M
                          << ", query_version=" << options.query_version;
307
1.00M
                _max_version = stale_version;
308
1.00M
            }
309
1.00M
        });
310
1.00M
        auto lock_start = std::chrono::steady_clock::now();
311
1.00M
        std::shared_lock rlock(_meta_lock);
312
1.01M
        if (stats) {
313
1.01M
            stats->meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(
314
1.01M
                                                std::chrono::steady_clock::now() - lock_start)
315
1.01M
                                                .count();
316
1.01M
        }
317
1.00M
        if (_max_version >= options.query_version) {
318
1.00M
            return Status::OK();
319
1.00M
        }
320
1.00M
    }
321
322
    // serially execute sync to reduce unnecessary network overhead
323
67.1k
    auto sync_lock_start = std::chrono::steady_clock::now();
324
67.1k
    std::unique_lock lock(_sync_meta_lock);
325
67.1k
    if (stats) {
326
3.03k
        stats->sync_meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(
327
3.03k
                                                 std::chrono::steady_clock::now() - sync_lock_start)
328
3.03k
                                                 .count();
329
3.03k
    }
330
67.1k
    if (options.query_version > 0) {
331
3.03k
        auto lock_start = std::chrono::steady_clock::now();
332
3.03k
        std::shared_lock rlock(_meta_lock);
333
3.04k
        if (stats) {
334
3.04k
            stats->meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(
335
3.04k
                                                std::chrono::steady_clock::now() - lock_start)
336
3.04k
                                                .count();
337
3.04k
        }
338
3.03k
        if (_max_version >= options.query_version) {
339
192
            return Status::OK();
340
192
        }
341
3.03k
    }
342
343
66.9k
    auto st = _engine.meta_mgr().sync_tablet_rowsets_unlocked(this, lock, options, stats);
344
66.9k
    if (st.is<ErrorCode::NOT_FOUND>()) {
345
0
        clear_cache();
346
0
    }
347
348
66.9k
    return st;
349
67.1k
}
350
351
// Sync tablet meta and all rowset meta if not running.
352
// This could happen when BE didn't finish schema change job and another BE committed this schema change job.
353
// It should be a quite rare situation.
354
1.07M
Status CloudTablet::sync_if_not_running(SyncRowsetStats* stats) {
355
1.07M
    if (tablet_state() == TABLET_RUNNING) {
356
1.07M
        return Status::OK();
357
1.07M
    }
358
359
    // Serially execute sync to reduce unnecessary network overhead
360
18.4E
    auto sync_lock_start = std::chrono::steady_clock::now();
361
18.4E
    std::unique_lock lock(_sync_meta_lock);
362
18.4E
    if (stats) {
363
0
        stats->sync_meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(
364
0
                                                 std::chrono::steady_clock::now() - sync_lock_start)
365
0
                                                 .count();
366
0
    }
367
368
18.4E
    {
369
18.4E
        auto lock_start = std::chrono::steady_clock::now();
370
18.4E
        std::shared_lock rlock(_meta_lock);
371
18.4E
        if (stats) {
372
0
            stats->meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(
373
0
                                                std::chrono::steady_clock::now() - lock_start)
374
0
                                                .count();
375
0
        }
376
18.4E
        if (tablet_state() == TABLET_RUNNING) {
377
0
            return Status::OK();
378
0
        }
379
18.4E
    }
380
381
18.4E
    TabletMetaSharedPtr tablet_meta;
382
18.4E
    auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta);
383
18.4E
    if (!st.ok()) {
384
0
        if (st.is<ErrorCode::NOT_FOUND>()) {
385
0
            clear_cache();
386
0
        }
387
0
        return st;
388
0
    }
389
390
18.4E
    if (tablet_meta->tablet_state() != TABLET_RUNNING) [[unlikely]] {
391
        // MoW may go to here when load while schema change
392
8.90k
        return Status::OK();
393
8.90k
    }
394
395
18.4E
    TimestampedVersionTracker empty_tracker;
396
18.4E
    {
397
18.4E
        auto lock_start = std::chrono::steady_clock::now();
398
18.4E
        std::lock_guard wlock(_meta_lock);
399
18.4E
        if (stats) {
400
0
            stats->meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(
401
0
                                                std::chrono::steady_clock::now() - lock_start)
402
0
                                                .count();
403
0
        }
404
18.4E
        RETURN_IF_ERROR(set_tablet_state(TABLET_RUNNING));
405
18.4E
        _rs_version_map.clear();
406
18.4E
        _stale_rs_version_map.clear();
407
18.4E
        std::swap(_timestamped_version_tracker, empty_tracker);
408
18.4E
        _tablet_meta->clear_rowsets();
409
18.4E
        _tablet_meta->clear_stale_rowset();
410
18.4E
        _max_version = -1;
411
18.4E
    }
412
413
0
    st = _engine.meta_mgr().sync_tablet_rowsets_unlocked(this, lock, {}, stats);
414
18.4E
    if (st.is<ErrorCode::NOT_FOUND>()) {
415
0
        clear_cache();
416
0
    }
417
18.4E
    return st;
418
18.4E
}
419
420
void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_overlap,
421
                              std::unique_lock<BthreadSharedMutex>& meta_lock,
422
313k
                              bool warmup_delta_data) {
423
313k
    if (to_add.empty()) {
424
4.54k
        return;
425
4.54k
    }
426
427
18.4E
    VLOG_DEBUG << "add_rowsets tablet_id=" << tablet_id() << " stack: " << get_stack_trace();
428
429
308k
    if (!version_overlap) {
430
300k
        _add_rowsets_directly(to_add, warmup_delta_data);
431
300k
        return;
432
300k
    }
433
434
    // Filter out existed rowsets
435
8.66k
    auto remove_it =
436
14.1k
            std::remove_if(to_add.begin(), to_add.end(), [this](const RowsetSharedPtr& rs) {
437
14.1k
                if (auto find_it = _rs_version_map.find(rs->version());
438
14.1k
                    find_it == _rs_version_map.end()) {
439
10.1k
                    return false;
440
10.1k
                } else if (find_it->second->rowset_id() == rs->rowset_id()) {
441
3.23k
                    return true; // Same rowset
442
3.23k
                }
443
444
                // If version of rowset in `to_add` is equal to rowset in tablet but rowset_id is not equal,
445
                // replace existed rowset with `to_add` rowset. This may occur when:
446
                //  1. schema change converts rowsets which have been double written to new tablet
447
                //  2. cumu compaction picks single overlapping input rowset to perform compaction
448
449
                // add existed rowset to unused_rowsets to remove delete bitmap and recycle cached data
450
451
799
                std::vector<RowsetSharedPtr> unused_rowsets;
452
799
                if (auto find_it = _rs_version_map.find(rs->version());
453
812
                    find_it != _rs_version_map.end()) {
454
812
                    if (find_it->second->rowset_id() == rs->rowset_id()) {
455
0
                        LOG(WARNING) << "tablet_id=" << tablet_id()
456
0
                                     << ", rowset_id=" << rs->rowset_id().to_string()
457
0
                                     << ", existed rowset_id="
458
0
                                     << find_it->second->rowset_id().to_string();
459
0
                        DCHECK(find_it->second->rowset_id() != rs->rowset_id())
460
0
                                << "tablet_id=" << tablet_id()
461
0
                                << ", rowset_id=" << rs->rowset_id().to_string()
462
0
                                << ", existed rowset_id="
463
0
                                << find_it->second->rowset_id().to_string();
464
0
                    }
465
812
                    unused_rowsets.push_back(find_it->second);
466
812
                }
467
799
                add_unused_rowsets(unused_rowsets);
468
469
799
                _tablet_meta->delete_rs_meta_by_version(rs->version(), nullptr);
470
799
                _rs_version_map[rs->version()] = rs;
471
799
                _tablet_meta->add_rowsets_unchecked({rs});
472
799
                update_base_size(*rs);
473
799
                return true;
474
14.1k
            });
475
476
8.66k
    to_add.erase(remove_it, to_add.end());
477
478
    // delete rowsets with overlapped version
479
8.66k
    std::vector<RowsetSharedPtr> to_add_directly;
480
10.1k
    for (auto& to_add_rs : to_add) {
481
        // delete rowsets with overlapped version
482
10.1k
        std::vector<RowsetSharedPtr> to_delete;
483
10.1k
        Version to_add_v = to_add_rs->version();
484
        // if start_version  > max_version, we can skip checking overlap here.
485
10.1k
        if (to_add_v.first > _max_version) {
486
            // if start_version  > max_version, we can skip checking overlap here.
487
10.0k
            to_add_directly.push_back(to_add_rs);
488
10.0k
        } else {
489
44
            to_add_directly.push_back(to_add_rs);
490
180
            for (auto& [v, rs] : _rs_version_map) {
491
180
                if (to_add_v.contains(v)) {
492
0
                    to_delete.push_back(rs);
493
0
                }
494
180
            }
495
44
            delete_rowsets(to_delete, meta_lock);
496
44
        }
497
10.1k
    }
498
499
8.66k
    _add_rowsets_directly(to_add_directly, warmup_delta_data);
500
8.66k
}
501
502
void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
503
7.18k
                                 std::unique_lock<BthreadSharedMutex>&) {
504
7.18k
    if (to_delete.empty()) {
505
52
        return;
506
52
    }
507
7.12k
    std::vector<RowsetMetaSharedPtr> rs_metas;
508
7.12k
    rs_metas.reserve(to_delete.size());
509
7.12k
    int64_t now = ::time(nullptr);
510
57.3k
    for (auto&& rs : to_delete) {
511
57.3k
        rs->rowset_meta()->set_stale_at(now);
512
57.3k
        rs_metas.push_back(rs->rowset_meta());
513
57.3k
        _stale_rs_version_map[rs->version()] = rs;
514
57.3k
    }
515
7.12k
    _timestamped_version_tracker.add_stale_path_version(rs_metas);
516
57.2k
    for (auto&& rs : to_delete) {
517
57.2k
        _rs_version_map.erase(rs->version());
518
57.2k
    }
519
520
7.12k
    _tablet_meta->modify_rs_metas({}, rs_metas, false);
521
7.12k
}
522
523
void CloudTablet::delete_rowsets_for_schema_change(const std::vector<RowsetSharedPtr>& to_delete,
524
                                                   std::unique_lock<BthreadSharedMutex>&,
525
10
                                                   bool recycle_deleted_rowsets) {
526
10
    if (to_delete.empty()) {
527
1
        return;
528
1
    }
529
9
    std::vector<RowsetMetaSharedPtr> rs_metas;
530
9
    rs_metas.reserve(to_delete.size());
531
10
    for (auto&& rs : to_delete) {
532
10
        rs_metas.push_back(rs->rowset_meta());
533
10
        _rs_version_map.erase(rs->version());
534
        // Remove edge from version graph so that the greedy capture algorithm
535
        // won't prefer the wider stale compaction rowset over individual SC
536
        // output rowsets (e.g. [818-822] vs [818],[819],...,[822]).
537
10
        _timestamped_version_tracker.delete_version(rs->version());
538
10
    }
539
540
    // Use same_version=true to skip adding to _stale_rs_metas. Do NOT use the
541
    // stale tracking mechanism (_stale_rs_version_map / _stale_version_path_map)
542
    // because SC output will create new rowsets with identical version ranges;
543
    // a later compaction could put those into stale as well, causing two stale
544
    // paths to reference the same version key -- when one path is cleaned first,
545
    // the other hits a DCHECK(false) in delete_expired_stale_rowsets().
546
9
    _tablet_meta->modify_rs_metas({}, rs_metas, true);
547
548
9
    if (recycle_deleted_rowsets) {
549
        // Schedule for direct cache cleanup. MS has already recycled these rowsets.
550
8
        add_unused_rowsets(to_delete);
551
8
    }
552
9
}
553
554
void CloudTablet::replace_rowsets_with_schema_change_output(
555
        const std::vector<RowsetSharedPtr>& output_rowsets, int64_t alter_version,
556
        std::unique_lock<BthreadSharedMutex>& meta_lock, const char* stage,
557
11.2k
        bool recycle_deleted_rowsets) {
558
11.2k
    std::vector<RowsetSharedPtr> to_delete;
559
12.4k
    for (auto& [v, rs] : _rs_version_map) {
560
12.4k
        if (v.first >= 2 && v.second <= alter_version &&
561
12.4k
            !is_schema_change_output_rowset(rs, output_rowsets)) {
562
6
            to_delete.push_back(rs);
563
6
        }
564
12.4k
    }
565
11.2k
    if (!to_delete.empty()) {
566
6
        LOG_INFO(
567
6
                "schema change: delete {} local rowsets in [2, {}] before adding SC output, "
568
6
                "tablet_id={}, stage={}, versions=[{}]",
569
6
                to_delete.size(), alter_version, tablet_id(), stage,
570
6
                fmt::join(to_delete | std::views::transform([](const auto& rs) {
571
6
                              return rs->version().to_string();
572
6
                          }),
573
6
                          ", "));
574
6
        delete_rowsets_for_schema_change(to_delete, meta_lock, recycle_deleted_rowsets);
575
6
    }
576
11.2k
    add_rowsets(output_rowsets, true, meta_lock, false);
577
11.2k
}
578
579
4.95k
uint64_t CloudTablet::delete_expired_stale_rowsets() {
580
4.95k
    if (config::enable_mow_verbose_log) {
581
0
        LOG_INFO("begin delete_expired_stale_rowset for tablet={}", tablet_id());
582
0
    }
583
4.95k
    std::vector<RowsetSharedPtr> expired_rowsets;
584
    // ATTN: trick, Use stale_rowsets to temporarily increase the reference count of the rowset shared pointer in _stale_rs_version_map so that in the recycle_cached_data function, it checks if the reference count is 2.
585
4.95k
    std::vector<std::pair<Version, std::vector<RowsetSharedPtr>>> deleted_stale_rowsets;
586
4.95k
    int64_t expired_stale_sweep_endtime =
587
4.95k
            ::time(nullptr) - config::tablet_rowset_stale_sweep_time_sec;
588
4.95k
    {
589
4.95k
        std::unique_lock wlock(_meta_lock);
590
591
4.95k
        std::vector<int64_t> path_ids;
592
        // capture the path version to delete
593
4.95k
        _timestamped_version_tracker.capture_expired_paths(expired_stale_sweep_endtime, &path_ids);
594
595
4.95k
        if (path_ids.empty()) {
596
2.53k
            return 0;
597
2.53k
        }
598
599
6.72k
        for (int64_t path_id : path_ids) {
600
6.72k
            int64_t start_version = -1;
601
6.72k
            int64_t end_version = -1;
602
6.72k
            std::vector<RowsetSharedPtr> stale_rowsets;
603
            // delete stale versions in version graph
604
6.72k
            auto version_path = _timestamped_version_tracker.fetch_and_delete_path_by_id(path_id);
605
54.7k
            for (auto& v_ts : version_path->timestamped_versions()) {
606
54.7k
                auto rs_it = _stale_rs_version_map.find(v_ts->version());
607
54.7k
                if (rs_it != _stale_rs_version_map.end()) {
608
54.7k
                    expired_rowsets.push_back(rs_it->second);
609
54.7k
                    stale_rowsets.push_back(rs_it->second);
610
54.7k
                    VLOG_DEBUG << "erase stale rowset, tablet_id=" << tablet_id()
611
0
                               << " rowset_id=" << rs_it->second->rowset_id().to_string()
612
0
                               << " version=" << rs_it->first.to_string();
613
54.7k
                    _stale_rs_version_map.erase(rs_it);
614
54.7k
                } else {
615
0
                    LOG(WARNING) << "cannot find stale rowset " << v_ts->version() << " in tablet "
616
0
                                 << tablet_id();
617
                    // clang-format off
618
0
                    DCHECK(false) << [this, &wlock]() { wlock.unlock(); std::string json; get_compaction_status(&json); return json; }();
619
                    // clang-format on
620
0
                }
621
54.7k
                if (start_version < 0) {
622
6.72k
                    start_version = v_ts->version().first;
623
6.72k
                }
624
54.7k
                end_version = v_ts->version().second;
625
54.7k
                _tablet_meta->delete_stale_rs_meta_by_version(v_ts->version());
626
54.7k
            }
627
6.72k
            Version version(start_version, end_version);
628
6.72k
            if (!stale_rowsets.empty()) {
629
6.72k
                deleted_stale_rowsets.emplace_back(version, std::move(stale_rowsets));
630
6.72k
            }
631
6.72k
        }
632
2.41k
        _reconstruct_version_tracker_if_necessary();
633
2.41k
    }
634
635
    // if the rowset is not used by any query, we can recycle its cached data early.
636
0
    auto recycled_rowsets = recycle_cached_data(expired_rowsets);
637
2.41k
    if (!recycled_rowsets.empty()) {
638
2.41k
        auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
639
2.41k
        manager.recycle_cache(tablet_id(), recycled_rowsets);
640
2.41k
    }
641
2.41k
    if (config::enable_mow_verbose_log) {
642
0
        LOG_INFO("finish delete_expired_stale_rowset for tablet={}", tablet_id());
643
0
    }
644
645
2.41k
    add_unused_rowsets(expired_rowsets);
646
2.41k
    if (config::enable_agg_and_remove_pre_rowsets_delete_bitmap && keys_type() == UNIQUE_KEYS &&
647
2.41k
        enable_unique_key_merge_on_write() && !deleted_stale_rowsets.empty()) {
648
        // agg delete bitmap for pre rowsets; record unused delete bitmap key ranges
649
562
        OlapStopWatch watch;
650
3.52k
        for (const auto& [version, unused_rowsets] : deleted_stale_rowsets) {
651
            // agg delete bitmap for pre rowset
652
3.52k
            DeleteBitmapKeyRanges remove_delete_bitmap_key_ranges;
653
3.52k
            agg_delete_bitmap_for_stale_rowsets(version, remove_delete_bitmap_key_ranges);
654
            // add remove delete bitmap
655
3.52k
            if (!remove_delete_bitmap_key_ranges.empty()) {
656
1.05k
                std::vector<RowsetId> rowset_ids;
657
11.3k
                for (const auto& rs : unused_rowsets) {
658
11.3k
                    rowset_ids.push_back(rs->rowset_id());
659
11.3k
                }
660
1.05k
                std::lock_guard<std::mutex> lock(_gc_mutex);
661
1.05k
                _unused_delete_bitmap.push_back(
662
1.05k
                        std::make_pair(rowset_ids, remove_delete_bitmap_key_ranges));
663
1.05k
            }
664
3.52k
        }
665
562
        LOG(INFO) << "agg pre rowsets delete bitmap. tablet_id=" << tablet_id()
666
562
                  << ", size=" << deleted_stale_rowsets.size()
667
562
                  << ", cost(us)=" << watch.get_elapse_time_us();
668
562
    }
669
2.41k
    return expired_rowsets.size();
670
4.95k
}
671
672
667k
bool CloudTablet::need_remove_unused_rowsets() {
673
667k
    std::lock_guard<std::mutex> lock(_gc_mutex);
674
667k
    return !_unused_rowsets.empty() || !_unused_delete_bitmap.empty();
675
667k
}
676
677
3.23k
void CloudTablet::add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets) {
678
3.23k
    std::lock_guard<std::mutex> lock(_gc_mutex);
679
55.5k
    for (const auto& rowset : rowsets) {
680
55.5k
        _unused_rowsets[rowset->rowset_id()] = rowset;
681
55.5k
        g_unused_rowsets_bytes << rowset->total_disk_size();
682
55.5k
    }
683
3.23k
    g_unused_rowsets_count << rowsets.size();
684
3.23k
}
685
686
2.59k
void CloudTablet::remove_unused_rowsets() {
687
2.59k
    std::vector<std::shared_ptr<Rowset>> removed_rowsets;
688
2.59k
    int64_t removed_delete_bitmap_num = 0;
689
2.59k
    OlapStopWatch watch;
690
2.59k
    {
691
2.59k
        std::lock_guard<std::mutex> lock(_gc_mutex);
692
        // 1. remove unused rowsets's cache data and delete bitmap
693
58.2k
        for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
694
55.6k
            auto& rs = it->second;
695
55.6k
            if (rs.use_count() > 1) {
696
118
                LOG(WARNING) << "tablet_id:" << tablet_id() << " rowset: " << rs->rowset_id()
697
118
                             << " has " << rs.use_count() << " references, it cannot be removed";
698
118
                ++it;
699
118
                continue;
700
118
            }
701
55.5k
            tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version());
702
55.5k
            _rowset_warm_up_states.erase(rs->rowset_id());
703
55.5k
            rs->clear_cache();
704
55.5k
            g_unused_rowsets_count << -1;
705
55.5k
            g_unused_rowsets_bytes << -rs->total_disk_size();
706
55.5k
            removed_rowsets.push_back(std::move(rs));
707
55.5k
            it = _unused_rowsets.erase(it);
708
55.5k
        }
709
2.59k
    }
710
711
2.59k
    {
712
2.59k
        std::vector<RecycledRowsets> recycled_rowsets;
713
714
55.5k
        for (auto& rs : removed_rowsets) {
715
55.5k
            auto index_names = rs->get_index_file_names();
716
55.5k
            recycled_rowsets.emplace_back(rs->rowset_id(), rs->num_segments(), index_names);
717
55.5k
            int64_t segment_size_sum = 0;
718
78.6k
            for (int32_t i = 0; i < rs->num_segments(); i++) {
719
23.0k
                segment_size_sum += rs->rowset_meta()->segment_file_size(i);
720
23.0k
            }
721
55.5k
            g_file_cache_recycle_cached_data_segment_num << rs->num_segments();
722
55.5k
            g_file_cache_recycle_cached_data_segment_size << segment_size_sum;
723
55.5k
            g_file_cache_recycle_cached_data_index_num << index_names.size();
724
55.5k
        }
725
726
2.59k
        if (recycled_rowsets.size() > 0) {
727
2.58k
            auto& manager =
728
2.58k
                    ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
729
2.58k
            manager.recycle_cache(tablet_id(), recycled_rowsets);
730
2.58k
        }
731
2.59k
    }
732
733
2.59k
    {
734
2.59k
        std::lock_guard<std::mutex> lock(_gc_mutex);
735
        // 2. remove delete bitmap of pre rowsets
736
3.64k
        for (auto it = _unused_delete_bitmap.begin(); it != _unused_delete_bitmap.end();) {
737
1.05k
            auto& rowset_ids = std::get<0>(*it);
738
1.05k
            bool find_unused_rowset = false;
739
11.3k
            for (const auto& rowset_id : rowset_ids) {
740
11.3k
                if (_unused_rowsets.find(rowset_id) != _unused_rowsets.end()) {
741
0
                    LOG(INFO) << "can not remove pre rowset delete bitmap because rowset is in use"
742
0
                              << ", tablet_id=" << tablet_id() << ", rowset_id=" << rowset_id;
743
0
                    find_unused_rowset = true;
744
0
                    break;
745
0
                }
746
11.3k
            }
747
1.05k
            if (find_unused_rowset) {
748
0
                ++it;
749
0
                continue;
750
0
            }
751
1.05k
            auto& key_ranges = std::get<1>(*it);
752
1.05k
            tablet_meta()->delete_bitmap().remove(key_ranges);
753
1.05k
            it = _unused_delete_bitmap.erase(it);
754
1.05k
            removed_delete_bitmap_num++;
755
            // TODO(kaijie): recycle cache for unused delete bitmap
756
1.05k
        }
757
2.59k
    }
758
759
2.59k
    LOG(INFO) << "tablet_id=" << tablet_id() << ", unused_rowset size=" << _unused_rowsets.size()
760
2.59k
              << ", unused_delete_bitmap size=" << _unused_delete_bitmap.size()
761
2.59k
              << ", removed_rowsets_num=" << removed_rowsets.size()
762
2.59k
              << ", removed_delete_bitmap_num=" << removed_delete_bitmap_num
763
2.59k
              << ", cost(us)=" << watch.get_elapse_time_us();
764
2.59k
}
765
766
318k
void CloudTablet::update_base_size(const Rowset& rs) {
767
    // Define base rowset as the rowset of version [2-x]
768
318k
    if (rs.start_version() == 2) {
769
105k
        _base_size = rs.total_disk_size();
770
105k
    }
771
318k
}
772
773
452
void CloudTablet::clear_cache() {
774
452
    auto recycled_rowsets = CloudTablet::recycle_cached_data(get_snapshot_rowset(true));
775
454
    if (!recycled_rowsets.empty()) {
776
454
        auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
777
454
        manager.recycle_cache(tablet_id(), recycled_rowsets);
778
454
    }
779
452
    _engine.tablet_mgr().erase_tablet(tablet_id());
780
452
}
781
782
std::vector<RecycledRowsets> CloudTablet::recycle_cached_data(
783
2.87k
        const std::vector<RowsetSharedPtr>& rowsets) {
784
2.87k
    std::vector<RecycledRowsets> recycled_rowsets;
785
59.4k
    for (const auto& rs : rowsets) {
786
        // rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2.
787
59.4k
        if (rs.use_count() > 2) {
788
3.53k
            LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has " << rs.use_count()
789
3.53k
                         << " references. File Cache won't be recycled when query is using it.";
790
3.53k
            continue;
791
3.53k
        }
792
55.9k
        rs->clear_cache();
793
55.9k
        auto index_names = rs->get_index_file_names();
794
55.9k
        recycled_rowsets.emplace_back(rs->rowset_id(), rs->num_segments(), index_names);
795
796
55.9k
        int64_t segment_size_sum = 0;
797
79.2k
        for (int32_t i = 0; i < rs->num_segments(); i++) {
798
23.3k
            segment_size_sum += rs->rowset_meta()->segment_file_size(i);
799
23.3k
        }
800
55.9k
        g_file_cache_recycle_cached_data_segment_num << rs->num_segments();
801
55.9k
        g_file_cache_recycle_cached_data_segment_size << segment_size_sum;
802
55.9k
        g_file_cache_recycle_cached_data_index_num << index_names.size();
803
55.9k
    }
804
2.87k
    return recycled_rowsets;
805
2.87k
}
806
807
void CloudTablet::reset_approximate_stats(int64_t num_rowsets, int64_t num_segments,
808
220k
                                          int64_t num_rows, int64_t data_size) {
809
220k
    _approximate_num_segments.store(num_segments, std::memory_order_relaxed);
810
220k
    _approximate_num_rows.store(num_rows, std::memory_order_relaxed);
811
220k
    _approximate_data_size.store(data_size, std::memory_order_relaxed);
812
220k
    int64_t cumu_num_deltas = 0;
813
220k
    int64_t cumu_num_rowsets = 0;
814
220k
    auto cp = _cumulative_point.load(std::memory_order_relaxed);
815
461k
    for (auto& [v, r] : _rs_version_map) {
816
461k
        if (v.second < cp) {
817
230k
            continue;
818
230k
        }
819
230k
        cumu_num_deltas += r->is_segments_overlapping() ? r->num_segments() : 1;
820
230k
        ++cumu_num_rowsets;
821
230k
    }
822
    // num_rowsets may be less than the size of _rs_version_map when there are some hole rowsets
823
    // in the version map, so we use the max value to ensure that the approximate number
824
    // of rowsets is at least the size of _rs_version_map.
825
    // Note that this is not the exact number of rowsets, but an approximate number.
826
220k
    int64_t approximate_num_rowsets =
827
220k
            std::max(num_rowsets, static_cast<int64_t>(_rs_version_map.size()));
828
220k
    _approximate_num_rowsets.store(approximate_num_rowsets, std::memory_order_relaxed);
829
220k
    _approximate_cumu_num_rowsets.store(cumu_num_rowsets, std::memory_order_relaxed);
830
220k
    _approximate_cumu_num_deltas.store(cumu_num_deltas, std::memory_order_relaxed);
831
220k
}
832
833
// Completes `context` with this tablet's identity and settings plus a fresh rowset id from
// the engine, then asks RowsetFactory for a rowset writer. `context` is modified in place;
// `vertical` is forwarded to the factory unchanged.
Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_rowset_writer(
        RowsetWriterContext& context, bool vertical) {
    context.rowset_id = _engine.next_rowset_id();

    // FIXME(plat1ko): Seems `tablet_id` and `index_id` has been set repeatedly
    context.tablet_id = tablet_id();
    context.index_id = index_id();
    context.partition_id = partition_id();

    context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write();
    context.encrypt_algorithm = tablet_meta()->encryption_algorithm();

    auto writer_or_error = RowsetFactory::create_rowset_writer(_engine, context, vertical);
    return writer_or_error;
}
844
845
// create a rowset writer with rowset_id and seg_id
846
// after writer, merge this transient rowset with original rowset
847
Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_writer(
848
        const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> partial_update_info,
849
1.39k
        int64_t txn_expiration) {
850
1.39k
    if (rowset.rowset_meta_state() != RowsetStatePB::BEGIN_PARTIAL_UPDATE &&
851
1.39k
        rowset.rowset_meta_state() != RowsetStatePB::COMMITTED) [[unlikely]] {
852
0
        auto msg = fmt::format(
853
0
                "wrong rowset state when create_transient_rowset_writer, rowset state should be "
854
0
                "BEGIN_PARTIAL_UPDATE or COMMITTED, but found {}, rowset_id={}, tablet_id={}",
855
0
                RowsetStatePB_Name(rowset.rowset_meta_state()), rowset.rowset_id().to_string(),
856
0
                tablet_id());
857
        // see `CloudRowsetWriter::build` for detail.
858
        // if this is in a retry task, the rowset state may have been changed to RowsetStatePB::COMMITTED
859
        // in `RowsetMeta::merge_rowset_meta()` in previous trials.
860
0
        LOG(WARNING) << msg;
861
0
        DCHECK(false) << msg;
862
0
    }
863
1.39k
    RowsetWriterContext context;
864
1.39k
    context.rowset_state = PREPARED;
865
1.39k
    context.segments_overlap = OVERLAPPING;
866
    // During a partial update, the extracted columns of a variant should not be included in the tablet schema.
867
    // This is because the partial update for a variant needs to ignore the extracted columns.
868
    // Otherwise, the schema types in different rowsets might be inconsistent. When performing a partial update,
869
    // the complete variant is constructed by reading all the sub-columns of the variant.
870
1.39k
    context.tablet_schema = rowset.tablet_schema()->copy_without_variant_extracted_columns();
871
1.39k
    context.newest_write_timestamp = UnixSeconds();
872
1.39k
    context.tablet_id = table_id();
873
1.39k
    context.enable_segcompaction = false;
874
1.39k
    context.write_type = DataWriteType::TYPE_DIRECT;
875
1.39k
    context.partial_update_info = std::move(partial_update_info);
876
1.39k
    context.is_transient_rowset_writer = true;
877
1.39k
    context.rowset_id = rowset.rowset_id();
878
1.39k
    context.tablet_id = tablet_id();
879
1.39k
    context.index_id = index_id();
880
1.39k
    context.partition_id = partition_id();
881
1.39k
    context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write();
882
1.39k
    context.txn_expiration = txn_expiration;
883
1.39k
    context.encrypt_algorithm = tablet_meta()->encryption_algorithm();
884
    // TODO(liaoxin) enable packed file for transient rowset
885
1.39k
    context.allow_packed_file = false;
886
887
1.39k
    auto storage_resource = rowset.rowset_meta()->remote_storage_resource();
888
1.39k
    if (!storage_resource) {
889
0
        return ResultError(std::move(storage_resource.error()));
890
0
    }
891
892
1.39k
    context.storage_resource = *storage_resource.value();
893
894
1.39k
    return RowsetFactory::create_rowset_writer(_engine, context, false)
895
1.39k
            .transform([&](auto&& writer) {
896
1.39k
                writer->set_segment_start_id(cast_set<int32_t>(rowset.num_segments()));
897
1.39k
                return writer;
898
1.39k
            });
899
1.39k
}
900
901
74.4M
int64_t CloudTablet::get_cloud_base_compaction_score() const {
902
74.4M
    if (_tablet_meta->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY) {
903
33.7k
        bool has_delete = false;
904
33.7k
        int64_t point = cumulative_layer_point();
905
33.7k
        std::shared_lock rlock(_meta_lock);
906
439k
        for (const auto& [_, rs_meta] : _tablet_meta->all_rs_metas()) {
907
439k
            if (rs_meta->start_version() >= point) {
908
404k
                continue;
909
404k
            }
910
34.8k
            if (rs_meta->has_delete_predicate()) {
911
0
                has_delete = true;
912
0
                break;
913
0
            }
914
34.8k
        }
915
33.7k
        if (!has_delete) {
916
33.7k
            return 0;
917
33.7k
        }
918
33.7k
    }
919
920
74.4M
    return _approximate_num_rowsets.load(std::memory_order_relaxed) -
921
74.4M
           _approximate_cumu_num_rowsets.load(std::memory_order_relaxed);
922
74.4M
}
923
924
668M
int64_t CloudTablet::get_cloud_cumu_compaction_score() const {
925
    // TODO(plat1ko): Propose an algorithm that considers tablet's key type, number of delete rowsets,
926
    //  number of tablet versions simultaneously.
927
668M
    return _approximate_cumu_num_deltas.load(std::memory_order_relaxed);
928
668M
}
929
930
// return a json string to show the compaction status of this tablet
931
2.60k
void CloudTablet::get_compaction_status(std::string* json_result) {
932
2.60k
    rapidjson::Document root;
933
2.60k
    root.SetObject();
934
935
2.60k
    rapidjson::Document path_arr;
936
2.60k
    path_arr.SetArray();
937
938
2.60k
    std::vector<RowsetSharedPtr> rowsets;
939
2.60k
    std::vector<RowsetSharedPtr> stale_rowsets;
940
2.60k
    {
941
2.60k
        std::shared_lock rdlock(_meta_lock);
942
2.60k
        rowsets.reserve(_rs_version_map.size());
943
12.3k
        for (auto& it : _rs_version_map) {
944
12.3k
            rowsets.push_back(it.second);
945
12.3k
        }
946
2.60k
        stale_rowsets.reserve(_stale_rs_version_map.size());
947
11.0k
        for (auto& it : _stale_rs_version_map) {
948
11.0k
            stale_rowsets.push_back(it.second);
949
11.0k
        }
950
2.60k
    }
951
2.60k
    std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator);
952
2.60k
    std::sort(stale_rowsets.begin(), stale_rowsets.end(), Rowset::comparator);
953
954
    // get snapshot version path json_doc
955
2.60k
    _timestamped_version_tracker.get_stale_version_path_json_doc(path_arr);
956
2.60k
    root.AddMember("cumulative point", _cumulative_point.load(), root.GetAllocator());
957
2.60k
    rapidjson::Value cumu_value;
958
2.60k
    std::string format_str = ToStringFromUnixMillis(_last_cumu_compaction_failure_millis.load());
959
2.60k
    cumu_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
960
2.60k
                         root.GetAllocator());
961
2.60k
    root.AddMember("last cumulative failure time", cumu_value, root.GetAllocator());
962
2.60k
    rapidjson::Value base_value;
963
2.60k
    format_str = ToStringFromUnixMillis(_last_base_compaction_failure_millis.load());
964
2.60k
    base_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
965
2.60k
                         root.GetAllocator());
966
2.60k
    root.AddMember("last base failure time", base_value, root.GetAllocator());
967
2.60k
    rapidjson::Value full_value;
968
2.60k
    format_str = ToStringFromUnixMillis(_last_full_compaction_failure_millis.load());
969
2.60k
    full_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
970
2.60k
                         root.GetAllocator());
971
2.60k
    root.AddMember("last full failure time", full_value, root.GetAllocator());
972
2.60k
    rapidjson::Value cumu_success_value;
973
2.60k
    format_str = ToStringFromUnixMillis(_last_cumu_compaction_success_millis.load());
974
2.60k
    cumu_success_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
975
2.60k
                                 root.GetAllocator());
976
2.60k
    root.AddMember("last cumulative success time", cumu_success_value, root.GetAllocator());
977
2.60k
    rapidjson::Value base_success_value;
978
2.60k
    format_str = ToStringFromUnixMillis(_last_base_compaction_success_millis.load());
979
2.60k
    base_success_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
980
2.60k
                                 root.GetAllocator());
981
2.60k
    root.AddMember("last base success time", base_success_value, root.GetAllocator());
982
2.60k
    rapidjson::Value full_success_value;
983
2.60k
    format_str = ToStringFromUnixMillis(_last_full_compaction_success_millis.load());
984
2.60k
    full_success_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
985
2.60k
                                 root.GetAllocator());
986
2.60k
    root.AddMember("last full success time", full_success_value, root.GetAllocator());
987
2.60k
    rapidjson::Value cumu_schedule_value;
988
2.60k
    format_str = ToStringFromUnixMillis(_last_cumu_compaction_schedule_millis.load());
989
2.60k
    cumu_schedule_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
990
2.60k
                                  root.GetAllocator());
991
2.60k
    root.AddMember("last cumulative schedule time", cumu_schedule_value, root.GetAllocator());
992
2.60k
    rapidjson::Value base_schedule_value;
993
2.60k
    format_str = ToStringFromUnixMillis(_last_base_compaction_schedule_millis.load());
994
2.60k
    base_schedule_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
995
2.60k
                                  root.GetAllocator());
996
2.60k
    root.AddMember("last base schedule time", base_schedule_value, root.GetAllocator());
997
2.60k
    rapidjson::Value full_schedule_value;
998
2.60k
    format_str = ToStringFromUnixMillis(_last_full_compaction_schedule_millis.load());
999
2.60k
    full_schedule_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
1000
2.60k
                                  root.GetAllocator());
1001
2.60k
    root.AddMember("last full schedule time", full_schedule_value, root.GetAllocator());
1002
2.60k
    rapidjson::Value cumu_compaction_status_value;
1003
2.60k
    cumu_compaction_status_value.SetString(_last_cumu_compaction_status.c_str(),
1004
2.60k
                                           cast_set<uint>(_last_cumu_compaction_status.length()),
1005
2.60k
                                           root.GetAllocator());
1006
2.60k
    root.AddMember("last cumulative status", cumu_compaction_status_value, root.GetAllocator());
1007
2.60k
    rapidjson::Value base_compaction_status_value;
1008
2.60k
    base_compaction_status_value.SetString(_last_base_compaction_status.c_str(),
1009
2.60k
                                           cast_set<uint>(_last_base_compaction_status.length()),
1010
2.60k
                                           root.GetAllocator());
1011
2.60k
    root.AddMember("last base status", base_compaction_status_value, root.GetAllocator());
1012
2.60k
    rapidjson::Value full_compaction_status_value;
1013
2.60k
    full_compaction_status_value.SetString(_last_full_compaction_status.c_str(),
1014
2.60k
                                           cast_set<uint>(_last_full_compaction_status.length()),
1015
2.60k
                                           root.GetAllocator());
1016
2.60k
    root.AddMember("last full status", full_compaction_status_value, root.GetAllocator());
1017
2.60k
    rapidjson::Value exec_compaction_time;
1018
2.60k
    std::string num_str {std::to_string(exec_compaction_time_us.load())};
1019
2.60k
    exec_compaction_time.SetString(num_str.c_str(), cast_set<uint>(num_str.length()),
1020
2.60k
                                   root.GetAllocator());
1021
2.60k
    root.AddMember("exec compaction time us", exec_compaction_time, root.GetAllocator());
1022
2.60k
    rapidjson::Value local_read_time;
1023
2.60k
    num_str = std::to_string(local_read_time_us.load());
1024
2.60k
    local_read_time.SetString(num_str.c_str(), cast_set<uint>(num_str.length()),
1025
2.60k
                              root.GetAllocator());
1026
2.60k
    root.AddMember("compaction local read time us", local_read_time, root.GetAllocator());
1027
2.60k
    rapidjson::Value remote_read_time;
1028
2.60k
    num_str = std::to_string(remote_read_time_us.load());
1029
2.60k
    remote_read_time.SetString(num_str.c_str(), cast_set<uint>(num_str.length()),
1030
2.60k
                               root.GetAllocator());
1031
2.60k
    root.AddMember("compaction remote read time us", remote_read_time, root.GetAllocator());
1032
1033
    // print all rowsets' version as an array
1034
2.60k
    rapidjson::Document versions_arr;
1035
2.60k
    rapidjson::Document missing_versions_arr;
1036
2.60k
    versions_arr.SetArray();
1037
2.60k
    missing_versions_arr.SetArray();
1038
2.60k
    int64_t last_version = -1;
1039
12.3k
    for (auto& rowset : rowsets) {
1040
12.3k
        const Version& ver = rowset->version();
1041
12.3k
        if (ver.first != last_version + 1) {
1042
0
            rapidjson::Value miss_value;
1043
0
            miss_value.SetString(fmt::format("[{}-{}]", last_version + 1, ver.first - 1).c_str(),
1044
0
                                 missing_versions_arr.GetAllocator());
1045
0
            missing_versions_arr.PushBack(miss_value, missing_versions_arr.GetAllocator());
1046
0
        }
1047
12.3k
        rapidjson::Value value;
1048
12.3k
        std::string version_str = rowset->get_rowset_info_str();
1049
12.3k
        value.SetString(version_str.c_str(), cast_set<uint32_t>(version_str.length()),
1050
12.3k
                        versions_arr.GetAllocator());
1051
12.3k
        versions_arr.PushBack(value, versions_arr.GetAllocator());
1052
12.3k
        last_version = ver.second;
1053
12.3k
    }
1054
2.60k
    root.AddMember("rowsets", versions_arr, root.GetAllocator());
1055
2.60k
    root.AddMember("missing_rowsets", missing_versions_arr, root.GetAllocator());
1056
1057
    // print all stale rowsets' version as an array
1058
2.60k
    rapidjson::Document stale_versions_arr;
1059
2.60k
    stale_versions_arr.SetArray();
1060
11.0k
    for (auto& rowset : stale_rowsets) {
1061
11.0k
        rapidjson::Value value;
1062
11.0k
        std::string version_str = rowset->get_rowset_info_str();
1063
11.0k
        value.SetString(version_str.c_str(), cast_set<uint32_t>(version_str.length()),
1064
11.0k
                        stale_versions_arr.GetAllocator());
1065
11.0k
        stale_versions_arr.PushBack(value, stale_versions_arr.GetAllocator());
1066
11.0k
    }
1067
2.60k
    root.AddMember("stale_rowsets", stale_versions_arr, root.GetAllocator());
1068
1069
    // add stale version rowsets
1070
2.60k
    root.AddMember("stale version path", path_arr, root.GetAllocator());
1071
1072
    // to json string
1073
2.60k
    rapidjson::StringBuffer strbuf;
1074
2.60k
    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
1075
2.60k
    root.Accept(writer);
1076
2.60k
    *json_result = std::string(strbuf.GetString());
1077
2.60k
}
1078
1079
219k
void CloudTablet::set_cumulative_layer_point(int64_t new_point) {
1080
220k
    if (new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= _cumulative_point) {
1081
220k
        _cumulative_point = new_point;
1082
220k
        return;
1083
220k
    }
1084
    // cumulative point should only be reset to -1, or be increased
1085
    // FIXME: could happen in currently unresolved race conditions
1086
18.4E
    LOG(WARNING) << "Unexpected cumulative point: " << new_point
1087
18.4E
                 << ", origin: " << _cumulative_point.load();
1088
18.4E
}
1089
1090
// Checks the columns of the tablet's rowsets against the column list `columns`.
//
// Rowsets whose version starts at 0, and rowsets whose schema version is already
// >= `schema_version`, are not inspected. For every remaining rowset each of its
// columns must exist in `columns` (looked up by name) with the same unique id and type.
//
// Returns Status::OK() when all inspected rowsets match, otherwise an InternalError
// describing the first mismatch. Holds `_meta_lock` in shared mode while iterating.
Status CloudTablet::check_rowset_schema_for_build_index(std::vector<TColumn>& columns,
                                                        int schema_version) {
    // Index the given columns by name; a later duplicate name overrides an earlier one.
    std::map<std::string, TabletColumn> fe_col_map;
    for (auto& fe_column : columns) {
        fe_col_map[fe_column.column_name] = TabletColumn(fe_column);
    }

    std::shared_lock rlock(_meta_lock);
    for (const auto& [version, rs] : _rs_version_map) {
        if (version.first == 0) {
            continue;
        }

        if (rs->tablet_schema()->schema_version() >= schema_version) {
            continue;
        }

        // Bind by reference: the loop only reads the column, no copy is needed.
        for (const auto& rs_col : rs->tablet_schema()->columns()) {
            const auto fe_col_it = fe_col_map.find(rs_col->name());
            if (fe_col_it == fe_col_map.end()) {
                return Status::InternalError(
                        "check rowset meta failed:rowset's col is dropped in FE.");
            }

            const TabletColumn& fe_col = fe_col_it->second;
            if (rs_col->unique_id() != fe_col.unique_id()) {
                return Status::InternalError("check rowset meta failed:col id not match.");
            }

            if (rs_col->type() != fe_col.type()) {
                return Status::InternalError("check rowset meta failed:col type not match.");
            }
        }
    }

    return Status::OK();
}
1126
1127
// Picks one rowset that still needs an index change.
//
// Candidates are the rowsets whose version does not start at 0 and whose schema version
// is lower than `schema_version`; among them the one with the greatest start version is
// returned. When a rowset is picked, `is_base_rowset` is set to whether its first version
// lies below the cumulative point; otherwise `is_base_rowset` is left untouched and a
// null rowset is returned. Holds `_meta_lock` in shared mode while scanning.
Result<RowsetSharedPtr> CloudTablet::pick_a_rowset_for_index_change(int schema_version,
                                                                    bool& is_base_rowset) {
    TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudTablet::pick_a_rowset_for_index_change",
                                      Result<RowsetSharedPtr>(nullptr));
    RowsetSharedPtr picked = nullptr;
    std::shared_lock rlock(_meta_lock);
    for (const auto& [version, rs] : _rs_version_map) {
        if (version.first == 0) {
            continue;
        }
        // Empty rowsets are only reported; they are not filtered out here.
        if (rs->num_rows() == 0) {
            VLOG_DEBUG << "[index_change]find empty rs, index change may failed, id="
                       << rs->rowset_id().to_string();
        }

        if (rs->tablet_schema()->schema_version() >= schema_version) {
            VLOG_DEBUG << "[index_change] skip rowset " << rs->tablet_schema()->schema_version()
                       << "," << schema_version;
            continue;
        }

        // Keep the candidate with the greatest start version seen so far.
        if (picked == nullptr || rs->start_version() > picked->start_version()) {
            picked = rs;
        }
    }

    if (picked != nullptr) {
        is_base_rowset = picked->version().first < _cumulative_point;
    }

    return picked;
}
1165
1166
3.98k
std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_base_compaction_unlocked() {
1167
3.98k
    std::vector<RowsetSharedPtr> candidate_rowsets;
1168
20.7k
    for (const auto& [version, rs] : _rs_version_map) {
1169
20.7k
        if (version.first != 0 && version.first < _cumulative_point &&
1170
20.7k
            (_alter_version == -1 || version.second <= _alter_version)) {
1171
12.9k
            candidate_rowsets.push_back(rs);
1172
12.9k
        }
1173
20.7k
    }
1174
3.98k
    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
1175
3.98k
    return candidate_rowsets;
1176
3.98k
}
1177
1178
95
std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_full_compaction_unlocked() {
1179
95
    std::vector<RowsetSharedPtr> candidate_rowsets;
1180
666
    for (auto& [v, rs] : _rs_version_map) {
1181
        // MUST NOT compact rowset [0-1] for some historical reasons (see cloud_schema_change)
1182
666
        if (v.first != 0) {
1183
571
            candidate_rowsets.push_back(rs);
1184
571
        }
1185
666
    }
1186
95
    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
1187
95
    return candidate_rowsets;
1188
95
}
1189
1190
48
// Returns the delete-bitmap calculation executor provided by the storage engine
// this tablet is attached to.
CalcDeleteBitmapExecutor* CloudTablet::calc_delete_bitmap_executor() {
    CalcDeleteBitmapExecutor* executor = _engine.calc_delete_bitmap_executor();
    return executor;
}
1193
1194
// Persists the delete bitmap computed for a load transaction.
//
// Steps, each aborting with its error status on failure:
//   1. record `delete_bitmap` in the txn delete bitmap cache with PublishStatus::PREPARE;
//   2. for a non-empty partial update, push the updated tmp rowset meta to the meta manager;
//   3. store the delete bitmap through save_delete_bitmap_to_ms();
//   4. record the bitmap in the cache again with PublishStatus::SUCCEED.
// The DBUG_EXECUTE_IF blocks are debug-point hooks used to inject sleeps and errors.
Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
                                       DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
                                       const RowsetIdUnorderedSet& cur_rowset_ids, int64_t lock_id,
                                       int64_t next_visible_version) {
    RowsetSharedPtr rowset = txn_info->rowset;
    int64_t cur_version = rowset->start_version();
    auto&& txn_bitmap_cache = _engine.txn_delete_bitmap_cache();

    // update delete bitmap info, in order to avoid recalculation when trying again
    RETURN_IF_ERROR(txn_bitmap_cache.update_tablet_txn_info(
            txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::PREPARE));

    const bool is_non_empty_partial_update = txn_info->partial_update_info &&
                                             txn_info->partial_update_info->is_partial_update() &&
                                             rowset_writer->num_rows() > 0;
    if (is_non_empty_partial_update) {
        DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.update_tmp_rowset.error", {
            return Status::InternalError<false>("injected update_tmp_rowset error.");
        });
        const auto& rowset_meta = rowset->rowset_meta();
        RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset_meta, table_id()));
    }

    RETURN_IF_ERROR(save_delete_bitmap_to_ms(cur_version, txn_id, delete_bitmap, lock_id,
                                             next_visible_version, rowset));

    // store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
    // it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
    // delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail
    RETURN_IF_ERROR(txn_bitmap_cache.update_tablet_txn_info(
            txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED,
            txn_info->publish_info));

    DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.enable_sleep", {
        auto sleep_sec = dp->param<int>("sleep", 5);
        std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
    });

    DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.injected_error", {
        auto retry = dp->param<bool>("retry", false);
        auto sleep_sec = dp->param<int>("sleep", 0);
        std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
        if (retry) { // return DELETE_BITMAP_LOCK_ERROR to let it retry
            return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>(
                    "injected DELETE_BITMAP_LOCK_ERROR");
        } else {
            return Status::InternalError<false>("injected non-retryable error");
        }
    });

    return Status::OK();
}
1242
1243
// Sends the delete bitmap of a load transaction to the meta manager.
//
// A new bitmap is built from `delete_bitmap` in which every entry is re-keyed to
// `cur_version` and sentinel entries (segment id == DeleteBitmap::INVALID_SEGMENT_ID)
// are dropped. The update is issued under `lock_id` for an explicit txn
// (lock_id != -1), otherwise under `txn_id`.
//
// Returns the status of the meta manager update.
Status CloudTablet::save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id,
                                             DeleteBitmapPtr delete_bitmap, int64_t lock_id,
                                             int64_t next_visible_version, RowsetSharedPtr rowset) {
    DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
    for (const auto& [bitmap_key, bitmap] : delete_bitmap->delete_bitmap) {
        // skip sentinel mark, which is used for delete bitmap correctness check
        if (std::get<1>(bitmap_key) == DeleteBitmap::INVALID_SEGMENT_ID) {
            continue;
        }
        new_delete_bitmap->merge({std::get<0>(bitmap_key), std::get<1>(bitmap_key), cur_version},
                                 bitmap);
    }

    // lock_id != -1 means this is in an explicit txn
    const bool is_explicit_txn = (lock_id != -1);
    const auto ms_lock_id = is_explicit_txn ? lock_id : txn_id;

    // The storage resource stays empty when the rowset meta cannot provide one.
    std::optional<StorageResource> storage_resource;
    auto storage_resource_result = rowset->rowset_meta()->remote_storage_resource();
    if (storage_resource_result) {
        storage_resource = *storage_resource_result.value();
    }

    RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(
            *this, ms_lock_id, LOAD_INITIATOR_ID, new_delete_bitmap.get(), new_delete_bitmap.get(),
            rowset->rowset_id().to_string(), storage_resource,
            config::delete_bitmap_store_write_version, table_id(), txn_id, is_explicit_txn,
            next_visible_version));
    return Status::OK();
}
1271
1272
1
// Computes the version ranges in [0, spec_version] that are not covered by
// `existing_versions`.
//
// `existing_versions` is taken by value and sorted locally by start version; the
// versions are expected not to overlap. Each returned Version is an inclusive range
// [first, second] of missing versions, clipped to `spec_version`.
Versions CloudTablet::calc_missed_versions(int64_t spec_version, Versions existing_versions) const {
    DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;

    // sort the existing versions in ascending order
    std::sort(existing_versions.begin(), existing_versions.end(),
              [](const Version& a, const Version& b) {
                  // simple because 2 versions are certainly not overlapping
                  return a.first < b.first;
              });

    // From the first version(=0), find the missing version until spec_version
    int64_t last_version = -1;
    Versions missed_versions;
    for (const Version& version : existing_versions) {
        if (version.first > last_version + 1) {
            // There is a hole between versions. Its inclusive end is the version right
            // before the next existing one (version.first itself exists), clipped to
            // spec_version.
            missed_versions.emplace_back(last_version + 1,
                                         std::min(version.first - 1, spec_version));
        }
        last_version = version.second;
        if (last_version >= spec_version) {
            break;
        }
    }
    if (last_version < spec_version) {
        // there is a hole between the last version and the specified version.
        missed_versions.emplace_back(last_version + 1, spec_version);
    }
    return missed_versions;
}
1301
1302
// Calculates and stores the delete bitmap of a compaction output rowset.
//
// The work is done in three phases:
//   1. sync rowsets and convert the delete bitmap entries up to the current max version
//      (historical data), optionally running the missed-rows and rowid-conversion
//      correctness checks enabled through config;
//   2. take the compaction delete bitmap update lock, sync rowsets again and convert the
//      entries written since phase 1 (incremental data);
//   3. send the resulting bitmap to the meta manager (plus a v2 bitmap when
//      config::delete_bitmap_store_write_version is 2 or 3).
//
// Outputs:
//   output_rowset_delete_bitmap        - replaced with the newly calculated bitmap.
//   get_delete_bitmap_lock_start_time  - MonotonicMicros() taken right after the lock
//                                        was acquired.
// Returns the status of the final store, or the first error hit before it.
Status CloudTablet::calc_delete_bitmap_for_compaction(
        const std::vector<RowsetSharedPtr>& input_rowsets, const RowsetSharedPtr& output_rowset,
        const RowIdConversion& rowid_conversion, ReaderType compaction_type, int64_t merged_rows,
        int64_t filtered_rows, int64_t initiator, DeleteBitmapPtr& output_rowset_delete_bitmap,
        bool allow_delete_in_cumu_compaction, int64_t& get_delete_bitmap_lock_start_time) {
    output_rowset_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());

    const bool is_cumu_compaction = (compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION);

    std::unique_ptr<RowLocationSet> missed_rows;
    const bool missed_rows_check_enabled = config::enable_missing_rows_correctness_check ||
                                           config::enable_mow_compaction_correctness_check_core ||
                                           config::enable_mow_compaction_correctness_check_fail;
    if (missed_rows_check_enabled && !allow_delete_in_cumu_compaction &&
        (is_cumu_compaction || !config::enable_prune_delete_sign_when_base_compaction)) {
        // also check duplicate key for base compaction when config::enable_prune_delete_sign_when_base_compaction==false
        missed_rows = std::make_unique<RowLocationSet>();
        LOG(INFO) << "RowLocation Set inited succ for tablet:" << tablet_id();
    }

    std::unique_ptr<std::map<RowsetSharedPtr, RowLocationPairList>> location_map;
    if (config::enable_rowid_conversion_correctness_check &&
        tablet_schema()->cluster_key_uids().empty()) {
        location_map = std::make_unique<std::map<RowsetSharedPtr, RowLocationPairList>>();
        LOG(INFO) << "Location Map inited succ for tablet:" << tablet_id();
    }

    // 1. calc delete bitmap for historical data
    RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(this));
    Version version = max_version();
    std::size_t missed_rows_size = 0;
    calc_compaction_output_rowset_delete_bitmap(
            input_rowsets, rowid_conversion, 0, version.second + 1, missed_rows.get(),
            location_map.get(), tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());
    if (missed_rows) {
        missed_rows_size = missed_rows->size();
        const bool should_verify =
                !allow_delete_in_cumu_compaction &&
                (is_cumu_compaction || !config::enable_prune_delete_sign_when_base_compaction) &&
                tablet_state() == TABLET_RUNNING;
        if (should_verify && merged_rows + filtered_rows >= 0 &&
            merged_rows + filtered_rows != missed_rows_size) {
            std::string err_msg = fmt::format(
                    "cumulative compaction: the merged rows({}), the filtered rows({}) is "
                    "not equal to missed rows({}) in rowid conversion, tablet_id: {}, "
                    "table_id:{}",
                    merged_rows, filtered_rows, missed_rows_size, tablet_id(), table_id());
            LOG(WARNING) << err_msg;
            // How hard the mismatch fails depends on which check config is on.
            if (config::enable_mow_compaction_correctness_check_core) {
                CHECK(false) << err_msg;
            } else if (config::enable_mow_compaction_correctness_check_fail) {
                return Status::InternalError<false>(err_msg);
            } else {
                DCHECK(false) << err_msg;
            }
        }
    }
    if (location_map) {
        RETURN_IF_ERROR(check_rowid_conversion(output_rowset, *location_map));
        location_map->clear();
    }

    // 2. calc delete bitmap for incremental data
    const int64_t lock_begin_us = MonotonicMicros();
    RETURN_IF_ERROR(_engine.meta_mgr().get_delete_bitmap_update_lock(
            *this, COMPACTION_DELETE_BITMAP_LOCK_ID, initiator));
    const int64_t lock_acquired_us = MonotonicMicros();
    switch (compaction_type) {
    case ReaderType::READER_CUMULATIVE_COMPACTION:
        g_cu_compaction_get_delete_bitmap_lock_time_ms << (lock_acquired_us - lock_begin_us) / 1000;
        break;
    case ReaderType::READER_BASE_COMPACTION:
        g_base_compaction_get_delete_bitmap_lock_time_ms
                << (lock_acquired_us - lock_begin_us) / 1000;
        break;
    default:
        break;
    }
    get_delete_bitmap_lock_start_time = lock_acquired_us;
    RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(this));
    const int64_t synced_us = MonotonicMicros();

    calc_compaction_output_rowset_delete_bitmap(
            input_rowsets, rowid_conversion, version.second, UINT64_MAX, missed_rows.get(),
            location_map.get(), tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());
    const int64_t calc_done_us = MonotonicMicros();
    if (location_map) {
        RETURN_IF_ERROR(check_rowid_conversion(output_rowset, *location_map));
    }
    const int64_t check_done_us = MonotonicMicros();

    // 3. store delete bitmap
    DeleteBitmapPtr delete_bitmap_v2 = nullptr;
    auto delete_bitmap_size = output_rowset_delete_bitmap->delete_bitmap.size();
    auto store_version = config::delete_bitmap_store_write_version;
    if (store_version == 2 || store_version == 3) {
        delete_bitmap_v2 = std::make_shared<DeleteBitmap>(*output_rowset_delete_bitmap);
        // Rowsets that end before the output rowset starts, with their segment counts.
        std::vector<std::pair<RowsetId, int64_t>> retained_rowsets_to_seg_num;
        {
            std::shared_lock rlock(get_header_lock());
            for (const auto& [rowset_version, rowset_ptr] : rowset_map()) {
                if (rowset_version.second >= output_rowset->start_version()) {
                    continue;
                }
                retained_rowsets_to_seg_num.emplace_back(
                        std::make_pair(rowset_ptr->rowset_id(), rowset_ptr->num_segments()));
            }
        }
        if (config::enable_agg_delta_delete_bitmap_for_store_v2) {
            tablet_meta()->delete_bitmap().subset_and_agg(
                    retained_rowsets_to_seg_num, output_rowset->start_version(),
                    output_rowset->end_version(), delete_bitmap_v2.get());
        } else {
            tablet_meta()->delete_bitmap().subset(
                    retained_rowsets_to_seg_num, output_rowset->start_version(),
                    output_rowset->end_version(), delete_bitmap_v2.get());
        }
    }

    // The storage resource stays empty when the rowset meta cannot provide one.
    std::optional<StorageResource> storage_resource;
    auto storage_resource_result = output_rowset->rowset_meta()->remote_storage_resource();
    if (storage_resource_result) {
        storage_resource = *storage_resource_result.value();
    }
    auto st = _engine.meta_mgr().update_delete_bitmap(
            *this, -1, initiator, output_rowset_delete_bitmap.get(), delete_bitmap_v2.get(),
            output_rowset->rowset_id().to_string(), storage_resource, store_version, table_id());
    const int64_t store_done_us = MonotonicMicros();
    LOG(INFO) << "calc_delete_bitmap_for_compaction, tablet_id=" << tablet_id()
              << ", get lock cost " << (lock_acquired_us - lock_begin_us)
              << " us, sync rowsets cost " << (synced_us - lock_acquired_us)
              << " us, calc delete bitmap cost " << (calc_done_us - synced_us)
              << " us, check rowid conversion cost " << (check_done_us - calc_done_us)
              << " us, store delete bitmap cost " << (store_done_us - check_done_us)
              << " us, st=" << st.to_string() << ". store_version=" << store_version
              << ", calculated delete bitmap size=" << delete_bitmap_size
              << ", update delete bitmap size="
              << output_rowset_delete_bitmap->delete_bitmap.size();
    return st;
}
1432
1433
void CloudTablet::agg_delete_bitmap_for_compaction(
1434
        int64_t start_version, int64_t end_version, const std::vector<RowsetSharedPtr>& pre_rowsets,
1435
        DeleteBitmapPtr& new_delete_bitmap,
1436
3.57k
        std::map<std::string, int64_t>& pre_rowset_to_versions) {
1437
9.25k
    for (auto& rowset : pre_rowsets) {
1438
13.7k
        for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
1439
4.51k
            auto d = tablet_meta()->delete_bitmap().get_agg_without_cache(
1440
4.51k
                    {rowset->rowset_id(), seg_id, end_version}, start_version);
1441
4.51k
            if (d->isEmpty()) {
1442
3.39k
                continue;
1443
3.39k
            }
1444
1.12k
            VLOG_DEBUG << "agg delete bitmap for tablet_id=" << tablet_id()
1445
0
                       << ", rowset_id=" << rowset->rowset_id() << ", seg_id=" << seg_id
1446
0
                       << ", rowset_version=" << rowset->version().to_string()
1447
0
                       << ". compaction start_version=" << start_version
1448
0
                       << ", end_version=" << end_version
1449
0
                       << ". delete_bitmap cardinality=" << d->cardinality();
1450
1.12k
            DeleteBitmap::BitmapKey end_key {rowset->rowset_id(), seg_id, end_version};
1451
1.12k
            new_delete_bitmap->set(end_key, *d);
1452
1.12k
            pre_rowset_to_versions[rowset->rowset_id().to_string()] = rowset->version().second;
1453
1.12k
        }
1454
9.25k
    }
1455
3.57k
}
1456
1457
55.3k
// Refreshes selected properties of the local tablet meta from the meta manager.
//
// Does nothing and returns OK when config::enable_file_cache is off. Otherwise the
// tablet meta is fetched; on failure the error is returned, and the cache is cleared
// first when the error is NOT_FOUND. On success each synced property is written to
// `_tablet_meta` only when the fetched value differs from the local one.
Status CloudTablet::sync_meta() {
    if (!config::enable_file_cache) {
        return Status::OK();
    }

    TabletMetaSharedPtr tablet_meta;
    if (auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta); !st.ok()) {
        if (st.is<ErrorCode::NOT_FOUND>()) {
            clear_cache();
        }
        return st;
    }

    if (auto policy = tablet_meta->compaction_policy();
        _tablet_meta->compaction_policy() != policy) {
        _tablet_meta->set_compaction_policy(policy);
    }
    if (auto goal_size_mbytes = tablet_meta->time_series_compaction_goal_size_mbytes();
        _tablet_meta->time_series_compaction_goal_size_mbytes() != goal_size_mbytes) {
        _tablet_meta->set_time_series_compaction_goal_size_mbytes(goal_size_mbytes);
    }
    if (auto file_count_threshold = tablet_meta->time_series_compaction_file_count_threshold();
        _tablet_meta->time_series_compaction_file_count_threshold() != file_count_threshold) {
        _tablet_meta->set_time_series_compaction_file_count_threshold(file_count_threshold);
    }
    if (auto time_threshold_seconds = tablet_meta->time_series_compaction_time_threshold_seconds();
        _tablet_meta->time_series_compaction_time_threshold_seconds() != time_threshold_seconds) {
        _tablet_meta->set_time_series_compaction_time_threshold_seconds(time_threshold_seconds);
    }
    if (auto empty_rowsets_threshold =
                tablet_meta->time_series_compaction_empty_rowsets_threshold();
        _tablet_meta->time_series_compaction_empty_rowsets_threshold() !=
        empty_rowsets_threshold) {
        _tablet_meta->set_time_series_compaction_empty_rowsets_threshold(empty_rowsets_threshold);
    }
    if (auto level_threshold = tablet_meta->time_series_compaction_level_threshold();
        _tablet_meta->time_series_compaction_level_threshold() != level_threshold) {
        _tablet_meta->set_time_series_compaction_level_threshold(level_threshold);
    }
    // Sync disable_auto_compaction (stored in tablet_schema)
    if (auto disable_auto_compaction = tablet_meta->tablet_schema()->disable_auto_compaction();
        _tablet_meta->tablet_schema()->disable_auto_compaction() != disable_auto_compaction) {
        _tablet_meta->mutable_tablet_schema()->set_disable_auto_compaction(
                disable_auto_compaction);
    }
    // Sync vertical_compaction_num_columns_per_group
    if (auto num_columns_per_group = tablet_meta->vertical_compaction_num_columns_per_group();
        _tablet_meta->vertical_compaction_num_columns_per_group() != num_columns_per_group) {
        _tablet_meta->set_vertical_compaction_num_columns_per_group(num_columns_per_group);
    }

    return Status::OK();
}
1527
1528
3.79M
// Fills `tablet_info` with this tablet's id and total version count, read from the
// tablet meta while holding `_meta_lock` in shared mode.
void CloudTablet::build_tablet_report_info(TTabletInfo* tablet_info) {
    std::shared_lock meta_rlock(_meta_lock);
    tablet_info->__set_tablet_id(_tablet_meta->tablet_id());
    tablet_info->__set_total_version_count(_tablet_meta->version_count());
    // Currently, this information will not be used by the cloud report,
    // but it may be used in the future.
}
1535
1536
// Compares `expected_delete_bitmap` with the delete bitmap cached for `txn_id` on this
// tablet.
//
// When the cached bitmap cannot be fetched, nothing is compared and OK is returned.
// Otherwise the two cardinalities must be equal; a mismatch trips a DCHECK and is
// returned as an InternalError.
Status CloudTablet::check_delete_bitmap_cache(int64_t txn_id,
                                              DeleteBitmap* expected_delete_bitmap) {
    DeleteBitmapPtr cached_delete_bitmap;
    CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
    Status st = engine.txn_delete_bitmap_cache().get_delete_bitmap(
            txn_id, tablet_id(), &cached_delete_bitmap, nullptr, nullptr);
    if (!st.ok()) {
        // No cached bitmap available: there is nothing to verify against.
        return Status::OK();
    }

    const auto expected_cardinality = expected_delete_bitmap->cardinality();
    const auto cached_cardinality = cached_delete_bitmap->cardinality();
    const bool res = (expected_cardinality == cached_cardinality);
    if (!res) {
        // Build the message only on the failure path; it is not needed otherwise.
        auto msg = fmt::format(
                "delete bitmap cache check failed, cur_cardinality={}, cached_cardinality={}, "
                "txn_id={}, tablet_id={}",
                expected_cardinality, cached_cardinality, txn_id, tablet_id());
        DCHECK(res) << msg;
        return Status::InternalError<false>(msg);
    }
    return Status::OK();
}
1556
1557
35
// Returns the warm-up state tracked for `rowset_id`, refreshing it first.
//
// When no warm-up state is tracked for the rowset, a state with trigger source NONE and
// progress NONE is returned.
WarmUpState CloudTablet::get_rowset_warmup_state(RowsetId rowset_id) {
    // update_state() below writes to the tracked entry, so the lock must be exclusive:
    // a shared lock would let concurrent callers modify the same entry at once.
    std::lock_guard wlock(_meta_lock);
    auto it = _rowset_warm_up_states.find(rowset_id);
    if (it == _rowset_warm_up_states.end()) {
        return {.trigger_source = WarmUpTriggerSource::NONE, .progress = WarmUpProgress::NONE};
    }
    auto& warmup_info = it->second;
    warmup_info.update_state();
    return warmup_info.state;
}
1566
1567
bool CloudTablet::add_rowset_warmup_state(const RowsetMeta& rowset, WarmUpTriggerSource source,
1568
34
                                          std::chrono::steady_clock::time_point start_tp) {
1569
34
    std::lock_guard wlock(_meta_lock);
1570
34
    return add_rowset_warmup_state_unlocked(rowset, source, start_tp);
1571
34
}
1572
1573
bool CloudTablet::update_rowset_warmup_state_inverted_idx_num(WarmUpTriggerSource source,
1574
5
                                                              RowsetId rowset_id, int64_t delta) {
1575
5
    std::lock_guard wlock(_meta_lock);
1576
5
    return update_rowset_warmup_state_inverted_idx_num_unlocked(source, rowset_id, delta);
1577
5
}
1578
1579
bool CloudTablet::update_rowset_warmup_state_inverted_idx_num_unlocked(WarmUpTriggerSource source,
1580
                                                                       RowsetId rowset_id,
1581
5.19k
                                                                       int64_t delta) {
1582
5.19k
    auto it = _rowset_warm_up_states.find(rowset_id);
1583
5.19k
    if (it == _rowset_warm_up_states.end()) {
1584
0
        return false;
1585
0
    }
1586
5.19k
    if (it->second.state.trigger_source != source) {
1587
        // Only the same trigger source can update the state
1588
2
        return false;
1589
2
    }
1590
5.18k
    it->second.num_inverted_idx += delta;
1591
5.18k
    return true;
1592
5.19k
}
1593
1594
bool CloudTablet::add_rowset_warmup_state_unlocked(const RowsetMeta& rowset,
1595
                                                   WarmUpTriggerSource source,
1596
52.6k
                                                   std::chrono::steady_clock::time_point start_tp) {
1597
52.6k
    auto rowset_id = rowset.rowset_id();
1598
1599
    // Check if rowset already has warmup state
1600
52.6k
    if (_rowset_warm_up_states.contains(rowset_id)) {
1601
10
        auto existing_state = _rowset_warm_up_states[rowset_id].state;
1602
1603
        // For job-triggered warmup (one-time and periodic warmup), allow it to proceed
1604
        // except when there's already another job-triggered warmup in progress
1605
10
        if (source == WarmUpTriggerSource::JOB) {
1606
5
            if (existing_state.trigger_source == WarmUpTriggerSource::JOB &&
1607
5
                existing_state.progress == WarmUpProgress::DOING) {
1608
                // Same job type already in progress, skip to avoid duplicate warmup
1609
1
                return false;
1610
1
            }
1611
5
        } else {
1612
            // For non-job warmup (EVENT_DRIVEN, SYNC_ROWSET), skip if any warmup exists
1613
5
            return false;
1614
5
        }
1615
10
    }
1616
1617
52.6k
    if (source == WarmUpTriggerSource::JOB) {
1618
9
        g_file_cache_warm_up_rowset_triggered_by_job_num << 1;
1619
52.6k
    } else if (source == WarmUpTriggerSource::SYNC_ROWSET) {
1620
52.6k
        g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num << 1;
1621
18.4E
    } else if (source == WarmUpTriggerSource::EVENT_DRIVEN) {
1622
13
        g_file_cache_warm_up_rowset_triggered_by_event_driven_num << 1;
1623
13
    }
1624
52.6k
    _rowset_warm_up_states[rowset_id] = {
1625
52.6k
            .state = {.trigger_source = source,
1626
52.6k
                      .progress = (rowset.num_segments() == 0 ? WarmUpProgress::DONE
1627
52.6k
                                                              : WarmUpProgress::DOING)},
1628
52.6k
            .num_segments = rowset.num_segments(),
1629
52.6k
            .start_tp = start_tp};
1630
52.6k
    return true;
1631
52.6k
}
1632
1633
57.8k
void CloudTablet::RowsetWarmUpInfo::update_state() {
1634
57.8k
    if (has_finished()) {
1635
52.6k
        g_file_cache_warm_up_rowset_complete_num << 1;
1636
52.6k
        auto cost = std::chrono::duration_cast<std::chrono::milliseconds>(
1637
52.6k
                            std::chrono::steady_clock::now() - start_tp)
1638
52.6k
                            .count();
1639
52.6k
        g_file_cache_warm_up_rowset_all_segments_latency << cost;
1640
52.6k
        state.progress = WarmUpProgress::DONE;
1641
52.6k
    }
1642
57.8k
}
1643
1644
// Reports that `segment_num` segment files and/or `inverted_idx_num` index files of the
// given rowset finished downloading (successfully or not, see `status`).
//
// Takes `_meta_lock` for the whole call.
//
// Returns:
//   - {NONE, NONE} when no warm-up state is tracked for `rowset_id`;
//   - the tracked state, untouched, when it was registered by a different trigger source;
//   - otherwise the tracked state after the completion has been accounted for via done().
WarmUpState CloudTablet::complete_rowset_segment_warmup(WarmUpTriggerSource trigger_source,
                                                        RowsetId rowset_id, Status status,
                                                        int64_t segment_num,
                                                        int64_t inverted_idx_num) {
    std::lock_guard wlock(_meta_lock);

    auto entry = _rowset_warm_up_states.find(rowset_id);
    if (entry == _rowset_warm_up_states.end()) {
        return {.trigger_source = WarmUpTriggerSource::NONE, .progress = WarmUpProgress::NONE};
    }

    auto& info = entry->second;
    // Only the trigger source that registered the warm-up is allowed to update its state.
    if (info.state.trigger_source != trigger_source) {
        return info.state;
    }

    VLOG_DEBUG << "complete rowset segment warmup for rowset " << rowset_id << ", " << status;

    const bool download_failed = !status.ok();
    if (segment_num > 0) {
        g_file_cache_warm_up_segment_complete_num << segment_num;
        if (download_failed) {
            g_file_cache_warm_up_segment_failed_num << segment_num;
        }
    }
    if (inverted_idx_num > 0) {
        g_file_cache_warm_up_inverted_idx_complete_num << inverted_idx_num;
        if (download_failed) {
            g_file_cache_warm_up_inverted_idx_failed_num << inverted_idx_num;
        }
    }

    // Failed downloads are still passed to done(), exactly like successful ones.
    info.done(segment_num, inverted_idx_num);
    return info.state;
}
1674
1675
342
bool CloudTablet::is_rowset_warmed_up(const RowsetId& rowset_id) const {
1676
342
    auto it = _rowset_warm_up_states.find(rowset_id);
1677
342
    if (it == _rowset_warm_up_states.end()) {
1678
        // The rowset is not in warmup state, which means the rowset has never been warmed up.
1679
        // This may happen when the upstream BE tried to warm up rowsets on this BE but this BE
1680
        // was restarting so the warmup failed, and _rowset_warm_up_states has no entry for it.
1681
        //
1682
        // Normally the startup_timepoint check in rowset_is_warmed_up_unlocked() would filter out
1683
        // such rowsets (visible_timestamp < startup_timepoint → assumed warmed up). However,
1684
        // compaction-produced rowsets have their visible_timestamp set at rowset builder
1685
        // initialization time rather than the final transaction commit time on meta-service,
1686
        // so their visible_timestamp can be earlier than startup_timepoint, causing the
1687
        // startup_timepoint check to NOT filter them out and reaching here with no warmup entry.
1688
        //
1689
        // If such a rowset is before the cumulative compaction point and base compaction never
1690
        // happens, returning false here would cause the version path algorithm to exclude it,
1691
        // leading to a persistently low path_max_version. With continuous upstream ingestion,
1692
        // the freshness tolerance fallback check would keep triggering, making every query on
1693
        // this tablet fall back to reading all data from remote storage.
1694
        //
1695
        // Returning true (optimistically treating it as warmed up) allows the version path to
1696
        // include it. On cache miss the data is transparently read from remote storage per-segment
1697
        // and cached locally in 1MB blocks, so the problem self-heals through subsequent queries.
1698
7
        g_rowset_warmup_state_missing_count << 1;
1699
7
        LOG_EVERY_N(WARNING, 100) << fmt::format(
1700
1
                "rowset warmup state missing, considering it as warmed up. tablet_id={}, "
1701
1
                "rowset_id={}",
1702
1
                tablet_id(), rowset_id.to_string());
1703
7
        return true;
1704
7
    }
1705
335
    return it->second.state.progress == WarmUpProgress::DONE;
1706
342
}
1707
1708
675
void CloudTablet::add_warmed_up_rowset(const RowsetId& rowset_id) {
1709
675
    _rowset_warm_up_states[rowset_id] = {
1710
675
            .state = {.trigger_source = WarmUpTriggerSource::SYNC_ROWSET,
1711
675
                      .progress = WarmUpProgress::DONE},
1712
675
            .num_segments = 1,
1713
675
            .start_tp = std::chrono::steady_clock::now()};
1714
675
}
1715
1716
93
void CloudTablet::add_not_warmed_up_rowset(const RowsetId& rowset_id) {
1717
93
    _rowset_warm_up_states[rowset_id] = {
1718
93
            .state = {.trigger_source = WarmUpTriggerSource::SYNC_ROWSET,
1719
93
                      .progress = WarmUpProgress::DOING},
1720
93
            .num_segments = 1,
1721
93
            .start_tp = std::chrono::steady_clock::now()};
1722
93
}
1723
1724
bool CloudTablet::_check_rowset_should_be_visible_but_not_warmed_up(
1725
        const RowsetMetaSharedPtr& rs_meta, int64_t path_max_version,
1726
501
        std::chrono::system_clock::time_point freshness_limit_tp) const {
1727
501
    if (rs_meta->version() == Version {0, 1}) {
1728
        // skip rowset[0-1]
1729
26
        return false;
1730
26
    }
1731
475
    bool ret = rs_meta->start_version() > path_max_version &&
1732
475
               rs_meta->visible_timestamp() < freshness_limit_tp;
1733
475
    if (ret && config::read_cluster_cache_opt_verbose_log) {
1734
5
        using namespace std::chrono;
1735
5
        std::time_t t1 = system_clock::to_time_t(rs_meta->visible_timestamp());
1736
5
        std::tm tm1 = *std::localtime(&t1);
1737
5
        std::ostringstream oss1;
1738
5
        oss1 << std::put_time(&tm1, "%Y-%m-%d %H:%M:%S");
1739
1740
5
        std::time_t t2 = system_clock::to_time_t(freshness_limit_tp);
1741
5
        std::tm tm2 = *std::localtime(&t2);
1742
5
        std::ostringstream oss2;
1743
5
        oss2 << std::put_time(&tm2, "%Y-%m-%d %H:%M:%S");
1744
5
        LOG_INFO(
1745
5
                "[verbose] CloudTablet::capture_rs_readers_with_freshness_tolerance, "
1746
5
                "find a rowset which should be visible but not warmed up, tablet_id={}, "
1747
5
                "path_max_version={}, rowset_id={}, version={}, visible_time={}, "
1748
5
                "freshness_limit={}, version_graph={}, rowset_warmup_digest={}",
1749
5
                tablet_id(), path_max_version, rs_meta->rowset_id().to_string(),
1750
5
                rs_meta->version().to_string(), oss1.str(), oss2.str(),
1751
5
                _timestamped_version_tracker.debug_string(), rowset_warmup_digest());
1752
5
    }
1753
475
    return ret;
1754
501
}
1755
1756
void CloudTablet::_submit_segment_download_task(const RowsetSharedPtr& rs,
1757
                                                const StorageResource* storage_resource, int seg_id,
1758
1759
52.6k
                                                int64_t expiration_time) {
1760
    // clang-format off
1761
52.6k
    const auto& rowset_meta = rs->rowset_meta();
1762
52.6k
    auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this());
1763
    // Use rowset_meta->fs() instead of storage_resource->fs to support packed file.
1764
    // RowsetMeta::fs() wraps the underlying FileSystem with PackedFileSystem when
1765
    // packed_slice_locations is not empty, which correctly maps segment file paths
1766
    // to their actual locations within packed files.
1767
52.6k
    auto file_system = rowset_meta->fs();
1768
52.6k
    if (!file_system) {
1769
0
        LOG(WARNING) << "failed to get file system for tablet_id=" << _tablet_meta->tablet_id()
1770
0
                     << ", rowset_id=" << rowset_meta->rowset_id();
1771
0
        return;
1772
0
    }
1773
52.6k
    _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
1774
52.6k
            .path = storage_resource->remote_segment_path(*rowset_meta, seg_id),
1775
52.6k
            .file_size = rs->rowset_meta()->segment_file_size(seg_id),
1776
52.6k
            .file_system = file_system,
1777
52.6k
            .ctx = {
1778
52.6k
                    .expiration_time = expiration_time,
1779
52.6k
                    .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
1780
52.6k
                    .is_warmup = true
1781
52.6k
            },
1782
52.6k
            .download_done {[=](Status st) {
1783
52.6k
                DBUG_EXECUTE_IF("CloudTablet::add_rowsets.download_data.callback.block_compaction_rowset", {
1784
52.6k
                            if (rs->version().second > rs->version().first) {
1785
52.6k
                                auto sleep_time = dp->param<int>("sleep", 3);
1786
52.6k
                                LOG_INFO(
1787
52.6k
                                        "[verbose] block download for rowset={}, "
1788
52.6k
                                        "version={}, sleep={}",
1789
52.6k
                                        rs->rowset_id().to_string(),
1790
52.6k
                                        rs->version().to_string(), sleep_time);
1791
52.6k
                                std::this_thread::sleep_for(
1792
52.6k
                                        std::chrono::seconds(sleep_time));
1793
52.6k
                            }
1794
52.6k
                });
1795
52.6k
                self->complete_rowset_segment_warmup(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), st, 1, 0);
1796
52.6k
                if (!st) {
1797
0
                    LOG_WARNING("add rowset warm up error ").error(st);
1798
0
                }
1799
52.6k
            }},
1800
52.6k
            .tablet_id = _tablet_meta->tablet_id(),
1801
52.6k
    });
1802
    // clang-format on
1803
52.6k
}
1804
1805
void CloudTablet::_submit_inverted_index_download_task(const RowsetSharedPtr& rs,
1806
                                                       const StorageResource* storage_resource,
1807
                                                       const io::Path& idx_path, int64_t idx_size,
1808
5.18k
                                                       int64_t expiration_time) {
1809
    // clang-format off
1810
5.18k
    const auto& rowset_meta = rs->rowset_meta();
1811
5.18k
    auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this());
1812
    // Use rowset_meta->fs() instead of storage_resource->fs to support packed file for idx files.
1813
5.18k
    auto file_system = rowset_meta->fs();
1814
5.18k
    if (!file_system) {
1815
0
        LOG(WARNING) << "failed to get file system for tablet_id=" << _tablet_meta->tablet_id()
1816
0
                     << ", rowset_id=" << rowset_meta->rowset_id();
1817
0
        return;
1818
0
    }
1819
5.18k
    io::DownloadFileMeta meta {
1820
5.18k
            .path = idx_path,
1821
5.18k
            .file_size = idx_size,
1822
5.18k
            .file_system = file_system,
1823
5.18k
            .ctx = {
1824
5.18k
                    .expiration_time = expiration_time,
1825
5.18k
                    .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
1826
5.18k
                    .is_warmup = true
1827
5.18k
            },
1828
5.18k
            .download_done {[=](Status st) {
1829
5.18k
                DBUG_EXECUTE_IF("CloudTablet::add_rowsets.download_idx.callback.block", {
1830
5.18k
                    auto sleep_time = dp->param<int>("sleep", 3);
1831
5.18k
                    LOG_INFO(
1832
5.18k
                            "[verbose] block download for "
1833
5.18k
                            "rowset={}, inverted_idx_file={}, "
1834
5.18k
                            "sleep={}",
1835
5.18k
                            rs->rowset_id().to_string(), idx_path.string(), sleep_time);
1836
5.18k
                    std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
1837
5.18k
                });
1838
5.18k
                self->complete_rowset_segment_warmup(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), st, 0, 1);
1839
5.18k
                if (!st) {
1840
0
                    LOG_WARNING("add rowset warm up error ").error(st);
1841
0
                }
1842
5.18k
            }},
1843
5.18k
            .tablet_id = _tablet_meta->tablet_id(),
1844
5.18k
    };
1845
5.18k
    self->update_rowset_warmup_state_inverted_idx_num_unlocked(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), 1);
1846
5.18k
    _engine.file_cache_block_downloader().submit_download_task(std::move(meta));
1847
5.18k
    g_file_cache_cloud_tablet_submitted_index_num << 1;
1848
5.18k
    g_file_cache_cloud_tablet_submitted_index_size << idx_size;
1849
    // clang-format on
1850
5.18k
}
1851
1852
void CloudTablet::_add_rowsets_directly(std::vector<RowsetSharedPtr>& rowsets,
1853
309k
                                        bool warmup_delta_data) {
1854
#ifdef BE_TEST
1855
    warmup_delta_data = false;
1856
#endif
1857
317k
    for (auto& rs : rowsets) {
1858
317k
        if (warmup_delta_data) {
1859
            // Pre-set encryption algorithm to avoid re-entrant get_tablet() call
1860
            // inside RowsetMeta::fs() which causes SingleFlight deadlock when the
1861
            // tablet is not yet cached (during initial load_tablet).
1862
172k
            rs->rowset_meta()->set_encryption_algorithm(_tablet_meta->encryption_algorithm());
1863
172k
            bool warm_up_state_updated = false;
1864
            // Warmup rowset data in background
1865
224k
            for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
1866
52.6k
                const auto& rowset_meta = rs->rowset_meta();
1867
52.6k
                constexpr int64_t interval = 600; // 10 mins
1868
                // When BE restart and receive the `load_sync` rpc, it will sync all historical rowsets first time.
1869
                // So we need to filter out the old rowsets avoid to download the whole table.
1870
52.6k
                if (warmup_delta_data &&
1871
52.6k
                    ::time(nullptr) - rowset_meta->newest_write_timestamp() >= interval) {
1872
0
                    continue;
1873
0
                }
1874
1875
52.6k
                auto storage_resource = rowset_meta->remote_storage_resource();
1876
52.6k
                if (!storage_resource) {
1877
0
                    LOG(WARNING) << storage_resource.error();
1878
0
                    continue;
1879
0
                }
1880
1881
52.6k
                int64_t expiration_time = _tablet_meta->ttl_seconds();
1882
52.6k
                g_file_cache_cloud_tablet_submitted_segment_num << 1;
1883
52.6k
                if (rs->rowset_meta()->segment_file_size(seg_id) > 0) {
1884
52.6k
                    g_file_cache_cloud_tablet_submitted_segment_size
1885
52.6k
                            << rs->rowset_meta()->segment_file_size(seg_id);
1886
52.6k
                }
1887
52.6k
                if (!warm_up_state_updated) {
1888
52.6k
                    VLOG_DEBUG << "warm up rowset " << rs->version() << "(" << rs->rowset_id()
1889
17
                               << ") triggerd by sync rowset";
1890
52.6k
                    if (!add_rowset_warmup_state_unlocked(*(rs->rowset_meta()),
1891
52.6k
                                                          WarmUpTriggerSource::SYNC_ROWSET)) {
1892
0
                        LOG(INFO) << "found duplicate warmup task for rowset " << rs->rowset_id()
1893
0
                                  << ", skip it";
1894
0
                        break;
1895
0
                    }
1896
52.6k
                    warm_up_state_updated = true;
1897
52.6k
                }
1898
1899
52.6k
                if (!config::file_cache_enable_only_warm_up_idx) {
1900
52.6k
                    _submit_segment_download_task(rs, storage_resource.value(), seg_id,
1901
52.6k
                                                  expiration_time);
1902
52.6k
                }
1903
1904
52.6k
                auto schema_ptr = rowset_meta->tablet_schema();
1905
52.6k
                auto idx_version = schema_ptr->get_inverted_index_storage_format();
1906
52.6k
                if (idx_version == InvertedIndexStorageFormatPB::V1) {
1907
1.16k
                    std::unordered_map<int64_t, int64_t> index_size_map;
1908
1.16k
                    auto&& inverted_index_info = rowset_meta->inverted_index_file_info(seg_id);
1909
1.16k
                    for (const auto& info : inverted_index_info.index_info()) {
1910
434
                        if (info.index_file_size() != -1) {
1911
434
                            index_size_map[info.index_id()] = info.index_file_size();
1912
434
                        } else {
1913
0
                            VLOG_DEBUG << "Invalid index_file_size for segment_id " << seg_id
1914
0
                                       << ", index_id " << info.index_id();
1915
0
                        }
1916
434
                    }
1917
1.16k
                    for (const auto& index : schema_ptr->inverted_indexes()) {
1918
434
                        auto idx_path = storage_resource.value()->remote_idx_v1_path(
1919
434
                                *rowset_meta, seg_id, index->index_id(), index->get_index_suffix());
1920
434
                        _submit_inverted_index_download_task(rs, storage_resource.value(), idx_path,
1921
434
                                                             index_size_map[index->index_id()],
1922
434
                                                             expiration_time);
1923
434
                    }
1924
51.4k
                } else {
1925
51.4k
                    if (schema_ptr->has_inverted_index() || schema_ptr->has_ann_index()) {
1926
4.75k
                        auto&& inverted_index_info = rowset_meta->inverted_index_file_info(seg_id);
1927
4.75k
                        int64_t idx_size = 0;
1928
4.75k
                        if (inverted_index_info.has_index_size()) {
1929
4.71k
                            idx_size = inverted_index_info.index_size();
1930
4.71k
                        } else {
1931
33
                            VLOG_DEBUG << "index_size is not set for segment " << seg_id;
1932
33
                        }
1933
4.75k
                        auto idx_path =
1934
4.75k
                                storage_resource.value()->remote_idx_v2_path(*rowset_meta, seg_id);
1935
4.75k
                        _submit_inverted_index_download_task(rs, storage_resource.value(), idx_path,
1936
4.75k
                                                             idx_size, expiration_time);
1937
4.75k
                    }
1938
51.4k
                }
1939
52.6k
            }
1940
172k
        }
1941
317k
        _rs_version_map.emplace(rs->version(), rs);
1942
317k
        _timestamped_version_tracker.add_version(rs->version());
1943
317k
        _max_version = std::max(rs->end_version(), _max_version);
1944
317k
        update_base_size(*rs);
1945
317k
    }
1946
309k
    _tablet_meta->add_rowsets_unchecked(rowsets);
1947
309k
}
1948
1949
789k
void CloudTablet::clear_unused_visible_pending_rowsets() {
1950
789k
    int64_t cur_max_version = max_version().second;
1951
789k
    int32_t max_version_count = max_version_config();
1952
789k
    int64_t current_time = std::chrono::duration_cast<std::chrono::seconds>(
1953
789k
                                   std::chrono::system_clock::now().time_since_epoch())
1954
789k
                                   .count();
1955
1956
789k
    std::unique_lock<std::mutex> wlock(_visible_pending_rs_lock);
1957
912k
    for (auto it = _visible_pending_rs_map.begin(); it != _visible_pending_rs_map.end();) {
1958
122k
        if (int64_t version = it->first, expiration_time = it->second.expiration_time;
1959
122k
            version <= cur_max_version || expiration_time < current_time) {
1960
122k
            it = _visible_pending_rs_map.erase(it);
1961
122k
        } else {
1962
13
            ++it;
1963
13
        }
1964
122k
    }
1965
1966
789k
    while (!_visible_pending_rs_map.empty() && _visible_pending_rs_map.size() > max_version_count) {
1967
0
        _visible_pending_rs_map.erase(--_visible_pending_rs_map.end());
1968
0
    }
1969
789k
}
1970
1971
void CloudTablet::try_make_committed_rs_visible(int64_t txn_id, int64_t visible_version,
1972
174k
                                                int64_t version_update_time_ms) {
1973
174k
    if (enable_unique_key_merge_on_write()) {
1974
        // for mow tablet, we get committed rowset from `CloudTxnDeleteBitmapCache` rather than `CommittedRowsetManager`
1975
51.6k
        try_make_committed_rs_visible_for_mow(txn_id, visible_version, version_update_time_ms);
1976
51.6k
        return;
1977
51.6k
    }
1978
1979
122k
    auto& committed_rs_mgr = _engine.committed_rs_mgr();
1980
122k
    auto res = committed_rs_mgr.get_committed_rowset(txn_id, tablet_id());
1981
122k
    if (!res.has_value()) {
1982
0
        return;
1983
0
    }
1984
122k
    auto [rowset_meta, expiration_time] = res.value();
1985
122k
    bool is_empty_rowset = (rowset_meta == nullptr);
1986
122k
    if (!is_empty_rowset) {
1987
35.7k
        rowset_meta->set_cloud_fields_after_visible(visible_version, version_update_time_ms);
1988
35.7k
    }
1989
122k
    {
1990
122k
        std::lock_guard<std::mutex> lock(_visible_pending_rs_lock);
1991
122k
        _visible_pending_rs_map.emplace(
1992
122k
                visible_version,
1993
122k
                VisiblePendingRowset {rowset_meta, expiration_time, is_empty_rowset});
1994
122k
    }
1995
122k
    apply_visible_pending_rowsets();
1996
122k
    committed_rs_mgr.remove_committed_rowset(txn_id, tablet_id());
1997
122k
}
1998
1999
void CloudTablet::try_make_committed_rs_visible_for_mow(int64_t txn_id, int64_t visible_version,
2000
51.6k
                                                        int64_t version_update_time_ms) {
2001
51.6k
    Defer defer {[&] {
2002
51.6k
        _engine.txn_delete_bitmap_cache().remove_unused_tablet_txn_info(txn_id, tablet_id());
2003
51.6k
    }};
2004
51.6k
    auto res = _engine.txn_delete_bitmap_cache().get_rowset_and_delete_bitmap(txn_id, tablet_id());
2005
51.6k
    if (!res.has_value()) {
2006
125
        return;
2007
125
    }
2008
51.5k
    auto [rowset, delete_bitmap] = res.value();
2009
51.5k
    bool is_empty_rowset = (rowset == nullptr);
2010
51.5k
    {
2011
51.5k
        std::unique_lock lock {_sync_meta_lock};
2012
51.5k
        std::unique_lock meta_wlock {_meta_lock};
2013
51.5k
        if (_max_version + 1 != visible_version) {
2014
558
            return;
2015
558
        }
2016
50.9k
        if (is_empty_rowset) {
2017
30.5k
            Versions existing_versions;
2018
218k
            for (const auto& [_, rs] : tablet_meta()->all_rs_metas()) {
2019
218k
                existing_versions.emplace_back(rs->version());
2020
218k
            }
2021
30.5k
            if (existing_versions.empty()) {
2022
0
                return;
2023
0
            }
2024
30.5k
            auto max_version = std::ranges::max(existing_versions, {}, &Version::first);
2025
30.5k
            auto prev_rowset = get_rowset_by_version(max_version);
2026
30.5k
            auto st = _engine.meta_mgr().create_empty_rowset_for_hole(
2027
30.5k
                    this, visible_version, prev_rowset->rowset_meta(), &rowset);
2028
30.5k
            if (!st.ok()) {
2029
0
                return;
2030
0
            }
2031
30.5k
        } else {
2032
186k
            for (const auto& [delete_bitmap_key, bitmap_value] : delete_bitmap->delete_bitmap) {
2033
                // skip sentinel mark, which is used for delete bitmap correctness check
2034
186k
                if (std::get<1>(delete_bitmap_key) != DeleteBitmap::INVALID_SEGMENT_ID) {
2035
7.78k
                    tablet_meta()->delete_bitmap().merge(
2036
7.78k
                            {std::get<0>(delete_bitmap_key), std::get<1>(delete_bitmap_key),
2037
7.78k
                             visible_version},
2038
7.78k
                            bitmap_value);
2039
7.78k
                }
2040
186k
            }
2041
20.3k
        }
2042
50.9k
        rowset->rowset_meta()->set_cloud_fields_after_visible(visible_version,
2043
50.9k
                                                              version_update_time_ms);
2044
50.9k
        add_rowsets({rowset}, false, meta_wlock, true);
2045
50.9k
    }
2046
50.9k
    LOG(INFO) << "mow added visible pending rowset, txn_id=" << txn_id
2047
50.9k
              << ", tablet_id=" << tablet_id() << ", version=" << visible_version
2048
50.9k
              << ", rowset_id=" << rowset->rowset_id().to_string();
2049
50.9k
}
2050
2051
122k
void CloudTablet::apply_visible_pending_rowsets() {
2052
122k
    Defer defer {[&] { clear_unused_visible_pending_rowsets(); }};
2053
2054
122k
    std::unique_lock lock(_sync_meta_lock);
2055
122k
    std::unique_lock meta_wlock(_meta_lock);
2056
122k
    int64_t next_version = _max_version + 1;
2057
122k
    std::vector<RowsetSharedPtr> to_add;
2058
122k
    std::lock_guard<std::mutex> pending_lock(_visible_pending_rs_lock);
2059
122k
    for (auto it = _visible_pending_rs_map.upper_bound(_max_version);
2060
243k
         it != _visible_pending_rs_map.end(); ++it) {
2061
121k
        int64_t version = it->first;
2062
121k
        if (version != next_version) break;
2063
2064
121k
        auto& pending_rs = it->second;
2065
121k
        if (pending_rs.is_empty_rowset) {
2066
85.8k
            RowsetSharedPtr prev_rowset {nullptr};
2067
85.8k
            if (!to_add.empty()) {
2068
1
                prev_rowset = to_add.back();
2069
85.8k
            } else {
2070
85.8k
                Versions existing_versions;
2071
306k
                for (const auto& [_, rs] : tablet_meta()->all_rs_metas()) {
2072
306k
                    existing_versions.emplace_back(rs->version());
2073
306k
                }
2074
85.8k
                if (existing_versions.empty()) {
2075
1
                    break;
2076
1
                }
2077
85.8k
                auto max_version = std::ranges::max(existing_versions, {}, &Version::first);
2078
85.8k
                prev_rowset = get_rowset_by_version(max_version);
2079
85.8k
            }
2080
85.8k
            RowsetSharedPtr rowset;
2081
85.8k
            auto st = _engine.meta_mgr().create_empty_rowset_for_hole(
2082
85.8k
                    this, version, prev_rowset->rowset_meta(), &rowset);
2083
85.8k
            if (!st.ok()) {
2084
0
                return;
2085
0
            }
2086
85.8k
            to_add.push_back(std::move(rowset));
2087
85.8k
        } else {
2088
35.4k
            RowsetSharedPtr rowset;
2089
35.4k
            auto st = RowsetFactory::create_rowset(nullptr, "", pending_rs.rowset_meta, &rowset);
2090
35.4k
            if (!st.ok()) {
2091
0
                LOG(WARNING) << "failed to create rowset from pending rowset meta, tablet_id="
2092
0
                             << tablet_id() << ", version=" << version
2093
0
                             << ", rowset_id=" << pending_rs.rowset_meta->rowset_id().to_string()
2094
0
                             << ", error=" << st;
2095
0
                break;
2096
0
            }
2097
35.4k
            to_add.push_back(std::move(rowset));
2098
35.4k
        }
2099
121k
        next_version++;
2100
121k
    }
2101
122k
    if (!to_add.empty()) {
2102
121k
        add_rowsets(to_add, false, meta_wlock, true);
2103
121k
        LOG_INFO(
2104
121k
                "applied_visible_pending_rowsets, tablet_id={}, new_max_version={}, "
2105
121k
                "count={}, new_rowsets={}",
2106
121k
                tablet_id(), _max_version, to_add.size(),
2107
121k
                fmt::join(to_add | std::views::transform([](const RowsetSharedPtr& rs) {
2108
121k
                              return fmt::format("{}{}", rs->rowset_id().to_string(),
2109
121k
                                                 rs->version().to_string());
2110
121k
                          }),
2111
121k
                          ","));
2112
121k
    }
2113
122k
}
2114
2115
} // namespace doris