Coverage Report

Created: 2024-11-20 21:21

/root/doris/be/src/olap/base_tablet.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <memory>
21
#include <shared_mutex>
22
#include <string>
23
24
#include "common/status.h"
25
#include "olap/iterators.h"
26
#include "olap/olap_common.h"
27
#include "olap/partial_update_info.h"
28
#include "olap/rowset/segment_v2/segment.h"
29
#include "olap/tablet_fwd.h"
30
#include "olap/tablet_meta.h"
31
#include "olap/tablet_schema.h"
32
#include "olap/version_graph.h"
33
#include "util/metrics.h"
34
35
namespace doris {
36
struct RowSetSplits;
37
struct RowsetWriterContext;
38
class RowsetWriter;
39
class CalcDeleteBitmapToken;
40
class SegmentCacheHandle;
41
class RowIdConversion;
42
struct PartialUpdateInfo;
43
class FixedReadPlan;
44
45
struct TabletWithVersion {
46
    BaseTabletSPtr tablet;
47
    int64_t version;
48
};
49
50
enum class CompactionStage { NOT_SCHEDULED, PENDING, EXECUTING };
51
52
// Base class for all tablet classes
53
class BaseTablet {
54
public:
55
    explicit BaseTablet(TabletMetaSharedPtr tablet_meta);
56
    virtual ~BaseTablet();
57
    BaseTablet(const BaseTablet&) = delete;
58
    BaseTablet& operator=(const BaseTablet&) = delete;
59
60
8.57k
    TabletState tablet_state() const { return _tablet_meta->tablet_state(); }
61
    Status set_tablet_state(TabletState state);
62
2
    int64_t table_id() const { return _tablet_meta->table_id(); }
63
0
    int64_t index_id() const { return _tablet_meta->index_id(); }
64
262
    int64_t partition_id() const { return _tablet_meta->partition_id(); }
65
4.30k
    int64_t tablet_id() const { return _tablet_meta->tablet_id(); }
66
335
    int32_t schema_hash() const { return _tablet_meta->schema_hash(); }
67
615
    KeysType keys_type() const { return _tablet_meta->tablet_schema()->keys_type(); }
68
128
    size_t num_key_columns() const { return _tablet_meta->tablet_schema()->num_key_columns(); }
69
141
    int64_t ttl_seconds() const { return _tablet_meta->ttl_seconds(); }
70
0
    std::mutex& get_schema_change_lock() { return _schema_change_lock; }
71
607
    bool enable_unique_key_merge_on_write() const {
72
607
#ifdef BE_TEST
73
607
        if (_tablet_meta == nullptr) {
74
0
            return false;
75
0
        }
76
607
#endif
77
607
        return _tablet_meta->enable_unique_key_merge_on_write();
78
607
    }
79
80
    // Property encapsulated in TabletMeta
81
731
    const TabletMetaSharedPtr& tablet_meta() { return _tablet_meta; }
82
83
    // FIXME(plat1ko): It is not appropriate to expose this lock
84
66
    std::shared_mutex& get_header_lock() { return _meta_lock; }
85
86
    void update_max_version_schema(const TabletSchemaSPtr& tablet_schema);
87
88
    Status update_by_least_common_schema(const TabletSchemaSPtr& update_schema);
89
90
966
    TabletSchemaSPtr tablet_schema() const {
91
966
        std::shared_lock rlock(_meta_lock);
92
966
        return _max_version_schema;
93
966
    }
94
95
    virtual bool exceed_version_limit(int32_t limit) = 0;
96
97
    virtual Result<std::unique_ptr<RowsetWriter>> create_rowset_writer(RowsetWriterContext& context,
98
                                                                       bool vertical) = 0;
99
100
    virtual Status capture_consistent_rowsets_unlocked(
101
            const Version& spec_version, std::vector<RowsetSharedPtr>* rowsets) const = 0;
102
103
    virtual Status capture_rs_readers(const Version& spec_version,
104
                                      std::vector<RowSetSplits>* rs_splits,
105
                                      bool skip_missing_version) = 0;
106
107
    virtual size_t tablet_footprint() = 0;
108
109
    // this method just returns the compaction sum on each rowset
110
    // note(tsy): we should unify the compaction score calculation finally
111
    uint32_t get_real_compaction_score() const;
112
113
    // MUST hold shared meta lock
114
    Status capture_rs_readers_unlocked(const Versions& version_path,
115
                                       std::vector<RowSetSplits>* rs_splits) const;
116
117
    // _rs_version_map and _stale_rs_version_map should be protected by _meta_lock
118
    // The caller must hold _meta_lock when calling these three functions.
119
    RowsetSharedPtr get_rowset_by_version(const Version& version, bool find_is_stale = false) const;
120
    RowsetSharedPtr get_stale_rowset_by_version(const Version& version) const;
121
    RowsetSharedPtr get_rowset_with_max_version() const;
122
123
    Status get_all_rs_id(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) const;
124
    Status get_all_rs_id_unlocked(int64_t max_version, RowsetIdUnorderedSet* rowset_ids) const;
125
126
    // Get the missed versions until the spec_version.
127
    Versions get_missed_versions(int64_t spec_version) const;
128
    Versions get_missed_versions_unlocked(int64_t spec_version) const;
129
130
    void generate_tablet_meta_copy(TabletMeta& new_tablet_meta) const;
131
    void generate_tablet_meta_copy_unlocked(TabletMeta& new_tablet_meta) const;
132
133
36
    virtual int64_t max_version_unlocked() const { return _tablet_meta->max_version().second; }
134
135
    static TabletSchemaSPtr tablet_schema_with_merged_max_schema_version(
136
            const std::vector<RowsetMetaSharedPtr>& rowset_metas);
137
138
    ////////////////////////////////////////////////////////////////////////////
139
    // begin MoW functions
140
    ////////////////////////////////////////////////////////////////////////////
141
    std::vector<RowsetSharedPtr> get_rowset_by_ids(
142
            const RowsetIdUnorderedSet* specified_rowset_ids);
143
144
    // Lookup a row with TupleDescriptor and fill Block
145
    Status lookup_row_data(const Slice& encoded_key, const RowLocation& row_location,
146
                           RowsetSharedPtr rowset, const TupleDescriptor* desc,
147
                           OlapReaderStatistics& stats, std::string& values,
148
                           bool write_to_cache = false);
149
150
    // Lookup the row location of `encoded_key`, the function sets `row_location` on success.
151
    // NOTE: the method only works in unique key model with primary key index, you will get a
152
    //       not supported error in other data model.
153
    Status lookup_row_key(const Slice& encoded_key, TabletSchema* latest_schema, bool with_seq_col,
154
                          const std::vector<RowsetSharedPtr>& specified_rowsets,
155
                          RowLocation* row_location, uint32_t version,
156
                          std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
157
                          RowsetSharedPtr* rowset = nullptr, bool with_rowid = true,
158
                          std::string* encoded_seq_value = nullptr,
159
                          OlapReaderStatistics* stats = nullptr);
160
161
    // calc delete bitmap when flush memtable, use a fake version to calc
162
    // For example, cur max version is 5, and we use version 6 to calc but
163
    // finally this rowset is published with version 8, we should make up data
164
    // for rowset 6-7. Also, if a compaction happens between commit_txn and
165
    // publish_txn, we should remove compaction input rowsets' delete_bitmap
166
    // and build newly generated rowset's delete_bitmap
167
    static Status calc_delete_bitmap(const BaseTabletSPtr& tablet, RowsetSharedPtr rowset,
168
                                     const std::vector<segment_v2::SegmentSharedPtr>& segments,
169
                                     const std::vector<RowsetSharedPtr>& specified_rowsets,
170
                                     DeleteBitmapPtr delete_bitmap, int64_t version,
171
                                     CalcDeleteBitmapToken* token,
172
                                     RowsetWriter* rowset_writer = nullptr);
173
174
    Status calc_segment_delete_bitmap(RowsetSharedPtr rowset,
175
                                      const segment_v2::SegmentSharedPtr& seg,
176
                                      const std::vector<RowsetSharedPtr>& specified_rowsets,
177
                                      DeleteBitmapPtr delete_bitmap, int64_t end_version,
178
                                      RowsetWriter* rowset_writer);
179
180
    Status calc_delete_bitmap_between_segments(
181
            RowsetSharedPtr rowset, const std::vector<segment_v2::SegmentSharedPtr>& segments,
182
            DeleteBitmapPtr delete_bitmap);
183
184
    static Status commit_phase_update_delete_bitmap(
185
            const BaseTabletSPtr& tablet, const RowsetSharedPtr& rowset,
186
            RowsetIdUnorderedSet& pre_rowset_ids, DeleteBitmapPtr delete_bitmap,
187
            const std::vector<segment_v2::SegmentSharedPtr>& segments, int64_t txn_id,
188
            CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer = nullptr);
189
190
    static void add_sentinel_mark_to_delete_bitmap(DeleteBitmap* delete_bitmap,
191
                                                   const RowsetIdUnorderedSet& rowsetids);
192
193
    Status check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, int64_t max_version,
194
                                           int64_t txn_id, const RowsetIdUnorderedSet& rowset_ids,
195
                                           std::vector<RowsetSharedPtr>* rowsets = nullptr);
196
197
    static const signed char* get_delete_sign_column_data(const vectorized::Block& block,
198
                                                          size_t rows_at_least = 0);
199
200
    static Status generate_default_value_block(const TabletSchema& schema,
201
                                               const std::vector<uint32_t>& cids,
202
                                               const std::vector<std::string>& default_values,
203
                                               const vectorized::Block& ref_block,
204
                                               vectorized::Block& default_value_block);
205
206
    static Status generate_new_block_for_partial_update(
207
            TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info,
208
            const FixedReadPlan& read_plan_ori, const FixedReadPlan& read_plan_update,
209
            const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
210
            vectorized::Block* output_block);
211
212
    static Status generate_new_block_for_flexible_partial_update(
213
            TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info,
214
            std::set<uint32_t>& rids_be_overwritten, const FixedReadPlan& read_plan_ori,
215
            const FixedReadPlan& read_plan_update,
216
            const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
217
            vectorized::Block* output_block);
218
219
    // We use the TabletSchema from the caller because the TabletSchema in the rowset's meta
220
    // may be outdated due to schema change. Also note that the cids should indicate the indexes
221
    // of the columns in the TabletSchema passed in.
222
    static Status fetch_value_through_row_column(RowsetSharedPtr input_rowset,
223
                                                 const TabletSchema& tablet_schema, uint32_t segid,
224
                                                 const std::vector<uint32_t>& rowids,
225
                                                 const std::vector<uint32_t>& cids,
226
                                                 vectorized::Block& block);
227
228
    static Status fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t segid,
229
                                        const std::vector<uint32_t>& rowids,
230
                                        const TabletColumn& tablet_column,
231
                                        vectorized::MutableColumnPtr& dst);
232
233
    virtual Result<std::unique_ptr<RowsetWriter>> create_transient_rowset_writer(
234
            const Rowset& rowset, std::shared_ptr<PartialUpdateInfo> partial_update_info,
235
            int64_t txn_expiration = 0) = 0;
236
237
    static Status update_delete_bitmap(const BaseTabletSPtr& self, TabletTxnInfo* txn_info,
238
                                       int64_t txn_id, int64_t txn_expiration = 0);
239
240
    virtual Status save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t txn_id,
241
                                      DeleteBitmapPtr delete_bitmap, RowsetWriter* rowset_writer,
242
                                      const RowsetIdUnorderedSet& cur_rowset_ids) = 0;
243
    virtual CalcDeleteBitmapExecutor* calc_delete_bitmap_executor() = 0;
244
245
    void calc_compaction_output_rowset_delete_bitmap(
246
            const std::vector<RowsetSharedPtr>& input_rowsets,
247
            const RowIdConversion& rowid_conversion, uint64_t start_version, uint64_t end_version,
248
            std::set<RowLocation>* missed_rows,
249
            std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>>* location_map,
250
            const DeleteBitmap& input_delete_bitmap, DeleteBitmap* output_rowset_delete_bitmap);
251
252
    Status check_rowid_conversion(
253
            RowsetSharedPtr dst_rowset,
254
            const std::map<RowsetSharedPtr, std::list<std::pair<RowLocation, RowLocation>>>&
255
                    location_map);
256
257
    static Status update_delete_bitmap_without_lock(
258
            const BaseTabletSPtr& self, const RowsetSharedPtr& rowset,
259
            const std::vector<RowsetSharedPtr>* specified_base_rowsets = nullptr);
260
261
    ////////////////////////////////////////////////////////////////////////////
262
    // end MoW functions
263
    ////////////////////////////////////////////////////////////////////////////
264
265
    RowsetSharedPtr get_rowset(const RowsetId& rowset_id);
266
267
    std::vector<RowsetSharedPtr> get_snapshot_rowset(bool include_stale_rowset = false) const;
268
269
    virtual void clear_cache() = 0;
270
271
    // Find the first consecutive empty rowsets. output->size() >= limit
272
    void calc_consecutive_empty_rowsets(std::vector<RowsetSharedPtr>* empty_rowsets,
273
                                        const std::vector<RowsetSharedPtr>& candidate_rowsets,
274
                                        int limit);
275
276
    // Return the merged schema of all rowsets
277
0
    virtual TabletSchemaSPtr merged_tablet_schema() const { return _max_version_schema; }
278
279
    void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor,
280
11
                          bool include_stale = false) {
281
11
        std::shared_lock rlock(_meta_lock);
282
62
        for (auto& [v, rs] : _rs_version_map) {
283
62
            visitor(rs);
284
62
        }
285
11
        if (!include_stale) return;
286
10
        for (auto& [v, rs] : _stale_rs_version_map) {
287
0
            visitor(rs);
288
0
        }
289
10
    }
290
291
    Status calc_file_crc(uint32_t* crc_value, int64_t start_version, int64_t end_version,
292
                         int32_t* rowset_count, int64_t* file_count);
293
294
    Status show_nested_index_file(std::string* json_meta);
295
296
983
    TabletUid tablet_uid() const { return _tablet_meta->tablet_uid(); }
297
141
    TabletInfo get_tablet_info() const { return TabletInfo(tablet_id(), tablet_uid()); }
298
299
protected:
300
    // Find the missed versions until the spec_version.
301
    //
302
    // for example:
303
    //     [0-4][5-5][8-8][9-9][14-14]
304
    // for cloud, if spec_version = 12, it will return [6-7],[10-12]
305
    // for local, if spec_version = 12, it will return [6, 6], [7, 7], [10, 10], [11, 11], [12, 12]
306
    virtual Versions calc_missed_versions(int64_t spec_version,
307
                                          Versions existing_versions) const = 0;
308
309
    void _print_missed_versions(const Versions& missed_versions) const;
310
    bool _reconstruct_version_tracker_if_necessary();
311
312
    static void _rowset_ids_difference(const RowsetIdUnorderedSet& cur,
313
                                       const RowsetIdUnorderedSet& pre,
314
                                       RowsetIdUnorderedSet* to_add, RowsetIdUnorderedSet* to_del);
315
316
    Status _capture_consistent_rowsets_unlocked(const std::vector<Version>& version_path,
317
                                                std::vector<RowsetSharedPtr>* rowsets) const;
318
319
    Status sort_block(vectorized::Block& in_block, vectorized::Block& output_block);
320
321
    mutable std::shared_mutex _meta_lock;
322
    TimestampedVersionTracker _timestamped_version_tracker;
323
    // After version 0.13, all newly created rowsets are saved in _rs_version_map.
324
    // And if a rowset is being compacted, the old rowsets will be saved in _stale_rs_version_map;
325
    std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> _rs_version_map;
326
    // This variable _stale_rs_version_map is used to record the rowsets which have been compacted.
327
    // These _stale rowsets are removed when rowsets' pathVersion is expired,
328
    // this policy is judged and computed by TimestampedVersionTracker.
329
    std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> _stale_rs_version_map;
330
    const TabletMetaSharedPtr _tablet_meta;
331
    TabletSchemaSPtr _max_version_schema;
332
333
    // metrics of this tablet
334
    std::shared_ptr<MetricEntity> _metric_entity;
335
336
protected:
337
    std::mutex _schema_change_lock;
338
339
public:
340
    IntCounter* query_scan_bytes = nullptr;
341
    IntCounter* query_scan_rows = nullptr;
342
    IntCounter* query_scan_count = nullptr;
343
    IntCounter* flush_bytes = nullptr;
344
    IntCounter* flush_finish_count = nullptr;
345
    std::atomic<int64_t> published_count = 0;
346
    std::atomic<int64_t> read_block_count = 0;
347
    std::atomic<int64_t> write_count = 0;
348
    std::atomic<int64_t> compaction_count = 0;
349
350
    CompactionStage compaction_stage = CompactionStage::NOT_SCHEDULED;
351
    std::mutex sample_info_lock;
352
    std::vector<CompactionSampleInfo> sample_infos;
353
    Status last_compaction_status = Status::OK();
354
};
355
356
} /* namespace doris */