Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/partial_update_info.h" |
19 | | |
20 | | #include <gen_cpp/olap_file.pb.h> |
21 | | |
22 | | #include <cstdint> |
23 | | #include <utility> |
24 | | |
25 | | #include "common/consts.h" |
26 | | #include "common/logging.h" |
27 | | #include "core/assert_cast.h" |
28 | | #include "core/block/block.h" |
29 | | #include "core/data_type/data_type_number.h" // IWYU pragma: keep |
30 | | #include "core/value/bitmap_value.h" |
31 | | #include "exec/common/variant_util.h" |
32 | | #include "storage/iterator/olap_data_convertor.h" |
33 | | #include "storage/olap_common.h" |
34 | | #include "storage/rowset/rowset.h" |
35 | | #include "storage/rowset/rowset_writer_context.h" |
36 | | #include "storage/segment/vertical_segment_writer.h" |
37 | | #include "storage/tablet/base_tablet.h" |
38 | | #include "storage/tablet/tablet_meta.h" |
39 | | #include "storage/tablet/tablet_schema.h" |
40 | | #include "storage/utils.h" |
41 | | |
42 | | namespace doris { |
43 | | Status PartialUpdateInfo::init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema, |
44 | | UniqueKeyUpdateModePB unique_key_update_mode, |
45 | | PartialUpdateNewRowPolicyPB policy, |
46 | | const std::set<std::string>& partial_update_cols, |
47 | | bool is_strict_mode_, int64_t timestamp_ms_, int32_t nano_seconds_, |
48 | | const std::string& timezone_, |
49 | | const std::string& auto_increment_column, |
50 | 30 | int32_t sequence_map_col_uid, int64_t cur_max_version) { |
51 | 30 | partial_update_mode = unique_key_update_mode; |
52 | 30 | partial_update_new_key_policy = policy; |
53 | 30 | partial_update_input_columns = partial_update_cols; |
54 | 30 | max_version_in_flush_phase = cur_max_version; |
55 | 30 | sequence_map_col_unqiue_id = sequence_map_col_uid; |
56 | 30 | timestamp_ms = timestamp_ms_; |
57 | 30 | nano_seconds = nano_seconds_; |
58 | 30 | timezone = timezone_; |
59 | 30 | missing_cids.clear(); |
60 | 30 | update_cids.clear(); |
61 | | |
62 | 30 | if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { |
63 | | // partial_update_cols should include all key columns |
64 | 0 | for (std::size_t i {0}; i < tablet_schema.num_key_columns(); i++) { |
65 | 0 | const auto key_col = tablet_schema.column(i); |
66 | 0 | if (!partial_update_cols.contains(key_col.name())) { |
67 | 0 | auto msg = fmt::format( |
68 | 0 | "Unable to do partial update on shadow index's tablet, tablet_id={}, " |
69 | 0 | "txn_id={}. Missing key column {}.", |
70 | 0 | tablet_id, txn_id, key_col.name()); |
71 | 0 | LOG_WARNING(msg); |
72 | 0 | return Status::Aborted<false>(msg); |
73 | 0 | } |
74 | 0 | } |
75 | 0 | } |
76 | 30 | if (is_partial_update()) { |
77 | 0 | for (auto i = 0; i < tablet_schema.num_columns(); ++i) { |
78 | 0 | if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { |
79 | 0 | auto tablet_column = tablet_schema.column(i); |
80 | 0 | if (!partial_update_input_columns.contains(tablet_column.name())) { |
81 | 0 | missing_cids.emplace_back(i); |
82 | 0 | if (!tablet_column.has_default_value() && !tablet_column.is_nullable() && |
83 | 0 | tablet_schema.auto_increment_column() != tablet_column.name()) { |
84 | 0 | can_insert_new_rows_in_partial_update = false; |
85 | 0 | } |
86 | 0 | } else { |
87 | 0 | update_cids.emplace_back(i); |
88 | 0 | } |
89 | 0 | if (auto_increment_column == tablet_column.name()) { |
90 | 0 | is_schema_contains_auto_inc_column = true; |
91 | 0 | } |
92 | 0 | } else { |
93 | | // in flexible partial update, missing cids is all non sort keys' cid |
94 | 0 | if (i >= tablet_schema.num_key_columns()) { |
95 | 0 | missing_cids.emplace_back(i); |
96 | 0 | } |
97 | 0 | } |
98 | 0 | } |
99 | 0 | _generate_default_values_for_missing_cids(tablet_schema); |
100 | 0 | } |
101 | 30 | is_strict_mode = is_strict_mode_; |
102 | 30 | is_input_columns_contains_auto_inc_column = |
103 | 30 | is_fixed_partial_update() && |
104 | 30 | partial_update_input_columns.contains(auto_increment_column); |
105 | 30 | return Status::OK(); |
106 | 30 | } |
107 | | |
108 | 1 | void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const { |
109 | 1 | partial_update_info_pb->set_partial_update_mode(partial_update_mode); |
110 | 1 | partial_update_info_pb->set_partial_update_new_key_policy(partial_update_new_key_policy); |
111 | 1 | partial_update_info_pb->set_max_version_in_flush_phase(max_version_in_flush_phase); |
112 | 1 | for (const auto& col : partial_update_input_columns) { |
113 | 0 | partial_update_info_pb->add_partial_update_input_columns(col); |
114 | 0 | } |
115 | 1 | for (auto cid : missing_cids) { |
116 | 0 | partial_update_info_pb->add_missing_cids(cid); |
117 | 0 | } |
118 | 1 | for (auto cid : update_cids) { |
119 | 0 | partial_update_info_pb->add_update_cids(cid); |
120 | 0 | } |
121 | 1 | partial_update_info_pb->set_can_insert_new_rows_in_partial_update( |
122 | 1 | can_insert_new_rows_in_partial_update); |
123 | 1 | partial_update_info_pb->set_is_strict_mode(is_strict_mode); |
124 | 1 | partial_update_info_pb->set_timestamp_ms(timestamp_ms); |
125 | 1 | partial_update_info_pb->set_nano_seconds(nano_seconds); |
126 | 1 | partial_update_info_pb->set_timezone(timezone); |
127 | 1 | partial_update_info_pb->set_is_input_columns_contains_auto_inc_column( |
128 | 1 | is_input_columns_contains_auto_inc_column); |
129 | 1 | partial_update_info_pb->set_is_schema_contains_auto_inc_column( |
130 | 1 | is_schema_contains_auto_inc_column); |
131 | 1 | partial_update_info_pb->set_sequence_map_col_uid(sequence_map_col_unqiue_id); |
132 | 1 | for (const auto& value : default_values) { |
133 | 0 | partial_update_info_pb->add_default_values(value); |
134 | 0 | } |
135 | 1 | } |
136 | | |
137 | 2 | void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) { |
138 | 2 | if (!partial_update_info_pb->has_partial_update_mode()) { |
139 | | // for backward compatibility |
140 | 0 | if (partial_update_info_pb->is_partial_update()) { |
141 | 0 | partial_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS; |
142 | 0 | } else { |
143 | 0 | partial_update_mode = UniqueKeyUpdateModePB::UPSERT; |
144 | 0 | } |
145 | 2 | } else { |
146 | 2 | partial_update_mode = partial_update_info_pb->partial_update_mode(); |
147 | 2 | } |
148 | 2 | if (partial_update_info_pb->has_partial_update_new_key_policy()) { |
149 | 1 | partial_update_new_key_policy = partial_update_info_pb->partial_update_new_key_policy(); |
150 | 1 | } |
151 | 2 | max_version_in_flush_phase = partial_update_info_pb->has_max_version_in_flush_phase() |
152 | 2 | ? partial_update_info_pb->max_version_in_flush_phase() |
153 | 2 | : -1; |
154 | 2 | partial_update_input_columns.clear(); |
155 | 2 | for (const auto& col : partial_update_info_pb->partial_update_input_columns()) { |
156 | 0 | partial_update_input_columns.insert(col); |
157 | 0 | } |
158 | 2 | missing_cids.clear(); |
159 | 2 | for (auto cid : partial_update_info_pb->missing_cids()) { |
160 | 0 | missing_cids.push_back(cid); |
161 | 0 | } |
162 | 2 | update_cids.clear(); |
163 | 2 | for (auto cid : partial_update_info_pb->update_cids()) { |
164 | 0 | update_cids.push_back(cid); |
165 | 0 | } |
166 | 2 | can_insert_new_rows_in_partial_update = |
167 | 2 | partial_update_info_pb->can_insert_new_rows_in_partial_update(); |
168 | 2 | is_strict_mode = partial_update_info_pb->is_strict_mode(); |
169 | 2 | timestamp_ms = partial_update_info_pb->timestamp_ms(); |
170 | 2 | timezone = partial_update_info_pb->timezone(); |
171 | 2 | is_input_columns_contains_auto_inc_column = |
172 | 2 | partial_update_info_pb->is_input_columns_contains_auto_inc_column(); |
173 | 2 | is_schema_contains_auto_inc_column = |
174 | 2 | partial_update_info_pb->is_schema_contains_auto_inc_column(); |
175 | 2 | sequence_map_col_unqiue_id = partial_update_info_pb->has_sequence_map_col_uid() |
176 | 2 | ? partial_update_info_pb->sequence_map_col_uid() |
177 | 2 | : -1; |
178 | 2 | if (partial_update_info_pb->has_nano_seconds()) { |
179 | 1 | nano_seconds = partial_update_info_pb->nano_seconds(); |
180 | 1 | } |
181 | 2 | default_values.clear(); |
182 | 2 | for (const auto& value : partial_update_info_pb->default_values()) { |
183 | 0 | default_values.push_back(value); |
184 | 0 | } |
185 | 2 | } |
186 | | |
187 | 0 | std::string PartialUpdateInfo::summary() const { |
188 | 0 | std::string mode; |
189 | 0 | switch (partial_update_mode) { |
190 | 0 | case UniqueKeyUpdateModePB::UPSERT: |
191 | 0 | mode = "upsert"; |
192 | 0 | break; |
193 | 0 | case UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS: |
194 | 0 | mode = "fixed partial update"; |
195 | 0 | break; |
196 | 0 | case UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS: |
197 | 0 | mode = "flexible partial update"; |
198 | 0 | break; |
199 | 0 | } |
200 | 0 | return fmt::format( |
201 | 0 | "mode={}, update_cids={}, missing_cids={}, is_strict_mode={}, " |
202 | 0 | "max_version_in_flush_phase={}", |
203 | 0 | mode, update_cids.size(), missing_cids.size(), is_strict_mode, |
204 | 0 | max_version_in_flush_phase); |
205 | 0 | } |
206 | | |
207 | | Status PartialUpdateInfo::handle_new_key(const TabletSchema& tablet_schema, |
208 | | const std::function<std::string()>& line, |
209 | 0 | BitmapValue* skip_bitmap) { |
210 | 0 | switch (partial_update_new_key_policy) { |
211 | 0 | case doris::PartialUpdateNewRowPolicyPB::APPEND: { |
212 | 0 | if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) { |
213 | 0 | if (!can_insert_new_rows_in_partial_update) { |
214 | 0 | std::string error_column; |
215 | 0 | for (auto cid : missing_cids) { |
216 | 0 | const TabletColumn& col = tablet_schema.column(cid); |
217 | 0 | if (!col.has_default_value() && !col.is_nullable() && |
218 | 0 | !(tablet_schema.auto_increment_column() == col.name())) { |
219 | 0 | error_column = col.name(); |
220 | 0 | break; |
221 | 0 | } |
222 | 0 | } |
223 | 0 | return Status::Error<ErrorCode::INVALID_SCHEMA, false>( |
224 | 0 | "the unmentioned column `{}` should have default value or be nullable " |
225 | 0 | "for newly inserted rows in non-strict mode partial update", |
226 | 0 | error_column); |
227 | 0 | } |
228 | 0 | } else if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS) { |
229 | 0 | DCHECK(skip_bitmap != nullptr); |
230 | 0 | bool can_insert_new_row {true}; |
231 | 0 | std::string error_column; |
232 | 0 | for (auto cid : missing_cids) { |
233 | 0 | const TabletColumn& col = tablet_schema.column(cid); |
234 | 0 | if (skip_bitmap->contains(col.unique_id()) && !col.has_default_value() && |
235 | 0 | !col.is_nullable() && !col.is_auto_increment()) { |
236 | 0 | error_column = col.name(); |
237 | 0 | can_insert_new_row = false; |
238 | 0 | break; |
239 | 0 | } |
240 | 0 | } |
241 | 0 | if (!can_insert_new_row) { |
242 | 0 | return Status::Error<ErrorCode::INVALID_SCHEMA, false>( |
243 | 0 | "the unmentioned column `{}` should have default value or be " |
244 | 0 | "nullable for newly inserted rows in non-strict mode flexible partial " |
245 | 0 | "update", |
246 | 0 | error_column); |
247 | 0 | } |
248 | 0 | } |
249 | 0 | } break; |
250 | 0 | case doris::PartialUpdateNewRowPolicyPB::ERROR: { |
251 | 0 | return Status::Error<ErrorCode::NEW_ROWS_IN_PARTIAL_UPDATE, false>( |
252 | 0 | "Can't append new rows in partial update when partial_update_new_key_behavior is " |
253 | 0 | "ERROR. Row with key=[{}] is not in table.", |
254 | 0 | line()); |
255 | 0 | } break; |
256 | 0 | } |
257 | 0 | return Status::OK(); |
258 | 0 | } |
259 | | |
260 | | void PartialUpdateInfo::_generate_default_values_for_missing_cids( |
261 | 0 | const TabletSchema& tablet_schema) { |
262 | 0 | for (unsigned int cur_cid : missing_cids) { |
263 | 0 | const auto& column = tablet_schema.column(cur_cid); |
264 | 0 | if (column.has_default_value()) { |
265 | 0 | std::string default_value; |
266 | 0 | if (UNLIKELY((column.type() == FieldType::OLAP_FIELD_TYPE_DATETIMEV2 || |
267 | 0 | column.type() == FieldType::OLAP_FIELD_TYPE_TIMESTAMPTZ) && |
268 | 0 | to_lower(column.default_value()).find(to_lower("CURRENT_TIMESTAMP")) != |
269 | 0 | std::string::npos)) { |
270 | 0 | auto pos = to_lower(column.default_value()).find('('); |
271 | 0 | if (pos == std::string::npos) { |
272 | 0 | DateV2Value<DateTimeV2ValueType> dtv; |
273 | 0 | dtv.from_unixtime(timestamp_ms / 1000, timezone); |
274 | 0 | default_value = dtv.to_string(); |
275 | 0 | if (column.type() == FieldType::OLAP_FIELD_TYPE_TIMESTAMPTZ) { |
276 | 0 | default_value += timezone; |
277 | 0 | } |
278 | 0 | } else { |
279 | 0 | int precision = std::stoi(column.default_value().substr(pos + 1)); |
280 | 0 | DateV2Value<DateTimeV2ValueType> dtv; |
281 | 0 | dtv.from_unixtime(timestamp_ms / 1000, nano_seconds, timezone, precision); |
282 | 0 | default_value = dtv.to_string(); |
283 | 0 | if (column.type() == FieldType::OLAP_FIELD_TYPE_TIMESTAMPTZ) { |
284 | 0 | default_value += timezone; |
285 | 0 | } |
286 | 0 | } |
287 | 0 | } else if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATEV2 && |
288 | 0 | to_lower(column.default_value()).find(to_lower("CURRENT_DATE")) != |
289 | 0 | std::string::npos)) { |
290 | 0 | DateV2Value<DateV2ValueType> dv; |
291 | 0 | dv.from_unixtime(timestamp_ms / 1000, timezone); |
292 | 0 | default_value = dv.to_string(); |
293 | 0 | } else if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_BITMAP && |
294 | 0 | to_lower(column.default_value()).find(to_lower("BITMAP_EMPTY")) != |
295 | 0 | std::string::npos)) { |
296 | 0 | BitmapValue v = BitmapValue {}; |
297 | 0 | default_value = v.to_string(); |
298 | 0 | } else { |
299 | 0 | default_value = column.default_value(); |
300 | 0 | } |
301 | 0 | default_values.emplace_back(default_value); |
302 | 0 | } else { |
303 | | // place an empty string here |
304 | 0 | default_values.emplace_back(); |
305 | 0 | } |
306 | 0 | } |
307 | 0 | CHECK_EQ(missing_cids.size(), default_values.size()); |
308 | 0 | } |
309 | | |
310 | 0 | bool FixedReadPlan::empty() const { |
311 | 0 | return plan.empty(); |
312 | 0 | } |
313 | | |
314 | 0 | void FixedReadPlan::prepare_to_read(const RowLocation& row_location, size_t pos) { |
315 | 0 | plan[row_location.rowset_id][row_location.segment_id].emplace_back(row_location.row_id, pos); |
316 | 0 | } |
317 | | |
318 | | // read columns by read plan |
319 | | // read_index: ori_pos-> block_idx |
320 | | Status FixedReadPlan::read_columns_by_plan( |
321 | | const TabletSchema& tablet_schema, std::vector<uint32_t> cids_to_read, |
322 | | const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, Block& block, |
323 | | std::map<uint32_t, uint32_t>* read_index, bool force_read_old_delete_signs, |
324 | 0 | const signed char* __restrict cur_delete_signs) const { |
325 | 0 | if (force_read_old_delete_signs) { |
326 | | // always read delete sign column from historical data |
327 | 0 | if (block.get_position_by_name(DELETE_SIGN) == -1) { |
328 | 0 | auto del_col_cid = tablet_schema.field_index(DELETE_SIGN); |
329 | 0 | cids_to_read.emplace_back(del_col_cid); |
330 | 0 | block.swap(tablet_schema.create_block_by_cids(cids_to_read)); |
331 | 0 | } |
332 | 0 | } |
333 | 0 | bool has_row_column = tablet_schema.has_row_store_for_all_columns(); |
334 | 0 | auto mutable_columns = block.mutate_columns(); |
335 | 0 | uint32_t read_idx = 0; |
336 | 0 | for (const auto& [rowset_id, segment_row_mappings] : plan) { |
337 | 0 | for (const auto& [segment_id, mappings] : segment_row_mappings) { |
338 | 0 | auto rowset_iter = rsid_to_rowset.find(rowset_id); |
339 | 0 | CHECK(rowset_iter != rsid_to_rowset.end()); |
340 | 0 | std::vector<uint32_t> rids; |
341 | 0 | for (auto [rid, pos] : mappings) { |
342 | 0 | if (cur_delete_signs && cur_delete_signs[pos]) { |
343 | 0 | continue; |
344 | 0 | } |
345 | 0 | rids.emplace_back(rid); |
346 | 0 | (*read_index)[static_cast<uint32_t>(pos)] = read_idx++; |
347 | 0 | } |
348 | 0 | if (has_row_column) { |
349 | 0 | auto st = BaseTablet::fetch_value_through_row_column( |
350 | 0 | rowset_iter->second, tablet_schema, segment_id, rids, cids_to_read, block); |
351 | 0 | if (!st.ok()) { |
352 | 0 | LOG(WARNING) << "failed to fetch value through row column"; |
353 | 0 | return st; |
354 | 0 | } |
355 | 0 | continue; |
356 | 0 | } |
357 | 0 | for (size_t cid = 0; cid < mutable_columns.size(); ++cid) { |
358 | 0 | TabletColumn tablet_column = tablet_schema.column(cids_to_read[cid]); |
359 | 0 | auto st = doris::BaseTablet::fetch_value_by_rowids( |
360 | 0 | rowset_iter->second, segment_id, rids, tablet_column, mutable_columns[cid]); |
361 | | // set read value to output block |
362 | 0 | if (!st.ok()) { |
363 | 0 | LOG(WARNING) << "failed to fetch value"; |
364 | 0 | return st; |
365 | 0 | } |
366 | 0 | } |
367 | 0 | } |
368 | 0 | } |
369 | 0 | block.set_columns(std::move(mutable_columns)); |
370 | 0 | return Status::OK(); |
371 | 0 | } |
372 | | |
373 | | Status FixedReadPlan::fill_missing_columns( |
374 | | RowsetWriterContext* rowset_ctx, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, |
375 | | const TabletSchema& tablet_schema, Block& full_block, |
376 | | const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable, |
377 | 0 | uint32_t segment_start_pos, const Block* block) const { |
378 | 0 | auto mutable_full_columns = full_block.mutate_columns(); |
379 | | // create old value columns |
380 | 0 | const auto& missing_cids = rowset_ctx->partial_update_info->missing_cids; |
381 | 0 | bool have_input_seq_column = false; |
382 | 0 | if (tablet_schema.has_sequence_col()) { |
383 | 0 | const std::vector<uint32_t>& including_cids = rowset_ctx->partial_update_info->update_cids; |
384 | 0 | have_input_seq_column = |
385 | 0 | (std::find(including_cids.cbegin(), including_cids.cend(), |
386 | 0 | tablet_schema.sequence_col_idx()) != including_cids.cend()); |
387 | 0 | } |
388 | |
|
389 | 0 | auto old_value_block = tablet_schema.create_block_by_cids(missing_cids); |
390 | 0 | CHECK_EQ(missing_cids.size(), old_value_block.columns()); |
391 | | |
392 | | // segment pos to write -> rowid to read in old_value_block |
393 | 0 | std::map<uint32_t, uint32_t> read_index; |
394 | 0 | RETURN_IF_ERROR(read_columns_by_plan(tablet_schema, missing_cids, rsid_to_rowset, |
395 | 0 | old_value_block, &read_index, true, nullptr)); |
396 | | |
397 | 0 | const auto* old_delete_signs = BaseTablet::get_delete_sign_column_data(old_value_block); |
398 | 0 | if (old_delete_signs == nullptr) { |
399 | 0 | return Status::InternalError("old delete signs column not found, block: {}", |
400 | 0 | old_value_block.dump_structure()); |
401 | 0 | } |
402 | | // build default value columns |
403 | 0 | auto default_value_block = old_value_block.clone_empty(); |
404 | 0 | RETURN_IF_ERROR(BaseTablet::generate_default_value_block( |
405 | 0 | tablet_schema, missing_cids, rowset_ctx->partial_update_info->default_values, |
406 | 0 | old_value_block, default_value_block)); |
407 | 0 | auto mutable_default_value_columns = default_value_block.mutate_columns(); |
408 | | |
409 | | // fill all missing value from mutable_old_columns, need to consider default value and null value |
410 | 0 | for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) { |
411 | 0 | auto segment_pos = idx + segment_start_pos; |
412 | 0 | auto pos_in_old_block = read_index[segment_pos]; |
413 | |
|
414 | 0 | for (auto i = 0; i < missing_cids.size(); ++i) { |
415 | | // if the column has default value, fill it with default value |
416 | | // otherwise, if the column is nullable, fill it with null value |
417 | 0 | const auto& tablet_column = tablet_schema.column(missing_cids[i]); |
418 | 0 | auto& missing_col = mutable_full_columns[missing_cids[i]]; |
419 | |
|
420 | 0 | bool should_use_default = use_default_or_null_flag[idx]; |
421 | 0 | if (!should_use_default) { |
422 | 0 | bool old_row_delete_sign = |
423 | 0 | (old_delete_signs != nullptr && old_delete_signs[pos_in_old_block] != 0); |
424 | 0 | if (old_row_delete_sign) { |
425 | 0 | if (!tablet_schema.has_sequence_col()) { |
426 | 0 | should_use_default = true; |
427 | 0 | } else if (have_input_seq_column || (!tablet_column.is_seqeunce_col())) { |
428 | | // to keep the sequence column value not decreasing, we should read values of seq column |
429 | | // from old rows even if the old row is deleted when the input don't specify the sequence column, otherwise |
430 | | // it may cause the merge-on-read based compaction to produce incorrect results |
431 | 0 | should_use_default = true; |
432 | 0 | } |
433 | 0 | } |
434 | 0 | } |
435 | |
|
436 | 0 | if (should_use_default) { |
437 | 0 | if (tablet_column.has_default_value()) { |
438 | 0 | missing_col->insert_from(*mutable_default_value_columns[i], 0); |
439 | 0 | } else if (tablet_column.is_nullable()) { |
440 | 0 | auto* nullable_column = assert_cast<ColumnNullable*>(missing_col.get()); |
441 | 0 | nullable_column->insert_many_defaults(1); |
442 | 0 | } else if (tablet_schema.auto_increment_column() == tablet_column.name()) { |
443 | 0 | const auto& column = |
444 | 0 | *DORIS_TRY(rowset_ctx->tablet_schema->column(tablet_column.name())); |
445 | 0 | DCHECK(column.type() == FieldType::OLAP_FIELD_TYPE_BIGINT); |
446 | 0 | auto* auto_inc_column = assert_cast<ColumnInt64*>(missing_col.get()); |
447 | 0 | int pos = block->get_position_by_name(BeConsts::PARTIAL_UPDATE_AUTO_INC_COL); |
448 | 0 | if (pos == -1) { |
449 | 0 | return Status::InternalError("auto increment column not found in block {}", |
450 | 0 | block->dump_structure()); |
451 | 0 | } |
452 | 0 | auto_inc_column->insert_from(*block->get_by_position(pos).column.get(), idx); |
453 | 0 | } else { |
454 | | // If the control flow reaches this branch, the column neither has default value |
455 | | // nor is nullable. It means that the row's delete sign is marked, and the value |
456 | | // columns are useless and won't be read. So we can just put arbitary values in the cells |
457 | 0 | missing_col->insert_default(); |
458 | 0 | } |
459 | 0 | } else { |
460 | 0 | missing_col->insert_from(*old_value_block.get_by_position(i).column, |
461 | 0 | pos_in_old_block); |
462 | 0 | } |
463 | 0 | } |
464 | 0 | } |
465 | 0 | full_block.set_columns(std::move(mutable_full_columns)); |
466 | 0 | return Status::OK(); |
467 | 0 | } |
468 | | |
469 | | void FlexibleReadPlan::prepare_to_read(const RowLocation& row_location, size_t pos, |
470 | 0 | const BitmapValue& skip_bitmap) { |
471 | 0 | if (!use_row_store) { |
472 | 0 | for (uint64_t col_uid : skip_bitmap) { |
473 | 0 | if (variant_util::is_variant_patch_path_marker(col_uid)) { |
474 | 0 | continue; |
475 | 0 | } |
476 | 0 | plan[row_location.rowset_id][row_location.segment_id][static_cast<uint32_t>(col_uid)] |
477 | 0 | .emplace_back(row_location.row_id, pos); |
478 | 0 | } |
479 | 0 | } else { |
480 | 0 | row_store_plan[row_location.rowset_id][row_location.segment_id].emplace_back( |
481 | 0 | row_location.row_id, pos); |
482 | 0 | } |
483 | 0 | } |
484 | | |
485 | | Status FlexibleReadPlan::read_columns_by_plan( |
486 | | const TabletSchema& tablet_schema, |
487 | | const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, Block& old_value_block, |
488 | 0 | std::map<uint32_t, std::map<uint32_t, uint32_t>>* read_index) const { |
489 | 0 | auto mutable_columns = old_value_block.mutate_columns(); |
490 | | |
491 | | // cid -> next rid to fill in block |
492 | 0 | std::map<uint32_t, uint32_t> next_read_idx; |
493 | 0 | for (uint32_t cid {0}; cid < tablet_schema.num_columns(); cid++) { |
494 | 0 | next_read_idx[cid] = 0; |
495 | 0 | } |
496 | |
|
497 | 0 | for (const auto& [rowset_id, segment_mappings] : plan) { |
498 | 0 | for (const auto& [segment_id, uid_mappings] : segment_mappings) { |
499 | 0 | for (const auto& [col_uid, mappings] : uid_mappings) { |
500 | 0 | auto rowset_iter = rsid_to_rowset.find(rowset_id); |
501 | 0 | CHECK(rowset_iter != rsid_to_rowset.end()); |
502 | 0 | auto cid = tablet_schema.field_index(col_uid); |
503 | 0 | DCHECK_NE(cid, -1); |
504 | 0 | DCHECK_GE(cid, tablet_schema.num_key_columns()); |
505 | 0 | std::vector<uint32_t> rids; |
506 | 0 | for (auto [rid, pos] : mappings) { |
507 | 0 | rids.emplace_back(rid); |
508 | 0 | (*read_index)[cid][static_cast<uint32_t>(pos)] = next_read_idx[cid]++; |
509 | 0 | } |
510 | |
|
511 | 0 | TabletColumn tablet_column = tablet_schema.column(cid); |
512 | 0 | auto idx = cid - tablet_schema.num_key_columns(); |
513 | 0 | RETURN_IF_ERROR(doris::BaseTablet::fetch_value_by_rowids( |
514 | 0 | rowset_iter->second, segment_id, rids, tablet_column, |
515 | 0 | mutable_columns[idx])); |
516 | 0 | } |
517 | 0 | } |
518 | 0 | } |
519 | | // !!!ATTENTION!!!: columns in block may have different size because every row has different columns to update |
520 | 0 | old_value_block.set_columns(std::move(mutable_columns)); |
521 | 0 | return Status::OK(); |
522 | 0 | } |
523 | | |
524 | | Status FlexibleReadPlan::read_columns_by_plan( |
525 | | const TabletSchema& tablet_schema, const std::vector<uint32_t>& cids_to_read, |
526 | | const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, Block& old_value_block, |
527 | 0 | std::map<uint32_t, uint32_t>* read_index) const { |
528 | 0 | DCHECK(use_row_store); |
529 | 0 | uint32_t read_idx = 0; |
530 | 0 | for (const auto& [rowset_id, segment_row_mappings] : row_store_plan) { |
531 | 0 | for (const auto& [segment_id, mappings] : segment_row_mappings) { |
532 | 0 | auto rowset_iter = rsid_to_rowset.find(rowset_id); |
533 | 0 | CHECK(rowset_iter != rsid_to_rowset.end()); |
534 | 0 | std::vector<uint32_t> rids; |
535 | 0 | for (auto [rid, pos] : mappings) { |
536 | 0 | rids.emplace_back(rid); |
537 | 0 | (*read_index)[static_cast<uint32_t>(pos)] = read_idx++; |
538 | 0 | } |
539 | 0 | auto st = BaseTablet::fetch_value_through_row_column(rowset_iter->second, tablet_schema, |
540 | 0 | segment_id, rids, cids_to_read, |
541 | 0 | old_value_block); |
542 | 0 | if (!st.ok()) { |
543 | 0 | LOG(WARNING) << "failed to fetch value through row column"; |
544 | 0 | return st; |
545 | 0 | } |
546 | 0 | } |
547 | 0 | } |
548 | 0 | return Status::OK(); |
549 | 0 | } |
550 | | |
551 | | Status FlexibleReadPlan::fill_non_primary_key_columns( |
552 | | RowsetWriterContext* rowset_ctx, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, |
553 | | const TabletSchema& tablet_schema, Block& full_block, |
554 | | const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable, |
555 | | uint32_t segment_start_pos, uint32_t block_start_pos, const Block* block, |
556 | 0 | std::vector<BitmapValue>* skip_bitmaps) const { |
557 | 0 | auto mutable_full_columns = full_block.mutate_columns(); |
558 | | |
559 | | // missing_cids are all non sort key columns' cids |
560 | 0 | const auto& non_sort_key_cids = rowset_ctx->partial_update_info->missing_cids; |
561 | 0 | auto old_value_block = tablet_schema.create_block_by_cids(non_sort_key_cids); |
562 | 0 | CHECK_EQ(non_sort_key_cids.size(), old_value_block.columns()); |
563 | |
|
564 | 0 | if (!use_row_store) { |
565 | 0 | RETURN_IF_ERROR(fill_non_primary_key_columns_for_column_store( |
566 | 0 | rowset_ctx, rsid_to_rowset, tablet_schema, non_sort_key_cids, old_value_block, |
567 | 0 | mutable_full_columns, use_default_or_null_flag, has_default_or_nullable, |
568 | 0 | segment_start_pos, block_start_pos, block, skip_bitmaps)); |
569 | 0 | } else { |
570 | 0 | RETURN_IF_ERROR(fill_non_primary_key_columns_for_row_store( |
571 | 0 | rowset_ctx, rsid_to_rowset, tablet_schema, non_sort_key_cids, old_value_block, |
572 | 0 | mutable_full_columns, use_default_or_null_flag, has_default_or_nullable, |
573 | 0 | segment_start_pos, block_start_pos, block, skip_bitmaps)); |
574 | 0 | } |
575 | 0 | full_block.set_columns(std::move(mutable_full_columns)); |
576 | 0 | return Status::OK(); |
577 | 0 | } |
578 | | |
579 | | static bool old_row_has_delete_sign_for_column_store( |
580 | | const signed char* delete_sign_column_data, const TabletSchema& tablet_schema, |
581 | 0 | std::map<uint32_t, std::map<uint32_t, uint32_t>>& read_index, uint32_t segment_pos) { |
582 | 0 | if (delete_sign_column_data == nullptr) { |
583 | 0 | return false; |
584 | 0 | } |
585 | 0 | if (auto it = read_index[tablet_schema.delete_sign_idx()].find(segment_pos); |
586 | 0 | it != read_index[tablet_schema.delete_sign_idx()].end()) { |
587 | 0 | return delete_sign_column_data[it->second] != 0; |
588 | 0 | } |
589 | 0 | return false; |
590 | 0 | } |
591 | | |
592 | | static Status fill_non_primary_key_cell_for_column_store( |
593 | | const TabletColumn& tablet_column, uint32_t cid, MutableColumnPtr& new_col, |
594 | | const IColumn& default_value_col, const IColumn& old_value_col, const IColumn& cur_col, |
595 | | std::size_t block_pos, uint32_t segment_pos, bool skipped, bool row_has_sequence_col, |
596 | | bool use_default, const signed char* delete_sign_column_data, |
597 | | const TabletSchema& tablet_schema, |
598 | | std::map<uint32_t, std::map<uint32_t, uint32_t>>& read_index, const PartialUpdateInfo* info, |
599 | 0 | const BitmapValue& skip_bitmap) { |
600 | 0 | if (skipped) { |
601 | 0 | DCHECK(std::cmp_not_equal(cid, tablet_schema.skip_bitmap_col_idx())); |
602 | 0 | DCHECK(std::cmp_not_equal(cid, tablet_schema.version_col_idx())); |
603 | 0 | DCHECK(!tablet_column.is_row_store_column()); |
604 | |
|
605 | 0 | if (!use_default) { |
606 | 0 | if (delete_sign_column_data != nullptr) { |
607 | 0 | bool old_row_delete_sign = old_row_has_delete_sign_for_column_store( |
608 | 0 | delete_sign_column_data, tablet_schema, read_index, segment_pos); |
609 | |
|
610 | 0 | if (old_row_delete_sign) { |
611 | 0 | if (!tablet_schema.has_sequence_col()) { |
612 | 0 | use_default = true; |
613 | 0 | } else if (row_has_sequence_col || |
614 | 0 | (!tablet_column.is_seqeunce_col() && |
615 | 0 | (tablet_column.unique_id() != info->sequence_map_col_uid()))) { |
616 | | // to keep the sequence column value not decreasing, we should read values of seq column(and seq map column) |
617 | | // from old rows even if the old row is deleted when the input don't specify the sequence column, otherwise |
618 | | // it may cause the merge-on-read based compaction to produce incorrect results |
619 | 0 | use_default = true; |
620 | 0 | } |
621 | 0 | } |
622 | 0 | } |
623 | 0 | } |
624 | 0 | if (!use_default && tablet_column.is_on_update_current_timestamp()) { |
625 | 0 | use_default = true; |
626 | 0 | } |
627 | 0 | if (use_default) { |
628 | 0 | if (tablet_column.has_default_value()) { |
629 | 0 | new_col->insert_from(default_value_col, 0); |
630 | 0 | } else if (tablet_column.is_nullable()) { |
631 | 0 | assert_cast<ColumnNullable*, TypeCheckOnRelease::DISABLE>(new_col.get()) |
632 | 0 | ->insert_many_defaults(1); |
633 | 0 | } else if (tablet_column.is_auto_increment()) { |
634 | | // In flexible partial update, the skip bitmap indicates whether a cell |
635 | | // is specified in the original load, so the generated auto-increment value is filled |
636 | | // in current block in place if needed rather than using a seperate column to |
637 | | // store the generated auto-increment value in fixed partial update |
638 | 0 | new_col->insert_from(cur_col, block_pos); |
639 | 0 | } else { |
640 | 0 | new_col->insert_default(); |
641 | 0 | } |
642 | 0 | } else { |
643 | 0 | auto pos_in_old_block = read_index.at(cid).at(segment_pos); |
644 | 0 | new_col->insert_from(old_value_col, pos_in_old_block); |
645 | 0 | } |
646 | 0 | } else { |
647 | 0 | if (tablet_column.is_variant_type() && !use_default && |
648 | 0 | !old_row_has_delete_sign_for_column_store(delete_sign_column_data, tablet_schema, |
649 | 0 | read_index, segment_pos) && |
650 | 0 | read_index.contains(cid) && read_index.at(cid).contains(segment_pos)) { |
651 | 0 | RETURN_IF_ERROR(variant_util::merge_variant_patch_by_path_markers( |
652 | 0 | old_value_col, read_index.at(cid).at(segment_pos), cur_col, block_pos, |
653 | 0 | tablet_column.unique_id(), skip_bitmap, false, *new_col)); |
654 | 0 | return Status::OK(); |
655 | 0 | } |
656 | 0 | new_col->insert_from(cur_col, block_pos); |
657 | 0 | } |
658 | 0 | return Status::OK(); |
659 | 0 | } |
660 | | |
661 | | Status FlexibleReadPlan::fill_non_primary_key_columns_for_column_store( |
662 | | RowsetWriterContext* rowset_ctx, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, |
663 | | const TabletSchema& tablet_schema, const std::vector<uint32_t>& non_sort_key_cids, |
664 | | Block& old_value_block, MutableColumns& mutable_full_columns, |
665 | | const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable, |
666 | | uint32_t segment_start_pos, uint32_t block_start_pos, const Block* block, |
667 | 0 | std::vector<BitmapValue>* skip_bitmaps) const { |
668 | 0 | auto* info = rowset_ctx->partial_update_info.get(); |
669 | 0 | int32_t seq_col_unique_id = -1; |
670 | 0 | if (tablet_schema.has_sequence_col()) { |
671 | 0 | seq_col_unique_id = tablet_schema.column(tablet_schema.sequence_col_idx()).unique_id(); |
672 | 0 | } |
673 | | // cid -> segment pos to write -> rowid to read in old_value_block |
674 | 0 | std::map<uint32_t, std::map<uint32_t, uint32_t>> read_index; |
675 | 0 | RETURN_IF_ERROR( |
676 | 0 | read_columns_by_plan(tablet_schema, rsid_to_rowset, old_value_block, &read_index)); |
677 | | // !!!ATTENTION!!!: columns in old_value_block may have different size because every row has different columns to update |
678 | | |
679 | 0 | const auto* delete_sign_column_data = BaseTablet::get_delete_sign_column_data(old_value_block); |
680 | | // build default value columns |
681 | 0 | auto default_value_block = old_value_block.clone_empty(); |
682 | 0 | if (has_default_or_nullable || delete_sign_column_data != nullptr) { |
683 | 0 | RETURN_IF_ERROR(BaseTablet::generate_default_value_block( |
684 | 0 | tablet_schema, non_sort_key_cids, info->default_values, old_value_block, |
685 | 0 | default_value_block)); |
686 | 0 | } |
687 | | |
688 | | // fill all non sort key columns from mutable_old_columns, need to consider default value and null value |
689 | 0 | for (std::size_t i {0}; i < non_sort_key_cids.size(); i++) { |
690 | 0 | auto cid = non_sort_key_cids[i]; |
691 | 0 | const auto& tablet_column = tablet_schema.column(cid); |
692 | 0 | auto col_uid = tablet_column.unique_id(); |
693 | 0 | for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) { |
694 | 0 | auto segment_pos = segment_start_pos + idx; |
695 | 0 | auto block_pos = block_start_pos + idx; |
696 | |
|
697 | 0 | RETURN_IF_ERROR(fill_non_primary_key_cell_for_column_store( |
698 | 0 | tablet_column, cid, mutable_full_columns[cid], |
699 | 0 | *default_value_block.get_by_position(i).column, |
700 | 0 | *old_value_block.get_by_position(i).column, *block->get_by_position(cid).column, |
701 | 0 | block_pos, segment_pos, skip_bitmaps->at(block_pos).contains(col_uid), |
702 | 0 | tablet_schema.has_sequence_col() |
703 | 0 | ? !skip_bitmaps->at(block_pos).contains(seq_col_unique_id) |
704 | 0 | : false, |
705 | 0 | use_default_or_null_flag[idx], delete_sign_column_data, tablet_schema, |
706 | 0 | read_index, info, skip_bitmaps->at(block_pos))); |
707 | 0 | } |
708 | 0 | } |
709 | 0 | return Status::OK(); |
710 | 0 | } |
711 | | |
712 | | static Status fill_non_primary_key_cell_for_row_store( |
713 | | const TabletColumn& tablet_column, uint32_t cid, MutableColumnPtr& new_col, |
714 | | const IColumn& default_value_col, const IColumn& old_value_col, const IColumn& cur_col, |
715 | | std::size_t block_pos, bool skipped, bool row_has_sequence_col, bool use_default, |
716 | | const signed char* delete_sign_column_data, bool old_row_exists, uint32_t pos_in_old_block, |
717 | | const TabletSchema& tablet_schema, const PartialUpdateInfo* info, |
718 | 0 | const BitmapValue& skip_bitmap) { |
719 | 0 | if (skipped) { |
720 | 0 | DCHECK(std::cmp_not_equal(cid, tablet_schema.skip_bitmap_col_idx())); |
721 | 0 | DCHECK(std::cmp_not_equal(cid, tablet_schema.version_col_idx())); |
722 | 0 | DCHECK(!tablet_column.is_row_store_column()); |
723 | 0 | if (!use_default) { |
724 | 0 | DCHECK(old_row_exists); |
725 | 0 | if (delete_sign_column_data != nullptr) { |
726 | 0 | bool old_row_delete_sign = (delete_sign_column_data[pos_in_old_block] != 0); |
727 | 0 | if (old_row_delete_sign) { |
728 | 0 | if (!tablet_schema.has_sequence_col()) { |
729 | 0 | use_default = true; |
730 | 0 | } else if (row_has_sequence_col || |
731 | 0 | (!tablet_column.is_seqeunce_col() && |
732 | 0 | (tablet_column.unique_id() != info->sequence_map_col_uid()))) { |
733 | | // to keep the sequence column value not decreasing, we should read values of seq column(and seq map column) |
734 | | // from old rows even if the old row is deleted when the input don't specify the sequence column, otherwise |
735 | | // it may cause the merge-on-read based compaction to produce incorrect results |
736 | 0 | use_default = true; |
737 | 0 | } |
738 | 0 | } |
739 | 0 | } |
740 | 0 | } |
741 | |
|
742 | 0 | if (!use_default && tablet_column.is_on_update_current_timestamp()) { |
743 | 0 | use_default = true; |
744 | 0 | } |
745 | 0 | if (use_default) { |
746 | 0 | if (tablet_column.has_default_value()) { |
747 | 0 | new_col->insert_from(default_value_col, 0); |
748 | 0 | } else if (tablet_column.is_nullable()) { |
749 | 0 | assert_cast<ColumnNullable*, TypeCheckOnRelease::DISABLE>(new_col.get()) |
750 | 0 | ->insert_many_defaults(1); |
751 | 0 | } else if (tablet_column.is_auto_increment()) { |
752 | | // In flexible partial update, the skip bitmap indicates whether a cell |
753 | | // is specified in the original load, so the generated auto-increment value is filled |
754 | | // in current block in place if needed rather than using a seperate column to |
755 | | // store the generated auto-increment value in fixed partial update |
756 | 0 | new_col->insert_from(cur_col, block_pos); |
757 | 0 | } else { |
758 | 0 | new_col->insert_default(); |
759 | 0 | } |
760 | 0 | } else { |
761 | 0 | new_col->insert_from(old_value_col, pos_in_old_block); |
762 | 0 | } |
763 | 0 | } else { |
764 | 0 | bool old_row_delete_sign = (old_row_exists && delete_sign_column_data != nullptr && |
765 | 0 | delete_sign_column_data[pos_in_old_block] != 0); |
766 | 0 | if (tablet_column.is_variant_type() && old_row_exists && !use_default && |
767 | 0 | !old_row_delete_sign) { |
768 | 0 | RETURN_IF_ERROR(variant_util::merge_variant_patch_by_path_markers( |
769 | 0 | old_value_col, pos_in_old_block, cur_col, block_pos, tablet_column.unique_id(), |
770 | 0 | skip_bitmap, false, *new_col)); |
771 | 0 | return Status::OK(); |
772 | 0 | } |
773 | 0 | new_col->insert_from(cur_col, block_pos); |
774 | 0 | } |
775 | 0 | return Status::OK(); |
776 | 0 | } |
777 | | |
778 | | Status FlexibleReadPlan::fill_non_primary_key_columns_for_row_store( |
779 | | RowsetWriterContext* rowset_ctx, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, |
780 | | const TabletSchema& tablet_schema, const std::vector<uint32_t>& non_sort_key_cids, |
781 | | Block& old_value_block, MutableColumns& mutable_full_columns, |
782 | | const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable, |
783 | | uint32_t segment_start_pos, uint32_t block_start_pos, const Block* block, |
784 | 0 | std::vector<BitmapValue>* skip_bitmaps) const { |
785 | 0 | auto* info = rowset_ctx->partial_update_info.get(); |
786 | 0 | int32_t seq_col_unique_id = -1; |
787 | 0 | if (tablet_schema.has_sequence_col()) { |
788 | 0 | seq_col_unique_id = tablet_schema.column(tablet_schema.sequence_col_idx()).unique_id(); |
789 | 0 | } |
790 | | // segment pos to write -> rowid to read in old_value_block |
791 | 0 | std::map<uint32_t, uint32_t> read_index; |
792 | 0 | RETURN_IF_ERROR(read_columns_by_plan(tablet_schema, non_sort_key_cids, rsid_to_rowset, |
793 | 0 | old_value_block, &read_index)); |
794 | | |
795 | 0 | const auto* delete_sign_column_data = BaseTablet::get_delete_sign_column_data(old_value_block); |
796 | | // build default value columns |
797 | 0 | auto default_value_block = old_value_block.clone_empty(); |
798 | 0 | if (has_default_or_nullable || delete_sign_column_data != nullptr) { |
799 | 0 | RETURN_IF_ERROR(BaseTablet::generate_default_value_block( |
800 | 0 | tablet_schema, non_sort_key_cids, info->default_values, old_value_block, |
801 | 0 | default_value_block)); |
802 | 0 | } |
803 | | |
804 | | // fill all non sort key columns from mutable_old_columns, need to consider default value and null value |
805 | 0 | for (std::size_t i {0}; i < non_sort_key_cids.size(); i++) { |
806 | 0 | auto cid = non_sort_key_cids[i]; |
807 | 0 | const auto& tablet_column = tablet_schema.column(cid); |
808 | 0 | auto col_uid = tablet_column.unique_id(); |
809 | 0 | for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) { |
810 | 0 | auto segment_pos = segment_start_pos + idx; |
811 | 0 | auto block_pos = block_start_pos + idx; |
812 | 0 | auto read_index_iter = read_index.find(segment_pos); |
813 | 0 | bool old_row_exists = (read_index_iter != read_index.end()); |
814 | 0 | uint32_t pos_in_old_block = old_row_exists ? read_index_iter->second : 0; |
815 | |
|
816 | 0 | RETURN_IF_ERROR(fill_non_primary_key_cell_for_row_store( |
817 | 0 | tablet_column, cid, mutable_full_columns[cid], |
818 | 0 | *default_value_block.get_by_position(i).column, |
819 | 0 | *old_value_block.get_by_position(i).column, *block->get_by_position(cid).column, |
820 | 0 | block_pos, skip_bitmaps->at(block_pos).contains(col_uid), |
821 | 0 | tablet_schema.has_sequence_col() |
822 | 0 | ? !skip_bitmaps->at(block_pos).contains(seq_col_unique_id) |
823 | 0 | : false, |
824 | 0 | use_default_or_null_flag[idx], delete_sign_column_data, old_row_exists, |
825 | 0 | pos_in_old_block, tablet_schema, info, skip_bitmaps->at(block_pos))); |
826 | 0 | } |
827 | 0 | } |
828 | 0 | return Status::OK(); |
829 | 0 | } |
830 | | |
831 | | BlockAggregator::BlockAggregator(segment_v2::VerticalSegmentWriter& vertical_segment_writer) |
832 | 13 | : _writer(vertical_segment_writer), _tablet_schema(*_writer._tablet_schema) {} |
833 | | |
834 | | Status BlockAggregator::merge_one_row(MutableBlock& dst_block, Block* src_block, int rid, |
835 | 0 | BitmapValue& skip_bitmap) { |
836 | 0 | for (size_t cid {_tablet_schema.num_key_columns()}; cid < _tablet_schema.num_columns(); cid++) { |
837 | 0 | if (std::cmp_equal(cid, _tablet_schema.skip_bitmap_col_idx())) { |
838 | 0 | auto& cur_skip_bitmap = |
839 | 0 | assert_cast<ColumnBitmap*>(dst_block.mutable_columns()[cid].get()) |
840 | 0 | ->get_data() |
841 | 0 | .back(); |
842 | 0 | const auto& new_row_skip_bitmap = |
843 | 0 | assert_cast<ColumnBitmap*>( |
844 | 0 | src_block->get_by_position(cid).column->assume_mutable().get()) |
845 | 0 | ->get_data()[rid]; |
846 | 0 | BitmapValue merged_skip_bitmap; |
847 | 0 | RETURN_IF_ERROR(variant_util::merge_variant_patch_path_markers( |
848 | 0 | cur_skip_bitmap, new_row_skip_bitmap, &merged_skip_bitmap)); |
849 | 0 | cur_skip_bitmap = std::move(merged_skip_bitmap); |
850 | 0 | continue; |
851 | 0 | } |
852 | 0 | if (!skip_bitmap.contains(_tablet_schema.column(cid).unique_id())) { |
853 | 0 | if (_tablet_schema.column(cid).is_variant_type()) { |
854 | 0 | auto merged_col = dst_block.mutable_columns()[cid]->clone_empty(); |
855 | 0 | RETURN_IF_ERROR(variant_util::merge_variant_patch( |
856 | 0 | *dst_block.mutable_columns()[cid], |
857 | 0 | dst_block.mutable_columns()[cid]->size() - 1, |
858 | 0 | *src_block->get_by_position(cid).column, rid, *merged_col)); |
859 | 0 | dst_block.mutable_columns()[cid]->pop_back(1); |
860 | 0 | dst_block.mutable_columns()[cid]->insert_from(*merged_col, 0); |
861 | 0 | } else { |
862 | 0 | dst_block.mutable_columns()[cid]->pop_back(1); |
863 | 0 | dst_block.mutable_columns()[cid]->insert_from( |
864 | 0 | *src_block->get_by_position(cid).column, rid); |
865 | 0 | } |
866 | 0 | } |
867 | 0 | } |
868 | 0 | VLOG_DEBUG << fmt::format("merge a row, after merge, output_block.rows()={}, state: {}", |
869 | 0 | dst_block.rows(), _state.to_string()); |
870 | 0 | return Status::OK(); |
871 | 0 | } |
872 | | |
873 | 0 | void BlockAggregator::append_one_row(MutableBlock& dst_block, Block* src_block, int rid) { |
874 | 0 | dst_block.add_row(src_block, rid); |
875 | 0 | _state.rows++; |
876 | 0 | VLOG_DEBUG << fmt::format("append a new row, after append, output_block.rows()={}, state: {}", |
877 | 0 | dst_block.rows(), _state.to_string()); |
878 | 0 | } |
879 | | |
880 | 0 | void BlockAggregator::remove_last_n_rows(MutableBlock& dst_block, int n) { |
881 | 0 | if (n > 0) { |
882 | 0 | for (size_t cid {0}; cid < _tablet_schema.num_columns(); cid++) { |
883 | 0 | DCHECK_GE(dst_block.mutable_columns()[cid]->size(), n); |
884 | 0 | dst_block.mutable_columns()[cid]->pop_back(n); |
885 | 0 | } |
886 | 0 | } |
887 | 0 | } |
888 | | |
889 | | Status BlockAggregator::append_or_merge_row(MutableBlock& dst_block, Block* src_block, int rid, |
890 | 0 | BitmapValue& skip_bitmap, bool have_delete_sign) { |
891 | 0 | if (have_delete_sign) { |
892 | | // remove all the previous batched rows |
893 | 0 | remove_last_n_rows(dst_block, _state.rows); |
894 | 0 | _state.rows = 0; |
895 | 0 | _state.has_row_with_delete_sign = true; |
896 | |
|
897 | 0 | append_one_row(dst_block, src_block, rid); |
898 | 0 | } else { |
899 | 0 | if (_state.should_merge()) { |
900 | 0 | RETURN_IF_ERROR(merge_one_row(dst_block, src_block, rid, skip_bitmap)); |
901 | 0 | } else { |
902 | 0 | append_one_row(dst_block, src_block, rid); |
903 | 0 | } |
904 | 0 | } |
905 | 0 | return Status::OK(); |
906 | 0 | }; |
907 | | |
908 | | Status BlockAggregator::aggregate_rows( |
909 | | MutableBlock& output_block, Block* block, int start, int end, std::string key, |
910 | | std::vector<BitmapValue>* skip_bitmaps, const signed char* delete_signs, |
911 | | IOlapColumnDataAccessor* seq_column, const std::vector<RowsetSharedPtr>& specified_rowsets, |
912 | 0 | std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) { |
913 | 0 | VLOG_DEBUG << fmt::format("merge rows in range=[{}-{})", start, end); |
914 | 0 | if (end - start == 1) { |
915 | 0 | output_block.add_row(block, start); |
916 | 0 | VLOG_DEBUG << fmt::format("append a row directly, rid={}", start); |
917 | 0 | return Status::OK(); |
918 | 0 | } |
919 | | |
920 | 0 | auto seq_col_unique_id = _tablet_schema.column(_tablet_schema.sequence_col_idx()).unique_id(); |
921 | 0 | auto delete_sign_col_unique_id = |
922 | 0 | _tablet_schema.column(_tablet_schema.delete_sign_idx()).unique_id(); |
923 | |
|
924 | 0 | _state.reset(); |
925 | |
|
926 | 0 | RowLocation loc; |
927 | 0 | RowsetSharedPtr rowset; |
928 | 0 | std::string previous_encoded_seq_value {}; |
929 | 0 | Status st = _writer._tablet->lookup_row_key( |
930 | 0 | key, &_tablet_schema, false, specified_rowsets, &loc, _writer._mow_context->max_version, |
931 | 0 | segment_caches, &rowset, true, &previous_encoded_seq_value); |
932 | 0 | int pos = start; |
933 | 0 | bool is_expected_st = (st.is<ErrorCode::KEY_NOT_FOUND>() || st.ok()); |
934 | 0 | DCHECK(is_expected_st || st.is<ErrorCode::MEM_LIMIT_EXCEEDED>()) |
935 | 0 | << "[BlockAggregator::aggregate_rows] unexpected error status while lookup_row_key:" |
936 | 0 | << st; |
937 | 0 | if (!is_expected_st) { |
938 | 0 | return st; |
939 | 0 | } |
940 | | |
941 | 0 | std::string cur_seq_val; |
942 | 0 | if (st.ok()) { |
943 | 0 | for (pos = start; pos < end; pos++) { |
944 | 0 | auto& skip_bitmap = skip_bitmaps->at(pos); |
945 | 0 | bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id)); |
946 | | // Discard all the rows whose seq value is smaller than previous_encoded_seq_value. |
947 | 0 | if (row_has_sequence_col) { |
948 | 0 | std::string seq_val {}; |
949 | 0 | _writer._encode_seq_column(seq_column, pos, &seq_val); |
950 | 0 | if (Slice {seq_val}.compare(Slice {previous_encoded_seq_value}) < 0) { |
951 | 0 | continue; |
952 | 0 | } |
953 | 0 | cur_seq_val = std::move(seq_val); |
954 | 0 | break; |
955 | 0 | } |
956 | 0 | cur_seq_val = std::move(previous_encoded_seq_value); |
957 | 0 | break; |
958 | 0 | } |
959 | 0 | } else { |
960 | 0 | pos = start; |
961 | 0 | auto& skip_bitmap = skip_bitmaps->at(pos); |
962 | 0 | bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id)); |
963 | 0 | if (row_has_sequence_col) { |
964 | 0 | std::string seq_val {}; |
965 | | // for rows that don't specify seqeunce col, seq_val will be encoded to minial value |
966 | 0 | _writer._encode_seq_column(seq_column, pos, &seq_val); |
967 | 0 | cur_seq_val = std::move(seq_val); |
968 | 0 | } else { |
969 | 0 | cur_seq_val.clear(); |
970 | 0 | RETURN_IF_ERROR(_writer._generate_encoded_default_seq_value( |
971 | 0 | _tablet_schema, *_writer._opts.rowset_ctx->partial_update_info, &cur_seq_val)); |
972 | 0 | } |
973 | 0 | } |
974 | | |
975 | 0 | for (int rid {pos}; rid < end; rid++) { |
976 | 0 | auto& skip_bitmap = skip_bitmaps->at(rid); |
977 | 0 | bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id)); |
978 | 0 | bool have_delete_sign = |
979 | 0 | (!skip_bitmap.contains(delete_sign_col_unique_id) && delete_signs[rid] != 0); |
980 | 0 | if (!row_has_sequence_col) { |
981 | 0 | RETURN_IF_ERROR( |
982 | 0 | append_or_merge_row(output_block, block, rid, skip_bitmap, have_delete_sign)); |
983 | 0 | } else { |
984 | 0 | std::string seq_val {}; |
985 | 0 | _writer._encode_seq_column(seq_column, rid, &seq_val); |
986 | 0 | if (Slice {seq_val}.compare(Slice {cur_seq_val}) >= 0) { |
987 | 0 | RETURN_IF_ERROR(append_or_merge_row(output_block, block, rid, skip_bitmap, |
988 | 0 | have_delete_sign)); |
989 | 0 | cur_seq_val = std::move(seq_val); |
990 | 0 | } else { |
991 | 0 | VLOG_DEBUG << fmt::format( |
992 | 0 | "skip rid={} becasue its seq value is lower than the previous", rid); |
993 | 0 | } |
994 | 0 | } |
995 | 0 | } |
996 | 0 | return Status::OK(); |
997 | 0 | }; |
998 | | |
999 | | Status BlockAggregator::aggregate_for_sequence_column( |
1000 | | Block* block, int num_rows, const std::vector<IOlapColumnDataAccessor*>& key_columns, |
1001 | | IOlapColumnDataAccessor* seq_column, const std::vector<RowsetSharedPtr>& specified_rowsets, |
1002 | 0 | std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) { |
1003 | 0 | DCHECK_EQ(block->columns(), _tablet_schema.num_columns()); |
1004 | | // the process logic here is the same as MemTable::_aggregate_for_flexible_partial_update_without_seq_col() |
1005 | | // after this function, there will be at most 2 rows for a specified key |
1006 | 0 | std::vector<BitmapValue>* skip_bitmaps = &( |
1007 | 0 | assert_cast<ColumnBitmap*>(block->get_by_position(_tablet_schema.skip_bitmap_col_idx()) |
1008 | 0 | .column->assume_mutable() |
1009 | 0 | .get()) |
1010 | 0 | ->get_data()); |
1011 | 0 | const auto* delete_signs = BaseTablet::get_delete_sign_column_data(*block, num_rows); |
1012 | |
|
1013 | 0 | auto filtered_block = _tablet_schema.create_block(); |
1014 | 0 | MutableBlock output_block = MutableBlock::build_mutable_block(&filtered_block); |
1015 | |
|
1016 | 0 | int same_key_rows {0}; |
1017 | 0 | std::string previous_key {}; |
1018 | 0 | for (int block_pos {0}; block_pos < num_rows; block_pos++) { |
1019 | 0 | std::string key = _writer._full_encode_keys(key_columns, block_pos); |
1020 | 0 | if (block_pos > 0 && previous_key == key) { |
1021 | 0 | same_key_rows++; |
1022 | 0 | } else { |
1023 | 0 | if (same_key_rows > 0) { |
1024 | 0 | RETURN_IF_ERROR(aggregate_rows(output_block, block, block_pos - same_key_rows, |
1025 | 0 | block_pos, std::move(previous_key), skip_bitmaps, |
1026 | 0 | delete_signs, seq_column, specified_rowsets, |
1027 | 0 | segment_caches)); |
1028 | 0 | } |
1029 | 0 | same_key_rows = 1; |
1030 | 0 | } |
1031 | 0 | previous_key = std::move(key); |
1032 | 0 | } |
1033 | 0 | if (same_key_rows > 0) { |
1034 | 0 | RETURN_IF_ERROR(aggregate_rows(output_block, block, num_rows - same_key_rows, num_rows, |
1035 | 0 | std::move(previous_key), skip_bitmaps, delete_signs, |
1036 | 0 | seq_column, specified_rowsets, segment_caches)); |
1037 | 0 | } |
1038 | | |
1039 | 0 | block->swap(output_block.to_block()); |
1040 | 0 | return Status::OK(); |
1041 | 0 | } |
1042 | | |
1043 | | Status BlockAggregator::aggregate_without_sequence_column( |
1044 | 0 | Block* block, size_t num_rows, const std::vector<IOlapColumnDataAccessor*>& key_columns) { |
1045 | 0 | DCHECK_EQ(block->columns(), _tablet_schema.num_columns()); |
1046 | 0 | std::vector<BitmapValue>* skip_bitmaps = &( |
1047 | 0 | assert_cast<ColumnBitmap*>(block->get_by_position(_tablet_schema.skip_bitmap_col_idx()) |
1048 | 0 | .column->assume_mutable() |
1049 | 0 | .get()) |
1050 | 0 | ->get_data()); |
1051 | 0 | const auto* delete_signs = BaseTablet::get_delete_sign_column_data(*block, num_rows); |
1052 | 0 | DCHECK(delete_signs != nullptr); |
1053 | 0 | int32_t delete_sign_col_unique_id = |
1054 | 0 | _tablet_schema.column(_tablet_schema.delete_sign_idx()).unique_id(); |
1055 | |
|
1056 | 0 | auto aggregated_block = _tablet_schema.create_block(); |
1057 | 0 | MutableBlock output_block = MutableBlock::build_mutable_block(&aggregated_block); |
1058 | |
|
1059 | 0 | auto aggregate_range = [&](int start, int end) -> Status { |
1060 | 0 | if (end - start == 1) { |
1061 | 0 | output_block.add_row(block, start); |
1062 | 0 | return Status::OK(); |
1063 | 0 | } |
1064 | 0 | _state.reset(); |
1065 | 0 | for (int rid = start; rid < end; ++rid) { |
1066 | 0 | auto& skip_bitmap = skip_bitmaps->at(rid); |
1067 | 0 | bool have_delete_sign = |
1068 | 0 | (!skip_bitmap.contains(delete_sign_col_unique_id) && delete_signs[rid] != 0); |
1069 | 0 | RETURN_IF_ERROR( |
1070 | 0 | append_or_merge_row(output_block, block, rid, skip_bitmap, have_delete_sign)); |
1071 | 0 | } |
1072 | 0 | return Status::OK(); |
1073 | 0 | }; |
1074 | |
|
1075 | 0 | int same_key_rows {0}; |
1076 | 0 | std::string previous_key {}; |
1077 | 0 | const auto num_rows_int = static_cast<int>(num_rows); |
1078 | 0 | for (int block_pos {0}; block_pos < num_rows_int; block_pos++) { |
1079 | 0 | std::string key = _writer._full_encode_keys(key_columns, block_pos); |
1080 | 0 | if (block_pos > 0 && previous_key == key) { |
1081 | 0 | same_key_rows++; |
1082 | 0 | } else { |
1083 | 0 | if (same_key_rows > 0) { |
1084 | 0 | RETURN_IF_ERROR(aggregate_range(block_pos - same_key_rows, block_pos)); |
1085 | 0 | } |
1086 | 0 | same_key_rows = 1; |
1087 | 0 | } |
1088 | 0 | previous_key = std::move(key); |
1089 | 0 | } |
1090 | 0 | if (same_key_rows > 0) { |
1091 | 0 | RETURN_IF_ERROR(aggregate_range(num_rows_int - same_key_rows, num_rows_int)); |
1092 | 0 | } |
1093 | | |
1094 | 0 | if (output_block.rows() != num_rows) { |
1095 | 0 | block->swap(output_block.to_block()); |
1096 | 0 | } |
1097 | 0 | return Status::OK(); |
1098 | 0 | } |
1099 | | |
1100 | | Status BlockAggregator::fill_sequence_column(Block* block, size_t num_rows, |
1101 | | const FixedReadPlan& read_plan, |
1102 | 0 | std::vector<BitmapValue>& skip_bitmaps) { |
1103 | 0 | DCHECK(_tablet_schema.has_sequence_col()); |
1104 | 0 | std::vector<uint32_t> cids {static_cast<uint32_t>(_tablet_schema.sequence_col_idx())}; |
1105 | 0 | auto seq_col_unique_id = _tablet_schema.column(_tablet_schema.sequence_col_idx()).unique_id(); |
1106 | |
|
1107 | 0 | auto seq_col_block = _tablet_schema.create_block_by_cids(cids); |
1108 | 0 | auto tmp_block = _tablet_schema.create_block_by_cids(cids); |
1109 | 0 | std::map<uint32_t, uint32_t> read_index; |
1110 | 0 | RETURN_IF_ERROR(read_plan.read_columns_by_plan(_tablet_schema, cids, _writer._rsid_to_rowset, |
1111 | 0 | seq_col_block, &read_index, false)); |
1112 | | |
1113 | 0 | auto new_seq_col_ptr = tmp_block.get_by_position(0).column->assume_mutable(); |
1114 | 0 | const auto& old_seq_col_ptr = *seq_col_block.get_by_position(0).column; |
1115 | 0 | const auto& cur_seq_col_ptr = *block->get_by_position(_tablet_schema.sequence_col_idx()).column; |
1116 | 0 | for (uint32_t block_pos {0}; block_pos < num_rows; block_pos++) { |
1117 | 0 | if (read_index.contains(block_pos)) { |
1118 | 0 | new_seq_col_ptr->insert_from(old_seq_col_ptr, read_index[block_pos]); |
1119 | 0 | skip_bitmaps[block_pos].remove(seq_col_unique_id); |
1120 | 0 | } else { |
1121 | 0 | new_seq_col_ptr->insert_from(cur_seq_col_ptr, block_pos); |
1122 | 0 | } |
1123 | 0 | } |
1124 | 0 | block->replace_by_position(_tablet_schema.sequence_col_idx(), std::move(new_seq_col_ptr)); |
1125 | 0 | return Status::OK(); |
1126 | 0 | } |
1127 | | |
1128 | | Status BlockAggregator::aggregate_for_insert_after_delete( |
1129 | | Block* block, size_t num_rows, const std::vector<IOlapColumnDataAccessor*>& key_columns, |
1130 | | const std::vector<RowsetSharedPtr>& specified_rowsets, |
1131 | 0 | std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) { |
1132 | 0 | DCHECK_EQ(block->columns(), _tablet_schema.num_columns()); |
1133 | | // there will be at most 2 rows for a specified key in block when control flow reaches here |
1134 | | // after this function, there will not be duplicate rows in block |
1135 | |
|
1136 | 0 | std::vector<BitmapValue>* skip_bitmaps = &( |
1137 | 0 | assert_cast<ColumnBitmap*>(block->get_by_position(_tablet_schema.skip_bitmap_col_idx()) |
1138 | 0 | .column->assume_mutable() |
1139 | 0 | .get()) |
1140 | 0 | ->get_data()); |
1141 | 0 | const auto* delete_signs = BaseTablet::get_delete_sign_column_data(*block, num_rows); |
1142 | |
|
1143 | 0 | auto filter_column = ColumnUInt8::create(num_rows, 1); |
1144 | 0 | auto* __restrict filter_map = filter_column->get_data().data(); |
1145 | 0 | std::string previous_key {}; |
1146 | 0 | bool previous_has_delete_sign {false}; |
1147 | 0 | int duplicate_rows {0}; |
1148 | 0 | int32_t delete_sign_col_unique_id = |
1149 | 0 | _tablet_schema.column(_tablet_schema.delete_sign_idx()).unique_id(); |
1150 | 0 | auto seq_col_unique_id = |
1151 | 0 | (_tablet_schema.sequence_col_idx() != -1) |
1152 | 0 | ? _tablet_schema.column(_tablet_schema.sequence_col_idx()).unique_id() |
1153 | 0 | : -1; |
1154 | 0 | FixedReadPlan read_plan; |
1155 | 0 | for (size_t block_pos {0}; block_pos < num_rows; block_pos++) { |
1156 | 0 | size_t delta_pos = block_pos; |
1157 | 0 | auto& skip_bitmap = skip_bitmaps->at(block_pos); |
1158 | 0 | std::string key = _writer._full_encode_keys(key_columns, delta_pos); |
1159 | 0 | bool have_delete_sign = |
1160 | 0 | (!skip_bitmap.contains(delete_sign_col_unique_id) && delete_signs[block_pos] != 0); |
1161 | 0 | if (delta_pos > 0 && previous_key == key) { |
1162 | | // !!ATTENTION!!: We can only remove the row with delete sign if there is a insert with the same key after this row. |
1163 | | // If there is only a row with delete sign, we should keep it and can't remove it from block, because |
1164 | | // compaction will not use the delete bitmap when reading data. So there may still be rows with delete sign |
1165 | | // in later process |
1166 | 0 | DCHECK(previous_has_delete_sign); |
1167 | 0 | DCHECK(!have_delete_sign); |
1168 | 0 | ++duplicate_rows; |
1169 | 0 | RowLocation loc; |
1170 | 0 | RowsetSharedPtr rowset; |
1171 | 0 | Status st = _writer._tablet->lookup_row_key( |
1172 | 0 | key, &_tablet_schema, false, specified_rowsets, &loc, |
1173 | 0 | _writer._mow_context->max_version, segment_caches, &rowset, true); |
1174 | 0 | bool is_expected_st = (st.is<ErrorCode::KEY_NOT_FOUND>() || st.ok()); |
1175 | 0 | DCHECK(is_expected_st || st.is<ErrorCode::MEM_LIMIT_EXCEEDED>()) |
1176 | 0 | << "[BlockAggregator::aggregate_for_insert_after_delete] unexpected error " |
1177 | 0 | "status while lookup_row_key:" |
1178 | 0 | << st; |
1179 | 0 | if (!is_expected_st) { |
1180 | 0 | return st; |
1181 | 0 | } |
1182 | | |
1183 | 0 | Slice previous_seq_slice {}; |
1184 | 0 | if (st.ok()) { |
1185 | 0 | if (_tablet_schema.has_sequence_col()) { |
1186 | | // if the insert row doesn't specify the sequence column, we need to |
1187 | | // read the historical's sequence column value so that we don't need |
1188 | | // to handle seqeunce column in append_block_with_flexible_content() |
1189 | | // for this row |
1190 | 0 | bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id)); |
1191 | 0 | if (!row_has_sequence_col) { |
1192 | 0 | read_plan.prepare_to_read(loc, block_pos); |
1193 | 0 | _writer._rsid_to_rowset.emplace(rowset->rowset_id(), rowset); |
1194 | 0 | } |
1195 | 0 | } |
1196 | | // delete the existing row |
1197 | 0 | _writer._mow_context->delete_bitmap->add( |
1198 | 0 | {loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, |
1199 | 0 | loc.row_id); |
1200 | 0 | } |
1201 | | // and remove the row with delete sign from the current block |
1202 | 0 | filter_map[block_pos - 1] = 0; |
1203 | 0 | } |
1204 | 0 | previous_has_delete_sign = have_delete_sign; |
1205 | 0 | previous_key = std::move(key); |
1206 | 0 | } |
1207 | 0 | if (duplicate_rows > 0) { |
1208 | 0 | if (!read_plan.empty()) { |
1209 | | // fill sequence column value for some rows |
1210 | 0 | RETURN_IF_ERROR(fill_sequence_column(block, num_rows, read_plan, *skip_bitmaps)); |
1211 | 0 | } |
1212 | 0 | RETURN_IF_ERROR(filter_block(block, num_rows, std::move(filter_column), duplicate_rows, |
1213 | 0 | "__filter_insert_after_delete_col__")); |
1214 | 0 | } |
1215 | 0 | return Status::OK(); |
1216 | 0 | } |
1217 | | |
1218 | | Status BlockAggregator::filter_block(Block* block, size_t num_rows, MutableColumnPtr filter_column, |
1219 | 0 | int duplicate_rows, std::string col_name) { |
1220 | 0 | auto num_cols = block->columns(); |
1221 | 0 | block->insert({std::move(filter_column), std::make_shared<DataTypeUInt8>(), col_name}); |
1222 | 0 | RETURN_IF_ERROR(Block::filter_block(block, num_cols, num_cols)); |
1223 | 0 | DCHECK_EQ(num_cols, block->columns()); |
1224 | 0 | size_t merged_rows = num_rows - block->rows(); |
1225 | 0 | if (std::cmp_not_equal(duplicate_rows, merged_rows)) { |
1226 | 0 | auto msg = fmt::format( |
1227 | 0 | "filter_block_for_flexible_partial_update {}: duplicate_rows != merged_rows, " |
1228 | 0 | "duplicate_keys={}, merged_rows={}, num_rows={}, mutable_block->rows()={}", |
1229 | 0 | col_name, duplicate_rows, merged_rows, num_rows, block->rows()); |
1230 | 0 | DCHECK(false) << msg; |
1231 | 0 | return Status::InternalError<false>(msg); |
1232 | 0 | } |
1233 | 0 | return Status::OK(); |
1234 | 0 | } |
1235 | | |
1236 | | Status BlockAggregator::convert_pk_columns(Block* block, size_t row_pos, size_t num_rows, |
1237 | 0 | std::vector<IOlapColumnDataAccessor*>& key_columns) { |
1238 | 0 | key_columns.clear(); |
1239 | 0 | for (uint32_t cid {0}; cid < _tablet_schema.num_key_columns(); cid++) { |
1240 | 0 | RETURN_IF_ERROR(_writer._olap_data_convertor->set_source_content_with_specifid_column( |
1241 | 0 | block->get_by_position(cid), row_pos, num_rows, cid)); |
1242 | 0 | auto [status, column] = _writer._olap_data_convertor->convert_column_data(cid); |
1243 | 0 | if (!status.ok()) { |
1244 | 0 | return status; |
1245 | 0 | } |
1246 | 0 | key_columns.push_back(column); |
1247 | 0 | } |
1248 | 0 | return Status::OK(); |
1249 | 0 | } |
1250 | | |
1251 | | Status BlockAggregator::convert_seq_column(Block* block, size_t row_pos, size_t num_rows, |
1252 | 0 | IOlapColumnDataAccessor*& seq_column) { |
1253 | 0 | seq_column = nullptr; |
1254 | 0 | if (_tablet_schema.has_sequence_col()) { |
1255 | 0 | auto seq_col_idx = _tablet_schema.sequence_col_idx(); |
1256 | 0 | RETURN_IF_ERROR(_writer._olap_data_convertor->set_source_content_with_specifid_column( |
1257 | 0 | block->get_by_position(seq_col_idx), row_pos, num_rows, seq_col_idx)); |
1258 | 0 | auto [status, column] = _writer._olap_data_convertor->convert_column_data(seq_col_idx); |
1259 | 0 | if (!status.ok()) { |
1260 | 0 | return status; |
1261 | 0 | } |
1262 | 0 | seq_column = column; |
1263 | 0 | } |
1264 | 0 | return Status::OK(); |
1265 | 0 | }; |
1266 | | |
1267 | | Status BlockAggregator::aggregate_for_flexible_partial_update( |
1268 | | Block* block, size_t num_rows, const std::vector<RowsetSharedPtr>& specified_rowsets, |
1269 | 0 | std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) { |
1270 | 0 | std::vector<IOlapColumnDataAccessor*> key_columns {}; |
1271 | 0 | IOlapColumnDataAccessor* seq_column {nullptr}; |
1272 | |
|
1273 | 0 | RETURN_IF_ERROR(convert_pk_columns(block, 0, num_rows, key_columns)); |
1274 | 0 | RETURN_IF_ERROR(convert_seq_column(block, 0, num_rows, seq_column)); |
1275 | | |
1276 | | // 1. merge duplicate rows when table has sequence column |
1277 | | // When there are multiple rows with the same keys in memtable, some of them specify specify the sequence column, |
1278 | | // some of them don't. We can't do the de-duplication in memtable because we don't know the historical data. We must |
1279 | | // de-duplicate them here. |
1280 | 0 | if (_tablet_schema.has_sequence_col()) { |
1281 | 0 | RETURN_IF_ERROR(aggregate_for_sequence_column(block, static_cast<int>(num_rows), |
1282 | 0 | key_columns, seq_column, specified_rowsets, |
1283 | 0 | segment_caches)); |
1284 | 0 | } else { |
1285 | 0 | RETURN_IF_ERROR(aggregate_without_sequence_column(block, num_rows, key_columns)); |
1286 | 0 | } |
1287 | | |
1288 | | // 2. merge duplicate rows and handle insert after delete |
1289 | 0 | if (block->rows() != num_rows) { |
1290 | 0 | num_rows = block->rows(); |
1291 | | // data in block has changed, should re-encode key columns, sequence column |
1292 | 0 | _writer._olap_data_convertor->clear_source_content(); |
1293 | 0 | RETURN_IF_ERROR(convert_pk_columns(block, 0, num_rows, key_columns)); |
1294 | 0 | RETURN_IF_ERROR(convert_seq_column(block, 0, num_rows, seq_column)); |
1295 | 0 | } |
1296 | 0 | RETURN_IF_ERROR(aggregate_for_insert_after_delete(block, num_rows, key_columns, |
1297 | 0 | specified_rowsets, segment_caches)); |
1298 | 0 | return Status::OK(); |
1299 | 0 | } |
1300 | | |
1301 | | } // namespace doris |