Coverage Report

Created: 2026-03-12 17:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/rowset_writer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/internal_service.pb.h>
21
#include <gen_cpp/olap_file.pb.h>
22
#include <gen_cpp/types.pb.h>
23
24
#include <functional>
25
#include <memory>
26
#include <optional>
27
28
#include "common/factory_creator.h"
29
#include "core/block/block.h"
30
#include "storage/index/index_file_writer.h"
31
#include "storage/olap_define.h"
32
#include "storage/rowset/rowset.h"
33
#include "storage/rowset/rowset_writer_context.h"
34
#include "storage/schema_change/column_mapping.h"
35
#include "storage/tablet/tablet_fwd.h"
36
#include "storage/tablet/tablet_schema.h"
37
38
namespace doris {
39
40
struct SegmentStatistics {
41
    int64_t row_num;
42
    int64_t data_size;
43
    int64_t index_size;
44
    KeyBoundsPB key_bounds;
45
46
59.9k
    SegmentStatistics() = default;
47
48
    SegmentStatistics(SegmentStatisticsPB pb)
49
42
            : row_num(pb.row_num()),
50
42
              data_size(pb.data_size()),
51
42
              index_size(pb.index_size()),
52
42
              key_bounds(pb.key_bounds()) {}
53
54
42
    void to_pb(SegmentStatisticsPB* segstat_pb) const {
55
42
        segstat_pb->set_row_num(row_num);
56
42
        segstat_pb->set_data_size(data_size);
57
42
        segstat_pb->set_index_size(index_size);
58
42
        segstat_pb->mutable_key_bounds()->CopyFrom(key_bounds);
59
42
    }
60
61
0
    std::string to_string() {
62
0
        std::stringstream ss;
63
0
        ss << "row_num: " << row_num << ", data_size: " << data_size
64
0
           << ", index_size: " << index_size << ", key_bounds: " << key_bounds.ShortDebugString();
65
0
        return ss.str();
66
0
    }
67
};
68
// Shared-ownership pointer to a SegmentStatistics object.
using SegmentStatisticsSharedPtr = std::shared_ptr<SegmentStatistics>;
69
70
class RowsetWriter {
71
public:
72
208k
    RowsetWriter() = default;
73
209k
    virtual ~RowsetWriter() = default;
74
75
    virtual Status init(const RowsetWriterContext& rowset_writer_context) = 0;
76
77
0
    virtual Status add_block(const Block* block) {
78
0
        return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(
79
0
                "RowsetWriter not support add_block");
80
0
    }
81
    virtual Status add_columns(const Block* block, const std::vector<uint32_t>& col_ids,
82
0
                               bool is_key, uint32_t max_rows_per_segment, bool has_cluster_key) {
83
0
        return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(
84
0
                "RowsetWriter not support add_columns");
85
0
    }
86
87
    // Precondition: the input `rowset` should have the same type of the rowset we're building
88
    virtual Status add_rowset(RowsetSharedPtr rowset) = 0;
89
90
    // Precondition: the input `rowset` should have the same type of the rowset we're building
91
    virtual Status add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) = 0;
92
93
    virtual Status create_file_writer(uint32_t segment_id, io::FileWriterPtr& writer,
94
0
                                      FileType file_type = FileType::SEGMENT_FILE) {
95
0
        return Status::NotSupported("RowsetWriter does not support create_file_writer");
96
0
    }
97
98
    virtual Status create_index_file_writer(uint32_t segment_id,
99
6.10k
                                            IndexFileWriterPtr* index_file_writer) {
100
        // Create file writer for the inverted index format v2.
101
6.10k
        io::FileWriterPtr idx_file_v2_ptr;
102
6.10k
        if (_context.tablet_schema->get_inverted_index_storage_format() !=
103
6.10k
            InvertedIndexStorageFormatPB::V1) {
104
5.96k
            RETURN_IF_ERROR(
105
5.96k
                    create_file_writer(segment_id, idx_file_v2_ptr, FileType::INVERTED_INDEX_FILE));
106
5.96k
        }
107
6.10k
        std::string segment_prefix {InvertedIndexDescriptor::get_index_file_path_prefix(
108
6.10k
                _context.segment_path(segment_id))};
109
        // default to true, only when base compaction, we need to check the config
110
6.10k
        bool can_use_ram_dir = true;
111
6.10k
        if (_context.compaction_type == ReaderType::READER_BASE_COMPACTION) {
112
47
            can_use_ram_dir = config::inverted_index_ram_dir_enable_when_base_compaction;
113
47
        }
114
6.10k
        *index_file_writer = std::make_unique<IndexFileWriter>(
115
6.10k
                _context.fs(), segment_prefix, _context.rowset_id.to_string(), segment_id,
116
6.10k
                _context.tablet_schema->get_inverted_index_storage_format(),
117
6.10k
                std::move(idx_file_v2_ptr), can_use_ram_dir);
118
6.10k
        return Status::OK();
119
6.10k
    }
120
121
    // explicit flush all buffered rows into segment file.
122
    // note that `add_row` could also trigger flush when certain conditions are met
123
    virtual Status flush() = 0;
124
0
    virtual Status flush_columns(bool is_key) {
125
0
        return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(
126
0
                "RowsetWriter not support flush_columns");
127
0
    }
128
0
    virtual Status final_flush() {
129
0
        return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(
130
0
                "RowsetWriter not support final_flush");
131
0
    }
132
133
0
    virtual Status flush_memtable(Block* block, int32_t segment_id, int64_t* flush_size) {
134
0
        return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(
135
0
                "RowsetWriter not support flush_memtable");
136
0
    }
137
138
0
    virtual Status flush_single_block(const Block* block) {
139
0
        return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(
140
0
                "RowsetWriter not support flush_single_block");
141
0
    }
142
143
0
    virtual Status add_segment(uint32_t segment_id, const SegmentStatistics& segstat) {
144
0
        return Status::NotSupported("RowsetWriter does not support add_segment");
145
0
    }
146
147
    // finish building and set rowset pointer to the built rowset (guaranteed to be inited).
148
    // rowset is invalid if returned Status is not OK
149
    virtual Status build(RowsetSharedPtr& rowset) = 0;
150
151
    // For ordered rowset compaction, manual build rowset
152
    virtual RowsetSharedPtr manual_build(const RowsetMetaSharedPtr& rowset_meta) = 0;
153
154
    virtual PUniqueId load_id() = 0;
155
156
    virtual Version version() = 0;
157
158
    virtual int64_t num_rows() const = 0;
159
160
    virtual int64_t num_rows_updated() const = 0;
161
    virtual int64_t num_rows_deleted() const = 0;
162
    virtual int64_t num_rows_new_added() const = 0;
163
    virtual int64_t num_rows_filtered() const = 0;
164
165
    virtual RowsetId rowset_id() = 0;
166
167
    virtual RowsetTypePB type() const = 0;
168
169
0
    virtual Status get_segment_num_rows(std::vector<uint32_t>* segment_num_rows) const {
170
0
        return Status::NotSupported("to be implemented");
171
0
    }
172
173
    virtual int32_t allocate_segment_id() = 0;
174
175
0
    virtual void set_segment_start_id(int num_segment) {
176
0
        throw Exception(Status::FatalError("not supported!"));
177
0
    }
178
179
0
    virtual int64_t delete_bitmap_ns() { return 0; }
180
181
0
    virtual int64_t segment_writer_ns() { return 0; }
182
183
    virtual std::shared_ptr<PartialUpdateInfo> get_partial_update_info() = 0;
184
185
    virtual bool is_partial_update() = 0;
186
187
22.1k
    const RowsetWriterContext& context() { return _context; }
188
189
393k
    const RowsetMetaSharedPtr& rowset_meta() { return _rowset_meta; }
190
191
private:
192
    DISALLOW_COPY_AND_ASSIGN(RowsetWriter);
193
194
protected:
195
    RowsetWriterContext _context;
196
    RowsetMetaSharedPtr _rowset_meta;
197
};
198
199
} // namespace doris