Coverage Report

Created: 2026-04-16 14:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/iterator/block_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/iterator/block_reader.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
#include <glog/logging.h>
22
#include <stdint.h>
23
24
#include <algorithm>
25
#include <boost/iterator/iterator_facade.hpp>
26
#include <memory>
27
#include <ostream>
28
#include <string>
29
30
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
31
#include "cloud/config.h"
32
#include "common/compiler_util.h" // IWYU pragma: keep
33
#include "common/status.h"
34
#include "core/block/column_with_type_and_name.h"
35
#include "core/column/column_nullable.h"
36
#include "core/column/column_vector.h"
37
#include "core/data_type/data_type_number.h"
38
#include "exprs/aggregate/aggregate_function_reader.h"
39
#include "exprs/function_filter.h"
40
#include "runtime/runtime_state.h"
41
#include "storage/iterator/vcollect_iterator.h"
42
#include "storage/olap_common.h"
43
#include "storage/olap_define.h"
44
#include "storage/predicate/like_column_predicate.h"
45
#include "storage/rowset/rowset.h"
46
#include "storage/rowset/rowset_reader_context.h"
47
#include "storage/tablet/tablet.h"
48
#include "storage/tablet/tablet_schema.h"
49
50
namespace doris {
51
class ColumnPredicate;
52
} // namespace doris
53
54
namespace doris {
55
using namespace ErrorCode;
56
57
67
BlockReader::~BlockReader() {
58
67
    for (int i = 0; i < _agg_functions.size(); ++i) {
59
0
        _agg_functions[i]->destroy(_agg_places[i]);
60
0
        delete[] _agg_places[i];
61
0
    }
62
67
}
63
64
578
Status BlockReader::next_block_with_aggregation(Block* block, bool* eof) {
65
578
    auto res = (this->*_next_block_func)(block, eof);
66
578
    if (!config::is_cloud_mode()) {
67
578
        if (!res.ok()) [[unlikely]] {
68
0
            static_cast<Tablet*>(_tablet.get())->report_error(res);
69
0
        }
70
578
    }
71
578
    return res;
72
578
}
73
74
67
bool BlockReader::_rowsets_not_mono_asc_disjoint(const ReaderParams& read_params) {
75
67
    std::string pre_rs_last_key;
76
67
    bool pre_rs_key_bounds_truncated {false};
77
67
    const std::vector<RowSetSplits>& rs_splits = read_params.rs_splits;
78
162
    for (const auto& rs_split : rs_splits) {
79
162
        if (rs_split.rs_reader->rowset()->num_rows() == 0) {
80
0
            continue;
81
0
        }
82
162
        if (rs_split.rs_reader->rowset()->is_segments_overlapping()) {
83
0
            return true;
84
0
        }
85
162
        std::string rs_first_key;
86
162
        bool has_first_key = rs_split.rs_reader->rowset()->first_key(&rs_first_key);
87
162
        if (!has_first_key) {
88
0
            return true;
89
0
        }
90
162
        bool cur_rs_key_bounds_truncated {
91
162
                rs_split.rs_reader->rowset()->is_segments_key_bounds_truncated()};
92
162
        if (!Slice::lhs_is_strictly_less_than_rhs(Slice {pre_rs_last_key},
93
162
                                                  pre_rs_key_bounds_truncated, Slice {rs_first_key},
94
162
                                                  cur_rs_key_bounds_truncated)) {
95
58
            return true;
96
58
        }
97
104
        bool has_last_key = rs_split.rs_reader->rowset()->last_key(&pre_rs_last_key);
98
104
        pre_rs_key_bounds_truncated = cur_rs_key_bounds_truncated;
99
104
        CHECK(has_last_key);
100
104
    }
101
9
    return false;
102
67
}
103
104
48
Status BlockReader::_init_collect_iter(const ReaderParams& read_params) {
105
48
    auto res = _capture_rs_readers(read_params);
106
48
    if (!res.ok()) {
107
0
        LOG(WARNING) << "fail to init reader when _capture_rs_readers. res:" << res
108
0
                     << ", tablet_id:" << read_params.tablet->tablet_id()
109
0
                     << ", schema_hash:" << read_params.tablet->schema_hash()
110
0
                     << ", reader_type:" << int(read_params.reader_type)
111
0
                     << ", version:" << read_params.version;
112
0
        return res;
113
0
    }
114
    // check if rowsets are noneoverlapping
115
48
    {
116
48
        SCOPED_RAW_TIMER(&_stats.block_reader_vcollect_iter_init_timer_ns);
117
48
        _is_rowsets_overlapping = _rowsets_not_mono_asc_disjoint(read_params);
118
48
        _vcollect_iter.init(this, _is_rowsets_overlapping, read_params.read_orderby_key,
119
48
                            read_params.read_orderby_key_reverse);
120
48
    }
121
122
48
    std::vector<RowsetReaderSharedPtr> valid_rs_readers;
123
48
    RuntimeState* runtime_state = read_params.runtime_state;
124
125
48
    {
126
48
        SCOPED_RAW_TIMER(&_stats.block_reader_rs_readers_init_timer_ns);
127
192
        for (int i = 0; i < read_params.rs_splits.size(); ++i) {
128
144
            if (runtime_state != nullptr && runtime_state->is_cancelled()) {
129
0
                return runtime_state->cancel_reason();
130
0
            }
131
132
144
            auto& rs_split = read_params.rs_splits[i];
133
134
            // _vcollect_iter.topn_next() will init rs_reader by itself
135
144
            if (!_vcollect_iter.use_topn_next()) {
136
144
                RETURN_IF_ERROR(rs_split.rs_reader->init(&_reader_context, rs_split));
137
144
            }
138
139
144
            Status res1 = _vcollect_iter.add_child(rs_split);
140
144
            if (!res1.ok() && !res1.is<END_OF_FILE>()) {
141
0
                LOG(WARNING) << "failed to add child to iterator, err=" << res1;
142
0
                return res1;
143
0
            }
144
144
            if (res1.ok()) {
145
144
                valid_rs_readers.push_back(rs_split.rs_reader);
146
144
            }
147
144
        }
148
48
    }
149
48
    {
150
48
        SCOPED_RAW_TIMER(&_stats.block_reader_build_heap_init_timer_ns);
151
48
        RETURN_IF_ERROR(_vcollect_iter.build_heap(valid_rs_readers));
152
        // _vcollect_iter.topn_next() can not use current_row
153
48
        if (!_vcollect_iter.use_topn_next()) {
154
48
            auto status = _vcollect_iter.current_row(&_next_row);
155
48
            _eof = status.is<END_OF_FILE>();
156
48
        }
157
48
    }
158
159
0
    return Status::OK();
160
48
}
161
162
0
Status BlockReader::_init_agg_state(const ReaderParams& read_params) {
163
0
    if (_eof) {
164
0
        return Status::OK();
165
0
    }
166
167
0
    _stored_data_columns =
168
0
            _next_row.block->create_same_struct_block(_reader_context.batch_size)->mutate_columns();
169
170
0
    _stored_has_null_tag.resize(_stored_data_columns.size());
171
0
    _stored_has_variable_length_tag.resize(_stored_data_columns.size());
172
173
0
    auto& tablet_schema = *_tablet_schema;
174
0
    for (auto idx : _agg_columns_idx) {
175
0
        auto column = tablet_schema.column(
176
0
                read_params.origin_return_columns->at(_return_columns_loc[idx]));
177
0
        AggregateFunctionPtr function =
178
0
                column.get_aggregate_function(AGG_READER_SUFFIX, read_params.get_be_exec_version());
179
180
        // to avoid coredump when something goes wrong(i.e. column missmatch)
181
0
        if (!function) {
182
0
            return Status::InternalError(
183
0
                    "Failed to init reader when init agg state: "
184
0
                    "tablet_id: {}, schema_hash: {}, reader_type: {}, version: {}",
185
0
                    read_params.tablet->tablet_id(), read_params.tablet->schema_hash(),
186
0
                    int(read_params.reader_type), read_params.version.to_string());
187
0
        }
188
0
        _agg_functions.push_back(function);
189
        // create aggregate data
190
0
        AggregateDataPtr place = new char[function->size_of_data()];
191
0
        SAFE_CREATE(function->create(place), {
192
0
            _agg_functions.pop_back();
193
0
            delete[] place;
194
0
        });
195
0
        _agg_places.push_back(place);
196
197
        // calculate `_has_variable_length_tag` tag. like string, array, map
198
0
        _stored_has_variable_length_tag[idx] = _stored_data_columns[idx]->is_variable_length();
199
0
    }
200
201
0
    return Status::OK();
202
0
}
203
204
48
Status BlockReader::init(const ReaderParams& read_params) {
205
48
    RETURN_IF_ERROR(TabletReader::init(read_params));
206
207
48
    auto return_column_size = read_params.origin_return_columns->size();
208
48
    _return_columns_loc.resize(read_params.return_columns.size(), -1);
209
48
    std::unordered_map<int32_t /*cid*/, int32_t /*pos*/> pos_map;
210
176
    for (int i = 0; i < return_column_size; ++i) {
211
128
        auto cid = read_params.origin_return_columns->at(i);
212
        // For each original cid, find the index in return_columns
213
240
        for (int j = 0; j < read_params.return_columns.size(); ++j) {
214
240
            if (read_params.return_columns[j] == cid) {
215
128
                if (j < _tablet->num_key_columns() || _tablet->keys_type() != AGG_KEYS) {
216
128
                    pos_map[cid] = (int32_t)_normal_columns_idx.size();
217
128
                    _normal_columns_idx.emplace_back(j);
218
128
                } else {
219
0
                    _agg_columns_idx.emplace_back(j);
220
0
                }
221
128
                _return_columns_loc[j] = i;
222
128
                break;
223
128
            }
224
240
        }
225
128
    }
226
227
48
    if (_tablet_schema->has_seq_map()) {
228
0
        if (_tablet_schema->has_sequence_col()) {
229
0
            auto msg = "sequence columns conflict, both seq_col and seq_map are true!";
230
0
            LOG(WARNING) << msg;
231
0
            return Status::InternalError(msg);
232
0
        }
233
0
        _has_seq_map = true;
234
0
        for (auto seq_val_iter = _tablet_schema->seq_col_idx_to_value_cols_idx().cbegin();
235
0
             seq_val_iter != _tablet_schema->seq_col_idx_to_value_cols_idx().cend();
236
0
             ++seq_val_iter) {
237
0
            int seq_loc = -1;
238
0
            for (int i = 0; i < read_params.return_columns.size(); ++i) {
239
0
                if (read_params.return_columns[i] == seq_val_iter->first) {
240
0
                    seq_loc = i;
241
0
                    break;
242
0
                }
243
0
            }
244
0
            if (seq_loc == -1) {
245
                // don't need to deal with this seq col
246
0
                continue;
247
0
            }
248
249
0
            std::vector<uint32_t> pos_vec;
250
0
            for (auto agg_cid : seq_val_iter->second) {
251
0
                const auto& val_pos_iter = pos_map.find(agg_cid);
252
0
                if (val_pos_iter == pos_map.end()) {
253
0
                    continue;
254
0
                }
255
0
                pos_vec.emplace_back(val_pos_iter->second);
256
0
            }
257
0
            if (_return_columns_loc[seq_loc] == -1) {
258
0
                _seq_map_not_in_origin_block.emplace(seq_loc, pos_vec);
259
0
            } else {
260
0
                _seq_map_in_origin_block.emplace(seq_loc, pos_vec);
261
0
            }
262
0
        }
263
0
    }
264
265
48
    auto status = _init_collect_iter(read_params);
266
48
    if (!status.ok()) [[unlikely]] {
267
0
        if (!config::is_cloud_mode()) {
268
0
            static_cast<Tablet*>(_tablet.get())->report_error(status);
269
0
        }
270
0
        return status;
271
0
    }
272
273
48
    if (_direct_mode) {
274
0
        _next_block_func = &BlockReader::_direct_next_block;
275
0
        return Status::OK();
276
0
    }
277
48
    if (_has_seq_map && !_eof) {
278
0
        for (auto it = _seq_map_not_in_origin_block.cbegin();
279
0
             it != _seq_map_not_in_origin_block.cend(); ++it) {
280
0
            auto seq_idx = it->first;
281
0
            _seq_columns.insert(
282
0
                    {seq_idx, _next_row.block->get_by_position(seq_idx).column->clone_empty()});
283
0
        }
284
0
    }
285
286
48
    switch (tablet()->keys_type()) {
287
16
    case KeysType::DUP_KEYS:
288
16
        _next_block_func = &BlockReader::_direct_next_block;
289
16
        break;
290
32
    case KeysType::UNIQUE_KEYS:
291
32
        if (read_params.reader_type == ReaderType::READER_QUERY &&
292
32
            _reader_context.enable_unique_key_merge_on_write) {
293
0
            _next_block_func = &BlockReader::_direct_next_block;
294
32
        } else if (_has_seq_map) {
295
0
            _next_block_func = &BlockReader::_replace_key_next_block;
296
32
        } else {
297
32
            _next_block_func = &BlockReader::_unique_key_next_block;
298
32
            if (_filter_delete) {
299
32
                _delete_filter_column = ColumnUInt8::create();
300
32
            }
301
32
        }
302
32
        break;
303
0
    case KeysType::AGG_KEYS:
304
0
        _next_block_func = &BlockReader::_agg_key_next_block;
305
0
        RETURN_IF_ERROR(_init_agg_state(read_params));
306
0
        break;
307
0
    default:
308
0
        DCHECK(false) << "No next row function for type:" << tablet()->keys_type();
309
0
        break;
310
48
    }
311
312
48
    return Status::OK();
313
48
}
314
315
294
Status BlockReader::_direct_next_block(Block* block, bool* eof) {
316
294
    auto res = _vcollect_iter.next(block);
317
294
    if (UNLIKELY(!res.ok() && !res.is<END_OF_FILE>())) {
318
0
        return res;
319
0
    }
320
294
    *eof = res.is<END_OF_FILE>();
321
294
    _eof = *eof;
322
294
    if (UNLIKELY(_reader_context.record_rowids)) {
323
294
        res = _vcollect_iter.current_block_row_locations(&_block_row_locations);
324
294
        if (UNLIKELY(!res.ok() && res != Status::Error<END_OF_FILE>(""))) {
325
0
            return res;
326
0
        }
327
294
        DCHECK_EQ(_block_row_locations.size(), block->rows());
328
294
    }
329
294
    return Status::OK();
330
294
}
331
332
0
Status BlockReader::_direct_agg_key_next_block(Block* block, bool* eof) {
333
0
    return Status::OK();
334
0
}
335
336
0
Status BlockReader::_replace_key_next_block(Block* block, bool* eof) {
337
0
    if (UNLIKELY(_eof)) {
338
0
        *eof = true;
339
0
        return Status::OK();
340
0
    }
341
342
0
    auto target_block_row = 0;
343
0
    auto target_columns = block->mutate_columns();
344
    // currently seq mapping only support mor table
345
    // so this will not be executed for the time being
346
0
    if (UNLIKELY(_reader_context.record_rowids)) {
347
0
        _block_row_locations.resize(batch_size());
348
0
    }
349
0
    auto merged_row = 0;
350
0
    while (target_block_row < batch_size() && !_eof) {
351
0
        RETURN_IF_ERROR(_insert_data_normal(target_columns));
352
        // use the first line to init _seq_columns
353
0
        for (auto it = _seq_map_not_in_origin_block.cbegin();
354
0
             it != _seq_map_not_in_origin_block.cend(); ++it) {
355
0
            auto seq_idx = it->first;
356
0
            _update_last_mutil_seq(seq_idx);
357
0
        }
358
0
        if (UNLIKELY(_reader_context.record_rowids)) {
359
0
            _block_row_locations[target_block_row] = _vcollect_iter.current_row_location();
360
0
        }
361
0
        target_block_row++;
362
363
0
        while (!_eof) {
364
            // the version is in reverse order, the first row is the highest version,
365
            // in UNIQUE_KEY highest version is the final result
366
0
            auto res = _vcollect_iter.next(&_next_row);
367
0
            if (UNLIKELY(res.is<END_OF_FILE>())) {
368
0
                _eof = true;
369
0
                *eof = true;
370
0
                if (UNLIKELY(_reader_context.record_rowids)) {
371
0
                    _block_row_locations.resize(target_block_row);
372
0
                }
373
0
                break;
374
0
            }
375
376
0
            if (UNLIKELY(!res.ok())) {
377
0
                LOG(WARNING) << "next failed: " << res;
378
0
                return res;
379
0
            }
380
381
0
            if (_next_row.is_same) {
382
0
                merged_row++;
383
0
                _compare_sequence_map_and_replace(target_columns);
384
0
            } else {
385
0
                break;
386
0
            }
387
0
        }
388
0
    }
389
0
    _merged_rows += merged_row;
390
0
    return Status::OK();
391
0
}
392
393
0
void BlockReader::_compare_sequence_map_and_replace(MutableColumns& columns) {
394
0
    auto src_block = _next_row.block.get();
395
0
    auto src_pos = _next_row.row_pos;
396
397
    // use seq column in origin block to compare and replace
398
0
    for (auto it = _seq_map_in_origin_block.cbegin(); it != _seq_map_in_origin_block.cend(); ++it) {
399
0
        auto seq_idx = it->first;
400
0
        auto dst_seq_column = columns[_return_columns_loc[seq_idx]].get();
401
0
        auto dst_pos = dst_seq_column->size() - 1;
402
0
        auto src_seq_column = src_block->get_by_position(seq_idx).column;
403
        // the rowset version of dst is higher .
404
0
        auto res = dst_seq_column->compare_at(dst_pos, src_pos, *src_seq_column, -1);
405
0
        if (res >= 0) {
406
0
            continue;
407
0
        }
408
409
        // update value and seq column
410
0
        for (auto& p : it->second) {
411
0
            auto val_idx = _normal_columns_idx[p];
412
0
            auto src_column = src_block->get_by_position(val_idx).column;
413
0
            auto dst_column = columns[_return_columns_loc[val_idx]].get();
414
0
            dst_column->pop_back(1);
415
0
            dst_column->insert_from(*src_column, src_pos);
416
0
        }
417
418
0
        dst_seq_column->pop_back(1);
419
0
        dst_seq_column->insert_from(*src_seq_column, src_pos);
420
0
    }
421
422
    // use temp seq block to compare and replace because origin block not contains these seq columns
423
0
    for (auto it = _seq_map_not_in_origin_block.cbegin(); it != _seq_map_not_in_origin_block.cend();
424
0
         ++it) {
425
0
        auto seq_idx = it->first;
426
0
        auto dst_seq_column = _seq_columns[seq_idx].get();
427
0
        auto src_seq_column = src_block->get_by_position(seq_idx).column;
428
        // the rowset version of dst is higher .
429
0
        auto res = dst_seq_column->compare_at(0, src_pos, *src_seq_column, -1);
430
0
        if (res >= 0) {
431
0
            continue;
432
0
        }
433
434
        // update value and seq column (if need to return)
435
0
        for (auto& p : it->second) {
436
0
            auto val_idx = _normal_columns_idx[p];
437
0
            auto src_column = src_block->get_by_position(val_idx).column;
438
0
            auto dst_column = columns[_return_columns_loc[val_idx]].get();
439
0
            dst_column->pop_back(1);
440
0
            dst_column->insert_from(*src_column, src_pos);
441
0
        }
442
443
0
        _update_last_mutil_seq(seq_idx);
444
0
    }
445
0
}
446
447
0
void BlockReader::_update_last_mutil_seq(int seq_idx) {
448
0
    auto block = _next_row.block.get();
449
0
    _seq_columns[seq_idx]->clear();
450
0
    _seq_columns[seq_idx]->insert_from(*block->get_by_position(seq_idx).column, _next_row.row_pos);
451
0
}
452
453
0
Status BlockReader::_agg_key_next_block(Block* block, bool* eof) {
454
0
    if (UNLIKELY(_eof)) {
455
0
        *eof = true;
456
0
        return Status::OK();
457
0
    }
458
459
0
    auto target_block_row = 0;
460
0
    auto merged_row = 0;
461
0
    auto target_columns = block->mutate_columns();
462
0
    RETURN_IF_ERROR(_insert_data_normal(target_columns));
463
0
    target_block_row++;
464
0
    _append_agg_data(target_columns);
465
466
0
    while (true) {
467
0
        auto res = _vcollect_iter.next(&_next_row);
468
0
        if (UNLIKELY(res.is<END_OF_FILE>())) {
469
0
            _eof = true;
470
0
            *eof = true;
471
0
            break;
472
0
        }
473
0
        if (UNLIKELY(!res.ok())) {
474
0
            LOG(WARNING) << "next failed: " << res;
475
0
            return res;
476
0
        }
477
478
0
        if (!_next_row.is_same) {
479
0
            if (target_block_row == _reader_context.batch_size) {
480
0
                break;
481
0
            }
482
0
            _agg_data_counters.push_back(_last_agg_data_counter);
483
0
            _last_agg_data_counter = 0;
484
485
0
            RETURN_IF_ERROR(_insert_data_normal(target_columns));
486
487
0
            target_block_row++;
488
0
        } else {
489
0
            merged_row++;
490
0
        }
491
492
0
        _append_agg_data(target_columns);
493
0
    }
494
495
0
    _agg_data_counters.push_back(_last_agg_data_counter);
496
0
    _last_agg_data_counter = 0;
497
0
    _update_agg_data(target_columns);
498
0
    block->set_columns(std::move(target_columns));
499
500
0
    _merged_rows += merged_row;
501
0
    return Status::OK();
502
0
}
503
504
284
Status BlockReader::_unique_key_next_block(Block* block, bool* eof) {
505
284
    if (UNLIKELY(_eof)) {
506
0
        *eof = true;
507
0
        return Status::OK();
508
0
    }
509
510
284
    auto target_block_row = 0;
511
284
    auto target_columns = block->mutate_columns();
512
284
    if (UNLIKELY(_reader_context.record_rowids)) {
513
284
        _block_row_locations.resize(_reader_context.batch_size);
514
284
    }
515
516
280k
    do {
517
280k
        RETURN_IF_ERROR(_insert_data_normal(target_columns));
518
519
280k
        if (UNLIKELY(_reader_context.record_rowids)) {
520
280k
            _block_row_locations[target_block_row] = _vcollect_iter.current_row_location();
521
280k
        }
522
280k
        target_block_row++;
523
524
        // the version is in reverse order, the first row is the highest version,
525
        // in UNIQUE_KEY highest version is the final result, there is no need to
526
        // merge the lower versions
527
280k
        auto res = _vcollect_iter.next(&_next_row);
528
280k
        if (UNLIKELY(res.is<END_OF_FILE>())) {
529
32
            _eof = true;
530
32
            *eof = true;
531
32
            if (UNLIKELY(_reader_context.record_rowids)) {
532
32
                _block_row_locations.resize(target_block_row);
533
32
            }
534
32
            break;
535
32
        }
536
537
280k
        if (UNLIKELY(!res.ok())) {
538
0
            LOG(WARNING) << "next failed: " << res;
539
0
            return res;
540
0
        }
541
280k
    } while (target_block_row < _reader_context.batch_size);
542
543
284
    if (_delete_sign_available) {
544
284
        int delete_sign_idx = _reader_context.tablet_schema->field_index(DELETE_SIGN);
545
284
        DCHECK(delete_sign_idx > 0);
546
284
        if (delete_sign_idx <= 0 || delete_sign_idx >= target_columns.size()) {
547
0
            LOG(WARNING) << "tablet_id: " << tablet()->tablet_id() << " delete sign idx "
548
0
                         << delete_sign_idx
549
0
                         << " not invalid, skip filter delete in base compaction";
550
0
            return Status::OK();
551
0
        }
552
284
        MutableColumnPtr delete_filter_column = (*std::move(_delete_filter_column)).mutate();
553
284
        reinterpret_cast<ColumnUInt8*>(delete_filter_column.get())->resize(target_block_row);
554
555
284
        auto* __restrict filter_data =
556
284
                reinterpret_cast<ColumnUInt8*>(delete_filter_column.get())->get_data().data();
557
284
        auto* __restrict delete_data =
558
284
                reinterpret_cast<ColumnInt8*>(target_columns[delete_sign_idx].get())
559
284
                        ->get_data()
560
284
                        .data();
561
284
        int delete_count = 0;
562
280k
        for (int i = 0; i < target_block_row; ++i) {
563
280k
            bool sign = (delete_data[i] == 0);
564
280k
            filter_data[i] = sign;
565
280k
            if (UNLIKELY(!sign)) {
566
0
                if (UNLIKELY(_reader_context.record_rowids)) {
567
0
                    _block_row_locations[i].row_id = -1;
568
0
                    delete_count++;
569
0
                }
570
0
            }
571
280k
        }
572
284
        auto target_columns_size = target_columns.size();
573
284
        ColumnWithTypeAndName column_with_type_and_name {_delete_filter_column,
574
284
                                                         std::make_shared<DataTypeUInt8>(),
575
284
                                                         "__DORIS_COMPACTION_FILTER__"};
576
284
        block->set_columns(std::move(target_columns));
577
284
        block->insert(column_with_type_and_name);
578
284
        RETURN_IF_ERROR(Block::filter_block(block, target_columns_size, target_columns_size));
579
284
        _stats.rows_del_filtered += target_block_row - block->rows();
580
284
        if (UNLIKELY(_reader_context.record_rowids)) {
581
284
            DCHECK_EQ(_block_row_locations.size(), block->rows() + delete_count);
582
284
        }
583
284
    } else {
584
0
        block->set_columns(std::move(target_columns));
585
0
    }
586
284
    return Status::OK();
587
284
}
588
589
280k
Status BlockReader::_insert_data_normal(MutableColumns& columns) {
590
280k
    auto block = _next_row.block.get();
591
592
280k
    RETURN_IF_CATCH_EXCEPTION({
593
280k
        for (auto idx : _normal_columns_idx) {
594
280k
            columns[_return_columns_loc[idx]]->insert_from(*block->get_by_position(idx).column,
595
280k
                                                           _next_row.row_pos);
596
280k
        }
597
280k
    });
598
280k
    return Status::OK();
599
280k
}
600
601
0
void BlockReader::_append_agg_data(MutableColumns& columns) {
602
0
    _stored_row_ref.push_back(_next_row);
603
0
    _last_agg_data_counter++;
604
605
    // execute aggregate when have `batch_size` column or some ref invalid soon
606
0
    bool is_last = (_next_row.block->rows() == _next_row.row_pos + 1);
607
0
    if (is_last || _stored_row_ref.size() == _reader_context.batch_size) {
608
0
        _update_agg_data(columns);
609
0
    }
610
0
}
611
612
0
void BlockReader::_update_agg_data(MutableColumns& columns) {
613
    // copy data to stored block
614
0
    size_t copy_size = _copy_agg_data();
615
616
    // calculate has_null_tag
617
0
    for (auto idx : _agg_columns_idx) {
618
0
        _stored_has_null_tag[idx] = _stored_data_columns[idx]->has_null(0, copy_size);
619
0
    }
620
621
    // calculate aggregate and insert
622
0
    int counter_sum = 0;
623
0
    for (int counter : _agg_data_counters) {
624
0
        _update_agg_value(columns, counter_sum, counter_sum + counter - 1);
625
0
        counter_sum += counter;
626
0
    }
627
628
    // some key still has value at next block, so do not insert
629
0
    if (_last_agg_data_counter) {
630
0
        _update_agg_value(columns, counter_sum, counter_sum + _last_agg_data_counter - 1, false);
631
0
        _last_agg_data_counter = 0;
632
0
    }
633
634
0
    _agg_data_counters.clear();
635
0
}
636
637
0
size_t BlockReader::_copy_agg_data() {
638
0
    size_t copy_size = _stored_row_ref.size();
639
640
0
    for (size_t i = 0; i < copy_size; i++) {
641
0
        auto& ref = _stored_row_ref[i];
642
0
        _temp_ref_map[ref.block.get()].emplace_back(ref.row_pos, i);
643
0
    }
644
645
0
    for (auto idx : _agg_columns_idx) {
646
0
        auto& dst_column = _stored_data_columns[idx];
647
0
        if (_stored_has_variable_length_tag[idx]) {
648
            //variable length type should replace ordered
649
0
            dst_column->clear();
650
0
            for (size_t i = 0; i < copy_size; i++) {
651
0
                auto& ref = _stored_row_ref[i];
652
0
                dst_column->insert_from(*ref.block->get_by_position(idx).column, ref.row_pos);
653
0
            }
654
0
        } else {
655
0
            for (auto& it : _temp_ref_map) {
656
0
                if (!it.second.empty()) {
657
0
                    auto& src_column = *it.first->get_by_position(idx).column;
658
0
                    for (auto& pos : it.second) {
659
0
                        dst_column->replace_column_data(src_column, pos.first, pos.second);
660
0
                    }
661
0
                }
662
0
            }
663
0
        }
664
0
    }
665
666
0
    for (auto& it : _temp_ref_map) {
667
0
        it.second.clear();
668
0
    }
669
0
    _stored_row_ref.clear();
670
671
0
    return copy_size;
672
0
}
673
674
0
void BlockReader::_update_agg_value(MutableColumns& columns, int begin, int end, bool is_close) {
675
0
    for (int i = 0; i < _agg_columns_idx.size(); i++) {
676
0
        auto idx = _agg_columns_idx[i];
677
678
0
        AggregateFunctionPtr function = _agg_functions[i];
679
0
        AggregateDataPtr place = _agg_places[i];
680
0
        auto* column_ptr = _stored_data_columns[idx].get();
681
682
0
        if (begin <= end) {
683
0
            function->add_batch_range(begin, end, place, const_cast<const IColumn**>(&column_ptr),
684
0
                                      _arena, _stored_has_null_tag[idx]);
685
0
        }
686
687
0
        if (is_close) {
688
0
            function->insert_result_into(place, *columns[_return_columns_loc[idx]]);
689
            // reset aggregate data
690
0
            function->reset(place);
691
0
        }
692
0
    }
693
0
    if (is_close) {
694
0
        _arena.clear();
695
0
    }
696
0
}
697
698
} // namespace doris