be/src/storage/partial_update_info.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | #include <gen_cpp/olap_file.pb.h> |
20 | | |
21 | | #include <cstdint> |
22 | | #include <functional> |
23 | | #include <map> |
24 | | #include <set> |
25 | | #include <string> |
26 | | #include <vector> |
27 | | |
28 | | #include "common/status.h" |
29 | | #include "core/column/column.h" |
30 | | #include "storage/rowset/rowset_fwd.h" |
31 | | #include "storage/tablet/tablet_fwd.h" |
32 | | |
33 | | namespace doris { |
34 | | class TabletSchema; |
35 | | class PartialUpdateInfoPB; |
36 | | class BitmapValue; |
37 | | struct RowLocation; |
38 | | class Block; |
39 | | class MutableBlock; |
40 | | class IOlapColumnDataAccessor; |
41 | | namespace segment_v2 { |
42 | | struct HistoricalRowRetrieverContext; |
43 | | } |
44 | | |
45 | | struct RowsetWriterContext; |
46 | | struct RowsetId; |
47 | | class BitmapValue; |
48 | | namespace segment_v2 { |
49 | | class VerticalSegmentWriter; |
50 | | } |
51 | | |
52 | | class SegmentCacheHandle; |
53 | | |
54 | | struct PartialUpdateInfo { |
55 | | Status init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema, |
56 | | UniqueKeyUpdateModePB unique_key_update_mode, PartialUpdateNewRowPolicyPB policy, |
57 | | const std::set<std::string>& partial_update_cols, bool is_strict_mode, |
58 | | int64_t timestamp_ms, int32_t nano_seconds, const std::string& timezone, |
59 | | const std::string& auto_increment_column, int32_t sequence_map_col_uid = -1, |
60 | | int64_t cur_max_version = -1); |
61 | | void to_pb(PartialUpdateInfoPB* partial_update_info) const; |
62 | | void from_pb(PartialUpdateInfoPB* partial_update_info); |
63 | | Status handle_new_key(const TabletSchema& tablet_schema, |
64 | | const std::function<std::string()>& line, |
65 | | BitmapValue* skip_bitmap = nullptr); |
66 | | std::string summary() const; |
67 | | |
68 | 0 | std::string partial_update_mode_str() const { |
69 | 0 | switch (partial_update_mode) { |
70 | 0 | case UniqueKeyUpdateModePB::UPSERT: |
71 | 0 | return "upsert"; |
72 | 0 | case UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS: |
73 | 0 | return "partial update"; |
74 | 0 | case UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS: |
75 | 0 | return "flexible partial update"; |
76 | 0 | } |
77 | 0 | return ""; |
78 | 0 | } |
79 | 91 | bool is_partial_update() const { return partial_update_mode != UniqueKeyUpdateModePB::UPSERT; } |
80 | 29 | bool is_fixed_partial_update() const { |
81 | 29 | return partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS; |
82 | 29 | } |
83 | 3 | bool is_flexible_partial_update() const { |
84 | 3 | return partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS; |
85 | 3 | } |
86 | 15 | UniqueKeyUpdateModePB update_mode() const { return partial_update_mode; } |
87 | 0 | int32_t sequence_map_col_uid() const { return sequence_map_col_unqiue_id; } |
88 | | |
89 | | private: |
90 | | void _generate_default_values_for_missing_cids(const TabletSchema& tablet_schema); |
91 | | |
92 | | public: |
93 | | UniqueKeyUpdateModePB partial_update_mode {UniqueKeyUpdateModePB::UPSERT}; |
94 | | PartialUpdateNewRowPolicyPB partial_update_new_key_policy {PartialUpdateNewRowPolicyPB::APPEND}; |
95 | | int64_t max_version_in_flush_phase {-1}; |
96 | | std::set<std::string> partial_update_input_columns; |
97 | | std::vector<uint32_t> missing_cids; |
98 | | std::vector<uint32_t> update_cids; |
99 | | // if key not exist in old rowset, use default value or null value for the unmentioned cols |
100 | | // to generate a new row, only available in non-strict mode |
101 | | bool can_insert_new_rows_in_partial_update {true}; |
102 | | bool is_strict_mode {false}; |
103 | | int64_t timestamp_ms {0}; |
104 | | int32_t nano_seconds {0}; |
105 | | std::string timezone; |
106 | | bool is_input_columns_contains_auto_inc_column = false; |
107 | | bool is_schema_contains_auto_inc_column = false; |
108 | | |
109 | | // default values for missing cids |
110 | | std::vector<std::string> default_values; |
111 | | |
112 | | int32_t sequence_map_col_unqiue_id {-1}; |
113 | | }; |
114 | | |
// Used in merge-on-write (MoW) partial update: pairs a row id with a position
// in a block.
//
// Both members are zero-initialized so that a default-constructed value never
// holds indeterminate data. The struct stays an aggregate, so
// `RidAndPos {rid, pos}` keeps working.
struct RidAndPos {
    // row id
    uint32_t rid {0};
    // pos in block
    size_t pos {0};
};
121 | | |
122 | | class FixedReadPlan { |
123 | | public: |
124 | | bool empty() const; |
125 | 2 | void clear() { plan.clear(); } |
126 | | void prepare_to_read(const RowLocation& row_location, size_t pos); |
127 | | Status read_columns_by_plan(const TabletSchema& tablet_schema, |
128 | | std::vector<uint32_t> cids_to_read, |
129 | | const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, |
130 | | Block& block, std::map<uint32_t, uint32_t>* read_index, |
131 | | bool force_read_old_delete_signs, |
132 | | const signed char* __restrict cur_delete_signs = nullptr) const; |
133 | | Status fill_missing_columns(const segment_v2::HistoricalRowRetrieverContext& historical_context, |
134 | | const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, |
135 | | const TabletSchema& tablet_schema, Block& full_block, |
136 | | const std::vector<bool>& use_default_or_null_flag, |
137 | | bool has_default_or_nullable, uint32_t segment_start_pos, |
138 | | const Block* block) const; |
139 | | |
140 | | private: |
141 | | std::map<RowsetId, std::map<uint32_t /* segment_id */, std::vector<RidAndPos>>> plan; |
142 | | }; |
143 | | |
144 | | class FlexibleReadPlan { |
145 | | public: |
146 | 0 | FlexibleReadPlan(bool has_row_store_for_column) : use_row_store(has_row_store_for_column) {} |
147 | | void prepare_to_read(const RowLocation& row_location, size_t pos, |
148 | | const BitmapValue& skip_bitmap); |
149 | | // for column store |
150 | | Status read_columns_by_plan(const TabletSchema& tablet_schema, |
151 | | const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, |
152 | | Block& old_value_block, |
153 | | std::map<uint32_t, std::map<uint32_t, uint32_t>>* read_index) const; |
154 | | |
155 | | // for row_store |
156 | | Status read_columns_by_plan(const TabletSchema& tablet_schema, |
157 | | const std::vector<uint32_t>& cids_to_read, |
158 | | const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, |
159 | | Block& old_value_block, |
160 | | std::map<uint32_t, uint32_t>* read_index) const; |
161 | | Status fill_non_primary_key_columns( |
162 | | const segment_v2::HistoricalRowRetrieverContext& historical_context, |
163 | | const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, |
164 | | const TabletSchema& tablet_schema, Block& full_block, |
165 | | const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable, |
166 | | uint32_t segment_start_pos, uint32_t block_start_pos, const Block* block, |
167 | | std::vector<BitmapValue>* skip_bitmaps) const; |
168 | | |
169 | | Status fill_non_primary_key_columns_for_column_store( |
170 | | const segment_v2::HistoricalRowRetrieverContext& historical_context, |
171 | | const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, |
172 | | const TabletSchema& tablet_schema, const std::vector<uint32_t>& non_sort_key_cids, |
173 | | Block& old_value_block, MutableColumns& mutable_full_columns, |
174 | | const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable, |
175 | | uint32_t segment_start_pos, uint32_t block_start_pos, const Block* block, |
176 | | std::vector<BitmapValue>* skip_bitmaps) const; |
177 | | Status fill_non_primary_key_columns_for_row_store( |
178 | | const segment_v2::HistoricalRowRetrieverContext& historical_context, |
179 | | const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, |
180 | | const TabletSchema& tablet_schema, const std::vector<uint32_t>& non_sort_key_cids, |
181 | | Block& old_value_block, MutableColumns& mutable_full_columns, |
182 | | const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable, |
183 | | uint32_t segment_start_pos, uint32_t block_start_pos, const Block* block, |
184 | | std::vector<BitmapValue>* skip_bitmaps) const; |
185 | | |
186 | | private: |
187 | | bool use_row_store {false}; |
188 | | // rowset_id -> segment_id -> column unique id -> mappings |
189 | | std::map<RowsetId, std::map<uint32_t, std::map<uint32_t, std::vector<RidAndPos>>>> plan; |
190 | | std::map<RowsetId, std::map<uint32_t /* segment_id */, std::vector<RidAndPos>>> row_store_plan; |
191 | | }; |
192 | | |
193 | | class BlockAggregator { |
194 | | public: |
195 | | ~BlockAggregator() = default; |
196 | | BlockAggregator(segment_v2::VerticalSegmentWriter& vertical_segment_writer); |
197 | | |
198 | | Status convert_pk_columns(Block* block, size_t row_pos, size_t num_rows, |
199 | | std::vector<IOlapColumnDataAccessor*>& key_columns); |
200 | | Status convert_seq_column(Block* block, size_t row_pos, size_t num_rows, |
201 | | IOlapColumnDataAccessor*& seq_column); |
202 | | Status aggregate_for_flexible_partial_update( |
203 | | Block* block, size_t num_rows, const std::vector<RowsetSharedPtr>& specified_rowsets, |
204 | | std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches); |
205 | | |
206 | | private: |
207 | | Status aggregate_for_sequence_column( |
208 | | Block* block, int num_rows, const std::vector<IOlapColumnDataAccessor*>& key_columns, |
209 | | IOlapColumnDataAccessor* seq_column, |
210 | | const std::vector<RowsetSharedPtr>& specified_rowsets, |
211 | | std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches); |
212 | | Status aggregate_for_insert_after_delete( |
213 | | Block* block, size_t num_rows, const std::vector<IOlapColumnDataAccessor*>& key_columns, |
214 | | const std::vector<RowsetSharedPtr>& specified_rowsets, |
215 | | std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches); |
216 | | Status filter_block(Block* block, size_t num_rows, MutableColumnPtr filter_column, |
217 | | int duplicate_rows, std::string col_name); |
218 | | |
219 | | Status fill_sequence_column(Block* block, size_t num_rows, const FixedReadPlan& read_plan, |
220 | | std::vector<BitmapValue>& skip_bitmaps); |
221 | | |
222 | | void append_or_merge_row(MutableBlock& dst_block, Block* src_block, int rid, |
223 | | BitmapValue& skip_bitmap, bool have_delete_sign); |
224 | | void merge_one_row(MutableBlock& dst_block, Block* src_block, int rid, |
225 | | BitmapValue& skip_bitmap); |
226 | | void append_one_row(MutableBlock& dst_block, Block* src_block, int rid); |
227 | | void remove_last_n_rows(MutableBlock& dst_block, int n); |
228 | | |
229 | | // aggregate rows with same keys in range [start, end) from block to output_block |
230 | | Status aggregate_rows(MutableBlock& output_block, Block* block, int start, int end, |
231 | | std::string key, std::vector<BitmapValue>* skip_bitmaps, |
232 | | const signed char* delete_signs, IOlapColumnDataAccessor* seq_column, |
233 | | const std::vector<RowsetSharedPtr>& specified_rowsets, |
234 | | std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches); |
235 | | |
236 | | segment_v2::VerticalSegmentWriter& _writer; |
237 | | TabletSchema& _tablet_schema; |
238 | | |
239 | | // used to store state when aggregating rows in block |
240 | | struct AggregateState { |
241 | | int rows {0}; |
242 | | bool has_row_with_delete_sign {false}; |
243 | | |
244 | 0 | bool should_merge() const { |
245 | 0 | return ((rows == 1 && !has_row_with_delete_sign) || rows == 2); |
246 | 0 | } |
247 | | |
248 | 0 | void reset() { |
249 | 0 | rows = 0; |
250 | 0 | has_row_with_delete_sign = false; |
251 | 0 | } |
252 | | |
253 | 0 | std::string to_string() const { |
254 | 0 | return fmt::format("rows={}, have_delete_row={}", rows, has_row_with_delete_sign); |
255 | 0 | } |
256 | | } _state {}; |
257 | | }; |
258 | | |
// Row counters for a partial update; every counter starts at zero.
struct PartialUpdateStats {
    // rows counted as updated
    int64_t num_rows_updated = 0;
    // rows counted as newly added
    int64_t num_rows_new_added = 0;
    // rows counted as deleted
    int64_t num_rows_deleted = 0;
    // rows counted as filtered
    int64_t num_rows_filtered = 0;
};
265 | | } // namespace doris |