Coverage Report

Created: 2026-06-18 12:53

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/iterator/block_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/iterator/block_reader.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
#include <glog/logging.h>
22
#include <stdint.h>
23
24
#include <algorithm>
25
#include <boost/iterator/iterator_facade.hpp>
26
#include <memory>
27
#include <ostream>
28
#include <string>
29
30
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
31
#include "cloud/config.h"
32
#include "common/compiler_util.h" // IWYU pragma: keep
33
#include "common/config.h"
34
#include "common/status.h"
35
#include "core/block/column_with_type_and_name.h"
36
#include "core/column/column_nullable.h"
37
#include "core/column/column_string.h"
38
#include "core/column/column_vector.h"
39
#include "core/data_type/data_type_number.h"
40
#include "exprs/aggregate/aggregate_function_reader.h"
41
#include "exprs/function_filter.h"
42
#include "runtime/runtime_state.h"
43
#include "storage/binlog.h"
44
#include "storage/iterator/binlog_block_reader_utils.h"
45
#include "storage/iterator/vcollect_iterator.h"
46
#include "storage/olap_common.h"
47
#include "storage/olap_define.h"
48
#include "storage/predicate/like_column_predicate.h"
49
#include "storage/rowset/rowset.h"
50
#include "storage/rowset/rowset_reader_context.h"
51
#include "storage/tablet/tablet.h"
52
#include "storage/tablet/tablet_schema.h"
53
54
namespace doris {
55
class ColumnPredicate;
56
} // namespace doris
57
58
namespace doris {
59
using namespace ErrorCode;
60
61
// Number of output rows between two adaptive byte-budget checks (_reached_byte_budget) in
// _replace_key_next_block. Presumably chosen so that summing column byte sizes is amortized
// over many rows instead of being paid for every row — TODO confirm the tuning rationale.
static constexpr int32_t BLOCK_SIZE_CHECK_INTERVAL_ROWS = 64;
62
63
82
// Releases every aggregate state created by _init_agg_state: each state buffer is first
// destroyed through the aggregate function that created it, then its raw byte storage
// (allocated with new char[]) is freed.
BlockReader::~BlockReader() {
    for (size_t slot = 0; slot < _agg_functions.size(); ++slot) {
        auto place = _agg_places[slot];
        _agg_functions[slot]->destroy(place);
        delete[] place;
    }
}
69
70
578
// Produces the next output block by dispatching to the reader selected in init()
// (_next_block_func). Outside cloud mode, a failing status is additionally reported to the
// owning Tablet; the status is always returned to the caller unchanged.
Status BlockReader::next_block_with_aggregation(Block* block, bool* eof) {
    Status status = (this->*_next_block_func)(block, eof);
    if (status.ok() || config::is_cloud_mode()) {
        return status;
    }
    static_cast<Tablet*>(_tablet.get())->report_error(status);
    return status;
}
79
80
// Lazily resolves the positions of the binlog meta columns (op / lsn / tso) inside the
81
// merged source block, and builds _before_column_idx mapping each non-meta column to its
82
// __BEFORE__ mirror. The resolved positions are reused across blocks; if the column
83
// layout changes (detected via _binlog_op_pos sanity check), they are re-resolved.
84
0
Status BlockReader::_ensure_binlog_column_pos(const Block& src_block) {
85
0
    if (_binlog_column_pos_inited) {
86
0
        if (_binlog_op_pos >= 0 && _binlog_op_pos < src_block.columns() &&
87
0
            src_block.get_by_position(_binlog_op_pos).name == kRowBinlogOpColName) {
88
0
            return Status::OK();
89
0
        }
90
0
        _binlog_op_pos = -1;
91
0
        _binlog_lsn_pos = -1;
92
0
        _binlog_timestamp_pos = -1;
93
0
        _binlog_column_pos_inited = false;
94
0
    }
95
96
0
    const uint32_t col_num = src_block.columns();
97
0
    _before_column_idx.resize(col_num);
98
0
    for (uint32_t i = 0; i < col_num; ++i) {
99
0
        const auto& name = src_block.get_by_position(i).name;
100
0
        if (name == kRowBinlogOpColName) {
101
0
            _binlog_op_pos = static_cast<int>(i);
102
0
        } else if (name == kRowBinlogLsnColName) {
103
0
            _binlog_lsn_pos = static_cast<int>(i);
104
0
        } else if (name == kRowBinlogTimestampColName) {
105
0
            _binlog_timestamp_pos = static_cast<int>(i);
106
0
        } else {
107
0
            std::string before_name = binlog::build_before_column_name(name);
108
0
            int tmp_idx = src_block.get_position_by_name(before_name);
109
0
            _before_column_idx[i] = tmp_idx < 0 ? i : tmp_idx;
110
0
        }
111
0
    }
112
0
    _binlog_column_pos_inited = true;
113
0
    return Status::OK();
114
0
}
115
116
0
int64_t BlockReader::_read_binlog_op(const IColumn& col, size_t row) const {
117
0
    const IColumn* cur = &col;
118
0
    if (const auto* nullable = check_and_get_column<ColumnNullable>(*cur)) {
119
0
        if (nullable->is_null_at(row)) {
120
0
            return binlog::ROW_BINLOG_UNKNOWN;
121
0
        }
122
0
        cur = &nullable->get_nested_column();
123
0
    }
124
125
0
    if (const auto* int64_col = check_and_get_column<ColumnInt64>(*cur)) {
126
0
        return int64_col->get_element(row);
127
0
    }
128
129
0
    return binlog::ROW_BINLOG_UNKNOWN;
130
0
}
131
132
0
// Appends `op` as a new row of `col`. `col` must be a ColumnInt64 or a Nullable wrapper
// around one; for the nullable form a matching "not null" flag is appended to the null map.
// Any other column type is left untouched and reported as InternalError.
Status BlockReader::_write_binlog_op(IColumn& col, int64_t op) const {
    auto* nullable = typeid_cast<ColumnNullable*>(&col);
    IColumn* values = &col;
    if (nullable != nullptr) {
        values = &nullable->get_nested_column();
    }

    auto* int64_col = typeid_cast<ColumnInt64*>(values);
    if (int64_col == nullptr) {
        return Status::InternalError("invalid column type");
    }
    int64_col->insert_value(op);

    if (nullable != nullptr) {
        nullable->get_null_map_data().push_back(0);
    }
    return Status::OK();
}
151
152
0
bool BlockReader::_is_binlog_meta_column(int idx) const {
153
0
    return idx == _binlog_op_pos || idx == _binlog_lsn_pos || idx == _binlog_timestamp_pos;
154
0
}
155
156
// Resolves which source-block column to read from for a given binlog row position.
157
// When use_before is true and idx is a regular data column, return the index of its
158
// __BEFORE__ mirror (built in _before_column_idx); otherwise return idx itself.
159
// Binlog meta columns (op / lsn / tso) have no BEFORE mirror, so they always pass through.
160
0
int BlockReader::_resolve_source_column_index(int idx, bool use_before) const {
161
0
    if (!use_before || _is_binlog_meta_column(idx)) {
162
0
        return idx;
163
0
    }
164
165
0
    return _before_column_idx[idx];
166
0
}
167
168
0
void BlockReader::_init_pending_row_columns(const Block& block) {
169
0
    if (!_pending_row_columns.empty()) {
170
0
        return;
171
0
    }
172
0
    _pending_row_columns = block.clone_empty_columns();
173
0
}
174
175
// Drains the carry-over row produced on the previous batch boundary into the current
176
// output block. Returns true if a row was emitted, false if no pending row exists.
177
0
bool BlockReader::_emit_pending_row(MutableColumns& target_columns, size_t& output_row_count) {
178
0
    if (!_has_pending_row) {
179
0
        return false;
180
0
    }
181
0
    for (size_t i = 0; i < _pending_row_columns.size(); ++i) {
182
0
        target_columns[i]->insert_from(*_pending_row_columns[i], 0);
183
0
        _pending_row_columns[i]->clear();
184
0
    }
185
0
    _has_pending_row = false;
186
0
    output_row_count++;
187
0
    return true;
188
0
}
189
190
// Copies one source row into target_columns with the given output op code, picking BEFORE
191
// or AFTER values per column according to use_before. Used by _detail_change_next_block to
192
// materialize the BEFORE / AFTER halves of an UPDATE pair (and INSERT / DELETE singletons).
193
Status BlockReader::_append_change_row(MutableColumns& target_columns, const Block& src_block,
194
0
                                       size_t row_pos, int64_t output_op, bool use_before) {
195
0
    for (auto idx : _normal_columns_idx) {
196
0
        int target_col_idx = _return_columns_loc[idx];
197
0
        if (target_col_idx < 0) {
198
0
            continue;
199
0
        }
200
0
        if (idx == _binlog_op_pos) {
201
0
            RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx], output_op));
202
0
            continue;
203
0
        }
204
0
        int source_idx = _resolve_source_column_index(idx, use_before);
205
0
        target_columns[target_col_idx]->insert_from(*src_block.get_by_position(source_idx).column,
206
0
                                                    row_pos);
207
0
    }
208
0
    return Status::OK();
209
0
}
210
211
// MIN_DELTA reader: groups consecutive rows sharing the same primary key in
212
// _stored_data_columns, then collapses the group into the minimum equivalent change
213
// (SKIP / INSERT / DELETE / UPDATE_BEFORE+AFTER) via AggregateFunctionMinDelta.
214
0
Status BlockReader::_min_delta_next_block(Block* block, bool* eof) {
215
0
    if (UNLIKELY(_eof && !_has_pending_row)) {
216
0
        *eof = true;
217
0
        return Status::OK();
218
0
    }
219
220
0
    if (_stored_data_columns.empty()) {
221
0
        _stored_data_columns = _next_row.block->clone_empty_columns();
222
0
    }
223
224
0
    auto target_columns_guard = block->mutate_columns_scoped();
225
0
    auto& target_columns = target_columns_guard.mutable_columns();
226
0
    size_t output_row_count = 0;
227
0
    _init_pending_row_columns(*block);
228
0
    RETURN_IF_ERROR(_ensure_binlog_column_pos(*_next_row.block));
229
0
    while (output_row_count < batch_max_rows()) {
230
0
        if (_emit_pending_row(target_columns, output_row_count)) {
231
0
            continue;
232
0
        }
233
0
        if (_eof) {
234
0
            break;
235
0
        }
236
0
        bool need_pop = _stored_data_columns[0]->size() > 1;
237
0
        for (size_t i = 0; i < _stored_data_columns.size(); ++i) {
238
0
            if (need_pop) {
239
0
                _stored_data_columns[i]->pop_back(1);
240
0
            }
241
0
            _stored_data_columns[i]->insert_from(*_next_row.block->get_by_position(i).column,
242
0
                                                 _next_row.row_pos);
243
0
        }
244
0
        auto res = _vcollect_iter.next(&_next_row);
245
0
        if (UNLIKELY(res.is<END_OF_FILE>())) {
246
0
            _eof = true;
247
0
        } else if (UNLIKELY(!res.ok())) {
248
0
            return res;
249
0
        }
250
251
0
        if (!_eof && _next_row.is_same) {
252
0
            continue;
253
0
        }
254
0
        size_t group_size = _stored_data_columns[0]->size();
255
0
        auto first_op = _read_binlog_op(*_stored_data_columns[_binlog_op_pos], 0);
256
0
        auto last_op = _read_binlog_op(*_stored_data_columns[_binlog_op_pos], group_size - 1);
257
0
        auto result = binlog::AggregateFunctionMinDelta::calculate_result(first_op, last_op);
258
0
        switch (result) {
259
0
        case binlog::AggregateFunctionMinDelta::ResultType::SKIP:
260
0
            break;
261
0
        case binlog::AggregateFunctionMinDelta::ResultType::INSERT:
262
0
            for (auto idx : _normal_columns_idx) {
263
0
                int target_col_idx = _return_columns_loc[idx];
264
0
                if (idx == _binlog_op_pos) {
265
0
                    RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
266
0
                                                     binlog::STREAM_CHANGE_INSERT));
267
0
                } else {
268
0
                    target_columns[target_col_idx]->insert_from(*_stored_data_columns[idx],
269
0
                                                                group_size - 1);
270
0
                }
271
0
            }
272
0
            output_row_count++;
273
0
            break;
274
0
        case binlog::AggregateFunctionMinDelta::ResultType::DELETE:
275
0
            for (auto idx : _normal_columns_idx) {
276
0
                int target_col_idx = _return_columns_loc[idx];
277
0
                if (idx == _binlog_op_pos) {
278
0
                    RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
279
0
                                                     binlog::STREAM_CHANGE_DELETE));
280
0
                } else {
281
0
                    target_columns[target_col_idx]->insert_from(*_stored_data_columns[idx],
282
0
                                                                group_size - 1);
283
0
                }
284
0
            }
285
0
            output_row_count++;
286
0
            break;
287
0
        case binlog::AggregateFunctionMinDelta::ResultType::UPDATE_BEFORE_AFTER:
288
0
            for (auto idx : _normal_columns_idx) {
289
0
                int target_col_idx = _return_columns_loc[idx];
290
0
                if (idx == _binlog_op_pos) {
291
0
                    RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
292
0
                                                     binlog::STREAM_CHANGE_UPDATE_BEFORE));
293
0
                } else if (idx == _binlog_lsn_pos) {
294
0
                    target_columns[target_col_idx]->insert_from(*_stored_data_columns[idx],
295
0
                                                                group_size - 1);
296
0
                } else {
297
0
                    int source_idx = _resolve_source_column_index(idx, true);
298
0
                    target_columns[target_col_idx]->insert_from(*_stored_data_columns[source_idx],
299
0
                                                                0);
300
0
                }
301
0
            }
302
0
            output_row_count++;
303
0
            if (output_row_count >= batch_max_rows()) {
304
0
                for (auto idx : _normal_columns_idx) {
305
0
                    int target_col_idx = _return_columns_loc[idx];
306
0
                    if (idx == _binlog_op_pos) {
307
0
                        RETURN_IF_ERROR(_write_binlog_op(*_pending_row_columns[target_col_idx],
308
0
                                                         binlog::STREAM_CHANGE_UPDATE_AFTER));
309
0
                    } else {
310
0
                        _pending_row_columns[target_col_idx]->insert_from(
311
0
                                *_stored_data_columns[idx], group_size - 1);
312
0
                    }
313
0
                }
314
0
                _has_pending_row = true;
315
0
            } else {
316
0
                for (auto idx : _normal_columns_idx) {
317
0
                    int target_col_idx = _return_columns_loc[idx];
318
0
                    if (idx == _binlog_op_pos) {
319
0
                        RETURN_IF_ERROR(_write_binlog_op(*target_columns[target_col_idx],
320
0
                                                         binlog::STREAM_CHANGE_UPDATE_AFTER));
321
0
                    } else {
322
0
                        target_columns[target_col_idx]->insert_from(*_stored_data_columns[idx],
323
0
                                                                    group_size - 1);
324
0
                    }
325
0
                }
326
0
                output_row_count++;
327
0
            }
328
0
            break;
329
0
        }
330
331
0
        for (auto& col : _stored_data_columns) {
332
0
            col->clear();
333
0
        }
334
0
    }
335
0
    *eof = _eof && !_has_pending_row;
336
0
    return Status::OK();
337
0
}
338
339
// DETAIL reader: emits every recorded binlog change verbatim. APPEND -> single INSERT row,
340
// DELETE -> single DELETE row, UPDATE -> a BEFORE+AFTER pair. When the AFTER row would
341
// overflow batch_max_rows(), it is parked in _pending_row_columns and flushed next call.
342
0
Status BlockReader::_detail_change_next_block(Block* block, bool* eof) {
343
0
    if (UNLIKELY(_eof && !_has_pending_row)) {
344
0
        *eof = true;
345
0
        return Status::OK();
346
0
    }
347
0
    auto target_columns_guard = block->mutate_columns_scoped();
348
0
    auto& target_columns = target_columns_guard.mutable_columns();
349
0
    size_t output_row_count = 0;
350
0
    _init_pending_row_columns(*block);
351
0
    RETURN_IF_ERROR(_ensure_binlog_column_pos(*_next_row.block));
352
0
    while (output_row_count < batch_max_rows()) {
353
0
        if (_emit_pending_row(target_columns, output_row_count)) {
354
0
            continue;
355
0
        }
356
0
        if (_eof) {
357
0
            break;
358
0
        }
359
0
        if (UNLIKELY(_next_row.block == nullptr)) {
360
0
            return Status::InternalError("invalid row reference in detail change reader");
361
0
        }
362
0
        const Block& source_block = *_next_row.block;
363
0
        const size_t row = _next_row.row_pos;
364
0
        int64_t op = _read_binlog_op(*source_block.get_by_position(_binlog_op_pos).column, row);
365
0
        if (op == ROW_BINLOG_UPDATE) {
366
0
            RETURN_IF_ERROR(_append_change_row(target_columns, source_block, row,
367
0
                                               binlog::STREAM_CHANGE_UPDATE_BEFORE, true));
368
0
            output_row_count++;
369
0
            if (output_row_count >= batch_max_rows()) {
370
0
                RETURN_IF_ERROR(_append_change_row(_pending_row_columns, source_block, row,
371
0
                                                   binlog::STREAM_CHANGE_UPDATE_AFTER, false));
372
0
                _has_pending_row = true;
373
0
            } else {
374
0
                RETURN_IF_ERROR(_append_change_row(target_columns, source_block, row,
375
0
                                                   binlog::STREAM_CHANGE_UPDATE_AFTER, false));
376
0
                output_row_count++;
377
0
            }
378
0
        } else if (op == ROW_BINLOG_APPEND) {
379
0
            RETURN_IF_ERROR(_append_change_row(target_columns, source_block, row,
380
0
                                               binlog::STREAM_CHANGE_INSERT, false));
381
0
            output_row_count++;
382
0
        } else if (op == ROW_BINLOG_DELETE) {
383
0
            RETURN_IF_ERROR(_append_change_row(target_columns, source_block, row,
384
0
                                               binlog::STREAM_CHANGE_DELETE, false));
385
0
            output_row_count++;
386
0
        }
387
388
0
        auto res = _vcollect_iter.next(&_next_row);
389
0
        if (UNLIKELY(res.is<END_OF_FILE>())) {
390
0
            _eof = true;
391
0
        } else if (UNLIKELY(!res.ok())) {
392
0
            return res;
393
0
        }
394
0
    }
395
0
    *eof = _eof && !_has_pending_row;
396
0
    return Status::OK();
397
0
}
398
399
67
bool BlockReader::_rowsets_not_mono_asc_disjoint(const ReaderParams& read_params) {
400
67
    std::string pre_rs_last_key;
401
67
    bool pre_rs_key_bounds_truncated {false};
402
67
    const std::vector<RowSetSplits>& rs_splits = read_params.rs_splits;
403
162
    for (const auto& rs_split : rs_splits) {
404
162
        if (rs_split.rs_reader->rowset()->num_rows() == 0) {
405
0
            continue;
406
0
        }
407
162
        if (rs_split.rs_reader->rowset()->is_segments_overlapping()) {
408
0
            return true;
409
0
        }
410
162
        std::string rs_first_key;
411
162
        bool has_first_key = rs_split.rs_reader->rowset()->first_key(&rs_first_key);
412
162
        if (!has_first_key) {
413
0
            return true;
414
0
        }
415
162
        bool cur_rs_key_bounds_truncated {
416
162
                rs_split.rs_reader->rowset()->is_segments_key_bounds_truncated()};
417
162
        if (!Slice::lhs_is_strictly_less_than_rhs(Slice {pre_rs_last_key},
418
162
                                                  pre_rs_key_bounds_truncated, Slice {rs_first_key},
419
162
                                                  cur_rs_key_bounds_truncated)) {
420
58
            return true;
421
58
        }
422
104
        bool has_last_key = rs_split.rs_reader->rowset()->last_key(&pre_rs_last_key);
423
104
        pre_rs_key_bounds_truncated = cur_rs_key_bounds_truncated;
424
104
        CHECK(has_last_key);
425
104
    }
426
9
    return false;
427
67
}
428
429
48
// Builds the collect iterator for this read:
//  - captures the rowset readers (failures are logged with tablet / version context);
//  - initializes _vcollect_iter with the rowset-overlap flag plus the force-merge / reverse
//    options (ordered reads and MIN_DELTA binlog scans force merging; MIN_DELTA never
//    reverses);
//  - initializes each rowset reader (unless topn_next() does it lazily) and adds it as a
//    child, skipping readers that report END_OF_FILE;
//  - builds the heap and, when topn_next() is not used, primes _next_row with the first row,
//    setting _eof when there is none.
// Returns the cancel reason if the query is cancelled while rowset readers are initialized.
Status BlockReader::_init_collect_iter(const ReaderParams& read_params) {
    if (auto res = _capture_rs_readers(read_params); !res.ok()) {
        LOG(WARNING) << "fail to init reader when _capture_rs_readers. res:" << res
                     << ", tablet_id:" << read_params.tablet->tablet_id()
                     << ", schema_hash:" << read_params.tablet->schema_hash()
                     << ", reader_type:" << int(read_params.reader_type)
                     << ", version:" << read_params.version;
        return res;
    }

    {
        SCOPED_RAW_TIMER(&_stats.block_reader_vcollect_iter_init_timer_ns);
        _is_rowsets_overlapping = _rowsets_not_mono_asc_disjoint(read_params);
        // MIN_DELTA groups consecutive same-key rows, so it always merges in forward order.
        const bool min_delta_scan = read_params.binlog_scan_type == TBinlogScanType::MIN_DELTA;
        const bool force_merge = read_params.read_orderby_key || min_delta_scan;
        const bool is_reverse = !min_delta_scan && read_params.read_orderby_key_reverse;
        _vcollect_iter.init(this, _is_rowsets_overlapping, force_merge, is_reverse);
    }

    std::vector<RowsetReaderSharedPtr> valid_rs_readers;
    RuntimeState* runtime_state = read_params.runtime_state;
    {
        SCOPED_RAW_TIMER(&_stats.block_reader_rs_readers_init_timer_ns);
        for (const auto& rs_split : read_params.rs_splits) {
            if (runtime_state != nullptr && runtime_state->is_cancelled()) {
                return runtime_state->cancel_reason();
            }

            // topn_next() initializes its rowset readers by itself.
            if (!_vcollect_iter.use_topn_next()) {
                RETURN_IF_ERROR(rs_split.rs_reader->init(&_reader_context, rs_split));
            }

            Status child_status = _vcollect_iter.add_child(rs_split);
            if (child_status.ok()) {
                valid_rs_readers.push_back(rs_split.rs_reader);
            } else if (!child_status.is<END_OF_FILE>()) {
                LOG(WARNING) << "failed to add child to iterator, err=" << child_status;
                return child_status;
            }
        }
    }

    {
        SCOPED_RAW_TIMER(&_stats.block_reader_build_heap_init_timer_ns);
        RETURN_IF_ERROR(_vcollect_iter.build_heap(valid_rs_readers));
        // topn_next() cannot use current_row, so _next_row is only primed otherwise.
        if (!_vcollect_iter.use_topn_next()) {
            const Status first_row_status = _vcollect_iter.current_row(&_next_row);
            _eof = first_row_status.is<END_OF_FILE>();
        }
    }

    return Status::OK();
}
488
489
0
// Prepares AGG_KEYS aggregation state; does nothing when the reader is already at EOF.
// Otherwise it:
//  - allocates _stored_data_columns shaped like the current source block (created with
//    batch_max_rows() as the size hint);
//  - sizes the per-column null / variable-length tag vectors;
//  - for every aggregated column, creates the reader-side aggregate function
//    (AGG_READER_SUFFIX) plus a raw state buffer.
// Functions and buffers are kept in _agg_functions / _agg_places and released by the
// destructor. Returns InternalError if a column yields no aggregate function.
Status BlockReader::_init_agg_state(const ReaderParams& read_params) {
    if (_eof) {
        return Status::OK();
    }

    auto stored_block = _next_row.block->create_same_struct_block(batch_max_rows());
    _stored_data_columns = std::move(*stored_block).mutate_columns();

    const size_t stored_column_count = _stored_data_columns.size();
    _stored_has_null_tag.resize(stored_column_count);
    _stored_has_variable_length_tag.resize(stored_column_count);

    auto& schema = *_tablet_schema;
    for (auto idx : _agg_columns_idx) {
        const auto output_pos = _return_columns_loc[idx];
        auto column = schema.column(read_params.origin_return_columns->at(output_pos));
        AggregateFunctionPtr function =
                column.get_aggregate_function(AGG_READER_SUFFIX, read_params.get_be_exec_version());

        // Fail cleanly instead of dereferencing a null function (e.g. on column mismatch).
        if (function == nullptr) {
            return Status::InternalError(
                    "Failed to init reader when init agg state: "
                    "tablet_id: {}, schema_hash: {}, reader_type: {}, version: {}",
                    read_params.tablet->tablet_id(), read_params.tablet->schema_hash(),
                    int(read_params.reader_type), read_params.version.to_string());
        }
        _agg_functions.push_back(function);

        // Raw state buffer sized by the function. SAFE_CREATE presumably runs the cleanup
        // block when create() fails, rolling back the function registered just above.
        AggregateDataPtr place = new char[function->size_of_data()];
        SAFE_CREATE(function->create(place), {
            _agg_functions.pop_back();
            delete[] place;
        });
        _agg_places.push_back(place);

        // Tag variable-length columns (string, array, map, ...).
        _stored_has_variable_length_tag[idx] = _stored_data_columns[idx]->is_variable_length();
    }

    return Status::OK();
}
530
531
48
// Initializes the block reader for one read request:
//  1. runs TabletReader::init;
//  2. maps each origin return column to its output position (_return_columns_loc, indexed
//     by position in return_columns) and classifies it as a normal column
//     (_normal_columns_idx: key columns, or every column of non-AGG_KEYS tables) or an
//     aggregated value column (_agg_columns_idx);
//  3. with a sequence map in the schema, records for each read sequence column the normal
//     columns it governs, split by whether the sequence column is in the output block;
//  4. builds the collect iterator (reporting failures to the Tablet outside cloud mode) and
//     selects _next_block_func: MIN_DELTA / DETAIL binlog scans first, then direct mode,
//     then by keys type.
Status BlockReader::init(const ReaderParams& read_params) {
    SCOPED_RAW_TIMER(&_stats.tablet_reader_init_timer_ns);
    RETURN_IF_ERROR(TabletReader::init(read_params));

    const auto& return_columns = read_params.return_columns;
    const auto& origin_return_columns = *read_params.origin_return_columns;
    _return_columns_loc.resize(return_columns.size(), -1);

    // cid -> index of that column inside _normal_columns_idx (used by the seq-map setup).
    std::unordered_map<int32_t /*cid*/, int32_t /*pos*/> pos_map;
    for (size_t out_pos = 0; out_pos < origin_return_columns.size(); ++out_pos) {
        const auto cid = origin_return_columns.at(out_pos);
        const auto found = std::find(return_columns.begin(), return_columns.end(), cid);
        if (found == return_columns.end()) {
            continue;
        }
        const int col = static_cast<int>(found - return_columns.begin());
        if (col < _tablet->num_key_columns() || _tablet->keys_type() != AGG_KEYS) {
            pos_map[cid] = static_cast<int32_t>(_normal_columns_idx.size());
            _normal_columns_idx.emplace_back(col);
        } else {
            _agg_columns_idx.emplace_back(col);
        }
        _return_columns_loc[col] = static_cast<int>(out_pos);
    }

    if (_tablet_schema->has_seq_map()) {
        if (_tablet_schema->has_sequence_col()) {
            auto msg = "sequence columns conflict, both seq_col and seq_map are true!";
            LOG(WARNING) << msg;
            return Status::InternalError(msg);
        }
        _has_seq_map = true;
        for (const auto& [seq_cid, value_cids] :
             _tablet_schema->seq_col_idx_to_value_cols_idx()) {
            const auto seq_it = std::find(return_columns.begin(), return_columns.end(), seq_cid);
            if (seq_it == return_columns.end()) {
                continue;  // this sequence column is not read; nothing to track
            }
            const int seq_loc = static_cast<int>(seq_it - return_columns.begin());

            std::vector<uint32_t> pos_vec;
            for (auto value_cid : value_cids) {
                if (auto pos_it = pos_map.find(value_cid); pos_it != pos_map.end()) {
                    pos_vec.emplace_back(pos_it->second);
                }
            }
            if (_return_columns_loc[seq_loc] == -1) {
                _seq_map_not_in_origin_block.emplace(seq_loc, pos_vec);
            } else {
                _seq_map_in_origin_block.emplace(seq_loc, pos_vec);
            }
        }
    }

    auto status = _init_collect_iter(read_params);
    if (!status.ok()) [[unlikely]] {
        if (!config::is_cloud_mode()) {
            static_cast<Tablet*>(_tablet.get())->report_error(status);
        }
        return status;
    }

    // MIN_DELTA: collapse consecutive same-key changes to the minimum equivalent change set
    // (e.g. INSERT+DELETE -> SKIP, INSERT+UPDATE -> INSERT). Reduces downstream traffic.
    if (read_params.binlog_scan_type == TBinlogScanType::MIN_DELTA) {
        _next_block_func = &BlockReader::_min_delta_next_block;
        return Status::OK();
    }
    // DETAIL: emit every recorded change as-is, with BEFORE+AFTER rows for UPDATE.
    // Used when the consumer needs full change history rather than the net delta.
    if (read_params.binlog_scan_type == TBinlogScanType::DETAIL) {
        _next_block_func = &BlockReader::_detail_change_next_block;
        return Status::OK();
    }
    if (_direct_mode) {
        _next_block_func = &BlockReader::_direct_next_block;
        return Status::OK();
    }

    // Sequence columns outside the output block get their own scratch column.
    if (_has_seq_map && !_eof) {
        for (const auto& entry : _seq_map_not_in_origin_block) {
            const auto seq_idx = entry.first;
            _seq_columns.insert(
                    {seq_idx, _next_row.block->get_by_position(seq_idx).column->clone_empty()});
        }
    }

    switch (_tablet_schema->keys_type()) {
    case KeysType::DUP_KEYS:
        _next_block_func = &BlockReader::_direct_next_block;
        break;
    case KeysType::UNIQUE_KEYS:
        if (read_params.reader_type == ReaderType::READER_QUERY &&
            _reader_context.enable_unique_key_merge_on_write) {
            _next_block_func = &BlockReader::_direct_next_block;
        } else if (_has_seq_map) {
            _next_block_func = &BlockReader::_replace_key_next_block;
        } else {
            _next_block_func = &BlockReader::_unique_key_next_block;
            if (_filter_delete) {
                _delete_filter_column = ColumnUInt8::create();
            }
        }
        break;
    case KeysType::AGG_KEYS:
        _next_block_func = &BlockReader::_agg_key_next_block;
        RETURN_IF_ERROR(_init_agg_state(read_params));
        break;
    default:
        DCHECK(false) << "No next row function for type:" << _tablet_schema->keys_type();
        break;
    }

    return Status::OK();
}
655
656
294
// Pass-through reader (DUP_KEYS, merge-on-write query reads, direct mode): forwards whole
// blocks from the collect iterator. END_OF_FILE is not an error; it sets *eof and _eof. When
// row ids are recorded, _block_row_locations is refreshed to describe the returned block.
Status BlockReader::_direct_next_block(Block* block, bool* eof) {
    auto res = _vcollect_iter.next(block);
    if (UNLIKELY(!res.ok() && !res.is<END_OF_FILE>())) {
        return res;
    }
    *eof = res.is<END_OF_FILE>();
    _eof = *eof;
    if (UNLIKELY(_reader_context.record_rowids)) {
        res = _vcollect_iter.current_block_row_locations(&_block_row_locations);
        // Test the error code the same way as above, rather than comparing against a
        // freshly constructed Status::Error<END_OF_FILE> object.
        if (UNLIKELY(!res.ok() && !res.is<END_OF_FILE>())) {
            return res;
        }
        DCHECK_EQ(_block_row_locations.size(), block->rows());
    }
    return Status::OK();
}
672
673
0
// Placeholder reader: returns OK immediately without touching the block or *eof.
// NOTE(review): it is not assigned to _next_block_func in the code visible here; if it is
// ever wired up, callers would observe an unset *eof — confirm before using it.
Status BlockReader::_direct_agg_key_next_block(Block* /*block*/, bool* /*eof*/) {
    return Status::OK();
}
676
677
0
// Reader for UNIQUE_KEYS tables with a sequence map: emits one output row per key. The
// first row of a key group is copied, then every following same-key row is folded in
// through _compare_sequence_map_and_replace. Filling stops at batch_max_rows(), at EOF (which
// also sets *eof), or when the byte budget is reached. The budget is checked every
// BLOCK_SIZE_CHECK_INTERVAL_ROWS rows. Rows merged away are added to _merged_rows.
Status BlockReader::_replace_key_next_block(Block* block, bool* eof) {
    if (UNLIKELY(_eof)) {
        *eof = true;
        return Status::OK();
    }

    int target_block_row = 0;
    auto target_columns_guard = block->mutate_columns_scoped();
    auto& target_columns = target_columns_guard.mutable_columns();
    // Sequence mapping currently only supports MoR tables, so the row-id recording below
    // is not exercised for the time being.
    const bool record_rowids = _reader_context.record_rowids;
    if (UNLIKELY(record_rowids)) {
        _block_row_locations.resize(batch_max_rows());
    }

    int merged_row = 0;
    while (target_block_row < batch_max_rows() && !_eof) {
        RETURN_IF_ERROR(_insert_data_normal(target_columns));
        // The key group's first row seeds the seq columns kept outside the origin block.
        for (const auto& entry : _seq_map_not_in_origin_block) {
            _update_last_mutil_seq(entry.first);
        }
        if (UNLIKELY(record_rowids)) {
            _block_row_locations[target_block_row] = _vcollect_iter.current_row_location();
        }
        ++target_block_row;

        // Versions arrive in descending order, so the first row of a key is the newest;
        // later same-key rows may still replace values via the sequence-map comparison.
        while (!_eof) {
            auto res = _vcollect_iter.next(&_next_row);
            if (UNLIKELY(res.is<END_OF_FILE>())) {
                _eof = true;
                *eof = true;
                if (UNLIKELY(record_rowids)) {
                    _block_row_locations.resize(target_block_row);
                }
                break;
            }
            if (UNLIKELY(!res.ok())) {
                LOG(WARNING) << "next failed: " << res;
                return res;
            }
            if (!_next_row.is_same) {
                break;
            }
            merged_row++;
            _compare_sequence_map_and_replace(target_columns);
        }

        // Byte-budget check: _next_row is now either EOF or the first row of the next key,
        // so stopping here cannot repeat or drop a row.
        const bool at_check_point = target_block_row % BLOCK_SIZE_CHECK_INTERVAL_ROWS == 0;
        if (at_check_point && _reached_byte_budget(target_columns)) {
            if (UNLIKELY(record_rowids)) {
                _block_row_locations.resize(target_block_row);
            }
            break;
        }
    }

    _merged_rows += merged_row;
    return Status::OK();
}
743
744
4.36k
bool BlockReader::_reached_byte_budget(const MutableColumns& columns) const {
745
4.36k
    return config::enable_adaptive_batch_size && _reader_context.preferred_block_size_bytes > 0 &&
746
4.36k
           Block::columns_byte_size(columns) >= _reader_context.preferred_block_size_bytes;
747
4.36k
}
748
749
0
// Resolves a duplicate key between the row most recently produced and the pending
// `_next_row` (which has the same key). Rows arrive highest version first, so the
// already-produced row wins ties; the pending row only takes over, per sequence column,
// when its sequence value compares strictly greater.
//
// Each map entry is (sequence column index -> positions into `_normal_columns_idx` of the
// value columns governed by that sequence column).
//   * `_seq_map_in_origin_block`: the sequence column is itself one of the output
//     `columns`; it is compared at the last output row and overwritten on takeover.
//   * `_seq_map_not_in_origin_block`: the sequence column is not in the output, so the
//     comparison uses the value remembered in `_seq_columns[seq_idx]` (position 0),
//     which is refreshed through _update_last_mutil_seq() on takeover.
void BlockReader::_compare_sequence_map_and_replace(MutableColumns& columns) {
    auto* src_block = _next_row.block.get();
    const auto src_pos = _next_row.row_pos;

    // Overwrites the last output row of every value column listed in `value_positions`
    // with the pending row's values.
    auto replace_values = [&](const auto& value_positions) {
        for (const auto& p : value_positions) {
            const auto val_idx = _normal_columns_idx[p];
            // Bind the column itself rather than copying its ColumnPtr: this runs once per
            // merged row, and a pointer copy would only add refcount traffic.
            const auto& src_column = *src_block->get_by_position(val_idx).column;
            auto* dst_column = columns[_return_columns_loc[val_idx]].get();
            dst_column->pop_back(1);
            dst_column->insert_from(src_column, src_pos);
        }
    };

    // Sequence columns that are part of the output columns.
    for (const auto& [seq_idx, value_positions] : _seq_map_in_origin_block) {
        auto* dst_seq_column = columns[_return_columns_loc[seq_idx]].get();
        const auto dst_pos = dst_seq_column->size() - 1;
        const auto& src_seq_column = *src_block->get_by_position(seq_idx).column;
        // The destination row comes from a higher rowset version, so it wins ties.
        if (dst_seq_column->compare_at(dst_pos, src_pos, src_seq_column, -1) >= 0) {
            continue;
        }

        replace_values(value_positions);
        dst_seq_column->pop_back(1);
        dst_seq_column->insert_from(src_seq_column, src_pos);
    }

    // Sequence columns missing from the output: compare against the remembered value.
    for (const auto& [seq_idx, value_positions] : _seq_map_not_in_origin_block) {
        auto* dst_seq_column = _seq_columns[seq_idx].get();
        const auto& src_seq_column = *src_block->get_by_position(seq_idx).column;
        // The remembered value comes from a higher rowset version, so it wins ties.
        if (dst_seq_column->compare_at(0, src_pos, src_seq_column, -1) >= 0) {
            continue;
        }

        replace_values(value_positions);
        _update_last_mutil_seq(seq_idx);
    }
}
802
803
0
void BlockReader::_update_last_mutil_seq(int seq_idx) {
804
0
    auto block = _next_row.block.get();
805
0
    _seq_columns[seq_idx]->clear();
806
0
    _seq_columns[seq_idx]->insert_from(*block->get_by_position(seq_idx).column, _next_row.row_pos);
807
0
}
808
809
0
// Fills `block` for aggregate-key reads. Consecutive rows flagged `_next_row.is_same`
// belong to the same key and are folded into one output row: the key/normal columns are
// written once per group via _insert_data_normal(), and every row of the group is
// buffered as an aggregate input via _append_agg_data().
//
// A new group is not started (and `_next_row` stays pending for the next call) when the
// block already holds `batch_max_rows()` rows or, on a BLOCK_SIZE_CHECK_INTERVAL_ROWS
// boundary, when the byte budget is reached. `*eof` is set once the iterator is
// exhausted. Rows folded into an existing group are added to `_merged_rows`.
Status BlockReader::_agg_key_next_block(Block* block, bool* eof) {
    if (UNLIKELY(_eof)) {
        *eof = true;
        return Status::OK();
    }

    int output_rows = 0;
    int folded_rows = 0;
    auto columns_guard = block->mutate_columns_scoped();
    auto& columns = columns_guard.mutable_columns();

    // The pending row always opens the first group of this block.
    RETURN_IF_ERROR(_insert_data_normal(columns));
    ++output_rows;
    _append_agg_data(columns);

    for (;;) {
        auto st = _vcollect_iter.next(&_next_row);
        if (UNLIKELY(st.is<END_OF_FILE>())) {
            _eof = true;
            *eof = true;
            break;
        }
        if (UNLIKELY(!st.ok())) {
            LOG(WARNING) << "next failed: " << st;
            return st;
        }

        if (_next_row.is_same) {
            // Same key as the open group: only its aggregate input is accumulated.
            ++folded_rows;
        } else {
            // A new key starts here. `_next_row` is still pending, so stopping before
            // consuming it is safe.
            const bool block_full = output_rows == batch_max_rows() ||
                                    (output_rows % BLOCK_SIZE_CHECK_INTERVAL_ROWS == 0 &&
                                     _reached_byte_budget(columns));
            if (block_full) {
                break;
            }

            // Close the previous group, then open an output row for the new key.
            _agg_data_counters.push_back(_last_agg_data_counter);
            _last_agg_data_counter = 0;
            RETURN_IF_ERROR(_insert_data_normal(columns));
            ++output_rows;
        }

        _append_agg_data(columns);
    }

    // Close the last group and flush all buffered aggregate inputs into `columns`.
    _agg_data_counters.push_back(_last_agg_data_counter);
    _last_agg_data_counter = 0;
    _update_agg_data(columns);

    _merged_rows += folded_rows;
    return Status::OK();
}
866
867
284
// Fills `block` for unique-key reads by copying rows one at a time from `_next_row`.
// Rows arrive in reverse version order and the highest version is the final result, so
// lower versions are not merged here.
//
// Accumulation stops when the iterator is exhausted (also setting `*eof`), after
// `batch_max_rows()` rows, or when the byte budget is reached on a
// BLOCK_SIZE_CHECK_INTERVAL_ROWS boundary. In the last two cases the already fetched
// `_next_row` is kept for the next call.
//
// With `_reader_context.record_rowids`, `_block_row_locations[i]` receives the source
// location of output row i. With `_delete_sign_available`, rows whose delete-sign value is
// non-zero are filtered out of `block` (their recorded row_id is set to -1) and counted in
// `_stats.rows_del_filtered`.
Status BlockReader::_unique_key_next_block(Block* block, bool* eof) {
    if (UNLIKELY(_eof)) {
        *eof = true;
        return Status::OK();
    }

    const bool record_rowids = _reader_context.record_rowids;
    auto row_count = 0;
    auto columns_guard = block->mutate_columns_scoped();
    auto& columns = columns_guard.mutable_columns();
    if (UNLIKELY(record_rowids)) {
        _block_row_locations.resize(batch_max_rows());
    }
    // Shrinks the location buffer to the number of rows actually produced.
    auto trim_row_locations = [&]() {
        if (UNLIKELY(record_rowids)) {
            _block_row_locations.resize(row_count);
        }
    };

    for (;;) {
        RETURN_IF_ERROR(_insert_data_normal(columns));
        if (UNLIKELY(record_rowids)) {
            _block_row_locations[row_count] = _vcollect_iter.current_row_location();
        }
        ++row_count;

        auto st = _vcollect_iter.next(&_next_row);
        if (UNLIKELY(st.is<END_OF_FILE>())) {
            _eof = true;
            *eof = true;
            trim_row_locations();
            break;
        }
        if (UNLIKELY(!st.ok())) {
            LOG(WARNING) << "next failed: " << st;
            return st;
        }
        // `_next_row` already holds the following row, so stopping here drops nothing.
        if (row_count % BLOCK_SIZE_CHECK_INTERVAL_ROWS == 0 && _reached_byte_budget(columns)) {
            trim_row_locations();
            break;
        }
        if (row_count >= batch_max_rows()) {
            break;
        }
    }

    if (!_delete_sign_available) {
        return Status::OK();
    }

    const int delete_sign_idx = _reader_context.tablet_schema->field_index(DELETE_SIGN);
    DCHECK(delete_sign_idx > 0);
    if (delete_sign_idx <= 0 || delete_sign_idx >= columns.size()) {
        LOG(WARNING) << "tablet_id: " << tablet()->tablet_id() << " delete sign idx "
                     << delete_sign_idx << " not invalid, skip filter delete in base compaction";
        columns_guard.restore();
        return Status::OK();
    }

    auto filter_column = IColumn::mutate(std::move(_delete_filter_column));
    auto* filter_impl = reinterpret_cast<ColumnUInt8*>(filter_column.get());
    filter_impl->resize(row_count);

    auto* __restrict keep_flags = filter_impl->get_data().data();
    auto* __restrict sign_values =
            reinterpret_cast<ColumnInt8*>(columns[delete_sign_idx].get())->get_data().data();

    // Keep a row only when its delete sign is 0. With rowid recording, a dropped row's
    // location is marked with row_id = -1 and counted for the consistency check below.
    int deleted_with_rowid = 0;
    for (int i = 0; i < row_count; ++i) {
        const bool keep = (sign_values[i] == 0);
        keep_flags[i] = keep;
        if (UNLIKELY(!keep && record_rowids)) {
            _block_row_locations[i].row_id = -1;
            ++deleted_with_rowid;
        }
    }

    // Capture the column count before the guard hands the columns back to `block`.
    const auto column_count = columns.size();
    _delete_filter_column = std::move(filter_column);
    ColumnWithTypeAndName filter_entry {_delete_filter_column, std::make_shared<DataTypeUInt8>(),
                                        "__DORIS_COMPACTION_FILTER__"};
    columns_guard.restore();
    block->insert(filter_entry);
    RETURN_IF_ERROR(Block::filter_block(block, column_count, column_count));
    _stats.rows_del_filtered += row_count - block->rows();
    if (UNLIKELY(record_rowids)) {
        DCHECK_EQ(_block_row_locations.size(), block->rows() + deleted_with_rowid);
    }
    return Status::OK();
}
960
961
280k
// Appends `_next_row` to the output: for every index `idx` in `_normal_columns_idx`, the
// source block's column `idx` at `_next_row.row_pos` is copied into output column
// `_return_columns_loc[idx]`. The inserts run inside RETURN_IF_CATCH_EXCEPTION, which
// presumably converts a thrown exception into an error Status (macro defined elsewhere).
Status BlockReader::_insert_data_normal(MutableColumns& columns) {
    auto* src_block = _next_row.block.get();
    const auto src_row = _next_row.row_pos;

    RETURN_IF_CATCH_EXCEPTION({
        for (const auto idx : _normal_columns_idx) {
            const auto& src_column = *src_block->get_by_position(idx).column;
            columns[_return_columns_loc[idx]]->insert_from(src_column, src_row);
        }
    });
    return Status::OK();
}
972
973
115
// Buffers `_next_row` as one more aggregate input of the currently open group.
//
// The buffer is flushed through _update_agg_data() when `_next_row` is the last row of its
// source block (stored refs into that block may soon become invalid) or when
// `_stored_row_ref` reaches `batch_max_rows()` entries. `_stored_data_columns` is sized
// to `batch_max_rows()`, so this flush keeps the buffer within that capacity.
void BlockReader::_append_agg_data(MutableColumns& columns) {
    _stored_row_ref.push_back(_next_row);
    ++_last_agg_data_counter;

    const bool at_block_end = (_next_row.row_pos + 1 == _next_row.block->rows());
    if (at_block_end || _stored_row_ref.size() == batch_max_rows()) {
        _update_agg_data(columns);
    }
}
985
986
32
// Aggregates all buffered row refs.
//  1. _copy_agg_data() moves the buffered values into `_stored_data_columns`.
//  2. The per-column null tags are recomputed over the copied range.
//  3. Each closed group (one row count per `_agg_data_counters` entry, laid out
//     consecutively) is aggregated and its result inserted into `columns`.
//  4. Rows of the still-open group (`_last_agg_data_counter`) are folded into the
//     aggregate state without inserting a result, because that key may continue in the
//     next source block.
void BlockReader::_update_agg_data(MutableColumns& columns) {
    const size_t copied_rows = _copy_agg_data();

    for (auto idx : _agg_columns_idx) {
        _stored_has_null_tag[idx] = _stored_data_columns[idx]->has_null(0, copied_rows);
    }

    // Stored rows [group_begin, group_end] (inclusive) belong to one closed group.
    int group_begin = 0;
    for (const int group_rows : _agg_data_counters) {
        const int group_end = group_begin + group_rows - 1;
        _update_agg_value(columns, group_begin, group_end);
        group_begin = group_end + 1;
    }

    if (_last_agg_data_counter != 0) {
        _update_agg_value(columns, group_begin, group_begin + _last_agg_data_counter - 1, false);
        _last_agg_data_counter = 0;
    }

    _agg_data_counters.clear();
}
1010
1011
32
size_t BlockReader::_copy_agg_data() {
1012
32
    size_t copy_size = _stored_row_ref.size();
1013
1014
147
    for (size_t i = 0; i < copy_size; i++) {
1015
115
        auto& ref = _stored_row_ref[i];
1016
115
        _temp_ref_map[ref.block.get()].emplace_back(ref.row_pos, i);
1017
115
    }
1018
1019
32
    for (auto idx : _agg_columns_idx) {
1020
32
        auto& dst_column = _stored_data_columns[idx];
1021
32
        if (_stored_has_variable_length_tag[idx]) {
1022
            //variable length type should replace ordered
1023
0
            dst_column->clear();
1024
0
            for (size_t i = 0; i < copy_size; i++) {
1025
0
                auto& ref = _stored_row_ref[i];
1026
0
                dst_column->insert_from(*ref.block->get_by_position(idx).column, ref.row_pos);
1027
0
            }
1028
32
        } else {
1029
32
            for (auto& it : _temp_ref_map) {
1030
32
                if (!it.second.empty()) {
1031
29
                    auto& src_column = *it.first->get_by_position(idx).column;
1032
115
                    for (auto& pos : it.second) {
1033
115
                        dst_column->replace_column_data(src_column, pos.first, pos.second);
1034
115
                    }
1035
29
                }
1036
32
            }
1037
32
        }
1038
32
    }
1039
1040
32
    for (auto& it : _temp_ref_map) {
1041
32
        it.second.clear();
1042
32
    }
1043
32
    _stored_row_ref.clear();
1044
1045
32
    return copy_size;
1046
32
}
1047
1048
32
// For every aggregate column, feeds stored rows [begin, end] (inclusive) of
// `_stored_data_columns` into that column's aggregate state. An empty range
// (begin > end) adds nothing.
//
// When `is_close` is true the group is finished: each aggregate result is inserted into
// its output column (`_return_columns_loc[idx]`), the state is reset, and `_arena` is
// cleared once all columns are done. When false, the state keeps accumulating for a group
// that continues in a later call.
void BlockReader::_update_agg_value(MutableColumns& columns, int begin, int end, bool is_close) {
    for (size_t i = 0; i < _agg_columns_idx.size(); ++i) {
        const auto idx = _agg_columns_idx[i];

        // Bind by reference: copying the shared function pointer would cost an atomic
        // refcount increment/decrement per aggregate column on every call.
        const auto& function = _agg_functions[i];
        AggregateDataPtr place = _agg_places[i];
        // Declared as a pointer-to-const so its address is directly the
        // `const IColumn**` that add_batch_range expects; no const_cast needed.
        const IColumn* column_ptr = _stored_data_columns[idx].get();

        if (begin <= end) {
            function->add_batch_range(begin, end, place, &column_ptr, _arena,
                                      _stored_has_null_tag[idx]);
        }

        if (is_close) {
            function->insert_result_into(place, *columns[_return_columns_loc[idx]]);
            // reset aggregate data
            function->reset(place);
        }
    }
    if (is_close) {
        _arena.clear();
    }
}
1071
1072
} // namespace doris