Coverage Report

Created: 2025-04-21 12:08

/root/doris/be/src/olap/tablet.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <butil/macros.h>
21
#include <glog/logging.h>
22
23
#include <atomic>
24
#include <cstddef>
25
#include <cstdint>
26
#include <functional>
27
#include <memory>
28
#include <mutex>
29
#include <ostream>
30
#include <set>
31
#include <shared_mutex>
32
#include <string>
33
#include <string_view>
34
#include <utility>
35
#include <vector>
36
37
#include "common/config.h"
38
#include "common/status.h"
39
#include "olap/base_tablet.h"
40
#include "olap/binlog_config.h"
41
#include "olap/data_dir.h"
42
#include "olap/olap_common.h"
43
#include "olap/partial_update_info.h"
44
#include "olap/rowset/rowset.h"
45
#include "olap/rowset/rowset_meta.h"
46
#include "olap/rowset/rowset_reader.h"
47
#include "olap/rowset/segment_v2/segment.h"
48
#include "olap/version_graph.h"
49
#include "segment_loader.h"
50
#include "util/metrics.h"
51
#include "util/once.h"
52
#include "util/slice.h"
53
54
namespace doris {
55
56
class Tablet;
57
class CumulativeCompactionPolicy;
58
class Compaction;
59
class SingleReplicaCompaction;
60
class RowsetWriter;
61
struct RowsetWriterContext;
62
class RowIdConversion;
63
class TTabletInfo;
64
class TabletMetaPB;
65
class TupleDescriptor;
66
class CalcDeleteBitmapToken;
67
enum CompressKind : int;
68
class RowsetBinlogMetasPB;
69
struct TabletTxnInfo;
70
71
namespace io {
72
class RemoteFileSystem;
73
} // namespace io
74
namespace vectorized {
75
class Block;
76
} // namespace vectorized
77
struct RowLocation;
78
enum KeysType : int;
79
enum SortType : int;
80
81
// Storage location categories for a tablet; per the enumerator names: local only, remote only,
// or both. This is an unscoped enum, so the enumerators are visible directly in namespace doris.
enum TabletStorageType { STORAGE_TYPE_LOCAL, STORAGE_TYPE_REMOTE, STORAGE_TYPE_REMOTE_AND_LOCAL };
82
83
// One-second duration constant. The name suggests it is the threshold above which a tablet lock
// wait is traced; its use is not visible in this part of the header.
// NOTE(review): `static` gives every translation unit its own copy; `inline constexpr` alone
// would yield a single shared definition. <chrono> is not in this header's visible include list,
// so std::chrono is presumably reached through another include — confirm.
static inline constexpr auto TRACE_TABLET_LOCK_THRESHOLD = std::chrono::seconds(1);
84
85
// Judging by its name and the comment on `_executors`, this type spreads "write cooldown meta"
// work for tablets over a fixed set of single-threaded executors. Only declarations are visible
// here; the constructor, `stop()` and `submit()` are defined elsewhere.
struct WriteCooldownMetaExecutors {
86
    // `executor_nums` defaults to 5.
    // NOTE(review): presumably the number of pools created in `_executors` and the value stored
    // in `_executor_nums` — confirm in the definition.
    WriteCooldownMetaExecutors(size_t executor_nums = 5);
87
88
    void stop();
89
90
    void submit(TabletSharedPtr tablet);
91
5
    // Maps a tablet id to an index in [0, _executor_nums) by hashing it with std::hash<int64_t>
    // and taking the remainder, so the same tablet id always yields the same index.
    // `_executor_nums` must be non-zero: the modulo below is undefined for 0.
    size_t _get_executor_pos(int64_t tablet_id) const {
92
5
        return std::hash<int64_t>()(tablet_id) % _executor_nums;
93
5
    }; // NOTE(review): the ';' after the function body is a redundant empty declaration
94
    // Each executor is a mpsc to ensure uploads of the same tablet meta are not concurrent
95
    // FIXME(AlexYue): Use mpsc instead of `ThreadPool` with 1 thread
96
    // We use PriorityThreadPool since it would call status inside its `shutdown` function.
97
    // Consider one situation where the StackTraceCache's singleton is destructed before
98
    // this WriteCooldownMetaExecutors's singleton, then invoking the status would also call
99
    // StackTraceCache which would then result in heap use after free like #23834
100
    std::vector<std::unique_ptr<PriorityThreadPool>> _executors;
101
    // NOTE(review): presumably the ids of tablets with a queued write — confirm in the .cpp.
    std::unordered_set<int64_t> _pending_tablets;
102
    // NOTE(review): presumably guards `_pending_tablets` — confirm in the .cpp.
    std::mutex _latch;
103
    // Divisor used by `_get_executor_pos`.
    size_t _executor_nums;
104
};
105
106
class Tablet final : public BaseTablet {
107
public:
108
    Tablet(StorageEngine& engine, TabletMetaSharedPtr tablet_meta, DataDir* data_dir,
109
           const std::string_view& cumulative_compaction_type = "");
110
111
3.75k
    // Returns the raw `_data_dir` pointer stored in this tablet.
    DataDir* data_dir() const { return _data_dir; }
112
263
    // Forwards to `_tablet_meta->replica_id()`.
    int64_t replica_id() const { return _tablet_meta->replica_id(); }
113
12.1k
    // Forwards to `_tablet_meta->tablet_uid()`; the TabletUid is returned by value.
    TabletUid tablet_uid() const { return _tablet_meta->tablet_uid(); }
114
115
    bool set_tablet_schema_into_rowset_meta();
116
    Status init();
117
    bool init_succeeded();
118
119
    bool is_used();
120
121
    void register_tablet_into_dir();
122
    void deregister_tablet_from_dir();
123
124
    void save_meta();
125
    // Used in clone task, to update local meta when finishing a clone job
126
    Status revise_tablet_meta(const std::vector<RowsetSharedPtr>& to_add,
127
                              const std::vector<RowsetSharedPtr>& to_delete,
128
                              bool is_incremental_clone);
129
130
    int64_t cumulative_layer_point() const;
131
    void set_cumulative_layer_point(int64_t new_point);
132
    inline int64_t cumulative_promotion_size() const;
133
    inline void set_cumulative_promotion_size(int64_t new_size);
134
135
    // Disk space occupied by tablet, contain local and remote.
136
    size_t tablet_footprint() override;
137
    // Local disk space occupied by tablet.
138
    size_t tablet_local_size();
139
    // Remote disk space occupied by tablet.
140
    size_t tablet_remote_size();
141
142
    size_t num_rows();
143
    int version_count() const;
144
    bool exceed_version_limit(int32_t limit) const override;
145
    int stale_version_count() const;
146
    uint64_t segment_count() const;
147
    Version max_version() const;
148
    Version max_version_unlocked() const;
149
    CumulativeCompactionPolicy* cumulative_compaction_policy();
150
151
    // properties encapsulated in TabletSchema
152
    SortType sort_type() const;
153
    size_t sort_col_num() const;
154
    size_t num_columns() const;
155
    size_t num_null_columns() const;
156
    size_t num_short_key_columns() const;
157
    size_t num_rows_per_row_block() const;
158
    CompressKind compress_kind() const;
159
    double bloom_filter_fpp() const;
160
    size_t next_unique_id() const;
161
    size_t row_size() const;
162
    int64_t avg_rs_meta_serialize_size() const;
163
164
    // operation in rowsets
165
    Status add_rowset(RowsetSharedPtr rowset);
166
    Status create_initial_rowset(const int64_t version);
167
168
    // MUST hold EXCLUSIVE `_meta_lock`.
169
    Status modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
170
                          std::vector<RowsetSharedPtr>& to_delete, bool check_delete = false);
171
    bool rowset_exists_unlocked(const RowsetSharedPtr& rowset);
172
173
    // _rs_version_map and _stale_rs_version_map should be protected by _meta_lock
174
    // The caller must hold _meta_lock when calling these two functions.
175
    const RowsetSharedPtr get_rowset_by_version(const Version& version,
176
                                                bool find_is_stale = false) const;
177
    const RowsetSharedPtr get_stale_rowset_by_version(const Version& version) const;
178
179
    const RowsetSharedPtr rowset_with_max_version() const;
180
181
    static TabletSchemaSPtr tablet_schema_with_merged_max_schema_version(
182
            const std::vector<RowsetMetaSharedPtr>& rowset_metas);
183
184
    Status add_inc_rowset(const RowsetSharedPtr& rowset);
185
    /// Delete stale rowsets by time. This delete policy uses now() minus
186
    /// config::tablet_rowset_expired_stale_sweep_time_sec to compute the deadline of expired rowset
187
    /// to delete.  When rowset is deleted, it will be added to StorageEngine unused map and record
188
    /// need to delete flag.
189
    void delete_expired_stale_rowset();
190
191
    // Given spec_version, find a continuous version path and store it in version_path.
192
    // If quiet is true, then only "does this path exist" is returned.
193
    // If skip_missing_version is true, return ok even there are missing versions.
194
    Status capture_consistent_versions(const Version& spec_version,
195
                                       std::vector<Version>* version_path,
196
                                       bool skip_missing_version, bool quiet) const;
197
    // if quiet is true, no error log will be printed if there are missing versions
198
    Status check_version_integrity(const Version& version, bool quiet = false);
199
    bool check_version_exist(const Version& version) const;
200
    void acquire_version_and_rowsets(
201
            std::vector<std::pair<Version, RowsetSharedPtr>>* version_rowsets) const;
202
203
    Status capture_consistent_rowsets(const Version& spec_version,
204
                                      std::vector<RowsetSharedPtr>* rowsets) const;
205
    // If skip_missing_version is true, skip versions if they are missing.
206
    Status capture_rs_readers(const Version& spec_version, std::vector<RowSetSplits>* rs_splits,
207
                              bool skip_missing_version) const override;
208
209
    Status capture_rs_readers(const std::vector<Version>& version_path,
210
                              std::vector<RowSetSplits>* rs_splits) const;
211
212
    // meta lock
213
816
    // Lock accessors: each returns a reference to one of this tablet's internal locks so that
    // callers can lock it themselves. None of these functions acquires anything.
    // Note that the "header lock" is `_meta_lock`.
    std::shared_mutex& get_header_lock() { return _meta_lock; }
214
2
    std::mutex& get_rowset_update_lock() { return _rowset_update_lock; }
215
28
    // Note that the "push lock" is `_ingest_lock`.
    std::mutex& get_push_lock() { return _ingest_lock; }
216
19
    std::mutex& get_base_compaction_lock() { return _base_compaction_lock; }
217
283
    std::mutex& get_cumulative_compaction_lock() { return _cumulative_compaction_lock; }
218
0
    std::shared_mutex& get_meta_store_lock() { return _meta_store_lock; }
219
220
46
    // The only accessor here that returns a std::shared_timed_mutex, i.e. one that supports
    // timed lock attempts.
    std::shared_timed_mutex& get_migration_lock() { return _migration_lock; }
221
222
17
    std::mutex& get_schema_change_lock() { return _schema_change_lock; }
223
224
17
    std::mutex& get_build_inverted_index_lock() { return _build_inverted_index_lock; }
225
226
    // operation for compaction
227
    bool can_do_compaction(size_t path_hash, CompactionType compaction_type);
228
    uint32_t calc_compaction_score(
229
            CompactionType compaction_type,
230
            std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy);
231
232
    // operation for clone
233
    void calc_missed_versions(int64_t spec_version, std::vector<Version>* missed_versions);
234
    void calc_missed_versions_unlocked(int64_t spec_version,
235
                                       std::vector<Version>* missed_versions) const;
236
237
    // This function finds the max continuous version from the beginning.
238
    // For example: if versions 1, 2, 3, 5, 6, 7 belong to the tablet, then 3 is the target.
239
    // 3 will be saved in "version", and 7 will be saved in "max_version", if max_version != nullptr
240
    void max_continuous_version_from_beginning(Version* version, Version* max_version = nullptr);
241
242
0
    // Stores `is_bad` into the `_is_bad` flag; no lock is taken here.
    void set_bad(bool is_bad) { _is_bad = is_bad; }
243
244
254
    // Plain getter/setter pairs for the time of the last failed cumulative, base and full
    // compaction. Values are milliseconds according to the member and parameter names; the
    // reference point of the timestamp is not visible here. The getters are not const-qualified.
    int64_t last_cumu_compaction_failure_time() { return _last_cumu_compaction_failure_millis; }
245
0
    void set_last_cumu_compaction_failure_time(int64_t millis) {
246
0
        _last_cumu_compaction_failure_millis = millis;
247
0
    }
248
249
0
    // Base compaction variant.
    int64_t last_base_compaction_failure_time() { return _last_base_compaction_failure_millis; }
250
0
    void set_last_base_compaction_failure_time(int64_t millis) {
251
0
        _last_base_compaction_failure_millis = millis;
252
0
    }
253
254
0
    // Full compaction variant.
    int64_t last_full_compaction_failure_time() { return _last_full_compaction_failure_millis; }
255
0
    void set_last_full_compaction_failure_time(int64_t millis) {
256
0
        _last_full_compaction_failure_millis = millis;
257
0
    }
258
259
5
    // Plain getter/setter pairs for the time of the last successful cumulative, base and full
    // compaction. Values are milliseconds according to the member and parameter names; the
    // reference point of the timestamp is not visible here. The getters are not const-qualified.
    int64_t last_cumu_compaction_success_time() { return _last_cumu_compaction_success_millis; }
260
3
    void set_last_cumu_compaction_success_time(int64_t millis) {
261
3
        _last_cumu_compaction_success_millis = millis;
262
3
    }
263
264
0
    // Base compaction variant.
    int64_t last_base_compaction_success_time() { return _last_base_compaction_success_millis; }
265
0
    void set_last_base_compaction_success_time(int64_t millis) {
266
0
        _last_base_compaction_success_millis = millis;
267
0
    }
268
269
0
    // Full compaction variant.
    int64_t last_full_compaction_success_time() { return _last_full_compaction_success_millis; }
270
0
    void set_last_full_compaction_success_time(int64_t millis) {
271
0
        _last_full_compaction_success_millis = millis;
272
0
    }
273
274
0
    // Getter/setter for the time base compaction was last scheduled, in milliseconds according
    // to the member and parameter names. Only a base-compaction variant is visible here.
    int64_t last_base_compaction_schedule_time() { return _last_base_compaction_schedule_millis; }
275
0
    void set_last_base_compaction_schedule_time(int64_t millis) {
276
0
        _last_base_compaction_schedule_millis = millis;
277
0
    }
278
279
0
    // Stores a status string for the last failed single compaction. `status` is taken by value
    // and moved into the member, so callers may pass a temporary without an extra copy.
    void set_last_single_compaction_failure_status(std::string status) {
280
0
        _last_single_compaction_failure_status = std::move(status);
281
0
    }
282
283
0
    // Records `version` in `_last_fetched_version`.
    void set_last_fetched_version(Version version) { _last_fetched_version = std::move(version); }
284
285
    void delete_all_files();
286
287
    void check_tablet_path_exists();
288
289
    bool check_path(const std::string& check_path) const;
290
291
    TabletInfo get_tablet_info() const;
292
293
    std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_cumulative_compaction();
294
    std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_base_compaction();
295
    std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_full_compaction();
296
    std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_build_inverted_index(
297
            const std::set<int64_t>& alter_index_uids, bool is_drop_op);
298
299
    // used for single compaction to get the local versions
300
    // Single compaction does not require remote rowsets and cannot violate the cooldown semantics
301
    std::vector<Version> get_all_local_versions();
302
303
    std::vector<RowsetSharedPtr> pick_first_consecutive_empty_rowsets(int limit);
304
305
    void calculate_cumulative_point();
306
    // TODO(ygl):
307
0
    // Placeholder: unconditionally returns false.
    bool is_primary_replica() { return false; }
308
309
    // return true if the checkpoint is actually done
310
    bool do_tablet_meta_checkpoint();
311
312
    // Check whether the rowset is useful or not, unuseful rowset can be swept up then.
313
    // Rowset which is under tablet's management is useful, i.e. rowset is in
314
    // _rs_version_map, or _stale_rs_version_map.
315
    // Rowset whose version range is not covered by this tablet is also useful.
316
    bool rowset_meta_is_useful(RowsetMetaSharedPtr rowset_meta);
317
318
    void build_tablet_report_info(TTabletInfo* tablet_info,
319
                                  bool enable_consecutive_missing_check = false,
320
                                  bool enable_path_check = false);
321
322
    void generate_tablet_meta_copy(TabletMetaSharedPtr new_tablet_meta) const;
323
    // caller should hold the _meta_lock before calling this method
324
    void generate_tablet_meta_copy_unlocked(TabletMetaSharedPtr new_tablet_meta) const;
325
326
    // return a json string to show the compaction status of this tablet
327
    void get_compaction_status(std::string* json_result);
328
329
    static Status prepare_compaction_and_calculate_permits(CompactionType compaction_type,
330
                                                           const TabletSharedPtr& tablet,
331
                                                           std::shared_ptr<Compaction>& compaction,
332
                                                           int64_t& permits);
333
334
    void execute_compaction(Compaction& compaction);
335
    void execute_single_replica_compaction(SingleReplicaCompaction& compaction);
336
337
    // Replaces the tablet's cumulative compaction policy with the given shared_ptr.
    // The by-value parameter is copy-assigned (not moved) into the member, and no lock is
    // taken in this function.
    void set_cumulative_compaction_policy(
338
0
            std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy) {
339
0
        _cumulative_compaction_policy = cumulative_compaction_policy;
340
0
    }
341
342
0
    // Returns a copy of the shared_ptr held in `_cumulative_compaction_policy`, so the caller
    // shares ownership of the policy object. No lock is taken in this function.
    std::shared_ptr<CumulativeCompactionPolicy> get_cumulative_compaction_policy() {
343
0
        return _cumulative_compaction_policy;
344
0
    }
345
346
0
    // Stores a status string for the last base compaction; `status` is moved into the member.
    void set_last_base_compaction_status(std::string status) {
347
0
        _last_base_compaction_status = std::move(status);
348
0
    }
349
350
0
    // Returns a copy of the stored status string. Not const-qualified.
    std::string get_last_base_compaction_status() { return _last_base_compaction_status; }
351
352
    bool should_fetch_from_peer();
353
354
    std::tuple<int64_t, int64_t> get_visible_version_and_time() const;
355
356
548
    // Atomically replaces `_visible_version` with the given shared_ptr. The store uses
    // std::memory_order_relaxed: it is atomic, but imposes no ordering on surrounding memory
    // operations.
    // NOTE(review): if `_visible_version` is a plain std::shared_ptr, this is the shared_ptr
    // overload of std::atomic_store_explicit, which is deprecated since C++20 in favor of
    // std::atomic<std::shared_ptr<T>> — confirm against the member declaration.
    void set_visible_version(const std::shared_ptr<const VersionWithTime>& visible_version) {
357
548
        std::atomic_store_explicit(&_visible_version, visible_version, std::memory_order_relaxed);
358
548
    }
359
360
0
    // Returns `_tablet_meta->all_beta()` while holding `_meta_lock` in shared (read) mode.
    // Because the lock is taken here, callers must not already hold `_meta_lock` exclusively
    // on the same thread.
    inline bool all_beta() const {
361
0
        std::shared_lock rdlock(_meta_lock);
362
0
        return _tablet_meta->all_beta();
363
0
    }
364
365
0
    // Returns a const reference to `_max_version_schema` without taking any lock.
    // NOTE(review): the `_unlocked` suffix suggests the caller is responsible for
    // synchronization — confirm which lock protects `_max_version_schema`.
    const TabletSchemaSPtr& tablet_schema_unlocked() const { return _max_version_schema; }
366
367
    Result<std::unique_ptr<RowsetWriter>> create_rowset_writer(RowsetWriterContext& context,
368
                                                               bool vertical) override;
369
370
    Status create_transient_rowset_writer(RowsetSharedPtr rowset_ptr,
371
                                          std::unique_ptr<RowsetWriter>* rowset_writer,
372
                                          std::shared_ptr<PartialUpdateInfo> partial_update_info);
373
    Status create_transient_rowset_writer(RowsetWriterContext& context, const RowsetId& rowset_id,
374
                                          std::unique_ptr<RowsetWriter>* rowset_writer);
375
376
    Status create_rowset(const RowsetMetaSharedPtr& rowset_meta, RowsetSharedPtr* rowset);
377
378
    // MUST hold EXCLUSIVE `_meta_lock`
379
    void add_rowsets(const std::vector<RowsetSharedPtr>& to_add);
380
    // MUST hold EXCLUSIVE `_meta_lock`
381
    Status delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete, bool move_to_stale);
382
383
    // MUST hold SHARED `_meta_lock`
384
476
    // Returns a const reference to `_rs_version_map`; no lock is taken here.
    const auto& rowset_map() const { return _rs_version_map; }
385
    // MUST hold SHARED `_meta_lock`
386
475
    // Returns a const reference to `_stale_rs_version_map`; no lock is taken here.
    const auto& stale_rowset_map() const { return _stale_rs_version_map; }
387
388
    ////////////////////////////////////////////////////////////////////////////
389
    // begin cooldown functions
390
    ////////////////////////////////////////////////////////////////////////////
391
22
    // Storage policy id accessors; both forward directly to `_tablet_meta`.
    int64_t storage_policy_id() const { return _tablet_meta->storage_policy_id(); }
392
4
    void set_storage_policy_id(int64_t id) { _tablet_meta->set_storage_policy_id(id); }
393
394
0
    // Returns the stored `_last_failed_follow_cooldown_time`; its unit is not visible here.
    int64_t last_failed_follow_cooldown_time() const { return _last_failed_follow_cooldown_time; }
395
396
    // Cooldown to remote fs.
397
    Status cooldown(RowsetSharedPtr rowset = nullptr);
398
399
    RowsetSharedPtr pick_cooldown_rowset();
400
401
    RowsetSharedPtr need_cooldown(int64_t* cooldown_timestamp, size_t* file_size);
402
403
    // Value type holding a tablet's cooldown configuration. Both fields default to -1.
    // NOTE(review): -1 presumably means "not set" — confirm against update_cooldown_conf().
    struct CooldownConf {
404
        int64_t term = -1;
405
        int64_t cooldown_replica_id = -1;
406
    };
407
408
0
    // Returns a copy of `_cooldown_conf`, taken while holding `_cooldown_conf_lock` in shared
    // (read) mode.
    CooldownConf cooldown_conf() const {
409
0
        std::shared_lock rlock(_cooldown_conf_lock);
410
0
        return _cooldown_conf;
411
0
    }
412
413
0
    // Same as cooldown_conf() but takes no lock here. The `_unlocked` suffix suggests the
    // caller is expected to already hold `_cooldown_conf_lock` — confirm at call sites.
    CooldownConf cooldown_conf_unlocked() const { return _cooldown_conf; }
414
415
    // Return `true` if update success
416
    bool update_cooldown_conf(int64_t cooldown_term, int64_t cooldown_replica_id);
417
418
    Status remove_all_remote_rowsets();
419
420
    void record_unused_remote_rowset(const RowsetId& rowset_id, const std::string& resource,
421
                                     int64_t num_segments);
422
423
    uint32_t calc_cold_data_compaction_score() const;
424
425
17
    // Returns a reference to `_cold_compaction_lock`; nothing is acquired here.
    std::mutex& get_cold_compaction_lock() { return _cold_compaction_lock; }
426
427
0
    // Returns a reference to `_cooldown_conf_lock`, the shared mutex that cooldown_conf()
    // locks in shared mode.
    std::shared_mutex& get_cooldown_conf_lock() { return _cooldown_conf_lock; }
428
429
    static void async_write_cooldown_meta(TabletSharedPtr tablet);
430
    // Return `ABORTED` if the operation should not be retried
431
    Status write_cooldown_meta();
432
    ////////////////////////////////////////////////////////////////////////////
433
    // end cooldown functions
434
    ////////////////////////////////////////////////////////////////////////////
435
436
    // Lookup the row location of `encoded_key`, the function sets `row_location` on success.
437
    // NOTE: the method only works in unique key model with primary key index, you will get a
438
    //       not supported error in other data model.
439
    Status lookup_row_key(const Slice& encoded_key, TabletSchema* latest_schema, bool with_seq_col,
440
                          const std::vector<RowsetSharedPtr>& specified_rowsets,
441
                          RowLocation* row_location, uint32_t version,
442
                          std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
443
                          RowsetSharedPtr* rowset = nullptr, bool with_rowid = true,
444
                          bool is_partial_update = false);
445
446
    // Lookup a row with TupleDescriptor and fill Block
447
    Status lookup_row_data(const Slice& encoded_key, const RowLocation& row_location,
448
                           RowsetSharedPtr rowset, const TupleDescriptor* desc,
449
                           OlapReaderStatistics& stats, std::string& values,
450
                           bool write_to_cache = false);
451
452
    Status fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t segid,
453
                                 const std::vector<uint32_t>& rowids,
454
                                 const TabletColumn& tablet_column,
455
                                 vectorized::MutableColumnPtr& dst);
456
457
    // We use the TabletSchema from the caller because the TabletSchema in the rowset's meta
458
    // may be outdated due to schema change. Also note that the cids should indicate the indexes
459
    // of the columns in the TabletSchema passed in.
460
    Status fetch_value_through_row_column(RowsetSharedPtr input_rowset,
461
                                          const TabletSchema& tablet_schema, uint32_t segid,
462
                                          const std::vector<uint32_t>& rowids,
463
                                          const std::vector<uint32_t>& cids,
464
                                          vectorized::Block& block);
465
466
    // calc delete bitmap when flush memtable, use a fake version to calc
467
    // For example, cur max version is 5, and we use version 6 to calc but
468
    // finally this rowset publish version with 8, we should make up data
469
    // for rowset 6-7. Also, if a compaction happens between commit_txn and
470
    // publish_txn, we should remove compaction input rowsets' delete_bitmap
471
    // and build newly generated rowset's delete_bitmap
472
    Status calc_delete_bitmap(RowsetSharedPtr rowset,
473
                              const std::vector<segment_v2::SegmentSharedPtr>& segments,
474
                              const std::vector<RowsetSharedPtr>& specified_rowsets,
475
                              DeleteBitmapPtr delete_bitmap, int64_t version,
476
                              CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer = nullptr);
477
478
    std::vector<RowsetSharedPtr> get_rowset_by_ids(const RowsetIdUnorderedSet* specified_rowset_ids,
479
                                                   bool include_stale = false);
480
481
    Status calc_segment_delete_bitmap(RowsetSharedPtr rowset,
482
                                      const segment_v2::SegmentSharedPtr& seg,
483
                                      const std::vector<RowsetSharedPtr>& specified_rowsets,
484
                                      DeleteBitmapPtr delete_bitmap, int64_t end_version,
485
                                      RowsetWriter* rowset_writer);
486
487
    Status calc_delete_bitmap_between_segments(
488
            RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments,
489
            DeleteBitmapPtr delete_bitmap);
490
    Status read_columns_by_plan(TabletSchemaSPtr tablet_schema, std::vector<uint32_t> cids_to_read,
491
                                const PartialUpdateReadPlan& read_plan,
492
                                const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
493
                                vectorized::Block& block, std::map<uint32_t, uint32_t>* read_index,
494
                                bool force_read_old_delete_signs,
495
                                const signed char* __restrict skip_map = nullptr);
496
    void prepare_to_read(const RowLocation& row_location, size_t pos,
497
                         PartialUpdateReadPlan* read_plan);
498
    Status generate_new_block_for_partial_update(
499
            TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info,
500
            const PartialUpdateReadPlan& read_plan_ori,
501
            const PartialUpdateReadPlan& read_plan_update,
502
            const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
503
            vectorized::Block* output_block);
504
505
    Status update_delete_bitmap_without_lock(
506
            const RowsetSharedPtr& rowset,
507
            const std::vector<RowsetSharedPtr>* specified_base_rowsets = nullptr);
508
509
    Status commit_phase_update_delete_bitmap(
510
            const RowsetSharedPtr& rowset, RowsetIdUnorderedSet& pre_rowset_ids,
511
            DeleteBitmapPtr delete_bitmap,
512
            const std::vector<segment_v2::SegmentSharedPtr>& segments, int64_t txn_id,
513
            CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer = nullptr);
514
515
    Status update_delete_bitmap(TabletTxnInfo* txn_info, int64_t txn_id);
516
    void calc_compaction_output_rowset_delete_bitmap(
517
            const std::vector<RowsetSharedPtr>& input_rowsets,
518
            const RowIdConversion& rowid_conversion, uint64_t start_version, uint64_t end_version,
519
            std::set<RowLocation>* missed_rows,
520
            std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>>* location_map,
521
            const DeleteBitmap& input_delete_bitmap, DeleteBitmap* output_rowset_delete_bitmap);
522
    void merge_delete_bitmap(const DeleteBitmap& delete_bitmap);
523
    Status check_rowid_conversion(
524
            RowsetSharedPtr dst_rowset,
525
            const std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>>&
526
                    location_map);
527
    Status all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) const;
528
    Status sort_block(vectorized::Block& in_block, vectorized::Block& output_block);
529
530
    bool check_all_rowset_segment();
531
532
    void update_max_version_schema(const TabletSchemaSPtr& tablet_schema);
533
534
    void set_skip_compaction(bool skip,
535
                             CompactionType compaction_type = CompactionType::CUMULATIVE_COMPACTION,
536
                             int64_t start = -1);
537
    bool should_skip_compaction(CompactionType compaction_type, int64_t now);
538
539
    RowsetSharedPtr get_rowset(const RowsetId& rowset_id);
540
541
    // Calls `visitor` once for every rowset in `_rs_version_map` and, when `include_stale` is
    // true, afterwards for every rowset in `_stale_rs_version_map`.
    // `_meta_lock` is held in shared (read) mode for the whole traversal, including the visitor
    // calls, so the visitor must not try to acquire `_meta_lock` itself.
    void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor,
542
32
                          bool include_stale = false) {
543
32
        std::shared_lock rlock(_meta_lock);
544
84
        for (auto& [v, rs] : _rs_version_map) {
545
84
            visitor(rs);
546
84
        }
547
32
        // Stale rowsets are visited only on request.
        if (!include_stale) return;
548
31
        for (auto& [v, rs] : _stale_rs_version_map) {
549
0
            visitor(rs);
550
0
        }
551
31
    }
552
553
    std::vector<std::string> get_binlog_filepath(std::string_view binlog_version) const;
554
    std::pair<std::string, int64_t> get_binlog_info(std::string_view binlog_version) const;
555
    std::string get_rowset_binlog_meta(std::string_view binlog_version,
556
                                       std::string_view rowset_id) const;
557
    Status get_rowset_binlog_metas(const std::vector<int64_t>& binlog_versions,
558
                                   RowsetBinlogMetasPB* metas_pb);
559
    Status get_rowset_binlog_metas(Version binlog_versions, RowsetBinlogMetasPB* metas_pb);
560
    std::string get_segment_filepath(std::string_view rowset_id,
561
                                     std::string_view segment_index) const;
562
    std::string get_segment_filepath(std::string_view rowset_id, int64_t segment_index) const;
563
    std::string get_segment_index_filepath(std::string_view rowset_id,
564
                                           std::string_view segment_index,
565
                                           std::string_view index_id) const;
566
    std::string get_segment_index_filepath(std::string_view rowset_id, int64_t segment_index,
567
                                           int64_t index_id) const;
568
    bool can_add_binlog(uint64_t total_binlog_size) const;
569
    void gc_binlogs(int64_t version);
570
    Status ingest_binlog_metas(RowsetBinlogMetasPB* metas_pb);
571
572
0
    // Updates the tablet's IO error counter from a failed status:
    //   - IO_ERROR: increments `_io_error_times` by one;
    //   - CORRUPTION: sets `_io_error_times` to config::max_tablet_io_errors + 1, i.e. past the
    //     threshold checked by is_io_error_too_times() when that config is positive;
    //   - any other status: no effect.
    inline void report_error(const Status& st) {
573
0
        if (st.is<ErrorCode::IO_ERROR>()) {
574
0
            ++_io_error_times;
575
0
        } else if (st.is<ErrorCode::CORRUPTION>()) {
576
0
            _io_error_times = config::max_tablet_io_errors + 1;
577
0
        }
578
0
    }
579
580
0
    // Returns the current value of `_io_error_times`.
    inline int64_t get_io_error_times() const { return _io_error_times; }
581
582
0
    // True when the error counter has reached config::max_tablet_io_errors. A non-positive
    // config value disables the check (always false).
    inline bool is_io_error_too_times() const {
583
0
        return config::max_tablet_io_errors > 0 && _io_error_times >= config::max_tablet_io_errors;
584
0
    }
585
586
0
    // Forwards to `_tablet_meta->table_id()`. Not const-qualified.
    int64_t get_table_id() { return _tablet_meta->table_id(); }
587
588
    // binlog related functions
589
    bool is_enable_binlog();
590
0
    // Returns the enable flag of the binlog config stored in `_tablet_meta`.
    bool is_binlog_enabled() { return _tablet_meta->binlog_config().is_enable(); }
591
0
    // NOTE(review): the name says milliseconds, but the value returned is
    // `binlog_config().ttl_seconds()` with no conversion — confirm which unit callers expect.
    int64_t binlog_ttl_ms() const { return _tablet_meta->binlog_config().ttl_seconds(); }
592
0
    // Returns `max_bytes()` of the binlog config stored in `_tablet_meta`.
    int64_t binlog_max_bytes() const { return _tablet_meta->binlog_config().max_bytes(); }
593
594
    void set_binlog_config(BinlogConfig binlog_config);
595
    void add_sentinel_mark_to_delete_bitmap(DeleteBitmap* delete_bitmap,
596
                                            const RowsetIdUnorderedSet& rowsetids);
597
    Status check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, int64_t max_version,
598
                                           int64_t txn_id, const RowsetIdUnorderedSet& rowset_ids,
599
                                           std::vector<RowsetSharedPtr>* rowsets = nullptr);
600
    Status _get_segment_column_iterator(
601
            const BetaRowsetSharedPtr& rowset, uint32_t segid, const TabletColumn& target_column,
602
            SegmentCacheHandle* segment_cache_handle,
603
            std::unique_ptr<segment_v2::ColumnIterator>* column_iterator,
604
            OlapReaderStatistics* stats);
605
0
    // Getter/setter for the `_alter_failed` flag. The getter is not const-qualified.
    void set_alter_failed(bool alter_failed) { _alter_failed = alter_failed; }
606
0
    bool is_alter_failed() { return _alter_failed; }
607
608
0
    // Getter/setter for the `_is_full_compaction_running` flag.
    void set_is_full_compaction_running(bool is_full_compaction_running) {
609
0
        _is_full_compaction_running = is_full_compaction_running;
610
0
    }
611
0
    inline bool is_full_compaction_running() const { return _is_full_compaction_running; }
612
    void clear_cache();
613
    Status calc_local_file_crc(uint32_t* crc_value, int64_t start_version, int64_t end_version,
614
                               int32_t* rowset_count, int64_t* file_count);
615
    Status show_nested_index_file(std::string* json_meta);
616
617
private:
618
    Status _init_once_action();
619
    void _print_missed_versions(const std::vector<Version>& missed_versions) const;
620
    bool _contains_rowset(const RowsetId rowset_id);
621
    Status _contains_version(const Version& version);
622
623
    // Returns:
624
    // version: the max continuous version from beginning
625
    // max_version: the max version of this tablet
626
    void _max_continuous_version_from_beginning_unlocked(Version* version, Version* max_version,
627
                                                         bool* has_version_cross) const;
628
    RowsetSharedPtr _rowset_with_largest_size();
629
    /// Delete stale rowset by version. This method not only deletes the version in the expired rowset map,
630
    /// but also deletes the version in the rowset meta vector.
631
    void _delete_stale_rowset_by_version(const Version& version);
632
    Status _capture_consistent_rowsets_unlocked(const std::vector<Version>& version_path,
633
                                                std::vector<RowsetSharedPtr>* rowsets) const;
634
635
    uint32_t _calc_cumulative_compaction_score(
636
            std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy);
637
    uint32_t _calc_base_compaction_score() const;
638
639
    // When the proportion of empty edges in the adjacency matrix used to represent the version graph
640
    // in the version tracker is greater than the threshold, rebuild the version tracker
641
    bool _reconstruct_version_tracker_if_necessary();
642
643
    std::vector<RowsetSharedPtr> _pick_visible_rowsets_to_compaction(int64_t min_start_version,
644
                                                                     int64_t max_start_version);
645
646
    void _init_context_common_fields(RowsetWriterContext& context);
647
648
    void _rowset_ids_difference(const RowsetIdUnorderedSet& cur, const RowsetIdUnorderedSet& pre,
649
                                RowsetIdUnorderedSet* to_add, RowsetIdUnorderedSet* to_del);
650
    Status _load_rowset_segments(const RowsetSharedPtr& rowset,
651
                                 std::vector<segment_v2::SegmentSharedPtr>* segments);
652
653
    ////////////////////////////////////////////////////////////////////////////
654
    // begin cooldown functions
655
    ////////////////////////////////////////////////////////////////////////////
656
    Status _cooldown_data(RowsetSharedPtr rowset);
657
    Status _follow_cooldowned_data();
658
    Status _read_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& fs,
659
                               TabletMetaPB* tablet_meta_pb);
660
    bool _has_data_to_cooldown();
661
    int64_t _get_newest_cooldown_time(const RowsetSharedPtr& rowset);
662
    ////////////////////////////////////////////////////////////////////////////
663
    // end cooldown functions
664
    ////////////////////////////////////////////////////////////////////////////
665
666
    void _remove_sentinel_mark_from_delete_bitmap(DeleteBitmapPtr delete_bitmap);
667
    std::string _get_rowset_info_str(RowsetSharedPtr rowset, bool delete_flag);
668
669
    void _clear_cache_by_rowset(const BetaRowsetSharedPtr& rowset);
670
671
public:
672
    static const int64_t K_INVALID_CUMULATIVE_POINT = -1;
673
674
private:
675
    StorageEngine& _engine;
676
    DataDir* _data_dir = nullptr;
677
    TimestampedVersionTracker _timestamped_version_tracker;
678
679
    DorisCallOnce<Status> _init_once;
680
    // meta store lock is used to prevent two threads from doing a checkpoint concurrently
681
    // it will be used in econ-mode in the future
682
    std::shared_mutex _meta_store_lock;
683
    std::mutex _ingest_lock;
684
    std::mutex _base_compaction_lock;
685
    std::mutex _cumulative_compaction_lock;
686
    std::mutex _schema_change_lock;
687
    std::shared_timed_mutex _migration_lock;
688
    std::mutex _build_inverted_index_lock;
689
690
    // In unique key table with MoW, we should guarantee that only one
691
    // writer can update rowset and delete bitmap at the same time.
692
    // We use a separate lock rather than _meta_lock, to avoid blocking read queries
693
    // during publish_txn, which might take hundreds of milliseconds
694
    mutable std::mutex _rowset_update_lock;
695
696
    // After version 0.13, all newly created rowsets are saved in _rs_version_map.
697
    // And if a rowset is being compacted, the old rowsets will be saved in _stale_rs_version_map.
698
    std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> _rs_version_map;
699
    // This variable _stale_rs_version_map is used to record the rowsets which have been compacted.
700
    // These stale rowsets are removed when the rowsets' path version is expired,
701
    // this policy is judged and computed by TimestampedVersionTracker.
702
    std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> _stale_rs_version_map;
703
    // if this tablet is broken, set to true. default is false
704
    std::atomic<bool> _is_bad;
705
    // timestamp of last cumu compaction failure
706
    std::atomic<int64_t> _last_cumu_compaction_failure_millis;
707
    // timestamp of last base compaction failure
708
    std::atomic<int64_t> _last_base_compaction_failure_millis;
709
    // timestamp of last full compaction failure
710
    std::atomic<int64_t> _last_full_compaction_failure_millis;
711
    // timestamp of last cumu compaction success
712
    std::atomic<int64_t> _last_cumu_compaction_success_millis;
713
    // timestamp of last base compaction success
714
    std::atomic<int64_t> _last_base_compaction_success_millis;
715
    // timestamp of last full compaction success
716
    std::atomic<int64_t> _last_full_compaction_success_millis;
717
    // timestamp of last base compaction schedule time
718
    std::atomic<int64_t> _last_base_compaction_schedule_millis;
719
    std::atomic<int64_t> _cumulative_point;
720
    std::atomic<int64_t> _cumulative_promotion_size;
721
    std::atomic<int32_t> _newly_created_rowset_num;
722
    std::atomic<int64_t> _last_checkpoint_time;
723
    std::string _last_base_compaction_status;
724
725
    // single replica compaction status
726
    std::string _last_single_compaction_failure_status;
727
    Version _last_fetched_version;
728
729
    // cumulative compaction policy
730
    std::shared_ptr<CumulativeCompactionPolicy> _cumulative_compaction_policy;
731
    std::string_view _cumulative_compaction_type;
732
733
    // use a separate thread to check all tablets paths existence
734
    std::atomic<bool> _is_tablet_path_exists;
735
736
    int64_t _last_missed_version;
737
    int64_t _last_missed_time_s;
738
739
    bool _skip_cumu_compaction = false;
740
    int64_t _skip_cumu_compaction_ts;
741
742
    bool _skip_base_compaction = false;
743
    int64_t _skip_base_compaction_ts;
744
745
    // cooldown related
746
    CooldownConf _cooldown_conf;
747
    // `_cooldown_conf_lock` is used to serialize update cooldown conf and all operations that:
748
    // 1. read cooldown conf
749
    // 2. upload rowsets to remote storage
750
    // 3. update cooldown meta id
751
    mutable std::shared_mutex _cooldown_conf_lock;
752
    // `_cold_compaction_lock` is used to serialize cold data compaction and all operations that
753
    // may delete compaction input rowsets.
754
    std::mutex _cold_compaction_lock;
755
    int64_t _last_failed_follow_cooldown_time = 0;
756
    // `_alter_failed` is used to indicate whether the tablet failed to perform a schema change
757
    std::atomic<bool> _alter_failed = false;
758
759
    int64_t _io_error_times = 0;
760
761
    // Partition's visible version. It is synced from FE, but not in real time.
762
    std::shared_ptr<const VersionWithTime> _visible_version;
763
764
    std::atomic_bool _is_full_compaction_running = false;
765
};
766
767
10
// Returns the raw (non-owning) pointer held by the `_cumulative_compaction_policy`
// shared_ptr; the result is null when that shared_ptr is empty.
inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() {
    CumulativeCompactionPolicy* policy = _cumulative_compaction_policy.get();
    return policy;
}
770
771
268
inline bool Tablet::init_succeeded() {
772
268
    return _init_once.has_called() && _init_once.stored_result().ok();
773
268
}
774
775
256
inline bool Tablet::is_used() {
776
256
    return !_is_bad && _data_dir->is_used();
777
256
}
778
779
297
inline void Tablet::register_tablet_into_dir() {
780
297
    _data_dir->register_tablet(this);
781
297
}
782
783
251
inline void Tablet::deregister_tablet_from_dir() {
784
251
    _data_dir->deregister_tablet(this);
785
251
}
786
787
287
inline int64_t Tablet::cumulative_layer_point() const {
788
287
    return _cumulative_point;
789
287
}
790
791
324
inline void Tablet::set_cumulative_layer_point(int64_t new_point) {
792
    // cumulative point should only be reset to -1, or be increased
793
324
    CHECK(new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= _cumulative_point)
794
0
            << "Unexpected cumulative point: " << new_point
795
0
            << ", origin: " << _cumulative_point.load();
796
324
    _cumulative_point = new_point;
797
324
}
798
799
20
inline int64_t Tablet::cumulative_promotion_size() const {
800
20
    return _cumulative_promotion_size;
801
20
}
802
803
297
inline void Tablet::set_cumulative_promotion_size(int64_t new_size) {
804
297
    _cumulative_promotion_size = new_size;
805
297
}
806
807
// TODO(lingbin): Why other methods that need to get information from _tablet_meta
808
// are not locked, here needs a comment to explain.
809
8
inline size_t Tablet::tablet_footprint() {
810
8
    std::shared_lock rdlock(_meta_lock);
811
8
    return _tablet_meta->tablet_footprint();
812
8
}
813
814
0
inline size_t Tablet::tablet_local_size() {
815
0
    std::shared_lock rdlock(_meta_lock);
816
0
    return _tablet_meta->tablet_local_size();
817
0
}
818
819
0
inline size_t Tablet::tablet_remote_size() {
820
0
    std::shared_lock rdlock(_meta_lock);
821
0
    return _tablet_meta->tablet_remote_size();
822
0
}
823
824
// TODO(lingbin): Why other methods which need to get information from _tablet_meta
825
// are not locked, here needs a comment to explain.
826
14
inline size_t Tablet::num_rows() {
827
14
    std::shared_lock rdlock(_meta_lock);
828
14
    return _tablet_meta->num_rows();
829
14
}
830
831
24
inline int Tablet::version_count() const {
832
24
    std::shared_lock rdlock(_meta_lock);
833
24
    return _tablet_meta->version_count();
834
24
}
835
836
24
inline int Tablet::stale_version_count() const {
837
24
    std::shared_lock rdlock(_meta_lock);
838
24
    return _tablet_meta->stale_version_count();
839
24
}
840
841
29
// Returns `_tablet_meta->max_version()`, read while holding a shared lock on
// `_meta_lock`. See max_version_unlocked() for the variant that takes no lock.
inline Version Tablet::max_version() const {
    std::shared_lock meta_read_guard {_meta_lock};
    return _tablet_meta->max_version();
}
845
846
0
inline uint64_t Tablet::segment_count() const {
847
0
    std::shared_lock rdlock(_meta_lock);
848
0
    uint64_t segment_nums = 0;
849
0
    for (auto& rs_meta : _tablet_meta->all_rs_metas()) {
850
0
        segment_nums += rs_meta->num_segments();
851
0
    }
852
0
    return segment_nums;
853
0
}
854
855
34
// Returns `_tablet_meta->max_version()` without taking any lock here.
// NOTE(review): presumably the caller is expected to already hold
// `_meta_lock` — confirm against call sites.
inline Version Tablet::max_version_unlocked() const {
    Version newest = _tablet_meta->max_version();
    return newest;
}
858
859
0
// Forwards to sort_type() on the tablet schema held by `_tablet_meta`.
// No lock is taken here.
inline SortType Tablet::sort_type() const {
    const auto& schema = _tablet_meta->tablet_schema();
    return schema->sort_type();
}
862
863
0
inline size_t Tablet::sort_col_num() const {
864
0
    return _tablet_meta->tablet_schema()->sort_col_num();
865
0
}
866
867
0
inline size_t Tablet::num_columns() const {
868
0
    return _tablet_meta->tablet_schema()->num_columns();
869
0
}
870
871
0
inline size_t Tablet::num_null_columns() const {
872
0
    return _tablet_meta->tablet_schema()->num_null_columns();
873
0
}
874
875
0
inline size_t Tablet::num_short_key_columns() const {
876
0
    return _tablet_meta->tablet_schema()->num_short_key_columns();
877
0
}
878
879
0
inline size_t Tablet::num_rows_per_row_block() const {
880
0
    return _tablet_meta->tablet_schema()->num_rows_per_row_block();
881
0
}
882
883
0
// Forwards to compress_kind() on the tablet schema held by `_tablet_meta`.
// No lock is taken here.
inline CompressKind Tablet::compress_kind() const {
    const auto& schema = _tablet_meta->tablet_schema();
    return schema->compress_kind();
}
886
887
0
inline double Tablet::bloom_filter_fpp() const {
888
0
    return _tablet_meta->tablet_schema()->bloom_filter_fpp();
889
0
}
890
891
0
inline size_t Tablet::next_unique_id() const {
892
0
    return _tablet_meta->tablet_schema()->next_column_unique_id();
893
0
}
894
895
0
inline size_t Tablet::row_size() const {
896
0
    return _tablet_meta->tablet_schema()->row_size();
897
0
}
898
899
24
inline int64_t Tablet::avg_rs_meta_serialize_size() const {
900
24
    return _tablet_meta->avg_rs_meta_serialize_size();
901
24
}
902
903
} // namespace doris