Coverage Report

Created: 2026-06-17 03:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/tablet/tablet_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <stddef.h>
#include <stdint.h>

#include <algorithm>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

#include "agent/be_exec_version_manager.h"
#include "common/status.h"
#include "exprs/function_filter.h"
#include "io/io_common.h"
#include "storage/delete/delete_handler.h"
#include "storage/iterators.h"
#include "storage/olap_common.h"
#include "storage/olap_tuple.h"
#include "storage/predicate/filter_olap_param.h"
#include "storage/row_cursor.h"
#include "storage/rowid_conversion.h"
#include "storage/rowset/rowset.h"
#include "storage/rowset/rowset_meta.h"
#include "storage/rowset/rowset_reader.h"
#include "storage/rowset/rowset_reader_context.h"
#include "storage/tablet/base_tablet.h"
#include "storage/tablet/tablet_fwd.h"
50
51
namespace doris {
52
53
class RuntimeState;
54
class BloomFilterFuncBase;
55
class ColumnPredicate;
56
class DeleteBitmap;
57
class HybridSetBase;
58
class RuntimeProfile;
59
60
class VCollectIterator;
61
class Block;
62
class VExpr;
63
class Arena;
64
class VExprContext;
65
66
// Used to compare row with input scan key. Scan key only contains key columns,
67
// row contains all key columns, which is superset of key columns.
68
// So we should compare the common prefix columns of lhs and rhs.
69
//
70
// NOTE: if you are not sure if you can use it, please don't use this function.
71
8.65k
inline int compare_row_key(const RowCursor& lhs, const RowCursor& rhs) {
72
8.65k
    auto cmp_cids = std::min(lhs.field_count(), rhs.field_count());
73
16.1k
    for (uint32_t cid = 0; cid < cmp_cids; ++cid) {
74
16.0k
        const auto& lf = lhs.field(cid);
75
16.0k
        const auto& rf = rhs.field(cid);
76
        // Handle nulls: null < non-null
77
16.0k
        if (lf.is_null() != rf.is_null()) {
78
284
            return lf.is_null() ? -1 : 1;
79
284
        }
80
15.7k
        if (lf.is_null()) {
81
112
            continue; // both null
82
112
        }
83
15.6k
        auto cmp = lf <=> rf;
84
15.6k
        if (cmp < 0) return -1;
85
7.39k
        if (cmp > 0) return 1;
86
7.39k
    }
87
112
    return 0;
88
8.65k
}
89
90
class TabletReader {
91
    struct KeysParam {
92
        std::vector<RowCursor> start_keys;
93
        std::vector<RowCursor> end_keys;
94
        bool start_key_include = false;
95
        bool end_key_include = false;
96
    };
97
98
public:
99
    // Params for Reader,
100
    // mainly include tablet, data version and fetch range.
101
    struct ReaderParams {
102
4.08k
        bool has_single_version() const {
103
4.08k
            return (rs_splits.size() == 1 &&
104
4.08k
                    rs_splits[0].rs_reader->rowset()->start_version() == 0 &&
105
4.08k
                    !rs_splits[0].rs_reader->rowset()->rowset_meta()->is_segments_overlapping()) ||
106
4.08k
                   (rs_splits.size() == 2 &&
107
3.34k
                    rs_splits[0].rs_reader->rowset()->rowset_meta()->num_rows() == 0 &&
108
3.34k
                    rs_splits[1].rs_reader->rowset()->start_version() == 2 &&
109
3.34k
                    !rs_splits[1].rs_reader->rowset()->rowset_meta()->is_segments_overlapping());
110
4.08k
        }
111
112
3
        int get_be_exec_version() const {
113
3
            if (runtime_state) {
114
0
                return runtime_state->be_exec_version();
115
0
            }
116
3
            return BeExecVersionManager::get_newest_version();
117
3
        }
118
119
4.17k
        void set_read_source(TabletReadSource read_source, bool skip_delete_bitmap = false) {
120
4.17k
            rs_splits = std::move(read_source.rs_splits);
121
4.17k
            delete_predicates = std::move(read_source.delete_predicates);
122
4.17k
#ifndef BE_TEST
123
4.17k
            if (tablet->enable_unique_key_merge_on_write() && !skip_delete_bitmap) {
124
3.00k
                delete_bitmap = std::move(read_source.delete_bitmap);
125
3.00k
            }
126
4.17k
#endif
127
4.17k
        }
128
129
        BaseTabletSPtr tablet;
130
        TabletSchemaSPtr tablet_schema;
131
        ReaderType reader_type = ReaderType::READER_QUERY;
132
        bool direct_mode = false;
133
        bool aggregation = false;
134
        // for compaction, schema_change, check_sum: we don't use page cache
135
        // for query, when the BE config disable_storage_page_cache is false, we use page cache
136
        bool use_page_cache = false;
137
        Version version = Version(-1, 0);
138
139
        std::vector<OlapTuple> start_key;
140
        std::vector<OlapTuple> end_key;
141
        bool start_key_include = false;
142
        bool end_key_include = false;
143
144
        std::vector<std::shared_ptr<ColumnPredicate>> predicates;
145
        std::vector<FunctionFilter> function_filters;
146
        std::vector<RowsetMetaSharedPtr> delete_predicates;
147
        // slots that cast may be eliminated in storage layer
148
        std::map<std::string, DataTypePtr> target_cast_type_for_variants;
149
150
        std::map<int32_t, TColumnAccessPaths> all_access_paths;
151
        std::map<int32_t, TColumnAccessPaths> predicate_access_paths;
152
153
        std::vector<RowSetSplits> rs_splits;
154
        // For unique key table with merge-on-write
155
        DeleteBitmapPtr delete_bitmap = nullptr;
156
157
        // return_columns is init from query schema
158
        std::vector<ColumnId> return_columns;
159
        // output_columns only contain columns in OrderByExprs and outputExprs
160
        std::set<int32_t> output_columns;
161
        RuntimeProfile* profile = nullptr;
162
        RuntimeState* runtime_state = nullptr;
163
164
        // use only in vec exec engine
165
        std::vector<ColumnId>* origin_return_columns = nullptr;
166
        std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set = nullptr;
167
        TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE;
168
        VExprContextSPtrs common_expr_ctxs_push_down;
169
170
        // used for compaction to record row ids
171
        bool record_rowids = false;
172
        RowIdConversion* rowid_conversion = nullptr;
173
        std::vector<int> topn_filter_source_node_ids;
174
        int topn_filter_target_node_id = -1;
175
        // used for special optimization for query : ORDER BY key LIMIT n
176
        bool read_orderby_key = false;
177
        // used for special optimization for query : ORDER BY key DESC LIMIT n
178
        bool read_orderby_key_reverse = false;
179
        // For rows with the same key, use ascending order (small-to-large) for tie-breakers.
180
        // For example, use lower rowset version / segment id first.
181
        bool use_insert_order_when_same = false;
182
        // Force a key-ordered merge across all segments even when their key ranges do not
183
        // overlap. By default a rowset reader can skip the merge heap if its segments are
184
        // mono-ascending and disjoint, but row-binlog scans require strict global key order
185
        // (e.g. so MIN_DELTA can group consecutive same-key changes), so this flag is set.
186
        // See BetaRowsetReader::is_merge_iterator() in beta_rowset_reader.h:62.
187
        bool force_key_ordered_read = false;
188
        // num of columns for orderby key
189
        size_t read_orderby_key_num_prefix_columns = 0;
190
        // limit of rows for read_orderby_key
191
        size_t read_orderby_key_limit = 0;
192
        // for vertical compaction
193
        bool is_key_column_group = false;
194
        std::vector<uint32_t> key_group_cluster_key_idxes;
195
196
        // For sparse column compaction optimization
197
        // When true, use optimized path for sparse wide tables
198
        bool enable_sparse_optimization = false;
199
200
        bool is_segcompaction = false;
201
202
        // Enable value predicate pushdown for MOR tables
203
        bool enable_mor_value_predicate_pushdown = false;
204
205
        std::vector<RowwiseIteratorUPtr>* segment_iters_ptr = nullptr;
206
207
        void check_validation() const;
208
209
        int64_t batch_size = -1;
210
211
        std::map<ColumnId, VExprContextSPtr> virtual_column_exprs;
212
        std::map<ColumnId, size_t> vir_cid_to_idx_in_block;
213
        std::map<size_t, DataTypePtr> vir_col_idx_to_type;
214
215
        std::shared_ptr<ScoreRuntime> score_runtime;
216
        CollectionStatisticsPtr collection_statistics;
217
        std::shared_ptr<segment_v2::AnnTopNRuntime> ann_topn_runtime;
218
219
        uint64_t condition_cache_digest = 0;
220
221
        // General LIMIT budget forwarded to SegmentIterator. -1 means no limit.
222
        int64_t general_read_limit = -1;
223
        TBinlogScanType::type binlog_scan_type = TBinlogScanType::NONE;
224
    };
225
226
4.57k
    TabletReader() = default;
227
228
4.57k
    virtual ~TabletReader() = default;
229
230
    TabletReader(const TabletReader&) = delete;
231
    void operator=(const TabletReader&) = delete;
232
233
    // Initialize TabletReader with tablet, data version and fetch range.
234
    virtual Status init(const ReaderParams& read_params);
235
236
    // Read next block with aggregation.
237
    // Return OK and set `*eof` to false when next block is read
238
    // Return OK and set `*eof` to true when no more rows can be read.
239
    // Return others when unexpected error happens.
240
0
    virtual Status next_block_with_aggregation(Block* block, bool* eof) {
241
0
        return Status::Error<ErrorCode::READER_INITIALIZE_ERROR>(
242
0
                "TabletReader not support next_block_with_aggregation");
243
0
    }
244
245
48
    virtual uint64_t merged_rows() const { return _merged_rows; }
246
247
177
    uint64_t filtered_rows() const {
248
177
        return _stats.rows_del_filtered + _stats.rows_del_by_bitmap +
249
177
               _stats.rows_conditions_filtered + _stats.rows_vec_del_cond_filtered +
250
177
               _stats.rows_vec_cond_filtered + _stats.rows_short_circuit_cond_filtered;
251
177
    }
252
253
4.08k
    void set_batch_size(int batch_size) { _reader_context.batch_size = batch_size; }
254
255
0
    int batch_size() const { return _reader_context.batch_size; }
256
257
280k
    size_t batch_max_rows() const { return _reader_context.batch_size; }
258
259
4.08k
    void set_preferred_block_size_bytes(size_t bytes) {
260
4.08k
        _reader_context.preferred_block_size_bytes = bytes;
261
4.08k
    }
262
263
    // Returns the preferred output block byte budget. Subclasses that support adaptive batch size
264
    // should override this; the base returns 0 (disabled) so VCollectIterator degrades safely
265
    // when called through a TabletReader* that has not been configured.
266
0
    virtual size_t preferred_block_size_bytes() const { return 0; }
267
268
14.8k
    const OlapReaderStatistics& stats() const { return _stats; }
269
16.8k
    OlapReaderStatistics* mutable_stats() { return &_stats; }
270
271
0
    virtual void update_profile(RuntimeProfile* profile) {}
272
    static Status init_reader_params_and_create_block(
273
            TabletSharedPtr tablet, ReaderType reader_type,
274
            const std::vector<RowsetSharedPtr>& input_rowsets,
275
            TabletReader::ReaderParams* reader_params, Block* block);
276
277
protected:
278
    friend class VCollectIterator;
279
    friend class DeleteHandler;
280
281
    Status _init_params(const ReaderParams& read_params);
282
283
    Status _capture_rs_readers(const ReaderParams& read_params);
284
285
    Status _init_keys_param(const ReaderParams& read_params);
286
287
    Status _init_orderby_keys_param(const ReaderParams& read_params);
288
289
    Status _init_conditions_param(const ReaderParams& read_params);
290
291
    virtual std::shared_ptr<ColumnPredicate> _parse_to_predicate(
292
            const FunctionFilter& function_filter);
293
294
    Status _init_delete_condition(const ReaderParams& read_params);
295
296
    Status _init_return_columns(const ReaderParams& read_params);
297
298
9.03k
    const BaseTabletSPtr& tablet() { return _tablet; }
299
    // If original column is a variant type column, and it's predicate is normalized
300
    // so in order to get the real type of column predicate, we need to reset type
301
    // according to the related type in `target_cast_type_for_variants`.Since variant is not
302
    // an predicate applicable type.Otherwise return the original tablet column.
303
    // Eg. `where cast(v:a as bigint) > 1` will elimate cast, and materialize this variant column
304
    // to type bigint
305
    TabletColumn materialize_column(const TabletColumn& orig);
306
307
17.0k
    const TabletSchema& tablet_schema() { return *_tablet_schema; }
308
309
    Arena _predicate_arena;
310
    std::vector<ColumnId> _return_columns;
311
312
    // used for special optimization for query : ORDER BY key [ASC|DESC] LIMIT n
313
    // columns for orderby keys
314
    std::vector<uint32_t> _orderby_key_columns;
315
    // only use in outer join which change the column nullable which must keep same in
316
    // vec query engine
317
    std::unordered_set<uint32_t>* _tablet_columns_convert_to_null_set = nullptr;
318
319
    BaseTabletSPtr _tablet;
320
    RowsetReaderContext _reader_context;
321
    TabletSchemaSPtr _tablet_schema;
322
    KeysParam _keys_param;
323
    std::vector<bool> _is_lower_keys_included;
324
    std::vector<bool> _is_upper_keys_included;
325
    std::vector<std::shared_ptr<ColumnPredicate>> _col_predicates;
326
    std::vector<std::shared_ptr<ColumnPredicate>> _value_col_predicates;
327
    DeleteHandler _delete_handler;
328
329
    // Indicates whether the tablets has do a aggregation in storage engine.
330
    bool _aggregation = false;
331
    // for agg query, we don't need to finalize when scan agg object data
332
    ReaderType _reader_type = ReaderType::READER_QUERY;
333
    bool _next_delete_flag = false;
334
    bool _delete_sign_available = false;
335
    bool _filter_delete = false;
336
    int32_t _sequence_col_idx = -1;
337
    bool _direct_mode = false;
338
339
    std::vector<uint32_t> _key_cids;
340
    std::vector<uint32_t> _value_cids;
341
342
    uint64_t _merged_rows = 0;
343
    OlapReaderStatistics _stats;
344
};
345
346
} // namespace doris