Coverage Report

Created: 2024-11-21 13:41

/root/doris/be/src/cloud/cloud_tablet.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_tablet.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
#include <rapidjson/document.h>
22
#include <rapidjson/encodings.h>
23
#include <rapidjson/prettywriter.h>
24
#include <rapidjson/rapidjson.h>
25
#include <rapidjson/stringbuffer.h>
26
27
#include <atomic>
28
#include <memory>
29
#include <shared_mutex>
30
#include <unordered_map>
31
#include <vector>
32
33
#include "cloud/cloud_meta_mgr.h"
34
#include "cloud/cloud_storage_engine.h"
35
#include "cloud/cloud_tablet_mgr.h"
36
#include "io/cache/block_file_cache_downloader.h"
37
#include "io/cache/block_file_cache_factory.h"
38
#include "olap/cumulative_compaction_time_series_policy.h"
39
#include "olap/olap_define.h"
40
#include "olap/rowset/beta_rowset.h"
41
#include "olap/rowset/rowset.h"
42
#include "olap/rowset/rowset_factory.h"
43
#include "olap/rowset/rowset_fwd.h"
44
#include "olap/rowset/rowset_writer.h"
45
#include "olap/rowset/segment_v2/inverted_index_desc.h"
46
#include "olap/storage_policy.h"
47
#include "olap/tablet_schema.h"
48
#include "olap/txn_manager.h"
49
#include "util/debug_points.h"
50
#include "vec/common/schema_util.h"
51
52
namespace doris {
53
using namespace ErrorCode;
54
55
static constexpr int COMPACTION_DELETE_BITMAP_LOCK_ID = -1;
56
57
// Constructs a cloud tablet: the tablet meta is handed to the BaseTablet base class and a
// reference to the owning CloudStorageEngine is kept in `_engine`.
CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta)
        : BaseTablet(std::move(tablet_meta)),
          _engine(engine) {
}
59
60
0
// Nothing to release explicitly here; the compiler-generated destructor is used.
CloudTablet::~CloudTablet() = default;
61
62
0
bool CloudTablet::exceed_version_limit(int32_t limit) {
63
0
    return _approximate_num_rowsets.load(std::memory_order_relaxed) > limit;
64
0
}
65
66
// Resolves `spec_version` into a consistent version path through the version tracker and
// collects the rowsets on that path into `rowsets`.
//
// On failure the tracker's status is returned with " tablet_id=<id>" appended. If
// get_missed_versions() reports nothing missing up to `spec_version.second`, the error code
// is rewritten to VERSION_ALREADY_MERGED first.
Status CloudTablet::capture_consistent_rowsets_unlocked(
        const Version& spec_version, std::vector<RowsetSharedPtr>* rowsets) const {
    Versions path;
    auto status = _timestamped_version_tracker.capture_consistent_versions(spec_version, &path);
    if (status.ok()) {
        VLOG_DEBUG << "capture consitent versions: " << path;
        return _capture_consistent_rowsets_unlocked(path, rowsets);
    }

    // Nothing is missing, so the requested version must have been merged away.
    if (get_missed_versions(spec_version.second).empty()) {
        status.set_code(VERSION_ALREADY_MERGED);
    }
    status.append(" tablet_id=" + std::to_string(tablet_id()));
    return status;
}
82
83
// Captures rowset readers covering `spec_version` into `rs_splits`.
//
// The version path is resolved while holding `_meta_lock` in shared mode. On failure the
// lock is released before any logging, the status is annotated (and downgraded to
// VERSION_ALREADY_MERGED when no version is missing), and the tablet's compaction status
// JSON is written to the warning log next to the error.
//
// `skip_missing_version` is not consulted by this implementation.
// The debug point "CloudTablet.capture_rs_readers.return.e-230" injects an error -230.
Status CloudTablet::capture_rs_readers(const Version& spec_version,
                                       std::vector<RowSetSplits>* rs_splits,
                                       bool skip_missing_version) {
    DBUG_EXECUTE_IF("CloudTablet.capture_rs_readers.return.e-230", {
        LOG_WARNING("CloudTablet.capture_rs_readers.return e-230").tag("tablet_id", tablet_id());
        return Status::Error<false>(-230, "injected error");
    });

    Versions path;
    std::shared_lock meta_rlock(_meta_lock);
    auto status = _timestamped_version_tracker.capture_consistent_versions(spec_version, &path);
    if (status.ok()) {
        VLOG_DEBUG << "capture consitent versions: " << path;
        return capture_rs_readers_unlocked(path, rs_splits);
    }

    // Do not log while holding the meta lock.
    meta_rlock.unlock();

    // Nothing is missing, so the requested version must have been merged away.
    if (get_missed_versions(spec_version.second).empty()) {
        status.set_code(VERSION_ALREADY_MERGED);
        status.append(" versions are already compacted, ");
    }
    status.append(" tablet_id=" + std::to_string(tablet_id()));

    const auto dump_compaction_status = [this]() {
        std::string json;
        get_compaction_status(&json);
        return json;
    };
    LOG(WARNING) << status << '\n' << dump_compaction_status();
    return status;
}
110
111
0
// Picks the tablet schema with the highest schema version among the rowsets in
// `_rs_version_map`. If that schema has variant columns, the schemas of all rowsets are
// merged into a least common schema, which is then stored in `_merged_tablet_schema`.
//
// Returns OK without touching `_merged_tablet_schema` when there are no rowsets, when no
// rowset carries a schema, or when the newest schema has no variant columns. Otherwise
// returns the status of get_least_common_schema() on failure, or OK on success.
Status CloudTablet::merge_rowsets_schema() {
    // std::max_element returns end() for an empty range; dereferencing it is undefined.
    if (_rs_version_map.empty()) {
        return Status::OK();
    }

    // Find the rowset whose schema has the max schema version. Rowsets without a schema
    // sort below every rowset with one; two schema-less rowsets compare equivalent so the
    // predicate stays a strict weak ordering.
    auto newest_it = std::max_element(
            _rs_version_map.begin(), _rs_version_map.end(), [](const auto& a, const auto& b) {
                const auto& lhs_schema = a.second->tablet_schema();
                const auto& rhs_schema = b.second->tablet_schema();
                if (!lhs_schema) {
                    return rhs_schema != nullptr;
                }
                if (!rhs_schema) {
                    return false;
                }
                return lhs_schema->schema_version() < rhs_schema->schema_version();
            });

    TabletSchemaSPtr max_version_schema = newest_it->second->tablet_schema();
    if (max_version_schema == nullptr) {
        // No rowset carries a schema, so there is nothing to merge.
        return Status::OK();
    }

    // If the schema has variant columns, perform a merge to create a wide tablet schema
    if (max_version_schema->num_variant_columns() > 0) {
        std::vector<TabletSchemaSPtr> schemas;
        std::transform(_rs_version_map.begin(), _rs_version_map.end(), std::back_inserter(schemas),
                       [](const auto& rs_meta) { return rs_meta.second->tablet_schema(); });
        // Merge the collected schemas to obtain the least common schema
        RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(schemas, nullptr,
                                                                         max_version_schema));
        VLOG_DEBUG << "dump schema: " << max_version_schema->dump_full_schema();
        _merged_tablet_schema = max_version_schema;
    }
    return Status::OK();
}
140
141
// There are only two tablet_states RUNNING and NOT_READY in cloud mode
142
// This function will erase the tablet from `CloudTabletMgr` when it can't find this tablet in MS.
143
0
Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data) {
144
0
    RETURN_IF_ERROR(sync_if_not_running());
145
146
0
    if (query_version > 0) {
147
0
        std::shared_lock rlock(_meta_lock);
148
0
        if (_max_version >= query_version) {
149
0
            return Status::OK();
150
0
        }
151
0
    }
152
153
    // serially execute sync to reduce unnecessary network overhead
154
0
    std::lock_guard lock(_sync_meta_lock);
155
0
    if (query_version > 0) {
156
0
        std::shared_lock rlock(_meta_lock);
157
0
        if (_max_version >= query_version) {
158
0
            return Status::OK();
159
0
        }
160
0
    }
161
162
0
    auto st = _engine.meta_mgr().sync_tablet_rowsets(this, warmup_delta_data);
163
0
    if (st.is<ErrorCode::NOT_FOUND>()) {
164
0
        clear_cache();
165
0
    }
166
167
0
    return st;
168
0
}
169
170
// Sync tablet meta and all rowset meta if not running.
171
// This could happen when BE didn't finish schema change job and another BE committed this schema change job.
172
// It should be a quite rare situation.
173
0
Status CloudTablet::sync_if_not_running() {
174
0
    if (tablet_state() == TABLET_RUNNING) {
175
0
        return Status::OK();
176
0
    }
177
178
    // Serially execute sync to reduce unnecessary network overhead
179
0
    std::lock_guard lock(_sync_meta_lock);
180
181
0
    {
182
0
        std::shared_lock rlock(_meta_lock);
183
0
        if (tablet_state() == TABLET_RUNNING) {
184
0
            return Status::OK();
185
0
        }
186
0
    }
187
188
0
    TabletMetaSharedPtr tablet_meta;
189
0
    auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta);
190
0
    if (!st.ok()) {
191
0
        if (st.is<ErrorCode::NOT_FOUND>()) {
192
0
            clear_cache();
193
0
        }
194
0
        return st;
195
0
    }
196
197
0
    if (tablet_meta->tablet_state() != TABLET_RUNNING) [[unlikely]] {
198
        // MoW may go to here when load while schema change
199
0
        return Status::OK();
200
0
    }
201
202
0
    TimestampedVersionTracker empty_tracker;
203
0
    {
204
0
        std::lock_guard wlock(_meta_lock);
205
0
        RETURN_IF_ERROR(set_tablet_state(TABLET_RUNNING));
206
0
        _rs_version_map.clear();
207
0
        _stale_rs_version_map.clear();
208
0
        std::swap(_timestamped_version_tracker, empty_tracker);
209
0
        _tablet_meta->clear_rowsets();
210
0
        _tablet_meta->clear_stale_rowset();
211
0
        _max_version = -1;
212
0
    }
213
214
0
    st = _engine.meta_mgr().sync_tablet_rowsets(this);
215
0
    if (st.is<ErrorCode::NOT_FOUND>()) {
216
0
        clear_cache();
217
0
    }
218
0
    return st;
219
0
}
220
221
0
// Returns the schema stored by merge_rowsets_schema(); the pointer is returned as-is.
TabletSchemaSPtr CloudTablet::merged_tablet_schema() const {
    TabletSchemaSPtr merged = _merged_tablet_schema;
    return merged;
}
224
225
void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_overlap,
226
                              std::unique_lock<std::shared_mutex>& meta_lock,
227
0
                              bool warmup_delta_data) {
228
0
    if (to_add.empty()) {
229
0
        return;
230
0
    }
231
232
0
    auto add_rowsets_directly = [=, this](std::vector<RowsetSharedPtr>& rowsets) {
233
0
        for (auto& rs : rowsets) {
234
0
            if (version_overlap || warmup_delta_data) {
235
#ifndef BE_TEST
236
                // Warmup rowset data in background
237
                for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
238
                    const auto& rowset_meta = rs->rowset_meta();
239
                    constexpr int64_t interval = 600; // 10 mins
240
                    // When BE restart and receive the `load_sync` rpc, it will sync all historical rowsets first time.
241
                    // So we need to filter out the old rowsets avoid to download the whole table.
242
                    if (warmup_delta_data &&
243
                        ::time(nullptr) - rowset_meta->newest_write_timestamp() >= interval) {
244
                        continue;
245
                    }
246
247
                    auto storage_resource = rowset_meta->remote_storage_resource();
248
                    if (!storage_resource) {
249
                        LOG(WARNING) << storage_resource.error();
250
                        continue;
251
                    }
252
253
                    int64_t expiration_time =
254
                            _tablet_meta->ttl_seconds() == 0 ||
255
                                            rowset_meta->newest_write_timestamp() <= 0
256
                                    ? 0
257
                                    : rowset_meta->newest_write_timestamp() +
258
                                              _tablet_meta->ttl_seconds();
259
                    _engine.file_cache_block_downloader().submit_download_task(
260
                            io::DownloadFileMeta {
261
                                    .path = storage_resource.value()->remote_segment_path(
262
                                            *rowset_meta, seg_id),
263
                                    .file_size = rs->rowset_meta()->segment_file_size(seg_id),
264
                                    .file_system = storage_resource.value()->fs,
265
                                    .ctx =
266
                                            {
267
                                                    .expiration_time = expiration_time,
268
                                            },
269
                                    .download_done {},
270
                            });
271
272
                    auto download_idx_file = [&](const io::Path& idx_path) {
273
                        io::DownloadFileMeta meta {
274
                                .path = idx_path,
275
                                .file_size = -1,
276
                                .file_system = storage_resource.value()->fs,
277
                                .ctx =
278
                                        {
279
                                                .expiration_time = expiration_time,
280
                                        },
281
                                .download_done {},
282
                        };
283
                        _engine.file_cache_block_downloader().submit_download_task(std::move(meta));
284
                    };
285
                    auto schema_ptr = rowset_meta->tablet_schema();
286
                    auto idx_version = schema_ptr->get_inverted_index_storage_format();
287
                    if (idx_version == InvertedIndexStorageFormatPB::V1) {
288
                        for (const auto& index : schema_ptr->inverted_indexes()) {
289
                            auto idx_path = storage_resource.value()->remote_idx_v1_path(
290
                                    *rowset_meta, seg_id, index->index_id(),
291
                                    index->get_index_suffix());
292
                            download_idx_file(idx_path);
293
                        }
294
                    } else {
295
                        if (schema_ptr->has_inverted_index()) {
296
                            auto idx_path = storage_resource.value()->remote_idx_v2_path(
297
                                    *rowset_meta, seg_id);
298
                            download_idx_file(idx_path);
299
                        }
300
                    }
301
                }
302
#endif
303
0
            }
304
0
            _rs_version_map.emplace(rs->version(), rs);
305
0
            _timestamped_version_tracker.add_version(rs->version());
306
0
            _max_version = std::max(rs->end_version(), _max_version);
307
0
            update_base_size(*rs);
308
0
        }
309
0
        _tablet_meta->add_rowsets_unchecked(rowsets);
310
0
    };
311
312
0
    if (!version_overlap) {
313
0
        add_rowsets_directly(to_add);
314
0
        return;
315
0
    }
316
317
    // Filter out existed rowsets
318
0
    auto remove_it =
319
0
            std::remove_if(to_add.begin(), to_add.end(), [this](const RowsetSharedPtr& rs) {
320
0
                if (auto find_it = _rs_version_map.find(rs->version());
321
0
                    find_it == _rs_version_map.end()) {
322
0
                    return false;
323
0
                } else if (find_it->second->rowset_id() == rs->rowset_id()) {
324
0
                    return true; // Same rowset
325
0
                }
326
327
                // If version of rowset in `to_add` is equal to rowset in tablet but rowset_id is not equal,
328
                // replace existed rowset with `to_add` rowset. This may occur when:
329
                //  1. schema change converts rowsets which have been double written to new tablet
330
                //  2. cumu compaction picks single overlapping input rowset to perform compaction
331
0
                _tablet_meta->delete_rs_meta_by_version(rs->version(), nullptr);
332
0
                _rs_version_map[rs->version()] = rs;
333
0
                _tablet_meta->add_rowsets_unchecked({rs});
334
0
                update_base_size(*rs);
335
0
                return true;
336
0
            });
337
338
0
    to_add.erase(remove_it, to_add.end());
339
340
    // delete rowsets with overlapped version
341
0
    std::vector<RowsetSharedPtr> to_add_directly;
342
0
    for (auto& to_add_rs : to_add) {
343
        // delete rowsets with overlapped version
344
0
        std::vector<RowsetSharedPtr> to_delete;
345
0
        Version to_add_v = to_add_rs->version();
346
        // if start_version  > max_version, we can skip checking overlap here.
347
0
        if (to_add_v.first > _max_version) {
348
            // if start_version  > max_version, we can skip checking overlap here.
349
0
            to_add_directly.push_back(to_add_rs);
350
0
        } else {
351
0
            to_add_directly.push_back(to_add_rs);
352
0
            for (auto& [v, rs] : _rs_version_map) {
353
0
                if (to_add_v.contains(v)) {
354
0
                    to_delete.push_back(rs);
355
0
                }
356
0
            }
357
0
            delete_rowsets(to_delete, meta_lock);
358
0
        }
359
0
    }
360
361
0
    add_rowsets_directly(to_add_directly);
362
0
}
363
364
void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
365
0
                                 std::unique_lock<std::shared_mutex>&) {
366
0
    if (to_delete.empty()) {
367
0
        return;
368
0
    }
369
0
    std::vector<RowsetMetaSharedPtr> rs_metas;
370
0
    rs_metas.reserve(to_delete.size());
371
0
    for (auto&& rs : to_delete) {
372
0
        rs_metas.push_back(rs->rowset_meta());
373
0
        _stale_rs_version_map[rs->version()] = rs;
374
0
    }
375
0
    _timestamped_version_tracker.add_stale_path_version(rs_metas);
376
0
    for (auto&& rs : to_delete) {
377
0
        _rs_version_map.erase(rs->version());
378
0
    }
379
380
0
    _tablet_meta->modify_rs_metas({}, rs_metas, false);
381
0
}
382
383
0
int CloudTablet::delete_expired_stale_rowsets() {
384
0
    std::vector<RowsetSharedPtr> expired_rowsets;
385
0
    int64_t expired_stale_sweep_endtime =
386
0
            ::time(nullptr) - config::tablet_rowset_stale_sweep_time_sec;
387
0
    std::vector<std::string> version_to_delete;
388
0
    {
389
0
        std::unique_lock wlock(_meta_lock);
390
391
0
        std::vector<int64_t> path_ids;
392
        // capture the path version to delete
393
0
        _timestamped_version_tracker.capture_expired_paths(expired_stale_sweep_endtime, &path_ids);
394
395
0
        if (path_ids.empty()) {
396
0
            return 0;
397
0
        }
398
399
0
        for (int64_t path_id : path_ids) {
400
0
            int start_version = -1;
401
0
            int end_version = -1;
402
            // delete stale versions in version graph
403
0
            auto version_path = _timestamped_version_tracker.fetch_and_delete_path_by_id(path_id);
404
0
            for (auto& v_ts : version_path->timestamped_versions()) {
405
0
                auto rs_it = _stale_rs_version_map.find(v_ts->version());
406
0
                if (rs_it != _stale_rs_version_map.end()) {
407
0
                    expired_rowsets.push_back(rs_it->second);
408
0
                    _stale_rs_version_map.erase(rs_it);
409
0
                } else {
410
0
                    LOG(WARNING) << "cannot find stale rowset " << v_ts->version() << " in tablet "
411
0
                                 << tablet_id();
412
                    // clang-format off
413
0
                    DCHECK(false) << [this, &wlock]() { wlock.unlock(); std::string json; get_compaction_status(&json); return json; }();
414
                    // clang-format on
415
0
                }
416
0
                if (start_version < 0) {
417
0
                    start_version = v_ts->version().first;
418
0
                }
419
0
                end_version = v_ts->version().second;
420
0
                _tablet_meta->delete_stale_rs_meta_by_version(v_ts->version());
421
0
            }
422
0
            Version version(start_version, end_version);
423
0
            version_to_delete.emplace_back(version.to_string());
424
0
        }
425
0
        _reconstruct_version_tracker_if_necessary();
426
0
    }
427
0
    _tablet_meta->delete_bitmap().remove_stale_delete_bitmap_from_queue(version_to_delete);
428
0
    recycle_cached_data(expired_rowsets);
429
0
    return expired_rowsets.size();
430
0
}
431
432
0
void CloudTablet::update_base_size(const Rowset& rs) {
433
    // Define base rowset as the rowset of version [2-x]
434
0
    if (rs.start_version() == 2) {
435
0
        _base_size = rs.total_disk_size();
436
0
    }
437
0
}
438
439
0
void CloudTablet::clear_cache() {
440
0
    CloudTablet::recycle_cached_data(get_snapshot_rowset(true));
441
0
    _engine.tablet_mgr().erase_tablet(tablet_id());
442
0
}
443
444
0
void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets) {
445
0
    for (auto& rs : rowsets) {
446
        // Clear cached opened segments and inverted index cache in memory
447
0
        rs->clear_cache();
448
0
    }
449
450
0
    if (config::enable_file_cache) {
451
0
        for (const auto& rs : rowsets) {
452
0
            if (rs.use_count() >= 1) {
453
0
                LOG(WARNING) << "Rowset " << rs->rowset_id().to_string() << " has "
454
0
                             << rs.use_count()
455
0
                             << " references. File Cache won't be recycled when query is using it.";
456
0
                continue;
457
0
            }
458
0
            for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
459
                // TODO: Segment::file_cache_key
460
0
                auto file_key = Segment::file_cache_key(rs->rowset_id().to_string(), seg_id);
461
0
                auto* file_cache = io::FileCacheFactory::instance()->get_by_path(file_key);
462
0
                file_cache->remove_if_cached_async(file_key);
463
0
            }
464
0
        }
465
0
    }
466
0
}
467
468
void CloudTablet::reset_approximate_stats(int64_t num_rowsets, int64_t num_segments,
469
0
                                          int64_t num_rows, int64_t data_size) {
470
0
    _approximate_num_rowsets.store(num_rowsets, std::memory_order_relaxed);
471
0
    _approximate_num_segments.store(num_segments, std::memory_order_relaxed);
472
0
    _approximate_num_rows.store(num_rows, std::memory_order_relaxed);
473
0
    _approximate_data_size.store(data_size, std::memory_order_relaxed);
474
0
    int64_t cumu_num_deltas = 0;
475
0
    int64_t cumu_num_rowsets = 0;
476
0
    auto cp = _cumulative_point.load(std::memory_order_relaxed);
477
0
    for (auto& [v, r] : _rs_version_map) {
478
0
        if (v.second < cp) {
479
0
            continue;
480
0
        }
481
482
0
        cumu_num_deltas += r->is_segments_overlapping() ? r->num_segments() : 1;
483
0
        ++cumu_num_rowsets;
484
0
    }
485
0
    _approximate_cumu_num_rowsets.store(cumu_num_rowsets, std::memory_order_relaxed);
486
0
    _approximate_cumu_num_deltas.store(cumu_num_deltas, std::memory_order_relaxed);
487
0
}
488
489
// Fills the tablet-specific fields of `context` (a fresh rowset id from the engine plus
// tablet/index/partition ids and the merge-on-write flag) and asks RowsetFactory to create
// the rowset writer. `vertical` is forwarded to the factory unchanged.
Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_rowset_writer(
        RowsetWriterContext& context, bool vertical) {
    context.rowset_id = _engine.next_rowset_id();

    // FIXME(plat1ko): Seems `tablet_id` and `index_id` has been set repeatedly
    context.tablet_id = tablet_id();
    context.index_id = index_id();
    context.partition_id = partition_id();

    context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write();

    auto writer = RowsetFactory::create_rowset_writer(_engine, context, vertical);
    return writer;
}
499
500
// create a rowset writer with rowset_id and seg_id
501
// after writer, merge this transient rowset with original rowset
502
Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_transient_rowset_writer(
503
        const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> partial_update_info,
504
0
        int64_t txn_expiration) {
505
0
    if (rowset.rowset_meta()->rowset_state() != RowsetStatePB::BEGIN_PARTIAL_UPDATE) [[unlikely]] {
506
        // May cause the segment files generated by the transient rowset writer unable to be
507
        // recycled, see `CloudRowsetWriter::build` for detail.
508
0
        LOG(WARNING) << "Wrong rowset state: " << rowset.rowset_meta()->rowset_state();
509
0
        DCHECK(false) << rowset.rowset_meta()->rowset_state();
510
0
    }
511
512
0
    RowsetWriterContext context;
513
0
    context.rowset_state = PREPARED;
514
0
    context.segments_overlap = OVERLAPPING;
515
    // During a partial update, the extracted columns of a variant should not be included in the tablet schema.
516
    // This is because the partial update for a variant needs to ignore the extracted columns.
517
    // Otherwise, the schema types in different rowsets might be inconsistent. When performing a partial update,
518
    // the complete variant is constructed by reading all the sub-columns of the variant.
519
0
    context.tablet_schema = rowset.tablet_schema()->copy_without_variant_extracted_columns();
520
0
    context.newest_write_timestamp = UnixSeconds();
521
0
    context.tablet_id = table_id();
522
0
    context.enable_segcompaction = false;
523
0
    context.write_type = DataWriteType::TYPE_DIRECT;
524
0
    context.partial_update_info = std::move(partial_update_info);
525
0
    context.is_transient_rowset_writer = true;
526
0
    context.rowset_id = rowset.rowset_id();
527
0
    context.tablet_id = tablet_id();
528
0
    context.index_id = index_id();
529
0
    context.partition_id = partition_id();
530
0
    context.enable_unique_key_merge_on_write = enable_unique_key_merge_on_write();
531
0
    context.txn_expiration = txn_expiration;
532
533
0
    auto storage_resource = rowset.rowset_meta()->remote_storage_resource();
534
0
    if (!storage_resource) {
535
0
        return ResultError(std::move(storage_resource.error()));
536
0
    }
537
538
0
    context.storage_resource = *storage_resource.value();
539
540
0
    return RowsetFactory::create_rowset_writer(_engine, context, false)
541
0
            .transform([&](auto&& writer) {
542
0
                writer->set_segment_start_id(rowset.num_segments());
543
0
                return writer;
544
0
            });
545
0
}
546
547
0
int64_t CloudTablet::get_cloud_base_compaction_score() const {
548
0
    if (_tablet_meta->compaction_policy() == CUMULATIVE_TIME_SERIES_POLICY) {
549
0
        bool has_delete = false;
550
0
        int64_t point = cumulative_layer_point();
551
0
        std::shared_lock<std::shared_mutex> rlock(_meta_lock);
552
0
        for (const auto& rs_meta : _tablet_meta->all_rs_metas()) {
553
0
            if (rs_meta->start_version() >= point) {
554
0
                continue;
555
0
            }
556
0
            if (rs_meta->has_delete_predicate()) {
557
0
                has_delete = true;
558
0
                break;
559
0
            }
560
0
        }
561
0
        if (!has_delete) {
562
0
            return 0;
563
0
        }
564
0
    }
565
566
0
    return _approximate_num_rowsets.load(std::memory_order_relaxed) -
567
0
           _approximate_cumu_num_rowsets.load(std::memory_order_relaxed);
568
0
}
569
570
0
int64_t CloudTablet::get_cloud_cumu_compaction_score() const {
571
    // TODO(plat1ko): Propose an algorithm that considers tablet's key type, number of delete rowsets,
572
    //  number of tablet versions simultaneously.
573
0
    return _approximate_cumu_num_deltas.load(std::memory_order_relaxed);
574
0
}
575
576
// return a json string to show the compaction status of this tablet
577
0
void CloudTablet::get_compaction_status(std::string* json_result) {
578
0
    rapidjson::Document root;
579
0
    root.SetObject();
580
581
0
    rapidjson::Document path_arr;
582
0
    path_arr.SetArray();
583
584
0
    std::vector<RowsetSharedPtr> rowsets;
585
0
    std::vector<RowsetSharedPtr> stale_rowsets;
586
0
    {
587
0
        std::shared_lock rdlock(_meta_lock);
588
0
        rowsets.reserve(_rs_version_map.size());
589
0
        for (auto& it : _rs_version_map) {
590
0
            rowsets.push_back(it.second);
591
0
        }
592
0
        stale_rowsets.reserve(_stale_rs_version_map.size());
593
0
        for (auto& it : _stale_rs_version_map) {
594
0
            stale_rowsets.push_back(it.second);
595
0
        }
596
0
    }
597
0
    std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator);
598
0
    std::sort(stale_rowsets.begin(), stale_rowsets.end(), Rowset::comparator);
599
600
    // get snapshot version path json_doc
601
0
    _timestamped_version_tracker.get_stale_version_path_json_doc(path_arr);
602
0
    root.AddMember("cumulative point", _cumulative_point.load(), root.GetAllocator());
603
604
    // print all rowsets' version as an array
605
0
    rapidjson::Document versions_arr;
606
0
    rapidjson::Document missing_versions_arr;
607
0
    versions_arr.SetArray();
608
0
    missing_versions_arr.SetArray();
609
0
    int64_t last_version = -1;
610
0
    for (auto& rowset : rowsets) {
611
0
        const Version& ver = rowset->version();
612
0
        if (ver.first != last_version + 1) {
613
0
            rapidjson::Value miss_value;
614
0
            miss_value.SetString(fmt::format("[{}-{}]", last_version + 1, ver.first - 1).c_str(),
615
0
                                 missing_versions_arr.GetAllocator());
616
0
            missing_versions_arr.PushBack(miss_value, missing_versions_arr.GetAllocator());
617
0
        }
618
0
        rapidjson::Value value;
619
0
        std::string version_str = rowset->get_rowset_info_str();
620
0
        value.SetString(version_str.c_str(), version_str.length(), versions_arr.GetAllocator());
621
0
        versions_arr.PushBack(value, versions_arr.GetAllocator());
622
0
        last_version = ver.second;
623
0
    }
624
0
    root.AddMember("rowsets", versions_arr, root.GetAllocator());
625
0
    root.AddMember("missing_rowsets", missing_versions_arr, root.GetAllocator());
626
627
    // print all stale rowsets' version as an array
628
0
    rapidjson::Document stale_versions_arr;
629
0
    stale_versions_arr.SetArray();
630
0
    for (auto& rowset : stale_rowsets) {
631
0
        rapidjson::Value value;
632
0
        std::string version_str = rowset->get_rowset_info_str();
633
0
        value.SetString(version_str.c_str(), version_str.length(),
634
0
                        stale_versions_arr.GetAllocator());
635
0
        stale_versions_arr.PushBack(value, stale_versions_arr.GetAllocator());
636
0
    }
637
0
    root.AddMember("stale_rowsets", stale_versions_arr, root.GetAllocator());
638
639
    // add stale version rowsets
640
0
    root.AddMember("stale version path", path_arr, root.GetAllocator());
641
642
    // to json string
643
0
    rapidjson::StringBuffer strbuf;
644
0
    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
645
0
    root.Accept(writer);
646
0
    *json_result = std::string(strbuf.GetString());
647
0
}
648
649
0
void CloudTablet::set_cumulative_layer_point(int64_t new_point) {
650
    // cumulative point should only be reset to -1, or be increased
651
0
    CHECK(new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= _cumulative_point)
652
0
            << "Unexpected cumulative point: " << new_point
653
0
            << ", origin: " << _cumulative_point.load();
654
0
    _cumulative_point = new_point;
655
0
}
656
657
0
std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_base_compaction() {
658
0
    std::vector<RowsetSharedPtr> candidate_rowsets;
659
0
    {
660
0
        std::shared_lock rlock(_meta_lock);
661
0
        for (const auto& [version, rs] : _rs_version_map) {
662
0
            if (version.first != 0 && version.first < _cumulative_point &&
663
0
                (_alter_version == -1 || version.second <= _alter_version)) {
664
0
                candidate_rowsets.push_back(rs);
665
0
            }
666
0
        }
667
0
    }
668
0
    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
669
0
    return candidate_rowsets;
670
0
}
671
672
0
std::vector<RowsetSharedPtr> CloudTablet::pick_candidate_rowsets_to_full_compaction() {
673
0
    std::vector<RowsetSharedPtr> candidate_rowsets;
674
0
    {
675
0
        std::shared_lock rlock(_meta_lock);
676
0
        for (auto& [v, rs] : _rs_version_map) {
677
            // MUST NOT compact rowset [0-1] for some historical reasons (see cloud_schema_change)
678
0
            if (v.first != 0) {
679
0
                candidate_rowsets.push_back(rs);
680
0
            }
681
0
        }
682
0
    }
683
0
    std::sort(candidate_rowsets.begin(), candidate_rowsets.end(), Rowset::comparator);
684
0
    return candidate_rowsets;
685
0
}
686
687
0
// Returns the delete-bitmap calculation executor exposed by the engine.
CalcDeleteBitmapExecutor* CloudTablet::calc_delete_bitmap_executor() {
    CalcDeleteBitmapExecutor* executor = _engine.calc_delete_bitmap_executor();
    return executor;
}
690
691
// Stores the delete bitmap produced for transaction `txn_id` on this tablet.
//
// Steps, in order:
//   1. Record `delete_bitmap` in the txn delete bitmap cache with
//      PublishStatus::PREPARE so a retry does not have to recalculate it.
//   2. For a partial update that wrote at least one row, push the tmp rowset
//      meta through the meta manager.
//   3. Build a copy of the bitmap without sentinel entries, keyed by the
//      rowset's start version, and send it through the meta manager.
//   4. Record the original bitmap (sentinels included) in the cache again with
//      PublishStatus::SUCCEED and the txn's publish info.
//
// Returns the first non-OK status of any step, otherwise Status::OK().
Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
                                       DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
                                       const RowsetIdUnorderedSet& cur_rowset_ids) {
    RowsetSharedPtr txn_rowset = txn_info->rowset;
    int64_t start_version = txn_rowset->start_version();

    // update delete bitmap info, in order to avoid recalculation when trying again
    RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
            txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::PREPARE));

    const bool wrote_partial_update = txn_info->partial_update_info &&
                                      txn_info->partial_update_info->is_partial_update() &&
                                      rowset_writer->num_rows() > 0;
    if (wrote_partial_update) {
        RETURN_IF_ERROR(_engine.meta_mgr().update_tmp_rowset(*txn_rowset->rowset_meta()));
    }

    DeleteBitmapPtr bitmap_to_store = std::make_shared<DeleteBitmap>(tablet_id());
    for (const auto& [key, bitmap] : delete_bitmap->delete_bitmap) {
        // skip sentinel mark, which is used for delete bitmap correctness check
        if (std::get<1>(key) == DeleteBitmap::INVALID_SEGMENT_ID) {
            continue;
        }
        bitmap_to_store->merge({std::get<0>(key), std::get<1>(key), start_version}, bitmap);
    }

    RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(
            *this, txn_id, COMPACTION_DELETE_BITMAP_LOCK_ID, bitmap_to_store.get()));

    // Keep the bitmap WITH sentinel marks in txn_delete_bitmap_cache: if the txn is retried,
    // the cached bitmap is reused when re-calculating the delete bitmap, and that path runs the
    // delete bitmap correctness check. Caching the sentinel-free copy would make the check fail.
    RETURN_IF_ERROR(_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
            txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED,
            txn_info->publish_info));

    return Status::OK();
}
729
730
0
// Computes which versions in [0, spec_version] are not covered by
// `existing_versions`.
//
// `existing_versions` is taken by value and sorted by start version; the
// versions are assumed not to overlap. Each returned Version is an inclusive
// range [first, second] of missing versions, clipped to `spec_version`.
// `spec_version` is expected to be positive (DCHECK only).
Versions CloudTablet::calc_missed_versions(int64_t spec_version, Versions existing_versions) const {
    DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;

    // sort the existing versions in ascending order
    std::sort(existing_versions.begin(), existing_versions.end(),
              [](const Version& a, const Version& b) {
                  // simple because 2 versions are certainly not overlapping
                  return a.first < b.first;
              });

    // From the first version(=0), find the missing version until spec_version
    int64_t last_version = -1;
    Versions missed_versions;
    for (const Version& version : existing_versions) {
        if (version.first > last_version + 1) {
            // There is a hole between versions. The hole ends right before the
            // existing version starts, so the inclusive upper bound is
            // `version.first - 1` (not `version.first`, which is present).
            missed_versions.emplace_back(last_version + 1,
                                         std::min(version.first - 1, spec_version));
        }
        last_version = version.second;
        if (last_version >= spec_version) {
            break;
        }
    }
    if (last_version < spec_version) {
        // there is a hole between the last version and the specified version.
        missed_versions.emplace_back(last_version + 1, spec_version);
    }
    return missed_versions;
}
759
760
// Calculates and stores the delete bitmap of a compaction's output rowset.
//
// Phases, in order:
//   1. Sync rowsets, then convert the delete bitmap for versions
//      [0, max_version + 1) ("historical data").
//   2. Take the delete bitmap update lock, sync rowsets again, then convert
//      the delete bitmap for versions [max_version, UINT64_MAX)
//      ("incremental data").
//   3. Send the resulting bitmap through the meta manager.
//
// Optional checks, driven by config:
//   * missed-rows accounting (cumulative compaction only, and only when
//     `allow_delete_in_cumu_compaction` is false): merged + filtered rows must
//     equal the number of missed rows found during rowid conversion;
//   * rowid conversion check via check_rowid_conversion().
//
// `output_rowset_delete_bitmap` is replaced with a fresh DeleteBitmap and
// filled in place. Returns the first non-OK status, otherwise Status::OK().
Status CloudTablet::calc_delete_bitmap_for_compaction(
        const std::vector<RowsetSharedPtr>& input_rowsets, const RowsetSharedPtr& output_rowset,
        const RowIdConversion& rowid_conversion, ReaderType compaction_type, int64_t merged_rows,
        int64_t filtered_rows, int64_t initiator, DeleteBitmapPtr& output_rowset_delete_bitmap,
        bool allow_delete_in_cumu_compaction) {
    output_rowset_delete_bitmap = std::make_shared<DeleteBitmap>(tablet_id());

    const bool missed_rows_check_enabled = config::enable_missing_rows_correctness_check ||
                                           config::enable_mow_compaction_correctness_check_core;
    std::unique_ptr<RowLocationSet> missed_rows;
    if (missed_rows_check_enabled && !allow_delete_in_cumu_compaction &&
        compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION) {
        missed_rows = std::make_unique<RowLocationSet>();
        LOG(INFO) << "RowLocation Set inited succ for tablet:" << tablet_id();
    }

    std::unique_ptr<std::map<RowsetSharedPtr, RowLocationPairList>> location_map;
    if (config::enable_rowid_conversion_correctness_check) {
        location_map = std::make_unique<std::map<RowsetSharedPtr, RowLocationPairList>>();
        LOG(INFO) << "Location Map inited succ for tablet:" << tablet_id();
    }

    // 1. calc delete bitmap for historical data
    RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(this));
    Version version = max_version();
    std::size_t missed_rows_size = 0;
    calc_compaction_output_rowset_delete_bitmap(
            input_rowsets, rowid_conversion, 0, version.second + 1, missed_rows.get(),
            location_map.get(), tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());
    if (missed_rows) {
        missed_rows_size = missed_rows->size();
        // The accounting is only verified for cumulative compaction on a running tablet.
        const bool verify_accounting =
                !allow_delete_in_cumu_compaction &&
                compaction_type == ReaderType::READER_CUMULATIVE_COMPACTION &&
                tablet_state() == TABLET_RUNNING;
        if (verify_accounting) {
            const int64_t accounted_rows = merged_rows + filtered_rows;
            if (accounted_rows >= 0 && accounted_rows != missed_rows_size) {
                std::string err_msg = fmt::format(
                        "cumulative compaction: the merged rows({}), the filtered rows({}) is "
                        "not equal to missed rows({}) in rowid conversion, tablet_id: {}, "
                        "table_id:{}",
                        merged_rows, filtered_rows, missed_rows_size, tablet_id(), table_id());
                if (config::enable_mow_compaction_correctness_check_core) {
                    CHECK(false) << err_msg;
                } else {
                    DCHECK(false) << err_msg;
                }
                LOG(WARNING) << err_msg;
            }
        }
    }
    if (location_map) {
        RETURN_IF_ERROR(check_rowid_conversion(output_rowset, *location_map));
        // Start the incremental phase with an empty map.
        location_map->clear();
    }

    // 2. calc delete bitmap for incremental data
    RETURN_IF_ERROR(_engine.meta_mgr().get_delete_bitmap_update_lock(
            *this, COMPACTION_DELETE_BITMAP_LOCK_ID, initiator));
    RETURN_IF_ERROR(_engine.meta_mgr().sync_tablet_rowsets(this));

    calc_compaction_output_rowset_delete_bitmap(
            input_rowsets, rowid_conversion, version.second, UINT64_MAX, missed_rows.get(),
            location_map.get(), tablet_meta()->delete_bitmap(), output_rowset_delete_bitmap.get());
    if (location_map) {
        RETURN_IF_ERROR(check_rowid_conversion(output_rowset, *location_map));
    }
    if (missed_rows) {
        // The incremental phase is expected to leave the missed-row count unchanged.
        DCHECK_EQ(missed_rows->size(), missed_rows_size);
        if (missed_rows->size() != missed_rows_size) {
            LOG(WARNING) << "missed rows don't match, before: " << missed_rows_size
                         << " after: " << missed_rows->size();
        }
    }

    // 3. store delete bitmap
    RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(*this, -1, initiator,
                                                            output_rowset_delete_bitmap.get()));
    return Status::OK();
}
839
840
0
// Refreshes selected properties of this tablet's meta from the meta manager.
//
// Does nothing (returns OK) when config::enable_file_cache is off. Otherwise
// fetches the tablet meta via the meta manager and, for each of the following
// properties, copies the fetched value into _tablet_meta when it differs:
// TTL seconds, compaction policy, and the time-series compaction settings
// (goal size, file count threshold, time threshold, empty rowsets threshold,
// level threshold).
//
// When the TTL changes, the file-cache expiration time of every segment of
// every rowset in _rs_version_map is updated as well (under a shared
// _meta_lock).
//
// Returns the fetch error if the fetch fails, InternalError if the fetched
// meta is not in TABLET_RUNNING state, otherwise Status::OK().
Status CloudTablet::sync_meta() {
    if (!config::enable_file_cache) {
        return Status::OK();
    }

    TabletMetaSharedPtr fetched_meta;
    auto fetch_st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &fetched_meta);
    if (!fetch_st.ok()) {
        // TODO(Lchangliang): on ErrorCode::NOT_FOUND, recycle_resources_by_self();
        return fetch_st;
    }
    if (fetched_meta->tablet_state() != TABLET_RUNNING) { // impossible
        return Status::InternalError("invalid tablet state. tablet_id={}", tablet_id());
    }

    if (auto ttl = fetched_meta->ttl_seconds(); _tablet_meta->ttl_seconds() != ttl) {
        _tablet_meta->set_ttl_seconds(ttl);
        int64_t now = UnixSeconds();
        std::shared_lock meta_rlock(_meta_lock);
        for (auto& [_, rs] : _rs_version_map) {
            for (int seg_id = 0; seg_id < rs->num_segments(); ++seg_id) {
                // Expiration is the newest write timestamp plus the TTL; a value that is not
                // in the future is passed on as 0.
                int64_t expiration = ttl + rs->rowset_meta()->newest_write_timestamp();
                if (!(expiration > now)) {
                    expiration = 0;
                }
                auto cache_key = Segment::file_cache_key(rs->rowset_id().to_string(), seg_id);
                auto* cache = io::FileCacheFactory::instance()->get_by_path(cache_key);
                cache->modify_expiration_time(cache_key, expiration);
            }
        }
    }

    if (auto policy = fetched_meta->compaction_policy();
        _tablet_meta->compaction_policy() != policy) {
        _tablet_meta->set_compaction_policy(policy);
    }
    if (auto goal_size = fetched_meta->time_series_compaction_goal_size_mbytes();
        _tablet_meta->time_series_compaction_goal_size_mbytes() != goal_size) {
        _tablet_meta->set_time_series_compaction_goal_size_mbytes(goal_size);
    }
    if (auto file_count = fetched_meta->time_series_compaction_file_count_threshold();
        _tablet_meta->time_series_compaction_file_count_threshold() != file_count) {
        _tablet_meta->set_time_series_compaction_file_count_threshold(file_count);
    }
    if (auto time_threshold = fetched_meta->time_series_compaction_time_threshold_seconds();
        _tablet_meta->time_series_compaction_time_threshold_seconds() != time_threshold) {
        _tablet_meta->set_time_series_compaction_time_threshold_seconds(time_threshold);
    }
    if (auto empty_rowsets = fetched_meta->time_series_compaction_empty_rowsets_threshold();
        _tablet_meta->time_series_compaction_empty_rowsets_threshold() != empty_rowsets) {
        _tablet_meta->set_time_series_compaction_empty_rowsets_threshold(empty_rowsets);
    }
    if (auto level_threshold = fetched_meta->time_series_compaction_level_threshold();
        _tablet_meta->time_series_compaction_level_threshold() != level_threshold) {
        _tablet_meta->set_time_series_compaction_level_threshold(level_threshold);
    }

    return Status::OK();
}
916
917
0
// Fills `tablet_info` with this tablet's id and total version count, both read
// from _tablet_meta under a shared _meta_lock.
void CloudTablet::build_tablet_report_info(TTabletInfo* tablet_info) {
    std::shared_lock meta_rlock(_meta_lock);
    tablet_info->__set_tablet_id(_tablet_meta->tablet_id());
    tablet_info->__set_total_version_count(_tablet_meta->version_count());
    // Currently, this information will not be used by the cloud report,
    // but it may be used in the future.
}
924
925
} // namespace doris