Coverage Report

Created: 2026-06-17 11:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/iterator/block_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <parallel_hashmap/phmap.h>
21
#include <stddef.h>
22
#include <sys/types.h>
23
24
#include <utility>
25
#include <vector>
26
27
#include "common/config.h"
28
#include "common/status.h"
29
#include "core/block/block.h"
30
#include "core/column/column.h"
31
#include "core/data_type/data_type.h"
32
#include "exprs/aggregate/aggregate_function.h"
33
#include "storage/iterator/vcollect_iterator.h"
34
#include "storage/rowset/rowset_reader.h"
35
#include "storage/tablet/tablet_reader.h"
36
#include "storage/utils.h"
37
38
namespace doris {
39
class ColumnPredicate;
40
class FunctionFilter;
41
class RuntimeProfile;
42
43
class BlockReader final : public TabletReader {
44
public:
45
    ~BlockReader() override;
46
47
    // Initialize BlockReader with tablet, data version and fetch range.
48
    Status init(const ReaderParams& read_params) override;
49
50
    Status next_block_with_aggregation(Block* block, bool* eof) override;
51
52
578
    std::vector<RowLocation> current_block_row_locations() { return _block_row_locations; }
53
54
0
    void update_profile(RuntimeProfile* profile) override {
55
0
        return _vcollect_iter.update_profile(profile);
56
0
    }
57
58
    // Returns the configured preferred output block byte budget; 0 when adaptive is disabled.
59
296k
    size_t preferred_block_size_bytes() const override {
60
296k
        return config::enable_adaptive_batch_size ? _reader_context.preferred_block_size_bytes : 0;
61
296k
    }
62
63
private:
64
    // Directly read row from rowset and pass to upper caller. No need to do aggregation.
65
    // This is usually used for DUPLICATE KEY tables
66
    Status _direct_next_block(Block* block, bool* eof);
67
    // Same as _direct_next_block, but used only for AGGREGATE KEY tables.
68
    // This is an optimization for AGGREGATE KEY tables.
69
    // When there is only one rowset and it is not overlapping, we can read it directly without aggregation.
70
    Status _direct_agg_key_next_block(Block* block, bool* eof);
71
    // For normal AGGREGATE KEY tables, read data by a merge heap.
72
    Status _agg_key_next_block(Block* block, bool* eof);
73
    // For UNIQUE KEY tables, read data by a merge heap.
74
    // The difference from _agg_key_next_block is that it will read the data from high version to low version,
75
    // to minimize the comparison time in merge heap.
76
    Status _unique_key_next_block(Block* block, bool* eof);
77
78
    Status _min_delta_next_block(Block* block, bool* eof);
79
80
    Status _detail_change_next_block(Block* block, bool* eof);
81
82
    Status _ensure_binlog_column_pos(const Block& src_block);
83
84
    int64_t _read_binlog_op(const IColumn& col, size_t row) const;
85
86
    Status _write_binlog_op(IColumn& col, int64_t op) const;
87
88
    bool _is_binlog_meta_column(int idx) const;
89
90
    int _resolve_source_column_index(int idx, bool use_before) const;
91
92
    void _init_pending_row_columns(const Block& block);
93
94
    bool _emit_pending_row(MutableColumns& target_columns, size_t& output_row_count);
95
96
    Status _replace_key_next_block(Block* block, bool* eof);
97
98
    Status _init_collect_iter(const ReaderParams& read_params);
99
100
    Status _init_agg_state(const ReaderParams& read_params);
101
102
    Status _insert_data_normal(MutableColumns& columns);
103
104
    // for partial update table
105
    void _update_last_mutil_seq(int seq_idx);
106
    void _compare_sequence_map_and_replace(MutableColumns& columns);
107
108
    // Check if the accumulated output columns have reached the preferred byte budget,
109
    // used to limit the output block size for adaptive batch sizing.
110
    bool _reached_byte_budget(const MutableColumns& columns) const;
111
112
    void _append_agg_data(MutableColumns& columns);
113
114
    void _update_agg_data(MutableColumns& columns);
115
116
    size_t _copy_agg_data();
117
118
    void _update_agg_value(MutableColumns& columns, int begin, int end, bool is_close = true);
119
120
    Status _append_change_row(MutableColumns& target_columns, const Block& src_block,
121
                              size_t row_pos, int64_t output_op, bool use_before);
122
123
    // Returns false if the keys of the rowsets are monotonically ascending and disjoint.
124
    bool _rowsets_not_mono_asc_disjoint(const ReaderParams& read_params);
125
126
    VCollectIterator _vcollect_iter;
127
    IteratorRowRef _next_row {{}, -1, false};
128
129
    std::vector<AggregateFunctionPtr> _agg_functions;
130
    std::vector<AggregateDataPtr> _agg_places;
131
132
    std::vector<int> _normal_columns_idx; // key column on agg mode, all column on uniq mode
133
    std::vector<int> _agg_columns_idx;
134
    std::vector<int> _return_columns_loc;
135
136
    std::vector<int> _agg_data_counters;
137
    int _last_agg_data_counter = 0;
138
139
    // Buffer of consecutive rows that share the same primary key, used by
140
    // _min_delta_next_block to fold INSERT/UPDATE/DELETE into a single net change.
141
    // Rows are appended as the merge iterator advances and cleared after each key group.
142
    MutableColumns _stored_data_columns;
143
    std::vector<IteratorRowRef> _stored_row_ref;
144
145
    std::vector<bool> _stored_has_null_tag;
146
    std::vector<bool> _stored_has_variable_length_tag;
147
148
    // One-row carry-over buffer holding the AFTER row of an UPDATE pair when the BEFORE row
149
    // was already emitted on the boundary of batch_max_rows(). Flushed by _emit_pending_row()
150
    // at the start of the next call to *_next_block.
151
    MutableColumns _pending_row_columns;
152
    bool _has_pending_row = false;
153
154
    phmap::flat_hash_map<const Block*, std::vector<std::pair<int, int>>> _temp_ref_map;
155
156
    bool _eof = false;
157
158
    Status (BlockReader::*_next_block_func)(Block* block, bool* eof) = nullptr;
159
160
    std::vector<RowLocation> _block_row_locations;
161
162
    ColumnPtr _delete_filter_column;
163
164
    bool _is_rowsets_overlapping = true;
165
166
    int _binlog_op_pos = -1;
167
    int _binlog_lsn_pos = -1;
168
    int _binlog_timestamp_pos = -1;
169
    bool _binlog_column_pos_inited = false;
170
171
    bool _has_seq_map = false;
172
    // Used for checking multiple sequence columns.
173
    std::unordered_map<uint32_t, MutableColumnPtr> _seq_columns;
174
    // MutableColumns _seq_columns;
175
    // seq in return_columns, val pos in _normal_columns_idx
176
    std::unordered_map<uint32_t, std::vector<uint32_t>> _seq_map_in_origin_block;
177
    std::unordered_map<uint32_t, std::vector<uint32_t>> _seq_map_not_in_origin_block;
178
    // For each src column index in the binlog block, the index of its companion __BEFORE__
179
    // column (or itself if no BEFORE mirror exists). Built lazily by _ensure_binlog_column_pos
180
    // and consulted via _resolve_source_column_index when emitting BEFORE rows.
181
    std::vector<int> _before_column_idx;
182
    Arena _arena;
183
};
184
185
} // namespace doris