Coverage Report

Created: 2026-03-13 19:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/beta_rowset_writer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <fmt/format.h>
#include <gen_cpp/olap_common.pb.h>
#include <gen_cpp/olap_file.pb.h>

#include <atomic>
#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <numeric>
#include <optional>
#include <roaring/roaring.hh>
#include <string>
#include <unordered_map>
#include <vector>

#include "common/status.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "load/delta_writer/delta_writer.h"
#include "storage/index/index_file_writer.h"
#include "storage/olap_common.h"
#include "storage/rowset/rowset.h"
#include "storage/rowset/rowset_meta.h"
#include "storage/rowset/rowset_writer.h"
#include "storage/rowset/rowset_writer_context.h"
#include "storage/rowset/segment_creator.h"
#include "storage/segment/segment.h"
45
46
namespace doris {
47
class Block;
48
49
namespace segment_v2 {
50
class SegmentWriter;
51
} // namespace segment_v2
52
53
using SegCompactionCandidates = std::vector<segment_v2::SegmentSharedPtr>;
54
using SegCompactionCandidatesSharedPtr = std::shared_ptr<SegCompactionCandidates>;
55
56
class SegmentFileCollection {
57
public:
58
    ~SegmentFileCollection();
59
60
    Status add(int seg_id, io::FileWriterPtr&& writer);
61
62
    // Return `nullptr` if no file writer matches `seg_id`
63
    io::FileWriter* get(int seg_id) const;
64
65
    // Close all file writers
66
    Status close();
67
68
    // Get segments file size in segment id order.
69
    // `seg_id_offset` is the offset of the segment id relative to the subscript of `_file_writers`,
70
    // for more details, see `Tablet::create_transient_rowset_writer`.
71
    Result<std::vector<size_t>> segments_file_size(int seg_id_offset);
72
73
0
    const std::unordered_map<int, io::FileWriterPtr>& get_file_writers() const {
74
0
        return _file_writers;
75
0
    }
76
77
private:
78
    mutable std::mutex _lock;
79
    std::unordered_map<int /* seg_id */, io::FileWriterPtr> _file_writers;
80
    bool _closed {false};
81
};
82
83
class InvertedIndexFileCollection {
84
public:
85
    ~InvertedIndexFileCollection();
86
87
    // `seg_id` -> inverted index file writer
88
    Status add(int seg_id, IndexFileWriterPtr&& writer);
89
90
    // Close all file writers
91
    // If the inverted index file writer is not closed, an error will be thrown during destruction
92
    Status begin_close();
93
94
    // Wait for all inverted index file writers to be closed
95
    Status finish_close();
96
97
    // Get inverted index file info in segment id order.
98
    // `seg_id_offset` is the offset of the segment id relative to the subscript of `_inverted_index_file_writers`,
99
    // for more details, see `Tablet::create_transient_rowset_writer`.
100
    Result<std::vector<const InvertedIndexFileInfo*>> inverted_index_file_info(int seg_id_offset);
101
102
    // return all inverted index file writers
103
143
    std::unordered_map<int, IndexFileWriterPtr>& get_file_writers() {
104
143
        return _inverted_index_file_writers;
105
143
    }
106
107
322
    int64_t get_total_index_size() const { return _total_size; }
108
109
private:
110
    mutable std::mutex _lock;
111
    std::unordered_map<int /* seg_id */, IndexFileWriterPtr> _inverted_index_file_writers;
112
    int64_t _total_size = 0;
113
};
114
115
class BaseBetaRowsetWriter : public RowsetWriter {
116
public:
117
    BaseBetaRowsetWriter();
118
119
    ~BaseBetaRowsetWriter() override;
120
121
    Status init(const RowsetWriterContext& rowset_writer_context) override;
122
123
    Status add_block(const Block* block) override;
124
125
    // Declare these interface in `BaseBetaRowsetWriter`
126
    // add rowset by create hard link
127
    Status add_rowset(RowsetSharedPtr rowset) override;
128
    Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) override;
129
130
    Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer,
131
                              FileType file_type = FileType::SEGMENT_FILE) override;
132
133
    Status create_index_file_writer(uint32_t segment_id, IndexFileWriterPtr* writer) override;
134
135
    Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat) override;
136
137
    Status flush() override;
138
139
    Status flush_memtable(Block* block, int32_t segment_id, int64_t* flush_size) override;
140
141
    // Return the file size flushed to disk in "flush_size"
142
    // This method is thread-safe.
143
    Status flush_single_block(const Block* block) override;
144
145
    RowsetSharedPtr manual_build(const RowsetMetaSharedPtr& rowset_meta) override;
146
147
12
    PUniqueId load_id() override { return _context.load_id; }
148
149
339
    Version version() override { return _context.version; }
150
151
15
    int64_t num_rows() const override { return _segment_creator.num_rows_written(); }
152
153
    // for partial update
154
0
    int64_t num_rows_updated() const override { return _segment_creator.num_rows_updated(); }
155
0
    int64_t num_rows_deleted() const override { return _segment_creator.num_rows_deleted(); }
156
0
    int64_t num_rows_new_added() const override { return _segment_creator.num_rows_new_added(); }
157
0
    int64_t num_rows_filtered() const override { return _segment_creator.num_rows_filtered(); }
158
159
287
    RowsetId rowset_id() override { return _context.rowset_id; }
160
161
15
    RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; }
162
163
5.62k
    Status get_segment_num_rows(std::vector<uint32_t>* segment_num_rows) const override {
164
5.62k
        std::lock_guard l(_segid_statistics_map_mutex);
165
5.62k
        *segment_num_rows = _segment_num_rows;
166
5.62k
        return Status::OK();
167
5.62k
    }
168
169
12
    int32_t allocate_segment_id() override { return _segment_creator.allocate_segment_id(); };
170
171
0
    void set_segment_start_id(int32_t start_id) override {
172
0
        _segment_creator.set_segment_start_id(start_id);
173
0
        _segment_start_id = start_id;
174
0
    }
175
176
15
    int64_t delete_bitmap_ns() override { return _delete_bitmap_ns; }
177
178
15
    int64_t segment_writer_ns() override { return _segment_writer_ns; }
179
180
0
    std::shared_ptr<PartialUpdateInfo> get_partial_update_info() override {
181
0
        return _context.partial_update_info;
182
0
    }
183
184
0
    bool is_partial_update() override {
185
0
        return _context.partial_update_info && _context.partial_update_info->is_partial_update();
186
0
    }
187
188
0
    const std::unordered_map<int, io::FileWriterPtr>& get_file_writers() const {
189
0
        return _seg_files.get_file_writers();
190
0
    }
191
192
143
    std::unordered_map<int, IndexFileWriterPtr>& index_file_writers() {
193
143
        return this->_idx_files.get_file_writers();
194
143
    }
195
196
private:
197
    // build a tmp rowset for load segment to calc delete_bitmap
198
    // for this segment
199
protected:
200
    Status _generate_delete_bitmap(int32_t segment_id);
201
    virtual Status _build_rowset_meta(RowsetMeta* rowset_meta, bool check_segment_num = false);
202
    Status _create_file_writer(const std::string& path, io::FileWriterPtr& file_writer,
203
                               bool is_index_file = false);
204
    virtual Status _close_file_writers();
205
    virtual Status _check_segment_number_limit(size_t segnum);
206
    virtual int64_t _num_seg() const;
207
    // build a tmp rowset for load segment to calc delete_bitmap for this segment
208
    Status _build_tmp(RowsetSharedPtr& rowset_ptr);
209
210
0
    uint64_t get_rowset_num_rows() {
211
0
        std::lock_guard l(_segid_statistics_map_mutex);
212
0
        return std::accumulate(_segment_num_rows.begin(), _segment_num_rows.end(), uint64_t(0));
213
0
    }
214
    // Only during vertical compaction is this method called
215
    // Some index files are written during normal compaction and some files are written during index compaction.
216
    // After all index writes are completed, call this method to write the final compound index file.
217
322
    Status _close_inverted_index_file_writers() {
218
322
        RETURN_NOT_OK_STATUS_WITH_WARN(_idx_files.begin_close(),
219
322
                                       "failed to close index file when build new rowset");
220
322
        this->_total_index_size += _idx_files.get_total_index_size();
221
322
        RETURN_NOT_OK_STATUS_WITH_WARN(_idx_files.finish_close(),
222
322
                                       "failed to wait close index file when build new rowset");
223
322
        return Status::OK();
224
322
    }
225
226
    std::atomic<int32_t> _num_segment; // number of consecutive flushed segments
227
    roaring::Roaring _segment_set;     // bitmap set to record flushed segment id
228
    std::mutex _segment_set_mutex;     // mutex for _segment_set
229
    int32_t _segment_start_id;         // basic write start from 0, partial update may be different
230
231
    SegmentFileCollection _seg_files;
232
    InvertedIndexFileCollection _idx_files;
233
234
    // record rows number of every segment already written, using for rowid
235
    // conversion when compaction in unique key with MoW model
236
    std::vector<uint32_t> _segment_num_rows;
237
238
    // for unique key table with merge-on-write
239
    std::vector<KeyBoundsPB> _segments_encoded_key_bounds;
240
    std::optional<bool> _segments_key_bounds_truncated;
241
242
    // counters and statistics maintained during add_rowset
243
    std::atomic<int64_t> _num_rows_written;
244
    std::atomic<int64_t> _total_data_size;
245
    std::atomic<int64_t> _total_index_size;
246
    // TODO rowset Zonemap
247
248
    std::map<uint32_t, SegmentStatistics> _segid_statistics_map;
249
    mutable std::mutex _segid_statistics_map_mutex;
250
251
    bool _is_pending = false;
252
    bool _already_built = false;
253
254
    SegmentCreator _segment_creator;
255
256
    fmt::memory_buffer vlog_buffer;
257
258
    std::shared_ptr<MowContext> _mow_context;
259
    std::unique_ptr<CalcDeleteBitmapToken> _calc_delete_bitmap_token;
260
261
    int64_t _delete_bitmap_ns = 0;
262
    int64_t _segment_writer_ns = 0;
263
};
264
265
class SegcompactionWorker;
266
267
// `StorageEngine` mixin for `BaseBetaRowsetWriter`
268
class BetaRowsetWriter : public BaseBetaRowsetWriter {
269
public:
270
    BetaRowsetWriter(StorageEngine& engine);
271
272
    ~BetaRowsetWriter() override;
273
274
    Status build(RowsetSharedPtr& rowset) override;
275
276
    Status init(const RowsetWriterContext& rowset_writer_context) override;
277
278
    Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat) override;
279
280
    Status flush_segment_writer_for_segcompaction(
281
            std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t index_size,
282
            KeyBoundsPB& key_bounds);
283
    Status create_segment_writer_for_segcompaction(
284
            std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, int64_t end);
285
286
2.13k
    bool is_segcompacted() const { return _num_segcompacted > 0; }
287
288
private:
289
    // segment compaction
290
    friend class SegcompactionWorker;
291
    Status _close_file_writers() override;
292
    Status _check_segment_number_limit(size_t segnum) override;
293
    int64_t _num_seg() const override;
294
    Status _wait_flying_segcompaction();
295
    Status _segcompaction_if_necessary();
296
    Status _segcompaction_rename_last_segments();
297
    Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, int32_t segment_id);
298
    Status _find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr& segments);
299
    Status _rename_compacted_segments(int64_t begin, int64_t end);
300
    Status _rename_compacted_segment_plain(uint32_t seg_id);
301
    Status _rename_compacted_indices(int64_t begin, int64_t end, uint64_t seg_id);
302
    Status _remove_segment_footer_cache(const uint32_t seg_id, const std::string& segment_path);
303
    void _clear_statistics_for_deleting_segments_unsafe(uint32_t begin, uint32_t end);
304
305
    StorageEngine& _engine;
306
307
    std::atomic<int32_t> _segcompacted_point {0}; // segemnts before this point have
308
                                                  // already been segment compacted
309
    std::atomic<int32_t> _num_segcompacted {0};   // index for segment compaction
310
311
    std::shared_ptr<SegcompactionWorker> _segcompaction_worker = nullptr;
312
313
    // ensure only one inflight segcompaction task for each rowset
314
    std::atomic<bool> _is_doing_segcompaction {false};
315
    // enforce condition variable on _is_doing_segcompaction
316
    std::mutex _is_doing_segcompaction_lock;
317
    std::condition_variable _segcompacting_cond;
318
319
    std::atomic<int> _segcompaction_status {ErrorCode::OK};
320
};
321
322
} // namespace doris