Coverage Report

Created: 2026-03-16 21:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/segment_creator.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/internal_service.pb.h>
21
#include <gen_cpp/olap_file.pb.h>
22
23
#include "common/status.h"
24
#include "core/block/block.h"
25
#include "io/fs/file_reader_writer_fwd.h"
26
#include "storage/index/index_file_writer.h"
27
#include "storage/rowset/rowset_writer_context.h"
28
#include "storage/tablet/tablet_fwd.h"
29
30
namespace doris {
31
class Block;
32
33
namespace segment_v2 {
34
class SegmentWriter;
35
class VerticalSegmentWriter;
36
} // namespace segment_v2
37
38
struct SegmentStatistics;
39
class BetaRowsetWriter;
40
class SegmentFileCollection;
41
class InvertedIndexFileCollection;
42
43
// Abstract factory interface for the file writers that belong to a segment.
// Both `create` overloads are pure virtual and report their outcome as a Status.
class FileWriterCreator {
44
public:
45
985
    // Virtual so implementations can be destroyed through this interface.
    virtual ~FileWriterCreator() = default;
46
47
    // Creates a writer for the file of kind `file_type` (SEGMENT_FILE unless the
    // caller says otherwise) that belongs to `segment_id`.
    // `file_writer` is a non-const reference; presumably it receives the new
    // writer -- confirm against the implementations.
    virtual Status create(uint32_t segment_id, io::FileWriterPtr& file_writer,
48
                          FileType file_type = FileType::SEGMENT_FILE) = 0;
49
50
    // Overload for index file writers. Here the (presumed) output slot is passed
    // by pointer rather than by reference.
    virtual Status create(uint32_t segment_id, IndexFileWriterPtr* file_writer) = 0;
51
};
52
53
// Adapter that implements FileWriterCreator by forwarding every call to an
// object of type T. The requires-clause restricts T to RowsetWriter or a class
// derived from it.
template <class T>
54
    requires std::is_base_of_v<RowsetWriter, T>
55
class FileWriterCreatorT : public FileWriterCreator {
56
public:
57
985
    // Stores `t` as-is. Nothing in this class deletes it, and both create()
    // overloads dereference it without a null check, so the caller must pass a
    // non-null pointer that stays valid while create() may be called.
    explicit FileWriterCreatorT(T* t) : _t(t) {}
_ZN5doris18FileWriterCreatorTINS_20BaseBetaRowsetWriterEEC2EPS1_
Line
Count
Source
57
985
    explicit FileWriterCreatorT(T* t) : _t(t) {}
Unexecuted instantiation: _ZN5doris18FileWriterCreatorTINS_18BetaRowsetWriterV2EEC2EPS1_
58
59
    // Forwards to T::create_file_writer with the arguments unchanged.
    // The default for `file_type` repeats the one on the base-class declaration.
    Status create(uint32_t segment_id, io::FileWriterPtr& file_writer,
60
1.19k
                  FileType file_type = FileType::SEGMENT_FILE) override {
61
1.19k
        return _t->create_file_writer(segment_id, file_writer, file_type);
62
1.19k
    }
_ZN5doris18FileWriterCreatorTINS_20BaseBetaRowsetWriterEE6createEjRSt10unique_ptrINS_2io10FileWriterESt14default_deleteIS5_EENS_8FileTypeE
Line
Count
Source
60
1.19k
                  FileType file_type = FileType::SEGMENT_FILE) override {
61
1.19k
        return _t->create_file_writer(segment_id, file_writer, file_type);
62
1.19k
    }
Unexecuted instantiation: _ZN5doris18FileWriterCreatorTINS_18BetaRowsetWriterV2EE6createEjRSt10unique_ptrINS_2io10FileWriterESt14default_deleteIS5_EENS_8FileTypeE
63
64
228
    // Forwards to T::create_index_file_writer with the arguments unchanged.
    Status create(uint32_t segment_id, IndexFileWriterPtr* file_writer) override {
65
228
        return _t->create_index_file_writer(segment_id, file_writer);
66
228
    }
_ZN5doris18FileWriterCreatorTINS_20BaseBetaRowsetWriterEE6createEjPSt10unique_ptrINS_10segment_v215IndexFileWriterESt14default_deleteIS5_EE
Line
Count
Source
64
228
    Status create(uint32_t segment_id, IndexFileWriterPtr* file_writer) override {
65
228
        return _t->create_index_file_writer(segment_id, file_writer);
66
228
    }
Unexecuted instantiation: _ZN5doris18FileWriterCreatorTINS_18BetaRowsetWriterV2EE6createEjPSt10unique_ptrINS_10segment_v215IndexFileWriterESt14default_deleteIS5_EE
67
68
private:
69
    // Target of the forwarded calls; set once by the constructor.
    T* _t = nullptr;
70
};
71
72
// Abstract interface with a single operation: hand over a segment id together
// with its SegmentStatistics. The outcome is reported as a Status.
class SegmentCollector {
73
public:
74
985
    // Virtual so implementations can be destroyed through this interface.
    virtual ~SegmentCollector() = default;
75
76
    // `segstat` is taken by non-const reference; whether implementations modify
    // it cannot be told from this declaration.
    virtual Status add(uint32_t segment_id, SegmentStatistics& segstat) = 0;
77
};
78
79
// Adapter that implements SegmentCollector by forwarding add() to an object of
// type T. The requires-clause restricts T to RowsetWriter or a class derived
// from it.
template <class T>
80
    requires std::is_base_of_v<RowsetWriter, T>
81
class SegmentCollectorT : public SegmentCollector {
82
public:
83
985
    // Stores `t` as-is. Nothing in this class deletes it, and add() dereferences
    // it without a null check, so the caller must pass a non-null pointer that
    // stays valid while add() may be called.
    explicit SegmentCollectorT(T* t) : _t(t) {}
_ZN5doris17SegmentCollectorTINS_20BaseBetaRowsetWriterEEC2EPS1_
Line
Count
Source
83
985
    explicit SegmentCollectorT(T* t) : _t(t) {}
Unexecuted instantiation: _ZN5doris17SegmentCollectorTINS_18BetaRowsetWriterV2EEC2EPS1_
84
85
1.19k
    // Forwards to T::add_segment with the arguments unchanged.
    Status add(uint32_t segment_id, SegmentStatistics& segstat) override {
86
1.19k
        return _t->add_segment(segment_id, segstat);
87
1.19k
    }
_ZN5doris17SegmentCollectorTINS_20BaseBetaRowsetWriterEE3addEjRNS_17SegmentStatisticsE
Line
Count
Source
85
1.19k
    Status add(uint32_t segment_id, SegmentStatistics& segstat) override {
86
1.19k
        return _t->add_segment(segment_id, segstat);
87
1.19k
    }
Unexecuted instantiation: _ZN5doris17SegmentCollectorTINS_18BetaRowsetWriterV2EE3addEjRNS_17SegmentStatisticsE
88
89
private:
90
    // Target of the forwarded calls; set once by the constructor.
    T* _t = nullptr;
91
};
92
93
// Declares the block-flushing operations and the row counters that go with them.
// The class keeps reference members to a RowsetWriterContext, a
// SegmentFileCollection and an InvertedIndexFileCollection, so those objects
// must outlive it. Only the inline accessors and Writer::add_rows are defined
// in this header; every other member is defined elsewhere.
class SegmentFlusher {
94
public:
95
    // Presumably binds the three arguments to the reference members of the same
    // names -- the definition is not part of this header.
    SegmentFlusher(RowsetWriterContext& context, SegmentFileCollection& seg_files,
96
                   InvertedIndexFileCollection& idx_files);
97
98
    ~SegmentFlusher();
99
100
    // Return the file size flushed to disk in "flush_size"
101
    // This method is thread-safe.
102
    // `flush_size` defaults to nullptr, i.e. the caller may omit it.
    Status flush_single_block(const Block* block, int32_t segment_id,
103
                              int64_t* flush_size = nullptr);
104
105
15
    // Counter accessors: each returns the current value of the matching atomic
    // member (the implicit conversion from std::atomic performs an atomic load).
    int64_t num_rows_written() const { return _num_rows_written; }
106
107
    // for partial update
108
0
    int64_t num_rows_updated() const { return _num_rows_updated; }
109
0
    int64_t num_rows_deleted() const { return _num_rows_deleted; }
110
0
    int64_t num_rows_new_added() const { return _num_rows_new_added; }
111
0
    int64_t num_rows_filtered() const { return _num_rows_filtered; }
112
113
    Status close();
114
115
public:
116
    // Handle that pairs a segment_v2::SegmentWriter with the SegmentFlusher it
    // calls back into. The constructor is private and SegmentFlusher is a
    // friend, so instances cannot be constructed by outside code.
    class Writer {
117
        friend class SegmentFlusher;
118
119
    public:
120
        ~Writer();
121
122
1.71k
        // Delegates to SegmentFlusher::_add_rows, passing this handle's segment
        // writer. `_flusher` is dereferenced without a null check.
        Status add_rows(const Block* block, size_t row_offset, size_t input_row_num) {
123
1.71k
            return _flusher->_add_rows(_writer, block, row_offset, input_row_num);
124
1.71k
        }
125
126
        Status flush();
127
128
        int64_t max_row_to_add(size_t row_avg_size_in_bytes);
129
130
    private:
131
        // `segment_writer` is taken by non-const reference; presumably ownership
        // is moved into `_writer` -- confirm in the definition.
        Writer(SegmentFlusher* flusher, std::unique_ptr<segment_v2::SegmentWriter>& segment_writer);
132
133
        // Raw pointer to the flusher used by add_rows(); never deleted here.
        SegmentFlusher* _flusher = nullptr;
134
        // Segment writer owned by this handle.
        std::unique_ptr<segment_v2::SegmentWriter> _writer;
135
    };
136
137
    // NOTE(review): `segment_id` is uint32_t here, while flush_single_block and
    // _create_segment_writer take int32_t -- confirm the mix is intentional.
    Status create_writer(std::unique_ptr<SegmentFlusher::Writer>& writer, uint32_t segment_id);
138
139
private:
140
    // Each private helper below comes in two overloads, one taking a
    // segment_v2::SegmentWriter and one taking a segment_v2::VerticalSegmentWriter.
    Status _add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer, const Block* block,
141
                     size_t row_offset, size_t row_num);
142
    Status _add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& segment_writer,
143
                     const Block* block, size_t row_offset, size_t row_num);
144
    Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
145
                                  int32_t segment_id, bool no_compression = false);
146
    Status _create_segment_writer(std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer,
147
                                  int32_t segment_id, bool no_compression = false);
148
    Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
149
                                 int64_t* flush_size = nullptr);
150
    Status _flush_segment_writer(std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer,
151
                                 int64_t* flush_size = nullptr);
152
153
private:
154
    // Reference members: the referenced objects are not owned by this class.
    RowsetWriterContext& _context;
155
    SegmentFileCollection& _seg_files;
156
    InvertedIndexFileCollection& _idx_files;
157
158
    // written rows by add_block/add_row
159
    // All counters are atomics that start at 0 and are read through the
    // num_rows_* accessors above; where they are updated is not shown here.
    std::atomic<int64_t> _num_rows_written = 0;
160
    std::atomic<int64_t> _num_rows_updated = 0;
161
    std::atomic<int64_t> _num_rows_new_added = 0;
162
    std::atomic<int64_t> _num_rows_deleted = 0;
163
    std::atomic<int64_t> _num_rows_filtered = 0;
164
};
165
166
// Front end that hands out segment ids from an atomic counter and embeds a
// SegmentFlusher, to which the row-count accessors below delegate. add_block,
// flush, close and the two-argument flush_single_block are defined elsewhere.
class SegmentCreator {
167
public:
168
    // Takes the same three arguments as SegmentFlusher's constructor;
    // presumably they are passed on to `_segment_flusher` -- definition not shown.
    SegmentCreator(RowsetWriterContext& context, SegmentFileCollection& seg_files,
169
                   InvertedIndexFileCollection& idx_files);
170
171
985
    ~SegmentCreator() = default;
172
173
0
    // Overwrites the id counter with `start_id`.
    // NOTE(review): `start_id` is uint32_t but the counter is
    // std::atomic<int32_t>, so values above INT32_MAX cannot be represented
    // after the conversion -- confirm callers never pass them.
    void set_segment_start_id(uint32_t start_id) { _next_segment_id = start_id; }
174
175
    Status add_block(const Block* block);
176
177
    Status flush();
178
179
1.19k
    // Atomically adds 1 to the counter and returns the value it held before
    // the increment.
    int32_t allocate_segment_id() { return _next_segment_id.fetch_add(1); }
180
181
7
    // Returns the current counter value without changing it.
    int32_t next_segment_id() const { return _next_segment_id.load(); }
182
183
15
    // The num_rows_* accessors return the corresponding SegmentFlusher counter.
    int64_t num_rows_written() const { return _segment_flusher.num_rows_written(); }
184
185
    // for partial update
186
0
    int64_t num_rows_updated() const { return _segment_flusher.num_rows_updated(); }
187
0
    int64_t num_rows_deleted() const { return _segment_flusher.num_rows_deleted(); }
188
0
    int64_t num_rows_new_added() const { return _segment_flusher.num_rows_new_added(); }
189
0
    int64_t num_rows_filtered() const { return _segment_flusher.num_rows_filtered(); }
190
191
    // Flush a block into a single segment, with pre-allocated segment_id.
192
    // Return the file size flushed to disk in "flush_size"
193
    // This method is thread-safe.
194
    Status flush_single_block(const Block* block, int32_t segment_id,
195
                              int64_t* flush_size = nullptr);
196
197
    // Flush a block into a single segment, without pre-allocated segment_id.
198
    // This method is thread-safe.
199
0
    // Takes a fresh id from allocate_segment_id() and calls the overload above,
    // leaving `flush_size` at its default of nullptr.
    Status flush_single_block(const Block* block) {
200
0
        return flush_single_block(block, allocate_segment_id());
201
0
    }
202
203
    Status close();
204
205
private:
206
    // Next segment id to hand out; starts at 0.
    std::atomic<int32_t> _next_segment_id = 0;
207
    SegmentFlusher _segment_flusher;
208
    // Declared after `_segment_flusher`, so it is destroyed before it.
    std::unique_ptr<SegmentFlusher::Writer> _flush_writer;
209
};
210
211
} // namespace doris