Coverage Report

Created: 2026-03-15 17:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/beta_rowset_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/olap_file.pb.h>
21
#include <stdint.h>
22
23
#include <memory>
24
#include <utility>
25
#include <vector>
26
27
#include "common/status.h"
28
#include "core/block/block.h"
29
#include "storage/iterators.h"
30
#include "storage/olap_common.h"
31
#include "storage/rowset/beta_rowset.h"
32
#include "storage/rowset/rowset.h"
33
#include "storage/rowset/rowset_reader.h"
34
#include "storage/schema.h"
35
#include "storage/segment/segment_loader.h"
36
#include "util/once.h"
37
38
namespace doris {
39
class RuntimeProfile;
40
class Schema;
41
struct RowLocation;
42
struct RowsetReaderContext;
43
44
class BetaRowsetReader : public RowsetReader {
45
public:
46
    BetaRowsetReader(BetaRowsetSharedPtr rowset);
47
48
884
    ~BetaRowsetReader() override { _rowset->release(); }
49
50
    Status init(RowsetReaderContext* read_context, const RowSetSplits& rs_splits) override;
51
52
    Status get_segment_iterators(RowsetReaderContext* read_context,
53
                                 std::vector<RowwiseIteratorUPtr>* out_iters,
54
                                 bool use_cache = false) override;
55
    void reset_read_options() override;
56
6.53k
    Status next_batch(Block* block) override { return _next_batch(block); }
57
0
    Status next_batch(BlockView* block_view) override { return _next_batch(block_view); }
58
209
    Status next_batch(BlockWithSameBit* block_with_same_bit) override {
59
209
        return _next_batch(block_with_same_bit);
60
209
    }
61
62
391
    bool is_merge_iterator() const override {
63
391
        return _read_context->need_ordered_result &&
64
391
               _rowset->rowset_meta()->is_segments_overlapping() && _get_segment_num() > 1;
65
391
    }
66
67
0
    bool delete_flag() override { return _rowset->delete_flag(); }
68
69
1.03M
    Version version() override { return _rowset->version(); }
70
71
18
    int64_t newest_write_timestamp() override { return _rowset->newest_write_timestamp(); }
72
73
587k
    RowsetSharedPtr rowset() override { return std::dynamic_pointer_cast<Rowset>(_rowset); }
74
75
    // Return the total number of filtered rows, will be used for validation of schema change
76
0
    int64_t filtered_rows() override {
77
0
        return _stats->rows_del_filtered + _stats->rows_del_by_bitmap +
78
0
               _stats->rows_conditions_filtered + _stats->rows_vec_del_cond_filtered +
79
0
               _stats->rows_vec_cond_filtered + _stats->rows_short_circuit_cond_filtered;
80
0
    }
81
82
0
    uint64_t merged_rows() override { return *(_read_context->merged_rows); }
83
84
144
    RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; }
85
86
1.03k
    Status current_block_row_locations(std::vector<RowLocation>* locations) override {
87
1.03k
        return _iterator->current_block_row_locations(locations);
88
1.03k
    }
89
90
    void update_profile(RuntimeProfile* profile) override;
91
92
    RowsetReaderSharedPtr clone() override;
93
94
0
    void set_topn_limit(size_t topn_limit) override { _topn_limit = topn_limit; }
95
96
0
    OlapReaderStatistics* get_stats() { return _stats; }
97
98
private:
99
    template <typename T>
100
6.74k
    Status _next_batch(T* block) {
101
6.74k
        RETURN_IF_ERROR(_init_iterator_once());
102
6.74k
        SCOPED_RAW_TIMER(&_stats->block_fetch_ns);
103
6.74k
        if (_empty) {
104
0
            return Status::Error<ErrorCode::END_OF_FILE>("BetaRowsetReader is empty");
105
0
        }
106
107
6.74k
        RuntimeState* runtime_state = nullptr;
108
6.74k
        if (_read_context != nullptr) {
109
6.74k
            runtime_state = _read_context->runtime_state;
110
6.74k
        }
111
112
6.76k
        do {
113
6.76k
            Status s = _iterator->next_batch(block);
114
6.76k
            if (!s.ok()) {
115
247
                if (!s.is<ErrorCode::END_OF_FILE>()) {
116
0
                    LOG(WARNING) << "failed to read next block: " << s.to_string();
117
0
                }
118
247
                return s;
119
247
            }
120
121
6.51k
            if (runtime_state != nullptr && runtime_state->is_cancelled()) [[unlikely]] {
122
0
                return runtime_state->cancel_reason();
123
0
            }
124
6.51k
        } while (block->empty());
125
126
6.49k
        return Status::OK();
127
6.74k
    }
_ZN5doris16BetaRowsetReader11_next_batchINS_5BlockEEENS_6StatusEPT_
Line
Count
Source
100
6.53k
    Status _next_batch(T* block) {
101
6.53k
        RETURN_IF_ERROR(_init_iterator_once());
102
6.53k
        SCOPED_RAW_TIMER(&_stats->block_fetch_ns);
103
6.53k
        if (_empty) {
104
0
            return Status::Error<ErrorCode::END_OF_FILE>("BetaRowsetReader is empty");
105
0
        }
106
107
6.53k
        RuntimeState* runtime_state = nullptr;
108
6.53k
        if (_read_context != nullptr) {
109
6.53k
            runtime_state = _read_context->runtime_state;
110
6.53k
        }
111
112
6.55k
        do {
113
6.55k
            Status s = _iterator->next_batch(block);
114
6.55k
            if (!s.ok()) {
115
239
                if (!s.is<ErrorCode::END_OF_FILE>()) {
116
0
                    LOG(WARNING) << "failed to read next block: " << s.to_string();
117
0
                }
118
239
                return s;
119
239
            }
120
121
6.31k
            if (runtime_state != nullptr && runtime_state->is_cancelled()) [[unlikely]] {
122
0
                return runtime_state->cancel_reason();
123
0
            }
124
6.31k
        } while (block->empty());
125
126
6.29k
        return Status::OK();
127
6.53k
    }
Unexecuted instantiation: _ZN5doris16BetaRowsetReader11_next_batchISt6vectorINS_14IteratorRowRefESaIS3_EEEENS_6StatusEPT_
_ZN5doris16BetaRowsetReader11_next_batchINS_16BlockWithSameBitEEENS_6StatusEPT_
Line
Count
Source
100
209
    Status _next_batch(T* block) {
101
209
        RETURN_IF_ERROR(_init_iterator_once());
102
209
        SCOPED_RAW_TIMER(&_stats->block_fetch_ns);
103
209
        if (_empty) {
104
0
            return Status::Error<ErrorCode::END_OF_FILE>("BetaRowsetReader is empty");
105
0
        }
106
107
209
        RuntimeState* runtime_state = nullptr;
108
209
        if (_read_context != nullptr) {
109
209
            runtime_state = _read_context->runtime_state;
110
209
        }
111
112
209
        do {
113
209
            Status s = _iterator->next_batch(block);
114
209
            if (!s.ok()) {
115
8
                if (!s.is<ErrorCode::END_OF_FILE>()) {
116
0
                    LOG(WARNING) << "failed to read next block: " << s.to_string();
117
0
                }
118
8
                return s;
119
8
            }
120
121
201
            if (runtime_state != nullptr && runtime_state->is_cancelled()) [[unlikely]] {
122
0
                return runtime_state->cancel_reason();
123
0
            }
124
201
        } while (block->empty());
125
126
201
        return Status::OK();
127
209
    }
128
129
    [[nodiscard]] Status _init_iterator_once();
130
    [[nodiscard]] Status _init_iterator();
131
    bool _should_push_down_value_predicates() const;
132
133
8
    int64_t _get_segment_num() const {
134
8
        auto [seg_start, seg_end] = _segment_offsets;
135
8
        if (seg_start == seg_end) {
136
8
            seg_start = 0;
137
8
            seg_end = _rowset->num_segments();
138
8
        }
139
8
        return seg_end - seg_start;
140
8
    }
141
142
    DorisCallOnce<Status> _init_iter_once;
143
144
    std::pair<int64_t, int64_t> _segment_offsets;
145
    std::vector<RowRanges> _segment_row_ranges;
146
147
    // _input_schema: includes return_columns + delete_predicate_columns.
148
    // Used by SegmentIterator internally (iter->schema() returns this). SegmentIterator
149
    // handles the extra delete predicate columns through _current_return_columns and
150
    // _evaluate_short_circuit_predicate(), independent of the block structure.
151
    // e.g. return_columns={c1, c2}, delete_pred on c3 => input_schema={c1, c2, c3}
152
    SchemaSPtr _input_schema;
153
    // _output_schema: includes only return_columns (a subset of input_schema).
154
    // Passed to VMergeIterator/VUnionIterator. block_reset() builds the internal block
155
    // with this schema, and copy_rows() copies exactly these columns to the destination.
156
    // e.g. return_columns={c1, c2} => output_schema={c1, c2}
157
    SchemaSPtr _output_schema;
158
    RowsetReaderContext* _read_context = nullptr;
159
    BetaRowsetSharedPtr _rowset;
160
161
    OlapReaderStatistics _owned_stats;
162
    OlapReaderStatistics* _stats = nullptr;
163
164
    std::unique_ptr<RowwiseIterator> _iterator;
165
166
    StorageReadOptions _read_options;
167
168
    bool _empty = false;
169
    size_t _topn_limit = 0;
170
    uint64_t _merged_rows = 0;
171
};
172
173
} // namespace doris