Coverage Report

Created: 2025-06-08 11:30

/root/doris/be/src/olap/rowset/rowset.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <butil/macros.h>
21
#include <fmt/format.h>
22
#include <gen_cpp/olap_file.pb.h>
23
#include <gen_cpp/types.pb.h>
24
#include <stddef.h>
25
#include <stdint.h>
26
27
#include <atomic>
28
#include <memory>
29
#include <mutex>
30
#include <ostream>
31
#include <string>
32
#include <vector>
33
34
#include "common/logging.h"
35
#include "common/status.h"
36
#include "olap/olap_common.h"
37
#include "olap/rowset/rowset_meta.h"
38
#include "olap/tablet_schema.h"
39
40
namespace doris {
41
42
class Rowset;
43
44
namespace io {
45
class RemoteFileSystem;
46
} // namespace io
47
48
using RowsetSharedPtr = std::shared_ptr<Rowset>;
49
class RowsetReader;
50
51
// the rowset state transfer graph:
52
//    ROWSET_UNLOADED    <--|
53
//          ↓               |
54
//    ROWSET_LOADED         |
55
//          ↓               |
56
//    ROWSET_UNLOADING   -->|
57
// Load state of a rowset. The legal transitions are the ones drawn in the
// state transfer graph above and enforced by RowsetStateMachine.
enum RowsetState {
58
    // state of a newly created rowset (also the state a rowset returns to once closed)
59
    ROWSET_UNLOADED,
60
    // state after load() called
61
    ROWSET_LOADED,
62
    // state after close() was called while the rowset is still owned by some readers
63
    ROWSET_UNLOADING
64
};
65
66
// Small state machine over RowsetState.
//
// Each on_*() handler performs exactly one legal transition. When it is called
// in any other state it leaves the state untouched and returns a
// ROWSET_INVALID_STATE_TRANSITION error.
//
// The class contains no synchronization of its own and `_rowset_state` is a
// plain enum, so callers have to serialize access themselves.
class RowsetStateMachine {
67
public:
68
11.8k
    // A new state machine starts in ROWSET_UNLOADED.
    RowsetStateMachine() : _rowset_state(ROWSET_UNLOADED) {}
69
70
396
    // Transition ROWSET_UNLOADED -> ROWSET_LOADED.
    // Returns an error (state unchanged) when the current state is not ROWSET_UNLOADED.
    Status on_load() {
71
396
        switch (_rowset_state) {
72
396
        case ROWSET_UNLOADED:
73
396
            _rowset_state = ROWSET_LOADED;
74
396
            break;
75
76
0
        default:
77
0
            return Status::Error<ErrorCode::ROWSET_INVALID_STATE_TRANSITION>(
78
0
                    "RowsetStateMachine meet invalid state");
79
396
        }
80
396
        return Status::OK();
81
396
    }
82
83
0
    // Transition out of ROWSET_LOADED when the rowset is closed.
    // - `refs_by_reader` : number of readers that still reference the rowset.
    //   With 0 the state becomes ROWSET_UNLOADED directly, otherwise it becomes
    //   ROWSET_UNLOADING until on_release() is called.
    // Returns an error (state unchanged) when the current state is not ROWSET_LOADED.
    Status on_close(uint64_t refs_by_reader) {
84
0
        switch (_rowset_state) {
85
0
        case ROWSET_LOADED:
86
0
            if (refs_by_reader == 0) {
87
0
                _rowset_state = ROWSET_UNLOADED;
88
0
            } else {
89
0
                _rowset_state = ROWSET_UNLOADING;
90
0
            }
91
0
            break;
92
93
0
        default:
94
0
            return Status::Error<ErrorCode::ROWSET_INVALID_STATE_TRANSITION>(
95
0
                    "RowsetStateMachine meet invalid state");
96
0
        }
97
0
        return Status::OK();
98
0
    }
99
100
0
    // Transition ROWSET_UNLOADING -> ROWSET_UNLOADED.
    // Returns an error (state unchanged) when the current state is not ROWSET_UNLOADING.
    Status on_release() {
101
0
        switch (_rowset_state) {
102
0
        case ROWSET_UNLOADING:
103
0
            _rowset_state = ROWSET_UNLOADED;
104
0
            break;
105
106
0
        default:
107
0
            return Status::Error<ErrorCode::ROWSET_INVALID_STATE_TRANSITION>(
108
0
                    "RowsetStateMachine meet invalid state");
109
0
        }
110
0
        return Status::OK();
111
0
    }
112
113
1.38k
    // Returns the current state.
    // NOTE(review): does not modify the object but is not declared const.
    RowsetState rowset_state() { return _rowset_state; }
114
115
private:
116
    RowsetState _rowset_state;
117
};
118
119
// Abstract base class of a rowset.
//
// It holds the tablet schema and the rowset meta, forwards most read-only
// queries to the rowset meta, counts the readers that currently use the rowset
// (acquire()/release()) and tracks the load state through a RowsetStateMachine.
// File-level operations (create_reader, remove, link/copy files, ...) are pure
// virtual and provided by derived classes. The constructor is protected and
// RowsetFactory is a friend, so instances are obtained through RowsetFactory.
class Rowset : public std::enable_shared_from_this<Rowset> {
120
public:
121
    virtual ~Rowset();
122
123
    // Open all segment files in this rowset and load necessary metadata.
124
    // - `use_cache` : whether to use fd cache, only applicable to alpha rowset now
125
    //
126
    // May be called multiple times, subsequent calls will no-op.
127
    // Derived class implements the load logic by overriding the `do_load()` method.
128
    Status load(bool use_cache = true);
129
130
    // returns Status::Error<ErrorCode::ROWSET_CREATE_READER>() when failed to create reader
131
    virtual Status create_reader(std::shared_ptr<RowsetReader>* result) = 0;
132
133
3.86M
    const RowsetMetaSharedPtr& rowset_meta() const { return _rowset_meta; }
134
135
    void merge_rowset_meta(const RowsetMeta& other);
136
137
3
    bool is_pending() const { return _is_pending; }
138
139
939
    bool is_local() const { return _rowset_meta->is_local(); }
140
141
    // publish rowset to make it visible to read
142
    void make_visible(Version version);
143
128
    const TabletSchemaSPtr& tablet_schema() { return _schema; }
144
145
    // helper accessors that forward to RowsetMeta (keys_type() reads the tablet schema instead)
146
885
    int64_t start_version() const { return rowset_meta()->version().first; }
147
4.78k
    int64_t end_version() const { return rowset_meta()->version().second; }
148
44
    size_t index_disk_size() const { return rowset_meta()->index_disk_size(); }
149
86
    size_t data_disk_size() const { return rowset_meta()->data_disk_size(); }
150
0
    size_t total_disk_size() const { return rowset_meta()->total_disk_size(); }
151
0
    bool empty() const { return rowset_meta()->empty(); }
152
0
    bool zero_num_rows() const { return rowset_meta()->num_rows() == 0; }
153
152
    size_t num_rows() const { return rowset_meta()->num_rows(); }
154
1.07M
    Version version() const { return rowset_meta()->version(); }
155
2.72M
    RowsetId rowset_id() const { return rowset_meta()->rowset_id(); }
156
4
    int64_t creation_time() const { return rowset_meta()->creation_time(); }
157
0
    PUniqueId load_id() const { return rowset_meta()->load_id(); }
158
0
    int64_t txn_id() const { return rowset_meta()->txn_id(); }
159
0
    int64_t partition_id() const { return rowset_meta()->partition_id(); }
160
    // flag for push delete rowset
161
0
    bool delete_flag() const { return rowset_meta()->delete_flag(); }
162
48.9k
    int64_t num_segments() const { return rowset_meta()->num_segments(); }
163
0
    void to_rowset_pb(RowsetMetaPB* rs_meta) const { return rowset_meta()->to_rowset_pb(rs_meta); }
164
0
    RowsetMetaPB get_rowset_pb() const { return rowset_meta()->get_rowset_pb(); }
165
    // The writing time of the newest data in rowset, to measure the freshness of a rowset.
166
27
    int64_t newest_write_timestamp() const { return rowset_meta()->newest_write_timestamp(); }
167
378
    bool is_segments_overlapping() const { return rowset_meta()->is_segments_overlapping(); }
168
524
    KeysType keys_type() { return _schema->keys_type(); }
169
1
    RowsetStatePB rowset_meta_state() const { return rowset_meta()->rowset_state(); }
170
0
    bool produced_by_compaction() const { return rowset_meta()->produced_by_compaction(); }
171
172
    // remove all files in this rowset
173
    // TODO should we rename the method to remove_files() to be more specific?
174
    virtual Status remove() = 0;
175
176
    // used for partial update, when publish, partial update may add a new rowset
177
    // and we should update rowset meta
178
    void merge_rowset_meta(const RowsetMetaSharedPtr& other);
179
180
    // close to clear the resource owned by rowset
181
    // including: open files, indexes and so on
182
    // NOTICE: can not call this function in multithreads
    //
    // Only acts when the rowset is in ROWSET_LOADED. If no reader holds the
    // rowset, do_close() runs immediately and the state becomes ROWSET_UNLOADED;
    // otherwise the state becomes ROWSET_UNLOADING and the last release() closes it.
183
50
    void close() {
184
50
        // Cheap early exit taken before acquiring `_lock`.
        // NOTE(review): this read is not protected by `_lock` and the state is a
        // plain enum; it is re-checked under the lock below.
        RowsetState old_state = _rowset_state_machine.rowset_state();
185
50
        if (old_state != ROWSET_LOADED) {
186
50
            return;
187
50
        }
188
0
        Status st = Status::OK();
189
0
        {
190
0
            std::lock_guard close_lock(_lock);
191
0
            uint64_t current_refs = _refs_by_reader;
192
0
            // re-check the state now that the lock is held
            old_state = _rowset_state_machine.rowset_state();
193
0
            if (old_state != ROWSET_LOADED) {
194
0
                return;
195
0
            }
196
0
            // release the resources right away only when no reader uses the rowset
            if (current_refs == 0) {
197
0
                do_close();
198
0
            }
199
0
            // LOADED -> UNLOADED (no readers) or LOADED -> UNLOADING (readers remain)
            st = _rowset_state_machine.on_close(current_refs);
200
0
        }
201
0
        if (!st.ok()) {
202
0
            LOG(WARNING) << "state transition failed from:" << _rowset_state_machine.rowset_state();
203
0
            return;
204
0
        }
205
0
        VLOG_NOTICE << "rowset is close. rowset state from:" << old_state << " to "
206
0
                    << _rowset_state_machine.rowset_state() << ", version:" << start_version()
207
0
                    << "-" << end_version() << ", tabletid:" << _rowset_meta->tablet_id();
208
0
    }
209
210
    // hard link all files in this rowset to `dir` to form a new rowset with id `new_rowset_id`.
    // NOTE(review): std::set is used here but <set> is not among the includes
    // visible in this header; presumably it arrives transitively - confirm.
211
    virtual Status link_files_to(const std::string& dir, RowsetId new_rowset_id,
212
                                 size_t new_rowset_start_seg_id = 0,
213
                                 std::set<int64_t>* without_index_uids = nullptr) = 0;
214
215
    // copy all files to `dir`
216
    virtual Status copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) = 0;
217
218
0
    // The default implementation does nothing and returns OK; both arguments are ignored.
    virtual Status upload_to(io::RemoteFileSystem* dest_fs, const RowsetId& new_rowset_id) {
219
0
        return Status::OK();
220
0
    }
221
222
    virtual Status remove_old_files(std::vector<std::string>* files_to_remove) = 0;
223
224
    // return whether `path` is one of the files in this rowset
225
    virtual bool check_path(const std::string& path) = 0;
226
227
    virtual bool check_file_exist() = 0;
228
229
0
    bool need_delete_file() const { return _need_delete_file; }
230
231
50
    // Sets the flag to true; this class offers no way to reset it.
    void set_need_delete_file() { _need_delete_file = true; }
232
233
0
    // Whether the version range of this rowset contains `version`.
    bool contains_version(Version version) const {
234
0
        return rowset_meta()->version().contains(version);
235
0
    }
236
237
1.95k
    // Ordering predicate: `left` sorts before `right` when its end version is smaller.
    static bool comparator(const RowsetSharedPtr& left, const RowsetSharedPtr& right) {
238
1.95k
        return left->end_version() < right->end_version();
239
1.95k
    }
240
241
    // this function is called by reader to increase reference of rowset
242
407
    void acquire() { ++_refs_by_reader; }
243
244
407
    // Counterpart of acquire(): decrements the reader reference count.
    void release() {
245
        // if the refs by reader is 0 and the rowset is closed, should release the resource
246
407
        uint64_t current_refs = --_refs_by_reader;
247
407
        if (current_refs == 0 && _rowset_state_machine.rowset_state() == ROWSET_UNLOADING) {
248
0
            {
249
0
                std::lock_guard release_lock(_lock);
250
                // rejudge _refs_by_reader because we do not add lock in create reader
251
0
                if (_refs_by_reader == 0 &&
252
0
                    _rowset_state_machine.rowset_state() == ROWSET_UNLOADING) {
253
                    // first do close, then change state
254
0
                    do_close();
255
0
                    // the state was checked to be ROWSET_UNLOADING just above, so the
                    // returned status is deliberately discarded
                    static_cast<void>(_rowset_state_machine.on_release());
256
0
                }
257
0
            }
258
0
            if (_rowset_state_machine.rowset_state() == ROWSET_UNLOADED) {
259
0
                VLOG_NOTICE
260
0
                        << "close the rowset. rowset state from ROWSET_UNLOADING to ROWSET_UNLOADED"
261
0
                        << ", version:" << start_version() << "-" << end_version()
262
0
                        << ", tabletid:" << _rowset_meta->tablet_id();
263
0
            }
264
0
        }
265
407
    }
266
267
0
    // Raises the stored timestamp to `delayed_expired_timestamp` when that value
    // is larger; a smaller or equal value is ignored.
    // NOTE(review): the comparison and the store are two separate atomic
    // operations, so concurrent callers could overwrite a larger value with a
    // smaller one - confirm callers are serialized or switch to a CAS loop.
    void update_delayed_expired_timestamp(uint64_t delayed_expired_timestamp) {
268
0
        if (delayed_expired_timestamp > _delayed_expired_timestamp) {
269
0
            _delayed_expired_timestamp = delayed_expired_timestamp;
270
0
        }
271
0
    }
272
273
0
    uint64_t delayed_expired_timestamp() { return _delayed_expired_timestamp; }
274
275
24
    // Forwards to RowsetMeta::get_segments_key_bounds(); this base implementation
    // always returns OK.
    virtual Status get_segments_key_bounds(std::vector<KeyBoundsPB>* segments_key_bounds) {
276
24
        _rowset_meta->get_segments_key_bounds(segments_key_bounds);
277
24
        return Status::OK();
278
24
    }
279
280
    // min key of the first segment
281
101
    // Returns false and leaves `*min_key` untouched when the rowset meta cannot
    // provide the key bound of the first segment; otherwise stores the key and
    // returns true.
    bool first_key(std::string* min_key) {
282
101
        KeyBoundsPB key_bounds;
283
101
        bool ret = _rowset_meta->get_first_segment_key_bound(&key_bounds);
284
101
        if (!ret) {
285
0
            return false;
286
0
        }
287
101
        *min_key = key_bounds.min_key();
288
101
        return true;
289
101
    }
290
291
    // max key of the last segment
292
53
    // Returns false and leaves `*max_key` untouched when the rowset meta cannot
    // provide the key bound of the last segment; otherwise stores the key and
    // returns true.
    bool last_key(std::string* max_key) {
293
53
        KeyBoundsPB key_bounds;
294
53
        bool ret = _rowset_meta->get_last_segment_key_bound(&key_bounds);
295
53
        if (!ret) {
296
0
            return false;
297
0
        }
298
53
        *max_key = key_bounds.max_key();
299
53
        return true;
300
53
    }
301
302
    bool check_rowset_segment();
303
304
0
    // The default implementation does nothing and returns OK.
    [[nodiscard]] virtual Status add_to_binlog() { return Status::OK(); }
305
306
    // is skip index compaction this time
307
6
    bool is_skip_index_compaction(int32_t column_id) const {
308
6
        return skip_index_compaction.find(column_id) != skip_index_compaction.end();
309
6
    }
310
311
    // set skip index compaction next time
312
0
    void set_skip_index_compaction(int32_t column_id) { skip_index_compaction.insert(column_id); }
313
314
0
    // The default implementation only logs a message and clears nothing.
    // NOTE(review): the text says "should not reach here" yet it is logged at
    // INFO level - presumably derived classes are expected to override; confirm.
    virtual void clear_inverted_index_cache() { LOG(INFO) << "should not reach here"; }
315
    void clear_cache();
316
317
protected:
318
    friend class RowsetFactory;
319
320
    DISALLOW_COPY_AND_ASSIGN(Rowset);
321
    // this is non-public because all clients should use RowsetFactory to obtain pointer to initialized Rowset
322
    Rowset(const TabletSchemaSPtr& schema, const RowsetMetaSharedPtr& rowset_meta);
323
324
    // this is non-public because all clients should use RowsetFactory to obtain pointer to initialized Rowset
325
    virtual Status init() = 0;
326
327
    // The actual implementation of load(). Guaranteed to be called exactly once.
    // NOTE(review): load() is defined outside this header, so the guarantee
    // cannot be checked here - confirm it still holds after close() returns
    // the state to ROWSET_UNLOADED.
328
    virtual Status do_load(bool use_cache) = 0;
329
330
    // release resources in this api
331
    virtual void do_close() = 0;
332
333
    virtual bool check_current_rowset_segment() = 0;
334
335
    TabletSchemaSPtr _schema;
336
337
    RowsetMetaSharedPtr _rowset_meta;
338
    // init in constructor
339
    bool _is_pending;    // rowset is pending iff it's not in visible state
340
    bool _is_cumulative; // rowset is cumulative iff it's visible and start version < end version
341
342
    // mutex lock for load/close api because it is costly
343
    std::mutex _lock;
344
    bool _need_delete_file = false;
345
    // variable to indicate how many rowset readers owned this rowset
    // NOTE(review): no in-class initializer, unlike _delayed_expired_timestamp;
    // presumably set by the constructor defined elsewhere - confirm.
346
    std::atomic<uint64_t> _refs_by_reader;
347
    // rowset state machine
348
    RowsetStateMachine _rowset_state_machine;
349
    std::atomic<uint64_t> _delayed_expired_timestamp = 0;
350
351
    // <column_uniq_id>, skip index compaction
352
    std::set<int32_t> skip_index_compaction;
353
};
354
355
} // namespace doris