be/src/storage/partial_update_info.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | #include <gen_cpp/olap_file.pb.h> |
20 | | |
21 | | #include <cstdint> |
22 | | #include <functional> |
23 | | #include <map> |
24 | | #include <set> |
25 | | #include <string> |
26 | | #include <vector> |
27 | | |
28 | | #include "common/status.h" |
29 | | #include "core/column/column.h" |
30 | | #include "storage/rowset/rowset_fwd.h" |
31 | | #include "storage/tablet/tablet_fwd.h" |
32 | | |
33 | | namespace doris { |
34 | | class TabletSchema; |
35 | | class PartialUpdateInfoPB; |
36 | | class BitmapValue; |
37 | | struct RowLocation; |
38 | | class Block; |
39 | | class MutableBlock; |
40 | | class IOlapColumnDataAccessor; |
41 | | |
42 | | struct RowsetWriterContext; |
43 | | struct RowsetId; |
44 | | class BitmapValue; |
45 | | namespace segment_v2 { |
46 | | class VerticalSegmentWriter; |
47 | | } |
48 | | |
49 | | class SegmentCacheHandle; |
50 | | |
51 | | struct PartialUpdateInfo { |
52 | | Status init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema, |
53 | | UniqueKeyUpdateModePB unique_key_update_mode, PartialUpdateNewRowPolicyPB policy, |
54 | | const std::set<std::string>& partial_update_cols, bool is_strict_mode, |
55 | | int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone, |
56 | | const std::string& auto_increment_column, int32_t sequence_map_col_uid = -1, |
57 | | int64_t cur_max_version = -1); |
58 | | void to_pb(PartialUpdateInfoPB* partial_update_info) const; |
59 | | void from_pb(PartialUpdateInfoPB* partial_update_info); |
60 | | Status handle_new_key(const TabletSchema& tablet_schema, |
61 | | const std::function<std::string()>& line, |
62 | | BitmapValue* skip_bitmap = nullptr); |
63 | | std::string summary() const; |
64 | | |
65 | 3.18k | std::string partial_update_mode_str() const { |
66 | 3.18k | switch (partial_update_mode) { |
67 | 0 | case UniqueKeyUpdateModePB::UPSERT: |
68 | 0 | return "upsert"; |
69 | 2.68k | case UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS: |
70 | 2.68k | return "partial update"; |
71 | 518 | case UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS: |
72 | 518 | return "flexible partial update"; |
73 | 3.18k | } |
74 | 0 | return ""; |
75 | 3.18k | } |
76 | 675k | bool is_partial_update() const { return partial_update_mode != UniqueKeyUpdateModePB::UPSERT; } |
77 | 198k | bool is_fixed_partial_update() const { |
78 | 198k | return partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS; |
79 | 198k | } |
80 | 52.7k | bool is_flexible_partial_update() const { |
81 | 52.7k | return partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS; |
82 | 52.7k | } |
83 | 59.5k | UniqueKeyUpdateModePB update_mode() const { return partial_update_mode; } |
84 | 273 | int32_t sequence_map_col_uid() const { return sequence_map_col_unqiue_id; } |
85 | | |
86 | | private: |
87 | | void _generate_default_values_for_missing_cids(const TabletSchema& tablet_schema); |
88 | | |
89 | | public: |
90 | | UniqueKeyUpdateModePB partial_update_mode {UniqueKeyUpdateModePB::UPSERT}; |
91 | | PartialUpdateNewRowPolicyPB partial_update_new_key_policy {PartialUpdateNewRowPolicyPB::APPEND}; |
92 | | int64_t max_version_in_flush_phase {-1}; |
93 | | std::set<std::string> partial_update_input_columns; |
94 | | std::vector<uint32_t> missing_cids; |
95 | | std::vector<uint32_t> update_cids; |
96 | | // if key not exist in old rowset, use default value or null value for the unmentioned cols |
97 | | // to generate a new row, only available in non-strict mode |
98 | | bool can_insert_new_rows_in_partial_update {true}; |
99 | | bool is_strict_mode {false}; |
100 | | int64_t timestamp_ms {0}; |
101 | | int32_t nano_seconds {0}; |
102 | | std::string timezone; |
103 | | bool is_input_columns_contains_auto_inc_column = false; |
104 | | bool is_schema_contains_auto_inc_column = false; |
105 | | |
106 | | // default values for missing cids |
107 | | std::vector<std::string> default_values; |
108 | | |
109 | | int32_t sequence_map_col_unqiue_id {-1}; |
110 | | }; |
111 | | |
112 | | // used in mow partial update |
113 | | struct RidAndPos { |
114 | | uint32_t rid; |
115 | | // pos in block |
116 | | size_t pos; |
117 | | }; |
118 | | |
119 | | class FixedReadPlan { |
120 | | public: |
121 | | bool empty() const; |
122 | | void prepare_to_read(const RowLocation& row_location, size_t pos); |
123 | | Status read_columns_by_plan(const TabletSchema& tablet_schema, |
124 | | std::vector<uint32_t> cids_to_read, |
125 | | const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, |
126 | | Block& block, std::map<uint32_t, uint32_t>* read_index, |
127 | | bool force_read_old_delete_signs, |
128 | | const signed char* __restrict cur_delete_signs = nullptr) const; |
129 | | Status fill_missing_columns(RowsetWriterContext* rowset_ctx, |
130 | | const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, |
131 | | const TabletSchema& tablet_schema, Block& full_block, |
132 | | const std::vector<bool>& use_default_or_null_flag, |
133 | | bool has_default_or_nullable, uint32_t segment_start_pos, |
134 | | const Block* block) const; |
135 | | |
136 | | private: |
137 | | std::map<RowsetId, std::map<uint32_t /* segment_id */, std::vector<RidAndPos>>> plan; |
138 | | }; |
139 | | |
140 | | class FlexibleReadPlan { |
141 | | public: |
142 | 265 | FlexibleReadPlan(bool has_row_store_for_column) : use_row_store(has_row_store_for_column) {} |
143 | | void prepare_to_read(const RowLocation& row_location, size_t pos, |
144 | | const BitmapValue& skip_bitmap); |
145 | | // for column store |
146 | | Status read_columns_by_plan(const TabletSchema& tablet_schema, |
147 | | const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, |
148 | | Block& old_value_block, |
149 | | std::map<uint32_t, std::map<uint32_t, uint32_t>>* read_index) const; |
150 | | |
151 | | // for row_store |
152 | | Status read_columns_by_plan(const TabletSchema& tablet_schema, |
153 | | const std::vector<uint32_t>& cids_to_read, |
154 | | const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, |
155 | | Block& old_value_block, |
156 | | std::map<uint32_t, uint32_t>* read_index) const; |
157 | | Status fill_non_primary_key_columns(RowsetWriterContext* rowset_ctx, |
158 | | const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, |
159 | | const TabletSchema& tablet_schema, Block& full_block, |
160 | | const std::vector<bool>& use_default_or_null_flag, |
161 | | bool has_default_or_nullable, uint32_t segment_start_pos, |
162 | | uint32_t block_start_pos, const Block* block, |
163 | | std::vector<BitmapValue>* skip_bitmaps) const; |
164 | | |
165 | | Status fill_non_primary_key_columns_for_column_store( |
166 | | RowsetWriterContext* rowset_ctx, |
167 | | const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, |
168 | | const TabletSchema& tablet_schema, const std::vector<uint32_t>& non_sort_key_cids, |
169 | | Block& old_value_block, MutableColumns& mutable_full_columns, |
170 | | const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable, |
171 | | uint32_t segment_start_pos, uint32_t block_start_pos, const Block* block, |
172 | | std::vector<BitmapValue>* skip_bitmaps) const; |
173 | | Status fill_non_primary_key_columns_for_row_store( |
174 | | RowsetWriterContext* rowset_ctx, |
175 | | const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, |
176 | | const TabletSchema& tablet_schema, const std::vector<uint32_t>& non_sort_key_cids, |
177 | | Block& old_value_block, MutableColumns& mutable_full_columns, |
178 | | const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable, |
179 | | uint32_t segment_start_pos, uint32_t block_start_pos, const Block* block, |
180 | | std::vector<BitmapValue>* skip_bitmaps) const; |
181 | | |
182 | | private: |
183 | | bool use_row_store {false}; |
184 | | // rowset_id -> segment_id -> column unique id -> mappings |
185 | | std::map<RowsetId, std::map<uint32_t, std::map<uint32_t, std::vector<RidAndPos>>>> plan; |
186 | | std::map<RowsetId, std::map<uint32_t /* segment_id */, std::vector<RidAndPos>>> row_store_plan; |
187 | | }; |
188 | | |
189 | | class BlockAggregator { |
190 | | public: |
191 | | ~BlockAggregator() = default; |
192 | | BlockAggregator(segment_v2::VerticalSegmentWriter& vertical_segment_writer); |
193 | | |
194 | | Status convert_pk_columns(Block* block, size_t row_pos, size_t num_rows, |
195 | | std::vector<IOlapColumnDataAccessor*>& key_columns); |
196 | | Status convert_seq_column(Block* block, size_t row_pos, size_t num_rows, |
197 | | IOlapColumnDataAccessor*& seq_column); |
198 | | Status aggregate_for_flexible_partial_update( |
199 | | Block* block, size_t num_rows, const std::vector<RowsetSharedPtr>& specified_rowsets, |
200 | | std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches); |
201 | | |
202 | | private: |
203 | | Status aggregate_for_sequence_column( |
204 | | Block* block, int num_rows, const std::vector<IOlapColumnDataAccessor*>& key_columns, |
205 | | IOlapColumnDataAccessor* seq_column, |
206 | | const std::vector<RowsetSharedPtr>& specified_rowsets, |
207 | | std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches); |
208 | | Status aggregate_for_insert_after_delete( |
209 | | Block* block, size_t num_rows, const std::vector<IOlapColumnDataAccessor*>& key_columns, |
210 | | const std::vector<RowsetSharedPtr>& specified_rowsets, |
211 | | std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches); |
212 | | Status filter_block(Block* block, size_t num_rows, MutableColumnPtr filter_column, |
213 | | int duplicate_rows, std::string col_name); |
214 | | |
215 | | Status fill_sequence_column(Block* block, size_t num_rows, const FixedReadPlan& read_plan, |
216 | | std::vector<BitmapValue>& skip_bitmaps); |
217 | | |
218 | | void append_or_merge_row(MutableBlock& dst_block, Block* src_block, int rid, |
219 | | BitmapValue& skip_bitmap, bool have_delete_sign); |
220 | | void merge_one_row(MutableBlock& dst_block, Block* src_block, int rid, |
221 | | BitmapValue& skip_bitmap); |
222 | | void append_one_row(MutableBlock& dst_block, Block* src_block, int rid); |
223 | | void remove_last_n_rows(MutableBlock& dst_block, int n); |
224 | | |
225 | | // aggregate rows with same keys in range [start, end) from block to output_block |
226 | | Status aggregate_rows(MutableBlock& output_block, Block* block, int start, int end, |
227 | | std::string key, std::vector<BitmapValue>* skip_bitmaps, |
228 | | const signed char* delete_signs, IOlapColumnDataAccessor* seq_column, |
229 | | const std::vector<RowsetSharedPtr>& specified_rowsets, |
230 | | std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches); |
231 | | |
232 | | segment_v2::VerticalSegmentWriter& _writer; |
233 | | TabletSchema& _tablet_schema; |
234 | | |
235 | | // used to store state when aggregating rows in block |
236 | | struct AggregateState { |
237 | | int rows {0}; |
238 | | bool has_row_with_delete_sign {false}; |
239 | | |
240 | 282 | bool should_merge() const { |
241 | 282 | return ((rows == 1 && !has_row_with_delete_sign) || rows == 2); |
242 | 282 | } |
243 | | |
244 | 99 | void reset() { |
245 | 99 | rows = 0; |
246 | 99 | has_row_with_delete_sign = false; |
247 | 99 | } |
248 | | |
249 | 0 | std::string to_string() const { |
250 | 0 | return fmt::format("rows={}, have_delete_row={}", rows, has_row_with_delete_sign); |
251 | 0 | } |
252 | | } _state {}; |
253 | | }; |
254 | | |
255 | | struct PartialUpdateStats { |
256 | | int64_t num_rows_updated {0}; |
257 | | int64_t num_rows_new_added {0}; |
258 | | int64_t num_rows_deleted {0}; |
259 | | int64_t num_rows_filtered {0}; |
260 | | }; |
261 | | } // namespace doris |