Coverage Report

Created: 2026-05-28 00:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/vertical_segment_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/vertical_segment_writer.h"
19
20
#include <crc32c/crc32c.h>
21
#include <gen_cpp/olap_file.pb.h>
22
#include <gen_cpp/segment_v2.pb.h>
23
#include <parallel_hashmap/phmap.h>
24
25
#include <algorithm>
26
#include <cassert>
27
#include <memory>
28
#include <ostream>
29
#include <string>
30
#include <unordered_map>
31
#include <unordered_set>
32
#include <utility>
33
34
#include "cloud/config.h"
35
#include "common/cast_set.h"
36
#include "common/compiler_util.h" // IWYU pragma: keep
37
#include "common/config.h"
38
#include "common/logging.h" // LOG
39
#include "common/status.h"
40
#include "core/assert_cast.h"
41
#include "core/block/block.h"
42
#include "core/block/column_with_type_and_name.h"
43
#include "core/column/column_nullable.h"
44
#include "core/column/column_string.h"
45
#include "core/column/column_vector.h"
46
#include "core/data_type/data_type.h"
47
#include "core/data_type/data_type_factory.hpp"
48
#include "core/data_type/data_type_number.h" // IWYU pragma: keep
49
#include "core/types.h"
50
#include "exec/common/variant_util.h"
51
#include "io/fs/file_writer.h"
52
#include "io/fs/local_file_system.h"
53
#include "runtime/exec_env.h"
54
#include "runtime/memory/mem_tracker.h"
55
#include "service/point_query_executor.h"
56
#include "storage/data_dir.h"
57
#include "storage/index/index_file_writer.h"
58
#include "storage/index/inverted/inverted_index_desc.h"
59
#include "storage/index/inverted/inverted_index_fs_directory.h"
60
#include "storage/index/primary_key_index.h"
61
#include "storage/index/short_key_index.h"
62
#include "storage/iterator/olap_data_convertor.h"
63
#include "storage/key_coder.h"
64
#include "storage/olap_common.h"
65
#include "storage/partial_update_info.h"
66
#include "storage/row_cursor.h" // RowCursor // IWYU pragma: keep
67
#include "storage/rowset/rowset_fwd.h"
68
#include "storage/rowset/rowset_writer_context.h" // RowsetWriterContext
69
#include "storage/rowset/segment_creator.h"
70
#include "storage/segment/column_writer.h" // ColumnWriter
71
#include "storage/segment/external_col_meta_util.h"
72
#include "storage/segment/historical_row_retriever.h"
73
#include "storage/segment/page_io.h"
74
#include "storage/segment/page_pointer.h"
75
#include "storage/segment/segment_loader.h"
76
#include "storage/segment/variant/variant_ext_meta_writer.h"
77
#include "storage/tablet/base_tablet.h"
78
#include "storage/tablet/tablet_schema.h"
79
#include "storage/utils.h"
80
#include "util/coding.h"
81
#include "util/debug_points.h"
82
#include "util/faststring.h"
83
#include "util/json/path_in_data.h"
84
#include "util/jsonb/serialize.h"
85
namespace doris::segment_v2 {
86
87
using namespace ErrorCode;
88
using namespace KeyConsts;
89
90
static constexpr const char* k_segment_magic = "D0R1";
91
static constexpr uint32_t k_segment_magic_length = 4;
92
93
14
// Returns the MemTracker label used by the writer of segment `segment_id`,
// i.e. the fixed prefix "VerticalSegmentWriter:Segment-" followed by the id in decimal.
inline std::string vertical_segment_writer_mem_tracker_name(uint32_t segment_id) {
    std::string tracker_name("VerticalSegmentWriter:Segment-");
    tracker_name += std::to_string(segment_id);
    return tracker_name;
}
96
97
0
// Makes the column stored at `skip_bitmap_col_idx` of `block` mutable, puts the mutable
// column back into the same position, and returns it as a ColumnBitmap pointer.
// The returned pointer refers to the column now held by `block`.
static ColumnBitmap* get_mutable_skip_bitmap_column(Block* block, size_t skip_bitmap_col_idx) {
    auto& slot = block->get_by_position(skip_bitmap_col_idx);
    auto mutable_column = IColumn::mutate(std::move(slot.column));
    // Grab the typed raw pointer before ownership is handed back to the block.
    auto* bitmap_column = assert_cast<ColumnBitmap*>(mutable_column.get());
    block->replace_by_position(skip_bitmap_col_idx, std::move(mutable_column));
    return bitmap_column;
}
104
105
// Builds a writer for segment `segment_id`.
//
// `file_writer` must not be null (enforced with CHECK_NOTNULL). Besides storing its
// arguments, the constructor prepares the key coders and key index sizes:
//   * by default, one entry per key column of the tablet schema;
//   * for merge-on-write tables with a sequence column, an additional sequence coder;
//   * for merge-on-write tables with cluster keys, the key-column coders become the
//     primary key coders, and the sort key coders/sizes are rebuilt from the cluster keys.
VerticalSegmentWriter::VerticalSegmentWriter(io::FileWriter* file_writer, uint32_t segment_id,
                                             TabletSchemaSPtr tablet_schema, BaseTabletSPtr tablet,
                                             DataDir* data_dir,
                                             const VerticalSegmentWriterOptions& opts,
                                             IndexFileWriter* index_file_writer)
        : _segment_id(segment_id),
          _tablet_schema(std::move(tablet_schema)),
          _tablet(std::move(tablet)),
          _data_dir(data_dir),
          _opts(opts),
          _file_writer(file_writer),
          _index_file_writer(index_file_writer),
          _mem_tracker(std::make_unique<MemTracker>(
                  vertical_segment_writer_mem_tracker_name(segment_id))),
          // `opts` is a const reference, so std::move() on its member could never move and
          // always copied; spell the copy out explicitly.
          _mow_context(opts.mow_ctx),
          _block_aggregator(*this) {
    CHECK_NOTNULL(file_writer);
    _num_sort_key_columns = _tablet_schema->num_key_columns();
    _num_short_key_columns = _tablet_schema->num_short_key_columns();
    if (!_is_mow_with_cluster_key()) {
        DCHECK(_num_sort_key_columns >= _num_short_key_columns)
                << ", table_id=" << _tablet_schema->table_id()
                << ", num_key_columns=" << _num_sort_key_columns
                << ", num_short_key_columns=" << _num_short_key_columns
                << ", cluster_key_columns=" << _tablet_schema->cluster_key_uids().size();
    }
    for (size_t cid = 0; cid < _num_sort_key_columns; ++cid) {
        const auto& column = _tablet_schema->column(cid);
        _key_coders.push_back(get_key_coder(column.type()));
        _key_index_size.push_back(cast_set<uint16_t>(column.index_length()));
    }
    if (_is_mow()) {
        // The sequence column value is encoded into the primary key index.
        if (_tablet_schema->has_sequence_col()) {
            const auto& column = _tablet_schema->column(_tablet_schema->sequence_col_idx());
            _seq_coder = get_key_coder(column.type());
        }
        // With cluster keys, the row id is encoded into the primary key index as well.
        if (_is_mow_with_cluster_key()) {
            _rowid_coder = get_key_coder(FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT);
            // The coders collected above belong to the primary keys.
            _primary_key_coders.swap(_key_coders);
            // The sort keys are the cluster keys: rebuild coders and index sizes from them.
            _key_coders.clear();
            _key_index_size.clear();
            _num_sort_key_columns = _tablet_schema->cluster_key_uids().size();
            for (auto cid : _tablet_schema->cluster_key_uids()) {
                const auto& column = _tablet_schema->column_by_uid(cid);
                _key_coders.push_back(get_key_coder(column.type()));
                _key_index_size.push_back(cast_set<uint16_t>(column.index_length()));
            }
        }
    }
}
159
160
14
// Releases from the writer's MemTracker whatever amount it still reports as consumed.
VerticalSegmentWriter::~VerticalSegmentWriter() {
    const auto outstanding_bytes = _mem_tracker->consumption();
    _mem_tracker->release(outstanding_bytes);
}
163
164
void VerticalSegmentWriter::_init_column_meta(ColumnMetaPB* meta, uint32_t column_id,
165
65
                                              const TabletColumn& column) {
166
65
    meta->set_column_id(column_id);
167
65
    meta->set_type(int(column.type()));
168
65
    meta->set_length(cast_set<int32_t>(column.length()));
169
65
    meta->set_encoding(DEFAULT_ENCODING);
170
65
    meta->set_compression(_opts.compression_type);
171
65
    meta->set_is_nullable(column.is_nullable());
172
65
    meta->set_default_value(column.default_value());
173
65
    meta->set_precision(column.precision());
174
65
    meta->set_frac(column.frac());
175
65
    if (column.has_path_info()) {
176
0
        column.path_info_ptr()->to_protobuf(meta->mutable_column_path_info(),
177
0
                                            column.parent_unique_id());
178
0
    }
179
65
    meta->set_unique_id(column.unique_id());
180
65
    for (uint32_t i = 0; i < column.get_subtype_count(); ++i) {
181
0
        _init_column_meta(meta->add_children_columns(), column_id, column.get_sub_column(i));
182
0
    }
183
65
    if (column.is_variant_type()) {
184
0
        meta->set_variant_max_subcolumns_count(column.variant_max_subcolumns_count());
185
0
        meta->set_variant_enable_doc_mode(column.variant_enable_doc_mode());
186
0
    }
187
65
    meta->set_result_is_nullable(column.get_result_is_nullable());
188
65
    meta->set_function_name(column.get_aggregation_name());
189
65
    meta->set_be_exec_version(column.get_be_exec_version());
190
65
}
191
192
// Creates and initializes the ColumnWriter for column `cid` and registers the matching
// data convertor.
//
// A new ColumnMetaPB is appended to the segment footer and filled from `column`; the
// index-related options (zone map, bloom filter, ngram bloom filter, inverted index,
// ANN index), page sizes and encoding preferences are derived from `tablet_schema`,
// the writer's own schema and the writer options.
//
// Returns NotSupported when an ngram bloom filter index has an out-of-range gram size
// or bloom filter size, and propagates any error from creating/initializing the
// underlying ColumnWriter.
Status VerticalSegmentWriter::_create_column_writer(uint32_t cid, const TabletColumn& column,
                                                    const TabletSchemaSPtr& tablet_schema) {
    ColumnWriterOptions opts;
    opts.meta = _footer.add_columns();

    _init_column_meta(opts.meta, cid, column);

    // now we create zone map for key columns in AGG_KEYS or all column in UNIQUE_KEYS or DUP_KEYS
    // except for columns whose type don't support zone map.
    opts.need_zone_map = column.is_key() || tablet_schema->keys_type() != KeysType::AGG_KEYS;
    opts.need_bloom_filter = column.is_bf_column();
    if (opts.need_bloom_filter) {
        opts.bf_options.fpp =
                tablet_schema->has_bf_fpp() ? tablet_schema->bloom_filter_fpp() : 0.05;
    }
    auto* tablet_index = tablet_schema->get_ngram_bf_index(column.unique_id());
    if (tablet_index) {
        opts.need_bloom_filter = true;
        opts.is_ngram_bf_index = true;
        // The values are narrowed to uint8_t / uint16_t below, so validate the ranges first.
        auto gram_size = tablet_index->get_gram_size();
        auto gram_bf_size = tablet_index->get_gram_bf_size();
        // 255 is the largest value representable by the uint8_t `opts.gram_size`.
        if (gram_size > 255 || gram_size < 1) {
            return Status::NotSupported("Do not support ngram bloom filter for ngram_size: {}",
                                        gram_size);
        }
        if (gram_bf_size > 65535 || gram_bf_size < 64) {
            return Status::NotSupported("Do not support ngram bloom filter for bf_size: {}",
                                        gram_bf_size);
        }
        opts.gram_size = cast_set<uint8_t>(gram_size);
        opts.gram_bf_size = cast_set<uint16_t>(gram_bf_size);
    }

    bool skip_inverted_index = false;
    if (_opts.rowset_ctx != nullptr) {
        // skip write inverted index for index compaction column
        skip_inverted_index =
                _opts.rowset_ctx->columns_to_do_index_compaction.contains(column.unique_id());
    }
    // skip write inverted index on load if skip_write_index_on_load is true
    if (_opts.write_type == DataWriteType::TYPE_DIRECT &&
        tablet_schema->skip_write_index_on_load()) {
        skip_inverted_index = true;
    }
    if (!skip_inverted_index) {
        auto inverted_indexs = tablet_schema->inverted_indexs(column);
        if (!inverted_indexs.empty()) {
            opts.inverted_indexes = inverted_indexs;
            opts.need_inverted_index = true;
            DCHECK(_index_file_writer != nullptr);
        }
    }
    opts.index_file_writer = _index_file_writer;

    if (const auto& index = tablet_schema->ann_index(column); index != nullptr) {
        opts.ann_index = index;
        opts.need_ann_index = true;
        DCHECK(_index_file_writer != nullptr);
    }

    // Zone maps and bloom filters are disabled for these column types.
    switch (column.type()) {
    case FieldType::OLAP_FIELD_TYPE_STRUCT:
    case FieldType::OLAP_FIELD_TYPE_ARRAY:
    case FieldType::OLAP_FIELD_TYPE_JSONB:
    case FieldType::OLAP_FIELD_TYPE_AGG_STATE:
    case FieldType::OLAP_FIELD_TYPE_MAP:
    case FieldType::OLAP_FIELD_TYPE_BITMAP:
    case FieldType::OLAP_FIELD_TYPE_HLL:
    case FieldType::OLAP_FIELD_TYPE_QUANTILE_STATE:
    case FieldType::OLAP_FIELD_TYPE_VARIANT:
        opts.need_zone_map = false;
        opts.need_bloom_filter = false;
        break;
    default:
        break;
    }

    int64_t storage_page_size = _tablet_schema->storage_page_size();
    // storage_page_size must be between 4KB and 10MB; otherwise the default
    // data page size of ColumnWriterOptions is kept.
    if (storage_page_size >= 4096 && storage_page_size <= 10485760) {
        opts.data_page_size = storage_page_size;
    }
    opts.dict_page_size = _tablet_schema->storage_dict_page_size();
    DBUG_EXECUTE_IF("VerticalSegmentWriter._create_column_writer.storage_page_size", {
        auto table_id = DebugPoints::instance()->get_debug_param_or_default<int64_t>(
                "VerticalSegmentWriter._create_column_writer.storage_page_size", "table_id",
                INT_MIN);
        auto target_data_page_size = DebugPoints::instance()->get_debug_param_or_default<int64_t>(
                "VerticalSegmentWriter._create_column_writer.storage_page_size",
                "storage_page_size", INT_MIN);
        if (table_id == INT_MIN || target_data_page_size == INT_MIN) {
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
                    "Debug point parameters missing: either 'table_id' or 'storage_page_size' not "
                    "set.");
        }
        if (table_id == _tablet_schema->table_id() &&
            opts.data_page_size != target_data_page_size) {
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
                    "Mismatch in 'storage_page_size': expected size does not match the current "
                    "data page size. "
                    "Expected: " +
                    std::to_string(target_data_page_size) +
                    ", Actual: " + std::to_string(opts.data_page_size) + ".");
        }
    })
    if (column.is_row_store_column()) {
        // smaller page size for row store column
        auto page_size = _tablet_schema->row_store_page_size();
        opts.data_page_size =
                (page_size > 0) ? page_size : segment_v2::ROW_STORE_PAGE_SIZE_DEFAULT_VALUE;
        // Row store data is already serialized as a single blob. Keep it on plain pages
        // to avoid introducing dictionary pages for the hidden row store column.
        opts.meta->set_encoding(_tablet_schema->binary_plain_encoding_default_impl() ==
                                                BinaryPlainEncodingTypePB::BINARY_PLAIN_ENCODING_V2
                                        ? PLAIN_ENCODING_V2
                                        : PLAIN_ENCODING);
    }

    opts.rowset_ctx = _opts.rowset_ctx;
    opts.file_writer = _file_writer;
    opts.compression_type = _opts.compression_type;
    opts.footer = &_footer;
    // rowset_ctx is treated as optional earlier in this function; do not dereference it
    // unconditionally here.
    if (_opts.rowset_ctx != nullptr) {
        opts.input_rs_readers = _opts.rowset_ctx->input_rs_readers;
    }

    opts.encoding_preference = {.integer_type_default_use_plain_encoding =
                                        _tablet_schema->integer_type_default_use_plain_encoding(),
                                .binary_plain_encoding_default_impl =
                                        _tablet_schema->binary_plain_encoding_default_impl()};
    std::unique_ptr<ColumnWriter> writer;
    RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _file_writer, &writer));
    RETURN_IF_ERROR(writer->init());
    _column_writers[cid] = std::move(writer);
    _olap_data_convertor->add_column_data_convertor_at(column, cid);
    return Status::OK();
}
332
333
14
// Prepares the writer before any column is appended.
//
// Resolves an UNKNOWN_COMPRESSION option to the schema's compression type, sizes the
// data convertor and the column writer table to the number of schema columns, and
// creates the index builders:
//   * non merge-on-write tables: a short key index builder only;
//   * merge-on-write tables: a primary key index builder, plus a short key index
//     builder when cluster keys are used.
// Returns the error of the primary key index builder's init() if it fails.
Status VerticalSegmentWriter::init() {
    DCHECK(_column_writers.empty());
    if (_opts.compression_type == UNKNOWN_COMPRESSION) {
        _opts.compression_type = _tablet_schema->compression_type();
    }

    _olap_data_convertor = std::make_unique<OlapBlockDataConvertor>();
    _olap_data_convertor->resize(_tablet_schema->num_columns());
    _column_writers.resize(_tablet_schema->num_columns());

    if (!_is_mow()) {
        _short_key_index_builder =
                std::make_unique<ShortKeyIndexBuilder>(_segment_id, _opts.num_rows_per_block);
        return Status::OK();
    }

    // Merge-on-write: the primary key index entries carry the optional sequence value
    // and, for cluster key tables, the row id.
    size_t seq_col_length = 0;
    if (_tablet_schema->has_sequence_col()) {
        const auto& seq_column = _tablet_schema->column(_tablet_schema->sequence_col_idx());
        seq_col_length = seq_column.length() + 1;
    }

    size_t rowid_length = 0;
    if (_is_mow_with_cluster_key()) {
        rowid_length = PrimaryKeyIndexReader::ROW_ID_LENGTH;
        // Cluster key tables additionally keep a short key index.
        _short_key_index_builder =
                std::make_unique<ShortKeyIndexBuilder>(_segment_id, _opts.num_rows_per_block);
    }

    _primary_key_index_builder =
            std::make_unique<PrimaryKeyIndexBuilder>(_file_writer, seq_col_length, rowid_length);
    RETURN_IF_ERROR(_primary_key_index_builder->init());
    return Status::OK();
}
363
364
12
void VerticalSegmentWriter::_maybe_invalid_row_cache(const std::string& key) const {
365
    // Just invalid row cache for simplicity, since the rowset is not visible at present.
366
    // If we update/insert cache, if load failed rowset will not be visible but cached data
367
    // will be visible, and lead to inconsistency.
368
12
    if (!config::disable_storage_row_cache && _tablet_schema->has_row_store_for_all_columns() &&
369
12
        _opts.write_type == DataWriteType::TYPE_DIRECT) {
370
        // invalidate cache
371
0
        RowCache::instance()->erase({_opts.rowset_ctx->tablet_id, key});
372
0
    }
373
12
}
374
375
// Serializes rows [row_pos, row_pos + num_rows) of `block` to JSONB and appends them to
// the writer of the row store column `cid`.
//
// The range is processed in batches: each call to JsonbSerializeUtil::block_to_jsonb is
// given at most `_opts.num_rows_per_block` rows and a 4 MiB byte budget, and reports how
// many rows it actually serialized; the loop advances by that amount.
//
// Returns OK immediately when `num_rows` is 0. Returns an InternalError if a batch
// serializes no row (which would otherwise loop forever in builds where DCHECK is
// compiled out), and propagates any error from the data convertor or the column writer.
Status VerticalSegmentWriter::_append_row_store_column(const Block& block, size_t row_pos,
                                                       size_t num_rows, uint32_t cid) {
    DCHECK(_tablet_schema->column(cid).is_row_store_column());
    if (num_rows == 0) {
        return Status::OK();
    }
    DCHECK_LE(row_pos + num_rows, block.rows());

    auto serdes = create_data_type_serdes(block.get_data_types());
    const auto& row_store_uids = _tablet_schema->row_columns_uids();
    std::unordered_set<int32_t> row_store_cids_set(row_store_uids.begin(), row_store_uids.end());
    size_t end_pos = row_pos + num_rows;
    size_t batch_rows = _opts.num_rows_per_block;
    static constexpr size_t kRowStoreBatchBytes = 4 * 1024 * 1024;
    DCHECK_GT(batch_rows, 0);
    for (size_t pos = row_pos; pos < end_pos;) {
        size_t max_rows = std::min(batch_rows, end_pos - pos);
        auto row_column = ColumnString::create();
        auto* row_store_column = row_column.get();
        size_t rows = JsonbSerializeUtil::block_to_jsonb(
                *_tablet_schema, block, *row_store_column,
                cast_set<int>(_tablet_schema->num_columns()), serdes, row_store_cids_set, pos,
                max_rows, kRowStoreBatchBytes);
        DCHECK_GT(rows, 0);
        if (rows == 0) {
            // Without progress `pos` never advances; fail instead of spinning forever.
            return Status::InternalError(
                    "row store serialization made no progress, cid={}, pos={}, max_rows={}", cid,
                    pos, max_rows);
        }

        // Reuse the type/name of the row store column but feed the freshly serialized data.
        auto typed_column = block.get_by_position(cid);
        typed_column.column = std::move(row_column);
        RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
                typed_column, 0, rows, cid));
        auto [status, column] = _olap_data_convertor->convert_column_data(cid);
        RETURN_IF_ERROR(status);
        RETURN_IF_ERROR(
                _column_writers[cid]->append(column->get_nullmap(), column->get_data(), rows));
        _olap_data_convertor->clear_source_content(cid);
        pos += rows;
    }
    return Status::OK();
}
413
414
// Looks up `key` in the tablet and records how the row at `segment_pos` of this segment
// has to be treated by a partial update.
//
// Outcomes:
//   * key not found: `not_found_cb` is invoked unless the row carries a delete sign; the
//     row is counted as newly added and flagged to use default/null values.
//   * lookup failed with an error other than KEY_ALREADY_EXISTS: the error is logged and
//     returned.
//   * otherwise the row is either flagged to use default/null values or `found_cb` is
//     invoked with the location of the existing row, and the delete bitmap is updated.
//
// One flag is appended to `use_default_or_null_flag` per call that returns OK, and
// `has_default_or_nullable` is set to true whenever that flag is true.
Status VerticalSegmentWriter::_probe_key_for_mow(
        std::string key, std::size_t segment_pos, bool have_input_seq_column, bool have_delete_sign,
        const std::vector<RowsetSharedPtr>& specified_rowsets,
        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
        bool& has_default_or_nullable, std::vector<bool>& use_default_or_null_flag,
        const std::function<void(const RowLocation& loc)>& found_cb,
        const std::function<Status()>& not_found_cb, PartialUpdateStats& stats) {
    RowLocation loc;
    // Hold a reference to the matched rowset so that it is not deleted while in use.
    RowsetSharedPtr rowset;
    auto st = _tablet->lookup_row_key(key, _tablet_schema.get(), have_input_seq_column,
                                      specified_rowsets, &loc, _mow_context->max_version,
                                      segment_caches, &rowset);
    if (st.is<KEY_NOT_FOUND>()) {
        if (!have_delete_sign) {
            RETURN_IF_ERROR(not_found_cb());
        }
        ++stats.num_rows_new_added;
        has_default_or_nullable = true;
        use_default_or_null_flag.emplace_back(true);
        return Status::OK();
    }

    const bool key_already_exists = st.is<KEY_ALREADY_EXISTS>();
    if (!(st.ok() || key_already_exists)) {
        LOG(WARNING) << "failed to lookup row key, error: " << st;
        return st;
    }

    // Decide whether the missing columns are filled with default/null values instead of
    // being read from the previous row. The checks run in order and stop at the first hit:
    // 1. the lookup reported KEY_ALREADY_EXISTS;
    // 2. the row has a delete sign, so its value columns will not be read. The exception
    //    is a table with a sequence column: the sequence column must still be read,
    //    otherwise the merge-on-read based compaction policy may produce incorrect results;
    // 3. in flexible partial update, the existing row may have been deleted earlier in the
    //    same load (insert after delete). The insert is then treated as a newly inserted
    //    row; its sequence column value is filled in
    //    BlockAggregator::aggregate_for_insert_after_delete() if the row does not specify it.
    bool fill_with_default = key_already_exists;
    if (!fill_with_default) {
        fill_with_default = have_delete_sign && !_tablet_schema->has_sequence_col();
    }
    if (!fill_with_default) {
        fill_with_default =
                _opts.rowset_ctx->partial_update_info->is_flexible_partial_update() &&
                _mow_context->delete_bitmap->contains(
                        {loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
                        loc.row_id);
    }

    if (fill_with_default) {
        has_default_or_nullable = true;
        use_default_or_null_flag.emplace_back(true);
    } else {
        // partial update should not contain invisible columns
        use_default_or_null_flag.emplace_back(false);
        _rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
        found_cb(loc);
    }

    if (key_already_exists) {
        // The current row gets marked as deleted, but its missing columns are still
        // processed so that every column stays aligned.
        _mow_context->delete_bitmap->add(
                {_opts.rowset_ctx->rowset_id, _segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
                cast_set<uint32_t>(segment_pos));
        ++stats.num_rows_deleted;
    } else {
        // Mark the previous version of the row as deleted.
        _mow_context->delete_bitmap->add(
                {loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, loc.row_id);
        ++stats.num_rows_updated;
    }
    return Status::OK();
}
478
479
65
// Checks whether the data directory can take the estimated buffer size of the writer of
// column `cid`. Returns DISK_REACH_CAPACITY_LIMIT if the directory reports that the
// capacity limit would be reached; returns OK otherwise, including when no data
// directory is set.
Status VerticalSegmentWriter::_check_column_writer_disk_capacity(size_t cid) {
    if (_data_dir == nullptr) {
        return Status::OK();
    }
    if (!_data_dir->reach_capacity_limit(_column_writers[cid]->estimate_buffer_size())) {
        return Status::OK();
    }
    return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed capacity limit.",
                                                    _data_dir->path_hash());
}
487
488
65
// Finishes the writer of column `cid`, writes its data, and then copies the writer's
// byte statistics (compressed, uncompressed and raw data bytes) into its column meta.
// Any error from finish() or write_data() is returned as is.
Status VerticalSegmentWriter::_finalize_column_writer_and_update_meta(size_t cid) {
    auto& writer = _column_writers[cid];
    RETURN_IF_ERROR(writer->finish());
    RETURN_IF_ERROR(writer->write_data());

    auto* meta = writer->get_column_meta();
    const auto compressed_bytes = writer->get_total_compressed_data_pages_bytes();
    meta->set_compressed_data_bytes(compressed_bytes);
    const auto uncompressed_bytes = writer->get_total_uncompressed_data_pages_bytes();
    meta->set_uncompressed_data_bytes(uncompressed_bytes);
    const auto raw_bytes = writer->get_raw_data_bytes();
    meta->set_raw_data_bytes(raw_bytes);
    return Status::OK();
}
500
501
// Validates the preconditions of a partial update write.
//
// Checked in order: the table is a merge-on-write table, partial_update_info is set,
// the update mode matches the code path selected by `is_flexible_update` (fixed when
// false, flexible when true), and `row_pos` is 0. The first violated precondition
// triggers DCHECK(false) and is returned as an InternalError; OK is returned otherwise.
Status VerticalSegmentWriter::_partial_update_preconditions_check(size_t row_pos,
                                                                  bool is_flexible_update) {
    // Common failure path: assert in debug builds, report an error status otherwise.
    const auto fail = [](const std::string& msg) -> Status {
        DCHECK(false) << msg;
        return Status::InternalError<false>(msg);
    };

    if (!_is_mow()) {
        return fail(fmt::format(
                "Can only do partial update on merge-on-write unique table, but found: "
                "keys_type={}, _opts.enable_unique_key_merge_on_write={}, tablet_id={}",
                _tablet_schema->keys_type(), _opts.enable_unique_key_merge_on_write,
                _tablet->tablet_id()));
    }

    const auto& update_info = _opts.rowset_ctx->partial_update_info;
    if (update_info == nullptr) {
        return fail(
                fmt::format("partial_update_info should not be nullptr, please check, tablet_id={}",
                            _tablet->tablet_id()));
    }

    if (!is_flexible_update && !update_info->is_fixed_partial_update()) {
        return fail(fmt::format(
                "in fixed partial update code, but update_mode={}, please check, tablet_id={}",
                update_info->update_mode(), _tablet->tablet_id()));
    }
    if (is_flexible_update && !update_info->is_flexible_partial_update()) {
        return fail(fmt::format(
                "in flexible partial update code, but update_mode={}, please check, "
                "tablet_id={}",
                update_info->update_mode(), _tablet->tablet_id()));
    }

    if (row_pos != 0) {
        return fail(fmt::format("row_pos should be 0, but found {}, tablet_id={}", row_pos,
                                _tablet->tablet_id()));
    }
    return Status::OK();
}
545
546
// for partial update, we should do following steps to fill content of block:
547
// 1. set block data to data convertor, and get all key_column's converted slice
548
// 2. get pk of input block, and read missing columns
549
//       2.1 first find key location{rowset_id, segment_id, row_id}
550
//       2.2 build read plan to read by batch
551
//       2.3 fill block
552
// 3. set columns to data convertor and then write all columns
553
Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& data,
554
0
                                                                 Block& full_block) {
555
0
    DBUG_EXECUTE_IF("_append_block_with_partial_content.block", DBUG_BLOCK);
556
557
0
    RETURN_IF_ERROR(_partial_update_preconditions_check(data.row_pos, false));
558
    // create full block and fill with input columns
559
0
    full_block = _tablet_schema->create_block();
560
0
    const auto& including_cids = _opts.rowset_ctx->partial_update_info->update_cids;
561
0
    size_t input_id = 0;
562
0
    for (auto i : including_cids) {
563
0
        full_block.replace_by_position(i, data.block->get_by_position(input_id++).column);
564
0
    }
565
566
0
    if (_opts.rowset_ctx->write_type != DataWriteType::TYPE_COMPACTION &&
567
0
        _tablet_schema->num_variant_columns() > 0) {
568
0
        RETURN_IF_ERROR(variant_util::parse_and_materialize_variant_columns(
569
0
                full_block, *_tablet_schema, including_cids));
570
0
    }
571
0
    bool have_input_seq_column = false;
572
    // write including columns
573
0
    std::vector<IOlapColumnDataAccessor*> key_columns;
574
0
    IOlapColumnDataAccessor* seq_column = nullptr;
575
0
    uint32_t segment_start_pos = 0;
576
0
    for (auto cid : including_cids) {
577
0
        RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema));
578
0
        RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
579
0
                &full_block, data.row_pos, data.num_rows, std::vector<uint32_t> {cid}));
580
        // here we get segment column row num before append data.
581
0
        segment_start_pos = cast_set<uint32_t>(_column_writers[cid]->get_next_rowid());
582
        // olap data convertor always starts from id = 0
583
0
        auto [status, column] = _olap_data_convertor->convert_column_data(cid);
584
0
        if (!status.ok()) {
585
0
            return status;
586
0
        }
587
0
        if (cid < _num_sort_key_columns) {
588
0
            key_columns.push_back(column);
589
0
        } else if (_tablet_schema->has_sequence_col() &&
590
0
                   cid == _tablet_schema->sequence_col_idx()) {
591
0
            seq_column = column;
592
0
            have_input_seq_column = true;
593
0
        }
594
0
        RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), column->get_data(),
595
0
                                                     data.num_rows));
596
0
        RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
597
        // Don't clear source content for key columns and sequence column here,
598
        // as they will be used later in _full_encode_keys() and _generate_primary_key_index().
599
        // They will be cleared at the end of this method.
600
0
        bool is_key_column = (cid < _num_sort_key_columns);
601
0
        bool is_seq_column = (_tablet_schema->has_sequence_col() &&
602
0
                              cid == _tablet_schema->sequence_col_idx() && have_input_seq_column);
603
0
        if (!is_key_column && !is_seq_column) {
604
0
            _olap_data_convertor->clear_source_content(cid);
605
0
        }
606
0
    }
607
608
0
    bool has_default_or_nullable = false;
609
0
    std::vector<bool> use_default_or_null_flag;
610
0
    use_default_or_null_flag.reserve(data.num_rows);
611
0
    const auto* delete_signs =
612
0
            BaseTablet::get_delete_sign_column_data(full_block, data.row_pos + data.num_rows);
613
614
0
    DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_partial_content.sleep",
615
0
                    { sleep(60); })
616
0
    const std::vector<RowsetSharedPtr>& specified_rowsets = _mow_context->rowset_ptrs;
617
0
    std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
618
619
0
    FixedReadPlan read_plan;
620
621
    // locate rows in base data
622
0
    PartialUpdateStats stats;
623
624
0
    for (size_t block_pos = data.row_pos; block_pos < data.row_pos + data.num_rows; block_pos++) {
625
        // block   segment
626
        //   2   ->   0
627
        //   3   ->   1
628
        //   4   ->   2
629
        //   5   ->   3
630
        // here row_pos = 2, num_rows = 4.
631
0
        size_t delta_pos = block_pos - data.row_pos;
632
0
        size_t segment_pos = segment_start_pos + delta_pos;
633
0
        std::string key = _full_encode_keys(key_columns, delta_pos);
634
0
        _maybe_invalid_row_cache(key);
635
0
        if (have_input_seq_column) {
636
0
            _encode_seq_column(seq_column, delta_pos, &key);
637
0
        }
638
        // If the table has a sequence column, and the include-cids don't contain the sequence
639
        // column, we need to update the primary key index builder at the end of this method.
640
        // At that time, we have a valid sequence column to encode the key with seq col.
641
0
        if (!_tablet_schema->has_sequence_col() || have_input_seq_column) {
642
0
            RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
643
0
        }
644
645
        // mark key with delete sign as deleted.
646
0
        bool have_delete_sign = (delete_signs != nullptr && delete_signs[block_pos] != 0);
647
648
0
        auto not_found_cb = [&]() {
649
0
            return _opts.rowset_ctx->partial_update_info->handle_new_key(
650
0
                    *_tablet_schema, [&]() -> std::string {
651
0
                        return data.block->dump_one_line(block_pos,
652
0
                                                         cast_set<int>(_num_sort_key_columns));
653
0
                    });
654
0
        };
655
0
        auto update_read_plan = [&](const RowLocation& loc) {
656
0
            read_plan.prepare_to_read(loc, segment_pos);
657
0
        };
658
0
        RETURN_IF_ERROR(_probe_key_for_mow(std::move(key), segment_pos, have_input_seq_column,
659
0
                                           have_delete_sign, specified_rowsets, segment_caches,
660
0
                                           has_default_or_nullable, use_default_or_null_flag,
661
0
                                           update_read_plan, not_found_cb, stats));
662
0
    }
663
0
    CHECK_EQ(use_default_or_null_flag.size(), data.num_rows);
664
665
0
    if (config::enable_merge_on_write_correctness_check) {
666
0
        _tablet->add_sentinel_mark_to_delete_bitmap(_mow_context->delete_bitmap.get(),
667
0
                                                    *_mow_context->rowset_ids);
668
0
    }
669
670
    // read to fill full_block
671
0
    RETURN_IF_ERROR(read_plan.fill_missing_columns(
672
0
            _opts.rowset_ctx->make_historical_row_retriever_context(), _rsid_to_rowset,
673
0
            *_tablet_schema, full_block, use_default_or_null_flag, has_default_or_nullable,
674
0
            segment_start_pos, data.block));
675
676
0
    if (_tablet_schema->num_variant_columns() > 0) {
677
0
        RETURN_IF_ERROR(variant_util::parse_and_materialize_variant_columns(
678
0
                full_block, *_tablet_schema, _opts.rowset_ctx->partial_update_info->missing_cids));
679
0
    }
680
681
    // convert missing columns and send to column writer
682
0
    const auto& missing_cids = _opts.rowset_ctx->partial_update_info->missing_cids;
683
0
    for (auto cid : missing_cids) {
684
0
        RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema));
685
0
        if (_tablet_schema->column(cid).is_row_store_column()) {
686
0
            RETURN_IF_ERROR(_append_row_store_column(full_block, data.row_pos, data.num_rows, cid));
687
0
            RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
688
0
            continue;
689
0
        }
690
0
        RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
691
0
                &full_block, data.row_pos, data.num_rows, std::vector<uint32_t> {cid}));
692
0
        auto [status, column] = _olap_data_convertor->convert_column_data(cid);
693
0
        if (!status.ok()) {
694
0
            return status;
695
0
        }
696
0
        if (_tablet_schema->has_sequence_col() && !have_input_seq_column &&
697
0
            cid == _tablet_schema->sequence_col_idx()) {
698
0
            DCHECK_EQ(seq_column, nullptr);
699
0
            seq_column = column;
700
0
        }
701
0
        RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), column->get_data(),
702
0
                                                     data.num_rows));
703
0
        RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
704
        // Don't clear source content for sequence column here if it will be used later
705
        // in _generate_primary_key_index(). It will be cleared at the end of this method.
706
0
        bool is_seq_column = (_tablet_schema->has_sequence_col() && !have_input_seq_column &&
707
0
                              cid == _tablet_schema->sequence_col_idx());
708
0
        if (!is_seq_column) {
709
0
            _olap_data_convertor->clear_source_content(cid);
710
0
        }
711
0
    }
712
713
0
    _num_rows_updated += stats.num_rows_updated;
714
0
    _num_rows_deleted += stats.num_rows_deleted;
715
0
    _num_rows_new_added += stats.num_rows_new_added;
716
0
    _num_rows_filtered += stats.num_rows_filtered;
717
0
    if (_tablet_schema->has_sequence_col() && !have_input_seq_column) {
718
0
        DCHECK_NE(seq_column, nullptr);
719
0
        if (_num_rows_written != data.row_pos ||
720
0
            _primary_key_index_builder->num_rows() != _num_rows_written) {
721
0
            return Status::InternalError(
722
0
                    "Correctness check failed, _num_rows_written: {}, row_pos: {}, primary key "
723
0
                    "index builder num rows: {}",
724
0
                    _num_rows_written, data.row_pos, _primary_key_index_builder->num_rows());
725
0
        }
726
0
        RETURN_IF_ERROR(_generate_primary_key_index(_key_coders, key_columns, seq_column,
727
0
                                                    data.num_rows, false));
728
0
    }
729
730
0
    _num_rows_written += data.num_rows;
731
0
    DCHECK_EQ(_primary_key_index_builder->num_rows(), _num_rows_written)
732
0
            << "primary key index builder num rows(" << _primary_key_index_builder->num_rows()
733
0
            << ") not equal to segment writer's num rows written(" << _num_rows_written << ")";
734
0
    _olap_data_convertor->clear_source_content();
735
0
    return Status::OK();
736
0
}
737
738
Status VerticalSegmentWriter::_append_block_with_flexible_partial_content(RowsInBlock& data,
739
0
                                                                          Block& full_block) {
740
0
    RETURN_IF_ERROR(_partial_update_preconditions_check(data.row_pos, true));
741
742
    // data.block has the same schema with full_block
743
0
    DCHECK(data.block->columns() == _tablet_schema->num_columns());
744
745
    // create full block and fill with sort key columns
746
0
    full_block = _tablet_schema->create_block();
747
748
    // Use _num_rows_written instead of creating column writer 0, since all column writers
749
    // should have the same row count, which equals _num_rows_written.
750
0
    uint32_t segment_start_pos = cast_set<uint32_t>(_num_rows_written);
751
752
0
    DCHECK(_tablet_schema->has_skip_bitmap_col());
753
0
    auto skip_bitmap_col_idx = _tablet_schema->skip_bitmap_col_idx();
754
755
0
    bool has_default_or_nullable = false;
756
0
    std::vector<bool> use_default_or_null_flag;
757
0
    use_default_or_null_flag.reserve(data.num_rows);
758
759
0
    int32_t seq_map_col_unique_id = _opts.rowset_ctx->partial_update_info->sequence_map_col_uid();
760
0
    bool schema_has_sequence_col = _tablet_schema->has_sequence_col();
761
762
0
    DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_flexible_partial_content.sleep",
763
0
                    { sleep(60); })
764
0
    const std::vector<RowsetSharedPtr>& specified_rowsets = _mow_context->rowset_ptrs;
765
0
    std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
766
767
    // Ensure all primary key column writers and sequence column writer are created before
768
    // aggregate_for_flexible_partial_update, because it internally calls convert_pk_columns
769
    // and convert_seq_column which need the convertors in _olap_data_convertor
770
0
    for (uint32_t cid = 0; cid < _tablet_schema->num_key_columns(); ++cid) {
771
0
        RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema));
772
0
    }
773
0
    if (schema_has_sequence_col) {
774
0
        uint32_t cid = _tablet_schema->sequence_col_idx();
775
0
        RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema));
776
0
    }
777
778
    // 1. aggregate duplicate rows in block
779
0
    RETURN_IF_ERROR(_block_aggregator.aggregate_for_flexible_partial_update(
780
0
            const_cast<Block*>(data.block), data.num_rows, specified_rowsets, segment_caches));
781
0
    if (data.block->rows() != data.num_rows) {
782
0
        data.num_rows = data.block->rows();
783
0
        _olap_data_convertor->clear_source_content();
784
0
    }
785
786
    // 2. encode primary key columns
787
    // we can only encode primary key columns currently because all non-primary columns in flexible partial update
788
    // can have missing cells
789
0
    std::vector<IOlapColumnDataAccessor*> key_columns {};
790
0
    RETURN_IF_ERROR(_block_aggregator.convert_pk_columns(const_cast<Block*>(data.block),
791
0
                                                         data.row_pos, data.num_rows, key_columns));
792
    // 3. encode sequence column
793
    // We encode the sequence column even though it may have invalid values in some rows because we need to
794
    // encode the value of sequence column in key for rows that have a valid value in sequence column during
795
    // lookup_raw_key. We will encode the sequence column again at the end of this method. At that time, we have
796
    // a valid sequence column to encode the key with seq col.
797
0
    IOlapColumnDataAccessor* seq_column {nullptr};
798
0
    RETURN_IF_ERROR(_block_aggregator.convert_seq_column(const_cast<Block*>(data.block),
799
0
                                                         data.row_pos, data.num_rows, seq_column));
800
801
0
    auto* mutable_block = const_cast<Block*>(data.block);
802
0
    std::vector<BitmapValue>* skip_bitmaps =
803
0
            &get_mutable_skip_bitmap_column(mutable_block, skip_bitmap_col_idx)->get_data();
804
0
    const auto* delete_signs =
805
0
            BaseTablet::get_delete_sign_column_data(*data.block, data.row_pos + data.num_rows);
806
0
    DCHECK(delete_signs != nullptr);
807
808
0
    for (std::size_t cid {0}; cid < _tablet_schema->num_key_columns(); cid++) {
809
0
        full_block.replace_by_position(cid, data.block->get_by_position(cid).column);
810
0
    }
811
812
    // 4. write primary key columns data
813
0
    for (std::size_t cid {0}; cid < _tablet_schema->num_key_columns(); cid++) {
814
0
        const auto& column = key_columns[cid];
815
0
        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written);
816
0
        RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), column->get_data(),
817
0
                                                     data.num_rows));
818
0
        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written + data.num_rows);
819
0
        RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
820
0
    }
821
822
    // 5. generate read plan
823
0
    FlexibleReadPlan read_plan {_tablet_schema->has_row_store_for_all_columns()};
824
0
    PartialUpdateStats stats;
825
0
    RETURN_IF_ERROR(_generate_flexible_read_plan(
826
0
            read_plan, data, segment_start_pos, schema_has_sequence_col, seq_map_col_unique_id,
827
0
            skip_bitmaps, key_columns, seq_column, delete_signs, specified_rowsets, segment_caches,
828
0
            has_default_or_nullable, use_default_or_null_flag, stats));
829
0
    CHECK_EQ(use_default_or_null_flag.size(), data.num_rows);
830
831
0
    if (config::enable_merge_on_write_correctness_check) {
832
0
        _tablet->add_sentinel_mark_to_delete_bitmap(_mow_context->delete_bitmap.get(),
833
0
                                                    *_mow_context->rowset_ids);
834
0
    }
835
836
    // 6. read according to the plan to fill full_block
837
0
    RETURN_IF_ERROR(read_plan.fill_non_primary_key_columns(
838
0
            _opts.rowset_ctx->make_historical_row_retriever_context(), _rsid_to_rowset,
839
0
            *_tablet_schema, full_block, use_default_or_null_flag, has_default_or_nullable,
840
0
            segment_start_pos, cast_set<uint32_t>(data.row_pos), data.block, skip_bitmaps));
841
842
    // TODO(bobhan1): should we replace the skip bitmap column with empty bitmaps to reduce storage occupation?
843
    // this column is not needed in read path for merge-on-write table
844
845
    // 7. fill row store column
846
0
    for (auto cid = _tablet_schema->num_key_columns(); cid < _tablet_schema->num_columns(); cid++) {
847
0
        if (!_tablet_schema->column(cid).is_row_store_column()) {
848
0
            continue;
849
0
        }
850
0
        RETURN_IF_ERROR(_create_column_writer(cast_set<uint32_t>(cid), _tablet_schema->column(cid),
851
0
                                              _tablet_schema));
852
0
        RETURN_IF_ERROR(_append_row_store_column(full_block, data.row_pos, data.num_rows,
853
0
                                                 cast_set<uint32_t>(cid)));
854
0
        RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
855
0
    }
856
857
0
    std::vector<uint32_t> column_ids;
858
0
    for (uint32_t i = 0; i < _tablet_schema->num_columns(); ++i) {
859
0
        column_ids.emplace_back(i);
860
0
    }
861
0
    if (_opts.rowset_ctx->write_type != DataWriteType::TYPE_COMPACTION &&
862
0
        _tablet_schema->num_variant_columns() > 0) {
863
0
        RETURN_IF_ERROR(variant_util::parse_and_materialize_variant_columns(
864
0
                full_block, *_tablet_schema, column_ids));
865
0
    }
866
867
    // 8. encode and write all non-primary key columns(including sequence column if exists)
868
0
    for (auto cid = _tablet_schema->num_key_columns(); cid < _tablet_schema->num_columns(); cid++) {
869
0
        if (_tablet_schema->column(cid).is_row_store_column()) {
870
0
            continue;
871
0
        }
872
0
        if (cid != _tablet_schema->sequence_col_idx()) {
873
0
            RETURN_IF_ERROR(_create_column_writer(cast_set<uint32_t>(cid),
874
0
                                                  _tablet_schema->column(cid), _tablet_schema));
875
0
        }
876
0
        RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_column(
877
0
                full_block.get_by_position(cid), data.row_pos, data.num_rows,
878
0
                cast_set<uint32_t>(cid)));
879
0
        auto [status, column] = _olap_data_convertor->convert_column_data(cid);
880
0
        if (!status.ok()) {
881
0
            return status;
882
0
        }
883
0
        if (cid == _tablet_schema->sequence_col_idx()) {
884
            // should use the latest encoded sequence column to build the primary index
885
0
            seq_column = column;
886
0
        }
887
0
        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written);
888
0
        RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), column->get_data(),
889
0
                                                     data.num_rows));
890
0
        DCHECK(_column_writers[cid]->get_next_rowid() == _num_rows_written + data.num_rows);
891
0
        RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
892
0
    }
893
894
0
    _num_rows_updated += stats.num_rows_updated;
895
0
    _num_rows_deleted += stats.num_rows_deleted;
896
0
    _num_rows_new_added += stats.num_rows_new_added;
897
0
    _num_rows_filtered += stats.num_rows_filtered;
898
899
0
    if (_num_rows_written != data.row_pos ||
900
0
        _primary_key_index_builder->num_rows() != _num_rows_written) {
901
0
        return Status::InternalError(
902
0
                "Correctness check failed, _num_rows_written: {}, row_pos: {}, primary key "
903
0
                "index builder num rows: {}",
904
0
                _num_rows_written, data.row_pos, _primary_key_index_builder->num_rows());
905
0
    }
906
907
    // 9. build primary key index
908
0
    RETURN_IF_ERROR(_generate_primary_key_index(_key_coders, key_columns, seq_column, data.num_rows,
909
0
                                                false));
910
911
0
    _num_rows_written += data.num_rows;
912
0
    DCHECK_EQ(_primary_key_index_builder->num_rows(), _num_rows_written)
913
0
            << "primary key index builder num rows(" << _primary_key_index_builder->num_rows()
914
0
            << ") not equal to segment writer's num rows written(" << _num_rows_written << ")";
915
0
    _olap_data_convertor->clear_source_content();
916
0
    return Status::OK();
917
0
}
918
919
Status VerticalSegmentWriter::_generate_encoded_default_seq_value(const TabletSchema& tablet_schema,
920
                                                                  const PartialUpdateInfo& info,
921
0
                                                                  std::string* encoded_value) {
922
0
    const auto& seq_column = tablet_schema.column(tablet_schema.sequence_col_idx());
923
0
    auto block = tablet_schema.create_block_by_cids(
924
0
            {cast_set<uint32_t>(tablet_schema.sequence_col_idx())});
925
0
    if (seq_column.has_default_value()) {
926
0
        auto idx = tablet_schema.sequence_col_idx() - tablet_schema.num_key_columns();
927
0
        const auto& default_value = info.default_values[idx];
928
0
        StringRef str {default_value};
929
0
        RETURN_IF_ERROR(block.get_by_position(0).type->get_serde()->default_from_string(
930
0
                str, *block.get_by_position(0).column->assert_mutable().get()));
931
932
0
    } else {
933
0
        block.get_by_position(0).column->assert_mutable()->insert_default();
934
0
    }
935
0
    DCHECK_EQ(block.rows(), 1);
936
0
    auto olap_data_convertor = std::make_unique<OlapBlockDataConvertor>();
937
0
    olap_data_convertor->add_column_data_convertor(seq_column);
938
0
    olap_data_convertor->set_source_content(&block, 0, 1);
939
0
    auto [status, column] = olap_data_convertor->convert_column_data(0);
940
0
    if (!status.ok()) {
941
0
        return status;
942
0
    }
943
    // include marker
944
0
    _encode_seq_column(column, 0, encoded_value);
945
0
    return Status::OK();
946
0
}
947
948
Status VerticalSegmentWriter::_generate_flexible_read_plan(
949
        FlexibleReadPlan& read_plan, RowsInBlock& data, size_t segment_start_pos,
950
        bool schema_has_sequence_col, int32_t seq_map_col_unique_id,
951
        std::vector<BitmapValue>* skip_bitmaps,
952
        const std::vector<IOlapColumnDataAccessor*>& key_columns,
953
        IOlapColumnDataAccessor* seq_column, const signed char* delete_signs,
954
        const std::vector<RowsetSharedPtr>& specified_rowsets,
955
        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
956
        bool& has_default_or_nullable, std::vector<bool>& use_default_or_null_flag,
957
0
        PartialUpdateStats& stats) {
958
0
    int32_t delete_sign_col_unique_id =
959
0
            _tablet_schema->column(_tablet_schema->delete_sign_idx()).unique_id();
960
0
    int32_t seq_col_unique_id =
961
0
            (_tablet_schema->has_sequence_col()
962
0
                     ? _tablet_schema->column(_tablet_schema->sequence_col_idx()).unique_id()
963
0
                     : -1);
964
0
    for (size_t block_pos = data.row_pos; block_pos < data.row_pos + data.num_rows; block_pos++) {
965
0
        size_t delta_pos = block_pos - data.row_pos;
966
0
        size_t segment_pos = segment_start_pos + delta_pos;
967
0
        auto& skip_bitmap = skip_bitmaps->at(block_pos);
968
969
0
        std::string key = _full_encode_keys(key_columns, delta_pos);
970
0
        _maybe_invalid_row_cache(key);
971
0
        bool row_has_sequence_col =
972
0
                (schema_has_sequence_col && !skip_bitmap.contains(seq_col_unique_id));
973
0
        if (row_has_sequence_col) {
974
0
            _encode_seq_column(seq_column, delta_pos, &key);
975
0
        }
976
977
        // mark key with delete sign as deleted.
978
0
        bool have_delete_sign =
979
0
                (!skip_bitmap.contains(delete_sign_col_unique_id) && delete_signs[block_pos] != 0);
980
981
0
        auto not_found_cb = [&]() {
982
0
            return _opts.rowset_ctx->partial_update_info->handle_new_key(
983
0
                    *_tablet_schema,
984
0
                    [&]() -> std::string {
985
0
                        return data.block->dump_one_line(block_pos,
986
0
                                                         cast_set<int>(_num_sort_key_columns));
987
0
                    },
988
0
                    &skip_bitmap);
989
0
        };
990
0
        auto update_read_plan = [&](const RowLocation& loc) {
991
0
            read_plan.prepare_to_read(loc, segment_pos, skip_bitmap);
992
0
        };
993
994
0
        RETURN_IF_ERROR(_probe_key_for_mow(std::move(key), segment_pos, row_has_sequence_col,
995
0
                                           have_delete_sign, specified_rowsets, segment_caches,
996
0
                                           has_default_or_nullable, use_default_or_null_flag,
997
0
                                           update_read_plan, not_found_cb, stats));
998
0
    }
999
0
    return Status::OK();
1000
0
}
1001
1002
14
Status VerticalSegmentWriter::batch_block(const Block* block, size_t row_pos, size_t num_rows) {
1003
14
    if (_opts.rowset_ctx->partial_update_info &&
1004
14
        _opts.rowset_ctx->partial_update_info->is_partial_update() &&
1005
14
        _opts.write_type == DataWriteType::TYPE_DIRECT &&
1006
14
        !_opts.rowset_ctx->is_transient_rowset_writer) {
1007
0
        if (_opts.rowset_ctx->partial_update_info->is_flexible_partial_update()) {
1008
0
            if (block->columns() != _tablet_schema->num_columns()) {
1009
0
                return Status::InvalidArgument(
1010
0
                        "illegal flexible partial update block columns, block columns = {}, "
1011
0
                        "tablet_schema columns = {}",
1012
0
                        block->dump_structure(), _tablet_schema->dump_structure());
1013
0
            }
1014
0
        } else {
1015
0
            if (block->columns() < _tablet_schema->num_key_columns() ||
1016
0
                block->columns() >= _tablet_schema->num_columns()) {
1017
0
                return Status::InvalidArgument(fmt::format(
1018
0
                        "illegal partial update block columns: {}, num key columns: {}, total "
1019
0
                        "schema columns: {}",
1020
0
                        block->columns(), _tablet_schema->num_key_columns(),
1021
0
                        _tablet_schema->num_columns()));
1022
0
            }
1023
0
        }
1024
14
    } else if (block->columns() != _tablet_schema->num_columns()) {
1025
0
        return Status::InvalidArgument(
1026
0
                "illegal block columns, block columns = {}, tablet_schema columns = {}",
1027
0
                block->dump_structure(), _tablet_schema->dump_structure());
1028
0
    }
1029
14
    _batched_blocks.emplace_back(block, row_pos, num_rows);
1030
14
    return Status::OK();
1031
14
}
1032
1033
14
Status VerticalSegmentWriter::write_batch() {
1034
14
    if (_opts.rowset_ctx->partial_update_info &&
1035
14
        _opts.rowset_ctx->partial_update_info->is_partial_update() &&
1036
14
        _opts.write_type == DataWriteType::TYPE_DIRECT &&
1037
14
        !_opts.rowset_ctx->is_transient_rowset_writer) {
1038
0
        bool is_flexible_partial_update =
1039
0
                _opts.rowset_ctx->partial_update_info->is_flexible_partial_update();
1040
0
        Block full_block;
1041
0
        for (auto& data : _batched_blocks) {
1042
0
            if (is_flexible_partial_update) {
1043
0
                RETURN_IF_ERROR(_append_block_with_flexible_partial_content(data, full_block));
1044
0
            } else {
1045
0
                RETURN_IF_ERROR(_append_block_with_partial_content(data, full_block));
1046
0
            }
1047
0
        }
1048
0
        return Status::OK();
1049
0
    }
1050
    // Row column should be filled here when it's a direct write from memtable
1051
    // or it's a schema change write (since the column data type may have changed, we should rebuild)
1052
14
    bool should_write_row_store_column = _opts.write_type == DataWriteType::TYPE_DIRECT ||
1053
14
                                         _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE;
1054
14
    if (should_write_row_store_column) {
1055
79
        for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
1056
65
            if (!_tablet_schema->column(cid).is_row_store_column()) {
1057
65
                continue;
1058
65
            }
1059
0
            RETURN_IF_ERROR(
1060
0
                    _create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema));
1061
0
            for (auto& data : _batched_blocks) {
1062
0
                RETURN_IF_ERROR(
1063
0
                        _append_row_store_column(*data.block, data.row_pos, data.num_rows, cid));
1064
0
            }
1065
0
            RETURN_IF_ERROR(_check_column_writer_disk_capacity(cid));
1066
0
            RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
1067
0
        }
1068
14
    }
1069
1070
14
    std::vector<uint32_t> column_ids;
1071
79
    for (uint32_t i = 0; i < _tablet_schema->num_columns(); ++i) {
1072
65
        column_ids.emplace_back(i);
1073
65
    }
1074
14
    if (_opts.rowset_ctx->write_type != DataWriteType::TYPE_COMPACTION &&
1075
14
        _tablet_schema->num_variant_columns() > 0) {
1076
0
        for (auto& data : _batched_blocks) {
1077
0
            RETURN_IF_ERROR(variant_util::parse_and_materialize_variant_columns(
1078
0
                    const_cast<Block&>(*data.block), *_tablet_schema, column_ids));
1079
0
        }
1080
0
    }
1081
1082
14
    std::vector<IOlapColumnDataAccessor*> key_columns;
1083
14
    IOlapColumnDataAccessor* seq_column = nullptr;
1084
    // the key is cluster key column unique id
1085
14
    std::map<uint32_t, IOlapColumnDataAccessor*> cid_to_column;
1086
79
    for (uint32_t cid = 0; cid < _tablet_schema->num_columns(); ++cid) {
1087
65
        if (should_write_row_store_column && _tablet_schema->column(cid).is_row_store_column()) {
1088
0
            continue;
1089
0
        }
1090
65
        RETURN_IF_ERROR(_create_column_writer(cid, _tablet_schema->column(cid), _tablet_schema));
1091
65
        for (auto& data : _batched_blocks) {
1092
65
            RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
1093
65
                    data.block, data.row_pos, data.num_rows, std::vector<uint32_t> {cid}));
1094
1095
            // convert column data from engine format to storage layer format
1096
65
            auto [status, column] = _olap_data_convertor->convert_column_data(cid);
1097
65
            if (!status.ok()) {
1098
0
                return status;
1099
0
            }
1100
65
            if (cid < _tablet_schema->num_key_columns()) {
1101
33
                key_columns.push_back(column);
1102
33
            }
1103
65
            if (_tablet_schema->has_sequence_col() && cid == _tablet_schema->sequence_col_idx()) {
1104
7
                seq_column = column;
1105
7
            }
1106
65
            auto column_unique_id = _tablet_schema->column(cid).unique_id();
1107
65
            if (_is_mow_with_cluster_key() &&
1108
65
                std::find(_tablet_schema->cluster_key_uids().begin(),
1109
5
                          _tablet_schema->cluster_key_uids().end(),
1110
5
                          column_unique_id) != _tablet_schema->cluster_key_uids().end()) {
1111
2
                cid_to_column[column_unique_id] = column;
1112
2
            }
1113
65
            RETURN_IF_ERROR(_column_writers[cid]->append(column->get_nullmap(), column->get_data(),
1114
65
                                                         data.num_rows));
1115
65
            _olap_data_convertor->clear_source_content();
1116
65
        }
1117
65
        RETURN_IF_ERROR(_check_column_writer_disk_capacity(cid));
1118
65
        RETURN_IF_ERROR(_finalize_column_writer_and_update_meta(cid));
1119
65
    }
1120
1121
14
    for (auto& data : _batched_blocks) {
1122
14
        _olap_data_convertor->set_source_content(data.block, data.row_pos, data.num_rows);
1123
14
        RETURN_IF_ERROR(_generate_key_index(data, key_columns, seq_column, cid_to_column));
1124
14
        _olap_data_convertor->clear_source_content();
1125
14
        _num_rows_written += data.num_rows;
1126
14
    }
1127
1128
14
    _batched_blocks.clear();
1129
14
    return Status::OK();
1130
14
}
1131
1132
// Builds the key index entries for one batch of rows.
//
// - data:          the batch being indexed; only `data.num_rows` is read here.
// - key_columns:   converted key column accessors, in key order.
// - seq_column:    converted sequence column accessor (may be nullptr).
// - cid_to_column: converted accessors keyed by cluster key column unique id.
//
// Which index is produced depends on the table model:
//   * merge-on-write with cluster keys: primary key index + short key index
//     built from the cluster key columns;
//   * merge-on-write without cluster keys: primary key index only;
//   * everything else: short key index only.
//
// Returns an InternalError if a cluster key column has no converted accessor,
// otherwise propagates the status of the index builders.
Status VerticalSegmentWriter::_generate_key_index(
        RowsInBlock& data, std::vector<IOlapColumnDataAccessor*>& key_columns,
        IOlapColumnDataAccessor* seq_column,
        std::map<uint32_t, IOlapColumnDataAccessor*>& cid_to_column) {
    // Find all row positions (relative to this batch) that need a short key index entry.
    // We build a short key index every `_opts.num_rows_per_block` rows. Specifically, we
    // use the 1st row for the first block and `_short_key_row_pos - _num_rows_written`
    // for the following blocks.
    std::vector<size_t> short_key_pos;
    if (_short_key_row_pos == 0 && _num_rows_written == 0) {
        short_key_pos.push_back(0);
    }
    while (_short_key_row_pos + _opts.num_rows_per_block < _num_rows_written + data.num_rows) {
        _short_key_row_pos += _opts.num_rows_per_block;
        short_key_pos.push_back(_short_key_row_pos - _num_rows_written);
    }

    if (_is_mow_with_cluster_key()) {
        // Resolve the cluster key columns up front. Use find() rather than operator[]:
        // operator[] would silently insert a nullptr for a missing column, which would
        // only surface later as a null dereference while encoding keys.
        const auto& cluster_key_uids = _tablet_schema->cluster_key_uids();
        std::vector<IOlapColumnDataAccessor*> short_key_columns;
        short_key_columns.reserve(cluster_key_uids.size());
        for (const auto& uid : cluster_key_uids) {
            auto it = cid_to_column.find(uid);
            if (it == cid_to_column.end() || it->second == nullptr) {
                return Status::InternalError(
                        "cluster key column with unique id {} has no converted column data", uid);
            }
            short_key_columns.push_back(it->second);
        }
        // 1. generate primary key index
        RETURN_IF_ERROR(_generate_primary_key_index(_primary_key_coders, key_columns, seq_column,
                                                    data.num_rows, true));
        // 2. generate short key index (use cluster key)
        RETURN_IF_ERROR(_generate_short_key_index(short_key_columns, data.num_rows, short_key_pos));
    } else if (_is_mow()) {
        RETURN_IF_ERROR(_generate_primary_key_index(_key_coders, key_columns, seq_column,
                                                    data.num_rows, false));
    } else { // other tables
        RETURN_IF_ERROR(_generate_short_key_index(key_columns, data.num_rows, short_key_pos));
    }
    return Status::OK();
}
1166
1167
// Adds one primary key index entry per row in [0, num_rows).
//
// - primary_key_coders:  coders used to encode keys on the `need_sort` path.
// - primary_key_columns: converted key column accessors.
// - seq_column:          sequence column accessor; only read when the schema
//                        has a sequence column.
// - num_rows:            number of rows to index.
// - need_sort:           false for merge-on-write tables without cluster keys
//                        (keys are added in row order); true for tables with
//                        cluster keys (keys get the row id appended, are sorted
//                        in memory, then added).
//
// Returns the first non-OK status from the primary key index builder.
Status VerticalSegmentWriter::_generate_primary_key_index(
        const std::vector<const KeyCoder*>& primary_key_coders,
        const std::vector<IOlapColumnDataAccessor*>& primary_key_columns,
        IOlapColumnDataAccessor* seq_column, size_t num_rows, bool need_sort) {
    if (!need_sort) { // mow table without cluster key
        std::string last_key;
        for (size_t pos = 0; pos < num_rows; pos++) {
            // use _key_coders
            std::string key = _full_encode_keys(primary_key_columns, pos);
            _maybe_invalid_row_cache(key);
            if (_tablet_schema->has_sequence_col()) {
                _encode_seq_column(seq_column, pos, &key);
            }
            DCHECK(key.compare(last_key) > 0)
                    << "found duplicate key or key is not sorted! current key: " << key
                    << ", last key: " << last_key;
            RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
            last_key = std::move(key);
        }
    } else { // mow table with cluster key
        // 1. generate primary keys in memory
        std::vector<std::string> primary_keys;
        // The final size is known, so allocate once instead of growing repeatedly.
        primary_keys.reserve(num_rows);
        for (uint32_t pos = 0; pos < num_rows; pos++) {
            std::string key = _full_encode_keys(primary_key_coders, primary_key_columns, pos);
            _maybe_invalid_row_cache(key);
            if (_tablet_schema->has_sequence_col()) {
                _encode_seq_column(seq_column, pos, &key);
            }
            _encode_rowid(pos, &key);
            primary_keys.emplace_back(std::move(key));
        }
        // 2. sort primary keys
        std::sort(primary_keys.begin(), primary_keys.end());
        // 3. write primary keys index
        // `last_key` is only needed for the ordering check, so keep a view of the
        // previous element instead of copying every key. `primary_keys` is not
        // modified inside this loop, so the view stays valid.
        std::string_view last_key;
        for (const auto& key : primary_keys) {
            DCHECK(key.compare(last_key) > 0)
                    << "found duplicate key or key is not sorted! current key: " << key
                    << ", last key: " << last_key;
            RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
            last_key = key;
        }
    }
    return Status::OK();
}
1212
1213
// Records the min/max encoded keys of a batch and appends short key index
// entries for the given row positions.
//
// - key_columns:   key column accessors. NOTE(review): this vector is resized
//                  in place to `_num_short_key_columns`, so the caller's vector
//                  is modified — confirm callers do not reuse it afterwards.
// - num_rows:      number of rows in the batch; row `num_rows - 1` is encoded
//                  as the max key.
// - short_key_pos: batch-relative row positions to add to the short key index.
//
// Returns the first non-OK status from the short key index builder.
Status VerticalSegmentWriter::_generate_short_key_index(
        std::vector<IOlapColumnDataAccessor*>& key_columns, size_t num_rows,
        const std::vector<size_t>& short_key_pos) {
    // Full keys (encoded with _key_coders) of the first and last row of the batch.
    const size_t last_row = num_rows - 1;
    _set_min_key(_full_encode_keys(key_columns, 0));
    _set_max_key(_full_encode_keys(key_columns, last_row));
    DCHECK(Slice(_max_key.data(), _max_key.size())
                   .compare(Slice(_min_key.data(), _min_key.size())) >= 0)
            << "key is not sorted! min key: " << _min_key << ", max key: " << _max_key;

    // From here on only the short key prefix of the columns is used.
    key_columns.resize(_num_short_key_columns);

    std::string last_key;
    for (size_t i = 0; i < short_key_pos.size(); ++i) {
        std::string key = _encode_keys(key_columns, short_key_pos[i]);
        DCHECK(key.compare(last_key) >= 0)
                << "key is not sorted! current key: " << key << ", last key: " << last_key;
        RETURN_IF_ERROR(_short_key_index_builder->add_item(key));
        last_key = std::move(key);
    }
    return Status::OK();
}
1234
1235
4
void VerticalSegmentWriter::_encode_rowid(const uint32_t rowid, std::string* encoded_keys) {
1236
4
    encoded_keys->push_back(KEY_NORMAL_MARKER);
1237
4
    _rowid_coder->full_encode_ascending(&rowid, encoded_keys);
1238
4
}
1239
1240
std::string VerticalSegmentWriter::_full_encode_keys(
1241
28
        const std::vector<IOlapColumnDataAccessor*>& key_columns, size_t pos) {
1242
28
    assert(_key_index_size.size() == _num_sort_key_columns);
1243
28
    if (!(key_columns.size() == _num_sort_key_columns &&
1244
28
          _key_coders.size() == _num_sort_key_columns)) {
1245
0
        LOG_INFO("key_columns.size()={}, _key_coders.size()={}, _num_sort_key_columns={}, ",
1246
0
                 key_columns.size(), _key_coders.size(), _num_sort_key_columns);
1247
0
    }
1248
28
    assert(key_columns.size() == _num_sort_key_columns &&
1249
28
           _key_coders.size() == _num_sort_key_columns);
1250
28
    return _full_encode_keys(_key_coders, key_columns, pos);
1251
28
}
1252
1253
std::string VerticalSegmentWriter::_full_encode_keys(
1254
        const std::vector<const KeyCoder*>& key_coders,
1255
32
        const std::vector<IOlapColumnDataAccessor*>& key_columns, size_t pos) {
1256
32
    assert(key_columns.size() == key_coders.size());
1257
1258
32
    std::string encoded_keys;
1259
32
    size_t cid = 0;
1260
74
    for (const auto& column : key_columns) {
1261
74
        auto field = column->get_data_at(pos);
1262
74
        if (UNLIKELY(!field)) {
1263
0
            encoded_keys.push_back(KEY_NULL_FIRST_MARKER);
1264
0
            ++cid;
1265
0
            continue;
1266
0
        }
1267
74
        encoded_keys.push_back(KEY_NORMAL_MARKER);
1268
74
        DCHECK(key_coders[cid] != nullptr);
1269
74
        key_coders[cid]->full_encode_ascending(field, &encoded_keys);
1270
74
        ++cid;
1271
74
    }
1272
32
    return encoded_keys;
1273
32
}
1274
1275
void VerticalSegmentWriter::_encode_seq_column(const IOlapColumnDataAccessor* seq_column,
1276
8
                                               size_t pos, std::string* encoded_keys) {
1277
8
    const auto* field = seq_column->get_data_at(pos);
1278
    // To facilitate the use of the primary key index, encode the seq column
1279
    // to the minimum value of the corresponding length when the seq column
1280
    // is null
1281
8
    if (UNLIKELY(!field)) {
1282
0
        encoded_keys->push_back(KEY_NULL_FIRST_MARKER);
1283
0
        size_t seq_col_length = _tablet_schema->column(_tablet_schema->sequence_col_idx()).length();
1284
0
        encoded_keys->append(seq_col_length, KEY_MINIMAL_MARKER);
1285
0
        return;
1286
0
    }
1287
8
    encoded_keys->push_back(KEY_NORMAL_MARKER);
1288
8
    _seq_coder->full_encode_ascending(field, encoded_keys);
1289
8
}
1290
1291
std::string VerticalSegmentWriter::_encode_keys(
1292
10
        const std::vector<IOlapColumnDataAccessor*>& key_columns, size_t pos) {
1293
10
    assert(key_columns.size() == _num_short_key_columns);
1294
1295
10
    std::string encoded_keys;
1296
10
    size_t cid = 0;
1297
22
    for (const auto& column : key_columns) {
1298
22
        auto field = column->get_data_at(pos);
1299
22
        if (UNLIKELY(!field)) {
1300
0
            encoded_keys.push_back(KEY_NULL_FIRST_MARKER);
1301
0
            ++cid;
1302
0
            continue;
1303
0
        }
1304
22
        encoded_keys.push_back(KEY_NORMAL_MARKER);
1305
22
        _key_coders[cid]->encode_ascending(field, _key_index_size[cid], &encoded_keys);
1306
22
        ++cid;
1307
22
    }
1308
10
    return encoded_keys;
1309
10
}
1310
1311
// TODO(lingbin): Currently this function does not include the size of various indexes,
1312
// We should make this more precise.
1313
14
uint64_t VerticalSegmentWriter::_estimated_remaining_size() {
1314
    // footer_size(4) + checksum(4) + segment_magic(4)
1315
14
    uint64_t size = 12;
1316
14
    if (_is_mow_with_cluster_key()) {
1317
1
        size += _primary_key_index_builder->size() + _short_key_index_builder->size();
1318
13
    } else if (_is_mow()) {
1319
4
        size += _primary_key_index_builder->size();
1320
9
    } else {
1321
9
        size += _short_key_index_builder->size();
1322
9
    }
1323
1324
    // update the mem_tracker of segment size
1325
14
    _mem_tracker->consume(size - _mem_tracker->consumption());
1326
14
    return size;
1327
14
}
1328
1329
14
// Writes all per-column indexes followed by the key index(es), reports the
// index size through `*index_size`, and finally calls clear().
//
// `*index_size` is the number of bytes appended to the file since this
// function started; when a primary key index is written, the builder's
// `disk_size()` is added on top of that.
Status VerticalSegmentWriter::finalize_columns_index(uint64_t* index_size) {
    const uint64_t index_start = _file_writer->bytes_appended();
    // Bytes appended to the file since `index_start`.
    const auto bytes_since_start = [this, index_start]() -> uint64_t {
        return _file_writer->bytes_appended() - index_start;
    };

    RETURN_IF_ERROR(_write_ordinal_index());
    RETURN_IF_ERROR(_write_zone_map());
    RETURN_IF_ERROR(_write_inverted_index());
    RETURN_IF_ERROR(_write_ann_index());
    RETURN_IF_ERROR(_write_bloom_filter_index());
    *index_size = bytes_since_start();

    if (_is_mow_with_cluster_key()) {
        RETURN_IF_ERROR(_write_short_key_index());
        *index_size = bytes_since_start();
        RETURN_IF_ERROR(_write_primary_key_index());
        *index_size += _primary_key_index_builder->disk_size();
    } else if (_is_mow()) {
        RETURN_IF_ERROR(_write_primary_key_index());
        // IndexedColumnWriter write data pages mixed with segment data, we should use
        // the stat from primary key index builder.
        *index_size += _primary_key_index_builder->disk_size();
    } else {
        RETURN_IF_ERROR(_write_short_key_index());
        *index_size = bytes_since_start();
    }

    // reset all column writers and the data convertor
    clear();

    return Status::OK();
}
1358
1359
14
// Writes the segment footer, closes the file writer and reports the total
// number of bytes appended through `*segment_file_size`.
//
// Returns Corruption if the resulting file size is 0.
Status VerticalSegmentWriter::finalize_footer(uint64_t* segment_file_size) {
    RETURN_IF_ERROR(_write_footer());
    // finish
    RETURN_IF_ERROR(_file_writer->close(true));

    *segment_file_size = _file_writer->bytes_appended();
    const bool wrote_nothing = (*segment_file_size == 0);
    if (wrote_nothing) {
        return Status::Corruption("Bad segment, file size = 0");
    }
    return Status::OK();
}
1369
1370
14
// Finishes the segment: checks disk capacity, writes the indexes and the
// footer, and reports the resulting sizes.
//
// - segment_file_size: out, set by finalize_footer().
// - index_size:        out, set by finalize_columns_index().
//
// Returns DISK_REACH_CAPACITY_LIMIT when `_data_dir` is set and reports that
// the estimated remaining size would exceed its capacity limit. A log line is
// emitted when the whole call takes longer than the threshold below.
Status VerticalSegmentWriter::finalize(uint64_t* segment_file_size, uint64_t* index_size) {
    // elapsed_time() is compared against this value; the log message reports it as ns.
    constexpr auto kSlowFlushThreshold = 5000000000L;

    MonotonicStopWatch timer;
    timer.start();

    // check disk capacity
    if (_data_dir != nullptr) {
        const auto remaining = static_cast<int64_t>(_estimated_remaining_size());
        if (_data_dir->reach_capacity_limit(remaining)) {
            return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed capacity limit.",
                                                            _data_dir->path_hash());
        }
    }

    // Publish the final row count and reset the running counter.
    _row_count = _num_rows_written;
    _num_rows_written = 0;

    // write index
    RETURN_IF_ERROR(finalize_columns_index(index_size));
    // write footer
    RETURN_IF_ERROR(finalize_footer(segment_file_size));

    if (timer.elapsed_time() > kSlowFlushThreshold) {
        LOG(INFO) << "segment flush consumes a lot time_ns " << timer.elapsed_time()
                  << ", segmemt_size " << *segment_file_size;
    }
    return Status::OK();
}
1392
1393
14
void VerticalSegmentWriter::clear() {
1394
65
    for (auto& column_writer : _column_writers) {
1395
65
        column_writer.reset();
1396
65
    }
1397
14
    _column_writers.clear();
1398
14
    _olap_data_convertor.reset();
1399
14
}
1400
1401
// write ordinal index after data has been written
1402
14
Status VerticalSegmentWriter::_write_ordinal_index() {
1403
65
    for (auto& column_writer : _column_writers) {
1404
65
        RETURN_IF_ERROR(column_writer->write_ordinal_index());
1405
65
    }
1406
14
    return Status::OK();
1407
14
}
1408
1409
14
// Writes the zone map of every column writer, in writer order.
// Stops at, and returns, the first non-OK status.
Status VerticalSegmentWriter::_write_zone_map() {
    for (const auto& writer : _column_writers) {
        RETURN_IF_ERROR(writer->write_zone_map());
    }
    return Status::OK();
}
1415
1416
14
// Writes the inverted index of every column writer, in writer order.
// Stops at, and returns, the first non-OK status.
Status VerticalSegmentWriter::_write_inverted_index() {
    for (const auto& writer : _column_writers) {
        RETURN_IF_ERROR(writer->write_inverted_index());
    }
    return Status::OK();
}
1422
1423
14
// Writes the ANN index of every column writer, in writer order.
// Stops at, and returns, the first non-OK status.
Status VerticalSegmentWriter::_write_ann_index() {
    for (const auto& writer : _column_writers) {
        RETURN_IF_ERROR(writer->write_ann_index());
    }
    return Status::OK();
}
1429
1430
14
// Writes the bloom filter index of every column writer, in writer order.
// Stops at, and returns, the first non-OK status.
Status VerticalSegmentWriter::_write_bloom_filter_index() {
    for (const auto& writer : _column_writers) {
        RETURN_IF_ERROR(writer->write_bloom_filter_index());
    }
    return Status::OK();
}
1436
1437
10
// Finalizes the short key index builder, writes the resulting page to the
// segment file and records the page's location in the segment footer.
Status VerticalSegmentWriter::_write_short_key_index() {
    std::vector<Slice> page_body;
    PageFooterPB page_footer;
    RETURN_IF_ERROR(_short_key_index_builder->finalize(_row_count, &page_body, &page_footer));

    // short key index page is not compressed right now
    PagePointer page_ptr;
    RETURN_IF_ERROR(PageIO::write_page(_file_writer, page_body, page_footer, &page_ptr));

    page_ptr.to_proto(_footer.mutable_short_key_index_page());
    return Status::OK();
}
1447
1448
5
// Finalizes the primary key index builder into the segment footer's primary
// key index meta. Aborts (CHECK) if the builder's row count differs from
// `_row_count`.
Status VerticalSegmentWriter::_write_primary_key_index() {
    CHECK_EQ(_primary_key_index_builder->num_rows(), _row_count);
    auto* index_meta = _footer.mutable_primary_key_index_meta();
    return _primary_key_index_builder->finalize(index_meta);
}
1452
1453
14
// Serializes the segment footer and appends it, together with its fixed-size
// trailer, to the segment file.
//
// On-disk layout written here:
//   Footer := SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), MagicNumber(4)
//
// Returns InternalError if the footer protobuf cannot be serialized, otherwise
// the status of the underlying writes.
Status VerticalSegmentWriter::_write_footer() {
    _footer.set_num_rows(_row_count);

    // Decide whether to externalize ColumnMetaPB by tablet default, and stamp footer version
    const bool use_external_meta = _tablet_schema->is_external_segment_column_meta_used();
    if (use_external_meta) {
        _footer.set_version(SEGMENT_FOOTER_VERSION_V3_EXT_COL_META);
        VLOG_DEBUG << "use external column meta";
        // External ColumnMetaPB writing (optional)
        RETURN_IF_ERROR(ExternalColMetaUtil::write_external_column_meta(
                _file_writer, &_footer, _opts.compression_type,
                [this](const std::vector<Slice>& slices) { return _write_raw_data(slices); }));
    }

    VLOG_DEBUG << "footer " << _footer.DebugString();
    std::string footer_buf;
    const bool serialized = _footer.SerializeToString(&footer_buf);
    if (!serialized) {
        return Status::InternalError("failed to serialize segment footer");
    }

    // Fixed-size trailer: footer size, footer checksum, magic number.
    faststring fixed_buf;
    const auto footer_len = cast_set<uint32_t>(footer_buf.size());
    put_fixed32_le(&fixed_buf, footer_len);
    const uint32_t footer_crc = crc32c::Crc32c(footer_buf.data(), footer_buf.size());
    put_fixed32_le(&fixed_buf, footer_crc);
    // Append magic number. we don't write magic number in the header because
    // that will need an extra seek when reading
    fixed_buf.append(k_segment_magic, k_segment_magic_length);

    std::vector<Slice> footer_slices {footer_buf, fixed_buf};
    return _write_raw_data(footer_slices);
}
1487
1488
14
// Appends the given slices, in order, to the segment file.
// Returns the status of the underlying file writer.
Status VerticalSegmentWriter::_write_raw_data(const std::vector<Slice>& slices) {
    // Use data() rather than &slices[0]: indexing an empty vector is undefined
    // behavior, while data() is always valid to call.
    RETURN_IF_ERROR(_file_writer->appendv(slices.data(), slices.size()));
    return Status::OK();
}
1492
1493
14
// Returns the smallest encoded key of the segment: the primary key index
// builder's min key when a builder exists, otherwise a Slice over `_min_key`.
Slice VerticalSegmentWriter::min_encoded_key() {
    if (_primary_key_index_builder != nullptr) {
        return _primary_key_index_builder->min_key();
    }
    return Slice(_min_key.data(), _min_key.size());
}
1497
14
// Returns the largest encoded key of the segment: the primary key index
// builder's max key when a builder exists, otherwise a Slice over `_max_key`.
Slice VerticalSegmentWriter::max_encoded_key() {
    if (_primary_key_index_builder != nullptr) {
        return _primary_key_index_builder->max_key();
    }
    return Slice(_max_key.data(), _max_key.size());
}
1501
1502
0
void VerticalSegmentWriter::_set_min_max_key(const Slice& key) {
1503
0
    if (UNLIKELY(_is_first_row)) {
1504
0
        _min_key.append(key.get_data(), key.get_size());
1505
0
        _is_first_row = false;
1506
0
    }
1507
0
    if (key.compare(_max_key) > 0) {
1508
0
        _max_key.clear();
1509
0
        _max_key.append(key.get_data(), key.get_size());
1510
0
    }
1511
0
}
1512
1513
10
void VerticalSegmentWriter::_set_min_key(const Slice& key) {
1514
10
    if (UNLIKELY(_is_first_row)) {
1515
10
        _min_key.append(key.get_data(), key.get_size());
1516
10
        _is_first_row = false;
1517
10
    }
1518
10
}
1519
1520
10
void VerticalSegmentWriter::_set_max_key(const Slice& key) {
1521
10
    _max_key.clear();
1522
10
    _max_key.append(key.get_data(), key.get_size());
1523
10
}
1524
1525
198
inline bool VerticalSegmentWriter::_is_mow() {
1526
198
    return _tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write;
1527
198
}
1528
1529
131
inline bool VerticalSegmentWriter::_is_mow_with_cluster_key() {
1530
131
    return _is_mow() && !_tablet_schema->cluster_key_uids().empty();
1531
131
}
1532
1533
} // namespace doris::segment_v2