Coverage Report

Created: 2024-11-22 16:10

/root/doris/be/src/olap/rowset/rowset.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <butil/macros.h>
21
#include <fmt/format.h>
22
#include <gen_cpp/olap_file.pb.h>
23
#include <gen_cpp/types.pb.h>
24
#include <stddef.h>
25
#include <stdint.h>
26
27
#include <atomic>
28
#include <memory>
29
#include <mutex>
30
#include <ostream>
31
#include <string>
32
#include <vector>
33
34
#include "common/logging.h"
35
#include "common/status.h"
36
#include "olap/olap_common.h"
37
#include "olap/rowset/rowset_meta.h"
38
#include "olap/tablet_schema.h"
39
40
namespace doris {
41
42
class Rowset;
43
44
namespace io {
45
class RemoteFileSystem;
46
} // namespace io
47
48
using RowsetSharedPtr = std::shared_ptr<Rowset>;
49
class RowsetReader;
50
51
// The rowset state transition graph. Besides the edges drawn here, RowsetStateMachine::on_close()
// also moves ROWSET_LOADED straight back to ROWSET_UNLOADED when no reader references remain:
52
//    ROWSET_UNLOADED    <--|
53
//          ↓               |
54
//    ROWSET_LOADED         |
55
//          ↓               |
56
//    ROWSET_UNLOADING   -->|
57
enum RowsetState {
58
    // initial state of a newly created rowset
59
    ROWSET_UNLOADED,
60
    // state after load() is called (entered through on_load())
61
    ROWSET_LOADED,
62
    // state after close() is called while readers still reference the rowset
63
    ROWSET_UNLOADING
64
};
65
66
// Holds the current RowsetState of a rowset and validates every transition between states.
// A new machine starts in ROWSET_UNLOADED. Each on_*() method either performs its transition and
// returns Status::OK(), or leaves the state untouched and returns a
// ROWSET_INVALID_STATE_TRANSITION error. The class does no locking of its own.
class RowsetStateMachine {
67
public:
68
1.69k
    RowsetStateMachine() : _rowset_state(ROWSET_UNLOADED) {}
69
    // ROWSET_UNLOADED -> ROWSET_LOADED; any other current state is an error.
70
398
    Status on_load() {
71
398
        switch (_rowset_state) {
72
398
        case ROWSET_UNLOADED:
73
398
            _rowset_state = ROWSET_LOADED;
74
398
            break;
75
76
0
        default:
77
0
            return Status::Error<ErrorCode::ROWSET_INVALID_STATE_TRANSITION>(
78
0
                    "RowsetStateMachine meet invalid state");
79
398
        }
80
398
        return Status::OK();
81
398
    }
82
    // From ROWSET_LOADED: goes to ROWSET_UNLOADED when `refs_by_reader` is 0, otherwise to
    // ROWSET_UNLOADING. Any other current state is an error.
83
1
    Status on_close(uint64_t refs_by_reader) {
84
1
        switch (_rowset_state) {
85
1
        case ROWSET_LOADED:
86
1
            if (refs_by_reader == 0) {
87
1
                _rowset_state = ROWSET_UNLOADED;
88
1
            } else {
89
0
                _rowset_state = ROWSET_UNLOADING;
90
0
            }
91
1
            break;
92
93
0
        default:
94
0
            return Status::Error<ErrorCode::ROWSET_INVALID_STATE_TRANSITION>(
95
0
                    "RowsetStateMachine meet invalid state");
96
1
        }
97
1
        return Status::OK();
98
1
    }
99
    // ROWSET_UNLOADING -> ROWSET_UNLOADED; any other current state is an error.
100
0
    Status on_release() {
101
0
        switch (_rowset_state) {
102
0
        case ROWSET_UNLOADING:
103
0
            _rowset_state = ROWSET_UNLOADED;
104
0
            break;
105
106
0
        default:
107
0
            return Status::Error<ErrorCode::ROWSET_INVALID_STATE_TRANSITION>(
108
0
                    "RowsetStateMachine meet invalid state");
109
0
        }
110
0
        return Status::OK();
111
0
    }
112
    // Returns the current state.
113
1.36k
    RowsetState rowset_state() { return _rowset_state; }
114
115
private:
116
    RowsetState _rowset_state; // current state; a plain (non-atomic) field
117
};
118
119
// Abstract base class of a rowset. It bundles a tablet schema, a RowsetMeta and a tablet path,
// counts the readers that currently reference it, and tracks its load state through a
// RowsetStateMachine. Loading, closing and all file operations are supplied by derived classes
// through the pure virtual methods declared below.
class Rowset : public std::enable_shared_from_this<Rowset> {
120
public:
121
    virtual ~Rowset();
122
123
    // Open all segment files in this rowset and load necessary metadata.
124
    // - `use_cache` : whether to use fd cache, only applicable to alpha rowset now
125
    //
126
    // May be called multiple times, subsequent calls will no-op.
127
    // Derived class implements the load logic by overriding the `do_load()` method.
128
    Status load(bool use_cache = true);
129
130
    // returns Status::Error<ErrorCode::ROWSET_CREATE_READER>() when failed to create reader
131
    virtual Status create_reader(std::shared_ptr<RowsetReader>* result) = 0;
132
133
3.41M
    const RowsetMetaSharedPtr& rowset_meta() const { return _rowset_meta; }
134
135
    void merge_rowset_meta(const RowsetMeta& other);
136
137
2
    bool is_pending() const { return _is_pending; }
138
139
5.90k
    bool is_local() const { return _rowset_meta->is_local(); }
140
141
523
    const std::string& tablet_path() const { return _tablet_path; }
142
143
    // publish rowset to make it visible to read
144
    void make_visible(Version version);
145
    void set_version(Version version);
146
89
    const TabletSchemaSPtr& tablet_schema() const { return _schema; }
147
148
    // helper accessors, most of which simply forward to RowsetMeta
149
962
    int64_t start_version() const { return rowset_meta()->version().first; }
150
4.97k
    int64_t end_version() const { return rowset_meta()->version().second; }
151
77
    size_t index_disk_size() const { return rowset_meta()->index_disk_size(); }
152
102
    size_t data_disk_size() const { return rowset_meta()->data_disk_size(); }
153
28
    size_t total_disk_size() const { return rowset_meta()->total_disk_size(); }
154
0
    bool empty() const { return rowset_meta()->empty(); }
155
0
    bool zero_num_rows() const { return rowset_meta()->num_rows() == 0; }
156
157
    size_t num_rows() const { return rowset_meta()->num_rows(); }
157
1.03M
    Version version() const { return rowset_meta()->version(); }
158
2.35M
    RowsetId rowset_id() const { return rowset_meta()->rowset_id(); }
159
4
    int64_t creation_time() const { return rowset_meta()->creation_time(); }
160
0
    PUniqueId load_id() const { return rowset_meta()->load_id(); }
161
0
    int64_t txn_id() const { return rowset_meta()->txn_id(); }
162
0
    int64_t partition_id() const { return rowset_meta()->partition_id(); }
163
    // flag for push delete rowset
164
0
    bool delete_flag() const { return rowset_meta()->delete_flag(); }
165
8.32k
    int64_t num_segments() const { return rowset_meta()->num_segments(); }
166
0
    void to_rowset_pb(RowsetMetaPB* rs_meta) const { return rowset_meta()->to_rowset_pb(rs_meta); }
167
0
    RowsetMetaPB get_rowset_pb() const { return rowset_meta()->get_rowset_pb(); }
168
    // The writing time of the newest data in rowset, to measure the freshness of a rowset.
169
13
    int64_t newest_write_timestamp() const { return rowset_meta()->newest_write_timestamp(); }
170
380
    bool is_segments_overlapping() const { return rowset_meta()->is_segments_overlapping(); }
171
526
    KeysType keys_type() { return _schema->keys_type(); }
172
0
    RowsetStatePB rowset_meta_state() const { return rowset_meta()->rowset_state(); }
173
0
    bool produced_by_compaction() const { return rowset_meta()->produced_by_compaction(); }
174
175
    // remove all files in this rowset
176
    // TODO should we rename the method to remove_files() to be more specific?
177
    virtual Status remove() = 0;
178
179
    // Close the rowset to release the resources it owns
180
    // (open files, indexes and so on). Does nothing unless the state is ROWSET_LOADED. If readers
    // still hold references, do_close() is not run here: the state becomes ROWSET_UNLOADING and
    // the last release() runs it.
181
    // NOTICE: must not be called from multiple threads concurrently
182
39
    void close() {
183
39
        // first check is done without holding _lock; it is repeated under the lock below
        RowsetState old_state = _rowset_state_machine.rowset_state();
184
39
        if (old_state != ROWSET_LOADED) {
185
38
            return;
186
38
        }
187
1
        Status st = Status::OK();
188
1
        {
189
1
            std::lock_guard close_lock(_lock);
190
1
            // snapshot of the reader count; it decides between closing now and ROWSET_UNLOADING
            uint64_t current_refs = _refs_by_reader;
191
1
            old_state = _rowset_state_machine.rowset_state();
192
1
            if (old_state != ROWSET_LOADED) {
193
0
                return;
194
0
            }
195
1
            if (current_refs == 0) {
196
1
                do_close();
197
1
            }
198
1
            st = _rowset_state_machine.on_close(current_refs);
199
1
        }
200
1
        if (!st.ok()) {
201
0
            LOG(WARNING) << "state transition failed from:" << _rowset_state_machine.rowset_state();
202
0
            return;
203
0
        }
204
1
        VLOG_NOTICE << "rowset is close. rowset state from:" << old_state << " to "
205
0
                    << _rowset_state_machine.rowset_state() << ", version:" << start_version()
206
0
                    << "-" << end_version() << ", tabletid:" << _rowset_meta->tablet_id();
207
1
    }
208
209
    // hard link all files in this rowset to `dir` to form a new rowset with id `new_rowset_id`.
210
    virtual Status link_files_to(const std::string& dir, RowsetId new_rowset_id,
211
                                 size_t new_rowset_start_seg_id = 0,
212
                                 std::set<int64_t>* without_index_uids = nullptr) = 0;
213
214
    // copy all files to `dir`
215
    virtual Status copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) = 0;
216
217
    virtual Status upload_to(const StorageResource& dest_fs, const RowsetId& new_rowset_id) = 0;
218
219
    virtual Status remove_old_files(std::vector<std::string>* files_to_remove) = 0;
220
221
    virtual Status check_file_exist() = 0;
222
223
0
    bool need_delete_file() const { return _need_delete_file; }
224
225
39
    void set_need_delete_file() { _need_delete_file = true; }
226
227
0
    bool contains_version(Version version) const {
228
0
        return rowset_meta()->version().contains(version);
229
0
    }
230
    // strict-less comparison on end_version(), i.e. orders rowsets by ascending end version
231
2.03k
    static bool comparator(const RowsetSharedPtr& left, const RowsetSharedPtr& right) {
232
2.03k
        return left->end_version() < right->end_version();
233
2.03k
    }
234
235
    // this function is called by reader to increase reference of rowset
236
394
    void acquire() { ++_refs_by_reader; }
237
    // Counterpart of acquire(): decrements the reader reference count. If this was the last
    // reference of a rowset in ROWSET_UNLOADING, the do_close() deferred by close() runs here.
238
394
    void release() {
239
        // if the refs by reader is 0 and the rowset is closed, should release the resource
240
394
        uint64_t current_refs = --_refs_by_reader;
241
394
        if (current_refs == 0 && _rowset_state_machine.rowset_state() == ROWSET_UNLOADING) {
242
0
            {
243
0
                std::lock_guard release_lock(_lock);
244
                // re-check _refs_by_reader because we do not add lock in create reader
245
0
                if (_refs_by_reader == 0 &&
246
0
                    _rowset_state_machine.rowset_state() == ROWSET_UNLOADING) {
247
                    // first do close, then change state
248
0
                    do_close();
249
0
                    // state was just checked to be ROWSET_UNLOADING, so the Status is discarded
                    static_cast<void>(_rowset_state_machine.on_release());
250
0
                }
251
0
            }
252
0
            if (_rowset_state_machine.rowset_state() == ROWSET_UNLOADED) {
253
0
                VLOG_NOTICE
254
0
                        << "close the rowset. rowset state from ROWSET_UNLOADING to ROWSET_UNLOADED"
255
0
                        << ", version:" << start_version() << "-" << end_version()
256
0
                        << ", tabletid:" << _rowset_meta->tablet_id();
257
0
            }
258
0
        }
259
394
    }
260
    // Raises _delayed_expired_timestamp to the given value; a smaller or equal value is ignored.
    // NOTE(review): the comparison and the store are two separate atomic operations, not a
    // single compare-and-swap.
261
0
    void update_delayed_expired_timestamp(uint64_t delayed_expired_timestamp) {
262
0
        if (delayed_expired_timestamp > _delayed_expired_timestamp) {
263
0
            _delayed_expired_timestamp = delayed_expired_timestamp;
264
0
        }
265
0
    }
266
267
0
    uint64_t delayed_expired_timestamp() { return _delayed_expired_timestamp; }
268
    // Fills `segments_key_bounds` from the rowset meta; this base implementation always returns OK.
269
9
    virtual Status get_segments_key_bounds(std::vector<KeyBoundsPB>* segments_key_bounds) {
270
9
        _rowset_meta->get_segments_key_bounds(segments_key_bounds);
271
9
        return Status::OK();
272
9
    }
273
274
    // min key of the first segment; returns false and leaves `*min_key` untouched when
    // get_first_segment_key_bound() reports failure
275
101
    bool first_key(std::string* min_key) {
276
101
        KeyBoundsPB key_bounds;
277
101
        bool ret = _rowset_meta->get_first_segment_key_bound(&key_bounds);
278
101
        if (!ret) {
279
0
            return false;
280
0
        }
281
101
        *min_key = key_bounds.min_key();
282
101
        return true;
283
101
    }
284
285
    // max key of the last segment; returns false and leaves `*max_key` untouched when
    // get_last_segment_key_bound() reports failure
286
53
    bool last_key(std::string* max_key) {
287
53
        KeyBoundsPB key_bounds;
288
53
        bool ret = _rowset_meta->get_last_segment_key_bound(&key_bounds);
289
53
        if (!ret) {
290
0
            return false;
291
0
        }
292
53
        *max_key = key_bounds.max_key();
293
53
        return true;
294
53
    }
295
296
    bool check_rowset_segment();
297
298
0
    [[nodiscard]] virtual Status add_to_binlog() { return Status::OK(); }
299
300
    // whether `column_id` was marked via set_skip_index_compaction(), i.e. its index compaction
    // is to be skipped this time
301
10
    bool is_skip_index_compaction(int32_t column_id) const {
302
10
        return skip_index_compaction.find(column_id) != skip_index_compaction.end();
303
10
    }
304
305
    // mark `column_id` so that its index compaction is skipped next time
306
0
    void set_skip_index_compaction(int32_t column_id) { skip_index_compaction.insert(column_id); }
307
308
    std::string get_rowset_info_str();
309
310
    void clear_cache();
311
312
    Result<std::string> segment_path(int64_t seg_id);
313
314
protected:
315
    friend class RowsetFactory;
316
317
    DISALLOW_COPY_AND_ASSIGN(Rowset);
318
    // this is non-public because all clients should use RowsetFactory to obtain pointer to initialized Rowset
319
    Rowset(const TabletSchemaSPtr& schema, RowsetMetaSharedPtr rowset_meta,
320
           std::string tablet_path);
321
322
    // this is non-public because all clients should use RowsetFactory to obtain pointer to initialized Rowset
323
    virtual Status init() = 0;
324
325
    // The actual implementation of load(). Per the original note, guaranteed to be called exactly
    // once. NOTE(review): load() is defined outside this file -- confirm the guarantee, since a
    // closed rowset returns to ROWSET_UNLOADED.
326
    virtual Status do_load(bool use_cache) = 0;
327
328
    // release resources in this api
329
    virtual void do_close() = 0;
330
331
    virtual Status check_current_rowset_segment() = 0;
332
333
    virtual void clear_inverted_index_cache() = 0;
334
335
    TabletSchemaSPtr _schema;
336
337
    RowsetMetaSharedPtr _rowset_meta;
338
339
    // Local rowset requires a tablet path to obtain the absolute path on the local fs
340
    std::string _tablet_path;
341
342
    // init in constructor
343
    bool _is_pending;    // rowset is pending iff it's not in visible state
344
    bool _is_cumulative; // rowset is cumulative iff it's visible and start version < end version
345
346
    // mutex for the load/close api because they are costly; close() and release() hold it
    // around do_close() and the state transition
347
    std::mutex _lock;
348
    bool _need_delete_file = false;
349
    // number of rowset readers currently referencing this rowset (see acquire()/release()).
    // NOTE(review): no in-class initializer here, unlike _delayed_expired_timestamp; presumably
    // set in the constructor -- confirm.
350
    std::atomic<uint64_t> _refs_by_reader;
351
    // rowset state machine
352
    RowsetStateMachine _rowset_state_machine;
353
    std::atomic<uint64_t> _delayed_expired_timestamp = 0;
354
355
    // <column_uniq_id>, skip index compaction
    // NOTE(review): std::set is used here, but <set> is not among the includes visible in this file.
356
    std::set<int32_t> skip_index_compaction;
357
};
358
359
// `rowsets` MUST already be sorted. NOTE(review): the original comment named `rs_metas` and
// `RowsetMeta::comparator`; this header declares `Rowset::comparator` (ascending end version),
// so confirm which ordering the implementation expects.
360
Status check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets);
361
362
} // namespace doris