Coverage Report

Created: 2026-06-25 08:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/partial_update_info.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/partial_update_info.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
22
#include <cstdint>
23
#include <optional>
24
25
#include "common/consts.h"
26
#include "common/logging.h"
27
#include "core/assert_cast.h"
28
#include "core/block/block.h"
29
#include "core/data_type/data_type_number.h" // IWYU pragma: keep
30
#include "core/value/bitmap_value.h"
31
#include "storage/iterator/olap_data_convertor.h"
32
#include "storage/olap_common.h"
33
#include "storage/rowset/rowset.h"
34
#include "storage/rowset/rowset_writer_context.h"
35
#include "storage/segment/historical_row_retriever.h"
36
#include "storage/segment/vertical_segment_writer.h"
37
#include "storage/tablet/base_tablet.h"
38
#include "storage/tablet/tablet_meta.h"
39
#include "storage/tablet/tablet_schema.h"
40
#include "storage/utils.h"
41
42
namespace doris {
43
namespace {
44
45
290
ColumnBitmap* get_mutable_skip_bitmap_column(Block* block, size_t skip_bitmap_col_idx) {
46
290
    auto skip_bitmap_column =
47
290
            IColumn::mutate(std::move(block->get_by_position(skip_bitmap_col_idx).column));
48
290
    auto* skip_bitmap_column_ptr = assert_cast<ColumnBitmap*>(skip_bitmap_column.get());
49
290
    block->replace_by_position(skip_bitmap_col_idx, std::move(skip_bitmap_column));
50
290
    return skip_bitmap_column_ptr;
51
290
}
52
53
} // namespace
54
55
Status PartialUpdateInfo::init(int64_t tablet_id, int64_t txn_id, const TabletSchema& tablet_schema,
56
                               UniqueKeyUpdateModePB unique_key_update_mode,
57
                               PartialUpdateNewRowPolicyPB policy,
58
                               const std::set<std::string>& partial_update_cols,
59
                               bool is_strict_mode_, int64_t timestamp_ms_, int32_t nano_seconds_,
60
                               const std::string& timezone_,
61
                               const std::string& auto_increment_column,
62
224k
                               int32_t sequence_map_col_uid, int64_t cur_max_version) {
63
224k
    partial_update_mode = unique_key_update_mode;
64
224k
    partial_update_new_key_policy = policy;
65
224k
    partial_update_input_columns = partial_update_cols;
66
224k
    max_version_in_flush_phase = cur_max_version;
67
224k
    sequence_map_col_unqiue_id = sequence_map_col_uid;
68
224k
    timestamp_ms = timestamp_ms_;
69
224k
    nano_seconds = nano_seconds_;
70
224k
    timezone = timezone_;
71
224k
    missing_cids.clear();
72
224k
    update_cids.clear();
73
74
224k
    if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
75
        // partial_update_cols should include all key columns
76
12.1k
        for (std::size_t i {0}; i < tablet_schema.num_key_columns(); i++) {
77
8.37k
            const auto key_col = tablet_schema.column(i);
78
8.37k
            if (!partial_update_cols.contains(key_col.name())) {
79
0
                auto msg = fmt::format(
80
0
                        "Unable to do partial update on shadow index's tablet, tablet_id={}, "
81
0
                        "txn_id={}. Missing key column {}.",
82
0
                        tablet_id, txn_id, key_col.name());
83
0
                LOG_WARNING(msg);
84
0
                return Status::Aborted<false>(msg);
85
0
            }
86
8.37k
        }
87
3.75k
    }
88
224k
    if (is_partial_update()) {
89
38.9k
        for (auto i = 0; i < tablet_schema.num_columns(); ++i) {
90
34.9k
            if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
91
31.0k
                auto tablet_column = tablet_schema.column(i);
92
31.0k
                if (!partial_update_input_columns.contains(tablet_column.name())) {
93
17.9k
                    missing_cids.emplace_back(i);
94
17.9k
                    if (!tablet_column.is_row_store_column() &&
95
17.9k
                        !tablet_column.has_default_value() && !tablet_column.is_nullable() &&
96
17.9k
                        tablet_schema.auto_increment_column() != tablet_column.name()) {
97
1.26k
                        can_insert_new_rows_in_partial_update = false;
98
1.26k
                    }
99
17.9k
                } else {
100
13.0k
                    update_cids.emplace_back(i);
101
13.0k
                }
102
31.0k
                if (auto_increment_column == tablet_column.name()) {
103
240
                    is_schema_contains_auto_inc_column = true;
104
240
                }
105
31.0k
            } else {
106
                // in flexible partial update, missing cids is all non sort keys' cid
107
3.88k
                if (i >= tablet_schema.num_key_columns()) {
108
3.61k
                    missing_cids.emplace_back(i);
109
3.61k
                }
110
3.88k
            }
111
34.9k
        }
112
4.01k
        _generate_default_values_for_missing_cids(tablet_schema);
113
4.01k
    }
114
224k
    is_strict_mode = is_strict_mode_;
115
224k
    is_input_columns_contains_auto_inc_column =
116
224k
            is_fixed_partial_update() &&
117
224k
            partial_update_input_columns.contains(auto_increment_column);
118
224k
    return Status::OK();
119
224k
}
120
121
452
void PartialUpdateInfo::to_pb(PartialUpdateInfoPB* partial_update_info_pb) const {
122
452
    partial_update_info_pb->set_partial_update_mode(partial_update_mode);
123
452
    partial_update_info_pb->set_partial_update_new_key_policy(partial_update_new_key_policy);
124
452
    partial_update_info_pb->set_max_version_in_flush_phase(max_version_in_flush_phase);
125
2.05k
    for (const auto& col : partial_update_input_columns) {
126
2.05k
        partial_update_info_pb->add_partial_update_input_columns(col);
127
2.05k
    }
128
2.19k
    for (auto cid : missing_cids) {
129
2.19k
        partial_update_info_pb->add_missing_cids(cid);
130
2.19k
    }
131
2.05k
    for (auto cid : update_cids) {
132
2.05k
        partial_update_info_pb->add_update_cids(cid);
133
2.05k
    }
134
452
    partial_update_info_pb->set_can_insert_new_rows_in_partial_update(
135
452
            can_insert_new_rows_in_partial_update);
136
452
    partial_update_info_pb->set_is_strict_mode(is_strict_mode);
137
452
    partial_update_info_pb->set_timestamp_ms(timestamp_ms);
138
452
    partial_update_info_pb->set_nano_seconds(nano_seconds);
139
452
    partial_update_info_pb->set_timezone(timezone);
140
452
    partial_update_info_pb->set_is_input_columns_contains_auto_inc_column(
141
452
            is_input_columns_contains_auto_inc_column);
142
452
    partial_update_info_pb->set_is_schema_contains_auto_inc_column(
143
452
            is_schema_contains_auto_inc_column);
144
2.19k
    for (const auto& value : default_values) {
145
2.19k
        partial_update_info_pb->add_default_values(value);
146
2.19k
    }
147
452
}
148
149
0
void PartialUpdateInfo::from_pb(PartialUpdateInfoPB* partial_update_info_pb) {
150
0
    if (!partial_update_info_pb->has_partial_update_mode()) {
151
        // for backward compatibility
152
0
        if (partial_update_info_pb->is_partial_update()) {
153
0
            partial_update_mode = UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS;
154
0
        } else {
155
0
            partial_update_mode = UniqueKeyUpdateModePB::UPSERT;
156
0
        }
157
0
    } else {
158
0
        partial_update_mode = partial_update_info_pb->partial_update_mode();
159
0
    }
160
0
    if (partial_update_info_pb->has_partial_update_new_key_policy()) {
161
0
        partial_update_new_key_policy = partial_update_info_pb->partial_update_new_key_policy();
162
0
    }
163
0
    max_version_in_flush_phase = partial_update_info_pb->has_max_version_in_flush_phase()
164
0
                                         ? partial_update_info_pb->max_version_in_flush_phase()
165
0
                                         : -1;
166
0
    partial_update_input_columns.clear();
167
0
    for (const auto& col : partial_update_info_pb->partial_update_input_columns()) {
168
0
        partial_update_input_columns.insert(col);
169
0
    }
170
0
    missing_cids.clear();
171
0
    for (auto cid : partial_update_info_pb->missing_cids()) {
172
0
        missing_cids.push_back(cid);
173
0
    }
174
0
    update_cids.clear();
175
0
    for (auto cid : partial_update_info_pb->update_cids()) {
176
0
        update_cids.push_back(cid);
177
0
    }
178
0
    can_insert_new_rows_in_partial_update =
179
0
            partial_update_info_pb->can_insert_new_rows_in_partial_update();
180
0
    is_strict_mode = partial_update_info_pb->is_strict_mode();
181
0
    timestamp_ms = partial_update_info_pb->timestamp_ms();
182
0
    timezone = partial_update_info_pb->timezone();
183
0
    is_input_columns_contains_auto_inc_column =
184
0
            partial_update_info_pb->is_input_columns_contains_auto_inc_column();
185
0
    is_schema_contains_auto_inc_column =
186
0
            partial_update_info_pb->is_schema_contains_auto_inc_column();
187
0
    if (partial_update_info_pb->has_nano_seconds()) {
188
0
        nano_seconds = partial_update_info_pb->nano_seconds();
189
0
    }
190
0
    default_values.clear();
191
0
    for (const auto& value : partial_update_info_pb->default_values()) {
192
0
        default_values.push_back(value);
193
0
    }
194
0
}
195
196
0
std::string PartialUpdateInfo::summary() const {
197
0
    std::string mode;
198
0
    switch (partial_update_mode) {
199
0
    case UniqueKeyUpdateModePB::UPSERT:
200
0
        mode = "upsert";
201
0
        break;
202
0
    case UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS:
203
0
        mode = "fixed partial update";
204
0
        break;
205
0
    case UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS:
206
0
        mode = "flexible partial update";
207
0
        break;
208
0
    }
209
0
    return fmt::format(
210
0
            "mode={}, update_cids={}, missing_cids={}, is_strict_mode={}, "
211
0
            "max_version_in_flush_phase={}",
212
0
            mode, update_cids.size(), missing_cids.size(), is_strict_mode,
213
0
            max_version_in_flush_phase);
214
0
}
215
216
Status PartialUpdateInfo::handle_new_key(const TabletSchema& tablet_schema,
217
                                         const std::function<std::string()>& line,
218
557
                                         BitmapValue* skip_bitmap) {
219
557
    switch (partial_update_new_key_policy) {
220
521
    case doris::PartialUpdateNewRowPolicyPB::APPEND: {
221
521
        if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FIXED_COLUMNS) {
222
237
            if (!can_insert_new_rows_in_partial_update) {
223
10
                std::string error_column;
224
18
                for (auto cid : missing_cids) {
225
18
                    const TabletColumn& col = tablet_schema.column(cid);
226
18
                    if (!col.is_row_store_column() && !col.has_default_value() &&
227
18
                        !col.is_nullable() &&
228
18
                        !(tablet_schema.auto_increment_column() == col.name())) {
229
10
                        error_column = col.name();
230
10
                        break;
231
10
                    }
232
18
                }
233
10
                return Status::Error<ErrorCode::INVALID_SCHEMA, false>(
234
10
                        "the unmentioned column `{}` should have default value or be nullable "
235
10
                        "for newly inserted rows in non-strict mode partial update",
236
10
                        error_column);
237
10
            }
238
284
        } else if (partial_update_mode == UniqueKeyUpdateModePB::UPDATE_FLEXIBLE_COLUMNS) {
239
284
            DCHECK(skip_bitmap != nullptr);
240
284
            bool can_insert_new_row {true};
241
284
            std::string error_column;
242
4.05k
            for (auto cid : missing_cids) {
243
4.05k
                const TabletColumn& col = tablet_schema.column(cid);
244
4.05k
                if (!col.is_row_store_column() && skip_bitmap->contains(col.unique_id()) &&
245
4.05k
                    !col.has_default_value() && !col.is_nullable() && !col.is_auto_increment()) {
246
0
                    error_column = col.name();
247
0
                    can_insert_new_row = false;
248
0
                    break;
249
0
                }
250
4.05k
            }
251
284
            if (!can_insert_new_row) {
252
0
                return Status::Error<ErrorCode::INVALID_SCHEMA, false>(
253
0
                        "the unmentioned column `{}` should have default value or be "
254
0
                        "nullable for newly inserted rows in non-strict mode flexible partial "
255
0
                        "update",
256
0
                        error_column);
257
0
            }
258
284
        }
259
521
    } break;
260
511
    case doris::PartialUpdateNewRowPolicyPB::ERROR: {
261
36
        return Status::Error<ErrorCode::NEW_ROWS_IN_PARTIAL_UPDATE, false>(
262
36
                "Can't append new rows in partial update when partial_update_new_key_behavior is "
263
36
                "ERROR. Row with key=[{}] is not in table.",
264
36
                line());
265
521
    } break;
266
557
    }
267
512
    return Status::OK();
268
557
}
269
270
void PartialUpdateInfo::_generate_default_values_for_missing_cids(
271
4.01k
        const TabletSchema& tablet_schema) {
272
21.5k
    for (unsigned int cur_cid : missing_cids) {
273
21.5k
        const auto& column = tablet_schema.column(cur_cid);
274
21.5k
        if (column.has_default_value()) {
275
7.96k
            std::string default_value;
276
7.96k
            if (UNLIKELY((column.type() == FieldType::OLAP_FIELD_TYPE_DATETIMEV2 ||
277
7.96k
                          column.type() == FieldType::OLAP_FIELD_TYPE_TIMESTAMPTZ) &&
278
7.96k
                         to_lower(column.default_value()).find(to_lower("CURRENT_TIMESTAMP")) !=
279
7.96k
                                 std::string::npos)) {
280
133
                auto pos = to_lower(column.default_value()).find('(');
281
133
                if (pos == std::string::npos) {
282
64
                    DateV2Value<DateTimeV2ValueType> dtv;
283
64
                    dtv.from_unixtime(timestamp_ms / 1000, timezone);
284
64
                    default_value = dtv.to_string();
285
64
                    if (column.type() == FieldType::OLAP_FIELD_TYPE_TIMESTAMPTZ) {
286
0
                        default_value += timezone;
287
0
                    }
288
69
                } else {
289
69
                    int precision = std::stoi(column.default_value().substr(pos + 1));
290
69
                    DateV2Value<DateTimeV2ValueType> dtv;
291
69
                    dtv.from_unixtime(timestamp_ms / 1000, nano_seconds, timezone, precision);
292
69
                    default_value = dtv.to_string();
293
69
                    if (column.type() == FieldType::OLAP_FIELD_TYPE_TIMESTAMPTZ) {
294
0
                        default_value += timezone;
295
0
                    }
296
69
                }
297
7.83k
            } else if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATEV2 &&
298
7.83k
                                to_lower(column.default_value()).find(to_lower("CURRENT_DATE")) !=
299
7.83k
                                        std::string::npos)) {
300
79
                DateV2Value<DateV2ValueType> dv;
301
79
                dv.from_unixtime(timestamp_ms / 1000, timezone);
302
79
                default_value = dv.to_string();
303
7.75k
            } else if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_BITMAP &&
304
7.75k
                                to_lower(column.default_value()).find(to_lower("BITMAP_EMPTY")) !=
305
7.75k
                                        std::string::npos)) {
306
0
                BitmapValue v = BitmapValue {};
307
0
                default_value = v.to_string();
308
7.75k
            } else {
309
7.75k
                default_value = column.default_value();
310
7.75k
            }
311
7.96k
            default_values.emplace_back(default_value);
312
13.5k
        } else {
313
            // place an empty string here
314
13.5k
            default_values.emplace_back();
315
13.5k
        }
316
21.5k
    }
317
4.01k
    CHECK_EQ(missing_cids.size(), default_values.size());
318
4.01k
}
319
320
9
bool FixedReadPlan::empty() const {
321
9
    return plan.empty();
322
9
}
323
324
22.7k
void FixedReadPlan::prepare_to_read(const RowLocation& row_location, size_t pos) {
325
22.7k
    plan[row_location.rowset_id][row_location.segment_id].emplace_back(row_location.row_id, pos);
326
22.7k
}
327
328
// read columns by read plan
329
// read_index: ori_pos-> block_idx
330
Status FixedReadPlan::read_columns_by_plan(
331
        const TabletSchema& tablet_schema, std::vector<uint32_t> cids_to_read,
332
        const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, Block& block,
333
        std::map<uint32_t, uint32_t>* read_index, bool force_read_old_delete_signs,
334
1.49k
        const signed char* __restrict cur_delete_signs) const {
335
1.49k
    if (force_read_old_delete_signs) {
336
        // always read delete sign column from historical data
337
1.47k
        if (block.get_position_by_name(DELETE_SIGN) == -1) {
338
937
            auto del_col_cid = tablet_schema.field_index(DELETE_SIGN);
339
937
            cids_to_read.emplace_back(del_col_cid);
340
937
            block.swap(tablet_schema.create_block_by_cids(cids_to_read));
341
937
        }
342
1.47k
    }
343
1.49k
    bool has_row_column = tablet_schema.has_row_store_for_all_columns();
344
1.49k
    std::optional<Block::ScopedMutableColumns> mutable_columns_guard;
345
1.49k
    MutableColumns* mutable_columns = nullptr;
346
1.49k
    if (!has_row_column) {
347
1.25k
        mutable_columns_guard.emplace(block);
348
1.25k
        mutable_columns = &mutable_columns_guard->mutable_columns();
349
1.25k
    }
350
1.49k
    uint32_t read_idx = 0;
351
1.49k
    for (const auto& [rowset_id, segment_row_mappings] : plan) {
352
624
        for (const auto& [segment_id, mappings] : segment_row_mappings) {
353
624
            auto rowset_iter = rsid_to_rowset.find(rowset_id);
354
624
            CHECK(rowset_iter != rsid_to_rowset.end());
355
624
            std::vector<uint32_t> rids;
356
22.7k
            for (auto [rid, pos] : mappings) {
357
22.7k
                if (cur_delete_signs && cur_delete_signs[pos]) {
358
1
                    continue;
359
1
                }
360
22.7k
                rids.emplace_back(rid);
361
22.7k
                (*read_index)[static_cast<uint32_t>(pos)] = read_idx++;
362
22.7k
            }
363
624
            if (has_row_column) {
364
178
                auto st = BaseTablet::fetch_value_through_row_column(
365
178
                        rowset_iter->second, tablet_schema, segment_id, rids, cids_to_read, block);
366
178
                if (!st.ok()) {
367
0
                    LOG(WARNING) << "failed to fetch value through row column";
368
0
                    return st;
369
0
                }
370
178
                continue;
371
178
            }
372
2.57k
            for (size_t cid = 0; cid < mutable_columns->size(); ++cid) {
373
2.12k
                TabletColumn tablet_column = tablet_schema.column(cids_to_read[cid]);
374
2.12k
                auto st = doris::BaseTablet::fetch_value_by_rowids(rowset_iter->second, segment_id,
375
2.12k
                                                                   rids, tablet_column,
376
2.12k
                                                                   (*mutable_columns)[cid]);
377
                // set read value to output block
378
2.12k
                if (!st.ok()) {
379
0
                    LOG(WARNING) << "failed to fetch value";
380
0
                    return st;
381
0
                }
382
2.12k
            }
383
446
        }
384
623
    }
385
1.49k
    return Status::OK();
386
1.49k
}
387
388
Status FixedReadPlan::fill_missing_columns(
389
        const segment_v2::HistoricalRowRetrieverContext& historical_context,
390
        const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
391
        const TabletSchema& tablet_schema, Block& full_block,
392
        const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
393
1.45k
        uint32_t segment_start_pos, const Block* block) const {
394
1.45k
    auto mutable_full_columns_guard = full_block.mutate_columns_scoped();
395
1.45k
    auto& mutable_full_columns = mutable_full_columns_guard.mutable_columns();
396
    // create old value columns
397
1.45k
    DCHECK(historical_context.partial_update_info != nullptr);
398
1.45k
    DCHECK(historical_context.tablet_schema != nullptr);
399
1.45k
    const auto& partial_update_info = *historical_context.partial_update_info;
400
1.45k
    const auto& missing_cids = partial_update_info.missing_cids;
401
1.45k
    bool have_input_seq_column = false;
402
1.45k
    if (tablet_schema.has_sequence_col()) {
403
224
        const std::vector<uint32_t>& including_cids = partial_update_info.update_cids;
404
224
        have_input_seq_column =
405
224
                (std::find(including_cids.cbegin(), including_cids.cend(),
406
224
                           tablet_schema.sequence_col_idx()) != including_cids.cend());
407
224
    }
408
409
1.45k
    auto old_value_block = tablet_schema.create_block_by_cids(missing_cids);
410
1.45k
    CHECK_EQ(missing_cids.size(), old_value_block.columns());
411
412
    // segment pos to write -> rowid to read in old_value_block
413
1.45k
    std::map<uint32_t, uint32_t> read_index;
414
1.45k
    RETURN_IF_ERROR(read_columns_by_plan(tablet_schema, missing_cids, rsid_to_rowset,
415
1.45k
                                         old_value_block, &read_index, true, nullptr));
416
417
1.45k
    const auto* old_delete_signs = BaseTablet::get_delete_sign_column_data(old_value_block);
418
1.45k
    if (old_delete_signs == nullptr) {
419
0
        return Status::InternalError("old delete signs column not found, block: {}",
420
0
                                     old_value_block.dump_structure());
421
0
    }
422
    // build default value columns
423
1.45k
    auto default_value_block = old_value_block.clone_empty();
424
1.45k
    RETURN_IF_ERROR(BaseTablet::generate_default_value_block(tablet_schema, missing_cids,
425
1.45k
                                                             partial_update_info.default_values,
426
1.45k
                                                             old_value_block, default_value_block));
427
1.45k
    auto mutable_default_value_columns_guard = default_value_block.mutate_columns_scoped();
428
1.45k
    auto& mutable_default_value_columns = mutable_default_value_columns_guard.mutable_columns();
429
430
    // fill all missing value from mutable_old_columns, need to consider default value and null value
431
26.0k
    for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
432
24.5k
        auto segment_pos = idx + segment_start_pos;
433
24.5k
        auto pos_in_old_block = read_index[segment_pos];
434
435
117k
        for (auto i = 0; i < missing_cids.size(); ++i) {
436
            // if the column has default value, fill it with default value
437
            // otherwise, if the column is nullable, fill it with null value
438
92.5k
            const auto& tablet_column = tablet_schema.column(missing_cids[i]);
439
92.5k
            auto& missing_col = mutable_full_columns[missing_cids[i]];
440
441
92.5k
            bool should_use_default = use_default_or_null_flag[idx];
442
92.5k
            if (!should_use_default) {
443
79.1k
                bool old_row_delete_sign =
444
79.1k
                        (old_delete_signs != nullptr && old_delete_signs[pos_in_old_block] != 0);
445
79.1k
                if (old_row_delete_sign) {
446
151
                    if (!tablet_schema.has_sequence_col()) {
447
70
                        should_use_default = true;
448
81
                    } else if (have_input_seq_column || (!tablet_column.is_seqeunce_col())) {
449
                        // to keep the sequence column value not decreasing, we should read values of seq column
450
                        // from old rows even if the old row is deleted when the input don't specify the sequence column, otherwise
451
                        // it may cause the merge-on-read based compaction to produce incorrect results
452
77
                        should_use_default = true;
453
77
                    }
454
151
                }
455
79.1k
            }
456
457
92.5k
            if (should_use_default) {
458
13.5k
                if (tablet_column.has_default_value()) {
459
3.03k
                    missing_col->insert_from(*mutable_default_value_columns[i], 0);
460
10.5k
                } else if (tablet_column.is_nullable()) {
461
9.01k
                    auto* nullable_column = assert_cast<ColumnNullable*>(missing_col.get());
462
9.01k
                    nullable_column->insert_many_defaults(1);
463
9.01k
                } else if (tablet_schema.auto_increment_column() == tablet_column.name()) {
464
41
                    const auto& column = *DORIS_TRY(
465
41
                            historical_context.tablet_schema->column(tablet_column.name()));
466
41
                    DCHECK(column.type() == FieldType::OLAP_FIELD_TYPE_BIGINT);
467
41
                    auto* auto_inc_column = assert_cast<ColumnInt64*>(missing_col.get());
468
41
                    int pos = block->get_position_by_name(BeConsts::PARTIAL_UPDATE_AUTO_INC_COL);
469
41
                    if (pos == -1) {
470
0
                        return Status::InternalError("auto increment column not found in block {}",
471
0
                                                     block->dump_structure());
472
0
                    }
473
41
                    auto_inc_column->insert_from(*block->get_by_position(pos).column.get(), idx);
474
1.49k
                } else {
475
                    // If the control flow reaches this branch, the column neither has default value
476
                    // nor is nullable. It means that the row's delete sign is marked, and the value
477
                    // columns are useless and won't be read. So we can just put arbitary values in the cells
478
1.49k
                    missing_col->insert_default();
479
1.49k
                }
480
79.0k
            } else {
481
79.0k
                missing_col->insert_from(*old_value_block.get_by_position(i).column,
482
79.0k
                                         pos_in_old_block);
483
79.0k
            }
484
92.5k
        }
485
24.5k
    }
486
1.45k
    return Status::OK();
487
1.45k
}
488
489
void FlexibleReadPlan::prepare_to_read(const RowLocation& row_location, size_t pos,
490
1.78k
                                       const BitmapValue& skip_bitmap) {
491
1.78k
    if (!use_row_store) {
492
3.92k
        for (uint64_t col_uid : skip_bitmap) {
493
3.92k
            plan[row_location.rowset_id][row_location.segment_id][static_cast<uint32_t>(col_uid)]
494
3.92k
                    .emplace_back(row_location.row_id, pos);
495
3.92k
        }
496
933
    } else {
497
856
        row_store_plan[row_location.rowset_id][row_location.segment_id].emplace_back(
498
856
                row_location.row_id, pos);
499
856
    }
500
1.78k
}
501
502
Status FlexibleReadPlan::read_columns_by_plan(
503
        const TabletSchema& tablet_schema,
504
        const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, Block& old_value_block,
505
145
        std::map<uint32_t, std::map<uint32_t, uint32_t>>* read_index) const {
506
145
    auto mutable_columns_guard = old_value_block.mutate_columns_scoped();
507
145
    auto& mutable_columns = mutable_columns_guard.mutable_columns();
508
509
    // cid -> next rid to fill in block
510
145
    std::map<uint32_t, uint32_t> next_read_idx;
511
2.16k
    for (uint32_t cid {0}; cid < tablet_schema.num_columns(); cid++) {
512
2.02k
        next_read_idx[cid] = 0;
513
2.02k
    }
514
515
180
    for (const auto& [rowset_id, segment_mappings] : plan) {
516
180
        for (const auto& [segment_id, uid_mappings] : segment_mappings) {
517
1.49k
            for (const auto& [col_uid, mappings] : uid_mappings) {
518
1.49k
                auto rowset_iter = rsid_to_rowset.find(rowset_id);
519
1.49k
                CHECK(rowset_iter != rsid_to_rowset.end());
520
1.49k
                auto cid = tablet_schema.field_index(col_uid);
521
1.49k
                DCHECK_NE(cid, -1);
522
1.49k
                DCHECK_GE(cid, tablet_schema.num_key_columns());
523
1.49k
                std::vector<uint32_t> rids;
524
3.91k
                for (auto [rid, pos] : mappings) {
525
3.91k
                    rids.emplace_back(rid);
526
3.91k
                    (*read_index)[cid][static_cast<uint32_t>(pos)] = next_read_idx[cid]++;
527
3.91k
                }
528
529
1.49k
                TabletColumn tablet_column = tablet_schema.column(cid);
530
1.49k
                auto idx = cid - tablet_schema.num_key_columns();
531
1.49k
                RETURN_IF_ERROR(doris::BaseTablet::fetch_value_by_rowids(
532
1.49k
                        rowset_iter->second, segment_id, rids, tablet_column,
533
1.49k
                        mutable_columns[idx]));
534
1.49k
            }
535
180
        }
536
180
    }
537
    // !!!ATTENTION!!!: columns in block may have different size because every row has different columns to update
538
145
    return Status::OK();
539
145
}
540
541
Status FlexibleReadPlan::read_columns_by_plan(
542
        const TabletSchema& tablet_schema, const std::vector<uint32_t>& cids_to_read,
543
        const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset, Block& old_value_block,
544
119
        std::map<uint32_t, uint32_t>* read_index) const {
545
119
    DCHECK(use_row_store);
546
119
    uint32_t read_idx = 0;
547
158
    for (const auto& [rowset_id, segment_row_mappings] : row_store_plan) {
548
158
        for (const auto& [segment_id, mappings] : segment_row_mappings) {
549
158
            auto rowset_iter = rsid_to_rowset.find(rowset_id);
550
158
            CHECK(rowset_iter != rsid_to_rowset.end());
551
158
            std::vector<uint32_t> rids;
552
859
            for (auto [rid, pos] : mappings) {
553
859
                rids.emplace_back(rid);
554
859
                (*read_index)[static_cast<uint32_t>(pos)] = read_idx++;
555
859
            }
556
158
            auto st = BaseTablet::fetch_value_through_row_column(rowset_iter->second, tablet_schema,
557
158
                                                                 segment_id, rids, cids_to_read,
558
158
                                                                 old_value_block);
559
158
            if (!st.ok()) {
560
0
                LOG(WARNING) << "failed to fetch value through row column";
561
0
                return st;
562
0
            }
563
158
        }
564
158
    }
565
119
    return Status::OK();
566
119
}
567
568
Status FlexibleReadPlan::fill_non_primary_key_columns(
569
        const segment_v2::HistoricalRowRetrieverContext& historical_context,
570
        const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
571
        const TabletSchema& tablet_schema, Block& full_block,
572
        const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
573
        uint32_t segment_start_pos, uint32_t block_start_pos, const Block* block,
574
264
        std::vector<BitmapValue>* skip_bitmaps) const {
575
264
    auto mutable_full_columns_guard = full_block.mutate_columns_scoped();
576
264
    auto& mutable_full_columns = mutable_full_columns_guard.mutable_columns();
577
264
    DCHECK(historical_context.partial_update_info != nullptr);
578
579
    // missing_cids are all non sort key columns' cids
580
264
    const auto& non_sort_key_cids = historical_context.partial_update_info->missing_cids;
581
264
    auto old_value_block = tablet_schema.create_block_by_cids(non_sort_key_cids);
582
264
    CHECK_EQ(non_sort_key_cids.size(), old_value_block.columns());
583
584
264
    if (!use_row_store) {
585
145
        RETURN_IF_ERROR(fill_non_primary_key_columns_for_column_store(
586
145
                historical_context, rsid_to_rowset, tablet_schema, non_sort_key_cids,
587
145
                old_value_block, mutable_full_columns, use_default_or_null_flag,
588
145
                has_default_or_nullable, segment_start_pos, block_start_pos, block, skip_bitmaps));
589
145
    } else {
590
119
        RETURN_IF_ERROR(fill_non_primary_key_columns_for_row_store(
591
119
                historical_context, rsid_to_rowset, tablet_schema, non_sort_key_cids,
592
119
                old_value_block, mutable_full_columns, use_default_or_null_flag,
593
119
                has_default_or_nullable, segment_start_pos, block_start_pos, block, skip_bitmaps));
594
119
    }
595
264
    return Status::OK();
596
264
}
597
598
static void fill_non_primary_key_cell_for_column_store(
599
        const TabletColumn& tablet_column, uint32_t cid, MutableColumnPtr& new_col,
600
        const IColumn& default_value_col, const IColumn& old_value_col, const IColumn& cur_col,
601
        std::size_t block_pos, uint32_t segment_pos, bool skipped, bool row_has_sequence_col,
602
        bool use_default, const signed char* delete_sign_column_data,
603
        const TabletSchema& tablet_schema,
604
        std::map<uint32_t, std::map<uint32_t, uint32_t>>& read_index,
605
16.9k
        const PartialUpdateInfo* info) {
606
16.9k
    if (skipped) {
607
4.78k
        DCHECK(cid != tablet_schema.skip_bitmap_col_idx());
608
4.78k
        DCHECK(cid != tablet_schema.version_col_idx());
609
4.78k
        DCHECK(!tablet_column.is_row_store_column());
610
611
4.78k
        if (!use_default) {
612
3.91k
            if (delete_sign_column_data != nullptr) {
613
3.91k
                bool old_row_delete_sign = false;
614
3.91k
                if (auto it = read_index[tablet_schema.delete_sign_idx()].find(segment_pos);
615
3.91k
                    it != read_index[tablet_schema.delete_sign_idx()].end()) {
616
3.79k
                    old_row_delete_sign = (delete_sign_column_data[it->second] != 0);
617
3.79k
                }
618
619
3.91k
                if (old_row_delete_sign) {
620
7
                    if (!tablet_schema.has_sequence_col()) {
621
2
                        use_default = true;
622
5
                    } else if (row_has_sequence_col ||
623
5
                               (!tablet_column.is_seqeunce_col() &&
624
5
                                (tablet_column.unique_id() != info->sequence_map_col_uid()))) {
625
                        // to keep the sequence column value from decreasing, we should read the values of the seq column (and seq map column)
626
                        // from old rows even if the old row is deleted when the input doesn't specify the sequence column, otherwise
627
                        // it may cause the merge-on-read based compaction to produce incorrect results
628
3
                        use_default = true;
629
3
                    }
630
7
                }
631
3.91k
            }
632
3.91k
        }
633
4.78k
        if (!use_default && tablet_column.is_on_update_current_timestamp()) {
634
6
            use_default = true;
635
6
        }
636
4.78k
        if (use_default) {
637
873
            if (tablet_column.has_default_value()) {
638
424
                new_col->insert_from(default_value_col, 0);
639
449
            } else if (tablet_column.is_nullable()) {
640
426
                assert_cast<ColumnNullable*, TypeCheckOnRelease::DISABLE>(new_col.get())
641
426
                        ->insert_many_defaults(1);
642
426
            } else if (tablet_column.is_auto_increment()) {
643
                // In flexible partial update, the skip bitmap indicates whether a cell
644
                // is specified in the original load, so the generated auto-increment value is filled
645
                // in current block in place if needed rather than using a separate column to
646
                // store the generated auto-increment value in fixed partial update
647
2
                new_col->insert_from(cur_col, block_pos);
648
21
            } else {
649
21
                new_col->insert_default();
650
21
            }
651
3.90k
        } else {
652
3.90k
            auto pos_in_old_block = read_index.at(cid).at(segment_pos);
653
3.90k
            new_col->insert_from(old_value_col, pos_in_old_block);
654
3.90k
        }
655
12.1k
    } else {
656
12.1k
        new_col->insert_from(cur_col, block_pos);
657
12.1k
    }
658
16.9k
}
659
660
Status FlexibleReadPlan::fill_non_primary_key_columns_for_column_store(
661
        const segment_v2::HistoricalRowRetrieverContext& historical_context,
662
        const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
663
        const TabletSchema& tablet_schema, const std::vector<uint32_t>& non_sort_key_cids,
664
        Block& old_value_block, MutableColumns& mutable_full_columns,
665
        const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
666
        uint32_t segment_start_pos, uint32_t block_start_pos, const Block* block,
667
145
        std::vector<BitmapValue>* skip_bitmaps) const {
668
145
    auto* info = historical_context.partial_update_info.get();
669
145
    int32_t seq_col_unique_id = -1;
670
145
    if (tablet_schema.has_sequence_col()) {
671
22
        seq_col_unique_id = tablet_schema.column(tablet_schema.sequence_col_idx()).unique_id();
672
22
    }
673
    // cid -> segment pos to write -> rowid to read in old_value_block
674
145
    std::map<uint32_t, std::map<uint32_t, uint32_t>> read_index;
675
145
    RETURN_IF_ERROR(
676
145
            read_columns_by_plan(tablet_schema, rsid_to_rowset, old_value_block, &read_index));
677
    // !!!ATTENTION!!!: columns in old_value_block may have different size because every row has different columns to update
678
679
145
    const auto* delete_sign_column_data = BaseTablet::get_delete_sign_column_data(old_value_block);
680
    // build default value columns
681
145
    auto default_value_block = old_value_block.clone_empty();
682
145
    if (has_default_or_nullable || delete_sign_column_data != nullptr) {
683
145
        RETURN_IF_ERROR(BaseTablet::generate_default_value_block(
684
145
                tablet_schema, non_sort_key_cids, info->default_values, old_value_block,
685
145
                default_value_block));
686
145
    }
687
688
    // fill all non sort key columns from mutable_old_columns, need to consider default value and null value
689
2.00k
    for (std::size_t i {0}; i < non_sort_key_cids.size(); i++) {
690
1.86k
        auto cid = non_sort_key_cids[i];
691
1.86k
        const auto& tablet_column = tablet_schema.column(cid);
692
1.86k
        auto col_uid = tablet_column.unique_id();
693
18.8k
        for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
694
16.9k
            auto segment_pos = segment_start_pos + idx;
695
16.9k
            auto block_pos = block_start_pos + idx;
696
697
16.9k
            fill_non_primary_key_cell_for_column_store(
698
16.9k
                    tablet_column, cid, mutable_full_columns[cid],
699
16.9k
                    *default_value_block.get_by_position(i).column,
700
16.9k
                    *old_value_block.get_by_position(i).column, *block->get_by_position(cid).column,
701
16.9k
                    block_pos, segment_pos, skip_bitmaps->at(block_pos).contains(col_uid),
702
16.9k
                    tablet_schema.has_sequence_col()
703
16.9k
                            ? !skip_bitmaps->at(block_pos).contains(seq_col_unique_id)
704
16.9k
                            : false,
705
16.9k
                    use_default_or_null_flag[idx], delete_sign_column_data, tablet_schema,
706
16.9k
                    read_index, info);
707
16.9k
        }
708
1.86k
    }
709
145
    return Status::OK();
710
145
}
711
712
static void fill_non_primary_key_cell_for_row_store(
713
        const TabletColumn& tablet_column, uint32_t cid, MutableColumnPtr& new_col,
714
        const IColumn& default_value_col, const IColumn& old_value_col, const IColumn& cur_col,
715
        std::size_t block_pos, bool skipped, bool row_has_sequence_col, bool use_default,
716
        const signed char* delete_sign_column_data, uint32_t pos_in_old_block,
717
16.5k
        const TabletSchema& tablet_schema, const PartialUpdateInfo* info) {
718
16.5k
    if (skipped) {
719
4.16k
        DCHECK(cid != tablet_schema.skip_bitmap_col_idx());
720
4.16k
        DCHECK(cid != tablet_schema.version_col_idx());
721
4.16k
        DCHECK(!tablet_column.is_row_store_column());
722
4.16k
        if (!use_default) {
723
3.64k
            if (delete_sign_column_data != nullptr) {
724
3.64k
                bool old_row_delete_sign = (delete_sign_column_data[pos_in_old_block] != 0);
725
3.64k
                if (old_row_delete_sign) {
726
7
                    if (!tablet_schema.has_sequence_col()) {
727
2
                        use_default = true;
728
5
                    } else if (row_has_sequence_col ||
729
5
                               (!tablet_column.is_seqeunce_col() &&
730
5
                                (tablet_column.unique_id() != info->sequence_map_col_uid()))) {
731
                        // to keep the sequence column value from decreasing, we should read the values of the seq column (and seq map column)
732
                        // from old rows even if the old row is deleted when the input doesn't specify the sequence column, otherwise
733
                        // it may cause the merge-on-read based compaction to produce incorrect results
734
3
                        use_default = true;
735
3
                    }
736
7
                }
737
3.64k
            }
738
3.64k
        }
739
740
4.16k
        if (!use_default && tablet_column.is_on_update_current_timestamp()) {
741
6
            use_default = true;
742
6
        }
743
4.16k
        if (use_default) {
744
529
            if (tablet_column.has_default_value()) {
745
175
                new_col->insert_from(default_value_col, 0);
746
354
            } else if (tablet_column.is_nullable()) {
747
350
                assert_cast<ColumnNullable*, TypeCheckOnRelease::DISABLE>(new_col.get())
748
350
                        ->insert_many_defaults(1);
749
350
            } else if (tablet_column.is_auto_increment()) {
750
                // In flexible partial update, the skip bitmap indicates whether a cell
751
                // is specified in the original load, so the generated auto-increment value is filled
752
                // in current block in place if needed rather than using a separate column to
753
                // store the generated auto-increment value in fixed partial update
754
2
                new_col->insert_from(cur_col, block_pos);
755
2
            } else {
756
2
                new_col->insert_default();
757
2
            }
758
3.63k
        } else {
759
3.63k
            new_col->insert_from(old_value_col, pos_in_old_block);
760
3.63k
        }
761
12.3k
    } else {
762
12.3k
        new_col->insert_from(cur_col, block_pos);
763
12.3k
    }
764
16.5k
}
765
766
Status FlexibleReadPlan::fill_non_primary_key_columns_for_row_store(
767
        const segment_v2::HistoricalRowRetrieverContext& historical_context,
768
        const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
769
        const TabletSchema& tablet_schema, const std::vector<uint32_t>& non_sort_key_cids,
770
        Block& old_value_block, MutableColumns& mutable_full_columns,
771
        const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
772
        uint32_t segment_start_pos, uint32_t block_start_pos, const Block* block,
773
119
        std::vector<BitmapValue>* skip_bitmaps) const {
774
119
    auto* info = historical_context.partial_update_info.get();
775
119
    int32_t seq_col_unique_id = -1;
776
119
    if (tablet_schema.has_sequence_col()) {
777
3
        seq_col_unique_id = tablet_schema.column(tablet_schema.sequence_col_idx()).unique_id();
778
3
    }
779
    // segment pos to write -> rowid to read in old_value_block
780
119
    std::map<uint32_t, uint32_t> read_index;
781
119
    RETURN_IF_ERROR(read_columns_by_plan(tablet_schema, non_sort_key_cids, rsid_to_rowset,
782
119
                                         old_value_block, &read_index));
783
784
119
    const auto* delete_sign_column_data = BaseTablet::get_delete_sign_column_data(old_value_block);
785
    // build default value columns
786
119
    auto default_value_block = old_value_block.clone_empty();
787
119
    if (has_default_or_nullable || delete_sign_column_data != nullptr) {
788
119
        RETURN_IF_ERROR(BaseTablet::generate_default_value_block(
789
119
                tablet_schema, non_sort_key_cids, info->default_values, old_value_block,
790
119
                default_value_block));
791
119
    }
792
793
    // fill all non sort key columns from mutable_old_columns, need to consider default value and null value
794
1.87k
    for (std::size_t i {0}; i < non_sort_key_cids.size(); i++) {
795
1.75k
        auto cid = non_sort_key_cids[i];
796
1.75k
        const auto& tablet_column = tablet_schema.column(cid);
797
1.75k
        auto col_uid = tablet_column.unique_id();
798
18.2k
        for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) {
799
16.5k
            auto segment_pos = segment_start_pos + idx;
800
16.5k
            auto block_pos = block_start_pos + idx;
801
16.5k
            auto pos_in_old_block = read_index[segment_pos];
802
803
16.5k
            fill_non_primary_key_cell_for_row_store(
804
16.5k
                    tablet_column, cid, mutable_full_columns[cid],
805
16.5k
                    *default_value_block.get_by_position(i).column,
806
16.5k
                    *old_value_block.get_by_position(i).column, *block->get_by_position(cid).column,
807
16.5k
                    block_pos, skip_bitmaps->at(block_pos).contains(col_uid),
808
16.5k
                    tablet_schema.has_sequence_col()
809
16.5k
                            ? !skip_bitmaps->at(block_pos).contains(seq_col_unique_id)
810
16.5k
                            : false,
811
16.5k
                    use_default_or_null_flag[idx], delete_sign_column_data, pos_in_old_block,
812
16.5k
                    tablet_schema, info);
813
16.5k
        }
814
1.75k
    }
815
119
    return Status::OK();
816
119
}
817
818
BlockAggregator::BlockAggregator(segment_v2::VerticalSegmentWriter& vertical_segment_writer)
819
73.2k
        : _writer(vertical_segment_writer), _tablet_schema(*_writer._tablet_schema) {}
820
821
void BlockAggregator::merge_one_row(MutableBlock& dst_block, Block* src_block, int rid,
822
153
                                    BitmapValue& skip_bitmap) {
823
1.57k
    for (size_t cid {_tablet_schema.num_key_columns()}; cid < _tablet_schema.num_columns(); cid++) {
824
1.42k
        if (cid == _tablet_schema.skip_bitmap_col_idx()) {
825
153
            auto& cur_skip_bitmap =
826
153
                    assert_cast<ColumnBitmap*>(dst_block.mutable_columns()[cid].get())
827
153
                            ->get_data()
828
153
                            .back();
829
153
            const auto& new_row_skip_bitmap =
830
153
                    assert_cast<const ColumnBitmap*>(src_block->get_by_position(cid).column.get())
831
153
                            ->get_data()[rid];
832
153
            cur_skip_bitmap &= new_row_skip_bitmap;
833
153
            continue;
834
153
        }
835
1.27k
        if (!skip_bitmap.contains(_tablet_schema.column(cid).unique_id())) {
836
460
            dst_block.mutable_columns()[cid]->pop_back(1);
837
460
            dst_block.mutable_columns()[cid]->insert_from(*src_block->get_by_position(cid).column,
838
460
                                                          rid);
839
460
        }
840
1.27k
    }
841
153
    VLOG_DEBUG << fmt::format("merge a row, after merge, output_block.rows()={}, state: {}",
842
0
                              dst_block.rows(), _state.to_string());
843
153
}
844
845
252
void BlockAggregator::append_one_row(MutableBlock& dst_block, Block* src_block, int rid) {
846
252
    dst_block.add_row(src_block, rid);
847
252
    _state.rows++;
848
252
    VLOG_DEBUG << fmt::format("append a new row, after append, output_block.rows()={}, state: {}",
849
0
                              dst_block.rows(), _state.to_string());
850
252
}
851
852
123
void BlockAggregator::remove_last_n_rows(MutableBlock& dst_block, int n) {
853
123
    if (n > 0) {
854
968
        for (size_t cid {0}; cid < _tablet_schema.num_columns(); cid++) {
855
880
            DCHECK_GE(dst_block.mutable_columns()[cid]->size(), n);
856
880
            dst_block.mutable_columns()[cid]->pop_back(n);
857
880
        }
858
88
    }
859
123
}
860
861
void BlockAggregator::append_or_merge_row(MutableBlock& dst_block, Block* src_block, int rid,
862
405
                                          BitmapValue& skip_bitmap, bool have_delete_sign) {
863
405
    if (have_delete_sign) {
864
        // remove all the previous batched rows
865
123
        remove_last_n_rows(dst_block, _state.rows);
866
123
        _state.rows = 0;
867
123
        _state.has_row_with_delete_sign = true;
868
869
123
        append_one_row(dst_block, src_block, rid);
870
282
    } else {
871
282
        if (_state.should_merge()) {
872
153
            merge_one_row(dst_block, src_block, rid, skip_bitmap);
873
153
        } else {
874
129
            append_one_row(dst_block, src_block, rid);
875
129
        }
876
282
    }
877
405
};
878
879
Status BlockAggregator::aggregate_rows(
880
        MutableBlock& output_block, Block* block, int start, int end, std::string key,
881
        std::vector<BitmapValue>* skip_bitmaps, const signed char* delete_signs,
882
        IOlapColumnDataAccessor* seq_column, const std::vector<RowsetSharedPtr>& specified_rowsets,
883
133
        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
884
133
    VLOG_DEBUG << fmt::format("merge rows in range=[{}-{})", start, end);
885
133
    if (end - start == 1) {
886
34
        output_block.add_row(block, start);
887
34
        VLOG_DEBUG << fmt::format("append a row directly, rid={}", start);
888
34
        return Status::OK();
889
34
    }
890
891
99
    auto seq_col_unique_id = _tablet_schema.column(_tablet_schema.sequence_col_idx()).unique_id();
892
99
    auto delete_sign_col_unique_id =
893
99
            _tablet_schema.column(_tablet_schema.delete_sign_idx()).unique_id();
894
895
99
    _state.reset();
896
897
99
    RowLocation loc;
898
99
    RowsetSharedPtr rowset;
899
99
    std::string previous_encoded_seq_value {};
900
99
    Status st = _writer._tablet->lookup_row_key(
901
99
            key, &_tablet_schema, false, specified_rowsets, &loc, _writer._mow_context->max_version,
902
99
            segment_caches, &rowset, true, &previous_encoded_seq_value);
903
99
    int pos = start;
904
99
    bool is_expected_st = (st.is<ErrorCode::KEY_NOT_FOUND>() || st.ok());
905
99
    DCHECK(is_expected_st || st.is<ErrorCode::MEM_LIMIT_EXCEEDED>())
906
0
            << "[BlockAggregator::aggregate_rows] unexpected error status while lookup_row_key:"
907
0
            << st;
908
99
    if (!is_expected_st) {
909
0
        return st;
910
0
    }
911
912
99
    std::string cur_seq_val;
913
99
    if (st.ok()) {
914
97
        for (pos = start; pos < end; pos++) {
915
97
            auto& skip_bitmap = skip_bitmaps->at(pos);
916
97
            bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id));
917
            // Discard all the rows whose seq value is smaller than previous_encoded_seq_value.
918
97
            if (row_has_sequence_col) {
919
56
                std::string seq_val {};
920
56
                _writer._encode_seq_column(seq_column, pos, &seq_val);
921
56
                if (Slice {seq_val}.compare(Slice {previous_encoded_seq_value}) < 0) {
922
19
                    continue;
923
19
                }
924
37
                cur_seq_val = std::move(seq_val);
925
37
                break;
926
56
            }
927
41
            cur_seq_val = std::move(previous_encoded_seq_value);
928
41
            break;
929
97
        }
930
78
    } else {
931
21
        pos = start;
932
21
        auto& skip_bitmap = skip_bitmaps->at(pos);
933
21
        bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id));
934
21
        if (row_has_sequence_col) {
935
15
            std::string seq_val {};
936
            // for rows that don't specify sequence col, seq_val will be encoded to minimal value
937
15
            _writer._encode_seq_column(seq_column, pos, &seq_val);
938
15
            cur_seq_val = std::move(seq_val);
939
15
        } else {
940
6
            cur_seq_val.clear();
941
6
            RETURN_IF_ERROR(_writer._generate_encoded_default_seq_value(
942
6
                    _tablet_schema, *_writer._opts.rowset_ctx->partial_update_info, &cur_seq_val));
943
6
        }
944
21
    }
945
946
562
    for (int rid {pos}; rid < end; rid++) {
947
463
        auto& skip_bitmap = skip_bitmaps->at(rid);
948
463
        bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id));
949
463
        bool have_delete_sign =
950
463
                (!skip_bitmap.contains(delete_sign_col_unique_id) && delete_signs[rid] != 0);
951
463
        if (!row_has_sequence_col) {
952
193
            append_or_merge_row(output_block, block, rid, skip_bitmap, have_delete_sign);
953
270
        } else {
954
270
            std::string seq_val {};
955
270
            _writer._encode_seq_column(seq_column, rid, &seq_val);
956
270
            if (Slice {seq_val}.compare(Slice {cur_seq_val}) >= 0) {
957
212
                append_or_merge_row(output_block, block, rid, skip_bitmap, have_delete_sign);
958
212
                cur_seq_val = std::move(seq_val);
959
212
            } else {
960
58
                VLOG_DEBUG << fmt::format(
961
0
                        "skip rid={} becasue its seq value is lower than the previous", rid);
962
58
            }
963
270
        }
964
463
    }
965
99
    return Status::OK();
966
99
};
967
968
Status BlockAggregator::aggregate_for_sequence_column(
969
        Block* block, int num_rows, const std::vector<IOlapColumnDataAccessor*>& key_columns,
970
        IOlapColumnDataAccessor* seq_column, const std::vector<RowsetSharedPtr>& specified_rowsets,
971
25
        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
972
25
    DCHECK_EQ(block->columns(), _tablet_schema.num_columns());
973
    // the process logic here is the same as MemTable::_aggregate_for_flexible_partial_update_without_seq_col()
974
    // after this function, there will be at most 2 rows for a specified key
975
25
    std::vector<BitmapValue>* skip_bitmaps =
976
25
            &get_mutable_skip_bitmap_column(block, _tablet_schema.skip_bitmap_col_idx())
977
25
                     ->get_data();
978
25
    const auto* delete_signs = BaseTablet::get_delete_sign_column_data(*block, num_rows);
979
980
25
    auto filtered_block = _tablet_schema.create_block();
981
25
    MutableBlock output_block = MutableBlock::build_mutable_block(std::move(filtered_block));
982
983
25
    int same_key_rows {0};
984
25
    std::string previous_key {};
985
541
    for (int block_pos {0}; block_pos < num_rows; block_pos++) {
986
516
        std::string key = _writer._full_encode_keys(key_columns, block_pos);
987
516
        if (block_pos > 0 && previous_key == key) {
988
383
            same_key_rows++;
989
383
        } else {
990
133
            if (same_key_rows > 0) {
991
108
                RETURN_IF_ERROR(aggregate_rows(output_block, block, block_pos - same_key_rows,
992
108
                                               block_pos, std::move(previous_key), skip_bitmaps,
993
108
                                               delete_signs, seq_column, specified_rowsets,
994
108
                                               segment_caches));
995
108
            }
996
133
            same_key_rows = 1;
997
133
        }
998
516
        previous_key = std::move(key);
999
516
    }
1000
25
    if (same_key_rows > 0) {
1001
25
        RETURN_IF_ERROR(aggregate_rows(output_block, block, num_rows - same_key_rows, num_rows,
1002
25
                                       std::move(previous_key), skip_bitmaps, delete_signs,
1003
25
                                       seq_column, specified_rowsets, segment_caches));
1004
25
    }
1005
1006
25
    block->swap(output_block.to_block());
1007
25
    return Status::OK();
1008
25
}
1009
1010
Status BlockAggregator::fill_sequence_column(Block* block, size_t num_rows,
1011
                                             const FixedReadPlan& read_plan,
1012
4
                                             std::vector<BitmapValue>& skip_bitmaps) {
1013
4
    DCHECK(_tablet_schema.has_sequence_col());
1014
4
    std::vector<uint32_t> cids {static_cast<uint32_t>(_tablet_schema.sequence_col_idx())};
1015
4
    auto seq_col_unique_id = _tablet_schema.column(_tablet_schema.sequence_col_idx()).unique_id();
1016
1017
4
    auto seq_col_block = _tablet_schema.create_block_by_cids(cids);
1018
4
    auto tmp_block = _tablet_schema.create_block_by_cids(cids);
1019
4
    std::map<uint32_t, uint32_t> read_index;
1020
4
    RETURN_IF_ERROR(read_plan.read_columns_by_plan(_tablet_schema, cids, _writer._rsid_to_rowset,
1021
4
                                                   seq_col_block, &read_index, false));
1022
1023
4
    auto new_seq_col_ptr = tmp_block.get_by_position(0).column->assert_mutable();
1024
4
    const auto& old_seq_col_ptr = *seq_col_block.get_by_position(0).column;
1025
4
    const auto& cur_seq_col_ptr = *block->get_by_position(_tablet_schema.sequence_col_idx()).column;
1026
80
    for (uint32_t block_pos {0}; block_pos < num_rows; block_pos++) {
1027
76
        if (read_index.contains(block_pos)) {
1028
14
            new_seq_col_ptr->insert_from(old_seq_col_ptr, read_index[block_pos]);
1029
14
            skip_bitmaps[block_pos].remove(seq_col_unique_id);
1030
62
        } else {
1031
62
            new_seq_col_ptr->insert_from(cur_seq_col_ptr, block_pos);
1032
62
        }
1033
76
    }
1034
4
    block->replace_by_position(_tablet_schema.sequence_col_idx(), std::move(new_seq_col_ptr));
1035
4
    return Status::OK();
1036
4
}
1037
1038
Status BlockAggregator::aggregate_for_insert_after_delete(
1039
        Block* block, size_t num_rows, const std::vector<IOlapColumnDataAccessor*>& key_columns,
1040
        const std::vector<RowsetSharedPtr>& specified_rowsets,
1041
265
        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
1042
265
    DCHECK_EQ(block->columns(), _tablet_schema.num_columns());
1043
    // there will be at most 2 rows for a specified key in block when control flow reaches here
1044
    // after this function, there will not be duplicate rows in block
1045
1046
265
    std::vector<BitmapValue>* skip_bitmaps =
1047
265
            &get_mutable_skip_bitmap_column(block, _tablet_schema.skip_bitmap_col_idx())
1048
265
                     ->get_data();
1049
265
    const auto* delete_signs = BaseTablet::get_delete_sign_column_data(*block, num_rows);
1050
1051
265
    auto filter_column = ColumnUInt8::create(num_rows, 1);
1052
265
    auto* __restrict filter_map = filter_column->get_data().data();
1053
265
    std::string previous_key {};
1054
265
    bool previous_has_delete_sign {false};
1055
265
    int duplicate_rows {0};
1056
265
    int32_t delete_sign_col_unique_id =
1057
265
            _tablet_schema.column(_tablet_schema.delete_sign_idx()).unique_id();
1058
265
    auto seq_col_unique_id =
1059
265
            (_tablet_schema.sequence_col_idx() != -1)
1060
265
                    ? _tablet_schema.column(_tablet_schema.sequence_col_idx()).unique_id()
1061
265
                    : -1;
1062
265
    FixedReadPlan read_plan;
1063
2.45k
    for (size_t block_pos {0}; block_pos < num_rows; block_pos++) {
1064
2.19k
        size_t delta_pos = block_pos;
1065
2.19k
        auto& skip_bitmap = skip_bitmaps->at(block_pos);
1066
2.19k
        std::string key = _writer._full_encode_keys(key_columns, delta_pos);
1067
2.19k
        bool have_delete_sign =
1068
2.19k
                (!skip_bitmap.contains(delete_sign_col_unique_id) && delete_signs[block_pos] != 0);
1069
2.19k
        if (delta_pos > 0 && previous_key == key) {
1070
            // !!ATTENTION!!: We can only remove the row with delete sign if there is an insert with the same key after this row.
1071
            // If there is only a row with delete sign, we should keep it and can't remove it from block, because
1072
            // compaction will not use the delete bitmap when reading data. So there may still be rows with delete sign
1073
            // in later process
1074
43
            DCHECK(previous_has_delete_sign);
1075
43
            DCHECK(!have_delete_sign);
1076
43
            ++duplicate_rows;
1077
43
            RowLocation loc;
1078
43
            RowsetSharedPtr rowset;
1079
43
            Status st = _writer._tablet->lookup_row_key(
1080
43
                    key, &_tablet_schema, false, specified_rowsets, &loc,
1081
43
                    _writer._mow_context->max_version, segment_caches, &rowset, true);
1082
43
            bool is_expected_st = (st.is<ErrorCode::KEY_NOT_FOUND>() || st.ok());
1083
43
            DCHECK(is_expected_st || st.is<ErrorCode::MEM_LIMIT_EXCEEDED>())
1084
0
                    << "[BlockAggregator::aggregate_for_insert_after_delete] unexpected error "
1085
0
                       "status while lookup_row_key:"
1086
0
                    << st;
1087
43
            if (!is_expected_st) {
1088
0
                return st;
1089
0
            }
1090
1091
43
            Slice previous_seq_slice {};
1092
43
            if (st.ok()) {
1093
38
                if (_tablet_schema.has_sequence_col()) {
1094
                    // if the insert row doesn't specify the sequence column, we need to
1095
                    // read the historical row's sequence column value so that we don't need
1096
                    // to handle sequence column in append_block_with_flexible_content()
1097
                    // for this row
1098
34
                    bool row_has_sequence_col = (!skip_bitmap.contains(seq_col_unique_id));
1099
34
                    if (!row_has_sequence_col) {
1100
14
                        read_plan.prepare_to_read(loc, block_pos);
1101
14
                        _writer._rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
1102
14
                    }
1103
34
                }
1104
                // delete the existing row
1105
38
                _writer._mow_context->delete_bitmap->add(
1106
38
                        {loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
1107
38
                        loc.row_id);
1108
38
            }
1109
            // and remove the row with delete sign from the current block
1110
43
            filter_map[block_pos - 1] = 0;
1111
43
        }
1112
2.19k
        previous_has_delete_sign = have_delete_sign;
1113
2.19k
        previous_key = std::move(key);
1114
2.19k
    }
1115
265
    if (duplicate_rows > 0) {
1116
9
        if (!read_plan.empty()) {
1117
            // fill sequence column value for some rows
1118
4
            RETURN_IF_ERROR(fill_sequence_column(block, num_rows, read_plan, *skip_bitmaps));
1119
4
        }
1120
9
        RETURN_IF_ERROR(filter_block(block, num_rows, std::move(filter_column), duplicate_rows,
1121
9
                                     "__filter_insert_after_delete_col__"));
1122
9
    }
1123
265
    return Status::OK();
1124
265
}
1125
1126
Status BlockAggregator::filter_block(Block* block, size_t num_rows, MutableColumnPtr filter_column,
1127
9
                                     int duplicate_rows, std::string col_name) {
1128
9
    auto num_cols = block->columns();
1129
9
    block->insert({std::move(filter_column), std::make_shared<DataTypeUInt8>(), col_name});
1130
9
    RETURN_IF_ERROR(Block::filter_block(block, num_cols, num_cols));
1131
9
    DCHECK_EQ(num_cols, block->columns());
1132
9
    size_t merged_rows = num_rows - block->rows();
1133
9
    if (duplicate_rows != merged_rows) {
1134
0
        auto msg = fmt::format(
1135
0
                "filter_block_for_flexible_partial_update {}: duplicate_rows != merged_rows, "
1136
0
                "duplicate_keys={}, merged_rows={}, num_rows={}, mutable_block->rows()={}",
1137
0
                col_name, duplicate_rows, merged_rows, num_rows, block->rows());
1138
0
        DCHECK(false) << msg;
1139
0
        return Status::InternalError<false>(msg);
1140
0
    }
1141
9
    return Status::OK();
1142
9
}
1143
1144
// Encodes every key column of `block` through the writer's olap data convertor.
//
// `key_columns` is cleared first and then receives one accessor per key column, in column-id
// order (column ids 0 .. num_key_columns() - 1). The first failing step aborts the conversion and
// its status is returned; accessors collected so far remain in `key_columns`.
//
// @param block       source block; key columns are read by position
// @param row_pos     first row handed to the convertor
// @param num_rows    number of rows handed to the convertor
// @param key_columns output vector of converted key column accessors
Status BlockAggregator::convert_pk_columns(Block* block, size_t row_pos, size_t num_rows,
                                           std::vector<IOlapColumnDataAccessor*>& key_columns) {
    key_columns.clear();
    auto& convertor = _writer._olap_data_convertor;
    for (uint32_t key_cid = 0; key_cid < _tablet_schema.num_key_columns(); ++key_cid) {
        // Bind the source column first, then ask the convertor for its converted form.
        RETURN_IF_ERROR(convertor->set_source_content_with_specifid_column(
                block->get_by_position(key_cid), row_pos, num_rows, key_cid));
        auto [convert_status, accessor] = convertor->convert_column_data(key_cid);
        if (!convert_status.ok()) {
            return convert_status;
        }
        key_columns.push_back(accessor);
    }
    return Status::OK();
}
1158
1159
// Encodes the sequence column of `block` through the writer's olap data convertor.
//
// `seq_column` is always reset to nullptr first. It is only set to a converted accessor when the
// tablet schema has a sequence column and the conversion succeeds; otherwise it stays nullptr.
//
// @param block      source block; the sequence column is read at sequence_col_idx()
// @param row_pos    first row handed to the convertor
// @param num_rows   number of rows handed to the convertor
// @param seq_column output accessor for the converted sequence column, or nullptr
Status BlockAggregator::convert_seq_column(Block* block, size_t row_pos, size_t num_rows,
                                           IOlapColumnDataAccessor*& seq_column) {
    seq_column = nullptr;
    if (!_tablet_schema.has_sequence_col()) {
        // Nothing to encode for schemas without a sequence column.
        return Status::OK();
    }

    const auto seq_cid = _tablet_schema.sequence_col_idx();
    auto& convertor = _writer._olap_data_convertor;
    RETURN_IF_ERROR(convertor->set_source_content_with_specifid_column(
            block->get_by_position(seq_cid), row_pos, num_rows, seq_cid));
    auto [convert_status, accessor] = convertor->convert_column_data(seq_cid);
    if (!convert_status.ok()) {
        return convert_status;
    }
    seq_column = accessor;
    return Status::OK();
}
1174
1175
// Entry point of block aggregation for flexible partial update.
//
// Runs two passes over `block`:
//   1. when the schema has a sequence column, aggregate_for_sequence_column() merges rows that
//      share the same key;
//   2. aggregate_for_insert_after_delete() merges remaining duplicates and handles rows that are
//      inserted after a delete of the same key.
// Key columns (and the sequence column, if any) are encoded before pass 1 and encoded again
// before pass 2 if pass 1 changed the number of rows in `block`.
//
// @param block             block to aggregate in place
// @param num_rows          number of rows in `block` on entry
// @param specified_rowsets rowsets forwarded to both aggregation passes
// @param segment_caches    segment cache handles forwarded to both aggregation passes
Status BlockAggregator::aggregate_for_flexible_partial_update(
        Block* block, size_t num_rows, const std::vector<RowsetSharedPtr>& specified_rowsets,
        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches) {
    std::vector<IOlapColumnDataAccessor*> key_columns {};
    IOlapColumnDataAccessor* seq_column {nullptr};

    // Encodes the key columns and then the sequence column for the first `rows` rows.
    auto encode_key_and_seq = [&](size_t rows) -> Status {
        RETURN_IF_ERROR(convert_pk_columns(block, 0, rows, key_columns));
        RETURN_IF_ERROR(convert_seq_column(block, 0, rows, seq_column));
        return Status::OK();
    };

    RETURN_IF_ERROR(encode_key_and_seq(num_rows));

    // 1. merge duplicate rows when table has sequence column
    // Rows with the same key may coexist in one memtable where only some of them specify the
    // sequence column. The memtable cannot de-duplicate them because it does not know the
    // historical data, so the de-duplication has to happen here.
    if (_tablet_schema.has_sequence_col()) {
        RETURN_IF_ERROR(aggregate_for_sequence_column(block, static_cast<int>(num_rows),
                                                      key_columns, seq_column, specified_rowsets,
                                                      segment_caches));
    }

    // 2. merge duplicate rows and handle insert after delete
    const size_t rows_after_seq_merge = block->rows();
    if (rows_after_seq_merge != num_rows) {
        // The block content changed, so the previously encoded key/sequence columns are stale
        // and have to be rebuilt from scratch.
        num_rows = rows_after_seq_merge;
        _writer._olap_data_convertor->clear_source_content();
        RETURN_IF_ERROR(encode_key_and_seq(num_rows));
    }
    RETURN_IF_ERROR(aggregate_for_insert_after_delete(block, num_rows, key_columns,
                                                      specified_rowsets, segment_caches));
    return Status::OK();
}
1206
1207
} // namespace doris