Coverage Report

Created: 2026-06-30 19:59

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/cloud/cloud_tablet.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_tablet.h"
19
20
#include <bvar/bvar.h>
21
#include <bvar/latency_recorder.h>
22
#include <gen_cpp/Types_types.h>
23
#include <gen_cpp/olap_file.pb.h>
24
#include <rapidjson/document.h>
25
#include <rapidjson/encodings.h>
26
#include <rapidjson/prettywriter.h>
27
#include <rapidjson/rapidjson.h>
28
#include <rapidjson/stringbuffer.h>
29
30
#include <atomic>
31
#include <chrono>
#include <ctime>
32
#include <cstdint>
33
#include <memory>
34
#include <ranges>
35
#include <ratio>
36
#include <shared_mutex>
37
#include <unordered_map>
38
#include <vector>
39
40
#include "cloud/cloud_meta_mgr.h"
41
#include "cloud/cloud_storage_engine.h"
42
#include "cloud/cloud_tablet_mgr.h"
43
#include "cloud/cloud_warm_up_manager.h"
44
#include "common/cast_set.h"
45
#include "common/config.h"
46
#include "common/logging.h"
47
#include "cpp/sync_point.h"
48
#include "io/cache/block_file_cache.h"
49
#include "io/cache/block_file_cache_downloader.h"
50
#include "io/cache/block_file_cache_factory.h"
51
#include "io/cache/file_cache_expiration.h"
52
#include "olap/base_tablet.h"
53
#include "olap/compaction.h"
54
#include "olap/cumulative_compaction_time_series_policy.h"
55
#include "olap/olap_define.h"
56
#include "olap/rowset/beta_rowset.h"
57
#include "olap/rowset/rowset.h"
58
#include "olap/rowset/rowset_factory.h"
59
#include "olap/rowset/rowset_fwd.h"
60
#include "olap/rowset/rowset_writer.h"
61
#include "olap/rowset/segment_v2/inverted_index_desc.h"
62
#include "olap/storage_policy.h"
63
#include "olap/tablet_schema.h"
64
#include "olap/txn_manager.h"
65
#include "util/debug_points.h"
66
#include "util/stack_util.h"
67
#include "vec/common/schema_util.h"
68
69
namespace doris {
70
#include "common/compile_check_begin.h"
71
using namespace ErrorCode;
72
73
// Latency recorders exposed under the bvar names given as their constructor arguments.
// They are not updated in the part of the file shown here.
bvar::LatencyRecorder g_cu_compaction_get_delete_bitmap_lock_time_ms(
74
        "cu_compaction_get_delete_bitmap_lock_time_ms");
75
bvar::LatencyRecorder g_base_compaction_get_delete_bitmap_lock_time_ms(
76
        "base_compaction_get_delete_bitmap_lock_time_ms");
77
78
// Counters for unused rowsets (count and bytes).
bvar::Adder<int64_t> g_unused_rowsets_count("unused_rowsets_count");
79
bvar::Adder<int64_t> g_unused_rowsets_bytes("unused_rowsets_bytes");
80
81
// Incremented once per call of CloudTablet::capture_versions_prefer_cache().
bvar::Adder<int64_t> g_capture_prefer_cache_count("capture_prefer_cache_count");
82
// Incremented once per call of CloudTablet::capture_versions_with_freshness_tolerance().
bvar::Adder<int64_t> g_capture_with_freshness_tolerance_count(
83
        "capture_with_freshness_tolerance_count");
84
// Incremented when capture_versions_with_freshness_tolerance() falls back to the regular
// BaseTablet capture path.
bvar::Adder<int64_t> g_capture_with_freshness_tolerance_fallback_count(
85
        "capture_with_freshness_tolerance_fallback_count");
86
bvar::Adder<int64_t> g_rowset_warmup_state_missing_count("rowset_warmup_state_missing_count");
87
// Windowed views over the adders above. The last argument (30) is the window size handed to
// bvar::Window (presumably seconds, per bvar convention -- TODO confirm).
bvar::Window<bvar::Adder<int64_t>> g_capture_prefer_cache_count_window(
88
        "capture_prefer_cache_count_window", &g_capture_prefer_cache_count, 30);
89
bvar::Window<bvar::Adder<int64_t>> g_capture_with_freshness_tolerance_count_window(
90
        "capture_with_freshness_tolerance_count_window", &g_capture_with_freshness_tolerance_count,
91
        30);
92
bvar::Window<bvar::Adder<int64_t>> g_capture_with_freshness_tolerance_fallback_count_window(
93
        "capture_with_freshness_tolerance_fallback_count_window",
94
        &g_capture_with_freshness_tolerance_fallback_count, 30);
95
96
// NOTE(review): presumably the initiator id used for load-triggered operations; its users are
// outside the part of the file shown here -- confirm.
static constexpr int LOAD_INITIATOR_ID = -1;
97
98
namespace {
99
100
bool is_schema_change_output_rowset(const RowsetSharedPtr& rowset,
101
3
                                    const std::vector<RowsetSharedPtr>& output_rowsets) {
102
5
    return std::ranges::any_of(output_rowsets, [&rowset](const RowsetSharedPtr& output_rowset) {
103
5
        return output_rowset->rowset_id() == rowset->rowset_id();
104
5
    });
105
3
}
106
107
} // namespace
108
109
// File-cache download accounting for cloud tablets.
// The "submitted" counters are bumped in CloudTablet::add_rowsets() when a segment or index
// download task is handed to the block downloader.
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_size(
110
        "file_cache_cloud_tablet_submitted_segment_size");
111
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_num(
112
        "file_cache_cloud_tablet_submitted_segment_num");
113
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_index_size(
114
        "file_cache_cloud_tablet_submitted_index_size");
115
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_index_num(
116
        "file_cache_cloud_tablet_submitted_index_num");
117
// "finished" counterparts of the submitted counters; updated outside the part of the file
// shown here.
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_segment_size(
118
        "file_cache_cloud_tablet_finished_segment_size");
119
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_segment_num(
120
        "file_cache_cloud_tablet_finished_segment_num");
121
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_index_size(
122
        "file_cache_cloud_tablet_finished_index_size");
123
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_index_num(
124
        "file_cache_cloud_tablet_finished_index_num");
125
126
// Counters for recycled cached data (segments and index files).
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_segment_num(
127
        "file_cache_recycle_cached_data_segment_num");
128
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_segment_size(
129
        "file_cache_recycle_cached_data_segment_size");
130
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_index_num(
131
        "file_cache_recycle_cached_data_index_num");
132
133
// Warm-up outcome counters, split by segment / inverted index / rowset and by trigger source.
bvar::Adder<uint64_t> g_file_cache_warm_up_segment_complete_num(
134
        "file_cache_warm_up_segment_complete_num");
135
bvar::Adder<uint64_t> g_file_cache_warm_up_segment_failed_num(
136
        "file_cache_warm_up_segment_failed_num");
137
bvar::Adder<uint64_t> g_file_cache_warm_up_inverted_idx_complete_num(
138
        "file_cache_warm_up_inverted_idx_complete_num");
139
bvar::Adder<uint64_t> g_file_cache_warm_up_inverted_idx_failed_num(
140
        "file_cache_warm_up_inverted_idx_failed_num");
141
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_complete_num(
142
        "file_cache_warm_up_rowset_complete_num");
143
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_job_num(
144
        "file_cache_warm_up_rowset_triggered_by_job_num");
145
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num(
146
        "file_cache_warm_up_rowset_triggered_by_sync_rowset_num");
147
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_event_driven_num(
148
        "file_cache_warm_up_rowset_triggered_by_event_driven_num");
149
// Latency recorder for warming up all segments of a rowset (unit not visible here).
bvar::LatencyRecorder g_file_cache_warm_up_rowset_all_segments_latency(
150
        "file_cache_warm_up_rowset_all_segments_latency");
151
152
// Builds a cloud tablet: `tablet_meta` is moved into the BaseTablet base and `_engine` is
// initialized from `engine`.
CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
153
150
        : BaseTablet(std::move(tablet_meta)), _engine(engine) {}
154
155
150
// Defaulted destructor, defined out of line in this translation unit.
CloudTablet::~CloudTablet() = default;
156
157
0
bool CloudTablet::exceed_version_limit(int32_t limit) {
158
0
    return _approximate_num_rowsets.load(std::memory_order_relaxed) > limit;
159
0
}
160
161
19
// Always returns an empty path string for a cloud tablet.
std::string CloudTablet::tablet_path() const {
    return {};
}
164
165
// Captures rowset readers covering `spec_version` into `*rs_splits` while holding a shared
// `_meta_lock`.
//
// Only `opts.skip_missing_versions` is forwarded to capture_rs_readers_unlocked(); every other
// field of `opts` is left at its default.
// Returns the error produced by capture_rs_readers_unlocked() on failure, Status::OK() otherwise.
Status CloudTablet::capture_rs_readers(const Version& spec_version,
                                       std::vector<RowSetSplits>* rs_splits,
                                       const CaptureRowsetOps& opts) {
    // Debug point: lets tests inject error code -230 before any lock is taken.
    DBUG_EXECUTE_IF("CloudTablet.capture_rs_readers.return.e-230", {
        LOG_WARNING("CloudTablet.capture_rs_readers.return e-230").tag("tablet_id", tablet_id());
        return Status::Error<false>(-230, "injected error");
    });

    std::shared_lock rlock(_meta_lock);
    const CaptureRowsetOps forwarded_opts {.skip_missing_versions = opts.skip_missing_versions};
    auto captured_splits = DORIS_TRY(capture_rs_readers_unlocked(spec_version, forwarded_opts));
    *rs_splits = std::move(captured_splits);
    return Status::OK();
}
177
178
// Picks the version-capture strategy for `version_range`:
//   1. a positive `query_freshness_tolerance_ms` selects the freshness-tolerance path;
//   2. otherwise `enable_prefer_cached_rowset` on a tablet that is not unique-key merge-on-write
//      selects the prefer-cache path;
//   3. everything else goes to the BaseTablet implementation.
[[nodiscard]] Result<std::vector<Version>> CloudTablet::capture_consistent_versions_unlocked(
        const Version& version_range, const CaptureRowsetOps& options) const {
    if (options.query_freshness_tolerance_ms > 0) {
        return capture_versions_with_freshness_tolerance(version_range, options);
    }

    const bool use_prefer_cache =
            options.enable_prefer_cached_rowset && !enable_unique_key_merge_on_write();
    if (use_prefer_cache) {
        return capture_versions_prefer_cache(version_range);
    }

    return BaseTablet::capture_consistent_versions_unlocked(version_range, options);
}
187
188
// Captures a consistent version path for `spec_version`, letting the version tracker prefer
// rowsets for which rowset_is_warmed_up_unlocked() returns true.
// Returns the tracker's error wrapped in ResultError on failure.
//
// NOTE(review): this takes `_meta_lock` in shared mode itself; std::shared_mutex does not
// support recursive shared locking, so confirm that callers do not already hold it.
Result<std::vector<Version>> CloudTablet::capture_versions_prefer_cache(
        const Version& spec_version) const {
    g_capture_prefer_cache_count << 1;

    Versions version_path;
    std::shared_lock rlock(_meta_lock);
    const auto warmed_up = [this](int64_t first, int64_t last) {
        return rowset_is_warmed_up_unlocked(first, last);
    };
    if (auto st = _timestamped_version_tracker.capture_consistent_versions_prefer_cache(
                spec_version, version_path, warmed_up);
        !st.ok()) {
        return ResultError(st);
    }

    // Assumes a successful capture yields a non-empty path.
    const int64_t path_max_version = version_path.back().second;
    VLOG_DEBUG << fmt::format(
            "[verbose] CloudTablet::capture_versions_prefer_cache, capture path: {}, "
            "tablet_id={}, spec_version={}, path_max_version={}",
            fmt::join(version_path | std::views::transform([](const auto& ver) {
                          return fmt::format("{}", ver.to_string());
                      }),
                      ", "),
            tablet_id(), spec_version.to_string(), path_max_version);
    return version_path;
}
210
211
230
bool CloudTablet::rowset_is_warmed_up_unlocked(int64_t start_version, int64_t end_version) const {
212
230
    if (start_version > end_version) {
213
0
        return false;
214
0
    }
215
230
    Version version {start_version, end_version};
216
230
    auto it = _rs_version_map.find(version);
217
230
    if (it == _rs_version_map.end()) {
218
78
        it = _stale_rs_version_map.find(version);
219
78
        if (it == _stale_rs_version_map.end()) {
220
0
            LOG_WARNING(
221
0
                    "fail to find Rowset in rs_version or stale_rs_version for version. "
222
0
                    "tablet={}, version={}",
223
0
                    tablet_id(), version.to_string());
224
0
            return false;
225
0
        }
226
78
    }
227
230
    const auto& rs = it->second;
228
230
    if (rs->visible_timestamp() < _engine.startup_timepoint()) {
229
        // We only care about rowsets that are created after startup time point. For other rowsets,
230
        // we assume they are warmed up.
231
13
        return true;
232
13
    }
233
217
    return is_rowset_warmed_up(rs->rowset_id());
234
230
};
235
236
// Captures a version path for `spec_version` in which every edge (rowset) is accepted by
// rowset_is_warmed_up_unlocked(), tolerating staleness of up to
// `options.query_freshness_tolerance_ms` milliseconds.
//
// After the path is found, every rowset meta (active and stale) is checked: if some rowset
// starts after the path's max version and became visible before the freshness limit
// (now - tolerance), the warmed-up path is too stale and the function falls back to
// BaseTablet::capture_consistent_versions_unlocked().
//
// Returns the captured path, the fallback result, or the tracker's error.
//
// NOTE(review): this takes `_meta_lock` in shared mode itself and releases it before the
// fallback call to an `_unlocked` base method; confirm the locking contract with callers.
Result<std::vector<Version>> CloudTablet::capture_versions_with_freshness_tolerance(
        const Version& spec_version, const CaptureRowsetOps& options) const {
    g_capture_with_freshness_tolerance_count << 1;
    using namespace std::chrono;
    auto query_freshness_tolerance_ms = options.query_freshness_tolerance_ms;
    auto freshness_limit_tp = system_clock::now() - milliseconds(query_freshness_tolerance_ms);

    // Formats a time point as local "YYYY-mm-dd HH:MM:SS" for the verbose log below.
    // localtime_r fills caller-owned storage; std::localtime hands out a pointer to shared static
    // storage, which is a data race when several threads log at the same time.
    auto format_local_time = [](system_clock::time_point tp) -> std::string {
        const std::time_t secs = system_clock::to_time_t(tp);
        std::tm local_tm {};
        if (localtime_r(&secs, &local_tm) == nullptr) {
            return "<invalid time>";
        }
        std::ostringstream oss;
        oss << std::put_time(&local_tm, "%Y-%m-%d %H:%M:%S");
        return oss.str();
    };

    // find a version path where every edge(rowset) has been warmed up
    Versions version_path;
    std::shared_lock rlock(_meta_lock);
    auto warmed_up = [this](int64_t start, int64_t end) {
        return rowset_is_warmed_up_unlocked(start, end);
    };
    if (enable_unique_key_merge_on_write()) {
        // For merge-on-write table, newly generated delete bitmap marks will be on the rowsets which are in newest layout.
        // So we can only capture rowsets which are in newest data layout. Otherwise there may be data correctness issue.
        RETURN_IF_ERROR_RESULT(
                _timestamped_version_tracker.capture_consistent_versions_with_validator_mow(
                        spec_version, version_path, warmed_up));
    } else {
        RETURN_IF_ERROR_RESULT(
                _timestamped_version_tracker.capture_consistent_versions_with_validator(
                        spec_version, version_path, warmed_up));
    }
    // Assumes a successful capture yields a non-empty path.
    int64_t path_max_version = version_path.back().second;

    auto should_be_visible_but_not_warmed_up = [&](const auto& rs_meta) -> bool {
        if (rs_meta->version() == Version {0, 1}) {
            // skip rowset[0-1]
            return false;
        }
        bool ret = rs_meta->start_version() > path_max_version &&
                   rs_meta->visible_timestamp() < freshness_limit_tp;
        if (ret && config::read_cluster_cache_opt_verbose_log) {
            LOG_INFO(
                    "[verbose] CloudTablet::capture_rs_readers_with_freshness_tolerance, "
                    "find a rowset which should be visible but not warmed up, tablet_id={}, "
                    "path_max_version={}, rowset_id={}, version={}, visible_time={}, "
                    "freshness_limit={}, version_graph={}, rowset_warmup_digest={}",
                    tablet_id(), path_max_version, rs_meta->rowset_id().to_string(),
                    rs_meta->version().to_string(),
                    format_local_time(rs_meta->visible_timestamp()),
                    format_local_time(freshness_limit_tp),
                    _timestamped_version_tracker.debug_string(), rowset_warmup_digest());
        }
        return ret;
    };
    // use std::views::concat after C++26
    bool should_fallback =
            std::ranges::any_of(std::views::values(_tablet_meta->all_rs_metas()),
                                should_be_visible_but_not_warmed_up) ||
            std::ranges::any_of(std::views::values(_tablet_meta->all_stale_rs_metas()),
                                should_be_visible_but_not_warmed_up);
    if (should_fallback) {
        rlock.unlock();
        g_capture_with_freshness_tolerance_fallback_count << 1;
        // if there exists a rowset which satisfies freshness tolerance and its start version is larger than the path max version
        // but has not been warmed up yet, fallback to capture rowsets as usual
        return BaseTablet::capture_consistent_versions_unlocked(spec_version, options);
    }
    VLOG_DEBUG << fmt::format(
            "[verbose] CloudTablet::capture_versions_with_freshness_tolerance, capture path: {}, "
            "tablet_id={}, spec_version={}, path_max_version={}",
            fmt::join(version_path | std::views::transform([](const auto& version) {
                          return fmt::format("{}", version.to_string());
                      }),
                      ", "),
            tablet_id(), spec_version.to_string(), path_max_version);
    return version_path;
}
312
313
// There are only two tablet_states RUNNING and NOT_READY in cloud mode
314
// This function will erase the tablet from `CloudTabletMgr` when it can't find this tablet in MS.
315
26
Status CloudTablet::sync_rowsets(const SyncOptions& options, SyncRowsetStats* stats) {
316
26
    RETURN_IF_ERROR(sync_if_not_running(stats));
317
318
26
    if (options.query_version > 0) {
319
0
        DBUG_EXECUTE_IF("CloudTablet::sync_rowsets.stale_local_max_for_query_version", {
320
0
            auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
321
0
            auto stale_version = dp->param<int64_t>("version", -1);
322
0
            if (target_tablet_id == tablet_id() && stale_version >= 0) {
323
0
                std::unique_lock wlock(_meta_lock);
324
0
                LOG(INFO) << "override cloud tablet local max_version for query_version sync"
325
0
                          << ", tablet_id=" << tablet_id() << ", old_max_version=" << _max_version
326
0
                          << ", stale_version=" << stale_version
327
0
                          << ", query_version=" << options.query_version;
328
0
                _max_version = stale_version;
329
0
            }
330
0
        });
331
0
        auto lock_start = std::chrono::steady_clock::now();
332
0
        std::shared_lock rlock(_meta_lock);
333
0
        if (stats) {
334
0
            stats->meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(
335
0
                                                std::chrono::steady_clock::now() - lock_start)
336
0
                                                .count();
337
0
        }
338
0
        if (_max_version >= options.query_version) {
339
0
            return Status::OK();
340
0
        }
341
0
    }
342
343
    // serially execute sync to reduce unnecessary network overhead
344
26
    auto sync_lock_start = std::chrono::steady_clock::now();
345
26
    std::unique_lock lock(_sync_meta_lock);
346
26
    if (stats) {
347
0
        stats->sync_meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(
348
0
                                                 std::chrono::steady_clock::now() - sync_lock_start)
349
0
                                                 .count();
350
0
    }
351
26
    if (options.query_version > 0) {
352
0
        auto lock_start = std::chrono::steady_clock::now();
353
0
        std::shared_lock rlock(_meta_lock);
354
0
        if (stats) {
355
0
            stats->meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(
356
0
                                                std::chrono::steady_clock::now() - lock_start)
357
0
                                                .count();
358
0
        }
359
0
        if (_max_version >= options.query_version) {
360
0
            return Status::OK();
361
0
        }
362
0
    }
363
364
26
    auto st = _engine.meta_mgr().sync_tablet_rowsets_unlocked(this, lock, options, stats);
365
26
    if (st.is<ErrorCode::NOT_FOUND>()) {
366
0
        clear_cache();
367
0
    }
368
369
26
    return st;
370
26
}
371
372
// Sync tablet meta and all rowset meta if not running.
373
// This could happen when BE didn't finish schema change job and another BE committed this schema change job.
374
// It should be a quite rare situation.
375
26
Status CloudTablet::sync_if_not_running(SyncRowsetStats* stats) {
376
26
    if (tablet_state() == TABLET_RUNNING) {
377
21
        return Status::OK();
378
21
    }
379
380
    // Serially execute sync to reduce unnecessary network overhead
381
5
    auto sync_lock_start = std::chrono::steady_clock::now();
382
5
    std::unique_lock lock(_sync_meta_lock);
383
5
    if (stats) {
384
0
        stats->sync_meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(
385
0
                                                 std::chrono::steady_clock::now() - sync_lock_start)
386
0
                                                 .count();
387
0
    }
388
389
5
    {
390
5
        auto lock_start = std::chrono::steady_clock::now();
391
5
        std::shared_lock rlock(_meta_lock);
392
5
        if (stats) {
393
0
            stats->meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(
394
0
                                                std::chrono::steady_clock::now() - lock_start)
395
0
                                                .count();
396
0
        }
397
5
        if (tablet_state() == TABLET_RUNNING) {
398
0
            return Status::OK();
399
0
        }
400
5
    }
401
402
5
    TabletMetaSharedPtr tablet_meta;
403
5
    auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta);
404
5
    if (!st.ok()) {
405
0
        if (st.is<ErrorCode::NOT_FOUND>()) {
406
0
            clear_cache();
407
0
        }
408
0
        return st;
409
0
    }
410
411
5
    if (tablet_meta->tablet_state() != TABLET_RUNNING) [[unlikely]] {
412
        // MoW may go to here when load while schema change
413
5
        return Status::OK();
414
5
    }
415
416
0
    TimestampedVersionTracker empty_tracker;
417
0
    {
418
0
        auto lock_start = std::chrono::steady_clock::now();
419
0
        std::lock_guard wlock(_meta_lock);
420
0
        if (stats) {
421
0
            stats->meta_lock_wait_ns += std::chrono::duration_cast<std::chrono::nanoseconds>(
422
0
                                                std::chrono::steady_clock::now() - lock_start)
423
0
                                                .count();
424
0
        }
425
0
        RETURN_IF_ERROR(set_tablet_state(TABLET_RUNNING));
426
0
        _rs_version_map.clear();
427
0
        _stale_rs_version_map.clear();
428
0
        std::swap(_timestamped_version_tracker, empty_tracker);
429
0
        _tablet_meta->clear_rowsets();
430
0
        _tablet_meta->clear_stale_rowset();
431
0
        _max_version = -1;
432
0
    }
433
434
0
    st = _engine.meta_mgr().sync_tablet_rowsets_unlocked(this, lock, {}, stats);
435
0
    if (st.is<ErrorCode::NOT_FOUND>()) {
436
0
        clear_cache();
437
0
    }
438
0
    return st;
439
0
}
440
441
void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_overlap,
442
                              std::unique_lock<std::shared_mutex>& meta_lock,
443
297
                              bool warmup_delta_data) {
444
297
    if (to_add.empty()) {
445
2
        return;
446
2
    }
447
448
295
    VLOG_DEBUG << "add_rowsets tablet_id=" << tablet_id() << " stack: " << get_stack_trace();
449
450
295
    auto add_rowsets_directly = [=, this](std::vector<RowsetSharedPtr>& rowsets) {
451
865
        for (auto& rs : rowsets) {
452
865
            if (warmup_delta_data) {
453
#ifndef BE_TEST
454
                // Pre-set encryption algorithm to avoid re-entrant get_tablet() call
455
                // inside RowsetMeta::fs() which causes SingleFlight deadlock when the
456
                // tablet is not yet cached (during initial load_tablet).
457
                rs->rowset_meta()->set_encryption_algorithm(_tablet_meta->encryption_algorithm());
458
                bool warm_up_state_updated = false;
459
                // Warmup rowset data in background
460
                for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
461
                    const auto& rowset_meta = rs->rowset_meta();
462
                    constexpr int64_t interval = 600; // 10 mins
463
                    // When BE restart and receive the `load_sync` rpc, it will sync all historical rowsets first time.
464
                    // So we need to filter out the old rowsets avoid to download the whole table.
465
                    if (warmup_delta_data &&
466
                        ::time(nullptr) - rowset_meta->newest_write_timestamp() >= interval) {
467
                        continue;
468
                    }
469
470
                    auto storage_resource = rowset_meta->remote_storage_resource();
471
                    if (!storage_resource) {
472
                        LOG(WARNING) << storage_resource.error();
473
                        continue;
474
                    }
475
476
                    int64_t expiration_time = io::calc_file_cache_expiration_time(
477
                            _tablet_meta->creation_time(), _tablet_meta->ttl_seconds());
478
                    g_file_cache_cloud_tablet_submitted_segment_num << 1;
479
                    if (rs->rowset_meta()->segment_file_size(seg_id) > 0) {
480
                        g_file_cache_cloud_tablet_submitted_segment_size
481
                                << rs->rowset_meta()->segment_file_size(seg_id);
482
                    }
483
                    if (!warm_up_state_updated) {
484
                        VLOG_DEBUG << "warm up rowset " << rs->version() << "(" << rs->rowset_id()
485
                                   << ") triggerd by sync rowset";
486
                        if (!add_rowset_warmup_state_unlocked(*(rs->rowset_meta()),
487
                                                              WarmUpTriggerSource::SYNC_ROWSET)) {
488
                            LOG(INFO) << "found duplicate warmup task for rowset "
489
                                      << rs->rowset_id() << ", skip it";
490
                            break;
491
                        }
492
                        warm_up_state_updated = true;
493
                    }
494
                    // clang-format off
495
                    auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this());
496
                    auto file_system = rowset_meta->fs();
497
                    if (!file_system) {
498
                        LOG(WARNING) << "failed to get file system for tablet_id="
499
                                     << _tablet_meta->tablet_id() << ", rowset_id="
500
                                     << rowset_meta->rowset_id();
501
                        continue;
502
                    }
503
                    if (!config::file_cache_enable_only_warm_up_idx) {
504
                        _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
505
                                .path = storage_resource.value()->remote_segment_path(*rowset_meta, seg_id),
506
                                .file_size = rs->rowset_meta()->segment_file_size(seg_id),
507
                                .file_system = file_system,
508
                                .ctx =
509
                                        {
510
                                                .expiration_time = expiration_time,
511
                                                .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
512
                                                .is_warmup = true
513
                                        },
514
                                .download_done {[=](Status st) {
515
                                    DBUG_EXECUTE_IF("CloudTablet::add_rowsets.download_data.callback.block_compaction_rowset", {
516
                                                if (rs->version().second > rs->version().first) {
517
                                                    auto sleep_time = dp->param<int>("sleep", 3);
518
                                                    LOG_INFO(
519
                                                            "[verbose] block download for rowset={}, "
520
                                                            "version={}, sleep={}",
521
                                                            rs->rowset_id().to_string(),
522
                                                            rs->version().to_string(), sleep_time);
523
                                                    std::this_thread::sleep_for(
524
                                                            std::chrono::seconds(sleep_time));
525
                                                }
526
                                    });
527
                                    self->complete_rowset_segment_warmup(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), st, 1, 0);
528
                                    if (!st) {
529
                                        LOG_WARNING("add rowset warm up error ").error(st);
530
                                    }
531
                                }},
532
                                .tablet_id = _tablet_meta->tablet_id(),
533
                        });
534
                    }
535
536
                    auto download_idx_file = [&, self](const io::Path& idx_path, int64_t idx_size) {
537
                        io::DownloadFileMeta meta {
538
                                .path = idx_path,
539
                                .file_size = idx_size,
540
                                .file_system = file_system,
541
                                .ctx =
542
                                        {
543
                                                .expiration_time = expiration_time,
544
                                                .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
545
                                                .is_warmup = true
546
                                        },
547
                                .download_done {[=](Status st) {
548
                                    DBUG_EXECUTE_IF("CloudTablet::add_rowsets.download_idx.callback.block", {
549
                                                // clang-format on
550
                                                auto sleep_time = dp->param<int>("sleep", 3);
551
                                                LOG_INFO(
552
                                                        "[verbose] block download for "
553
                                                        "rowset={}, inverted_idx_file={}, "
554
                                                        "sleep={}",
555
                                                        rs->rowset_id().to_string(),
556
                                                        idx_path.string(), sleep_time);
557
                                                std::this_thread::sleep_for(
558
                                                        std::chrono::seconds(sleep_time));
559
                                                // clang-format off
560
                                    });
561
                                    self->complete_rowset_segment_warmup(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), st, 0, 1);
562
                                    if (!st) {
563
                                        LOG_WARNING("add rowset warm up error ").error(st);
564
                                    }
565
                                }},
566
                                .tablet_id = _tablet_meta->tablet_id(),
567
                        };
568
                        self->update_rowset_warmup_state_inverted_idx_num_unlocked(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), 1);
569
                        _engine.file_cache_block_downloader().submit_download_task(std::move(meta));
570
                        g_file_cache_cloud_tablet_submitted_index_num << 1;
571
                        g_file_cache_cloud_tablet_submitted_index_size << idx_size;
572
                    };
573
                    // clang-format on
574
                    auto schema_ptr = rowset_meta->tablet_schema();
575
                    auto idx_version = schema_ptr->get_inverted_index_storage_format();
576
                    if (idx_version == InvertedIndexStorageFormatPB::V1) {
577
                        std::unordered_map<int64_t, int64_t> index_size_map;
578
                        auto&& inverted_index_info = rowset_meta->inverted_index_file_info(seg_id);
579
                        for (const auto& info : inverted_index_info.index_info()) {
580
                            if (info.index_file_size() != -1) {
581
                                index_size_map[info.index_id()] = info.index_file_size();
582
                            } else {
583
                                VLOG_DEBUG << "Invalid index_file_size for segment_id " << seg_id
584
                                           << ", index_id " << info.index_id();
585
                            }
586
                        }
587
                        for (const auto& index : schema_ptr->inverted_indexes()) {
588
                            auto idx_path = storage_resource.value()->remote_idx_v1_path(
589
                                    *rowset_meta, seg_id, index->index_id(),
590
                                    index->get_index_suffix());
591
                            download_idx_file(idx_path, index_size_map[index->index_id()]);
592
                        }
593
                    } else {
594
                        if (schema_ptr->has_inverted_index() || schema_ptr->has_ann_index()) {
595
                            auto&& inverted_index_info =
596
                                    rowset_meta->inverted_index_file_info(seg_id);
597
                            int64_t idx_size = 0;
598
                            if (inverted_index_info.has_index_size()) {
599
                                idx_size = inverted_index_info.index_size();
600
                            } else {
601
                                VLOG_DEBUG << "index_size is not set for segment " << seg_id;
602
                            }
603
                            auto idx_path = storage_resource.value()->remote_idx_v2_path(
604
                                    *rowset_meta, seg_id);
605
                            download_idx_file(idx_path, idx_size);
606
                        }
607
                    }
608
                }
609
#endif
610
0
            }
611
865
            _rs_version_map.emplace(rs->version(), rs);
612
865
            _timestamped_version_tracker.add_version(rs->version());
613
865
            _max_version = std::max(rs->end_version(), _max_version);
614
865
            update_base_size(*rs);
615
865
        }
616
295
        _tablet_meta->add_rowsets_unchecked(rowsets);
617
295
    };
618
619
295
    if (!version_overlap) {
620
294
        add_rowsets_directly(to_add);
621
294
        return;
622
294
    }
623
624
    // Filter out existed rowsets
625
1
    auto remove_it =
626
2
            std::remove_if(to_add.begin(), to_add.end(), [this](const RowsetSharedPtr& rs) {
627
2
                if (auto find_it = _rs_version_map.find(rs->version());
628
2
                    find_it == _rs_version_map.end()) {
629
0
                    return false;
630
2
                } else if (find_it->second->rowset_id() == rs->rowset_id()) {
631
2
                    return true; // Same rowset
632
2
                }
633
634
                // If version of rowset in `to_add` is equal to rowset in tablet but rowset_id is not equal,
635
                // replace existed rowset with `to_add` rowset. This may occur when:
636
                //  1. schema change converts rowsets which have been double written to new tablet
637
                //  2. cumu compaction picks single overlapping input rowset to perform compaction
638
639
                // add existed rowset to unused_rowsets to remove delete bitmap and recycle cached data
640
641
0
                std::vector<RowsetSharedPtr> unused_rowsets;
642
0
                if (auto find_it = _rs_version_map.find(rs->version());
643
0
                    find_it != _rs_version_map.end()) {
644
0
                    if (find_it->second->rowset_id() == rs->rowset_id()) {
645
0
                        LOG(WARNING) << "tablet_id=" << tablet_id()
646
0
                                     << ", rowset_id=" << rs->rowset_id().to_string()
647
0
                                     << ", existed rowset_id="
648
0
                                     << find_it->second->rowset_id().to_string();
649
0
                        DCHECK(find_it->second->rowset_id() != rs->rowset_id())
650
0
                                << "tablet_id=" << tablet_id()
651
0
                                << ", rowset_id=" << rs->rowset_id().to_string()
652
0
                                << ", existed rowset_id="
653
0
                                << find_it->second->rowset_id().to_string();
654
0
                    }
655
0
                    unused_rowsets.push_back(find_it->second);
656
0
                }
657
0
                add_unused_rowsets(unused_rowsets);
658
659
0
                _tablet_meta->delete_rs_meta_by_version(rs->version(), nullptr);
660
0
                _rs_version_map[rs->version()] = rs;
661
0
                _tablet_meta->add_rowsets_unchecked({rs});
662
0
                update_base_size(*rs);
663
0
                return true;
664
2
            });
665
666
1
    to_add.erase(remove_it, to_add.end());
667
668
    // delete rowsets with overlapped version
669
1
    std::vector<RowsetSharedPtr> to_add_directly;
670
1
    for (auto& to_add_rs : to_add) {
671
        // delete rowsets with overlapped version
672
0
        std::vector<RowsetSharedPtr> to_delete;
673
0
        Version to_add_v = to_add_rs->version();
674
        // if start_version  > max_version, we can skip checking overlap here.
675
0
        if (to_add_v.first > _max_version) {
676
            // if start_version  > max_version, we can skip checking overlap here.
677
0
            to_add_directly.push_back(to_add_rs);
678
0
        } else {
679
0
            to_add_directly.push_back(to_add_rs);
680
0
            for (auto& [v, rs] : _rs_version_map) {
681
0
                if (to_add_v.contains(v)) {
682
0
                    to_delete.push_back(rs);
683
0
                }
684
0
            }
685
0
            delete_rowsets(to_delete, meta_lock);
686
0
        }
687
0
    }
688
689
1
    add_rowsets_directly(to_add_directly);
690
1
}
691
692
void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
693
105
                                 std::unique_lock<std::shared_mutex>&) {
694
105
    if (to_delete.empty()) {
695
0
        return;
696
0
    }
697
105
    std::vector<RowsetMetaSharedPtr> rs_metas;
698
105
    rs_metas.reserve(to_delete.size());
699
105
    int64_t now = ::time(nullptr);
700
607
    for (auto&& rs : to_delete) {
701
607
        rs->rowset_meta()->set_stale_at(now);
702
607
        rs_metas.push_back(rs->rowset_meta());
703
607
        _stale_rs_version_map[rs->version()] = rs;
704
607
    }
705
105
    _timestamped_version_tracker.add_stale_path_version(rs_metas);
706
607
    for (auto&& rs : to_delete) {
707
607
        _rs_version_map.erase(rs->version());
708
607
    }
709
710
105
    _tablet_meta->modify_rs_metas({}, rs_metas, false);
711
105
}
712
713
void CloudTablet::delete_rowsets_for_schema_change(const std::vector<RowsetSharedPtr>& to_delete,
714
                                                   std::unique_lock<std::shared_mutex>&,
715
5
                                                   bool recycle_deleted_rowsets) {
716
5
    if (to_delete.empty()) {
717
1
        return;
718
1
    }
719
4
    std::vector<RowsetMetaSharedPtr> rs_metas;
720
4
    rs_metas.reserve(to_delete.size());
721
5
    for (auto&& rs : to_delete) {
722
5
        rs_metas.push_back(rs->rowset_meta());
723
5
        _rs_version_map.erase(rs->version());
724
        // Remove edge from version graph so that the greedy capture algorithm
725
        // won't prefer the wider stale compaction rowset over individual SC
726
        // output rowsets (e.g. [818-822] vs [818],[819],...,[822]).
727
5
        _timestamped_version_tracker.delete_version(rs->version());
728
5
    }
729
730
    // Use same_version=true to skip adding to _stale_rs_metas. Do NOT use the
731
    // stale tracking mechanism (_stale_rs_version_map / _stale_version_path_map)
732
    // because SC output will create new rowsets with identical version ranges;
733
    // a later compaction could put those into stale as well, causing two stale
734
    // paths to reference the same version key -- when one path is cleaned first,
735
    // the other hits a DCHECK(false) in delete_expired_stale_rowsets().
736
4
    _tablet_meta->modify_rs_metas({}, rs_metas, true);
737
738
4
    if (recycle_deleted_rowsets) {
739
        // Schedule for direct cache cleanup. MS has already recycled these rowsets.
740
3
        add_unused_rowsets(to_delete);
741
3
    }
742
4
}
743
744
void CloudTablet::replace_rowsets_with_schema_change_output(
745
        const std::vector<RowsetSharedPtr>& output_rowsets, int64_t alter_version,
746
        std::unique_lock<std::shared_mutex>& meta_lock, const char* stage,
747
2
        bool recycle_deleted_rowsets) {
748
2
    std::vector<RowsetSharedPtr> to_delete;
749
7
    for (auto& [v, rs] : _rs_version_map) {
750
7
        if (v.first >= 2 && v.second <= alter_version &&
751
7
            !is_schema_change_output_rowset(rs, output_rowsets)) {
752
1
            to_delete.push_back(rs);
753
1
        }
754
7
    }
755
2
    if (!to_delete.empty()) {
756
1
        LOG_INFO(
757
1
                "schema change: delete {} local rowsets in [2, {}] before adding SC output, "
758
1
                "tablet_id={}, stage={}, versions=[{}]",
759
1
                to_delete.size(), alter_version, tablet_id(), stage,
760
1
                fmt::join(to_delete | std::views::transform([](const auto& rs) {
761
1
                              return rs->version().to_string();
762
1
                          }),
763
1
                          ", "));
764
1
        delete_rowsets_for_schema_change(to_delete, meta_lock, recycle_deleted_rowsets);
765
1
    }
766
2
    add_rowsets(output_rowsets, true, meta_lock, false);
767
2
}
768
769
1
uint64_t CloudTablet::delete_expired_stale_rowsets() {
770
1
    if (config::enable_mow_verbose_log) {
771
0
        LOG_INFO("begin delete_expired_stale_rowset for tablet={}", tablet_id());
772
0
    }
773
1
    std::vector<RowsetSharedPtr> expired_rowsets;
774
    // ATTN: trick, Use stale_rowsets to temporarily increase the reference count of the rowset shared pointer in _stale_rs_version_map so that in the recycle_cached_data function, it checks if the reference count is 2.
775
1
    std::vector<std::pair<Version, std::vector<RowsetSharedPtr>>> deleted_stale_rowsets;
776
1
    int64_t expired_stale_sweep_endtime =
777
1
            ::time(nullptr) - config::tablet_rowset_stale_sweep_time_sec;
778
1
    {
779
1
        std::unique_lock wlock(_meta_lock);
780
781
1
        std::vector<int64_t> path_ids;
782
        // capture the path version to delete
783
1
        _timestamped_version_tracker.capture_expired_paths(expired_stale_sweep_endtime, &path_ids);
784
785
1
        if (path_ids.empty()) {
786
0
            return 0;
787
0
        }
788
789
1
        for (int64_t path_id : path_ids) {
790
1
            int64_t start_version = -1;
791
1
            int64_t end_version = -1;
792
1
            std::vector<RowsetSharedPtr> stale_rowsets;
793
            // delete stale versions in version graph
794
1
            auto version_path = _timestamped_version_tracker.fetch_and_delete_path_by_id(path_id);
795
5
            for (auto& v_ts : version_path->timestamped_versions()) {
796
5
                auto rs_it = _stale_rs_version_map.find(v_ts->version());
797
5
                if (rs_it != _stale_rs_version_map.end()) {
798
5
                    expired_rowsets.push_back(rs_it->second);
799
5
                    stale_rowsets.push_back(rs_it->second);
800
5
                    VLOG_DEBUG << "erase stale rowset, tablet_id=" << tablet_id()
801
0
                               << " rowset_id=" << rs_it->second->rowset_id().to_string()
802
0
                               << " version=" << rs_it->first.to_string();
803
5
                    _stale_rs_version_map.erase(rs_it);
804
5
                } else {
805
0
                    LOG(WARNING) << "cannot find stale rowset " << v_ts->version() << " in tablet "
806
0
                                 << tablet_id();
807
                    // clang-format off
808
0
                    DCHECK(false) << [this, &wlock]() { wlock.unlock(); std::string json; get_compaction_status(&json); return json; }();
809
                    // clang-format on
810
0
                }
811
5
                if (start_version < 0) {
812
1
                    start_version = v_ts->version().first;
813
1
                }
814
5
                end_version = v_ts->version().second;
815
5
                _tablet_meta->delete_stale_rs_meta_by_version(v_ts->version());
816
5
            }
817
1
            Version version(start_version, end_version);
818
1
            if (!stale_rowsets.empty()) {
819
1
                deleted_stale_rowsets.emplace_back(version, std::move(stale_rowsets));
820
1
            }
821
1
        }
822
1
        _reconstruct_version_tracker_if_necessary();
823
1
    }
824
825
    // if the rowset is not used by any query, we can recycle its cached data early.
826
0
    auto recycled_rowsets = recycle_cached_data(expired_rowsets);
827
1
    if (!recycled_rowsets.empty()) {
828
0
        auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
829
0
        manager.recycle_cache(tablet_id(), recycled_rowsets);
830
0
    }
831
1
    if (config::enable_mow_verbose_log) {
832
0
        LOG_INFO("finish delete_expired_stale_rowset for tablet={}", tablet_id());
833
0
    }
834
835
1
    add_unused_rowsets(expired_rowsets);
836
1
    if (config::enable_agg_and_remove_pre_rowsets_delete_bitmap && keys_type() == UNIQUE_KEYS &&
837
1
        enable_unique_key_merge_on_write() && !deleted_stale_rowsets.empty()) {
838
        // agg delete bitmap for pre rowsets; record unused delete bitmap key ranges
839
0
        OlapStopWatch watch;
840
0
        for (const auto& [version, unused_rowsets] : deleted_stale_rowsets) {
841
            // agg delete bitmap for pre rowset
842
0
            DeleteBitmapKeyRanges remove_delete_bitmap_key_ranges;
843
0
            agg_delete_bitmap_for_stale_rowsets(version, remove_delete_bitmap_key_ranges);
844
            // add remove delete bitmap
845
0
            if (!remove_delete_bitmap_key_ranges.empty()) {
846
0
                std::vector<RowsetId> rowset_ids;
847
0
                for (const auto& rs : unused_rowsets) {
848
0
                    rowset_ids.push_back(rs->rowset_id());
849
0
                }
850
0
                std::lock_guard<std::mutex> lock(_gc_mutex);
851
0
                _unused_delete_bitmap.push_back(
852
0
                        std::make_pair(rowset_ids, remove_delete_bitmap_key_ranges));
853
0
            }
854
0
        }
855
0
        LOG(INFO) << "agg pre rowsets delete bitmap. tablet_id=" << tablet_id()
856
0
                  << ", size=" << deleted_stale_rowsets.size()
857
0
                  << ", cost(us)=" << watch.get_elapse_time_us();
858
0
    }
859
1
    return expired_rowsets.size();
860
1
}
861
862
2
bool CloudTablet::need_remove_unused_rowsets() {
863
2
    std::lock_guard<std::mutex> lock(_gc_mutex);
864
2
    return !_unused_rowsets.empty() || !_unused_delete_bitmap.empty();
865
2
}
866
867
4
void CloudTablet::add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets) {
868
4
    std::lock_guard<std::mutex> lock(_gc_mutex);
869
9
    for (const auto& rowset : rowsets) {
870
9
        _unused_rowsets[rowset->rowset_id()] = rowset;
871
9
        g_unused_rowsets_bytes << rowset->total_disk_size();
872
9
    }
873
4
    g_unused_rowsets_count << rowsets.size();
874
4
}
875
876
0
void CloudTablet::remove_unused_rowsets() {
877
0
    std::vector<std::shared_ptr<Rowset>> removed_rowsets;
878
0
    int64_t removed_delete_bitmap_num = 0;
879
0
    OlapStopWatch watch;
880
0
    {
881
0
        std::lock_guard<std::mutex> lock(_gc_mutex);
882
        // 1. remove unused rowsets's cache data and delete bitmap
883
0
        for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
884
0
            auto& rs = it->second;
885
0
            if (rs.use_count() > 1) {
886
0
                LOG(WARNING) << "tablet_id:" << tablet_id() << " rowset: " << rs->rowset_id()
887
0
                             << " has " << rs.use_count() << " references, it cannot be removed";
888
0
                ++it;
889
0
                continue;
890
0
            }
891
0
            tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version());
892
0
            _rowset_warm_up_states.erase(rs->rowset_id());
893
0
            rs->clear_cache();
894
0
            g_unused_rowsets_count << -1;
895
0
            g_unused_rowsets_bytes << -rs->total_disk_size();
896
0
            removed_rowsets.push_back(std::move(rs));
897
0
            it = _unused_rowsets.erase(it);
898
0
        }
899
0
    }
900
901
0
    {
902
0
        std::vector<RecycledRowsets> recycled_rowsets;
903
904
0
        for (auto& rs : removed_rowsets) {
905
0
            auto index_names = rs->get_index_file_names();
906
0
            recycled_rowsets.emplace_back(rs->rowset_id(), rs->num_segments(), index_names);
907
0
            int64_t segment_size_sum = 0;
908
0
            for (int32_t i = 0; i < rs->num_segments(); i++) {
909
0
                segment_size_sum += rs->rowset_meta()->segment_file_size(i);
910
0
            }
911
0
            g_file_cache_recycle_cached_data_segment_num << rs->num_segments();
912
0
            g_file_cache_recycle_cached_data_segment_size << segment_size_sum;
913
0
            g_file_cache_recycle_cached_data_index_num << index_names.size();
914
0
        }
915
916
0
        if (recycled_rowsets.size() > 0) {
917
0
            auto& manager =
918
0
                    ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
919
0
            manager.recycle_cache(tablet_id(), recycled_rowsets);
920
0
        }
921
0
    }
922
923
0
    {
924
0
        std::lock_guard<std::mutex> lock(_gc_mutex);
925
        // 2. remove delete bitmap of pre rowsets
926
0
        for (auto it = _unused_delete_bitmap.begin(); it != _unused_delete_bitmap.end();) {
927
0
            auto& rowset_ids = std::get<0>(*it);
928
0
            bool find_unused_rowset = false;
929
0
            for (const auto& rowset_id : rowset_ids) {
930
0
                if (_unused_rowsets.find(rowset_id) != _unused_rowsets.end()) {
931
0
                    LOG(INFO) << "can not remove pre rowset delete bitmap because rowset is in use"
932
0
                              << ", tablet_id=" << tablet_id() << ", rowset_id=" << rowset_id;
933
0
                    find_unused_rowset = true;
934
0
                    break;
935
0
                }
936
0
            }
937
0
            if (find_unused_rowset) {
938
0
                ++it;
939
0
                continue;
940
0
            }
941
0
            auto& key_ranges = std::get<1>(*it);
942
0
            tablet_meta()->delete_bitmap().remove(key_ranges);
943
0
            it = _unused_delete_bitmap.erase(it);
944
0
            removed_delete_bitmap_num++;
945
            // TODO(kaijie): recycle cache for unused delete bitmap
946
0
        }
947
0
    }
948
949
0
    LOG(INFO) << "tablet_id=" << tablet_id() << ", unused_rowset size=" << _unused_rowsets.size()
950
0
              << ", unused_delete_bitmap size=" << _unused_delete_bitmap.size()
951
0
              << ", removed_rowsets_num=" << removed_rowsets.size()
952
0
              << ", removed_delete_bitmap_num=" << removed_delete_bitmap_num
953
0
              << ", cost(us)=" << watch.get_elapse_time_us();
954
0
}
955
956
865
void CloudTablet::update_base_size(const Rowset& rs) {
957
    // Define base rowset as the rowset of version [2-x]
958
865
    if (rs.start_version() == 2) {
959
117
        _base_size = rs.total_disk_size();
960
117
    }
961
865
}
962
963
0
void CloudTablet::clear_cache() {
964
0
    auto recycled_rowsets = CloudTablet::recycle_cached_data(get_snapshot_rowset(true));
965
0
    if (!recycled_rowsets.empty()) {
966
0
        auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
967
0
        manager.recycle_cache(tablet_id(), recycled_rowsets);
968
0
    }
969
0
    _engine.tablet_mgr().erase_tablet(tablet_id());
970
0
}
971
972
std::vector<RecycledRowsets> CloudTablet::recycle_cached_data(
973
1
        const std::vector<RowsetSharedPtr>& rowsets) {
974
1
    std::vector<RecycledRowsets> recycled_rowsets;
975
5
    for (const auto& rs : rowsets) {
976
        // rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2.
977
5
        if (rs.use_count() > 2) {
978
5
            LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has " << rs.use_count()
979
5
                         << " references. File Cache won't be recycled when query is using it.";
980
5
            continue;
981
5
        }
982
0
        rs->clear_cache();
983
0
        auto index_names = rs->get_index_file_names();
984
0
        recycled_rowsets.emplace_back(rs->rowset_id(), rs->num_segments(), index_names);
985
986
0
        int64_t segment_size_sum = 0;
987
0
        for (int32_t i = 0; i < rs->num_segments(); i++) {
988
0
            segment_size_sum += rs->rowset_meta()->segment_file_size(i);
989
0
        }
990
0
        g_file_cache_recycle_cached_data_segment_num << rs->num_segments();
991
0
        g_file_cache_recycle_cached_data_segment_size << segment_size_sum;
992
0
        g_file_cache_recycle_cached_data_index_num << index_names.size();
993
0
    }
994
1
    return recycled_rowsets;
995
1
}
996
997
void CloudTablet::reset_approximate_stats(int64_t num_rowsets, int64_t num_segments,
998
1
                                          int64_t num_rows, int64_t data_size) {
999
1
    _approximate_num_segments.store(num_segments, std::memory_order_relaxed);
1000
1
    _approximate_num_rows.store(num_rows, std::memory_order_relaxed);
1001
1
    _approximate_data_size.store(data_size, std::memory_order_relaxed);
1002
1
    int64_t cumu_num_deltas = 0;
1003
1
    int64_t cumu_num_rowsets = 0;
1004
1
    auto cp = _cumulative_point.load(std::memory_order_relaxed);
1005
3
    for (auto& [v, r] : _rs_version_map) {
1006
3
        if (v.second < cp) {
1007
0
            continue;
1008
0
        }
1009
3
        cumu_num_deltas += r->is_segments_overlapping() ? r->num_segments() : 1;
1010
3
        ++cumu_num_rowsets;
1011
3
    }
1012
    // num_rowsets may be less than the size of _rs_version_map when there are some hole rowsets
1013
    // in the version map, so we use the max value to ensure that the approximate number
1014
    // of rowsets is at least the size of _rs_version_map.
1015
    // Note that this is not the exact number of rowsets, but an approximate number.
1016
1
    int64_t approximate_num_rowsets =
1017
1
            std::max(num_rowsets, static_cast<int64_t>(_rs_version_map.size()));
1018
1
    _approximate_num_rowsets.store(approximate_num_rowsets, std::memory_order_relaxed);
1019
1
    _approximate_cumu_num_rowsets.store(cumu_num_rowsets, std::memory_order_relaxed);
1020
1
    _approximate_cumu_num_deltas.store(cumu_num_deltas, std::memory_order_relaxed);
1021
1
}
1022
1023
// Fills `context` with a freshly allocated rowset id and this tablet's
// identity and settings, then asks `RowsetFactory` to build the writer.
// `vertical` is forwarded to the factory unchanged.
Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_rowset_writer(
        RowsetWriterContext& context, bool vertical) {
    context.rowset_id = _engine.next_rowset_id();

    // Identity of this tablet.
    // FIXME(plat1ko): Seems `tablet_id` and `index_id` has been set repeatedly
    context.tablet_id = tablet_id();
    context.index_id = index_id();
    context.partition_id = partition_id();

    // Settings taken from the tablet meta.
    context.file_cache_base_timestamp = tablet_meta()->creation_time();
    context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write();
    context.encrypt_algorithm = tablet_meta()->encryption_algorithm();

    return RowsetFactory::create_rowset_writer(_engine, context, vertical);
}
1035
1036
// create a rowset writer with rowset_id and seg_id
1037
// after writer, merge this transient rowset with original rowset
1038
Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_writer(
1039
        const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> partial_update_info,
1040
0
        int64_t txn_expiration) {
1041
0
    if (rowset.rowset_meta_state() != RowsetStatePB::BEGIN_PARTIAL_UPDATE &&
1042
0
        rowset.rowset_meta_state() != RowsetStatePB::COMMITTED) [[unlikely]] {
1043
0
        auto msg = fmt::format(
1044
0
                "wrong rowset state when create_transient_rowset_writer, rowset state should be "
1045
0
                "BEGIN_PARTIAL_UPDATE or COMMITTED, but found {}, rowset_id={}, tablet_id={}",
1046
0
                RowsetStatePB_Name(rowset.rowset_meta_state()), rowset.rowset_id().to_string(),
1047
0
                tablet_id());
1048
        // see `CloudRowsetWriter::build` for detail.
1049
        // if this is in a retry task, the rowset state may have been changed to RowsetStatePB::COMMITTED
1050
        // in `RowsetMeta::merge_rowset_meta()` in previous trials.
1051
0
        LOG(WARNING) << msg;
1052
0
        DCHECK(false) << msg;
1053
0
    }
1054
0
    RowsetWriterContext context;
1055
0
    context.rowset_state = PREPARED;
1056
0
    context.segments_overlap = OVERLAPPING;
1057
    // During a partial update, the extracted columns of a variant should not be included in the tablet schema.
1058
    // This is because the partial update for a variant needs to ignore the extracted columns.
1059
    // Otherwise, the schema types in different rowsets might be inconsistent. When performing a partial update,
1060
    // the complete variant is constructed by reading all the sub-columns of the variant.
1061
0
    context.tablet_schema = rowset.tablet_schema()->copy_without_variant_extracted_columns();
1062
0
    context.newest_write_timestamp = UnixSeconds();
1063
0
    context.tablet_id = table_id();
1064
0
    context.enable_segcompaction = false;
1065
0
    context.write_type = DataWriteType::TYPE_DIRECT;
1066
0
    context.partial_update_info = std::move(partial_update_info);
1067
0
    context.is_transient_rowset_writer = true;
1068
0
    context.rowset_id = rowset.rowset_id();
1069
0
    context.tablet_id = tablet_id();
1070
0
    context.index_id = index_id();
1071
0
    context.partition_id = partition_id();
1072
0
    context.file_cache_base_timestamp = tablet_meta()->creation_time();
1073
0
    context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write();
1074
0
    context.txn_expiration = txn_expiration;
1075
0
    context.encrypt_algorithm = tablet_meta()->encryption_algorithm();
1076
    // TODO(liaoxin) enable packed file for transient rowset
1077
0
    context.allow_packed_file = false;
1078
1079
0
    auto storage_resource = rowset.rowset_meta()->remote_storage_resource();
1080
0
    if (!storage_resource) {
1081
0
        return ResultError(std::move(storage_resource.error()));
1082
0
    }
1083
1084
0
    context.storage_resource = *storage_resource.value();
1085
1086
0
    return RowsetFactory::create_rowset_writer(_engine, context, false)
1087
0
            .transform([&](auto&& writer) {
1088
0
                writer->set_segment_start_id(cast_set<int32_t>(rowset.num_segments()));
1089
0
                return writer;
1090
0
            });
1091
0
}
1092
1093
3
int64_t CloudTablet::get_cloud_base_compaction_score() const {
1094
3
    if (_tablet_meta->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY) {
1095
0
        bool has_delete = false;
1096
0
        int64_t point = cumulative_layer_point();
1097
0
        std::shared_lock<std::shared_mutex> rlock(_meta_lock);
1098
0
        for (const auto& [_, rs_meta] : _tablet_meta->all_rs_metas()) {
1099
0
            if (rs_meta->start_version() >= point) {
1100
0
                continue;
1101
0
            }
1102
0
            if (rs_meta->has_delete_predicate()) {
1103
0
                has_delete = true;
1104
0
                break;
1105
0
            }
1106
0
        }
1107
0
        if (!has_delete) {
1108
0
            return 0;
1109
0
        }
1110
0
    }
1111
1112
3
    return _approximate_num_rowsets.load(std::memory_order_relaxed) -
1113
3
           _approximate_cumu_num_rowsets.load(std::memory_order_relaxed);
1114
3
}
1115
1116
1
int64_t CloudTablet::get_cloud_cumu_compaction_score() const {
1117
    // TODO(plat1ko): Propose an algorithm that considers tablet's key type, number of delete rowsets,
1118
    //  number of tablet versions simultaneously.
1119
1
    return _approximate_cumu_num_deltas.load(std::memory_order_relaxed);
1120
1
}
1121
1122
// return a json string to show the compaction status of this tablet
1123
33
void CloudTablet::get_compaction_status(std::string* json_result) {
1124
33
    rapidjson::Document root;
1125
33
    root.SetObject();
1126
1127
33
    rapidjson::Document path_arr;
1128
33
    path_arr.SetArray();
1129
1130
33
    std::vector<RowsetSharedPtr> rowsets;
1131
33
    std::vector<RowsetSharedPtr> stale_rowsets;
1132
33
    {
1133
33
        std::shared_lock rdlock(_meta_lock);
1134
33
        rowsets.reserve(_rs_version_map.size());
1135
148
        for (auto& it : _rs_version_map) {
1136
148
            rowsets.push_back(it.second);
1137
148
        }
1138
33
        stale_rowsets.reserve(_stale_rs_version_map.size());
1139
540
        for (auto& it : _stale_rs_version_map) {
1140
540
            stale_rowsets.push_back(it.second);
1141
540
        }
1142
33
    }
1143
33
    std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator);
1144
33
    std::sort(stale_rowsets.begin(), stale_rowsets.end(), Rowset::comparator);
1145
1146
    // get snapshot version path json_doc
1147
33
    _timestamped_version_tracker.get_stale_version_path_json_doc(path_arr);
1148
33
    root.AddMember("cumulative point", _cumulative_point.load(), root.GetAllocator());
1149
33
    rapidjson::Value cumu_value;
1150
33
    std::string format_str = ToStringFromUnixMillis(_last_cumu_compaction_failure_millis.load());
1151
33
    cumu_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
1152
33
                         root.GetAllocator());
1153
33
    root.AddMember("last cumulative failure time", cumu_value, root.GetAllocator());
1154
33
    rapidjson::Value base_value;
1155
33
    format_str = ToStringFromUnixMillis(_last_base_compaction_failure_millis.load());
1156
33
    base_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
1157
33
                         root.GetAllocator());
1158
33
    root.AddMember("last base failure time", base_value, root.GetAllocator());
1159
33
    rapidjson::Value full_value;
1160
33
    format_str = ToStringFromUnixMillis(_last_full_compaction_failure_millis.load());
1161
33
    full_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
1162
33
                         root.GetAllocator());
1163
33
    root.AddMember("last full failure time", full_value, root.GetAllocator());
1164
33
    rapidjson::Value cumu_success_value;
1165
33
    format_str = ToStringFromUnixMillis(_last_cumu_compaction_success_millis.load());
1166
33
    cumu_success_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
1167
33
                                 root.GetAllocator());
1168
33
    root.AddMember("last cumulative success time", cumu_success_value, root.GetAllocator());
1169
33
    rapidjson::Value base_success_value;
1170
33
    format_str = ToStringFromUnixMillis(_last_base_compaction_success_millis.load());
1171
33
    base_success_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
1172
33
                                 root.GetAllocator());
1173
33
    root.AddMember("last base success time", base_success_value, root.GetAllocator());
1174
33
    rapidjson::Value full_success_value;
1175
33
    format_str = ToStringFromUnixMillis(_last_full_compaction_success_millis.load());
1176
33
    full_success_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
1177
33
                                 root.GetAllocator());
1178
33
    root.AddMember("last full success time", full_success_value, root.GetAllocator());
1179
33
    rapidjson::Value cumu_schedule_value;
1180
33
    format_str = ToStringFromUnixMillis(_last_cumu_compaction_schedule_millis.load());
1181
33
    cumu_schedule_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
1182
33
                                  root.GetAllocator());
1183
33
    root.AddMember("last cumulative schedule time", cumu_schedule_value, root.GetAllocator());
1184
33
    rapidjson::Value base_schedule_value;
1185
33
    format_str = ToStringFromUnixMillis(_last_base_compaction_schedule_millis.load());
1186
33
    base_schedule_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
1187
33
                                  root.GetAllocator());
1188
33
    root.AddMember("last base schedule time", base_schedule_value, root.GetAllocator());
1189
33
    rapidjson::Value full_schedule_value;
1190
33
    format_str = ToStringFromUnixMillis(_last_full_compaction_schedule_millis.load());
1191
33
    full_schedule_value.SetString(format_str.c_str(), cast_set<uint>(format_str.length()),
1192
33
                                  root.GetAllocator());
1193
33
    root.AddMember("last full schedule time", full_schedule_value, root.GetAllocator());
1194
33
    rapidjson::Value cumu_compaction_status_value;
1195
33
    cumu_compaction_status_value.SetString(_last_cumu_compaction_status.c_str(),
1196
33
                                           cast_set<uint>(_last_cumu_compaction_status.length()),
1197
33
                                           root.GetAllocator());
1198
33
    root.AddMember("last cumulative status", cumu_compaction_status_value, root.GetAllocator());
1199
33
    rapidjson::Value base_compaction_status_value;
1200
33
    base_compaction_status_value.SetString(_last_base_compaction_status.c_str(),
1201
33
                                           cast_set<uint>(_last_base_compaction_status.length()),
1202
33
                                           root.GetAllocator());
1203
33
    root.AddMember("last base status", base_compaction_status_value, root.GetAllocator());
1204
33
    rapidjson::Value full_compaction_status_value;
1205
33
    full_compaction_status_value.SetString(_last_full_compaction_status.c_str(),
1206
33
                                           cast_set<uint>(_last_full_compaction_status.length()),
1207
33
                                           root.GetAllocator());
1208
33
    root.AddMember("last full status", full_compaction_status_value, root.GetAllocator());
1209
33
    rapidjson::Value exec_compaction_time;
1210
33
    std::string num_str {std::to_string(exec_compaction_time_us.load())};
1211
33
    exec_compaction_time.SetString(num_str.c_str(), cast_set<uint>(num_str.length()),
1212
33
                                   root.GetAllocator());
1213
33
    root.AddMember("exec compaction time us", exec_compaction_time, root.GetAllocator());
1214
33
    rapidjson::Value local_read_time;
1215
33
    num_str = std::to_string(local_read_time_us.load());
1216
33
    local_read_time.SetString(num_str.c_str(), cast_set<uint>(num_str.length()),
1217
33
                              root.GetAllocator());
1218
33
    root.AddMember("compaction local read time us", local_read_time, root.GetAllocator());
1219
33
    rapidjson::Value remote_read_time;
1220
33
    num_str = std::to_string(remote_read_time_us.load());
1221
33
    remote_read_time.SetString(num_str.c_str(), cast_set<uint>(num_str.length()),
1222
33
                               root.GetAllocator());
1223
33
    root.AddMember("compaction remote read time us", remote_read_time, root.GetAllocator());
1224
1225
    // print all rowsets' version as an array
1226
33
    rapidjson::Document versions_arr;
1227
33
    rapidjson::Document missing_versions_arr;
1228
33
    versions_arr.SetArray();
1229
33
    missing_versions_arr.SetArray();
1230
33
    int64_t last_version = -1;
1231
148
    for (auto& rowset : rowsets) {
1232
148
        const Version& ver = rowset->version();
1233
148
        if (ver.first != last_version + 1) {
1234
0
            rapidjson::Value miss_value;
1235
0
            miss_value.SetString(fmt::format("[{}-{}]", last_version + 1, ver.first - 1).c_str(),
1236
0
                                 missing_versions_arr.GetAllocator());
1237
0
            missing_versions_arr.PushBack(miss_value, missing_versions_arr.GetAllocator());
1238
0
        }
1239
148
        rapidjson::Value value;
1240
148
        std::string version_str = rowset->get_rowset_info_str();
1241
148
        value.SetString(version_str.c_str(), cast_set<uint32_t>(version_str.length()),
1242
148
                        versions_arr.GetAllocator());
1243
148
        versions_arr.PushBack(value, versions_arr.GetAllocator());
1244
148
        last_version = ver.second;
1245
148
    }
1246
33
    root.AddMember("rowsets", versions_arr, root.GetAllocator());
1247
33
    root.AddMember("missing_rowsets", missing_versions_arr, root.GetAllocator());
1248
1249
    // print all stale rowsets' version as an array
1250
33
    rapidjson::Document stale_versions_arr;
1251
33
    stale_versions_arr.SetArray();
1252
540
    for (auto& rowset : stale_rowsets) {
1253
540
        rapidjson::Value value;
1254
540
        std::string version_str = rowset->get_rowset_info_str();
1255
540
        value.SetString(version_str.c_str(), cast_set<uint32_t>(version_str.length()),
1256
540
                        stale_versions_arr.GetAllocator());
1257
540
        stale_versions_arr.PushBack(value, stale_versions_arr.GetAllocator());
1258
540
    }
1259
33
    root.AddMember("stale_rowsets", stale_versions_arr, root.GetAllocator());
1260
1261
    // add stale version rowsets
1262
33
    root.AddMember("stale version path", path_arr, root.GetAllocator());
1263
1264
    // to json string
1265
33
    rapidjson::StringBuffer strbuf;
1266
33
    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
1267
33
    root.Accept(writer);
1268
33
    *json_result = std::string(strbuf.GetString());
1269
33
}
1270
1271
1
void CloudTablet::set_cumulative_layer_point(int64_t new_point) {
1272
1
    if (new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= _cumulative_point) {
1273
1
        _cumulative_point = new_point;
1274
1
        return;
1275
1
    }
1276
    // cumulative point should only be reset to -1, or be increased
1277
    // FIXME: could happen in currently unresolved race conditions
1278
1
    LOG(WARNING) << "Unexpected cumulative point: " << new_point
1279
0
                 << ", origin: " << _cumulative_point.load();
1280
0
}
1281
1282
// Checks that the rowsets written with an older schema are still compatible with the
// column definitions received from FE before an index build.
//
// @param columns         column definitions from FE; indexed by column name (when a name
//                        appears more than once, the last definition wins).
// @param schema_version  rowsets whose schema version is >= this value are not checked.
// @return Status::OK() when every column of every checked rowset is present in `columns`
//         with the same unique id and the same type; otherwise an InternalError for the
//         first mismatch encountered.
Status CloudTablet::check_rowset_schema_for_build_index(std::vector<TColumn>& columns,
                                                        int schema_version) {
    std::map<std::string, TabletColumn> fe_col_map;
    for (const TColumn& fe_col : columns) {
        fe_col_map.insert_or_assign(fe_col.column_name, TabletColumn(fe_col));
    }

    std::shared_lock rlock(_meta_lock);
    for (const auto& [version, rs] : _rs_version_map) {
        // Rowsets whose start version is 0 are skipped.
        if (version.first == 0) {
            continue;
        }

        // Bind the schema once so it is looked up a single time per rowset and stays
        // alive for the whole inner loop.
        const auto& rs_schema = rs->tablet_schema();
        if (rs_schema->schema_version() >= schema_version) {
            continue;
        }

        // Iterate by reference: the column handles are only read here.
        for (const auto& rs_col : rs_schema->columns()) {
            const auto fe_it = fe_col_map.find(rs_col->name());
            if (fe_it == fe_col_map.end()) {
                return Status::InternalError(
                        "check rowset meta failed:rowset's col is dropped in FE.");
            }

            const TabletColumn& fe_col = fe_it->second;
            if (rs_col->unique_id() != fe_col.unique_id()) {
                return Status::InternalError("check rowset meta failed:col id not match.");
            }

            if (rs_col->type() != fe_col.type()) {
                return Status::InternalError("check rowset meta failed:col type not match.");
            }
        }
    }

    return Status::OK();
}
1318
1319
// Picks the rowset an index change should be applied to.
//
// Among the rowsets whose start version is not 0 and whose schema version is lower than
// `schema_version`, the one with the largest start version is returned. A null rowset
// pointer is returned when no rowset qualifies.
//
// @param schema_version  rowsets with a schema version >= this value are skipped.
// @param is_base_rowset  written only when a rowset is picked: true when the start
//                        version of the picked rowset is below the cumulative point.
Result<RowsetSharedPtr> CloudTablet::pick_a_rowset_for_index_change(int schema_version,
                                                                    bool& is_base_rowset) {
    TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudTablet::pick_a_rowset_for_index_change",
                                      Result<RowsetSharedPtr>(nullptr));
    RowsetSharedPtr picked;
    std::shared_lock rlock(_meta_lock);
    for (const auto& [version, rs] : _rs_version_map) {
        if (version.first == 0) {
            continue;
        }

        // Empty rowsets are only reported, they are not filtered out here.
        if (rs->num_rows() == 0) {
            VLOG_DEBUG << "[index_change]find empty rs, index change may "
                          "failed, id="
                       << rs->rowset_id().to_string();
        }

        if (rs->tablet_schema()->schema_version() >= schema_version) {
            VLOG_DEBUG << "[index_change] skip rowset " << rs->tablet_schema()->schema_version()
                       << "," << schema_version;
            continue;
        }

        // Keep the candidate with the largest start version seen so far.
        if (picked == nullptr || rs->start_version() > picked->start_version()) {
            picked = rs;
        }
    }

    if (picked != nullptr) {
        is_base_rowset = picked->version().first < _cumulative_point;
    }

    return picked;
}
1357
1358
0
std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_base_compaction() {
1359
0
    std::vector<RowsetSharedPtr> candidate_rowsets;
1360
0
    {
1361
0
        std::shared_lock rlock(_meta_lock);
1362
0
        for (const auto& [version, rs] : _rs_version_map) {
1363
0
            if (version.first != 0 && version.first < _cumulative_point &&
1364
0
                (_alter_version == -1 || version.second <= _alter_version)) {
1365
0
                candidate_rowsets.push_back(rs);
1366
0
            }
1367
0
        }
1368
0
    }
1369
0
    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
1370
0
    return candidate_rowsets;
1371
0
}
1372
1373
0
std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_full_compaction() {
1374
0
    std::vector<RowsetSharedPtr> candidate_rowsets;
1375
0
    {
1376
0
        std::shared_lock rlock(_meta_lock);
1377
0
        for (auto& [v, rs] : _rs_version_map) {
1378
            // MUST NOT compact rowset [0-1] for some historical reasons (see cloud_schema_change)
1379
0
            if (v.first != 0) {
1380
0
                candidate_rowsets.push_back(rs);
1381
0
            }
1382
0
        }
1383
0
    }
1384
0
    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
1385
0
    return candidate_rowsets;
1386
0
}
1387
1388
0
// Returns the delete bitmap calculation executor exposed by the storage engine this
// tablet is attached to.
CalcDeleteBitmapExecutor* CloudTablet::calc_delete_bitmap_executor() {
    CalcDeleteBitmapExecutor* executor = _engine.calc_delete_bitmap_executor();
    return executor;
}
1391
1392
// Persists the delete bitmap computed for a load transaction.
//
// Steps, in order:
//   1. record the bitmap in the txn delete bitmap cache with PublishStatus::PREPARE;
//   2. for a partial update that wrote at least one row, update the tmp rowset through
//      the meta manager;
//   3. write the bitmap through save_delete_bitmap_to_ms();
//   4. record the bitmap in the cache again with PublishStatus::SUCCEED and the publish info.
// Any failing step returns its error immediately.
//
// @param txn_info              transaction info providing the rowset, the partial update
//                              info and the publish info.
// @param txn_id                id of the transaction.
// @param delete_bitmap         delete bitmap to persist.
// @param rowset_writer         writer whose num_rows() decides whether step 2 is needed.
// @param cur_rowset_ids        rowset ids stored in the cache together with the bitmap.
// @param lock_id               forwarded to save_delete_bitmap_to_ms().
// @param next_visible_version  forwarded to save_delete_bitmap_to_ms().
Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
                                       DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
                                       const RowsetIdUnorderedSet& cur_rowset_ids, int64_t lock_id,
                                       int64_t next_visible_version) {
    RowsetSharedPtr txn_rowset = txn_info->rowset;
    const int64_t start_version = txn_rowset->start_version();

    // update delete bitmap info, in order to avoid recalculation when trying again
    RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
            txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::PREPARE));

    const bool needs_tmp_rowset_update = txn_info->partial_update_info &&
                                         txn_info->partial_update_info->is_partial_update() &&
                                         rowset_writer->num_rows() > 0;
    if (needs_tmp_rowset_update) {
        DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.update_tmp_rowset.error", {
            return Status::InternalError<false>("injected update_tmp_rowset error.");
        });
        const auto& rowset_meta = txn_rowset->rowset_meta();
        RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset_meta));
    }

    RETURN_IF_ERROR(save_delete_bitmap_to_ms(start_version, txn_id, delete_bitmap, lock_id,
                                             next_visible_version, txn_rowset));

    // The cache keeps the bitmap WITH its sentinel marks: a retried txn re-uses the cached
    // bitmap when re-calculating the delete bitmap and runs the delete bitmap correctness
    // check on it; storing the bitmap without sentinel marks would make that check fail.
    RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
            txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED,
            txn_info->publish_info));

    // Debug point: sleep after the bitmap has been saved.
    DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.enable_sleep", {
        auto sleep_sec = dp->param<int>("sleep", 5);
        std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
    });

    // Debug point: optionally sleep, then fail with a retryable or a non-retryable error.
    DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.injected_error", {
        auto retry = dp->param<bool>("retry", false);
        auto sleep_sec = dp->param<int>("sleep", 0);
        std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
        if (retry) { // return DELETE_BITMAP_LOCK_ERROR to let it retry
            return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>(
                    "injected DELETE_BITMAP_LOCK_ERROR");
        } else {
            return Status::InternalError<false>("injected non-retryable error");
        }
    });

    return Status::OK();
}
1440
1441
// Writes a load transaction's delete bitmap through the meta manager.
//
// A new bitmap is built from `delete_bitmap`: entries whose segment id is
// DeleteBitmap::INVALID_SEGMENT_ID (sentinel marks) are dropped, and every remaining
// entry is re-keyed with `cur_version` as its version.
//
// @param cur_version           version put into every key of the stored bitmap.
// @param txn_id                transaction id; also used as the lock id when `lock_id` is -1.
// @param delete_bitmap         source bitmap, possibly containing sentinel marks.
// @param lock_id               -1 unless the write belongs to an explicit txn.
// @param next_visible_version  forwarded to update_delete_bitmap().
// @param rowset                provides the rowset id and, when available, the remote
//                              storage resource passed to update_delete_bitmap().
Status CloudTablet::save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id,
                                             DeleteBitmapPtr delete_bitmap, int64_t lock_id,
                                             int64_t next_visible_version, RowsetSharedPtr rowset) {
    auto bitmap_to_store = std::make_shared<DeleteBitmap>(tablet_id());
    for (const auto& [key, segment_bitmap] : delete_bitmap->delete_bitmap) {
        // skip sentinel mark, which is used for delete bitmap correctness check
        if (std::get<1>(key) == DeleteBitmap::INVALID_SEGMENT_ID) {
            continue;
        }
        bitmap_to_store->merge({std::get<0>(key), std::get<1>(key), cur_version},
                               segment_bitmap);
    }

    // lock_id != -1 means this is in an explicit txn
    const bool is_explicit_txn = (lock_id != -1);
    const auto ms_lock_id = is_explicit_txn ? lock_id : txn_id;

    // The storage resource stays empty when the rowset meta cannot provide one.
    std::optional<StorageResource> storage_resource;
    if (auto resource = rowset->rowset_meta()->remote_storage_resource(); resource) {
        storage_resource = *resource.value();
    }

    // The same bitmap object is passed for both bitmap arguments.
    RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(
            *this, ms_lock_id, LOAD_INITIATOR_ID, bitmap_to_store.get(), bitmap_to_store.get(),
            rowset->rowset_id().to_string(), storage_resource,
            config::delete_bitmap_store_write_version, txn_id, is_explicit_txn,
            next_visible_version));
    return Status::OK();
}
1469
1470
1
// Computes the version ranges missing from `existing_versions`, scanning from version 0
// up to `spec_version`.
//
// @param spec_version       upper bound of the scan; expected to be > 0 (DCHECK).
// @param existing_versions  versions that are present; taken by value and sorted locally.
// @return the list of holes found, in ascending order.
Versions CloudTablet::calc_missed_versions(int64_t spec_version, Versions existing_versions) const {
    DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;

    // Ascending order by start version is enough: two versions are certainly not overlapping.
    const auto by_start_version = [](const Version& lhs, const Version& rhs) {
        return lhs.first < rhs.first;
    };
    std::sort(existing_versions.begin(), existing_versions.end(), by_start_version);

    // Walk from the first version (=0) and record every gap until spec_version is covered.
    Versions holes;
    int64_t covered_until = -1;
    for (const Version& existing : existing_versions) {
        const int64_t expected_start = covered_until + 1;
        if (existing.first > expected_start) {
            // NOTE(review): the recorded upper bound is min(existing.first, spec_version),
            // i.e. it can equal the start of the next existing version rather than
            // existing.first - 1 -- confirm this is intended.
            holes.emplace_back(expected_start, std::min(existing.first, spec_version));
        }
        covered_until = existing.second;
        if (covered_until >= spec_version) {
            break;
        }
    }

    // Trailing gap between the last existing version and the specified version.
    if (covered_until < spec_version) {
        holes.emplace_back(covered_until + 1, spec_version);
    }
    return holes;
}
1499
1500
// Computes the delete bitmap of a compaction output rowset and stores it through the
// meta manager.
//
// Steps, in order:
//   1. sync the tablet rowsets and convert the delete bitmap with the version bounds
//      0 and max_version + 1 (historical data), then run the optional correctness checks;
//   2. take the compaction delete bitmap update lock, sync the rowsets again and convert
//      the delete bitmap with the version bounds max_version and UINT64_MAX
//      (incremental data);
//   3. store the resulting bitmap with update_delete_bitmap().
//
// @param input_rowsets                      rowsets consumed by the compaction.
// @param output_rowset                      rowset produced by the compaction.
// @param rowid_conversion                   row id mapping from input rows to output rows.
// @param compaction_type                    selects which checks and which lock-time metric apply.
// @param merged_rows / filtered_rows        their sum is compared with the number of missed
//                                           rows when the missing-rows check is active.
// @param initiator                          forwarded to the lock and store calls.
// @param output_rowset_delete_bitmap        [out] replaced by a fresh bitmap and filled in.
// @param allow_delete_in_cumu_compaction    disables the missing-rows check when true.
// @param get_delete_bitmap_lock_start_time  [out] timestamp (MonotonicMicros) taken right
//                                           after the update lock has been obtained.
// @return the status of the final update_delete_bitmap() call, or the first error hit earlier.
Status CloudTablet::calc_delete_bitmap_for_compaction(
        const std::vector<RowsetSharedPtr>& input_rowsets, const RowsetSharedPtr& output_rowset,
        const RowIdConversion& rowid_conversion, ReaderType compaction_type, int64_t merged_rows,
        int64_t filtered_rows, int64_t initiator, DeleteBitmapPtr& output_rowset_delete_bitmap,
        bool allow_delete_in_cumu_compaction, int64_t& get_delete_bitmap_lock_start_time) {
    using LocationMap = std::map<RowsetSharedPtr, RowLocationPairList>;

    output_rowset_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());

    // The missed-rows set is only allocated when one of the correctness checks is enabled.
    // Base compaction is covered too when
    // config::enable_prune_delete_sign_when_base_compaction == false.
    std::unique_ptr<RowLocationSet> missed_rows;
    if ((config::enable_missing_rows_correctness_check ||
         config::enable_mow_compaction_correctness_check_core ||
         config::enable_mow_compaction_correctness_check_fail) &&
        !allow_delete_in_cumu_compaction &&
        (compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION ||
         !config::enable_prune_delete_sign_when_base_compaction)) {
        missed_rows = std::make_unique<RowLocationSet>();
        LOG(INFO) << "RowLocation Set inited succ for tablet:" << tablet_id();
    }

    // The location map is only allocated for the rowid conversion check, and only when the
    // tablet schema has no cluster key uids.
    std::unique_ptr<LocationMap> location_map;
    if (config::enable_rowid_conversion_correctness_check &&
        tablet_schema()->cluster_key_uids().empty()) {
        location_map = std::make_unique<LocationMap>();
        LOG(INFO) << "Location Map inited succ for tablet:" << tablet_id();
    }

    // 1. calc delete bitmap for historical data
    RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(this));
    const Version synced_max_version = max_version();
    calc_compaction_output_rowset_delete_bitmap(
            input_rowsets, rowid_conversion, 0, synced_max_version.second + 1, missed_rows.get(),
            location_map.get(), tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());

    const bool verify_missed_rows =
            missed_rows && !allow_delete_in_cumu_compaction &&
            (compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION ||
             !config::enable_prune_delete_sign_when_base_compaction) &&
            tablet_state() == TABLET_RUNNING;
    if (verify_missed_rows) {
        const std::size_t missed_rows_size = missed_rows->size();
        const int64_t dropped_rows = merged_rows + filtered_rows;
        // A negative sum skips the comparison.
        if (dropped_rows >= 0 && static_cast<std::size_t>(dropped_rows) != missed_rows_size) {
            std::string err_msg = fmt::format(
                    "cumulative compaction: the merged rows({}), the filtered rows({}) is "
                    "not equal to missed rows({}) in rowid conversion, tablet_id: {}, "
                    "table_id:{}",
                    merged_rows, filtered_rows, missed_rows_size, tablet_id(), table_id());
            LOG(WARNING) << err_msg;
            // Severity of the mismatch is chosen by config: CHECK, error status, or DCHECK.
            if (config::enable_mow_compaction_correctness_check_core) {
                CHECK(false) << err_msg;
            } else if (config::enable_mow_compaction_correctness_check_fail) {
                return Status::InternalError<false>(err_msg);
            } else {
                DCHECK(false) << err_msg;
            }
        }
    }
    if (location_map) {
        RETURN_IF_ERROR(check_rowid_conversion(output_rowset, *location_map));
        // Start the incremental pass with an empty map.
        location_map->clear();
    }

    // 2. calc delete bitmap for incremental data
    const int64_t lock_begin_us = MonotonicMicros();
    RETURN_IF_ERROR(_engine.meta_mgr().get_delete_bitmap_update_lock(
            *this, COMPACTION_DELETE_BITMAP_LOCK_ID, initiator));
    const int64_t lock_end_us = MonotonicMicros();
    if (compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION) {
        g_cu_compaction_get_delete_bitmap_lock_time_ms << (lock_end_us - lock_begin_us) / 1000;
    } else if (compaction_type == ReaderType::READER_BASE_COMPACTION) {
        g_base_compaction_get_delete_bitmap_lock_time_ms << (lock_end_us - lock_begin_us) / 1000;
    }
    get_delete_bitmap_lock_start_time = lock_end_us;
    RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(this));
    const int64_t sync_end_us = MonotonicMicros();

    calc_compaction_output_rowset_delete_bitmap(
            input_rowsets, rowid_conversion, synced_max_version.second, UINT64_MAX,
            missed_rows.get(), location_map.get(), tablet_meta()->delete_bitmap(),
            output_rowset_delete_bitmap.get());
    const int64_t calc_end_us = MonotonicMicros();
    if (location_map) {
        RETURN_IF_ERROR(check_rowid_conversion(output_rowset, *location_map));
    }
    const int64_t check_end_us = MonotonicMicros();

    // 3. store delete bitmap
    DeleteBitmapPtr delete_bitmap_v2;
    // Captured before the store call so it can be logged next to the size afterwards.
    const auto calculated_bitmap_size = output_rowset_delete_bitmap->delete_bitmap.size();
    const auto store_version = config::delete_bitmap_store_write_version;
    if (store_version == 2 || store_version == 3) {
        delete_bitmap_v2 = std::make_shared<DeleteBitmap>(*output_rowset_delete_bitmap);

        // Rowsets that end before the output rowset starts, with their segment counts.
        std::vector<std::pair<RowsetId, int64_t>> retained_rowsets_to_seg_num;
        {
            std::shared_lock rlock(get_header_lock());
            for (const auto& [rowset_version, rowset_ptr] : rowset_map()) {
                if (rowset_version.second >= output_rowset->start_version()) {
                    continue;
                }
                retained_rowsets_to_seg_num.emplace_back(rowset_ptr->rowset_id(),
                                                         rowset_ptr->num_segments());
            }
        }

        if (config::enable_agg_delta_delete_bitmap_for_store_v2) {
            tablet_meta()->delete_bitmap().subset_and_agg(
                    retained_rowsets_to_seg_num, output_rowset->start_version(),
                    output_rowset->end_version(), delete_bitmap_v2.get());
        } else {
            tablet_meta()->delete_bitmap().subset(
                    retained_rowsets_to_seg_num, output_rowset->start_version(),
                    output_rowset->end_version(), delete_bitmap_v2.get());
        }
    }

    // The storage resource stays empty when the rowset meta cannot provide one.
    std::optional<StorageResource> storage_resource;
    if (auto resource = output_rowset->rowset_meta()->remote_storage_resource(); resource) {
        storage_resource = *resource.value();
    }

    auto st = _engine.meta_mgr().update_delete_bitmap(
            *this, -1, initiator, output_rowset_delete_bitmap.get(), delete_bitmap_v2.get(),
            output_rowset->rowset_id().to_string(), storage_resource, store_version);
    const int64_t store_end_us = MonotonicMicros();
    LOG(INFO) << "calc_delete_bitmap_for_compaction, tablet_id=" << tablet_id()
              << ", get lock cost " << (lock_end_us - lock_begin_us) << " us, sync rowsets cost "
              << (sync_end_us - lock_end_us) << " us, calc delete bitmap cost "
              << (calc_end_us - sync_end_us) << " us, check rowid conversion cost "
              << (check_end_us - calc_end_us) << " us, store delete bitmap cost "
              << (store_end_us - check_end_us) << " us, st=" << st.to_string()
              << ". store_version=" << store_version
              << ", calculated delete bitmap size=" << calculated_bitmap_size
              << ", update delete bitmap size="
              << output_rowset_delete_bitmap->delete_bitmap.size();
    return st;
}
1630
1631
void CloudTablet::agg_delete_bitmap_for_compaction(
1632
        int64_t start_version, int64_t end_version, const std::vector<RowsetSharedPtr>& pre_rowsets,
1633
        DeleteBitmapPtr& new_delete_bitmap,
1634
0
        std::map<std::string, int64_t>& pre_rowset_to_versions) {
1635
0
    for (auto& rowset : pre_rowsets) {
1636
0
        for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
1637
0
            auto d = tablet_meta()->delete_bitmap().get_agg_without_cache(
1638
0
                    {rowset->rowset_id(), seg_id, end_version}, start_version);
1639
0
            if (d->isEmpty()) {
1640
0
                continue;
1641
0
            }
1642
0
            VLOG_DEBUG << "agg delete bitmap for tablet_id=" << tablet_id()
1643
0
                       << ", rowset_id=" << rowset->rowset_id() << ", seg_id=" << seg_id
1644
0
                       << ", rowset_version=" << rowset->version().to_string()
1645
0
                       << ". compaction start_version=" << start_version
1646
0
                       << ", end_version=" << end_version
1647
0
                       << ". delete_bitmap cardinality=" << d->cardinality();
1648
0
            DeleteBitmap::BitmapKey end_key {rowset->rowset_id(), seg_id, end_version};
1649
0
            new_delete_bitmap->set(end_key, *d);
1650
0
            pre_rowset_to_versions[rowset->rowset_id().to_string()] = rowset->version().second;
1651
0
        }
1652
0
    }
1653
0
}
1654
1655
5
// Refreshes selected fields of the local tablet meta from the meta fetched through the
// meta manager.
//
// Nothing is done (and OK is returned) when config::enable_file_cache is false. When
// fetching the meta fails, the error is returned; a NOT_FOUND error additionally
// triggers clear_cache(). Otherwise the creation time, ttl, compaction policy, the
// time-series compaction settings and disable_auto_compaction are copied over when they
// differ. If the creation time or the ttl changed, the expiration time of the cached
// segment and index files of every rowset is updated.
Status CloudTablet::sync_meta() {
    if (!config::enable_file_cache) {
        return Status::OK();
    }

    TabletMetaSharedPtr latest_meta;
    auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &latest_meta);
    if (!st.ok()) {
        if (st.is<ErrorCode::NOT_FOUND>()) {
            clear_cache();
        }
        return st;
    }

    const auto new_creation_time = latest_meta->creation_time();
    const bool creation_time_changed = _tablet_meta->creation_time() != new_creation_time;
    if (creation_time_changed) {
        _tablet_meta->set_creation_time(new_creation_time);
    }

    const auto new_ttl_seconds = latest_meta->ttl_seconds();
    const bool ttl_changed = _tablet_meta->ttl_seconds() != new_ttl_seconds;
    if (ttl_changed) {
        _tablet_meta->set_ttl_seconds(new_ttl_seconds);
    }

    if (creation_time_changed || ttl_changed) {
        const int64_t new_expiration_time =
                io::calc_file_cache_expiration_time(new_creation_time, new_ttl_seconds);
        // Applies the new expiration time to one cache key, if a cache is registered for it.
        const auto refresh_expiration = [new_expiration_time](const auto& file_key) {
            if (auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
                file_cache != nullptr) {
                file_cache->modify_expiration_time(file_key, new_expiration_time);
            }
        };

        std::shared_lock rlock(_meta_lock);
        for (auto& [_, rs] : _rs_version_map) {
            // segment files
            for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
                refresh_expiration(Segment::file_cache_key(rs->rowset_id().to_string(), seg_id));
            }
            // index files
            for (const auto& file_name : rs->get_index_file_names()) {
                refresh_expiration(io::BlockFileCache::hash(file_name));
            }
        }
    }

    if (auto policy = latest_meta->compaction_policy();
        _tablet_meta->compaction_policy() != policy) {
        _tablet_meta->set_compaction_policy(policy);
    }
    if (auto goal_size_mbytes = latest_meta->time_series_compaction_goal_size_mbytes();
        _tablet_meta->time_series_compaction_goal_size_mbytes() != goal_size_mbytes) {
        _tablet_meta->set_time_series_compaction_goal_size_mbytes(goal_size_mbytes);
    }
    if (auto file_count_threshold = latest_meta->time_series_compaction_file_count_threshold();
        _tablet_meta->time_series_compaction_file_count_threshold() != file_count_threshold) {
        _tablet_meta->set_time_series_compaction_file_count_threshold(file_count_threshold);
    }
    if (auto time_threshold_seconds =
                latest_meta->time_series_compaction_time_threshold_seconds();
        _tablet_meta->time_series_compaction_time_threshold_seconds() !=
        time_threshold_seconds) {
        _tablet_meta->set_time_series_compaction_time_threshold_seconds(time_threshold_seconds);
    }
    if (auto empty_rowsets_threshold =
                latest_meta->time_series_compaction_empty_rowsets_threshold();
        _tablet_meta->time_series_compaction_empty_rowsets_threshold() !=
        empty_rowsets_threshold) {
        _tablet_meta->set_time_series_compaction_empty_rowsets_threshold(
                empty_rowsets_threshold);
    }
    if (auto level_threshold = latest_meta->time_series_compaction_level_threshold();
        _tablet_meta->time_series_compaction_level_threshold() != level_threshold) {
        _tablet_meta->set_time_series_compaction_level_threshold(level_threshold);
    }

    // Sync disable_auto_compaction (stored in tablet_schema)
    if (auto disable_auto_compaction = latest_meta->tablet_schema()->disable_auto_compaction();
        _tablet_meta->tablet_schema()->disable_auto_compaction() != disable_auto_compaction) {
        _tablet_meta->mutable_tablet_schema()->set_disable_auto_compaction(
                disable_auto_compaction);
    }

    return Status::OK();
}
1753
1754
0
// Populates `tablet_info` with this tablet's id and total version count.
// Both values are read from `_tablet_meta` while `_meta_lock` is held in
// shared mode; no other field of `tablet_info` is touched.
void CloudTablet::build_tablet_report_info(TTabletInfo* tablet_info) {
    std::shared_lock meta_rlock(_meta_lock);
    // Currently, this information will not be used by the cloud report,
    // but it may be used in the future.
    tablet_info->__set_tablet_id(_tablet_meta->tablet_id());
    tablet_info->__set_total_version_count(_tablet_meta->version_count());
}
1761
1762
// Cross-checks `expected_delete_bitmap` against the delete bitmap that the
// engine's txn delete bitmap cache holds for (`txn_id`, this tablet).
//
// Only the cardinalities of the two bitmaps are compared.
//  - If the cache lookup does not succeed, the check is skipped and OK is returned.
//  - If the cardinalities match, OK is returned.
//  - Otherwise a DCHECK fires and an InternalError describing both cardinalities,
//    the txn id and the tablet id is returned.
Status CloudTablet::check_delete_bitmap_cache(int64_t txn_id,
                                              DeleteBitmap* expected_delete_bitmap) {
    DeleteBitmapPtr cached_delete_bitmap;
    CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
    Status st = engine.txn_delete_bitmap_cache().get_delete_bitmap(
            txn_id, tablet_id(), &cached_delete_bitmap, nullptr, nullptr);
    if (!st.ok()) {
        // Nothing cached to compare against.
        return Status::OK();
    }

    const auto cur_cardinality = expected_delete_bitmap->cardinality();
    const auto cached_cardinality = cached_delete_bitmap->cardinality();
    if (cur_cardinality == cached_cardinality) {
        return Status::OK();
    }

    // The message is built only on the failure path. Note the ", " that separates
    // the cached cardinality from the txn id: the two adjacent literals are
    // concatenated by the compiler, so the separator must be part of the text.
    auto msg = fmt::format(
            "delete bitmap cache check failed, cur_cardinality={}, cached_cardinality={}, "
            "txn_id={}, tablet_id={}",
            cur_cardinality, cached_cardinality, txn_id, tablet_id());
    DCHECK(false) << msg;
    return Status::InternalError<false>(msg);
}
1782
1783
35
// Returns the warm-up state recorded for `rowset_id`.
//
// When the rowset has no entry in `_rowset_warm_up_states`, a state with both
// trigger source and progress set to NONE is returned. Otherwise the entry is
// refreshed through update_state() and its (possibly updated) state is returned.
//
// update_state() may write the entry's progress, so `_meta_lock` is taken
// exclusively here: a shared lock would let several callers perform that write
// concurrently.
WarmUpState CloudTablet::get_rowset_warmup_state(RowsetId rowset_id) {
    std::lock_guard wlock(_meta_lock);
    // Single lookup instead of contains() followed by operator[].
    auto it = _rowset_warm_up_states.find(rowset_id);
    if (it == _rowset_warm_up_states.end()) {
        return {.trigger_source = WarmUpTriggerSource::NONE, .progress = WarmUpProgress::NONE};
    }
    auto& warmup_info = it->second;
    warmup_info.update_state();
    return warmup_info.state;
}
1792
1793
bool CloudTablet::add_rowset_warmup_state(const RowsetMeta& rowset, WarmUpTriggerSource source,
1794
34
                                          std::chrono::steady_clock::time_point start_tp) {
1795
34
    std::lock_guard wlock(_meta_lock);
1796
34
    return add_rowset_warmup_state_unlocked(rowset, source, start_tp);
1797
34
}
1798
1799
bool CloudTablet::update_rowset_warmup_state_inverted_idx_num(WarmUpTriggerSource source,
1800
5
                                                              RowsetId rowset_id, int64_t delta) {
1801
5
    std::lock_guard wlock(_meta_lock);
1802
5
    return update_rowset_warmup_state_inverted_idx_num_unlocked(source, rowset_id, delta);
1803
5
}
1804
1805
bool CloudTablet::update_rowset_warmup_state_inverted_idx_num_unlocked(WarmUpTriggerSource source,
1806
                                                                       RowsetId rowset_id,
1807
5
                                                                       int64_t delta) {
1808
5
    auto it = _rowset_warm_up_states.find(rowset_id);
1809
5
    if (it == _rowset_warm_up_states.end()) {
1810
0
        return false;
1811
0
    }
1812
5
    if (it->second.state.trigger_source != source) {
1813
        // Only the same trigger source can update the state
1814
2
        return false;
1815
2
    }
1816
3
    it->second.num_inverted_idx += delta;
1817
3
    return true;
1818
5
}
1819
1820
bool CloudTablet::add_rowset_warmup_state_unlocked(const RowsetMeta& rowset,
1821
                                                   WarmUpTriggerSource source,
1822
34
                                                   std::chrono::steady_clock::time_point start_tp) {
1823
34
    auto rowset_id = rowset.rowset_id();
1824
1825
    // Check if rowset already has warmup state
1826
34
    if (_rowset_warm_up_states.contains(rowset_id)) {
1827
10
        auto existing_state = _rowset_warm_up_states[rowset_id].state;
1828
1829
        // For job-triggered warmup (one-time and periodic warmup), allow it to proceed
1830
        // except when there's already another job-triggered warmup in progress
1831
10
        if (source == WarmUpTriggerSource::JOB) {
1832
5
            if (existing_state.trigger_source == WarmUpTriggerSource::JOB &&
1833
5
                existing_state.progress == WarmUpProgress::DOING) {
1834
                // Same job type already in progress, skip to avoid duplicate warmup
1835
1
                return false;
1836
1
            }
1837
5
        } else {
1838
            // For non-job warmup (EVENT_DRIVEN, SYNC_ROWSET), skip if any warmup exists
1839
5
            return false;
1840
5
        }
1841
10
    }
1842
1843
28
    if (source == WarmUpTriggerSource::JOB) {
1844
9
        g_file_cache_warm_up_rowset_triggered_by_job_num << 1;
1845
19
    } else if (source == WarmUpTriggerSource::SYNC_ROWSET) {
1846
6
        g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num << 1;
1847
13
    } else if (source == WarmUpTriggerSource::EVENT_DRIVEN) {
1848
13
        g_file_cache_warm_up_rowset_triggered_by_event_driven_num << 1;
1849
13
    }
1850
28
    _rowset_warm_up_states[rowset_id] = {
1851
28
            .state = {.trigger_source = source,
1852
28
                      .progress = (rowset.num_segments() == 0 ? WarmUpProgress::DONE
1853
28
                                                              : WarmUpProgress::DOING)},
1854
28
            .num_segments = rowset.num_segments(),
1855
28
            .start_tp = start_tp};
1856
28
    return true;
1857
34
}
1858
1859
52
void CloudTablet::RowsetWarmUpInfo::update_state() {
1860
52
    if (has_finished()) {
1861
14
        g_file_cache_warm_up_rowset_complete_num << 1;
1862
14
        auto cost = std::chrono::duration_cast<std::chrono::milliseconds>(
1863
14
                            std::chrono::steady_clock::now() - start_tp)
1864
14
                            .count();
1865
14
        g_file_cache_warm_up_rowset_all_segments_latency << cost;
1866
14
        state.progress = WarmUpProgress::DONE;
1867
14
    }
1868
52
}
1869
1870
// Records that `segment_num` segments and `inverted_idx_num` inverted indexes
// of `rowset_id` finished warming up with result `status`, and returns the
// entry's resulting state. Runs with `_meta_lock` held exclusively.
//
//  - No entry for the rowset: returns {NONE, NONE}.
//  - Entry owned by a different trigger source: returns its state unchanged.
//  - Otherwise: bumps the completion counters (and the failure counters when
//    `status` is not OK) for each positive count, then forwards both counts to
//    the entry's done().
WarmUpState CloudTablet::complete_rowset_segment_warmup(WarmUpTriggerSource trigger_source,
                                                        RowsetId rowset_id, Status status,
                                                        int64_t segment_num,
                                                        int64_t inverted_idx_num) {
    std::lock_guard meta_wlock(_meta_lock);
    const auto entry = _rowset_warm_up_states.find(rowset_id);
    if (entry == _rowset_warm_up_states.end()) {
        return {.trigger_source = WarmUpTriggerSource::NONE, .progress = WarmUpProgress::NONE};
    }

    auto& info = entry->second;
    // Only the same trigger source can update the state
    if (info.state.trigger_source != trigger_source) {
        return info.state;
    }

    VLOG_DEBUG << "complete rowset segment warmup for rowset " << rowset_id << ", " << status;
    const bool failed = !status.ok();

    if (segment_num > 0) {
        g_file_cache_warm_up_segment_complete_num << segment_num;
        if (failed) {
            g_file_cache_warm_up_segment_failed_num << segment_num;
        }
    }
    if (inverted_idx_num > 0) {
        g_file_cache_warm_up_inverted_idx_complete_num << inverted_idx_num;
        if (failed) {
            g_file_cache_warm_up_inverted_idx_failed_num << inverted_idx_num;
        }
    }

    info.done(segment_num, inverted_idx_num);
    return info.state;
}
1900
1901
222
bool CloudTablet::is_rowset_warmed_up(const RowsetId& rowset_id) const {
1902
222
    auto it = _rowset_warm_up_states.find(rowset_id);
1903
222
    if (it == _rowset_warm_up_states.end()) {
1904
        // The rowset is not in warmup state, which means the rowset has never been warmed up.
1905
        // This may happen when the upstream BE tried to warm up rowsets on this BE but this BE
1906
        // was restarting so the warmup failed, and _rowset_warm_up_states has no entry for it.
1907
        //
1908
        // Normally the startup_timepoint check in rowset_is_warmed_up_unlocked() would filter out
1909
        // such rowsets (visible_timestamp < startup_timepoint → assumed warmed up). However,
1910
        // compaction-produced rowsets have their visible_timestamp set at rowset builder
1911
        // initialization time rather than the final transaction commit time on meta-service,
1912
        // so their visible_timestamp can be earlier than startup_timepoint, causing the
1913
        // startup_timepoint check to NOT filter them out and reaching here with no warmup entry.
1914
        //
1915
        // If such a rowset is before the cumulative compaction point and base compaction never
1916
        // happens, returning false here would cause the version path algorithm to exclude it,
1917
        // leading to a persistently low path_max_version. With continuous upstream ingestion,
1918
        // the freshness tolerance fallback check would keep triggering, making every query on
1919
        // this tablet fall back to reading all data from remote storage.
1920
        //
1921
        // Returning true (optimistically treating it as warmed up) allows the version path to
1922
        // include it. On cache miss the data is transparently read from remote storage per-segment
1923
        // and cached locally in 1MB blocks, so the problem self-heals through subsequent queries.
1924
7
        g_rowset_warmup_state_missing_count << 1;
1925
7
        LOG_EVERY_N(WARNING, 100) << fmt::format(
1926
1
                "rowset warmup state missing, considering it as warmed up. tablet_id={}, "
1927
1
                "rowset_id={}",
1928
1
                tablet_id(), rowset_id.to_string());
1929
7
        return true;
1930
7
    }
1931
215
    return it->second.state.progress == WarmUpProgress::DONE;
1932
222
}
1933
1934
675
void CloudTablet::add_warmed_up_rowset(const RowsetId& rowset_id) {
1935
675
    _rowset_warm_up_states[rowset_id] = {
1936
675
            .state = {.trigger_source = WarmUpTriggerSource::SYNC_ROWSET,
1937
675
                      .progress = WarmUpProgress::DONE},
1938
675
            .num_segments = 1,
1939
675
            .start_tp = std::chrono::steady_clock::now()};
1940
675
}
1941
1942
93
void CloudTablet::add_not_warmed_up_rowset(const RowsetId& rowset_id) {
1943
93
    _rowset_warm_up_states[rowset_id] = {
1944
93
            .state = {.trigger_source = WarmUpTriggerSource::SYNC_ROWSET,
1945
93
                      .progress = WarmUpProgress::DOING},
1946
93
            .num_segments = 1,
1947
93
            .start_tp = std::chrono::steady_clock::now()};
1948
93
}
1949
1950
#include "common/compile_check_end.h"
1951
} // namespace doris