Coverage Report

Created: 2026-07-02 10:51

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/segment_creator.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/internal_service.pb.h>
21
#include <gen_cpp/olap_file.pb.h>
22
23
#include <mutex>
24
#include <vector>
25
26
#include "common/status.h"
27
#include "core/block/block.h"
28
#include "io/fs/file_reader_writer_fwd.h"
29
#include "storage/index/index_file_writer.h"
30
#include "storage/rowset/rowset_writer_context.h"
31
#include "storage/segment/segment_index_file_cache_loader.h"
32
#include "storage/tablet/tablet_fwd.h"
33
34
namespace doris {
35
class Block;
36
37
namespace segment_v2 {
38
class SegmentWriter;
39
class VerticalSegmentWriter;
40
} // namespace segment_v2
41
42
struct SegmentStatistics;
43
class BetaRowsetWriter;
44
class SegmentFileCollection;
45
class InvertedIndexFileCollection;
46
47
// Abstract factory interface for the file writers needed while producing a
// segment. Implementations supply both overloads of create().
class FileWriterCreator {
48
public:
49
1.02k
    virtual ~FileWriterCreator() = default;
50
51
    // Creates a writer for the file of kind "file_type" (segment file by
    // default) belonging to "segment_id"; the result is handed back through
    // "file_writer".
    virtual Status create(uint32_t segment_id, io::FileWriterPtr& file_writer,
52
                          FileType file_type = FileType::SEGMENT_FILE) = 0;
53
54
    // Creates an index file writer for "segment_id"; the result is handed
    // back through the "file_writer" out-pointer.
    virtual Status create(uint32_t segment_id, IndexFileWriterPtr* file_writer) = 0;
55
};
56
57
// Adapter that implements FileWriterCreator by forwarding to a rowset writer
// of type T. T must derive from RowsetWriter (enforced by the requires-clause)
// and provide create_file_writer() and create_index_file_writer().
template <class T>
58
    requires std::is_base_of_v<RowsetWriter, T>
59
class FileWriterCreatorT : public FileWriterCreator {
60
public:
61
1.02k
    // Stores "t" as a raw pointer; this class never deletes it.
    explicit FileWriterCreatorT(T* t) : _t(t) {}
_ZN5doris18FileWriterCreatorTINS_20BaseBetaRowsetWriterEEC2EPS1_
Line
Count
Source
61
1.02k
    explicit FileWriterCreatorT(T* t) : _t(t) {}
_ZN5doris18FileWriterCreatorTINS_18BetaRowsetWriterV2EEC2EPS1_
Line
Count
Source
61
1
    explicit FileWriterCreatorT(T* t) : _t(t) {}
62
63
    // Forwards to T::create_file_writer() with the same arguments and returns
    // its Status unchanged.
    Status create(uint32_t segment_id, io::FileWriterPtr& file_writer,
64
1.29k
                  FileType file_type = FileType::SEGMENT_FILE) override {
65
1.29k
        return _t->create_file_writer(segment_id, file_writer, file_type);
66
1.29k
    }
_ZN5doris18FileWriterCreatorTINS_20BaseBetaRowsetWriterEE6createEjRSt10unique_ptrINS_2io10FileWriterESt14default_deleteIS5_EENS_8FileTypeE
Line
Count
Source
64
1.29k
                  FileType file_type = FileType::SEGMENT_FILE) override {
65
1.29k
        return _t->create_file_writer(segment_id, file_writer, file_type);
66
1.29k
    }
Unexecuted instantiation: _ZN5doris18FileWriterCreatorTINS_18BetaRowsetWriterV2EE6createEjRSt10unique_ptrINS_2io10FileWriterESt14default_deleteIS5_EENS_8FileTypeE
67
68
273
    // Forwards to T::create_index_file_writer() and returns its Status
    // unchanged.
    Status create(uint32_t segment_id, IndexFileWriterPtr* file_writer) override {
69
273
        return _t->create_index_file_writer(segment_id, file_writer);
70
273
    }
_ZN5doris18FileWriterCreatorTINS_20BaseBetaRowsetWriterEE6createEjPSt10unique_ptrINS_10segment_v215IndexFileWriterESt14default_deleteIS5_EE
Line
Count
Source
68
273
    Status create(uint32_t segment_id, IndexFileWriterPtr* file_writer) override {
69
273
        return _t->create_index_file_writer(segment_id, file_writer);
70
273
    }
Unexecuted instantiation: _ZN5doris18FileWriterCreatorTINS_18BetaRowsetWriterV2EE6createEjPSt10unique_ptrINS_10segment_v215IndexFileWriterESt14default_deleteIS5_EE
71
72
private:
73
    // Target of the forwarded calls. It is dereferenced without a null check,
    // so callers must pass a non-null pointer that outlives this object.
    T* _t = nullptr;
74
};
75
76
// Abstract sink interface that receives the statistics of a segment,
// identified by its segment id.
class SegmentCollector {
77
public:
78
1.02k
    virtual ~SegmentCollector() = default;
79
80
    // Registers "segstat" for "segment_id". "segstat" is taken by non-const
    // reference, so implementations are allowed to modify it.
    virtual Status add(uint32_t segment_id, SegmentStatistics& segstat) = 0;
81
};
82
83
// Adapter that implements SegmentCollector by forwarding to a rowset writer
// of type T. T must derive from RowsetWriter (enforced by the requires-clause)
// and provide add_segment().
template <class T>
84
    requires std::is_base_of_v<RowsetWriter, T>
85
class SegmentCollectorT : public SegmentCollector {
86
public:
87
1.02k
    // Stores "t" as a raw pointer; this class never deletes it.
    explicit SegmentCollectorT(T* t) : _t(t) {}
_ZN5doris17SegmentCollectorTINS_20BaseBetaRowsetWriterEEC2EPS1_
Line
Count
Source
87
1.02k
    explicit SegmentCollectorT(T* t) : _t(t) {}
_ZN5doris17SegmentCollectorTINS_18BetaRowsetWriterV2EEC2EPS1_
Line
Count
Source
87
1
    explicit SegmentCollectorT(T* t) : _t(t) {}
88
89
1.29k
    // Forwards to T::add_segment() and returns its Status unchanged.
    Status add(uint32_t segment_id, SegmentStatistics& segstat) override {
90
1.29k
        return _t->add_segment(segment_id, segstat);
91
1.29k
    }
_ZN5doris17SegmentCollectorTINS_20BaseBetaRowsetWriterEE3addEjRNS_17SegmentStatisticsE
Line
Count
Source
89
1.29k
    Status add(uint32_t segment_id, SegmentStatistics& segstat) override {
90
1.29k
        return _t->add_segment(segment_id, segstat);
91
1.29k
    }
Unexecuted instantiation: _ZN5doris17SegmentCollectorTINS_18BetaRowsetWriterV2EE3addEjRNS_17SegmentStatisticsE
92
93
private:
94
    // Target of the forwarded calls. It is dereferenced without a null check,
    // so callers must pass a non-null pointer that outlives this object.
    T* _t = nullptr;
95
};
96
97
// Writes blocks into segments and keeps running row counters.
// It holds references (not copies) to the rowset writer context and to the
// segment / inverted-index file collections, so those objects must outlive
// the flusher. Member functions declared without a body are defined outside
// this header.
class SegmentFlusher {
98
public:
99
    SegmentFlusher(RowsetWriterContext& context, SegmentFileCollection& seg_files,
100
                   InvertedIndexFileCollection& idx_files);
101
102
    ~SegmentFlusher();
103
104
    // Flushes "block" as the segment identified by "segment_id".
    // Return the file size flushed to disk in "flush_size"
105
    // ("flush_size" defaults to nullptr; presumably it is only written when
    // non-null - confirm in the implementation).
    // This method is thread-safe.
106
    Status flush_single_block(const Block* block, int32_t segment_id,
107
                              int64_t* flush_size = nullptr);
108
109
15
    // Current value of the written-rows counter (atomic load).
    int64_t num_rows_written() const { return _num_rows_written; }
110
111
    // Row counters used for partial update; each getter is an atomic load of
    // the corresponding member.
112
0
    int64_t num_rows_updated() const { return _num_rows_updated; }
113
0
    int64_t num_rows_deleted() const { return _num_rows_deleted; }
114
0
    int64_t num_rows_new_added() const { return _num_rows_new_added; }
115
0
    int64_t num_rows_filtered() const { return _num_rows_filtered; }
116
117
    Status close();
118
119
public:
120
    // Handle for appending rows to one segment through the owning flusher.
    // The constructor is private and SegmentFlusher is a friend, so instances
    // can only be created by SegmentFlusher (see create_writer()).
    class Writer {
121
        friend class SegmentFlusher;
122
123
    public:
124
        ~Writer();
125
126
1.81k
        // Forwards to SegmentFlusher::_add_rows() using this writer's segment
        // writer; "row_offset" and "input_row_num" are passed through as the
        // row offset and row count.
        Status add_rows(const Block* block, size_t row_offset, size_t input_row_num) {
127
1.81k
            return _flusher->_add_rows(_writer, block, row_offset, input_row_num);
128
1.81k
        }
129
130
        Status flush();
131
132
        int64_t max_row_to_add(size_t row_avg_size_in_bytes);
133
134
    private:
135
        // NOTE(review): "segment_writer" is a non-const reference to a
        // unique_ptr; presumably ownership is moved into _writer - confirm in
        // the implementation.
        Writer(SegmentFlusher* flusher, std::unique_ptr<segment_v2::SegmentWriter>& segment_writer);
136
137
        // Back-pointer to the flusher; dereferenced by add_rows() without a
        // null check.
        SegmentFlusher* _flusher = nullptr;
138
        // Segment writer owned by this handle.
        std::unique_ptr<segment_v2::SegmentWriter> _writer;
139
    };
140
141
    // Produces a Writer for "segment_id" and returns it through "writer".
    // NOTE(review): segment_id is uint32_t here but int32_t in
    // flush_single_block() and _create_segment_writer().
    Status create_writer(std::unique_ptr<SegmentFlusher::Writer>& writer, uint32_t segment_id);
142
143
private:
144
    // The private helpers below come in pairs: one overload for
    // segment_v2::SegmentWriter and one for segment_v2::VerticalSegmentWriter.
    Status _add_rows(std::unique_ptr<segment_v2::SegmentWriter>& segment_writer, const Block* block,
145
                     size_t row_offset, size_t row_num);
146
    Status _add_rows(std::unique_ptr<segment_v2::VerticalSegmentWriter>& segment_writer,
147
                     const Block* block, size_t row_offset, size_t row_num);
148
    Status _create_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
149
                                  int32_t segment_id, bool no_compression = false);
150
    Status _create_segment_writer(std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer,
151
                                  int32_t segment_id, bool no_compression = false);
152
    Status _flush_segment_writer(std::unique_ptr<segment_v2::SegmentWriter>& writer,
153
                                 int64_t* flush_size = nullptr);
154
    Status _flush_segment_writer(std::unique_ptr<segment_v2::VerticalSegmentWriter>& writer,
155
                                 int64_t* flush_size = nullptr);
156
    void _record_segment_index_file_cache_preload(
157
            uint32_t segment_id, const segment_v2::SegmentIndexFileCacheInfo& info);
158
    Status _preload_segment_indexes_to_file_cache();
159
160
private:
161
    // Borrowed references supplied to the constructor.
    RowsetWriterContext& _context;
162
    SegmentFileCollection& _seg_files;
163
    InvertedIndexFileCollection& _idx_files;
164
165
    // written rows by add_block/add_row
166
    std::atomic<int64_t> _num_rows_written = 0;
167
    std::atomic<int64_t> _num_rows_updated = 0;
168
    std::atomic<int64_t> _num_rows_new_added = 0;
169
    std::atomic<int64_t> _num_rows_deleted = 0;
170
    std::atomic<int64_t> _num_rows_filtered = 0;
171
    // Pending index file-cache preload tasks and their mutex (presumably the
    // mutex guards the vector below - confirm in the implementation).
    std::mutex _segment_index_file_cache_preloads_lock;
172
    std::vector<segment_v2::SegmentIndexFileCachePreloadTask> _segment_index_file_cache_preloads;
173
};
174
175
// Front end for segment creation: hands out segment ids from an atomic
// counter and delegates flushing and row statistics to an embedded
// SegmentFlusher. Member functions declared without a body are defined
// outside this header.
class SegmentCreator {
176
public:
177
    SegmentCreator(RowsetWriterContext& context, SegmentFileCollection& seg_files,
178
                   InvertedIndexFileCollection& idx_files);
179
180
1.02k
    ~SegmentCreator() = default;
181
182
0
    // Overwrites the next segment id to hand out.
    // NOTE(review): start_id is uint32_t but the counter is
    // std::atomic<int32_t>; values above INT32_MAX are not representable -
    // confirm callers never pass them.
    void set_segment_start_id(uint32_t start_id) { _next_segment_id = start_id; }
183
184
    Status add_block(const Block* block);
185
186
    Status flush();
187
188
1.29k
    // Atomically returns the current id and advances the counter by one
    // (fetch_add yields the value held before the increment).
    int32_t allocate_segment_id() { return _next_segment_id.fetch_add(1); }
189
190
    // Return the next segment id to be allocated without advancing internal state.
191
0
    int32_t get_allocated_segment_id() const { return _next_segment_id.load(); }
192
193
12
    // Same atomic load as get_allocated_segment_id().
    int32_t next_segment_id() const { return _next_segment_id.load(); }
194
195
15
    // Row counters; each getter forwards to the embedded SegmentFlusher.
    int64_t num_rows_written() const { return _segment_flusher.num_rows_written(); }
196
197
    // for partial update
198
0
    int64_t num_rows_updated() const { return _segment_flusher.num_rows_updated(); }
199
0
    int64_t num_rows_deleted() const { return _segment_flusher.num_rows_deleted(); }
200
0
    int64_t num_rows_new_added() const { return _segment_flusher.num_rows_new_added(); }
201
0
    int64_t num_rows_filtered() const { return _segment_flusher.num_rows_filtered(); }
202
203
    // Flush a block into a single segment, with pre-allocated segment_id.
204
    // Return the file size flushed to disk in "flush_size"
205
    // This method is thread-safe.
206
    Status flush_single_block(const Block* block, int32_t segment_id,
207
                              int64_t* flush_size = nullptr);
208
209
    // Flush a block into a single segment, without pre-allocated segment_id.
210
    // This method is thread-safe.
211
6
    // The id comes from allocate_segment_id(); flush_size is left at its
    // default (nullptr), so no flushed size is reported.
    Status flush_single_block(const Block* block) {
212
6
        return flush_single_block(block, allocate_segment_id());
213
6
    }
214
215
    Status close();
216
217
private:
218
    // Next segment id to hand out; starts at 0.
    std::atomic<int32_t> _next_segment_id = 0;
219
    SegmentFlusher _segment_flusher;
220
    // Writer handle of type SegmentFlusher::Writer; it is not used by any
    // function defined inline in this class.
    std::unique_ptr<SegmentFlusher::Writer> _flush_writer;
221
};
222
223
} // namespace doris