Coverage Report

Created: 2025-03-12 00:38

/root/doris/be/src/olap/rowset/rowset.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <butil/macros.h>
21
#include <fmt/format.h>
22
#include <gen_cpp/olap_file.pb.h>
23
#include <gen_cpp/types.pb.h>
24
#include <stddef.h>
25
#include <stdint.h>
26
27
#include <atomic>
28
#include <memory>
29
#include <mutex>
30
#include <ostream>
31
#include <string>
32
#include <vector>
33
34
#include "common/logging.h"
35
#include "common/status.h"
36
#include "olap/metadata_adder.h"
37
#include "olap/olap_common.h"
38
#include "olap/rowset/rowset_meta.h"
39
#include "olap/tablet_schema.h"
40
41
namespace doris {
42
43
class Rowset;
44
45
namespace io {
46
class RemoteFileSystem;
47
} // namespace io
48
49
using RowsetSharedPtr = std::shared_ptr<Rowset>;
50
class RowsetReader;
51
52
// the rowset state transfer graph:
53
//    ROWSET_UNLOADED    <--|
54
//          ↓               |
55
//    ROWSET_LOADED         |
56
//          ↓               |
57
//    ROWSET_UNLOADING   -->|
58
enum RowsetState {
59
    // state for new created rowset
60
    ROWSET_UNLOADED,
61
    // state after load() called
62
    ROWSET_LOADED,
63
    // state for closed() called but owned by some readers
64
    ROWSET_UNLOADING
65
};
66
67
class RowsetStateMachine {
68
public:
69
12.1k
    RowsetStateMachine() : _rowset_state(ROWSET_UNLOADED) {}
70
71
453
    Status on_load() {
72
453
        switch (_rowset_state) {
73
453
        case ROWSET_UNLOADED:
74
453
            _rowset_state = ROWSET_LOADED;
75
453
            break;
76
77
0
        default:
78
0
            return Status::Error<ErrorCode::ROWSET_INVALID_STATE_TRANSITION>(
79
0
                    "RowsetStateMachine meet invalid state");
80
453
        }
81
453
        return Status::OK();
82
453
    }
83
84
1
    Status on_close(uint64_t refs_by_reader) {
85
1
        switch (_rowset_state) {
86
1
        case ROWSET_LOADED:
87
1
            if (refs_by_reader == 0) {
88
1
                _rowset_state = ROWSET_UNLOADED;
89
1
            } else {
90
0
                _rowset_state = ROWSET_UNLOADING;
91
0
            }
92
1
            break;
93
94
0
        default:
95
0
            return Status::Error<ErrorCode::ROWSET_INVALID_STATE_TRANSITION>(
96
0
                    "RowsetStateMachine meet invalid state");
97
1
        }
98
1
        return Status::OK();
99
1
    }
100
101
0
    Status on_release() {
102
0
        switch (_rowset_state) {
103
0
        case ROWSET_UNLOADING:
104
0
            _rowset_state = ROWSET_UNLOADED;
105
0
            break;
106
107
0
        default:
108
0
            return Status::Error<ErrorCode::ROWSET_INVALID_STATE_TRANSITION>(
109
0
                    "RowsetStateMachine meet invalid state");
110
0
        }
111
0
        return Status::OK();
112
0
    }
113
114
2.64k
    RowsetState rowset_state() { return _rowset_state; }
115
116
private:
117
    RowsetState _rowset_state;
118
};
119
120
class Rowset : public std::enable_shared_from_this<Rowset>, public MetadataAdder<Rowset> {
121
public:
122
    // Open all segment files in this rowset and load necessary metadata.
123
    // - `use_cache` : whether to use fd cache, only applicable to alpha rowset now
124
    //
125
    // May be called multiple times, subsequent calls will no-op.
126
    // Derived class implements the load logic by overriding the `do_load_once()` method.
127
    Status load(bool use_cache = true);
128
129
    // returns Status::Error<ErrorCode::ROWSET_CREATE_READER>() when failed to create reader
130
    virtual Status create_reader(std::shared_ptr<RowsetReader>* result) = 0;
131
132
3.90M
    const RowsetMetaSharedPtr& rowset_meta() const { return _rowset_meta; }
133
134
    void merge_rowset_meta(const RowsetMeta& other);
135
136
2
    bool is_pending() const { return _is_pending; }
137
138
10.1k
    bool is_local() const { return _rowset_meta->is_local(); }
139
140
644
    const std::string& tablet_path() const { return _tablet_path; }
141
142
    // publish rowset to make it visible to read
143
    void make_visible(Version version);
144
    void set_version(Version version);
145
2.70k
    const TabletSchemaSPtr& tablet_schema() const { return _schema; }
146
147
    // helper class to access RowsetMeta
148
1.60k
    int64_t start_version() const { return rowset_meta()->version().first; }
149
5.65k
    int64_t end_version() const { return rowset_meta()->version().second; }
150
64
    int64_t index_disk_size() const { return rowset_meta()->index_disk_size(); }
151
89
    int64_t data_disk_size() const { return rowset_meta()->data_disk_size(); }
152
15
    int64_t total_disk_size() const { return rowset_meta()->total_disk_size(); }
153
0
    bool empty() const { return rowset_meta()->empty(); }
154
0
    bool zero_num_rows() const { return rowset_meta()->num_rows() == 0; }
155
304
    size_t num_rows() const { return rowset_meta()->num_rows(); }
156
1.07M
    Version version() const { return rowset_meta()->version(); }
157
2.74M
    RowsetId rowset_id() const { return rowset_meta()->rowset_id(); }
158
8
    int64_t creation_time() const { return rowset_meta()->creation_time(); }
159
0
    PUniqueId load_id() const { return rowset_meta()->load_id(); }
160
0
    int64_t txn_id() const { return rowset_meta()->txn_id(); }
161
0
    int64_t partition_id() const { return rowset_meta()->partition_id(); }
162
    // flag for push delete rowset
163
0
    bool delete_flag() const { return rowset_meta()->delete_flag(); }
164
51.4k
    int64_t num_segments() const { return rowset_meta()->num_segments(); }
165
3
    void to_rowset_pb(RowsetMetaPB* rs_meta) const { return rowset_meta()->to_rowset_pb(rs_meta); }
166
0
    RowsetMetaPB get_rowset_pb() const { return rowset_meta()->get_rowset_pb(); }
167
    // The writing time of the newest data in rowset, to measure the freshness of a rowset.
168
58
    int64_t newest_write_timestamp() const { return rowset_meta()->newest_write_timestamp(); }
169
1.01k
    bool is_segments_overlapping() const { return rowset_meta()->is_segments_overlapping(); }
170
1.15k
    KeysType keys_type() { return _schema->keys_type(); }
171
0
    RowsetStatePB rowset_meta_state() const { return rowset_meta()->rowset_state(); }
172
0
    bool produced_by_compaction() const { return rowset_meta()->produced_by_compaction(); }
173
174
    // remove all files in this rowset
175
    // TODO should we rename the method to remove_files() to be more specific?
176
    virtual Status remove() = 0;
177
178
    // close to clear the resource owned by rowset
179
    // including: open files, indexes and so on
180
    // NOTICE: can not call this function in multithreads
181
51
    void close() {
182
51
        RowsetState old_state = _rowset_state_machine.rowset_state();
183
51
        if (old_state != ROWSET_LOADED) {
184
50
            return;
185
50
        }
186
1
        Status st = Status::OK();
187
1
        {
188
1
            std::lock_guard close_lock(_lock);
189
1
            uint64_t current_refs = _refs_by_reader;
190
1
            old_state = _rowset_state_machine.rowset_state();
191
1
            if (old_state != ROWSET_LOADED) {
192
0
                return;
193
0
            }
194
1
            if (current_refs == 0) {
195
1
                do_close();
196
1
            }
197
1
            st = _rowset_state_machine.on_close(current_refs);
198
1
        }
199
1
        if (!st.ok()) {
200
0
            LOG(WARNING) << "state transition failed from:" << _rowset_state_machine.rowset_state();
201
0
            return;
202
0
        }
203
1
        VLOG_NOTICE << "rowset is close. rowset state from:" << old_state << " to "
204
0
                    << _rowset_state_machine.rowset_state() << ", version:" << start_version()
205
0
                    << "-" << end_version() << ", tabletid:" << _rowset_meta->tablet_id();
206
1
    }
207
208
    // hard link all files in this rowset to `dir` to form a new rowset with id `new_rowset_id`.
209
    virtual Status link_files_to(const std::string& dir, RowsetId new_rowset_id,
210
                                 size_t new_rowset_start_seg_id = 0,
211
                                 std::set<int64_t>* without_index_uids = nullptr) = 0;
212
213
    virtual Status get_inverted_index_size(int64_t* index_size) = 0;
214
215
    // copy all files to `dir`
216
    virtual Status copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) = 0;
217
218
    virtual Status upload_to(const StorageResource& dest_fs, const RowsetId& new_rowset_id) = 0;
219
220
    virtual Status remove_old_files(std::vector<std::string>* files_to_remove) = 0;
221
222
    virtual Status check_file_exist() = 0;
223
224
0
    bool need_delete_file() const { return _need_delete_file; }
225
226
51
    void set_need_delete_file() { _need_delete_file = true; }
227
228
0
    bool contains_version(Version version) const {
229
0
        return rowset_meta()->version().contains(version);
230
0
    }
231
232
2.03k
    static bool comparator(const RowsetSharedPtr& left, const RowsetSharedPtr& right) {
233
2.03k
        return left->end_version() < right->end_version();
234
2.03k
    }
235
236
    // this function is called by reader to increase reference of rowset
237
496
    void acquire() { ++_refs_by_reader; }
238
239
496
    void release() {
240
        // if the refs by reader is 0 and the rowset is closed, should release the resouce
241
496
        uint64_t current_refs = --_refs_by_reader;
242
496
        if (current_refs == 0 && _rowset_state_machine.rowset_state() == ROWSET_UNLOADING) {
243
0
            {
244
0
                std::lock_guard release_lock(_lock);
245
                // rejudge _refs_by_reader because we do not add lock in create reader
246
0
                if (_refs_by_reader == 0 &&
247
0
                    _rowset_state_machine.rowset_state() == ROWSET_UNLOADING) {
248
                    // first do close, then change state
249
0
                    do_close();
250
0
                    static_cast<void>(_rowset_state_machine.on_release());
251
0
                }
252
0
            }
253
0
            if (_rowset_state_machine.rowset_state() == ROWSET_UNLOADED) {
254
0
                VLOG_NOTICE
255
0
                        << "close the rowset. rowset state from ROWSET_UNLOADING to ROWSET_UNLOADED"
256
0
                        << ", version:" << start_version() << "-" << end_version()
257
0
                        << ", tabletid:" << _rowset_meta->tablet_id();
258
0
            }
259
0
        }
260
496
    }
261
262
0
    void update_delayed_expired_timestamp(uint64_t delayed_expired_timestamp) {
263
0
        if (delayed_expired_timestamp > _delayed_expired_timestamp) {
264
0
            _delayed_expired_timestamp = delayed_expired_timestamp;
265
0
        }
266
0
    }
267
268
0
    uint64_t delayed_expired_timestamp() { return _delayed_expired_timestamp; }
269
270
28
    virtual Status get_segments_key_bounds(std::vector<KeyBoundsPB>* segments_key_bounds) {
271
28
        _rowset_meta->get_segments_key_bounds(segments_key_bounds);
272
28
        return Status::OK();
273
28
    }
274
275
    // min key of the first segment
276
104
    bool first_key(std::string* min_key) {
277
104
        KeyBoundsPB key_bounds;
278
104
        bool ret = _rowset_meta->get_first_segment_key_bound(&key_bounds);
279
104
        if (!ret) {
280
0
            return false;
281
0
        }
282
104
        *min_key = key_bounds.min_key();
283
104
        return true;
284
104
    }
285
286
    // max key of the last segment
287
56
    bool last_key(std::string* max_key) {
288
56
        KeyBoundsPB key_bounds;
289
56
        bool ret = _rowset_meta->get_last_segment_key_bound(&key_bounds);
290
56
        if (!ret) {
291
0
            return false;
292
0
        }
293
56
        *max_key = key_bounds.max_key();
294
56
        return true;
295
56
    }
296
297
    bool check_rowset_segment();
298
299
0
    [[nodiscard]] virtual Status add_to_binlog() { return Status::OK(); }
300
301
    // is skip index compaction this time
302
1.28k
    bool is_skip_index_compaction(int32_t column_id) const {
303
1.28k
        return skip_index_compaction.find(column_id) != skip_index_compaction.end();
304
1.28k
    }
305
306
    // set skip index compaction next time
307
1
    void set_skip_index_compaction(int32_t column_id) { skip_index_compaction.insert(column_id); }
308
309
    std::string get_rowset_info_str();
310
311
    void clear_cache();
312
313
    Result<std::string> segment_path(int64_t seg_id);
314
315
protected:
316
    friend class RowsetFactory;
317
318
    DISALLOW_COPY_AND_ASSIGN(Rowset);
319
    // this is non-public because all clients should use RowsetFactory to obtain pointer to initialized Rowset
320
    Rowset(const TabletSchemaSPtr& schema, RowsetMetaSharedPtr rowset_meta,
321
           std::string tablet_path);
322
323
    // this is non-public because all clients should use RowsetFactory to obtain pointer to initialized Rowset
324
    virtual Status init() = 0;
325
326
    // release resources in this api
327
    virtual void do_close() = 0;
328
329
    virtual Status check_current_rowset_segment() = 0;
330
331
    virtual void clear_inverted_index_cache() = 0;
332
333
    TabletSchemaSPtr _schema;
334
335
    RowsetMetaSharedPtr _rowset_meta;
336
337
    // Local rowset requires a tablet path to obtain the absolute path on the local fs
338
    std::string _tablet_path;
339
340
    // init in constructor
341
    bool _is_pending;    // rowset is pending iff it's not in visible state
342
    bool _is_cumulative; // rowset is cumulative iff it's visible and start version < end version
343
344
    // mutex lock for load/close api because it is costly
345
    std::mutex _lock;
346
    bool _need_delete_file = false;
347
    // variable to indicate how many rowset readers owned this rowset
348
    std::atomic<uint64_t> _refs_by_reader;
349
    // rowset state machine
350
    RowsetStateMachine _rowset_state_machine;
351
    std::atomic<uint64_t> _delayed_expired_timestamp = 0;
352
353
    // <column_uniq_id>, skip index compaction
354
    std::set<int32_t> skip_index_compaction;
355
};
356
357
// `rs_metas` MUST already be sorted by `RowsetMeta::comparator`
358
Status check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets);
359
360
} // namespace doris