Coverage Report

Created: 2026-03-13 09:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/partial_update_info.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
#include <gen_cpp/olap_file.pb.h>
20
21
#include <cstdint>
22
#include <functional>
23
#include <map>
24
#include <set>
25
#include <string>
26
#include <vector>
27
28
#include "common/status.h"
29
#include "core/column/column.h"
30
#include "storage/rowset/rowset_fwd.h"
31
#include "storage/tablet/tablet_fwd.h"
32
33
namespace doris {
34
// Forward declarations of project types referenced by the declarations below.
// Each name is declared exactly once (BitmapValue used to be declared twice).
class TabletSchema;
class PartialUpdateInfoPB;
class BitmapValue;
struct RowLocation;
class Block;
class MutableBlock;
class IOlapColumnDataAccessor;

struct RowsetWriterContext;
struct RowsetId;
namespace segment_v2 {
class VerticalSegmentWriter;
} // namespace segment_v2

class SegmentCacheHandle;
50
51
struct PartialUpdateInfo {
52
    Status init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema,
53
                UniqueKeyUpdateModePB unique_key_update_mode, PartialUpdateNewRowPolicyPB policy,
54
                const std::set<std::string>& partial_update_cols, bool is_strict_mode,
55
                int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone,
56
                const std::string& auto_increment_column, int32_t sequence_map_col_uid = -1,
57
                int64_t cur_max_version = -1);
58
    void to_pb(PartialUpdateInfoPB* partial_update_info) const;
59
    void from_pb(PartialUpdateInfoPB* partial_update_info);
60
    Status handle_new_key(const TabletSchema& tablet_schema,
61
                          const std::function<std::string()>& line,
62
                          BitmapValue* skip_bitmap = nullptr);
63
    std::string summary() const;
64
65
3.18k
    std::string partial_update_mode_str() const {
66
3.18k
        switch (partial_update_mode) {
67
0
        case UniqueKeyUpdateModePB::UPSERT:
68
0
            return "upsert";
69
2.68k
        case UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS:
70
2.68k
            return "partial update";
71
518
        case UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS:
72
518
            return "flexible partial update";
73
3.18k
        }
74
0
        return "";
75
3.18k
    }
76
675k
    bool is_partial_update() const { return partial_update_mode != UniqueKeyUpdateModePB::UPSERT; }
77
198k
    bool is_fixed_partial_update() const {
78
198k
        return partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS;
79
198k
    }
80
52.7k
    bool is_flexible_partial_update() const {
81
52.7k
        return partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS;
82
52.7k
    }
83
59.5k
    UniqueKeyUpdateModePB update_mode() const { return partial_update_mode; }
84
273
    int32_t sequence_map_col_uid() const { return sequence_map_col_unqiue_id; }
85
86
private:
87
    void _generate_default_values_for_missing_cids(const TabletSchema& tablet_schema);
88
89
public:
90
    UniqueKeyUpdateModePB partial_update_mode {UniqueKeyUpdateModePB::UPSERT};
91
    PartialUpdateNewRowPolicyPB partial_update_new_key_policy {PartialUpdateNewRowPolicyPB::APPEND};
92
    int64_t max_version_in_flush_phase {-1};
93
    std::set<std::string> partial_update_input_columns;
94
    std::vector<uint32_t> missing_cids;
95
    std::vector<uint32_t> update_cids;
96
    // if key not exist in old rowset, use default value or null value for the unmentioned cols
97
    // to generate a new row, only available in non-strict mode
98
    bool can_insert_new_rows_in_partial_update {true};
99
    bool is_strict_mode {false};
100
    int64_t timestamp_ms {0};
101
    int32_t nano_seconds {0};
102
    std::string timezone;
103
    bool is_input_columns_contains_auto_inc_column = false;
104
    bool is_schema_contains_auto_inc_column = false;
105
106
    // default values for missing cids
107
    std::vector<std::string> default_values;
108
109
    int32_t sequence_map_col_unqiue_id {-1};
110
};
111
112
// used in mow partial update: pairs a row id with the position of that row in a block.
struct RidAndPos {
    // Row id, as recorded by the read plans from a RowLocation (presumably the row's id
    // inside its segment — TODO confirm). Zero-initialized so a default-constructed
    // instance never carries an indeterminate value.
    uint32_t rid {0};
    // pos in block
    size_t pos {0};
};
118
119
class FixedReadPlan {
120
public:
121
    bool empty() const;
122
    void prepare_to_read(const RowLocation& row_location, size_t pos);
123
    Status read_columns_by_plan(const TabletSchema& tablet_schema,
124
                                std::vector<uint32_t> cids_to_read,
125
                                const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
126
                                Block& block, std::map<uint32_t, uint32_t>* read_index,
127
                                bool force_read_old_delete_signs,
128
                                const signed char* __restrict cur_delete_signs = nullptr) const;
129
    Status fill_missing_columns(RowsetWriterContext* rowset_ctx,
130
                                const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
131
                                const TabletSchema& tablet_schema, Block& full_block,
132
                                const std::vector<bool>& use_default_or_null_flag,
133
                                bool has_default_or_nullable, uint32_t segment_start_pos,
134
                                const Block* block) const;
135
136
private:
137
    std::map<RowsetId, std::map<uint32_t /* segment_id */, std::vector<RidAndPos>>> plan;
138
};
139
140
class FlexibleReadPlan {
141
public:
142
265
    FlexibleReadPlan(bool has_row_store_for_column) : use_row_store(has_row_store_for_column) {}
143
    void prepare_to_read(const RowLocation& row_location, size_t pos,
144
                         const BitmapValue& skip_bitmap);
145
    // for column store
146
    Status read_columns_by_plan(const TabletSchema& tablet_schema,
147
                                const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
148
                                Block& old_value_block,
149
                                std::map<uint32_t, std::map<uint32_t, uint32_t>>* read_index) const;
150
151
    // for row_store
152
    Status read_columns_by_plan(const TabletSchema& tablet_schema,
153
                                const std::vector<uint32_t>& cids_to_read,
154
                                const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
155
                                Block& old_value_block,
156
                                std::map<uint32_t, uint32_t>* read_index) const;
157
    Status fill_non_primary_key_columns(RowsetWriterContext* rowset_ctx,
158
                                        const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
159
                                        const TabletSchema& tablet_schema, Block& full_block,
160
                                        const std::vector<bool>& use_default_or_null_flag,
161
                                        bool has_default_or_nullable, uint32_t segment_start_pos,
162
                                        uint32_t block_start_pos, const Block* block,
163
                                        std::vector<BitmapValue>* skip_bitmaps) const;
164
165
    Status fill_non_primary_key_columns_for_column_store(
166
            RowsetWriterContext* rowset_ctx,
167
            const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
168
            const TabletSchema& tablet_schema, const std::vector<uint32_t>& non_sort_key_cids,
169
            Block& old_value_block, MutableColumns& mutable_full_columns,
170
            const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
171
            uint32_t segment_start_pos, uint32_t block_start_pos, const Block* block,
172
            std::vector<BitmapValue>* skip_bitmaps) const;
173
    Status fill_non_primary_key_columns_for_row_store(
174
            RowsetWriterContext* rowset_ctx,
175
            const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
176
            const TabletSchema& tablet_schema, const std::vector<uint32_t>& non_sort_key_cids,
177
            Block& old_value_block, MutableColumns& mutable_full_columns,
178
            const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
179
            uint32_t segment_start_pos, uint32_t block_start_pos, const Block* block,
180
            std::vector<BitmapValue>* skip_bitmaps) const;
181
182
private:
183
    bool use_row_store {false};
184
    // rowset_id -> segment_id -> column unique id -> mappings
185
    std::map<RowsetId, std::map<uint32_t, std::map<uint32_t, std::vector<RidAndPos>>>> plan;
186
    std::map<RowsetId, std::map<uint32_t /* segment_id */, std::vector<RidAndPos>>> row_store_plan;
187
};
188
189
class BlockAggregator {
190
public:
191
    ~BlockAggregator() = default;
192
    BlockAggregator(segment_v2::VerticalSegmentWriter& vertical_segment_writer);
193
194
    Status convert_pk_columns(Block* block, size_t row_pos, size_t num_rows,
195
                              std::vector<IOlapColumnDataAccessor*>& key_columns);
196
    Status convert_seq_column(Block* block, size_t row_pos, size_t num_rows,
197
                              IOlapColumnDataAccessor*& seq_column);
198
    Status aggregate_for_flexible_partial_update(
199
            Block* block, size_t num_rows, const std::vector<RowsetSharedPtr>& specified_rowsets,
200
            std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches);
201
202
private:
203
    Status aggregate_for_sequence_column(
204
            Block* block, int num_rows, const std::vector<IOlapColumnDataAccessor*>& key_columns,
205
            IOlapColumnDataAccessor* seq_column,
206
            const std::vector<RowsetSharedPtr>& specified_rowsets,
207
            std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches);
208
    Status aggregate_for_insert_after_delete(
209
            Block* block, size_t num_rows, const std::vector<IOlapColumnDataAccessor*>& key_columns,
210
            const std::vector<RowsetSharedPtr>& specified_rowsets,
211
            std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches);
212
    Status filter_block(Block* block, size_t num_rows, MutableColumnPtr filter_column,
213
                        int duplicate_rows, std::string col_name);
214
215
    Status fill_sequence_column(Block* block, size_t num_rows, const FixedReadPlan& read_plan,
216
                                std::vector<BitmapValue>& skip_bitmaps);
217
218
    void append_or_merge_row(MutableBlock& dst_block, Block* src_block, int rid,
219
                             BitmapValue& skip_bitmap, bool have_delete_sign);
220
    void merge_one_row(MutableBlock& dst_block, Block* src_block, int rid,
221
                       BitmapValue& skip_bitmap);
222
    void append_one_row(MutableBlock& dst_block, Block* src_block, int rid);
223
    void remove_last_n_rows(MutableBlock& dst_block, int n);
224
225
    // aggregate rows with same keys in range [start, end) from block to output_block
226
    Status aggregate_rows(MutableBlock& output_block, Block* block, int start, int end,
227
                          std::string key, std::vector<BitmapValue>* skip_bitmaps,
228
                          const signed char* delete_signs, IOlapColumnDataAccessor* seq_column,
229
                          const std::vector<RowsetSharedPtr>& specified_rowsets,
230
                          std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches);
231
232
    segment_v2::VerticalSegmentWriter& _writer;
233
    TabletSchema& _tablet_schema;
234
235
    // used to store state when aggregating rows in block
236
    struct AggregateState {
237
        int rows {0};
238
        bool has_row_with_delete_sign {false};
239
240
282
        bool should_merge() const {
241
282
            return ((rows == 1 && !has_row_with_delete_sign) || rows == 2);
242
282
        }
243
244
99
        void reset() {
245
99
            rows = 0;
246
99
            has_row_with_delete_sign = false;
247
99
        }
248
249
0
        std::string to_string() const {
250
0
            return fmt::format("rows={}, have_delete_row={}", rows, has_row_with_delete_sign);
251
0
        }
252
    } _state {};
253
};
254
255
// Per-category row counters for a partial-update load; every counter starts at zero.
struct PartialUpdateStats {
    int64_t num_rows_updated = 0;
    int64_t num_rows_new_added = 0;
    int64_t num_rows_deleted = 0;
    int64_t num_rows_filtered = 0;
};
261
} // namespace doris