Coverage Report

Created: 2025-10-30 15:07

/root/doris/be/src/cloud/cloud_tablet.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_tablet.h"
19
20
#include <bvar/bvar.h>
21
#include <bvar/latency_recorder.h>
22
#include <gen_cpp/Types_types.h>
23
#include <gen_cpp/olap_file.pb.h>
24
#include <rapidjson/document.h>
25
#include <rapidjson/encodings.h>
26
#include <rapidjson/prettywriter.h>
27
#include <rapidjson/rapidjson.h>
28
#include <rapidjson/stringbuffer.h>
29
30
#include <atomic>
31
#include <chrono>
32
#include <cstdint>
33
#include <memory>
34
#include <ranges>
35
#include <ratio>
36
#include <shared_mutex>
37
#include <unordered_map>
38
#include <vector>
39
40
#include "cloud/cloud_meta_mgr.h"
41
#include "cloud/cloud_storage_engine.h"
42
#include "cloud/cloud_tablet_mgr.h"
43
#include "cloud/cloud_warm_up_manager.h"
44
#include "common/config.h"
45
#include "common/logging.h"
46
#include "common/status.h"
47
#include "io/cache/block_file_cache_downloader.h"
48
#include "io/cache/block_file_cache_factory.h"
49
#include "olap/base_tablet.h"
50
#include "olap/compaction.h"
51
#include "olap/cumulative_compaction_time_series_policy.h"
52
#include "olap/olap_define.h"
53
#include "olap/rowset/beta_rowset.h"
54
#include "olap/rowset/rowset.h"
55
#include "olap/rowset/rowset_factory.h"
56
#include "olap/rowset/rowset_fwd.h"
57
#include "olap/rowset/rowset_writer.h"
58
#include "olap/rowset/segment_v2/inverted_index_desc.h"
59
#include "olap/storage_policy.h"
60
#include "olap/tablet_schema.h"
61
#include "olap/txn_manager.h"
62
#include "util/debug_points.h"
63
#include "vec/common/schema_util.h"
64
65
namespace doris {
66
#include "common/compile_check_begin.h"
67
using namespace ErrorCode;
68
69
bvar::Adder<int64_t> g_unused_rowsets_count("unused_rowsets_count");
70
71
bvar::Adder<int64_t> g_capture_prefer_cache_count("capture_prefer_cache_count");
72
bvar::Adder<int64_t> g_capture_with_freshness_tolerance_count(
73
        "capture_with_freshness_tolerance_count");
74
bvar::Adder<int64_t> g_capture_with_freshness_tolerance_fallback_count(
75
        "capture_with_freshness_tolerance_fallback_count");
76
bvar::Window<bvar::Adder<int64_t>> g_capture_prefer_cache_count_window(
77
        "capture_prefer_cache_count_window", &g_capture_prefer_cache_count, 30);
78
bvar::Window<bvar::Adder<int64_t>> g_capture_with_freshness_tolerance_count_window(
79
        "capture_with_freshness_tolerance_count_window", &g_capture_with_freshness_tolerance_count,
80
        30);
81
bvar::Window<bvar::Adder<int64_t>> g_capture_with_freshness_tolerance_fallback_count_window(
82
        "capture_with_freshness_tolerance_fallback_count_window",
83
        &g_capture_with_freshness_tolerance_fallback_count, 30);
84
85
static constexpr int LOAD_INITIATOR_ID = -1;
86
87
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_size(
88
        "file_cache_cloud_tablet_submitted_segment_size");
89
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_num(
90
        "file_cache_cloud_tablet_submitted_segment_num");
91
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_index_size(
92
        "file_cache_cloud_tablet_submitted_index_size");
93
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_index_num(
94
        "file_cache_cloud_tablet_submitted_index_num");
95
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_segment_size(
96
        "file_cache_cloud_tablet_finished_segment_size");
97
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_segment_num(
98
        "file_cache_cloud_tablet_finished_segment_num");
99
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_index_size(
100
        "file_cache_cloud_tablet_finished_index_size");
101
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_index_num(
102
        "file_cache_cloud_tablet_finished_index_num");
103
104
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_segment_num(
105
        "file_cache_recycle_cached_data_segment_num");
106
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_segment_size(
107
        "file_cache_recycle_cached_data_segment_size");
108
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_index_num(
109
        "file_cache_recycle_cached_data_index_num");
110
111
bvar::Adder<uint64_t> g_file_cache_warm_up_segment_complete_num(
112
        "file_cache_warm_up_segment_complete_num");
113
bvar::Adder<uint64_t> g_file_cache_warm_up_segment_failed_num(
114
        "file_cache_warm_up_segment_failed_num");
115
bvar::Adder<uint64_t> g_file_cache_warm_up_inverted_idx_complete_num(
116
        "file_cache_warm_up_inverted_idx_complete_num");
117
bvar::Adder<uint64_t> g_file_cache_warm_up_inverted_idx_failed_num(
118
        "file_cache_warm_up_inverted_idx_failed_num");
119
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_complete_num(
120
        "file_cache_warm_up_rowset_complete_num");
121
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_job_num(
122
        "file_cache_warm_up_rowset_triggered_by_job_num");
123
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num(
124
        "file_cache_warm_up_rowset_triggered_by_sync_rowset_num");
125
bvar::Adder<uint64_t> g_file_cache_warm_up_rowset_triggered_by_event_driven_num(
126
        "file_cache_warm_up_rowset_triggered_by_event_driven_num");
127
bvar::LatencyRecorder g_file_cache_warm_up_rowset_all_segments_latency(
128
        "file_cache_warm_up_rowset_all_segments_latency");
129
130
CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
131
81
        : BaseTablet(std::move(tablet_meta)), _engine(engine) {}
132
133
81
CloudTablet::~CloudTablet() = default;
134
135
0
bool CloudTablet::exceed_version_limit(int32_t limit) {
136
0
    return _approximate_num_rowsets.load(std::memory_order_relaxed) > limit;
137
0
}
138
139
19
std::string CloudTablet::tablet_path() const {
140
19
    return "";
141
19
}
142
143
Status CloudTablet::capture_rs_readers(const Version& spec_version,
144
                                       std::vector<RowSetSplits>* rs_splits,
145
0
                                       const CaptureRowsetOps& opts) {
146
0
    DBUG_EXECUTE_IF("CloudTablet.capture_rs_readers.return.e-230", {
Line
Count
Source
37
0
    if (UNLIKELY(config::enable_debug_points)) {                              \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
38
0
        auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
39
0
        if (dp) {                                                             \
  Branch (39:13): [True: 0, False: 0]
40
0
            [[maybe_unused]] auto DP_NAME = debug_point_name;                 \
41
0
            { code; }                                                         \
42
0
        }                                                                     \
43
0
    }
147
0
        LOG_WARNING("CloudTablet.capture_rs_readers.return e-230").tag("tablet_id", tablet_id());
148
0
        return Status::Error<false>(-230, "injected error");
149
0
    });
150
0
    std::shared_lock rlock(_meta_lock);
151
0
    *rs_splits = DORIS_TRY(capture_rs_readers_unlocked(
Line
Count
Source
716
0
    ({                                           \
717
0
        auto&& res = (stmt);                     \
718
0
        using T = std::decay_t<decltype(res)>;   \
719
0
        if (!res.has_value()) [[unlikely]] {     \
  Branch (719:13): [True: 0, False: 0]
720
0
            return std::forward<T>(res).error(); \
721
0
        }                                        \
722
0
        std::forward<T>(res).value();            \
723
0
    });
152
0
            spec_version, CaptureRowsetOps {.skip_missing_versions = opts.skip_missing_versions}));
153
0
    return Status::OK();
154
0
}
155
156
[[nodiscard]] Result<std::vector<Version>> CloudTablet::capture_consistent_versions_unlocked(
157
33
        const Version& version_range, const CaptureRowsetOps& options) const {
158
33
    if (options.query_freshness_tolerance_ms > 0) {
  Branch (158:9): [True: 20, False: 13]
159
20
        return capture_versions_with_freshness_tolerance(version_range, options);
160
20
    } else if (options.enable_prefer_cached_rowset && !enable_unique_key_merge_on_write()) {
  Branch (160:16): [True: 13, False: 0]
  Branch (160:55): [True: 13, False: 0]
161
13
        return capture_versions_prefer_cache(version_range);
162
13
    }
163
0
    return BaseTablet::capture_consistent_versions_unlocked(version_range, options);
164
33
}
165
166
0
Status CloudTablet::merge_rowsets_schema() {
167
    // Find the rowset with the max version
168
0
    auto max_version_rowset =
169
0
            std::max_element(
170
0
                    _rs_version_map.begin(), _rs_version_map.end(),
171
0
                    [](const auto& a, const auto& b) {
172
0
                        return !a.second->tablet_schema()
  Branch (172:32): [True: 0, False: 0]
173
0
                                       ? true
174
0
                                       : (!b.second->tablet_schema()
  Branch (174:43): [True: 0, False: 0]
175
0
                                                  ? false
176
0
                                                  : a.second->tablet_schema()->schema_version() <
177
0
                                                            b.second->tablet_schema()
178
0
                                                                    ->schema_version());
179
0
                    })
180
0
                    ->second;
181
0
    TabletSchemaSPtr max_version_schema = max_version_rowset->tablet_schema();
182
    // If the schema has variant columns, perform a merge to create a wide tablet schema
183
0
    if (max_version_schema->num_variant_columns() > 0) {
  Branch (183:9): [True: 0, False: 0]
184
0
        std::vector<TabletSchemaSPtr> schemas;
185
0
        std::transform(_rs_version_map.begin(), _rs_version_map.end(), std::back_inserter(schemas),
186
0
                       [](const auto& rs_meta) { return rs_meta.second->tablet_schema(); });
187
        // Merge the collected schemas to obtain the least common schema
188
0
        RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(schemas, nullptr,
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
189
0
                                                                         max_version_schema));
190
0
        VLOG_DEBUG << "dump schema: " << max_version_schema->dump_full_schema();
Line
Count
Source
41
0
#define VLOG_DEBUG VLOG(7)
191
0
        _merged_tablet_schema = max_version_schema;
192
0
    }
193
0
    return Status::OK();
194
0
}
195
196
Result<std::vector<Version>> CloudTablet::capture_versions_prefer_cache(
197
13
        const Version& spec_version) const {
198
13
    g_capture_prefer_cache_count << 1;
199
13
    Versions version_path;
200
13
    std::shared_lock rlock(_meta_lock);
201
13
    auto st = _timestamped_version_tracker.capture_consistent_versions_prefer_cache(
202
13
            spec_version, version_path,
203
92
            [&](int64_t start, int64_t end) { return rowset_is_warmed_up_unlocked(start, end); });
204
13
    if (!st.ok()) {
  Branch (204:9): [True: 0, False: 13]
205
0
        return ResultError(st);
206
0
    }
207
13
    int64_t path_max_version = version_path.back().second;
208
13
    VLOG_DEBUG << fmt::format(
Line
Count
Source
41
0
#define VLOG_DEBUG VLOG(7)
209
0
            "[verbose] CloudTablet::capture_versions_prefer_cache, capture path: {}, "
210
0
            "tablet_id={}, spec_version={}, path_max_version={}",
211
0
            fmt::join(version_path | std::views::transform([](const auto& version) {
212
0
                          return fmt::format("{}", version.to_string());
213
0
                      }),
214
0
                      ", "),
215
0
            tablet_id(), spec_version.to_string(), path_max_version);
216
13
    return version_path;
217
13
}
218
219
210
bool CloudTablet::rowset_is_warmed_up_unlocked(int64_t start_version, int64_t end_version) const {
220
210
    if (start_version > end_version) {
  Branch (220:9): [True: 0, False: 210]
221
0
        return false;
222
0
    }
223
210
    Version version {start_version, end_version};
224
210
    auto it = _rs_version_map.find(version);
225
210
    if (it == _rs_version_map.end()) {
  Branch (225:9): [True: 78, False: 132]
226
78
        it = _stale_rs_version_map.find(version);
227
78
        if (it == _stale_rs_version_map.end()) {
  Branch (227:13): [True: 0, False: 78]
228
0
            LOG_WARNING(
Line
Count
Source
119
0
#define LOG_WARNING TaggableLogger(__FILE__, __LINE__, google::GLOG_WARNING)
229
0
                    "fail to find Rowset in rs_version or stale_rs_version for version. "
230
0
                    "tablet={}, version={}",
231
0
                    tablet_id(), version.to_string());
232
0
            return false;
233
0
        }
234
78
    }
235
210
    const auto& rs = it->second;
236
210
    if (rs->visible_timestamp() < _engine.startup_timepoint()) {
  Branch (236:9): [True: 10, False: 200]
237
        // We only care about rowsets that are created after startup time point. For other rowsets,
238
        // we assume they are warmed up.
239
10
        return true;
240
10
    }
241
200
    return is_rowset_warmed_up(rs->rowset_id());
242
210
};
243
244
Result<std::vector<Version>> CloudTablet::capture_versions_with_freshness_tolerance(
245
20
        const Version& spec_version, const CaptureRowsetOps& options) const {
246
20
    g_capture_with_freshness_tolerance_count << 1;
247
20
    using namespace std::chrono;
248
20
    auto query_freshness_tolerance_ms = options.query_freshness_tolerance_ms;
249
20
    auto freshness_limit_tp = system_clock::now() - milliseconds(query_freshness_tolerance_ms);
250
    // find a version path where every edge(rowset) has been warmuped
251
20
    Versions version_path;
252
20
    std::shared_lock rlock(_meta_lock);
253
20
    if (enable_unique_key_merge_on_write()) {
  Branch (253:9): [True: 11, False: 9]
254
        // For merge-on-write table, newly generated delete bitmap marks will be on the rowsets which are in newest layout.
255
        // So we can ony capture rowsets which are in newest data layout. Otherwise there may be data correctness issue.
256
11
        RETURN_IF_ERROR_RESULT(
Line
Count
Source
708
11
    do {                                            \
709
11
        Status _status_ = (stmt);                   \
710
11
        if (UNLIKELY(!_status_.ok())) {             \
Line
Count
Source
36
11
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 11]
711
0
            return unexpected(std::move(_status_)); \
712
0
        }                                           \
713
11
    } while (false)
  Branch (713:14): [Folded - Ignored]
257
11
                _timestamped_version_tracker.capture_consistent_versions_with_validator_mow(
258
11
                        spec_version, version_path, [&](int64_t start, int64_t end) {
259
11
                            return rowset_is_warmed_up_unlocked(start, end);
260
11
                        }));
261
11
    } else {
262
9
        RETURN_IF_ERROR_RESULT(
Line
Count
Source
708
9
    do {                                            \
709
9
        Status _status_ = (stmt);                   \
710
9
        if (UNLIKELY(!_status_.ok())) {             \
Line
Count
Source
36
9
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 9]
711
0
            return unexpected(std::move(_status_)); \
712
0
        }                                           \
713
9
    } while (false)
  Branch (713:14): [Folded - Ignored]
263
9
                _timestamped_version_tracker.capture_consistent_versions_with_validator(
264
9
                        spec_version, version_path, [&](int64_t start, int64_t end) {
265
9
                            return rowset_is_warmed_up_unlocked(start, end);
266
9
                        }));
267
9
    }
268
20
    int64_t path_max_version = version_path.back().second;
269
376
    auto should_be_visible_but_not_warmed_up = [&](const auto& rs_meta) -> bool {
270
376
        if (rs_meta->version() == Version {0, 1}) {
  Branch (270:13): [True: 20, False: 356]
271
            // skip rowset[0-1]
272
20
            return false;
273
20
        }
274
356
        bool ret = rs_meta->start_version() > path_max_version &&
  Branch (274:20): [True: 29, False: 327]
275
356
                   rs_meta->visible_timestamp() < freshness_limit_tp;
  Branch (275:20): [True: 5, False: 24]
276
356
        if (ret && config::read_cluster_cache_opt_verbose_log) {
  Branch (276:13): [True: 5, False: 351]
  Branch (276:20): [True: 5, False: 0]
277
5
            std::time_t t1 = system_clock::to_time_t(rs_meta->visible_timestamp());
278
5
            std::tm tm1 = *std::localtime(&t1);
279
5
            std::ostringstream oss1;
280
5
            oss1 << std::put_time(&tm1, "%Y-%m-%d %H:%M:%S");
281
282
5
            std::time_t t2 = system_clock::to_time_t(freshness_limit_tp);
283
5
            std::tm tm2 = *std::localtime(&t2);
284
5
            std::ostringstream oss2;
285
5
            oss2 << std::put_time(&tm2, "%Y-%m-%d %H:%M:%S");
286
5
            LOG_INFO(
Line
Count
Source
118
10
#define LOG_INFO TaggableLogger(__FILE__, __LINE__, google::GLOG_INFO)
287
5
                    "[verbose] CloudTablet::capture_rs_readers_with_freshness_tolerance, "
288
5
                    "find a rowset which should be visible but not warmed up, tablet_id={}, "
289
5
                    "path_max_version={}, rowset_id={}, version={}, visible_time={}, "
290
5
                    "freshness_limit={}, version_graph={}, rowset_warmup_digest={}",
291
5
                    tablet_id(), path_max_version, rs_meta->rowset_id().to_string(),
292
5
                    rs_meta->version().to_string(), oss1.str(), oss2.str(),
293
5
                    _timestamped_version_tracker.debug_string(), rowset_warmup_digest());
294
5
        }
295
356
        return ret;
296
376
    };
297
    // use std::views::concat after C++26
298
20
    bool should_fallback = std::ranges::any_of(_tablet_meta->all_rs_metas(),
  Branch (298:28): [True: 2, False: 18]
299
20
                                               should_be_visible_but_not_warmed_up) ||
300
20
                           std::ranges::any_of(_tablet_meta->all_stale_rs_metas(),
  Branch (300:28): [True: 3, False: 15]
301
18
                                               should_be_visible_but_not_warmed_up);
302
20
    if (should_fallback) {
  Branch (302:9): [True: 5, False: 15]
303
5
        rlock.unlock();
304
5
        g_capture_with_freshness_tolerance_fallback_count << 1;
305
        // if there exists a rowset which satisfies freshness tolerance and its start version is larger than the path max version
306
        // but has not been warmuped up yet, fallback to capture rowsets as usual
307
5
        return BaseTablet::capture_consistent_versions_unlocked(spec_version, options);
308
5
    }
309
15
    VLOG_DEBUG << fmt::format(
Line
Count
Source
41
0
#define VLOG_DEBUG VLOG(7)
310
0
            "[verbose] CloudTablet::capture_versions_with_freshness_tolerance, capture path: {}, "
311
0
            "tablet_id={}, spec_version={}, path_max_version={}",
312
0
            fmt::join(version_path | std::views::transform([](const auto& version) {
313
0
                          return fmt::format("{}", version.to_string());
314
0
                      }),
315
0
                      ", "),
316
0
            tablet_id(), spec_version.to_string(), path_max_version);
317
15
    return version_path;
318
20
}
319
320
// There are only two tablet_states RUNNING and NOT_READY in cloud mode
321
// This function will erase the tablet from `CloudTabletMgr` when it can't find this tablet in MS.
322
Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data,
323
0
                                 SyncRowsetStats* stats) {
324
0
    RETURN_IF_ERROR(sync_if_not_running(stats));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
325
326
0
    if (query_version > 0) {
  Branch (326:9): [True: 0, False: 0]
327
0
        std::shared_lock rlock(_meta_lock);
328
0
        if (_max_version >= query_version) {
  Branch (328:13): [True: 0, False: 0]
329
0
            return Status::OK();
330
0
        }
331
0
    }
332
333
    // serially execute sync to reduce unnecessary network overhead
334
0
    std::unique_lock lock(_sync_meta_lock);
335
0
    if (query_version > 0) {
  Branch (335:9): [True: 0, False: 0]
336
0
        std::shared_lock rlock(_meta_lock);
337
0
        if (_max_version >= query_version) {
  Branch (337:13): [True: 0, False: 0]
338
0
            return Status::OK();
339
0
        }
340
0
    }
341
342
0
    auto st = _engine.meta_mgr().sync_tablet_rowsets_unlocked(this, lock, warmup_delta_data, true,
343
0
                                                              false, stats);
344
0
    if (st.is<ErrorCode::NOT_FOUND>()) {
  Branch (344:9): [True: 0, False: 0]
345
0
        clear_cache();
346
0
    }
347
348
0
    return st;
349
0
}
350
351
// Sync tablet meta and all rowset meta if not running.
352
// This could happen when BE didn't finish schema change job and another BE committed this schema change job.
353
// It should be a quite rare situation.
354
0
Status CloudTablet::sync_if_not_running(SyncRowsetStats* stats) {
355
0
    if (tablet_state() == TABLET_RUNNING) {
  Branch (355:9): [True: 0, False: 0]
356
0
        return Status::OK();
357
0
    }
358
359
    // Serially execute sync to reduce unnecessary network overhead
360
0
    std::unique_lock lock(_sync_meta_lock);
361
362
0
    {
363
0
        std::shared_lock rlock(_meta_lock);
364
0
        if (tablet_state() == TABLET_RUNNING) {
  Branch (364:13): [True: 0, False: 0]
365
0
            return Status::OK();
366
0
        }
367
0
    }
368
369
0
    TabletMetaSharedPtr tablet_meta;
370
0
    auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta);
371
0
    if (!st.ok()) {
  Branch (371:9): [True: 0, False: 0]
372
0
        if (st.is<ErrorCode::NOT_FOUND>()) {
  Branch (372:13): [True: 0, False: 0]
373
0
            clear_cache();
374
0
        }
375
0
        return st;
376
0
    }
377
378
0
    if (tablet_meta->tablet_state() != TABLET_RUNNING) [[unlikely]] {
  Branch (378:9): [True: 0, False: 0]
379
        // MoW may go to here when load while schema change
380
0
        return Status::OK();
381
0
    }
382
383
0
    TimestampedVersionTracker empty_tracker;
384
0
    {
385
0
        std::lock_guard wlock(_meta_lock);
386
0
        RETURN_IF_ERROR(set_tablet_state(TABLET_RUNNING));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
387
0
        _rs_version_map.clear();
388
0
        _stale_rs_version_map.clear();
389
0
        std::swap(_timestamped_version_tracker, empty_tracker);
390
0
        _tablet_meta->clear_rowsets();
391
0
        _tablet_meta->clear_stale_rowset();
392
0
        _max_version = -1;
393
0
    }
394
395
0
    st = _engine.meta_mgr().sync_tablet_rowsets_unlocked(this, lock, false, true, false, stats);
396
0
    if (st.is<ErrorCode::NOT_FOUND>()) {
  Branch (396:9): [True: 0, False: 0]
397
0
        clear_cache();
398
0
    }
399
0
    return st;
400
0
}
401
402
0
TabletSchemaSPtr CloudTablet::merged_tablet_schema() const {
403
0
    std::shared_lock rlock(_meta_lock);
404
0
    return _merged_tablet_schema;
405
0
}
406
407
void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_overlap,
408
                              std::unique_lock<std::shared_mutex>& meta_lock,
409
247
                              bool warmup_delta_data) {
410
247
    if (to_add.empty()) {
  Branch (410:9): [True: 0, False: 247]
411
0
        return;
412
0
    }
413
414
247
    auto add_rowsets_directly = [=, this](std::vector<RowsetSharedPtr>& rowsets) {
415
732
        for (auto& rs : rowsets) {
  Branch (415:23): [True: 732, False: 247]
416
732
            if (version_overlap || warmup_delta_data) {
  Branch (416:17): [True: 0, False: 732]
  Branch (416:36): [True: 0, False: 732]
417
#ifndef BE_TEST
418
                bool warm_up_state_updated = false;
419
                // Warmup rowset data in background
420
                for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
421
                    const auto& rowset_meta = rs->rowset_meta();
422
                    constexpr int64_t interval = 600; // 10 mins
423
                    // When BE restart and receive the `load_sync` rpc, it will sync all historical rowsets first time.
424
                    // So we need to filter out the old rowsets avoid to download the whole table.
425
                    if (warmup_delta_data &&
426
                        ::time(nullptr) - rowset_meta->newest_write_timestamp() >= interval) {
427
                        continue;
428
                    }
429
430
                    auto storage_resource = rowset_meta->remote_storage_resource();
431
                    if (!storage_resource) {
432
                        LOG(WARNING) << storage_resource.error();
433
                        continue;
434
                    }
435
436
                    int64_t expiration_time =
437
                            _tablet_meta->ttl_seconds() == 0 ||
438
                                            rowset_meta->newest_write_timestamp() <= 0
439
                                    ? 0
440
                                    : rowset_meta->newest_write_timestamp() +
441
                                              _tablet_meta->ttl_seconds();
442
                    g_file_cache_cloud_tablet_submitted_segment_num << 1;
443
                    if (rs->rowset_meta()->segment_file_size(seg_id) > 0) {
444
                        g_file_cache_cloud_tablet_submitted_segment_size
445
                                << rs->rowset_meta()->segment_file_size(seg_id);
446
                    }
447
                    if (!warm_up_state_updated) {
448
                        VLOG_DEBUG << "warm up rowset " << rs->version() << "(" << rs->rowset_id()
449
                                   << ") triggerd by sync rowset";
450
                        if (!add_rowset_warmup_state_unlocked(*(rs->rowset_meta()),
451
                                                              WarmUpTriggerSource::SYNC_ROWSET)) {
452
                            LOG(INFO) << "found duplicate warmup task for rowset "
453
                                      << rs->rowset_id() << ", skip it";
454
                            break;
455
                        }
456
                        warm_up_state_updated = true;
457
                    }
458
                    // clang-format off
459
                    auto self = std::dynamic_pointer_cast<CloudTablet>(shared_from_this());
460
                    _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
461
                            .path = storage_resource.value()->remote_segment_path(*rowset_meta,
462
                                                                                  seg_id),
463
                            .file_size = rs->rowset_meta()->segment_file_size(seg_id),
464
                            .file_system = storage_resource.value()->fs,
465
                            .ctx =
466
                                    {
467
                                            .expiration_time = expiration_time,
468
                                            .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
469
                                            .is_warmup = true
470
                                    },
471
                            .download_done {[=](Status st) {
472
                                DBUG_EXECUTE_IF("CloudTablet::add_rowsets.download_data.callback.block_compaction_rowset", {
473
                                            if (rs->version().second > rs->version().first) {
474
                                                auto sleep_time = dp->param<int>("sleep", 3);
475
                                                LOG_INFO(
476
                                                        "[verbose] block download for rowset={}, "
477
                                                        "version={}, sleep={}",
478
                                                        rs->rowset_id().to_string(),
479
                                                        rs->version().to_string(), sleep_time);
480
                                                std::this_thread::sleep_for(
481
                                                        std::chrono::seconds(sleep_time));
482
                                            }
483
                                });
484
                                self->complete_rowset_segment_warmup(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), st, 1, 0);
485
                                if (!st) {
486
                                    LOG_WARNING("add rowset warm up error ").error(st);
487
                                }
488
                            }},
489
                    });
490
491
                    auto download_idx_file = [&, self](const io::Path& idx_path, int64_t idx_size) {
492
                        io::DownloadFileMeta meta {
493
                                .path = idx_path,
494
                                .file_size = idx_size,
495
                                .file_system = storage_resource.value()->fs,
496
                                .ctx =
497
                                        {
498
                                                .expiration_time = expiration_time,
499
                                                .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
500
                                                .is_warmup = true
501
                                        },
502
                                .download_done {[=](Status st) {
503
                                    DBUG_EXECUTE_IF("CloudTablet::add_rowsets.download_idx.callback.block", {
504
                                                // clang-format on
505
                                                auto sleep_time = dp->param<int>("sleep", 3);
506
                                                LOG_INFO(
507
                                                        "[verbose] block download for "
508
                                                        "rowset={}, inverted_idx_file={}, "
509
                                                        "sleep={}",
510
                                                        rs->rowset_id().to_string(),
511
                                                        idx_path.string(), sleep_time);
512
                                                std::this_thread::sleep_for(
513
                                                        std::chrono::seconds(sleep_time));
514
                                                // clang-format off
515
                                    });
516
                                    self->complete_rowset_segment_warmup(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), st, 0, 1);
517
                                    if (!st) {
518
                                        LOG_WARNING("add rowset warm up error ").error(st);
519
                                    }
520
                                }},
521
                        };
522
                        self->update_rowset_warmup_state_inverted_idx_num_unlocked(WarmUpTriggerSource::SYNC_ROWSET, rowset_meta->rowset_id(), 1);
523
                        _engine.file_cache_block_downloader().submit_download_task(std::move(meta));
524
                        g_file_cache_cloud_tablet_submitted_index_num << 1;
525
                        g_file_cache_cloud_tablet_submitted_index_size << idx_size;
526
                    };
527
                    // clang-format on
528
                    auto schema_ptr = rowset_meta->tablet_schema();
529
                    auto idx_version = schema_ptr->get_inverted_index_storage_format();
530
                    if (idx_version == InvertedIndexStorageFormatPB::V1) {
531
                        std::unordered_map<int64_t, int64_t> index_size_map;
532
                        auto&& inverted_index_info = rowset_meta->inverted_index_file_info(seg_id);
533
                        for (const auto& info : inverted_index_info.index_info()) {
534
                            if (info.index_file_size() != -1) {
535
                                index_size_map[info.index_id()] = info.index_file_size();
536
                            } else {
537
                                VLOG_DEBUG << "Invalid index_file_size for segment_id " << seg_id
538
                                           << ", index_id " << info.index_id();
539
                            }
540
                        }
541
                        for (const auto& index : schema_ptr->inverted_indexes()) {
542
                            auto idx_path = storage_resource.value()->remote_idx_v1_path(
543
                                    *rowset_meta, seg_id, index->index_id(),
544
                                    index->get_index_suffix());
545
                            download_idx_file(idx_path, index_size_map[index->index_id()]);
546
                        }
547
                    } else {
548
                        if (schema_ptr->has_inverted_index()) {
549
                            auto&& inverted_index_info =
550
                                    rowset_meta->inverted_index_file_info(seg_id);
551
                            int64_t idx_size = 0;
552
                            if (inverted_index_info.has_index_size()) {
553
                                idx_size = inverted_index_info.index_size();
554
                            } else {
555
                                VLOG_DEBUG << "index_size is not set for segment " << seg_id;
556
                            }
557
                            auto idx_path = storage_resource.value()->remote_idx_v2_path(
558
                                    *rowset_meta, seg_id);
559
                            download_idx_file(idx_path, idx_size);
560
                        }
561
                    }
562
                }
563
#endif
564
0
            }
565
732
            _rs_version_map.emplace(rs->version(), rs);
566
732
            _timestamped_version_tracker.add_version(rs->version());
567
732
            _max_version = std::max(rs->end_version(), _max_version);
568
732
            update_base_size(*rs);
569
732
        }
570
247
        _tablet_meta->add_rowsets_unchecked(rowsets);
571
247
    };
572
573
247
    if (!version_overlap) {
  Branch (573:9): [True: 247, False: 0]
574
247
        add_rowsets_directly(to_add);
575
247
        return;
576
247
    }
577
578
    // Filter out existed rowsets
579
0
    auto remove_it =
580
0
            std::remove_if(to_add.begin(), to_add.end(), [this](const RowsetSharedPtr& rs) {
581
0
                if (auto find_it = _rs_version_map.find(rs->version());
582
0
                    find_it == _rs_version_map.end()) {
  Branch (582:21): [True: 0, False: 0]
583
0
                    return false;
584
0
                } else if (find_it->second->rowset_id() == rs->rowset_id()) {
  Branch (584:28): [True: 0, False: 0]
585
0
                    return true; // Same rowset
586
0
                }
587
588
                // If version of rowset in `to_add` is equal to rowset in tablet but rowset_id is not equal,
589
                // replace existed rowset with `to_add` rowset. This may occur when:
590
                //  1. schema change converts rowsets which have been double written to new tablet
591
                //  2. cumu compaction picks single overlapping input rowset to perform compaction
592
593
                // add existed rowset to unused_rowsets to remove delete bitmap and recycle cached data
594
0
                std::vector<RowsetSharedPtr> unused_rowsets;
595
0
                if (auto find_it = _rs_version_map.find(rs->version());
596
0
                    find_it != _rs_version_map.end()) {
  Branch (596:21): [True: 0, False: 0]
597
0
                    DCHECK(find_it->second->rowset_id() != rs->rowset_id())
598
0
                            << "tablet_id=" << tablet_id()
599
0
                            << ", rowset_id=" << rs->rowset_id().to_string()
600
0
                            << ", existed rowset_id=" << find_it->second->rowset_id().to_string();
601
0
                    unused_rowsets.push_back(find_it->second);
602
0
                }
603
0
                add_unused_rowsets(unused_rowsets);
604
605
0
                _tablet_meta->delete_rs_meta_by_version(rs->version(), nullptr);
606
0
                _rs_version_map[rs->version()] = rs;
607
0
                _tablet_meta->add_rowsets_unchecked({rs});
608
0
                update_base_size(*rs);
609
0
                return true;
610
0
            });
611
612
0
    to_add.erase(remove_it, to_add.end());
613
614
    // delete rowsets with overlapped version
615
0
    std::vector<RowsetSharedPtr> to_add_directly;
616
0
    for (auto& to_add_rs : to_add) {
  Branch (616:26): [True: 0, False: 0]
617
        // delete rowsets with overlapped version
618
0
        std::vector<RowsetSharedPtr> to_delete;
619
0
        Version to_add_v = to_add_rs->version();
620
        // if start_version  > max_version, we can skip checking overlap here.
621
0
        if (to_add_v.first > _max_version) {
  Branch (621:13): [True: 0, False: 0]
622
            // if start_version  > max_version, we can skip checking overlap here.
623
0
            to_add_directly.push_back(to_add_rs);
624
0
        } else {
625
0
            to_add_directly.push_back(to_add_rs);
626
0
            for (auto& [v, rs] : _rs_version_map) {
  Branch (626:32): [True: 0, False: 0]
627
0
                if (to_add_v.contains(v)) {
  Branch (627:21): [True: 0, False: 0]
628
0
                    to_delete.push_back(rs);
629
0
                }
630
0
            }
631
0
            delete_rowsets(to_delete, meta_lock);
632
0
        }
633
0
    }
634
635
0
    add_rowsets_directly(to_add_directly);
636
0
}
637
638
void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
639
94
                                 std::unique_lock<std::shared_mutex>&) {
640
94
    if (to_delete.empty()) {
  Branch (640:9): [True: 0, False: 94]
641
0
        return;
642
0
    }
643
94
    std::vector<RowsetMetaSharedPtr> rs_metas;
644
94
    rs_metas.reserve(to_delete.size());
645
94
    int64_t now = ::time(nullptr);
646
540
    for (auto&& rs : to_delete) {
  Branch (646:20): [True: 540, False: 94]
647
540
        rs->rowset_meta()->set_stale_at(now);
648
540
        rs_metas.push_back(rs->rowset_meta());
649
540
        _stale_rs_version_map[rs->version()] = rs;
650
540
    }
651
94
    _timestamped_version_tracker.add_stale_path_version(rs_metas);
652
540
    for (auto&& rs : to_delete) {
  Branch (652:20): [True: 540, False: 94]
653
540
        _rs_version_map.erase(rs->version());
654
540
    }
655
656
94
    _tablet_meta->modify_rs_metas({}, rs_metas, false);
657
94
}
658
659
0
uint64_t CloudTablet::delete_expired_stale_rowsets() {
660
0
    if (config::enable_mow_verbose_log) {
  Branch (660:9): [True: 0, False: 0]
661
0
        LOG_INFO("begin delete_expired_stale_rowset for tablet={}", tablet_id());
Line
Count
Source
118
0
#define LOG_INFO TaggableLogger(__FILE__, __LINE__, google::GLOG_INFO)
662
0
    }
663
0
    std::vector<RowsetSharedPtr> expired_rowsets;
664
    // ATTN: trick, Use stale_rowsets to temporarily increase the reference count of the rowset shared pointer in _stale_rs_version_map so that in the recycle_cached_data function, it checks if the reference count is 2.
665
0
    std::vector<std::pair<Version, std::vector<RowsetSharedPtr>>> deleted_stale_rowsets;
666
0
    int64_t expired_stale_sweep_endtime =
667
0
            ::time(nullptr) - config::tablet_rowset_stale_sweep_time_sec;
668
0
    {
669
0
        std::unique_lock wlock(_meta_lock);
670
671
0
        std::vector<int64_t> path_ids;
672
        // capture the path version to delete
673
0
        _timestamped_version_tracker.capture_expired_paths(expired_stale_sweep_endtime, &path_ids);
674
675
0
        if (path_ids.empty()) {
  Branch (675:13): [True: 0, False: 0]
676
0
            return 0;
677
0
        }
678
679
0
        for (int64_t path_id : path_ids) {
  Branch (679:30): [True: 0, False: 0]
680
0
            int64_t start_version = -1;
681
0
            int64_t end_version = -1;
682
0
            std::vector<RowsetSharedPtr> stale_rowsets;
683
            // delete stale versions in version graph
684
0
            auto version_path = _timestamped_version_tracker.fetch_and_delete_path_by_id(path_id);
685
0
            for (auto& v_ts : version_path->timestamped_versions()) {
  Branch (685:29): [True: 0, False: 0]
686
0
                auto rs_it = _stale_rs_version_map.find(v_ts->version());
687
0
                if (rs_it != _stale_rs_version_map.end()) {
  Branch (687:21): [True: 0, False: 0]
688
0
                    expired_rowsets.push_back(rs_it->second);
689
0
                    stale_rowsets.push_back(rs_it->second);
690
0
                    VLOG_DEBUG << "erase stale rowset, tablet_id=" << tablet_id()
Line
Count
Source
41
0
#define VLOG_DEBUG VLOG(7)
691
0
                               << " rowset_id=" << rs_it->second->rowset_id().to_string()
692
0
                               << " version=" << rs_it->first.to_string();
693
0
                    _stale_rs_version_map.erase(rs_it);
694
0
                } else {
695
0
                    LOG(WARNING) << "cannot find stale rowset " << v_ts->version() << " in tablet "
696
0
                                 << tablet_id();
697
                    // clang-format off
698
0
                    DCHECK(false) << [this, &wlock]() { wlock.unlock(); std::string json; get_compaction_status(&json); return json; }();
699
                    // clang-format on
700
0
                }
701
0
                if (start_version < 0) {
  Branch (701:21): [True: 0, False: 0]
702
0
                    start_version = v_ts->version().first;
703
0
                }
704
0
                end_version = v_ts->version().second;
705
0
                _tablet_meta->delete_stale_rs_meta_by_version(v_ts->version());
706
0
            }
707
0
            Version version(start_version, end_version);
708
0
            if (!stale_rowsets.empty()) {
  Branch (708:17): [True: 0, False: 0]
709
0
                deleted_stale_rowsets.emplace_back(version, std::move(stale_rowsets));
710
0
            }
711
0
        }
712
0
        _reconstruct_version_tracker_if_necessary();
713
0
    }
714
0
    auto recycled_rowsets = recycle_cached_data(expired_rowsets);
715
0
    if (!recycled_rowsets.empty()) {
  Branch (715:9): [True: 0, False: 0]
716
0
        auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
717
0
        manager.recycle_cache(tablet_id(), recycled_rowsets);
718
0
    }
719
0
    if (config::enable_mow_verbose_log) {
  Branch (719:9): [True: 0, False: 0]
720
0
        LOG_INFO("finish delete_expired_stale_rowset for tablet={}", tablet_id());
Line
Count
Source
118
0
#define LOG_INFO TaggableLogger(__FILE__, __LINE__, google::GLOG_INFO)
721
0
    }
722
723
0
    add_unused_rowsets(expired_rowsets);
724
0
    if (config::enable_agg_and_remove_pre_rowsets_delete_bitmap && keys_type() == UNIQUE_KEYS &&
  Branch (724:9): [True: 0, False: 0]
  Branch (724:68): [True: 0, False: 0]
725
0
        enable_unique_key_merge_on_write() && !deleted_stale_rowsets.empty()) {
  Branch (725:9): [True: 0, False: 0]
  Branch (725:47): [True: 0, False: 0]
726
        // agg delete bitmap for pre rowsets; record unused delete bitmap key ranges
727
0
        OlapStopWatch watch;
728
0
        for (const auto& [version, unused_rowsets] : deleted_stale_rowsets) {
  Branch (728:52): [True: 0, False: 0]
729
            // agg delete bitmap for pre rowset
730
0
            DeleteBitmapKeyRanges remove_delete_bitmap_key_ranges;
731
0
            agg_delete_bitmap_for_stale_rowsets(version, remove_delete_bitmap_key_ranges);
732
            // add remove delete bitmap
733
0
            if (!remove_delete_bitmap_key_ranges.empty()) {
  Branch (733:17): [True: 0, False: 0]
734
0
                std::vector<RowsetId> rowset_ids;
735
0
                for (const auto& rs : unused_rowsets) {
  Branch (735:37): [True: 0, False: 0]
736
0
                    rowset_ids.push_back(rs->rowset_id());
737
0
                }
738
0
                std::lock_guard<std::mutex> lock(_gc_mutex);
739
0
                _unused_delete_bitmap.push_back(
740
0
                        std::make_pair(rowset_ids, remove_delete_bitmap_key_ranges));
741
0
            }
742
0
        }
743
0
        LOG(INFO) << "agg pre rowsets delete bitmap. tablet_id=" << tablet_id()
744
0
                  << ", size=" << deleted_stale_rowsets.size()
745
0
                  << ", cost(us)=" << watch.get_elapse_time_us();
746
0
    }
747
0
    return expired_rowsets.size();
748
0
}
749
750
0
bool CloudTablet::need_remove_unused_rowsets() {
751
0
    std::lock_guard<std::mutex> lock(_gc_mutex);
752
0
    return !_unused_rowsets.empty();
753
0
}
754
755
0
void CloudTablet::add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets) {
756
0
    std::lock_guard<std::mutex> lock(_gc_mutex);
757
0
    for (const auto& rowset : rowsets) {
  Branch (757:29): [True: 0, False: 0]
758
0
        _unused_rowsets[rowset->rowset_id()] = rowset;
759
0
    }
760
0
    g_unused_rowsets_count << rowsets.size();
761
0
}
762
763
0
void CloudTablet::remove_unused_rowsets() {
764
0
    int64_t removed_delete_bitmap_num = 0;
765
0
    std::vector<std::shared_ptr<Rowset>> removed_rowsets;
766
0
    OlapStopWatch watch;
767
768
0
    {
769
0
        std::lock_guard<std::mutex> lock(_gc_mutex);
770
        // 1. remove unused rowsets's cache data and delete bitmap
771
0
        for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
  Branch (771:49): [True: 0, False: 0]
772
0
            auto& rs = it->second;
773
0
            if (rs.use_count() > 1) {
  Branch (773:17): [True: 0, False: 0]
774
0
                LOG(WARNING) << "tablet_id:" << tablet_id() << " rowset: " << rs->rowset_id()
775
0
                             << " has " << rs.use_count() << " references, it cannot be removed";
776
0
                ++it;
777
0
                continue;
778
0
            }
779
0
            tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version());
780
0
            _rowset_warm_up_states.erase(rs->rowset_id());
781
0
            rs->clear_cache();
782
0
            removed_rowsets.push_back(std::move(rs));
783
0
            g_unused_rowsets_count << -1;
784
0
            it = _unused_rowsets.erase(it);
785
0
        }
786
0
    }
787
788
0
    {
789
0
        std::vector<RecycledRowsets> recycled_rowsets;
790
791
0
        for (auto& rs : removed_rowsets) {
  Branch (791:23): [True: 0, False: 0]
792
0
            auto index_names = rs->get_index_file_names();
793
0
            recycled_rowsets.emplace_back(rs->rowset_id(), rs->num_segments(), index_names);
794
0
            int64_t segment_size_sum = 0;
795
0
            for (int32_t i = 0; i < rs->num_segments(); i++) {
  Branch (795:33): [True: 0, False: 0]
796
0
                segment_size_sum += rs->rowset_meta()->segment_file_size(i);
797
0
            }
798
0
            g_file_cache_recycle_cached_data_segment_num << rs->num_segments();
799
0
            g_file_cache_recycle_cached_data_segment_size << segment_size_sum;
800
0
            g_file_cache_recycle_cached_data_index_num << index_names.size();
801
0
        }
802
803
0
        if (recycled_rowsets.size() > 0) {
  Branch (803:13): [True: 0, False: 0]
804
0
            auto& manager =
805
0
                    ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
806
0
            manager.recycle_cache(tablet_id(), recycled_rowsets);
807
0
        }
808
0
    }
809
810
    // 2. remove delete bitmap of pre rowsets
811
0
    for (auto it = _unused_delete_bitmap.begin(); it != _unused_delete_bitmap.end();) {
  Branch (811:51): [True: 0, False: 0]
812
0
        auto& rowset_ids = std::get<0>(*it);
813
0
        bool find_unused_rowset = false;
814
0
        for (const auto& rowset_id : rowset_ids) {
  Branch (814:36): [True: 0, False: 0]
815
0
            if (_unused_rowsets.find(rowset_id) != _unused_rowsets.end()) {
  Branch (815:17): [True: 0, False: 0]
816
0
                LOG(INFO) << "can not remove pre rowset delete bitmap because rowset is in use"
817
0
                          << ", tablet_id=" << tablet_id() << ", rowset_id=" << rowset_id;
818
0
                find_unused_rowset = true;
819
0
                break;
820
0
            }
821
0
        }
822
0
        if (find_unused_rowset) {
  Branch (822:13): [True: 0, False: 0]
823
0
            ++it;
824
0
            continue;
825
0
        }
826
0
        auto& key_ranges = std::get<1>(*it);
827
0
        tablet_meta()->delete_bitmap()->remove(key_ranges);
828
0
        it = _unused_delete_bitmap.erase(it);
829
0
        removed_delete_bitmap_num++;
830
0
    }
831
832
0
    LOG(INFO) << "tablet_id=" << tablet_id() << ", unused_rowset size=" << _unused_rowsets.size()
833
0
              << ", unused_delete_bitmap size=" << _unused_delete_bitmap.size()
834
0
              << ", removed_rowsets_num=" << removed_rowsets.size()
835
0
              << ", removed_delete_bitmap_num=" << removed_delete_bitmap_num
836
0
              << ", cost(us)=" << watch.get_elapse_time_us();
837
0
}
838
839
732
void CloudTablet::update_base_size(const Rowset& rs) {
840
    // Define base rowset as the rowset of version [2-x]
841
732
    if (rs.start_version() == 2) {
  Branch (841:9): [True: 98, False: 634]
842
98
        _base_size = rs.total_disk_size();
843
98
    }
844
732
}
845
846
0
void CloudTablet::clear_cache() {
847
0
    auto recycled_rowsets = CloudTablet::recycle_cached_data(get_snapshot_rowset(true));
848
0
    if (!recycled_rowsets.empty()) {
  Branch (848:9): [True: 0, False: 0]
849
0
        auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
850
0
        manager.recycle_cache(tablet_id(), recycled_rowsets);
851
0
    }
852
0
    _engine.tablet_mgr().erase_tablet(tablet_id());
853
0
}
854
855
std::vector<RecycledRowsets> CloudTablet::recycle_cached_data(
856
0
        const std::vector<RowsetSharedPtr>& rowsets) {
857
0
    std::vector<RecycledRowsets> recycled_rowsets;
858
0
    for (const auto& rs : rowsets) {
  Branch (858:25): [True: 0, False: 0]
859
        // rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2.
860
0
        if (rs.use_count() > 2) {
  Branch (860:13): [True: 0, False: 0]
861
0
            LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has " << rs.use_count()
862
0
                         << " references. File Cache won't be recycled when query is using it.";
863
0
            continue;
864
0
        }
865
0
        rs->clear_cache();
866
0
        auto index_names = rs->get_index_file_names();
867
0
        recycled_rowsets.emplace_back(rs->rowset_id(), rs->num_segments(), index_names);
868
869
0
        int64_t segment_size_sum = 0;
870
0
        for (int32_t i = 0; i < rs->num_segments(); i++) {
  Branch (870:29): [True: 0, False: 0]
871
0
            segment_size_sum += rs->rowset_meta()->segment_file_size(i);
872
0
        }
873
0
        g_file_cache_recycle_cached_data_segment_num << rs->num_segments();
874
0
        g_file_cache_recycle_cached_data_segment_size << segment_size_sum;
875
0
        g_file_cache_recycle_cached_data_index_num << index_names.size();
876
0
    }
877
0
    return recycled_rowsets;
878
0
}
879
880
void CloudTablet::reset_approximate_stats(int64_t num_rowsets, int64_t num_segments,
881
0
                                          int64_t num_rows, int64_t data_size) {
882
0
    _approximate_num_segments.store(num_segments, std::memory_order_relaxed);
883
0
    _approximate_num_rows.store(num_rows, std::memory_order_relaxed);
884
0
    _approximate_data_size.store(data_size, std::memory_order_relaxed);
885
0
    int64_t cumu_num_deltas = 0;
886
0
    int64_t cumu_num_rowsets = 0;
887
0
    auto cp = _cumulative_point.load(std::memory_order_relaxed);
888
0
    for (auto& [v, r] : _rs_version_map) {
  Branch (888:23): [True: 0, False: 0]
889
0
        if (v.second < cp) {
  Branch (889:13): [True: 0, False: 0]
890
0
            continue;
891
0
        }
892
0
        cumu_num_deltas += r->is_segments_overlapping() ? r->num_segments() : 1;
  Branch (892:28): [True: 0, False: 0]
893
0
        ++cumu_num_rowsets;
894
0
    }
895
    // num_rowsets may be less than the size of _rs_version_map when there are some hole rowsets
896
    // in the version map, so we use the max value to ensure that the approximate number
897
    // of rowsets is at least the size of _rs_version_map.
898
    // Note that this is not the exact number of rowsets, but an approximate number.
899
0
    int64_t approximate_num_rowsets =
900
0
            std::max(num_rowsets, static_cast<int64_t>(_rs_version_map.size()));
901
0
    _approximate_num_rowsets.store(approximate_num_rowsets, std::memory_order_relaxed);
902
0
    _approximate_cumu_num_rowsets.store(cumu_num_rowsets, std::memory_order_relaxed);
903
0
    _approximate_cumu_num_deltas.store(cumu_num_deltas, std::memory_order_relaxed);
904
0
}
905
906
Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_rowset_writer(
907
0
        RowsetWriterContext& context, bool vertical) {
908
0
    context.rowset_id = _engine.next_rowset_id();
909
    // FIXME(plat1ko): Seems `tablet_id` and `index_id` has been set repeatedly
910
0
    context.tablet_id = tablet_id();
911
0
    context.index_id = index_id();
912
0
    context.partition_id = partition_id();
913
0
    context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write();
914
0
    context.encrypt_algorithm = tablet_meta()->encryption_algorithm();
915
0
    return RowsetFactory::create_rowset_writer(_engine, context, vertical);
916
0
}
917
918
// create a rowset writer with rowset_id and seg_id
919
// after writer, merge this transient rowset with original rowset
920
Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_writer(
921
        const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> partial_update_info,
922
0
        int64_t txn_expiration) {
923
0
    if (rowset.rowset_meta_state() != RowsetStatePB::BEGIN_PARTIAL_UPDATE &&
  Branch (923:9): [True: 0, False: 0]
924
0
        rowset.rowset_meta_state() != RowsetStatePB::COMMITTED) [[unlikely]] {
  Branch (924:9): [True: 0, False: 0]
925
0
        auto msg = fmt::format(
926
0
                "wrong rowset state when create_transient_rowset_writer, rowset state should be "
927
0
                "BEGIN_PARTIAL_UPDATE or COMMITTED, but found {}, rowset_id={}, tablet_id={}",
928
0
                RowsetStatePB_Name(rowset.rowset_meta_state()), rowset.rowset_id().to_string(),
929
0
                tablet_id());
930
        // see `CloudRowsetWriter::build` for detail.
931
        // if this is in a retry task, the rowset state may have been changed to RowsetStatePB::COMMITTED
932
        // in `RowsetMeta::merge_rowset_meta()` in previous trials.
933
0
        LOG(WARNING) << msg;
934
0
        DCHECK(false) << msg;
935
0
    }
936
0
    RowsetWriterContext context;
937
0
    context.rowset_state = PREPARED;
938
0
    context.segments_overlap = OVERLAPPING;
939
    // During a partial update, the extracted columns of a variant should not be included in the tablet schema.
940
    // This is because the partial update for a variant needs to ignore the extracted columns.
941
    // Otherwise, the schema types in different rowsets might be inconsistent. When performing a partial update,
942
    // the complete variant is constructed by reading all the sub-columns of the variant.
943
0
    context.tablet_schema = rowset.tablet_schema()->copy_without_variant_extracted_columns();
944
0
    context.newest_write_timestamp = UnixSeconds();
945
0
    context.tablet_id = table_id();
946
0
    context.enable_segcompaction = false;
947
0
    context.write_type = DataWriteType::TYPE_DIRECT;
948
0
    context.partial_update_info = std::move(partial_update_info);
949
0
    context.is_transient_rowset_writer = true;
950
0
    context.rowset_id = rowset.rowset_id();
951
0
    context.tablet_id = tablet_id();
952
0
    context.index_id = index_id();
953
0
    context.partition_id = partition_id();
954
0
    context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write();
955
0
    context.txn_expiration = txn_expiration;
956
0
    context.encrypt_algorithm = tablet_meta()->encryption_algorithm();
957
958
0
    auto storage_resource = rowset.rowset_meta()->remote_storage_resource();
959
0
    if (!storage_resource) {
  Branch (959:9): [True: 0, False: 0]
960
0
        return ResultError(std::move(storage_resource.error()));
961
0
    }
962
963
0
    context.storage_resource = *storage_resource.value();
964
965
0
    return RowsetFactory::create_rowset_writer(_engine, context, false)
966
0
            .transform([&](auto&& writer) {
967
0
                writer->set_segment_start_id(static_cast<int32_t>(rowset.num_segments()));
968
0
                return writer;
969
0
            });
970
0
}
971
972
3
int64_t CloudTablet::get_cloud_base_compaction_score() const {
973
3
    if (_tablet_meta->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY) {
  Branch (973:9): [True: 0, False: 3]
974
0
        bool has_delete = false;
975
0
        int64_t point = cumulative_layer_point();
976
0
        std::shared_lock<std::shared_mutex> rlock(_meta_lock);
977
0
        for (const auto& rs_meta : _tablet_meta->all_rs_metas()) {
  Branch (977:34): [True: 0, False: 0]
978
0
            if (rs_meta->start_version() >= point) {
  Branch (978:17): [True: 0, False: 0]
979
0
                continue;
980
0
            }
981
0
            if (rs_meta->has_delete_predicate()) {
  Branch (981:17): [True: 0, False: 0]
982
0
                has_delete = true;
983
0
                break;
984
0
            }
985
0
        }
986
0
        if (!has_delete) {
  Branch (986:13): [True: 0, False: 0]
987
0
            return 0;
988
0
        }
989
0
    }
990
991
3
    return _approximate_num_rowsets.load(std::memory_order_relaxed) -
992
3
           _approximate_cumu_num_rowsets.load(std::memory_order_relaxed);
993
3
}
994
995
1
int64_t CloudTablet::get_cloud_cumu_compaction_score() const {
996
    // TODO(plat1ko): Propose an algorithm that considers tablet's key type, number of delete rowsets,
997
    //  number of tablet versions simultaneously.
998
1
    return _approximate_cumu_num_deltas.load(std::memory_order_relaxed);
999
1
}
1000
1001
// return a json string to show the compaction status of this tablet
1002
33
void CloudTablet::get_compaction_status(std::string* json_result) {
1003
33
    rapidjson::Document root;
1004
33
    root.SetObject();
1005
1006
33
    rapidjson::Document path_arr;
1007
33
    path_arr.SetArray();
1008
1009
33
    std::vector<RowsetSharedPtr> rowsets;
1010
33
    std::vector<RowsetSharedPtr> stale_rowsets;
1011
33
    {
1012
33
        std::shared_lock rdlock(_meta_lock);
1013
33
        rowsets.reserve(_rs_version_map.size());
1014
148
        for (auto& it : _rs_version_map) {
  Branch (1014:23): [True: 148, False: 33]
1015
148
            rowsets.push_back(it.second);
1016
148
        }
1017
33
        stale_rowsets.reserve(_stale_rs_version_map.size());
1018
540
        for (auto& it : _stale_rs_version_map) {
  Branch (1018:23): [True: 540, False: 33]
1019
540
            stale_rowsets.push_back(it.second);
1020
540
        }
1021
33
    }
1022
33
    std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator);
1023
33
    std::sort(stale_rowsets.begin(), stale_rowsets.end(), Rowset::comparator);
1024
1025
    // get snapshot version path json_doc
1026
33
    _timestamped_version_tracker.get_stale_version_path_json_doc(path_arr);
1027
33
    root.AddMember("cumulative point", _cumulative_point.load(), root.GetAllocator());
1028
33
    rapidjson::Value cumu_value;
1029
33
    std::string format_str = ToStringFromUnixMillis(_last_cumu_compaction_failure_millis.load());
1030
33
    cumu_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
1031
33
                         root.GetAllocator());
1032
33
    root.AddMember("last cumulative failure time", cumu_value, root.GetAllocator());
1033
33
    rapidjson::Value base_value;
1034
33
    format_str = ToStringFromUnixMillis(_last_base_compaction_failure_millis.load());
1035
33
    base_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
1036
33
                         root.GetAllocator());
1037
33
    root.AddMember("last base failure time", base_value, root.GetAllocator());
1038
33
    rapidjson::Value full_value;
1039
33
    format_str = ToStringFromUnixMillis(_last_full_compaction_failure_millis.load());
1040
33
    full_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
1041
33
                         root.GetAllocator());
1042
33
    root.AddMember("last full failure time", full_value, root.GetAllocator());
1043
33
    rapidjson::Value cumu_success_value;
1044
33
    format_str = ToStringFromUnixMillis(_last_cumu_compaction_success_millis.load());
1045
33
    cumu_success_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
1046
33
                                 root.GetAllocator());
1047
33
    root.AddMember("last cumulative success time", cumu_success_value, root.GetAllocator());
1048
33
    rapidjson::Value base_success_value;
1049
33
    format_str = ToStringFromUnixMillis(_last_base_compaction_success_millis.load());
1050
33
    base_success_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
1051
33
                                 root.GetAllocator());
1052
33
    root.AddMember("last base success time", base_success_value, root.GetAllocator());
1053
33
    rapidjson::Value full_success_value;
1054
33
    format_str = ToStringFromUnixMillis(_last_full_compaction_success_millis.load());
1055
33
    full_success_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
1056
33
                                 root.GetAllocator());
1057
33
    root.AddMember("last full success time", full_success_value, root.GetAllocator());
1058
33
    rapidjson::Value cumu_schedule_value;
1059
33
    format_str = ToStringFromUnixMillis(_last_cumu_compaction_schedule_millis.load());
1060
33
    cumu_schedule_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
1061
33
                                  root.GetAllocator());
1062
33
    root.AddMember("last cumulative schedule time", cumu_schedule_value, root.GetAllocator());
1063
33
    rapidjson::Value base_schedule_value;
1064
33
    format_str = ToStringFromUnixMillis(_last_base_compaction_schedule_millis.load());
1065
33
    base_schedule_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
1066
33
                                  root.GetAllocator());
1067
33
    root.AddMember("last base schedule time", base_schedule_value, root.GetAllocator());
1068
33
    rapidjson::Value full_schedule_value;
1069
33
    format_str = ToStringFromUnixMillis(_last_full_compaction_schedule_millis.load());
1070
33
    full_schedule_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
1071
33
                                  root.GetAllocator());
1072
33
    root.AddMember("last full schedule time", full_schedule_value, root.GetAllocator());
1073
33
    rapidjson::Value cumu_compaction_status_value;
1074
33
    cumu_compaction_status_value.SetString(_last_cumu_compaction_status.c_str(),
1075
33
                                           static_cast<uint>(_last_cumu_compaction_status.length()),
1076
33
                                           root.GetAllocator());
1077
33
    root.AddMember("last cumulative status", cumu_compaction_status_value, root.GetAllocator());
1078
33
    rapidjson::Value base_compaction_status_value;
1079
33
    base_compaction_status_value.SetString(_last_base_compaction_status.c_str(),
1080
33
                                           static_cast<uint>(_last_base_compaction_status.length()),
1081
33
                                           root.GetAllocator());
1082
33
    root.AddMember("last base status", base_compaction_status_value, root.GetAllocator());
1083
33
    rapidjson::Value full_compaction_status_value;
1084
33
    full_compaction_status_value.SetString(_last_full_compaction_status.c_str(),
1085
33
                                           static_cast<uint>(_last_full_compaction_status.length()),
1086
33
                                           root.GetAllocator());
1087
33
    root.AddMember("last full status", full_compaction_status_value, root.GetAllocator());
1088
33
    rapidjson::Value exec_compaction_time;
1089
33
    std::string num_str {std::to_string(exec_compaction_time_us.load())};
1090
33
    exec_compaction_time.SetString(num_str.c_str(), static_cast<uint>(num_str.length()),
1091
33
                                   root.GetAllocator());
1092
33
    root.AddMember("exec compaction time us", exec_compaction_time, root.GetAllocator());
1093
33
    rapidjson::Value local_read_time;
1094
33
    num_str = std::to_string(local_read_time_us.load());
1095
33
    local_read_time.SetString(num_str.c_str(), static_cast<uint>(num_str.length()),
1096
33
                              root.GetAllocator());
1097
33
    root.AddMember("compaction local read time us", local_read_time, root.GetAllocator());
1098
33
    rapidjson::Value remote_read_time;
1099
33
    num_str = std::to_string(remote_read_time_us.load());
1100
33
    remote_read_time.SetString(num_str.c_str(), static_cast<uint>(num_str.length()),
1101
33
                               root.GetAllocator());
1102
33
    root.AddMember("compaction remote read time us", remote_read_time, root.GetAllocator());
1103
1104
    // print all rowsets' version as an array
1105
33
    rapidjson::Document versions_arr;
1106
33
    rapidjson::Document missing_versions_arr;
1107
33
    versions_arr.SetArray();
1108
33
    missing_versions_arr.SetArray();
1109
33
    int64_t last_version = -1;
1110
148
    for (auto& rowset : rowsets) {
  Branch (1110:23): [True: 148, False: 33]
1111
148
        const Version& ver = rowset->version();
1112
148
        if (ver.first != last_version + 1) {
  Branch (1112:13): [True: 0, False: 148]
1113
0
            rapidjson::Value miss_value;
1114
0
            miss_value.SetString(fmt::format("[{}-{}]", last_version + 1, ver.first - 1).c_str(),
1115
0
                                 missing_versions_arr.GetAllocator());
1116
0
            missing_versions_arr.PushBack(miss_value, missing_versions_arr.GetAllocator());
1117
0
        }
1118
148
        rapidjson::Value value;
1119
148
        std::string version_str = rowset->get_rowset_info_str();
1120
148
        value.SetString(version_str.c_str(), static_cast<uint32_t>(version_str.length()),
1121
148
                        versions_arr.GetAllocator());
1122
148
        versions_arr.PushBack(value, versions_arr.GetAllocator());
1123
148
        last_version = ver.second;
1124
148
    }
1125
33
    root.AddMember("rowsets", versions_arr, root.GetAllocator());
1126
33
    root.AddMember("missing_rowsets", missing_versions_arr, root.GetAllocator());
1127
1128
    // print all stale rowsets' version as an array
1129
33
    rapidjson::Document stale_versions_arr;
1130
33
    stale_versions_arr.SetArray();
1131
540
    for (auto& rowset : stale_rowsets) {
  Branch (1131:23): [True: 540, False: 33]
1132
540
        rapidjson::Value value;
1133
540
        std::string version_str = rowset->get_rowset_info_str();
1134
540
        value.SetString(version_str.c_str(), static_cast<uint32_t>(version_str.length()),
1135
540
                        stale_versions_arr.GetAllocator());
1136
540
        stale_versions_arr.PushBack(value, stale_versions_arr.GetAllocator());
1137
540
    }
1138
33
    root.AddMember("stale_rowsets", stale_versions_arr, root.GetAllocator());
1139
1140
    // add stale version rowsets
1141
33
    root.AddMember("stale version path", path_arr, root.GetAllocator());
1142
1143
    // to json string
1144
33
    rapidjson::StringBuffer strbuf;
1145
33
    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
1146
33
    root.Accept(writer);
1147
33
    *json_result = std::string(strbuf.GetString());
1148
33
}
1149
1150
0
void CloudTablet::set_cumulative_layer_point(int64_t new_point) {
1151
0
    if (new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= _cumulative_point) {
  Branch (1151:9): [True: 0, False: 0]
  Branch (1151:60): [True: 0, False: 0]
1152
0
        _cumulative_point = new_point;
1153
0
        return;
1154
0
    }
1155
    // cumulative point should only be reset to -1, or be increased
1156
    // FIXME: could happen in currently unresolved race conditions
1157
0
    LOG(WARNING) << "Unexpected cumulative point: " << new_point
1158
0
                 << ", origin: " << _cumulative_point.load();
1159
0
}
1160
1161
0
std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_base_compaction() {
1162
0
    std::vector<RowsetSharedPtr> candidate_rowsets;
1163
0
    {
1164
0
        std::shared_lock rlock(_meta_lock);
1165
0
        for (const auto& [version, rs] : _rs_version_map) {
  Branch (1165:40): [True: 0, False: 0]
1166
0
            if (version.first != 0 && version.first < _cumulative_point &&
  Branch (1166:17): [True: 0, False: 0]
  Branch (1166:39): [True: 0, False: 0]
1167
0
                (_alter_version == -1 || version.second <= _alter_version)) {
  Branch (1167:18): [True: 0, False: 0]
  Branch (1167:42): [True: 0, False: 0]
1168
0
                candidate_rowsets.push_back(rs);
1169
0
            }
1170
0
        }
1171
0
    }
1172
0
    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
1173
0
    return candidate_rowsets;
1174
0
}
1175
1176
0
std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_full_compaction() {
1177
0
    std::vector<RowsetSharedPtr> candidate_rowsets;
1178
0
    {
1179
0
        std::shared_lock rlock(_meta_lock);
1180
0
        for (auto& [v, rs] : _rs_version_map) {
  Branch (1180:28): [True: 0, False: 0]
1181
            // MUST NOT compact rowset [0-1] for some historical reasons (see cloud_schema_change)
1182
0
            if (v.first != 0) {
  Branch (1182:17): [True: 0, False: 0]
1183
0
                candidate_rowsets.push_back(rs);
1184
0
            }
1185
0
        }
1186
0
    }
1187
0
    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
1188
0
    return candidate_rowsets;
1189
0
}
1190
1191
0
CalcDeleteBitmapExecutor* CloudTablet::calc_delete_bitmap_executor() {
1192
0
    return _engine.calc_delete_bitmap_executor();
1193
0
}
1194
1195
Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
1196
                                       DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
1197
                                       const RowsetIdUnorderedSet& cur_rowset_ids,
1198
0
                                       int64_t next_visible_version) {
1199
0
    RowsetSharedPtr rowset = txn_info->rowset;
1200
0
    int64_t cur_version = rowset->start_version();
1201
    // update delete bitmap info, in order to avoid recalculation when trying again
1202
0
    RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
1203
0
            txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::PREPARE));
1204
1205
0
    if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update() &&
  Branch (1205:9): [True: 0, False: 0]
  Branch (1205:42): [True: 0, False: 0]
1206
0
        rowset_writer->num_rows() > 0) {
  Branch (1206:9): [True: 0, False: 0]
1207
0
        DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.update_tmp_rowset.error", {
Line
Count
Source
37
0
    if (UNLIKELY(config::enable_debug_points)) {                              \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
38
0
        auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
39
0
        if (dp) {                                                             \
  Branch (39:13): [True: 0, False: 0]
40
0
            [[maybe_unused]] auto DP_NAME = debug_point_name;                 \
41
0
            { code; }                                                         \
42
0
        }                                                                     \
43
0
    }
1208
0
            return Status::InternalError<false>("injected update_tmp_rowset error.");
1209
0
        });
1210
0
        const auto& rowset_meta = rowset->rowset_meta();
1211
0
        RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset_meta));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
1212
0
    }
1213
1214
0
    RETURN_IF_ERROR(
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
1215
0
            save_delete_bitmap_to_ms(cur_version, txn_id, delete_bitmap, next_visible_version));
1216
1217
    // store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
1218
    // it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
1219
    // delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail
1220
0
    RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
1221
0
            txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED,
1222
0
            txn_info->publish_info));
1223
1224
0
    DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.enable_sleep", {
Line
Count
Source
37
0
    if (UNLIKELY(config::enable_debug_points)) {                              \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
38
0
        auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
39
0
        if (dp) {                                                             \
  Branch (39:13): [True: 0, False: 0]
40
0
            [[maybe_unused]] auto DP_NAME = debug_point_name;                 \
41
0
            { code; }                                                         \
42
0
        }                                                                     \
43
0
    }
1225
0
        auto sleep_sec = dp->param<int>("sleep", 5);
1226
0
        std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
1227
0
    });
1228
1229
0
    DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.injected_error", {
Line
Count
Source
37
0
    if (UNLIKELY(config::enable_debug_points)) {                              \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
38
0
        auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
39
0
        if (dp) {                                                             \
  Branch (39:13): [True: 0, False: 0]
40
0
            [[maybe_unused]] auto DP_NAME = debug_point_name;                 \
41
0
            { code; }                                                         \
  Branch (41:15): [True: 0, False: 0]
42
0
        }                                                                     \
43
0
    }
1230
0
        auto retry = dp->param<bool>("retry", false);
1231
0
        auto sleep_sec = dp->param<int>("sleep", 0);
1232
0
        std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
1233
0
        if (retry) { // return DELETE_BITMAP_LOCK_ERROR to let it retry
1234
0
            return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>(
1235
0
                    "injected DELETE_BITMAP_LOCK_ERROR");
1236
0
        } else {
1237
0
            return Status::InternalError<false>("injected non-retryable error");
1238
0
        }
1239
0
    });
1240
1241
0
    return Status::OK();
1242
0
}
1243
1244
Status CloudTablet::save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id,
1245
                                             DeleteBitmapPtr delete_bitmap,
1246
0
                                             int64_t next_visible_version) {
1247
0
    DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
1248
0
    for (auto iter = delete_bitmap->delete_bitmap.begin();
1249
0
         iter != delete_bitmap->delete_bitmap.end(); ++iter) {
  Branch (1249:10): [True: 0, False: 0]
1250
        // skip sentinel mark, which is used for delete bitmap correctness check
1251
0
        if (std::get<1>(iter->first) != DeleteBitmap::INVALID_SEGMENT_ID) {
  Branch (1251:13): [True: 0, False: 0]
1252
0
            new_delete_bitmap->merge(
1253
0
                    {std::get<0>(iter->first), std::get<1>(iter->first), cur_version},
1254
0
                    iter->second);
1255
0
        }
1256
0
    }
1257
1258
0
    RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, txn_id, LOAD_INITIATOR_ID,
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
1259
0
                                                            new_delete_bitmap.get(), txn_id, false,
1260
0
                                                            next_visible_version));
1261
0
    return Status::OK();
1262
0
}
1263
1264
0
Versions CloudTablet::calc_missed_versions(int64_t spec_version, Versions existing_versions) const {
1265
0
    DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;
1266
1267
    // sort the existing versions in ascending order
1268
0
    std::sort(existing_versions.begin(), existing_versions.end(),
1269
0
              [](const Version& a, const Version& b) {
1270
                  // simple because 2 versions are certainly not overlapping
1271
0
                  return a.first < b.first;
1272
0
              });
1273
1274
    // From the first version(=0), find the missing version until spec_version
1275
0
    int64_t last_version = -1;
1276
0
    Versions missed_versions;
1277
0
    for (const Version& version : existing_versions) {
  Branch (1277:33): [True: 0, False: 0]
1278
0
        if (version.first > last_version + 1) {
  Branch (1278:13): [True: 0, False: 0]
1279
            // there is a hole between versions
1280
0
            missed_versions.emplace_back(last_version + 1, std::min(version.first, spec_version));
1281
0
        }
1282
0
        last_version = version.second;
1283
0
        if (last_version >= spec_version) {
  Branch (1283:13): [True: 0, False: 0]
1284
0
            break;
1285
0
        }
1286
0
    }
1287
0
    if (last_version < spec_version) {
  Branch (1287:9): [True: 0, False: 0]
1288
        // there is a hole between the last version and the specificed version.
1289
0
        missed_versions.emplace_back(last_version + 1, spec_version);
1290
0
    }
1291
0
    return missed_versions;
1292
0
}
1293
1294
Status CloudTablet::calc_delete_bitmap_for_compaction(
1295
        const std::vector<RowsetSharedPtr>& input_rowsets, const RowsetSharedPtr& output_rowset,
1296
        const RowIdConversion& rowid_conversion, ReaderType compaction_type, int64_t merged_rows,
1297
        int64_t initiator, DeleteBitmapPtr& output_rowset_delete_bitmap,
1298
0
        bool allow_delete_in_cumu_compaction, int64_t& get_delete_bitmap_lock_start_time) {
1299
0
    output_rowset_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());
1300
0
    std::unique_ptr<RowLocationSet> missed_rows;
1301
0
    if ((config::enable_missing_rows_correctness_check ||
  Branch (1301:10): [True: 0, False: 0]
1302
0
         config::enable_mow_compaction_correctness_check_core) &&
  Branch (1302:10): [True: 0, False: 0]
1303
0
        !allow_delete_in_cumu_compaction &&
  Branch (1303:9): [True: 0, False: 0]
1304
0
        (compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION ||
  Branch (1304:10): [True: 0, False: 0]
1305
0
         !config::enable_prune_delete_sign_when_base_compaction)) {
  Branch (1305:10): [True: 0, False: 0]
1306
        // also check duplicate key for base compaction when config::enable_prune_delete_sign_when_base_compaction==false
1307
0
        missed_rows = std::make_unique<RowLocationSet>();
1308
0
        LOG(INFO) << "RowLocation Set inited succ for tablet:" << tablet_id();
1309
0
    }
1310
1311
0
    std::unique_ptr<std::map<RowsetSharedPtr, RowLocationPairList>> location_map;
1312
0
    if (config::enable_rowid_conversion_correctness_check) {
  Branch (1312:9): [True: 0, False: 0]
1313
0
        location_map = std::make_unique<std::map<RowsetSharedPtr, RowLocationPairList>>();
1314
0
        LOG(INFO) << "Location Map inited succ for tablet:" << tablet_id();
1315
0
    }
1316
1317
    // 1. calc delete bitmap for historical data
1318
0
    RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(this));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
1319
0
    Version version = max_version();
1320
0
    std::size_t missed_rows_size = 0;
1321
0
    calc_compaction_output_rowset_delete_bitmap(
1322
0
            input_rowsets, rowid_conversion, 0, version.second + 1, missed_rows.get(),
1323
0
            location_map.get(), *tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());
1324
0
    if (missed_rows) {
  Branch (1324:9): [True: 0, False: 0]
1325
0
        missed_rows_size = missed_rows->size();
1326
0
        if (!allow_delete_in_cumu_compaction) {
  Branch (1326:13): [True: 0, False: 0]
1327
0
            if ((compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION ||
  Branch (1327:18): [True: 0, False: 0]
1328
0
                 !config::enable_prune_delete_sign_when_base_compaction) &&
  Branch (1328:18): [True: 0, False: 0]
1329
0
                tablet_state() == TABLET_RUNNING) {
  Branch (1329:17): [True: 0, False: 0]
1330
0
                if (merged_rows >= 0 && merged_rows != missed_rows_size) {
  Branch (1330:21): [True: 0, False: 0]
  Branch (1330:41): [True: 0, False: 0]
1331
0
                    std::string err_msg = fmt::format(
1332
0
                            "cumulative compaction: the merged rows({}) is not equal to missed "
1333
0
                            "rows({}) in rowid conversion, tablet_id: {}, table_id:{}",
1334
0
                            merged_rows, missed_rows_size, tablet_id(), table_id());
1335
0
                    if (config::enable_mow_compaction_correctness_check_core) {
  Branch (1335:25): [True: 0, False: 0]
1336
0
                        CHECK(false) << err_msg;
1337
0
                    } else {
1338
0
                        DCHECK(false) << err_msg;
1339
0
                    }
1340
0
                }
1341
0
            }
1342
0
        }
1343
0
    }
1344
0
    if (location_map) {
  Branch (1344:9): [True: 0, False: 0]
1345
0
        RETURN_IF_ERROR(check_rowid_conversion(output_rowset, *location_map));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
1346
0
        location_map->clear();
1347
0
    }
1348
1349
    // 2. calc delete bitmap for incremental data
1350
0
    int64_t t1 = MonotonicMicros();
1351
0
    RETURN_IF_ERROR(_engine.meta_mgr().get_delete_bitmap_update_lock(
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
1352
0
            *this, COMPACTION_DELETE_BITMAP_LOCK_ID, initiator));
1353
0
    int64_t t2 = MonotonicMicros();
1354
0
    get_delete_bitmap_lock_start_time = t2;
1355
0
    RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(this));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
1356
0
    int64_t t3 = MonotonicMicros();
1357
1358
0
    calc_compaction_output_rowset_delete_bitmap(
1359
0
            input_rowsets, rowid_conversion, version.second, UINT64_MAX, missed_rows.get(),
1360
0
            location_map.get(), *tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());
1361
0
    int64_t t4 = MonotonicMicros();
1362
0
    if (location_map) {
  Branch (1362:9): [True: 0, False: 0]
1363
0
        RETURN_IF_ERROR(check_rowid_conversion(output_rowset, *location_map));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
1364
0
    }
1365
0
    int64_t t5 = MonotonicMicros();
1366
1367
    // 3. store delete bitmap
1368
0
    auto st = _engine.meta_mgr().update_delete_bitmap(*this, -1, initiator,
1369
0
                                                      output_rowset_delete_bitmap.get());
1370
0
    int64_t t6 = MonotonicMicros();
1371
0
    LOG(INFO) << "calc_delete_bitmap_for_compaction, tablet_id=" << tablet_id()
1372
0
              << ", get lock cost " << (t2 - t1) << " us, sync rowsets cost " << (t3 - t2)
1373
0
              << " us, calc delete bitmap cost " << (t4 - t3) << " us, check rowid conversion cost "
1374
0
              << (t5 - t4) << " us, store delete bitmap cost " << (t6 - t5)
1375
0
              << " us, st=" << st.to_string();
1376
0
    return st;
1377
0
}
1378
1379
void CloudTablet::agg_delete_bitmap_for_compaction(
1380
        int64_t start_version, int64_t end_version, const std::vector<RowsetSharedPtr>& pre_rowsets,
1381
        DeleteBitmapPtr& new_delete_bitmap,
1382
0
        std::map<std::string, int64_t>& pre_rowset_to_versions) {
1383
0
    for (auto& rowset : pre_rowsets) {
  Branch (1383:23): [True: 0, False: 0]
1384
0
        for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
  Branch (1384:35): [True: 0, False: 0]
1385
0
            auto d = tablet_meta()->delete_bitmap()->get_agg_without_cache(
1386
0
                    {rowset->rowset_id(), seg_id, end_version}, start_version);
1387
0
            if (d->isEmpty()) {
  Branch (1387:17): [True: 0, False: 0]
1388
0
                continue;
1389
0
            }
1390
0
            VLOG_DEBUG << "agg delete bitmap for tablet_id=" << tablet_id()
Line
Count
Source
41
0
#define VLOG_DEBUG VLOG(7)
1391
0
                       << ", rowset_id=" << rowset->rowset_id() << ", seg_id=" << seg_id
1392
0
                       << ", rowset_version=" << rowset->version().to_string()
1393
0
                       << ". compaction start_version=" << start_version
1394
0
                       << ", end_version=" << end_version
1395
0
                       << ". delete_bitmap cardinality=" << d->cardinality();
1396
0
            DeleteBitmap::BitmapKey end_key {rowset->rowset_id(), seg_id, end_version};
1397
0
            new_delete_bitmap->set(end_key, *d);
1398
0
            pre_rowset_to_versions[rowset->rowset_id().to_string()] = rowset->version().second;
1399
0
        }
1400
0
    }
1401
0
}
1402
1403
0
Status CloudTablet::sync_meta() {
1404
0
    if (!config::enable_file_cache) {
  Branch (1404:9): [True: 0, False: 0]
1405
0
        return Status::OK();
1406
0
    }
1407
1408
0
    TabletMetaSharedPtr tablet_meta;
1409
0
    auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta);
1410
0
    if (!st.ok()) {
  Branch (1410:9): [True: 0, False: 0]
1411
0
        if (st.is<ErrorCode::NOT_FOUND>()) {
  Branch (1411:13): [True: 0, False: 0]
1412
0
            clear_cache();
1413
0
        }
1414
0
        return st;
1415
0
    }
1416
1417
0
    auto new_ttl_seconds = tablet_meta->ttl_seconds();
1418
0
    if (_tablet_meta->ttl_seconds() != new_ttl_seconds) {
  Branch (1418:9): [True: 0, False: 0]
1419
0
        _tablet_meta->set_ttl_seconds(new_ttl_seconds);
1420
0
        int64_t cur_time = UnixSeconds();
1421
0
        std::shared_lock rlock(_meta_lock);
1422
0
        for (auto& [_, rs] : _rs_version_map) {
  Branch (1422:28): [True: 0, False: 0]
1423
0
            for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
  Branch (1423:34): [True: 0, False: 0]
1424
0
                int64_t new_expiration_time =
1425
0
                        new_ttl_seconds + rs->rowset_meta()->newest_write_timestamp();
1426
0
                new_expiration_time = new_expiration_time > cur_time ? new_expiration_time : 0;
  Branch (1426:39): [True: 0, False: 0]
1427
0
                auto file_key = Segment::file_cache_key(rs->rowset_id().to_string(), seg_id);
1428
0
                auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
1429
0
                file_cache->modify_expiration_time(file_key, new_expiration_time);
1430
0
            }
1431
0
        }
1432
0
    }
1433
1434
0
    auto new_compaction_policy = tablet_meta->compaction_policy();
1435
0
    if (_tablet_meta->compaction_policy() != new_compaction_policy) {
  Branch (1435:9): [True: 0, False: 0]
1436
0
        _tablet_meta->set_compaction_policy(new_compaction_policy);
1437
0
    }
1438
0
    auto new_time_series_compaction_goal_size_mbytes =
1439
0
            tablet_meta->time_series_compaction_goal_size_mbytes();
1440
0
    if (_tablet_meta->time_series_compaction_goal_size_mbytes() !=
  Branch (1440:9): [True: 0, False: 0]
1441
0
        new_time_series_compaction_goal_size_mbytes) {
1442
0
        _tablet_meta->set_time_series_compaction_goal_size_mbytes(
1443
0
                new_time_series_compaction_goal_size_mbytes);
1444
0
    }
1445
0
    auto new_time_series_compaction_file_count_threshold =
1446
0
            tablet_meta->time_series_compaction_file_count_threshold();
1447
0
    if (_tablet_meta->time_series_compaction_file_count_threshold() !=
  Branch (1447:9): [True: 0, False: 0]
1448
0
        new_time_series_compaction_file_count_threshold) {
1449
0
        _tablet_meta->set_time_series_compaction_file_count_threshold(
1450
0
                new_time_series_compaction_file_count_threshold);
1451
0
    }
1452
0
    auto new_time_series_compaction_time_threshold_seconds =
1453
0
            tablet_meta->time_series_compaction_time_threshold_seconds();
1454
0
    if (_tablet_meta->time_series_compaction_time_threshold_seconds() !=
  Branch (1454:9): [True: 0, False: 0]
1455
0
        new_time_series_compaction_time_threshold_seconds) {
1456
0
        _tablet_meta->set_time_series_compaction_time_threshold_seconds(
1457
0
                new_time_series_compaction_time_threshold_seconds);
1458
0
    }
1459
0
    auto new_time_series_compaction_empty_rowsets_threshold =
1460
0
            tablet_meta->time_series_compaction_empty_rowsets_threshold();
1461
0
    if (_tablet_meta->time_series_compaction_empty_rowsets_threshold() !=
  Branch (1461:9): [True: 0, False: 0]
1462
0
        new_time_series_compaction_empty_rowsets_threshold) {
1463
0
        _tablet_meta->set_time_series_compaction_empty_rowsets_threshold(
1464
0
                new_time_series_compaction_empty_rowsets_threshold);
1465
0
    }
1466
0
    auto new_time_series_compaction_level_threshold =
1467
0
            tablet_meta->time_series_compaction_level_threshold();
1468
0
    if (_tablet_meta->time_series_compaction_level_threshold() !=
  Branch (1468:9): [True: 0, False: 0]
1469
0
        new_time_series_compaction_level_threshold) {
1470
0
        _tablet_meta->set_time_series_compaction_level_threshold(
1471
0
                new_time_series_compaction_level_threshold);
1472
0
    }
1473
1474
0
    return Status::OK();
1475
0
}
1476
1477
0
void CloudTablet::build_tablet_report_info(TTabletInfo* tablet_info) {
1478
0
    std::shared_lock rdlock(_meta_lock);
1479
0
    tablet_info->__set_total_version_count(_tablet_meta->version_count());
1480
0
    tablet_info->__set_tablet_id(_tablet_meta->tablet_id());
1481
    // Currently, this information will not be used by the cloud report,
1482
    // but it may be used in the future.
1483
0
}
1484
1485
Status CloudTablet::check_delete_bitmap_cache(int64_t txn_id,
1486
0
                                              DeleteBitmap* expected_delete_bitmap) {
1487
0
    DeleteBitmapPtr cached_delete_bitmap;
1488
0
    CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
1489
0
    Status st = engine.txn_delete_bitmap_cache().get_delete_bitmap(
1490
0
            txn_id, tablet_id(), &cached_delete_bitmap, nullptr, nullptr);
1491
0
    if (st.ok()) {
  Branch (1491:9): [True: 0, False: 0]
1492
0
        bool res = (expected_delete_bitmap->cardinality() == cached_delete_bitmap->cardinality());
1493
0
        auto msg = fmt::format(
1494
0
                "delete bitmap cache check failed, cur_cardinality={}, cached_cardinality={}"
1495
0
                "txn_id={}, tablet_id={}",
1496
0
                expected_delete_bitmap->cardinality(), cached_delete_bitmap->cardinality(), txn_id,
1497
0
                tablet_id());
1498
0
        if (!res) {
  Branch (1498:13): [True: 0, False: 0]
1499
0
            DCHECK(res) << msg;
1500
0
            return Status::InternalError<false>(msg);
1501
0
        }
1502
0
    }
1503
0
    return Status::OK();
1504
0
}
1505
1506
34
WarmUpState CloudTablet::get_rowset_warmup_state(RowsetId rowset_id) {
1507
34
    std::shared_lock rlock(_meta_lock);
1508
34
    if (!_rowset_warm_up_states.contains(rowset_id)) {
  Branch (1508:9): [True: 1, False: 33]
1509
1
        return {.trigger_source = WarmUpTriggerSource::NONE, .progress = WarmUpProgress::NONE};
1510
1
    }
1511
33
    auto& warmup_info = _rowset_warm_up_states[rowset_id];
1512
33
    warmup_info.update_state();
1513
33
    return warmup_info.state;
1514
34
}
1515
1516
bool CloudTablet::add_rowset_warmup_state(const RowsetMeta& rowset, WarmUpTriggerSource source,
1517
34
                                          std::chrono::steady_clock::time_point start_tp) {
1518
34
    std::lock_guard wlock(_meta_lock);
1519
34
    return add_rowset_warmup_state_unlocked(rowset, source, start_tp);
1520
34
}
1521
1522
bool CloudTablet::update_rowset_warmup_state_inverted_idx_num(WarmUpTriggerSource source,
1523
5
                                                              RowsetId rowset_id, int64_t delta) {
1524
5
    std::lock_guard wlock(_meta_lock);
1525
5
    return update_rowset_warmup_state_inverted_idx_num_unlocked(source, rowset_id, delta);
1526
5
}
1527
1528
bool CloudTablet::update_rowset_warmup_state_inverted_idx_num_unlocked(WarmUpTriggerSource source,
1529
                                                                       RowsetId rowset_id,
1530
5
                                                                       int64_t delta) {
1531
5
    auto it = _rowset_warm_up_states.find(rowset_id);
1532
5
    if (it == _rowset_warm_up_states.end()) {
  Branch (1532:9): [True: 0, False: 5]
1533
0
        return false;
1534
0
    }
1535
5
    if (it->second.state.trigger_source != source) {
  Branch (1535:9): [True: 2, False: 3]
1536
        // Only the same trigger source can update the state
1537
2
        return false;
1538
2
    }
1539
3
    it->second.num_inverted_idx += delta;
1540
3
    return true;
1541
5
}
1542
1543
bool CloudTablet::add_rowset_warmup_state_unlocked(const RowsetMeta& rowset,
1544
                                                   WarmUpTriggerSource source,
1545
34
                                                   std::chrono::steady_clock::time_point start_tp) {
1546
34
    auto rowset_id = rowset.rowset_id();
1547
1548
    // Check if rowset already has warmup state
1549
34
    if (_rowset_warm_up_states.contains(rowset_id)) {
  Branch (1549:9): [True: 10, False: 24]
1550
10
        auto existing_state = _rowset_warm_up_states[rowset_id].state;
1551
1552
        // For job-triggered warmup (one-time and periodic warmup), allow it to proceed
1553
        // except when there's already another job-triggered warmup in progress
1554
10
        if (source == WarmUpTriggerSource::JOB) {
  Branch (1554:13): [True: 5, False: 5]
1555
5
            if (existing_state.trigger_source == WarmUpTriggerSource::JOB &&
  Branch (1555:17): [True: 2, False: 3]
1556
5
                existing_state.progress == WarmUpProgress::DOING) {
  Branch (1556:17): [True: 1, False: 1]
1557
                // Same job type already in progress, skip to avoid duplicate warmup
1558
1
                return false;
1559
1
            }
1560
5
        } else {
1561
            // For non-job warmup (EVENT_DRIVEN, SYNC_ROWSET), skip if any warmup exists
1562
5
            return false;
1563
5
        }
1564
10
    }
1565
1566
28
    if (source == WarmUpTriggerSource::JOB) {
  Branch (1566:9): [True: 9, False: 19]
1567
9
        g_file_cache_warm_up_rowset_triggered_by_job_num << 1;
1568
19
    } else if (source == WarmUpTriggerSource::SYNC_ROWSET) {
  Branch (1568:16): [True: 6, False: 13]
1569
6
        g_file_cache_warm_up_rowset_triggered_by_sync_rowset_num << 1;
1570
13
    } else if (source == WarmUpTriggerSource::EVENT_DRIVEN) {
  Branch (1570:16): [True: 13, False: 0]
1571
13
        g_file_cache_warm_up_rowset_triggered_by_event_driven_num << 1;
1572
13
    }
1573
28
    _rowset_warm_up_states[rowset_id] = {
1574
28
            .state = {.trigger_source = source,
1575
28
                      .progress = (rowset.num_segments() == 0 ? WarmUpProgress::DONE
  Branch (1575:36): [True: 1, False: 27]
1576
28
                                                              : WarmUpProgress::DOING)},
1577
28
            .num_segments = rowset.num_segments(),
1578
28
            .start_tp = start_tp};
1579
28
    return true;
1580
34
}
1581
1582
51
void CloudTablet::RowsetWarmUpInfo::update_state() {
1583
51
    if (has_finished()) {
  Branch (1583:9): [True: 14, False: 37]
1584
14
        g_file_cache_warm_up_rowset_complete_num << 1;
1585
14
        auto cost = std::chrono::duration_cast<std::chrono::milliseconds>(
1586
14
                            std::chrono::steady_clock::now() - start_tp)
1587
14
                            .count();
1588
14
        g_file_cache_warm_up_rowset_all_segments_latency << cost;
1589
14
        state.progress = WarmUpProgress::DONE;
1590
14
    }
1591
51
}
1592
1593
WarmUpState CloudTablet::complete_rowset_segment_warmup(WarmUpTriggerSource trigger_source,
1594
                                                        RowsetId rowset_id, Status status,
1595
                                                        int64_t segment_num,
1596
21
                                                        int64_t inverted_idx_num) {
1597
21
    std::lock_guard wlock(_meta_lock);
1598
21
    auto it = _rowset_warm_up_states.find(rowset_id);
1599
21
    if (it == _rowset_warm_up_states.end()) {
  Branch (1599:9): [True: 1, False: 20]
1600
1
        return {.trigger_source = WarmUpTriggerSource::NONE, .progress = WarmUpProgress::NONE};
1601
1
    }
1602
20
    auto& warmup_info = it->second;
1603
20
    if (warmup_info.state.trigger_source != trigger_source) {
  Branch (1603:9): [True: 2, False: 18]
1604
        // Only the same trigger source can update the state
1605
2
        return warmup_info.state;
1606
2
    }
1607
18
    VLOG_DEBUG << "complete rowset segment warmup for rowset " << rowset_id << ", " << status;
Line
Count
Source
41
0
#define VLOG_DEBUG VLOG(7)
1608
18
    if (segment_num > 0) {
  Branch (1608:9): [True: 16, False: 2]
1609
16
        g_file_cache_warm_up_segment_complete_num << segment_num;
1610
16
        if (!status.ok()) {
  Branch (1610:13): [True: 1, False: 15]
1611
1
            g_file_cache_warm_up_segment_failed_num << segment_num;
1612
1
        }
1613
16
    }
1614
18
    if (inverted_idx_num > 0) {
  Branch (1614:9): [True: 2, False: 16]
1615
2
        g_file_cache_warm_up_inverted_idx_complete_num << inverted_idx_num;
1616
2
        if (!status.ok()) {
  Branch (1616:13): [True: 0, False: 2]
1617
0
            g_file_cache_warm_up_inverted_idx_failed_num << inverted_idx_num;
1618
0
        }
1619
2
    }
1620
18
    warmup_info.done(segment_num, inverted_idx_num);
1621
18
    return warmup_info.state;
1622
20
}
1623
1624
200
bool CloudTablet::is_rowset_warmed_up(const RowsetId& rowset_id) const {
1625
200
    auto it = _rowset_warm_up_states.find(rowset_id);
1626
200
    if (it == _rowset_warm_up_states.end()) {
  Branch (1626:9): [True: 58, False: 142]
1627
58
        return false;
1628
58
    }
1629
142
    return it->second.state.progress == WarmUpProgress::DONE;
1630
200
}
1631
1632
598
void CloudTablet::add_warmed_up_rowset(const RowsetId& rowset_id) {
1633
598
    _rowset_warm_up_states[rowset_id] = {
1634
598
            .state = {.trigger_source = WarmUpTriggerSource::SYNC_ROWSET,
1635
598
                      .progress = WarmUpProgress::DONE},
1636
598
            .num_segments = 1,
1637
598
            .start_tp = std::chrono::steady_clock::now()};
1638
598
}
1639
1640
#include "common/compile_check_end.h"
1641
} // namespace doris