Coverage Report

Created: 2026-04-10 15:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/cloud/cloud_tablet.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_tablet.h"
19
20
#include <bvar/bvar.h>
21
#include <bvar/latency_recorder.h>
22
#include <gen_cpp/Types_types.h>
23
#include <gen_cpp/olap_file.pb.h>
24
#include <rapidjson/document.h>
25
#include <rapidjson/encodings.h>
26
#include <rapidjson/prettywriter.h>
27
#include <rapidjson/rapidjson.h>
28
#include <rapidjson/stringbuffer.h>
29
30
#include <algorithm>
31
#include <atomic>
32
#include <chrono>
33
#include <cstdint>
34
#include <memory>
35
#include <ranges>
36
#include <ratio>
37
#include <shared_mutex>
38
#include <unordered_map>
39
#include <vector>
40
41
#include "cloud/cloud_meta_mgr.h"
42
#include "cloud/cloud_storage_engine.h"
43
#include "cloud/cloud_tablet_mgr.h"
44
#include "cloud/cloud_warm_up_manager.h"
45
#include "cloud/config.h"
46
#include "common/cast_set.h"
47
#include "common/config.h"
48
#include "common/logging.h"
49
#include "cpp/sync_point.h"
50
#include "io/cache/block_file_cache_downloader.h"
51
#include "io/cache/block_file_cache_factory.h"
52
#include "storage/compaction/compaction.h"
53
#include "storage/compaction/cumulative_compaction_time_series_policy.h"
54
#include "storage/index/inverted/inverted_index_desc.h"
55
#include "storage/olap_define.h"
56
#include "storage/rowset/beta_rowset.h"
57
#include "storage/rowset/rowset.h"
58
#include "storage/rowset/rowset_factory.h"
59
#include "storage/rowset/rowset_fwd.h"
60
#include "storage/rowset/rowset_writer.h"
61
#include "storage/storage_policy.h"
62
#include "storage/tablet/base_tablet.h"
63
#include "storage/tablet/tablet_schema.h"
64
#include "storage/txn/txn_manager.h"
65
#include "util/debug_points.h"
66
#include "util/stack_util.h"
67
68
namespace doris {
69
#include "common/compile_check_begin.h"
70
using namespace ErrorCode;
71
72
// ---------------------------------------------------------------------------
// bvar metrics exported by this translation unit. Each variable is exposed
// under the string name passed to its constructor.
// ---------------------------------------------------------------------------

// Delete-bitmap lock acquisition latency, split by compaction type.
bvar::LatencyRecorder g_cu_compaction_get_delete_bitmap_lock_time_ms(
        "cu_compaction_get_delete_bitmap_lock_time_ms");
bvar::LatencyRecorder g_base_compaction_get_delete_bitmap_lock_time_ms(
        "base_compaction_get_delete_bitmap_lock_time_ms");

// Rowsets queued in `_unused_rowsets` (see add_unused_rowsets / remove_unused_rowsets).
bvar::Adder<int64_t> g_unused_rowsets_count("unused_rowsets_count");
bvar::Adder<int64_t> g_unused_rowsets_bytes("unused_rowsets_bytes");

// Version-capture strategy counters, bumped by capture_versions_prefer_cache() and
// capture_versions_with_freshness_tolerance() (including its fallback path).
bvar::Adder<int64_t> g_capture_prefer_cache_count("capture_prefer_cache_count");
bvar::Adder<int64_t> g_capture_with_freshness_tolerance_count(
        "capture_with_freshness_tolerance_count");
bvar::Adder<int64_t> g_capture_with_freshness_tolerance_fallback_count(
        "capture_with_freshness_tolerance_fallback_count");
bvar::Adder<int64_t> g_rowset_warmup_state_missing_count("rowset_warmup_state_missing_count");

// Windowed views over the capture counters above; 30 is the window size handed to
// bvar::Window (NOTE(review): seconds according to the bvar documentation -- confirm).
bvar::Window<bvar::Adder<int64_t>> g_capture_prefer_cache_count_window(
        "capture_prefer_cache_count_window", &g_capture_prefer_cache_count, 30);
bvar::Window<bvar::Adder<int64_t>> g_capture_with_freshness_tolerance_count_window(
        "capture_with_freshness_tolerance_count_window", &g_capture_with_freshness_tolerance_count,
        30);
bvar::Window<bvar::Adder<int64_t>> g_capture_with_freshness_tolerance_fallback_count_window(
        "capture_with_freshness_tolerance_fallback_count_window",
        &g_capture_with_freshness_tolerance_fallback_count, 30);

// NOTE(review): presumably the initiator id used for load-triggered operations; its
// users are outside this excerpt.
static constexpr int LOAD_INITIATOR_ID = -1;

// File-cache download accounting: submitted vs. finished, segment vs. index.
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_size(
        "file_cache_cloud_tablet_submitted_segment_size");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_num(
        "file_cache_cloud_tablet_submitted_segment_num");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_index_size(
        "file_cache_cloud_tablet_submitted_index_size");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_index_num(
        "file_cache_cloud_tablet_submitted_index_num");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_segment_size(
        "file_cache_cloud_tablet_finished_segment_size");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_segment_num(
        "file_cache_cloud_tablet_finished_segment_num");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_index_size(
        "file_cache_cloud_tablet_finished_index_size");
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_index_num(
        "file_cache_cloud_tablet_finished_index_num");

// Cached data recycled from the file cache.
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_segment_num(
        "file_cache_recycle_cached_data_segment_num");
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_segment_size(
        "file_cache_recycle_cached_data_segment_size");
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_index_num(
        "file_cache_recycle_cached_data_index_num");

// Warm-up outcome counters and latency.
bvar::Adder<uint64_t> g_file_cache_warm_up_segment_complete_num(
        "file_cache_warm_up_segment_complete_num");
bvar::Adder<uint64_t> g_file_cache_warm_up_segment_failed_num(
        "file_cache_warm_up_segment_failed_num");
bvar::Adder<uint64_t> g_file_cache_warm_up_inverted_idx_complete_num(
        "file_cache_warm_up_inverted_idx_complete_num");
bvar::Adder<uint64_t> g_file_cache_warm_up_inverted_idx_failed_num(
        "file_cache_warm_up_inverted_idx_failed_num");
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_complete_num(
        "file_cache_warm_up_rowset_complete_num");
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_job_num(
        "file_cache_warm_up_rowset_triggered_by_job_num");
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num(
        "file_cache_warm_up_rowset_triggered_by_sync_rowset_num");
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_event_driven_num(
        "file_cache_warm_up_rowset_triggered_by_event_driven_num");
bvar::LatencyRecorder g_file_cache_warm_up_rowset_all_segments_latency(
        "file_cache_warm_up_rowset_all_segments_latency");
139
140
// Constructs a cloud tablet: `tablet_meta` is moved into the BaseTablet base and a
// reference to the owning storage engine is kept in `_engine`.
CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
        : BaseTablet(std::move(tablet_meta)), _engine(engine) {}

// Nothing beyond member destruction is required.
CloudTablet::~CloudTablet() = default;
144
145
0
bool CloudTablet::exceed_version_limit(int32_t limit) {
146
0
    return _approximate_num_rowsets.load(std::memory_order_relaxed) > limit;
147
0
}
148
149
19
// Always returns an empty string for a cloud tablet.
std::string CloudTablet::tablet_path() const {
    return {};
}
152
153
// Captures rowset readers for `spec_version` into `*rs_splits` while holding a shared
// lock on `_meta_lock`.
//
// Only `opts.skip_missing_versions` is forwarded to capture_rs_readers_unlocked();
// every other field of `opts` is left at its default value.
// Returns the error produced by capture_rs_readers_unlocked() (via DORIS_TRY), or the
// injected -230 error when the debug point is active.
Status CloudTablet::capture_rs_readers(const Version& spec_version,
                                       std::vector<RowSetSplits>* rs_splits,
                                       const CaptureRowsetOps& opts) {
    DBUG_EXECUTE_IF("CloudTablet.capture_rs_readers.return.e-230", {
        LOG_WARNING("CloudTablet.capture_rs_readers.return e-230").tag("tablet_id", tablet_id());
        return Status::Error<false>(-230, "injected error");
    });

    const CaptureRowsetOps forwarded_opts {.skip_missing_versions = opts.skip_missing_versions};

    std::shared_lock rlock(_meta_lock);
    *rs_splits = DORIS_TRY(capture_rs_readers_unlocked(spec_version, forwarded_opts));
    return Status::OK();
}
165
166
// Picks the version-capture strategy for `version_range`:
//   1. a positive `query_freshness_tolerance_ms` selects the freshness-tolerance path;
//   2. otherwise, `enable_prefer_cached_rowset` on a non merge-on-write tablet selects
//      the prefer-cache path;
//   3. everything else is delegated to BaseTablet.
[[nodiscard]] Result<std::vector<Version>> CloudTablet::capture_consistent_versions_unlocked(
        const Version& version_range, const CaptureRowsetOps& options) const {
    const bool has_freshness_tolerance = options.query_freshness_tolerance_ms > 0;
    if (has_freshness_tolerance) {
        return capture_versions_with_freshness_tolerance(version_range, options);
    }

    if (options.enable_prefer_cached_rowset && !enable_unique_key_merge_on_write()) {
        return capture_versions_prefer_cache(version_range);
    }

    return BaseTablet::capture_consistent_versions_unlocked(version_range, options);
}
175
176
// Captures a consistent version path for `spec_version`, asking the version tracker to
// prefer edges (rowsets) that rowset_is_warmed_up_unlocked() reports as warmed up.
//
// Takes a shared lock on `_meta_lock` for the duration of the capture.
// Returns the captured path, or the tracker's error wrapped in ResultError.
Result<std::vector<Version>> CloudTablet::capture_versions_prefer_cache(
        const Version& spec_version) const {
    g_capture_prefer_cache_count << 1;
    Versions version_path;
    std::shared_lock rlock(_meta_lock);
    auto st = _timestamped_version_tracker.capture_consistent_versions_prefer_cache(
            spec_version, version_path,
            [&](int64_t start, int64_t end) { return rowset_is_warmed_up_unlocked(start, end); });
    if (!st.ok()) {
        return ResultError(st);
    }
    // `back()` on an empty vector is undefined behavior; the value is only used for the
    // debug log below, so report -1 if the tracker ever succeeds with an empty path.
    const int64_t path_max_version = version_path.empty() ? -1 : version_path.back().second;
    VLOG_DEBUG << fmt::format(
            "[verbose] CloudTablet::capture_versions_prefer_cache, capture path: {}, "
            "tablet_id={}, spec_version={}, path_max_version={}",
            fmt::join(version_path | std::views::transform([](const auto& version) {
                          return fmt::format("{}", version.to_string());
                      }),
                      ", "),
            tablet_id(), spec_version.to_string(), path_max_version);
    return version_path;
}
198
199
230
bool CloudTablet::rowset_is_warmed_up_unlocked(int64_t start_version, int64_t end_version) const {
200
230
    if (start_version > end_version) {
201
0
        return false;
202
0
    }
203
230
    Version version {start_version, end_version};
204
230
    auto it = _rs_version_map.find(version);
205
230
    if (it == _rs_version_map.end()) {
206
78
        it = _stale_rs_version_map.find(version);
207
78
        if (it == _stale_rs_version_map.end()) {
208
0
            LOG_WARNING(
209
0
                    "fail to find Rowset in rs_version or stale_rs_version for version. "
210
0
                    "tablet={}, version={}",
211
0
                    tablet_id(), version.to_string());
212
0
            return false;
213
0
        }
214
78
    }
215
230
    const auto& rs = it->second;
216
230
    if (rs->visible_timestamp() < _engine.startup_timepoint()) {
217
        // We only care about rowsets that are created after startup time point. For other rowsets,
218
        // we assume they are warmed up.
219
13
        return true;
220
13
    }
221
217
    return is_rowset_warmed_up(rs->rowset_id());
222
230
};
223
224
// Captures a version path for `spec_version` in which every edge (rowset) has been
// warmed up, subject to `options.query_freshness_tolerance_ms`.
//
// If any active or stale rowset meta satisfies
// _check_rowset_should_be_visible_but_not_warmed_up() for the captured path, the
// warmed-up path is discarded and the regular BaseTablet capture is used instead.
Result<std::vector<Version>> CloudTablet::capture_versions_with_freshness_tolerance(
        const Version& spec_version, const CaptureRowsetOps& options) const {
    g_capture_with_freshness_tolerance_count << 1;
    using namespace std::chrono;
    const auto tolerance_ms = options.query_freshness_tolerance_ms;
    const auto freshness_limit_tp = system_clock::now() - milliseconds(tolerance_ms);

    // Find a version path where every edge (rowset) has been warmed up.
    Versions version_path;
    std::shared_lock rlock(_meta_lock);
    auto edge_is_warmed_up = [&](int64_t start, int64_t end) {
        return rowset_is_warmed_up_unlocked(start, end);
    };
    if (enable_unique_key_merge_on_write()) {
        // For merge-on-write tables, newly generated delete bitmap marks land on the rowsets
        // of the newest data layout, so only rowsets in that layout may be captured.
        // Otherwise there may be data correctness issues.
        RETURN_IF_ERROR_RESULT(
                _timestamped_version_tracker.capture_consistent_versions_with_validator_mow(
                        spec_version, version_path, edge_is_warmed_up));
    } else {
        RETURN_IF_ERROR_RESULT(
                _timestamped_version_tracker.capture_consistent_versions_with_validator(
                        spec_version, version_path, edge_is_warmed_up));
    }
    int64_t path_max_version = version_path.back().second;

    // use std::views::concat after C++26
    auto blocks_warmed_path = [this, path_max_version, freshness_limit_tp](const auto& rs_meta) {
        return _check_rowset_should_be_visible_but_not_warmed_up(rs_meta, path_max_version,
                                                                 freshness_limit_tp);
    };
    bool should_fallback = std::ranges::any_of(std::views::values(_tablet_meta->all_rs_metas()),
                                               blocks_warmed_path);
    if (!should_fallback) {
        should_fallback = std::ranges::any_of(
                std::views::values(_tablet_meta->all_stale_rs_metas()), blocks_warmed_path);
    }

    if (should_fallback) {
        rlock.unlock();
        g_capture_with_freshness_tolerance_fallback_count << 1;
        // There is a rowset that satisfies the freshness tolerance and starts beyond the
        // path's max version but has not been warmed up yet: capture rowsets as usual.
        return BaseTablet::capture_consistent_versions_unlocked(spec_version, options);
    }

    VLOG_DEBUG << fmt::format(
            "[verbose] CloudTablet::capture_versions_with_freshness_tolerance, capture path: {}, "
            "tablet_id={}, spec_version={}, path_max_version={}",
            fmt::join(version_path | std::views::transform([](const auto& version) {
                          return fmt::format("{}", version.to_string());
                      }),
                      ", "),
            tablet_id(), spec_version.to_string(), path_max_version);
    return version_path;
}
274
275
// There are only two tablet_states RUNNING and NOT_READY in cloud mode
276
// This function will erase the tablet from `CloudTabletMgr` when it can't find this tablet in MS.
277
16
Status CloudTablet::sync_rowsets(const SyncOptions& options, SyncRowsetStats* stats) {
278
16
    RETURN_IF_ERROR(sync_if_not_running(stats));
279
280
16
    if (options.query_version > 0) {
281
0
        auto lock_start = std::chrono::steady_clock::now();
282
0
        std::shared_lock rlock(_meta_lock);
283
0
        if (stats) {
284
0
            stats->meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(
285
0
                                                std::chrono::steady_clock::now() - lock_start)
286
0
                                                .count();
287
0
        }
288
0
        if (_max_version >= options.query_version) {
289
0
            return Status::OK();
290
0
        }
291
0
    }
292
293
    // serially execute sync to reduce unnecessary network overhead
294
16
    auto sync_lock_start = std::chrono::steady_clock::now();
295
16
    std::unique_lock lock(_sync_meta_lock);
296
16
    if (stats) {
297
0
        stats->sync_meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(
298
0
                                                 std::chrono::steady_clock::now() - sync_lock_start)
299
0
                                                 .count();
300
0
    }
301
16
    if (options.query_version > 0) {
302
0
        auto lock_start = std::chrono::steady_clock::now();
303
0
        std::shared_lock rlock(_meta_lock);
304
0
        if (stats) {
305
0
            stats->meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(
306
0
                                                std::chrono::steady_clock::now() - lock_start)
307
0
                                                .count();
308
0
        }
309
0
        if (_max_version >= options.query_version) {
310
0
            return Status::OK();
311
0
        }
312
0
    }
313
314
16
    auto st = _engine.meta_mgr().sync_tablet_rowsets_unlocked(this, lock, options, stats);
315
16
    if (st.is<ErrorCode::NOT_FOUND>()) {
316
0
        clear_cache();
317
0
    }
318
319
16
    return st;
320
16
}
321
322
// Sync tablet meta and all rowset meta if not running.
323
// This could happen when BE didn't finish schema change job and another BE committed this schema change job.
324
// It should be a quite rare situation.
325
16
Status CloudTablet::sync_if_not_running(SyncRowsetStats* stats) {
326
16
    if (tablet_state() == TABLET_RUNNING) {
327
16
        return Status::OK();
328
16
    }
329
330
    // Serially execute sync to reduce unnecessary network overhead
331
0
    auto sync_lock_start = std::chrono::steady_clock::now();
332
0
    std::unique_lock lock(_sync_meta_lock);
333
0
    if (stats) {
334
0
        stats->sync_meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(
335
0
                                                 std::chrono::steady_clock::now() - sync_lock_start)
336
0
                                                 .count();
337
0
    }
338
339
0
    {
340
0
        auto lock_start = std::chrono::steady_clock::now();
341
0
        std::shared_lock rlock(_meta_lock);
342
0
        if (stats) {
343
0
            stats->meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(
344
0
                                                std::chrono::steady_clock::now() - lock_start)
345
0
                                                .count();
346
0
        }
347
0
        if (tablet_state() == TABLET_RUNNING) {
348
0
            return Status::OK();
349
0
        }
350
0
    }
351
352
0
    TabletMetaSharedPtr tablet_meta;
353
0
    auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta);
354
0
    if (!st.ok()) {
355
0
        if (st.is<ErrorCode::NOT_FOUND>()) {
356
0
            clear_cache();
357
0
        }
358
0
        return st;
359
0
    }
360
361
0
    if (tablet_meta->tablet_state() != TABLET_RUNNING) [[unlikely]] {
362
        // MoW may go to here when load while schema change
363
0
        return Status::OK();
364
0
    }
365
366
0
    TimestampedVersionTracker empty_tracker;
367
0
    {
368
0
        auto lock_start = std::chrono::steady_clock::now();
369
0
        std::lock_guard wlock(_meta_lock);
370
0
        if (stats) {
371
0
            stats->meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(
372
0
                                                std::chrono::steady_clock::now() - lock_start)
373
0
                                                .count();
374
0
        }
375
0
        RETURN_IF_ERROR(set_tablet_state(TABLET_RUNNING));
376
0
        _rs_version_map.clear();
377
0
        _stale_rs_version_map.clear();
378
0
        std::swap(_timestamped_version_tracker, empty_tracker);
379
0
        _tablet_meta->clear_rowsets();
380
0
        _tablet_meta->clear_stale_rowset();
381
0
        _max_version = -1;
382
0
    }
383
384
0
    st = _engine.meta_mgr().sync_tablet_rowsets_unlocked(this, lock, {}, stats);
385
0
    if (st.is<ErrorCode::NOT_FOUND>()) {
386
0
        clear_cache();
387
0
    }
388
0
    return st;
389
0
}
390
391
void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_overlap,
392
                              std::unique_lock<std::shared_mutex>& meta_lock,
393
302
                              bool warmup_delta_data) {
394
302
    if (to_add.empty()) {
395
0
        return;
396
0
    }
397
398
302
    VLOG_DEBUG << "add_rowsets tablet_id=" << tablet_id() << " stack: " << get_stack_trace();
399
400
302
    if (!version_overlap) {
401
302
        _add_rowsets_directly(to_add, warmup_delta_data);
402
302
        return;
403
302
    }
404
405
    // Filter out existed rowsets
406
0
    auto remove_it =
407
0
            std::remove_if(to_add.begin(), to_add.end(), [this](const RowsetSharedPtr& rs) {
408
0
                if (auto find_it = _rs_version_map.find(rs->version());
409
0
                    find_it == _rs_version_map.end()) {
410
0
                    return false;
411
0
                } else if (find_it->second->rowset_id() == rs->rowset_id()) {
412
0
                    return true; // Same rowset
413
0
                }
414
415
                // If version of rowset in `to_add` is equal to rowset in tablet but rowset_id is not equal,
416
                // replace existed rowset with `to_add` rowset. This may occur when:
417
                //  1. schema change converts rowsets which have been double written to new tablet
418
                //  2. cumu compaction picks single overlapping input rowset to perform compaction
419
420
                // add existed rowset to unused_rowsets to remove delete bitmap and recycle cached data
421
422
0
                std::vector<RowsetSharedPtr> unused_rowsets;
423
0
                if (auto find_it = _rs_version_map.find(rs->version());
424
0
                    find_it != _rs_version_map.end()) {
425
0
                    if (find_it->second->rowset_id() == rs->rowset_id()) {
426
0
                        LOG(WARNING) << "tablet_id=" << tablet_id()
427
0
                                     << ", rowset_id=" << rs->rowset_id().to_string()
428
0
                                     << ", existed rowset_id="
429
0
                                     << find_it->second->rowset_id().to_string();
430
0
                        DCHECK(find_it->second->rowset_id() != rs->rowset_id())
431
0
                                << "tablet_id=" << tablet_id()
432
0
                                << ", rowset_id=" << rs->rowset_id().to_string()
433
0
                                << ", existed rowset_id="
434
0
                                << find_it->second->rowset_id().to_string();
435
0
                    }
436
0
                    unused_rowsets.push_back(find_it->second);
437
0
                }
438
0
                add_unused_rowsets(unused_rowsets);
439
440
0
                _tablet_meta->delete_rs_meta_by_version(rs->version(), nullptr);
441
0
                _rs_version_map[rs->version()] = rs;
442
0
                _tablet_meta->add_rowsets_unchecked({rs});
443
0
                update_base_size(*rs);
444
0
                return true;
445
0
            });
446
447
0
    to_add.erase(remove_it, to_add.end());
448
449
    // delete rowsets with overlapped version
450
0
    std::vector<RowsetSharedPtr> to_add_directly;
451
0
    for (auto& to_add_rs : to_add) {
452
        // delete rowsets with overlapped version
453
0
        std::vector<RowsetSharedPtr> to_delete;
454
0
        Version to_add_v = to_add_rs->version();
455
        // if start_version  > max_version, we can skip checking overlap here.
456
0
        if (to_add_v.first > _max_version) {
457
            // if start_version  > max_version, we can skip checking overlap here.
458
0
            to_add_directly.push_back(to_add_rs);
459
0
        } else {
460
0
            to_add_directly.push_back(to_add_rs);
461
0
            for (auto& [v, rs] : _rs_version_map) {
462
0
                if (to_add_v.contains(v)) {
463
0
                    to_delete.push_back(rs);
464
0
                }
465
0
            }
466
0
            delete_rowsets(to_delete, meta_lock);
467
0
        }
468
0
    }
469
470
0
    _add_rowsets_directly(to_add_directly, warmup_delta_data);
471
0
}
472
473
void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
474
105
                                 std::unique_lock<std::shared_mutex>&) {
475
105
    if (to_delete.empty()) {
476
0
        return;
477
0
    }
478
105
    std::vector<RowsetMetaSharedPtr> rs_metas;
479
105
    rs_metas.reserve(to_delete.size());
480
105
    int64_t now = ::time(nullptr);
481
607
    for (auto&& rs : to_delete) {
482
607
        rs->rowset_meta()->set_stale_at(now);
483
607
        rs_metas.push_back(rs->rowset_meta());
484
607
        _stale_rs_version_map[rs->version()] = rs;
485
607
    }
486
105
    _timestamped_version_tracker.add_stale_path_version(rs_metas);
487
607
    for (auto&& rs : to_delete) {
488
607
        _rs_version_map.erase(rs->version());
489
607
    }
490
491
105
    _tablet_meta->modify_rs_metas({}, rs_metas, false);
492
105
}
493
494
void CloudTablet::delete_rowsets_for_schema_change(const std::vector<RowsetSharedPtr>& to_delete,
495
4
                                                   std::unique_lock<std::shared_mutex>&) {
496
4
    if (to_delete.empty()) {
497
1
        return;
498
1
    }
499
3
    std::vector<RowsetMetaSharedPtr> rs_metas;
500
3
    rs_metas.reserve(to_delete.size());
501
4
    for (auto&& rs : to_delete) {
502
4
        rs_metas.push_back(rs->rowset_meta());
503
4
        _rs_version_map.erase(rs->version());
504
        // Remove edge from version graph so that the greedy capture algorithm
505
        // won't prefer the wider stale compaction rowset over individual SC
506
        // output rowsets (e.g. [818-822] vs [818],[819],...,[822]).
507
4
        _timestamped_version_tracker.delete_version(rs->version());
508
4
    }
509
510
    // Use same_version=true to skip adding to _stale_rs_metas. Do NOT use the
511
    // stale tracking mechanism (_stale_rs_version_map / _stale_version_path_map)
512
    // because SC output will create new rowsets with identical version ranges;
513
    // a later compaction could put those into stale as well, causing two stale
514
    // paths to reference the same version key -- when one path is cleaned first,
515
    // the other hits a DCHECK(false) in delete_expired_stale_rowsets().
516
3
    _tablet_meta->modify_rs_metas({}, rs_metas, true);
517
518
    // Schedule for direct cache cleanup. MS has already recycled these rowsets.
519
3
    add_unused_rowsets(to_delete);
520
3
}
521
522
1
uint64_t CloudTablet::delete_expired_stale_rowsets() {
523
1
    if (config::enable_mow_verbose_log) {
524
0
        LOG_INFO("begin delete_expired_stale_rowset for tablet={}", tablet_id());
525
0
    }
526
1
    std::vector<RowsetSharedPtr> expired_rowsets;
527
    // ATTN: trick, Use stale_rowsets to temporarily increase the reference count of the rowset shared pointer in _stale_rs_version_map so that in the recycle_cached_data function, it checks if the reference count is 2.
528
1
    std::vector<std::pair<Version, std::vector<RowsetSharedPtr>>> deleted_stale_rowsets;
529
1
    int64_t expired_stale_sweep_endtime =
530
1
            ::time(nullptr) - config::tablet_rowset_stale_sweep_time_sec;
531
1
    {
532
1
        std::unique_lock wlock(_meta_lock);
533
534
1
        std::vector<int64_t> path_ids;
535
        // capture the path version to delete
536
1
        _timestamped_version_tracker.capture_expired_paths(expired_stale_sweep_endtime, &path_ids);
537
538
1
        if (path_ids.empty()) {
539
0
            return 0;
540
0
        }
541
542
1
        for (int64_t path_id : path_ids) {
543
1
            int64_t start_version = -1;
544
1
            int64_t end_version = -1;
545
1
            std::vector<RowsetSharedPtr> stale_rowsets;
546
            // delete stale versions in version graph
547
1
            auto version_path = _timestamped_version_tracker.fetch_and_delete_path_by_id(path_id);
548
5
            for (auto& v_ts : version_path->timestamped_versions()) {
549
5
                auto rs_it = _stale_rs_version_map.find(v_ts->version());
550
5
                if (rs_it != _stale_rs_version_map.end()) {
551
5
                    expired_rowsets.push_back(rs_it->second);
552
5
                    stale_rowsets.push_back(rs_it->second);
553
5
                    VLOG_DEBUG << "erase stale rowset, tablet_id=" << tablet_id()
554
0
                               << " rowset_id=" << rs_it->second->rowset_id().to_string()
555
0
                               << " version=" << rs_it->first.to_string();
556
5
                    _stale_rs_version_map.erase(rs_it);
557
5
                } else {
558
0
                    LOG(WARNING) << "cannot find stale rowset " << v_ts->version() << " in tablet "
559
0
                                 << tablet_id();
560
                    // clang-format off
561
0
                    DCHECK(false) << [this, &wlock]() { wlock.unlock(); std::string json; get_compaction_status(&json); return json; }();
562
                    // clang-format on
563
0
                }
564
5
                if (start_version < 0) {
565
1
                    start_version = v_ts->version().first;
566
1
                }
567
5
                end_version = v_ts->version().second;
568
5
                _tablet_meta->delete_stale_rs_meta_by_version(v_ts->version());
569
5
            }
570
1
            Version version(start_version, end_version);
571
1
            if (!stale_rowsets.empty()) {
572
1
                deleted_stale_rowsets.emplace_back(version, std::move(stale_rowsets));
573
1
            }
574
1
        }
575
1
        _reconstruct_version_tracker_if_necessary();
576
1
    }
577
578
    // if the rowset is not used by any query, we can recycle its cached data early.
579
0
    auto recycled_rowsets = recycle_cached_data(expired_rowsets);
580
1
    if (!recycled_rowsets.empty()) {
581
0
        auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
582
0
        manager.recycle_cache(tablet_id(), recycled_rowsets);
583
0
    }
584
1
    if (config::enable_mow_verbose_log) {
585
0
        LOG_INFO("finish delete_expired_stale_rowset for tablet={}", tablet_id());
586
0
    }
587
588
1
    add_unused_rowsets(expired_rowsets);
589
1
    if (config::enable_agg_and_remove_pre_rowsets_delete_bitmap && keys_type() == UNIQUE_KEYS &&
590
1
        enable_unique_key_merge_on_write() && !deleted_stale_rowsets.empty()) {
591
        // agg delete bitmap for pre rowsets; record unused delete bitmap key ranges
592
0
        OlapStopWatch watch;
593
0
        for (const auto& [version, unused_rowsets] : deleted_stale_rowsets) {
594
            // agg delete bitmap for pre rowset
595
0
            DeleteBitmapKeyRanges remove_delete_bitmap_key_ranges;
596
0
            agg_delete_bitmap_for_stale_rowsets(version, remove_delete_bitmap_key_ranges);
597
            // add remove delete bitmap
598
0
            if (!remove_delete_bitmap_key_ranges.empty()) {
599
0
                std::vector<RowsetId> rowset_ids;
600
0
                for (const auto& rs : unused_rowsets) {
601
0
                    rowset_ids.push_back(rs->rowset_id());
602
0
                }
603
0
                std::lock_guard<std::mutex> lock(_gc_mutex);
604
0
                _unused_delete_bitmap.push_back(
605
0
                        std::make_pair(rowset_ids, remove_delete_bitmap_key_ranges));
606
0
            }
607
0
        }
608
0
        LOG(INFO) << "agg pre rowsets delete bitmap. tablet_id=" << tablet_id()
609
0
                  << ", size=" << deleted_stale_rowsets.size()
610
0
                  << ", cost(us)=" << watch.get_elapse_time_us();
611
0
    }
612
1
    return expired_rowsets.size();
613
1
}
614
615
1
bool CloudTablet::need_remove_unused_rowsets() {
616
1
    std::lock_guard<std::mutex> lock(_gc_mutex);
617
1
    return !_unused_rowsets.empty() || !_unused_delete_bitmap.empty();
618
1
}
619
620
4
void CloudTablet::add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets) {
621
4
    std::lock_guard<std::mutex> lock(_gc_mutex);
622
9
    for (const auto& rowset : rowsets) {
623
9
        _unused_rowsets[rowset->rowset_id()] = rowset;
624
9
        g_unused_rowsets_bytes << rowset->total_disk_size();
625
9
    }
626
4
    g_unused_rowsets_count << rowsets.size();
627
4
}
628
629
0
void CloudTablet::remove_unused_rowsets() {
630
0
    std::vector<std::shared_ptr<Rowset>> removed_rowsets;
631
0
    int64_t removed_delete_bitmap_num = 0;
632
0
    OlapStopWatch watch;
633
0
    {
634
0
        std::lock_guard<std::mutex> lock(_gc_mutex);
635
        // 1. remove unused rowsets's cache data and delete bitmap
636
0
        for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
637
0
            auto& rs = it->second;
638
0
            if (rs.use_count() > 1) {
639
0
                LOG(WARNING) << "tablet_id:" << tablet_id() << " rowset: " << rs->rowset_id()
640
0
                             << " has " << rs.use_count() << " references, it cannot be removed";
641
0
                ++it;
642
0
                continue;
643
0
            }
644
0
            tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version());
645
0
            _rowset_warm_up_states.erase(rs->rowset_id());
646
0
            rs->clear_cache();
647
0
            g_unused_rowsets_count << -1;
648
0
            g_unused_rowsets_bytes << -rs->total_disk_size();
649
0
            removed_rowsets.push_back(std::move(rs));
650
0
            it = _unused_rowsets.erase(it);
651
0
        }
652
0
    }
653
654
0
    {
655
0
        std::vector<RecycledRowsets> recycled_rowsets;
656
657
0
        for (auto& rs : removed_rowsets) {
658
0
            auto index_names = rs->get_index_file_names();
659
0
            recycled_rowsets.emplace_back(rs->rowset_id(), rs->num_segments(), index_names);
660
0
            int64_t segment_size_sum = 0;
661
0
            for (int32_t i = 0; i < rs->num_segments(); i++) {
662
0
                segment_size_sum += rs->rowset_meta()->segment_file_size(i);
663
0
            }
664
0
            g_file_cache_recycle_cached_data_segment_num << rs->num_segments();
665
0
            g_file_cache_recycle_cached_data_segment_size << segment_size_sum;
666
0
            g_file_cache_recycle_cached_data_index_num << index_names.size();
667
0
        }
668
669
0
        if (recycled_rowsets.size() > 0) {
670
0
            auto& manager =
671
0
                    ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
672
0
            manager.recycle_cache(tablet_id(), recycled_rowsets);
673
0
        }
674
0
    }
675
676
0
    {
677
0
        std::lock_guard<std::mutex> lock(_gc_mutex);
678
        // 2. remove delete bitmap of pre rowsets
679
0
        for (auto it = _unused_delete_bitmap.begin(); it != _unused_delete_bitmap.end();) {
680
0
            auto& rowset_ids = std::get<0>(*it);
681
0
            bool find_unused_rowset = false;
682
0
            for (const auto& rowset_id : rowset_ids) {
683
0
                if (_unused_rowsets.find(rowset_id) != _unused_rowsets.end()) {
684
0
                    LOG(INFO) << "can not remove pre rowset delete bitmap because rowset is in use"
685
0
                              << ", tablet_id=" << tablet_id() << ", rowset_id=" << rowset_id;
686
0
                    find_unused_rowset = true;
687
0
                    break;
688
0
                }
689
0
            }
690
0
            if (find_unused_rowset) {
691
0
                ++it;
692
0
                continue;
693
0
            }
694
0
            auto& key_ranges = std::get<1>(*it);
695
0
            tablet_meta()->delete_bitmap().remove(key_ranges);
696
0
            it = _unused_delete_bitmap.erase(it);
697
0
            removed_delete_bitmap_num++;
698
            // TODO(kaijie): recycle cache for unused delete bitmap
699
0
        }
700
0
    }
701
702
0
    LOG(INFO) << "tablet_id=" << tablet_id() << ", unused_rowset size=" << _unused_rowsets.size()
703
0
              << ", unused_delete_bitmap size=" << _unused_delete_bitmap.size()
704
0
              << ", removed_rowsets_num=" << removed_rowsets.size()
705
0
              << ", removed_delete_bitmap_num=" << removed_delete_bitmap_num
706
0
              << ", cost(us)=" << watch.get_elapse_time_us();
707
0
}
708
709
871
void CloudTablet::update_base_size(const Rowset& rs) {
710
    // Define base rowset as the rowset of version [2-x]
711
871
    if (rs.start_version() == 2) {
712
122
        _base_size = rs.total_disk_size();
713
122
    }
714
871
}
715
716
0
void CloudTablet::clear_cache() {
717
0
    auto recycled_rowsets = CloudTablet::recycle_cached_data(get_snapshot_rowset(true));
718
0
    if (!recycled_rowsets.empty()) {
719
0
        auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
720
0
        manager.recycle_cache(tablet_id(), recycled_rowsets);
721
0
    }
722
0
    _engine.tablet_mgr().erase_tablet(tablet_id());
723
0
}
724
725
std::vector<RecycledRowsets> CloudTablet::recycle_cached_data(
726
1
        const std::vector<RowsetSharedPtr>& rowsets) {
727
1
    std::vector<RecycledRowsets> recycled_rowsets;
728
5
    for (const auto& rs : rowsets) {
729
        // rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2.
730
5
        if (rs.use_count() > 2) {
731
5
            LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has " << rs.use_count()
732
5
                         << " references. File Cache won't be recycled when query is using it.";
733
5
            continue;
734
5
        }
735
0
        rs->clear_cache();
736
0
        auto index_names = rs->get_index_file_names();
737
0
        recycled_rowsets.emplace_back(rs->rowset_id(), rs->num_segments(), index_names);
738
739
0
        int64_t segment_size_sum = 0;
740
0
        for (int32_t i = 0; i < rs->num_segments(); i++) {
741
0
            segment_size_sum += rs->rowset_meta()->segment_file_size(i);
742
0
        }
743
0
        g_file_cache_recycle_cached_data_segment_num << rs->num_segments();
744
0
        g_file_cache_recycle_cached_data_segment_size << segment_size_sum;
745
0
        g_file_cache_recycle_cached_data_index_num << index_names.size();
746
0
    }
747
1
    return recycled_rowsets;
748
1
}
749
750
void CloudTablet::reset_approximate_stats(int64_t num_rowsets, int64_t num_segments,
751
0
                                          int64_t num_rows, int64_t data_size) {
752
0
    _approximate_num_segments.store(num_segments, std::memory_order_relaxed);
753
0
    _approximate_num_rows.store(num_rows, std::memory_order_relaxed);
754
0
    _approximate_data_size.store(data_size, std::memory_order_relaxed);
755
0
    int64_t cumu_num_deltas = 0;
756
0
    int64_t cumu_num_rowsets = 0;
757
0
    auto cp = _cumulative_point.load(std::memory_order_relaxed);
758
0
    for (auto& [v, r] : _rs_version_map) {
759
0
        if (v.second < cp) {
760
0
            continue;
761
0
        }
762
0
        cumu_num_deltas += r->is_segments_overlapping() ? r->num_segments() : 1;
763
0
        ++cumu_num_rowsets;
764
0
    }
765
    // num_rowsets may be less than the size of _rs_version_map when there are some hole rowsets
766
    // in the version map, so we use the max value to ensure that the approximate number
767
    // of rowsets is at least the size of _rs_version_map.
768
    // Note that this is not the exact number of rowsets, but an approximate number.
769
0
    int64_t approximate_num_rowsets =
770
0
            std::max(num_rowsets, static_cast<int64_t>(_rs_version_map.size()));
771
0
    _approximate_num_rowsets.store(approximate_num_rowsets, std::memory_order_relaxed);
772
0
    _approximate_cumu_num_rowsets.store(cumu_num_rowsets, std::memory_order_relaxed);
773
0
    _approximate_cumu_num_deltas.store(cumu_num_deltas, std::memory_order_relaxed);
774
0
}
775
776
// Completes `context` with this tablet's identity and settings plus a freshly allocated
// rowset id, then asks `RowsetFactory` for a rowset writer (`vertical` is forwarded as is).
Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_rowset_writer(
        RowsetWriterContext& context, bool vertical) {
    // Identity of the tablet the new rowset belongs to.
    // FIXME(plat1ko): Seems `tablet_id` and `index_id` has been set repeatedly
    context.tablet_id = tablet_id();
    context.index_id = index_id();
    context.partition_id = partition_id();

    // Tablet-level settings copied into the writer context.
    context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write();
    context.encrypt_algorithm = tablet_meta()->encryption_algorithm();

    context.rowset_id = _engine.next_rowset_id();
    return RowsetFactory::create_rowset_writer(_engine, context, vertical);
}
787
788
// create a rowset writer with rowset_id and seg_id
789
// after writer, merge this transient rowset with original rowset
790
Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_writer(
791
        const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> partial_update_info,
792
0
        int64_t txn_expiration) {
793
0
    if (rowset.rowset_meta_state() != RowsetStatePB::BEGIN_PARTIAL_UPDATE &&
794
0
        rowset.rowset_meta_state() != RowsetStatePB::COMMITTED) [[unlikely]] {
795
0
        auto msg = fmt::format(
796
0
                "wrong rowset state when create_transient_rowset_writer, rowset state should be "
797
0
                "BEGIN_PARTIAL_UPDATE or COMMITTED, but found {}, rowset_id={}, tablet_id={}",
798
0
                RowsetStatePB_Name(rowset.rowset_meta_state()), rowset.rowset_id().to_string(),
799
0
                tablet_id());
800
        // see `CloudRowsetWriter::build` for detail.
801
        // if this is in a retry task, the rowset state may have been changed to RowsetStatePB::COMMITTED
802
        // in `RowsetMeta::merge_rowset_meta()` in previous trials.
803
0
        LOG(WARNING) << msg;
804
0
        DCHECK(false) << msg;
805
0
    }
806
0
    RowsetWriterContext context;
807
0
    context.rowset_state = PREPARED;
808
0
    context.segments_overlap = OVERLAPPING;
809
    // During a partial update, the extracted columns of a variant should not be included in the tablet schema.
810
    // This is because the partial update for a variant needs to ignore the extracted columns.
811
    // Otherwise, the schema types in different rowsets might be inconsistent. When performing a partial update,
812
    // the complete variant is constructed by reading all the sub-columns of the variant.
813
0
    context.tablet_schema = rowset.tablet_schema()->copy_without_variant_extracted_columns();
814
0
    context.newest_write_timestamp = UnixSeconds();
815
0
    context.tablet_id = table_id();
816
0
    context.enable_segcompaction = false;
817
0
    context.write_type = DataWriteType::TYPE_DIRECT;
818
0
    context.partial_update_info = std::move(partial_update_info);
819
0
    context.is_transient_rowset_writer = true;
820
0
    context.rowset_id = rowset.rowset_id();
821
0
    context.tablet_id = tablet_id();
822
0
    context.index_id = index_id();
823
0
    context.partition_id = partition_id();
824
0
    context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write();
825
0
    context.txn_expiration = txn_expiration;
826
0
    context.encrypt_algorithm = tablet_meta()->encryption_algorithm();
827
    // TODO(liaoxin) enable packed file for transient rowset
828
0
    context.allow_packed_file = false;
829
830
0
    auto storage_resource = rowset.rowset_meta()->remote_storage_resource();
831
0
    if (!storage_resource) {
832
0
        return ResultError(std::move(storage_resource.error()));
833
0
    }
834
835
0
    context.storage_resource = *storage_resource.value();
836
837
0
    return RowsetFactory::create_rowset_writer(_engine, context, false)
838
0
            .transform([&](auto&& writer) {
839
0
                writer->set_segment_start_id(cast_set<int32_t>(rowset.num_segments()));
840
0
                return writer;
841
0
            });
842
0
}
843
844
3
int64_t CloudTablet::get_cloud_base_compaction_score() const {
845
3
    if (_tablet_meta->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY) {
846
0
        bool has_delete = false;
847
0
        int64_t point = cumulative_layer_point();
848
0
        std::shared_lock<std::shared_mutex> rlock(_meta_lock);
849
0
        for (const auto& [_, rs_meta] : _tablet_meta->all_rs_metas()) {
850
0
            if (rs_meta->start_version() >= point) {
851
0
                continue;
852
0
            }
853
0
            if (rs_meta->has_delete_predicate()) {
854
0
                has_delete = true;
855
0
                break;
856
0
            }
857
0
        }
858
0
        if (!has_delete) {
859
0
            return 0;
860
0
        }
861
0
    }
862
863
3
    return _approximate_num_rowsets.load(std::memory_order_relaxed) -
864
3
           _approximate_cumu_num_rowsets.load(std::memory_order_relaxed);
865
3
}
866
867
1
int64_t CloudTablet::get_cloud_cumu_compaction_score() const {
868
    // TODO(plat1ko): Propose an algorithm that considers tablet's key type, number of delete rowsets,
869
    //  number of tablet versions simultaneously.
870
1
    return _approximate_cumu_num_deltas.load(std::memory_order_relaxed);
871
1
}
872
873
// return a json string to show the compaction status of this tablet
874
33
void CloudTablet::get_compaction_status(std::string* json_result) {
875
33
    rapidjson::Document root;
876
33
    root.SetObject();
877
878
33
    rapidjson::Document path_arr;
879
33
    path_arr.SetArray();
880
881
33
    std::vector<RowsetSharedPtr> rowsets;
882
33
    std::vector<RowsetSharedPtr> stale_rowsets;
883
33
    {
884
33
        std::shared_lock rdlock(_meta_lock);
885
33
        rowsets.reserve(_rs_version_map.size());
886
148
        for (auto& it : _rs_version_map) {
887
148
            rowsets.push_back(it.second);
888
148
        }
889
33
        stale_rowsets.reserve(_stale_rs_version_map.size());
890
540
        for (auto& it : _stale_rs_version_map) {
891
540
            stale_rowsets.push_back(it.second);
892
540
        }
893
33
    }
894
33
    std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator);
895
33
    std::sort(stale_rowsets.begin(), stale_rowsets.end(), Rowset::comparator);
896
897
    // get snapshot version path json_doc
898
33
    _timestamped_version_tracker.get_stale_version_path_json_doc(path_arr);
899
33
    root.AddMember("cumulative point", _cumulative_point.load(), root.GetAllocator());
900
33
    rapidjson::Value cumu_value;
901
33
    std::string format_str = ToStringFromUnixMillis(_last_cumu_compaction_failure_millis.load());
902
33
    cumu_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
903
33
                         root.GetAllocator());
904
33
    root.AddMember("last cumulative failure time", cumu_value, root.GetAllocator());
905
33
    rapidjson::Value base_value;
906
33
    format_str = ToStringFromUnixMillis(_last_base_compaction_failure_millis.load());
907
33
    base_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
908
33
                         root.GetAllocator());
909
33
    root.AddMember("last base failure time", base_value, root.GetAllocator());
910
33
    rapidjson::Value full_value;
911
33
    format_str = ToStringFromUnixMillis(_last_full_compaction_failure_millis.load());
912
33
    full_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
913
33
                         root.GetAllocator());
914
33
    root.AddMember("last full failure time", full_value, root.GetAllocator());
915
33
    rapidjson::Value cumu_success_value;
916
33
    format_str = ToStringFromUnixMillis(_last_cumu_compaction_success_millis.load());
917
33
    cumu_success_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
918
33
                                 root.GetAllocator());
919
33
    root.AddMember("last cumulative success time", cumu_success_value, root.GetAllocator());
920
33
    rapidjson::Value base_success_value;
921
33
    format_str = ToStringFromUnixMillis(_last_base_compaction_success_millis.load());
922
33
    base_success_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
923
33
                                 root.GetAllocator());
924
33
    root.AddMember("last base success time", base_success_value, root.GetAllocator());
925
33
    rapidjson::Value full_success_value;
926
33
    format_str = ToStringFromUnixMillis(_last_full_compaction_success_millis.load());
927
33
    full_success_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
928
33
                                 root.GetAllocator());
929
33
    root.AddMember("last full success time", full_success_value, root.GetAllocator());
930
33
    rapidjson::Value cumu_schedule_value;
931
33
    format_str = ToStringFromUnixMillis(_last_cumu_compaction_schedule_millis.load());
932
33
    cumu_schedule_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
933
33
                                  root.GetAllocator());
934
33
    root.AddMember("last cumulative schedule time", cumu_schedule_value, root.GetAllocator());
935
33
    rapidjson::Value base_schedule_value;
936
33
    format_str = ToStringFromUnixMillis(_last_base_compaction_schedule_millis.load());
937
33
    base_schedule_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
938
33
                                  root.GetAllocator());
939
33
    root.AddMember("last base schedule time", base_schedule_value, root.GetAllocator());
940
33
    rapidjson::Value full_schedule_value;
941
33
    format_str = ToStringFromUnixMillis(_last_full_compaction_schedule_millis.load());
942
33
    full_schedule_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
943
33
                                  root.GetAllocator());
944
33
    root.AddMember("last full schedule time", full_schedule_value, root.GetAllocator());
945
33
    rapidjson::Value cumu_compaction_status_value;
946
33
    cumu_compaction_status_value.SetString(_last_cumu_compaction_status.c_str(),
947
33
                                           cast_set<uint>(_last_cumu_compaction_status.length()),
948
33
                                           root.GetAllocator());
949
33
    root.AddMember("last cumulative status", cumu_compaction_status_value, root.GetAllocator());
950
33
    rapidjson::Value base_compaction_status_value;
951
33
    base_compaction_status_value.SetString(_last_base_compaction_status.c_str(),
952
33
                                           cast_set<uint>(_last_base_compaction_status.length()),
953
33
                                           root.GetAllocator());
954
33
    root.AddMember("last base status", base_compaction_status_value, root.GetAllocator());
955
33
    rapidjson::Value full_compaction_status_value;
956
33
    full_compaction_status_value.SetString(_last_full_compaction_status.c_str(),
957
33
                                           cast_set<uint>(_last_full_compaction_status.length()),
958
33
                                           root.GetAllocator());
959
33
    root.AddMember("last full status", full_compaction_status_value, root.GetAllocator());
960
33
    rapidjson::Value exec_compaction_time;
961
33
    std::string num_str {std::to_string(exec_compaction_time_us.load())};
962
33
    exec_compaction_time.SetString(num_str.c_str(), cast_set<uint>(num_str.length()),
963
33
                                   root.GetAllocator());
964
33
    root.AddMember("exec compaction time us", exec_compaction_time, root.GetAllocator());
965
33
    rapidjson::Value local_read_time;
966
33
    num_str = std::to_string(local_read_time_us.load());
967
33
    local_read_time.SetString(num_str.c_str(), cast_set<uint>(num_str.length()),
968
33
                              root.GetAllocator());
969
33
    root.AddMember("compaction local read time us", local_read_time, root.GetAllocator());
970
33
    rapidjson::Value remote_read_time;
971
33
    num_str = std::to_string(remote_read_time_us.load());
972
33
    remote_read_time.SetString(num_str.c_str(), cast_set<uint>(num_str.length()),
973
33
                               root.GetAllocator());
974
33
    root.AddMember("compaction remote read time us", remote_read_time, root.GetAllocator());
975
976
    // print all rowsets' version as an array
977
33
    rapidjson::Document versions_arr;
978
33
    rapidjson::Document missing_versions_arr;
979
33
    versions_arr.SetArray();
980
33
    missing_versions_arr.SetArray();
981
33
    int64_t last_version = -1;
982
148
    for (auto& rowset : rowsets) {
983
148
        const Version& ver = rowset->version();
984
148
        if (ver.first != last_version + 1) {
985
0
            rapidjson::Value miss_value;
986
0
            miss_value.SetString(fmt::format("[{}-{}]", last_version + 1, ver.first - 1).c_str(),
987
0
                                 missing_versions_arr.GetAllocator());
988
0
            missing_versions_arr.PushBack(miss_value, missing_versions_arr.GetAllocator());
989
0
        }
990
148
        rapidjson::Value value;
991
148
        std::string version_str = rowset->get_rowset_info_str();
992
148
        value.SetString(version_str.c_str(), cast_set<uint32_t>(version_str.length()),
993
148
                        versions_arr.GetAllocator());
994
148
        versions_arr.PushBack(value, versions_arr.GetAllocator());
995
148
        last_version = ver.second;
996
148
    }
997
33
    root.AddMember("rowsets", versions_arr, root.GetAllocator());
998
33
    root.AddMember("missing_rowsets", missing_versions_arr, root.GetAllocator());
999
1000
    // print all stale rowsets' version as an array
1001
33
    rapidjson::Document stale_versions_arr;
1002
33
    stale_versions_arr.SetArray();
1003
540
    for (auto& rowset : stale_rowsets) {
1004
540
        rapidjson::Value value;
1005
540
        std::string version_str = rowset->get_rowset_info_str();
1006
540
        value.SetString(version_str.c_str(), cast_set<uint32_t>(version_str.length()),
1007
540
                        stale_versions_arr.GetAllocator());
1008
540
        stale_versions_arr.PushBack(value, stale_versions_arr.GetAllocator());
1009
540
    }
1010
33
    root.AddMember("stale_rowsets", stale_versions_arr, root.GetAllocator());
1011
1012
    // add stale version rowsets
1013
33
    root.AddMember("stale version path", path_arr, root.GetAllocator());
1014
1015
    // to json string
1016
33
    rapidjson::StringBuffer strbuf;
1017
33
    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
1018
33
    root.Accept(writer);
1019
33
    *json_result = std::string(strbuf.GetString());
1020
33
}
1021
1022
0
void CloudTablet::set_cumulative_layer_point(int64_t new_point) {
1023
0
    if (new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= _cumulative_point) {
1024
0
        _cumulative_point = new_point;
1025
0
        return;
1026
0
    }
1027
    // cumulative point should only be reset to -1, or be increased
1028
    // FIXME: could happen in currently unresolved race conditions
1029
0
    LOG(WARNING) << "Unexpected cumulative point: " << new_point
1030
0
                 << ", origin: " << _cumulative_point.load();
1031
0
}
1032
1033
// Verifies that every rowset written with a schema older than `schema_version` is still
// compatible with `columns` (presumably the column list sent by FE, judging by the error
// text): each rowset column must exist in `columns` by name with the same unique id and
// the same type.
//
// Rowsets whose version starts at 0 and rowsets whose schema version is already
// >= `schema_version` are not checked. Returns an InternalError describing the first
// mismatch found, or OK.
Status CloudTablet::check_rowset_schema_for_build_index(std::vector<TColumn>& columns,
                                                        int schema_version) {
    std::map<std::string, TabletColumn> fe_col_map;
    for (const auto& column : columns) {
        fe_col_map[column.column_name] = TabletColumn(column);
    }

    std::shared_lock rlock(_meta_lock);
    for (const auto& [version, rs] : _rs_version_map) {
        if (version.first == 0) {
            continue;
        }

        // Hold the schema for the whole check so the column list iterated below cannot
        // outlive its owner.
        const auto rowset_schema = rs->tablet_schema();
        if (rowset_schema->schema_version() >= schema_version) {
            continue;
        }

        // Iterate by reference: the columns are only read, no need to copy each pointer.
        for (const auto& rs_col : rowset_schema->columns()) {
            const auto fe_col = fe_col_map.find(rs_col->name());
            if (fe_col == fe_col_map.end()) {
                return Status::InternalError(
                        "check rowset meta failed:rowset's col is dropped in FE.");
            }

            if (rs_col->unique_id() != fe_col->second.unique_id()) {
                return Status::InternalError("check rowset meta failed:col id not match.");
            }

            if (rs_col->type() != fe_col->second.type()) {
                return Status::InternalError("check rowset meta failed:col type not match.");
            }
        }
    }

    return Status::OK();
}
1069
1070
// Picks, among the rowsets whose schema version is below `schema_version` (the rowset
// starting at version 0 is never considered), the one with the greatest start version.
// Returns a null rowset when there is no such rowset. When a rowset is picked,
// `is_base_rowset` is set to whether it starts below the cumulative point; otherwise
// `is_base_rowset` is left untouched.
Result<RowsetSharedPtr> CloudTablet::pick_a_rowset_for_index_change(int schema_version,
                                                                    bool& is_base_rowset) {
    TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudTablet::pick_a_rowset_for_index_change",
                                      Result<RowsetSharedPtr>(nullptr));
    RowsetSharedPtr picked = nullptr;
    std::shared_lock rlock(_meta_lock);
    for (const auto& [version, rs] : _rs_version_map) {
        if (version.first == 0) {
            continue;
        }
        // Empty rowsets are still candidates; they are only reported for debugging.
        if (rs->num_rows() == 0) {
            VLOG_DEBUG << "[index_change]find empty rs, index change may failed, id="
                       << rs->rowset_id().to_string();
        }

        const auto rowset_schema_version = rs->tablet_schema()->schema_version();
        if (rowset_schema_version >= schema_version) {
            VLOG_DEBUG << "[index_change] skip rowset " << rowset_schema_version << ","
                       << schema_version;
            continue;
        }

        // Keep the candidate with the greatest start version seen so far.
        if (picked == nullptr || rs->start_version() > picked->start_version()) {
            picked = rs;
        }
    }

    if (picked != nullptr) {
        is_base_rowset = picked->version().first < _cumulative_point;
    }

    return picked;
}
1108
1109
0
std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_base_compaction() {
1110
0
    std::vector<RowsetSharedPtr> candidate_rowsets;
1111
0
    {
1112
0
        std::shared_lock rlock(_meta_lock);
1113
0
        for (const auto& [version, rs] : _rs_version_map) {
1114
0
            if (version.first != 0 && version.first < _cumulative_point &&
1115
0
                (_alter_version == -1 || version.second <= _alter_version)) {
1116
0
                candidate_rowsets.push_back(rs);
1117
0
            }
1118
0
        }
1119
0
    }
1120
0
    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
1121
0
    return candidate_rowsets;
1122
0
}
1123
1124
0
std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_full_compaction() {
1125
0
    std::vector<RowsetSharedPtr> candidate_rowsets;
1126
0
    {
1127
0
        std::shared_lock rlock(_meta_lock);
1128
0
        for (auto& [v, rs] : _rs_version_map) {
1129
            // MUST NOT compact rowset [0-1] for some historical reasons (see cloud_schema_change)
1130
0
            if (v.first != 0) {
1131
0
                candidate_rowsets.push_back(rs);
1132
0
            }
1133
0
        }
1134
0
    }
1135
0
    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
1136
0
    return candidate_rowsets;
1137
0
}
1138
1139
0
// Returns the delete bitmap calculation executor provided by the storage engine.
CalcDeleteBitmapExecutor* CloudTablet::calc_delete_bitmap_executor() {
    CalcDeleteBitmapExecutor* executor = _engine.calc_delete_bitmap_executor();
    return executor;
}
1142
1143
// Persists the delete bitmap computed for transaction `txn_id`.
// Steps, each of which aborts with its error on failure:
//   1. record the bitmap in the txn delete bitmap cache with status PREPARE;
//   2. for a partial update that wrote rows, update the tmp rowset through the meta manager;
//   3. save the bitmap via `save_delete_bitmap_to_ms`;
//   4. record the bitmap in the cache again with status SUCCEED and the publish info.
// The DBUG_EXECUTE_IF blocks are debug injection points.
Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
                                       DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
                                       const RowsetIdUnorderedSet& cur_rowset_ids, int64_t lock_id,
                                       int64_t next_visible_version) {
    RowsetSharedPtr txn_rowset = txn_info->rowset;
    const int64_t rowset_version = txn_rowset->start_version();

    // update delete bitmap info, in order to avoid recalculation when trying again
    RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
            txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::PREPARE));

    const bool wrote_partial_update_rows = txn_info->partial_update_info &&
                                           txn_info->partial_update_info->is_partial_update() &&
                                           rowset_writer->num_rows() > 0;
    if (wrote_partial_update_rows) {
        DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.update_tmp_rowset.error", {
            return Status::InternalError<false>("injected update_tmp_rowset error.");
        });
        RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*txn_rowset->rowset_meta()));
    }

    RETURN_IF_ERROR(save_delete_bitmap_to_ms(rowset_version, txn_id, delete_bitmap, lock_id,
                                             next_visible_version, txn_rowset));

    // Keep the delete bitmap WITH its sentinel marks in txn_delete_bitmap_cache: if the txn is
    // retried, the cached bitmap is reused while re-calculating the delete bitmap, and that
    // path runs the delete bitmap correctness check. Storing the sentinel-free bitmap instead
    // would make that check fail.
    RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
            txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED,
            txn_info->publish_info));

    DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.enable_sleep", {
        auto sleep_sec = dp->param<int>("sleep", 5);
        std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
    });

    DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.injected_error", {
        auto retry = dp->param<bool>("retry", false);
        auto sleep_sec = dp->param<int>("sleep", 0);
        std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
        if (retry) { // return DELETE_BITMAP_LOCK_ERROR to let it retry
            return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>(
                    "injected DELETE_BITMAP_LOCK_ERROR");
        } else {
            return Status::InternalError<false>("injected non-retryable error");
        }
    });

    return Status::OK();
}
1191
1192
// Builds a copy of `delete_bitmap` in which every entry is re-keyed to version `cur_version`
// and the sentinel entries are dropped, then hands it to the meta manager's
// `update_delete_bitmap`.
// The lock id passed along is `lock_id` when it is not -1 (explicit txn), else `txn_id`.
// The rowset's remote storage resource is forwarded when available and left empty otherwise.
Status CloudTablet::save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id,
                                             DeleteBitmapPtr delete_bitmap, int64_t lock_id,
                                             int64_t next_visible_version, RowsetSharedPtr rowset) {
    DeleteBitmapPtr bitmap_to_save = std::make_shared<DeleteBitmap>(tablet_id());
    for (const auto& [bitmap_key, segment_bitmap] : delete_bitmap->delete_bitmap) {
        // skip sentinel mark, which is used for delete bitmap correctness check
        if (std::get<1>(bitmap_key) == DeleteBitmap::INVALID_SEGMENT_ID) {
            continue;
        }
        bitmap_to_save->merge({std::get<0>(bitmap_key), std::get<1>(bitmap_key), cur_version},
                              segment_bitmap);
    }

    // lock_id != -1 means this is in an explicit txn
    const bool is_explicit_txn = (lock_id != -1);
    const int64_t ms_lock_id = is_explicit_txn ? lock_id : txn_id;

    std::optional<StorageResource> storage_resource;
    auto resource_lookup = rowset->rowset_meta()->remote_storage_resource();
    if (resource_lookup) {
        storage_resource = *resource_lookup.value();
    }

    RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(
            *this, ms_lock_id, LOAD_INITIATOR_ID, bitmap_to_save.get(), bitmap_to_save.get(),
            rowset->rowset_id().to_string(), storage_resource,
            config::delete_bitmap_store_write_version, txn_id, is_explicit_txn,
            next_visible_version));
    return Status::OK();
}
1220
1221
0
// Computes the version ranges up to `spec_version` that `existing_versions` does not cover.
//
// `existing_versions` is taken by value because it is sorted locally by start version.
// Scanning starts from version 0. Every hole is reported as
// (first uncovered version, min(start of the next existing version, spec_version)),
// and a trailing hole as (first uncovered version, spec_version).
// NOTE(review): the upper bound of an inner hole is the next version's start, not start - 1;
// confirm callers expect that.
Versions CloudTablet::calc_missed_versions(int64_t spec_version, Versions existing_versions) const {
    DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;

    // Ascending by start version is enough: two versions are certainly not overlapping.
    std::sort(existing_versions.begin(), existing_versions.end(),
              [](const Version& lhs, const Version& rhs) { return lhs.first < rhs.first; });

    Versions gaps;
    int64_t covered_until = -1; // nothing covered yet, so the first expected version is 0
    for (const Version& present : existing_versions) {
        const int64_t expected_start = covered_until + 1;
        if (present.first > expected_start) {
            // there is a hole in front of this version
            gaps.emplace_back(expected_start, std::min(present.first, spec_version));
        }
        covered_until = present.second;
        if (covered_until >= spec_version) {
            // everything up to the requested version has been inspected
            return gaps;
        }
    }

    if (covered_until < spec_version) {
        // there is a hole between the last version and the specified version
        gaps.emplace_back(covered_until + 1, spec_version);
    }
    return gaps;
}
1250
1251
// Builds the delete bitmap of a compaction output rowset and stores it in the meta service.
//
// Steps:
//   1. sync rowsets and convert the delete bitmap of historical data (versions below
//      max_version + 1), optionally validating missed rows and rowid conversion;
//   2. take the compaction delete bitmap update lock, sync rowsets again and convert the
//      delete bitmap of incremental data;
//   3. store the result through CloudMetaMgr::update_delete_bitmap().
//
// @param output_rowset_delete_bitmap        out: replaced with the freshly built bitmap
// @param get_delete_bitmap_lock_start_time  out: MonotonicMicros() taken right after the lock
//                                           was acquired
// @return the first failing status of the sync/lock/check steps, otherwise the status of
//         update_delete_bitmap()
Status CloudTablet::calc_delete_bitmap_for_compaction(
        const std::vector<RowsetSharedPtr>& input_rowsets, const RowsetSharedPtr& output_rowset,
        const RowIdConversion& rowid_conversion, ReaderType compaction_type, int64_t merged_rows,
        int64_t filtered_rows, int64_t initiator, DeleteBitmapPtr& output_rowset_delete_bitmap,
        bool allow_delete_in_cumu_compaction, int64_t& get_delete_bitmap_lock_start_time) {
    output_rowset_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());

    // Duplicate keys are checked for cumulative compaction, and also for base compaction when
    // config::enable_prune_delete_sign_when_base_compaction==false. The config is read again
    // on every call of this helper.
    const auto covers_duplicate_key_check = [&compaction_type]() {
        return compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION ||
               !config::enable_prune_delete_sign_when_base_compaction;
    };

    std::unique_ptr<RowLocationSet> missed_rows;
    const bool any_correctness_check_enabled =
            config::enable_missing_rows_correctness_check ||
            config::enable_mow_compaction_correctness_check_core ||
            config::enable_mow_compaction_correctness_check_fail;
    if (any_correctness_check_enabled && !allow_delete_in_cumu_compaction &&
        covers_duplicate_key_check()) {
        missed_rows = std::make_unique<RowLocationSet>();
        LOG(INFO) << "RowLocation Set inited succ for tablet:" << tablet_id();
    }

    std::unique_ptr<std::map<RowsetSharedPtr, RowLocationPairList>> location_map;
    if (config::enable_rowid_conversion_correctness_check &&
        tablet_schema()->cluster_key_uids().empty()) {
        location_map = std::make_unique<std::map<RowsetSharedPtr, RowLocationPairList>>();
        LOG(INFO) << "Location Map inited succ for tablet:" << tablet_id();
    }

    // 1. calc delete bitmap for historical data
    RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(this));
    Version version = max_version();
    calc_compaction_output_rowset_delete_bitmap(
            input_rowsets, rowid_conversion, 0, version.second + 1, missed_rows.get(),
            location_map.get(), tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());

    if (missed_rows && !allow_delete_in_cumu_compaction && covers_duplicate_key_check() &&
        tablet_state() == TABLET_RUNNING) {
        const std::size_t missed_rows_size = missed_rows->size();
        const int64_t dropped_rows = merged_rows + filtered_rows;
        // A negative sum disables the comparison.
        if (dropped_rows >= 0 && dropped_rows != missed_rows_size) {
            std::string err_msg = fmt::format(
                    "cumulative compaction: the merged rows({}), the filtered rows({}) is "
                    "not equal to missed rows({}) in rowid conversion, tablet_id: {}, "
                    "table_id:{}",
                    merged_rows, filtered_rows, missed_rows_size, tablet_id(), table_id());
            LOG(WARNING) << err_msg;
            // Severity is config driven: abort, fail the compaction, or assert in debug builds.
            if (config::enable_mow_compaction_correctness_check_core) {
                CHECK(false) << err_msg;
            } else if (config::enable_mow_compaction_correctness_check_fail) {
                return Status::InternalError<false>(err_msg);
            } else {
                DCHECK(false) << err_msg;
            }
        }
    }

    if (location_map) {
        RETURN_IF_ERROR(check_rowid_conversion(output_rowset, *location_map));
        // start from an empty map for the incremental pass
        location_map->clear();
    }

    // 2. calc delete bitmap for incremental data
    const int64_t lock_begin_us = MonotonicMicros();
    RETURN_IF_ERROR(_engine.meta_mgr().get_delete_bitmap_update_lock(
            *this, COMPACTION_DELETE_BITMAP_LOCK_ID, initiator));
    const int64_t lock_acquired_us = MonotonicMicros();
    const int64_t lock_cost_us = lock_acquired_us - lock_begin_us;
    if (compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION) {
        g_cu_compaction_get_delete_bitmap_lock_time_ms << lock_cost_us / 1000;
    } else if (compaction_type == ReaderType::READER_BASE_COMPACTION) {
        g_base_compaction_get_delete_bitmap_lock_time_ms << lock_cost_us / 1000;
    }
    get_delete_bitmap_lock_start_time = lock_acquired_us;

    RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(this));
    const int64_t synced_us = MonotonicMicros();

    calc_compaction_output_rowset_delete_bitmap(
            input_rowsets, rowid_conversion, version.second, UINT64_MAX, missed_rows.get(),
            location_map.get(), tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());
    const int64_t calculated_us = MonotonicMicros();

    if (location_map) {
        RETURN_IF_ERROR(check_rowid_conversion(output_rowset, *location_map));
    }
    const int64_t checked_us = MonotonicMicros();

    // 3. store delete bitmap
    DeleteBitmapPtr delete_bitmap_v2 = nullptr;
    const auto delete_bitmap_size = output_rowset_delete_bitmap->delete_bitmap.size();
    const auto store_version = config::delete_bitmap_store_write_version;
    if (store_version == 2 || store_version == 3) {
        delete_bitmap_v2 = std::make_shared<DeleteBitmap>(*output_rowset_delete_bitmap);

        // Collect the rowsets that end before the output rowset starts.
        std::vector<std::pair<RowsetId, int64_t>> retained_rowsets_to_seg_num;
        {
            std::shared_lock rlock(get_header_lock());
            for (const auto& [rowset_version, rowset_ptr] : rowset_map()) {
                if (rowset_version.second >= output_rowset->start_version()) {
                    continue;
                }
                retained_rowsets_to_seg_num.emplace_back(rowset_ptr->rowset_id(),
                                                         rowset_ptr->num_segments());
            }
        }

        if (config::enable_agg_delta_delete_bitmap_for_store_v2) {
            tablet_meta()->delete_bitmap().subset_and_agg(
                    retained_rowsets_to_seg_num, output_rowset->start_version(),
                    output_rowset->end_version(), delete_bitmap_v2.get());
        } else {
            tablet_meta()->delete_bitmap().subset(
                    retained_rowsets_to_seg_num, output_rowset->start_version(),
                    output_rowset->end_version(), delete_bitmap_v2.get());
        }
    }

    // The storage resource stays empty when the rowset meta cannot provide one.
    std::optional<StorageResource> storage_resource;
    if (auto resolved = output_rowset->rowset_meta()->remote_storage_resource(); resolved) {
        storage_resource = *resolved.value();
    }

    auto st = _engine.meta_mgr().update_delete_bitmap(
            *this, -1, initiator, output_rowset_delete_bitmap.get(), delete_bitmap_v2.get(),
            output_rowset->rowset_id().to_string(), storage_resource, store_version);
    const int64_t stored_us = MonotonicMicros();

    LOG(INFO) << "calc_delete_bitmap_for_compaction, tablet_id=" << tablet_id()
              << ", get lock cost " << lock_cost_us << " us, sync rowsets cost "
              << (synced_us - lock_acquired_us) << " us, calc delete bitmap cost "
              << (calculated_us - synced_us) << " us, check rowid conversion cost "
              << (checked_us - calculated_us) << " us, store delete bitmap cost "
              << (stored_us - checked_us) << " us, st=" << st.to_string()
              << ". store_version=" << store_version
              << ", calculated delete bitmap size=" << delete_bitmap_size
              << ", update delete bitmap size="
              << output_rowset_delete_bitmap->delete_bitmap.size();
    return st;
}
1381
1382
void CloudTablet::agg_delete_bitmap_for_compaction(
1383
        int64_t start_version, int64_t end_version, const std::vector<RowsetSharedPtr>& pre_rowsets,
1384
        DeleteBitmapPtr& new_delete_bitmap,
1385
0
        std::map<std::string, int64_t>& pre_rowset_to_versions) {
1386
0
    for (auto& rowset : pre_rowsets) {
1387
0
        for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
1388
0
            auto d = tablet_meta()->delete_bitmap().get_agg_without_cache(
1389
0
                    {rowset->rowset_id(), seg_id, end_version}, start_version);
1390
0
            if (d->isEmpty()) {
1391
0
                continue;
1392
0
            }
1393
0
            VLOG_DEBUG << "agg delete bitmap for tablet_id=" << tablet_id()
1394
0
                       << ", rowset_id=" << rowset->rowset_id() << ", seg_id=" << seg_id
1395
0
                       << ", rowset_version=" << rowset->version().to_string()
1396
0
                       << ". compaction start_version=" << start_version
1397
0
                       << ", end_version=" << end_version
1398
0
                       << ". delete_bitmap cardinality=" << d->cardinality();
1399
0
            DeleteBitmap::BitmapKey end_key {rowset->rowset_id(), seg_id, end_version};
1400
0
            new_delete_bitmap->set(end_key, *d);
1401
0
            pre_rowset_to_versions[rowset->rowset_id().to_string()] = rowset->version().second;
1402
0
        }
1403
0
    }
1404
0
}
1405
1406
5
// Fetches this tablet's meta from the meta service and copies the compaction-related
// settings that differ into the local `_tablet_meta`.
//
// Does nothing (returns OK) when config::enable_file_cache is off. When the fetch fails the
// error is returned; if it is NOT_FOUND, clear_cache() is called first.
Status CloudTablet::sync_meta() {
    if (!config::enable_file_cache) {
        return Status::OK();
    }

    TabletMetaSharedPtr remote_meta;
    auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &remote_meta);
    if (!st.ok()) {
        if (st.is<ErrorCode::NOT_FOUND>()) {
            clear_cache();
        }
        return st;
    }

    // Each setter below is only called when the remote value differs from the local one.
    if (const auto latest = remote_meta->compaction_policy();
        _tablet_meta->compaction_policy() != latest) {
        _tablet_meta->set_compaction_policy(latest);
    }
    if (const auto latest = remote_meta->time_series_compaction_goal_size_mbytes();
        _tablet_meta->time_series_compaction_goal_size_mbytes() != latest) {
        _tablet_meta->set_time_series_compaction_goal_size_mbytes(latest);
    }
    if (const auto latest = remote_meta->time_series_compaction_file_count_threshold();
        _tablet_meta->time_series_compaction_file_count_threshold() != latest) {
        _tablet_meta->set_time_series_compaction_file_count_threshold(latest);
    }
    if (const auto latest = remote_meta->time_series_compaction_time_threshold_seconds();
        _tablet_meta->time_series_compaction_time_threshold_seconds() != latest) {
        _tablet_meta->set_time_series_compaction_time_threshold_seconds(latest);
    }
    if (const auto latest = remote_meta->time_series_compaction_empty_rowsets_threshold();
        _tablet_meta->time_series_compaction_empty_rowsets_threshold() != latest) {
        _tablet_meta->set_time_series_compaction_empty_rowsets_threshold(latest);
    }
    if (const auto latest = remote_meta->time_series_compaction_level_threshold();
        _tablet_meta->time_series_compaction_level_threshold() != latest) {
        _tablet_meta->set_time_series_compaction_level_threshold(latest);
    }
    // disable_auto_compaction is stored in the tablet schema rather than in the meta itself.
    if (const auto latest = remote_meta->tablet_schema()->disable_auto_compaction();
        _tablet_meta->tablet_schema()->disable_auto_compaction() != latest) {
        _tablet_meta->mutable_tablet_schema()->set_disable_auto_compaction(latest);
    }
    if (const auto latest = remote_meta->vertical_compaction_num_columns_per_group();
        _tablet_meta->vertical_compaction_num_columns_per_group() != latest) {
        _tablet_meta->set_vertical_compaction_num_columns_per_group(latest);
    }

    return Status::OK();
}
1476
1477
0
// Fills `tablet_info` with this tablet's id and total version count, reading `_tablet_meta`
// under a shared lock on `_meta_lock`.
void CloudTablet::build_tablet_report_info(TTabletInfo* tablet_info) {
    std::shared_lock meta_rlock(_meta_lock);
    const auto& meta = _tablet_meta;
    tablet_info->__set_tablet_id(meta->tablet_id());
    tablet_info->__set_total_version_count(meta->version_count());
    // Currently, this information will not be used by the cloud report,
    // but it may be used in the future.
}
1484
1485
// Compares the cardinality of `expected_delete_bitmap` with the delete bitmap stored in the
// txn delete bitmap cache for (`txn_id`, this tablet).
//
// @return OK when the cache lookup does not succeed (the check is skipped) or when both
//         cardinalities match; InternalError describing both cardinalities otherwise.
//         In debug builds a mismatch also trips a DCHECK.
Status CloudTablet::check_delete_bitmap_cache(int64_t txn_id,
                                              DeleteBitmap* expected_delete_bitmap) {
    DeleteBitmapPtr cached_delete_bitmap;
    CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
    Status st = engine.txn_delete_bitmap_cache().get_delete_bitmap(
            txn_id, tablet_id(), &cached_delete_bitmap, nullptr, nullptr);
    if (!st.ok()) {
        // Without a cached bitmap there is nothing to compare against.
        return Status::OK();
    }

    const auto expected_cardinality = expected_delete_bitmap->cardinality();
    const auto cached_cardinality = cached_delete_bitmap->cardinality();
    if (expected_cardinality == cached_cardinality) {
        return Status::OK();
    }

    // The message is built only on the failure path. The fields are comma separated so that
    // cached_cardinality and txn_id no longer run together in the log.
    auto msg = fmt::format(
            "delete bitmap cache check failed, cur_cardinality={}, cached_cardinality={}, "
            "txn_id={}, tablet_id={}",
            expected_cardinality, cached_cardinality, txn_id, tablet_id());
    DCHECK(false) << msg;
    return Status::InternalError<false>(msg);
}
1505
1506
35
// Returns the warm-up state recorded for `rowset_id`, refreshing it first through
// RowsetWarmUpInfo::update_state().
//
// When no entry exists, a state with trigger source NONE and progress NONE is returned.
//
// An exclusive lock is taken on `_meta_lock` because update_state() may write
// `state.progress`; a shared lock would let concurrent callers race on that write.
WarmUpState CloudTablet::get_rowset_warmup_state(RowsetId rowset_id) {
    std::lock_guard wlock(_meta_lock);
    auto it = _rowset_warm_up_states.find(rowset_id);
    if (it == _rowset_warm_up_states.end()) {
        return {.trigger_source = WarmUpTriggerSource::NONE, .progress = WarmUpProgress::NONE};
    }
    auto& warmup_info = it->second;
    warmup_info.update_state();
    return warmup_info.state;
}
1515
1516
bool CloudTablet::add_rowset_warmup_state(const RowsetMeta& rowset, WarmUpTriggerSource source,
1517
34
                                          std::chrono::steady_clock::time_point start_tp) {
1518
34
    std::lock_guard wlock(_meta_lock);
1519
34
    return add_rowset_warmup_state_unlocked(rowset, source, start_tp);
1520
34
}
1521
1522
bool CloudTablet::update_rowset_warmup_state_inverted_idx_num(WarmUpTriggerSource source,
1523
5
                                                              RowsetId rowset_id, int64_t delta) {
1524
5
    std::lock_guard wlock(_meta_lock);
1525
5
    return update_rowset_warmup_state_inverted_idx_num_unlocked(source, rowset_id, delta);
1526
5
}
1527
1528
bool CloudTablet::update_rowset_warmup_state_inverted_idx_num_unlocked(WarmUpTriggerSource source,
1529
                                                                       RowsetId rowset_id,
1530
5
                                                                       int64_t delta) {
1531
5
    auto it = _rowset_warm_up_states.find(rowset_id);
1532
5
    if (it == _rowset_warm_up_states.end()) {
1533
0
        return false;
1534
0
    }
1535
5
    if (it->second.state.trigger_source != source) {
1536
        // Only the same trigger source can update the state
1537
2
        return false;
1538
2
    }
1539
3
    it->second.num_inverted_idx += delta;
1540
3
    return true;
1541
5
}
1542
1543
bool CloudTablet::add_rowset_warmup_state_unlocked(const RowsetMeta& rowset,
1544
                                                   WarmUpTriggerSource source,
1545
34
                                                   std::chrono::steady_clock::time_point start_tp) {
1546
34
    auto rowset_id = rowset.rowset_id();
1547
1548
    // Check if rowset already has warmup state
1549
34
    if (_rowset_warm_up_states.contains(rowset_id)) {
1550
10
        auto existing_state = _rowset_warm_up_states[rowset_id].state;
1551
1552
        // For job-triggered warmup (one-time and periodic warmup), allow it to proceed
1553
        // except when there's already another job-triggered warmup in progress
1554
10
        if (source == WarmUpTriggerSource::JOB) {
1555
5
            if (existing_state.trigger_source == WarmUpTriggerSource::JOB &&
1556
5
                existing_state.progress == WarmUpProgress::DOING) {
1557
                // Same job type already in progress, skip to avoid duplicate warmup
1558
1
                return false;
1559
1
            }
1560
5
        } else {
1561
            // For non-job warmup (EVENT_DRIVEN, SYNC_ROWSET), skip if any warmup exists
1562
5
            return false;
1563
5
        }
1564
10
    }
1565
1566
28
    if (source == WarmUpTriggerSource::JOB) {
1567
9
        g_file_cache_warm_up_rowset_triggered_by_job_num << 1;
1568
19
    } else if (source == WarmUpTriggerSource::SYNC_ROWSET) {
1569
6
        g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num << 1;
1570
13
    } else if (source == WarmUpTriggerSource::EVENT_DRIVEN) {
1571
13
        g_file_cache_warm_up_rowset_triggered_by_event_driven_num << 1;
1572
13
    }
1573
28
    _rowset_warm_up_states[rowset_id] = {
1574
28
            .state = {.trigger_source = source,
1575
28
                      .progress = (rowset.num_segments() == 0 ? WarmUpProgress::DONE
1576
28
                                                              : WarmUpProgress::DOING)},
1577
28
            .num_segments = rowset.num_segments(),
1578
28
            .start_tp = start_tp};
1579
28
    return true;
1580
34
}
1581
1582
52
void CloudTablet::RowsetWarmUpInfo::update_state() {
1583
52
    if (has_finished()) {
1584
14
        g_file_cache_warm_up_rowset_complete_num << 1;
1585
14
        auto cost = std::chrono::duration_cast<std::chrono::milliseconds>(
1586
14
                            std::chrono::steady_clock::now() - start_tp)
1587
14
                            .count();
1588
14
        g_file_cache_warm_up_rowset_all_segments_latency << cost;
1589
14
        state.progress = WarmUpProgress::DONE;
1590
14
    }
1591
52
}
1592
1593
// Reports that `segment_num` segments and `inverted_idx_num` inverted index files of
// `rowset_id` finished downloading with result `status`.
//
// Runs under an exclusive lock on `_meta_lock`. Completion (and, on error, failure)
// counters are updated for each positive amount, then the amounts are passed to
// RowsetWarmUpInfo::done().
//
// @return a NONE/NONE state when the rowset has no warm-up entry; the unchanged state when
//         the entry belongs to another trigger source; otherwise the state after done().
WarmUpState CloudTablet::complete_rowset_segment_warmup(WarmUpTriggerSource trigger_source,
                                                        RowsetId rowset_id, Status status,
                                                        int64_t segment_num,
                                                        int64_t inverted_idx_num) {
    std::lock_guard wlock(_meta_lock);
    const auto entry = _rowset_warm_up_states.find(rowset_id);
    if (entry == _rowset_warm_up_states.end()) {
        return {.trigger_source = WarmUpTriggerSource::NONE, .progress = WarmUpProgress::NONE};
    }

    auto& info = entry->second;
    // Only the same trigger source can update the state
    if (info.state.trigger_source != trigger_source) {
        return info.state;
    }

    VLOG_DEBUG << "complete rowset segment warmup for rowset " << rowset_id << ", " << status;
    const bool download_failed = !status.ok();

    if (segment_num > 0) {
        g_file_cache_warm_up_segment_complete_num << segment_num;
        if (download_failed) {
            g_file_cache_warm_up_segment_failed_num << segment_num;
        }
    }
    if (inverted_idx_num > 0) {
        g_file_cache_warm_up_inverted_idx_complete_num << inverted_idx_num;
        if (download_failed) {
            g_file_cache_warm_up_inverted_idx_failed_num << inverted_idx_num;
        }
    }

    info.done(segment_num, inverted_idx_num);
    return info.state;
}
1623
1624
222
bool CloudTablet::is_rowset_warmed_up(const RowsetId& rowset_id) const {
1625
222
    auto it = _rowset_warm_up_states.find(rowset_id);
1626
222
    if (it == _rowset_warm_up_states.end()) {
1627
        // The rowset is not in warmup state, which means the rowset has never been warmed up.
1628
        // This may happen when the upstream BE tried to warm up rowsets on this BE but this BE
1629
        // was restarting so the warmup failed, and _rowset_warm_up_states has no entry for it.
1630
        //
1631
        // Normally the startup_timepoint check in rowset_is_warmed_up_unlocked() would filter out
1632
        // such rowsets (visible_timestamp < startup_timepoint → assumed warmed up). However,
1633
        // compaction-produced rowsets have their visible_timestamp set at rowset builder
1634
        // initialization time rather than the final transaction commit time on meta-service,
1635
        // so their visible_timestamp can be earlier than startup_timepoint, causing the
1636
        // startup_timepoint check to NOT filter them out and reaching here with no warmup entry.
1637
        //
1638
        // If such a rowset is before the cumulative compaction point and base compaction never
1639
        // happens, returning false here would cause the version path algorithm to exclude it,
1640
        // leading to a persistently low path_max_version. With continuous upstream ingestion,
1641
        // the freshness tolerance fallback check would keep triggering, making every query on
1642
        // this tablet fall back to reading all data from remote storage.
1643
        //
1644
        // Returning true (optimistically treating it as warmed up) allows the version path to
1645
        // include it. On cache miss the data is transparently read from remote storage per-segment
1646
        // and cached locally in 1MB blocks, so the problem self-heals through subsequent queries.
1647
7
        g_rowset_warmup_state_missing_count << 1;
1648
7
        LOG_EVERY_N(WARNING, 100) << fmt::format(
1649
1
                "rowset warmup state missing, considering it as warmed up. tablet_id={}, "
1650
1
                "rowset_id={}",
1651
1
                tablet_id(), rowset_id.to_string());
1652
7
        return true;
1653
7
    }
1654
215
    return it->second.state.progress == WarmUpProgress::DONE;
1655
222
}
1656
1657
675
void CloudTablet::add_warmed_up_rowset(const RowsetId& rowset_id) {
1658
675
    _rowset_warm_up_states[rowset_id] = {
1659
675
            .state = {.trigger_source = WarmUpTriggerSource::SYNC_ROWSET,
1660
675
                      .progress = WarmUpProgress::DONE},
1661
675
            .num_segments = 1,
1662
675
            .start_tp = std::chrono::steady_clock::now()};
1663
675
}
1664
1665
93
void CloudTablet::add_not_warmed_up_rowset(const RowsetId& rowset_id) {
1666
93
    _rowset_warm_up_states[rowset_id] = {
1667
93
            .state = {.trigger_source = WarmUpTriggerSource::SYNC_ROWSET,
1668
93
                      .progress = WarmUpProgress::DOING},
1669
93
            .num_segments = 1,
1670
93
            .start_tp = std::chrono::steady_clock::now()};
1671
93
}
1672
1673
bool CloudTablet::_check_rowset_should_be_visible_but_not_warmed_up(
1674
        const RowsetMetaSharedPtr& rs_meta, int64_t path_max_version,
1675
417
        std::chrono::system_clock::time_point freshness_limit_tp) const {
1676
417
    if (rs_meta->version() == Version {0, 1}) {
1677
        // skip rowset[0-1]
1678
22
        return false;
1679
22
    }
1680
395
    bool ret = rs_meta->start_version() > path_max_version &&
1681
395
               rs_meta->visible_timestamp() < freshness_limit_tp;
1682
395
    if (ret && config::read_cluster_cache_opt_verbose_log) {
1683
5
        using namespace std::chrono;
1684
5
        std::time_t t1 = system_clock::to_time_t(rs_meta->visible_timestamp());
1685
5
        std::tm tm1 = *std::localtime(&t1);
1686
5
        std::ostringstream oss1;
1687
5
        oss1 << std::put_time(&tm1, "%Y-%m-%d %H:%M:%S");
1688
1689
5
        std::time_t t2 = system_clock::to_time_t(freshness_limit_tp);
1690
5
        std::tm tm2 = *std::localtime(&t2);
1691
5
        std::ostringstream oss2;
1692
5
        oss2 << std::put_time(&tm2, "%Y-%m-%d %H:%M:%S");
1693
5
        LOG_INFO(
1694
5
                "[verbose] CloudTablet::capture_rs_readers_with_freshness_tolerance, "
1695
5
                "find a rowset which should be visible but not warmed up, tablet_id={}, "
1696
5
                "path_max_version={}, rowset_id={}, version={}, visible_time={}, "
1697
5
                "freshness_limit={}, version_graph={}, rowset_warmup_digest={}",
1698
5
                tablet_id(), path_max_version, rs_meta->rowset_id().to_string(),
1699
5
                rs_meta->version().to_string(), oss1.str(), oss2.str(),
1700
5
                _timestamped_version_tracker.debug_string(), rowset_warmup_digest());
1701
5
    }
1702
395
    return ret;
1703
417
}
1704
1705
void CloudTablet::_submit_segment_download_task(const RowsetSharedPtr& rs,
1706
                                                const StorageResource* storage_resource, int seg_id,
1707
1708
0
                                                int64_t expiration_time) {
1709
    // clang-format off
1710
0
    const auto& rowset_meta = rs->rowset_meta();
1711
0
    auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this());
1712
    // Use rowset_meta->fs() instead of storage_resource->fs to support packed file.
1713
    // RowsetMeta::fs() wraps the underlying FileSystem with PackedFileSystem when
1714
    // packed_slice_locations is not empty, which correctly maps segment file paths
1715
    // to their actual locations within packed files.
1716
0
    auto file_system = rowset_meta->fs();
1717
0
    if (!file_system) {
1718
0
        LOG(WARNING) << "failed to get file system for tablet_id=" << _tablet_meta->tablet_id()
1719
0
                     << ", rowset_id=" << rowset_meta->rowset_id();
1720
0
        return;
1721
0
    }
1722
0
    _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
1723
0
            .path = storage_resource->remote_segment_path(*rowset_meta, seg_id),
1724
0
            .file_size = rs->rowset_meta()->segment_file_size(seg_id),
1725
0
            .file_system = file_system,
1726
0
            .ctx = {
1727
0
                    .expiration_time = expiration_time,
1728
0
                    .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
1729
0
                    .is_warmup = true
1730
0
            },
1731
0
            .download_done {[=](Status st) {
1732
0
                DBUG_EXECUTE_IF("CloudTablet::add_rowsets.download_data.callback.block_compaction_rowset", {
1733
0
                            if (rs->version().second > rs->version().first) {
1734
0
                                auto sleep_time = dp->param<int>("sleep", 3);
1735
0
                                LOG_INFO(
1736
0
                                        "[verbose] block download for rowset={}, "
1737
0
                                        "version={}, sleep={}",
1738
0
                                        rs->rowset_id().to_string(),
1739
0
                                        rs->version().to_string(), sleep_time);
1740
0
                                std::this_thread::sleep_for(
1741
0
                                        std::chrono::seconds(sleep_time));
1742
0
                            }
1743
0
                });
1744
0
                self->complete_rowset_segment_warmup(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), st, 1, 0);
1745
0
                if (!st) {
1746
0
                    LOG_WARNING("add rowset warm up error ").error(st);
1747
0
                }
1748
0
            }},
1749
0
    });
1750
    // clang-format on
1751
0
}
1752
1753
void CloudTablet::_submit_inverted_index_download_task(const RowsetSharedPtr& rs,
1754
                                                       const StorageResource* storage_resource,
1755
                                                       const io::Path& idx_path, int64_t idx_size,
1756
0
                                                       int64_t expiration_time) {
1757
    // clang-format off
1758
0
    const auto& rowset_meta = rs->rowset_meta();
1759
0
    auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this());
1760
    // Use rowset_meta->fs() instead of storage_resource->fs to support packed file for idx files.
1761
0
    auto file_system = rowset_meta->fs();
1762
0
    if (!file_system) {
1763
0
        LOG(WARNING) << "failed to get file system for tablet_id=" << _tablet_meta->tablet_id()
1764
0
                     << ", rowset_id=" << rowset_meta->rowset_id();
1765
0
        return;
1766
0
    }
1767
0
    io::DownloadFileMeta meta {
1768
0
            .path = idx_path,
1769
0
            .file_size = idx_size,
1770
0
            .file_system = file_system,
1771
0
            .ctx = {
1772
0
                    .expiration_time = expiration_time,
1773
0
                    .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
1774
0
                    .is_warmup = true
1775
0
            },
1776
0
            .download_done {[=](Status st) {
1777
0
                DBUG_EXECUTE_IF("CloudTablet::add_rowsets.download_idx.callback.block", {
1778
0
                    auto sleep_time = dp->param<int>("sleep", 3);
1779
0
                    LOG_INFO(
1780
0
                            "[verbose] block download for "
1781
0
                            "rowset={}, inverted_idx_file={}, "
1782
0
                            "sleep={}",
1783
0
                            rs->rowset_id().to_string(), idx_path.string(), sleep_time);
1784
0
                    std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
1785
0
                });
1786
0
                self->complete_rowset_segment_warmup(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), st, 0, 1);
1787
0
                if (!st) {
1788
0
                    LOG_WARNING("add rowset warm up error ").error(st);
1789
0
                }
1790
0
            }},
1791
0
    };
1792
0
    self->update_rowset_warmup_state_inverted_idx_num_unlocked(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), 1);
1793
0
    _engine.file_cache_block_downloader().submit_download_task(std::move(meta));
1794
0
    g_file_cache_cloud_tablet_submitted_index_num << 1;
1795
0
    g_file_cache_cloud_tablet_submitted_index_size << idx_size;
1796
    // clang-format on
1797
0
}
1798
1799
void CloudTablet::_add_rowsets_directly(std::vector<RowsetSharedPtr>& rowsets,
1800
302
                                        bool warmup_delta_data) {
1801
302
#ifdef BE_TEST
1802
302
    warmup_delta_data = false;
1803
302
#endif
1804
871
    for (auto& rs : rowsets) {
1805
871
        if (warmup_delta_data) {
1806
            // Pre-set encryption algorithm to avoid re-entrant get_tablet() call
1807
            // inside RowsetMeta::fs() which causes SingleFlight deadlock when the
1808
            // tablet is not yet cached (during initial load_tablet).
1809
0
            rs->rowset_meta()->set_encryption_algorithm(_tablet_meta->encryption_algorithm());
1810
0
            bool warm_up_state_updated = false;
1811
            // Warmup rowset data in background
1812
0
            for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
1813
0
                const auto& rowset_meta = rs->rowset_meta();
1814
0
                constexpr int64_t interval = 600; // 10 mins
1815
                // When BE restart and receive the `load_sync` rpc, it will sync all historical rowsets first time.
1816
                // So we need to filter out the old rowsets avoid to download the whole table.
1817
0
                if (::time(nullptr) - rowset_meta->newest_write_timestamp() >= interval) {
1818
0
                    continue;
1819
0
                }
1820
1821
0
                auto storage_resource = rowset_meta->remote_storage_resource();
1822
0
                if (!storage_resource) {
1823
0
                    LOG(WARNING) << storage_resource.error();
1824
0
                    continue;
1825
0
                }
1826
1827
0
                int64_t expiration_time = _tablet_meta->ttl_seconds() == 0 ||
1828
0
                                                          rowset_meta->newest_write_timestamp() <= 0
1829
0
                                                  ? 0
1830
0
                                                  : rowset_meta->newest_write_timestamp() +
1831
0
                                                            _tablet_meta->ttl_seconds();
1832
0
                g_file_cache_cloud_tablet_submitted_segment_num << 1;
1833
0
                if (rs->rowset_meta()->segment_file_size(seg_id) > 0) {
1834
0
                    g_file_cache_cloud_tablet_submitted_segment_size
1835
0
                            << rs->rowset_meta()->segment_file_size(seg_id);
1836
0
                }
1837
0
                if (!warm_up_state_updated) {
1838
0
                    VLOG_DEBUG << "warm up rowset " << rs->version() << "(" << rs->rowset_id()
1839
0
                               << ") triggerd by sync rowset";
1840
0
                    if (!add_rowset_warmup_state_unlocked(*(rs->rowset_meta()),
1841
0
                                                          WarmUpTriggerSource::SYNC_ROWSET)) {
1842
0
                        LOG(INFO) << "found duplicate warmup task for rowset " << rs->rowset_id()
1843
0
                                  << ", skip it";
1844
0
                        break;
1845
0
                    }
1846
0
                    warm_up_state_updated = true;
1847
0
                }
1848
1849
0
                if (!config::file_cache_enable_only_warm_up_idx) {
1850
0
                    _submit_segment_download_task(rs, storage_resource.value(), seg_id,
1851
0
                                                  expiration_time);
1852
0
                }
1853
1854
0
                auto schema_ptr = rowset_meta->tablet_schema();
1855
0
                auto idx_version = schema_ptr->get_inverted_index_storage_format();
1856
0
                if (idx_version == InvertedIndexStorageFormatPB::V1) {
1857
0
                    std::unordered_map<int64_t, int64_t> index_size_map;
1858
0
                    auto&& inverted_index_info = rowset_meta->inverted_index_file_info(seg_id);
1859
0
                    for (const auto& info : inverted_index_info.index_info()) {
1860
0
                        if (info.index_file_size() != -1) {
1861
0
                            index_size_map[info.index_id()] = info.index_file_size();
1862
0
                        } else {
1863
0
                            VLOG_DEBUG << "Invalid index_file_size for segment_id " << seg_id
1864
0
                                       << ", index_id " << info.index_id();
1865
0
                        }
1866
0
                    }
1867
0
                    for (const auto& index : schema_ptr->inverted_indexes()) {
1868
0
                        auto idx_path = storage_resource.value()->remote_idx_v1_path(
1869
0
                                *rowset_meta, seg_id, index->index_id(), index->get_index_suffix());
1870
0
                        _submit_inverted_index_download_task(rs, storage_resource.value(), idx_path,
1871
0
                                                             index_size_map[index->index_id()],
1872
0
                                                             expiration_time);
1873
0
                    }
1874
0
                } else {
1875
0
                    if (schema_ptr->has_inverted_index() || schema_ptr->has_ann_index()) {
1876
0
                        auto&& inverted_index_info = rowset_meta->inverted_index_file_info(seg_id);
1877
0
                        int64_t idx_size = 0;
1878
0
                        if (inverted_index_info.has_index_size()) {
1879
0
                            idx_size = inverted_index_info.index_size();
1880
0
                        } else {
1881
0
                            VLOG_DEBUG << "index_size is not set for segment " << seg_id;
1882
0
                        }
1883
0
                        auto idx_path =
1884
0
                                storage_resource.value()->remote_idx_v2_path(*rowset_meta, seg_id);
1885
0
                        _submit_inverted_index_download_task(rs, storage_resource.value(), idx_path,
1886
0
                                                             idx_size, expiration_time);
1887
0
                    }
1888
0
                }
1889
0
            }
1890
0
        }
1891
871
        _rs_version_map.emplace(rs->version(), rs);
1892
871
        _timestamped_version_tracker.add_version(rs->version());
1893
871
        _max_version = std::max(rs->end_version(), _max_version);
1894
871
        update_base_size(*rs);
1895
871
    }
1896
302
    _tablet_meta->add_rowsets_unchecked(rowsets);
1897
302
}
1898
1899
15
void CloudTablet::clear_unused_visible_pending_rowsets() {
1900
15
    int64_t cur_max_version = max_version().second;
1901
15
    int32_t max_version_count = max_version_config();
1902
15
    int64_t current_time = std::chrono::duration_cast<std::chrono::seconds>(
1903
15
                                   std::chrono::system_clock::now().time_since_epoch())
1904
15
                                   .count();
1905
1906
15
    std::unique_lock<std::mutex> wlock(_visible_pending_rs_lock);
1907
38
    for (auto it = _visible_pending_rs_map.begin(); it != _visible_pending_rs_map.end();) {
1908
23
        if (int64_t version = it->first, expiration_time = it->second.expiration_time;
1909
23
            version <= cur_max_version || expiration_time < current_time) {
1910
19
            it = _visible_pending_rs_map.erase(it);
1911
19
        } else {
1912
4
            ++it;
1913
4
        }
1914
23
    }
1915
1916
15
    while (!_visible_pending_rs_map.empty() && _visible_pending_rs_map.size() > max_version_count) {
1917
0
        _visible_pending_rs_map.erase(--_visible_pending_rs_map.end());
1918
0
    }
1919
15
}
1920
1921
void CloudTablet::try_make_committed_rs_visible(int64_t txn_id, int64_t visible_version,
1922
0
                                                int64_t version_update_time_ms) {
1923
0
    if (enable_unique_key_merge_on_write()) {
1924
        // for mow tablet, we get committed rowset from `CloudTxnDeleteBitmapCache` rather than `CommittedRowsetManager`
1925
0
        try_make_committed_rs_visible_for_mow(txn_id, visible_version, version_update_time_ms);
1926
0
        return;
1927
0
    }
1928
1929
0
    auto& committed_rs_mgr = _engine.committed_rs_mgr();
1930
0
    auto res = committed_rs_mgr.get_committed_rowset(txn_id, tablet_id());
1931
0
    if (!res.has_value()) {
1932
0
        return;
1933
0
    }
1934
0
    auto [rowset_meta, expiration_time] = res.value();
1935
0
    bool is_empty_rowset = (rowset_meta == nullptr);
1936
0
    if (!is_empty_rowset) {
1937
0
        rowset_meta->set_cloud_fields_after_visible(visible_version, version_update_time_ms);
1938
0
    }
1939
0
    {
1940
0
        std::lock_guard<std::mutex> lock(_visible_pending_rs_lock);
1941
0
        _visible_pending_rs_map.emplace(
1942
0
                visible_version,
1943
0
                VisiblePendingRowset {rowset_meta, expiration_time, is_empty_rowset});
1944
0
    }
1945
0
    apply_visible_pending_rowsets();
1946
0
    committed_rs_mgr.remove_committed_rowset(txn_id, tablet_id());
1947
0
}
1948
1949
void CloudTablet::try_make_committed_rs_visible_for_mow(int64_t txn_id, int64_t visible_version,
1950
0
                                                        int64_t version_update_time_ms) {
1951
0
    Defer defer {[&] {
1952
0
        _engine.txn_delete_bitmap_cache().remove_unused_tablet_txn_info(txn_id, tablet_id());
1953
0
    }};
1954
0
    auto res = _engine.txn_delete_bitmap_cache().get_rowset_and_delete_bitmap(txn_id, tablet_id());
1955
0
    if (!res.has_value()) {
1956
0
        return;
1957
0
    }
1958
0
    auto [rowset, delete_bitmap] = res.value();
1959
0
    bool is_empty_rowset = (rowset == nullptr);
1960
0
    {
1961
0
        std::unique_lock lock {_sync_meta_lock};
1962
0
        std::unique_lock meta_wlock {_meta_lock};
1963
0
        if (_max_version + 1 != visible_version) {
1964
0
            return;
1965
0
        }
1966
0
        if (is_empty_rowset) {
1967
0
            Versions existing_versions;
1968
0
            for (const auto& [_, rs] : tablet_meta()->all_rs_metas()) {
1969
0
                existing_versions.emplace_back(rs->version());
1970
0
            }
1971
0
            if (existing_versions.empty()) {
1972
0
                return;
1973
0
            }
1974
0
            auto max_version = std::ranges::max(existing_versions, {}, &Version::first);
1975
0
            auto prev_rowset = get_rowset_by_version(max_version);
1976
0
            auto st = _engine.meta_mgr().create_empty_rowset_for_hole(
1977
0
                    this, visible_version, prev_rowset->rowset_meta(), &rowset);
1978
0
            if (!st.ok()) {
1979
0
                return;
1980
0
            }
1981
0
        } else {
1982
0
            for (const auto& [delete_bitmap_key, bitmap_value] : delete_bitmap->delete_bitmap) {
1983
                // skip sentinel mark, which is used for delete bitmap correctness check
1984
0
                if (std::get<1>(delete_bitmap_key) != DeleteBitmap::INVALID_SEGMENT_ID) {
1985
0
                    tablet_meta()->delete_bitmap().merge(
1986
0
                            {std::get<0>(delete_bitmap_key), std::get<1>(delete_bitmap_key),
1987
0
                             visible_version},
1988
0
                            bitmap_value);
1989
0
                }
1990
0
            }
1991
0
        }
1992
0
        rowset->rowset_meta()->set_cloud_fields_after_visible(visible_version,
1993
0
                                                              version_update_time_ms);
1994
0
        add_rowsets({rowset}, false, meta_wlock, true);
1995
0
    }
1996
0
    LOG(INFO) << "mow added visible pending rowset, txn_id=" << txn_id
1997
0
              << ", tablet_id=" << tablet_id() << ", version=" << visible_version
1998
0
              << ", rowset_id=" << rowset->rowset_id().to_string();
1999
0
}
2000
2001
15
void CloudTablet::apply_visible_pending_rowsets() {
2002
15
    Defer defer {[&] { clear_unused_visible_pending_rowsets(); }};
2003
2004
15
    std::unique_lock lock(_sync_meta_lock);
2005
15
    std::unique_lock<std::shared_mutex> meta_wlock(_meta_lock);
2006
15
    int64_t next_version = _max_version + 1;
2007
15
    std::vector<RowsetSharedPtr> to_add;
2008
15
    std::lock_guard<std::mutex> pending_lock(_visible_pending_rs_lock);
2009
15
    for (auto it = _visible_pending_rs_map.upper_bound(_max_version);
2010
31
         it != _visible_pending_rs_map.end(); ++it) {
2011
21
        int64_t version = it->first;
2012
21
        if (version != next_version) break;
2013
2014
17
        auto& pending_rs = it->second;
2015
17
        if (pending_rs.is_empty_rowset) {
2016
3
            RowsetSharedPtr prev_rowset {nullptr};
2017
3
            if (!to_add.empty()) {
2018
1
                prev_rowset = to_add.back();
2019
2
            } else {
2020
2
                Versions existing_versions;
2021
2
                for (const auto& [_, rs] : tablet_meta()->all_rs_metas()) {
2022
1
                    existing_versions.emplace_back(rs->version());
2023
1
                }
2024
2
                if (existing_versions.empty()) {
2025
1
                    break;
2026
1
                }
2027
1
                auto max_version = std::ranges::max(existing_versions, {}, &Version::first);
2028
1
                prev_rowset = get_rowset_by_version(max_version);
2029
1
            }
2030
2
            RowsetSharedPtr rowset;
2031
2
            auto st = _engine.meta_mgr().create_empty_rowset_for_hole(
2032
2
                    this, version, prev_rowset->rowset_meta(), &rowset);
2033
2
            if (!st.ok()) {
2034
0
                return;
2035
0
            }
2036
2
            to_add.push_back(std::move(rowset));
2037
14
        } else {
2038
14
            RowsetSharedPtr rowset;
2039
14
            auto st = RowsetFactory::create_rowset(nullptr, "", pending_rs.rowset_meta, &rowset);
2040
14
            if (!st.ok()) {
2041
0
                LOG(WARNING) << "failed to create rowset from pending rowset meta, tablet_id="
2042
0
                             << tablet_id() << ", version=" << version
2043
0
                             << ", rowset_id=" << pending_rs.rowset_meta->rowset_id().to_string()
2044
0
                             << ", error=" << st;
2045
0
                break;
2046
0
            }
2047
14
            to_add.push_back(std::move(rowset));
2048
14
        }
2049
16
        next_version++;
2050
16
    }
2051
15
    if (!to_add.empty()) {
2052
10
        add_rowsets(to_add, false, meta_wlock, true);
2053
10
        LOG_INFO(
2054
10
                "applied_visible_pending_rowsets, tablet_id={}, new_max_version={}, "
2055
10
                "count={}, new_rowsets={}",
2056
10
                tablet_id(), _max_version, to_add.size(),
2057
16
                fmt::join(to_add | std::views::transform([](const RowsetSharedPtr& rs) {
2058
16
                              return fmt::format("{}{}", rs->rowset_id().to_string(),
2059
16
                                                 rs->version().to_string());
2060
16
                          }),
2061
10
                          ","));
2062
10
    }
2063
15
}
2064
2065
#include "common/compile_check_end.h"
2066
2067
} // namespace doris