Coverage Report

Created: 2026-06-24 12:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/segment_creator.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/internal_service.pb.h>
21
#include <gen_cpp/olap_file.pb.h>
22
23
#include "common/status.h"
24
#include "core/block/block.h"
25
#include "io/fs/file_reader_writer_fwd.h"
26
#include "storage/index/index_file_writer.h"
27
#include "storage/rowset/rowset_writer_context.h"
28
#include "storage/tablet/tablet_fwd.h"
29
30
namespace doris {
31
class Block;
32
33
namespace segment_v2 {
34
class SegmentWriter;
35
class VerticalSegmentWriter;
36
} // namespace segment_v2
37
38
struct SegmentStatistics;
39
class BetaRowsetWriter;
40
class SegmentFileCollection;
41
class InvertedIndexFileCollection;
42
43
// Abstract interface exposing two pure-virtual `create` overloads, both keyed by a
// segment id and both reporting their outcome as a Status. FileWriterCreatorT in this
// file is the concrete adapter that implements it.
class FileWriterCreator {
44
public:
45
904
    virtual ~FileWriterCreator() = default;
46
47
    // Overload that receives an io::FileWriterPtr by reference; `file_type` defaults to
    // FileType::SEGMENT_FILE when omitted.
    // NOTE(review): `file_writer` is presumably an out-parameter filled on success —
    // confirm against the implementations.
    virtual Status create(uint32_t segment_id, io::FileWriterPtr& file_writer,
48
                          FileType file_type = FileType::SEGMENT_FILE) = 0;
49
50
    // Overload that receives a pointer to an IndexFileWriterPtr.
    // NOTE(review): presumably an out-parameter as well — confirm against the
    // implementations.
    virtual Status create(uint32_t segment_id, IndexFileWriterPtr* file_writer) = 0;
51
};
52
53
// Adapter that implements FileWriterCreator by forwarding each `create` call to a
// member function of a T instance. T is constrained to types derived from RowsetWriter.
template <class T>
54
    requires std::is_base_of_v<RowsetWriter, T>
55
class FileWriterCreatorT : public FileWriterCreator {
56
public:
57
904
    // Stores the given pointer as-is; no null check is performed here.
    explicit FileWriterCreatorT(T* t) : _t(t) {}
_ZN5doris18FileWriterCreatorTINS_20BaseBetaRowsetWriterEEC2EPS1_
Line
Count
Source
57
903
    explicit FileWriterCreatorT(T* t) : _t(t) {}
_ZN5doris18FileWriterCreatorTINS_18BetaRowsetWriterV2EEC2EPS1_
Line
Count
Source
57
1
    explicit FileWriterCreatorT(T* t) : _t(t) {}
58
59
    // Forwards to T::create_file_writer with the same three arguments and returns its
    // Status unchanged.
    Status create(uint32_t segment_id, io::FileWriterPtr& file_writer,
60
1.20k
                  FileType file_type = FileType::SEGMENT_FILE) override {
61
1.20k
        return _t->create_file_writer(segment_id, file_writer, file_type);
62
1.20k
    }
_ZN5doris18FileWriterCreatorTINS_20BaseBetaRowsetWriterEE6createEjRSt10unique_ptrINS_2io10FileWriterESt14default_deleteIS5_EENS_8FileTypeE
Line
Count
Source
60
1.20k
                  FileType file_type = FileType::SEGMENT_FILE) override {
61
1.20k
        return _t->create_file_writer(segment_id, file_writer, file_type);
62
1.20k
    }
Unexecuted instantiation: _ZN5doris18FileWriterCreatorTINS_18BetaRowsetWriterV2EE6createEjRSt10unique_ptrINS_2io10FileWriterESt14default_deleteIS5_EENS_8FileTypeE
63
64
227
    // Forwards to T::create_index_file_writer and returns its Status unchanged.
    Status create(uint32_t segment_id, IndexFileWriterPtr* file_writer) override {
65
227
        return _t->create_index_file_writer(segment_id, file_writer);
66
227
    }
_ZN5doris18FileWriterCreatorTINS_20BaseBetaRowsetWriterEE6createEjPSt10unique_ptrINS_10segment_v215IndexFileWriterESt14default_deleteIS5_EE
Line
Count
Source
64
227
    Status create(uint32_t segment_id, IndexFileWriterPtr* file_writer) override {
65
227
        return _t->create_index_file_writer(segment_id, file_writer);
66
227
    }
Unexecuted instantiation: _ZN5doris18FileWriterCreatorTINS_18BetaRowsetWriterV2EE6createEjPSt10unique_ptrINS_10segment_v215IndexFileWriterESt14default_deleteIS5_EE
67
68
private:
69
    // Raw pointer to the delegate; this class declares no destructor, so the pointee
    // is never deleted here. It is dereferenced unconditionally by both create().
    T* _t = nullptr;
70
};
71
72
// Abstract interface with a single pure-virtual `add` taking a segment id and a
// SegmentStatistics reference. SegmentCollectorT in this file is the concrete adapter.
class SegmentCollector {
73
public:
74
904
    virtual ~SegmentCollector() = default;
75
76
    // `segstat` is passed by non-const reference, so implementations are allowed to
    // modify it. NOTE(review): whether any implementation does is not visible here.
    virtual Status add(uint32_t segment_id, SegmentStatistics& segstat) = 0;
77
};
78
79
// Adapter that implements SegmentCollector by forwarding `add` to T::add_segment.
// T is constrained to types derived from RowsetWriter.
template <class T>
80
    requires std::is_base_of_v<RowsetWriter, T>
81
class SegmentCollectorT : public SegmentCollector {
82
public:
83
904
    // Stores the given pointer as-is; no null check is performed here.
    explicit SegmentCollectorT(T* t) : _t(t) {}
_ZN5doris17SegmentCollectorTINS_20BaseBetaRowsetWriterEEC2EPS1_
Line
Count
Source
83
903
    explicit SegmentCollectorT(T* t) : _t(t) {}
_ZN5doris17SegmentCollectorTINS_18BetaRowsetWriterV2EEC2EPS1_
Line
Count
Source
83
1
    explicit SegmentCollectorT(T* t) : _t(t) {}
84
85
1.20k
    // Forwards to T::add_segment with the same arguments and returns its Status
    // unchanged.
    Status add(uint32_t segment_id, SegmentStatistics& segstat) override {
86
1.20k
        return _t->add_segment(segment_id, segstat);
87
1.20k
    }
_ZN5doris17SegmentCollectorTINS_20BaseBetaRowsetWriterEE3addEjRNS_17SegmentStatisticsE
Line
Count
Source
85
1.20k
    Status add(uint32_t segment_id, SegmentStatistics& segstat) override {
86
1.20k
        return _t->add_segment(segment_id, segstat);
87
1.20k
    }
Unexecuted instantiation: _ZN5doris17SegmentCollectorTINS_18BetaRowsetWriterV2EE3addEjRNS_17SegmentStatisticsE
88
89
private:
90
    // Raw pointer to the delegate; this class declares no destructor, so the pointee
    // is never deleted here. It is dereferenced unconditionally by add().
    T* _t = nullptr;
91
};
92
93
// Declares the segment flushing API: flushing a whole block as one segment
// (flush_single_block), creating incremental Writer handles (create_writer), and
// exposing row counters. Member function bodies other than the inline accessors are
// defined outside this header.
class SegmentFlusher {
94
public:
95
    // All three arguments are taken by reference and kept in reference members below,
    // so the referenced objects are not copied.
    SegmentFlusher(RowsetWriterContext& context, SegmentFileCollection& seg_files,
96
                   InvertedIndexFileCollection& idx_files);
97
98
    ~SegmentFlusher();
99
100
    // Return the file size flushed to disk in "flush_size"
101
    // This method is thread-safe.
102
    // `flush_size` defaults to nullptr, i.e. callers may omit it.
    Status flush_single_block(const Block* block, int32_t segment_id,
103
                              int64_t* flush_size = nullptr);
104
105
15
    // Each accessor below returns the current value of the matching std::atomic
    // counter (implicit atomic load via conversion to int64_t).
    int64_t num_rows_written() const { return _num_rows_written; }
106
107
    // for partial update
108
0
    int64_t num_rows_updated() const { return _num_rows_updated; }
109
0
    int64_t num_rows_deleted() const { return _num_rows_deleted; }
110
0
    int64_t num_rows_new_added() const { return _num_rows_new_added; }
111
0
    int64_t num_rows_filtered() const { return _num_rows_filtered; }
112
113
    Status close();
114
115
    // Handle pairing a SegmentFlusher pointer with a segment_v2::SegmentWriter.
    // Its constructor is private and SegmentFlusher is a friend, so instances are
    // created through SegmentFlusher rather than directly by callers.
    class Writer {
116
        friend class SegmentFlusher;
117
118
    public:
119
        ~Writer();
120
121
1.71k
        // Forwards to SegmentFlusher::_add_rows, passing this handle's own `_writer`.
        Status add_rows(const Block* block, size_t row_offset, size_t input_row_num) {
122
1.71k
            return _flusher->_add_rows(_writer, block, row_offset, input_row_num);
123
1.71k
        }
124
125
        Status flush();
126
127
        int64_t max_row_to_add(size_t row_avg_size_in_bytes);
128
129
    private:
130
        // NOTE(review): `segment_writer` is a non-const unique_ptr reference;
        // presumably ownership is moved into `_writer` — confirm in the .cpp.
        Writer(SegmentFlusher* flusher, std::unique_ptr<segment_v2::SegmentWriter>& segment_writer);
131
132
        SegmentFlusher* _flusher = nullptr;
133
        std::unique_ptr<segment_v2::SegmentWriter> _writer;
134
    };
135
136
    // NOTE(review): presumably builds a Writer for `segment_id` and hands it back
    // through `writer` — confirm in the .cpp.
    Status create_writer(std::unique_ptr<SegmentFlusher::Writer>& writer, uint32_t segment_id);
137
138
private:
139
    // The private helpers come in pairs: one overload for segment_v2::SegmentWriter
    // and one for segment_v2::VerticalSegmentWriter.
    Status _add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer, const Block* block,
140
                     size_t row_offset, size_t row_num);
141
    Status _add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& segment_writer,
142
                     const Block* block, size_t row_offset, size_t row_num);
143
    Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
144
                                  int32_t segment_id, bool no_compression = false);
145
    Status _create_segment_writer(std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer,
146
                                  int32_t segment_id, bool no_compression = false);
147
    Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
148
                                 int64_t* flush_size = nullptr);
149
    Status _flush_segment_writer(std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer,
150
                                 int64_t* flush_size = nullptr);
151
152
    // Reference members bound from the constructor arguments.
    RowsetWriterContext& _context;
153
    SegmentFileCollection& _seg_files;
154
    InvertedIndexFileCollection& _idx_files;
155
156
    // written rows by add_block/add_row
157
    std::atomic<int64_t> _num_rows_written = 0;
158
    std::atomic<int64_t> _num_rows_updated = 0;
159
    std::atomic<int64_t> _num_rows_new_added = 0;
160
    std::atomic<int64_t> _num_rows_deleted = 0;
161
    std::atomic<int64_t> _num_rows_filtered = 0;
162
};
163
164
// Owns a SegmentFlusher and an atomic segment-id counter. Row-count accessors and the
// id helpers are defined inline here; add_block/flush/close and the three-argument
// flush_single_block are defined outside this header.
class SegmentCreator {
165
public:
166
    SegmentCreator(RowsetWriterContext& context, SegmentFileCollection& seg_files,
167
                   InvertedIndexFileCollection& idx_files);
168
169
904
    ~SegmentCreator() = default;
170
171
0
    // Overwrites the id counter. The uint32_t argument is implicitly converted to the
    // counter's int32_t value type.
    void set_segment_start_id(uint32_t start_id) { _next_segment_id = start_id; }
172
173
    Status add_block(const Block* block);
174
175
    Status flush();
176
177
1.20k
    // Atomically increments the counter and returns the value it held before the
    // increment.
    int32_t allocate_segment_id() { return _next_segment_id.fetch_add(1); }
178
179
    // Return the next segment id to be allocated without advancing internal state.
180
0
    int32_t get_allocated_segment_id() const { return _next_segment_id.load(); }
181
182
12
    // NOTE(review): body is identical to get_allocated_segment_id() above.
    int32_t next_segment_id() const { return _next_segment_id.load(); }
183
184
15
    // The row-count accessors below delegate to the same-named SegmentFlusher
    // accessors.
    int64_t num_rows_written() const { return _segment_flusher.num_rows_written(); }
185
186
    // for partial update
187
0
    int64_t num_rows_updated() const { return _segment_flusher.num_rows_updated(); }
188
0
    int64_t num_rows_deleted() const { return _segment_flusher.num_rows_deleted(); }
189
0
    int64_t num_rows_new_added() const { return _segment_flusher.num_rows_new_added(); }
190
0
    int64_t num_rows_filtered() const { return _segment_flusher.num_rows_filtered(); }
191
192
    // Flush a block into a single segment, with pre-allocated segment_id.
193
    // Return the file size flushed to disk in "flush_size"
194
    // This method is thread-safe.
195
    Status flush_single_block(const Block* block, int32_t segment_id,
196
                              int64_t* flush_size = nullptr);
197
198
    // Flush a block into a single segment, without pre-allocated segment_id.
199
    // This method is thread-safe.
200
4
    // Allocates a fresh id via allocate_segment_id() and calls the overload above,
    // leaving `flush_size` at its nullptr default.
    Status flush_single_block(const Block* block) {
201
4
        return flush_single_block(block, allocate_segment_id());
202
4
    }
203
204
    Status close();
205
206
private:
207
    std::atomic<int32_t> _next_segment_id = 0;
208
    SegmentFlusher _segment_flusher;
209
    // NOTE(review): not referenced by any inline code in this header; presumably used
    // by add_block()/flush() — confirm in the .cpp.
    std::unique_ptr<SegmentFlusher::Writer> _flush_writer;
210
};
211
212
} // namespace doris