Coverage Report

Created: 2026-03-13 19:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/iterator/block_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/iterator/block_reader.h"
19
20
#include <gen_cpp/olap_file.pb.h>
21
#include <glog/logging.h>
22
#include <stdint.h>
23
24
#include <algorithm>
25
#include <boost/iterator/iterator_facade.hpp>
26
#include <memory>
27
#include <ostream>
28
#include <string>
29
30
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
31
#include "cloud/config.h"
32
#include "common/compiler_util.h" // IWYU pragma: keep
33
#include "common/status.h"
34
#include "core/block/column_with_type_and_name.h"
35
#include "core/column/column_nullable.h"
36
#include "core/column/column_vector.h"
37
#include "core/data_type/data_type_number.h"
38
#include "exprs/aggregate/aggregate_function_reader.h"
39
#include "exprs/function_filter.h"
40
#include "runtime/runtime_state.h"
41
#include "storage/iterator/vcollect_iterator.h"
42
#include "storage/olap_common.h"
43
#include "storage/olap_define.h"
44
#include "storage/predicate/like_column_predicate.h"
45
#include "storage/rowset/rowset.h"
46
#include "storage/rowset/rowset_reader_context.h"
47
#include "storage/tablet/tablet.h"
48
#include "storage/tablet/tablet_schema.h"
49
50
namespace doris {
51
class ColumnPredicate;
52
} // namespace doris
53
54
namespace doris {
55
#include "common/compile_check_begin.h"
56
using namespace ErrorCode;
57
58
67
BlockReader::~BlockReader() {
59
67
    for (int i = 0; i < _agg_functions.size(); ++i) {
60
0
        _agg_functions[i]->destroy(_agg_places[i]);
61
0
        delete[] _agg_places[i];
62
0
    }
63
67
}
64
65
578
Status BlockReader::next_block_with_aggregation(Block* block, bool* eof) {
66
578
    auto res = (this->*_next_block_func)(block, eof);
67
578
    if (!config::is_cloud_mode()) {
68
578
        if (!res.ok()) [[unlikely]] {
69
0
            static_cast<Tablet*>(_tablet.get())->report_error(res);
70
0
        }
71
578
    }
72
578
    return res;
73
578
}
74
75
67
bool BlockReader::_rowsets_not_mono_asc_disjoint(const ReaderParams& read_params) {
76
67
    std::string pre_rs_last_key;
77
67
    bool pre_rs_key_bounds_truncated {false};
78
67
    const std::vector<RowSetSplits>& rs_splits = read_params.rs_splits;
79
162
    for (const auto& rs_split : rs_splits) {
80
162
        if (rs_split.rs_reader->rowset()->num_rows() == 0) {
81
0
            continue;
82
0
        }
83
162
        if (rs_split.rs_reader->rowset()->is_segments_overlapping()) {
84
0
            return true;
85
0
        }
86
162
        std::string rs_first_key;
87
162
        bool has_first_key = rs_split.rs_reader->rowset()->first_key(&rs_first_key);
88
162
        if (!has_first_key) {
89
0
            return true;
90
0
        }
91
162
        bool cur_rs_key_bounds_truncated {
92
162
                rs_split.rs_reader->rowset()->is_segments_key_bounds_truncated()};
93
162
        if (!Slice::lhs_is_strictly_less_than_rhs(Slice {pre_rs_last_key},
94
162
                                                  pre_rs_key_bounds_truncated, Slice {rs_first_key},
95
162
                                                  cur_rs_key_bounds_truncated)) {
96
58
            return true;
97
58
        }
98
104
        bool has_last_key = rs_split.rs_reader->rowset()->last_key(&pre_rs_last_key);
99
104
        pre_rs_key_bounds_truncated = cur_rs_key_bounds_truncated;
100
104
        CHECK(has_last_key);
101
104
    }
102
9
    return false;
103
67
}
104
105
48
Status BlockReader::_init_collect_iter(const ReaderParams& read_params) {
106
48
    auto res = _capture_rs_readers(read_params);
107
48
    if (!res.ok()) {
108
0
        LOG(WARNING) << "fail to init reader when _capture_rs_readers. res:" << res
109
0
                     << ", tablet_id:" << read_params.tablet->tablet_id()
110
0
                     << ", schema_hash:" << read_params.tablet->schema_hash()
111
0
                     << ", reader_type:" << int(read_params.reader_type)
112
0
                     << ", version:" << read_params.version;
113
0
        return res;
114
0
    }
115
    // check if rowsets are noneoverlapping
116
48
    {
117
48
        SCOPED_RAW_TIMER(&_stats.block_reader_vcollect_iter_init_timer_ns);
118
48
        _is_rowsets_overlapping = _rowsets_not_mono_asc_disjoint(read_params);
119
48
        _vcollect_iter.init(this, _is_rowsets_overlapping, read_params.read_orderby_key,
120
48
                            read_params.read_orderby_key_reverse);
121
48
    }
122
123
48
    std::vector<RowsetReaderSharedPtr> valid_rs_readers;
124
48
    RuntimeState* runtime_state = read_params.runtime_state;
125
126
48
    {
127
48
        SCOPED_RAW_TIMER(&_stats.block_reader_rs_readers_init_timer_ns);
128
192
        for (int i = 0; i < read_params.rs_splits.size(); ++i) {
129
144
            if (runtime_state != nullptr && runtime_state->is_cancelled()) {
130
0
                return runtime_state->cancel_reason();
131
0
            }
132
133
144
            auto& rs_split = read_params.rs_splits[i];
134
135
            // _vcollect_iter.topn_next() will init rs_reader by itself
136
144
            if (!_vcollect_iter.use_topn_next()) {
137
144
                RETURN_IF_ERROR(rs_split.rs_reader->init(&_reader_context, rs_split));
138
144
            }
139
140
144
            Status res1 = _vcollect_iter.add_child(rs_split);
141
144
            if (!res1.ok() && !res1.is<END_OF_FILE>()) {
142
0
                LOG(WARNING) << "failed to add child to iterator, err=" << res1;
143
0
                return res1;
144
0
            }
145
144
            if (res1.ok()) {
146
144
                valid_rs_readers.push_back(rs_split.rs_reader);
147
144
            }
148
144
        }
149
48
    }
150
48
    {
151
48
        SCOPED_RAW_TIMER(&_stats.block_reader_build_heap_init_timer_ns);
152
48
        RETURN_IF_ERROR(_vcollect_iter.build_heap(valid_rs_readers));
153
        // _vcollect_iter.topn_next() can not use current_row
154
48
        if (!_vcollect_iter.use_topn_next()) {
155
48
            auto status = _vcollect_iter.current_row(&_next_row);
156
48
            _eof = status.is<END_OF_FILE>();
157
48
        }
158
48
    }
159
160
0
    return Status::OK();
161
48
}
162
163
0
Status BlockReader::_init_agg_state(const ReaderParams& read_params) {
164
0
    if (_eof) {
165
0
        return Status::OK();
166
0
    }
167
168
0
    _stored_data_columns =
169
0
            _next_row.block->create_same_struct_block(_reader_context.batch_size)->mutate_columns();
170
171
0
    _stored_has_null_tag.resize(_stored_data_columns.size());
172
0
    _stored_has_variable_length_tag.resize(_stored_data_columns.size());
173
174
0
    auto& tablet_schema = *_tablet_schema;
175
0
    for (auto idx : _agg_columns_idx) {
176
0
        auto column = tablet_schema.column(
177
0
                read_params.origin_return_columns->at(_return_columns_loc[idx]));
178
0
        AggregateFunctionPtr function =
179
0
                column.get_aggregate_function(AGG_READER_SUFFIX, read_params.get_be_exec_version());
180
181
        // to avoid coredump when something goes wrong(i.e. column missmatch)
182
0
        if (!function) {
183
0
            return Status::InternalError(
184
0
                    "Failed to init reader when init agg state: "
185
0
                    "tablet_id: {}, schema_hash: {}, reader_type: {}, version: {}",
186
0
                    read_params.tablet->tablet_id(), read_params.tablet->schema_hash(),
187
0
                    int(read_params.reader_type), read_params.version.to_string());
188
0
        }
189
0
        _agg_functions.push_back(function);
190
        // create aggregate data
191
0
        AggregateDataPtr place = new char[function->size_of_data()];
192
0
        SAFE_CREATE(function->create(place), {
193
0
            _agg_functions.pop_back();
194
0
            delete[] place;
195
0
        });
196
0
        _agg_places.push_back(place);
197
198
        // calculate `_has_variable_length_tag` tag. like string, array, map
199
0
        _stored_has_variable_length_tag[idx] = _stored_data_columns[idx]->is_variable_length();
200
0
    }
201
202
0
    return Status::OK();
203
0
}
204
205
48
Status BlockReader::init(const ReaderParams& read_params) {
206
48
    RETURN_IF_ERROR(TabletReader::init(read_params));
207
208
48
    auto return_column_size = read_params.origin_return_columns->size();
209
48
    _return_columns_loc.resize(read_params.return_columns.size(), -1);
210
48
    std::unordered_map<int32_t /*cid*/, int32_t /*pos*/> pos_map;
211
176
    for (int i = 0; i < return_column_size; ++i) {
212
128
        auto cid = read_params.origin_return_columns->at(i);
213
        // For each original cid, find the index in return_columns
214
240
        for (int j = 0; j < read_params.return_columns.size(); ++j) {
215
240
            if (read_params.return_columns[j] == cid) {
216
128
                if (j < _tablet->num_key_columns() || _tablet->keys_type() != AGG_KEYS) {
217
128
                    pos_map[cid] = (int32_t)_normal_columns_idx.size();
218
128
                    _normal_columns_idx.emplace_back(j);
219
128
                } else {
220
0
                    _agg_columns_idx.emplace_back(j);
221
0
                }
222
128
                _return_columns_loc[j] = i;
223
128
                break;
224
128
            }
225
240
        }
226
128
    }
227
228
48
    if (_tablet_schema->has_seq_map()) {
229
0
        if (_tablet_schema->has_sequence_col()) {
230
0
            auto msg = "sequence columns conflict, both seq_col and seq_map are true!";
231
0
            LOG(WARNING) << msg;
232
0
            return Status::InternalError(msg);
233
0
        }
234
0
        _has_seq_map = true;
235
0
        for (auto seq_val_iter = _tablet_schema->seq_col_idx_to_value_cols_idx().cbegin();
236
0
             seq_val_iter != _tablet_schema->seq_col_idx_to_value_cols_idx().cend();
237
0
             ++seq_val_iter) {
238
0
            int seq_loc = -1;
239
0
            for (int i = 0; i < read_params.return_columns.size(); ++i) {
240
0
                if (read_params.return_columns[i] == seq_val_iter->first) {
241
0
                    seq_loc = i;
242
0
                    break;
243
0
                }
244
0
            }
245
0
            if (seq_loc == -1) {
246
                // don't need to deal with this seq col
247
0
                continue;
248
0
            }
249
250
0
            std::vector<uint32_t> pos_vec;
251
0
            for (auto agg_cid : seq_val_iter->second) {
252
0
                const auto& val_pos_iter = pos_map.find(agg_cid);
253
0
                if (val_pos_iter == pos_map.end()) {
254
0
                    continue;
255
0
                }
256
0
                pos_vec.emplace_back(val_pos_iter->second);
257
0
            }
258
0
            if (_return_columns_loc[seq_loc] == -1) {
259
0
                _seq_map_not_in_origin_block.emplace(seq_loc, pos_vec);
260
0
            } else {
261
0
                _seq_map_in_origin_block.emplace(seq_loc, pos_vec);
262
0
            }
263
0
        }
264
0
    }
265
266
48
    auto status = _init_collect_iter(read_params);
267
48
    if (!status.ok()) [[unlikely]] {
268
0
        if (!config::is_cloud_mode()) {
269
0
            static_cast<Tablet*>(_tablet.get())->report_error(status);
270
0
        }
271
0
        return status;
272
0
    }
273
274
48
    if (_direct_mode) {
275
0
        _next_block_func = &BlockReader::_direct_next_block;
276
0
        return Status::OK();
277
0
    }
278
48
    if (_has_seq_map && !_eof) {
279
0
        for (auto it = _seq_map_not_in_origin_block.cbegin();
280
0
             it != _seq_map_not_in_origin_block.cend(); ++it) {
281
0
            auto seq_idx = it->first;
282
0
            _seq_columns.insert(
283
0
                    {seq_idx, _next_row.block->get_by_position(seq_idx).column->clone_empty()});
284
0
        }
285
0
    }
286
287
48
    switch (tablet()->keys_type()) {
288
16
    case KeysType::DUP_KEYS:
289
16
        _next_block_func = &BlockReader::_direct_next_block;
290
16
        break;
291
32
    case KeysType::UNIQUE_KEYS:
292
32
        if (read_params.reader_type == ReaderType::READER_QUERY &&
293
32
            _reader_context.enable_unique_key_merge_on_write) {
294
0
            _next_block_func = &BlockReader::_direct_next_block;
295
32
        } else if (_has_seq_map) {
296
0
            _next_block_func = &BlockReader::_replace_key_next_block;
297
32
        } else {
298
32
            _next_block_func = &BlockReader::_unique_key_next_block;
299
32
            if (_filter_delete) {
300
32
                _delete_filter_column = ColumnUInt8::create();
301
32
            }
302
32
        }
303
32
        break;
304
0
    case KeysType::AGG_KEYS:
305
0
        _next_block_func = &BlockReader::_agg_key_next_block;
306
0
        RETURN_IF_ERROR(_init_agg_state(read_params));
307
0
        break;
308
0
    default:
309
0
        DCHECK(false) << "No next row function for type:" << tablet()->keys_type();
310
0
        break;
311
48
    }
312
313
48
    return Status::OK();
314
48
}
315
316
294
Status BlockReader::_direct_next_block(Block* block, bool* eof) {
317
294
    auto res = _vcollect_iter.next(block);
318
294
    if (UNLIKELY(!res.ok() && !res.is<END_OF_FILE>())) {
319
0
        return res;
320
0
    }
321
294
    *eof = res.is<END_OF_FILE>();
322
294
    _eof = *eof;
323
294
    if (UNLIKELY(_reader_context.record_rowids)) {
324
294
        res = _vcollect_iter.current_block_row_locations(&_block_row_locations);
325
294
        if (UNLIKELY(!res.ok() && res != Status::Error<END_OF_FILE>(""))) {
326
0
            return res;
327
0
        }
328
294
        DCHECK_EQ(_block_row_locations.size(), block->rows());
329
294
    }
330
294
    return Status::OK();
331
294
}
332
333
0
Status BlockReader::_direct_agg_key_next_block(Block* block, bool* eof) {
334
0
    return Status::OK();
335
0
}
336
337
0
Status BlockReader::_replace_key_next_block(Block* block, bool* eof) {
338
0
    if (UNLIKELY(_eof)) {
339
0
        *eof = true;
340
0
        return Status::OK();
341
0
    }
342
343
0
    auto target_block_row = 0;
344
0
    auto target_columns = block->mutate_columns();
345
    // currently seq mapping only support mor table
346
    // so this will not be executed for the time being
347
0
    if (UNLIKELY(_reader_context.record_rowids)) {
348
0
        _block_row_locations.resize(batch_size());
349
0
    }
350
0
    auto merged_row = 0;
351
0
    while (target_block_row < batch_size() && !_eof) {
352
0
        RETURN_IF_ERROR(_insert_data_normal(target_columns));
353
        // use the first line to init _seq_columns
354
0
        for (auto it = _seq_map_not_in_origin_block.cbegin();
355
0
             it != _seq_map_not_in_origin_block.cend(); ++it) {
356
0
            auto seq_idx = it->first;
357
0
            _update_last_mutil_seq(seq_idx);
358
0
        }
359
0
        if (UNLIKELY(_reader_context.record_rowids)) {
360
0
            _block_row_locations[target_block_row] = _vcollect_iter.current_row_location();
361
0
        }
362
0
        target_block_row++;
363
364
0
        while (!_eof) {
365
            // the version is in reverse order, the first row is the highest version,
366
            // in UNIQUE_KEY highest version is the final result
367
0
            auto res = _vcollect_iter.next(&_next_row);
368
0
            if (UNLIKELY(res.is<END_OF_FILE>())) {
369
0
                _eof = true;
370
0
                *eof = true;
371
0
                if (UNLIKELY(_reader_context.record_rowids)) {
372
0
                    _block_row_locations.resize(target_block_row);
373
0
                }
374
0
                break;
375
0
            }
376
377
0
            if (UNLIKELY(!res.ok())) {
378
0
                LOG(WARNING) << "next failed: " << res;
379
0
                return res;
380
0
            }
381
382
0
            if (_next_row.is_same) {
383
0
                merged_row++;
384
0
                _compare_sequence_map_and_replace(target_columns);
385
0
            } else {
386
0
                break;
387
0
            }
388
0
        }
389
0
    }
390
0
    _merged_rows += merged_row;
391
0
    return Status::OK();
392
0
}
393
394
0
void BlockReader::_compare_sequence_map_and_replace(MutableColumns& columns) {
395
0
    auto src_block = _next_row.block.get();
396
0
    auto src_pos = _next_row.row_pos;
397
398
    // use seq column in origin block to compare and replace
399
0
    for (auto it = _seq_map_in_origin_block.cbegin(); it != _seq_map_in_origin_block.cend(); ++it) {
400
0
        auto seq_idx = it->first;
401
0
        auto dst_seq_column = columns[_return_columns_loc[seq_idx]].get();
402
0
        auto dst_pos = dst_seq_column->size() - 1;
403
0
        auto src_seq_column = src_block->get_by_position(seq_idx).column;
404
        // the rowset version of dst is higher .
405
0
        auto res = dst_seq_column->compare_at(dst_pos, src_pos, *src_seq_column, -1);
406
0
        if (res >= 0) {
407
0
            continue;
408
0
        }
409
410
        // update value and seq column
411
0
        for (auto& p : it->second) {
412
0
            auto val_idx = _normal_columns_idx[p];
413
0
            auto src_column = src_block->get_by_position(val_idx).column;
414
0
            auto dst_column = columns[_return_columns_loc[val_idx]].get();
415
0
            dst_column->pop_back(1);
416
0
            dst_column->insert_from(*src_column, src_pos);
417
0
        }
418
419
0
        dst_seq_column->pop_back(1);
420
0
        dst_seq_column->insert_from(*src_seq_column, src_pos);
421
0
    }
422
423
    // use temp seq block to compare and replace because origin block not contains these seq columns
424
0
    for (auto it = _seq_map_not_in_origin_block.cbegin(); it != _seq_map_not_in_origin_block.cend();
425
0
         ++it) {
426
0
        auto seq_idx = it->first;
427
0
        auto dst_seq_column = _seq_columns[seq_idx].get();
428
0
        auto src_seq_column = src_block->get_by_position(seq_idx).column;
429
        // the rowset version of dst is higher .
430
0
        auto res = dst_seq_column->compare_at(0, src_pos, *src_seq_column, -1);
431
0
        if (res >= 0) {
432
0
            continue;
433
0
        }
434
435
        // update value and seq column (if need to return)
436
0
        for (auto& p : it->second) {
437
0
            auto val_idx = _normal_columns_idx[p];
438
0
            auto src_column = src_block->get_by_position(val_idx).column;
439
0
            auto dst_column = columns[_return_columns_loc[val_idx]].get();
440
0
            dst_column->pop_back(1);
441
0
            dst_column->insert_from(*src_column, src_pos);
442
0
        }
443
444
0
        _update_last_mutil_seq(seq_idx);
445
0
    }
446
0
}
447
448
0
void BlockReader::_update_last_mutil_seq(int seq_idx) {
449
0
    auto block = _next_row.block.get();
450
0
    _seq_columns[seq_idx]->clear();
451
0
    _seq_columns[seq_idx]->insert_from(*block->get_by_position(seq_idx).column, _next_row.row_pos);
452
0
}
453
454
0
Status BlockReader::_agg_key_next_block(Block* block, bool* eof) {
455
0
    if (UNLIKELY(_eof)) {
456
0
        *eof = true;
457
0
        return Status::OK();
458
0
    }
459
460
0
    auto target_block_row = 0;
461
0
    auto merged_row = 0;
462
0
    auto target_columns = block->mutate_columns();
463
0
    RETURN_IF_ERROR(_insert_data_normal(target_columns));
464
0
    target_block_row++;
465
0
    _append_agg_data(target_columns);
466
467
0
    while (true) {
468
0
        auto res = _vcollect_iter.next(&_next_row);
469
0
        if (UNLIKELY(res.is<END_OF_FILE>())) {
470
0
            _eof = true;
471
0
            *eof = true;
472
0
            break;
473
0
        }
474
0
        if (UNLIKELY(!res.ok())) {
475
0
            LOG(WARNING) << "next failed: " << res;
476
0
            return res;
477
0
        }
478
479
0
        if (!_next_row.is_same) {
480
0
            if (target_block_row == _reader_context.batch_size) {
481
0
                break;
482
0
            }
483
0
            _agg_data_counters.push_back(_last_agg_data_counter);
484
0
            _last_agg_data_counter = 0;
485
486
0
            RETURN_IF_ERROR(_insert_data_normal(target_columns));
487
488
0
            target_block_row++;
489
0
        } else {
490
0
            merged_row++;
491
0
        }
492
493
0
        _append_agg_data(target_columns);
494
0
    }
495
496
0
    _agg_data_counters.push_back(_last_agg_data_counter);
497
0
    _last_agg_data_counter = 0;
498
0
    _update_agg_data(target_columns);
499
0
    block->set_columns(std::move(target_columns));
500
501
0
    _merged_rows += merged_row;
502
0
    return Status::OK();
503
0
}
504
505
284
Status BlockReader::_unique_key_next_block(Block* block, bool* eof) {
506
284
    if (UNLIKELY(_eof)) {
507
0
        *eof = true;
508
0
        return Status::OK();
509
0
    }
510
511
284
    auto target_block_row = 0;
512
284
    auto target_columns = block->mutate_columns();
513
284
    if (UNLIKELY(_reader_context.record_rowids)) {
514
284
        _block_row_locations.resize(_reader_context.batch_size);
515
284
    }
516
517
280k
    do {
518
280k
        RETURN_IF_ERROR(_insert_data_normal(target_columns));
519
520
280k
        if (UNLIKELY(_reader_context.record_rowids)) {
521
280k
            _block_row_locations[target_block_row] = _vcollect_iter.current_row_location();
522
280k
        }
523
280k
        target_block_row++;
524
525
        // the version is in reverse order, the first row is the highest version,
526
        // in UNIQUE_KEY highest version is the final result, there is no need to
527
        // merge the lower versions
528
280k
        auto res = _vcollect_iter.next(&_next_row);
529
280k
        if (UNLIKELY(res.is<END_OF_FILE>())) {
530
32
            _eof = true;
531
32
            *eof = true;
532
32
            if (UNLIKELY(_reader_context.record_rowids)) {
533
32
                _block_row_locations.resize(target_block_row);
534
32
            }
535
32
            break;
536
32
        }
537
538
280k
        if (UNLIKELY(!res.ok())) {
539
0
            LOG(WARNING) << "next failed: " << res;
540
0
            return res;
541
0
        }
542
280k
    } while (target_block_row < _reader_context.batch_size);
543
544
284
    if (_delete_sign_available) {
545
284
        int delete_sign_idx = _reader_context.tablet_schema->field_index(DELETE_SIGN);
546
284
        DCHECK(delete_sign_idx > 0);
547
284
        if (delete_sign_idx <= 0 || delete_sign_idx >= target_columns.size()) {
548
0
            LOG(WARNING) << "tablet_id: " << tablet()->tablet_id() << " delete sign idx "
549
0
                         << delete_sign_idx
550
0
                         << " not invalid, skip filter delete in base compaction";
551
0
            return Status::OK();
552
0
        }
553
284
        MutableColumnPtr delete_filter_column = (*std::move(_delete_filter_column)).mutate();
554
284
        reinterpret_cast<ColumnUInt8*>(delete_filter_column.get())->resize(target_block_row);
555
556
284
        auto* __restrict filter_data =
557
284
                reinterpret_cast<ColumnUInt8*>(delete_filter_column.get())->get_data().data();
558
284
        auto* __restrict delete_data =
559
284
                reinterpret_cast<ColumnInt8*>(target_columns[delete_sign_idx].get())
560
284
                        ->get_data()
561
284
                        .data();
562
284
        int delete_count = 0;
563
280k
        for (int i = 0; i < target_block_row; ++i) {
564
280k
            bool sign = (delete_data[i] == 0);
565
280k
            filter_data[i] = sign;
566
280k
            if (UNLIKELY(!sign)) {
567
0
                if (UNLIKELY(_reader_context.record_rowids)) {
568
0
                    _block_row_locations[i].row_id = -1;
569
0
                    delete_count++;
570
0
                }
571
0
            }
572
280k
        }
573
284
        auto target_columns_size = target_columns.size();
574
284
        ColumnWithTypeAndName column_with_type_and_name {_delete_filter_column,
575
284
                                                         std::make_shared<DataTypeUInt8>(),
576
284
                                                         "__DORIS_COMPACTION_FILTER__"};
577
284
        block->set_columns(std::move(target_columns));
578
284
        block->insert(column_with_type_and_name);
579
284
        RETURN_IF_ERROR(Block::filter_block(block, target_columns_size, target_columns_size));
580
284
        _stats.rows_del_filtered += target_block_row - block->rows();
581
284
        if (UNLIKELY(_reader_context.record_rowids)) {
582
284
            DCHECK_EQ(_block_row_locations.size(), block->rows() + delete_count);
583
284
        }
584
284
    } else {
585
0
        block->set_columns(std::move(target_columns));
586
0
    }
587
284
    return Status::OK();
588
284
}
589
590
280k
Status BlockReader::_insert_data_normal(MutableColumns& columns) {
591
280k
    auto block = _next_row.block.get();
592
593
280k
    RETURN_IF_CATCH_EXCEPTION({
594
280k
        for (auto idx : _normal_columns_idx) {
595
280k
            columns[_return_columns_loc[idx]]->insert_from(*block->get_by_position(idx).column,
596
280k
                                                           _next_row.row_pos);
597
280k
        }
598
280k
    });
599
280k
    return Status::OK();
600
280k
}
601
602
0
void BlockReader::_append_agg_data(MutableColumns& columns) {
603
0
    _stored_row_ref.push_back(_next_row);
604
0
    _last_agg_data_counter++;
605
606
    // execute aggregate when have `batch_size` column or some ref invalid soon
607
0
    bool is_last = (_next_row.block->rows() == _next_row.row_pos + 1);
608
0
    if (is_last || _stored_row_ref.size() == _reader_context.batch_size) {
609
0
        _update_agg_data(columns);
610
0
    }
611
0
}
612
613
0
void BlockReader::_update_agg_data(MutableColumns& columns) {
614
    // copy data to stored block
615
0
    size_t copy_size = _copy_agg_data();
616
617
    // calculate has_null_tag
618
0
    for (auto idx : _agg_columns_idx) {
619
0
        _stored_has_null_tag[idx] = _stored_data_columns[idx]->has_null(0, copy_size);
620
0
    }
621
622
    // calculate aggregate and insert
623
0
    int counter_sum = 0;
624
0
    for (int counter : _agg_data_counters) {
625
0
        _update_agg_value(columns, counter_sum, counter_sum + counter - 1);
626
0
        counter_sum += counter;
627
0
    }
628
629
    // some key still has value at next block, so do not insert
630
0
    if (_last_agg_data_counter) {
631
0
        _update_agg_value(columns, counter_sum, counter_sum + _last_agg_data_counter - 1, false);
632
0
        _last_agg_data_counter = 0;
633
0
    }
634
635
0
    _agg_data_counters.clear();
636
0
}
637
638
0
size_t BlockReader::_copy_agg_data() {
639
0
    size_t copy_size = _stored_row_ref.size();
640
641
0
    for (size_t i = 0; i < copy_size; i++) {
642
0
        auto& ref = _stored_row_ref[i];
643
0
        _temp_ref_map[ref.block.get()].emplace_back(ref.row_pos, i);
644
0
    }
645
646
0
    for (auto idx : _agg_columns_idx) {
647
0
        auto& dst_column = _stored_data_columns[idx];
648
0
        if (_stored_has_variable_length_tag[idx]) {
649
            //variable length type should replace ordered
650
0
            dst_column->clear();
651
0
            for (size_t i = 0; i < copy_size; i++) {
652
0
                auto& ref = _stored_row_ref[i];
653
0
                dst_column->insert_from(*ref.block->get_by_position(idx).column, ref.row_pos);
654
0
            }
655
0
        } else {
656
0
            for (auto& it : _temp_ref_map) {
657
0
                if (!it.second.empty()) {
658
0
                    auto& src_column = *it.first->get_by_position(idx).column;
659
0
                    for (auto& pos : it.second) {
660
0
                        dst_column->replace_column_data(src_column, pos.first, pos.second);
661
0
                    }
662
0
                }
663
0
            }
664
0
        }
665
0
    }
666
667
0
    for (auto& it : _temp_ref_map) {
668
0
        it.second.clear();
669
0
    }
670
0
    _stored_row_ref.clear();
671
672
0
    return copy_size;
673
0
}
674
675
0
void BlockReader::_update_agg_value(MutableColumns& columns, int begin, int end, bool is_close) {
676
0
    for (int i = 0; i < _agg_columns_idx.size(); i++) {
677
0
        auto idx = _agg_columns_idx[i];
678
679
0
        AggregateFunctionPtr function = _agg_functions[i];
680
0
        AggregateDataPtr place = _agg_places[i];
681
0
        auto* column_ptr = _stored_data_columns[idx].get();
682
683
0
        if (begin <= end) {
684
0
            function->add_batch_range(begin, end, place, const_cast<const IColumn**>(&column_ptr),
685
0
                                      _arena, _stored_has_null_tag[idx]);
686
0
        }
687
688
0
        if (is_close) {
689
0
            function->insert_result_into(place, *columns[_return_columns_loc[idx]]);
690
            // reset aggregate data
691
0
            function->reset(place);
692
0
        }
693
0
    }
694
0
    if (is_close) {
695
0
        _arena.clear();
696
0
    }
697
0
}
698
699
#include "common/compile_check_end.h"
700
} // namespace doris