Coverage Report

Created: 2026-03-15 17:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/segment_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/segment_writer.h"
19
20
#include <assert.h>
21
#include <gen_cpp/segment_v2.pb.h>
22
#include <parallel_hashmap/phmap.h>
23
24
#include <algorithm>
25
26
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
27
#include <crc32c/crc32c.h>
28
29
#include "cloud/config.h"
30
#include "common/cast_set.h"
31
#include "common/compiler_util.h" // IWYU pragma: keep
32
#include "common/config.h"
33
#include "common/logging.h" // LOG
34
#include "common/status.h"
35
#include "core/block/block.h"
36
#include "core/block/column_with_type_and_name.h"
37
#include "core/column/column_nullable.h"
38
#include "core/types.h"
39
#include "core/value/vdatetime_value.h"
40
#include "exec/common/variant_util.h"
41
#include "io/cache/block_file_cache.h"
42
#include "io/cache/block_file_cache_factory.h"
43
#include "io/fs/file_system.h"
44
#include "io/fs/file_writer.h"
45
#include "io/fs/local_file_system.h"
46
#include "runtime/exec_env.h"
47
#include "runtime/memory/mem_tracker.h"
48
#include "service/point_query_executor.h"
49
#include "storage/data_dir.h"
50
#include "storage/index/index_file_writer.h"
51
#include "storage/index/index_writer.h"
52
#include "storage/index/inverted/inverted_index_fs_directory.h"
53
#include "storage/index/primary_key_index.h"
54
#include "storage/index/short_key_index.h"
55
#include "storage/iterator/olap_data_convertor.h"
56
#include "storage/key_coder.h"
57
#include "storage/olap_common.h"
58
#include "storage/olap_define.h"
59
#include "storage/partial_update_info.h"
60
#include "storage/row_cursor.h"                   // RowCursor // IWYU pragma: keep
61
#include "storage/rowset/rowset_writer_context.h" // RowsetWriterContext
62
#include "storage/rowset/segment_creator.h"
63
#include "storage/segment/column_writer.h" // ColumnWriter
64
#include "storage/segment/external_col_meta_util.h"
65
#include "storage/segment/page_io.h"
66
#include "storage/segment/page_pointer.h"
67
#include "storage/segment/segment_loader.h"
68
#include "storage/segment/variant/variant_ext_meta_writer.h"
69
#include "storage/segment/variant_stats_calculator.h"
70
#include "storage/storage_engine.h"
71
#include "storage/tablet/tablet_schema.h"
72
#include "storage/utils.h"
73
#include "util/coding.h"
74
#include "util/faststring.h"
75
#include "util/jsonb/serialize.h"
76
#include "util/simd/bits.h"
77
namespace doris {
78
namespace segment_v2 {
79
#include "common/compile_check_begin.h"
80
81
using namespace ErrorCode;
82
using namespace KeyConsts;
83
84
const char* k_segment_magic = "D0R1";
85
const uint32_t k_segment_magic_length = 4;
86
87
5.25k
inline std::string segment_mem_tracker_name(uint32_t segment_id) {
88
5.25k
    return "SegmentWriter:Segment-" + std::to_string(segment_id);
89
5.25k
}
90
91
SegmentWriter::SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id,
92
                             TabletSchemaSPtr tablet_schema, BaseTabletSPtr tablet,
93
                             DataDir* data_dir, const SegmentWriterOptions& opts,
94
                             IndexFileWriter* index_file_writer)
95
5.25k
        : _segment_id(segment_id),
96
5.25k
          _tablet_schema(std::move(tablet_schema)),
97
5.25k
          _tablet(std::move(tablet)),
98
5.25k
          _data_dir(data_dir),
99
5.25k
          _opts(opts),
100
5.25k
          _file_writer(file_writer),
101
5.25k
          _index_file_writer(index_file_writer),
102
5.25k
          _mem_tracker(std::make_unique<MemTracker>(segment_mem_tracker_name(segment_id))),
103
5.25k
          _mow_context(std::move(opts.mow_ctx)) {
104
5.25k
    CHECK_NOTNULL(file_writer);
105
5.25k
    _num_sort_key_columns = _tablet_schema->num_key_columns();
106
5.25k
    _num_short_key_columns = _tablet_schema->num_short_key_columns();
107
5.25k
    if (!_is_mow_with_cluster_key()) {
108
5.25k
        DCHECK(_num_sort_key_columns >= _num_short_key_columns)
109
0
                << ", table_id=" << _tablet_schema->table_id()
110
0
                << ", num_key_columns=" << _num_sort_key_columns
111
0
                << ", num_short_key_columns=" << _num_short_key_columns
112
0
                << ", cluster_key_columns=" << _tablet_schema->cluster_key_uids().size();
113
5.25k
    }
114
8.88k
    for (size_t cid = 0; cid < _num_sort_key_columns; ++cid) {
115
3.62k
        const auto& column = _tablet_schema->column(cid);
116
3.62k
        _key_coders.push_back(get_key_coder(column.type()));
117
3.62k
        _key_index_size.push_back(cast_set<uint16_t>(column.index_length()));
118
3.62k
    }
119
5.25k
    if (_is_mow()) {
120
        // encode the sequence id into the primary key index
121
69
        if (_tablet_schema->has_sequence_col()) {
122
26
            const auto& column = _tablet_schema->column(_tablet_schema->sequence_col_idx());
123
26
            _seq_coder = get_key_coder(column.type());
124
26
        }
125
        // encode the rowid into the primary key index
126
69
        if (_is_mow_with_cluster_key()) {
127
0
            const auto* type_info = get_scalar_type_info<FieldType::OLAP_FIELD_TYPE_UNSIGNED_INT>();
128
0
            _rowid_coder = get_key_coder(type_info->type());
129
            // primary keys
130
0
            _primary_key_coders.swap(_key_coders);
131
            // cluster keys
132
0
            _key_coders.clear();
133
0
            _key_index_size.clear();
134
0
            _num_sort_key_columns = _tablet_schema->cluster_key_uids().size();
135
0
            for (auto cid : _tablet_schema->cluster_key_uids()) {
136
0
                const auto& column = _tablet_schema->column_by_uid(cid);
137
0
                _key_coders.push_back(get_key_coder(column.type()));
138
0
                _key_index_size.push_back(cast_set<uint16_t>(column.index_length()));
139
0
            }
140
0
        }
141
69
    }
142
5.25k
}
143
144
5.25k
SegmentWriter::~SegmentWriter() {
145
5.25k
    _mem_tracker->release(_mem_tracker->consumption());
146
5.25k
}
147
148
void SegmentWriter::init_column_meta(ColumnMetaPB* meta, uint32_t column_id,
149
15.9k
                                     const TabletColumn& column, TabletSchemaSPtr tablet_schema) {
150
15.9k
    meta->set_column_id(column_id);
151
15.9k
    meta->set_type(int(column.type()));
152
15.9k
    meta->set_length(column.length());
153
15.9k
    meta->set_encoding(DEFAULT_ENCODING);
154
15.9k
    meta->set_compression(_opts.compression_type);
155
15.9k
    meta->set_is_nullable(column.is_nullable());
156
15.9k
    meta->set_default_value(column.default_value());
157
15.9k
    meta->set_precision(column.precision());
158
15.9k
    meta->set_frac(column.frac());
159
15.9k
    if (column.has_path_info()) {
160
294
        column.path_info_ptr()->to_protobuf(meta->mutable_column_path_info(),
161
294
                                            column.parent_unique_id());
162
294
    }
163
15.9k
    meta->set_unique_id(column.unique_id());
164
15.9k
    for (uint32_t i = 0; i < column.get_subtype_count(); ++i) {
165
13
        init_column_meta(meta->add_children_columns(), column_id, column.get_sub_column(i),
166
13
                         tablet_schema);
167
13
    }
168
15.9k
    meta->set_result_is_nullable(column.get_result_is_nullable());
169
15.9k
    meta->set_function_name(column.get_aggregation_name());
170
15.9k
    meta->set_be_exec_version(column.get_be_exec_version());
171
15.9k
    if (column.is_variant_type()) {
172
294
        meta->set_variant_max_subcolumns_count(column.variant_max_subcolumns_count());
173
294
    }
174
15.9k
}
175
176
1.24k
Status SegmentWriter::init() {
177
1.24k
    std::vector<uint32_t> column_ids;
178
1.24k
    auto column_cnt = cast_set<int>(_tablet_schema->num_columns());
179
8.09k
    for (uint32_t i = 0; i < column_cnt; ++i) {
180
6.84k
        column_ids.emplace_back(i);
181
6.84k
    }
182
1.24k
    return init(column_ids, true);
183
1.24k
}
184
185
Status SegmentWriter::_create_column_writer(uint32_t cid, const TabletColumn& column,
186
15.9k
                                            const TabletSchemaSPtr& schema) {
187
15.9k
    ColumnWriterOptions opts;
188
15.9k
    opts.meta = _footer.add_columns();
189
190
15.9k
    init_column_meta(opts.meta, cid, column, schema);
191
192
    // now we create zone map for key columns in AGG_KEYS or all column in UNIQUE_KEYS or DUP_KEYS
193
    // except for columns whose type don't support zone map.
194
15.9k
    opts.need_zone_map = column.is_key() || schema->keys_type() != KeysType::AGG_KEYS;
195
15.9k
    opts.need_bloom_filter = column.is_bf_column();
196
15.9k
    if (opts.need_bloom_filter) {
197
4
        opts.bf_options.fpp = schema->has_bf_fpp() ? schema->bloom_filter_fpp() : 0.05;
198
4
    }
199
15.9k
    auto* tablet_index = schema->get_ngram_bf_index(column.unique_id());
200
15.9k
    if (tablet_index) {
201
0
        opts.need_bloom_filter = true;
202
0
        opts.is_ngram_bf_index = true;
203
        //narrow convert from int32_t to uint8_t and uint16_t which is dangerous
204
0
        auto gram_size = tablet_index->get_gram_size();
205
0
        auto gram_bf_size = tablet_index->get_gram_bf_size();
206
0
        if (gram_size > 256 || gram_size < 1) {
207
0
            return Status::NotSupported("Do not support ngram bloom filter for ngram_size: ",
208
0
                                        gram_size);
209
0
        }
210
0
        if (gram_bf_size > 65535 || gram_bf_size < 64) {
211
0
            return Status::NotSupported("Do not support ngram bloom filter for bf_size: ",
212
0
                                        gram_bf_size);
213
0
        }
214
0
        opts.gram_size = cast_set<uint8_t>(gram_size);
215
0
        opts.gram_bf_size = cast_set<uint16_t>(gram_bf_size);
216
0
    }
217
218
15.9k
    bool skip_inverted_index = false;
219
15.9k
    if (_opts.rowset_ctx != nullptr) {
220
        // skip write inverted index for index compaction column
221
13.7k
        skip_inverted_index =
222
13.7k
                _opts.rowset_ctx->columns_to_do_index_compaction.count(column.unique_id()) > 0;
223
13.7k
    }
224
    // skip write inverted index on load if skip_write_index_on_load is true
225
15.9k
    if (_opts.write_type == DataWriteType::TYPE_DIRECT && schema->skip_write_index_on_load()) {
226
0
        skip_inverted_index = true;
227
0
    }
228
    // indexes for this column
229
15.9k
    if (!skip_inverted_index) {
230
15.5k
        auto inverted_indexs = schema->inverted_indexs(column);
231
15.5k
        if (!inverted_indexs.empty()) {
232
2.18k
            opts.inverted_indexes = inverted_indexs;
233
2.18k
            opts.need_inverted_index = true;
234
2.18k
            DCHECK(_index_file_writer != nullptr);
235
2.18k
        }
236
15.5k
    }
237
    // indexes for this column
238
15.9k
    if (const auto& index = schema->ann_index(column); index != nullptr) {
239
1
        opts.ann_index = index;
240
1
        opts.need_ann_index = true;
241
1
        DCHECK(_index_file_writer != nullptr);
242
1
    }
243
244
15.9k
    opts.index_file_writer = _index_file_writer;
245
246
15.9k
#define DISABLE_INDEX_IF_FIELD_TYPE(TYPE)                     \
247
143k
    if (column.type() == FieldType::OLAP_FIELD_TYPE_##TYPE) { \
248
297
        opts.need_zone_map = false;                           \
249
297
        opts.need_bloom_filter = false;                       \
250
297
    }
251
252
15.9k
    DISABLE_INDEX_IF_FIELD_TYPE(STRUCT)
253
15.9k
    DISABLE_INDEX_IF_FIELD_TYPE(ARRAY)
254
15.9k
    DISABLE_INDEX_IF_FIELD_TYPE(JSONB)
255
15.9k
    DISABLE_INDEX_IF_FIELD_TYPE(AGG_STATE)
256
15.9k
    DISABLE_INDEX_IF_FIELD_TYPE(MAP)
257
15.9k
    DISABLE_INDEX_IF_FIELD_TYPE(BITMAP)
258
15.9k
    DISABLE_INDEX_IF_FIELD_TYPE(HLL)
259
15.9k
    DISABLE_INDEX_IF_FIELD_TYPE(QUANTILE_STATE)
260
15.9k
    DISABLE_INDEX_IF_FIELD_TYPE(VARIANT)
261
262
15.9k
#undef DISABLE_INDEX_IF_FIELD_TYPE
263
264
15.9k
    int64_t storage_page_size = _tablet_schema->storage_page_size();
265
    // storage_page_size must be between 4KB and 10MB.
266
15.9k
    if (storage_page_size >= 4096 && storage_page_size <= 10485760) {
267
15.9k
        opts.data_page_size = storage_page_size;
268
15.9k
    }
269
15.9k
    opts.dict_page_size = _tablet_schema->storage_dict_page_size();
270
15.9k
    DBUG_EXECUTE_IF("VerticalSegmentWriter._create_column_writer.storage_page_size", {
271
15.9k
        auto table_id = DebugPoints::instance()->get_debug_param_or_default<int64_t>(
272
15.9k
                "VerticalSegmentWriter._create_column_writer.storage_page_size", "table_id",
273
15.9k
                INT_MIN);
274
15.9k
        auto target_data_page_size = DebugPoints::instance()->get_debug_param_or_default<int64_t>(
275
15.9k
                "VerticalSegmentWriter._create_column_writer.storage_page_size",
276
15.9k
                "storage_page_size", INT_MIN);
277
15.9k
        if (table_id == INT_MIN || target_data_page_size == INT_MIN) {
278
15.9k
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
279
15.9k
                    "Debug point parameters missing: either 'table_id' or 'storage_page_size' not "
280
15.9k
                    "set.");
281
15.9k
        }
282
15.9k
        if (table_id == _tablet_schema->table_id() &&
283
15.9k
            opts.data_page_size != target_data_page_size) {
284
15.9k
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
285
15.9k
                    "Mismatch in 'storage_page_size': expected size does not match the current "
286
15.9k
                    "data page size. "
287
15.9k
                    "Expected: " +
288
15.9k
                    std::to_string(target_data_page_size) +
289
15.9k
                    ", Actual: " + std::to_string(opts.data_page_size) + ".");
290
15.9k
        }
291
15.9k
    })
292
15.9k
    if (column.is_row_store_column()) {
293
        // smaller page size for row store column
294
0
        auto page_size = _tablet_schema->row_store_page_size();
295
0
        opts.data_page_size =
296
0
                (page_size > 0) ? page_size : segment_v2::ROW_STORE_PAGE_SIZE_DEFAULT_VALUE;
297
0
    }
298
299
15.9k
    opts.rowset_ctx = _opts.rowset_ctx;
300
15.9k
    opts.file_writer = _file_writer;
301
15.9k
    opts.compression_type = _opts.compression_type;
302
15.9k
    opts.footer = &_footer;
303
15.9k
    if (_opts.rowset_ctx != nullptr) {
304
13.7k
        opts.input_rs_readers = _opts.rowset_ctx->input_rs_readers;
305
13.7k
    }
306
15.9k
    opts.encoding_preference = {.integer_type_default_use_plain_encoding =
307
15.9k
                                        _tablet_schema->integer_type_default_use_plain_encoding(),
308
15.9k
                                .binary_plain_encoding_default_impl =
309
15.9k
                                        _tablet_schema->binary_plain_encoding_default_impl()};
310
311
15.9k
    std::unique_ptr<ColumnWriter> writer;
312
15.9k
    RETURN_IF_ERROR(ColumnWriter::create(opts, &column, _file_writer, &writer));
313
15.9k
    RETURN_IF_ERROR(writer->init());
314
15.9k
    _column_writers.push_back(std::move(writer));
315
316
15.9k
    _olap_data_convertor->add_column_data_convertor(column);
317
15.9k
    return Status::OK();
318
15.9k
}
319
320
7.75k
Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key) {
321
7.75k
    DCHECK(_column_writers.empty());
322
7.75k
    DCHECK(_column_ids.empty());
323
7.75k
    _has_key = has_key;
324
7.75k
    _column_writers.reserve(_tablet_schema->columns().size());
325
7.75k
    _column_ids.insert(_column_ids.end(), col_ids.begin(), col_ids.end());
326
7.75k
    _olap_data_convertor = std::make_unique<OlapBlockDataConvertor>();
327
7.75k
    if (_opts.compression_type == UNKNOWN_COMPRESSION) {
328
5.25k
        _opts.compression_type = _tablet_schema->compression_type();
329
5.25k
    }
330
331
7.75k
    RETURN_IF_ERROR(_create_writers(_tablet_schema, col_ids));
332
333
    // Initialize variant statistics calculator
334
7.75k
    _variant_stats_calculator =
335
7.75k
            std::make_unique<VariantStatsCaculator>(&_footer, _tablet_schema, col_ids);
336
337
    // we don't need the short key index for unique key merge on write table.
338
7.75k
    if (_has_key) {
339
5.25k
        if (_is_mow()) {
340
69
            size_t seq_col_length = 0;
341
69
            if (_tablet_schema->has_sequence_col()) {
342
26
                seq_col_length =
343
26
                        _tablet_schema->column(_tablet_schema->sequence_col_idx()).length() + 1;
344
26
            }
345
69
            size_t rowid_length = 0;
346
69
            if (_is_mow_with_cluster_key()) {
347
0
                rowid_length = PrimaryKeyIndexReader::ROW_ID_LENGTH;
348
0
                _short_key_index_builder.reset(
349
0
                        new ShortKeyIndexBuilder(_segment_id, _opts.num_rows_per_block));
350
0
            }
351
69
            _primary_key_index_builder.reset(
352
69
                    new PrimaryKeyIndexBuilder(_file_writer, seq_col_length, rowid_length));
353
69
            RETURN_IF_ERROR(_primary_key_index_builder->init());
354
5.18k
        } else {
355
5.18k
            _short_key_index_builder.reset(
356
5.18k
                    new ShortKeyIndexBuilder(_segment_id, _opts.num_rows_per_block));
357
5.18k
        }
358
5.25k
    }
359
7.75k
    return Status::OK();
360
7.75k
}
361
362
Status SegmentWriter::_create_writers(const TabletSchemaSPtr& tablet_schema,
363
7.75k
                                      const std::vector<uint32_t>& col_ids) {
364
7.75k
    _olap_data_convertor->reserve(col_ids.size());
365
15.9k
    for (auto& cid : col_ids) {
366
15.9k
        RETURN_IF_ERROR(_create_column_writer(cid, tablet_schema->column(cid), tablet_schema));
367
15.9k
    }
368
7.75k
    return Status::OK();
369
7.75k
}
370
371
800
void SegmentWriter::_maybe_invalid_row_cache(const std::string& key) {
372
    // Just invalid row cache for simplicity, since the rowset is not visible at present.
373
    // If we update/insert cache, if load failed rowset will not be visible but cached data
374
    // will be visible, and lead to inconsistency.
375
800
    if (!config::disable_storage_row_cache && _tablet_schema->has_row_store_for_all_columns() &&
376
800
        _opts.write_type == DataWriteType::TYPE_DIRECT) {
377
        // invalidate cache
378
0
        RowCache::instance()->erase({_opts.rowset_ctx->tablet_id, key});
379
0
    }
380
800
}
381
382
0
void SegmentWriter::_serialize_block_to_row_column(const Block& block) {
383
0
    if (block.rows() == 0) {
384
0
        return;
385
0
    }
386
0
    MonotonicStopWatch watch;
387
0
    watch.start();
388
0
    int row_column_id = 0;
389
0
    for (int i = 0; i < _tablet_schema->num_columns(); ++i) {
390
0
        if (_tablet_schema->column(i).is_row_store_column()) {
391
0
            auto* row_store_column = static_cast<ColumnString*>(
392
0
                    block.get_by_position(i).column->assume_mutable_ref().assume_mutable().get());
393
0
            row_store_column->clear();
394
0
            DataTypeSerDeSPtrs serdes = create_data_type_serdes(block.get_data_types());
395
0
            JsonbSerializeUtil::block_to_jsonb(*_tablet_schema, block, *row_store_column,
396
0
                                               cast_set<int>(_tablet_schema->num_columns()), serdes,
397
0
                                               {_tablet_schema->row_columns_uids().begin(),
398
0
                                                _tablet_schema->row_columns_uids().end()});
399
0
            break;
400
0
        }
401
0
    }
402
403
0
    VLOG_DEBUG << "serialize , num_rows:" << block.rows() << ", row_column_id:" << row_column_id
404
0
               << ", total_byte_size:" << block.allocated_bytes() << ", serialize_cost(us)"
405
0
               << watch.elapsed_time() / 1000;
406
0
}
407
408
Status SegmentWriter::probe_key_for_mow(
409
        std::string key, std::size_t segment_pos, bool have_input_seq_column, bool have_delete_sign,
410
        const std::vector<RowsetSharedPtr>& specified_rowsets,
411
        std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
412
        bool& has_default_or_nullable, std::vector<bool>& use_default_or_null_flag,
413
        const std::function<void(const RowLocation& loc)>& found_cb,
414
0
        const std::function<Status()>& not_found_cb, PartialUpdateStats& stats) {
415
0
    RowLocation loc;
416
    // save rowset shared ptr so this rowset wouldn't delete
417
0
    RowsetSharedPtr rowset;
418
0
    auto st = _tablet->lookup_row_key(
419
0
            key, _tablet_schema.get(), have_input_seq_column, specified_rowsets, &loc,
420
0
            cast_set<uint32_t>(_mow_context->max_version), segment_caches, &rowset);
421
0
    if (st.is<KEY_NOT_FOUND>()) {
422
0
        if (!have_delete_sign) {
423
0
            RETURN_IF_ERROR(not_found_cb());
424
0
        }
425
0
        ++stats.num_rows_new_added;
426
0
        has_default_or_nullable = true;
427
0
        use_default_or_null_flag.emplace_back(true);
428
0
        return Status::OK();
429
0
    }
430
0
    if (!st.ok() && !st.is<KEY_ALREADY_EXISTS>()) {
431
0
        LOG(WARNING) << "failed to lookup row key, error: " << st;
432
0
        return st;
433
0
    }
434
435
    // 1. if the delete sign is marked, it means that the value columns of the row will not
436
    //    be read. So we don't need to read the missing values from the previous rows.
437
    // 2. the one exception is when there are sequence columns in the table, we need to read
438
    //    the sequence columns, otherwise it may cause the merge-on-read based compaction
439
    //    policy to produce incorrect results
440
    // TODO(bobhan1): only read seq col rather than all columns in this situation for
441
    // partial update and flexible partial update
442
443
    // TODO(bobhan1): handle sequence column here
444
0
    if (st.is<KEY_ALREADY_EXISTS>() || (have_delete_sign && !_tablet_schema->has_sequence_col())) {
445
0
        has_default_or_nullable = true;
446
0
        use_default_or_null_flag.emplace_back(true);
447
0
    } else {
448
        // partial update should not contain invisible columns
449
0
        use_default_or_null_flag.emplace_back(false);
450
0
        _rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
451
0
        found_cb(loc);
452
0
    }
453
454
0
    if (st.is<KEY_ALREADY_EXISTS>()) {
455
        // although we need to mark delete current row, we still need to read missing columns
456
        // for this row, we need to ensure that each column is aligned
457
0
        _mow_context->delete_bitmap->add(
458
0
                {_opts.rowset_ctx->rowset_id, _segment_id, DeleteBitmap::TEMP_VERSION_COMMON},
459
0
                cast_set<uint32_t>(segment_pos));
460
0
        ++stats.num_rows_deleted;
461
0
    } else {
462
0
        _mow_context->delete_bitmap->add(
463
0
                {loc.rowset_id, loc.segment_id, DeleteBitmap::TEMP_VERSION_COMMON}, loc.row_id);
464
0
        ++stats.num_rows_updated;
465
0
    }
466
0
    return Status::OK();
467
0
}
468
469
0
Status SegmentWriter::partial_update_preconditions_check(size_t row_pos) {
470
0
    if (!_is_mow()) {
471
0
        auto msg = fmt::format(
472
0
                "Can only do partial update on merge-on-write unique table, but found: "
473
0
                "keys_type={}, _opts.enable_unique_key_merge_on_write={}, tablet_id={}",
474
0
                _tablet_schema->keys_type(), _opts.enable_unique_key_merge_on_write,
475
0
                _tablet->tablet_id());
476
0
        DCHECK(false) << msg;
477
0
        return Status::InternalError<false>(msg);
478
0
    }
479
0
    if (_opts.rowset_ctx->partial_update_info == nullptr) {
480
0
        auto msg =
481
0
                fmt::format("partial_update_info should not be nullptr, please check, tablet_id={}",
482
0
                            _tablet->tablet_id());
483
0
        DCHECK(false) << msg;
484
0
        return Status::InternalError<false>(msg);
485
0
    }
486
0
    if (!_opts.rowset_ctx->partial_update_info->is_fixed_partial_update()) {
487
0
        auto msg = fmt::format(
488
0
                "in fixed partial update code, but update_mode={}, please check, tablet_id={}",
489
0
                _opts.rowset_ctx->partial_update_info->update_mode(), _tablet->tablet_id());
490
0
        DCHECK(false) << msg;
491
0
        return Status::InternalError<false>(msg);
492
0
    }
493
0
    if (row_pos != 0) {
494
0
        auto msg = fmt::format("row_pos should be 0, but found {}, tablet_id={}", row_pos,
495
0
                               _tablet->tablet_id());
496
0
        DCHECK(false) << msg;
497
0
        return Status::InternalError<false>(msg);
498
0
    }
499
0
    return Status::OK();
500
0
}
501
502
// for partial update, we should do following steps to fill content of block:
503
// 1. set block data to data convertor, and get all key_column's converted slice
504
// 2. get pk of input block, and read missing columns
505
//       2.1 first find key location{rowset_id, segment_id, row_id}
506
//       2.2 build read plan to read by batch
507
//       2.3 fill block
508
// 3. set columns to data convertor and then write all columns
509
Status SegmentWriter::append_block_with_partial_content(const Block* block, size_t row_pos,
510
0
                                                        size_t num_rows) {
511
0
    if (block->columns() < _tablet_schema->num_key_columns() ||
512
0
        block->columns() >= _tablet_schema->num_columns()) {
513
0
        return Status::InvalidArgument(
514
0
                fmt::format("illegal partial update block columns: {}, num key columns: {}, total "
515
0
                            "schema columns: {}",
516
0
                            block->columns(), _tablet_schema->num_key_columns(),
517
0
                            _tablet_schema->num_columns()));
518
0
    }
519
0
    RETURN_IF_ERROR(partial_update_preconditions_check(row_pos));
520
521
    // find missing column cids
522
0
    const auto& missing_cids = _opts.rowset_ctx->partial_update_info->missing_cids;
523
0
    const auto& including_cids = _opts.rowset_ctx->partial_update_info->update_cids;
524
525
    // create full block and fill with input columns
526
0
    auto full_block = _tablet_schema->create_block();
527
0
    size_t input_id = 0;
528
0
    for (auto i : including_cids) {
529
0
        full_block.replace_by_position(i, block->get_by_position(input_id++).column);
530
0
    }
531
532
0
    if (_opts.rowset_ctx->write_type != DataWriteType::TYPE_COMPACTION &&
533
0
        _tablet_schema->num_variant_columns() > 0) {
534
0
        RETURN_IF_ERROR(variant_util::parse_and_materialize_variant_columns(
535
0
                full_block, *_tablet_schema, including_cids));
536
0
    }
537
0
    RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
538
0
            &full_block, row_pos, num_rows, including_cids));
539
540
0
    bool have_input_seq_column = false;
541
    // write including columns
542
0
    std::vector<IOlapColumnDataAccessor*> key_columns;
543
0
    IOlapColumnDataAccessor* seq_column = nullptr;
544
0
    size_t segment_start_pos = 0;
545
0
    for (auto cid : including_cids) {
546
        // here we get segment column row num before append data.
547
0
        segment_start_pos = _column_writers[cid]->get_next_rowid();
548
        // olap data convertor alway start from id = 0
549
0
        auto converted_result = _olap_data_convertor->convert_column_data(cid);
550
0
        if (!converted_result.first.ok()) {
551
0
            return converted_result.first;
552
0
        }
553
0
        if (cid < _num_sort_key_columns) {
554
0
            key_columns.push_back(converted_result.second);
555
0
        } else if (_tablet_schema->has_sequence_col() &&
556
0
                   cid == _tablet_schema->sequence_col_idx()) {
557
0
            seq_column = converted_result.second;
558
0
            have_input_seq_column = true;
559
0
        }
560
0
        RETURN_IF_ERROR(_column_writers[cid]->append(converted_result.second->get_nullmap(),
561
0
                                                     converted_result.second->get_data(),
562
0
                                                     num_rows));
563
0
    }
564
565
0
    bool has_default_or_nullable = false;
566
0
    std::vector<bool> use_default_or_null_flag;
567
0
    use_default_or_null_flag.reserve(num_rows);
568
0
    const auto* delete_signs =
569
0
            BaseTablet::get_delete_sign_column_data(full_block, row_pos + num_rows);
570
571
0
    const std::vector<RowsetSharedPtr>& specified_rowsets = _mow_context->rowset_ptrs;
572
0
    std::vector<std::unique_ptr<SegmentCacheHandle>> segment_caches(specified_rowsets.size());
573
574
0
    FixedReadPlan read_plan;
575
576
    // locate rows in base data
577
0
    PartialUpdateStats stats;
578
579
0
    for (size_t block_pos = row_pos; block_pos < row_pos + num_rows; block_pos++) {
580
        // block   segment
581
        //   2   ->   0
582
        //   3   ->   1
583
        //   4   ->   2
584
        //   5   ->   3
585
        // here row_pos = 2, num_rows = 4.
586
0
        size_t delta_pos = block_pos - row_pos;
587
0
        size_t segment_pos = segment_start_pos + delta_pos;
588
0
        std::string key = _full_encode_keys(key_columns, delta_pos);
589
0
        _maybe_invalid_row_cache(key);
590
0
        if (have_input_seq_column) {
591
0
            _encode_seq_column(seq_column, delta_pos, &key);
592
0
        }
593
        // If the table have sequence column, and the include-cids don't contain the sequence
594
        // column, we need to update the primary key index builder at the end of this method.
595
        // At that time, we have a valid sequence column to encode the key with seq col.
596
0
        if (!_tablet_schema->has_sequence_col() || have_input_seq_column) {
597
0
            RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
598
0
        }
599
600
        // mark key with delete sign as deleted.
601
0
        bool have_delete_sign = (delete_signs != nullptr && delete_signs[block_pos] != 0);
602
603
0
        auto not_found_cb = [&]() {
604
0
            return _opts.rowset_ctx->partial_update_info->handle_new_key(
605
0
                    *_tablet_schema, [&]() -> std::string {
606
0
                        return block->dump_one_line(block_pos,
607
0
                                                    cast_set<int>(_num_sort_key_columns));
608
0
                    });
609
0
        };
610
0
        auto update_read_plan = [&](const RowLocation& loc) {
611
0
            read_plan.prepare_to_read(loc, segment_pos);
612
0
        };
613
0
        RETURN_IF_ERROR(probe_key_for_mow(std::move(key), segment_pos, have_input_seq_column,
614
0
                                          have_delete_sign, specified_rowsets, segment_caches,
615
0
                                          has_default_or_nullable, use_default_or_null_flag,
616
0
                                          update_read_plan, not_found_cb, stats));
617
0
    }
618
0
    CHECK_EQ(use_default_or_null_flag.size(), num_rows);
619
620
0
    if (config::enable_merge_on_write_correctness_check) {
621
0
        _tablet->add_sentinel_mark_to_delete_bitmap(_mow_context->delete_bitmap.get(),
622
0
                                                    *_mow_context->rowset_ids);
623
0
    }
624
625
    // read to fill full block
626
0
    RETURN_IF_ERROR(read_plan.fill_missing_columns(
627
0
            _opts.rowset_ctx, _rsid_to_rowset, *_tablet_schema, full_block,
628
0
            use_default_or_null_flag, has_default_or_nullable,
629
0
            cast_set<uint32_t>(segment_start_pos), block));
630
631
    // convert block to row store format
632
0
    _serialize_block_to_row_column(full_block);
633
634
    // convert missing columns and send to column writer
635
0
    RETURN_IF_ERROR(_olap_data_convertor->set_source_content_with_specifid_columns(
636
0
            &full_block, row_pos, num_rows, missing_cids));
637
0
    for (auto cid : missing_cids) {
638
0
        auto converted_result = _olap_data_convertor->convert_column_data(cid);
639
0
        if (!converted_result.first.ok()) {
640
0
            return converted_result.first;
641
0
        }
642
0
        if (_tablet_schema->has_sequence_col() && !have_input_seq_column &&
643
0
            cid == _tablet_schema->sequence_col_idx()) {
644
0
            DCHECK_EQ(seq_column, nullptr);
645
0
            seq_column = converted_result.second;
646
0
        }
647
0
        RETURN_IF_ERROR(_column_writers[cid]->append(converted_result.second->get_nullmap(),
648
0
                                                     converted_result.second->get_data(),
649
0
                                                     num_rows));
650
0
    }
651
0
    _num_rows_updated += stats.num_rows_updated;
652
0
    _num_rows_deleted += stats.num_rows_deleted;
653
0
    _num_rows_new_added += stats.num_rows_new_added;
654
0
    _num_rows_filtered += stats.num_rows_filtered;
655
0
    if (_tablet_schema->has_sequence_col() && !have_input_seq_column) {
656
0
        DCHECK_NE(seq_column, nullptr);
657
0
        if (_num_rows_written != row_pos ||
658
0
            _primary_key_index_builder->num_rows() != _num_rows_written) {
659
0
            return Status::InternalError(
660
0
                    "Correctness check failed, _num_rows_written: {}, row_pos: {}, primary key "
661
0
                    "index builder num rows: {}",
662
0
                    _num_rows_written, row_pos, _primary_key_index_builder->num_rows());
663
0
        }
664
0
        RETURN_IF_ERROR(
665
0
                _generate_primary_key_index(_key_coders, key_columns, seq_column, num_rows, false));
666
0
    }
667
668
0
    _num_rows_written += num_rows;
669
0
    DCHECK_EQ(_primary_key_index_builder->num_rows(), _num_rows_written)
670
0
            << "primary key index builder num rows(" << _primary_key_index_builder->num_rows()
671
0
            << ") not equal to segment writer's num rows written(" << _num_rows_written << ")";
672
0
    _olap_data_convertor->clear_source_content();
673
674
0
    return Status::OK();
675
0
}
676
677
9.55k
Status SegmentWriter::append_block(const Block* block, size_t row_pos, size_t num_rows) {
678
9.55k
    if (_opts.rowset_ctx->partial_update_info &&
679
9.55k
        _opts.rowset_ctx->partial_update_info->is_partial_update() &&
680
9.55k
        _opts.write_type == DataWriteType::TYPE_DIRECT &&
681
9.55k
        !_opts.rowset_ctx->is_transient_rowset_writer) {
682
0
        if (_opts.rowset_ctx->partial_update_info->is_fixed_partial_update()) {
683
0
            RETURN_IF_ERROR(append_block_with_partial_content(block, row_pos, num_rows));
684
0
        } else {
685
0
            return Status::NotSupported<false>(
686
0
                    "SegmentWriter doesn't support flexible partial update, please set "
687
0
                    "enable_vertical_segment_writer=true in be.conf on all BEs to use "
688
0
                    "VerticalSegmentWriter.");
689
0
        }
690
0
        return Status::OK();
691
0
    }
692
9.55k
    if (block->columns() < _column_writers.size()) {
693
0
        return Status::InternalError(
694
0
                "block->columns() < _column_writers.size(), block->columns()=" +
695
0
                std::to_string(block->columns()) +
696
0
                ", _column_writers.size()=" + std::to_string(_column_writers.size()) +
697
0
                ", _tablet_schema->dump_structure()=" + _tablet_schema->dump_structure());
698
0
    }
699
9.55k
    CHECK(block->columns() >= _column_writers.size())
700
0
            << ", block->columns()=" << block->columns()
701
0
            << ", _column_writers.size()=" << _column_writers.size()
702
0
            << ", _tablet_schema->dump_structure()=" << _tablet_schema->dump_structure();
703
    // Row column should be filled here when it's a directly write from memtable
704
    // or it's schema change write(since column data type maybe changed, so we should reubild)
705
9.55k
    if (_opts.write_type == DataWriteType::TYPE_DIRECT ||
706
9.55k
        _opts.write_type == DataWriteType::TYPE_SCHEMA_CHANGE) {
707
0
        _serialize_block_to_row_column(*block);
708
0
    }
709
710
9.55k
    if (_opts.rowset_ctx->write_type != DataWriteType::TYPE_COMPACTION &&
711
9.55k
        _tablet_schema->num_variant_columns() > 0) {
712
135
        RETURN_IF_ERROR(variant_util::parse_and_materialize_variant_columns(
713
135
                const_cast<Block&>(*block), *_tablet_schema, _column_ids));
714
135
    }
715
716
9.55k
    _olap_data_convertor->set_source_content(block, row_pos, num_rows);
717
718
    // find all row pos for short key indexes
719
9.55k
    std::vector<size_t> short_key_pos;
720
9.55k
    if (_has_key) {
721
        // We build a short key index every `_opts.num_rows_per_block` rows. Specifically, we
722
        // build a short key index using 1st rows for first block and `_short_key_row_pos - _row_count`
723
        // for next blocks.
724
        // Ensure we build a short key index using 1st rows only for the first block (ISSUE-9766).
725
6.39k
        if (UNLIKELY(_short_key_row_pos == 0 && _num_rows_written == 0)) {
726
5.19k
            short_key_pos.push_back(0);
727
5.19k
        }
728
14.3k
        while (_short_key_row_pos + _opts.num_rows_per_block < _num_rows_written + num_rows) {
729
7.96k
            _short_key_row_pos += _opts.num_rows_per_block;
730
7.96k
            short_key_pos.push_back(_short_key_row_pos - _num_rows_written);
731
7.96k
        }
732
6.39k
    }
733
734
    // convert column data from engine format to storage layer format
735
9.55k
    std::vector<IOlapColumnDataAccessor*> key_columns;
736
9.55k
    IOlapColumnDataAccessor* seq_column = nullptr;
737
26.5k
    for (size_t id = 0; id < _column_writers.size(); ++id) {
738
        // olap data convertor alway start from id = 0
739
16.9k
        auto converted_result = _olap_data_convertor->convert_column_data(id);
740
16.9k
        if (!converted_result.first.ok()) {
741
0
            return converted_result.first;
742
0
        }
743
16.9k
        auto cid = _column_ids[id];
744
16.9k
        if (_has_key && cid < _tablet_schema->num_key_columns()) {
745
4.74k
            key_columns.push_back(converted_result.second);
746
12.2k
        } else if (_has_key && _tablet_schema->has_sequence_col() &&
747
12.2k
                   cid == _tablet_schema->sequence_col_idx()) {
748
90
            seq_column = converted_result.second;
749
90
        }
750
16.9k
        RETURN_IF_ERROR(_column_writers[id]->append(converted_result.second->get_nullmap(),
751
16.9k
                                                    converted_result.second->get_data(), num_rows));
752
16.9k
    }
753
9.55k
    if (_opts.write_type == DataWriteType::TYPE_COMPACTION) {
754
7.84k
        RETURN_IF_ERROR(
755
7.84k
                _variant_stats_calculator->calculate_variant_stats(block, row_pos, num_rows));
756
7.84k
    }
757
9.55k
    if (_has_key) {
758
6.39k
        if (_is_mow_with_cluster_key()) {
759
            // for now we don't need to query short key index for CLUSTER BY feature,
760
            // but we still write the index for future usage.
761
            // 1. generate primary key index, the key_columns is primary_key_columns
762
0
            RETURN_IF_ERROR(_generate_primary_key_index(_primary_key_coders, key_columns,
763
0
                                                        seq_column, num_rows, true));
764
            // 2. generate short key index (use cluster key)
765
0
            key_columns.clear();
766
0
            for (const auto& cid : _tablet_schema->cluster_key_uids()) {
767
                // find cluster key index in tablet schema
768
0
                auto cluster_key_index = _tablet_schema->field_index(cid);
769
0
                if (cluster_key_index == -1) {
770
0
                    return Status::InternalError(
771
0
                            "could not find cluster key column with unique_id=" +
772
0
                            std::to_string(cid) + " in tablet schema");
773
0
                }
774
0
                bool found = false;
775
0
                for (auto i = 0; i < _column_ids.size(); ++i) {
776
0
                    if (_column_ids[i] == cluster_key_index) {
777
0
                        auto converted_result = _olap_data_convertor->convert_column_data(i);
778
0
                        if (!converted_result.first.ok()) {
779
0
                            return converted_result.first;
780
0
                        }
781
0
                        key_columns.push_back(converted_result.second);
782
0
                        found = true;
783
0
                        break;
784
0
                    }
785
0
                }
786
0
                if (!found) {
787
0
                    return Status::InternalError(
788
0
                            "could not found cluster key column with unique_id=" +
789
0
                            std::to_string(cid) +
790
0
                            ", tablet schema index=" + std::to_string(cluster_key_index));
791
0
                }
792
0
            }
793
0
            RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos));
794
6.39k
        } else if (_is_mow()) {
795
10
            RETURN_IF_ERROR(_generate_primary_key_index(_key_coders, key_columns, seq_column,
796
10
                                                        num_rows, false));
797
6.38k
        } else {
798
6.38k
            RETURN_IF_ERROR(_generate_short_key_index(key_columns, num_rows, short_key_pos));
799
6.38k
        }
800
6.39k
    }
801
802
9.55k
    _num_rows_written += num_rows;
803
9.55k
    _olap_data_convertor->clear_source_content();
804
9.55k
    return Status::OK();
805
9.55k
}
806
807
2.06k
int64_t SegmentWriter::max_row_to_add(size_t row_avg_size_in_bytes) {
808
2.06k
    auto segment_size = estimate_segment_size();
809
2.06k
    if (segment_size >= MAX_SEGMENT_SIZE || _num_rows_written >= _opts.max_rows_per_segment)
810
354
            [[unlikely]] {
811
354
        return 0;
812
354
    }
813
1.71k
    int64_t size_rows = ((int64_t)MAX_SEGMENT_SIZE - (int64_t)segment_size) / row_avg_size_in_bytes;
814
1.71k
    int64_t count_rows = (int64_t)_opts.max_rows_per_segment - _num_rows_written;
815
816
1.71k
    return std::min(size_rows, count_rows);
817
2.06k
}
818
819
std::string SegmentWriter::_full_encode_keys(
820
13.5k
        const std::vector<IOlapColumnDataAccessor*>& key_columns, size_t pos, bool null_first) {
821
13.5k
    assert(_key_index_size.size() == _num_sort_key_columns);
822
13.5k
    assert(key_columns.size() == _num_sort_key_columns &&
823
13.5k
           _key_coders.size() == _num_sort_key_columns);
824
13.5k
    return _full_encode_keys(_key_coders, key_columns, pos, null_first);
825
13.5k
}
826
827
std::string SegmentWriter::_full_encode_keys(
828
        const std::vector<const KeyCoder*>& key_coders,
829
13.5k
        const std::vector<IOlapColumnDataAccessor*>& key_columns, size_t pos, bool null_first) {
830
13.5k
    assert(key_columns.size() == key_coders.size());
831
832
13.5k
    std::string encoded_keys;
833
13.5k
    size_t cid = 0;
834
13.5k
    for (const auto& column : key_columns) {
835
10.2k
        auto field = column->get_data_at(pos);
836
10.2k
        if (UNLIKELY(!field)) {
837
0
            if (null_first) {
838
0
                encoded_keys.push_back(KEY_NULL_FIRST_MARKER);
839
0
            } else {
840
0
                encoded_keys.push_back(KEY_NORMAL_MARKER);
841
0
            }
842
0
            ++cid;
843
0
            continue;
844
0
        }
845
10.2k
        encoded_keys.push_back(KEY_NORMAL_MARKER);
846
10.2k
        DCHECK(key_coders[cid] != nullptr);
847
10.2k
        key_coders[cid]->full_encode_ascending(field, &encoded_keys);
848
10.2k
        ++cid;
849
10.2k
    }
850
13.5k
    return encoded_keys;
851
13.5k
}
852
853
void SegmentWriter::_encode_seq_column(const IOlapColumnDataAccessor* seq_column, size_t pos,
854
0
                                       std::string* encoded_keys) {
855
0
    auto field = seq_column->get_data_at(pos);
856
    // To facilitate the use of the primary key index, encode the seq column
857
    // to the minimum value of the corresponding length when the seq column
858
    // is null
859
0
    if (UNLIKELY(!field)) {
860
0
        encoded_keys->push_back(KEY_NULL_FIRST_MARKER);
861
0
        size_t seq_col_length = _tablet_schema->column(_tablet_schema->sequence_col_idx()).length();
862
0
        encoded_keys->append(seq_col_length, KEY_MINIMAL_MARKER);
863
0
        return;
864
0
    }
865
0
    encoded_keys->push_back(KEY_NORMAL_MARKER);
866
0
    _seq_coder->full_encode_ascending(field, encoded_keys);
867
0
}
868
869
0
void SegmentWriter::_encode_rowid(const uint32_t rowid, std::string* encoded_keys) {
870
0
    encoded_keys->push_back(KEY_NORMAL_MARKER);
871
0
    _rowid_coder->full_encode_ascending(&rowid, encoded_keys);
872
0
}
873
874
std::string SegmentWriter::_encode_keys(const std::vector<IOlapColumnDataAccessor*>& key_columns,
875
13.1k
                                        size_t pos) {
876
13.1k
    assert(key_columns.size() == _num_short_key_columns);
877
878
13.1k
    std::string encoded_keys;
879
13.1k
    size_t cid = 0;
880
13.1k
    for (const auto& column : key_columns) {
881
10.1k
        auto field = column->get_data_at(pos);
882
10.1k
        if (UNLIKELY(!field)) {
883
0
            encoded_keys.push_back(KEY_NULL_FIRST_MARKER);
884
0
            ++cid;
885
0
            continue;
886
0
        }
887
10.1k
        encoded_keys.push_back(KEY_NORMAL_MARKER);
888
10.1k
        _key_coders[cid]->encode_ascending(field, _key_index_size[cid], &encoded_keys);
889
10.1k
        ++cid;
890
10.1k
    }
891
13.1k
    return encoded_keys;
892
13.1k
}
893
129k
Status SegmentWriter::append_row(const RowCursor& row) {
894
780k
    for (size_t cid = 0; cid < _column_writers.size(); ++cid) {
895
651k
        auto cell = row.cell(cast_set<uint32_t>(cid));
896
651k
        RETURN_IF_ERROR(_column_writers[cid]->append(cell));
897
651k
    }
898
129k
    std::string full_encoded_key;
899
129k
    row.encode_key<true>(&full_encoded_key, _num_sort_key_columns);
900
129k
    if (_tablet_schema->has_sequence_col()) {
901
64.4k
        full_encoded_key.push_back(KEY_NORMAL_MARKER);
902
64.4k
        auto cid = _tablet_schema->sequence_col_idx();
903
64.4k
        auto cell = row.cell(cid);
904
64.4k
        row.schema()->column(cid)->full_encode_ascending(cell.cell_ptr(), &full_encoded_key);
905
64.4k
    }
906
907
129k
    if (_is_mow_with_cluster_key()) {
908
0
        return Status::InternalError(
909
0
                "SegmentWriter::append_row does not support mow tables with cluster key");
910
129k
    } else if (_is_mow()) {
911
128k
        RETURN_IF_ERROR(_primary_key_index_builder->add_item(full_encoded_key));
912
128k
    } else {
913
        // At the beginning of one block, so add a short key index entry
914
202
        if ((_num_rows_written % _opts.num_rows_per_block) == 0) {
915
4
            std::string encoded_key;
916
4
            row.encode_key(&encoded_key, _num_short_key_columns);
917
4
            RETURN_IF_ERROR(_short_key_index_builder->add_item(encoded_key));
918
4
        }
919
202
        set_min_max_key(full_encoded_key);
920
202
    }
921
129k
    ++_num_rows_written;
922
129k
    return Status::OK();
923
129k
}
924
925
// TODO(lingbin): Currently this function does not include the size of various indexes,
926
// We should make this more precise.
927
// NOTE: This function will be called when any row of data is added, so we need to
928
// make this function efficient.
929
2.38k
uint64_t SegmentWriter::estimate_segment_size() {
930
    // footer_size(4) + checksum(4) + segment_magic(4)
931
2.38k
    uint64_t size = 12;
932
9.62k
    for (auto& column_writer : _column_writers) {
933
9.62k
        size += column_writer->estimate_buffer_size();
934
9.62k
    }
935
2.38k
    if (_is_mow_with_cluster_key()) {
936
0
        size += _primary_key_index_builder->size() + _short_key_index_builder->size();
937
2.38k
    } else if (_is_mow()) {
938
0
        size += _primary_key_index_builder->size();
939
2.38k
    } else {
940
2.38k
        size += _short_key_index_builder->size();
941
2.38k
    }
942
943
    // update the mem_tracker of segment size
944
2.38k
    _mem_tracker->consume(size - _mem_tracker->consumption());
945
2.38k
    return size;
946
2.38k
}
947
948
7.75k
Status SegmentWriter::finalize_columns_data() {
949
7.75k
    if (_has_key) {
950
5.25k
        _row_count = _num_rows_written;
951
5.25k
    } else {
952
2.49k
        DCHECK(_row_count == _num_rows_written)
953
0
                << "_row_count != _num_rows_written:" << _row_count << " vs. " << _num_rows_written;
954
2.49k
        if (_row_count != _num_rows_written) {
955
0
            std::stringstream ss;
956
0
            ss << "_row_count != _num_rows_written:" << _row_count << " vs. " << _num_rows_written;
957
0
            LOG(WARNING) << ss.str();
958
0
            return Status::InternalError(ss.str());
959
0
        }
960
2.49k
    }
961
7.75k
    _num_rows_written = 0;
962
963
15.9k
    for (auto& column_writer : _column_writers) {
964
15.9k
        RETURN_IF_ERROR(column_writer->finish());
965
15.9k
    }
966
7.75k
    RETURN_IF_ERROR(_write_data());
967
968
7.75k
    return Status::OK();
969
7.75k
}
970
971
7.75k
Status SegmentWriter::finalize_columns_index(uint64_t* index_size) {
972
7.75k
    uint64_t index_start = _file_writer->bytes_appended();
973
7.75k
    RETURN_IF_ERROR(_write_ordinal_index());
974
7.75k
    RETURN_IF_ERROR(_write_zone_map());
975
7.75k
    RETURN_IF_ERROR(_write_inverted_index());
976
7.75k
    RETURN_IF_ERROR(_write_ann_index());
977
7.75k
    RETURN_IF_ERROR(_write_bloom_filter_index());
978
979
7.75k
    *index_size = _file_writer->bytes_appended() - index_start;
980
7.75k
    if (_has_key) {
981
5.25k
        if (_is_mow_with_cluster_key()) {
982
            // 1. sort primary keys
983
0
            std::sort(_primary_keys.begin(), _primary_keys.end());
984
            // 2. write primary keys index
985
0
            std::string last_key;
986
0
            for (const auto& key : _primary_keys) {
987
0
                DCHECK(key.compare(last_key) > 0)
988
0
                        << "found duplicate key or key is not sorted! current key: " << key
989
0
                        << ", last key: " << last_key;
990
0
                RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
991
0
                last_key = key;
992
0
            }
993
994
0
            RETURN_IF_ERROR(_write_short_key_index());
995
0
            *index_size = _file_writer->bytes_appended() - index_start;
996
0
            RETURN_IF_ERROR(_write_primary_key_index());
997
0
            *index_size += _primary_key_index_builder->disk_size();
998
5.25k
        } else if (_is_mow()) {
999
69
            RETURN_IF_ERROR(_write_primary_key_index());
1000
            // IndexedColumnWriter write data pages mixed with segment data, we should use
1001
            // the stat from primary key index builder.
1002
69
            *index_size += _primary_key_index_builder->disk_size();
1003
5.18k
        } else {
1004
5.18k
            RETURN_IF_ERROR(_write_short_key_index());
1005
5.18k
            *index_size = _file_writer->bytes_appended() - index_start;
1006
5.18k
        }
1007
5.25k
    }
1008
    // reset all column writers and data_conveter
1009
7.75k
    clear();
1010
1011
7.75k
    return Status::OK();
1012
7.75k
}
1013
1014
5.25k
Status SegmentWriter::finalize_footer(uint64_t* segment_file_size) {
1015
5.25k
    RETURN_IF_ERROR(_write_footer());
1016
    // finish
1017
5.25k
    RETURN_IF_ERROR(_file_writer->close(true));
1018
5.25k
    *segment_file_size = _file_writer->bytes_appended();
1019
5.25k
    if (*segment_file_size == 0) {
1020
0
        return Status::Corruption("Bad segment, file size = 0");
1021
0
    }
1022
5.25k
    return Status::OK();
1023
5.25k
}
1024
1025
1.24k
Status SegmentWriter::finalize(uint64_t* segment_file_size, uint64_t* index_size) {
1026
1.24k
    MonotonicStopWatch timer;
1027
1.24k
    timer.start();
1028
    // check disk capacity
1029
1.24k
    if (_data_dir != nullptr && _data_dir->reach_capacity_limit((int64_t)estimate_segment_size())) {
1030
0
        return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed capacity limit, path: {}",
1031
0
                                                        _data_dir->path_hash(), _data_dir->path());
1032
0
    }
1033
    // write data
1034
1.24k
    RETURN_IF_ERROR(finalize_columns_data());
1035
    // Get the index start before finalize_footer since this function would write new data.
1036
1.24k
    uint64_t index_start = _file_writer->bytes_appended();
1037
    // write index
1038
1.24k
    RETURN_IF_ERROR(finalize_columns_index(index_size));
1039
    // write footer
1040
1.24k
    RETURN_IF_ERROR(finalize_footer(segment_file_size));
1041
1042
1.24k
    if (timer.elapsed_time() > 5000000000l) {
1043
0
        LOG(INFO) << "segment flush consumes a lot time_ns " << timer.elapsed_time()
1044
0
                  << ", segmemt_size " << *segment_file_size;
1045
0
    }
1046
    // When the cache type is not ttl(expiration time == 0), the data should be split into normal cache queue
1047
    // and index cache queue
1048
1.24k
    if (auto* cache_builder = _file_writer->cache_builder(); cache_builder != nullptr &&
1049
1.24k
                                                             cache_builder->_expiration_time == 0 &&
1050
1.24k
                                                             config::is_cloud_mode()) {
1051
0
        auto size = *index_size + *segment_file_size;
1052
0
        auto holder = cache_builder->allocate_cache_holder(index_start, size, _tablet->tablet_id());
1053
0
        for (auto& segment : holder->file_blocks) {
1054
0
            static_cast<void>(segment->change_cache_type(io::FileCacheType::INDEX));
1055
0
        }
1056
0
    }
1057
1.24k
    return Status::OK();
1058
1.24k
}
1059
1060
7.77k
void SegmentWriter::clear() {
1061
15.9k
    for (auto& column_writer : _column_writers) {
1062
15.9k
        column_writer.reset();
1063
15.9k
    }
1064
7.77k
    _column_writers.clear();
1065
7.77k
    _column_ids.clear();
1066
7.77k
    _olap_data_convertor.reset();
1067
7.77k
}
1068
1069
// write column data to file one by one
1070
7.75k
Status SegmentWriter::_write_data() {
1071
15.9k
    for (auto& column_writer : _column_writers) {
1072
15.9k
        RETURN_IF_ERROR(column_writer->write_data());
1073
1074
15.9k
        auto* column_meta = column_writer->get_column_meta();
1075
15.9k
        DCHECK(column_meta != nullptr);
1076
15.9k
        column_meta->set_compressed_data_bytes(
1077
15.9k
                (column_meta->has_compressed_data_bytes() ? column_meta->compressed_data_bytes()
1078
15.9k
                                                          : 0) +
1079
15.9k
                column_writer->get_total_compressed_data_pages_bytes());
1080
15.9k
        column_meta->set_uncompressed_data_bytes(
1081
15.9k
                (column_meta->has_uncompressed_data_bytes() ? column_meta->uncompressed_data_bytes()
1082
15.9k
                                                            : 0) +
1083
15.9k
                column_writer->get_total_uncompressed_data_pages_bytes());
1084
15.9k
        column_meta->set_raw_data_bytes(
1085
15.9k
                (column_meta->has_raw_data_bytes() ? column_meta->raw_data_bytes() : 0) +
1086
15.9k
                column_writer->get_raw_data_bytes());
1087
15.9k
    }
1088
7.75k
    return Status::OK();
1089
7.75k
}
1090
1091
// write ordinal index after data has been written
1092
7.75k
Status SegmentWriter::_write_ordinal_index() {
1093
15.9k
    for (auto& column_writer : _column_writers) {
1094
15.9k
        RETURN_IF_ERROR(column_writer->write_ordinal_index());
1095
15.9k
    }
1096
7.75k
    return Status::OK();
1097
7.75k
}
1098
1099
7.75k
Status SegmentWriter::_write_zone_map() {
1100
15.9k
    for (auto& column_writer : _column_writers) {
1101
15.9k
        RETURN_IF_ERROR(column_writer->write_zone_map());
1102
15.9k
    }
1103
7.75k
    return Status::OK();
1104
7.75k
}
1105
1106
7.75k
Status SegmentWriter::_write_inverted_index() {
1107
15.9k
    for (auto& column_writer : _column_writers) {
1108
15.9k
        RETURN_IF_ERROR(column_writer->write_inverted_index());
1109
15.9k
    }
1110
7.75k
    return Status::OK();
1111
7.75k
}
1112
1113
7.75k
Status SegmentWriter::_write_ann_index() {
1114
15.9k
    for (auto& column_writer : _column_writers) {
1115
15.9k
        RETURN_IF_ERROR(column_writer->write_ann_index());
1116
15.9k
    }
1117
7.75k
    return Status::OK();
1118
7.75k
}
1119
1120
7.75k
Status SegmentWriter::_write_bloom_filter_index() {
1121
15.9k
    for (auto& column_writer : _column_writers) {
1122
15.9k
        RETURN_IF_ERROR(column_writer->write_bloom_filter_index());
1123
15.9k
    }
1124
7.75k
    return Status::OK();
1125
7.75k
}
1126
1127
5.18k
Status SegmentWriter::_write_short_key_index() {
1128
5.18k
    std::vector<Slice> body;
1129
5.18k
    PageFooterPB footer;
1130
5.18k
    RETURN_IF_ERROR(_short_key_index_builder->finalize(_row_count, &body, &footer));
1131
5.18k
    PagePointer pp;
1132
    // short key index page is not compressed right now
1133
5.18k
    RETURN_IF_ERROR(PageIO::write_page(_file_writer, body, footer, &pp));
1134
5.18k
    pp.to_proto(_footer.mutable_short_key_index_page());
1135
5.18k
    return Status::OK();
1136
5.18k
}
1137
1138
69
Status SegmentWriter::_write_primary_key_index() {
1139
69
    CHECK_EQ(_primary_key_index_builder->num_rows(), _row_count);
1140
69
    return _primary_key_index_builder->finalize(_footer.mutable_primary_key_index_meta());
1141
69
}
1142
1143
5.25k
Status SegmentWriter::_write_footer() {
1144
5.25k
    _footer.set_num_rows(_row_count);
1145
    // Decide whether to externalize ColumnMetaPB by tablet default, and stamp footer version
1146
5.25k
    if (_tablet_schema->is_external_segment_column_meta_used()) {
1147
67
        _footer.set_version(SEGMENT_FOOTER_VERSION_V3_EXT_COL_META);
1148
67
        VLOG_DEBUG << "use external column meta";
1149
        // External ColumnMetaPB writing (optional)
1150
67
        RETURN_IF_ERROR(ExternalColMetaUtil::write_external_column_meta(
1151
67
                _file_writer, &_footer, _opts.compression_type,
1152
67
                [this](const std::vector<Slice>& slices) { return _write_raw_data(slices); }));
1153
67
    }
1154
1155
    // Footer := SegmentFooterPB, FooterPBSize(4), FooterPBChecksum(4), MagicNumber(4)
1156
5.25k
    std::string footer_buf;
1157
5.25k
    VLOG_DEBUG << "footer " << _footer.DebugString();
1158
5.25k
    if (!_footer.SerializeToString(&footer_buf)) {
1159
0
        return Status::InternalError("failed to serialize segment footer");
1160
0
    }
1161
1162
5.25k
    faststring fixed_buf;
1163
    // footer's size
1164
5.25k
    put_fixed32_le(&fixed_buf, cast_set<uint32_t>(footer_buf.size()));
1165
    // footer's checksum
1166
5.25k
    uint32_t checksum = crc32c::Crc32c(footer_buf.data(), footer_buf.size());
1167
5.25k
    put_fixed32_le(&fixed_buf, checksum);
1168
    // Append magic number. we don't write magic number in the header because
1169
    // that will need an extra seek when reading
1170
5.25k
    fixed_buf.append(k_segment_magic, k_segment_magic_length);
1171
1172
5.25k
    std::vector<Slice> slices {footer_buf, fixed_buf};
1173
5.25k
    return _write_raw_data(slices);
1174
5.25k
}
1175
1176
6.97k
Status SegmentWriter::_write_raw_data(const std::vector<Slice>& slices) {
1177
6.97k
    RETURN_IF_ERROR(_file_writer->appendv(&slices[0], slices.size()));
1178
6.97k
    return Status::OK();
1179
6.97k
}
1180
1181
5.25k
Slice SegmentWriter::min_encoded_key() {
1182
5.25k
    return (_primary_key_index_builder == nullptr) ? Slice(_min_key.data(), _min_key.size())
1183
5.25k
                                                   : _primary_key_index_builder->min_key();
1184
5.25k
}
1185
5.25k
Slice SegmentWriter::max_encoded_key() {
1186
5.25k
    return (_primary_key_index_builder == nullptr) ? Slice(_max_key.data(), _max_key.size())
1187
5.25k
                                                   : _primary_key_index_builder->max_key();
1188
5.25k
}
1189
1190
202
void SegmentWriter::set_min_max_key(const Slice& key) {
1191
202
    if (UNLIKELY(_is_first_row)) {
1192
4
        _min_key.append(key.get_data(), key.get_size());
1193
4
        _is_first_row = false;
1194
4
    }
1195
202
    if (key.compare(_max_key) > 0) {
1196
202
        _max_key.clear();
1197
202
        _max_key.append(key.get_data(), key.get_size());
1198
202
    }
1199
202
}
1200
1201
6.38k
void SegmentWriter::set_min_key(const Slice& key) {
1202
6.38k
    if (UNLIKELY(_is_first_row)) {
1203
5.18k
        _min_key.append(key.get_data(), key.get_size());
1204
5.18k
        _is_first_row = false;
1205
5.18k
    }
1206
6.38k
}
1207
1208
6.38k
void SegmentWriter::set_max_key(const Slice& key) {
1209
6.38k
    _max_key.clear();
1210
6.38k
    _max_key.append(key.get_data(), key.get_size());
1211
6.38k
}
1212
1213
0
void SegmentWriter::set_mow_context(std::shared_ptr<MowContext> mow_context) {
1214
0
    _mow_context = mow_context;
1215
0
}
1216
1217
Status SegmentWriter::_generate_primary_key_index(
1218
        const std::vector<const KeyCoder*>& primary_key_coders,
1219
        const std::vector<IOlapColumnDataAccessor*>& primary_key_columns,
1220
10
        IOlapColumnDataAccessor* seq_column, size_t num_rows, bool need_sort) {
1221
10
    if (!need_sort) { // mow table without cluster key
1222
10
        std::string last_key;
1223
810
        for (size_t pos = 0; pos < num_rows; pos++) {
1224
            // use _key_coders
1225
800
            std::string key = _full_encode_keys(primary_key_columns, pos);
1226
800
            _maybe_invalid_row_cache(key);
1227
800
            if (_tablet_schema->has_sequence_col()) {
1228
0
                _encode_seq_column(seq_column, pos, &key);
1229
0
            }
1230
800
            DCHECK(key.compare(last_key) > 0)
1231
0
                    << "found duplicate key or key is not sorted! current key: " << key
1232
0
                    << ", last key: " << last_key;
1233
800
            RETURN_IF_ERROR(_primary_key_index_builder->add_item(key));
1234
800
            last_key = std::move(key);
1235
800
        }
1236
10
    } else { // mow table with cluster key
1237
        // generate primary keys in memory
1238
0
        for (uint32_t pos = 0; pos < num_rows; pos++) {
1239
0
            std::string key = _full_encode_keys(primary_key_coders, primary_key_columns, pos);
1240
0
            _maybe_invalid_row_cache(key);
1241
0
            if (_tablet_schema->has_sequence_col()) {
1242
0
                _encode_seq_column(seq_column, pos, &key);
1243
0
            }
1244
0
            _encode_rowid(pos + _num_rows_written, &key);
1245
0
            _primary_keys_size += key.size();
1246
0
            _primary_keys.emplace_back(std::move(key));
1247
0
        }
1248
0
    }
1249
10
    return Status::OK();
1250
10
}
1251
1252
Status SegmentWriter::_generate_short_key_index(std::vector<IOlapColumnDataAccessor*>& key_columns,
1253
                                                size_t num_rows,
1254
6.38k
                                                const std::vector<size_t>& short_key_pos) {
1255
    // use _key_coders
1256
6.38k
    set_min_key(_full_encode_keys(key_columns, 0));
1257
6.38k
    set_max_key(_full_encode_keys(key_columns, num_rows - 1));
1258
6.38k
    DCHECK(Slice(_max_key.data(), _max_key.size())
1259
0
                   .compare(Slice(_min_key.data(), _min_key.size())) >= 0)
1260
0
            << "key is not sorted! min key: " << _min_key << ", max key: " << _max_key;
1261
1262
6.38k
    key_columns.resize(_num_short_key_columns);
1263
6.38k
    std::string last_key;
1264
13.1k
    for (const auto pos : short_key_pos) {
1265
13.1k
        std::string key = _encode_keys(key_columns, pos);
1266
13.1k
        DCHECK(key.compare(last_key) >= 0)
1267
0
                << "key is not sorted! current key: " << key << ", last key: " << last_key;
1268
13.1k
        RETURN_IF_ERROR(_short_key_index_builder->add_item(key));
1269
13.1k
        last_key = std::move(key);
1270
13.1k
    }
1271
6.38k
    return Status::OK();
1272
6.38k
}
1273
1274
302k
inline bool SegmentWriter::_is_mow() {
1275
302k
    return _tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write;
1276
302k
}
1277
1278
148k
inline bool SegmentWriter::_is_mow_with_cluster_key() {
1279
148k
    return _is_mow() && !_tablet_schema->cluster_key_uids().empty();
1280
148k
}
1281
1282
#include "common/compile_check_end.h"
1283
1284
} // namespace segment_v2
1285
} // namespace doris