Coverage Report

Created: 2026-03-13 03:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_tablet.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_tablet.h"
19
20
#include <bvar/bvar.h>
21
#include <bvar/latency_recorder.h>
22
#include <gen_cpp/Types_types.h>
23
#include <gen_cpp/olap_file.pb.h>
24
#include <rapidjson/document.h>
25
#include <rapidjson/encodings.h>
26
#include <rapidjson/prettywriter.h>
27
#include <rapidjson/rapidjson.h>
28
#include <rapidjson/stringbuffer.h>
29
30
#include <atomic>
31
#include <chrono>
32
#include <cstdint>
33
#include <memory>
34
#include <ranges>
35
#include <ratio>
36
#include <shared_mutex>
37
#include <unordered_map>
38
#include <vector>
39
40
#include "cloud/cloud_meta_mgr.h"
41
#include "cloud/cloud_storage_engine.h"
42
#include "cloud/cloud_tablet_mgr.h"
43
#include "cloud/cloud_warm_up_manager.h"
44
#include "common/cast_set.h"
45
#include "common/config.h"
46
#include "common/logging.h"
47
#include "cpp/sync_point.h"
48
#include "io/cache/block_file_cache_downloader.h"
49
#include "io/cache/block_file_cache_factory.h"
50
#include "storage/compaction/compaction.h"
51
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
52
#include "storage/index/inverted/inverted_index_desc.h"
53
#include "storage/olap_define.h"
54
#include "storage/rowset/beta_rowset.h"
55
#include "storage/rowset/rowset.h"
56
#include "storage/rowset/rowset_factory.h"
57
#include "storage/rowset/rowset_fwd.h"
58
#include "storage/rowset/rowset_writer.h"
59
#include "storage/storage_policy.h"
60
#include "storage/tablet/base_tablet.h"
61
#include "storage/tablet/tablet_schema.h"
62
#include "storage/txn/txn_manager.h"
63
#include "util/debug_points.h"
64
#include "util/stack_util.h"
65
66
namespace doris {
67
#include "common/compile_check_begin.h"
68
using namespace ErrorCode;
69
70
// ---------------------------------------------------------------------------
// bvar metrics defined by this translation unit. The exposed metric name is
// the string passed to each constructor.
// ---------------------------------------------------------------------------

// Latency recorders named after delete-bitmap lock acquisition in cumulative / base compaction.
bvar::LatencyRecorder g_cu_compaction_get_delete_bitmap_lock_time_ms(
        "cu_compaction_get_delete_bitmap_lock_time_ms");
bvar::LatencyRecorder g_base_compaction_get_delete_bitmap_lock_time_ms(
        "base_compaction_get_delete_bitmap_lock_time_ms");

// Number / total disk size of rowsets currently parked in CloudTablet::_unused_rowsets.
bvar::Adder<int64_t> g_unused_rowsets_count("unused_rowsets_count");
bvar::Adder<int64_t> g_unused_rowsets_bytes("unused_rowsets_bytes");

// Counters for the cache-aware version capture strategies.
bvar::Adder<int64_t> g_capture_prefer_cache_count("capture_prefer_cache_count");
bvar::Adder<int64_t> g_capture_with_freshness_tolerance_count(
        "capture_with_freshness_tolerance_count");
bvar::Adder<int64_t> g_capture_with_freshness_tolerance_fallback_count(
        "capture_with_freshness_tolerance_fallback_count");

// Windowed views over the counters above (window size 30; presumably seconds per
// bvar's Window API -- TODO confirm).
bvar::Window<bvar::Adder<int64_t>> g_capture_prefer_cache_count_window(
        "capture_prefer_cache_count_window", &g_capture_prefer_cache_count, 30);
bvar::Window<bvar::Adder<int64_t>> g_capture_with_freshness_tolerance_count_window(
        "capture_with_freshness_tolerance_count_window",
        &g_capture_with_freshness_tolerance_count, 30);
bvar::Window<bvar::Adder<int64_t>> g_capture_with_freshness_tolerance_fallback_count_window(
        "capture_with_freshness_tolerance_fallback_count_window",
        &g_capture_with_freshness_tolerance_fallback_count, 30);

// NOTE(review): not referenced in the portion of the file visible here.
static constexpr int LOAD_INITIATOR_ID = -1;

// File cache: data submitted for download / finished downloading for cloud tablets.
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_size(
        "file_cache_cloud_tablet_submitted_segment_size");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_num(
        "file_cache_cloud_tablet_submitted_segment_num");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_index_size(
        "file_cache_cloud_tablet_submitted_index_size");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_index_num(
        "file_cache_cloud_tablet_submitted_index_num");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_segment_size(
        "file_cache_cloud_tablet_finished_segment_size");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_segment_num(
        "file_cache_cloud_tablet_finished_segment_num");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_index_size(
        "file_cache_cloud_tablet_finished_index_size");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_index_num(
        "file_cache_cloud_tablet_finished_index_num");

// File cache: segments / bytes / index files whose cached data has been recycled.
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_segment_num(
        "file_cache_recycle_cached_data_segment_num");
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_segment_size(
        "file_cache_recycle_cached_data_segment_size");
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_index_num(
        "file_cache_recycle_cached_data_index_num");

// File cache warm-up outcome counters and latency.
bvar::Adder<uint64_t> g_file_cache_warm_up_segment_complete_num(
        "file_cache_warm_up_segment_complete_num");
bvar::Adder<uint64_t> g_file_cache_warm_up_segment_failed_num(
        "file_cache_warm_up_segment_failed_num");
bvar::Adder<uint64_t> g_file_cache_warm_up_inverted_idx_complete_num(
        "file_cache_warm_up_inverted_idx_complete_num");
bvar::Adder<uint64_t> g_file_cache_warm_up_inverted_idx_failed_num(
        "file_cache_warm_up_inverted_idx_failed_num");
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_complete_num(
        "file_cache_warm_up_rowset_complete_num");
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_job_num(
        "file_cache_warm_up_rowset_triggered_by_job_num");
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num(
        "file_cache_warm_up_rowset_triggered_by_sync_rowset_num");
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_event_driven_num(
        "file_cache_warm_up_rowset_triggered_by_event_driven_num");
bvar::LatencyRecorder g_file_cache_warm_up_rowset_all_segments_latency(
        "file_cache_warm_up_rowset_all_segments_latency");
136
137
// Builds a cloud tablet on top of BaseTablet: the tablet meta is handed over to
// the base class, and a reference to the owning storage engine is kept in `_engine`.
CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
        : BaseTablet(std::move(tablet_meta)), _engine(engine) {}
139
140
34.5k
// Defaulted destructor, defined out of line in this translation unit.
CloudTablet::~CloudTablet() = default;
141
142
0
bool CloudTablet::exceed_version_limit(int32_t limit) {
143
0
    return _approximate_num_rowsets.load(std::memory_order_relaxed) > limit;
144
0
}
145
146
12.3k
// This implementation always reports an empty path.
std::string CloudTablet::tablet_path() const {
    return std::string {};
}
149
150
// Captures rowset readers covering `spec_version` into `*rs_splits` while
// holding `_meta_lock` in shared mode.
//
// Only `opts.skip_missing_versions` is forwarded to the unlocked helper; every
// other field of `opts` is left at its default.
// Returns the helper's error on failure, Status::OK() otherwise.
Status CloudTablet::capture_rs_readers(const Version& spec_version,
                                       std::vector<RowSetSplits>* rs_splits,
                                       const CaptureRowsetOps& opts) {
    // Debug point: lets tests inject a -230 error.
    DBUG_EXECUTE_IF("CloudTablet.capture_rs_readers.return.e-230", {
        LOG_WARNING("CloudTablet.capture_rs_readers.return e-230").tag("tablet_id", tablet_id());
        return Status::Error<false>(-230, "injected error");
    });

    const CaptureRowsetOps forwarded_opts {.skip_missing_versions = opts.skip_missing_versions};

    std::shared_lock meta_rlock(_meta_lock);
    auto captured = DORIS_TRY(capture_rs_readers_unlocked(spec_version, forwarded_opts));
    *rs_splits = std::move(captured);
    return Status::OK();
}
162
163
// Chooses the version-capture strategy for `version_range`:
//   1. a positive `query_freshness_tolerance_ms` selects the freshness-tolerance capture;
//   2. otherwise, `enable_prefer_cached_rowset` on a non merge-on-write tablet selects
//      the prefer-cache capture;
//   3. otherwise the BaseTablet implementation is used.
[[nodiscard]] Result<std::vector<Version>> CloudTablet::capture_consistent_versions_unlocked(
        const Version& version_range, const CaptureRowsetOps& options) const {
    if (options.query_freshness_tolerance_ms > 0) {
        return capture_versions_with_freshness_tolerance(version_range, options);
    }

    const bool prefer_cache =
            options.enable_prefer_cached_rowset && !enable_unique_key_merge_on_write();
    if (prefer_cache) {
        return capture_versions_prefer_cache(version_range);
    }

    return BaseTablet::capture_consistent_versions_unlocked(version_range, options);
}
172
173
// Captures a consistent version path for `spec_version`, asking the version
// tracker to prefer rowsets that `rowset_is_warmed_up_unlocked` reports as warmed up.
//
// Takes `_meta_lock` in shared mode for the whole call. Returns the tracker's
// error wrapped in ResultError on failure, otherwise the captured path.
Result<std::vector<Version>> CloudTablet::capture_versions_prefer_cache(
        const Version& spec_version) const {
    g_capture_prefer_cache_count << 1;

    Versions captured_path;
    std::shared_lock meta_rlock(_meta_lock);

    auto is_warmed_up = [&](int64_t start, int64_t end) {
        return rowset_is_warmed_up_unlocked(start, end);
    };
    auto status = _timestamped_version_tracker.capture_consistent_versions_prefer_cache(
            spec_version, captured_path, is_warmed_up);
    if (!status.ok()) {
        return ResultError(status);
    }

    // NOTE(review): assumes a successful capture never yields an empty path.
    int64_t path_max_version = captured_path.back().second;
    VLOG_DEBUG << fmt::format(
            "[verbose] CloudTablet::capture_versions_prefer_cache, capture path: {}, "
            "tablet_id={}, spec_version={}, path_max_version={}",
            fmt::join(captured_path | std::views::transform([](const auto& version) {
                          return fmt::format("{}", version.to_string());
                      }),
                      ", "),
            tablet_id(), spec_version.to_string(), path_max_version);
    return captured_path;
}
195
196
260
bool CloudTablet::rowset_is_warmed_up_unlocked(int64_t start_version, int64_t end_version) const {
197
260
    if (start_version > end_version) {
198
0
        return false;
199
0
    }
200
260
    Version version {start_version, end_version};
201
260
    auto it = _rs_version_map.find(version);
202
260
    if (it == _rs_version_map.end()) {
203
78
        it = _stale_rs_version_map.find(version);
204
78
        if (it == _stale_rs_version_map.end()) {
205
0
            LOG_WARNING(
206
0
                    "fail to find Rowset in rs_version or stale_rs_version for version. "
207
0
                    "tablet={}, version={}",
208
0
                    tablet_id(), version.to_string());
209
0
            return false;
210
0
        }
211
78
    }
212
260
    const auto& rs = it->second;
213
260
    if (rs->visible_timestamp() < _engine.startup_timepoint()) {
214
        // We only care about rowsets that are created after startup time point. For other rowsets,
215
        // we assume they are warmed up.
216
16
        return true;
217
16
    }
218
244
    return is_rowset_warmed_up(rs->rowset_id());
219
260
};
220
221
// Captures a version path for `spec_version` in which every rowset is warmed up,
// as long as doing so stays within `options.query_freshness_tolerance_ms`.
//
// If any rowset (active or stale) makes
// `_check_rowset_should_be_visible_but_not_warmed_up` return true for the captured
// path's max version and the freshness limit, the function gives up on the
// warmed-up path and falls back to BaseTablet's regular capture.
Result<std::vector<Version>> CloudTablet::capture_versions_with_freshness_tolerance(
        const Version& spec_version, const CaptureRowsetOps& options) const {
    g_capture_with_freshness_tolerance_count << 1;

    const auto tolerance_ms = options.query_freshness_tolerance_ms;
    const auto freshness_limit_tp =
            std::chrono::system_clock::now() - std::chrono::milliseconds(tolerance_ms);

    // Find a version path where every edge (rowset) has been warmed up.
    Versions version_path;
    std::shared_lock rlock(_meta_lock);
    auto is_warmed_up = [&](int64_t start, int64_t end) {
        return rowset_is_warmed_up_unlocked(start, end);
    };
    if (enable_unique_key_merge_on_write()) {
        // For merge-on-write tables, newly generated delete bitmap marks land on the rowsets
        // of the newest data layout, so only rowsets in that layout can be captured.
        // Otherwise there may be data correctness issues.
        RETURN_IF_ERROR_RESULT(
                _timestamped_version_tracker.capture_consistent_versions_with_validator_mow(
                        spec_version, version_path, is_warmed_up));
    } else {
        RETURN_IF_ERROR_RESULT(
                _timestamped_version_tracker.capture_consistent_versions_with_validator(
                        spec_version, version_path, is_warmed_up));
    }

    // NOTE(review): assumes a successful capture never yields an empty path.
    int64_t path_max_version = version_path.back().second;

    // Both rowset-meta maps are scanned with the same predicate.
    // (std::views::concat would merge the two scans once C++26 is available.)
    auto visible_but_cold = [this, path_max_version, freshness_limit_tp](const auto& rs_meta) {
        return _check_rowset_should_be_visible_but_not_warmed_up(rs_meta, path_max_version,
                                                                 freshness_limit_tp);
    };
    bool should_fallback =
            std::ranges::any_of(std::views::values(_tablet_meta->all_rs_metas()),
                                visible_but_cold) ||
            std::ranges::any_of(std::views::values(_tablet_meta->all_stale_rs_metas()),
                                visible_but_cold);
    if (should_fallback) {
        rlock.unlock();
        g_capture_with_freshness_tolerance_fallback_count << 1;
        // A rowset that satisfies the freshness tolerance and starts after the path's max
        // version has not been warmed up yet: capture rowsets the usual way instead.
        return BaseTablet::capture_consistent_versions_unlocked(spec_version, options);
    }

    VLOG_DEBUG << fmt::format(
            "[verbose] CloudTablet::capture_versions_with_freshness_tolerance, capture path: {}, "
            "tablet_id={}, spec_version={}, path_max_version={}",
            fmt::join(version_path | std::views::transform([](const auto& version) {
                          return fmt::format("{}", version.to_string());
                      }),
                      ", "),
            tablet_id(), spec_version.to_string(), path_max_version);
    return version_path;
}
271
272
// There are only two tablet_states RUNNING and NOT_READY in cloud mode
273
// This function will erase the tablet from `CloudTabletMgr` when it can't find this tablet in MS.
274
1.38M
// Synchronizes this tablet's rowsets through the meta manager.
//
// When `options.query_version` is positive and the local `_max_version` already
// covers it, the call returns immediately. The check is made once without and
// once with `_sync_meta_lock` held, so concurrent callers that were waiting on
// the lock can skip a sync that another caller has just completed.
// If the sync reports NOT_FOUND, the tablet's cache is cleared via clear_cache().
Status CloudTablet::sync_rowsets(const SyncOptions& options, SyncRowsetStats* stats) {
    RETURN_IF_ERROR(sync_if_not_running(stats));

    // True when the caller asked for a specific version and we already have it.
    // `_meta_lock` is only held (shared) for the duration of the comparison.
    auto query_version_reached = [&]() {
        if (options.query_version > 0) {
            std::shared_lock rlock(_meta_lock);
            if (_max_version >= options.query_version) {
                return true;
            }
        }
        return false;
    };

    if (query_version_reached()) {
        return Status::OK();
    }

    // serially execute sync to reduce unnecessary network overhead
    std::unique_lock lock(_sync_meta_lock);
    if (query_version_reached()) {
        return Status::OK();
    }

    auto sync_status = _engine.meta_mgr().sync_tablet_rowsets_unlocked(this, lock, options, stats);
    if (sync_status.is<ErrorCode::NOT_FOUND>()) {
        clear_cache();
    }
    return sync_status;
}
300
301
// Sync tablet meta and all rowset meta if not running.
302
// This could happen when BE didn't finish schema change job and another BE committed this schema change job.
303
// It should be a quite rare situation.
304
1.38M
// Brings a tablet that is not in TABLET_RUNNING state up to date.
//
// Steps, all serialized by `_sync_meta_lock`:
//   1. return early if the tablet is (or has meanwhile become) RUNNING;
//   2. fetch the tablet meta from the meta manager; on NOT_FOUND clear the cache;
//   3. if the fetched meta is still not RUNNING, return OK without changes;
//   4. otherwise switch the local state to RUNNING, drop all locally known
//      rowsets / versions, and re-sync rowsets from scratch.
Status CloudTablet::sync_if_not_running(SyncRowsetStats* stats) {
    if (tablet_state() == TABLET_RUNNING) {
        return Status::OK();
    }

    // Serially execute sync to reduce unnecessary network overhead
    std::unique_lock lock(_sync_meta_lock);

    // Re-check under the sync lock: another thread may have finished the sync already.
    {
        std::shared_lock rlock(_meta_lock);
        if (tablet_state() == TABLET_RUNNING) {
            return Status::OK();
        }
    }

    TabletMetaSharedPtr fetched_meta;
    auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &fetched_meta);
    if (!st.ok()) {
        if (st.is<ErrorCode::NOT_FOUND>()) {
            clear_cache();
        }
        return st;
    }

    if (fetched_meta->tablet_state() != TABLET_RUNNING) [[unlikely]] {
        // MoW may go to here when load while schema change
        return Status::OK();
    }

    // Declared outside the locked scope: after the swap below it owns the old
    // tracker, which is therefore destroyed only after `_meta_lock` is released.
    TimestampedVersionTracker previous_tracker;
    {
        std::lock_guard wlock(_meta_lock);
        RETURN_IF_ERROR(set_tablet_state(TABLET_RUNNING));
        _rs_version_map.clear();
        _stale_rs_version_map.clear();
        std::swap(_timestamped_version_tracker, previous_tracker);
        _tablet_meta->clear_rowsets();
        _tablet_meta->clear_stale_rowset();
        _max_version = -1;
    }

    st = _engine.meta_mgr().sync_tablet_rowsets_unlocked(this, lock, {}, stats);
    if (st.is<ErrorCode::NOT_FOUND>()) {
        clear_cache();
    }
    return st;
}
351
352
void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_overlap,
353
                              std::unique_lock<std::shared_mutex>& meta_lock,
354
296k
                              bool warmup_delta_data) {
355
296k
    if (to_add.empty()) {
356
6.09k
        return;
357
6.09k
    }
358
359
18.4E
    VLOG_DEBUG << "add_rowsets tablet_id=" << tablet_id() << " stack: " << get_stack_trace();
360
361
290k
    if (!version_overlap) {
362
282k
        _add_rowsets_directly(to_add, warmup_delta_data);
363
282k
        return;
364
282k
    }
365
366
    // Filter out existed rowsets
367
7.20k
    auto remove_it =
368
11.3k
            std::remove_if(to_add.begin(), to_add.end(), [this](const RowsetSharedPtr& rs) {
369
11.3k
                if (auto find_it = _rs_version_map.find(rs->version());
370
11.3k
                    find_it == _rs_version_map.end()) {
371
10.7k
                    return false;
372
10.7k
                } else if (find_it->second->rowset_id() == rs->rowset_id()) {
373
47
                    return true; // Same rowset
374
47
                }
375
376
                // If version of rowset in `to_add` is equal to rowset in tablet but rowset_id is not equal,
377
                // replace existed rowset with `to_add` rowset. This may occur when:
378
                //  1. schema change converts rowsets which have been double written to new tablet
379
                //  2. cumu compaction picks single overlapping input rowset to perform compaction
380
381
                // add existed rowset to unused_rowsets to remove delete bitmap and recycle cached data
382
383
502
                std::vector<RowsetSharedPtr> unused_rowsets;
384
502
                if (auto find_it = _rs_version_map.find(rs->version());
385
505
                    find_it != _rs_version_map.end()) {
386
505
                    if (find_it->second->rowset_id() == rs->rowset_id()) {
387
0
                        LOG(WARNING) << "tablet_id=" << tablet_id()
388
0
                                     << ", rowset_id=" << rs->rowset_id().to_string()
389
0
                                     << ", existed rowset_id="
390
0
                                     << find_it->second->rowset_id().to_string();
391
0
                        DCHECK(find_it->second->rowset_id() != rs->rowset_id())
392
0
                                << "tablet_id=" << tablet_id()
393
0
                                << ", rowset_id=" << rs->rowset_id().to_string()
394
0
                                << ", existed rowset_id="
395
0
                                << find_it->second->rowset_id().to_string();
396
0
                    }
397
505
                    unused_rowsets.push_back(find_it->second);
398
505
                }
399
502
                add_unused_rowsets(unused_rowsets);
400
401
502
                _tablet_meta->delete_rs_meta_by_version(rs->version(), nullptr);
402
502
                _rs_version_map[rs->version()] = rs;
403
502
                _tablet_meta->add_rowsets_unchecked({rs});
404
502
                update_base_size(*rs);
405
502
                return true;
406
11.3k
            });
407
408
7.20k
    to_add.erase(remove_it, to_add.end());
409
410
    // delete rowsets with overlapped version
411
7.20k
    std::vector<RowsetSharedPtr> to_add_directly;
412
10.7k
    for (auto& to_add_rs : to_add) {
413
        // delete rowsets with overlapped version
414
10.7k
        std::vector<RowsetSharedPtr> to_delete;
415
10.7k
        Version to_add_v = to_add_rs->version();
416
        // if start_version  > max_version, we can skip checking overlap here.
417
10.7k
        if (to_add_v.first > _max_version) {
418
            // if start_version  > max_version, we can skip checking overlap here.
419
10.7k
            to_add_directly.push_back(to_add_rs);
420
10.7k
        } else {
421
51
            to_add_directly.push_back(to_add_rs);
422
203
            for (auto& [v, rs] : _rs_version_map) {
423
203
                if (to_add_v.contains(v)) {
424
0
                    to_delete.push_back(rs);
425
0
                }
426
203
            }
427
51
            delete_rowsets(to_delete, meta_lock);
428
51
        }
429
10.7k
    }
430
431
7.20k
    _add_rowsets_directly(to_add_directly, warmup_delta_data);
432
7.20k
}
433
434
void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
435
9.67k
                                 std::unique_lock<std::shared_mutex>&) {
436
9.67k
    if (to_delete.empty()) {
437
52
        return;
438
52
    }
439
9.62k
    std::vector<RowsetMetaSharedPtr> rs_metas;
440
9.62k
    rs_metas.reserve(to_delete.size());
441
9.62k
    int64_t now = ::time(nullptr);
442
75.3k
    for (auto&& rs : to_delete) {
443
75.3k
        rs->rowset_meta()->set_stale_at(now);
444
75.3k
        rs_metas.push_back(rs->rowset_meta());
445
75.3k
        _stale_rs_version_map[rs->version()] = rs;
446
75.3k
    }
447
9.62k
    _timestamped_version_tracker.add_stale_path_version(rs_metas);
448
75.2k
    for (auto&& rs : to_delete) {
449
75.2k
        _rs_version_map.erase(rs->version());
450
75.2k
    }
451
452
9.62k
    _tablet_meta->modify_rs_metas({}, rs_metas, false);
453
9.62k
}
454
455
9.47k
uint64_t CloudTablet::delete_expired_stale_rowsets() {
456
9.47k
    if (config::enable_mow_verbose_log) {
457
0
        LOG_INFO("begin delete_expired_stale_rowset for tablet={}", tablet_id());
458
0
    }
459
9.47k
    std::vector<RowsetSharedPtr> expired_rowsets;
460
    // ATTN: trick, Use stale_rowsets to temporarily increase the reference count of the rowset shared pointer in _stale_rs_version_map so that in the recycle_cached_data function, it checks if the reference count is 2.
461
9.47k
    std::vector<std::pair<Version, std::vector<RowsetSharedPtr>>> deleted_stale_rowsets;
462
9.47k
    int64_t expired_stale_sweep_endtime =
463
9.47k
            ::time(nullptr) - config::tablet_rowset_stale_sweep_time_sec;
464
9.47k
    {
465
9.47k
        std::unique_lock wlock(_meta_lock);
466
467
9.47k
        std::vector<int64_t> path_ids;
468
        // capture the path version to delete
469
9.47k
        _timestamped_version_tracker.capture_expired_paths(expired_stale_sweep_endtime, &path_ids);
470
471
9.47k
        if (path_ids.empty()) {
472
4.70k
            return 0;
473
4.70k
        }
474
475
9.37k
        for (int64_t path_id : path_ids) {
476
9.37k
            int64_t start_version = -1;
477
9.37k
            int64_t end_version = -1;
478
9.37k
            std::vector<RowsetSharedPtr> stale_rowsets;
479
            // delete stale versions in version graph
480
9.37k
            auto version_path = _timestamped_version_tracker.fetch_and_delete_path_by_id(path_id);
481
73.8k
            for (auto& v_ts : version_path->timestamped_versions()) {
482
73.8k
                auto rs_it = _stale_rs_version_map.find(v_ts->version());
483
73.8k
                if (rs_it != _stale_rs_version_map.end()) {
484
73.8k
                    expired_rowsets.push_back(rs_it->second);
485
73.8k
                    stale_rowsets.push_back(rs_it->second);
486
73.8k
                    VLOG_DEBUG << "erase stale rowset, tablet_id=" << tablet_id()
487
0
                               << " rowset_id=" << rs_it->second->rowset_id().to_string()
488
0
                               << " version=" << rs_it->first.to_string();
489
73.8k
                    _stale_rs_version_map.erase(rs_it);
490
73.8k
                } else {
491
0
                    LOG(WARNING) << "cannot find stale rowset " << v_ts->version() << " in tablet "
492
0
                                 << tablet_id();
493
                    // clang-format off
494
0
                    DCHECK(false) << [this, &wlock]() { wlock.unlock(); std::string json; get_compaction_status(&json); return json; }();
495
                    // clang-format on
496
0
                }
497
73.8k
                if (start_version < 0) {
498
9.37k
                    start_version = v_ts->version().first;
499
9.37k
                }
500
73.8k
                end_version = v_ts->version().second;
501
73.8k
                _tablet_meta->delete_stale_rs_meta_by_version(v_ts->version());
502
73.8k
            }
503
9.37k
            Version version(start_version, end_version);
504
9.37k
            if (!stale_rowsets.empty()) {
505
9.37k
                deleted_stale_rowsets.emplace_back(version, std::move(stale_rowsets));
506
9.37k
            }
507
9.37k
        }
508
4.77k
        _reconstruct_version_tracker_if_necessary();
509
4.77k
    }
510
511
    // if the rowset is not used by any query, we can recycle its cached data early.
512
0
    auto recycled_rowsets = recycle_cached_data(expired_rowsets);
513
4.77k
    if (!recycled_rowsets.empty()) {
514
4.77k
        auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
515
4.77k
        manager.recycle_cache(tablet_id(), recycled_rowsets);
516
4.77k
    }
517
4.77k
    if (config::enable_mow_verbose_log) {
518
0
        LOG_INFO("finish delete_expired_stale_rowset for tablet={}", tablet_id());
519
0
    }
520
521
4.77k
    add_unused_rowsets(expired_rowsets);
522
4.77k
    if (config::enable_agg_and_remove_pre_rowsets_delete_bitmap && keys_type() == UNIQUE_KEYS &&
523
4.77k
        enable_unique_key_merge_on_write() && !deleted_stale_rowsets.empty()) {
524
        // agg delete bitmap for pre rowsets; record unused delete bitmap key ranges
525
716
        OlapStopWatch watch;
526
3.81k
        for (const auto& [version, unused_rowsets] : deleted_stale_rowsets) {
527
            // agg delete bitmap for pre rowset
528
3.81k
            DeleteBitmapKeyRanges remove_delete_bitmap_key_ranges;
529
3.81k
            agg_delete_bitmap_for_stale_rowsets(version, remove_delete_bitmap_key_ranges);
530
            // add remove delete bitmap
531
3.81k
            if (!remove_delete_bitmap_key_ranges.empty()) {
532
1.10k
                std::vector<RowsetId> rowset_ids;
533
13.1k
                for (const auto& rs : unused_rowsets) {
534
13.1k
                    rowset_ids.push_back(rs->rowset_id());
535
13.1k
                }
536
1.10k
                std::lock_guard<std::mutex> lock(_gc_mutex);
537
1.10k
                _unused_delete_bitmap.push_back(
538
1.10k
                        std::make_pair(rowset_ids, remove_delete_bitmap_key_ranges));
539
1.10k
            }
540
3.81k
        }
541
716
        LOG(INFO) << "agg pre rowsets delete bitmap. tablet_id=" << tablet_id()
542
716
                  << ", size=" << deleted_stale_rowsets.size()
543
716
                  << ", cost(us)=" << watch.get_elapse_time_us();
544
716
    }
545
4.77k
    return expired_rowsets.size();
546
9.47k
}
547
548
736k
bool CloudTablet::need_remove_unused_rowsets() {
549
736k
    std::lock_guard<std::mutex> lock(_gc_mutex);
550
736k
    return !_unused_rowsets.empty() || !_unused_delete_bitmap.empty();
551
736k
}
552
553
5.27k
void CloudTablet::add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets) {
554
5.27k
    std::lock_guard<std::mutex> lock(_gc_mutex);
555
74.3k
    for (const auto& rowset : rowsets) {
556
74.3k
        _unused_rowsets[rowset->rowset_id()] = rowset;
557
74.3k
        g_unused_rowsets_bytes << rowset->total_disk_size();
558
74.3k
    }
559
5.27k
    g_unused_rowsets_count << rowsets.size();
560
5.27k
}
561
562
4.93k
void CloudTablet::remove_unused_rowsets() {
563
4.93k
    std::vector<std::shared_ptr<Rowset>> removed_rowsets;
564
4.93k
    int64_t removed_delete_bitmap_num = 0;
565
4.93k
    OlapStopWatch watch;
566
4.93k
    {
567
4.93k
        std::lock_guard<std::mutex> lock(_gc_mutex);
568
        // 1. remove unused rowsets's cache data and delete bitmap
569
79.2k
        for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
570
74.3k
            auto& rs = it->second;
571
74.3k
            if (rs.use_count() > 1) {
572
0
                LOG(WARNING) << "tablet_id:" << tablet_id() << " rowset: " << rs->rowset_id()
573
0
                             << " has " << rs.use_count() << " references, it cannot be removed";
574
0
                ++it;
575
0
                continue;
576
0
            }
577
74.3k
            tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version());
578
74.3k
            _rowset_warm_up_states.erase(rs->rowset_id());
579
74.3k
            rs->clear_cache();
580
74.3k
            g_unused_rowsets_count << -1;
581
74.3k
            g_unused_rowsets_bytes << -rs->total_disk_size();
582
74.3k
            removed_rowsets.push_back(std::move(rs));
583
74.3k
            it = _unused_rowsets.erase(it);
584
74.3k
        }
585
4.93k
    }
586
587
4.93k
    {
588
4.93k
        std::vector<RecycledRowsets> recycled_rowsets;
589
590
74.3k
        for (auto& rs : removed_rowsets) {
591
74.3k
            auto index_names = rs->get_index_file_names();
592
74.3k
            recycled_rowsets.emplace_back(rs->rowset_id(), rs->num_segments(), index_names);
593
74.3k
            int64_t segment_size_sum = 0;
594
99.6k
            for (int32_t i = 0; i < rs->num_segments(); i++) {
595
25.3k
                segment_size_sum += rs->rowset_meta()->segment_file_size(i);
596
25.3k
            }
597
74.3k
            g_file_cache_recycle_cached_data_segment_num << rs->num_segments();
598
74.3k
            g_file_cache_recycle_cached_data_segment_size << segment_size_sum;
599
74.3k
            g_file_cache_recycle_cached_data_index_num << index_names.size();
600
74.3k
        }
601
602
4.93k
        if (recycled_rowsets.size() > 0) {
603
4.93k
            auto& manager =
604
4.93k
                    ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
605
4.93k
            manager.recycle_cache(tablet_id(), recycled_rowsets);
606
4.93k
        }
607
4.93k
    }
608
609
4.93k
    {
610
4.93k
        std::lock_guard<std::mutex> lock(_gc_mutex);
611
        // 2. remove delete bitmap of pre rowsets
612
6.03k
        for (auto it = _unused_delete_bitmap.begin(); it != _unused_delete_bitmap.end();) {
613
1.10k
            auto& rowset_ids = std::get<0>(*it);
614
1.10k
            bool find_unused_rowset = false;
615
13.1k
            for (const auto& rowset_id : rowset_ids) {
616
13.1k
                if (_unused_rowsets.find(rowset_id) != _unused_rowsets.end()) {
617
0
                    LOG(INFO) << "can not remove pre rowset delete bitmap because rowset is in use"
618
0
                              << ", tablet_id=" << tablet_id() << ", rowset_id=" << rowset_id;
619
0
                    find_unused_rowset = true;
620
0
                    break;
621
0
                }
622
13.1k
            }
623
1.10k
            if (find_unused_rowset) {
624
0
                ++it;
625
0
                continue;
626
0
            }
627
1.10k
            auto& key_ranges = std::get<1>(*it);
628
1.10k
            tablet_meta()->delete_bitmap().remove(key_ranges);
629
1.10k
            it = _unused_delete_bitmap.erase(it);
630
1.10k
            removed_delete_bitmap_num++;
631
            // TODO(kaijie): recycle cache for unused delete bitmap
632
1.10k
        }
633
4.93k
    }
634
635
4.93k
    LOG(INFO) << "tablet_id=" << tablet_id() << ", unused_rowset size=" << _unused_rowsets.size()
636
4.93k
              << ", unused_delete_bitmap size=" << _unused_delete_bitmap.size()
637
4.93k
              << ", removed_rowsets_num=" << removed_rowsets.size()
638
4.93k
              << ", removed_delete_bitmap_num=" << removed_delete_bitmap_num
639
4.93k
              << ", cost(us)=" << watch.get_elapse_time_us();
640
4.93k
}
641
642
349k
void CloudTablet::update_base_size(const Rowset& rs) {
643
    // Define base rowset as the rowset of version [2-x]
644
349k
    if (rs.start_version() == 2) {
645
117k
        _base_size = rs.total_disk_size();
646
117k
    }
647
349k
}
648
649
241
void CloudTablet::clear_cache() {
650
241
    auto recycled_rowsets = CloudTablet::recycle_cached_data(get_snapshot_rowset(true));
651
241
    if (!recycled_rowsets.empty()) {
652
240
        auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
653
240
        manager.recycle_cache(tablet_id(), recycled_rowsets);
654
240
    }
655
241
    _engine.tablet_mgr().erase_tablet(tablet_id());
656
241
}
657
658
std::vector<RecycledRowsets> CloudTablet::recycle_cached_data(
659
5.01k
        const std::vector<RowsetSharedPtr>& rowsets) {
660
5.01k
    std::vector<RecycledRowsets> recycled_rowsets;
661
76.6k
    for (const auto& rs : rowsets) {
662
        // rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2.
663
76.6k
        if (rs.use_count() > 2) {
664
1.94k
            LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has " << rs.use_count()
665
1.94k
                         << " references. File Cache won't be recycled when query is using it.";
666
1.94k
            continue;
667
1.94k
        }
668
74.6k
        rs->clear_cache();
669
74.6k
        auto index_names = rs->get_index_file_names();
670
74.6k
        recycled_rowsets.emplace_back(rs->rowset_id(), rs->num_segments(), index_names);
671
672
74.6k
        int64_t segment_size_sum = 0;
673
100k
        for (int32_t i = 0; i < rs->num_segments(); i++) {
674
25.4k
            segment_size_sum += rs->rowset_meta()->segment_file_size(i);
675
25.4k
        }
676
74.6k
        g_file_cache_recycle_cached_data_segment_num << rs->num_segments();
677
74.6k
        g_file_cache_recycle_cached_data_segment_size << segment_size_sum;
678
74.6k
        g_file_cache_recycle_cached_data_index_num << index_names.size();
679
74.6k
    }
680
5.01k
    return recycled_rowsets;
681
5.01k
}
682
683
void CloudTablet::reset_approximate_stats(int64_t num_rowsets, int64_t num_segments,
684
386k
                                          int64_t num_rows, int64_t data_size) {
685
386k
    _approximate_num_segments.store(num_segments, std::memory_order_relaxed);
686
386k
    _approximate_num_rows.store(num_rows, std::memory_order_relaxed);
687
386k
    _approximate_data_size.store(data_size, std::memory_order_relaxed);
688
386k
    int64_t cumu_num_deltas = 0;
689
386k
    int64_t cumu_num_rowsets = 0;
690
386k
    auto cp = _cumulative_point.load(std::memory_order_relaxed);
691
1.54M
    for (auto& [v, r] : _rs_version_map) {
692
1.54M
        if (v.second < cp) {
693
454k
            continue;
694
454k
        }
695
1.09M
        cumu_num_deltas += r->is_segments_overlapping() ? r->num_segments() : 1;
696
1.09M
        ++cumu_num_rowsets;
697
1.09M
    }
698
    // num_rowsets may be less than the size of _rs_version_map when there are some hole rowsets
699
    // in the version map, so we use the max value to ensure that the approximate number
700
    // of rowsets is at least the size of _rs_version_map.
701
    // Note that this is not the exact number of rowsets, but an approximate number.
702
386k
    int64_t approximate_num_rowsets =
703
386k
            std::max(num_rowsets, static_cast<int64_t>(_rs_version_map.size()));
704
386k
    _approximate_num_rowsets.store(approximate_num_rowsets, std::memory_order_relaxed);
705
386k
    _approximate_cumu_num_rowsets.store(cumu_num_rowsets, std::memory_order_relaxed);
706
386k
    _approximate_cumu_num_deltas.store(cumu_num_deltas, std::memory_order_relaxed);
707
386k
}
708
709
// Fills the tablet-specific fields of `context` (a freshly allocated rowset id plus this
// tablet's ids, merge-on-write flag and encryption algorithm) and delegates the actual
// writer construction to RowsetFactory.
Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_rowset_writer(
        RowsetWriterContext& context, bool vertical) {
    // Every writer created here gets a new rowset id from the engine.
    context.rowset_id = _engine.next_rowset_id();

    // FIXME(plat1ko): Seems `tablet_id` and `index_id` has been set repeatedly
    context.tablet_id = tablet_id();
    context.index_id = index_id();
    context.partition_id = partition_id();

    context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write();
    context.encrypt_algorithm = tablet_meta()->encryption_algorithm();

    return RowsetFactory::create_rowset_writer(_engine, context, vertical);
}
720
721
// create a rowset writer with rowset_id and seg_id
722
// after writer, merge this transient rowset with original rowset
723
Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_writer(
724
        const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> partial_update_info,
725
3.31k
        int64_t txn_expiration) {
726
3.31k
    if (rowset.rowset_meta_state() != RowsetStatePB::BEGIN_PARTIAL_UPDATE &&
727
3.31k
        rowset.rowset_meta_state() != RowsetStatePB::COMMITTED) [[unlikely]] {
728
0
        auto msg = fmt::format(
729
0
                "wrong rowset state when create_transient_rowset_writer, rowset state should be "
730
0
                "BEGIN_PARTIAL_UPDATE or COMMITTED, but found {}, rowset_id={}, tablet_id={}",
731
0
                RowsetStatePB_Name(rowset.rowset_meta_state()), rowset.rowset_id().to_string(),
732
0
                tablet_id());
733
        // see `CloudRowsetWriter::build` for detail.
734
        // if this is in a retry task, the rowset state may have been changed to RowsetStatePB::COMMITTED
735
        // in `RowsetMeta::merge_rowset_meta()` in previous trials.
736
0
        LOG(WARNING) << msg;
737
0
        DCHECK(false) << msg;
738
0
    }
739
3.31k
    RowsetWriterContext context;
740
3.31k
    context.rowset_state = PREPARED;
741
3.31k
    context.segments_overlap = OVERLAPPING;
742
    // During a partial update, the extracted columns of a variant should not be included in the tablet schema.
743
    // This is because the partial update for a variant needs to ignore the extracted columns.
744
    // Otherwise, the schema types in different rowsets might be inconsistent. When performing a partial update,
745
    // the complete variant is constructed by reading all the sub-columns of the variant.
746
3.31k
    context.tablet_schema = rowset.tablet_schema()->copy_without_variant_extracted_columns();
747
3.31k
    context.newest_write_timestamp = UnixSeconds();
748
3.31k
    context.tablet_id = table_id();
749
3.31k
    context.enable_segcompaction = false;
750
3.31k
    context.write_type = DataWriteType::TYPE_DIRECT;
751
3.31k
    context.partial_update_info = std::move(partial_update_info);
752
3.31k
    context.is_transient_rowset_writer = true;
753
3.31k
    context.rowset_id = rowset.rowset_id();
754
3.31k
    context.tablet_id = tablet_id();
755
3.31k
    context.index_id = index_id();
756
3.31k
    context.partition_id = partition_id();
757
3.31k
    context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write();
758
3.31k
    context.txn_expiration = txn_expiration;
759
3.31k
    context.encrypt_algorithm = tablet_meta()->encryption_algorithm();
760
    // TODO(liaoxin) enable packed file for transient rowset
761
3.31k
    context.allow_packed_file = false;
762
763
3.31k
    auto storage_resource = rowset.rowset_meta()->remote_storage_resource();
764
3.31k
    if (!storage_resource) {
765
0
        return ResultError(std::move(storage_resource.error()));
766
0
    }
767
768
3.31k
    context.storage_resource = *storage_resource.value();
769
770
3.31k
    return RowsetFactory::create_rowset_writer(_engine, context, false)
771
3.31k
            .transform([&](auto&& writer) {
772
3.26k
                writer->set_segment_start_id(cast_set<int32_t>(rowset.num_segments()));
773
3.26k
                return writer;
774
3.26k
            });
775
3.31k
}
776
777
59.8M
int64_t CloudTablet::get_cloud_base_compaction_score() const {
778
59.8M
    if (_tablet_meta->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY) {
779
26.4k
        bool has_delete = false;
780
26.4k
        int64_t point = cumulative_layer_point();
781
26.4k
        std::shared_lock<std::shared_mutex> rlock(_meta_lock);
782
264k
        for (const auto& [_, rs_meta] : _tablet_meta->all_rs_metas()) {
783
264k
            if (rs_meta->start_version() >= point) {
784
238k
                continue;
785
238k
            }
786
26.4k
            if (rs_meta->has_delete_predicate()) {
787
0
                has_delete = true;
788
0
                break;
789
0
            }
790
26.4k
        }
791
26.4k
        if (!has_delete) {
792
26.4k
            return 0;
793
26.4k
        }
794
26.4k
    }
795
796
59.8M
    return _approximate_num_rowsets.load(std::memory_order_relaxed) -
797
59.8M
           _approximate_cumu_num_rowsets.load(std::memory_order_relaxed);
798
59.8M
}
799
800
535M
int64_t CloudTablet::get_cloud_cumu_compaction_score() const {
801
    // TODO(plat1ko): Propose an algorithm that considers tablet's key type, number of delete rowsets,
802
    //  number of tablet versions simultaneously.
803
535M
    return _approximate_cumu_num_deltas.load(std::memory_order_relaxed);
804
535M
}
805
806
// return a json string to show the compaction status of this tablet
807
3.08k
void CloudTablet::get_compaction_status(std::string* json_result) {
808
3.08k
    rapidjson::Document root;
809
3.08k
    root.SetObject();
810
811
3.08k
    rapidjson::Document path_arr;
812
3.08k
    path_arr.SetArray();
813
814
3.08k
    std::vector<RowsetSharedPtr> rowsets;
815
3.08k
    std::vector<RowsetSharedPtr> stale_rowsets;
816
3.08k
    {
817
3.08k
        std::shared_lock rdlock(_meta_lock);
818
3.08k
        rowsets.reserve(_rs_version_map.size());
819
11.3k
        for (auto& it : _rs_version_map) {
820
11.3k
            rowsets.push_back(it.second);
821
11.3k
        }
822
3.08k
        stale_rowsets.reserve(_stale_rs_version_map.size());
823
12.2k
        for (auto& it : _stale_rs_version_map) {
824
12.2k
            stale_rowsets.push_back(it.second);
825
12.2k
        }
826
3.08k
    }
827
3.08k
    std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator);
828
3.08k
    std::sort(stale_rowsets.begin(), stale_rowsets.end(), Rowset::comparator);
829
830
    // get snapshot version path json_doc
831
3.08k
    _timestamped_version_tracker.get_stale_version_path_json_doc(path_arr);
832
3.08k
    root.AddMember("cumulative point", _cumulative_point.load(), root.GetAllocator());
833
3.08k
    rapidjson::Value cumu_value;
834
3.08k
    std::string format_str = ToStringFromUnixMillis(_last_cumu_compaction_failure_millis.load());
835
3.08k
    cumu_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
836
3.08k
                         root.GetAllocator());
837
3.08k
    root.AddMember("last cumulative failure time", cumu_value, root.GetAllocator());
838
3.08k
    rapidjson::Value base_value;
839
3.08k
    format_str = ToStringFromUnixMillis(_last_base_compaction_failure_millis.load());
840
3.08k
    base_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
841
3.08k
                         root.GetAllocator());
842
3.08k
    root.AddMember("last base failure time", base_value, root.GetAllocator());
843
3.08k
    rapidjson::Value full_value;
844
3.08k
    format_str = ToStringFromUnixMillis(_last_full_compaction_failure_millis.load());
845
3.08k
    full_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
846
3.08k
                         root.GetAllocator());
847
3.08k
    root.AddMember("last full failure time", full_value, root.GetAllocator());
848
3.08k
    rapidjson::Value cumu_success_value;
849
3.08k
    format_str = ToStringFromUnixMillis(_last_cumu_compaction_success_millis.load());
850
3.08k
    cumu_success_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
851
3.08k
                                 root.GetAllocator());
852
3.08k
    root.AddMember("last cumulative success time", cumu_success_value, root.GetAllocator());
853
3.08k
    rapidjson::Value base_success_value;
854
3.08k
    format_str = ToStringFromUnixMillis(_last_base_compaction_success_millis.load());
855
3.08k
    base_success_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
856
3.08k
                                 root.GetAllocator());
857
3.08k
    root.AddMember("last base success time", base_success_value, root.GetAllocator());
858
3.08k
    rapidjson::Value full_success_value;
859
3.08k
    format_str = ToStringFromUnixMillis(_last_full_compaction_success_millis.load());
860
3.08k
    full_success_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
861
3.08k
                                 root.GetAllocator());
862
3.08k
    root.AddMember("last full success time", full_success_value, root.GetAllocator());
863
3.08k
    rapidjson::Value cumu_schedule_value;
864
3.08k
    format_str = ToStringFromUnixMillis(_last_cumu_compaction_schedule_millis.load());
865
3.08k
    cumu_schedule_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
866
3.08k
                                  root.GetAllocator());
867
3.08k
    root.AddMember("last cumulative schedule time", cumu_schedule_value, root.GetAllocator());
868
3.08k
    rapidjson::Value base_schedule_value;
869
3.08k
    format_str = ToStringFromUnixMillis(_last_base_compaction_schedule_millis.load());
870
3.08k
    base_schedule_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
871
3.08k
                                  root.GetAllocator());
872
3.08k
    root.AddMember("last base schedule time", base_schedule_value, root.GetAllocator());
873
3.08k
    rapidjson::Value full_schedule_value;
874
3.08k
    format_str = ToStringFromUnixMillis(_last_full_compaction_schedule_millis.load());
875
3.08k
    full_schedule_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
876
3.08k
                                  root.GetAllocator());
877
3.08k
    root.AddMember("last full schedule time", full_schedule_value, root.GetAllocator());
878
3.08k
    rapidjson::Value cumu_compaction_status_value;
879
3.08k
    cumu_compaction_status_value.SetString(_last_cumu_compaction_status.c_str(),
880
3.08k
                                           cast_set<uint>(_last_cumu_compaction_status.length()),
881
3.08k
                                           root.GetAllocator());
882
3.08k
    root.AddMember("last cumulative status", cumu_compaction_status_value, root.GetAllocator());
883
3.08k
    rapidjson::Value base_compaction_status_value;
884
3.08k
    base_compaction_status_value.SetString(_last_base_compaction_status.c_str(),
885
3.08k
                                           cast_set<uint>(_last_base_compaction_status.length()),
886
3.08k
                                           root.GetAllocator());
887
3.08k
    root.AddMember("last base status", base_compaction_status_value, root.GetAllocator());
888
3.08k
    rapidjson::Value full_compaction_status_value;
889
3.08k
    full_compaction_status_value.SetString(_last_full_compaction_status.c_str(),
890
3.08k
                                           cast_set<uint>(_last_full_compaction_status.length()),
891
3.08k
                                           root.GetAllocator());
892
3.08k
    root.AddMember("last full status", full_compaction_status_value, root.GetAllocator());
893
3.08k
    rapidjson::Value exec_compaction_time;
894
3.08k
    std::string num_str {std::to_string(exec_compaction_time_us.load())};
895
3.08k
    exec_compaction_time.SetString(num_str.c_str(), cast_set<uint>(num_str.length()),
896
3.08k
                                   root.GetAllocator());
897
3.08k
    root.AddMember("exec compaction time us", exec_compaction_time, root.GetAllocator());
898
3.08k
    rapidjson::Value local_read_time;
899
3.08k
    num_str = std::to_string(local_read_time_us.load());
900
3.08k
    local_read_time.SetString(num_str.c_str(), cast_set<uint>(num_str.length()),
901
3.08k
                              root.GetAllocator());
902
3.08k
    root.AddMember("compaction local read time us", local_read_time, root.GetAllocator());
903
3.08k
    rapidjson::Value remote_read_time;
904
3.08k
    num_str = std::to_string(remote_read_time_us.load());
905
3.08k
    remote_read_time.SetString(num_str.c_str(), cast_set<uint>(num_str.length()),
906
3.08k
                               root.GetAllocator());
907
3.08k
    root.AddMember("compaction remote read time us", remote_read_time, root.GetAllocator());
908
909
    // print all rowsets' version as an array
910
3.08k
    rapidjson::Document versions_arr;
911
3.08k
    rapidjson::Document missing_versions_arr;
912
3.08k
    versions_arr.SetArray();
913
3.08k
    missing_versions_arr.SetArray();
914
3.08k
    int64_t last_version = -1;
915
11.3k
    for (auto& rowset : rowsets) {
916
11.3k
        const Version& ver = rowset->version();
917
11.3k
        if (ver.first != last_version + 1) {
918
0
            rapidjson::Value miss_value;
919
0
            miss_value.SetString(fmt::format("[{}-{}]", last_version + 1, ver.first - 1).c_str(),
920
0
                                 missing_versions_arr.GetAllocator());
921
0
            missing_versions_arr.PushBack(miss_value, missing_versions_arr.GetAllocator());
922
0
        }
923
11.3k
        rapidjson::Value value;
924
11.3k
        std::string version_str = rowset->get_rowset_info_str();
925
11.3k
        value.SetString(version_str.c_str(), cast_set<uint32_t>(version_str.length()),
926
11.3k
                        versions_arr.GetAllocator());
927
11.3k
        versions_arr.PushBack(value, versions_arr.GetAllocator());
928
11.3k
        last_version = ver.second;
929
11.3k
    }
930
3.08k
    root.AddMember("rowsets", versions_arr, root.GetAllocator());
931
3.08k
    root.AddMember("missing_rowsets", missing_versions_arr, root.GetAllocator());
932
933
    // print all stale rowsets' version as an array
934
3.08k
    rapidjson::Document stale_versions_arr;
935
3.08k
    stale_versions_arr.SetArray();
936
12.2k
    for (auto& rowset : stale_rowsets) {
937
12.2k
        rapidjson::Value value;
938
12.2k
        std::string version_str = rowset->get_rowset_info_str();
939
12.2k
        value.SetString(version_str.c_str(), cast_set<uint32_t>(version_str.length()),
940
12.2k
                        stale_versions_arr.GetAllocator());
941
12.2k
        stale_versions_arr.PushBack(value, stale_versions_arr.GetAllocator());
942
12.2k
    }
943
3.08k
    root.AddMember("stale_rowsets", stale_versions_arr, root.GetAllocator());
944
945
    // add stale version rowsets
946
3.08k
    root.AddMember("stale version path", path_arr, root.GetAllocator());
947
948
    // to json string
949
3.08k
    rapidjson::StringBuffer strbuf;
950
3.08k
    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
951
3.08k
    root.Accept(writer);
952
3.08k
    *json_result = std::string(strbuf.GetString());
953
3.08k
}
954
955
385k
void CloudTablet::set_cumulative_layer_point(int64_t new_point) {
956
386k
    if (new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= _cumulative_point) {
957
386k
        _cumulative_point = new_point;
958
386k
        return;
959
386k
    }
960
    // cumulative point should only be reset to -1, or be increased
961
    // FIXME: could happen in currently unresolved race conditions
962
18.4E
    LOG(WARNING) << "Unexpected cumulative point: " << new_point
963
18.4E
                 << ", origin: " << _cumulative_point.load();
964
18.4E
}
965
966
// Checks that the columns of the existing rowsets are compatible with `columns` (the
// column definitions passed in by the caller) before an index is built.
//
// Rowsets whose version starts at 0 and rowsets whose schema version is already at least
// `schema_version` are skipped. For every remaining rowset each of its columns must
//   - still exist in `columns` (looked up by name),
//   - have the same unique id, and
//   - have the same type.
//
// Returns Status::OK() when every checked column matches, otherwise an InternalError
// describing the first mismatch found.
Status CloudTablet::check_rowset_schema_for_build_index(std::vector<TColumn>& columns,
                                                        int schema_version) {
    // Index the incoming columns by name for the lookups below.
    std::map<std::string, TabletColumn> fe_col_map;
    for (const auto& column : columns) {
        fe_col_map[column.column_name] = TabletColumn(column);
    }

    std::shared_lock rlock(_meta_lock);
    for (const auto& [version, rs] : _rs_version_map) {
        if (version.first == 0) {
            continue;
        }

        if (rs->tablet_schema()->schema_version() >= schema_version) {
            continue;
        }

        // Bind by const reference so the column handle is not copied on every iteration.
        for (const auto& rs_col : rs->tablet_schema()->columns()) {
            auto find_ret = fe_col_map.find(rs_col->name());
            if (find_ret == fe_col_map.end()) {
                return Status::InternalError(
                        "check rowset meta failed:rowset's col is dropped in FE.");
            }

            if (rs_col->unique_id() != find_ret->second.unique_id()) {
                return Status::InternalError("check rowset meta failed:col id not match.");
            }

            if (rs_col->type() != find_ret->second.type()) {
                return Status::InternalError("check rowset meta failed:col type not match.");
            }
        }
    }

    return Status::OK();
}
1002
1003
// Picks the rowset with the greatest start version among the rowsets whose schema version
// is below `schema_version`; rowset [0-x] is never picked. When a rowset is found,
// `is_base_rowset` is set to whether its start version lies below the cumulative point;
// otherwise `is_base_rowset` is left untouched and a null rowset is returned.
Result<RowsetSharedPtr> CloudTablet::pick_a_rowset_for_index_change(int schema_version,
                                                                    bool& is_base_rowset) {
    TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudTablet::pick_a_rowset_for_index_change",
                                      Result<RowsetSharedPtr>(nullptr));
    RowsetSharedPtr picked = nullptr;
    std::shared_lock rlock(_meta_lock);
    for (const auto& [version, rs] : _rs_version_map) {
        if (version.first == 0) {
            continue;
        }
        // Empty rowsets are only reported, they are still candidates.
        if (rs->num_rows() == 0) {
            VLOG_DEBUG << "[index_change]find empty rs, index change may "
                          "failed, id="
                       << rs->rowset_id().to_string();
        }

        // Rowsets that already carry the requested schema version need no index change.
        if (rs->tablet_schema()->schema_version() >= schema_version) {
            VLOG_DEBUG << "[index_change] skip rowset " << rs->tablet_schema()->schema_version()
                       << "," << schema_version;
            continue;
        }

        // Keep the first candidate, then prefer any candidate that starts later.
        if (picked == nullptr || rs->start_version() > picked->start_version()) {
            picked = rs;
        }
    }

    if (picked != nullptr) {
        is_base_rowset = picked->version().first < _cumulative_point;
    }

    return picked;
}
1041
1042
3.55k
std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_base_compaction() {
1043
3.55k
    std::vector<RowsetSharedPtr> candidate_rowsets;
1044
3.55k
    {
1045
3.55k
        std::shared_lock rlock(_meta_lock);
1046
18.7k
        for (const auto& [version, rs] : _rs_version_map) {
1047
18.7k
            if (version.first != 0 && version.first < _cumulative_point &&
1048
18.7k
                (_alter_version == -1 || version.second <= _alter_version)) {
1049
10.0k
                candidate_rowsets.push_back(rs);
1050
10.0k
            }
1051
18.7k
        }
1052
3.55k
    }
1053
3.55k
    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
1054
3.55k
    return candidate_rowsets;
1055
3.55k
}
1056
1057
105
std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_full_compaction() {
1058
105
    std::vector<RowsetSharedPtr> candidate_rowsets;
1059
105
    {
1060
105
        std::shared_lock rlock(_meta_lock);
1061
744
        for (auto& [v, rs] : _rs_version_map) {
1062
            // MUST NOT compact rowset [0-1] for some historical reasons (see cloud_schema_change)
1063
744
            if (v.first != 0) {
1064
639
                candidate_rowsets.push_back(rs);
1065
639
            }
1066
744
        }
1067
105
    }
1068
105
    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
1069
105
    return candidate_rowsets;
1070
105
}
1071
1072
68
// Returns the delete-bitmap calculation executor provided by the storage engine.
CalcDeleteBitmapExecutor* CloudTablet::calc_delete_bitmap_executor() {
    CalcDeleteBitmapExecutor* executor = _engine.calc_delete_bitmap_executor();
    return executor;
}
1075
1076
// Persists the delete bitmap computed for transaction `txn_id`.
// Steps, each of which aborts with its error status on failure:
//   1. record the bitmap in the txn delete bitmap cache with status PREPARE,
//   2. for a partial update that wrote rows, update the tmp rowset through the meta manager,
//   3. write the bitmap through save_delete_bitmap_to_ms(),
//   4. record the bitmap in the cache again with status SUCCEED and the publish info.
// The debug points at the end are only active when enabled and can inject a sleep or an
// error.
Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
                                       DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
                                       const RowsetIdUnorderedSet& cur_rowset_ids, int64_t lock_id,
                                       int64_t next_visible_version) {
    RowsetSharedPtr rowset = txn_info->rowset;
    const int64_t cur_version = rowset->start_version();

    // update delete bitmap info, in order to avoid recalculation when trying again
    RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
            txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::PREPARE));

    const bool wrote_partial_update_rows = txn_info->partial_update_info &&
                                           txn_info->partial_update_info->is_partial_update() &&
                                           rowset_writer->num_rows() > 0;
    if (wrote_partial_update_rows) {
        DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.update_tmp_rowset.error", {
            return Status::InternalError<false>("injected update_tmp_rowset error.");
        });
        const auto& rowset_meta = rowset->rowset_meta();
        RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset_meta));
    }

    RETURN_IF_ERROR(save_delete_bitmap_to_ms(cur_version, txn_id, delete_bitmap, lock_id,
                                             next_visible_version, rowset));

    // store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
    // it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
    // delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail
    RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
            txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED,
            txn_info->publish_info));

    DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.enable_sleep", {
        auto sleep_sec = dp->param<int>("sleep", 5);
        std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
    });

    DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.injected_error", {
        auto retry = dp->param<bool>("retry", false);
        auto sleep_sec = dp->param<int>("sleep", 0);
        std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
        if (retry) { // return DELETE_BITMAP_LOCK_ERROR to let it retry
            return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>(
                    "injected DELETE_BITMAP_LOCK_ERROR");
        } else {
            return Status::InternalError<false>("injected non-retryable error");
        }
    });

    return Status::OK();
}
1124
1125
// Copies `delete_bitmap` into a fresh bitmap keyed by `cur_version` (sentinel entries are
// left out) and hands the copy to the meta manager's update_delete_bitmap().
// The lock id passed on is `lock_id` for an explicit txn (lock_id != -1) and `txn_id`
// otherwise. The rowset's remote storage resource is forwarded when it is available.
Status CloudTablet::save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id,
                                             DeleteBitmapPtr delete_bitmap, int64_t lock_id,
                                             int64_t next_visible_version, RowsetSharedPtr rowset) {
    DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
    for (const auto& [key, bitmap] : delete_bitmap->delete_bitmap) {
        // skip sentinel mark, which is used for delete bitmap correctness check
        if (std::get<1>(key) == DeleteBitmap::INVALID_SEGMENT_ID) {
            continue;
        }
        new_delete_bitmap->merge({std::get<0>(key), std::get<1>(key), cur_version}, bitmap);
    }

    // lock_id != -1 means this is in an explict txn
    const bool is_explicit_txn = (lock_id != -1);
    const auto ms_lock_id = is_explicit_txn ? lock_id : txn_id;

    // The storage resource stays empty when the rowset meta cannot provide one.
    std::optional<StorageResource> storage_resource;
    auto resource_result = rowset->rowset_meta()->remote_storage_resource();
    if (resource_result) {
        storage_resource = *resource_result.value();
    }

    RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(
            *this, ms_lock_id, LOAD_INITIATOR_ID, new_delete_bitmap.get(), new_delete_bitmap.get(),
            rowset->rowset_id().to_string(), storage_resource,
            config::delete_bitmap_store_write_version, txn_id, is_explicit_txn,
            next_visible_version));
    return Status::OK();
}
1153
1154
0
// Computes the version ranges up to `spec_version` that are not covered by
// `existing_versions`.
// The existing versions are sorted by start version and walked from version 0 upwards;
// every gap found before reaching `spec_version` is reported, plus a trailing gap when
// the existing versions end before `spec_version`.
// NOTE(review): the end of a reported gap is min(next existing start, spec_version), i.e.
// it can include the first version of the following existing range - confirm callers
// expect that.
Versions CloudTablet::calc_missed_versions(int64_t spec_version, Versions existing_versions) const {
    DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;

    // sort the existing versions in ascending order
    // (comparing the start is enough because 2 versions are certainly not overlapping)
    const auto starts_before = [](const Version& lhs, const Version& rhs) {
        return lhs.first < rhs.first;
    };
    std::sort(existing_versions.begin(), existing_versions.end(), starts_before);

    // From the first version(=0), find the missing version until spec_version
    Versions missed_versions;
    int64_t covered_until = -1;
    for (const Version& existing : existing_versions) {
        const int64_t expected_start = covered_until + 1;
        if (existing.first > expected_start) {
            // there is a hole between versions
            missed_versions.emplace_back(expected_start, std::min(existing.first, spec_version));
        }
        covered_until = existing.second;
        if (covered_until >= spec_version) {
            break;
        }
    }

    if (covered_until < spec_version) {
        // there is a hole between the last version and the specificed version.
        missed_versions.emplace_back(covered_until + 1, spec_version);
    }
    return missed_versions;
}
1183
1184
// Computes the delete bitmap of a compaction output rowset and persists it through the
// meta manager.
//
// The work is done in three phases:
//   1. sync rowsets and convert the delete bitmap of versions [0, max_version] (no lock held);
//   2. acquire the delete bitmap update lock, sync rowsets again and convert the delete bitmap
//      of the versions that arrived in the meantime;
//   3. upload the resulting delete bitmap via update_delete_bitmap().
//
// output_rowset_delete_bitmap is (re)created here and filled with the converted bitmap.
// get_delete_bitmap_lock_start_time receives the monotonic timestamp (in microseconds) taken
// right after the delete bitmap update lock was acquired.
// Returns the first error of sync/lock/check steps, otherwise the status of the upload.
Status CloudTablet::calc_delete_bitmap_for_compaction(
        const std::vector<RowsetSharedPtr>& input_rowsets, const RowsetSharedPtr& output_rowset,
        const RowIdConversion& rowid_conversion, ReaderType compaction_type, int64_t merged_rows,
        int64_t filtered_rows, int64_t initiator, DeleteBitmapPtr& output_rowset_delete_bitmap,
        bool allow_delete_in_cumu_compaction, int64_t& get_delete_bitmap_lock_start_time) {
    output_rowset_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());

    const bool is_cumulative = compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION;

    // Optional bookkeeping used by the "missing rows" correctness check.
    std::unique_ptr<RowLocationSet> missed_rows;
    const bool correctness_check_enabled = config::enable_missing_rows_correctness_check ||
                                           config::enable_mow_compaction_correctness_check_core ||
                                           config::enable_mow_compaction_correctness_check_fail;
    if (correctness_check_enabled && !allow_delete_in_cumu_compaction &&
        (is_cumulative || !config::enable_prune_delete_sign_when_base_compaction)) {
        // also check duplicate key for base compaction when config::enable_prune_delete_sign_when_base_compaction==false
        missed_rows = std::make_unique<RowLocationSet>();
        LOG(INFO) << "RowLocation Set inited succ for tablet:" << tablet_id();
    }

    // Optional bookkeeping used by the rowid conversion correctness check.
    std::unique_ptr<std::map<RowsetSharedPtr, RowLocationPairList>> location_map;
    if (config::enable_rowid_conversion_correctness_check &&
        tablet_schema()->cluster_key_uids().empty()) {
        location_map = std::make_unique<std::map<RowsetSharedPtr, RowLocationPairList>>();
        LOG(INFO) << "Location Map inited succ for tablet:" << tablet_id();
    }

    // 1. calc delete bitmap for historical data
    RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(this));
    const Version synced_version = max_version();
    calc_compaction_output_rowset_delete_bitmap(
            input_rowsets, rowid_conversion, 0, synced_version.second + 1, missed_rows.get(),
            location_map.get(), tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());

    // The number of rows dropped by the merge must match the number of rows that could not be
    // converted; a mismatch is reported according to the correctness-check configs.
    if (missed_rows && !allow_delete_in_cumu_compaction &&
        (is_cumulative || !config::enable_prune_delete_sign_when_base_compaction) &&
        tablet_state() == TABLET_RUNNING) {
        const std::size_t missed_rows_size = missed_rows->size();
        if (merged_rows + filtered_rows >= 0 &&
            merged_rows + filtered_rows != missed_rows_size) {
            std::string err_msg = fmt::format(
                    "cumulative compaction: the merged rows({}), the filtered rows({}) is "
                    "not equal to missed rows({}) in rowid conversion, tablet_id: {}, "
                    "table_id:{}",
                    merged_rows, filtered_rows, missed_rows_size, tablet_id(), table_id());
            LOG(WARNING) << err_msg;
            if (config::enable_mow_compaction_correctness_check_core) {
                CHECK(false) << err_msg;
            } else if (config::enable_mow_compaction_correctness_check_fail) {
                return Status::InternalError<false>(err_msg);
            } else {
                DCHECK(false) << err_msg;
            }
        }
    }
    if (location_map) {
        RETURN_IF_ERROR(check_rowid_conversion(output_rowset, *location_map));
        location_map->clear();
    }

    // 2. calc delete bitmap for incremental data
    const int64_t lock_request_us = MonotonicMicros();
    RETURN_IF_ERROR(_engine.meta_mgr().get_delete_bitmap_update_lock(
            *this, COMPACTION_DELETE_BITMAP_LOCK_ID, initiator));
    const int64_t lock_acquired_us = MonotonicMicros();
    const int64_t lock_cost_ms = (lock_acquired_us - lock_request_us) / 1000;
    if (is_cumulative) {
        g_cu_compaction_get_delete_bitmap_lock_time_ms << lock_cost_ms;
    } else if (compaction_type == ReaderType::READER_BASE_COMPACTION) {
        g_base_compaction_get_delete_bitmap_lock_time_ms << lock_cost_ms;
    }
    get_delete_bitmap_lock_start_time = lock_acquired_us;
    RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(this));
    const int64_t rowsets_synced_us = MonotonicMicros();

    calc_compaction_output_rowset_delete_bitmap(
            input_rowsets, rowid_conversion, synced_version.second, UINT64_MAX, missed_rows.get(),
            location_map.get(), tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());
    const int64_t bitmap_calculated_us = MonotonicMicros();
    if (location_map) {
        RETURN_IF_ERROR(check_rowid_conversion(output_rowset, *location_map));
    }
    const int64_t conversion_checked_us = MonotonicMicros();

    // 3. store delete bitmap
    DeleteBitmapPtr delete_bitmap_v2 = nullptr;
    const auto delete_bitmap_size = output_rowset_delete_bitmap->delete_bitmap.size();
    const auto store_version = config::delete_bitmap_store_write_version;
    if (store_version == 2 || store_version == 3) {
        delete_bitmap_v2 = std::make_shared<DeleteBitmap>(*output_rowset_delete_bitmap);
        // Rowsets that end before the output rowset starts are not part of this compaction.
        std::vector<std::pair<RowsetId, int64_t>> retained_rowsets_to_seg_num;
        {
            std::shared_lock rlock(get_header_lock());
            for (const auto& [rowset_version, rowset_ptr] : rowset_map()) {
                if (rowset_version.second >= output_rowset->start_version()) {
                    continue;
                }
                retained_rowsets_to_seg_num.emplace_back(rowset_ptr->rowset_id(),
                                                         rowset_ptr->num_segments());
            }
        }
        if (config::enable_agg_delta_delete_bitmap_for_store_v2) {
            tablet_meta()->delete_bitmap().subset_and_agg(
                    retained_rowsets_to_seg_num, output_rowset->start_version(),
                    output_rowset->end_version(), delete_bitmap_v2.get());
        } else {
            tablet_meta()->delete_bitmap().subset(
                    retained_rowsets_to_seg_num, output_rowset->start_version(),
                    output_rowset->end_version(), delete_bitmap_v2.get());
        }
    }

    std::optional<StorageResource> storage_resource;
    auto resource_lookup = output_rowset->rowset_meta()->remote_storage_resource();
    if (resource_lookup) {
        storage_resource = *resource_lookup.value();
    }
    auto st = _engine.meta_mgr().update_delete_bitmap(
            *this, -1, initiator, output_rowset_delete_bitmap.get(), delete_bitmap_v2.get(),
            output_rowset->rowset_id().to_string(), storage_resource, store_version);
    const int64_t bitmap_stored_us = MonotonicMicros();

    LOG(INFO) << "calc_delete_bitmap_for_compaction, tablet_id=" << tablet_id()
              << ", get lock cost " << (lock_acquired_us - lock_request_us)
              << " us, sync rowsets cost " << (rowsets_synced_us - lock_acquired_us)
              << " us, calc delete bitmap cost " << (bitmap_calculated_us - rowsets_synced_us)
              << " us, check rowid conversion cost "
              << (conversion_checked_us - bitmap_calculated_us)
              << " us, store delete bitmap cost " << (bitmap_stored_us - conversion_checked_us)
              << " us, st=" << st.to_string() << ". store_version=" << store_version
              << ", calculated delete bitmap size=" << delete_bitmap_size
              << ", update delete bitmap size="
              << output_rowset_delete_bitmap->delete_bitmap.size();
    return st;
}
1314
1315
void CloudTablet::agg_delete_bitmap_for_compaction(
1316
        int64_t start_version, int64_t end_version, const std::vector<RowsetSharedPtr>& pre_rowsets,
1317
        DeleteBitmapPtr& new_delete_bitmap,
1318
3.82k
        std::map<std::string, int64_t>& pre_rowset_to_versions) {
1319
10.0k
    for (auto& rowset : pre_rowsets) {
1320
15.2k
        for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
1321
5.27k
            auto d = tablet_meta()->delete_bitmap().get_agg_without_cache(
1322
5.27k
                    {rowset->rowset_id(), seg_id, end_version}, start_version);
1323
5.27k
            if (d->isEmpty()) {
1324
4.03k
                continue;
1325
4.03k
            }
1326
1.24k
            VLOG_DEBUG << "agg delete bitmap for tablet_id=" << tablet_id()
1327
0
                       << ", rowset_id=" << rowset->rowset_id() << ", seg_id=" << seg_id
1328
0
                       << ", rowset_version=" << rowset->version().to_string()
1329
0
                       << ". compaction start_version=" << start_version
1330
0
                       << ", end_version=" << end_version
1331
0
                       << ". delete_bitmap cardinality=" << d->cardinality();
1332
1.24k
            DeleteBitmap::BitmapKey end_key {rowset->rowset_id(), seg_id, end_version};
1333
1.24k
            new_delete_bitmap->set(end_key, *d);
1334
1.24k
            pre_rowset_to_versions[rowset->rowset_id().to_string()] = rowset->version().second;
1335
1.24k
        }
1336
10.0k
    }
1337
3.82k
}
1338
1339
44.6k
// Fetches the tablet meta from the meta manager and copies the compaction related properties
// that differ into the local _tablet_meta.
// Does nothing (and returns OK) when config::enable_file_cache is off.
// When the fetch fails the error is returned as is; on NOT_FOUND clear_cache() is called first.
Status CloudTablet::sync_meta() {
    if (!config::enable_file_cache) {
        return Status::OK();
    }

    TabletMetaSharedPtr latest_meta;
    auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &latest_meta);
    if (!st.ok()) {
        if (st.is<ErrorCode::NOT_FOUND>()) {
            clear_cache();
        }
        return st;
    }

    // Each property is only written when it actually changed.
    if (auto policy = latest_meta->compaction_policy();
        _tablet_meta->compaction_policy() != policy) {
        _tablet_meta->set_compaction_policy(policy);
    }
    if (auto goal_size_mbytes = latest_meta->time_series_compaction_goal_size_mbytes();
        _tablet_meta->time_series_compaction_goal_size_mbytes() != goal_size_mbytes) {
        _tablet_meta->set_time_series_compaction_goal_size_mbytes(goal_size_mbytes);
    }
    if (auto file_count_threshold = latest_meta->time_series_compaction_file_count_threshold();
        _tablet_meta->time_series_compaction_file_count_threshold() != file_count_threshold) {
        _tablet_meta->set_time_series_compaction_file_count_threshold(file_count_threshold);
    }
    if (auto time_threshold_seconds =
                latest_meta->time_series_compaction_time_threshold_seconds();
        _tablet_meta->time_series_compaction_time_threshold_seconds() !=
        time_threshold_seconds) {
        _tablet_meta->set_time_series_compaction_time_threshold_seconds(time_threshold_seconds);
    }
    if (auto empty_rowsets_threshold =
                latest_meta->time_series_compaction_empty_rowsets_threshold();
        _tablet_meta->time_series_compaction_empty_rowsets_threshold() !=
        empty_rowsets_threshold) {
        _tablet_meta->set_time_series_compaction_empty_rowsets_threshold(
                empty_rowsets_threshold);
    }
    if (auto level_threshold = latest_meta->time_series_compaction_level_threshold();
        _tablet_meta->time_series_compaction_level_threshold() != level_threshold) {
        _tablet_meta->set_time_series_compaction_level_threshold(level_threshold);
    }
    // Sync disable_auto_compaction (stored in tablet_schema)
    if (auto disable_auto_compaction = latest_meta->tablet_schema()->disable_auto_compaction();
        _tablet_meta->tablet_schema()->disable_auto_compaction() != disable_auto_compaction) {
        _tablet_meta->mutable_tablet_schema()->set_disable_auto_compaction(
                disable_auto_compaction);
    }
    // Sync vertical_compaction_num_columns_per_group
    if (auto columns_per_group = latest_meta->vertical_compaction_num_columns_per_group();
        _tablet_meta->vertical_compaction_num_columns_per_group() != columns_per_group) {
        _tablet_meta->set_vertical_compaction_num_columns_per_group(columns_per_group);
    }

    return Status::OK();
}
1409
1410
4.80M
// Fills tablet_info with the tablet id and the total version count of this tablet.
// _meta_lock is held in shared mode while _tablet_meta is read.
void CloudTablet::build_tablet_report_info(TTabletInfo* tablet_info) {
    std::shared_lock meta_rlock(_meta_lock);
    const auto total_version_count = _tablet_meta->version_count();
    tablet_info->__set_total_version_count(total_version_count);
    const auto reported_tablet_id = _tablet_meta->tablet_id();
    tablet_info->__set_tablet_id(reported_tablet_id);
    // Currently, this information will not be used by the cloud report,
    // but it may be used in the future.
}
1417
1418
// Compares the cardinality of expected_delete_bitmap with the delete bitmap cached for
// (txn_id, tablet_id) in the engine's txn delete bitmap cache.
//
// Returns OK when the cache lookup fails (nothing to compare against) or when the
// cardinalities match; returns InternalError (after a DCHECK failure in debug builds) when
// they differ.
Status CloudTablet::check_delete_bitmap_cache(int64_t txn_id,
                                              DeleteBitmap* expected_delete_bitmap) {
    DeleteBitmapPtr cached_delete_bitmap;
    CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
    Status st = engine.txn_delete_bitmap_cache().get_delete_bitmap(
            txn_id, tablet_id(), &cached_delete_bitmap, nullptr, nullptr);
    if (!st.ok()) {
        // No cached bitmap for this txn/tablet: the check is skipped.
        return Status::OK();
    }

    // Compute each cardinality exactly once; they are reused for the error message.
    const auto expected_cardinality = expected_delete_bitmap->cardinality();
    const auto cached_cardinality = cached_delete_bitmap->cardinality();
    if (expected_cardinality == cached_cardinality) {
        return Status::OK();
    }

    // The message is only built on mismatch, so the common (matching) path does no formatting.
    auto msg = fmt::format(
            "delete bitmap cache check failed, cur_cardinality={}, cached_cardinality={}, "
            "txn_id={}, tablet_id={}",
            expected_cardinality, cached_cardinality, txn_id, tablet_id());
    DCHECK(false) << msg;
    return Status::InternalError<false>(msg);
}
1438
1439
34
// Returns the warm up state recorded for rowset_id, refreshing it first via update_state().
// When no state is recorded for the rowset, {NONE, NONE} is returned.
//
// update_state() may write state.progress, so _meta_lock is taken exclusively here: holding it
// only in shared mode would let two concurrent callers mutate the same entry at once.
// NOTE(review): callers must not already hold _meta_lock when calling this — confirm.
WarmUpState CloudTablet::get_rowset_warmup_state(RowsetId rowset_id) {
    std::lock_guard wlock(_meta_lock);
    // Single lookup; also avoids operator[] which could insert into the map.
    auto it = _rowset_warm_up_states.find(rowset_id);
    if (it == _rowset_warm_up_states.end()) {
        return {.trigger_source = WarmUpTriggerSource::NONE, .progress = WarmUpProgress::NONE};
    }
    auto& warmup_info = it->second;
    warmup_info.update_state();
    return warmup_info.state;
}
1448
1449
bool CloudTablet::add_rowset_warmup_state(const RowsetMeta& rowset, WarmUpTriggerSource source,
1450
34
                                          std::chrono::steady_clock::time_point start_tp) {
1451
34
    std::lock_guard wlock(_meta_lock);
1452
34
    return add_rowset_warmup_state_unlocked(rowset, source, start_tp);
1453
34
}
1454
1455
bool CloudTablet::update_rowset_warmup_state_inverted_idx_num(WarmUpTriggerSource source,
1456
5
                                                              RowsetId rowset_id, int64_t delta) {
1457
5
    std::lock_guard wlock(_meta_lock);
1458
5
    return update_rowset_warmup_state_inverted_idx_num_unlocked(source, rowset_id, delta);
1459
5
}
1460
1461
bool CloudTablet::update_rowset_warmup_state_inverted_idx_num_unlocked(WarmUpTriggerSource source,
1462
                                                                       RowsetId rowset_id,
1463
5
                                                                       int64_t delta) {
1464
5
    auto it = _rowset_warm_up_states.find(rowset_id);
1465
5
    if (it == _rowset_warm_up_states.end()) {
1466
0
        return false;
1467
0
    }
1468
5
    if (it->second.state.trigger_source != source) {
1469
        // Only the same trigger source can update the state
1470
2
        return false;
1471
2
    }
1472
3
    it->second.num_inverted_idx += delta;
1473
3
    return true;
1474
5
}
1475
1476
bool CloudTablet::add_rowset_warmup_state_unlocked(const RowsetMeta& rowset,
1477
                                                   WarmUpTriggerSource source,
1478
34
                                                   std::chrono::steady_clock::time_point start_tp) {
1479
34
    auto rowset_id = rowset.rowset_id();
1480
1481
    // Check if rowset already has warmup state
1482
34
    if (_rowset_warm_up_states.contains(rowset_id)) {
1483
10
        auto existing_state = _rowset_warm_up_states[rowset_id].state;
1484
1485
        // For job-triggered warmup (one-time and periodic warmup), allow it to proceed
1486
        // except when there's already another job-triggered warmup in progress
1487
10
        if (source == WarmUpTriggerSource::JOB) {
1488
5
            if (existing_state.trigger_source == WarmUpTriggerSource::JOB &&
1489
5
                existing_state.progress == WarmUpProgress::DOING) {
1490
                // Same job type already in progress, skip to avoid duplicate warmup
1491
1
                return false;
1492
1
            }
1493
5
        } else {
1494
            // For non-job warmup (EVENT_DRIVEN, SYNC_ROWSET), skip if any warmup exists
1495
5
            return false;
1496
5
        }
1497
10
    }
1498
1499
28
    if (source == WarmUpTriggerSource::JOB) {
1500
9
        g_file_cache_warm_up_rowset_triggered_by_job_num << 1;
1501
19
    } else if (source == WarmUpTriggerSource::SYNC_ROWSET) {
1502
6
        g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num << 1;
1503
13
    } else if (source == WarmUpTriggerSource::EVENT_DRIVEN) {
1504
13
        g_file_cache_warm_up_rowset_triggered_by_event_driven_num << 1;
1505
13
    }
1506
28
    _rowset_warm_up_states[rowset_id] = {
1507
28
            .state = {.trigger_source = source,
1508
28
                      .progress = (rowset.num_segments() == 0 ? WarmUpProgress::DONE
1509
28
                                                              : WarmUpProgress::DOING)},
1510
28
            .num_segments = rowset.num_segments(),
1511
28
            .start_tp = start_tp};
1512
28
    return true;
1513
34
}
1514
1515
51
void CloudTablet::RowsetWarmUpInfo::update_state() {
1516
51
    if (has_finished()) {
1517
14
        g_file_cache_warm_up_rowset_complete_num << 1;
1518
14
        auto cost = std::chrono::duration_cast<std::chrono::milliseconds>(
1519
14
                            std::chrono::steady_clock::now() - start_tp)
1520
14
                            .count();
1521
14
        g_file_cache_warm_up_rowset_all_segments_latency << cost;
1522
14
        state.progress = WarmUpProgress::DONE;
1523
14
    }
1524
51
}
1525
1526
// Records that segment_num segments and inverted_idx_num inverted index files of rowset_id
// finished downloading (successfully or not, according to status).
//
// Runs under an exclusive _meta_lock. Returns {NONE, NONE} when the rowset has no warm up
// state. When the state belongs to a different trigger source it is returned unchanged.
// Otherwise the completion (and, on error, failure) counters are updated, the progress is
// advanced through RowsetWarmUpInfo::done() and the resulting state is returned.
WarmUpState CloudTablet::complete_rowset_segment_warmup(WarmUpTriggerSource trigger_source,
                                                        RowsetId rowset_id, Status status,
                                                        int64_t segment_num,
                                                        int64_t inverted_idx_num) {
    std::lock_guard meta_guard(_meta_lock);
    const auto entry = _rowset_warm_up_states.find(rowset_id);
    if (entry == _rowset_warm_up_states.end()) {
        return {.trigger_source = WarmUpTriggerSource::NONE, .progress = WarmUpProgress::NONE};
    }

    auto& info = entry->second;
    if (info.state.trigger_source != trigger_source) {
        // Only the same trigger source can update the state
        return info.state;
    }

    VLOG_DEBUG << "complete rowset segment warmup for rowset " << rowset_id << ", " << status;
    const bool download_failed = !status.ok();
    if (segment_num > 0) {
        g_file_cache_warm_up_segment_complete_num << segment_num;
        if (download_failed) {
            g_file_cache_warm_up_segment_failed_num << segment_num;
        }
    }
    if (inverted_idx_num > 0) {
        g_file_cache_warm_up_inverted_idx_complete_num << inverted_idx_num;
        if (download_failed) {
            g_file_cache_warm_up_inverted_idx_failed_num << inverted_idx_num;
        }
    }
    info.done(segment_num, inverted_idx_num);
    return info.state;
}
1556
1557
244
bool CloudTablet::is_rowset_warmed_up(const RowsetId& rowset_id) const {
1558
244
    auto it = _rowset_warm_up_states.find(rowset_id);
1559
244
    if (it == _rowset_warm_up_states.end()) {
1560
102
        return false;
1561
102
    }
1562
142
    return it->second.state.progress == WarmUpProgress::DONE;
1563
244
}
1564
1565
598
void CloudTablet::add_warmed_up_rowset(const RowsetId& rowset_id) {
1566
598
    _rowset_warm_up_states[rowset_id] = {
1567
598
            .state = {.trigger_source = WarmUpTriggerSource::SYNC_ROWSET,
1568
598
                      .progress = WarmUpProgress::DONE},
1569
598
            .num_segments = 1,
1570
598
            .start_tp = std::chrono::steady_clock::now()};
1571
598
}
1572
1573
bool CloudTablet::_check_rowset_should_be_visible_but_not_warmed_up(
1574
        const RowsetMetaSharedPtr& rs_meta, int64_t path_max_version,
1575
343
        std::chrono::system_clock::time_point freshness_limit_tp) const {
1576
343
    if (rs_meta->version() == Version {0, 1}) {
1577
        // skip rowset[0-1]
1578
18
        return false;
1579
18
    }
1580
325
    bool ret = rs_meta->start_version() > path_max_version &&
1581
325
               rs_meta->visible_timestamp() < freshness_limit_tp;
1582
325
    if (ret && config::read_cluster_cache_opt_verbose_log) {
1583
5
        using namespace std::chrono;
1584
5
        std::time_t t1 = system_clock::to_time_t(rs_meta->visible_timestamp());
1585
5
        std::tm tm1 = *std::localtime(&t1);
1586
5
        std::ostringstream oss1;
1587
5
        oss1 << std::put_time(&tm1, "%Y-%m-%d %H:%M:%S");
1588
1589
5
        std::time_t t2 = system_clock::to_time_t(freshness_limit_tp);
1590
5
        std::tm tm2 = *std::localtime(&t2);
1591
5
        std::ostringstream oss2;
1592
5
        oss2 << std::put_time(&tm2, "%Y-%m-%d %H:%M:%S");
1593
5
        LOG_INFO(
1594
5
                "[verbose] CloudTablet::capture_rs_readers_with_freshness_tolerance, "
1595
5
                "find a rowset which should be visible but not warmed up, tablet_id={}, "
1596
5
                "path_max_version={}, rowset_id={}, version={}, visible_time={}, "
1597
5
                "freshness_limit={}, version_graph={}, rowset_warmup_digest={}",
1598
5
                tablet_id(), path_max_version, rs_meta->rowset_id().to_string(),
1599
5
                rs_meta->version().to_string(), oss1.str(), oss2.str(),
1600
5
                _timestamped_version_tracker.debug_string(), rowset_warmup_digest());
1601
5
    }
1602
325
    return ret;
1603
343
}
1604
1605
void CloudTablet::_submit_segment_download_task(const RowsetSharedPtr& rs,
1606
                                                const StorageResource* storage_resource, int seg_id,
1607
1608
0
                                                int64_t expiration_time) {
1609
    // clang-format off
1610
0
    const auto& rowset_meta = rs->rowset_meta();
1611
0
    auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this());
1612
    // Use rowset_meta->fs() instead of storage_resource->fs to support packed file.
1613
    // RowsetMeta::fs() wraps the underlying FileSystem with PackedFileSystem when
1614
    // packed_slice_locations is not empty, which correctly maps segment file paths
1615
    // to their actual locations within packed files.
1616
0
    auto file_system = rowset_meta->fs();
1617
0
    if (!file_system) {
1618
0
        LOG(WARNING) << "failed to get file system for tablet_id=" << _tablet_meta->tablet_id()
1619
0
                     << ", rowset_id=" << rowset_meta->rowset_id();
1620
0
        return;
1621
0
    }
1622
0
    _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
1623
0
            .path = storage_resource->remote_segment_path(*rowset_meta, seg_id),
1624
0
            .file_size = rs->rowset_meta()->segment_file_size(seg_id),
1625
0
            .file_system = file_system,
1626
0
            .ctx = {
1627
0
                    .expiration_time = expiration_time,
1628
0
                    .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
1629
0
                    .is_warmup = true
1630
0
            },
1631
0
            .download_done {[=](Status st) {
1632
0
                DBUG_EXECUTE_IF("CloudTablet::add_rowsets.download_data.callback.block_compaction_rowset", {
1633
0
                            if (rs->version().second > rs->version().first) {
1634
0
                                auto sleep_time = dp->param<int>("sleep", 3);
1635
0
                                LOG_INFO(
1636
0
                                        "[verbose] block download for rowset={}, "
1637
0
                                        "version={}, sleep={}",
1638
0
                                        rs->rowset_id().to_string(),
1639
0
                                        rs->version().to_string(), sleep_time);
1640
0
                                std::this_thread::sleep_for(
1641
0
                                        std::chrono::seconds(sleep_time));
1642
0
                            }
1643
0
                });
1644
0
                self->complete_rowset_segment_warmup(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), st, 1, 0);
1645
0
                if (!st) {
1646
0
                    LOG_WARNING("add rowset warm up error ").error(st);
1647
0
                }
1648
0
            }},
1649
0
    });
1650
    // clang-format on
1651
0
}
1652
1653
void CloudTablet::_submit_inverted_index_download_task(const RowsetSharedPtr& rs,
1654
                                                       const StorageResource* storage_resource,
1655
                                                       const io::Path& idx_path, int64_t idx_size,
1656
0
                                                       int64_t expiration_time) {
1657
    // clang-format off
1658
0
    const auto& rowset_meta = rs->rowset_meta();
1659
0
    auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this());
1660
    // Use rowset_meta->fs() instead of storage_resource->fs to support packed file for idx files.
1661
0
    auto file_system = rowset_meta->fs();
1662
0
    if (!file_system) {
1663
0
        LOG(WARNING) << "failed to get file system for tablet_id=" << _tablet_meta->tablet_id()
1664
0
                     << ", rowset_id=" << rowset_meta->rowset_id();
1665
0
        return;
1666
0
    }
1667
0
    io::DownloadFileMeta meta {
1668
0
            .path = idx_path,
1669
0
            .file_size = idx_size,
1670
0
            .file_system = file_system,
1671
0
            .ctx = {
1672
0
                    .expiration_time = expiration_time,
1673
0
                    .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
1674
0
                    .is_warmup = true
1675
0
            },
1676
0
            .download_done {[=](Status st) {
1677
0
                DBUG_EXECUTE_IF("CloudTablet::add_rowsets.download_idx.callback.block", {
1678
0
                    auto sleep_time = dp->param<int>("sleep", 3);
1679
0
                    LOG_INFO(
1680
0
                            "[verbose] block download for "
1681
0
                            "rowset={}, inverted_idx_file={}, "
1682
0
                            "sleep={}",
1683
0
                            rs->rowset_id().to_string(), idx_path.string(), sleep_time);
1684
0
                    std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
1685
0
                });
1686
0
                self->complete_rowset_segment_warmup(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), st, 0, 1);
1687
0
                if (!st) {
1688
0
                    LOG_WARNING("add rowset warm up error ").error(st);
1689
0
                }
1690
0
            }},
1691
0
    };
1692
0
    self->update_rowset_warmup_state_inverted_idx_num_unlocked(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), 1);
1693
0
    _engine.file_cache_block_downloader().submit_download_task(std::move(meta));
1694
0
    g_file_cache_cloud_tablet_submitted_index_num << 1;
1695
0
    g_file_cache_cloud_tablet_submitted_index_size << idx_size;
1696
    // clang-format on
1697
0
}
1698
1699
void CloudTablet::_add_rowsets_directly(std::vector<RowsetSharedPtr>& rowsets,
1700
290k
                                        bool warmup_delta_data) {
1701
#ifdef BE_TEST
1702
    warmup_delta_data = false;
1703
#endif
1704
349k
    for (auto& rs : rowsets) {
1705
349k
        if (warmup_delta_data) {
1706
            // Pre-set encryption algorithm to avoid re-entrant get_tablet() call
1707
            // inside RowsetMeta::fs() which causes SingleFlight deadlock when the
1708
            // tablet is not yet cached (during initial load_tablet).
1709
0
            rs->rowset_meta()->set_encryption_algorithm(_tablet_meta->encryption_algorithm());
1710
0
            bool warm_up_state_updated = false;
1711
            // Warmup rowset data in background
1712
0
            for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
1713
0
                const auto& rowset_meta = rs->rowset_meta();
1714
0
                constexpr int64_t interval = 600; // 10 mins
1715
                // When BE restart and receive the `load_sync` rpc, it will sync all historical rowsets first time.
1716
                // So we need to filter out the old rowsets avoid to download the whole table.
1717
0
                if (warmup_delta_data &&
1718
0
                    ::time(nullptr) - rowset_meta->newest_write_timestamp() >= interval) {
1719
0
                    continue;
1720
0
                }
1721
1722
0
                auto storage_resource = rowset_meta->remote_storage_resource();
1723
0
                if (!storage_resource) {
1724
0
                    LOG(WARNING) << storage_resource.error();
1725
0
                    continue;
1726
0
                }
1727
1728
0
                int64_t expiration_time = _tablet_meta->ttl_seconds();
1729
0
                g_file_cache_cloud_tablet_submitted_segment_num << 1;
1730
0
                if (rs->rowset_meta()->segment_file_size(seg_id) > 0) {
1731
0
                    g_file_cache_cloud_tablet_submitted_segment_size
1732
0
                            << rs->rowset_meta()->segment_file_size(seg_id);
1733
0
                }
1734
0
                if (!warm_up_state_updated) {
1735
0
                    VLOG_DEBUG << "warm up rowset " << rs->version() << "(" << rs->rowset_id()
1736
0
                               << ") triggerd by sync rowset";
1737
0
                    if (!add_rowset_warmup_state_unlocked(*(rs->rowset_meta()),
1738
0
                                                          WarmUpTriggerSource::SYNC_ROWSET)) {
1739
0
                        LOG(INFO) << "found duplicate warmup task for rowset " << rs->rowset_id()
1740
0
                                  << ", skip it";
1741
0
                        break;
1742
0
                    }
1743
0
                    warm_up_state_updated = true;
1744
0
                }
1745
1746
0
                if (!config::file_cache_enable_only_warm_up_idx) {
1747
0
                    _submit_segment_download_task(rs, storage_resource.value(), seg_id,
1748
0
                                                  expiration_time);
1749
0
                }
1750
1751
0
                auto schema_ptr = rowset_meta->tablet_schema();
1752
0
                auto idx_version = schema_ptr->get_inverted_index_storage_format();
1753
0
                if (idx_version == InvertedIndexStorageFormatPB::V1) {
1754
0
                    std::unordered_map<int64_t, int64_t> index_size_map;
1755
0
                    auto&& inverted_index_info = rowset_meta->inverted_index_file_info(seg_id);
1756
0
                    for (const auto& info : inverted_index_info.index_info()) {
1757
0
                        if (info.index_file_size() != -1) {
1758
0
                            index_size_map[info.index_id()] = info.index_file_size();
1759
0
                        } else {
1760
0
                            VLOG_DEBUG << "Invalid index_file_size for segment_id " << seg_id
1761
0
                                       << ", index_id " << info.index_id();
1762
0
                        }
1763
0
                    }
1764
0
                    for (const auto& index : schema_ptr->inverted_indexes()) {
1765
0
                        auto idx_path = storage_resource.value()->remote_idx_v1_path(
1766
0
                                *rowset_meta, seg_id, index->index_id(), index->get_index_suffix());
1767
0
                        _submit_inverted_index_download_task(rs, storage_resource.value(), idx_path,
1768
0
                                                             index_size_map[index->index_id()],
1769
0
                                                             expiration_time);
1770
0
                    }
1771
0
                } else {
1772
0
                    if (schema_ptr->has_inverted_index() || schema_ptr->has_ann_index()) {
1773
0
                        auto&& inverted_index_info = rowset_meta->inverted_index_file_info(seg_id);
1774
0
                        int64_t idx_size = 0;
1775
0
                        if (inverted_index_info.has_index_size()) {
1776
0
                            idx_size = inverted_index_info.index_size();
1777
0
                        } else {
1778
0
                            VLOG_DEBUG << "index_size is not set for segment " << seg_id;
1779
0
                        }
1780
0
                        auto idx_path =
1781
0
                                storage_resource.value()->remote_idx_v2_path(*rowset_meta, seg_id);
1782
0
                        _submit_inverted_index_download_task(rs, storage_resource.value(), idx_path,
1783
0
                                                             idx_size, expiration_time);
1784
0
                    }
1785
0
                }
1786
0
            }
1787
0
        }
1788
349k
        _rs_version_map.emplace(rs->version(), rs);
1789
349k
        _timestamped_version_tracker.add_version(rs->version());
1790
349k
        _max_version = std::max(rs->end_version(), _max_version);
1791
349k
        update_base_size(*rs);
1792
349k
    }
1793
290k
    _tablet_meta->add_rowsets_unchecked(rowsets);
1794
290k
}
1795
1796
#include "common/compile_check_end.h"
1797
} // namespace doris