Coverage Report

Created: 2024-11-20 12:56

/root/doris/be/src/olap/tablet.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <butil/macros.h>
21
#include <glog/logging.h>
22
#include <stddef.h>
23
#include <stdint.h>
24
25
#include <atomic>
26
#include <functional>
27
#include <limits>
28
#include <list>
29
#include <map>
30
#include <memory>
31
#include <mutex>
32
#include <ostream>
33
#include <set>
34
#include <shared_mutex>
35
#include <string>
36
#include <string_view>
37
#include <unordered_map>
38
#include <utility>
39
#include <vector>
40
41
#include "common/config.h"
42
#include "common/status.h"
43
#include "olap/base_tablet.h"
44
#include "olap/binlog_config.h"
45
#include "olap/data_dir.h"
46
#include "olap/olap_common.h"
47
#include "olap/partial_update_info.h"
48
#include "olap/rowset/rowset.h"
49
#include "olap/rowset/rowset_meta.h"
50
#include "olap/rowset/rowset_reader.h"
51
#include "olap/rowset/segment_v2/segment.h"
52
#include "olap/tablet_meta.h"
53
#include "olap/tablet_schema.h"
54
#include "olap/version_graph.h"
55
#include "segment_loader.h"
56
#include "util/metrics.h"
57
#include "util/once.h"
58
#include "util/slice.h"
59
60
namespace doris {
61
62
class Tablet;
63
class CumulativeCompactionPolicy;
64
class Compaction;
65
class SingleReplicaCompaction;
66
class RowsetWriter;
67
struct TabletTxnInfo;
68
struct RowsetWriterContext;
69
class RowIdConversion;
70
class TTabletInfo;
71
class TabletMetaPB;
72
class TupleDescriptor;
73
class CalcDeleteBitmapToken;
74
enum CompressKind : int;
75
class RowsetBinlogMetasPB;
76
struct TabletTxnInfo;
77
78
namespace io {
79
class RemoteFileSystem;
80
} // namespace io
81
namespace vectorized {
82
class Block;
83
} // namespace vectorized
84
struct RowLocation;
85
enum KeysType : int;
86
enum SortType : int;
87
88
// Alias for a std::shared_ptr that shares ownership of a Tablet.
using TabletSharedPtr = std::shared_ptr<Tablet>;
89
90
// Unscoped enum of storage placements. The enumerator names suggest local-only, remote-only and
// both; how each value is used is not visible in this header section — TODO confirm at use sites.
enum TabletStorageType { STORAGE_TYPE_LOCAL, STORAGE_TYPE_REMOTE, STORAGE_TYPE_REMOTE_AND_LOCAL };
91
92
// A one-second std::chrono duration. The name suggests it is the wait threshold above which
// tablet lock acquisition is traced — TODO confirm at use sites.
// NOTE(review): <chrono> is not in the include list above; this presumably compiles through a
// transitive include — consider including it directly.
static inline constexpr auto TRACE_TABLET_LOCK_THRESHOLD = std::chrono::seconds(1);
93
94
class Tablet : public BaseTablet {
95
public:
96
    static TabletSharedPtr create_tablet_from_meta(TabletMetaSharedPtr tablet_meta,
97
                                                   DataDir* data_dir = nullptr);
98
99
    Tablet(TabletMetaSharedPtr tablet_meta, DataDir* data_dir,
100
           const std::string_view& cumulative_compaction_type = "");
101
102
    Status init();
103
    bool init_succeeded();
104
105
    bool is_used();
106
107
    void register_tablet_into_dir();
108
    void deregister_tablet_from_dir();
109
110
    void save_meta();
111
    // Used in clone task, to update local meta when finishing a clone job
112
    Status revise_tablet_meta(const std::vector<RowsetSharedPtr>& to_add,
113
                              const std::vector<RowsetSharedPtr>& to_delete,
114
                              bool is_incremental_clone);
115
116
    int64_t cumulative_layer_point() const;
117
    void set_cumulative_layer_point(int64_t new_point);
118
    inline int64_t cumulative_promotion_size() const;
119
    inline void set_cumulative_promotion_size(int64_t new_size);
120
121
    // Disk space occupied by tablet, contain local and remote.
122
    size_t tablet_footprint();
123
    // Local disk space occupied by tablet.
124
    size_t tablet_local_size();
125
    // Remote disk space occupied by tablet.
126
    size_t tablet_remote_size();
127
128
    size_t num_rows();
129
    int version_count() const;
130
    bool exceed_version_limit(int32_t limit) const;
131
    int stale_version_count() const;
132
    uint64_t segment_count() const;
133
    Version max_version() const;
134
    Version max_version_unlocked() const;
135
    CumulativeCompactionPolicy* cumulative_compaction_policy();
136
    bool enable_unique_key_merge_on_write() const;
137
138
    // properties encapsulated in TabletSchema
139
    KeysType keys_type() const;
140
    SortType sort_type() const;
141
    size_t sort_col_num() const;
142
    size_t num_columns() const;
143
    size_t num_null_columns() const;
144
    size_t num_key_columns() const;
145
    size_t num_short_key_columns() const;
146
    size_t num_rows_per_row_block() const;
147
    CompressKind compress_kind() const;
148
    double bloom_filter_fpp() const;
149
    size_t next_unique_id() const;
150
    size_t row_size() const;
151
    int64_t avg_rs_meta_serialize_size() const;
152
153
    // operation in rowsets
154
    Status add_rowset(RowsetSharedPtr rowset);
155
    Status create_initial_rowset(const int64_t version);
156
157
    // MUST hold EXCLUSIVE `_meta_lock`.
158
    Status modify_rowsets(std::vector<RowsetSharedPtr>& to_add,
159
                          std::vector<RowsetSharedPtr>& to_delete, bool check_delete = false);
160
161
    // _rs_version_map and _stale_rs_version_map should be protected by _meta_lock
162
    // The caller must hold _meta_lock when calling these two functions.
163
    const RowsetSharedPtr get_rowset_by_version(const Version& version,
164
                                                bool find_is_stale = false) const;
165
    const RowsetSharedPtr get_stale_rowset_by_version(const Version& version) const;
166
167
    const RowsetSharedPtr rowset_with_max_version() const;
168
169
    static RowsetMetaSharedPtr rowset_meta_with_max_schema_version(
170
            const std::vector<RowsetMetaSharedPtr>& rowset_metas);
171
172
    Status add_inc_rowset(const RowsetSharedPtr& rowset);
173
    /// Delete stale rowset by timing. This delete policy uses now() minus
174
    /// config::tablet_rowset_expired_stale_sweep_time_sec to compute the deadline of expired rowset
175
    /// to delete.  When rowset is deleted, it will be added to StorageEngine unused map and record
176
    /// need to delete flag.
177
    void delete_expired_stale_rowset();
178
179
    // Given spec_version, find a continuous version path and store it in version_path.
180
    // If quiet is true, then only "does this path exist" is returned.
181
    // If skip_missing_version is true, return ok even there are missing versions.
182
    Status capture_consistent_versions(const Version& spec_version,
183
                                       std::vector<Version>* version_path,
184
                                       bool skip_missing_version, bool quiet = false) const;
185
    // if quiet is true, no error log will be printed if there are missing versions
186
    Status check_version_integrity(const Version& version, bool quiet = false);
187
    bool check_version_exist(const Version& version) const;
188
    void acquire_version_and_rowsets(
189
            std::vector<std::pair<Version, RowsetSharedPtr>>* version_rowsets) const;
190
191
    Status capture_consistent_rowsets(const Version& spec_version,
192
                                      std::vector<RowsetSharedPtr>* rowsets) const;
193
    // If skip_missing_version is true, skip versions if they are missing.
194
    Status capture_rs_readers(const Version& spec_version, std::vector<RowSetSplits>* rs_splits,
195
                              bool skip_missing_version) const;
196
197
    Status capture_rs_readers(const std::vector<Version>& version_path,
198
                              std::vector<RowSetSplits>* rs_splits) const;
199
200
    // meta lock
201
64
    // Exposes `_meta_lock` by reference so callers can acquire it shared or exclusive themselves.
    std::shared_mutex& get_header_lock() { return _meta_lock; }
202
1
    // Exposes `_rowset_update_lock` by reference for external locking.
    std::mutex& get_rowset_update_lock() { return _rowset_update_lock; }
203
12
    // Exposes `_ingest_lock` by reference. Note the accessor says "push" while the member says
    // "ingest"; both names refer to the same mutex.
    std::mutex& get_push_lock() { return _ingest_lock; }
204
2
    // Exposes `_base_compaction_lock` by reference for external locking.
    std::mutex& get_base_compaction_lock() { return _base_compaction_lock; }
205
4
    // Exposes `_cumulative_compaction_lock` by reference for external locking.
    std::mutex& get_cumulative_compaction_lock() { return _cumulative_compaction_lock; }
206
207
13
    // Exposes `_migration_lock` by reference. It is a std::shared_timed_mutex, so callers may use
    // shared or exclusive ownership, including the timed try-lock variants.
    std::shared_timed_mutex& get_migration_lock() { return _migration_lock; }
208
209
0
    // Exposes `_schema_change_lock` by reference for external locking.
    std::mutex& get_schema_change_lock() { return _schema_change_lock; }
210
211
0
    // Exposes `_build_inverted_index_lock` by reference for external locking.
    std::mutex& get_build_inverted_index_lock() { return _build_inverted_index_lock; }
212
213
    // operation for compaction
214
    bool can_do_compaction(size_t path_hash, CompactionType compaction_type);
215
    uint32_t calc_compaction_score(
216
            CompactionType compaction_type,
217
            std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy);
218
219
    // operation for clone
220
    void calc_missed_versions(int64_t spec_version, std::vector<Version>* missed_versions);
221
    void calc_missed_versions_unlocked(int64_t spec_version,
222
                                       std::vector<Version>* missed_versions) const;
223
224
    // This function to find max continuous version from the beginning.
225
    // For example: if versions 1, 2, 3, 5, 6, 7 belong to the tablet, then 3 is the target.
226
    // 3 will be saved in "version", and 7 will be saved in "max_version", if max_version != nullptr
227
    void max_continuous_version_from_beginning(Version* version, Version* max_version = nullptr);
228
229
0
    // Stores `is_bad` into the `_is_bad` flag; nothing else is done here.
    void set_bad(bool is_bad) { _is_bad = is_bad; }
230
231
1
    // Returns the value last stored in `_last_cumu_compaction_failure_millis` (the member name
    // indicates milliseconds).
    int64_t last_cumu_compaction_failure_time() { return _last_cumu_compaction_failure_millis; }
232
1
    // Overwrites `_last_cumu_compaction_failure_millis` with `millis`; no validation is done.
    void set_last_cumu_compaction_failure_time(int64_t millis) {
233
1
        _last_cumu_compaction_failure_millis = millis;
234
1
    }
235
236
0
    // Returns the value last stored in `_last_base_compaction_failure_millis` (the member name
    // indicates milliseconds).
    int64_t last_base_compaction_failure_time() { return _last_base_compaction_failure_millis; }
237
0
    // Overwrites `_last_base_compaction_failure_millis` with `millis`; no validation is done.
    void set_last_base_compaction_failure_time(int64_t millis) {
238
0
        _last_base_compaction_failure_millis = millis;
239
0
    }
240
241
0
    // Returns the value last stored in `_last_full_compaction_failure_millis` (the member name
    // indicates milliseconds).
    int64_t last_full_compaction_failure_time() { return _last_full_compaction_failure_millis; }
242
0
    // Overwrites `_last_full_compaction_failure_millis` with `millis`; no validation is done.
    void set_last_full_compaction_failure_time(int64_t millis) {
243
0
        _last_full_compaction_failure_millis = millis;
244
0
    }
245
246
5
    // Returns the value last stored in `_last_cumu_compaction_success_millis` (the member name
    // indicates milliseconds).
    int64_t last_cumu_compaction_success_time() { return _last_cumu_compaction_success_millis; }
247
3
    // Overwrites `_last_cumu_compaction_success_millis` with `millis`; no validation is done.
    void set_last_cumu_compaction_success_time(int64_t millis) {
248
3
        _last_cumu_compaction_success_millis = millis;
249
3
    }
250
251
1
    // Returns the value last stored in `_last_base_compaction_success_millis` (the member name
    // indicates milliseconds).
    int64_t last_base_compaction_success_time() { return _last_base_compaction_success_millis; }
252
1
    // Overwrites `_last_base_compaction_success_millis` with `millis`; no validation is done.
    void set_last_base_compaction_success_time(int64_t millis) {
253
1
        _last_base_compaction_success_millis = millis;
254
1
    }
255
256
0
    // Returns the value last stored in `_last_full_compaction_success_millis` (the member name
    // indicates milliseconds).
    int64_t last_full_compaction_success_time() { return _last_full_compaction_success_millis; }
257
0
    // Overwrites `_last_full_compaction_success_millis` with `millis`; no validation is done.
    void set_last_full_compaction_success_time(int64_t millis) {
258
0
        _last_full_compaction_success_millis = millis;
259
0
    }
260
261
0
    // Returns the value last stored in `_last_base_compaction_schedule_millis` (the member name
    // indicates milliseconds).
    int64_t last_base_compaction_schedule_time() { return _last_base_compaction_schedule_millis; }
262
0
    // Overwrites `_last_base_compaction_schedule_millis` with `millis`; no validation is done.
    void set_last_base_compaction_schedule_time(int64_t millis) {
263
0
        _last_base_compaction_schedule_millis = millis;
264
0
    }
265
266
0
    // Replaces `_last_single_compaction_failure_status` with `status`. The argument is taken by
    // value and moved into the member, so an rvalue argument is not copied.
    void set_last_single_compaction_failure_status(std::string status) {
267
0
        _last_single_compaction_failure_status = std::move(status);
268
0
    }
269
270
0
    // Stores `version` into `_last_fetched_version`.
    void set_last_fetched_version(Version version) { _last_fetched_version = std::move(version); }
271
272
    void delete_all_files();
273
274
    void check_tablet_path_exists();
275
276
    bool check_path(const std::string& check_path) const;
277
    bool check_rowset_id(const RowsetId& rowset_id);
278
279
    TabletInfo get_tablet_info() const;
280
281
    std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_cumulative_compaction();
282
    std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_base_compaction();
283
    std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_full_compaction();
284
    std::vector<RowsetSharedPtr> pick_candidate_rowsets_to_build_inverted_index(
285
            const std::set<int64_t>& alter_index_uids, bool is_drop_op);
286
287
    // used for single compaction to get the local versions
288
    // Single compaction does not require remote rowsets and cannot violate the cooldown semantics
289
    std::vector<Version> get_all_local_versions();
290
291
    std::vector<RowsetSharedPtr> pick_first_consecutive_empty_rowsets(int limit);
292
293
    void calculate_cumulative_point();
294
    // TODO(ygl):
295
0
    // Placeholder implementation: unconditionally returns false.
    bool is_primary_replica() { return false; }
296
297
    // TODO(ygl):
298
    // eco mode means power saving in new energy car
299
    // eco mode also means save money in palo
300
0
    // Placeholder implementation: unconditionally returns false.
    bool in_eco_mode() { return false; }
301
302
    // return true if the checkpoint is actually done
303
    bool do_tablet_meta_checkpoint();
304
305
    // Check whether the rowset is useful or not; a useless rowset can then be swept up.
306
    // Rowset which is under tablet's management is useful, i.e. rowset is in
307
    // _rs_version_map, or _stale_rs_version_map.
308
    // Rowset whose version range is not covered by this tablet is also useful.
309
    bool rowset_meta_is_useful(RowsetMetaSharedPtr rowset_meta);
310
311
    void build_tablet_report_info(TTabletInfo* tablet_info,
312
                                  bool enable_consecutive_missing_check = false,
313
                                  bool enable_path_check = false);
314
315
    void generate_tablet_meta_copy(TabletMetaSharedPtr new_tablet_meta) const;
316
    // caller should hold the _meta_lock before calling this method
317
    void generate_tablet_meta_copy_unlocked(TabletMetaSharedPtr new_tablet_meta) const;
318
319
    // return a json string to show the compaction status of this tablet
320
    void get_compaction_status(std::string* json_result);
321
322
    static Status prepare_compaction_and_calculate_permits(CompactionType compaction_type,
323
                                                           const TabletSharedPtr& tablet,
324
                                                           std::shared_ptr<Compaction>& compaction,
325
                                                           int64_t& permits);
326
327
    void execute_compaction(Compaction& compaction);
328
    void execute_single_replica_compaction(SingleReplicaCompaction& compaction);
329
330
    // Replaces `_cumulative_compaction_policy` with the given shared pointer. The argument is
    // copy-assigned (not moved), so the caller's pointer stays valid and the policy is shared.
    void set_cumulative_compaction_policy(
331
0
            std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy) {
332
0
        _cumulative_compaction_policy = cumulative_compaction_policy;
333
0
    }
334
335
0
    // Returns a copy of the `_cumulative_compaction_policy` shared pointer, so the caller shares
    // ownership of the policy object.
    std::shared_ptr<CumulativeCompactionPolicy> get_cumulative_compaction_policy() {
336
0
        return _cumulative_compaction_policy;
337
0
    }
338
339
0
    // Replaces `_last_base_compaction_status` with `status`. The argument is taken by value and
    // moved into the member.
    void set_last_base_compaction_status(std::string status) {
340
0
        _last_base_compaction_status = std::move(status);
341
0
    }
342
343
0
    // Returns a copy of `_last_base_compaction_status`.
    std::string get_last_base_compaction_status() { return _last_base_compaction_status; }
344
345
    bool should_fetch_from_peer();
346
347
0
    // Forwards to `_tablet_meta->all_beta()` while holding a shared (reader) lock on `_meta_lock`.
    // `_meta_lock` is declared `mutable`, which is what allows locking from this const method.
    inline bool all_beta() const {
348
0
        std::shared_lock rdlock(_meta_lock);
349
0
        return _tablet_meta->all_beta();
350
0
    }
351
352
    TabletSchemaSPtr tablet_schema() const override;
353
354
0
    // Returns `_max_version_schema` by const reference; no lock is taken in this method. The
    // `_unlocked` suffix suggests synchronization is the caller's responsibility — TODO confirm.
    const TabletSchemaSPtr& tablet_schema_unlocked() const { return _max_version_schema; }
355
356
    // Find the related rowset with specified version and return its tablet schema
357
0
    // Overload taking a version: delegates directly to `_tablet_meta->tablet_schema(version)`.
    // No lock is taken in this method.
    TabletSchemaSPtr tablet_schema(Version version) const {
358
0
        return _tablet_meta->tablet_schema(version);
359
0
    }
360
361
    Status create_rowset_writer(RowsetWriterContext& context,
362
                                std::unique_ptr<RowsetWriter>* rowset_writer);
363
364
    Status create_transient_rowset_writer(RowsetSharedPtr rowset_ptr,
365
                                          std::unique_ptr<RowsetWriter>* rowset_writer,
366
                                          std::shared_ptr<PartialUpdateInfo> partial_update_info);
367
    Status create_transient_rowset_writer(RowsetWriterContext& context, const RowsetId& rowset_id,
368
                                          std::unique_ptr<RowsetWriter>* rowset_writer);
369
370
    Status create_vertical_rowset_writer(RowsetWriterContext& context,
371
                                         std::unique_ptr<RowsetWriter>* rowset_writer);
372
373
    Status create_rowset(const RowsetMetaSharedPtr& rowset_meta, RowsetSharedPtr* rowset);
374
375
    // MUST hold EXCLUSIVE `_meta_lock`
376
    void add_rowsets(const std::vector<RowsetSharedPtr>& to_add);
377
    // MUST hold EXCLUSIVE `_meta_lock`
378
    void delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete, bool move_to_stale);
379
380
    // MUST hold SHARED `_meta_lock`
381
27
    // Returns a const reference (no copy) to `_rs_version_map`; this method takes no lock itself.
    const auto& rowset_map() const { return _rs_version_map; }
382
    // MUST hold SHARED `_meta_lock`
383
26
    // Returns a const reference (no copy) to `_stale_rs_version_map`; this method takes no lock
    // itself.
    const auto& stale_rowset_map() const { return _stale_rs_version_map; }
384
385
    ////////////////////////////////////////////////////////////////////////////
386
    // begin cooldown functions
387
    ////////////////////////////////////////////////////////////////////////////
388
0
    // Returns the current value of `_last_failed_follow_cooldown_time`.
    int64_t last_failed_follow_cooldown_time() const { return _last_failed_follow_cooldown_time; }
389
390
    // Cooldown to remote fs.
391
    Status cooldown();
392
393
    RowsetSharedPtr pick_cooldown_rowset();
394
395
    bool need_cooldown(int64_t* cooldown_timestamp, size_t* file_size);
396
397
0
    // Returns {`_cooldown_replica_id`, `_cooldown_term`}, read under a shared lock on
    // `_cooldown_conf_lock`. Note the pair order is (replica id, term), which is the reverse of
    // the (term, replica id) parameter order of update_cooldown_conf().
    std::pair<int64_t, int64_t> cooldown_conf() const {
398
0
        std::shared_lock rlock(_cooldown_conf_lock);
399
0
        return {_cooldown_replica_id, _cooldown_term};
400
0
    }
401
402
0
    // Same result as cooldown_conf() — {`_cooldown_replica_id`, `_cooldown_term`} — but without
    // taking `_cooldown_conf_lock` in this method.
    std::pair<int64_t, int64_t> cooldown_conf_unlocked() const {
403
0
        return {_cooldown_replica_id, _cooldown_term};
404
0
    }
405
406
    // Return `true` if update success
407
    bool update_cooldown_conf(int64_t cooldown_term, int64_t cooldown_replica_id);
408
409
    Status remove_all_remote_rowsets();
410
411
    void record_unused_remote_rowset(const RowsetId& rowset_id, const std::string& resource,
412
                                     int64_t num_segments);
413
414
    static void remove_unused_remote_files();
415
416
    // If a rowset is to be written to remote filesystem, MUST add it to `pending_remote_rowsets` before uploading,
417
    // and then erase it from `pending_remote_rowsets` after it has been inserted into the Tablet.
418
    // `remove_unused_remote_files` MUST NOT delete files of these pending rowsets.
419
    static void add_pending_remote_rowset(std::string rowset_id);
420
    static void erase_pending_remote_rowset(const std::string& rowset_id);
421
422
    uint32_t calc_cold_data_compaction_score() const;
423
424
0
    // Exposes `_cold_compaction_lock` by reference for external locking.
    std::mutex& get_cold_compaction_lock() { return _cold_compaction_lock; }
425
426
0
    // Exposes `_cooldown_conf_lock` by reference; this is the same mutex that cooldown_conf()
    // locks in shared mode.
    std::shared_mutex& get_cooldown_conf_lock() { return _cooldown_conf_lock; }
427
428
    static void async_write_cooldown_meta(TabletSharedPtr tablet);
429
    // Return `ABORTED` if the operation should not be retried
430
    Status write_cooldown_meta();
431
    ////////////////////////////////////////////////////////////////////////////
432
    // end cooldown functions
433
    ////////////////////////////////////////////////////////////////////////////
434
435
    // Lookup the row location of `encoded_key`, the function sets `row_location` on success.
436
    // NOTE: the method only works in unique key model with primary key index, you will get a
437
    //       not supported error in other data model.
438
    Status lookup_row_key(const Slice& encoded_key, TabletSchema* latest_schema, bool with_seq_col,
439
                          const std::vector<RowsetSharedPtr>& specified_rowsets,
440
                          RowLocation* row_location, uint32_t version,
441
                          std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
442
                          RowsetSharedPtr* rowset = nullptr);
443
444
    // Lookup a row with TupleDescriptor and fill Block
445
    Status lookup_row_data(const Slice& encoded_key, const RowLocation& row_location,
446
                           RowsetSharedPtr rowset, const TupleDescriptor* desc,
447
                           OlapReaderStatistics& stats, std::string& values,
448
                           bool write_to_cache = false);
449
450
    Status fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t segid,
451
                                 const std::vector<uint32_t>& rowids,
452
                                 const TabletColumn& tablet_column,
453
                                 vectorized::MutableColumnPtr& dst);
454
455
    // We use the TabletSchema from the caller because the TabletSchema in the rowset's meta
456
    // may be outdated due to schema change. Also note that the cids should indicate the indexes
457
    // of the columns in the TabletSchema passed in.
458
    Status fetch_value_through_row_column(RowsetSharedPtr input_rowset,
459
                                          const TabletSchema& tablet_schema, uint32_t segid,
460
                                          const std::vector<uint32_t>& rowids,
461
                                          const std::vector<uint32_t>& cids,
462
                                          vectorized::Block& block);
463
464
    // calc delete bitmap when flush memtable, use a fake version to calc
465
    // For example, cur max version is 5, and we use version 6 to calc but
466
    // finally this rowset publish version with 8, we should make up data
467
    // for rowset 6-7. Also, if a compaction happens between commit_txn and
468
    // publish_txn, we should remove compaction input rowsets' delete_bitmap
469
    // and build newly generated rowset's delete_bitmap
470
    Status calc_delete_bitmap(RowsetSharedPtr rowset,
471
                              const std::vector<segment_v2::SegmentSharedPtr>& segments,
472
                              const std::vector<RowsetSharedPtr>& specified_rowsets,
473
                              DeleteBitmapPtr delete_bitmap, int64_t version,
474
                              CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer = nullptr);
475
476
    std::vector<RowsetSharedPtr> get_rowset_by_ids(
477
            const RowsetIdUnorderedSet* specified_rowset_ids);
478
479
    Status calc_segment_delete_bitmap(RowsetSharedPtr rowset,
480
                                      const segment_v2::SegmentSharedPtr& seg,
481
                                      const std::vector<RowsetSharedPtr>& specified_rowsets,
482
                                      DeleteBitmapPtr delete_bitmap, int64_t end_version,
483
                                      RowsetWriter* rowset_writer);
484
485
    Status calc_delete_bitmap_between_segments(
486
            RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments,
487
            DeleteBitmapPtr delete_bitmap);
488
    Status read_columns_by_plan(TabletSchemaSPtr tablet_schema,
489
                                const std::vector<uint32_t> cids_to_read,
490
                                const PartialUpdateReadPlan& read_plan,
491
                                const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
492
                                vectorized::Block& block, std::map<uint32_t, uint32_t>* read_index);
493
    void prepare_to_read(const RowLocation& row_location, size_t pos,
494
                         PartialUpdateReadPlan* read_plan);
495
    Status generate_new_block_for_partial_update(
496
            TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info,
497
            const PartialUpdateReadPlan& read_plan_ori,
498
            const PartialUpdateReadPlan& read_plan_update,
499
            const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
500
            vectorized::Block* output_block);
501
502
    Status update_delete_bitmap_without_lock(
503
            const RowsetSharedPtr& rowset,
504
            const std::vector<RowsetSharedPtr>* specified_base_rowsets = nullptr);
505
506
    Status commit_phase_update_delete_bitmap(
507
            const RowsetSharedPtr& rowset, RowsetIdUnorderedSet& pre_rowset_ids,
508
            DeleteBitmapPtr delete_bitmap,
509
            const std::vector<segment_v2::SegmentSharedPtr>& segments, int64_t txn_id,
510
            CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer = nullptr);
511
512
    Status update_delete_bitmap(TabletTxnInfo* txn_info, int64_t txn_id);
513
    void calc_compaction_output_rowset_delete_bitmap(
514
            const std::vector<RowsetSharedPtr>& input_rowsets,
515
            const RowIdConversion& rowid_conversion, uint64_t start_version, uint64_t end_version,
516
            std::set<RowLocation>* missed_rows,
517
            std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>>* location_map,
518
            const DeleteBitmap& input_delete_bitmap, DeleteBitmap* output_rowset_delete_bitmap);
519
    void merge_delete_bitmap(const DeleteBitmap& delete_bitmap);
520
    Status check_rowid_conversion(
521
            RowsetSharedPtr dst_rowset,
522
            const std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>>&
523
                    location_map);
524
    Status all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) const;
525
    void sort_block(vectorized::Block& in_block, vectorized::Block& output_block);
526
527
    bool check_all_rowset_segment();
528
529
    void update_max_version_schema(const TabletSchemaSPtr& tablet_schema);
530
531
    void set_skip_compaction(bool skip,
532
                             CompactionType compaction_type = CompactionType::CUMULATIVE_COMPACTION,
533
                             int64_t start = -1);
534
    bool should_skip_compaction(CompactionType compaction_type, int64_t now);
535
536
    RowsetSharedPtr get_rowset(const RowsetId& rowset_id);
537
538
4
    // Calls `visitor` once for each rowset stored in `_rs_version_map`, passing the rowset
    // pointer (the version key is not passed). A shared lock on `_meta_lock` is held for the
    // whole traversal, including during every callback, so `visitor` must not try to acquire
    // `_meta_lock` again on the same thread (std::shared_mutex is not recursive).
    void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor) {
539
4
        std::shared_lock rlock(_meta_lock);
540
52
        for (auto& [v, rs] : _rs_version_map) {
541
52
            visitor(rs);
542
52
        }
543
4
    }
544
545
    std::vector<std::string> get_binlog_filepath(std::string_view binlog_version) const;
546
    std::pair<std::string, int64_t> get_binlog_info(std::string_view binlog_version) const;
547
    std::string get_rowset_binlog_meta(std::string_view binlog_version,
548
                                       std::string_view rowset_id) const;
549
    Status get_rowset_binlog_metas(const std::vector<int64_t>& binlog_versions,
550
                                   RowsetBinlogMetasPB* metas_pb);
551
    Status get_rowset_binlog_metas(Version binlog_versions, RowsetBinlogMetasPB* metas_pb);
552
    std::string get_segment_filepath(std::string_view rowset_id,
553
                                     std::string_view segment_index) const;
554
    std::string get_segment_filepath(std::string_view rowset_id, int64_t segment_index) const;
555
    std::string get_segment_index_filepath(std::string_view rowset_id,
556
                                           std::string_view segment_index,
557
                                           std::string_view index_id) const;
558
    std::string get_segment_index_filepath(std::string_view rowset_id, int64_t segment_index,
559
                                           int64_t index_id) const;
560
    bool can_add_binlog(uint64_t total_binlog_size) const;
561
    void gc_binlogs(int64_t version);
562
    Status ingest_binlog_metas(RowsetBinlogMetasPB* metas_pb);
563
564
0
    // Updates the tablet's I/O error counter based on the kind of `st`:
    //  - IO_ERROR: increments `_io_error_times` by one.
    //  - CORRUPTION: sets `_io_error_times` to `config::max_tablet_io_errors + 1`, which meets
    //    the threshold tested by is_io_error_too_times() right away when that config is positive.
    //  - any other status: leaves the counter unchanged.
    inline void report_error(const Status& st) {
565
0
        if (st.is<ErrorCode::IO_ERROR>()) {
566
0
            ++_io_error_times;
567
0
        } else if (st.is<ErrorCode::CORRUPTION>()) {
568
0
            _io_error_times = config::max_tablet_io_errors + 1;
569
0
        }
570
0
    }
571
572
0
    // Returns the current value of the `_io_error_times` counter maintained by report_error().
    inline int64_t get_io_error_times() const { return _io_error_times; }
573
574
0
    // True when the limit is enabled (`config::max_tablet_io_errors > 0`) and `_io_error_times`
    // has reached or exceeded it. A non-positive config value makes this always return false.
    inline bool is_io_error_too_times() const {
575
0
        return config::max_tablet_io_errors > 0 && _io_error_times >= config::max_tablet_io_errors;
576
0
    }
577
578
0
    // Returns `_tablet_meta->table_id()`; no lock is taken in this method.
    int64_t get_table_id() { return _tablet_meta->table_id(); }
579
580
    // binlog related functions
581
    bool is_enable_binlog();
582
0
    // Returns `is_enable()` of the binlog config held by the tablet meta.
    // NOTE(review): a similarly named is_enable_binlog() is declared next to this method; how
    // the two differ is not visible from this header section — confirm before choosing one.
    bool is_binlog_enabled() { return _tablet_meta->binlog_config().is_enable(); }
583
0
    // Returns `ttl_seconds()` of the binlog config held by the tablet meta, unmodified.
    // NOTE(review): the method name says "ms" but the value comes from `ttl_seconds()` with no
    // unit conversion — confirm the unit callers expect.
    int64_t binlog_ttl_ms() const { return _tablet_meta->binlog_config().ttl_seconds(); }
584
0
    // Returns `max_bytes()` of the binlog config held by the tablet meta.
    int64_t binlog_max_bytes() const { return _tablet_meta->binlog_config().max_bytes(); }
585
586
    void set_binlog_config(BinlogConfig binlog_config);
587
    void add_sentinel_mark_to_delete_bitmap(DeleteBitmap* delete_bitmap,
588
                                            const RowsetIdUnorderedSet& rowsetids);
589
    Status check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, int64_t max_version,
590
                                           int64_t txn_id, const RowsetIdUnorderedSet& rowset_ids,
591
                                           std::vector<RowsetSharedPtr>* rowsets = nullptr);
592
0
    // Stores `alter_failed` into the `_alter_failed` flag.
    void set_alter_failed(bool alter_failed) { _alter_failed = alter_failed; }
593
0
    // Returns the current value of the `_alter_failed` flag.
    bool is_alter_failed() { return _alter_failed; }
594
595
0
    // Stores `is_full_compaction_running` into the `_is_full_compaction_running` flag.
    void set_is_full_compaction_running(bool is_full_compaction_running) {
596
0
        _is_full_compaction_running = is_full_compaction_running;
597
0
    }
598
0
    // Returns the current value of the `_is_full_compaction_running` flag.
    inline bool is_full_compaction_running() const { return _is_full_compaction_running; }
599
    void clear_cache();
600
601
private:
602
    Status _init_once_action();
603
    void _print_missed_versions(const std::vector<Version>& missed_versions) const;
604
    bool _contains_rowset(const RowsetId rowset_id);
605
    Status _contains_version(const Version& version);
606
607
    // Returns:
608
    // version: the max continuous version from beginning
609
    // max_version: the max version of this tablet
610
    void _max_continuous_version_from_beginning_unlocked(Version* version, Version* max_version,
611
                                                         bool* has_version_cross) const;
612
    RowsetSharedPtr _rowset_with_largest_size();
613
    /// Delete stale rowset by version. This method not only deletes the version in the expired rowset map,
614
    /// but also deletes the version in the rowset meta vector.
615
    void _delete_stale_rowset_by_version(const Version& version);
616
    Status _capture_consistent_rowsets_unlocked(const std::vector<Version>& version_path,
617
                                                std::vector<RowsetSharedPtr>* rowsets) const;
618
619
    uint32_t _calc_cumulative_compaction_score(
620
            std::shared_ptr<CumulativeCompactionPolicy> cumulative_compaction_policy);
621
    uint32_t _calc_base_compaction_score() const;
622
623
    // When the proportion of empty edges in the adjacency matrix used to represent the version graph
624
    // in the version tracker is greater than the threshold, rebuild the version tracker
625
    bool _reconstruct_version_tracker_if_necessary();
626
    void _init_context_common_fields(RowsetWriterContext& context);
627
628
    void _rowset_ids_difference(const RowsetIdUnorderedSet& cur, const RowsetIdUnorderedSet& pre,
629
                                RowsetIdUnorderedSet* to_add, RowsetIdUnorderedSet* to_del);
630
    Status _load_rowset_segments(const RowsetSharedPtr& rowset,
631
                                 std::vector<segment_v2::SegmentSharedPtr>* segments);
632
633
    ////////////////////////////////////////////////////////////////////////////
634
    // begin cooldown functions
635
    ////////////////////////////////////////////////////////////////////////////
636
    Status _cooldown_data();
637
    Status _follow_cooldowned_data();
638
    Status _read_cooldown_meta(const std::shared_ptr<io::RemoteFileSystem>& fs,
639
                               TabletMetaPB* tablet_meta_pb);
640
    bool _has_data_to_cooldown();
641
    int64_t _get_newest_cooldown_time(const RowsetSharedPtr& rowset);
642
    ////////////////////////////////////////////////////////////////////////////
643
    // end cooldown functions
644
    ////////////////////////////////////////////////////////////////////////////
645
646
    void _remove_sentinel_mark_from_delete_bitmap(DeleteBitmapPtr delete_bitmap);
647
    std::string _get_rowset_info_str(RowsetSharedPtr rowset, bool delete_flag);
648
649
    void _clear_cache_by_rowset(const BetaRowsetSharedPtr& rowset);
650
651
public:
652
    static const int64_t K_INVALID_CUMULATIVE_POINT = -1;
653
654
private:
655
    TimestampedVersionTracker _timestamped_version_tracker;
656
657
    DorisCallOnce<Status> _init_once;
658
    // meta store lock is used to prevent 2 threads from doing checkpoint concurrently
659
    // it will be used in econ-mode in the future
660
    std::shared_mutex _meta_store_lock;
661
    std::mutex _ingest_lock;
662
    std::mutex _base_compaction_lock;
663
    std::mutex _cumulative_compaction_lock;
664
    std::mutex _schema_change_lock;
665
    std::shared_timed_mutex _migration_lock;
666
    std::mutex _build_inverted_index_lock;
667
668
    // TODO(lingbin): There is a _meta_lock in TabletMeta too, there should be a comment to
669
    // explain how these two locks work together.
670
    mutable std::shared_mutex _meta_lock;
671
672
    // In unique key table with MoW, we should guarantee that only one
673
    // writer can update rowset and delete bitmap at the same time.
674
    // We use a separate lock rather than _meta_lock, to avoid blocking read queries
675
    // during publish_txn, which might take hundreds of milliseconds
676
    mutable std::mutex _rowset_update_lock;
677
678
    // After version 0.13, all newly created rowsets are saved in _rs_version_map.
679
    // And if a rowset is being compacted, the old rowsets will be saved in _stale_rs_version_map;
680
    std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> _rs_version_map;
681
    // This variable _stale_rs_version_map is used to record the rowsets which have been compacted.
682
    // These stale rowsets are removed when the rowsets' pathVersion is expired,
683
    // this policy is judged and computed by TimestampedVersionTracker.
684
    std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> _stale_rs_version_map;
685
    // if this tablet is broken, set to true. default is false
686
    std::atomic<bool> _is_bad;
687
    // timestamp of last cumu compaction failure
688
    std::atomic<int64_t> _last_cumu_compaction_failure_millis;
689
    // timestamp of last base compaction failure
690
    std::atomic<int64_t> _last_base_compaction_failure_millis;
691
    // timestamp of last full compaction failure
692
    std::atomic<int64_t> _last_full_compaction_failure_millis;
693
    // timestamp of last cumu compaction success
694
    std::atomic<int64_t> _last_cumu_compaction_success_millis;
695
    // timestamp of last base compaction success
696
    std::atomic<int64_t> _last_base_compaction_success_millis;
697
    // timestamp of last full compaction success
698
    std::atomic<int64_t> _last_full_compaction_success_millis;
699
    // timestamp of last base compaction schedule time
700
    std::atomic<int64_t> _last_base_compaction_schedule_millis;
701
    std::atomic<int64_t> _cumulative_point;
702
    std::atomic<int64_t> _cumulative_promotion_size;
703
    std::atomic<int32_t> _newly_created_rowset_num;
704
    std::atomic<int64_t> _last_checkpoint_time;
705
    std::string _last_base_compaction_status;
706
707
    // single replica compaction status
708
    std::string _last_single_compaction_failure_status;
709
    Version _last_fetched_version;
710
711
    // cumulative compaction policy
712
    std::shared_ptr<CumulativeCompactionPolicy> _cumulative_compaction_policy;
713
    std::string_view _cumulative_compaction_type;
714
715
    // use a separate thread to check all tablets paths existence
716
    std::atomic<bool> _is_tablet_path_exists;
717
718
    int64_t _last_missed_version;
719
    int64_t _last_missed_time_s;
720
721
    // Max schema_version schema from Rowset or FE
722
    TabletSchemaSPtr _max_version_schema;
723
724
    bool _skip_cumu_compaction = false;
725
    int64_t _skip_cumu_compaction_ts;
726
727
    bool _skip_base_compaction = false;
728
    int64_t _skip_base_compaction_ts;
729
730
    // cooldown related
731
    int64_t _cooldown_replica_id = -1;
732
    int64_t _cooldown_term = -1;
733
    // `_cooldown_conf_lock` is used to serialize update cooldown conf and all operations that:
734
    // 1. read cooldown conf
735
    // 2. upload rowsets to remote storage
736
    // 3. update cooldown meta id
737
    mutable std::shared_mutex _cooldown_conf_lock;
738
    // `_cold_compaction_lock` is used to serialize cold data compaction and all operations that
739
    // may delete compaction input rowsets.
740
    std::mutex _cold_compaction_lock;
741
    int64_t _last_failed_follow_cooldown_time = 0;
742
    // `_alter_failed` is used to indicate whether the tablet failed to perform a schema change
743
    std::atomic<bool> _alter_failed = false;
744
745
    DISALLOW_COPY_AND_ASSIGN(Tablet);
746
747
    int64_t _io_error_times = 0;
748
749
public:
750
    IntCounter* flush_bytes;
751
    IntCounter* flush_finish_count;
752
    std::atomic<int64_t> publised_count = 0;
753
    std::atomic_bool _is_full_compaction_running = false;
754
};
755
756
1
// Returns a non-owning raw pointer to the object held by the
// `_cumulative_compaction_policy` shared pointer (null when it holds nothing).
inline CumulativeCompactionPolicy* Tablet::cumulative_compaction_policy() {
    CumulativeCompactionPolicy* const policy = _cumulative_compaction_policy.get();
    return policy;
}
759
760
4
inline bool Tablet::init_succeeded() {
761
4
    return _init_once.has_called() && _init_once.stored_result().ok();
762
4
}
763
764
2.09k
inline bool Tablet::is_used() {
765
2.09k
    return !_is_bad && _data_dir->is_used();
766
2.09k
}
767
768
26
inline void Tablet::register_tablet_into_dir() {
769
26
    _data_dir->register_tablet(this);
770
26
}
771
772
23
inline void Tablet::deregister_tablet_from_dir() {
773
23
    _data_dir->deregister_tablet(this);
774
23
}
775
776
9
inline int64_t Tablet::cumulative_layer_point() const {
777
9
    return _cumulative_point;
778
9
}
779
780
54
inline void Tablet::set_cumulative_layer_point(int64_t new_point) {
781
    // cumulative point should only be reset to -1, or be increased
782
54
    CHECK(new_point == Tablet::K_INVALID_CUMULATIVE_POINT || new_point >= _cumulative_point)
783
0
            << "Unexpected cumulative point: " << new_point
784
0
            << ", origin: " << _cumulative_point.load();
785
54
    _cumulative_point = new_point;
786
54
}
787
788
11
inline int64_t Tablet::cumulative_promotion_size() const {
789
11
    return _cumulative_promotion_size;
790
11
}
791
792
20
inline void Tablet::set_cumulative_promotion_size(int64_t new_size) {
793
20
    _cumulative_promotion_size = new_size;
794
20
}
795
796
398
inline bool Tablet::enable_unique_key_merge_on_write() const {
797
398
#ifdef BE_TEST
798
398
    if (_tablet_meta == nullptr) {
799
0
        return false;
800
0
    }
801
398
#endif
802
398
    return _tablet_meta->enable_unique_key_merge_on_write();
803
398
}
804
805
// TODO(lingbin): Why other methods that need to get information from _tablet_meta
806
// are not locked, here needs a comment to explain.
807
5
inline size_t Tablet::tablet_footprint() {
808
5
    std::shared_lock rdlock(_meta_lock);
809
5
    return _tablet_meta->tablet_footprint();
810
5
}
811
812
0
inline size_t Tablet::tablet_local_size() {
813
0
    std::shared_lock rdlock(_meta_lock);
814
0
    return _tablet_meta->tablet_local_size();
815
0
}
816
817
0
inline size_t Tablet::tablet_remote_size() {
818
0
    std::shared_lock rdlock(_meta_lock);
819
0
    return _tablet_meta->tablet_remote_size();
820
0
}
821
822
// TODO(lingbin): Why other methods which need to get information from _tablet_meta
823
// are not locked, here needs a comment to explain.
824
12
inline size_t Tablet::num_rows() {
825
12
    std::shared_lock rdlock(_meta_lock);
826
12
    return _tablet_meta->num_rows();
827
12
}
828
829
8
inline int Tablet::version_count() const {
830
8
    std::shared_lock rdlock(_meta_lock);
831
8
    return _tablet_meta->version_count();
832
8
}
833
834
8
inline int Tablet::stale_version_count() const {
835
8
    std::shared_lock rdlock(_meta_lock);
836
8
    return _tablet_meta->stale_version_count();
837
8
}
838
839
20
// Returns `_tablet_meta->max_version()`, read while holding `_meta_lock`
// in shared mode. See max_version_unlocked() for the variant that takes no lock.
inline Version Tablet::max_version() const {
    std::shared_lock<std::shared_mutex> meta_guard {_meta_lock};
    Version latest = _tablet_meta->max_version();
    return latest;
}
843
844
0
inline uint64_t Tablet::segment_count() const {
845
0
    std::shared_lock rdlock(_meta_lock);
846
0
    uint64_t segment_nums = 0;
847
0
    for (auto& rs_meta : _tablet_meta->all_rs_metas()) {
848
0
        segment_nums += rs_meta->num_segments();
849
0
    }
850
0
    return segment_nums;
851
0
}
852
853
4
// Returns `_tablet_meta->max_version()` without acquiring any lock in this function.
inline Version Tablet::max_version_unlocked() const {
    Version latest = _tablet_meta->max_version();
    return latest;
}
856
857
615
// Returns the keys type reported by `_schema`.
inline KeysType Tablet::keys_type() const {
    const KeysType type = _schema->keys_type();
    return type;
}
860
861
0
// Returns the sort type reported by `_schema`.
inline SortType Tablet::sort_type() const {
    const SortType type = _schema->sort_type();
    return type;
}
864
865
0
inline size_t Tablet::sort_col_num() const {
866
0
    return _schema->sort_col_num();
867
0
}
868
869
0
inline size_t Tablet::num_columns() const {
870
0
    return _schema->num_columns();
871
0
}
872
873
0
inline size_t Tablet::num_null_columns() const {
874
0
    return _schema->num_null_columns();
875
0
}
876
877
128
inline size_t Tablet::num_key_columns() const {
878
128
    return _schema->num_key_columns();
879
128
}
880
881
0
inline size_t Tablet::num_short_key_columns() const {
882
0
    return _schema->num_short_key_columns();
883
0
}
884
885
0
inline size_t Tablet::num_rows_per_row_block() const {
886
0
    return _schema->num_rows_per_row_block();
887
0
}
888
889
0
// Returns the compress kind reported by `_schema`.
inline CompressKind Tablet::compress_kind() const {
    const CompressKind kind = _schema->compress_kind();
    return kind;
}
892
893
0
inline double Tablet::bloom_filter_fpp() const {
894
0
    return _schema->bloom_filter_fpp();
895
0
}
896
897
0
inline size_t Tablet::next_unique_id() const {
898
0
    return _schema->next_column_unique_id();
899
0
}
900
901
0
inline size_t Tablet::row_size() const {
902
0
    return _schema->row_size();
903
0
}
904
905
8
inline int64_t Tablet::avg_rs_meta_serialize_size() const {
906
8
    return _tablet_meta->avg_rs_meta_serialize_size();
907
8
}
908
909
} // namespace doris