Coverage Report

Created: 2026-04-22 11:46

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/merger.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/merger.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
#include <gen_cpp/types.pb.h>
22
#include <stddef.h>
23
#include <unistd.h>
24
25
#include <algorithm>
26
#include <iterator>
27
#include <memory>
28
#include <mutex>
29
#include <numeric>
30
#include <ostream>
31
#include <shared_mutex>
32
#include <string>
33
#include <unordered_map>
34
#include <utility>
35
#include <vector>
36
37
#include "cloud/config.h"
38
#include "common/config.h"
39
#include "common/logging.h"
40
#include "common/status.h"
41
#include "core/block/block.h"
42
#include "storage/iterator/block_reader.h"
43
#include "storage/iterator/vertical_block_reader.h"
44
#include "storage/iterator/vertical_merge_iterator.h"
45
#include "storage/iterators.h"
46
#include "storage/olap_common.h"
47
#include "storage/olap_define.h"
48
#include "storage/rowid_conversion.h"
49
#include "storage/rowset/beta_rowset.h"
50
#include "storage/rowset/rowset.h"
51
#include "storage/rowset/rowset_meta.h"
52
#include "storage/rowset/rowset_writer.h"
53
#include "storage/segment/segment.h"
54
#include "storage/segment/segment_writer.h"
55
#include "storage/storage_engine.h"
56
#include "storage/tablet/base_tablet.h"
57
#include "storage/tablet/tablet.h"
58
#include "storage/tablet/tablet_fwd.h"
59
#include "storage/tablet/tablet_meta.h"
60
#include "storage/tablet/tablet_reader.h"
61
#include "storage/utils.h"
62
#include "util/slice.h"
63
64
namespace doris {
65
Status Merger::vmerge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type,
66
                              const TabletSchema& cur_tablet_schema,
67
                              const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
68
1.44k
                              RowsetWriter* dst_rowset_writer, Statistics* stats_output) {
69
1.44k
    if (!cur_tablet_schema.cluster_key_uids().empty()) {
70
0
        return Status::InternalError(
71
0
                "mow table with cluster keys does not support non vertical compaction");
72
0
    }
73
1.44k
    BlockReader reader;
74
1.44k
    TabletReader::ReaderParams reader_params;
75
1.44k
    reader_params.tablet = tablet;
76
1.44k
    reader_params.reader_type = reader_type;
77
78
1.44k
    TabletReadSource read_source;
79
1.44k
    read_source.rs_splits.reserve(src_rowset_readers.size());
80
1.55k
    for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) {
81
1.55k
        read_source.rs_splits.emplace_back(rs_reader);
82
1.55k
    }
83
1.44k
    read_source.fill_delete_predicates();
84
1.44k
    reader_params.set_read_source(std::move(read_source));
85
86
1.44k
    reader_params.version = dst_rowset_writer->version();
87
88
1.44k
    TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
89
1.44k
    merge_tablet_schema->copy_from(cur_tablet_schema);
90
91
    // Merge the columns in delete predicate that not in latest schema in to current tablet schema
92
1.44k
    for (auto& del_pred_rs : reader_params.delete_predicates) {
93
24
        merge_tablet_schema->merge_dropped_columns(*del_pred_rs->tablet_schema());
94
24
    }
95
1.44k
    reader_params.tablet_schema = merge_tablet_schema;
96
1.44k
    if (!tablet->tablet_schema()->cluster_key_uids().empty()) {
97
0
        reader_params.delete_bitmap = tablet->tablet_meta()->delete_bitmap_ptr();
98
0
    }
99
100
1.44k
    if (stats_output && stats_output->rowid_conversion) {
101
48
        reader_params.record_rowids = true;
102
48
        reader_params.rowid_conversion = stats_output->rowid_conversion;
103
48
        stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
104
48
    }
105
106
1.44k
    reader_params.return_columns.resize(cur_tablet_schema.num_columns());
107
1.44k
    std::iota(reader_params.return_columns.begin(), reader_params.return_columns.end(), 0);
108
1.44k
    reader_params.origin_return_columns = &reader_params.return_columns;
109
1.44k
    RETURN_IF_ERROR(reader.init(reader_params));
110
111
1.44k
    Block block = cur_tablet_schema.create_block(reader_params.return_columns);
112
1.44k
    size_t output_rows = 0;
113
1.44k
    bool eof = false;
114
4.20k
    while (!eof && !ExecEnv::GetInstance()->storage_engine().stopped()) {
115
2.76k
        auto tablet_state = tablet->tablet_state();
116
2.76k
        if (tablet_state != TABLET_RUNNING && tablet_state != TABLET_NOTREADY) {
117
0
            tablet->clear_cache();
118
0
            return Status::Error<INTERNAL_ERROR>("tablet {} is not used any more",
119
0
                                                 tablet->tablet_id());
120
0
        }
121
122
        // Read one block from block reader
123
2.76k
        RETURN_NOT_OK_STATUS_WITH_WARN(reader.next_block_with_aggregation(&block, &eof),
124
2.76k
                                       "failed to read next block when merging rowsets of tablet " +
125
2.76k
                                               std::to_string(tablet->tablet_id()));
126
2.76k
        RETURN_NOT_OK_STATUS_WITH_WARN(dst_rowset_writer->add_block(&block),
127
2.76k
                                       "failed to write block when merging rowsets of tablet " +
128
2.76k
                                               std::to_string(tablet->tablet_id()));
129
130
2.76k
        if (reader_params.record_rowids && block.rows() > 0) {
131
578
            std::vector<uint32_t> segment_num_rows;
132
578
            RETURN_IF_ERROR(dst_rowset_writer->get_segment_num_rows(&segment_num_rows));
133
578
            stats_output->rowid_conversion->add(reader.current_block_row_locations(),
134
578
                                                segment_num_rows);
135
578
        }
136
137
2.76k
        output_rows += block.rows();
138
2.76k
        block.clear_column_data();
139
2.76k
    }
140
1.44k
    if (ExecEnv::GetInstance()->storage_engine().stopped()) {
141
0
        return Status::Error<INTERNAL_ERROR>("tablet {} failed to do compaction, engine stopped",
142
0
                                             tablet->tablet_id());
143
0
    }
144
145
1.44k
    if (stats_output != nullptr) {
146
1.43k
        stats_output->output_rows = output_rows;
147
1.43k
        stats_output->merged_rows = reader.merged_rows();
148
1.43k
        stats_output->filtered_rows = reader.filtered_rows();
149
1.43k
        stats_output->bytes_read_from_local = reader.stats().file_cache_stats.bytes_read_from_local;
150
1.43k
        stats_output->bytes_read_from_remote =
151
1.43k
                reader.stats().file_cache_stats.bytes_read_from_remote;
152
1.43k
        stats_output->cached_bytes_total = reader.stats().file_cache_stats.bytes_write_into_cache;
153
1.43k
        if (config::is_cloud_mode()) {
154
1.39k
            stats_output->cloud_local_read_time =
155
1.39k
                    reader.stats().file_cache_stats.local_io_timer / 1000;
156
1.39k
            stats_output->cloud_remote_read_time =
157
1.39k
                    reader.stats().file_cache_stats.remote_io_timer / 1000;
158
1.39k
        }
159
1.43k
    }
160
161
1.44k
    RETURN_NOT_OK_STATUS_WITH_WARN(dst_rowset_writer->flush(),
162
1.44k
                                   "failed to flush rowset when merging rowsets of tablet " +
163
1.44k
                                           std::to_string(tablet->tablet_id()));
164
165
1.44k
    return Status::OK();
166
1.44k
}
167
168
// split columns into several groups, make sure all keys in one group
169
// unique_key should consider sequence&delete column
170
void Merger::vertical_split_columns(const TabletSchema& tablet_schema,
171
                                    std::vector<std::vector<uint32_t>>* column_groups,
172
                                    std::vector<uint32_t>* key_group_cluster_key_idxes,
173
7.89k
                                    int32_t num_columns_per_group) {
174
7.89k
    size_t num_key_cols = tablet_schema.num_key_columns();
175
7.89k
    size_t total_cols = tablet_schema.num_columns();
176
7.89k
    std::vector<uint32_t> key_columns;
177
41.8k
    for (auto i = 0; i < num_key_cols; ++i) {
178
33.9k
        key_columns.emplace_back(i);
179
33.9k
    }
180
    // in unique key, sequence & delete sign column should merge with key columns
181
7.89k
    int32_t sequence_col_idx = -1;
182
7.89k
    int32_t delete_sign_idx = -1;
183
    // in key column compaction, seq_col real index is _num_key_columns
184
    // and delete_sign column is _block->columns() - 1
185
7.89k
    if (tablet_schema.keys_type() == KeysType::UNIQUE_KEYS) {
186
3.97k
        if (tablet_schema.has_sequence_col()) {
187
86
            sequence_col_idx = tablet_schema.sequence_col_idx();
188
86
            key_columns.emplace_back(sequence_col_idx);
189
86
        }
190
3.97k
        delete_sign_idx = tablet_schema.field_index(DELETE_SIGN);
191
3.97k
        if (delete_sign_idx != -1) {
192
3.96k
            key_columns.emplace_back(delete_sign_idx);
193
3.96k
        }
194
3.97k
        if (!tablet_schema.cluster_key_uids().empty()) {
195
459
            for (const auto& cid : tablet_schema.cluster_key_uids()) {
196
459
                auto idx = tablet_schema.field_index(cid);
197
459
                DCHECK(idx >= 0) << "could not find cluster key column with unique_id=" << cid
198
0
                                 << " in tablet schema, table_id=" << tablet_schema.table_id();
199
459
                if (idx >= num_key_cols) {
200
218
                    key_columns.emplace_back(idx);
201
218
                }
202
459
            }
203
            // tablet schema unique ids: [1, 2, 5, 3, 6, 4], [1 2] is key columns
204
            // cluster key unique ids: [3, 1, 4]
205
            // the key_columns should be [0, 1, 3, 5]
206
            // the key_group_cluster_key_idxes should be [2, 1, 3]
207
459
            for (const auto& cid : tablet_schema.cluster_key_uids()) {
208
459
                auto idx = tablet_schema.field_index(cid);
209
2.66k
                for (auto i = 0; i < key_columns.size(); ++i) {
210
2.66k
                    if (idx == key_columns[i]) {
211
459
                        key_group_cluster_key_idxes->emplace_back(i);
212
459
                        break;
213
459
                    }
214
2.66k
                }
215
459
            }
216
154
        }
217
3.97k
    }
218
7.89k
    VLOG_NOTICE << "sequence_col_idx=" << sequence_col_idx
219
49
                << ", delete_sign_idx=" << delete_sign_idx;
220
    // for duplicate no keys
221
7.89k
    if (!key_columns.empty()) {
222
7.86k
        column_groups->emplace_back(key_columns);
223
7.86k
    }
224
225
7.89k
    std::vector<uint32_t> value_columns;
226
227
68.2k
    for (size_t i = num_key_cols; i < total_cols; ++i) {
228
60.3k
        if (i == sequence_col_idx || i == delete_sign_idx ||
229
60.3k
            key_columns.end() != std::find(key_columns.begin(), key_columns.end(), i)) {
230
4.25k
            continue;
231
4.25k
        }
232
233
56.1k
        if (!value_columns.empty() && value_columns.size() % num_columns_per_group == 0) {
234
6.66k
            column_groups->push_back(value_columns);
235
6.66k
            value_columns.clear();
236
6.66k
        }
237
56.1k
        value_columns.push_back(cast_set<uint32_t>(i));
238
56.1k
    }
239
240
7.89k
    if (!value_columns.empty()) {
241
7.43k
        column_groups->push_back(value_columns);
242
7.43k
    }
243
7.89k
}
244
245
Status Merger::vertical_compact_one_group(
246
        BaseTabletSPtr tablet, ReaderType reader_type, const TabletSchema& tablet_schema,
247
        bool is_key, const std::vector<uint32_t>& column_group, RowSourcesBuffer* row_source_buf,
248
        const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
249
        RowsetWriter* dst_rowset_writer, uint32_t max_rows_per_segment, Statistics* stats_output,
250
        std::vector<uint32_t> key_group_cluster_key_idxes, int64_t batch_size,
251
21.9k
        CompactionSampleInfo* sample_info, bool enable_sparse_optimization) {
252
    // build tablet reader
253
21.9k
    VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" << max_rows_per_segment;
254
21.9k
    VerticalBlockReader reader(row_source_buf);
255
21.9k
    TabletReader::ReaderParams reader_params;
256
21.9k
    reader_params.is_key_column_group = is_key;
257
21.9k
    reader_params.key_group_cluster_key_idxes = key_group_cluster_key_idxes;
258
21.9k
    reader_params.tablet = tablet;
259
21.9k
    reader_params.reader_type = reader_type;
260
21.9k
    reader_params.enable_sparse_optimization = enable_sparse_optimization;
261
262
21.9k
    TabletReadSource read_source;
263
21.9k
    read_source.rs_splits.reserve(src_rowset_readers.size());
264
174k
    for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) {
265
174k
        read_source.rs_splits.emplace_back(rs_reader);
266
174k
    }
267
21.9k
    read_source.fill_delete_predicates();
268
21.9k
    reader_params.set_read_source(std::move(read_source));
269
270
21.9k
    reader_params.version = dst_rowset_writer->version();
271
272
21.9k
    TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
273
21.9k
    merge_tablet_schema->copy_from(tablet_schema);
274
275
21.9k
    for (auto& del_pred_rs : reader_params.delete_predicates) {
276
807
        merge_tablet_schema->merge_dropped_columns(*del_pred_rs->tablet_schema());
277
807
    }
278
279
21.9k
    reader_params.tablet_schema = merge_tablet_schema;
280
21.9k
    bool has_cluster_key = false;
281
21.9k
    if (!tablet->tablet_schema()->cluster_key_uids().empty()) {
282
442
        reader_params.delete_bitmap = tablet->tablet_meta()->delete_bitmap_ptr();
283
442
        has_cluster_key = true;
284
442
    }
285
286
21.9k
    if (is_key && stats_output && stats_output->rowid_conversion) {
287
4.23k
        reader_params.record_rowids = true;
288
4.23k
        reader_params.rowid_conversion = stats_output->rowid_conversion;
289
4.23k
        stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
290
4.23k
    }
291
292
21.9k
    reader_params.return_columns = column_group;
293
21.9k
    reader_params.origin_return_columns = &reader_params.return_columns;
294
21.9k
    reader_params.batch_size = batch_size;
295
21.9k
    RETURN_IF_ERROR(reader.init(reader_params, sample_info));
296
297
21.9k
    Block block = tablet_schema.create_block(reader_params.return_columns);
298
21.9k
    size_t output_rows = 0;
299
21.9k
    bool eof = false;
300
54.4k
    while (!eof && !ExecEnv::GetInstance()->storage_engine().stopped()) {
301
32.4k
        auto tablet_state = tablet->tablet_state();
302
32.4k
        if (tablet_state != TABLET_RUNNING && tablet_state != TABLET_NOTREADY) {
303
0
            tablet->clear_cache();
304
0
            return Status::Error<INTERNAL_ERROR>("tablet {} is not used any more",
305
0
                                                 tablet->tablet_id());
306
0
        }
307
        // Read one block from block reader
308
32.4k
        RETURN_NOT_OK_STATUS_WITH_WARN(reader.next_block_with_aggregation(&block, &eof),
309
32.4k
                                       "failed to read next block when merging rowsets of tablet " +
310
32.4k
                                               std::to_string(tablet->tablet_id()));
311
32.4k
        RETURN_NOT_OK_STATUS_WITH_WARN(
312
32.4k
                dst_rowset_writer->add_columns(&block, column_group, is_key, max_rows_per_segment,
313
32.4k
                                               has_cluster_key),
314
32.4k
                "failed to write block when merging rowsets of tablet " +
315
32.4k
                        std::to_string(tablet->tablet_id()));
316
317
32.4k
        if (is_key && reader_params.record_rowids && block.rows() > 0) {
318
4.57k
            std::vector<uint32_t> segment_num_rows;
319
4.57k
            RETURN_IF_ERROR(dst_rowset_writer->get_segment_num_rows(&segment_num_rows));
320
4.57k
            stats_output->rowid_conversion->add(reader.current_block_row_locations(),
321
4.57k
                                                segment_num_rows);
322
4.57k
        }
323
32.4k
        output_rows += block.rows();
324
32.4k
        block.clear_column_data();
325
32.4k
    }
326
21.9k
    if (ExecEnv::GetInstance()->storage_engine().stopped()) {
327
0
        return Status::Error<INTERNAL_ERROR>("tablet {} failed to do compaction, engine stopped",
328
0
                                             tablet->tablet_id());
329
0
    }
330
331
21.9k
    if (stats_output != nullptr) {
332
21.9k
        if (is_key) {
333
7.87k
            stats_output->output_rows = output_rows;
334
7.87k
            stats_output->merged_rows = reader.merged_rows();
335
7.87k
            stats_output->filtered_rows = reader.filtered_rows();
336
7.87k
        }
337
21.9k
        stats_output->bytes_read_from_local = reader.stats().file_cache_stats.bytes_read_from_local;
338
21.9k
        stats_output->bytes_read_from_remote =
339
21.9k
                reader.stats().file_cache_stats.bytes_read_from_remote;
340
21.9k
        stats_output->cached_bytes_total = reader.stats().file_cache_stats.bytes_write_into_cache;
341
21.9k
        if (config::is_cloud_mode()) {
342
19.0k
            stats_output->cloud_local_read_time =
343
19.0k
                    reader.stats().file_cache_stats.local_io_timer / 1000;
344
19.0k
            stats_output->cloud_remote_read_time =
345
19.0k
                    reader.stats().file_cache_stats.remote_io_timer / 1000;
346
19.0k
        }
347
21.9k
    }
348
21.9k
    RETURN_IF_ERROR(dst_rowset_writer->flush_columns(is_key));
349
350
21.9k
    return Status::OK();
351
21.9k
}
352
353
// for segcompaction
354
Status Merger::vertical_compact_one_group(
355
        int64_t tablet_id, ReaderType reader_type, const TabletSchema& tablet_schema, bool is_key,
356
        const std::vector<uint32_t>& column_group, RowSourcesBuffer* row_source_buf,
357
        VerticalBlockReader& src_block_reader, segment_v2::SegmentWriter& dst_segment_writer,
358
        Statistics* stats_output, uint64_t* index_size, KeyBoundsPB& key_bounds,
359
22
        SimpleRowIdConversion* rowid_conversion) {
360
    // TODO: record_rowids
361
22
    Block block = tablet_schema.create_block(column_group);
362
22
    size_t output_rows = 0;
363
22
    bool eof = false;
364
138
    while (!eof && !ExecEnv::GetInstance()->storage_engine().stopped()) {
365
        // Read one block from block reader
366
116
        RETURN_NOT_OK_STATUS_WITH_WARN(src_block_reader.next_block_with_aggregation(&block, &eof),
367
116
                                       "failed to read next block when merging rowsets of tablet " +
368
116
                                               std::to_string(tablet_id));
369
116
        if (!block.rows()) {
370
0
            break;
371
0
        }
372
116
        RETURN_NOT_OK_STATUS_WITH_WARN(dst_segment_writer.append_block(&block, 0, block.rows()),
373
116
                                       "failed to write block when merging rowsets of tablet " +
374
116
                                               std::to_string(tablet_id));
375
376
116
        if (is_key && rowid_conversion != nullptr) {
377
30
            rowid_conversion->add(src_block_reader.current_block_row_locations());
378
30
        }
379
116
        output_rows += block.rows();
380
116
        block.clear_column_data();
381
116
    }
382
22
    if (ExecEnv::GetInstance()->storage_engine().stopped()) {
383
0
        return Status::Error<INTERNAL_ERROR>("tablet {} failed to do compaction, engine stopped",
384
0
                                             tablet_id);
385
0
    }
386
387
22
    if (stats_output != nullptr) {
388
22
        if (is_key) {
389
11
            stats_output->output_rows = output_rows;
390
11
            stats_output->merged_rows = src_block_reader.merged_rows();
391
11
            stats_output->filtered_rows = src_block_reader.filtered_rows();
392
11
        }
393
22
        stats_output->bytes_read_from_local =
394
22
                src_block_reader.stats().file_cache_stats.bytes_read_from_local;
395
22
        stats_output->bytes_read_from_remote =
396
22
                src_block_reader.stats().file_cache_stats.bytes_read_from_remote;
397
22
        stats_output->cached_bytes_total =
398
22
                src_block_reader.stats().file_cache_stats.bytes_write_into_cache;
399
22
    }
400
401
    // segcompaction produce only one segment at once
402
22
    RETURN_IF_ERROR(dst_segment_writer.finalize_columns_data());
403
22
    RETURN_IF_ERROR(dst_segment_writer.finalize_columns_index(index_size));
404
405
22
    if (is_key) {
406
11
        Slice min_key = dst_segment_writer.min_encoded_key();
407
11
        Slice max_key = dst_segment_writer.max_encoded_key();
408
11
        DCHECK_LE(min_key.compare(max_key), 0);
409
11
        key_bounds.set_min_key(min_key.to_string());
410
11
        key_bounds.set_max_key(max_key.to_string());
411
11
    }
412
413
22
    return Status::OK();
414
22
}
415
416
int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_cnt,
417
                            ReaderType reader_type, int64_t group_per_row_from_footer,
418
21.7k
                            bool footer_fallback) {
419
21.7k
    auto& sample_info_lock = tablet->get_sample_info_lock(reader_type);
420
21.7k
    auto& sample_infos = tablet->get_sample_infos(reader_type);
421
21.7k
    std::unique_lock<std::mutex> lock(sample_info_lock);
422
21.7k
    CompactionSampleInfo info = sample_infos[group_index];
423
21.7k
    if (way_cnt <= 0) {
424
6.56k
        LOG(INFO) << "estimate batch size for vertical compaction, tablet id: "
425
6.56k
                  << tablet->tablet_id() << " way cnt: " << way_cnt;
426
6.56k
        return 4096 - 32;
427
6.56k
    }
428
15.1k
    int64_t block_mem_limit = config::compaction_memory_bytes_limit / way_cnt;
429
15.1k
    if (tablet->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>()) {
430
0
        block_mem_limit /= 4;
431
0
    }
432
433
15.1k
    int64_t group_data_size = 0;
434
15.1k
    if (info.group_data_size > 0 && info.bytes > 0 && info.rows > 0) {
435
0
        double smoothing_factor = 0.5;
436
0
        group_data_size =
437
0
                int64_t((cast_set<double>(info.group_data_size) * (1 - smoothing_factor)) +
438
0
                        (cast_set<double>(info.bytes / info.rows) * smoothing_factor));
439
0
        sample_infos[group_index].group_data_size = group_data_size;
440
15.1k
    } else if (info.group_data_size > 0 && (info.bytes <= 0 || info.rows <= 0)) {
441
0
        group_data_size = info.group_data_size;
442
15.1k
    } else if (info.group_data_size <= 0 && info.bytes > 0 && info.rows > 0) {
443
8.33k
        group_data_size = info.bytes / info.rows;
444
8.33k
        sample_infos[group_index].group_data_size = group_data_size;
445
8.33k
    } else {
446
        // No historical sampling data available.
447
        // Try to use raw_data_bytes from segment footer for a better estimate.
448
6.86k
        if (!footer_fallback && group_per_row_from_footer > 0) {
449
6.21k
            int64_t batch_size = block_mem_limit / group_per_row_from_footer;
450
6.21k
            int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)), int64_t(32L));
451
6.21k
            LOG(INFO) << "estimate batch size from footer for vertical compaction, tablet id: "
452
6.21k
                      << tablet->tablet_id()
453
6.21k
                      << " group_per_row_from_footer: " << group_per_row_from_footer
454
6.21k
                      << " way cnt: " << way_cnt << " batch size: " << res;
455
6.21k
            return res;
456
6.21k
        }
457
6.86k
        LOG(INFO) << "estimate batch size for vertical compaction, tablet id: "
458
647
                  << tablet->tablet_id() << " group data size: " << info.group_data_size
459
647
                  << " row num: " << info.rows << " consume bytes: " << info.bytes
460
647
                  << " footer_fallback: " << footer_fallback;
461
647
        return 1024 - 32;
462
6.86k
    }
463
464
8.33k
    if (group_data_size <= 0) {
465
0
        LOG(WARNING) << "estimate batch size for vertical compaction, tablet id: "
466
0
                     << tablet->tablet_id() << " unexpected group data size: " << group_data_size;
467
0
        return 4096 - 32;
468
0
    }
469
470
8.33k
    sample_infos[group_index].bytes = 0;
471
8.33k
    sample_infos[group_index].rows = 0;
472
473
8.33k
    int64_t batch_size = block_mem_limit / group_data_size;
474
8.33k
    int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)), int64_t(32L));
475
8.33k
    LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " << tablet->tablet_id()
476
8.33k
              << " group data size: " << info.group_data_size << " row num: " << info.rows
477
8.33k
              << " consume bytes: " << info.bytes << " way cnt: " << way_cnt
478
8.33k
              << " batch size: " << res;
479
8.33k
    return res;
480
8.33k
}
481
482
// steps to do vertical merge:
483
// 1. split columns into column groups
484
// 2. compact groups one by one, generate a row_source_buf when compact key group
485
// and use this row_source_buf to compact value column groups
486
// 3. build output rowset
487
Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_type,
488
                                      const TabletSchema& tablet_schema,
489
                                      const std::vector<RowsetReaderSharedPtr>& src_rowset_readers,
490
                                      RowsetWriter* dst_rowset_writer,
491
                                      uint32_t max_rows_per_segment, int64_t merge_way_num,
492
                                      Statistics* stats_output,
493
7.86k
                                      VerticalCompactionProgressCallback progress_cb) {
494
7.86k
    LOG(INFO) << "Start to do vertical compaction, tablet_id: " << tablet->tablet_id();
495
7.86k
    std::vector<std::vector<uint32_t>> column_groups;
496
7.86k
    std::vector<uint32_t> key_group_cluster_key_idxes;
497
    // If BE config vertical_compaction_num_columns_per_group has been modified from
498
    // its default value (5), use the BE config; otherwise use the tablet meta value.
499
7.86k
    constexpr int32_t default_num_columns_per_group = 5;
500
7.86k
    int32_t num_columns_per_group =
501
7.86k
            config::vertical_compaction_num_columns_per_group != default_num_columns_per_group
502
7.86k
                    ? config::vertical_compaction_num_columns_per_group
503
7.86k
                    : tablet->tablet_meta()->vertical_compaction_num_columns_per_group();
504
505
7.86k
    DBUG_EXECUTE_IF("Merger.vertical_merge_rowsets.check_num_columns_per_group", {
506
7.86k
        auto expected_value = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
507
7.86k
                "Merger.vertical_merge_rowsets.check_num_columns_per_group", "expected_value", -1);
508
7.86k
        auto expected_tablet_id = DebugPoints::instance()->get_debug_param_or_default<int64_t>(
509
7.86k
                "Merger.vertical_merge_rowsets.check_num_columns_per_group", "tablet_id", -1);
510
7.86k
        if (expected_tablet_id != -1 && expected_tablet_id == tablet->tablet_id()) {
511
7.86k
            if (expected_value != -1 && expected_value != num_columns_per_group) {
512
7.86k
                LOG(FATAL) << "DEBUG_POINT CHECK FAILED: expected num_columns_per_group="
513
7.86k
                           << expected_value << " but got " << num_columns_per_group
514
7.86k
                           << " for tablet_id=" << tablet->tablet_id();
515
7.86k
            } else {
516
7.86k
                LOG(INFO) << "DEBUG_POINT CHECK PASSED: num_columns_per_group="
517
7.86k
                          << num_columns_per_group << ", tablet_id=" << tablet->tablet_id();
518
7.86k
            }
519
7.86k
        }
520
7.86k
    });
521
522
7.86k
    vertical_split_columns(tablet_schema, &column_groups, &key_group_cluster_key_idxes,
523
7.86k
                           num_columns_per_group);
524
525
7.86k
    if (progress_cb) {
526
7.76k
        progress_cb(column_groups.size(), 0);
527
7.76k
    }
528
529
    // Calculate total rows for density calculation after compaction
530
7.86k
    int64_t total_rows = 0;
531
62.2k
    for (const auto& rs_reader : src_rowset_readers) {
532
62.2k
        total_rows += rs_reader->rowset()->rowset_meta()->num_rows();
533
62.2k
    }
534
535
    // Use historical density for sparse wide table optimization
536
    // density = (total_cells - null_cells) / total_cells, smaller means more sparse
537
    // When density <= threshold, enable sparse optimization
538
    // threshold = 0 means disable, 1 means always enable (default)
539
7.86k
    bool enable_sparse_optimization = false;
540
7.86k
    if (config::sparse_column_compaction_threshold_percent > 0 &&
541
7.88k
        tablet->keys_type() == KeysType::UNIQUE_KEYS) {
542
3.96k
        double density = tablet->compaction_density.load();
543
3.96k
        enable_sparse_optimization = density <= config::sparse_column_compaction_threshold_percent;
544
545
3.96k
        LOG(INFO) << "Vertical compaction sparse optimization check: tablet_id="
546
3.96k
                  << tablet->tablet_id() << ", density=" << density
547
3.96k
                  << ", threshold=" << config::sparse_column_compaction_threshold_percent
548
3.96k
                  << ", total_rows=" << total_rows
549
3.96k
                  << ", num_columns=" << tablet_schema.num_columns()
550
3.96k
                  << ", total_cells=" << total_rows * tablet_schema.num_columns()
551
3.96k
                  << ", enable_sparse_optimization=" << enable_sparse_optimization;
552
3.96k
    }
553
554
7.86k
    RowSourcesBuffer row_sources_buf(tablet->tablet_id(), dst_rowset_writer->context().tablet_path,
555
7.86k
                                     reader_type);
556
7.86k
    Merger::Statistics total_stats;
557
7.88k
    if (stats_output != nullptr) {
558
7.88k
        total_stats.rowid_conversion = stats_output->rowid_conversion;
559
7.88k
    }
560
7.86k
    auto& sample_info_lock = tablet->get_sample_info_lock(reader_type);
561
7.86k
    auto& sample_infos = tablet->get_sample_infos(reader_type);
562
7.86k
    {
563
7.86k
        std::unique_lock<std::mutex> lock(sample_info_lock);
564
7.86k
        sample_infos.resize(column_groups.size());
565
7.86k
    }
566
    // Collect per-column raw_data_bytes from segment footer for first-time batch size estimation.
567
    // raw_data_bytes is the original data size before encoding, close to runtime Block::bytes().
568
    // Only collect when needed: skip if manual batch_size override is set, or if ALL groups
569
    // already have historical sampling data. Use per-group granularity so that schema evolution
570
    // (new groups without history) still gets footer-based estimation.
571
7.86k
    struct ColumnRawSizeInfo {
572
7.86k
        int64_t total_raw_bytes = 0;
573
7.86k
        int64_t rows_with_data = 0;
574
7.86k
    };
575
7.86k
    std::unordered_map<int32_t, ColumnRawSizeInfo> column_raw_sizes;
576
7.86k
    bool need_footer_collection = false;
577
7.86k
    if (config::compaction_batch_size == -1) {
578
7.83k
        std::unique_lock<std::mutex> lock(sample_info_lock);
579
14.7k
        for (const auto& info : sample_infos) {
580
14.7k
            if (info.group_data_size <= 0 && info.bytes <= 0 && info.rows <= 0) {
581
4.48k
                need_footer_collection = true;
582
4.48k
                break;
583
4.48k
            }
584
14.7k
        }
585
7.83k
    }
586
7.86k
    if (need_footer_collection) {
587
32.8k
        for (const auto& rs_reader : src_rowset_readers) {
588
32.8k
            auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rs_reader->rowset());
589
32.8k
            if (!beta_rowset) {
590
0
                continue;
591
0
            }
592
32.8k
            std::vector<segment_v2::SegmentSharedPtr> segments;
593
32.8k
            auto st = beta_rowset->load_segments(&segments);
594
32.8k
            if (!st.ok()) {
595
0
                LOG(WARNING) << "Failed to load segments for footer raw_data_bytes collection"
596
0
                             << ", tablet_id: " << tablet->tablet_id()
597
0
                             << ", rowset_id: " << beta_rowset->rowset_id() << ", status: " << st;
598
0
                continue;
599
0
            }
600
32.8k
            for (const auto& segment : segments) {
601
12.1k
                int64_t row_count = segment->num_rows();
602
12.1k
                auto collect_st = segment->traverse_column_meta_pbs(
603
125k
                        [&](const segment_v2::ColumnMetaPB& meta) {
604
125k
                            int32_t uid = meta.unique_id();
605
125k
                            if (uid >= 0 && meta.has_raw_data_bytes()) {
606
117k
                                auto& info = column_raw_sizes[uid];
607
117k
                                info.total_raw_bytes += meta.raw_data_bytes();
608
117k
                                info.rows_with_data += row_count;
609
117k
                            }
610
125k
                        });
611
12.1k
                if (!collect_st.ok()) {
612
0
                    LOG(WARNING) << "Failed to traverse column meta for footer collection"
613
0
                                 << ", tablet_id: " << tablet->tablet_id()
614
0
                                 << ", status: " << collect_st;
615
0
                }
616
12.1k
            }
617
32.8k
        }
618
4.48k
    }
619
620
    // Pre-compute per-row estimate for each column group from footer data.
621
7.86k
    std::vector<int64_t> group_per_row_from_footer(column_groups.size(), 0);
622
7.86k
    std::vector<bool> group_footer_fallback(column_groups.size(), false);
623
29.8k
    for (size_t i = 0; i < column_groups.size(); ++i) {
624
21.9k
        int64_t group_per_row = 0;
625
21.9k
        bool need_fallback = false;
626
90.0k
        for (uint32_t col_ordinal : column_groups[i]) {
627
90.0k
            const auto& col = tablet_schema.column(col_ordinal);
628
90.0k
            int32_t uid = col.unique_id();
629
630
            // Variant columns (root or subcolumn): raw_data_bytes is 0 (TODO in writer),
631
            // cannot estimate from footer, fallback to default for the entire group.
632
90.0k
            if (uid < 0 || col.is_variant_type()) {
633
1.29k
                need_fallback = true;
634
1.29k
                break;
635
1.29k
            }
636
637
88.8k
            int64_t col_per_row = 0;
638
88.8k
            auto it = column_raw_sizes.find(uid);
639
88.8k
            if (it != column_raw_sizes.end() && it->second.rows_with_data > 0) {
640
22.6k
                col_per_row = it->second.total_raw_bytes / it->second.rows_with_data;
641
642
                // Structural overhead compensation for scalar columns only.
643
                // Only add when footer data actually exists for this column,
644
                // to avoid creating a non-zero estimate purely from structural overhead.
645
                // Complex types (ARRAY/MAP/STRUCT) have raw_data_bytes recursively aggregated
646
                // from sub-writers, so no additional compensation is needed.
647
22.6k
                if (col.type() != FieldType::OLAP_FIELD_TYPE_ARRAY &&
648
22.6k
                    col.type() != FieldType::OLAP_FIELD_TYPE_MAP &&
649
22.6k
                    col.type() != FieldType::OLAP_FIELD_TYPE_STRUCT) {
650
21.4k
                    if (col.is_nullable()) {
651
12.3k
                        col_per_row += 1; // null map: 1 byte per row
652
12.3k
                    }
653
21.4k
                    if (col.is_length_variable_type()) {
654
8.23k
                        col_per_row += 8; // offset array: 8 bytes per row at runtime
655
8.23k
                    }
656
21.4k
                }
657
22.6k
            }
658
659
88.8k
            group_per_row += col_per_row;
660
88.8k
        }
661
21.9k
        group_per_row_from_footer[i] = group_per_row;
662
21.9k
        group_footer_fallback[i] = need_fallback;
663
21.9k
    }
664
665
    // compact group one by one
666
29.8k
    for (auto i = 0; i < column_groups.size(); ++i) {
667
21.9k
        VLOG_NOTICE << "row source size: " << row_sources_buf.total_size();
668
21.9k
        bool is_key = (i == 0);
669
21.9k
        int64_t batch_size = config::compaction_batch_size != -1
670
21.9k
                                     ? config::compaction_batch_size
671
21.9k
                                     : estimate_batch_size(i, tablet, merge_way_num, reader_type,
672
21.7k
                                                           group_per_row_from_footer[i],
673
21.7k
                                                           group_footer_fallback[i]);
674
21.9k
        CompactionSampleInfo sample_info;
675
21.9k
        Merger::Statistics group_stats;
676
21.9k
        group_stats.rowid_conversion = total_stats.rowid_conversion;
677
18.4E
        Merger::Statistics* group_stats_ptr = stats_output != nullptr ? &group_stats : nullptr;
678
21.9k
        Status st = vertical_compact_one_group(
679
21.9k
                tablet, reader_type, tablet_schema, is_key, column_groups[i], &row_sources_buf,
680
21.9k
                src_rowset_readers, dst_rowset_writer, max_rows_per_segment, group_stats_ptr,
681
21.9k
                key_group_cluster_key_idxes, batch_size, &sample_info, enable_sparse_optimization);
682
21.9k
        {
683
21.9k
            std::unique_lock<std::mutex> lock(sample_info_lock);
684
21.9k
            sample_infos[i] = sample_info;
685
21.9k
        }
686
21.9k
        RETURN_IF_ERROR(st);
687
21.9k
        if (stats_output != nullptr) {
688
21.9k
            total_stats.bytes_read_from_local += group_stats.bytes_read_from_local;
689
21.9k
            total_stats.bytes_read_from_remote += group_stats.bytes_read_from_remote;
690
21.9k
            total_stats.cached_bytes_total += group_stats.cached_bytes_total;
691
21.9k
            total_stats.cloud_local_read_time += group_stats.cloud_local_read_time;
692
21.9k
            total_stats.cloud_remote_read_time += group_stats.cloud_remote_read_time;
693
21.9k
            if (is_key) {
694
7.85k
                total_stats.output_rows = group_stats.output_rows;
695
7.85k
                total_stats.merged_rows = group_stats.merged_rows;
696
7.85k
                total_stats.filtered_rows = group_stats.filtered_rows;
697
7.85k
                total_stats.rowid_conversion = group_stats.rowid_conversion;
698
7.85k
            }
699
21.9k
        }
700
21.9k
        if (progress_cb) {
701
21.6k
            progress_cb(column_groups.size(), i + 1);
702
21.6k
        }
703
21.9k
        if (is_key) {
704
7.88k
            RETURN_IF_ERROR(row_sources_buf.flush());
705
7.88k
        }
706
21.9k
        RETURN_IF_ERROR(row_sources_buf.seek_to_begin());
707
21.9k
    }
708
709
    // Calculate and store density for next compaction's sparse optimization threshold
710
    // density = (total_cells - total_null_count) / total_cells
711
    // Smaller density means more sparse
712
7.86k
    {
713
7.86k
        std::unique_lock<std::mutex> lock(sample_info_lock);
714
7.86k
        int64_t total_null_count = 0;
715
21.9k
        for (const auto& info : sample_infos) {
716
21.9k
            total_null_count += info.null_count;
717
21.9k
        }
718
7.86k
        int64_t total_cells = total_rows * tablet_schema.num_columns();
719
7.86k
        if (total_cells > 0) {
720
5.17k
            double density = static_cast<double>(total_cells - total_null_count) /
721
5.17k
                             static_cast<double>(total_cells);
722
5.17k
            tablet->compaction_density.store(density);
723
5.17k
            LOG(INFO) << "Vertical compaction density update: tablet_id=" << tablet->tablet_id()
724
5.17k
                      << ", total_cells=" << total_cells
725
5.17k
                      << ", total_null_count=" << total_null_count << ", density=" << density;
726
5.17k
        }
727
7.86k
    }
728
729
    // finish compact, build output rowset
730
7.86k
    VLOG_NOTICE << "finish compact groups";
731
7.86k
    RETURN_IF_ERROR(dst_rowset_writer->final_flush());
732
733
7.88k
    if (stats_output != nullptr) {
734
7.88k
        *stats_output = total_stats;
735
7.88k
    }
736
737
7.86k
    return Status::OK();
738
7.86k
}
739
} // namespace doris