Coverage Report

Created: 2026-04-08 14:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/rowset.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <butil/macros.h>
21
#include <fmt/format.h>
22
#include <gen_cpp/olap_file.pb.h>
23
#include <gen_cpp/types.pb.h>
24
#include <stddef.h>
25
#include <stdint.h>
26
27
#include <atomic>
28
#include <memory>
29
#include <mutex>
30
#include <ostream>
31
#include <string>
32
#include <vector>
33
34
#include "common/logging.h"
35
#include "common/status.h"
36
#include "storage/metadata_adder.h"
37
#include "storage/olap_common.h"
38
#include "storage/rowset/rowset_meta.h"
39
#include "storage/tablet/tablet_schema.h"
40
41
namespace doris {
42
43
class Rowset;
44
45
namespace io {
46
class RemoteFileSystem;
47
} // namespace io
48
49
using RowsetSharedPtr = std::shared_ptr<Rowset>;
50
class RowsetReader;
51
52
// the rowset state transition graph (closing with no active readers goes straight from LOADED to UNLOADED):
53
//    ROWSET_UNLOADED    <--|
54
//          ↓               |
55
//    ROWSET_LOADED         |
56
//          ↓               |
57
//    ROWSET_UNLOADING   -->|
58
enum RowsetState {
59
    // state of a newly created rowset
60
    ROWSET_UNLOADED,
61
    // state after load() called
62
    ROWSET_LOADED,
63
    // state after close() is called while some readers still reference the rowset
64
    ROWSET_UNLOADING
65
};
66
67
class RowsetStateMachine {
68
public:
69
1.06M
    RowsetStateMachine() : _rowset_state(ROWSET_UNLOADED) {}
70
71
304k
    Status on_load() {
72
304k
        switch (_rowset_state) {
73
304k
        case ROWSET_UNLOADED:
74
304k
            _rowset_state = ROWSET_LOADED;
75
304k
            break;
76
77
0
        default:
78
0
            return Status::Error<ErrorCode::ROWSET_INVALID_STATE_TRANSITION>(
79
0
                    "RowsetStateMachine meet invalid state");
80
304k
        }
81
305k
        return Status::OK();
82
304k
    }
83
84
5.07k
    Status on_close(uint64_t refs_by_reader) {
85
5.07k
        switch (_rowset_state) {
86
5.07k
        case ROWSET_LOADED:
87
5.07k
            if (refs_by_reader == 0) {
88
5.07k
                _rowset_state = ROWSET_UNLOADED;
89
5.07k
            } else {
90
0
                _rowset_state = ROWSET_UNLOADING;
91
0
            }
92
5.07k
            break;
93
94
0
        default:
95
0
            return Status::Error<ErrorCode::ROWSET_INVALID_STATE_TRANSITION>(
96
0
                    "RowsetStateMachine meet invalid state");
97
5.07k
        }
98
5.07k
        return Status::OK();
99
5.07k
    }
100
101
0
    Status on_release() {
102
0
        switch (_rowset_state) {
103
0
        case ROWSET_UNLOADING:
104
0
            _rowset_state = ROWSET_UNLOADED;
105
0
            break;
106
107
0
        default:
108
0
            return Status::Error<ErrorCode::ROWSET_INVALID_STATE_TRANSITION>(
109
0
                    "RowsetStateMachine meet invalid state");
110
0
        }
111
0
        return Status::OK();
112
0
    }
113
114
12.2M
    // Returns the current state. Const-qualified because it only reads `_rowset_state`.
    RowsetState rowset_state() const { return _rowset_state; }
115
116
private:
117
    RowsetState _rowset_state;
118
};
119
120
class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder<Rowset> {
121
public:
122
    // Open all segment files in this rowset and load necessary metadata.
123
    // - `use_cache` : whether to use fd cache, only applicable to alpha rowset now
124
    //
125
    // May be called multiple times, subsequent calls will no-op.
126
    // Derived class implements the load logic by overriding the `do_load_once()` method.
127
    Status load(bool use_cache = true);
128
129
    // returns Status::Error<ErrorCode::ROWSET_CREATE_READER>() when failed to create reader
130
    virtual Status create_reader(std::shared_ptr<RowsetReader>* result) = 0;
131
132
133M
    const RowsetMetaSharedPtr& rowset_meta() const { return _rowset_meta; }
133
134
    void merge_rowset_meta(const RowsetMeta& other);
135
136
275
    bool is_pending() const { return _is_pending; }
137
138
17.3M
    bool is_local() const { return _rowset_meta->is_local(); }
139
140
666
    const std::string& tablet_path() const { return _tablet_path; }
141
142
    // publish rowset to make it visible to read
143
    void make_visible(Version version, int64_t commit_tso);
144
    void set_version(Version version);
145
    const TabletSchemaSPtr& tablet_schema() const;
146
147
    // helper accessors that forward to RowsetMeta
148
7.88M
    int64_t start_version() const { return rowset_meta()->version().first; }
149
12.2M
    int64_t end_version() const { return rowset_meta()->version().second; }
150
216k
    int64_t index_disk_size() const { return rowset_meta()->index_disk_size(); }
151
6.53M
    int64_t data_disk_size() const { return rowset_meta()->data_disk_size(); }
152
494k
    int64_t total_disk_size() const { return rowset_meta()->total_disk_size(); }
153
9.60k
    bool empty() const { return rowset_meta()->empty(); }
154
0
    bool zero_num_rows() const { return rowset_meta()->num_rows() == 0; }
155
9.58M
    size_t num_rows() const { return rowset_meta()->num_rows(); }
156
21.5M
    Version version() const { return rowset_meta()->version(); }
157
28.7M
    RowsetId rowset_id() const { return rowset_meta()->rowset_id(); }
158
129k
    int64_t creation_time() const { return rowset_meta()->creation_time(); }
159
0
    PUniqueId load_id() const { return rowset_meta()->load_id(); }
160
153k
    int64_t txn_id() const { return rowset_meta()->txn_id(); }
161
0
    int64_t partition_id() const { return rowset_meta()->partition_id(); }
162
    // flag for push delete rowset
163
0
    bool delete_flag() const { return rowset_meta()->delete_flag(); }
164
19.6M
    MOCK_FUNCTION int64_t num_segments() const { return rowset_meta()->num_segments(); }
165
    void to_rowset_pb(RowsetMetaPB* rs_meta) const { return rowset_meta()->to_rowset_pb(rs_meta); }
166
0
    RowsetMetaPB get_rowset_pb() const { return rowset_meta()->get_rowset_pb(); }
167
    // The writing time of the newest data in rowset, to measure the freshness of a rowset.
168
158k
    int64_t newest_write_timestamp() const { return rowset_meta()->newest_write_timestamp(); }
169
    // The commit tso of the newest data in rowset.
170
126k
    int64_t commit_tso() const { return rowset_meta()->commit_tso(); }
171
172
1.78M
    bool is_segments_overlapping() const { return rowset_meta()->is_segments_overlapping(); }
173
3.64M
    // Keys type taken from this rowset's `_schema`. Const-qualified like the other accessors
    // because it only reads through the schema pointer.
    KeysType keys_type() const { return _schema->keys_type(); }
174
3.23k
    RowsetStatePB rowset_meta_state() const { return rowset_meta()->rowset_state(); }
175
32
    bool produced_by_compaction() const { return rowset_meta()->produced_by_compaction(); }
176
177
    // remove all files in this rowset
178
    // TODO should we rename the method to remove_files() to be more specific?
179
    virtual Status remove() = 0;
180
181
    // close to clear the resource owned by rowset
182
    // including: open files, indexes and so on
183
    // NOTICE: this function must not be called from multiple threads concurrently
184
31.1k
    void close() {
185
31.1k
        RowsetState old_state = _rowset_state_machine.rowset_state();
186
31.1k
        if (old_state != ROWSET_LOADED) {
187
26.0k
            return;
188
26.0k
        }
189
5.07k
        Status st = Status::OK();
190
5.07k
        {
191
5.07k
            std::lock_guard close_lock(_lock);
192
5.07k
            uint64_t current_refs = _refs_by_reader;
193
5.07k
            old_state = _rowset_state_machine.rowset_state();
194
5.07k
            if (old_state != ROWSET_LOADED) {
195
0
                return;
196
0
            }
197
5.07k
            if (current_refs == 0) {
198
5.07k
                do_close();
199
5.07k
            }
200
5.07k
            st = _rowset_state_machine.on_close(current_refs);
201
5.07k
        }
202
5.07k
        if (!st.ok()) {
203
0
            LOG(WARNING) << "state transition failed from:" << _rowset_state_machine.rowset_state();
204
0
            return;
205
0
        }
206
5.07k
        VLOG_NOTICE << "rowset is close. rowset state from:" << old_state << " to "
207
0
                    << _rowset_state_machine.rowset_state() << ", version:" << start_version()
208
0
                    << "-" << end_version() << ", tabletid:" << _rowset_meta->tablet_id();
209
5.07k
    }
210
211
    // hard link all files in this rowset to `dir` to form a new rowset with id `new_rowset_id`.
212
    virtual Status link_files_to(const std::string& dir, RowsetId new_rowset_id,
213
                                 size_t new_rowset_start_seg_id = 0,
214
                                 std::set<int64_t>* without_index_uids = nullptr) = 0;
215
216
    virtual Status get_inverted_index_size(int64_t* index_size) = 0;
217
218
    // copy all files to `dir`
219
    virtual Status copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) = 0;
220
221
    virtual Status upload_to(const StorageResource& dest_fs, const RowsetId& new_rowset_id) = 0;
222
223
    virtual Status remove_old_files(std::vector<std::string>* files_to_remove) = 0;
224
225
    virtual Status check_file_exist() = 0;
226
227
31.0k
    bool need_delete_file() const { return _need_delete_file; }
228
229
31.1k
    void set_need_delete_file() { _need_delete_file = true; }
230
231
503k
    bool contains_version(Version version) const {
232
503k
        return rowset_meta()->version().contains(version);
233
503k
    }
234
235
1.26M
    static bool comparator(const RowsetSharedPtr& left, const RowsetSharedPtr& right) {
236
1.26M
        return left->end_version() < right->end_version();
237
1.26M
    }
238
239
    // called by a reader to increase the reference count of this rowset
240
9.09M
    void acquire() { ++_refs_by_reader; }
241
242
9.09M
    void release() {
243
        // if no reader references the rowset any more and it has been closed, release the resources
244
9.09M
        uint64_t current_refs = --_refs_by_reader;
245
9.09M
        if (current_refs == 0 && _rowset_state_machine.rowset_state() == ROWSET_UNLOADING) {
246
0
            {
247
0
                std::lock_guard release_lock(_lock);
248
                // re-check _refs_by_reader under the lock because creating a reader does not take it
249
0
                if (_refs_by_reader == 0 &&
250
0
                    _rowset_state_machine.rowset_state() == ROWSET_UNLOADING) {
251
                    // first do close, then change state
252
0
                    do_close();
253
0
                    static_cast<void>(_rowset_state_machine.on_release());
254
0
                }
255
0
            }
256
0
            if (_rowset_state_machine.rowset_state() == ROWSET_UNLOADED) {
257
0
                VLOG_NOTICE
258
0
                        << "close the rowset. rowset state from ROWSET_UNLOADING to ROWSET_UNLOADED"
259
0
                        << ", version:" << start_version() << "-" << end_version()
260
0
                        << ", tabletid:" << _rowset_meta->tablet_id();
261
0
            }
262
0
        }
263
9.09M
    }
264
265
12
    void update_delayed_expired_timestamp(uint64_t delayed_expired_timestamp) {
266
12
        if (delayed_expired_timestamp > _delayed_expired_timestamp) {
267
6
            _delayed_expired_timestamp = delayed_expired_timestamp;
268
6
        }
269
12
    }
270
271
165
    uint64_t delayed_expired_timestamp() { return _delayed_expired_timestamp; }
272
273
3.52k
    virtual Status get_segments_key_bounds(std::vector<KeyBoundsPB>* segments_key_bounds) {
274
3.52k
        _rowset_meta->get_segments_key_bounds(segments_key_bounds);
275
3.52k
        return Status::OK();
276
3.52k
    }
277
278
3.50k
    void get_num_segment_rows(std::vector<uint32_t>* num_segment_rows) {
279
3.50k
        _rowset_meta->get_num_segment_rows(num_segment_rows);
280
3.50k
    }
281
282
    // min key of the first segment
283
1.40M
    bool first_key(std::string* min_key) {
284
1.40M
        KeyBoundsPB key_bounds;
285
1.40M
        bool ret = _rowset_meta->get_first_segment_key_bound(&key_bounds);
286
1.40M
        if (!ret) {
287
0
            return false;
288
0
        }
289
1.40M
        *min_key = key_bounds.min_key();
290
1.40M
        return true;
291
1.40M
    }
292
293
    // max key of the last segment
294
971k
    bool last_key(std::string* max_key) {
295
971k
        KeyBoundsPB key_bounds;
296
971k
        bool ret = _rowset_meta->get_last_segment_key_bound(&key_bounds);
297
971k
        if (!ret) {
298
0
            return false;
299
0
        }
300
971k
        *max_key = key_bounds.max_key();
301
971k
        return true;
302
971k
    }
303
304
1.41M
    bool is_segments_key_bounds_truncated() const {
305
1.41M
        return _rowset_meta->is_segments_key_bounds_truncated();
306
1.41M
    }
307
308
    bool check_rowset_segment();
309
310
0
    [[nodiscard]] virtual Status add_to_binlog() { return Status::OK(); }
311
312
    // whether index compaction is skipped for this column this time
313
10.6k
    bool is_skip_index_compaction(int32_t column_id) const {
314
10.6k
        return skip_index_compaction.find(column_id) != skip_index_compaction.end();
315
10.6k
    }
316
317
    // mark this column so that its index compaction is skipped next time
318
5
    void set_skip_index_compaction(int32_t column_id) { skip_index_compaction.insert(column_id); }
319
320
    std::string get_rowset_info_str();
321
322
    void clear_cache();
323
324
    MOCK_FUNCTION Result<std::string> segment_path(int64_t seg_id);
325
326
    std::vector<std::string> get_index_file_names();
327
328
    // check if the rowset is a hole rowset
329
54.5k
    bool is_hole_rowset() const { return _is_hole_rowset; }
330
    // set the rowset as a hole rowset
331
1.74k
    void set_hole_rowset(bool is_hole_rowset) { _is_hole_rowset = is_hole_rowset; }
332
333
    int64_t approximate_cached_data_size();
334
335
    int64_t approximate_cache_index_size();
336
337
    std::chrono::time_point<std::chrono::system_clock> visible_timestamp() const;
338
339
protected:
340
    friend class RowsetFactory;
341
342
    DISALLOW_COPY_AND_ASSIGN(Rowset);
343
    // this is non-public because all clients should use RowsetFactory to obtain pointer to initialized Rowset
344
    Rowset(const TabletSchemaSPtr& schema, RowsetMetaSharedPtr rowset_meta,
345
           std::string tablet_path);
346
347
    // this is non-public because all clients should use RowsetFactory to obtain pointer to initialized Rowset
348
    virtual Status init() = 0;
349
350
    // release resources in this api
351
    virtual void do_close() = 0;
352
353
    virtual Status check_current_rowset_segment() = 0;
354
355
    virtual void clear_inverted_index_cache() = 0;
356
357
    TabletSchemaSPtr _schema;
358
359
    RowsetMetaSharedPtr _rowset_meta;
360
361
    // Local rowset requires a tablet path to obtain the absolute path on the local fs
362
    std::string _tablet_path;
363
364
    // init in constructor
365
    bool _is_pending;    // rowset is pending iff it's not in visible state
366
    bool _is_cumulative; // rowset is cumulative iff it's visible and start version < end version
367
368
    // mutex lock for load/close api because it is costly
369
    std::mutex _lock;
370
    bool _need_delete_file = false;
371
    // number of rowset readers that currently reference this rowset
372
    std::atomic<uint64_t> _refs_by_reader;
373
    // rowset state machine
374
    RowsetStateMachine _rowset_state_machine;
375
    std::atomic<uint64_t> _delayed_expired_timestamp = 0;
376
377
    // <column_uniq_id>, skip index compaction
378
    std::set<int32_t> skip_index_compaction;
379
380
    // only used for cloud mode, it indicates whether this rowset is a hole rowset.
381
    // a hole rowset is a rowset that has no data, but is used to fill the version gap
382
    // it is used to ensure that the version sequence is continuous.
383
    bool _is_hole_rowset = false;
384
};
385
386
// `rowsets` MUST already be sorted by version (cf. `Rowset::comparator` / `RowsetMeta::comparator`)
387
Status check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets);
388
389
} // namespace doris