Coverage Report

Created: 2026-06-18 20:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/rowset/beta_rowset_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/olap_file.pb.h>
21
#include <stdint.h>
22
23
#include <memory>
24
#include <utility>
25
#include <vector>
26
27
#include "common/status.h"
28
#include "core/block/block.h"
29
#include "storage/iterators.h"
30
#include "storage/olap_common.h"
31
#include "storage/rowset/beta_rowset.h"
32
#include "storage/rowset/rowset.h"
33
#include "storage/rowset/rowset_reader.h"
34
#include "storage/schema.h"
35
#include "storage/segment/segment_loader.h"
36
#include "util/once.h"
37
38
namespace doris {
39
class RuntimeProfile;
40
class Schema;
41
struct RowLocation;
42
struct RowsetReaderContext;
43
44
class BetaRowsetReader : public RowsetReader {
45
public:
46
    BetaRowsetReader(BetaRowsetSharedPtr rowset);
47
48
914
    ~BetaRowsetReader() override { _rowset->release(); }
49
50
    Status init(RowsetReaderContext* read_context, const RowSetSplits& rs_splits) override;
51
52
    Status get_segment_iterators(RowsetReaderContext* read_context,
53
                                 std::vector<RowwiseIteratorUPtr>* out_iters,
54
                                 bool use_cache = false) override;
55
    void reset_read_options() override;
56
6.42k
    Status next_batch(Block* block) override { return _next_batch(block); }
57
0
    Status next_batch(BlockView* block_view) override { return _next_batch(block_view); }
58
209
    Status next_batch(BlockWithSameBit* block_with_same_bit) override {
59
209
        return _next_batch(block_with_same_bit);
60
209
    }
61
62
391
    bool is_merge_iterator() const override {
63
391
        return _read_context->need_ordered_result && _get_segment_num() > 1 &&
64
391
               (_rowset->rowset_meta()->is_segments_overlapping() ||
65
188
                _read_context->force_key_ordered_read);
66
391
    }
67
68
0
    bool delete_flag() override { return _rowset->delete_flag(); }
69
70
1.03M
    Version version() override { return _rowset->version(); }
71
72
18
    int64_t newest_write_timestamp() override { return _rowset->newest_write_timestamp(); }
73
74
587k
    RowsetSharedPtr rowset() override { return std::dynamic_pointer_cast<Rowset>(_rowset); }
75
76
    // Return the total number of filtered rows, will be used for validation of schema change
77
0
    int64_t filtered_rows() override {
78
0
        return _stats->rows_del_filtered + _stats->rows_del_by_bitmap +
79
0
               _stats->rows_conditions_filtered + _stats->rows_vec_del_cond_filtered +
80
0
               _stats->rows_vec_cond_filtered + _stats->rows_short_circuit_cond_filtered;
81
0
    }
82
83
0
    uint64_t merged_rows() override { return *(_read_context->merged_rows); }
84
85
144
    RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; }
86
87
1.03k
    Status current_block_row_locations(std::vector<RowLocation>* locations) override {
88
1.03k
        return _iterator->current_block_row_locations(locations);
89
1.03k
    }
90
91
    void update_profile(RuntimeProfile* profile) override;
92
93
    RowsetReaderSharedPtr clone() override;
94
95
0
    void set_topn_limit(size_t topn_limit) override { _topn_limit = topn_limit; }
96
97
0
    OlapReaderStatistics* get_stats() { return _stats; }
98
99
private:
100
    template <typename T>
101
6.63k
    Status _next_batch(T* block) {
102
6.63k
        RETURN_IF_ERROR(_init_iterator_once());
103
6.63k
        SCOPED_RAW_TIMER(&_stats->block_fetch_ns);
104
6.63k
        if (_empty) {
105
0
            return Status::Error<ErrorCode::END_OF_FILE>("BetaRowsetReader is empty");
106
0
        }
107
108
6.63k
        RuntimeState* runtime_state = nullptr;
109
6.63k
        if (_read_context != nullptr) {
110
6.63k
            runtime_state = _read_context->runtime_state;
111
6.63k
        }
112
113
6.65k
        do {
114
6.65k
            Status s = _iterator->next_batch(block);
115
6.65k
            if (!s.ok()) {
116
247
                if (!s.is<ErrorCode::END_OF_FILE>()) {
117
0
                    LOG(WARNING) << "failed to read next block: " << s.to_string();
118
0
                }
119
247
                return s;
120
247
            }
121
122
6.40k
            if (runtime_state != nullptr && runtime_state->is_cancelled()) [[unlikely]] {
123
0
                return runtime_state->cancel_reason();
124
0
            }
125
6.40k
        } while (block->empty());
126
127
6.38k
        return Status::OK();
128
6.63k
    }
_ZN5doris16BetaRowsetReader11_next_batchINS_5BlockEEENS_6StatusEPT_
Line
Count
Source
101
6.42k
    Status _next_batch(T* block) {
102
6.42k
        RETURN_IF_ERROR(_init_iterator_once());
103
6.42k
        SCOPED_RAW_TIMER(&_stats->block_fetch_ns);
104
6.42k
        if (_empty) {
105
0
            return Status::Error<ErrorCode::END_OF_FILE>("BetaRowsetReader is empty");
106
0
        }
107
108
6.42k
        RuntimeState* runtime_state = nullptr;
109
6.42k
        if (_read_context != nullptr) {
110
6.42k
            runtime_state = _read_context->runtime_state;
111
6.42k
        }
112
113
6.44k
        do {
114
6.44k
            Status s = _iterator->next_batch(block);
115
6.44k
            if (!s.ok()) {
116
239
                if (!s.is<ErrorCode::END_OF_FILE>()) {
117
0
                    LOG(WARNING) << "failed to read next block: " << s.to_string();
118
0
                }
119
239
                return s;
120
239
            }
121
122
6.20k
            if (runtime_state != nullptr && runtime_state->is_cancelled()) [[unlikely]] {
123
0
                return runtime_state->cancel_reason();
124
0
            }
125
6.20k
        } while (block->empty());
126
127
6.18k
        return Status::OK();
128
6.42k
    }
Unexecuted instantiation: _ZN5doris16BetaRowsetReader11_next_batchISt6vectorINS_14IteratorRowRefESaIS3_EEEENS_6StatusEPT_
_ZN5doris16BetaRowsetReader11_next_batchINS_16BlockWithSameBitEEENS_6StatusEPT_
Line
Count
Source
101
209
    Status _next_batch(T* block) {
102
209
        RETURN_IF_ERROR(_init_iterator_once());
103
209
        SCOPED_RAW_TIMER(&_stats->block_fetch_ns);
104
209
        if (_empty) {
105
0
            return Status::Error<ErrorCode::END_OF_FILE>("BetaRowsetReader is empty");
106
0
        }
107
108
209
        RuntimeState* runtime_state = nullptr;
109
209
        if (_read_context != nullptr) {
110
209
            runtime_state = _read_context->runtime_state;
111
209
        }
112
113
209
        do {
114
209
            Status s = _iterator->next_batch(block);
115
209
            if (!s.ok()) {
116
8
                if (!s.is<ErrorCode::END_OF_FILE>()) {
117
0
                    LOG(WARNING) << "failed to read next block: " << s.to_string();
118
0
                }
119
8
                return s;
120
8
            }
121
122
201
            if (runtime_state != nullptr && runtime_state->is_cancelled()) [[unlikely]] {
123
0
                return runtime_state->cancel_reason();
124
0
            }
125
201
        } while (block->empty());
126
127
201
        return Status::OK();
128
209
    }
129
130
    [[nodiscard]] Status _init_iterator_once();
131
    [[nodiscard]] Status _init_iterator();
132
    bool _should_push_down_value_predicates() const;
133
134
296
    int64_t _get_segment_num() const {
135
296
        auto [seg_start, seg_end] = _segment_offsets;
136
296
        if (seg_start == seg_end) {
137
296
            seg_start = 0;
138
296
            seg_end = _rowset->num_segments();
139
296
        }
140
296
        return seg_end - seg_start;
141
296
    }
142
143
    DorisCallOnce<Status> _init_iter_once;
144
145
    std::pair<int64_t, int64_t> _segment_offsets;
146
    std::vector<RowRanges> _segment_row_ranges;
147
148
    // _input_schema: includes return_columns + delete_predicate_columns.
149
    // Used by SegmentIterator internally (iter->schema() returns this). SegmentIterator
150
    // handles the extra delete predicate columns through _current_return_columns and
151
    // _evaluate_short_circuit_predicate(), independent of the block structure.
152
    // e.g. return_columns={c1, c2}, delete_pred on c3 => input_schema={c1, c2, c3}
153
    SchemaSPtr _input_schema;
154
    // _output_schema: includes only return_columns (a subset of input_schema).
155
    // Passed to VMergeIterator/VUnionIterator. block_reset() builds the internal block
156
    // with this schema, and copy_rows() copies exactly these columns to the destination.
157
    // e.g. return_columns={c1, c2} => output_schema={c1, c2}
158
    SchemaSPtr _output_schema;
159
    RowsetReaderContext* _read_context = nullptr;
160
    BetaRowsetSharedPtr _rowset;
161
162
    OlapReaderStatistics _owned_stats;
163
    OlapReaderStatistics* _stats = nullptr;
164
165
    std::unique_ptr<RowwiseIterator> _iterator;
166
167
    StorageReadOptions _read_options;
168
169
    bool _empty = false;
170
    size_t _topn_limit = 0;
171
    uint64_t _merged_rows = 0;
172
};
173
174
} // namespace doris