Coverage Report

Created: 2026-05-09 18:55

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/iterator/block_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/iterator/block_reader.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
#include <glog/logging.h>
22
#include <stdint.h>
23
24
#include <algorithm>
25
#include <boost/iterator/iterator_facade.hpp>
26
#include <memory>
27
#include <ostream>
28
#include <string>
29
30
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
31
#include "cloud/config.h"
32
#include "common/compiler_util.h" // IWYU pragma: keep
33
#include "common/config.h"
34
#include "common/status.h"
35
#include "core/block/column_with_type_and_name.h"
36
#include "core/column/column_nullable.h"
37
#include "core/column/column_vector.h"
38
#include "core/data_type/data_type_number.h"
39
#include "exprs/aggregate/aggregate_function_reader.h"
40
#include "exprs/function_filter.h"
41
#include "runtime/runtime_state.h"
42
#include "storage/iterator/vcollect_iterator.h"
43
#include "storage/olap_common.h"
44
#include "storage/olap_define.h"
45
#include "storage/predicate/like_column_predicate.h"
46
#include "storage/rowset/rowset.h"
47
#include "storage/rowset/rowset_reader_context.h"
48
#include "storage/tablet/tablet.h"
49
#include "storage/tablet/tablet_schema.h"
50
51
namespace doris {
52
class ColumnPredicate;
53
} // namespace doris
54
55
namespace doris {
56
using namespace ErrorCode;
57
58
// Row interval between byte-budget checks: the merging readers below only call
// _reached_byte_budget() when the number of rows written to the output block is a
// multiple of this value.
static constexpr int32_t BLOCK_SIZE_CHECK_INTERVAL_ROWS = 64;
59
60
82
// Releases the aggregate states set up by _init_agg_state(): each state is destroyed
// through its aggregate function and then its raw buffer (allocated with new char[])
// is freed.
BlockReader::~BlockReader() {
    size_t slot = 0;
    for (const auto& agg_function : _agg_functions) {
        // _agg_places is filled in lock-step with _agg_functions, so the slots match.
        AggregateDataPtr state = _agg_places[slot];
        agg_function->destroy(state);
        delete[] state;
        ++slot;
    }
}
66
67
578
// Produces the next output block by dispatching to the strategy selected in init()
// (_next_block_func). Outside cloud mode a failed status is additionally passed to
// Tablet::report_error() before being returned to the caller.
Status BlockReader::next_block_with_aggregation(Block* block, bool* eof) {
    auto status = (this->*_next_block_func)(block, eof);
    const bool not_cloud_mode = !config::is_cloud_mode();
    if (not_cloud_mode && !status.ok()) [[unlikely]] {
        static_cast<Tablet*>(_tablet.get())->report_error(status);
    }
    return status;
}
76
77
67
bool BlockReader::_rowsets_not_mono_asc_disjoint(const ReaderParams& read_params) {
78
67
    std::string pre_rs_last_key;
79
67
    bool pre_rs_key_bounds_truncated {false};
80
67
    const std::vector<RowSetSplits>& rs_splits = read_params.rs_splits;
81
162
    for (const auto& rs_split : rs_splits) {
82
162
        if (rs_split.rs_reader->rowset()->num_rows() == 0) {
83
0
            continue;
84
0
        }
85
162
        if (rs_split.rs_reader->rowset()->is_segments_overlapping()) {
86
0
            return true;
87
0
        }
88
162
        std::string rs_first_key;
89
162
        bool has_first_key = rs_split.rs_reader->rowset()->first_key(&rs_first_key);
90
162
        if (!has_first_key) {
91
0
            return true;
92
0
        }
93
162
        bool cur_rs_key_bounds_truncated {
94
162
                rs_split.rs_reader->rowset()->is_segments_key_bounds_truncated()};
95
162
        if (!Slice::lhs_is_strictly_less_than_rhs(Slice {pre_rs_last_key},
96
162
                                                  pre_rs_key_bounds_truncated, Slice {rs_first_key},
97
162
                                                  cur_rs_key_bounds_truncated)) {
98
58
            return true;
99
58
        }
100
104
        bool has_last_key = rs_split.rs_reader->rowset()->last_key(&pre_rs_last_key);
101
104
        pre_rs_key_bounds_truncated = cur_rs_key_bounds_truncated;
102
104
        CHECK(has_last_key);
103
104
    }
104
9
    return false;
105
67
}
106
107
48
// Prepares _vcollect_iter for reading:
//   1. captures the rowset readers,
//   2. decides whether the rowsets overlap and initialises the collect iterator,
//   3. initialises every rowset reader (unless the iterator uses topn_next, which
//      initialises readers itself) and adds it as a child,
//   4. builds the merge heap and positions _next_row on the first row.
// Returns the first non-OK status encountered; END_OF_FILE from add_child() only means
// that the reader is left out of the heap.
Status BlockReader::_init_collect_iter(const ReaderParams& read_params) {
    auto capture_status = _capture_rs_readers(read_params);
    if (!capture_status.ok()) {
        LOG(WARNING) << "fail to init reader when _capture_rs_readers. res:" << capture_status
                     << ", tablet_id:" << read_params.tablet->tablet_id()
                     << ", schema_hash:" << read_params.tablet->schema_hash()
                     << ", reader_type:" << int(read_params.reader_type)
                     << ", version:" << read_params.version;
        return capture_status;
    }

    // Check whether the rowsets are non-overlapping and set up the iterator accordingly.
    {
        SCOPED_RAW_TIMER(&_stats.block_reader_vcollect_iter_init_timer_ns);
        _is_rowsets_overlapping = _rowsets_not_mono_asc_disjoint(read_params);
        _vcollect_iter.init(this, _is_rowsets_overlapping, read_params.read_orderby_key,
                            read_params.read_orderby_key_reverse);
    }

    std::vector<RowsetReaderSharedPtr> valid_rs_readers;
    RuntimeState* runtime_state = read_params.runtime_state;

    {
        SCOPED_RAW_TIMER(&_stats.block_reader_rs_readers_init_timer_ns);
        for (const auto& rs_split : read_params.rs_splits) {
            // Cancellation is re-checked before each reader is initialised.
            if (runtime_state != nullptr && runtime_state->is_cancelled()) {
                return runtime_state->cancel_reason();
            }

            // _vcollect_iter.topn_next() will init rs_reader by itself
            if (!_vcollect_iter.use_topn_next()) {
                RETURN_IF_ERROR(rs_split.rs_reader->init(&_reader_context, rs_split));
            }

            Status add_status = _vcollect_iter.add_child(rs_split);
            if (add_status.ok()) {
                valid_rs_readers.push_back(rs_split.rs_reader);
                continue;
            }
            if (!add_status.is<END_OF_FILE>()) {
                LOG(WARNING) << "failed to add child to iterator, err=" << add_status;
                return add_status;
            }
            // END_OF_FILE: the reader is simply not added to valid_rs_readers.
        }
    }

    {
        SCOPED_RAW_TIMER(&_stats.block_reader_build_heap_init_timer_ns);
        RETURN_IF_ERROR(_vcollect_iter.build_heap(valid_rs_readers));
        // _vcollect_iter.topn_next() can not use current_row
        if (!_vcollect_iter.use_topn_next()) {
            // Only END_OF_FILE is inspected here; any other status is not propagated.
            auto first_row_status = _vcollect_iter.current_row(&_next_row);
            _eof = first_row_status.is<END_OF_FILE>();
        }
    }

    return Status::OK();
}
164
165
0
// Sets up the state used by the AGG_KEYS read path. Does nothing when the reader is
// already at EOF. Otherwise it:
//   * creates _stored_data_columns with the same structure as the current row's block,
//   * sizes the per-column null / variable-length tag vectors,
//   * for every aggregate column, resolves its aggregate function, allocates and
//     creates its aggregate state, and records whether the stored column is
//     variable length.
// Returns InternalError when a column yields no aggregate function.
Status BlockReader::_init_agg_state(const ReaderParams& read_params) {
    if (_eof) {
        return Status::OK();
    }

    _stored_data_columns =
            _next_row.block->create_same_struct_block(batch_max_rows())->mutate_columns();

    const auto stored_column_count = _stored_data_columns.size();
    _stored_has_null_tag.resize(stored_column_count);
    _stored_has_variable_length_tag.resize(stored_column_count);

    auto& schema = *_tablet_schema;
    for (auto agg_idx : _agg_columns_idx) {
        auto column =
                schema.column(read_params.origin_return_columns->at(_return_columns_loc[agg_idx]));
        AggregateFunctionPtr function =
                column.get_aggregate_function(AGG_READER_SUFFIX, read_params.get_be_exec_version());

        // Bail out instead of crashing later when no function is available
        // (e.g. on a column mismatch).
        if (!function) {
            return Status::InternalError(
                    "Failed to init reader when init agg state: "
                    "tablet_id: {}, schema_hash: {}, reader_type: {}, version: {}",
                    read_params.tablet->tablet_id(), read_params.tablet->schema_hash(),
                    int(read_params.reader_type), read_params.version.to_string());
        }
        _agg_functions.push_back(function);

        // Allocate the aggregate state; if create() fails the function registered just
        // above is removed again and the buffer is released.
        AggregateDataPtr place = new char[function->size_of_data()];
        SAFE_CREATE(function->create(place), {
            _agg_functions.pop_back();
            delete[] place;
        });
        _agg_places.push_back(place);

        // Remember whether this column is variable length (string, array, map, ...).
        _stored_has_variable_length_tag[agg_idx] =
                _stored_data_columns[agg_idx]->is_variable_length();
    }

    return Status::OK();
}
206
207
48
// Initialises the reader:
//   1. runs TabletReader::init(),
//   2. maps every originally requested column id onto its index in
//      read_params.return_columns, classifying it as a "normal" or an aggregate column,
//   3. when the schema has a sequence map, records which sequence columns are / are not
//      part of the originally requested columns together with the positions of their
//      value columns,
//   4. initialises the collect iterator,
//   5. selects the next-block strategy (_next_block_func) from the direct-mode flag and
//      the tablet's keys type.
Status BlockReader::init(const ReaderParams& read_params) {
    SCOPED_RAW_TIMER(&_stats.tablet_reader_init_timer_ns);
    RETURN_IF_ERROR(TabletReader::init(read_params));

    const auto origin_column_count = read_params.origin_return_columns->size();
    _return_columns_loc.resize(read_params.return_columns.size(), -1);
    std::unordered_map<int32_t /*cid*/, int32_t /*pos*/> pos_map;
    for (int i = 0; i < origin_column_count; ++i) {
        auto cid = read_params.origin_return_columns->at(i);
        // Locate this original cid inside return_columns (first match wins).
        for (int j = 0; j < read_params.return_columns.size(); ++j) {
            if (read_params.return_columns[j] != cid) {
                continue;
            }
            const bool is_normal_column =
                    j < _tablet->num_key_columns() || _tablet->keys_type() != AGG_KEYS;
            if (is_normal_column) {
                pos_map[cid] = static_cast<int32_t>(_normal_columns_idx.size());
                _normal_columns_idx.emplace_back(j);
            } else {
                _agg_columns_idx.emplace_back(j);
            }
            _return_columns_loc[j] = i;
            break;
        }
    }

    if (_tablet_schema->has_seq_map()) {
        if (_tablet_schema->has_sequence_col()) {
            const char* msg = "sequence columns conflict, both seq_col and seq_map are true!";
            LOG(WARNING) << msg;
            return Status::InternalError(msg);
        }
        _has_seq_map = true;
        for (const auto& seq_entry : _tablet_schema->seq_col_idx_to_value_cols_idx()) {
            // Find the sequence column among the columns that are actually read.
            int seq_loc = -1;
            for (int i = 0; i < read_params.return_columns.size(); ++i) {
                if (read_params.return_columns[i] == seq_entry.first) {
                    seq_loc = i;
                    break;
                }
            }
            if (seq_loc == -1) {
                // This sequence column is not read, nothing to track for it.
                continue;
            }

            // Collect the positions of the value columns that were registered in pos_map.
            std::vector<uint32_t> pos_vec;
            for (auto agg_cid : seq_entry.second) {
                const auto val_pos_iter = pos_map.find(agg_cid);
                if (val_pos_iter != pos_map.end()) {
                    pos_vec.emplace_back(val_pos_iter->second);
                }
            }

            if (_return_columns_loc[seq_loc] == -1) {
                _seq_map_not_in_origin_block.emplace(seq_loc, pos_vec);
            } else {
                _seq_map_in_origin_block.emplace(seq_loc, pos_vec);
            }
        }
    }

    auto status = _init_collect_iter(read_params);
    if (!status.ok()) [[unlikely]] {
        if (!config::is_cloud_mode()) {
            static_cast<Tablet*>(_tablet.get())->report_error(status);
        }
        return status;
    }

    if (_direct_mode) {
        _next_block_func = &BlockReader::_direct_next_block;
        return Status::OK();
    }

    if (_has_seq_map && !_eof) {
        // Create an empty column per sequence column that is not in the origin block.
        for (const auto& entry : _seq_map_not_in_origin_block) {
            auto seq_idx = entry.first;
            _seq_columns.insert(
                    {seq_idx, _next_row.block->get_by_position(seq_idx).column->clone_empty()});
        }
    }

    switch (tablet()->keys_type()) {
    case KeysType::DUP_KEYS:
        _next_block_func = &BlockReader::_direct_next_block;
        break;
    case KeysType::UNIQUE_KEYS: {
        const bool query_with_merge_on_write =
                read_params.reader_type == ReaderType::READER_QUERY &&
                _reader_context.enable_unique_key_merge_on_write;
        if (query_with_merge_on_write) {
            _next_block_func = &BlockReader::_direct_next_block;
        } else if (_has_seq_map) {
            _next_block_func = &BlockReader::_replace_key_next_block;
        } else {
            _next_block_func = &BlockReader::_unique_key_next_block;
            if (_filter_delete) {
                _delete_filter_column = ColumnUInt8::create();
            }
        }
        break;
    }
    case KeysType::AGG_KEYS:
        _next_block_func = &BlockReader::_agg_key_next_block;
        RETURN_IF_ERROR(_init_agg_state(read_params));
        break;
    default:
        DCHECK(false) << "No next row function for type:" << tablet()->keys_type();
        break;
    }

    return Status::OK();
}
318
319
294
// Reads the next block straight from the collect iterator, without any merging.
//
// @param block  output block filled by _vcollect_iter.next()
// @param eof    set to true when the iterator reported END_OF_FILE
// @return OK on success or END_OF_FILE; any other iterator error is returned as-is.
//
// When row ids are recorded, the row locations of the block are fetched as well;
// END_OF_FILE from that call is tolerated in the same way.
Status BlockReader::_direct_next_block(Block* block, bool* eof) {
    auto res = _vcollect_iter.next(block);
    if (UNLIKELY(!res.ok() && !res.is<END_OF_FILE>())) {
        return res;
    }
    *eof = res.is<END_OF_FILE>();
    _eof = *eof;
    if (UNLIKELY(_reader_context.record_rowids)) {
        res = _vcollect_iter.current_block_row_locations(&_block_row_locations);
        // Use the same END_OF_FILE test as above instead of building a temporary
        // Status object just to compare against it.
        if (UNLIKELY(!res.ok() && !res.is<END_OF_FILE>())) {
            return res;
        }
        DCHECK_EQ(_block_row_locations.size(), block->rows());
    }
    return Status::OK();
}
335
336
0
// Placeholder strategy: it does not touch `block` or `eof` and always reports OK.
Status BlockReader::_direct_agg_key_next_block([[maybe_unused]] Block* block,
                                               [[maybe_unused]] bool* eof) {
    return Status::OK();
}
339
340
0
// Next-block strategy for unique-key tables that use a sequence map.
//
// For every distinct key the first row delivered by the collect iterator is written to
// the output; following rows with the same key (_next_row.is_same) are merged into it
// via _compare_sequence_map_and_replace(). Accumulation stops when batch_max_rows()
// rows were produced, the input is exhausted, or the byte budget is reached.
//
// @param block  output block; its columns are filled and handed back before returning OK
// @param eof    set to true once the collect iterator reports END_OF_FILE
// @return OK, or the first error from inserting a row / advancing the iterator.
Status BlockReader::_replace_key_next_block(Block* block, bool* eof) {
    if (UNLIKELY(_eof)) {
        *eof = true;
        return Status::OK();
    }

    auto target_block_row = 0;
    auto target_columns = block->mutate_columns();
    // currently seq mapping only support mor table
    // so this will not be executed for the time being
    if (UNLIKELY(_reader_context.record_rowids)) {
        _block_row_locations.resize(batch_max_rows());
    }
    auto merged_row = 0;
    while (target_block_row < batch_max_rows() && !_eof) {
        RETURN_IF_ERROR(_insert_data_normal(target_columns));
        // Seed the cached sequence values from the first row of this key.
        for (const auto& entry : _seq_map_not_in_origin_block) {
            _update_last_mutil_seq(entry.first);
        }
        if (UNLIKELY(_reader_context.record_rowids)) {
            _block_row_locations[target_block_row] = _vcollect_iter.current_row_location();
        }
        target_block_row++;

        while (!_eof) {
            // the version is in reverse order, the first row is the highest version,
            // in UNIQUE_KEY highest version is the final result
            auto res = _vcollect_iter.next(&_next_row);
            if (UNLIKELY(res.is<END_OF_FILE>())) {
                _eof = true;
                *eof = true;
                if (UNLIKELY(_reader_context.record_rowids)) {
                    _block_row_locations.resize(target_block_row);
                }
                break;
            }

            if (UNLIKELY(!res.ok())) {
                LOG(WARNING) << "next failed: " << res;
                return res;
            }

            if (_next_row.is_same) {
                merged_row++;
                _compare_sequence_map_and_replace(target_columns);
            } else {
                break;
            }
        }
        // Byte-budget check: after the inner loop _next_row is either EOF or the next different
        // key, so it is safe to stop accumulating here without repeating any row.
        if (target_block_row % BLOCK_SIZE_CHECK_INTERVAL_ROWS == 0 &&
            _reached_byte_budget(target_columns)) {
            if (UNLIKELY(_reader_context.record_rowids)) {
                _block_row_locations.resize(target_block_row);
            }
            break;
        }
    }

    // Hand the filled columns back to the output block, as the other merging
    // strategies (_agg_key_next_block / _unique_key_next_block) do. Without this the
    // result is only visible through `block` if mutate_columns() happened to return
    // the very same column objects the block still references.
    block->set_columns(std::move(target_columns));

    _merged_rows += merged_row;
    return Status::OK();
}
405
406
4.36k
bool BlockReader::_reached_byte_budget(const MutableColumns& columns) const {
407
4.36k
    return config::enable_adaptive_batch_size && _reader_context.preferred_block_size_bytes > 0 &&
408
4.36k
           Block::columns_byte_size(columns) >= _reader_context.preferred_block_size_bytes;
409
4.36k
}
410
411
0
// Merges the current input row (_next_row) into the last row already written to
// `columns`, one sequence column at a time: when the stored sequence value compares
// lower than the incoming one, the mapped value columns (and the stored sequence value)
// are overwritten with the incoming row's values.
void BlockReader::_compare_sequence_map_and_replace(MutableColumns& columns) {
    auto* src_block = _next_row.block.get();
    const auto src_pos = _next_row.row_pos;

    // Replaces the last value of every mapped value column with the incoming row's value.
    const auto overwrite_values = [&](const auto& value_positions) {
        for (const auto& p : value_positions) {
            const auto val_idx = _normal_columns_idx[p];
            const auto& src_column = src_block->get_by_position(val_idx).column;
            auto* dst_column = columns[_return_columns_loc[val_idx]].get();
            dst_column->pop_back(1);
            dst_column->insert_from(*src_column, src_pos);
        }
    };

    // Sequence columns that are part of the output: compare against the output column.
    for (const auto& entry : _seq_map_in_origin_block) {
        const auto seq_idx = entry.first;
        auto* dst_seq_column = columns[_return_columns_loc[seq_idx]].get();
        const auto dst_pos = dst_seq_column->size() - 1;
        const auto& src_seq_column = src_block->get_by_position(seq_idx).column;

        // The stored row comes from the higher rowset version, so it wins on ties.
        if (dst_seq_column->compare_at(dst_pos, src_pos, *src_seq_column, -1) >= 0) {
            continue;
        }

        overwrite_values(entry.second);

        dst_seq_column->pop_back(1);
        dst_seq_column->insert_from(*src_seq_column, src_pos);
    }

    // Sequence columns that are not part of the output: compare against the cached
    // single-row column kept in _seq_columns.
    for (const auto& entry : _seq_map_not_in_origin_block) {
        const auto seq_idx = entry.first;
        auto* cached_seq_column = _seq_columns[seq_idx].get();
        const auto& src_seq_column = src_block->get_by_position(seq_idx).column;

        // The stored row comes from the higher rowset version, so it wins on ties.
        if (cached_seq_column->compare_at(0, src_pos, *src_seq_column, -1) >= 0) {
            continue;
        }

        overwrite_values(entry.second);

        _update_last_mutil_seq(seq_idx);
    }
}
464
465
0
void BlockReader::_update_last_mutil_seq(int seq_idx) {
466
0
    auto block = _next_row.block.get();
467
0
    _seq_columns[seq_idx]->clear();
468
0
    _seq_columns[seq_idx]->insert_from(*block->get_by_position(seq_idx).column, _next_row.row_pos);
469
0
}
470
471
0
// Next-block strategy for AGG_KEYS tables.
//
// Key columns of every new key are written directly; rows belonging to the same key are
// buffered through _append_agg_data() and folded into the aggregate columns. The block
// is closed when the input ends, when batch_max_rows() keys were emitted, or when the
// byte budget is reached at a key boundary.
Status BlockReader::_agg_key_next_block(Block* block, bool* eof) {
    if (UNLIKELY(_eof)) {
        *eof = true;
        return Status::OK();
    }

    int emitted_rows = 0;
    int merged_rows = 0;
    auto out_columns = block->mutate_columns();

    // The pending row (_next_row) always starts the block.
    RETURN_IF_ERROR(_insert_data_normal(out_columns));
    ++emitted_rows;
    _append_agg_data(out_columns);

    for (;;) {
        auto next_status = _vcollect_iter.next(&_next_row);
        if (UNLIKELY(next_status.is<END_OF_FILE>())) {
            _eof = true;
            *eof = true;
            break;
        }
        if (UNLIKELY(!next_status.ok())) {
            LOG(WARNING) << "next failed: " << next_status;
            return next_status;
        }

        if (_next_row.is_same) {
            // Same key as the previous row: only its aggregate data is buffered.
            ++merged_rows;
            _append_agg_data(out_columns);
            continue;
        }

        // A new key starts here. _next_row is still pending (not inserted yet), so it is
        // safe to stop now and pick it up on the next call.
        if (emitted_rows == batch_max_rows()) {
            break;
        }
        if (emitted_rows % BLOCK_SIZE_CHECK_INTERVAL_ROWS == 0 &&
            _reached_byte_budget(out_columns)) {
            break;
        }

        // Close the row counter of the previous key and open the new one.
        _agg_data_counters.push_back(_last_agg_data_counter);
        _last_agg_data_counter = 0;

        RETURN_IF_ERROR(_insert_data_normal(out_columns));
        ++emitted_rows;

        _append_agg_data(out_columns);
    }

    // Flush whatever is still buffered for the last key of this block.
    _agg_data_counters.push_back(_last_agg_data_counter);
    _last_agg_data_counter = 0;
    _update_agg_data(out_columns);
    block->set_columns(std::move(out_columns));

    _merged_rows += merged_rows;
    return Status::OK();
}
528
529
284
// Next-block strategy for unique-key tables without a sequence map.
//
// Rows are copied from the collect iterator into the output columns until the input
// ends, batch_max_rows() rows were produced, or the byte budget is reached. When the
// delete sign is available, a UInt8 filter column is derived from the delete-sign
// column (1 = keep, 0 = deleted) and the block is filtered by it.
//
// @param block  output block; always receives its columns back before OK is returned
//               from the delete-sign handling below
// @param eof    set to true once the collect iterator reports END_OF_FILE
// @return OK, or the first error from inserting a row, advancing the iterator, or
//         filtering the block.
Status BlockReader::_unique_key_next_block(Block* block, bool* eof) {
    if (UNLIKELY(_eof)) {
        *eof = true;
        return Status::OK();
    }

    auto target_block_row = 0;
    auto target_columns = block->mutate_columns();
    if (UNLIKELY(_reader_context.record_rowids)) {
        _block_row_locations.resize(batch_max_rows());
    }

    do {
        RETURN_IF_ERROR(_insert_data_normal(target_columns));

        if (UNLIKELY(_reader_context.record_rowids)) {
            _block_row_locations[target_block_row] = _vcollect_iter.current_row_location();
        }
        target_block_row++;

        // the version is in reverse order, the first row is the highest version,
        // in UNIQUE_KEY highest version is the final result, there is no need to
        // merge the lower versions
        auto res = _vcollect_iter.next(&_next_row);
        if (UNLIKELY(res.is<END_OF_FILE>())) {
            _eof = true;
            *eof = true;
            if (UNLIKELY(_reader_context.record_rowids)) {
                _block_row_locations.resize(target_block_row);
            }
            break;
        }

        if (UNLIKELY(!res.ok())) {
            LOG(WARNING) << "next failed: " << res;
            return res;
        }
        // Byte-budget check: _next_row is already saved so stopping here is safe.
        if (target_block_row % BLOCK_SIZE_CHECK_INTERVAL_ROWS == 0 &&
            _reached_byte_budget(target_columns)) {
            if (UNLIKELY(_reader_context.record_rowids)) {
                _block_row_locations.resize(target_block_row);
            }
            break;
        }
    } while (target_block_row < batch_max_rows());

    if (_delete_sign_available) {
        int delete_sign_idx = _reader_context.tablet_schema->field_index(DELETE_SIGN);
        DCHECK(delete_sign_idx > 0);
        if (delete_sign_idx <= 0 || delete_sign_idx >= target_columns.size()) {
            LOG(WARNING) << "tablet_id: " << tablet()->tablet_id() << " delete sign idx "
                         << delete_sign_idx
                         << " is invalid, skip filter delete in base compaction";
            // The rows read so far are still returned unfiltered, so the columns taken
            // from the block must be handed back before reporting success.
            block->set_columns(std::move(target_columns));
            return Status::OK();
        }
        MutableColumnPtr delete_filter_column = (*std::move(_delete_filter_column)).mutate();
        reinterpret_cast<ColumnUInt8*>(delete_filter_column.get())->resize(target_block_row);

        auto* __restrict filter_data =
                reinterpret_cast<ColumnUInt8*>(delete_filter_column.get())->get_data().data();
        auto* __restrict delete_data =
                reinterpret_cast<ColumnInt8*>(target_columns[delete_sign_idx].get())
                        ->get_data()
                        .data();
        int delete_count = 0;
        for (int i = 0; i < target_block_row; ++i) {
            // A row is kept when its delete sign is 0.
            bool sign = (delete_data[i] == 0);
            filter_data[i] = sign;
            if (UNLIKELY(!sign)) {
                if (UNLIKELY(_reader_context.record_rowids)) {
                    // Mark the location of a deleted row; delete_count is only needed
                    // for the consistency DCHECK below.
                    _block_row_locations[i].row_id = -1;
                    delete_count++;
                }
            }
        }
        auto target_columns_size = target_columns.size();
        ColumnWithTypeAndName column_with_type_and_name {_delete_filter_column,
                                                         std::make_shared<DataTypeUInt8>(),
                                                         "__DORIS_COMPACTION_FILTER__"};
        block->set_columns(std::move(target_columns));
        // The filter column is appended at index target_columns_size and passed to
        // filter_block() as the filter column id.
        block->insert(column_with_type_and_name);
        RETURN_IF_ERROR(Block::filter_block(block, target_columns_size, target_columns_size));
        _stats.rows_del_filtered += target_block_row - block->rows();
        if (UNLIKELY(_reader_context.record_rowids)) {
            DCHECK_EQ(_block_row_locations.size(), block->rows() + delete_count);
        }
    } else {
        block->set_columns(std::move(target_columns));
    }
    return Status::OK();
}
621
622
280k
// Copies the "normal" (non-aggregate) columns of the current input row (_next_row) into
// the matching output columns. Exceptions thrown while inserting are converted into a
// Status by RETURN_IF_CATCH_EXCEPTION.
Status BlockReader::_insert_data_normal(MutableColumns& columns) {
    auto* src_block = _next_row.block.get();
    const auto src_row = _next_row.row_pos;

    RETURN_IF_CATCH_EXCEPTION({
        for (auto idx : _normal_columns_idx) {
            const auto& src_column = src_block->get_by_position(idx).column;
            columns[_return_columns_loc[idx]]->insert_from(*src_column, src_row);
        }
    });
    return Status::OK();
}
633
634
115
// Buffers a reference to the current input row for later aggregation and bumps the row
// counter of the key being accumulated. The buffer is flushed through
// _update_agg_data() when the row is the last one of its source block, or when
// batch_max_rows() references have been collected.
void BlockReader::_append_agg_data(MutableColumns& columns) {
    _stored_row_ref.push_back(_next_row);
    ++_last_agg_data_counter;

    // `_stored_data_columns` is sized to `batch_max_rows()`, so flushing at that size
    // keeps the buffered references within its capacity.
    const bool at_block_tail = (_next_row.row_pos + 1 == _next_row.block->rows());
    if (at_block_tail || _stored_row_ref.size() == batch_max_rows()) {
        _update_agg_data(columns);
    }
}
646
647
32
// Flushes the buffered row references: copies their aggregate column data into the
// stored columns, then feeds each recorded key range to _update_agg_value(). Ranges in
// _agg_data_counters belong to finished keys and have their result inserted; the range
// counted by _last_agg_data_counter belongs to a key that may continue, so it is
// accumulated without inserting a result.
void BlockReader::_update_agg_data(MutableColumns& columns) {
    // Copy the referenced rows into the stored block.
    const size_t copied_rows = _copy_agg_data();

    // Refresh the has-null tag of every aggregate column for the copied range.
    for (auto idx : _agg_columns_idx) {
        _stored_has_null_tag[idx] = _stored_data_columns[idx]->has_null(0, copied_rows);
    }

    // Aggregate and emit each finished key.
    int range_begin = 0;
    for (int group_rows : _agg_data_counters) {
        _update_agg_value(columns, range_begin, range_begin + group_rows - 1);
        range_begin += group_rows;
    }

    // The trailing key may still have rows in the next block, so do not insert its result.
    if (_last_agg_data_counter) {
        _update_agg_value(columns, range_begin, range_begin + _last_agg_data_counter - 1,
                          false);
        _last_agg_data_counter = 0;
    }

    _agg_data_counters.clear();
}
671
672
32
size_t BlockReader::_copy_agg_data() {
673
32
    size_t copy_size = _stored_row_ref.size();
674
675
147
    for (size_t i = 0; i < copy_size; i++) {
676
115
        auto& ref = _stored_row_ref[i];
677
115
        _temp_ref_map[ref.block.get()].emplace_back(ref.row_pos, i);
678
115
    }
679
680
32
    for (auto idx : _agg_columns_idx) {
681
32
        auto& dst_column = _stored_data_columns[idx];
682
32
        if (_stored_has_variable_length_tag[idx]) {
683
            //variable length type should replace ordered
684
0
            dst_column->clear();
685
0
            for (size_t i = 0; i < copy_size; i++) {
686
0
                auto& ref = _stored_row_ref[i];
687
0
                dst_column->insert_from(*ref.block->get_by_position(idx).column, ref.row_pos);
688
0
            }
689
32
        } else {
690
32
            for (auto& it : _temp_ref_map) {
691
32
                if (!it.second.empty()) {
692
29
                    auto& src_column = *it.first->get_by_position(idx).column;
693
115
                    for (auto& pos : it.second) {
694
115
                        dst_column->replace_column_data(src_column, pos.first, pos.second);
695
115
                    }
696
29
                }
697
32
            }
698
32
        }
699
32
    }
700
701
32
    for (auto& it : _temp_ref_map) {
702
32
        it.second.clear();
703
32
    }
704
32
    _stored_row_ref.clear();
705
706
32
    return copy_size;
707
32
}
708
709
32
// Feeds rows [begin, end] of the stored columns to every aggregate function (skipped
// when begin > end). With is_close set, each function's result is inserted into its
// output column, its state is reset, and the arena is cleared at the end.
void BlockReader::_update_agg_value(MutableColumns& columns, int begin, int end, bool is_close) {
    const bool has_rows = (begin <= end);

    for (size_t slot = 0; slot < _agg_columns_idx.size(); ++slot) {
        const auto idx = _agg_columns_idx[slot];
        const auto& function = _agg_functions[slot];
        AggregateDataPtr place = _agg_places[slot];
        auto* column_ptr = _stored_data_columns[idx].get();

        if (has_rows) {
            function->add_batch_range(begin, end, place, const_cast<const IColumn**>(&column_ptr),
                                      _arena, _stored_has_null_tag[idx]);
        }

        if (is_close) {
            function->insert_result_into(place, *columns[_return_columns_loc[idx]]);
            // Start the next key from a clean aggregate state.
            function->reset(place);
        }
    }

    if (is_close) {
        _arena.clear();
    }
}
732
733
} // namespace doris