Coverage Report

Created: 2026-05-14 09:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/partial_update_info.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/partial_update_info.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
22
#include <cstdint>
23
#include <utility>
24
25
#include "common/consts.h"
26
#include "common/logging.h"
27
#include "core/assert_cast.h"
28
#include "core/block/block.h"
29
#include "core/data_type/data_type_number.h" // IWYU pragma: keep
30
#include "core/value/bitmap_value.h"
31
#include "exec/common/variant_util.h"
32
#include "storage/iterator/olap_data_convertor.h"
33
#include "storage/olap_common.h"
34
#include "storage/rowset/rowset.h"
35
#include "storage/rowset/rowset_writer_context.h"
36
#include "storage/segment/vertical_segment_writer.h"
37
#include "storage/tablet/base_tablet.h"
38
#include "storage/tablet/tablet_meta.h"
39
#include "storage/tablet/tablet_schema.h"
40
#include "storage/utils.h"
41
42
namespace doris {
43
// Initializes this partial-update descriptor for one load on `tablet_schema`.
//
// For fixed partial update every key column must appear in `partial_update_cols`,
// otherwise the load is aborted. For any partial update the schema's column ids are
// then split into `update_cids` (provided by the load) and `missing_cids` (to be filled
// from old rows or defaults), and the default-value strings for the missing columns are
// precomputed.
//
// All derived state is reset up front so that calling init() again on the same object
// does not accumulate stale entries: in particular `default_values` is appended to by
// _generate_default_values_for_missing_cids(), which CHECKs it against missing_cids.
Status PartialUpdateInfo::init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema,
                               UniqueKeyUpdateModePB unique_key_update_mode,
                               PartialUpdateNewRowPolicyPB policy,
                               const std::set<std::string>& partial_update_cols,
                               bool is_strict_mode_, int64_t timestamp_ms_, int32_t nano_seconds_,
                               const std::string& timezone_,
                               const std::string& auto_increment_column,
                               int32_t sequence_map_col_uid, int64_t cur_max_version) {
    partial_update_mode = unique_key_update_mode;
    partial_update_new_key_policy = policy;
    partial_update_input_columns = partial_update_cols;
    max_version_in_flush_phase = cur_max_version;
    sequence_map_col_unqiue_id = sequence_map_col_uid;
    timestamp_ms = timestamp_ms_;
    nano_seconds = nano_seconds_;
    timezone = timezone_;
    missing_cids.clear();
    update_cids.clear();
    default_values.clear();
    // The loop below only ever lowers can_insert_new_rows_in_partial_update and only ever
    // raises is_schema_contains_auto_inc_column, so restore their neutral starting values.
    can_insert_new_rows_in_partial_update = true;
    is_schema_contains_auto_inc_column = false;

    if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
        // partial_update_cols should include all key columns
        for (std::size_t i {0}; i < tablet_schema.num_key_columns(); i++) {
            const auto& key_col = tablet_schema.column(i);
            if (!partial_update_cols.contains(key_col.name())) {
                auto msg = fmt::format(
                        "Unable to do partial update on shadow index's tablet, tablet_id={}, "
                        "txn_id={}. Missing key column {}.",
                        tablet_id, txn_id, key_col.name());
                LOG_WARNING(msg);
                return Status::Aborted<false>(msg);
            }
        }
    }
    if (is_partial_update()) {
        for (uint32_t cid = 0; cid < tablet_schema.num_columns(); ++cid) {
            if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
                const auto& tablet_column = tablet_schema.column(cid);
                if (!partial_update_input_columns.contains(tablet_column.name())) {
                    missing_cids.emplace_back(cid);
                    // A missing column that is neither defaulted, nullable nor the
                    // auto-increment column makes appending brand-new rows impossible.
                    if (!tablet_column.has_default_value() && !tablet_column.is_nullable() &&
                        tablet_schema.auto_increment_column() != tablet_column.name()) {
                        can_insert_new_rows_in_partial_update = false;
                    }
                } else {
                    update_cids.emplace_back(cid);
                }
                if (auto_increment_column == tablet_column.name()) {
                    is_schema_contains_auto_inc_column = true;
                }
            } else {
                // in flexible partial update, missing cids is all non sort keys' cid
                if (cid >= tablet_schema.num_key_columns()) {
                    missing_cids.emplace_back(cid);
                }
            }
        }
        _generate_default_values_for_missing_cids(tablet_schema);
    }
    is_strict_mode = is_strict_mode_;
    is_input_columns_contains_auto_inc_column =
            is_fixed_partial_update() &&
            partial_update_input_columns.contains(auto_increment_column);
    return Status::OK();
}
107
108
1
// Serializes this descriptor into `partial_update_info_pb`.
// Scalar settings are written first; the repeated fields (input column names, missing and
// update column ids, per-missing-column default values) are appended afterwards, each in
// its container's iteration order.
void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const {
    auto* pb = partial_update_info_pb;

    pb->set_partial_update_mode(partial_update_mode);
    pb->set_partial_update_new_key_policy(partial_update_new_key_policy);
    pb->set_max_version_in_flush_phase(max_version_in_flush_phase);
    pb->set_can_insert_new_rows_in_partial_update(can_insert_new_rows_in_partial_update);
    pb->set_is_strict_mode(is_strict_mode);
    pb->set_timestamp_ms(timestamp_ms);
    pb->set_nano_seconds(nano_seconds);
    pb->set_timezone(timezone);
    pb->set_is_input_columns_contains_auto_inc_column(is_input_columns_contains_auto_inc_column);
    pb->set_is_schema_contains_auto_inc_column(is_schema_contains_auto_inc_column);
    pb->set_sequence_map_col_uid(sequence_map_col_unqiue_id);

    for (const auto& input_column_name : partial_update_input_columns) {
        pb->add_partial_update_input_columns(input_column_name);
    }
    for (const auto missing_cid : missing_cids) {
        pb->add_missing_cids(missing_cid);
    }
    for (const auto update_cid : update_cids) {
        pb->add_update_cids(update_cid);
    }
    for (const auto& default_value : default_values) {
        pb->add_default_values(default_value);
    }
}
136
137
2
// Restores this descriptor from `partial_update_info_pb`.
// Fields absent from older messages are handled for backward compatibility:
//   * no partial_update_mode -> derived from the legacy is_partial_update flag,
//   * no max_version_in_flush_phase / sequence_map_col_uid -> -1,
//   * no partial_update_new_key_policy / nano_seconds -> the current value is kept.
// Repeated fields replace (not extend) the current contents.
void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) {
    const auto& pb = *partial_update_info_pb;

    if (pb.has_partial_update_mode()) {
        partial_update_mode = pb.partial_update_mode();
    } else {
        // for backward compatibility
        partial_update_mode = pb.is_partial_update() ? UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS
                                                     : UniqueKeyUpdateModePB::UPSERT;
    }
    if (pb.has_partial_update_new_key_policy()) {
        partial_update_new_key_policy = pb.partial_update_new_key_policy();
    }

    max_version_in_flush_phase = -1;
    if (pb.has_max_version_in_flush_phase()) {
        max_version_in_flush_phase = pb.max_version_in_flush_phase();
    }

    const auto& input_columns = pb.partial_update_input_columns();
    partial_update_input_columns.clear();
    partial_update_input_columns.insert(input_columns.begin(), input_columns.end());
    missing_cids.assign(pb.missing_cids().begin(), pb.missing_cids().end());
    update_cids.assign(pb.update_cids().begin(), pb.update_cids().end());

    can_insert_new_rows_in_partial_update = pb.can_insert_new_rows_in_partial_update();
    is_strict_mode = pb.is_strict_mode();
    timestamp_ms = pb.timestamp_ms();
    timezone = pb.timezone();
    is_input_columns_contains_auto_inc_column = pb.is_input_columns_contains_auto_inc_column();
    is_schema_contains_auto_inc_column = pb.is_schema_contains_auto_inc_column();

    sequence_map_col_unqiue_id = -1;
    if (pb.has_sequence_map_col_uid()) {
        sequence_map_col_unqiue_id = pb.sequence_map_col_uid();
    }
    if (pb.has_nano_seconds()) {
        nano_seconds = pb.nano_seconds();
    }

    default_values.assign(pb.default_values().begin(), pb.default_values().end());
}
186
187
0
std::string PartialUpdateInfo::summary() const {
188
0
    std::string mode;
189
0
    switch (partial_update_mode) {
190
0
    case UniqueKeyUpdateModePB::UPSERT:
191
0
        mode = "upsert";
192
0
        break;
193
0
    case UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS:
194
0
        mode = "fixed partial update";
195
0
        break;
196
0
    case UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS:
197
0
        mode = "flexible partial update";
198
0
        break;
199
0
    }
200
0
    return fmt::format(
201
0
            "mode={}, update_cids={}, missing_cids={}, is_strict_mode={}, "
202
0
            "max_version_in_flush_phase={}",
203
0
            mode, update_cids.size(), missing_cids.size(), is_strict_mode,
204
0
            max_version_in_flush_phase);
205
0
}
206
207
// Decides whether a row whose key does not yet exist in the table may be appended.
//
// APPEND policy:
//   * fixed partial update: rejected (INVALID_SCHEMA) when init() found a missing column
//     that has no default, is not nullable and is not the auto-increment column;
//   * flexible partial update: rejected when any such column is marked in `skip_bitmap`
//     (i.e. the row did not mention it). `skip_bitmap` is required in this mode.
// ERROR policy: always rejected (NEW_ROWS_IN_PARTIAL_UPDATE); `line()` is only invoked
// here to render the offending key for the error message.
Status PartialUpdateInfo::handle_new_key(const TabletSchema& tablet_schema,
                                         const std::function<std::string()>& line,
                                         BitmapValue* skip_bitmap) {
    switch (partial_update_new_key_policy) {
    case doris::PartialUpdateNewRowPolicyPB::APPEND: {
        if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
            if (!can_insert_new_rows_in_partial_update) {
                // Find the first column responsible for the rejection, for the error message.
                std::string error_column;
                for (auto cid : missing_cids) {
                    const TabletColumn& col = tablet_schema.column(cid);
                    if (!col.has_default_value() && !col.is_nullable() &&
                        !(tablet_schema.auto_increment_column() == col.name())) {
                        error_column = col.name();
                        break;
                    }
                }
                return Status::Error<ErrorCode::INVALID_SCHEMA, false>(
                        "the unmentioned column `{}` should have default value or be nullable "
                        "for newly inserted rows in non-strict mode partial update",
                        error_column);
            }
        } else if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS) {
            DCHECK(skip_bitmap != nullptr);
            // DCHECK is compiled out in release builds; fail cleanly instead of dereferencing.
            if (skip_bitmap == nullptr) {
                return Status::InternalError(
                        "skip bitmap is required to append new rows in flexible partial update");
            }
            for (auto cid : missing_cids) {
                const TabletColumn& col = tablet_schema.column(cid);
                if (skip_bitmap->contains(col.unique_id()) && !col.has_default_value() &&
                    !col.is_nullable() && !col.is_auto_increment()) {
                    return Status::Error<ErrorCode::INVALID_SCHEMA, false>(
                            "the unmentioned column `{}` should have default value or be "
                            "nullable for newly inserted rows in non-strict mode flexible partial "
                            "update",
                            col.name());
                }
            }
        }
        break;
    }
    case doris::PartialUpdateNewRowPolicyPB::ERROR:
        return Status::Error<ErrorCode::NEW_ROWS_IN_PARTIAL_UPDATE, false>(
                "Can't append new rows in partial update when partial_update_new_key_behavior is "
                "ERROR. Row with key=[{}] is not in table.",
                line());
    }
    return Status::OK();
}
259
260
void PartialUpdateInfo::_generate_default_values_for_missing_cids(
261
0
        const TabletSchema& tablet_schema) {
262
0
    for (unsigned int cur_cid : missing_cids) {
263
0
        const auto& column = tablet_schema.column(cur_cid);
264
0
        if (column.has_default_value()) {
265
0
            std::string default_value;
266
0
            if (UNLIKELY((column.type() == FieldType::OLAP_FIELD_TYPE_DATETIMEV2 ||
267
0
                          column.type() == FieldType::OLAP_FIELD_TYPE_TIMESTAMPTZ) &&
268
0
                         to_lower(column.default_value()).find(to_lower("CURRENT_TIMESTAMP")) !=
269
0
                                 std::string::npos)) {
270
0
                auto pos = to_lower(column.default_value()).find('(');
271
0
                if (pos == std::string::npos) {
272
0
                    DateV2Value<DateTimeV2ValueType> dtv;
273
0
                    dtv.from_unixtime(timestamp_ms / 1000, timezone);
274
0
                    default_value = dtv.to_string();
275
0
                    if (column.type() == FieldType::OLAP_FIELD_TYPE_TIMESTAMPTZ) {
276
0
                        default_value += timezone;
277
0
                    }
278
0
                } else {
279
0
                    int precision = std::stoi(column.default_value().substr(pos + 1));
280
0
                    DateV2Value<DateTimeV2ValueType> dtv;
281
0
                    dtv.from_unixtime(timestamp_ms / 1000, nano_seconds, timezone, precision);
282
0
                    default_value = dtv.to_string();
283
0
                    if (column.type() == FieldType::OLAP_FIELD_TYPE_TIMESTAMPTZ) {
284
0
                        default_value += timezone;
285
0
                    }
286
0
                }
287
0
            } else if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATEV2 &&
288
0
                                to_lower(column.default_value()).find(to_lower("CURRENT_DATE")) !=
289
0
                                        std::string::npos)) {
290
0
                DateV2Value<DateV2ValueType> dv;
291
0
                dv.from_unixtime(timestamp_ms / 1000, timezone);
292
0
                default_value = dv.to_string();
293
0
            } else if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_BITMAP &&
294
0
                                to_lower(column.default_value()).find(to_lower("BITMAP_EMPTY")) !=
295
0
                                        std::string::npos)) {
296
0
                BitmapValue v = BitmapValue {};
297
0
                default_value = v.to_string();
298
0
            } else {
299
0
                default_value = column.default_value();
300
0
            }
301
0
            default_values.emplace_back(default_value);
302
0
        } else {
303
            // place an empty string here
304
0
            default_values.emplace_back();
305
0
        }
306
0
    }
307
0
    CHECK_EQ(missing_cids.size(), default_values.size());
308
0
}
309
310
0
bool FixedReadPlan::empty() const {
311
0
    return plan.empty();
312
0
}
313
314
0
void FixedReadPlan::prepare_to_read(const RowLocation& row_location, size_t pos) {
315
0
    plan[row_location.rowset_id][row_location.segment_id].emplace_back(row_location.row_id, pos);
316
0
}
317
318
// read columns by read plan
319
// read_index: ori_pos-> block_idx
320
Status FixedReadPlan::read_columns_by_plan(
321
        const TabletSchema& tablet_schema, std::vector<uint32_t> cids_to_read,
322
        const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, Block& block,
323
        std::map<uint32_t, uint32_t>* read_index, bool force_read_old_delete_signs,
324
0
        const signed char* __restrict cur_delete_signs) const {
325
0
    if (force_read_old_delete_signs) {
326
        // always read delete sign column from historical data
327
0
        if (block.get_position_by_name(DELETE_SIGN) == -1) {
328
0
            auto del_col_cid = tablet_schema.field_index(DELETE_SIGN);
329
0
            cids_to_read.emplace_back(del_col_cid);
330
0
            block.swap(tablet_schema.create_block_by_cids(cids_to_read));
331
0
        }
332
0
    }
333
0
    bool has_row_column = tablet_schema.has_row_store_for_all_columns();
334
0
    auto mutable_columns = block.mutate_columns();
335
0
    uint32_t read_idx = 0;
336
0
    for (const auto& [rowset_id, segment_row_mappings] : plan) {
337
0
        for (const auto& [segment_id, mappings] : segment_row_mappings) {
338
0
            auto rowset_iter = rsid_to_rowset.find(rowset_id);
339
0
            CHECK(rowset_iter != rsid_to_rowset.end());
340
0
            std::vector<uint32_t> rids;
341
0
            for (auto [rid, pos] : mappings) {
342
0
                if (cur_delete_signs && cur_delete_signs[pos]) {
343
0
                    continue;
344
0
                }
345
0
                rids.emplace_back(rid);
346
0
                (*read_index)[static_cast<uint32_t>(pos)] = read_idx++;
347
0
            }
348
0
            if (has_row_column) {
349
0
                auto st = BaseTablet::fetch_value_through_row_column(
350
0
                        rowset_iter->second, tablet_schema, segment_id, rids, cids_to_read, block);
351
0
                if (!st.ok()) {
352
0
                    LOG(WARNING) << "failed to fetch value through row column";
353
0
                    return st;
354
0
                }
355
0
                continue;
356
0
            }
357
0
            for (size_t cid = 0; cid < mutable_columns.size(); ++cid) {
358
0
                TabletColumn tablet_column = tablet_schema.column(cids_to_read[cid]);
359
0
                auto st = doris::BaseTablet::fetch_value_by_rowids(
360
0
                        rowset_iter->second, segment_id, rids, tablet_column, mutable_columns[cid]);
361
                // set read value to output block
362
0
                if (!st.ok()) {
363
0
                    LOG(WARNING) << "failed to fetch value";
364
0
                    return st;
365
0
                }
366
0
            }
367
0
        }
368
0
    }
369
0
    block.set_columns(std::move(mutable_columns));
370
0
    return Status::OK();
371
0
}
372
373
// Fills the columns listed in partial_update_info->missing_cids of `full_block` for fixed
// partial update, one row per entry of `use_default_or_null_flag`.
//
// For each row, a missing cell is taken from the old row read via this plan, unless the
// row is flagged to use defaults or the old row carries a delete sign. In that case it is
// filled with the column default, NULL, the generated auto-increment value (from `block`'s
// PARTIAL_UPDATE_AUTO_INC_COL), or an arbitrary placeholder. With a sequence column, a
// deleted old row still supplies the sequence column value unless the input provides it.
// `segment_start_pos` is the segment position of the first row.
// `has_default_or_nullable` is not used here.
Status FixedReadPlan::fill_missing_columns(
        RowsetWriterContext* rowset_ctx, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
        const TabletSchema& tablet_schema, Block& full_block,
        const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
        uint32_t segment_start_pos, const Block* block) const {
    auto mutable_full_columns = full_block.mutate_columns();
    // create old value columns
    const auto& missing_cids = rowset_ctx->partial_update_info->missing_cids;
    bool have_input_seq_column = false;
    if (tablet_schema.has_sequence_col()) {
        const std::vector<uint32_t>& including_cids = rowset_ctx->partial_update_info->update_cids;
        have_input_seq_column =
                (std::find(including_cids.cbegin(), including_cids.cend(),
                           tablet_schema.sequence_col_idx()) != including_cids.cend());
    }

    auto old_value_block = tablet_schema.create_block_by_cids(missing_cids);
    CHECK_EQ(missing_cids.size(), old_value_block.columns());

    // segment pos to write -> rowid to read in old_value_block
    std::map<uint32_t, uint32_t> read_index;
    RETURN_IF_ERROR(read_columns_by_plan(tablet_schema, missing_cids, rsid_to_rowset,
                                         old_value_block, &read_index, true, nullptr));

    const auto* old_delete_signs = BaseTablet::get_delete_sign_column_data(old_value_block);
    if (old_delete_signs == nullptr) {
        return Status::InternalError("old delete signs column not found, block: {}",
                                     old_value_block.dump_structure());
    }
    // build default value columns
    auto default_value_block = old_value_block.clone_empty();
    RETURN_IF_ERROR(BaseTablet::generate_default_value_block(
            tablet_schema, missing_cids, rowset_ctx->partial_update_info->default_values,
            old_value_block, default_value_block));
    auto mutable_default_value_columns = default_value_block.mutate_columns();

    // fill all missing value from mutable_old_columns, need to consider default value and null value
    for (size_t idx = 0; idx < use_default_or_null_flag.size(); idx++) {
        const auto segment_pos = static_cast<uint32_t>(idx + segment_start_pos);
        const bool row_uses_default = use_default_or_null_flag[idx];

        // Rows that take old values were registered through prepare_to_read(), so they must
        // have an entry in read_index. Use find() instead of operator[] so an inconsistent plan
        // is reported rather than silently reading old row 0 (and mutating read_index).
        uint32_t pos_in_old_block = 0;
        if (!row_uses_default) {
            auto it = read_index.find(segment_pos);
            if (it == read_index.end()) {
                return Status::InternalError(
                        "row at segment pos {} is missing from the partial update read plan",
                        segment_pos);
            }
            pos_in_old_block = it->second;
        }
        // The old row's delete sign depends only on the row, not on the column.
        const bool old_row_delete_sign =
                !row_uses_default && old_delete_signs[pos_in_old_block] != 0;

        for (size_t i = 0; i < missing_cids.size(); ++i) {
            // if the column has default value, fill it with default value
            // otherwise, if the column is nullable, fill it with null value
            const auto& tablet_column = tablet_schema.column(missing_cids[i]);
            auto& missing_col = mutable_full_columns[missing_cids[i]];

            bool should_use_default = row_uses_default;
            if (old_row_delete_sign) {
                if (!tablet_schema.has_sequence_col()) {
                    should_use_default = true;
                } else if (have_input_seq_column || (!tablet_column.is_seqeunce_col())) {
                    // to keep the sequence column value not decreasing, we should read values of seq column
                    // from old rows even if the old row is deleted when the input don't specify the sequence column, otherwise
                    // it may cause the merge-on-read based compaction to produce incorrect results
                    should_use_default = true;
                }
            }

            if (should_use_default) {
                if (tablet_column.has_default_value()) {
                    missing_col->insert_from(*mutable_default_value_columns[i], 0);
                } else if (tablet_column.is_nullable()) {
                    auto* nullable_column = assert_cast<ColumnNullable*>(missing_col.get());
                    nullable_column->insert_many_defaults(1);
                } else if (tablet_schema.auto_increment_column() == tablet_column.name()) {
                    const auto& column =
                            *DORIS_TRY(rowset_ctx->tablet_schema->column(tablet_column.name()));
                    DCHECK(column.type() == FieldType::OLAP_FIELD_TYPE_BIGINT);
                    auto* auto_inc_column = assert_cast<ColumnInt64*>(missing_col.get());
                    int pos = block->get_position_by_name(BeConsts::PARTIAL_UPDATE_AUTO_INC_COL);
                    if (pos == -1) {
                        return Status::InternalError("auto increment column not found in block {}",
                                                     block->dump_structure());
                    }
                    auto_inc_column->insert_from(*block->get_by_position(pos).column.get(), idx);
                } else {
                    // If the control flow reaches this branch, the column neither has default value
                    // nor is nullable. It means that the row's delete sign is marked, and the value
                    // columns are useless and won't be read. So we can just put arbitary values in the cells
                    missing_col->insert_default();
                }
            } else {
                missing_col->insert_from(*old_value_block.get_by_position(i).column,
                                         pos_in_old_block);
            }
        }
    }
    full_block.set_columns(std::move(mutable_full_columns));
    return Status::OK();
}
468
469
void FlexibleReadPlan::prepare_to_read(const RowLocation& row_location, size_t pos,
470
0
                                       const BitmapValue& skip_bitmap) {
471
0
    if (!use_row_store) {
472
0
        for (uint64_t col_uid : skip_bitmap) {
473
0
            if (variant_util::is_variant_patch_path_marker(col_uid)) {
474
0
                continue;
475
0
            }
476
0
            plan[row_location.rowset_id][row_location.segment_id][static_cast<uint32_t>(col_uid)]
477
0
                    .emplace_back(row_location.row_id, pos);
478
0
        }
479
0
    } else {
480
0
        row_store_plan[row_location.rowset_id][row_location.segment_id].emplace_back(
481
0
                row_location.row_id, pos);
482
0
    }
483
0
}
484
485
// Column-store variant: reads, per column, only the old rows that need that column.
// `old_value_block` is expected to hold the non-key columns in schema order, so column id
// `cid` lands at index `cid - num_key_columns()`.
// read_index: filled with cid -> (input position -> row index within that column).
// Columns of `old_value_block` may end up with different sizes.
Status FlexibleReadPlan::read_columns_by_plan(
        const TabletSchema& tablet_schema,
        const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, Block& old_value_block,
        std::map<uint32_t, std::map<uint32_t, uint32_t>>* read_index) const {
    auto mutable_columns = old_value_block.mutate_columns();

    // cid -> next rid to fill in block (operator[] value-initializes to 0 on first use)
    std::map<uint32_t, uint32_t> next_read_idx;

    for (const auto& [rowset_id, segment_mappings] : plan) {
        // The rowset depends only on rowset_id, so resolve it once per rowset.
        auto rowset_iter = rsid_to_rowset.find(rowset_id);
        CHECK(rowset_iter != rsid_to_rowset.end());
        for (const auto& [segment_id, uid_mappings] : segment_mappings) {
            for (const auto& [col_uid, mappings] : uid_mappings) {
                auto cid = tablet_schema.field_index(col_uid);
                DCHECK_NE(cid, -1);
                DCHECK_GE(cid, tablet_schema.num_key_columns());
                // DCHECKs vanish in release builds; an unknown uid or a key column would
                // otherwise index mutable_columns out of bounds.
                if (cid < 0 || static_cast<size_t>(cid) < tablet_schema.num_key_columns() ||
                    static_cast<size_t>(cid) - tablet_schema.num_key_columns() >=
                            mutable_columns.size()) {
                    return Status::InternalError(
                            "invalid column unique id {} (cid={}) in flexible partial update "
                            "read plan",
                            col_uid, cid);
                }
                const auto column_id = static_cast<uint32_t>(cid);

                auto& pos_to_row = (*read_index)[column_id];
                auto& next_idx = next_read_idx[column_id];
                std::vector<uint32_t> rids;
                rids.reserve(mappings.size());
                for (auto [rid, pos] : mappings) {
                    rids.emplace_back(rid);
                    pos_to_row[static_cast<uint32_t>(pos)] = next_idx++;
                }

                const TabletColumn& tablet_column = tablet_schema.column(column_id);
                auto idx = column_id - tablet_schema.num_key_columns();
                RETURN_IF_ERROR(doris::BaseTablet::fetch_value_by_rowids(
                        rowset_iter->second, segment_id, rids, tablet_column,
                        mutable_columns[idx]));
            }
        }
    }
    // !!!ATTENTION!!!: columns in block may have different size because every row has different columns to update
    old_value_block.set_columns(std::move(mutable_columns));
    return Status::OK();
}
523
524
// Row-store variant: reads `cids_to_read` for every old row recorded in row_store_plan
// through the row column, appending the rows to `old_value_block`.
// read_index: filled with input position -> row index in `old_value_block`.
Status FlexibleReadPlan::read_columns_by_plan(
        const TabletSchema& tablet_schema, const std::vector<uint32_t>& cids_to_read,
        const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, Block& old_value_block,
        std::map<uint32_t, uint32_t>* read_index) const {
    DCHECK(use_row_store);
    uint32_t read_idx = 0;
    for (const auto& [rowset_id, segment_row_mappings] : row_store_plan) {
        // The rowset depends only on rowset_id, so resolve it once per rowset.
        auto rowset_iter = rsid_to_rowset.find(rowset_id);
        CHECK(rowset_iter != rsid_to_rowset.end());
        for (const auto& [segment_id, mappings] : segment_row_mappings) {
            std::vector<uint32_t> rids;
            rids.reserve(mappings.size());
            for (auto [rid, pos] : mappings) {
                rids.emplace_back(rid);
                (*read_index)[static_cast<uint32_t>(pos)] = read_idx++;
            }
            auto st = BaseTablet::fetch_value_through_row_column(rowset_iter->second, tablet_schema,
                                                                 segment_id, rids, cids_to_read,
                                                                 old_value_block);
            if (!st.ok()) {
                LOG(WARNING) << "failed to fetch value through row column";
                return st;
            }
        }
    }
    return Status::OK();
}
550
551
// Fills the non-sort-key columns of `full_block` for flexible partial update.
// An old-value block covering all non-sort-key columns (which init() stores in
// partial_update_info->missing_cids for flexible mode) is created. It is then handed to the
// row-store or column-store filler, depending on `use_row_store`.
Status FlexibleReadPlan::fill_non_primary_key_columns(
        RowsetWriterContext* rowset_ctx, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
        const TabletSchema& tablet_schema, Block& full_block,
        const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
        uint32_t segment_start_pos, uint32_t block_start_pos, const Block* block,
        std::vector<BitmapValue>* skip_bitmaps) const {
    auto mutable_full_columns = full_block.mutate_columns();

    // missing_cids are all non sort key columns' cids
    const auto& non_sort_key_cids = rowset_ctx->partial_update_info->missing_cids;
    auto old_value_block = tablet_schema.create_block_by_cids(non_sort_key_cids);
    CHECK_EQ(non_sort_key_cids.size(), old_value_block.columns());

    Status fill_status;
    if (use_row_store) {
        fill_status = fill_non_primary_key_columns_for_row_store(
                rowset_ctx, rsid_to_rowset, tablet_schema, non_sort_key_cids, old_value_block,
                mutable_full_columns, use_default_or_null_flag, has_default_or_nullable,
                segment_start_pos, block_start_pos, block, skip_bitmaps);
    } else {
        fill_status = fill_non_primary_key_columns_for_column_store(
                rowset_ctx, rsid_to_rowset, tablet_schema, non_sort_key_cids, old_value_block,
                mutable_full_columns, use_default_or_null_flag, has_default_or_nullable,
                segment_start_pos, block_start_pos, block, skip_bitmaps);
    }
    if (!fill_status.ok()) {
        return fill_status;
    }

    full_block.set_columns(std::move(mutable_full_columns));
    return Status::OK();
}
578
579
static bool old_row_has_delete_sign_for_column_store(
580
        const signed char* delete_sign_column_data, const TabletSchema& tablet_schema,
581
0
        std::map<uint32_t, std::map<uint32_t, uint32_t>>& read_index, uint32_t segment_pos) {
582
0
    if (delete_sign_column_data == nullptr) {
583
0
        return false;
584
0
    }
585
0
    if (auto it = read_index[tablet_schema.delete_sign_idx()].find(segment_pos);
586
0
        it != read_index[tablet_schema.delete_sign_idx()].end()) {
587
0
        return delete_sign_column_data[it->second] != 0;
588
0
    }
589
0
    return false;
590
0
}
591
592
// Appends the value of one non-primary-key cell (column `cid`, block row `block_pos`,
// destination segment row `segment_pos`) to `new_col` on the column-store path of a flexible
// partial update.
//
// Cell specified by the load (`skipped == false`):
//   - a variant column is patch-merged with its old value using the path markers in
//     `skip_bitmap`. This requires that `use_default` is false, the old row is not delete-signed,
//     and `read_index` holds an old value for (cid, segment_pos);
//   - every other cell takes the new value `cur_col[block_pos]`.
// Cell not specified by the load (`skipped == true`): the old value at
// `read_index.at(cid).at(segment_pos)` is reused unless a default has to be written. A default
// is written when `use_default` is passed in as true, when the column is
// ON UPDATE CURRENT_TIMESTAMP, or when the old row carries a delete sign. The delete-sign case
// has one exception: on a table with a sequence column, a row that does not specify the
// sequence column keeps the old sequence (and sequence-map) column value. The default is, in
// order: row 0 of `default_value_col`, NULL, the auto-increment value already in `cur_col`, or
// the column type's default.
static Status fill_non_primary_key_cell_for_column_store(
        const TabletColumn& tablet_column, uint32_t cid, MutableColumnPtr& new_col,
        const IColumn& default_value_col, const IColumn& old_value_col, const IColumn& cur_col,
        std::size_t block_pos, uint32_t segment_pos, bool skipped, bool row_has_sequence_col,
        bool use_default, const signed char* delete_sign_column_data,
        const TabletSchema& tablet_schema,
        std::map<uint32_t, std::map<uint32_t, uint32_t>>& read_index, const PartialUpdateInfo* info,
        const BitmapValue& skip_bitmap) {
    if (!skipped) {
        const bool merge_with_old_variant =
                tablet_column.is_variant_type() && !use_default &&
                !old_row_has_delete_sign_for_column_store(delete_sign_column_data, tablet_schema,
                                                          read_index, segment_pos) &&
                read_index.contains(cid) && read_index.at(cid).contains(segment_pos);
        if (merge_with_old_variant) {
            RETURN_IF_ERROR(variant_util::merge_variant_patch_by_path_markers(
                    old_value_col, read_index.at(cid).at(segment_pos), cur_col, block_pos,
                    tablet_column.unique_id(), skip_bitmap, false, *new_col));
            return Status::OK();
        }
        new_col->insert_from(cur_col, block_pos);
        return Status::OK();
    }

    DCHECK(std::cmp_not_equal(cid, tablet_schema.skip_bitmap_col_idx()));
    DCHECK(std::cmp_not_equal(cid, tablet_schema.version_col_idx()));
    DCHECK(!tablet_column.is_row_store_column());

    if (!use_default && delete_sign_column_data != nullptr &&
        old_row_has_delete_sign_for_column_store(delete_sign_column_data, tablet_schema,
                                                 read_index, segment_pos)) {
        // The old row is deleted, so its values are normally not reused. On a table with a
        // sequence column, when this row does not specify the sequence column, the sequence
        // column (and the sequence-map column) still read the old row's value. This keeps the
        // sequence value from decreasing; otherwise merge-on-read based compaction may produce
        // incorrect results.
        use_default = !tablet_schema.has_sequence_col() || row_has_sequence_col ||
                      (!tablet_column.is_seqeunce_col() &&
                       (tablet_column.unique_id() != info->sequence_map_col_uid()));
    }
    if (!use_default && tablet_column.is_on_update_current_timestamp()) {
        use_default = true;
    }

    if (!use_default) {
        new_col->insert_from(old_value_col, read_index.at(cid).at(segment_pos));
    } else if (tablet_column.has_default_value()) {
        new_col->insert_from(default_value_col, 0);
    } else if (tablet_column.is_nullable()) {
        assert_cast<ColumnNullable*, TypeCheckOnRelease::DISABLE>(new_col.get())
                ->insert_many_defaults(1);
    } else if (tablet_column.is_auto_increment()) {
        // In flexible partial update the skip bitmap records whether a cell was specified by the
        // original load. The generated auto-increment value is therefore filled into the current
        // block in place, instead of living in a separate column as in fixed partial update.
        new_col->insert_from(cur_col, block_pos);
    } else {
        new_col->insert_default();
    }
    return Status::OK();
}
660
661
// Fills every non-sort-key column of `mutable_full_columns` for the rows
// [block_start_pos, block_start_pos + use_default_or_null_flag.size()) of `block` on the
// column-store path of a flexible partial update.
//
// Old values are read by this plan into `old_value_block`. Its column i corresponds to
// `non_sort_key_cids[i]`. A default value block is built when any row needs defaults/NULLs or
// when the old data carries delete signs. Each cell is then produced by
// fill_non_primary_key_cell_for_column_store. Row idx of the batch is written to segment row
// `segment_start_pos + idx` and reads skip bitmap `(*skip_bitmaps)[block_start_pos + idx]`.
//
// Returns the first error from reading old values, building defaults, or filling a cell.
Status FlexibleReadPlan::fill_non_primary_key_columns_for_column_store(
        RowsetWriterContext* rowset_ctx, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
        const TabletSchema& tablet_schema, const std::vector<uint32_t>& non_sort_key_cids,
        Block& old_value_block, MutableColumns& mutable_full_columns,
        const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
        uint32_t segment_start_pos, uint32_t block_start_pos, const Block* block,
        std::vector<BitmapValue>* skip_bitmaps) const {
    auto* info = rowset_ctx->partial_update_info.get();
    const bool has_sequence_col = tablet_schema.has_sequence_col();
    const int32_t seq_col_unique_id =
            has_sequence_col ? tablet_schema.column(tablet_schema.sequence_col_idx()).unique_id()
                             : -1;

    // cid -> segment pos to write -> row index to read in old_value_block
    std::map<uint32_t, std::map<uint32_t, uint32_t>> read_index;
    RETURN_IF_ERROR(
            read_columns_by_plan(tablet_schema, rsid_to_rowset, old_value_block, &read_index));
    // !!!ATTENTION!!!: columns in old_value_block may have different sizes because every row may
    // update a different set of columns.

    const auto* delete_sign_column_data = BaseTablet::get_delete_sign_column_data(old_value_block);
    // Build the default value columns (same layout as old_value_block).
    auto default_value_block = old_value_block.clone_empty();
    if (has_default_or_nullable || delete_sign_column_data != nullptr) {
        RETURN_IF_ERROR(BaseTablet::generate_default_value_block(
                tablet_schema, non_sort_key_cids, info->default_values, old_value_block,
                default_value_block));
    }

    // Fill all non sort key columns from the old values, the incoming block, or defaults/NULLs.
    const std::size_t num_rows = use_default_or_null_flag.size();
    for (std::size_t i = 0; i < non_sort_key_cids.size(); ++i) {
        const auto cid = non_sort_key_cids[i];
        const auto& tablet_column = tablet_schema.column(cid);
        const auto col_uid = tablet_column.unique_id();
        // These columns do not change while the rows of this column are filled.
        const auto& default_value_col = *default_value_block.get_by_position(i).column;
        const auto& old_value_col = *old_value_block.get_by_position(i).column;
        const auto& cur_col = *block->get_by_position(cid).column;

        for (std::size_t idx = 0; idx < num_rows; ++idx) {
            const uint32_t segment_pos = segment_start_pos + static_cast<uint32_t>(idx);
            const uint32_t block_pos = block_start_pos + static_cast<uint32_t>(idx);
            auto& skip_bitmap = skip_bitmaps->at(block_pos);
            const bool row_has_sequence_col =
                    has_sequence_col && !skip_bitmap.contains(seq_col_unique_id);

            RETURN_IF_ERROR(fill_non_primary_key_cell_for_column_store(
                    tablet_column, cid, mutable_full_columns[cid], default_value_col,
                    old_value_col, cur_col, block_pos, segment_pos, skip_bitmap.contains(col_uid),
                    row_has_sequence_col, use_default_or_null_flag[idx], delete_sign_column_data,
                    tablet_schema, read_index, info, skip_bitmap));
        }
    }
    return Status::OK();
}
711
712
// Appends the value of one non-primary-key cell (column `cid`, block row `block_pos`) to
// `new_col` on the row-store path of a flexible partial update. `old_row_exists` tells whether
// an old row was found, and if so, `pos_in_old_block` is its row index in `old_value_col` and
// in `delete_sign_column_data`.
//
// Cell specified by the load (`skipped == false`):
//   - a variant column whose old row exists, is not delete-signed, and `use_default` is false
//     is patch-merged with the old value using the path markers in `skip_bitmap`;
//   - every other cell takes the new value `cur_col[block_pos]`.
// Cell not specified by the load (`skipped == true`): the old value at `pos_in_old_block` is
// reused unless a default has to be written. A default is written when `use_default` is passed
// in as true, when the column is ON UPDATE CURRENT_TIMESTAMP, or when the old row carries a
// delete sign. The delete-sign case has one exception: on a table with a sequence column, a row
// that does not specify the sequence column keeps the old sequence (and sequence-map) column
// value. The default is, in order: row 0 of `default_value_col`, NULL, the auto-increment value
// already in `cur_col`, or the column type's default.
static Status fill_non_primary_key_cell_for_row_store(
        const TabletColumn& tablet_column, uint32_t cid, MutableColumnPtr& new_col,
        const IColumn& default_value_col, const IColumn& old_value_col, const IColumn& cur_col,
        std::size_t block_pos, bool skipped, bool row_has_sequence_col, bool use_default,
        const signed char* delete_sign_column_data, bool old_row_exists, uint32_t pos_in_old_block,
        const TabletSchema& tablet_schema, const PartialUpdateInfo* info,
        const BitmapValue& skip_bitmap) {
    if (!skipped) {
        const bool old_row_deleted = old_row_exists && delete_sign_column_data != nullptr &&
                                     delete_sign_column_data[pos_in_old_block] != 0;
        if (tablet_column.is_variant_type() && old_row_exists && !use_default &&
            !old_row_deleted) {
            RETURN_IF_ERROR(variant_util::merge_variant_patch_by_path_markers(
                    old_value_col, pos_in_old_block, cur_col, block_pos, tablet_column.unique_id(),
                    skip_bitmap, false, *new_col));
            return Status::OK();
        }
        new_col->insert_from(cur_col, block_pos);
        return Status::OK();
    }

    DCHECK(std::cmp_not_equal(cid, tablet_schema.skip_bitmap_col_idx()));
    DCHECK(std::cmp_not_equal(cid, tablet_schema.version_col_idx()));
    DCHECK(!tablet_column.is_row_store_column());

    if (!use_default) {
        DCHECK(old_row_exists);
        if (delete_sign_column_data != nullptr && delete_sign_column_data[pos_in_old_block] != 0) {
            // The old row is deleted, so its values are normally not reused. On a table with a
            // sequence column, when this row does not specify the sequence column, the sequence
            // column (and the sequence-map column) still read the old row's value. This keeps the
            // sequence value from decreasing; otherwise merge-on-read based compaction may produce
            // incorrect results.
            use_default = !tablet_schema.has_sequence_col() || row_has_sequence_col ||
                          (!tablet_column.is_seqeunce_col() &&
                           (tablet_column.unique_id() != info->sequence_map_col_uid()));
        }
    }
    if (!use_default && tablet_column.is_on_update_current_timestamp()) {
        use_default = true;
    }

    if (!use_default) {
        new_col->insert_from(old_value_col, pos_in_old_block);
    } else if (tablet_column.has_default_value()) {
        new_col->insert_from(default_value_col, 0);
    } else if (tablet_column.is_nullable()) {
        assert_cast<ColumnNullable*, TypeCheckOnRelease::DISABLE>(new_col.get())
                ->insert_many_defaults(1);
    } else if (tablet_column.is_auto_increment()) {
        // In flexible partial update the skip bitmap records whether a cell was specified by the
        // original load. The generated auto-increment value is therefore filled into the current
        // block in place, instead of living in a separate column as in fixed partial update.
        new_col->insert_from(cur_col, block_pos);
    } else {
        new_col->insert_default();
    }
    return Status::OK();
}
777
778
// Fills every non-sort-key column of `mutable_full_columns` for the rows
// [block_start_pos, block_start_pos + use_default_or_null_flag.size()) of `block` on the
// row-store path of a flexible partial update.
//
// Old rows are read by this plan into `old_value_block`. Its column i corresponds to
// `non_sort_key_cids[i]`, and `read_index` maps a destination segment row to the old row's
// index there. A segment row without an entry has no old row. A default value block is built
// when any row needs defaults/NULLs or when the old data carries delete signs. Each cell is then
// produced by fill_non_primary_key_cell_for_row_store.
//
// Returns the first error from reading old values, building defaults, or filling a cell.
Status FlexibleReadPlan::fill_non_primary_key_columns_for_row_store(
        RowsetWriterContext* rowset_ctx, const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
        const TabletSchema& tablet_schema, const std::vector<uint32_t>& non_sort_key_cids,
        Block& old_value_block, MutableColumns& mutable_full_columns,
        const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
        uint32_t segment_start_pos, uint32_t block_start_pos, const Block* block,
        std::vector<BitmapValue>* skip_bitmaps) const {
    auto* info = rowset_ctx->partial_update_info.get();
    const bool has_sequence_col = tablet_schema.has_sequence_col();
    const int32_t seq_col_unique_id =
            has_sequence_col ? tablet_schema.column(tablet_schema.sequence_col_idx()).unique_id()
                             : -1;

    // segment pos to write -> row index to read in old_value_block
    std::map<uint32_t, uint32_t> read_index;
    RETURN_IF_ERROR(read_columns_by_plan(tablet_schema, non_sort_key_cids, rsid_to_rowset,
                                         old_value_block, &read_index));

    const auto* delete_sign_column_data = BaseTablet::get_delete_sign_column_data(old_value_block);
    // Build the default value columns (same layout as old_value_block).
    auto default_value_block = old_value_block.clone_empty();
    if (has_default_or_nullable || delete_sign_column_data != nullptr) {
        RETURN_IF_ERROR(BaseTablet::generate_default_value_block(
                tablet_schema, non_sort_key_cids, info->default_values, old_value_block,
                default_value_block));
    }

    // Fill all non sort key columns from the old rows, the incoming block, or defaults/NULLs.
    const std::size_t num_rows = use_default_or_null_flag.size();
    for (std::size_t i = 0; i < non_sort_key_cids.size(); ++i) {
        const auto cid = non_sort_key_cids[i];
        const auto& tablet_column = tablet_schema.column(cid);
        const auto col_uid = tablet_column.unique_id();
        // These columns do not change while the rows of this column are filled.
        const auto& default_value_col = *default_value_block.get_by_position(i).column;
        const auto& old_value_col = *old_value_block.get_by_position(i).column;
        const auto& cur_col = *block->get_by_position(cid).column;

        for (std::size_t idx = 0; idx < num_rows; ++idx) {
            const uint32_t segment_pos = segment_start_pos + static_cast<uint32_t>(idx);
            const uint32_t block_pos = block_start_pos + static_cast<uint32_t>(idx);
            auto& skip_bitmap = skip_bitmaps->at(block_pos);
            const auto read_index_iter = read_index.find(segment_pos);
            const bool old_row_exists = (read_index_iter != read_index.end());
            const uint32_t pos_in_old_block = old_row_exists ? read_index_iter->second : 0;
            const bool row_has_sequence_col =
                    has_sequence_col && !skip_bitmap.contains(seq_col_unique_id);

            RETURN_IF_ERROR(fill_non_primary_key_cell_for_row_store(
                    tablet_column, cid, mutable_full_columns[cid], default_value_col,
                    old_value_col, cur_col, block_pos, skip_bitmap.contains(col_uid),
                    row_has_sequence_col, use_default_or_null_flag[idx], delete_sign_column_data,
                    old_row_exists, pos_in_old_block, tablet_schema, info, skip_bitmap));
        }
    }
    return Status::OK();
}
830
831
// Binds the aggregator to `vertical_segment_writer` and caches a reference to that writer's
// tablet schema. The schema is read from the argument, so the result does not depend on the
// order in which `_writer` and `_tablet_schema` are declared.
BlockAggregator::BlockAggregator(segment_v2::VerticalSegmentWriter& vertical_segment_writer)
        : _writer(vertical_segment_writer),
          _tablet_schema(*vertical_segment_writer._tablet_schema) {}
833
834
// Merges row `rid` of `src_block` into the last row of `dst_block`. Both use the full tablet
// schema layout. Key columns (cid < num_key_columns) are left untouched. For each value column:
//   - the skip-bitmap column becomes the merge of the destination row's bitmap and the source
//     row's bitmap (variant_util::merge_variant_patch_path_markers);
//   - a column whose uid is in `skip_bitmap` (not specified by the source row) keeps the
//     destination value;
//   - any other variant column is replaced by the patch-merge of the destination value and the
//     source value;
//   - any other column is overwritten with the source value.
Status BlockAggregator::merge_one_row(MutableBlock& dst_block, Block* src_block, int rid,
                                      BitmapValue& skip_bitmap) {
    auto& dst_columns = dst_block.mutable_columns();
    for (size_t cid = _tablet_schema.num_key_columns(); cid < _tablet_schema.num_columns(); ++cid) {
        auto& dst_col = dst_columns[cid];
        const auto& src_col = src_block->get_by_position(cid).column;

        if (std::cmp_equal(cid, _tablet_schema.skip_bitmap_col_idx())) {
            auto& dst_bitmap = assert_cast<ColumnBitmap*>(dst_col.get())->get_data().back();
            const auto& src_bitmap =
                    assert_cast<ColumnBitmap*>(src_col->assume_mutable().get())->get_data()[rid];
            BitmapValue merged_bitmap;
            RETURN_IF_ERROR(variant_util::merge_variant_patch_path_markers(
                    dst_bitmap, src_bitmap, &merged_bitmap));
            dst_bitmap = std::move(merged_bitmap);
            continue;
        }

        const auto& column = _tablet_schema.column(cid);
        if (skip_bitmap.contains(column.unique_id())) {
            // Not specified by the source row: keep the destination value.
            continue;
        }

        if (column.is_variant_type()) {
            auto merged_col = dst_col->clone_empty();
            RETURN_IF_ERROR(variant_util::merge_variant_patch(*dst_col, dst_col->size() - 1,
                                                              *src_col, rid, *merged_col));
            dst_col->pop_back(1);
            dst_col->insert_from(*merged_col, 0);
        } else {
            dst_col->pop_back(1);
            dst_col->insert_from(*src_col, rid);
        }
    }
    VLOG_DEBUG << fmt::format("merge a row, after merge, output_block.rows()={}, state: {}",
                              dst_block.rows(), _state.to_string());
    return Status::OK();
}
872
873
0
// Copies row `rid` of `src_block` to the end of `dst_block` as a new row and counts it in the
// current key's aggregation state (`_state.rows`).
void BlockAggregator::append_one_row(MutableBlock& dst_block, Block* src_block, int rid) {
    dst_block.add_row(src_block, rid);
    ++_state.rows;
    VLOG_DEBUG << fmt::format("append a new row, after append, output_block.rows()={}, state: {}",
                              dst_block.rows(), _state.to_string());
}
879
880
0
// Drops the last `n` rows from every column of `dst_block`. Does nothing when `n` is not
// positive. In debug builds it checks that each column holds at least `n` rows.
void BlockAggregator::remove_last_n_rows(MutableBlock& dst_block, int n) {
    if (n <= 0) {
        return;
    }
    auto& dst_columns = dst_block.mutable_columns();
    for (size_t cid = 0; cid < _tablet_schema.num_columns(); ++cid) {
        auto& column = dst_columns[cid];
        DCHECK_GE(column->size(), n);
        column->pop_back(n);
    }
}
888
889
// Adds row `rid` of `src_block` to the rows batched in `dst_block` for the current key.
//   - A row with a delete sign discards every row batched so far for this key, records that
//     the batch now contains a delete-signed row, and is appended itself.
//   - Otherwise the row is merged into the last batched row when `_state.should_merge()` says
//     so, and appended as a new row when it does not.
Status BlockAggregator::append_or_merge_row(MutableBlock& dst_block, Block* src_block, int rid,
                                            BitmapValue& skip_bitmap, bool have_delete_sign) {
    if (!have_delete_sign) {
        if (!_state.should_merge()) {
            append_one_row(dst_block, src_block, rid);
            return Status::OK();
        }
        RETURN_IF_ERROR(merge_one_row(dst_block, src_block, rid, skip_bitmap));
        return Status::OK();
    }

    // Remove all the previously batched rows of this key, then keep the delete row itself.
    remove_last_n_rows(dst_block, _state.rows);
    _state.rows = 0;
    _state.has_row_with_delete_sign = true;
    append_one_row(dst_block, src_block, rid);
    return Status::OK();
}
907
908
// Aggregates rows [start, end) of `block` into `output_block`. All rows in the range share the
// encoded key `key`, on a table with a sequence column. A single-row range is copied unchanged.
//
// Otherwise the key is looked up in `specified_rowsets`, which yields the encoded sequence
// value of the stored row when it exists. That value seeds the running value `cur_seq_val`:
//   - key found: leading rows that specify a sequence value smaller than the stored one are
//     discarded. The first row that specifies a value >= the stored one starts aggregation and
//     its value becomes `cur_seq_val`. A first row that omits the sequence column also starts
//     aggregation, and the stored value becomes `cur_seq_val`;
//   - key not found: aggregation starts at `start`. It is seeded with that row's sequence value,
//     or with the encoded default sequence value when the row omits it.
// From there on, rows that omit the sequence column are always appended/merged. Rows that
// specify it are appended/merged only when their value is >= `cur_seq_val`, which then
// advances to that value; otherwise the row is skipped.
//
// Returns the lookup status when it is neither OK nor KEY_NOT_FOUND.
Status BlockAggregator::aggregate_rows(
        MutableBlock& output_block, Block* block, int start, int end, std::string key,
        std::vector<BitmapValue>* skip_bitmaps, const signed char* delete_signs,
        IOlapColumnDataAccessor* seq_column, const std::vector<RowsetSharedPtr>& specified_rowsets,
        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
    VLOG_DEBUG << fmt::format("merge rows in range=[{}-{})", start, end);
    if (end - start == 1) {
        output_block.add_row(block, start);
        VLOG_DEBUG << fmt::format("append a row directly, rid={}", start);
        return Status::OK();
    }

    const auto seq_col_unique_id =
            _tablet_schema.column(_tablet_schema.sequence_col_idx()).unique_id();
    const auto delete_sign_col_unique_id =
            _tablet_schema.column(_tablet_schema.delete_sign_idx()).unique_id();

    _state.reset();

    RowLocation loc;
    RowsetSharedPtr rowset;
    std::string previous_encoded_seq_value;
    Status st = _writer._tablet->lookup_row_key(
            key, &_tablet_schema, false, specified_rowsets, &loc, _writer._mow_context->max_version,
            segment_caches, &rowset, true, &previous_encoded_seq_value);
    const bool is_expected_st = st.ok() || st.is<ErrorCode::KEY_NOT_FOUND>();
    DCHECK(is_expected_st || st.is<ErrorCode::MEM_LIMIT_EXCEEDED>())
            << "[BlockAggregator::aggregate_rows] unexpected error status while lookup_row_key:"
            << st;
    if (!is_expected_st) {
        return st;
    }

    // True when row `rid` specifies a value for the sequence column.
    auto specifies_seq = [&](int rid) {
        return !skip_bitmaps->at(rid).contains(seq_col_unique_id);
    };
    // Encodes the sequence value of row `rid`.
    auto encode_seq = [&](int rid) {
        std::string encoded;
        _writer._encode_seq_column(seq_column, rid, &encoded);
        return encoded;
    };

    int first = start;
    std::string cur_seq_val;
    if (st.ok()) {
        // Discard leading rows whose sequence value is smaller than the stored row's.
        for (; first < end; ++first) {
            if (!specifies_seq(first)) {
                cur_seq_val = std::move(previous_encoded_seq_value);
                break;
            }
            std::string seq_val = encode_seq(first);
            if (Slice {seq_val}.compare(Slice {previous_encoded_seq_value}) >= 0) {
                cur_seq_val = std::move(seq_val);
                break;
            }
        }
    } else if (specifies_seq(first)) {
        cur_seq_val = encode_seq(first);
    } else {
        RETURN_IF_ERROR(_writer._generate_encoded_default_seq_value(
                _tablet_schema, *_writer._opts.rowset_ctx->partial_update_info, &cur_seq_val));
    }

    for (int rid = first; rid < end; ++rid) {
        auto& skip_bitmap = skip_bitmaps->at(rid);
        const bool have_delete_sign =
                !skip_bitmap.contains(delete_sign_col_unique_id) && delete_signs[rid] != 0;
        if (skip_bitmap.contains(seq_col_unique_id)) {
            // No sequence value on this row: always kept, never advances cur_seq_val.
            RETURN_IF_ERROR(
                    append_or_merge_row(output_block, block, rid, skip_bitmap, have_delete_sign));
            continue;
        }
        std::string seq_val = encode_seq(rid);
        if (Slice {seq_val}.compare(Slice {cur_seq_val}) < 0) {
            VLOG_DEBUG << fmt::format(
                    "skip rid={} becasue its seq value is lower than the previous", rid);
            continue;
        }
        RETURN_IF_ERROR(
                append_or_merge_row(output_block, block, rid, skip_bitmap, have_delete_sign));
        cur_seq_val = std::move(seq_val);
    }
    return Status::OK();
}
998
999
// Aggregates the first `num_rows` rows of `block` on a table with a sequence column and
// replaces the block's content with the result.
//
// Consecutive rows whose full encoded keys are equal form one group. Each group is handed to
// aggregate_rows together with its key. Only adjacent rows are grouped, so this presumably
// expects rows with the same key to be contiguous (TODO confirm with the caller). According to
// the original note, the logic mirrors the MemTable flexible-partial-update aggregation, and
// afterwards at most 2 rows remain for any key.
Status BlockAggregator::aggregate_for_sequence_column(
        Block* block, int num_rows, const std::vector<IOlapColumnDataAccessor*>& key_columns,
        IOlapColumnDataAccessor* seq_column, const std::vector<RowsetSharedPtr>& specified_rowsets,
        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
    DCHECK_EQ(block->columns(), _tablet_schema.num_columns());
    auto* skip_bitmap_column = assert_cast<ColumnBitmap*>(
            block->get_by_position(_tablet_schema.skip_bitmap_col_idx())
                    .column->assume_mutable()
                    .get());
    std::vector<BitmapValue>* skip_bitmaps = &(skip_bitmap_column->get_data());
    const auto* delete_signs = BaseTablet::get_delete_sign_column_data(*block, num_rows);

    auto filtered_block = _tablet_schema.create_block();
    MutableBlock output_block = MutableBlock::build_mutable_block(&filtered_block);

    int group_start = 0;
    std::string group_key;
    for (int block_pos = 0; block_pos < num_rows; ++block_pos) {
        std::string key = _writer._full_encode_keys(key_columns, block_pos);
        if (block_pos > 0 && key == group_key) {
            continue;
        }
        if (block_pos > 0) {
            // A new key starts here: flush the group [group_start, block_pos).
            RETURN_IF_ERROR(aggregate_rows(output_block, block, group_start, block_pos,
                                           std::move(group_key), skip_bitmaps, delete_signs,
                                           seq_column, specified_rowsets, segment_caches));
        }
        group_start = block_pos;
        group_key = std::move(key);
    }
    if (num_rows > 0) {
        RETURN_IF_ERROR(aggregate_rows(output_block, block, group_start, num_rows,
                                       std::move(group_key), skip_bitmaps, delete_signs,
                                       seq_column, specified_rowsets, segment_caches));
    }

    block->swap(output_block.to_block());
    return Status::OK();
}
1042
1043
// Aggregates the first `num_rows` rows of `block` on a table without a sequence column.
//
// Consecutive rows whose full encoded keys are equal form one group. A single-row group is
// copied unchanged. A larger group is folded row by row through append_or_merge_row, where a
// row is treated as a delete when it specifies the delete sign column and its delete sign is
// non-zero. The block is replaced by the aggregated rows only when the row count changed.
Status BlockAggregator::aggregate_without_sequence_column(
        Block* block, size_t num_rows, const std::vector<IOlapColumnDataAccessor*>& key_columns) {
    DCHECK_EQ(block->columns(), _tablet_schema.num_columns());
    auto* skip_bitmap_column = assert_cast<ColumnBitmap*>(
            block->get_by_position(_tablet_schema.skip_bitmap_col_idx())
                    .column->assume_mutable()
                    .get());
    std::vector<BitmapValue>* skip_bitmaps = &(skip_bitmap_column->get_data());
    const auto* delete_signs = BaseTablet::get_delete_sign_column_data(*block, num_rows);
    DCHECK(delete_signs != nullptr);
    const int32_t delete_sign_col_unique_id =
            _tablet_schema.column(_tablet_schema.delete_sign_idx()).unique_id();

    auto aggregated_block = _tablet_schema.create_block();
    MutableBlock output_block = MutableBlock::build_mutable_block(&aggregated_block);

    // Folds the rows [first, last), which share one key, into output_block.
    auto aggregate_range = [&](int first, int last) -> Status {
        if (last - first == 1) {
            output_block.add_row(block, first);
            return Status::OK();
        }
        _state.reset();
        for (int rid = first; rid < last; ++rid) {
            auto& skip_bitmap = skip_bitmaps->at(rid);
            const bool have_delete_sign =
                    !skip_bitmap.contains(delete_sign_col_unique_id) && delete_signs[rid] != 0;
            RETURN_IF_ERROR(
                    append_or_merge_row(output_block, block, rid, skip_bitmap, have_delete_sign));
        }
        return Status::OK();
    };

    const auto row_count = static_cast<int>(num_rows);
    int group_start = 0;
    std::string group_key;
    for (int block_pos = 0; block_pos < row_count; ++block_pos) {
        std::string key = _writer._full_encode_keys(key_columns, block_pos);
        if (block_pos > 0 && key == group_key) {
            continue;
        }
        if (block_pos > 0) {
            // A new key starts here: flush the group [group_start, block_pos).
            RETURN_IF_ERROR(aggregate_range(group_start, block_pos));
        }
        group_start = block_pos;
        group_key = std::move(key);
    }
    if (row_count > 0) {
        RETURN_IF_ERROR(aggregate_range(group_start, row_count));
    }

    if (output_block.rows() != num_rows) {
        block->swap(output_block.to_block());
    }
    return Status::OK();
}
1099
1100
// Rebuilds the sequence column of `block` for its first `num_rows` rows.
//
// The sequence column is read through `read_plan` into a one-column block. `read_index` maps a
// block row to its row in that block; presumably an entry exists only for rows whose key
// already has a stored row (TODO confirm against FixedReadPlan). Rows with an entry take the
// stored sequence value, and the sequence column's uid is removed from their skip bitmap.
// Every other row keeps its own sequence value. The rebuilt column replaces the original one
// in `block`.
//
// A single `find` per row replaces the previous `contains` + `operator[]` pair, which looked the
// key up twice.
Status BlockAggregator::fill_sequence_column(Block* block, size_t num_rows,
                                             const FixedReadPlan& read_plan,
                                             std::vector<BitmapValue>& skip_bitmaps) {
    DCHECK(_tablet_schema.has_sequence_col());
    std::vector<uint32_t> cids {static_cast<uint32_t>(_tablet_schema.sequence_col_idx())};
    auto seq_col_unique_id = _tablet_schema.column(_tablet_schema.sequence_col_idx()).unique_id();

    auto seq_col_block = _tablet_schema.create_block_by_cids(cids);
    // Only used to obtain an empty column of the sequence column's type.
    auto tmp_block = _tablet_schema.create_block_by_cids(cids);
    std::map<uint32_t, uint32_t> read_index;
    RETURN_IF_ERROR(read_plan.read_columns_by_plan(_tablet_schema, cids, _writer._rsid_to_rowset,
                                                   seq_col_block, &read_index, false));

    auto new_seq_col_ptr = tmp_block.get_by_position(0).column->assume_mutable();
    const auto& old_seq_col = *seq_col_block.get_by_position(0).column;
    const auto& cur_seq_col = *block->get_by_position(_tablet_schema.sequence_col_idx()).column;
    for (uint32_t block_pos {0}; block_pos < num_rows; block_pos++) {
        if (const auto it = read_index.find(block_pos); it != read_index.end()) {
            new_seq_col_ptr->insert_from(old_seq_col, it->second);
            skip_bitmaps[block_pos].remove(seq_col_unique_id);
        } else {
            new_seq_col_ptr->insert_from(cur_seq_col, block_pos);
        }
    }
    block->replace_by_position(_tablet_schema.sequence_col_idx(), std::move(new_seq_col_ptr));
    return Status::OK();
}
1127
1128
Status BlockAggregator::aggregate_for_insert_after_delete(
1129
        Block* block, size_t num_rows, const std::vector<IOlapColumnDataAccessor*>& key_columns,
1130
        const std::vector<RowsetSharedPtr>& specified_rowsets,
1131
0
        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
1132
0
    DCHECK_EQ(block->columns(), _tablet_schema.num_columns());
1133
    // there will be at most 2 rows for a specified key in block when control flow reaches here
1134
    // after this function, there will not be duplicate rows in block
1135
1136
0
    std::vector<BitmapValue>* skip_bitmaps = &(
1137
0
            assert_cast<ColumnBitmap*>(block->get_by_position(_tablet_schema.skip_bitmap_col_idx())
1138
0
                                               .column->assume_mutable()
1139
0
                                               .get())
1140
0
                    ->get_data());
1141
0
    const auto* delete_signs = BaseTablet::get_delete_sign_column_data(*block, num_rows);
1142
1143
0
    auto filter_column = ColumnUInt8::create(num_rows, 1);
1144
0
    auto* __restrict filter_map = filter_column->get_data().data();
1145
0
    std::string previous_key {};
1146
0
    bool previous_has_delete_sign {false};
1147
0
    int duplicate_rows {0};
1148
0
    int32_t delete_sign_col_unique_id =
1149
0
            _tablet_schema.column(_tablet_schema.delete_sign_idx()).unique_id();
1150
0
    auto seq_col_unique_id =
1151
0
            (_tablet_schema.sequence_col_idx() != -1)
1152
0
                    ? _tablet_schema.column(_tablet_schema.sequence_col_idx()).unique_id()
1153
0
                    : -1;
1154
0
    FixedReadPlan read_plan;
1155
0
    for (size_t block_pos {0}; block_pos < num_rows; block_pos++) {
1156
0
        size_t delta_pos = block_pos;
1157
0
        auto& skip_bitmap = skip_bitmaps->at(block_pos);
1158
0
        std::string key = _writer._full_encode_keys(key_columns, delta_pos);
1159
0
        bool have_delete_sign =
1160
0
                (!skip_bitmap.contains(delete_sign_col_unique_id) && delete_signs[block_pos] != 0);
1161
0
        if (delta_pos > 0 && previous_key == key) {
1162
            // !!ATTENTION!!: We can only remove the row with delete sign if there is a insert with the same key after this row.
1163
            // If there is only a row with delete sign, we should keep it and can't remove it from block, because
1164
            // compaction will not use the delete bitmap when reading data. So there may still be rows with delete sign
1165
            // in later process
1166
0
            DCHECK(previous_has_delete_sign);
1167
0
            DCHECK(!have_delete_sign);
1168
0
            ++duplicate_rows;
1169
0
            RowLocation loc;
1170
0
            RowsetSharedPtr rowset;
1171
0
            Status st = _writer._tablet->lookup_row_key(
1172
0
                    key, &_tablet_schema, false, specified_rowsets, &loc,
1173
0
                    _writer._mow_context->max_version, segment_caches, &rowset, true);
1174
0
            bool is_expected_st = (st.is<ErrorCode::KEY_NOT_FOUND>() || st.ok());
1175
0
            DCHECK(is_expected_st || st.is<ErrorCode::MEM_LIMIT_EXCEEDED>())
1176
0
                    << "[BlockAggregator::aggregate_for_insert_after_delete] unexpected error "
1177
0
                       "status while lookup_row_key:"
1178
0
                    << st;
1179
0
            if (!is_expected_st) {
1180
0
                return st;
1181
0
            }
1182
1183
0
            Slice previous_seq_slice {};
1184
0
            if (st.ok()) {
1185
0
                if (_tablet_schema.has_sequence_col()) {
1186
                    // if the insert row doesn't specify the sequence column, we need to
1187
                    // read the historical's sequence column value so that we don't need
1188
                    // to handle seqeunce column in append_block_with_flexible_content()
1189
                    // for this row
1190
0
                    bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id));
1191
0
                    if (!row_has_sequence_col) {
1192
0
                        read_plan.prepare_to_read(loc, block_pos);
1193
0
                        _writer._rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
1194
0
                    }
1195
0
                }
1196
                // delete the existing row
1197
0
                _writer._mow_context->delete_bitmap->add(
1198
0
                        {loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
1199
0
                        loc.row_id);
1200
0
            }
1201
            // and remove the row with delete sign from the current block
1202
0
            filter_map[block_pos - 1] = 0;
1203
0
        }
1204
0
        previous_has_delete_sign = have_delete_sign;
1205
0
        previous_key = std::move(key);
1206
0
    }
1207
0
    if (duplicate_rows > 0) {
1208
0
        if (!read_plan.empty()) {
1209
            // fill sequence column value for some rows
1210
0
            RETURN_IF_ERROR(fill_sequence_column(block, num_rows, read_plan, *skip_bitmaps));
1211
0
        }
1212
0
        RETURN_IF_ERROR(filter_block(block, num_rows, std::move(filter_column), duplicate_rows,
1213
0
                                     "__filter_insert_after_delete_col__"));
1214
0
    }
1215
0
    return Status::OK();
1216
0
}
1217
1218
Status BlockAggregator::filter_block(Block* block, size_t num_rows, MutableColumnPtr filter_column,
1219
0
                                     int duplicate_rows, std::string col_name) {
1220
0
    auto num_cols = block->columns();
1221
0
    block->insert({std::move(filter_column), std::make_shared<DataTypeUInt8>(), col_name});
1222
0
    RETURN_IF_ERROR(Block::filter_block(block, num_cols, num_cols));
1223
0
    DCHECK_EQ(num_cols, block->columns());
1224
0
    size_t merged_rows = num_rows - block->rows();
1225
0
    if (std::cmp_not_equal(duplicate_rows, merged_rows)) {
1226
0
        auto msg = fmt::format(
1227
0
                "filter_block_for_flexible_partial_update {}: duplicate_rows != merged_rows, "
1228
0
                "duplicate_keys={}, merged_rows={}, num_rows={}, mutable_block->rows()={}",
1229
0
                col_name, duplicate_rows, merged_rows, num_rows, block->rows());
1230
0
        DCHECK(false) << msg;
1231
0
        return Status::InternalError<false>(msg);
1232
0
    }
1233
0
    return Status::OK();
1234
0
}
1235
1236
Status BlockAggregator::convert_pk_columns(Block* block, size_t row_pos, size_t num_rows,
1237
0
                                           std::vector<IOlapColumnDataAccessor*>& key_columns) {
1238
0
    key_columns.clear();
1239
0
    for (uint32_t cid {0}; cid < _tablet_schema.num_key_columns(); cid++) {
1240
0
        RETURN_IF_ERROR(_writer._olap_data_convertor->set_source_content_with_specifid_column(
1241
0
                block->get_by_position(cid), row_pos, num_rows, cid));
1242
0
        auto [status, column] = _writer._olap_data_convertor->convert_column_data(cid);
1243
0
        if (!status.ok()) {
1244
0
            return status;
1245
0
        }
1246
0
        key_columns.push_back(column);
1247
0
    }
1248
0
    return Status::OK();
1249
0
}
1250
1251
Status BlockAggregator::convert_seq_column(Block* block, size_t row_pos, size_t num_rows,
1252
0
                                           IOlapColumnDataAccessor*& seq_column) {
1253
0
    seq_column = nullptr;
1254
0
    if (_tablet_schema.has_sequence_col()) {
1255
0
        auto seq_col_idx = _tablet_schema.sequence_col_idx();
1256
0
        RETURN_IF_ERROR(_writer._olap_data_convertor->set_source_content_with_specifid_column(
1257
0
                block->get_by_position(seq_col_idx), row_pos, num_rows, seq_col_idx));
1258
0
        auto [status, column] = _writer._olap_data_convertor->convert_column_data(seq_col_idx);
1259
0
        if (!status.ok()) {
1260
0
            return status;
1261
0
        }
1262
0
        seq_column = column;
1263
0
    }
1264
0
    return Status::OK();
1265
0
};
1266
1267
Status BlockAggregator::aggregate_for_flexible_partial_update(
1268
        Block* block, size_t num_rows, const std::vector<RowsetSharedPtr>& specified_rowsets,
1269
0
        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
1270
0
    std::vector<IOlapColumnDataAccessor*> key_columns {};
1271
0
    IOlapColumnDataAccessor* seq_column {nullptr};
1272
1273
0
    RETURN_IF_ERROR(convert_pk_columns(block, 0, num_rows, key_columns));
1274
0
    RETURN_IF_ERROR(convert_seq_column(block, 0, num_rows, seq_column));
1275
1276
    // 1. merge duplicate rows when table has sequence column
1277
    // When there are multiple rows with the same keys in memtable, some of them specify specify the sequence column,
1278
    // some of them don't. We can't do the de-duplication in memtable because we don't know the historical data. We must
1279
    // de-duplicate them here.
1280
0
    if (_tablet_schema.has_sequence_col()) {
1281
0
        RETURN_IF_ERROR(aggregate_for_sequence_column(block, static_cast<int>(num_rows),
1282
0
                                                      key_columns, seq_column, specified_rowsets,
1283
0
                                                      segment_caches));
1284
0
    } else {
1285
0
        RETURN_IF_ERROR(aggregate_without_sequence_column(block, num_rows, key_columns));
1286
0
    }
1287
1288
    // 2. merge duplicate rows and handle insert after delete
1289
0
    if (block->rows() != num_rows) {
1290
0
        num_rows = block->rows();
1291
        // data in block has changed, should re-encode key columns, sequence column
1292
0
        _writer._olap_data_convertor->clear_source_content();
1293
0
        RETURN_IF_ERROR(convert_pk_columns(block, 0, num_rows, key_columns));
1294
0
        RETURN_IF_ERROR(convert_seq_column(block, 0, num_rows, seq_column));
1295
0
    }
1296
0
    RETURN_IF_ERROR(aggregate_for_insert_after_delete(block, num_rows, key_columns,
1297
0
                                                      specified_rowsets, segment_caches));
1298
0
    return Status::OK();
1299
0
}
1300
1301
} // namespace doris