Coverage Report

Created: 2025-08-22 14:32

/root/doris/be/src/cloud/cloud_tablet.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_tablet.h"
19
20
#include <bvar/bvar.h>
21
#include <gen_cpp/Types_types.h>
22
#include <gen_cpp/olap_file.pb.h>
23
#include <rapidjson/document.h>
24
#include <rapidjson/encodings.h>
25
#include <rapidjson/prettywriter.h>
26
#include <rapidjson/rapidjson.h>
27
#include <rapidjson/stringbuffer.h>
28
29
#include <atomic>
30
#include <memory>
31
#include <shared_mutex>
32
#include <unordered_map>
33
#include <vector>
34
35
#include "cloud/cloud_meta_mgr.h"
36
#include "cloud/cloud_storage_engine.h"
37
#include "cloud/cloud_tablet_mgr.h"
38
#include "cloud/cloud_warm_up_manager.h"
39
#include "common/config.h"
40
#include "common/logging.h"
41
#include "io/cache/block_file_cache_downloader.h"
42
#include "io/cache/block_file_cache_factory.h"
43
#include "olap/base_tablet.h"
44
#include "olap/compaction.h"
45
#include "olap/cumulative_compaction_time_series_policy.h"
46
#include "olap/olap_define.h"
47
#include "olap/rowset/beta_rowset.h"
48
#include "olap/rowset/rowset.h"
49
#include "olap/rowset/rowset_factory.h"
50
#include "olap/rowset/rowset_fwd.h"
51
#include "olap/rowset/rowset_writer.h"
52
#include "olap/rowset/segment_v2/inverted_index_desc.h"
53
#include "olap/storage_policy.h"
54
#include "olap/tablet_schema.h"
55
#include "olap/txn_manager.h"
56
#include "util/debug_points.h"
57
#include "vec/common/schema_util.h"
58
59
namespace doris {
60
#include "common/compile_check_begin.h"
61
using namespace ErrorCode;
62
63
bvar::Adder<int64_t> g_unused_rowsets_count("unused_rowsets_count");
64
65
static constexpr int LOAD_INITIATOR_ID = -1;
66
67
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_size(
68
        "file_cache_cloud_tablet_submitted_segment_size");
69
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_segment_num(
70
        "file_cache_cloud_tablet_submitted_segment_num");
71
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_index_size(
72
        "file_cache_cloud_tablet_submitted_index_size");
73
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_submitted_index_num(
74
        "file_cache_cloud_tablet_submitted_index_num");
75
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_segment_size(
76
        "file_cache_cloud_tablet_finished_segment_size");
77
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_segment_num(
78
        "file_cache_cloud_tablet_finished_segment_num");
79
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_index_size(
80
        "file_cache_cloud_tablet_finished_index_size");
81
bvar::Adder<uint64_t> g_file_cache_cloud_tablet_finished_index_num(
82
        "file_cache_cloud_tablet_finished_index_num");
83
84
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_segment_num(
85
        "file_cache_recycle_cached_data_segment_num");
86
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_segment_size(
87
        "file_cache_recycle_cached_data_segment_size");
88
bvar::Adder<uint64_t> g_file_cache_recycle_cached_data_index_num(
89
        "file_cache_recycle_cached_data_index_num");
90
91
CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
92
4
        : BaseTablet(std::move(tablet_meta)), _engine(engine) {}
93
94
4
CloudTablet::~CloudTablet() = default;
95
96
0
bool CloudTablet::exceed_version_limit(int32_t limit) {
97
0
    return _approximate_num_rowsets.load(std::memory_order_relaxed) > limit;
98
0
}
99
100
0
std::string CloudTablet::tablet_path() const {
101
0
    return "";
102
0
}
103
104
Status CloudTablet::capture_rs_readers(const Version& spec_version,
105
                                       std::vector<RowSetSplits>* rs_splits,
106
0
                                       bool skip_missing_version) {
107
0
    DBUG_EXECUTE_IF("CloudTablet.capture_rs_readers.return.e-230", {
Line
Count
Source
37
0
    if (UNLIKELY(config::enable_debug_points)) {                              \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
38
0
        auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
39
0
        if (dp) {                                                             \
  Branch (39:13): [True: 0, False: 0]
40
0
            [[maybe_unused]] auto DP_NAME = debug_point_name;                 \
41
0
            { code; }                                                         \
42
0
        }                                                                     \
43
0
    }
108
0
        LOG_WARNING("CloudTablet.capture_rs_readers.return e-230").tag("tablet_id", tablet_id());
109
0
        return Status::Error<false>(-230, "injected error");
110
0
    });
111
0
    std::shared_lock rlock(_meta_lock);
112
0
    *rs_splits = DORIS_TRY(capture_rs_readers_unlocked(
Line
Count
Source
716
0
    ({                                           \
717
0
        auto&& res = (stmt);                     \
718
0
        using T = std::decay_t<decltype(res)>;   \
719
0
        if (!res.has_value()) [[unlikely]] {     \
  Branch (719:13): [True: 0, False: 0]
720
0
            return std::forward<T>(res).error(); \
721
0
        }                                        \
722
0
        std::forward<T>(res).value();            \
723
0
    });
113
0
            spec_version, CaptureRowsetOps {.skip_missing_versions = skip_missing_version}));
114
0
    return Status::OK();
115
0
}
116
117
0
Status CloudTablet::merge_rowsets_schema() {
118
    // Find the rowset with the max version
119
0
    auto max_version_rowset =
120
0
            std::max_element(
121
0
                    _rs_version_map.begin(), _rs_version_map.end(),
122
0
                    [](const auto& a, const auto& b) {
123
0
                        return !a.second->tablet_schema()
  Branch (123:32): [True: 0, False: 0]
124
0
                                       ? true
125
0
                                       : (!b.second->tablet_schema()
  Branch (125:43): [True: 0, False: 0]
126
0
                                                  ? false
127
0
                                                  : a.second->tablet_schema()->schema_version() <
128
0
                                                            b.second->tablet_schema()
129
0
                                                                    ->schema_version());
130
0
                    })
131
0
                    ->second;
132
0
    TabletSchemaSPtr max_version_schema = max_version_rowset->tablet_schema();
133
    // If the schema has variant columns, perform a merge to create a wide tablet schema
134
0
    if (max_version_schema->num_variant_columns() > 0) {
  Branch (134:9): [True: 0, False: 0]
135
0
        std::vector<TabletSchemaSPtr> schemas;
136
0
        std::transform(_rs_version_map.begin(), _rs_version_map.end(), std::back_inserter(schemas),
137
0
                       [](const auto& rs_meta) { return rs_meta.second->tablet_schema(); });
138
        // Merge the collected schemas to obtain the least common schema
139
0
        RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(schemas, nullptr,
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
140
0
                                                                         max_version_schema));
141
0
        VLOG_DEBUG << "dump schema: " << max_version_schema->dump_full_schema();
Line
Count
Source
41
0
#define VLOG_DEBUG VLOG(7)
142
0
        _merged_tablet_schema = max_version_schema;
143
0
    }
144
0
    return Status::OK();
145
0
}
146
147
// There are only two tablet_states RUNNING and NOT_READY in cloud mode
148
// This function will erase the tablet from `CloudTabletMgr` when it can't find this tablet in MS.
149
Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data,
150
0
                                 SyncRowsetStats* stats) {
151
0
    RETURN_IF_ERROR(sync_if_not_running(stats));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
152
153
0
    if (query_version > 0) {
  Branch (153:9): [True: 0, False: 0]
154
0
        std::shared_lock rlock(_meta_lock);
155
0
        if (_max_version >= query_version) {
  Branch (155:13): [True: 0, False: 0]
156
0
            return Status::OK();
157
0
        }
158
0
    }
159
160
    // serially execute sync to reduce unnecessary network overhead
161
0
    std::unique_lock lock(_sync_meta_lock);
162
0
    if (query_version > 0) {
  Branch (162:9): [True: 0, False: 0]
163
0
        std::shared_lock rlock(_meta_lock);
164
0
        if (_max_version >= query_version) {
  Branch (164:13): [True: 0, False: 0]
165
0
            return Status::OK();
166
0
        }
167
0
    }
168
169
0
    auto st = _engine.meta_mgr().sync_tablet_rowsets_unlocked(this, lock, warmup_delta_data, true,
170
0
                                                              false, stats);
171
0
    if (st.is<ErrorCode::NOT_FOUND>()) {
  Branch (171:9): [True: 0, False: 0]
172
0
        clear_cache();
173
0
    }
174
175
0
    return st;
176
0
}
177
178
// Sync tablet meta and all rowset meta if not running.
179
// This could happen when BE didn't finish schema change job and another BE committed this schema change job.
180
// It should be a quite rare situation.
181
0
Status CloudTablet::sync_if_not_running(SyncRowsetStats* stats) {
182
0
    if (tablet_state() == TABLET_RUNNING) {
  Branch (182:9): [True: 0, False: 0]
183
0
        return Status::OK();
184
0
    }
185
186
    // Serially execute sync to reduce unnecessary network overhead
187
0
    std::unique_lock lock(_sync_meta_lock);
188
189
0
    {
190
0
        std::shared_lock rlock(_meta_lock);
191
0
        if (tablet_state() == TABLET_RUNNING) {
  Branch (191:13): [True: 0, False: 0]
192
0
            return Status::OK();
193
0
        }
194
0
    }
195
196
0
    TabletMetaSharedPtr tablet_meta;
197
0
    auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta);
198
0
    if (!st.ok()) {
  Branch (198:9): [True: 0, False: 0]
199
0
        if (st.is<ErrorCode::NOT_FOUND>()) {
  Branch (199:13): [True: 0, False: 0]
200
0
            clear_cache();
201
0
        }
202
0
        return st;
203
0
    }
204
205
0
    if (tablet_meta->tablet_state() != TABLET_RUNNING) [[unlikely]] {
  Branch (205:9): [True: 0, False: 0]
206
        // MoW may go to here when load while schema change
207
0
        return Status::OK();
208
0
    }
209
210
0
    TimestampedVersionTracker empty_tracker;
211
0
    {
212
0
        std::lock_guard wlock(_meta_lock);
213
0
        RETURN_IF_ERROR(set_tablet_state(TABLET_RUNNING));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
214
0
        _rs_version_map.clear();
215
0
        _stale_rs_version_map.clear();
216
0
        std::swap(_timestamped_version_tracker, empty_tracker);
217
0
        _tablet_meta->clear_rowsets();
218
0
        _tablet_meta->clear_stale_rowset();
219
0
        _max_version = -1;
220
0
    }
221
222
0
    st = _engine.meta_mgr().sync_tablet_rowsets_unlocked(this, lock, false, true, false, stats);
223
0
    if (st.is<ErrorCode::NOT_FOUND>()) {
  Branch (223:9): [True: 0, False: 0]
224
0
        clear_cache();
225
0
    }
226
0
    return st;
227
0
}
228
229
0
TabletSchemaSPtr CloudTablet::merged_tablet_schema() const {
230
0
    std::shared_lock rlock(_meta_lock);
231
0
    return _merged_tablet_schema;
232
0
}
233
234
void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_overlap,
235
                              std::unique_lock<std::shared_mutex>& meta_lock,
236
0
                              bool warmup_delta_data) {
237
0
    if (to_add.empty()) {
  Branch (237:9): [True: 0, False: 0]
238
0
        return;
239
0
    }
240
241
0
    auto add_rowsets_directly = [=, this](std::vector<RowsetSharedPtr>& rowsets) {
242
0
        for (auto& rs : rowsets) {
  Branch (242:23): [True: 0, False: 0]
243
0
            if (version_overlap || warmup_delta_data) {
  Branch (243:17): [True: 0, False: 0]
  Branch (243:36): [True: 0, False: 0]
244
#ifndef BE_TEST
245
                // Warmup rowset data in background
246
                for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
247
                    const auto& rowset_meta = rs->rowset_meta();
248
                    constexpr int64_t interval = 600; // 10 mins
249
                    // When BE restart and receive the `load_sync` rpc, it will sync all historical rowsets first time.
250
                    // So we need to filter out the old rowsets avoid to download the whole table.
251
                    if (warmup_delta_data &&
252
                        ::time(nullptr) - rowset_meta->newest_write_timestamp() >= interval) {
253
                        continue;
254
                    }
255
256
                    auto storage_resource = rowset_meta->remote_storage_resource();
257
                    if (!storage_resource) {
258
                        LOG(WARNING) << storage_resource.error();
259
                        continue;
260
                    }
261
262
                    int64_t expiration_time =
263
                            _tablet_meta->ttl_seconds() == 0 ||
264
                                            rowset_meta->newest_write_timestamp() <= 0
265
                                    ? 0
266
                                    : rowset_meta->newest_write_timestamp() +
267
                                              _tablet_meta->ttl_seconds();
268
                    g_file_cache_cloud_tablet_submitted_segment_num << 1;
269
                    if (rs->rowset_meta()->segment_file_size(seg_id) > 0) {
270
                        g_file_cache_cloud_tablet_submitted_segment_size
271
                                << rs->rowset_meta()->segment_file_size(seg_id);
272
                    }
273
                    _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
274
                            .path = storage_resource.value()->remote_segment_path(*rowset_meta,
275
                                                                                  seg_id),
276
                            .file_size = rs->rowset_meta()->segment_file_size(seg_id),
277
                            .file_system = storage_resource.value()->fs,
278
                            .ctx =
279
                                    {
280
                                            .expiration_time = expiration_time,
281
                                            .is_dryrun = config::
282
                                                    enable_reader_dryrun_when_download_file_cache,
283
                                    },
284
                            .download_done {[](Status st) {
285
                                if (!st) {
286
                                    LOG_WARNING("add rowset warm up error ").error(st);
287
                                }
288
                            }},
289
                    });
290
291
                    auto download_idx_file = [&](const io::Path& idx_path, int64_t idx_size) {
292
                        io::DownloadFileMeta meta {
293
                                .path = idx_path,
294
                                .file_size = idx_size,
295
                                .file_system = storage_resource.value()->fs,
296
                                .ctx =
297
                                        {
298
                                                .expiration_time = expiration_time,
299
                                                .is_dryrun = config::
300
                                                        enable_reader_dryrun_when_download_file_cache,
301
                                        },
302
                                .download_done {[](Status st) {
303
                                    if (!st) {
304
                                        LOG_WARNING("add rowset warm up error ").error(st);
305
                                    }
306
                                }},
307
                        };
308
                        _engine.file_cache_block_downloader().submit_download_task(std::move(meta));
309
                        g_file_cache_cloud_tablet_submitted_index_num << 1;
310
                        g_file_cache_cloud_tablet_submitted_index_size << idx_size;
311
                    };
312
                    // clang-format on
313
                    auto schema_ptr = rowset_meta->tablet_schema();
314
                    auto idx_version = schema_ptr->get_inverted_index_storage_format();
315
                    if (idx_version == InvertedIndexStorageFormatPB::V1) {
316
                        std::unordered_map<int64_t, int64_t> index_size_map;
317
                        auto&& inverted_index_info = rowset_meta->inverted_index_file_info(seg_id);
318
                        for (const auto& info : inverted_index_info.index_info()) {
319
                            if (info.index_file_size() != -1) {
320
                                index_size_map[info.index_id()] = info.index_file_size();
321
                            } else {
322
                                VLOG_DEBUG << "Invalid index_file_size for segment_id " << seg_id
323
                                           << ", index_id " << info.index_id();
324
                            }
325
                        }
326
                        for (const auto& index : schema_ptr->inverted_indexes()) {
327
                            auto idx_path = storage_resource.value()->remote_idx_v1_path(
328
                                    *rowset_meta, seg_id, index->index_id(),
329
                                    index->get_index_suffix());
330
                            download_idx_file(idx_path, index_size_map[index->index_id()]);
331
                        }
332
                    } else {
333
                        if (schema_ptr->has_inverted_index()) {
334
                            auto&& inverted_index_info =
335
                                    rowset_meta->inverted_index_file_info(seg_id);
336
                            int64_t idx_size = 0;
337
                            if (inverted_index_info.has_index_size()) {
338
                                idx_size = inverted_index_info.index_size();
339
                            } else {
340
                                VLOG_DEBUG << "index_size is not set for segment " << seg_id;
341
                            }
342
                            auto idx_path = storage_resource.value()->remote_idx_v2_path(
343
                                    *rowset_meta, seg_id);
344
                            download_idx_file(idx_path, idx_size);
345
                        }
346
                    }
347
                }
348
#endif
349
0
            }
350
0
            _rs_version_map.emplace(rs->version(), rs);
351
0
            _timestamped_version_tracker.add_version(rs->version());
352
0
            _max_version = std::max(rs->end_version(), _max_version);
353
0
            update_base_size(*rs);
354
0
        }
355
0
        _tablet_meta->add_rowsets_unchecked(rowsets);
356
0
    };
357
358
0
    if (!version_overlap) {
  Branch (358:9): [True: 0, False: 0]
359
0
        add_rowsets_directly(to_add);
360
0
        return;
361
0
    }
362
363
    // Filter out existed rowsets
364
0
    auto remove_it =
365
0
            std::remove_if(to_add.begin(), to_add.end(), [this](const RowsetSharedPtr& rs) {
366
0
                if (auto find_it = _rs_version_map.find(rs->version());
367
0
                    find_it == _rs_version_map.end()) {
  Branch (367:21): [True: 0, False: 0]
368
0
                    return false;
369
0
                } else if (find_it->second->rowset_id() == rs->rowset_id()) {
  Branch (369:28): [True: 0, False: 0]
370
0
                    return true; // Same rowset
371
0
                }
372
373
                // If version of rowset in `to_add` is equal to rowset in tablet but rowset_id is not equal,
374
                // replace existed rowset with `to_add` rowset. This may occur when:
375
                //  1. schema change converts rowsets which have been double written to new tablet
376
                //  2. cumu compaction picks single overlapping input rowset to perform compaction
377
378
                // add existed rowset to unused_rowsets to remove delete bitmap and recycle cached data
379
0
                std::vector<RowsetSharedPtr> unused_rowsets;
380
0
                if (auto find_it = _rs_version_map.find(rs->version());
381
0
                    find_it != _rs_version_map.end()) {
  Branch (381:21): [True: 0, False: 0]
382
0
                    DCHECK(find_it->second->rowset_id() != rs->rowset_id())
383
0
                            << "tablet_id=" << tablet_id()
384
0
                            << ", rowset_id=" << rs->rowset_id().to_string()
385
0
                            << ", existed rowset_id=" << find_it->second->rowset_id().to_string();
386
0
                    unused_rowsets.push_back(find_it->second);
387
0
                }
388
0
                add_unused_rowsets(unused_rowsets);
389
390
0
                _tablet_meta->delete_rs_meta_by_version(rs->version(), nullptr);
391
0
                _rs_version_map[rs->version()] = rs;
392
0
                _tablet_meta->add_rowsets_unchecked({rs});
393
0
                update_base_size(*rs);
394
0
                return true;
395
0
            });
396
397
0
    to_add.erase(remove_it, to_add.end());
398
399
    // delete rowsets with overlapped version
400
0
    std::vector<RowsetSharedPtr> to_add_directly;
401
0
    for (auto& to_add_rs : to_add) {
  Branch (401:26): [True: 0, False: 0]
402
        // delete rowsets with overlapped version
403
0
        std::vector<RowsetSharedPtr> to_delete;
404
0
        Version to_add_v = to_add_rs->version();
405
        // if start_version  > max_version, we can skip checking overlap here.
406
0
        if (to_add_v.first > _max_version) {
  Branch (406:13): [True: 0, False: 0]
407
            // if start_version  > max_version, we can skip checking overlap here.
408
0
            to_add_directly.push_back(to_add_rs);
409
0
        } else {
410
0
            to_add_directly.push_back(to_add_rs);
411
0
            for (auto& [v, rs] : _rs_version_map) {
  Branch (411:32): [True: 0, False: 0]
412
0
                if (to_add_v.contains(v)) {
  Branch (412:21): [True: 0, False: 0]
413
0
                    to_delete.push_back(rs);
414
0
                }
415
0
            }
416
0
            delete_rowsets(to_delete, meta_lock);
417
0
        }
418
0
    }
419
420
0
    add_rowsets_directly(to_add_directly);
421
0
}
422
423
void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
424
0
                                 std::unique_lock<std::shared_mutex>&) {
425
0
    if (to_delete.empty()) {
  Branch (425:9): [True: 0, False: 0]
426
0
        return;
427
0
    }
428
0
    std::vector<RowsetMetaSharedPtr> rs_metas;
429
0
    rs_metas.reserve(to_delete.size());
430
0
    for (auto&& rs : to_delete) {
  Branch (430:20): [True: 0, False: 0]
431
0
        rs_metas.push_back(rs->rowset_meta());
432
0
        _stale_rs_version_map[rs->version()] = rs;
433
0
    }
434
0
    _timestamped_version_tracker.add_stale_path_version(rs_metas);
435
0
    for (auto&& rs : to_delete) {
  Branch (435:20): [True: 0, False: 0]
436
0
        _rs_version_map.erase(rs->version());
437
0
    }
438
439
0
    _tablet_meta->modify_rs_metas({}, rs_metas, false);
440
0
}
441
442
0
uint64_t CloudTablet::delete_expired_stale_rowsets() {
443
0
    if (config::enable_mow_verbose_log) {
  Branch (443:9): [True: 0, False: 0]
444
0
        LOG_INFO("begin delete_expired_stale_rowset for tablet={}", tablet_id());
Line
Count
Source
118
0
#define LOG_INFO TaggableLogger(__FILE__, __LINE__, google::GLOG_INFO)
445
0
    }
446
0
    std::vector<RowsetSharedPtr> expired_rowsets;
447
    // ATTN: trick, Use stale_rowsets to temporarily increase the reference count of the rowset shared pointer in _stale_rs_version_map so that in the recycle_cached_data function, it checks if the reference count is 2.
448
0
    std::vector<std::pair<Version, std::vector<RowsetSharedPtr>>> deleted_stale_rowsets;
449
0
    int64_t expired_stale_sweep_endtime =
450
0
            ::time(nullptr) - config::tablet_rowset_stale_sweep_time_sec;
451
0
    {
452
0
        std::unique_lock wlock(_meta_lock);
453
454
0
        std::vector<int64_t> path_ids;
455
        // capture the path version to delete
456
0
        _timestamped_version_tracker.capture_expired_paths(expired_stale_sweep_endtime, &path_ids);
457
458
0
        if (path_ids.empty()) {
  Branch (458:13): [True: 0, False: 0]
459
0
            return 0;
460
0
        }
461
462
0
        for (int64_t path_id : path_ids) {
  Branch (462:30): [True: 0, False: 0]
463
0
            int64_t start_version = -1;
464
0
            int64_t end_version = -1;
465
0
            std::vector<RowsetSharedPtr> stale_rowsets;
466
            // delete stale versions in version graph
467
0
            auto version_path = _timestamped_version_tracker.fetch_and_delete_path_by_id(path_id);
468
0
            for (auto& v_ts : version_path->timestamped_versions()) {
  Branch (468:29): [True: 0, False: 0]
469
0
                auto rs_it = _stale_rs_version_map.find(v_ts->version());
470
0
                if (rs_it != _stale_rs_version_map.end()) {
  Branch (470:21): [True: 0, False: 0]
471
0
                    expired_rowsets.push_back(rs_it->second);
472
0
                    stale_rowsets.push_back(rs_it->second);
473
0
                    VLOG_DEBUG << "erase stale rowset, tablet_id=" << tablet_id()
Line
Count
Source
41
0
#define VLOG_DEBUG VLOG(7)
474
0
                               << " rowset_id=" << rs_it->second->rowset_id().to_string()
475
0
                               << " version=" << rs_it->first.to_string();
476
0
                    _stale_rs_version_map.erase(rs_it);
477
0
                } else {
478
0
                    LOG(WARNING) << "cannot find stale rowset " << v_ts->version() << " in tablet "
479
0
                                 << tablet_id();
480
                    // clang-format off
481
0
                    DCHECK(false) << [this, &wlock]() { wlock.unlock(); std::string json; get_compaction_status(&json); return json; }();
482
                    // clang-format on
483
0
                }
484
0
                if (start_version < 0) {
  Branch (484:21): [True: 0, False: 0]
485
0
                    start_version = v_ts->version().first;
486
0
                }
487
0
                end_version = v_ts->version().second;
488
0
                _tablet_meta->delete_stale_rs_meta_by_version(v_ts->version());
489
0
            }
490
0
            Version version(start_version, end_version);
491
0
            if (!stale_rowsets.empty()) {
  Branch (491:17): [True: 0, False: 0]
492
0
                deleted_stale_rowsets.emplace_back(version, std::move(stale_rowsets));
493
0
            }
494
0
        }
495
0
        _reconstruct_version_tracker_if_necessary();
496
0
    }
497
0
    auto recycled_rowsets = recycle_cached_data(expired_rowsets);
498
0
    if (!recycled_rowsets.empty()) {
  Branch (498:9): [True: 0, False: 0]
499
0
        auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
500
0
        manager.recycle_cache(tablet_id(), recycled_rowsets);
501
0
    }
502
0
    if (config::enable_mow_verbose_log) {
  Branch (502:9): [True: 0, False: 0]
503
0
        LOG_INFO("finish delete_expired_stale_rowset for tablet={}", tablet_id());
Line
Count
Source
118
0
#define LOG_INFO TaggableLogger(__FILE__, __LINE__, google::GLOG_INFO)
504
0
    }
505
506
0
    add_unused_rowsets(expired_rowsets);
507
0
    if (config::enable_agg_and_remove_pre_rowsets_delete_bitmap && keys_type() == UNIQUE_KEYS &&
  Branch (507:9): [True: 0, False: 0]
  Branch (507:68): [True: 0, False: 0]
508
0
        enable_unique_key_merge_on_write() && !deleted_stale_rowsets.empty()) {
  Branch (508:9): [True: 0, False: 0]
  Branch (508:47): [True: 0, False: 0]
509
        // agg delete bitmap for pre rowsets; record unused delete bitmap key ranges
510
0
        OlapStopWatch watch;
511
0
        for (const auto& [version, unused_rowsets] : deleted_stale_rowsets) {
  Branch (511:52): [True: 0, False: 0]
512
            // agg delete bitmap for pre rowset
513
0
            DeleteBitmapKeyRanges remove_delete_bitmap_key_ranges;
514
0
            agg_delete_bitmap_for_stale_rowsets(version, remove_delete_bitmap_key_ranges);
515
            // add remove delete bitmap
516
0
            if (!remove_delete_bitmap_key_ranges.empty()) {
  Branch (516:17): [True: 0, False: 0]
517
0
                std::vector<RowsetId> rowset_ids;
518
0
                for (const auto& rs : unused_rowsets) {
  Branch (518:37): [True: 0, False: 0]
519
0
                    rowset_ids.push_back(rs->rowset_id());
520
0
                }
521
0
                std::lock_guard<std::mutex> lock(_gc_mutex);
522
0
                _unused_delete_bitmap.push_back(
523
0
                        std::make_pair(rowset_ids, remove_delete_bitmap_key_ranges));
524
0
            }
525
0
        }
526
0
        LOG(INFO) << "agg pre rowsets delete bitmap. tablet_id=" << tablet_id()
527
0
                  << ", size=" << deleted_stale_rowsets.size()
528
0
                  << ", cost(us)=" << watch.get_elapse_time_us();
529
0
    }
530
0
    return expired_rowsets.size();
531
0
}
532
533
0
bool CloudTablet::need_remove_unused_rowsets() {
534
0
    std::lock_guard<std::mutex> lock(_gc_mutex);
535
0
    return !_unused_rowsets.empty();
536
0
}
537
538
0
void CloudTablet::add_unused_rowsets(const std::vector<RowsetSharedPtr>& rowsets) {
539
0
    std::lock_guard<std::mutex> lock(_gc_mutex);
540
0
    for (const auto& rowset : rowsets) {
  Branch (540:29): [True: 0, False: 0]
541
0
        _unused_rowsets[rowset->rowset_id()] = rowset;
542
0
    }
543
0
    g_unused_rowsets_count << rowsets.size();
544
0
}
545
546
0
void CloudTablet::remove_unused_rowsets() {
547
0
    int64_t removed_delete_bitmap_num = 0;
548
0
    std::vector<std::shared_ptr<Rowset>> removed_rowsets;
549
0
    OlapStopWatch watch;
550
551
0
    {
552
0
        std::lock_guard<std::mutex> lock(_gc_mutex);
553
        // 1. remove unused rowsets's cache data and delete bitmap
554
0
        for (auto it = _unused_rowsets.begin(); it != _unused_rowsets.end();) {
  Branch (554:49): [True: 0, False: 0]
555
0
            auto& rs = it->second;
556
0
            if (rs.use_count() > 1) {
  Branch (556:17): [True: 0, False: 0]
557
0
                LOG(WARNING) << "tablet_id:" << tablet_id() << " rowset: " << rs->rowset_id()
558
0
                             << " has " << rs.use_count() << " references, it cannot be removed";
559
0
                ++it;
560
0
                continue;
561
0
            }
562
0
            tablet_meta()->remove_rowset_delete_bitmap(rs->rowset_id(), rs->version());
563
0
            rs->clear_cache();
564
0
            removed_rowsets.push_back(std::move(rs));
565
0
            g_unused_rowsets_count << -1;
566
0
            it = _unused_rowsets.erase(it);
567
0
        }
568
0
    }
569
570
0
    {
571
0
        std::vector<RecycledRowsets> recycled_rowsets;
572
573
0
        for (auto& rs : removed_rowsets) {
  Branch (573:23): [True: 0, False: 0]
574
0
            auto index_names = rs->get_index_file_names();
575
0
            recycled_rowsets.emplace_back(rs->rowset_id(), rs->num_segments(), index_names);
576
0
            int64_t segment_size_sum = 0;
577
0
            for (int32_t i = 0; i < rs->num_segments(); i++) {
  Branch (577:33): [True: 0, False: 0]
578
0
                segment_size_sum += rs->rowset_meta()->segment_file_size(i);
579
0
            }
580
0
            g_file_cache_recycle_cached_data_segment_num << rs->num_segments();
581
0
            g_file_cache_recycle_cached_data_segment_size << segment_size_sum;
582
0
            g_file_cache_recycle_cached_data_index_num << index_names.size();
583
0
        }
584
585
0
        if (recycled_rowsets.size() > 0) {
  Branch (585:13): [True: 0, False: 0]
586
0
            auto& manager =
587
0
                    ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
588
0
            manager.recycle_cache(tablet_id(), recycled_rowsets);
589
0
        }
590
0
    }
591
592
    // 2. remove delete bitmap of pre rowsets
593
0
    for (auto it = _unused_delete_bitmap.begin(); it != _unused_delete_bitmap.end();) {
  Branch (593:51): [True: 0, False: 0]
594
0
        auto& rowset_ids = std::get<0>(*it);
595
0
        bool find_unused_rowset = false;
596
0
        for (const auto& rowset_id : rowset_ids) {
  Branch (596:36): [True: 0, False: 0]
597
0
            if (_unused_rowsets.find(rowset_id) != _unused_rowsets.end()) {
  Branch (597:17): [True: 0, False: 0]
598
0
                LOG(INFO) << "can not remove pre rowset delete bitmap because rowset is in use"
599
0
                          << ", tablet_id=" << tablet_id() << ", rowset_id=" << rowset_id;
600
0
                find_unused_rowset = true;
601
0
                break;
602
0
            }
603
0
        }
604
0
        if (find_unused_rowset) {
  Branch (604:13): [True: 0, False: 0]
605
0
            ++it;
606
0
            continue;
607
0
        }
608
0
        auto& key_ranges = std::get<1>(*it);
609
0
        tablet_meta()->delete_bitmap()->remove(key_ranges);
610
0
        it = _unused_delete_bitmap.erase(it);
611
0
        removed_delete_bitmap_num++;
612
0
    }
613
614
0
    LOG(INFO) << "tablet_id=" << tablet_id() << ", unused_rowset size=" << _unused_rowsets.size()
615
0
              << ", unused_delete_bitmap size=" << _unused_delete_bitmap.size()
616
0
              << ", removed_rowsets_num=" << removed_rowsets.size()
617
0
              << ", removed_delete_bitmap_num=" << removed_delete_bitmap_num
618
0
              << ", cost(us)=" << watch.get_elapse_time_us();
619
0
}
620
621
0
void CloudTablet::update_base_size(const Rowset& rs) {
622
    // Define base rowset as the rowset of version [2-x]
623
0
    if (rs.start_version() == 2) {
  Branch (623:9): [True: 0, False: 0]
624
0
        _base_size = rs.total_disk_size();
625
0
    }
626
0
}
627
628
0
void CloudTablet::clear_cache() {
629
0
    auto recycled_rowsets = CloudTablet::recycle_cached_data(get_snapshot_rowset(true));
630
0
    if (!recycled_rowsets.empty()) {
  Branch (630:9): [True: 0, False: 0]
631
0
        auto& manager = ExecEnv::GetInstance()->storage_engine().to_cloud().cloud_warm_up_manager();
632
0
        manager.recycle_cache(tablet_id(), recycled_rowsets);
633
0
    }
634
0
    _engine.tablet_mgr().erase_tablet(tablet_id());
635
0
}
636
637
std::vector<RecycledRowsets> CloudTablet::recycle_cached_data(
638
0
        const std::vector<RowsetSharedPtr>& rowsets) {
639
0
    std::vector<RecycledRowsets> recycled_rowsets;
640
0
    for (const auto& rs : rowsets) {
  Branch (640:25): [True: 0, False: 0]
641
        // rowsets and tablet._rs_version_map each hold a rowset shared_ptr, so at this point, the reference count of the shared_ptr is at least 2.
642
0
        if (rs.use_count() > 2) {
  Branch (642:13): [True: 0, False: 0]
643
0
            LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has " << rs.use_count()
644
0
                         << " references. File Cache won't be recycled when query is using it.";
645
0
            continue;
646
0
        }
647
0
        rs->clear_cache();
648
0
        auto index_names = rs->get_index_file_names();
649
0
        recycled_rowsets.emplace_back(rs->rowset_id(), rs->num_segments(), index_names);
650
651
0
        int64_t segment_size_sum = 0;
652
0
        for (int32_t i = 0; i < rs->num_segments(); i++) {
  Branch (652:29): [True: 0, False: 0]
653
0
            segment_size_sum += rs->rowset_meta()->segment_file_size(i);
654
0
        }
655
0
        g_file_cache_recycle_cached_data_segment_num << rs->num_segments();
656
0
        g_file_cache_recycle_cached_data_segment_size << segment_size_sum;
657
0
        g_file_cache_recycle_cached_data_index_num << index_names.size();
658
0
    }
659
0
    return recycled_rowsets;
660
0
}
661
662
void CloudTablet::reset_approximate_stats(int64_t num_rowsets, int64_t num_segments,
663
0
                                          int64_t num_rows, int64_t data_size) {
664
0
    _approximate_num_rowsets.store(num_rowsets, std::memory_order_relaxed);
665
0
    _approximate_num_segments.store(num_segments, std::memory_order_relaxed);
666
0
    _approximate_num_rows.store(num_rows, std::memory_order_relaxed);
667
0
    _approximate_data_size.store(data_size, std::memory_order_relaxed);
668
0
    int64_t cumu_num_deltas = 0;
669
0
    int64_t cumu_num_rowsets = 0;
670
0
    auto cp = _cumulative_point.load(std::memory_order_relaxed);
671
0
    for (auto& [v, r] : _rs_version_map) {
  Branch (671:23): [True: 0, False: 0]
672
0
        if (v.second < cp) {
  Branch (672:13): [True: 0, False: 0]
673
0
            continue;
674
0
        }
675
676
0
        cumu_num_deltas += r->is_segments_overlapping() ? r->num_segments() : 1;
  Branch (676:28): [True: 0, False: 0]
677
0
        ++cumu_num_rowsets;
678
0
    }
679
0
    _approximate_cumu_num_rowsets.store(cumu_num_rowsets, std::memory_order_relaxed);
680
0
    _approximate_cumu_num_deltas.store(cumu_num_deltas, std::memory_order_relaxed);
681
0
}
682
683
Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_rowset_writer(
684
0
        RowsetWriterContext& context, bool vertical) {
685
0
    context.rowset_id = _engine.next_rowset_id();
686
    // FIXME(plat1ko): Seems `tablet_id` and `index_id` has been set repeatedly
687
0
    context.tablet_id = tablet_id();
688
0
    context.index_id = index_id();
689
0
    context.partition_id = partition_id();
690
0
    context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write();
691
0
    return RowsetFactory::create_rowset_writer(_engine, context, vertical);
692
0
}
693
694
// create a rowset writer with rowset_id and seg_id
695
// after writer, merge this transient rowset with original rowset
696
Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_writer(
697
        const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> partial_update_info,
698
0
        int64_t txn_expiration) {
699
0
    if (rowset.rowset_meta_state() != RowsetStatePB::BEGIN_PARTIAL_UPDATE &&
  Branch (699:9): [True: 0, False: 0]
700
0
        rowset.rowset_meta_state() != RowsetStatePB::COMMITTED) [[unlikely]] {
  Branch (700:9): [True: 0, False: 0]
701
0
        auto msg = fmt::format(
702
0
                "wrong rowset state when create_transient_rowset_writer, rowset state should be "
703
0
                "BEGIN_PARTIAL_UPDATE or COMMITTED, but found {}, rowset_id={}, tablet_id={}",
704
0
                RowsetStatePB_Name(rowset.rowset_meta_state()), rowset.rowset_id().to_string(),
705
0
                tablet_id());
706
        // see `CloudRowsetWriter::build` for detail.
707
        // if this is in a retry task, the rowset state may have been changed to RowsetStatePB::COMMITTED
708
        // in `RowsetMeta::merge_rowset_meta()` in previous trials.
709
0
        LOG(WARNING) << msg;
710
0
        DCHECK(false) << msg;
711
0
    }
712
0
    RowsetWriterContext context;
713
0
    context.rowset_state = PREPARED;
714
0
    context.segments_overlap = OVERLAPPING;
715
    // During a partial update, the extracted columns of a variant should not be included in the tablet schema.
716
    // This is because the partial update for a variant needs to ignore the extracted columns.
717
    // Otherwise, the schema types in different rowsets might be inconsistent. When performing a partial update,
718
    // the complete variant is constructed by reading all the sub-columns of the variant.
719
0
    context.tablet_schema = rowset.tablet_schema()->copy_without_variant_extracted_columns();
720
0
    context.newest_write_timestamp = UnixSeconds();
721
0
    context.tablet_id = table_id();
722
0
    context.enable_segcompaction = false;
723
0
    context.write_type = DataWriteType::TYPE_DIRECT;
724
0
    context.partial_update_info = std::move(partial_update_info);
725
0
    context.is_transient_rowset_writer = true;
726
0
    context.rowset_id = rowset.rowset_id();
727
0
    context.tablet_id = tablet_id();
728
0
    context.index_id = index_id();
729
0
    context.partition_id = partition_id();
730
0
    context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write();
731
0
    context.txn_expiration = txn_expiration;
732
733
0
    auto storage_resource = rowset.rowset_meta()->remote_storage_resource();
734
0
    if (!storage_resource) {
  Branch (734:9): [True: 0, False: 0]
735
0
        return ResultError(std::move(storage_resource.error()));
736
0
    }
737
738
0
    context.storage_resource = *storage_resource.value();
739
740
0
    return RowsetFactory::create_rowset_writer(_engine, context, false)
741
0
            .transform([&](auto&& writer) {
742
0
                writer->set_segment_start_id(static_cast<int32_t>(rowset.num_segments()));
743
0
                return writer;
744
0
            });
745
0
}
746
747
3
int64_t CloudTablet::get_cloud_base_compaction_score() const {
748
3
    if (_tablet_meta->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY) {
  Branch (748:9): [True: 0, False: 3]
749
0
        bool has_delete = false;
750
0
        int64_t point = cumulative_layer_point();
751
0
        std::shared_lock<std::shared_mutex> rlock(_meta_lock);
752
0
        for (const auto& rs_meta : _tablet_meta->all_rs_metas()) {
  Branch (752:34): [True: 0, False: 0]
753
0
            if (rs_meta->start_version() >= point) {
  Branch (753:17): [True: 0, False: 0]
754
0
                continue;
755
0
            }
756
0
            if (rs_meta->has_delete_predicate()) {
  Branch (756:17): [True: 0, False: 0]
757
0
                has_delete = true;
758
0
                break;
759
0
            }
760
0
        }
761
0
        if (!has_delete) {
  Branch (761:13): [True: 0, False: 0]
762
0
            return 0;
763
0
        }
764
0
    }
765
766
3
    return _approximate_num_rowsets.load(std::memory_order_relaxed) -
767
3
           _approximate_cumu_num_rowsets.load(std::memory_order_relaxed);
768
3
}
769
770
1
int64_t CloudTablet::get_cloud_cumu_compaction_score() const {
771
    // TODO(plat1ko): Propose an algorithm that considers tablet's key type, number of delete rowsets,
772
    //  number of tablet versions simultaneously.
773
1
    return _approximate_cumu_num_deltas.load(std::memory_order_relaxed);
774
1
}
775
776
// return a json string to show the compaction status of this tablet
777
0
void CloudTablet::get_compaction_status(std::string* json_result) {
778
0
    rapidjson::Document root;
779
0
    root.SetObject();
780
781
0
    rapidjson::Document path_arr;
782
0
    path_arr.SetArray();
783
784
0
    std::vector<RowsetSharedPtr> rowsets;
785
0
    std::vector<RowsetSharedPtr> stale_rowsets;
786
0
    {
787
0
        std::shared_lock rdlock(_meta_lock);
788
0
        rowsets.reserve(_rs_version_map.size());
789
0
        for (auto& it : _rs_version_map) {
  Branch (789:23): [True: 0, False: 0]
790
0
            rowsets.push_back(it.second);
791
0
        }
792
0
        stale_rowsets.reserve(_stale_rs_version_map.size());
793
0
        for (auto& it : _stale_rs_version_map) {
  Branch (793:23): [True: 0, False: 0]
794
0
            stale_rowsets.push_back(it.second);
795
0
        }
796
0
    }
797
0
    std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator);
798
0
    std::sort(stale_rowsets.begin(), stale_rowsets.end(), Rowset::comparator);
799
800
    // get snapshot version path json_doc
801
0
    _timestamped_version_tracker.get_stale_version_path_json_doc(path_arr);
802
0
    root.AddMember("cumulative point", _cumulative_point.load(), root.GetAllocator());
803
0
    rapidjson::Value cumu_value;
804
0
    std::string format_str = ToStringFromUnixMillis(_last_cumu_compaction_failure_millis.load());
805
0
    cumu_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
806
0
                         root.GetAllocator());
807
0
    root.AddMember("last cumulative failure time", cumu_value, root.GetAllocator());
808
0
    rapidjson::Value base_value;
809
0
    format_str = ToStringFromUnixMillis(_last_base_compaction_failure_millis.load());
810
0
    base_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
811
0
                         root.GetAllocator());
812
0
    root.AddMember("last base failure time", base_value, root.GetAllocator());
813
0
    rapidjson::Value full_value;
814
0
    format_str = ToStringFromUnixMillis(_last_full_compaction_failure_millis.load());
815
0
    full_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
816
0
                         root.GetAllocator());
817
0
    root.AddMember("last full failure time", full_value, root.GetAllocator());
818
0
    rapidjson::Value cumu_success_value;
819
0
    format_str = ToStringFromUnixMillis(_last_cumu_compaction_success_millis.load());
820
0
    cumu_success_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
821
0
                                 root.GetAllocator());
822
0
    root.AddMember("last cumulative success time", cumu_success_value, root.GetAllocator());
823
0
    rapidjson::Value base_success_value;
824
0
    format_str = ToStringFromUnixMillis(_last_base_compaction_success_millis.load());
825
0
    base_success_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
826
0
                                 root.GetAllocator());
827
0
    root.AddMember("last base success time", base_success_value, root.GetAllocator());
828
0
    rapidjson::Value full_success_value;
829
0
    format_str = ToStringFromUnixMillis(_last_full_compaction_success_millis.load());
830
0
    full_success_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
831
0
                                 root.GetAllocator());
832
0
    root.AddMember("last full success time", full_success_value, root.GetAllocator());
833
0
    rapidjson::Value cumu_schedule_value;
834
0
    format_str = ToStringFromUnixMillis(_last_cumu_compaction_schedule_millis.load());
835
0
    cumu_schedule_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
836
0
                                  root.GetAllocator());
837
0
    root.AddMember("last cumulative schedule time", cumu_schedule_value, root.GetAllocator());
838
0
    rapidjson::Value base_schedule_value;
839
0
    format_str = ToStringFromUnixMillis(_last_base_compaction_schedule_millis.load());
840
0
    base_schedule_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
841
0
                                  root.GetAllocator());
842
0
    root.AddMember("last base schedule time", base_schedule_value, root.GetAllocator());
843
0
    rapidjson::Value full_schedule_value;
844
0
    format_str = ToStringFromUnixMillis(_last_full_compaction_schedule_millis.load());
845
0
    full_schedule_value.SetString(format_str.c_str(), static_cast<uint>(format_str.length()),
846
0
                                  root.GetAllocator());
847
0
    root.AddMember("last full schedule time", full_schedule_value, root.GetAllocator());
848
0
    rapidjson::Value cumu_compaction_status_value;
849
0
    cumu_compaction_status_value.SetString(_last_cumu_compaction_status.c_str(),
850
0
                                           static_cast<uint>(_last_cumu_compaction_status.length()),
851
0
                                           root.GetAllocator());
852
0
    root.AddMember("last cumulative status", cumu_compaction_status_value, root.GetAllocator());
853
0
    rapidjson::Value base_compaction_status_value;
854
0
    base_compaction_status_value.SetString(_last_base_compaction_status.c_str(),
855
0
                                           static_cast<uint>(_last_base_compaction_status.length()),
856
0
                                           root.GetAllocator());
857
0
    root.AddMember("last base status", base_compaction_status_value, root.GetAllocator());
858
0
    rapidjson::Value full_compaction_status_value;
859
0
    full_compaction_status_value.SetString(_last_full_compaction_status.c_str(),
860
0
                                           static_cast<uint>(_last_full_compaction_status.length()),
861
0
                                           root.GetAllocator());
862
0
    root.AddMember("last full status", full_compaction_status_value, root.GetAllocator());
863
0
    rapidjson::Value exec_compaction_time;
864
0
    std::string num_str {std::to_string(exec_compaction_time_us.load())};
865
0
    exec_compaction_time.SetString(num_str.c_str(), static_cast<uint>(num_str.length()),
866
0
                                   root.GetAllocator());
867
0
    root.AddMember("exec compaction time us", exec_compaction_time, root.GetAllocator());
868
0
    rapidjson::Value local_read_time;
869
0
    num_str = std::to_string(local_read_time_us.load());
870
0
    local_read_time.SetString(num_str.c_str(), static_cast<uint>(num_str.length()),
871
0
                              root.GetAllocator());
872
0
    root.AddMember("compaction local read time us", local_read_time, root.GetAllocator());
873
0
    rapidjson::Value remote_read_time;
874
0
    num_str = std::to_string(remote_read_time_us.load());
875
0
    remote_read_time.SetString(num_str.c_str(), static_cast<uint>(num_str.length()),
876
0
                               root.GetAllocator());
877
0
    root.AddMember("compaction remote read time us", remote_read_time, root.GetAllocator());
878
879
    // print all rowsets' version as an array
880
0
    rapidjson::Document versions_arr;
881
0
    rapidjson::Document missing_versions_arr;
882
0
    versions_arr.SetArray();
883
0
    missing_versions_arr.SetArray();
884
0
    int64_t last_version = -1;
885
0
    for (auto& rowset : rowsets) {
  Branch (885:23): [True: 0, False: 0]
886
0
        const Version& ver = rowset->version();
887
0
        if (ver.first != last_version + 1) {
  Branch (887:13): [True: 0, False: 0]
888
0
            rapidjson::Value miss_value;
889
0
            miss_value.SetString(fmt::format("[{}-{}]", last_version + 1, ver.first - 1).c_str(),
890
0
                                 missing_versions_arr.GetAllocator());
891
0
            missing_versions_arr.PushBack(miss_value, missing_versions_arr.GetAllocator());
892
0
        }
893
0
        rapidjson::Value value;
894
0
        std::string version_str = rowset->get_rowset_info_str();
895
0
        value.SetString(version_str.c_str(), static_cast<uint32_t>(version_str.length()),
896
0
                        versions_arr.GetAllocator());
897
0
        versions_arr.PushBack(value, versions_arr.GetAllocator());
898
0
        last_version = ver.second;
899
0
    }
900
0
    root.AddMember("rowsets", versions_arr, root.GetAllocator());
901
0
    root.AddMember("missing_rowsets", missing_versions_arr, root.GetAllocator());
902
903
    // print all stale rowsets' version as an array
904
0
    rapidjson::Document stale_versions_arr;
905
0
    stale_versions_arr.SetArray();
906
0
    for (auto& rowset : stale_rowsets) {
  Branch (906:23): [True: 0, False: 0]
907
0
        rapidjson::Value value;
908
0
        std::string version_str = rowset->get_rowset_info_str();
909
0
        value.SetString(version_str.c_str(), static_cast<uint32_t>(version_str.length()),
910
0
                        stale_versions_arr.GetAllocator());
911
0
        stale_versions_arr.PushBack(value, stale_versions_arr.GetAllocator());
912
0
    }
913
0
    root.AddMember("stale_rowsets", stale_versions_arr, root.GetAllocator());
914
915
    // add stale version rowsets
916
0
    root.AddMember("stale version path", path_arr, root.GetAllocator());
917
918
    // to json string
919
0
    rapidjson::StringBuffer strbuf;
920
0
    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
921
0
    root.Accept(writer);
922
0
    *json_result = std::string(strbuf.GetString());
923
0
}
924
925
0
void CloudTablet::set_cumulative_layer_point(int64_t new_point) {
926
0
    if (new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= _cumulative_point) {
  Branch (926:9): [True: 0, False: 0]
  Branch (926:60): [True: 0, False: 0]
927
0
        _cumulative_point = new_point;
928
0
        return;
929
0
    }
930
    // cumulative point should only be reset to -1, or be increased
931
    // FIXME: could happen in currently unresolved race conditions
932
0
    LOG(WARNING) << "Unexpected cumulative point: " << new_point
933
0
                 << ", origin: " << _cumulative_point.load();
934
0
}
935
936
0
std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_base_compaction() {
937
0
    std::vector<RowsetSharedPtr> candidate_rowsets;
938
0
    {
939
0
        std::shared_lock rlock(_meta_lock);
940
0
        for (const auto& [version, rs] : _rs_version_map) {
  Branch (940:40): [True: 0, False: 0]
941
0
            if (version.first != 0 && version.first < _cumulative_point &&
  Branch (941:17): [True: 0, False: 0]
  Branch (941:39): [True: 0, False: 0]
942
0
                (_alter_version == -1 || version.second <= _alter_version)) {
  Branch (942:18): [True: 0, False: 0]
  Branch (942:42): [True: 0, False: 0]
943
0
                candidate_rowsets.push_back(rs);
944
0
            }
945
0
        }
946
0
    }
947
0
    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
948
0
    return candidate_rowsets;
949
0
}
950
951
0
std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_full_compaction() {
952
0
    std::vector<RowsetSharedPtr> candidate_rowsets;
953
0
    {
954
0
        std::shared_lock rlock(_meta_lock);
955
0
        for (auto& [v, rs] : _rs_version_map) {
  Branch (955:28): [True: 0, False: 0]
956
            // MUST NOT compact rowset [0-1] for some historical reasons (see cloud_schema_change)
957
0
            if (v.first != 0) {
  Branch (957:17): [True: 0, False: 0]
958
0
                candidate_rowsets.push_back(rs);
959
0
            }
960
0
        }
961
0
    }
962
0
    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
963
0
    return candidate_rowsets;
964
0
}
965
966
0
CalcDeleteBitmapExecutor* CloudTablet::calc_delete_bitmap_executor() {
967
0
    return _engine.calc_delete_bitmap_executor();
968
0
}
969
970
Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
971
                                       DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
972
                                       const RowsetIdUnorderedSet& cur_rowset_ids,
973
0
                                       int64_t next_visible_version) {
974
0
    RowsetSharedPtr rowset = txn_info->rowset;
975
0
    int64_t cur_version = rowset->start_version();
976
    // update delete bitmap info, in order to avoid recalculation when trying again
977
0
    RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
978
0
            txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::PREPARE));
979
980
0
    if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update() &&
  Branch (980:9): [True: 0, False: 0]
  Branch (980:42): [True: 0, False: 0]
981
0
        rowset_writer->num_rows() > 0) {
  Branch (981:9): [True: 0, False: 0]
982
0
        DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.update_tmp_rowset.error", {
Line
Count
Source
37
0
    if (UNLIKELY(config::enable_debug_points)) {                              \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
38
0
        auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
39
0
        if (dp) {                                                             \
  Branch (39:13): [True: 0, False: 0]
40
0
            [[maybe_unused]] auto DP_NAME = debug_point_name;                 \
41
0
            { code; }                                                         \
42
0
        }                                                                     \
43
0
    }
983
0
            return Status::InternalError<false>("injected update_tmp_rowset error.");
984
0
        });
985
0
        const auto& rowset_meta = rowset->rowset_meta();
986
0
        RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*rowset_meta));
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
987
0
    }
988
989
0
    RETURN_IF_ERROR(
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
990
0
            save_delete_bitmap_to_ms(cur_version, txn_id, delete_bitmap, next_visible_version));
991
992
    // store the delete bitmap with sentinel marks in txn_delete_bitmap_cache because if the txn is retried for some reason,
993
    // it will use the delete bitmap from txn_delete_bitmap_cache when re-calculating the delete bitmap, during which it will do
994
    // delete bitmap correctness check. If we store the new_delete_bitmap, the delete bitmap correctness check will fail
995
0
    RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
Line
Count
Source
637
0
    do {                                \
638
0
        Status _status_ = (stmt);       \
639
0
        if (UNLIKELY(!_status_.ok())) { \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
640
0
            return _status_;            \
641
0
        }                               \
642
0
    } while (false)
  Branch (642:14): [Folded - Ignored]
996
0
            txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED,
997
0
            txn_info->publish_info));
998
999
0
    DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.enable_sleep", {
Line
Count
Source
37
0
    if (UNLIKELY(config::enable_debug_points)) {                              \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
38
0
        auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
39
0
        if (dp) {                                                             \
  Branch (39:13): [True: 0, False: 0]
40
0
            [[maybe_unused]] auto DP_NAME = debug_point_name;                 \
41
0
            { code; }                                                         \
42
0
        }                                                                     \
43
0
    }
1000
0
        auto sleep_sec = dp->param<int>("sleep", 5);
1001
0
        std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
1002
0
    });
1003
1004
0
    DBUG_EXECUTE_IF("CloudTablet::save_delete_bitmap.injected_error", {
Line
Count
Source
37
0
    if (UNLIKELY(config::enable_debug_points)) {                              \
Line
Count
Source
36
0
#define UNLIKELY(expr) __builtin_expect(!!(expr), 0)
  Branch (36:24): [True: 0, False: 0]
38
0
        auto dp = DebugPoints::instance()->get_debug_point(debug_point_name); \
39
0
        if (dp) {                                                             \
  Branch (39:13): [True: 0, False: 0]
40
0
            [[maybe_unused]] auto DP_NAME = debug_point_name;                 \
41
0
            { code; }                                                         \
  Branch (41:15): [True: 0, False: 0]
42
0
        }                                                                     \
43
0
    }
1005
0
        auto retry = dp->param<bool>("retry", false);
1006
0
        auto sleep_sec = dp->param<int>("sleep", 0);
1007
0
        std::this_thread::sleep_for(std::chrono::seconds(sleep_sec));
1008
0
        if (retry) { // return DELETE_BITMAP_LOCK_ERROR to let it retry
1009
0
            return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR>(
1010
0
                    "injected DELETE_BITMAP_LOCK_ERROR");
1011
0
        } else {
1012
0
            return Status::InternalError<false>("injected non-retryable error");
1013
0
        }
1014
0
    });
1015
1016
0
    return Status::OK();
1017
0
}
1018
1019
// Uploads `delete_bitmap` for load transaction `txn_id` through
// `_engine.meta_mgr().update_delete_bitmap`, using LOAD_INITIATOR_ID as the initiator.
// Every entry is re-keyed to version `cur_version` before upload. Entries whose segment id
// equals DeleteBitmap::INVALID_SEGMENT_ID (the sentinel mark used by the delete bitmap
// correctness check) are not uploaded.
Status CloudTablet::save_delete_bitmap_to_ms(int64_t cur_version, int64_t txn_id,
                                             DeleteBitmapPtr delete_bitmap,
                                             int64_t next_visible_version) {
    auto to_upload = std::make_shared<DeleteBitmap>(tablet_id());
    for (const auto& [bitmap_key, segment_bitmap] : delete_bitmap->delete_bitmap) {
        const auto& rowset_id = std::get<0>(bitmap_key);
        const auto segment_id = std::get<1>(bitmap_key);
        if (segment_id == DeleteBitmap::INVALID_SEGMENT_ID) {
            // sentinel mark: only meaningful locally, never persisted
            continue;
        }
        to_upload->merge({rowset_id, segment_id, cur_version}, segment_bitmap);
    }

    RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, txn_id, LOAD_INITIATOR_ID,
                                                            to_upload.get(), txn_id, false,
                                                            next_visible_version));
    return Status::OK();
}
1038
1039
0
// Returns the version ranges in [0, spec_version] that are not covered by
// `existing_versions`. Versions are treated as inclusive [first, second] ranges (the
// trailing gap, for example, ends at spec_version itself), and each contiguous gap is
// reported as a single range. `existing_versions` is taken by value because it is
// sorted in place.
Versions CloudTablet::calc_missed_versions(int64_t spec_version, Versions existing_versions) const {
    DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;

    // Sort ascending by start version. Comparing `first` alone suffices because two
    // existing versions are not expected to overlap.
    std::sort(existing_versions.begin(), existing_versions.end(),
              [](const Version& a, const Version& b) { return a.first < b.first; });

    // Walk from version 0 up to spec_version and record every gap.
    int64_t last_version = -1; // highest version covered so far (inclusive)
    Versions missed_versions;
    for (const Version& version : existing_versions) {
        if (version.first > last_version + 1) {
            // There is a hole between versions. Both ends of a Version are inclusive,
            // so the hole stops right before `version.first`.
            missed_versions.emplace_back(last_version + 1,
                                         std::min(version.first - 1, spec_version));
        }
        // max() keeps the scan monotonic even if an input range is nested in an earlier one.
        last_version = std::max(last_version, version.second);
        if (last_version >= spec_version) {
            break;
        }
    }
    if (last_version < spec_version) {
        // There is a hole between the last covered version and the specified version.
        missed_versions.emplace_back(last_version + 1, spec_version);
    }
    return missed_versions;
}
1068
1069
// Computes the delete bitmap of a compaction's output rowset by translating the delete
// bitmaps of `input_rowsets` through `rowid_conversion`, in two phases:
//   1. before taking the delete bitmap update lock, for versions up to the current
//      max_version();
//   2. after taking the lock (COMPACTION_DELETE_BITMAP_LOCK_ID, held for `initiator`),
//      for versions from that max version onward.
// The result is written to `output_rowset_delete_bitmap` and then stored through
// `_engine.meta_mgr().update_delete_bitmap`, whose status is returned.
// `get_delete_bitmap_lock_start_time` receives the MonotonicMicros() timestamp taken right
// after the lock was obtained.
// Optional correctness checks (missed-rows count vs `merged_rows`, rowid conversion) are
// enabled through config flags.
Status CloudTablet::calc_delete_bitmap_for_compaction(
        const std::vector<RowsetSharedPtr>& input_rowsets, const RowsetSharedPtr& output_rowset,
        const RowIdConversion& rowid_conversion, ReaderType compaction_type, int64_t merged_rows,
        int64_t initiator, DeleteBitmapPtr& output_rowset_delete_bitmap,
        bool allow_delete_in_cumu_compaction, int64_t& get_delete_bitmap_lock_start_time) {
    output_rowset_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());

    // Whether the number of rows dropped by this compaction should equal `merged_rows`.
    // Base compaction is also checked when config::enable_prune_delete_sign_when_base_compaction
    // is false. Evaluated once so the allocation below and the check after phase 1 agree
    // even if the config is changed concurrently.
    const bool check_merged_rows =
            !allow_delete_in_cumu_compaction &&
            (compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION ||
             !config::enable_prune_delete_sign_when_base_compaction);

    std::unique_ptr<RowLocationSet> missed_rows;
    if ((config::enable_missing_rows_correctness_check ||
         config::enable_mow_compaction_correctness_check_core) &&
        check_merged_rows) {
        missed_rows = std::make_unique<RowLocationSet>();
        LOG(INFO) << "RowLocation Set inited succ for tablet:" << tablet_id();
    }

    std::unique_ptr<std::map<RowsetSharedPtr, RowLocationPairList>> location_map;
    if (config::enable_rowid_conversion_correctness_check) {
        location_map = std::make_unique<std::map<RowsetSharedPtr, RowLocationPairList>>();
        LOG(INFO) << "Location Map inited succ for tablet:" << tablet_id();
    }

    // 1. calc delete bitmap for historical data
    RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(this));
    Version version = max_version();
    std::size_t missed_rows_size = 0;
    calc_compaction_output_rowset_delete_bitmap(
            input_rowsets, rowid_conversion, 0, version.second + 1, missed_rows.get(),
            location_map.get(), *tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());
    if (missed_rows) {
        missed_rows_size = missed_rows->size();
        // merged_rows < 0 means "unknown" here; only compare a known count.
        if (check_merged_rows && tablet_state() == TABLET_RUNNING && merged_rows >= 0 &&
            static_cast<std::size_t>(merged_rows) != missed_rows_size) {
            // Name the compaction that actually ran: the check also covers base compaction.
            const char* compaction_name =
                    compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION
                            ? "cumulative compaction"
                    : compaction_type == ReaderType::READER_BASE_COMPACTION ? "base compaction"
                                                                            : "compaction";
            std::string err_msg = fmt::format(
                    "{}: the merged rows({}) is not equal to missed "
                    "rows({}) in rowid conversion, tablet_id: {}, table_id:{}",
                    compaction_name, merged_rows, missed_rows_size, tablet_id(), table_id());
            if (config::enable_mow_compaction_correctness_check_core) {
                CHECK(false) << err_msg;
            } else {
                DCHECK(false) << err_msg;
            }
        }
    }
    if (location_map) {
        RETURN_IF_ERROR(check_rowid_conversion(output_rowset, *location_map));
        location_map->clear();
    }

    // 2. calc delete bitmap for incremental data
    int64_t t1 = MonotonicMicros();
    RETURN_IF_ERROR(_engine.meta_mgr().get_delete_bitmap_update_lock(
            *this, COMPACTION_DELETE_BITMAP_LOCK_ID, initiator));
    int64_t t2 = MonotonicMicros();
    get_delete_bitmap_lock_start_time = t2;
    RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(this));
    int64_t t3 = MonotonicMicros();

    calc_compaction_output_rowset_delete_bitmap(
            input_rowsets, rowid_conversion, version.second, UINT64_MAX, missed_rows.get(),
            location_map.get(), *tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());
    int64_t t4 = MonotonicMicros();
    if (location_map) {
        RETURN_IF_ERROR(check_rowid_conversion(output_rowset, *location_map));
    }
    int64_t t5 = MonotonicMicros();

    // 3. store delete bitmap
    auto st = _engine.meta_mgr().update_delete_bitmap(*this, -1, initiator,
                                                      output_rowset_delete_bitmap.get());
    int64_t t6 = MonotonicMicros();
    LOG(INFO) << "calc_delete_bitmap_for_compaction, tablet_id=" << tablet_id()
              << ", get lock cost " << (t2 - t1) << " us, sync rowsets cost " << (t3 - t2)
              << " us, calc delete bitmap cost " << (t4 - t3) << " us, check rowid conversion cost "
              << (t5 - t4) << " us, store delete bitmap cost " << (t6 - t5)
              << " us, st=" << st.to_string();
    return st;
}
1153
1154
void CloudTablet::agg_delete_bitmap_for_compaction(
1155
        int64_t start_version, int64_t end_version, const std::vector<RowsetSharedPtr>& pre_rowsets,
1156
        DeleteBitmapPtr& new_delete_bitmap,
1157
0
        std::map<std::string, int64_t>& pre_rowset_to_versions) {
1158
0
    for (auto& rowset : pre_rowsets) {
  Branch (1158:23): [True: 0, False: 0]
1159
0
        for (uint32_t seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
  Branch (1159:35): [True: 0, False: 0]
1160
0
            auto d = tablet_meta()->delete_bitmap()->get_agg_without_cache(
1161
0
                    {rowset->rowset_id(), seg_id, end_version}, start_version);
1162
0
            if (d->isEmpty()) {
  Branch (1162:17): [True: 0, False: 0]
1163
0
                continue;
1164
0
            }
1165
0
            VLOG_DEBUG << "agg delete bitmap for tablet_id=" << tablet_id()
Line
Count
Source
41
0
#define VLOG_DEBUG VLOG(7)
1166
0
                       << ", rowset_id=" << rowset->rowset_id() << ", seg_id=" << seg_id
1167
0
                       << ", rowset_version=" << rowset->version().to_string()
1168
0
                       << ". compaction start_version=" << start_version
1169
0
                       << ", end_version=" << end_version
1170
0
                       << ". delete_bitmap cardinality=" << d->cardinality();
1171
0
            DeleteBitmap::BitmapKey end_key {rowset->rowset_id(), seg_id, end_version};
1172
0
            new_delete_bitmap->set(end_key, *d);
1173
0
            pre_rowset_to_versions[rowset->rowset_id().to_string()] = rowset->version().second;
1174
0
        }
1175
0
    }
1176
0
}
1177
1178
0
// Fetches the tablet meta through `_engine.meta_mgr().get_tablet_meta`. Changed properties
// (TTL, compaction policy and the time-series compaction thresholds) are copied into the
// local `_tablet_meta`.
// - Returns OK immediately when config::enable_file_cache is off.
// - If the fetch fails with NOT_FOUND, the local cache is cleared; any fetch error is
//   returned.
// - When the TTL changed, each segment's file cache entry gets expiration time
//   ttl + newest_write_timestamp, or 0 when that moment is not after the current time.
Status CloudTablet::sync_meta() {
    if (!config::enable_file_cache) {
        return Status::OK();
    }

    TabletMetaSharedPtr remote_meta;
    if (auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &remote_meta); !st.ok()) {
        if (st.is<ErrorCode::NOT_FOUND>()) {
            clear_cache();
        }
        return st;
    }

    const auto remote_ttl = remote_meta->ttl_seconds();
    if (remote_ttl != _tablet_meta->ttl_seconds()) {
        _tablet_meta->set_ttl_seconds(remote_ttl);
        const int64_t now = UnixSeconds();
        std::shared_lock meta_rlock(_meta_lock);
        for (auto& [_, rowset] : _rs_version_map) {
            for (int seg = 0; seg < rowset->num_segments(); ++seg) {
                int64_t expire_at = remote_ttl + rowset->rowset_meta()->newest_write_timestamp();
                if (expire_at <= now) {
                    expire_at = 0;
                }
                auto cache_key = Segment::file_cache_key(rowset->rowset_id().to_string(), seg);
                auto* cache = io::FileCacheFactory::instance()->get_by_path(cache_key);
                cache->modify_expiration_time(cache_key, expire_at);
            }
        }
    }

    // Pushes `remote` through `setter` only when it differs from `local`.
    auto sync_if_changed = [](const auto& local, const auto& remote, auto&& setter) {
        if (local != remote) {
            setter(remote);
        }
    };

    sync_if_changed(_tablet_meta->compaction_policy(), remote_meta->compaction_policy(),
                    [this](const auto& v) { _tablet_meta->set_compaction_policy(v); });
    sync_if_changed(_tablet_meta->time_series_compaction_goal_size_mbytes(),
                    remote_meta->time_series_compaction_goal_size_mbytes(), [this](const auto& v) {
                        _tablet_meta->set_time_series_compaction_goal_size_mbytes(v);
                    });
    sync_if_changed(_tablet_meta->time_series_compaction_file_count_threshold(),
                    remote_meta->time_series_compaction_file_count_threshold(),
                    [this](const auto& v) {
                        _tablet_meta->set_time_series_compaction_file_count_threshold(v);
                    });
    sync_if_changed(_tablet_meta->time_series_compaction_time_threshold_seconds(),
                    remote_meta->time_series_compaction_time_threshold_seconds(),
                    [this](const auto& v) {
                        _tablet_meta->set_time_series_compaction_time_threshold_seconds(v);
                    });
    sync_if_changed(_tablet_meta->time_series_compaction_empty_rowsets_threshold(),
                    remote_meta->time_series_compaction_empty_rowsets_threshold(),
                    [this](const auto& v) {
                        _tablet_meta->set_time_series_compaction_empty_rowsets_threshold(v);
                    });
    sync_if_changed(_tablet_meta->time_series_compaction_level_threshold(),
                    remote_meta->time_series_compaction_level_threshold(), [this](const auto& v) {
                        _tablet_meta->set_time_series_compaction_level_threshold(v);
                    });

    return Status::OK();
}
1251
1252
0
// Fills `tablet_info` with the tablet id and the total version count, both read from
// `_tablet_meta` while holding `_meta_lock` in shared mode.
// Per the original author's note, the cloud report does not currently use this
// information, but it may be used in the future.
void CloudTablet::build_tablet_report_info(TTabletInfo* tablet_info) {
    std::shared_lock meta_rlock(_meta_lock);
    const auto& meta = _tablet_meta;
    tablet_info->__set_tablet_id(meta->tablet_id());
    tablet_info->__set_total_version_count(meta->version_count());
}
1259
1260
// Consistency check between `expected_delete_bitmap` and the delete bitmap cached for
// (txn_id, this tablet) in the storage engine's txn delete bitmap cache.
// Returns InternalError on a cardinality mismatch (and fails a DCHECK in debug builds).
// If the cache lookup does not succeed, there is nothing to compare and OK is returned.
Status CloudTablet::check_delete_bitmap_cache(int64_t txn_id,
                                              DeleteBitmap* expected_delete_bitmap) {
    DeleteBitmapPtr cached_delete_bitmap;
    CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
    Status st = engine.txn_delete_bitmap_cache().get_delete_bitmap(
            txn_id, tablet_id(), &cached_delete_bitmap, nullptr, nullptr);
    if (!st.ok()) {
        return Status::OK();
    }

    // Compute each cardinality once and reuse it for both the comparison and the message.
    const auto expected_cardinality = expected_delete_bitmap->cardinality();
    const auto cached_cardinality = cached_delete_bitmap->cardinality();
    if (expected_cardinality == cached_cardinality) {
        return Status::OK();
    }

    // Only build the message on the failure path.
    auto msg = fmt::format(
            "delete bitmap cache check failed, cur_cardinality={}, cached_cardinality={}, "
            "txn_id={}, tablet_id={}",
            expected_cardinality, cached_cardinality, txn_id, tablet_id());
    DCHECK(false) << msg;
    return Status::InternalError<false>(msg);
}
1280
1281
#include "common/compile_check_end.h"
1282
} // namespace doris