Coverage Report

Created: 2026-06-12 10:04

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/segment_iterator.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/Exprs_types.h>
21
#include <stddef.h>
22
#include <stdint.h>
23
24
#include <map>
25
#include <memory>
26
#include <ostream>
27
#include <roaring/roaring.hh>
28
#include <set>
29
#include <string>
30
#include <unordered_map>
31
#include <utility>
32
#include <vector>
33
34
#include "common/status.h"
35
#include "core/block/block.h"
36
#include "core/block/column_with_type_and_name.h"
37
#include "core/block/columns_with_type_and_name.h"
38
#include "core/column/column.h"
39
#include "core/data_type/data_type.h"
40
#include "core/data_type/primitive_type.h"
41
#include "core/field.h"
42
#include "exec/common/variant_util.h"
43
#include "exprs/score_runtime.h"
44
#include "exprs/vexpr_fwd.h"
45
#include "io/fs/file_reader_writer_fwd.h"
46
#include "runtime/runtime_profile.h"
47
#include "storage/index/ann/ann_topn_runtime.h"
48
#include "storage/index/index_iterator.h"
49
#include "storage/iterators.h"
50
#include "storage/olap_common.h"
51
#include "storage/predicate/block_column_predicate.h"
52
#include "storage/predicate/column_predicate.h"
53
#include "storage/row_cursor.h"
54
#include "storage/schema.h"
55
#include "storage/segment/adaptive_block_size_predictor.h"
56
#include "storage/segment/common.h"
57
#include "storage/segment/segment.h"
58
#include "util/slice.h"
59
60
namespace doris {
61
62
class ObjectPool;
63
class MatchPredicate;
64
65
class VExpr;
66
class VExprContext;
67
struct RowLocation;
68
69
namespace segment_v2 {
70
71
class ColumnIterator;
72
class InvertedIndexIterator;
73
class RowRanges;
74
class IndexIterator;
75
76
Status rebind_storage_exprs_to_reader_schema(const StorageReadOptions& opts, const Schema& schema,
77
                                             const VExprContextSPtrs& common_exprs,
78
                                             std::map<ColumnId, VExprContextSPtr>& virtual_exprs);
79
80
bool storage_expr_slots_match_reader_schema(const StorageReadOptions& read_options);
81
82
// Plain description of a single column predicate: the column it targets, the
// operator, and the literal values it is compared against.
struct ColumnPredicateInfo {
    ColumnPredicateInfo() = default;

    // Renders the predicate as
    //   "column_name=<name>, query_op=<op>, query_value=<v1>,<v2>,..."
    // `column_id` is not part of the output.
    std::string debug_string() const {
        std::string out =
                "column_name=" + column_name + ", query_op=" + query_op + ", query_value=";
        // query_values is a std::set, so the values are emitted in sorted order.
        bool first = true;
        for (const auto& value : query_values) {
            if (!first) {
                out += ',';
            }
            out += value;
            first = false;
        }
        return out;
    }

    // True when the name, the operator and the value set are all empty.
    // `column_id` is ignored.
    bool is_empty() const {
        return column_name.empty() && query_values.empty() && query_op.empty();
    }

    // Field-wise comparison of name, values and operator.
    // `column_id` does not take part in the comparison.
    bool is_equal(const ColumnPredicateInfo& column_pred_info) const {
        return column_pred_info.column_name == column_name &&
               column_pred_info.query_values == query_values &&
               column_pred_info.query_op == query_op;
    }

    std::string column_name;
    // use set to ensure the consistent order of predicate_result_sign generated by inlist.
    std::set<std::string> query_values;
    std::string query_op;
    // Initialized so a default-constructed object never carries an indeterminate id.
    // 0 matches what value-initialization (`ColumnPredicateInfo{}`) already produced.
    int32_t column_id = 0;
};
118
119
class SegmentIterator : public RowwiseIterator {
120
public:
121
    SegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr schema);
122
    ~SegmentIterator() override;
123
124
    [[nodiscard]] Status init_iterators();
125
    [[nodiscard]] Status init(const StorageReadOptions& opts) override;
126
    [[nodiscard]] Status next_batch(Block* block) override;
127
128
    // Get current block row locations. This function should be called
129
    // after the `next_batch` function.
130
    // Only vectorized version is supported.
131
    [[nodiscard]] Status current_block_row_locations(
132
            std::vector<RowLocation>* block_row_locations) override;
133
134
266
    const Schema& schema() const override { return *_schema; }
135
0
    Segment& segment() { return *_segment; }
136
0
    StorageReadOptions& storage_read_options() { return _opts; }
137
12
    uint64_t data_id() const override { return _segment->id(); }
138
0
    RowsetId rowset_id() const { return _segment->rowset_id(); }
139
0
    int64_t tablet_id() const { return _tablet_id; }
140
141
0
    // Adds one info string per predicate group to `profile`: short-circuit
    // predicates, pre-evaluated block predicates and, when present, the
    // delete-condition predicates.
    void update_profile(RuntimeProfile* profile) override {
        _update_profile(profile, _short_cir_eval_predicate, "ShortCircuitPredicates");
        _update_profile(profile, _pre_eval_block_predicate, "PreEvaluatePredicates");

        // Nothing more to report when no delete conditions are attached.
        if (_opts.delete_condition_predicates == nullptr) {
            return;
        }

        std::set<std::shared_ptr<const ColumnPredicate>> delete_preds;
        _opts.delete_condition_predicates->get_all_column_predicate(delete_preds);
        _update_profile(profile, delete_preds, "DeleteConditionPredicates");
    }
151
152
0
    bool has_index_in_iterators() const {
153
0
        return std::any_of(_index_iterators.begin(), _index_iterators.end(),
154
0
                           [](const auto& iterator) { return iterator != nullptr; });
155
0
    }
156
157
private:
158
    Status _next_batch_internal(Block* block);
159
160
    Status _check_output_block(Block* block);
161
162
    template <typename Container>
163
    void _update_profile(RuntimeProfile* profile, const Container& predicates,
164
0
                         const std::string& title) {
165
0
        if (predicates.empty()) {
166
0
            return;
167
0
        }
168
0
        std::string info;
169
0
        for (auto pred : predicates) {
170
0
            info += "\n" + pred->debug_string();
171
0
        }
172
0
        profile->add_info_string(title, info);
173
0
    }
Unexecuted instantiation: _ZN5doris10segment_v215SegmentIterator15_update_profileISt6vectorISt10shared_ptrINS_15ColumnPredicateEESaIS6_EEEEvPNS_14RuntimeProfileERKT_RKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
Unexecuted instantiation: _ZN5doris10segment_v215SegmentIterator15_update_profileISt3setISt10shared_ptrIKNS_15ColumnPredicateEESt4lessIS7_ESaIS7_EEEEvPNS_14RuntimeProfileERKT_RKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEE
174
175
    [[nodiscard]] Status _lazy_init(Block* block);
176
    [[nodiscard]] Status _init_impl(const StorageReadOptions& opts);
177
    [[nodiscard]] Status _init_return_column_iterators();
178
    [[nodiscard]] Status _init_index_iterators();
179
180
    // calculate row ranges that fall into requested key ranges using short key index
181
    [[nodiscard]] Status _get_row_ranges_by_keys();
182
    [[nodiscard]] Status _prepare_seek(const StorageReadOptions::KeyRange& key_range);
183
    [[nodiscard]] Status _lookup_ordinal(const RowCursor& key, bool is_include, rowid_t upper_bound,
184
                                         rowid_t* rowid);
185
    // lookup the ordinal of given key from short key index
186
    // the returned rowid is rowid in primary index, not the rowid encoded in primary key
187
    [[nodiscard]] Status _lookup_ordinal_from_sk_index(const RowCursor& key, bool is_include,
188
                                                       rowid_t upper_bound, rowid_t* rowid);
189
    // lookup the ordinal of given key from primary key index
190
    [[nodiscard]] Status _lookup_ordinal_from_pk_index(const RowCursor& key, bool is_include,
191
                                                       rowid_t* rowid);
192
    [[nodiscard]] Status _seek_and_peek(rowid_t rowid);
193
194
    // calculate row ranges that satisfy requested column conditions using various column index
195
    [[nodiscard]] Status _get_row_ranges_by_column_conditions();
196
    [[nodiscard]] Status _get_row_ranges_from_conditions(RowRanges* condition_row_ranges);
197
    [[nodiscard]] Status _apply_expr_zonemap_to_row_ranges(const VExprContextSPtrs& conjuncts,
198
                                                           rowid_t min_rowid,
199
                                                           RowRanges* row_ranges);
200
    [[nodiscard]] Status _apply_inverted_index();
201
    [[nodiscard]] Status _apply_inverted_index_on_column_predicate(
202
            std::shared_ptr<ColumnPredicate> pred,
203
            std::vector<std::shared_ptr<ColumnPredicate>>& remaining_predicates,
204
            bool* continue_apply);
205
    [[nodiscard]] Status _apply_ann_topn_predicate();
206
    [[nodiscard]] Status _apply_index_expr();
207
208
    bool _column_has_fulltext_index(int32_t cid);
209
    bool _column_has_ann_index(int32_t cid);
210
    bool _downgrade_without_index(Status res, bool need_remaining = false);
211
    inline bool _inverted_index_not_support_pred_type(const PredicateType& type);
212
    bool _is_literal_node(const TExprNodeType::type& node_type);
213
214
    Status _vec_init_lazy_materialization();
215
216
5.80k
    uint32_t segment_id() const { return _segment->id(); }
217
9.39k
    uint32_t num_rows() const { return _segment->num_rows(); }
218
219
    [[nodiscard]] Status _seek_columns(const std::vector<ColumnId>& column_ids, rowid_t pos);
220
    // read `nrows` of columns specified by `column_ids` into `block` at `row_offset`.
221
    // for vectorization implementation
222
    [[nodiscard]] Status _read_columns(const std::vector<ColumnId>& column_ids,
223
                                       MutableColumns& column_block, size_t nrows);
224
    [[nodiscard]] Status _read_columns_by_index(uint32_t nrows_read_limit, uint16_t& nrows_read);
225
    void _replace_version_col_if_needed(const std::vector<ColumnId>& column_ids, size_t num_rows);
226
    void _update_lsn_col_if_needed(const std::vector<ColumnId>& column_ids, size_t num_rows);
227
    void _update_tso_col_if_needed(const std::vector<ColumnId>& column_ids, size_t num_rows);
228
    Status _init_current_block(Block* block, std::vector<MutableColumnPtr>& non_pred_vector,
229
                               uint32_t nrows_read_limit);
230
    uint16_t _evaluate_vectorization_predicate(uint16_t* sel_rowid_idx, uint16_t selected_size);
231
    uint16_t _evaluate_short_circuit_predicate(uint16_t* sel_rowid_idx, uint16_t selected_size);
232
    Status _apply_read_limit_to_selected_rows(Block* block, uint16_t& selected_size);
233
    void _collect_runtime_filter_predicate();
234
    Status _output_non_pred_columns(Block* block);
235
    [[nodiscard]] Status _read_columns_by_rowids(std::vector<ColumnId>& read_column_ids,
236
                                                 std::vector<rowid_t>& rowid_vector,
237
                                                 uint16_t* sel_rowid_idx, size_t select_size,
238
                                                 MutableColumns* mutable_columns,
239
                                                 bool init_condition_cache = false);
240
241
    Status copy_column_data_by_selector(IColumn* input_col_ptr, MutableColumnPtr& output_col,
242
                                        uint16_t* sel_rowid_idx, uint16_t select_size,
243
                                        size_t batch_size);
244
245
    template <class Container>
246
    [[nodiscard]] Status _output_column_by_sel_idx(Block* block, const Container& column_ids,
247
1.66k
                                                   uint16_t* sel_rowid_idx, uint16_t select_size) {
248
1.66k
        SCOPED_RAW_TIMER(&_opts.stats->output_col_ns);
249
1.66k
        for (auto cid : column_ids) {
250
1.66k
            int block_cid = _schema_block_id_map[cid];
251
            // Only the additional deleted filter condition need to materialize column be at the end of the block
252
            // We should not to materialize the column of query engine do not need. So here just return OK.
253
            // Eg:
254
            //      `delete from table where a = 10;`
255
            //      `select b from table;`
256
            // a column only effective in segment iterator, the block from query engine only contain the b column.
257
            // so the `block_cid >= data.size()` is true
258
1.66k
            if (block_cid >= block->columns()) {
259
277
                continue;
260
277
            }
261
1.39k
            DataTypePtr storage_type = _segment->get_data_type_of(*_schema->column(cid), _opts);
262
1.39k
            if (storage_type && !storage_type->equals(*block->get_by_position(block_cid).type)) {
263
                // Do additional cast
264
0
                MutableColumnPtr tmp = storage_type->create_column();
265
0
                RETURN_IF_ERROR(copy_column_data_by_selector(_current_return_columns[cid].get(),
266
0
                                                             tmp, sel_rowid_idx, select_size,
267
0
                                                             _opts.block_row_max));
268
0
                RETURN_IF_ERROR(variant_util::cast_column(
269
0
                        {tmp->get_ptr(), storage_type, ""}, block->get_by_position(block_cid).type,
270
0
                        &block->get_by_position(block_cid).column));
271
1.39k
            } else {
272
1.39k
                MutableColumnPtr output_column =
273
1.39k
                        block->get_by_position(block_cid).column->assert_mutable();
274
1.39k
                RETURN_IF_ERROR(copy_column_data_by_selector(_current_return_columns[cid].get(),
275
1.39k
                                                             output_column, sel_rowid_idx,
276
1.39k
                                                             select_size, _opts.block_row_max));
277
1.39k
            }
278
1.39k
        }
279
1.66k
        return Status::OK();
280
1.66k
    }
281
282
    bool _can_evaluated_by_vectorized(std::shared_ptr<ColumnPredicate> predicate);
283
284
    [[nodiscard]] Status _extract_common_expr_columns(const VExprSPtr& expr);
285
    // same with _extract_common_expr_columns, but only extract columns that can be used for index
286
    [[nodiscard]] Status _execute_common_expr(uint16_t* sel_rowid_idx, uint16_t& selected_size,
287
                                              Block* block);
288
    Status _process_common_expr(uint16_t* sel_rowid_idx, uint16_t& selected_size, Block* block);
289
290
    uint16_t _evaluate_common_expr_filter(uint16_t* sel_rowid_idx, uint16_t selected_size,
291
                                          const IColumn::Filter& filter);
292
293
    // Dictionary column should do something to initial.
294
    void _convert_dict_code_for_predicate_if_necessary();
295
296
    void _convert_dict_code_for_predicate_if_necessary_impl(
297
            std::shared_ptr<ColumnPredicate> predicate);
298
299
    bool _check_apply_by_inverted_index(std::shared_ptr<ColumnPredicate> pred);
300
301
    void _output_index_result_column(const std::vector<VExprContext*>& expr_ctxs,
302
                                     uint16_t* sel_rowid_idx, uint16_t select_size, Block* block);
303
304
    bool _need_read_data(ColumnId cid);
305
    bool _prune_column(ColumnId cid, MutableColumnPtr& column, bool fill_defaults,
306
                       size_t num_of_defaults);
307
308
    Status _construct_compound_expr_context();
309
310
    int _compare_short_key_with_seek_block(const RowCursor& key,
311
0
                                           const std::vector<ColumnId>& col_ids) {
312
0
        for (auto cid : col_ids) {
313
0
            auto ord = key.field(cid) <=> (*_seek_block[cid])[0];
314
0
            if (ord != std::strong_ordering::equal) {
315
0
                return ord < 0 ? -1 : 1;
316
0
            }
317
0
        }
318
0
        return 0;
319
0
    }
320
321
    Status _convert_to_expected_type(const std::vector<ColumnId>& col_ids);
322
323
    bool _no_need_read_key_data(ColumnId cid, MutableColumnPtr& column, size_t nrows_read);
324
325
    bool _has_delete_predicate(ColumnId cid);
326
327
    bool _can_opt_limit_reads();
328
329
    void _initialize_predicate_results();
330
    bool _check_all_conditions_passed_inverted_index_for_column(ColumnId cid,
331
                                                                bool default_return = false);
332
333
    void _calculate_common_expr_index_exec_status();
334
335
    Status _process_eof(Block* block);
336
337
    Status _process_column_predicate();
338
339
    void _fill_column_nothing();
340
341
    Status _process_columns(const std::vector<ColumnId>& column_ids, Block* block);
342
343
    // Initialize virtual columns in the block, set all virtual columns in the block to ColumnNothing
344
    void _init_virtual_columns(Block* block);
345
    // Fallback logic for virtual column materialization, materializing all unmaterialized virtual columns through expressions
346
    Status _materialization_of_virtual_column(Block* block);
347
    void _prepare_score_column_materialization();
348
349
    void _init_row_bitmap_by_condition_cache();
350
351
    void _init_segment_prefetchers();
352
353
    class BitmapRangeIterator;
354
    class BackwardBitmapRangeIterator;
355
356
    std::shared_ptr<Segment> _segment;
357
    // read schema from scanner
358
    SchemaSPtr _schema;
359
    // storage type schema related to _schema, since column in segment may be different with type in _schema
360
    std::vector<IndexFieldNameAndTypePair> _storage_name_and_type;
361
    // vector idx -> column iterator
362
    std::vector<std::unique_ptr<ColumnIterator>> _column_iterators;
363
    std::vector<std::unique_ptr<IndexIterator>> _index_iterators;
364
    // after init(), `_row_bitmap` contains all rowid to scan
365
    roaring::Roaring _row_bitmap;
366
    // an iterator for `_row_bitmap` that can be used to extract row range to scan
367
    std::unique_ptr<BitmapRangeIterator> _range_iter;
368
    // the next rowid to read
369
    rowid_t _cur_rowid;
370
    // members related to lazy materialization read
371
    // --------------------------------------------
372
    // whether lazy materialization read should be used.
373
    bool _lazy_materialization_read;
374
    // columns to read after predicate evaluation and remaining expr execute
375
    std::vector<ColumnId> _non_predicate_columns;
376
    std::set<ColumnId> _common_expr_columns;
377
    // remember the rowids we've read for the current row block.
378
    // could be a local variable of next_batch(), kept here to reuse vector memory
379
    std::vector<rowid_t> _block_rowids;
380
    bool _is_need_vec_eval = false;
381
    bool _is_need_short_eval = false;
382
    bool _is_need_expr_eval = false;
383
384
    // fields for vectorization execution
385
    std::vector<ColumnId>
386
            _vec_pred_column_ids; // keep columnId of columns for vectorized predicate evaluation
387
    std::vector<ColumnId>
388
            _short_cir_pred_column_ids; // keep columnId of columns for short circuit predicate evaluation
389
    std::vector<bool> _is_pred_column; // columns hold _init segmentIter
390
    std::map<uint32_t, bool> _need_read_data_indices;
391
    std::vector<bool> _is_common_expr_column;
392
    MutableColumns _current_return_columns;
393
    std::vector<std::shared_ptr<ColumnPredicate>> _pre_eval_block_predicate;
394
    std::vector<std::shared_ptr<ColumnPredicate>> _short_cir_eval_predicate;
395
    std::vector<uint32_t> _delete_range_column_ids;
396
    std::vector<uint32_t> _delete_bloom_filter_column_ids;
397
    // when lazy materialization is enabled, segmentIter need to read data at least twice
398
    // first, read predicate columns by various index
399
    // second, read non-predicate columns
400
    // so we need a field to stand for columns first time to read
401
    std::vector<ColumnId> _predicate_column_ids;
402
    std::vector<ColumnId> _common_expr_column_ids;
403
    // TODO: Should use std::vector<size_t>
404
    std::vector<ColumnId> _columns_to_filter;
405
    std::vector<bool> _converted_column_ids;
406
    // TODO: Should use std::vector<size_t>
407
    std::vector<int> _schema_block_id_map; // map from schema column id to column idx in Block
408
409
    // the actual init process is delayed to the first call to next_batch()
410
    bool _lazy_inited;
411
    bool _inited;
412
413
    StorageReadOptions _opts;
414
    // Adaptive batch size predictor; null when the feature is disabled.
415
    std::unique_ptr<AdaptiveBlockSizePredictor> _block_size_predictor;
416
    // Build the AdaptiveBlockSizePredictor for this segment based on segment footer
417
    // metadata for the projected output columns. Returns nullptr if the feature is
418
    // disabled or the byte budget is non-positive.
419
    std::unique_ptr<AdaptiveBlockSizePredictor> _make_block_size_predictor() const;
420
    // Snapshot of _opts.block_row_max at init time; used as the hard upper bound so that
421
    // dynamic adjustments never exceed the capacity of pre-allocated buffers.
422
    uint32_t _initial_block_row_max = 0;
423
    // make a copy of `_opts.column_predicates` in order to make local changes
424
    std::vector<std::shared_ptr<ColumnPredicate>> _col_predicates;
425
    VExprContextSPtrs _common_expr_ctxs_push_down;
426
    std::set<ColumnId> _not_apply_index_pred;
427
428
    // row schema of the key to seek
429
    // only used in `_get_row_ranges_by_keys`
430
    std::unique_ptr<Schema> _seek_schema;
431
    // used to binary search the rowid for a given key
432
    // only used in `_get_row_ranges_by_keys`
433
    MutableColumns _seek_block;
434
435
    io::FileReaderSPtr _file_reader;
436
437
    // used for compaction, record selected rowids of current batch
438
    uint16_t _selected_size;
439
    std::vector<uint16_t> _sel_rowid_idx;
440
441
    // Rows already produced by this iterator. Used together with
442
    // _opts.read_limit to compute the remaining per-batch budget.
443
    size_t _rows_returned = 0;
444
445
    std::unique_ptr<ObjectPool> _pool;
446
447
    // used to collect filter information.
448
    std::vector<std::shared_ptr<ColumnPredicate>> _filter_info_id;
449
    bool _record_rowids = false;
450
    int64_t _tablet_id = 0;
451
    std::set<int32_t> _output_columns;
452
453
    std::vector<uint8_t> _ret_flags;
454
455
    /*
456
    * column and column_predicates on it.
457
    * a boolean value to indicate whether the column has been read by the index.
458
    */
459
    std::unordered_map<ColumnId, std::unordered_map<std::shared_ptr<ColumnPredicate>, bool>>
460
            _column_predicate_index_exec_status;
461
462
    /*
463
    * column and common expr on it.
464
    * a boolean value to indicate whether the column has been read by the index.
465
    */
466
    std::unordered_map<ColumnId, std::unordered_map<const VExpr*, bool>>
467
            _common_expr_index_exec_status;
468
469
    /*
470
    * common expr context to slotref map
471
    * slot ref map is used to get slot ref expr by using column id.
472
    */
473
    std::unordered_map<VExprContext*, std::unordered_map<ColumnId, VExpr*>>
474
            _common_expr_to_slotref_map;
475
476
    ScoreRuntimeSPtr _score_runtime;
477
478
    std::shared_ptr<segment_v2::AnnTopNRuntime> _ann_topn_runtime;
479
480
    // cid to virtual column expr
481
    std::map<ColumnId, VExprContextSPtr> _virtual_column_exprs;
482
    std::map<ColumnId, size_t> _vir_cid_to_idx_in_block;
483
484
    IndexQueryContextPtr _index_query_context;
485
486
    // key is column uid, value is the sparse column cache
487
    std::unordered_map<int32_t, PathToBinaryColumnCacheUPtr> _variant_sparse_column_cache;
488
489
    bool _find_condition_cache = false;
490
    std::shared_ptr<std::vector<bool>> _condition_cache;
491
    static constexpr int CONDITION_CACHE_OFFSET = 2048;
492
};
493
494
} // namespace segment_v2
495
} // namespace doris