Coverage Report

Created: 2026-05-08 23:56

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/iterator/block_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <parallel_hashmap/phmap.h>
21
#include <stddef.h>
22
#include <sys/types.h>
23
24
#include <utility>
25
#include <vector>
26
27
#include "common/config.h"
28
#include "common/status.h"
29
#include "core/block/block.h"
30
#include "core/column/column.h"
31
#include "core/data_type/data_type.h"
32
#include "exprs/aggregate/aggregate_function.h"
33
#include "storage/iterator/vcollect_iterator.h"
34
#include "storage/rowset/rowset_reader.h"
35
#include "storage/tablet/tablet_reader.h"
36
#include "storage/utils.h"
37
38
namespace doris {
39
class ColumnPredicate;
40
class FunctionFilter;
41
class RuntimeProfile;
42
43
// Final TabletReader subclass. The private *_next_block methods below are the
// per-table-model read paths; see the comment on each one.
class BlockReader final : public TabletReader {
44
public:
45
    ~BlockReader() override;
46
47
    // Initialize BlockReader with tablet, data version and fetch range.
48
    Status init(const ReaderParams& read_params) override;
49
50
    Status next_block_with_aggregation(Block* block, bool* eof) override;
51
    // Returns _block_row_locations by value (the caller gets a copy).
52
578
    std::vector<RowLocation> current_block_row_locations() { return _block_row_locations; }
53
    // Forwards the profile to _vcollect_iter.update_profile().
54
0
    void update_profile(RuntimeProfile* profile) override {
55
0
        return _vcollect_iter.update_profile(profile);
56
0
    }
57
58
    // Returns the configured preferred output block byte budget; 0 when adaptive is disabled.
59
296k
    size_t preferred_block_size_bytes() const override {
60
296k
        return config::enable_adaptive_batch_size ? _reader_context.preferred_block_size_bytes : 0;
61
296k
    }
62
63
private:
64
    // Directly read rows from the rowset and pass them to the caller; no aggregation is needed.
65
    // This is usually used for DUPLICATE KEY tables.
66
    Status _direct_next_block(Block* block, bool* eof);
67
    // Same as _direct_next_block, but only for AGGREGATE KEY tables.
68
    // This is an optimization for AGGREGATE KEY tables:
69
    // when there is only one rowset and it is not overlapping, we can read it directly without aggregation.
70
    Status _direct_agg_key_next_block(Block* block, bool* eof);
71
    // For normal AGGREGATE KEY tables, read data through a merge heap.
72
    Status _agg_key_next_block(Block* block, bool* eof);
73
    // For UNIQUE KEY tables, read data through a merge heap.
74
    // Unlike _agg_key_next_block, it reads the data from high version to low version,
75
    // to minimize the comparison time in the merge heap.
76
    Status _unique_key_next_block(Block* block, bool* eof);
77
78
    Status _replace_key_next_block(Block* block, bool* eof);
79
80
    Status _init_collect_iter(const ReaderParams& read_params);
81
82
    Status _init_agg_state(const ReaderParams& read_params);
83
84
    Status _insert_data_normal(MutableColumns& columns);
85
86
    // for partial update table
87
    void _update_last_mutil_seq(int seq_idx);
88
    void _compare_sequence_map_and_replace(MutableColumns& columns);
89
90
    // Check if the accumulated output columns have reached the preferred byte budget,
91
    // used to limit the output block size for adaptive batch sizing.
92
    bool _reached_byte_budget(const MutableColumns& columns) const;
93
94
    void _append_agg_data(MutableColumns& columns);
95
96
    void _update_agg_data(MutableColumns& columns);
97
98
    size_t _copy_agg_data();
99
100
    void _update_agg_value(MutableColumns& columns, int begin, int end, bool is_close = true);
101
102
    // Returns false if the keys of the rowsets are monotonically ascending and disjoint.
103
    bool _rowsets_not_mono_asc_disjoint(const ReaderParams& read_params);
104
105
    VCollectIterator _vcollect_iter;
106
    IteratorRowRef _next_row {{}, -1, false};
107
108
    std::vector<AggregateFunctionPtr> _agg_functions;
109
    std::vector<AggregateDataPtr> _agg_places;
110
111
    std::vector<int> _normal_columns_idx; // key columns in agg mode, all columns in uniq mode
112
    std::vector<int> _agg_columns_idx;
113
    std::vector<int> _return_columns_loc;
114
115
    std::vector<int> _agg_data_counters;
116
    int _last_agg_data_counter = 0;
117
118
    MutableColumns _stored_data_columns;
119
    std::vector<IteratorRowRef> _stored_row_ref;
120
121
    std::vector<bool> _stored_has_null_tag;
122
    std::vector<bool> _stored_has_variable_length_tag;
123
124
    phmap::flat_hash_map<const Block*, std::vector<std::pair<int, int>>> _temp_ref_map;
125
126
    bool _eof = false;
127
    // Pointer to a member function with the *_next_block signature; initially nullptr.
128
    Status (BlockReader::*_next_block_func)(Block* block, bool* eof) = nullptr;
129
130
    std::vector<RowLocation> _block_row_locations;
131
132
    ColumnPtr _delete_filter_column;
133
134
    bool _is_rowsets_overlapping = true;
135
136
    bool _has_seq_map = false;
137
    // for checking multi seq
138
    std::unordered_map<uint32_t, MutableColumnPtr> _seq_columns;
139
    // MutableColumns _seq_columns;
140
    // seq in return_columns, val pos in _normal_columns_idx
141
    std::unordered_map<uint32_t, std::vector<uint32_t>> _seq_map_in_origin_block;
142
    std::unordered_map<uint32_t, std::vector<uint32_t>> _seq_map_not_in_origin_block;
143
144
    Arena _arena;
145
};
146
147
} // namespace doris