Coverage Report

Created: 2026-06-24 23:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/partial_update_info.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/partial_update_info.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
22
#include <cstdint>
23
#include <optional>
24
25
#include "common/consts.h"
26
#include "common/logging.h"
27
#include "core/assert_cast.h"
28
#include "core/block/block.h"
29
#include "core/data_type/data_type_number.h" // IWYU pragma: keep
30
#include "core/value/bitmap_value.h"
31
#include "storage/iterator/olap_data_convertor.h"
32
#include "storage/key/row_key_encoder.h"
33
#include "storage/mow/historical_row_fetcher.h"
34
#include "storage/mow/key_probe.h"
35
#include "storage/olap_common.h"
36
#include "storage/rowset/rowset.h"
37
#include "storage/rowset/rowset_writer_context.h"
38
#include "storage/segment/historical_row_retriever.h"
39
#include "storage/tablet/base_tablet.h"
40
#include "storage/tablet/tablet_meta.h"
41
#include "storage/tablet/tablet_schema.h"
42
#include "storage/utils.h"
43
44
namespace doris {
45
46
41
// Replaces the column stored at `skip_bitmap_col_idx` in `block` with the result of
// IColumn::mutate() on it, and returns that column viewed as a ColumnBitmap.
// The returned pointer is taken from the column before it is handed back to the block.
ColumnBitmap* get_mutable_skip_bitmap_column(Block* block, size_t skip_bitmap_col_idx) {
    auto& slot = block->get_by_position(skip_bitmap_col_idx);
    auto mutated_column = IColumn::mutate(std::move(slot.column));
    ColumnBitmap* bitmap_column = assert_cast<ColumnBitmap*>(mutated_column.get());
    block->replace_by_position(skip_bitmap_col_idx, std::move(mutated_column));
    return bitmap_column;
}
53
54
/// Initializes this PartialUpdateInfo from the load parameters and the tablet schema.
///
/// Stores the update mode, new-key policy, input column set, version and time settings.
/// For partial updates it then splits the schema columns into `update_cids` and
/// `missing_cids` and generates the default values for the missing columns.
///
/// @param tablet_id, txn_id      used only in the "missing key column" error message
/// @param tablet_schema          schema whose columns are classified
/// @param partial_update_cols    names of the columns supplied by the load
/// @param auto_increment_column  name compared against schema columns and the input set
/// @return Status::Aborted when a fixed-column partial update omits a key column,
///         Status::OK otherwise.
Status PartialUpdateInfo::init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema,
                               UniqueKeyUpdateModePB unique_key_update_mode,
                               PartialUpdateNewRowPolicyPB policy,
                               const std::set<std::string>& partial_update_cols,
                               bool is_strict_mode_, int64_t timestamp_ms_, int32_t nano_seconds_,
                               const std::string& timezone_,
                               const std::string& auto_increment_column,
                               int32_t sequence_map_col_uid, int64_t cur_max_version) {
    partial_update_mode = unique_key_update_mode;
    partial_update_new_key_policy = policy;
    partial_update_input_columns = partial_update_cols;
    max_version_in_flush_phase = cur_max_version;
    sequence_map_col_unqiue_id = sequence_map_col_uid;
    timestamp_ms = timestamp_ms_;
    nano_seconds = nano_seconds_;
    timezone = timezone_;
    missing_cids.clear();
    update_cids.clear();
    // default_values is kept index-aligned with missing_cids, so it must be reset together
    // with it; otherwise a repeated init() would leave stale entries behind.
    default_values.clear();

    const bool fixed_columns_mode =
            partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS;

    if (fixed_columns_mode) {
        // partial_update_cols should include all key columns
        for (std::size_t i {0}; i < tablet_schema.num_key_columns(); i++) {
            // Bind by reference: the column is only inspected, never modified.
            const auto& key_col = tablet_schema.column(i);
            if (!partial_update_cols.contains(key_col.name())) {
                auto msg = fmt::format(
                        "Unable to do partial update on shadow index's tablet, tablet_id={}, "
                        "txn_id={}. Missing key column {}.",
                        tablet_id, txn_id, key_col.name());
                LOG_WARNING(msg);
                return Status::Aborted<false>(msg);
            }
        }
    }
    if (is_partial_update()) {
        // Unsigned index avoids the signed/unsigned comparison against num_columns().
        for (uint32_t cid = 0; cid < tablet_schema.num_columns(); ++cid) {
            if (fixed_columns_mode) {
                const auto& tablet_column = tablet_schema.column(cid);
                if (!partial_update_input_columns.contains(tablet_column.name())) {
                    missing_cids.emplace_back(cid);
                    // A missing column that has no default, is not nullable and is not the
                    // schema's auto-increment column cannot be filled for a brand-new row.
                    if (!tablet_column.has_default_value() && !tablet_column.is_nullable() &&
                        tablet_schema.auto_increment_column() != tablet_column.name()) {
                        can_insert_new_rows_in_partial_update = false;
                    }
                } else {
                    update_cids.emplace_back(cid);
                }
                if (auto_increment_column == tablet_column.name()) {
                    is_schema_contains_auto_inc_column = true;
                }
            } else {
                // in flexible partial update, missing cids is all non sort keys' cid
                if (cid >= tablet_schema.num_key_columns()) {
                    missing_cids.emplace_back(cid);
                }
            }
        }
        _generate_default_values_for_missing_cids(tablet_schema);
    }
    is_strict_mode = is_strict_mode_;
    is_input_columns_contains_auto_inc_column =
            is_fixed_partial_update() &&
            partial_update_input_columns.contains(auto_increment_column);
    return Status::OK();
}
118
119
0
/// Copies the fields of this object into `partial_update_info_pb`.
/// Scalar fields are written with set_*(); the input column names, missing cids,
/// update cids and default values are appended to their repeated fields.
void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const {
    auto* pb = partial_update_info_pb;

    pb->set_partial_update_mode(partial_update_mode);
    pb->set_partial_update_new_key_policy(partial_update_new_key_policy);
    pb->set_max_version_in_flush_phase(max_version_in_flush_phase);

    for (const auto& column_name : partial_update_input_columns) {
        pb->add_partial_update_input_columns(column_name);
    }
    for (auto missing_cid : missing_cids) {
        pb->add_missing_cids(missing_cid);
    }
    for (auto update_cid : update_cids) {
        pb->add_update_cids(update_cid);
    }

    pb->set_can_insert_new_rows_in_partial_update(can_insert_new_rows_in_partial_update);
    pb->set_is_strict_mode(is_strict_mode);
    pb->set_timestamp_ms(timestamp_ms);
    pb->set_nano_seconds(nano_seconds);
    pb->set_timezone(timezone);
    pb->set_is_input_columns_contains_auto_inc_column(is_input_columns_contains_auto_inc_column);
    pb->set_is_schema_contains_auto_inc_column(is_schema_contains_auto_inc_column);

    for (const auto& default_value : default_values) {
        pb->add_default_values(default_value);
    }
}
146
147
0
/// Loads this object from `partial_update_info_pb`.
///
/// Fields read only when present in the message: partial_update_mode (with a fallback
/// derived from is_partial_update()), partial_update_new_key_policy,
/// max_version_in_flush_phase (falls back to -1) and nano_seconds. The new-key policy and
/// nano_seconds keep their current value when the message does not carry them.
void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) {
    auto* pb = partial_update_info_pb;

    if (pb->has_partial_update_mode()) {
        partial_update_mode = pb->partial_update_mode();
    } else if (pb->is_partial_update()) {
        // for backward compatibility
        partial_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS;
    } else {
        // for backward compatibility
        partial_update_mode = UniqueKeyUpdateModePB::UPSERT;
    }

    if (pb->has_partial_update_new_key_policy()) {
        partial_update_new_key_policy = pb->partial_update_new_key_policy();
    }

    if (pb->has_max_version_in_flush_phase()) {
        max_version_in_flush_phase = pb->max_version_in_flush_phase();
    } else {
        max_version_in_flush_phase = -1;
    }

    partial_update_input_columns.clear();
    for (const auto& column_name : pb->partial_update_input_columns()) {
        partial_update_input_columns.insert(column_name);
    }

    missing_cids.clear();
    for (auto missing_cid : pb->missing_cids()) {
        missing_cids.push_back(missing_cid);
    }

    update_cids.clear();
    for (auto update_cid : pb->update_cids()) {
        update_cids.push_back(update_cid);
    }

    can_insert_new_rows_in_partial_update = pb->can_insert_new_rows_in_partial_update();
    is_strict_mode = pb->is_strict_mode();
    timestamp_ms = pb->timestamp_ms();
    timezone = pb->timezone();
    is_input_columns_contains_auto_inc_column = pb->is_input_columns_contains_auto_inc_column();
    is_schema_contains_auto_inc_column = pb->is_schema_contains_auto_inc_column();

    if (pb->has_nano_seconds()) {
        nano_seconds = pb->nano_seconds();
    }

    default_values.clear();
    for (const auto& default_value : pb->default_values()) {
        default_values.push_back(default_value);
    }
}
193
194
0
std::string PartialUpdateInfo::summary() const {
195
0
    std::string mode;
196
0
    switch (partial_update_mode) {
197
0
    case UniqueKeyUpdateModePB::UPSERT:
198
0
        mode = "upsert";
199
0
        break;
200
0
    case UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS:
201
0
        mode = "fixed partial update";
202
0
        break;
203
0
    case UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS:
204
0
        mode = "flexible partial update";
205
0
        break;
206
0
    }
207
0
    return fmt::format(
208
0
            "mode={}, update_cids={}, missing_cids={}, is_strict_mode={}, "
209
0
            "max_version_in_flush_phase={}",
210
0
            mode, update_cids.size(), missing_cids.size(), is_strict_mode,
211
0
            max_version_in_flush_phase);
212
0
}
213
214
/// Decides whether a row whose key is not yet in the table may be written.
///
/// - Policy ERROR: always fails with NEW_ROWS_IN_PARTIAL_UPDATE; `line()` supplies the
///   key text for the message.
/// - Policy APPEND, fixed-column mode: fails with INVALID_SCHEMA when
///   `can_insert_new_rows_in_partial_update` is false, naming the first missing column
///   that has no default, is not nullable and is not the auto-increment column.
/// - Policy APPEND, flexible mode: fails with INVALID_SCHEMA for the first missing column
///   contained in `skip_bitmap` that has no default, is not nullable and is not
///   auto-increment.
/// - Anything else: Status::OK.
Status PartialUpdateInfo::handle_new_key(const TabletSchema& tablet_schema,
                                         const std::function<std::string()>& line,
                                         BitmapValue* skip_bitmap) {
    if (partial_update_new_key_policy == doris::PartialUpdateNewRowPolicyPB::ERROR) {
        return Status::Error<ErrorCode::NEW_ROWS_IN_PARTIAL_UPDATE, false>(
                "Can't append new rows in partial update when partial_update_new_key_behavior is "
                "ERROR. Row with key=[{}] is not in table.",
                line());
    }
    if (partial_update_new_key_policy != doris::PartialUpdateNewRowPolicyPB::APPEND) {
        return Status::OK();
    }

    if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
        if (can_insert_new_rows_in_partial_update) {
            return Status::OK();
        }
        // Find the first missing column that cannot be filled for a new row.
        std::string offending_column;
        for (auto cid : missing_cids) {
            const TabletColumn& col = tablet_schema.column(cid);
            const bool fillable = col.has_default_value() || col.is_nullable() ||
                                  tablet_schema.auto_increment_column() == col.name();
            if (!fillable) {
                offending_column = col.name();
                break;
            }
        }
        return Status::Error<ErrorCode::INVALID_SCHEMA, false>(
                "the unmentioned column `{}` should have default value or be nullable "
                "for newly inserted rows in non-strict mode partial update",
                offending_column);
    }

    if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS) {
        DCHECK(skip_bitmap != nullptr);
        for (auto cid : missing_cids) {
            const TabletColumn& col = tablet_schema.column(cid);
            if (skip_bitmap->contains(col.unique_id()) && !col.has_default_value() &&
                !col.is_nullable() && !col.is_auto_increment()) {
                const std::string offending_column = col.name();
                return Status::Error<ErrorCode::INVALID_SCHEMA, false>(
                        "the unmentioned column `{}` should have default value or be "
                        "nullable for newly inserted rows in non-strict mode flexible partial "
                        "update",
                        offending_column);
            }
        }
    }
    return Status::OK();
}
266
267
void PartialUpdateInfo::_generate_default_values_for_missing_cids(
268
43
        const TabletSchema& tablet_schema) {
269
114
    for (unsigned int cur_cid : missing_cids) {
270
114
        const auto& column = tablet_schema.column(cur_cid);
271
114
        if (column.has_default_value()) {
272
94
            std::string default_value;
273
94
            if (UNLIKELY((column.type() == FieldType::OLAP_FIELD_TYPE_DATETIMEV2 ||
274
94
                          column.type() == FieldType::OLAP_FIELD_TYPE_TIMESTAMPTZ) &&
275
94
                         to_lower(column.default_value()).find(to_lower("CURRENT_TIMESTAMP")) !=
276
94
                                 std::string::npos)) {
277
0
                auto pos = to_lower(column.default_value()).find('(');
278
0
                if (pos == std::string::npos) {
279
0
                    DateV2Value<DateTimeV2ValueType> dtv;
280
0
                    dtv.from_unixtime(timestamp_ms / 1000, timezone);
281
0
                    default_value = dtv.to_string();
282
0
                    if (column.type() == FieldType::OLAP_FIELD_TYPE_TIMESTAMPTZ) {
283
0
                        default_value += timezone;
284
0
                    }
285
0
                } else {
286
0
                    int precision = std::stoi(column.default_value().substr(pos + 1));
287
0
                    DateV2Value<DateTimeV2ValueType> dtv;
288
0
                    dtv.from_unixtime(timestamp_ms / 1000, nano_seconds, timezone, precision);
289
0
                    default_value = dtv.to_string();
290
0
                    if (column.type() == FieldType::OLAP_FIELD_TYPE_TIMESTAMPTZ) {
291
0
                        default_value += timezone;
292
0
                    }
293
0
                }
294
94
            } else if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATEV2 &&
295
94
                                to_lower(column.default_value()).find(to_lower("CURRENT_DATE")) !=
296
94
                                        std::string::npos)) {
297
0
                DateV2Value<DateV2ValueType> dv;
298
0
                dv.from_unixtime(timestamp_ms / 1000, timezone);
299
0
                default_value = dv.to_string();
300
94
            } else if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_BITMAP &&
301
94
                                to_lower(column.default_value()).find(to_lower("BITMAP_EMPTY")) !=
302
94
                                        std::string::npos)) {
303
0
                BitmapValue v = BitmapValue {};
304
0
                default_value = v.to_string();
305
94
            } else {
306
94
                default_value = column.default_value();
307
94
            }
308
94
            default_values.emplace_back(default_value);
309
94
        } else {
310
            // place an empty string here
311
20
            default_values.emplace_back();
312
20
        }
313
114
    }
314
43
    CHECK_EQ(missing_cids.size(), default_values.size());
315
43
}
316
317
3
bool FixedReadPlan::empty() const {
318
3
    return plan.empty();
319
3
}
320
321
19
void FixedReadPlan::prepare_to_read(const RowLocation& row_location, size_t pos) {
322
19
    plan[row_location.rowset_id][row_location.segment_id].emplace_back(row_location.row_id, pos);
323
19
}
324
325
// read columns by read plan
326
// read_index: ori_pos-> block_idx
327
/// Reads the columns `cids_to_read` for every row registered in `plan` into `block`.
///
/// @param cids_to_read                column ids to fetch; taken by value because the
///                                    delete-sign cid may be appended locally
/// @param rsid_to_rowset              must contain every rowset id present in the plan
///                                    (enforced with CHECK)
/// @param block                       destination; replaced by a freshly created block when
///                                    the delete-sign column has to be added
/// @param read_index                  out: original position -> row index in `block`
/// @param force_read_old_delete_signs when true, make sure DELETE_SIGN is part of the read
/// @param cur_delete_signs            optional; rows whose entry is non-zero are skipped
/// @return the first failing fetch status, otherwise Status::OK.
Status FixedReadPlan::read_columns_by_plan(
        const TabletSchema& tablet_schema, std::vector<uint32_t> cids_to_read,
        const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, Block& block,
        std::map<uint32_t, uint32_t>* read_index, bool force_read_old_delete_signs,
        const signed char* __restrict cur_delete_signs) const {
    DCHECK(read_index != nullptr);
    if (force_read_old_delete_signs) {
        // always read delete sign column from historical data
        if (block.get_position_by_name(DELETE_SIGN) == -1) {
            auto del_col_cid = tablet_schema.field_index(DELETE_SIGN);
            cids_to_read.emplace_back(del_col_cid);
            block.swap(tablet_schema.create_block_by_cids(cids_to_read));
        }
    }
    bool has_row_column = tablet_schema.has_row_store_for_all_columns();
    // The mutable column view is only needed on the column-by-column path.
    std::optional<Block::ScopedMutableColumns> mutable_columns_guard;
    MutableColumns* mutable_columns = nullptr;
    if (!has_row_column) {
        mutable_columns_guard.emplace(block);
        mutable_columns = &mutable_columns_guard->mutable_columns();
    }
    uint32_t read_idx = 0;
    for (const auto& [rowset_id, segment_row_mappings] : plan) {
        for (const auto& [segment_id, mappings] : segment_row_mappings) {
            auto rowset_iter = rsid_to_rowset.find(rowset_id);
            CHECK(rowset_iter != rsid_to_rowset.end());
            std::vector<uint32_t> rids;
            rids.reserve(mappings.size());
            for (auto [rid, pos] : mappings) {
                if (cur_delete_signs && cur_delete_signs[pos]) {
                    continue;
                }
                rids.emplace_back(rid);
                (*read_index)[static_cast<uint32_t>(pos)] = read_idx++;
            }
            if (has_row_column) {
                auto st = BaseTablet::fetch_value_through_row_column(
                        rowset_iter->second, tablet_schema, segment_id, rids, cids_to_read, block);
                if (!st.ok()) {
                    LOG(WARNING) << "failed to fetch value through row column";
                    return st;
                }
                continue;
            }
            for (size_t cid = 0; cid < mutable_columns->size(); ++cid) {
                // Bind by reference instead of copying the TabletColumn for every
                // column of every segment.
                const TabletColumn& tablet_column = tablet_schema.column(cids_to_read[cid]);
                auto st = doris::BaseTablet::fetch_value_by_rowids(rowset_iter->second, segment_id,
                                                                   rids, tablet_column,
                                                                   (*mutable_columns)[cid]);
                // set read value to output block
                if (!st.ok()) {
                    LOG(WARNING) << "failed to fetch value";
                    return st;
                }
            }
        }
    }
    return Status::OK();
}
384
385
/// Fills the columns listed in `partial_update_info.missing_cids` of `full_block`, one
/// value per entry of `use_default_or_null_flag`.
///
/// The old values (plus the delete sign) are first read through read_columns_by_plan().
/// For each row, every missing column receives either the value read from the old row or
/// a "default" value: the column default, NULL for nullable columns, the value from the
/// auto-increment column of `block`, or the type's default.
///
/// @param historical_context       supplies partial_update_info and tablet_schema
/// @param use_default_or_null_flag per-row flag: true -> do not use the old row's values
/// @param has_default_or_nullable  not used by this function
/// @param segment_start_pos        offset added to the row index to form the key looked up
///                                 in the read index
/// @param block                    input block; only dereferenced for the auto-increment
///                                 column
/// @return error status from the reads / default generation, InternalError when the delete
///         sign column, the input block or the auto-increment column is unavailable,
///         otherwise OK.
Status FixedReadPlan::fill_missing_columns(
        const segment_v2::HistoricalRowRetrieverContext& historical_context,
        const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
        const TabletSchema& tablet_schema, Block& full_block,
        const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
        uint32_t segment_start_pos, const Block* block) const {
    auto mutable_full_columns_guard = full_block.mutate_columns_scoped();
    auto& mutable_full_columns = mutable_full_columns_guard.mutable_columns();
    // create old value columns
    DCHECK(historical_context.partial_update_info != nullptr);
    DCHECK(historical_context.tablet_schema != nullptr);
    const auto& partial_update_info = *historical_context.partial_update_info;
    const auto& missing_cids = partial_update_info.missing_cids;
    bool have_input_seq_column = false;
    if (tablet_schema.has_sequence_col()) {
        const std::vector<uint32_t>& including_cids = partial_update_info.update_cids;
        have_input_seq_column =
                (std::find(including_cids.cbegin(), including_cids.cend(),
                           tablet_schema.sequence_col_idx()) != including_cids.cend());
    }

    auto old_value_block = tablet_schema.create_block_by_cids(missing_cids);
    CHECK_EQ(missing_cids.size(), old_value_block.columns());

    // segment pos to write -> rowid to read in old_value_block
    std::map<uint32_t, uint32_t> read_index;
    RETURN_IF_ERROR(read_columns_by_plan(tablet_schema, missing_cids, rsid_to_rowset,
                                         old_value_block, &read_index, true, nullptr));

    const auto* old_delete_signs = BaseTablet::get_delete_sign_column_data(old_value_block);
    if (old_delete_signs == nullptr) {
        return Status::InternalError("old delete signs column not found, block: {}",
                                     old_value_block.dump_structure());
    }
    // build default value columns
    auto default_value_block = old_value_block.clone_empty();
    RETURN_IF_ERROR(BaseTablet::generate_default_value_block(tablet_schema, missing_cids,
                                                             partial_update_info.default_values,
                                                             old_value_block, default_value_block));
    auto mutable_default_value_columns_guard = default_value_block.mutate_columns_scoped();
    auto& mutable_default_value_columns = mutable_default_value_columns_guard.mutable_columns();

    // fill all missing value from mutable_old_columns, need to consider default value and null value
    for (size_t idx = 0; idx < use_default_or_null_flag.size(); ++idx) {
        const auto segment_pos = static_cast<uint32_t>(idx) + segment_start_pos;
        // Look the row up without inserting: rows that were never planned for reading
        // keep position 0, exactly what a default-constructed map entry would yield,
        // but the read index itself is left untouched.
        uint32_t pos_in_old_block = 0;
        if (auto it = read_index.find(segment_pos); it != read_index.end()) {
            pos_in_old_block = it->second;
        }

        for (size_t i = 0; i < missing_cids.size(); ++i) {
            // if the column has default value, fill it with default value
            // otherwise, if the column is nullable, fill it with null value
            const auto& tablet_column = tablet_schema.column(missing_cids[i]);
            auto& missing_col = mutable_full_columns[missing_cids[i]];

            bool should_use_default = use_default_or_null_flag[idx];
            if (!should_use_default) {
                // old_delete_signs was verified to be non-null above.
                const bool old_row_delete_sign = old_delete_signs[pos_in_old_block] != 0;
                if (old_row_delete_sign) {
                    if (!tablet_schema.has_sequence_col()) {
                        should_use_default = true;
                    } else if (have_input_seq_column || (!tablet_column.is_seqeunce_col())) {
                        // The old row is deleted, so its values are replaced by defaults.
                        // The only exception is the sequence column when the input does not
                        // provide one: its old value is still read so that the sequence value
                        // never decreases, otherwise merge-on-read based compaction may
                        // produce incorrect results.
                        should_use_default = true;
                    }
                }
            }

            if (should_use_default) {
                if (tablet_column.has_default_value()) {
                    missing_col->insert_from(*mutable_default_value_columns[i], 0);
                } else if (tablet_column.is_nullable()) {
                    auto* nullable_column = assert_cast<ColumnNullable*>(missing_col.get());
                    nullable_column->insert_many_defaults(1);
                } else if (tablet_schema.auto_increment_column() == tablet_column.name()) {
                    const auto& column = *DORIS_TRY(
                            historical_context.tablet_schema->column(tablet_column.name()));
                    DCHECK(column.type() == FieldType::OLAP_FIELD_TYPE_BIGINT);
                    auto* auto_inc_column = assert_cast<ColumnInt64*>(missing_col.get());
                    // The auto-increment value comes from the input block; fail cleanly
                    // instead of dereferencing a null pointer.
                    if (block == nullptr) {
                        return Status::InternalError(
                                "input block is required to fill auto increment column {}",
                                tablet_column.name());
                    }
                    int pos = block->get_position_by_name(BeConsts::PARTIAL_UPDATE_AUTO_INC_COL);
                    if (pos == -1) {
                        return Status::InternalError("auto increment column not found in block {}",
                                                     block->dump_structure());
                    }
                    auto_inc_column->insert_from(*block->get_by_position(pos).column.get(), idx);
                } else {
                    // If the control flow reaches this branch, the column neither has default value
                    // nor is nullable. It means that the row's delete sign is marked, and the value
                    // columns are useless and won't be read. So we can just put arbitrary values in the cells
                    missing_col->insert_default();
                }
            } else {
                missing_col->insert_from(*old_value_block.get_by_position(i).column,
                                         pos_in_old_block);
            }
        }
    }
    return Status::OK();
}
485
486
void FlexibleReadPlan::prepare_to_read(const RowLocation& row_location, size_t pos,
487
9
                                       const BitmapValue& skip_bitmap) {
488
9
    if (!use_row_store) {
489
10
        for (uint64_t col_uid : skip_bitmap) {
490
10
            plan[row_location.rowset_id][row_location.segment_id][static_cast<uint32_t>(col_uid)]
491
10
                    .emplace_back(row_location.row_id, pos);
492
10
        }
493
9
    } else {
494
0
        row_store_plan[row_location.rowset_id][row_location.segment_id].emplace_back(
495
0
                row_location.row_id, pos);
496
0
    }
497
9
}
498
499
/// Column-store variant: reads, per column unique id recorded in `plan`, the planned rows
/// into the matching column of `old_value_block`.
///
/// The destination column index is `cid - num_key_columns()`, i.e. `old_value_block` is
/// addressed as if it held the non-key columns in schema order.
///
/// @param rsid_to_rowset  must contain every rowset id present in the plan (CHECK)
/// @param read_index      out: cid -> (original position -> row index inside that column)
/// @return the first failing fetch status, otherwise Status::OK.
Status FlexibleReadPlan::read_columns_by_plan(
        const TabletSchema& tablet_schema,
        const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, Block& old_value_block,
        std::map<uint32_t, std::map<uint32_t, uint32_t>>* read_index) const {
    auto mutable_columns_guard = old_value_block.mutate_columns_scoped();
    auto& mutable_columns = mutable_columns_guard.mutable_columns();

    // cid -> next rid to fill in block
    // No pre-population needed: operator[] value-initializes a new counter to 0.
    std::map<uint32_t, uint32_t> next_read_idx;

    for (const auto& [rowset_id, segment_mappings] : plan) {
        for (const auto& [segment_id, uid_mappings] : segment_mappings) {
            for (const auto& [col_uid, mappings] : uid_mappings) {
                auto rowset_iter = rsid_to_rowset.find(rowset_id);
                CHECK(rowset_iter != rsid_to_rowset.end());
                auto cid = tablet_schema.field_index(col_uid);
                DCHECK_NE(cid, -1);
                DCHECK_GE(cid, tablet_schema.num_key_columns());
                std::vector<uint32_t> rids;
                rids.reserve(mappings.size());
                for (auto [rid, pos] : mappings) {
                    rids.emplace_back(rid);
                    (*read_index)[cid][static_cast<uint32_t>(pos)] = next_read_idx[cid]++;
                }

                // Bind by reference instead of copying the TabletColumn per column uid.
                const TabletColumn& tablet_column = tablet_schema.column(cid);
                auto idx = cid - tablet_schema.num_key_columns();
                RETURN_IF_ERROR(doris::BaseTablet::fetch_value_by_rowids(
                        rowset_iter->second, segment_id, rids, tablet_column,
                        mutable_columns[idx]));
            }
        }
    }
    // !!!ATTENTION!!!: columns in block may have different size because every row has different columns to update
    return Status::OK();
}
537
538
/// Row-store variant: for each rowset/segment in `row_store_plan`, fetches the planned
/// rows for `cids_to_read` into `old_value_block` through the row column.
///
/// @param rsid_to_rowset  must contain every rowset id present in the plan (CHECK)
/// @param read_index      out: original position -> running row index in `old_value_block`
/// @return the first failing fetch status, otherwise Status::OK.
Status FlexibleReadPlan::read_columns_by_plan(
        const TabletSchema& tablet_schema, const std::vector<uint32_t>& cids_to_read,
        const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, Block& old_value_block,
        std::map<uint32_t, uint32_t>* read_index) const {
    DCHECK(use_row_store);
    uint32_t next_block_row = 0;
    for (const auto& [rowset_id, segment_row_mappings] : row_store_plan) {
        for (const auto& [segment_id, mappings] : segment_row_mappings) {
            auto rowset_iter = rsid_to_rowset.find(rowset_id);
            CHECK(rowset_iter != rsid_to_rowset.end());

            std::vector<uint32_t> rids;
            for (auto [rid, pos] : mappings) {
                rids.emplace_back(rid);
                const auto original_pos = static_cast<uint32_t>(pos);
                (*read_index)[original_pos] = next_block_row;
                ++next_block_row;
            }

            auto st = BaseTablet::fetch_value_through_row_column(rowset_iter->second, tablet_schema,
                                                                 segment_id, rids, cids_to_read,
                                                                 old_value_block);
            if (st.ok()) {
                continue;
            }
            LOG(WARNING) << "failed to fetch value through row column";
            return st;
        }
    }
    return Status::OK();
}
564
565
/// Entry point for filling the non-sort-key columns of `full_block`.
///
/// Creates an empty block for the cids in `partial_update_info->missing_cids` and forwards
/// all arguments to the row-store or column-store implementation according to
/// `use_row_store`. Returns the error of the chosen implementation, otherwise Status::OK.
Status FlexibleReadPlan::fill_non_primary_key_columns(
        const segment_v2::HistoricalRowRetrieverContext& historical_context,
        const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
        const TabletSchema& tablet_schema, Block& full_block,
        const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
        uint32_t segment_start_pos, uint32_t block_start_pos, const Block* block,
        std::vector<BitmapValue>* skip_bitmaps) const {
    auto full_columns_guard = full_block.mutate_columns_scoped();
    auto& full_columns = full_columns_guard.mutable_columns();
    DCHECK(historical_context.partial_update_info != nullptr);

    // missing_cids are all non sort key columns' cids
    const auto& value_cids = historical_context.partial_update_info->missing_cids;
    auto old_value_block = tablet_schema.create_block_by_cids(value_cids);
    CHECK_EQ(value_cids.size(), old_value_block.columns());

    if (use_row_store) {
        RETURN_IF_ERROR(fill_non_primary_key_columns_for_row_store(
                historical_context, rsid_to_rowset, tablet_schema, value_cids, old_value_block,
                full_columns, use_default_or_null_flag, has_default_or_nullable,
                segment_start_pos, block_start_pos, block, skip_bitmaps));
    } else {
        RETURN_IF_ERROR(fill_non_primary_key_columns_for_column_store(
                historical_context, rsid_to_rowset, tablet_schema, value_cids, old_value_block,
                full_columns, use_default_or_null_flag, has_default_or_nullable,
                segment_start_pos, block_start_pos, block, skip_bitmaps));
    }
    return Status::OK();
}
594
595
static void fill_non_primary_key_cell_for_column_store(
596
        const TabletColumn& tablet_column, uint32_t cid, MutableColumnPtr& new_col,
597
        const IColumn& default_value_col, const IColumn& old_value_col, const IColumn& cur_col,
598
        std::size_t block_pos, uint32_t segment_pos, bool skipped, bool row_has_sequence_col,
599
        bool use_default, const signed char* delete_sign_column_data,
600
        const TabletSchema& tablet_schema,
601
        std::map<uint32_t, std::map<uint32_t, uint32_t>>& read_index,
602
60
        const PartialUpdateInfo* info) {
603
60
    if (skipped) {
604
19
        DCHECK(cid != tablet_schema.skip_bitmap_col_idx());
605
19
        DCHECK(cid != tablet_schema.version_col_idx());
606
19
        DCHECK(!tablet_column.is_row_store_column());
607
608
19
        if (!use_default) {
609
10
            if (delete_sign_column_data != nullptr) {
610
10
                bool old_row_delete_sign = false;
611
10
                if (auto it = read_index[tablet_schema.delete_sign_idx()].find(segment_pos);
612
10
                    it != read_index[tablet_schema.delete_sign_idx()].end()) {
613
8
                    old_row_delete_sign = (delete_sign_column_data[it->second] != 0);
614
8
                }
615
616
10
                if (old_row_delete_sign) {
617
0
                    if (!tablet_schema.has_sequence_col()) {
618
0
                        use_default = true;
619
0
                    } else if (row_has_sequence_col ||
620
0
                               (!tablet_column.is_seqeunce_col() &&
621
0
                                (tablet_column.unique_id() != info->sequence_map_col_uid()))) {
622
                        // to keep the sequence column value not decreasing, we should read values of seq column(and seq map column)
623
                        // from old rows even if the old row is deleted when the input doesn't specify the sequence column, otherwise
624
                        // it may cause the merge-on-read based compaction to produce incorrect results
625
0
                        use_default = true;
626
0
                    }
627
0
                }
628
10
            }
629
10
        }
630
19
        if (!use_default && tablet_column.is_on_update_current_timestamp()) {
631
0
            use_default = true;
632
0
        }
633
19
        if (use_default) {
634
9
            if (tablet_column.has_default_value()) {
635
9
                new_col->insert_from(default_value_col, 0);
636
9
            } else if (tablet_column.is_nullable()) {
637
0
                assert_cast<ColumnNullable*, TypeCheckOnRelease::DISABLE>(new_col.get())
638
0
                        ->insert_many_defaults(1);
639
0
            } else if (tablet_column.is_auto_increment()) {
640
                // In flexible partial update, the skip bitmap indicates whether a cell
641
                // is specified in the original load, so the generated auto-increment value is filled
642
                // in current block in place if needed rather than using a separate column to
643
                // store the generated auto-increment value in fixed partial update
644
0
                new_col->insert_from(cur_col, block_pos);
645
0
            } else {
646
0
                new_col->insert_default();
647
0
            }
648
10
        } else {
649
10
            auto pos_in_old_block = read_index.at(cid).at(segment_pos);
650
10
            new_col->insert_from(old_value_col, pos_in_old_block);
651
10
        }
652
41
    } else {
653
41
        new_col->insert_from(cur_col, block_pos);
654
41
    }
655
60
}
656
657
Status FlexibleReadPlan::fill_non_primary_key_columns_for_column_store(
658
        const segment_v2::HistoricalRowRetrieverContext& historical_context,
659
        const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
660
        const TabletSchema& tablet_schema, const std::vector<uint32_t>& non_sort_key_cids,
661
        Block& old_value_block, MutableColumns& mutable_full_columns,
662
        const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
663
        uint32_t segment_start_pos, uint32_t block_start_pos, const Block* block,
664
15
        std::vector<BitmapValue>* skip_bitmaps) const {
665
15
    auto* info = historical_context.partial_update_info.get();
666
15
    int32_t seq_col_unique_id = -1;
667
15
    if (tablet_schema.has_sequence_col()) {
668
11
        seq_col_unique_id = tablet_schema.column(tablet_schema.sequence_col_idx()).unique_id();
669
11
    }
670
    // cid -> segment pos to write -> rowid to read in old_value_block
671
15
    std::map<uint32_t, std::map<uint32_t, uint32_t>> read_index;
672
15
    RETURN_IF_ERROR(
673
15
            read_columns_by_plan(tablet_schema, rsid_to_rowset, old_value_block, &read_index));
674
    // !!!ATTENTION!!!: columns in old_value_block may have different size because every row has different columns to update
675
676
15
    const auto* delete_sign_column_data = BaseTablet::get_delete_sign_column_data(old_value_block);
677
    // build default value columns
678
15
    auto default_value_block = old_value_block.clone_empty();
679
15
    if (has_default_or_nullable || delete_sign_column_data != nullptr) {
680
15
        RETURN_IF_ERROR(BaseTablet::generate_default_value_block(
681
15
                tablet_schema, non_sort_key_cids, info->default_values, old_value_block,
682
15
                default_value_block));
683
15
    }
684
685
    // fill all non sort key columns from mutable_old_columns, need to consider default value and null value
686
73
    for (std::size_t i {0}; i < non_sort_key_cids.size(); i++) {
687
58
        auto cid = non_sort_key_cids[i];
688
58
        const auto& tablet_column = tablet_schema.column(cid);
689
58
        auto col_uid = tablet_column.unique_id();
690
118
        for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
691
60
            auto segment_pos = segment_start_pos + idx;
692
60
            auto block_pos = block_start_pos + idx;
693
694
60
            fill_non_primary_key_cell_for_column_store(
695
60
                    tablet_column, cid, mutable_full_columns[cid],
696
60
                    *default_value_block.get_by_position(i).column,
697
60
                    *old_value_block.get_by_position(i).column, *block->get_by_position(cid).column,
698
60
                    block_pos, segment_pos, skip_bitmaps->at(block_pos).contains(col_uid),
699
60
                    tablet_schema.has_sequence_col()
700
60
                            ? !skip_bitmaps->at(block_pos).contains(seq_col_unique_id)
701
60
                            : false,
702
60
                    use_default_or_null_flag[idx], delete_sign_column_data, tablet_schema,
703
60
                    read_index, info);
704
60
        }
705
58
    }
706
15
    return Status::OK();
707
15
}
708
709
static void fill_non_primary_key_cell_for_row_store(
710
        const TabletColumn& tablet_column, uint32_t cid, MutableColumnPtr& new_col,
711
        const IColumn& default_value_col, const IColumn& old_value_col, const IColumn& cur_col,
712
        std::size_t block_pos, bool skipped, bool row_has_sequence_col, bool use_default,
713
        const signed char* delete_sign_column_data, uint32_t pos_in_old_block,
714
0
        const TabletSchema& tablet_schema, const PartialUpdateInfo* info) {
715
0
    if (skipped) {
716
0
        DCHECK(cid != tablet_schema.skip_bitmap_col_idx());
717
0
        DCHECK(cid != tablet_schema.version_col_idx());
718
0
        DCHECK(!tablet_column.is_row_store_column());
719
0
        if (!use_default) {
720
0
            if (delete_sign_column_data != nullptr) {
721
0
                bool old_row_delete_sign = (delete_sign_column_data[pos_in_old_block] != 0);
722
0
                if (old_row_delete_sign) {
723
0
                    if (!tablet_schema.has_sequence_col()) {
724
0
                        use_default = true;
725
0
                    } else if (row_has_sequence_col ||
726
0
                               (!tablet_column.is_seqeunce_col() &&
727
0
                                (tablet_column.unique_id() != info->sequence_map_col_uid()))) {
728
                        // to keep the sequence column value not decreasing, we should read values of seq column(and seq map column)
729
                        // from old rows even if the old row is deleted when the input doesn't specify the sequence column, otherwise
730
                        // it may cause the merge-on-read based compaction to produce incorrect results
731
0
                        use_default = true;
732
0
                    }
733
0
                }
734
0
            }
735
0
        }
736
737
0
        if (!use_default && tablet_column.is_on_update_current_timestamp()) {
738
0
            use_default = true;
739
0
        }
740
0
        if (use_default) {
741
0
            if (tablet_column.has_default_value()) {
742
0
                new_col->insert_from(default_value_col, 0);
743
0
            } else if (tablet_column.is_nullable()) {
744
0
                assert_cast<ColumnNullable*, TypeCheckOnRelease::DISABLE>(new_col.get())
745
0
                        ->insert_many_defaults(1);
746
0
            } else if (tablet_column.is_auto_increment()) {
747
                // In flexible partial update, the skip bitmap indicates whether a cell
748
                // is specified in the original load, so the generated auto-increment value is filled
749
                // in current block in place if needed rather than using a separate column to
750
                // store the generated auto-increment value in fixed partial update
751
0
                new_col->insert_from(cur_col, block_pos);
752
0
            } else {
753
0
                new_col->insert_default();
754
0
            }
755
0
        } else {
756
0
            new_col->insert_from(old_value_col, pos_in_old_block);
757
0
        }
758
0
    } else {
759
0
        new_col->insert_from(cur_col, block_pos);
760
0
    }
761
0
}
762
763
Status FlexibleReadPlan::fill_non_primary_key_columns_for_row_store(
764
        const segment_v2::HistoricalRowRetrieverContext& historical_context,
765
        const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
766
        const TabletSchema& tablet_schema, const std::vector<uint32_t>& non_sort_key_cids,
767
        Block& old_value_block, MutableColumns& mutable_full_columns,
768
        const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
769
        uint32_t segment_start_pos, uint32_t block_start_pos, const Block* block,
770
0
        std::vector<BitmapValue>* skip_bitmaps) const {
771
0
    auto* info = historical_context.partial_update_info.get();
772
0
    int32_t seq_col_unique_id = -1;
773
0
    if (tablet_schema.has_sequence_col()) {
774
0
        seq_col_unique_id = tablet_schema.column(tablet_schema.sequence_col_idx()).unique_id();
775
0
    }
776
    // segment pos to write -> rowid to read in old_value_block
777
0
    std::map<uint32_t, uint32_t> read_index;
778
0
    RETURN_IF_ERROR(read_columns_by_plan(tablet_schema, non_sort_key_cids, rsid_to_rowset,
779
0
                                         old_value_block, &read_index));
780
781
0
    const auto* delete_sign_column_data = BaseTablet::get_delete_sign_column_data(old_value_block);
782
    // build default value columns
783
0
    auto default_value_block = old_value_block.clone_empty();
784
0
    if (has_default_or_nullable || delete_sign_column_data != nullptr) {
785
0
        RETURN_IF_ERROR(BaseTablet::generate_default_value_block(
786
0
                tablet_schema, non_sort_key_cids, info->default_values, old_value_block,
787
0
                default_value_block));
788
0
    }
789
790
    // fill all non sort key columns from mutable_old_columns, need to consider default value and null value
791
0
    for (std::size_t i {0}; i < non_sort_key_cids.size(); i++) {
792
0
        auto cid = non_sort_key_cids[i];
793
0
        const auto& tablet_column = tablet_schema.column(cid);
794
0
        auto col_uid = tablet_column.unique_id();
795
0
        for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
796
0
            auto segment_pos = segment_start_pos + idx;
797
0
            auto block_pos = block_start_pos + idx;
798
0
            auto pos_in_old_block = read_index[segment_pos];
799
800
0
            fill_non_primary_key_cell_for_row_store(
801
0
                    tablet_column, cid, mutable_full_columns[cid],
802
0
                    *default_value_block.get_by_position(i).column,
803
0
                    *old_value_block.get_by_position(i).column, *block->get_by_position(cid).column,
804
0
                    block_pos, skip_bitmaps->at(block_pos).contains(col_uid),
805
0
                    tablet_schema.has_sequence_col()
806
0
                            ? !skip_bitmaps->at(block_pos).contains(seq_col_unique_id)
807
0
                            : false,
808
0
                    use_default_or_null_flag[idx], delete_sign_column_data, pos_in_old_block,
809
0
                    tablet_schema, info);
810
0
        }
811
0
    }
812
0
    return Status::OK();
813
0
}
814
815
BlockAggregator::BlockAggregator(TabletSchema& tablet_schema, BaseTabletSPtr tablet,
816
                                 std::shared_ptr<MowContext> mow_context,
817
                                 const PartialUpdateInfo& partial_update_info,
818
                                 const RowKeyEncoder& key_encoder,
819
                                 const segment_v2::MowKeyProbe& probe,
820
                                 HistoricalRowFetcher& fetcher)
821
15
        : _tablet_schema(tablet_schema),
822
15
          _tablet(std::move(tablet)),
823
15
          _mow_context(std::move(mow_context)),
824
15
          _partial_update_info(partial_update_info),
825
15
          _key_encoder(key_encoder),
826
15
          _convertor(std::make_unique<OlapBlockDataConvertor>()),
827
15
          _probe(probe),
828
15
          _fetcher(fetcher) {
829
15
    _convertor->resize(tablet_schema.num_columns());
830
30
    for (uint32_t cid = 0; cid < tablet_schema.num_key_columns(); ++cid) {
831
15
        _convertor->add_column_data_convertor_at(tablet_schema.column(cid), cid);
832
15
    }
833
15
    if (tablet_schema.has_sequence_col()) {
834
11
        auto cid = cast_set<uint32_t>(tablet_schema.sequence_col_idx());
835
11
        _convertor->add_column_data_convertor_at(tablet_schema.column(cid), cid);
836
11
    }
837
15
}
838
839
15
BlockAggregator::~BlockAggregator() = default;
840
841
void BlockAggregator::merge_one_row(MutableBlock& dst_block, Block* src_block, int rid,
842
5
                                    BitmapValue& skip_bitmap) {
843
25
    for (size_t cid {_tablet_schema.num_key_columns()}; cid < _tablet_schema.num_columns(); cid++) {
844
20
        if (cid == _tablet_schema.skip_bitmap_col_idx()) {
845
5
            auto& cur_skip_bitmap =
846
5
                    assert_cast<ColumnBitmap*>(dst_block.mutable_columns()[cid].get())
847
5
                            ->get_data()
848
5
                            .back();
849
5
            const auto& new_row_skip_bitmap =
850
5
                    assert_cast<const ColumnBitmap*>(src_block->get_by_position(cid).column.get())
851
5
                            ->get_data()[rid];
852
5
            cur_skip_bitmap &= new_row_skip_bitmap;
853
5
            continue;
854
5
        }
855
15
        if (!skip_bitmap.contains(_tablet_schema.column(cid).unique_id())) {
856
8
            dst_block.mutable_columns()[cid]->pop_back(1);
857
8
            dst_block.mutable_columns()[cid]->insert_from(*src_block->get_by_position(cid).column,
858
8
                                                          rid);
859
8
        }
860
15
    }
861
5
    VLOG_DEBUG << fmt::format("merge a row, after merge, output_block.rows()={}, state: {}",
862
0
                              dst_block.rows(), _state.to_string());
863
5
}
864
865
13
void BlockAggregator::append_one_row(MutableBlock& dst_block, Block* src_block, int rid) {
866
13
    dst_block.add_row(src_block, rid);
867
13
    _state.rows++;
868
13
    VLOG_DEBUG << fmt::format("append a new row, after append, output_block.rows()={}, state: {}",
869
0
                              dst_block.rows(), _state.to_string());
870
13
}
871
872
3
void BlockAggregator::remove_last_n_rows(MutableBlock& dst_block, int n) {
873
3
    if (n > 0) {
874
6
        for (size_t cid {0}; cid < _tablet_schema.num_columns(); cid++) {
875
5
            DCHECK_GE(dst_block.mutable_columns()[cid]->size(), n);
876
5
            dst_block.mutable_columns()[cid]->pop_back(n);
877
5
        }
878
1
    }
879
3
}
880
881
void BlockAggregator::append_or_merge_row(MutableBlock& dst_block, Block* src_block, int rid,
882
18
                                          BitmapValue& skip_bitmap, bool have_delete_sign) {
883
18
    if (have_delete_sign) {
884
        // remove all the previous batched rows
885
3
        remove_last_n_rows(dst_block, _state.rows);
886
3
        _state.rows = 0;
887
3
        _state.has_row_with_delete_sign = true;
888
889
3
        append_one_row(dst_block, src_block, rid);
890
15
    } else {
891
15
        if (_state.should_merge()) {
892
5
            merge_one_row(dst_block, src_block, rid, skip_bitmap);
893
10
        } else {
894
10
            append_one_row(dst_block, src_block, rid);
895
10
        }
896
15
    }
897
18
};
898
899
Status BlockAggregator::aggregate_rows(
900
        MutableBlock& output_block, Block* block, int start, int end, std::string key,
901
        std::vector<BitmapValue>* skip_bitmaps, const signed char* delete_signs,
902
        IOlapColumnDataAccessor* seq_column, const std::vector<RowsetSharedPtr>& specified_rowsets,
903
11
        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
904
11
    VLOG_DEBUG << fmt::format("merge rows in range=[{}-{})", start, end);
905
11
    if (end - start == 1) {
906
0
        output_block.add_row(block, start);
907
0
        VLOG_DEBUG << fmt::format("append a row directly, rid={}", start);
908
0
        return Status::OK();
909
0
    }
910
911
11
    auto seq_col_unique_id = _tablet_schema.column(_tablet_schema.sequence_col_idx()).unique_id();
912
11
    auto delete_sign_col_unique_id =
913
11
            _tablet_schema.column(_tablet_schema.delete_sign_idx()).unique_id();
914
915
11
    _state.reset();
916
917
11
    std::string previous_encoded_seq_value {};
918
11
    segment_v2::ProbeOutcome probe_out;
919
11
    Status st = _probe.probe_previous_seq_value(key, specified_rowsets, segment_caches,
920
11
                                                &previous_encoded_seq_value, &probe_out);
921
11
    int pos = start;
922
11
    DCHECK(st.ok() || st.is<ErrorCode::MEM_LIMIT_EXCEEDED>())
923
0
            << "[BlockAggregator::aggregate_rows] unexpected error status while lookup_row_key:"
924
0
            << st;
925
11
    if (!st.ok()) {
926
0
        return st;
927
0
    }
928
929
11
    std::string cur_seq_val;
930
11
    if (probe_out.result == segment_v2::KeyProbeResult::FOUND) {
931
12
        for (pos = start; pos < end; pos++) {
932
11
            auto& skip_bitmap = skip_bitmaps->at(pos);
933
11
            bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id));
934
            // Discard all the rows whose seq value is smaller than previous_encoded_seq_value.
935
11
            if (row_has_sequence_col) {
936
9
                std::string seq_val {};
937
9
                _key_encoder.append_seq_suffix(&seq_val, seq_column, pos);
938
9
                if (Slice {seq_val}.compare(Slice {previous_encoded_seq_value}) < 0) {
939
3
                    continue;
940
3
                }
941
6
                cur_seq_val = std::move(seq_val);
942
6
                break;
943
9
            }
944
2
            cur_seq_val = std::move(previous_encoded_seq_value);
945
2
            break;
946
11
        }
947
9
    } else {
948
2
        pos = start;
949
2
        auto& skip_bitmap = skip_bitmaps->at(pos);
950
2
        bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id));
951
2
        if (row_has_sequence_col) {
952
1
            std::string seq_val {};
953
            // for rows that don't specify sequence col, seq_val will be encoded to minimal value
954
1
            _key_encoder.append_seq_suffix(&seq_val, seq_column, pos);
955
1
            cur_seq_val = std::move(seq_val);
956
1
        } else {
957
1
            cur_seq_val.clear();
958
1
            RETURN_IF_ERROR(_generate_encoded_default_seq_value(&cur_seq_val));
959
1
        }
960
2
    }
961
962
32
    for (int rid {pos}; rid < end; rid++) {
963
21
        auto& skip_bitmap = skip_bitmaps->at(rid);
964
21
        bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id));
965
21
        bool have_delete_sign =
966
21
                (!skip_bitmap.contains(delete_sign_col_unique_id) && delete_signs[rid] != 0);
967
21
        if (!row_has_sequence_col) {
968
4
            append_or_merge_row(output_block, block, rid, skip_bitmap, have_delete_sign);
969
17
        } else {
970
17
            std::string seq_val {};
971
17
            _key_encoder.append_seq_suffix(&seq_val, seq_column, rid);
972
17
            if (Slice {seq_val}.compare(Slice {cur_seq_val}) >= 0) {
973
14
                append_or_merge_row(output_block, block, rid, skip_bitmap, have_delete_sign);
974
14
                cur_seq_val = std::move(seq_val);
975
14
            } else {
976
3
                VLOG_DEBUG << fmt::format(
977
0
                        "skip rid={} becasue its seq value is lower than the previous", rid);
978
3
            }
979
17
        }
980
21
    }
981
11
    return Status::OK();
982
11
};
983
984
1
Status BlockAggregator::_generate_encoded_default_seq_value(std::string* encoded_value) {
985
1
    const auto& seq_column = _tablet_schema.column(_tablet_schema.sequence_col_idx());
986
1
    auto block = _tablet_schema.create_block_by_cids(
987
1
            {cast_set<uint32_t>(_tablet_schema.sequence_col_idx())});
988
1
    if (seq_column.has_default_value()) {
989
1
        auto idx = _tablet_schema.sequence_col_idx() - _tablet_schema.num_key_columns();
990
1
        const auto& default_value = _partial_update_info.default_values[idx];
991
1
        StringRef str {default_value};
992
1
        RETURN_IF_ERROR(block.get_by_position(0).type->get_serde()->default_from_string(
993
1
                str, *block.get_by_position(0).column->assert_mutable().get()));
994
995
1
    } else {
996
0
        block.get_by_position(0).column->assert_mutable()->insert_default();
997
0
    }
998
1
    DCHECK_EQ(block.rows(), 1);
999
1
    auto olap_data_convertor = std::make_unique<OlapBlockDataConvertor>();
1000
1
    olap_data_convertor->add_column_data_convertor(seq_column);
1001
1
    olap_data_convertor->set_source_content(&block, 0, 1);
1002
1
    auto [status, column] = olap_data_convertor->convert_column_data(0);
1003
1
    if (!status.ok()) {
1004
0
        return status;
1005
0
    }
1006
    // include marker
1007
1
    _key_encoder.append_seq_suffix(encoded_value, column, 0);
1008
1
    return Status::OK();
1009
1
}
1010
1011
Status BlockAggregator::aggregate_for_sequence_column(
1012
        Block* block, int num_rows, const std::vector<IOlapColumnDataAccessor*>& key_columns,
1013
        IOlapColumnDataAccessor* seq_column, const std::vector<RowsetSharedPtr>& specified_rowsets,
1014
11
        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
1015
11
    DCHECK_EQ(block->columns(), _tablet_schema.num_columns());
1016
    // the process logic here is the same as MemTable::_aggregate_for_flexible_partial_update_without_seq_col()
1017
    // after this function, there will be at most 2 rows for a specified key
1018
11
    std::vector<BitmapValue>* skip_bitmaps =
1019
11
            &get_mutable_skip_bitmap_column(block, _tablet_schema.skip_bitmap_col_idx())
1020
11
                     ->get_data();
1021
11
    const auto* delete_signs = BaseTablet::get_delete_sign_column_data(*block, num_rows);
1022
1023
11
    auto filtered_block = _tablet_schema.create_block();
1024
11
    MutableBlock output_block = MutableBlock::build_mutable_block(std::move(filtered_block));
1025
1026
11
    int same_key_rows {0};
1027
11
    std::string previous_key {};
1028
35
    for (int block_pos {0}; block_pos < num_rows; block_pos++) {
1029
24
        std::string key = _key_encoder.full_encode(key_columns, block_pos);
1030
24
        if (block_pos > 0 && previous_key == key) {
1031
13
            same_key_rows++;
1032
13
        } else {
1033
11
            if (same_key_rows > 0) {
1034
0
                RETURN_IF_ERROR(aggregate_rows(output_block, block, block_pos - same_key_rows,
1035
0
                                               block_pos, std::move(previous_key), skip_bitmaps,
1036
0
                                               delete_signs, seq_column, specified_rowsets,
1037
0
                                               segment_caches));
1038
0
            }
1039
11
            same_key_rows = 1;
1040
11
        }
1041
24
        previous_key = std::move(key);
1042
24
    }
1043
11
    if (same_key_rows > 0) {
1044
11
        RETURN_IF_ERROR(aggregate_rows(output_block, block, num_rows - same_key_rows, num_rows,
1045
11
                                       std::move(previous_key), skip_bitmaps, delete_signs,
1046
11
                                       seq_column, specified_rowsets, segment_caches));
1047
11
    }
1048
1049
11
    block->swap(output_block.to_block());
1050
11
    return Status::OK();
1051
11
}
1052
1053
Status BlockAggregator::fill_sequence_column(Block* block, size_t num_rows,
1054
                                             const FixedReadPlan& read_plan,
1055
1
                                             std::vector<BitmapValue>& skip_bitmaps) {
1056
1
    DCHECK(_tablet_schema.has_sequence_col());
1057
1
    std::vector<uint32_t> cids {static_cast<uint32_t>(_tablet_schema.sequence_col_idx())};
1058
1
    auto seq_col_unique_id = _tablet_schema.column(_tablet_schema.sequence_col_idx()).unique_id();
1059
1060
1
    auto seq_col_block = _tablet_schema.create_block_by_cids(cids);
1061
1
    auto tmp_block = _tablet_schema.create_block_by_cids(cids);
1062
1
    std::map<uint32_t, uint32_t> read_index;
1063
1
    RETURN_IF_ERROR(read_plan.read_columns_by_plan(_tablet_schema, cids, _fetcher.pinned_rowsets(),
1064
1
                                                   seq_col_block, &read_index, false));
1065
1066
1
    auto new_seq_col_ptr = tmp_block.get_by_position(0).column->assert_mutable();
1067
1
    const auto& old_seq_col_ptr = *seq_col_block.get_by_position(0).column;
1068
1
    const auto& cur_seq_col_ptr = *block->get_by_position(_tablet_schema.sequence_col_idx()).column;
1069
3
    for (uint32_t block_pos {0}; block_pos < num_rows; block_pos++) {
1070
2
        if (read_index.contains(block_pos)) {
1071
1
            new_seq_col_ptr->insert_from(old_seq_col_ptr, read_index[block_pos]);
1072
1
            skip_bitmaps[block_pos].remove(seq_col_unique_id);
1073
1
        } else {
1074
1
            new_seq_col_ptr->insert_from(cur_seq_col_ptr, block_pos);
1075
1
        }
1076
2
    }
1077
1
    block->replace_by_position(_tablet_schema.sequence_col_idx(), std::move(new_seq_col_ptr));
1078
1
    return Status::OK();
1079
1
}
1080
1081
Status BlockAggregator::aggregate_for_insert_after_delete(
1082
        Block* block, size_t num_rows, const std::vector<IOlapColumnDataAccessor*>& key_columns,
1083
        const std::vector<RowsetSharedPtr>& specified_rowsets,
1084
15
        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
1085
15
    DCHECK_EQ(block->columns(), _tablet_schema.num_columns());
1086
    // there will be at most 2 rows for a specified key in block when control flow reaches here
1087
    // after this function, there will not be duplicate rows in block
1088
1089
15
    std::vector<BitmapValue>* skip_bitmaps =
1090
15
            &get_mutable_skip_bitmap_column(block, _tablet_schema.skip_bitmap_col_idx())
1091
15
                     ->get_data();
1092
15
    const auto* delete_signs = BaseTablet::get_delete_sign_column_data(*block, num_rows);
1093
1094
15
    auto filter_column = ColumnUInt8::create(num_rows, 1);
1095
15
    auto* __restrict filter_map = filter_column->get_data().data();
1096
15
    std::string previous_key {};
1097
15
    bool previous_has_delete_sign {false};
1098
15
    int duplicate_rows {0};
1099
15
    int32_t delete_sign_col_unique_id =
1100
15
            _tablet_schema.column(_tablet_schema.delete_sign_idx()).unique_id();
1101
15
    auto seq_col_unique_id =
1102
15
            (_tablet_schema.sequence_col_idx() != -1)
1103
15
                    ? _tablet_schema.column(_tablet_schema.sequence_col_idx()).unique_id()
1104
15
                    : -1;
1105
15
    FixedReadPlan read_plan;
1106
34
    for (size_t block_pos {0}; block_pos < num_rows; block_pos++) {
1107
19
        size_t delta_pos = block_pos;
1108
19
        auto& skip_bitmap = skip_bitmaps->at(block_pos);
1109
19
        std::string key = _key_encoder.full_encode(key_columns, delta_pos);
1110
19
        bool have_delete_sign =
1111
19
                (!skip_bitmap.contains(delete_sign_col_unique_id) && delete_signs[block_pos] != 0);
1112
19
        if (delta_pos > 0 && previous_key == key) {
1113
            // !!ATTENTION!!: We can only remove the row with delete sign if there is an insert with the same key after this row.
1114
            // If there is only a row with delete sign, we should keep it and can't remove it from block, because
1115
            // compaction will not use the delete bitmap when reading data. So there may still be rows with delete sign
1116
            // in later process
1117
3
            DCHECK(previous_has_delete_sign);
1118
3
            DCHECK(!have_delete_sign);
1119
3
            ++duplicate_rows;
1120
            // the losing delete-sign row is removed from the block below instead of
1121
            // marking itself, so the probe only marks the old row
1122
3
            segment_v2::MowKeyProbe probe(
1123
3
                    _tablet.get(), &_tablet_schema, _tablet_schema.has_sequence_col(), _mow_context,
1124
3
                    RowsetId {}, 0,
1125
3
                    segment_v2::MowKeyProbe::Policy {
1126
3
                            .delete_bitmap_mode =
1127
3
                                    segment_v2::MowKeyProbe::DeleteBitmapMode::OLD_ROW,
1128
3
                            .skip_delete_sign = true,
1129
3
                            .skip_seq_loses = true,
1130
3
                            .skip_in_load_deleted = false,
1131
3
                    });
1132
3
            segment_v2::ProbeOutcome out;
1133
3
            Status st = probe.probe(key, /*segment_pos=*/0, /*key_has_seq_suffix=*/false,
1134
3
                                    /*have_delete_sign=*/false, specified_rowsets, segment_caches,
1135
3
                                    &out, /*stats=*/nullptr);
1136
3
            DCHECK(st.ok() || st.is<ErrorCode::MEM_LIMIT_EXCEEDED>())
1137
0
                    << "[BlockAggregator::aggregate_for_insert_after_delete] unexpected error "
1138
0
                       "status while lookup_row_key:"
1139
0
                    << st;
1140
3
            if (!st.ok()) {
1141
0
                return st;
1142
0
            }
1143
1144
3
            if (out.result == segment_v2::KeyProbeResult::FOUND) {
1145
2
                if (_tablet_schema.has_sequence_col()) {
1146
                    // if the insert row doesn't specify the sequence column, we need to
1147
                    // read the historical's sequence column value so that we don't need
1148
                    // to handle sequence column in append_block_with_flexible_content()
1149
                    // for this row
1150
2
                    bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id));
1151
2
                    if (!row_has_sequence_col) {
1152
1
                        read_plan.prepare_to_read(out.loc, block_pos);
1153
1
                        _fetcher.pin_rowset(out.rowset);
1154
1
                    }
1155
2
                }
1156
                // the old-row delete mark is set inside probe()
1157
2
            }
1158
            // and remove the row with delete sign from the current block
1159
3
            filter_map[block_pos - 1] = 0;
1160
3
        }
1161
19
        previous_has_delete_sign = have_delete_sign;
1162
19
        previous_key = std::move(key);
1163
19
    }
1164
15
    if (duplicate_rows > 0) {
1165
3
        if (!read_plan.empty()) {
1166
            // fill sequence column value for some rows
1167
1
            RETURN_IF_ERROR(fill_sequence_column(block, num_rows, read_plan, *skip_bitmaps));
1168
1
        }
1169
3
        RETURN_IF_ERROR(filter_block(block, num_rows, std::move(filter_column), duplicate_rows,
1170
3
                                     "__filter_insert_after_delete_col__"));
1171
3
    }
1172
15
    return Status::OK();
1173
15
}
1174
1175
// Filters `block` with `filter_column` and verifies that exactly `duplicate_rows`
// rows were removed.
//
// `filter_column` is appended to `block` as a temporary UInt8 column named
// `col_name` and then passed, by index, to Block::filter_block(). After filtering,
// the block is expected to have its original column count again (DCHECK'ed).
//
// @param block          block to filter in place
// @param num_rows       number of rows the caller counted in `block` before filtering
// @param filter_column  per-row filter values, consumed by this call
// @param duplicate_rows number of rows the caller expects to be removed
// @param col_name       name given to the temporary filter column; also used in the
//                       error message
// @return the error from Block::filter_block() if it fails, InternalError if the
//         number of removed rows differs from `duplicate_rows`, OK otherwise
Status BlockAggregator::filter_block(Block* block, size_t num_rows, MutableColumnPtr filter_column,
                                     int duplicate_rows, std::string col_name) {
    // Column count before the temporary column is added; it is both the index of the
    // filter column and the number of columns to keep.
    const auto original_col_cnt = block->columns();
    block->insert({std::move(filter_column), std::make_shared<DataTypeUInt8>(), col_name});
    RETURN_IF_ERROR(Block::filter_block(block, original_col_cnt, original_col_cnt));
    DCHECK_EQ(original_col_cnt, block->columns());

    const size_t merged_rows = num_rows - block->rows();
    // Same comparison the implicit int -> size_t conversion would perform.
    if (static_cast<size_t>(duplicate_rows) == merged_rows) {
        return Status::OK();
    }

    // The caller's bookkeeping and the actual filter result disagree.
    auto msg = fmt::format(
            "filter_block_for_flexible_partial_update {}: duplicate_rows != merged_rows, "
            "duplicate_keys={}, merged_rows={}, num_rows={}, mutable_block->rows()={}",
            col_name, duplicate_rows, merged_rows, num_rows, block->rows());
    DCHECK(false) << msg;
    return Status::InternalError<false>(msg);
}
1192
1193
// Runs every key column of `block` through `_convertor` and collects the resulting
// accessors in `key_columns`.
//
// `key_columns` is always cleared first. When `num_rows` is 0 nothing is converted
// and the vector stays empty. Otherwise one accessor per key column is appended, in
// column-id order (ids 0 .. num_key_columns() - 1).
//
// @param block       block whose leading columns are the key columns
// @param row_pos     forwarded to the convertor together with `num_rows`
//                    (presumably the first row of the range to convert -- confirm
//                    against OlapBlockDataConvertor)
// @param num_rows    number of rows to convert
// @param key_columns output: accessors for the converted key columns
// @return the first error reported by the convertor, OK otherwise
Status BlockAggregator::convert_pk_columns(Block* block, size_t row_pos, size_t num_rows,
                                           std::vector<IOlapColumnDataAccessor*>& key_columns) {
    key_columns.clear();
    if (num_rows != 0) {
        for (uint32_t key_cid = 0; key_cid < _tablet_schema.num_key_columns(); ++key_cid) {
            RETURN_IF_ERROR(_convertor->set_source_content_with_specifid_column(
                    block->get_by_position(key_cid), row_pos, num_rows, key_cid));
            auto [convert_status, accessor] = _convertor->convert_column_data(key_cid);
            if (!convert_status.ok()) {
                return convert_status;
            }
            key_columns.push_back(accessor);
        }
    }
    return Status::OK();
}
1210
1211
// Runs the sequence column of `block` through `_convertor` and returns its accessor
// via `seq_column`.
//
// `seq_column` is reset to nullptr first and stays nullptr when `num_rows` is 0 or
// the tablet schema has no sequence column.
//
// @param block      block containing the sequence column at
//                   `_tablet_schema.sequence_col_idx()`
// @param row_pos    forwarded to the convertor together with `num_rows`
//                   (presumably the first row of the range to convert -- confirm
//                   against OlapBlockDataConvertor)
// @param num_rows   number of rows to convert
// @param seq_column output: accessor for the converted sequence column, or nullptr
// @return the first error reported by the convertor, OK otherwise
Status BlockAggregator::convert_seq_column(Block* block, size_t row_pos, size_t num_rows,
                                           IOlapColumnDataAccessor*& seq_column) {
    seq_column = nullptr;
    // Nothing to convert for an empty range or a schema without a sequence column.
    if (num_rows == 0 || !_tablet_schema.has_sequence_col()) {
        return Status::OK();
    }

    auto seq_col_idx = _tablet_schema.sequence_col_idx();
    RETURN_IF_ERROR(_convertor->set_source_content_with_specifid_column(
            block->get_by_position(seq_col_idx), row_pos, num_rows, seq_col_idx));
    auto [status, column] = _convertor->convert_column_data(seq_col_idx);
    if (!status.ok()) {
        return status;
    }
    seq_column = column;
    return Status::OK();
}
1229
1230
// Entry point for de-duplicating a flexible-partial-update block in two passes:
//   1. aggregate_for_sequence_column()      (only when the schema has a sequence column)
//   2. aggregate_for_insert_after_delete()  (always)
//
// Both passes work on converted key / sequence column accessors. If pass 1 changed
// the number of rows in `block`, the accessors are rebuilt before pass 2.
//
// @param block             block to aggregate in place
// @param num_rows          number of rows in `block` on entry
// @param specified_rowsets rowsets forwarded to both passes
// @param segment_caches    segment cache handles forwarded to both passes
// @return the first error from conversion or either pass, OK otherwise
Status BlockAggregator::aggregate_for_flexible_partial_update(
        Block* block, size_t num_rows, const std::vector<RowsetSharedPtr>& specified_rowsets,
        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
    std::vector<IOlapColumnDataAccessor*> pk_accessors;
    IOlapColumnDataAccessor* seq_accessor = nullptr;

    RETURN_IF_ERROR(convert_pk_columns(block, 0, num_rows, pk_accessors));
    RETURN_IF_ERROR(convert_seq_column(block, 0, num_rows, seq_accessor));

    // Pass 1: merge duplicate rows when the table has a sequence column.
    // Rows with the same key may arrive in one memtable where only some of them
    // specify the sequence column. The memtable cannot de-duplicate those because it
    // does not know the historical data, so it has to happen here.
    if (_tablet_schema.has_sequence_col()) {
        RETURN_IF_ERROR(aggregate_for_sequence_column(block, static_cast<int>(num_rows),
                                                      pk_accessors, seq_accessor,
                                                      specified_rowsets, segment_caches));
    }

    // Pass 1 may have removed rows; the accessors then describe stale data and the
    // key and sequence columns must be re-encoded from the current block.
    const size_t remaining_rows = block->rows();
    if (remaining_rows != num_rows) {
        num_rows = remaining_rows;
        _convertor->clear_source_content();
        RETURN_IF_ERROR(convert_pk_columns(block, 0, num_rows, pk_accessors));
        RETURN_IF_ERROR(convert_seq_column(block, 0, num_rows, seq_accessor));
    }

    // Pass 2: merge duplicate rows and handle insert-after-delete.
    return aggregate_for_insert_after_delete(block, num_rows, pk_accessors, specified_rowsets,
                                             segment_caches);
}
1261
1262
} // namespace doris