be/src/storage/rowset/beta_rowset_reader.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <gen_cpp/olap_file.pb.h> |
21 | | #include <stdint.h> |
22 | | |
23 | | #include <memory> |
24 | | #include <utility> |
25 | | #include <vector> |
26 | | |
27 | | #include "common/status.h" |
28 | | #include "core/block/block.h" |
29 | | #include "storage/iterators.h" |
30 | | #include "storage/olap_common.h" |
31 | | #include "storage/rowset/beta_rowset.h" |
32 | | #include "storage/rowset/rowset.h" |
33 | | #include "storage/rowset/rowset_reader.h" |
34 | | #include "storage/schema.h" |
35 | | #include "storage/segment/segment_loader.h" |
36 | | #include "util/once.h" |
37 | | |
38 | | namespace doris { |
39 | | class RuntimeProfile; |
40 | | class Schema; |
41 | | struct RowLocation; |
42 | | struct RowsetReaderContext; |
43 | | |
44 | | class BetaRowsetReader : public RowsetReader { |
45 | | public: |
46 | | BetaRowsetReader(BetaRowsetSharedPtr rowset); |
47 | | |
48 | 884 | ~BetaRowsetReader() override { _rowset->release(); } |
49 | | |
50 | | Status init(RowsetReaderContext* read_context, const RowSetSplits& rs_splits) override; |
51 | | |
52 | | Status get_segment_iterators(RowsetReaderContext* read_context, |
53 | | std::vector<RowwiseIteratorUPtr>* out_iters, |
54 | | bool use_cache = false) override; |
55 | | void reset_read_options() override; |
56 | 6.53k | Status next_batch(Block* block) override { return _next_batch(block); } |
57 | 0 | Status next_batch(BlockView* block_view) override { return _next_batch(block_view); } |
58 | 209 | Status next_batch(BlockWithSameBit* block_with_same_bit) override { |
59 | 209 | return _next_batch(block_with_same_bit); |
60 | 209 | } |
61 | | |
62 | 391 | bool is_merge_iterator() const override { |
63 | 391 | return _read_context->need_ordered_result && |
64 | 391 | _rowset->rowset_meta()->is_segments_overlapping() && _get_segment_num() > 1; |
65 | 391 | } |
66 | | |
67 | 0 | bool delete_flag() override { return _rowset->delete_flag(); } |
68 | | |
69 | 1.03M | Version version() override { return _rowset->version(); } |
70 | | |
71 | 18 | int64_t newest_write_timestamp() override { return _rowset->newest_write_timestamp(); } |
72 | | |
73 | 587k | RowsetSharedPtr rowset() override { return std::dynamic_pointer_cast<Rowset>(_rowset); } |
74 | | |
75 | | // Return the total number of filtered rows, will be used for validation of schema change |
76 | 0 | int64_t filtered_rows() override { |
77 | 0 | return _stats->rows_del_filtered + _stats->rows_del_by_bitmap + |
78 | 0 | _stats->rows_conditions_filtered + _stats->rows_vec_del_cond_filtered + |
79 | 0 | _stats->rows_vec_cond_filtered + _stats->rows_short_circuit_cond_filtered; |
80 | 0 | } |
81 | | |
82 | 0 | uint64_t merged_rows() override { return *(_read_context->merged_rows); } |
83 | | |
84 | 144 | RowsetTypePB type() const override { return RowsetTypePB::BETA_ROWSET; } |
85 | | |
86 | 1.03k | Status current_block_row_locations(std::vector<RowLocation>* locations) override { |
87 | 1.03k | return _iterator->current_block_row_locations(locations); |
88 | 1.03k | } |
89 | | |
90 | | void update_profile(RuntimeProfile* profile) override; |
91 | | |
92 | | RowsetReaderSharedPtr clone() override; |
93 | | |
94 | 0 | void set_topn_limit(size_t topn_limit) override { _topn_limit = topn_limit; } |
95 | | |
96 | 0 | OlapReaderStatistics* get_stats() { return _stats; } |
97 | | |
98 | | private: |
99 | | template <typename T> |
100 | 6.74k | Status _next_batch(T* block) { |
101 | 6.74k | RETURN_IF_ERROR(_init_iterator_once()); |
102 | 6.74k | SCOPED_RAW_TIMER(&_stats->block_fetch_ns); |
103 | 6.74k | if (_empty) { |
104 | 0 | return Status::Error<ErrorCode::END_OF_FILE>("BetaRowsetReader is empty"); |
105 | 0 | } |
106 | | |
107 | 6.74k | RuntimeState* runtime_state = nullptr; |
108 | 6.74k | if (_read_context != nullptr) { |
109 | 6.74k | runtime_state = _read_context->runtime_state; |
110 | 6.74k | } |
111 | | |
112 | 6.76k | do { |
113 | 6.76k | Status s = _iterator->next_batch(block); |
114 | 6.76k | if (!s.ok()) { |
115 | 247 | if (!s.is<ErrorCode::END_OF_FILE>()) { |
116 | 0 | LOG(WARNING) << "failed to read next block: " << s.to_string(); |
117 | 0 | } |
118 | 247 | return s; |
119 | 247 | } |
120 | | |
121 | 6.51k | if (runtime_state != nullptr && runtime_state->is_cancelled()) [[unlikely]] { |
122 | 0 | return runtime_state->cancel_reason(); |
123 | 0 | } |
124 | 6.51k | } while (block->empty()); |
125 | | |
126 | 6.49k | return Status::OK(); |
127 | 6.74k | } _ZN5doris16BetaRowsetReader11_next_batchINS_5BlockEEENS_6StatusEPT_ Line | Count | Source | 100 | 6.53k | Status _next_batch(T* block) { | 101 | 6.53k | RETURN_IF_ERROR(_init_iterator_once()); | 102 | 6.53k | SCOPED_RAW_TIMER(&_stats->block_fetch_ns); | 103 | 6.53k | if (_empty) { | 104 | 0 | return Status::Error<ErrorCode::END_OF_FILE>("BetaRowsetReader is empty"); | 105 | 0 | } | 106 | | | 107 | 6.53k | RuntimeState* runtime_state = nullptr; | 108 | 6.53k | if (_read_context != nullptr) { | 109 | 6.53k | runtime_state = _read_context->runtime_state; | 110 | 6.53k | } | 111 | | | 112 | 6.55k | do { | 113 | 6.55k | Status s = _iterator->next_batch(block); | 114 | 6.55k | if (!s.ok()) { | 115 | 239 | if (!s.is<ErrorCode::END_OF_FILE>()) { | 116 | 0 | LOG(WARNING) << "failed to read next block: " << s.to_string(); | 117 | 0 | } | 118 | 239 | return s; | 119 | 239 | } | 120 | | | 121 | 6.31k | if (runtime_state != nullptr && runtime_state->is_cancelled()) [[unlikely]] { | 122 | 0 | return runtime_state->cancel_reason(); | 123 | 0 | } | 124 | 6.31k | } while (block->empty()); | 125 | | | 126 | 6.29k | return Status::OK(); | 127 | 6.53k | } |
Unexecuted instantiation: _ZN5doris16BetaRowsetReader11_next_batchISt6vectorINS_14IteratorRowRefESaIS3_EEEENS_6StatusEPT_ _ZN5doris16BetaRowsetReader11_next_batchINS_16BlockWithSameBitEEENS_6StatusEPT_ Line | Count | Source | 100 | 209 | Status _next_batch(T* block) { | 101 | 209 | RETURN_IF_ERROR(_init_iterator_once()); | 102 | 209 | SCOPED_RAW_TIMER(&_stats->block_fetch_ns); | 103 | 209 | if (_empty) { | 104 | 0 | return Status::Error<ErrorCode::END_OF_FILE>("BetaRowsetReader is empty"); | 105 | 0 | } | 106 | | | 107 | 209 | RuntimeState* runtime_state = nullptr; | 108 | 209 | if (_read_context != nullptr) { | 109 | 209 | runtime_state = _read_context->runtime_state; | 110 | 209 | } | 111 | | | 112 | 209 | do { | 113 | 209 | Status s = _iterator->next_batch(block); | 114 | 209 | if (!s.ok()) { | 115 | 8 | if (!s.is<ErrorCode::END_OF_FILE>()) { | 116 | 0 | LOG(WARNING) << "failed to read next block: " << s.to_string(); | 117 | 0 | } | 118 | 8 | return s; | 119 | 8 | } | 120 | | | 121 | 201 | if (runtime_state != nullptr && runtime_state->is_cancelled()) [[unlikely]] { | 122 | 0 | return runtime_state->cancel_reason(); | 123 | 0 | } | 124 | 201 | } while (block->empty()); | 125 | | | 126 | 201 | return Status::OK(); | 127 | 209 | } |
|
128 | | |
129 | | [[nodiscard]] Status _init_iterator_once(); |
130 | | [[nodiscard]] Status _init_iterator(); |
131 | | bool _should_push_down_value_predicates() const; |
132 | | |
133 | 8 | int64_t _get_segment_num() const { |
134 | 8 | auto [seg_start, seg_end] = _segment_offsets; |
135 | 8 | if (seg_start == seg_end) { |
136 | 8 | seg_start = 0; |
137 | 8 | seg_end = _rowset->num_segments(); |
138 | 8 | } |
139 | 8 | return seg_end - seg_start; |
140 | 8 | } |
141 | | |
142 | | DorisCallOnce<Status> _init_iter_once; |
143 | | |
144 | | std::pair<int64_t, int64_t> _segment_offsets; |
145 | | std::vector<RowRanges> _segment_row_ranges; |
146 | | |
147 | | // _input_schema: includes return_columns + delete_predicate_columns. |
148 | | // Used by SegmentIterator internally (iter->schema() returns this). SegmentIterator |
149 | | // handles the extra delete predicate columns through _current_return_columns and |
150 | | // _evaluate_short_circuit_predicate(), independent of the block structure. |
151 | | // e.g. return_columns={c1, c2}, delete_pred on c3 => input_schema={c1, c2, c3} |
152 | | SchemaSPtr _input_schema; |
153 | | // _output_schema: includes only return_columns (a subset of input_schema). |
154 | | // Passed to VMergeIterator/VUnionIterator. block_reset() builds the internal block |
155 | | // with this schema, and copy_rows() copies exactly these columns to the destination. |
156 | | // e.g. return_columns={c1, c2} => output_schema={c1, c2} |
157 | | SchemaSPtr _output_schema; |
158 | | RowsetReaderContext* _read_context = nullptr; |
159 | | BetaRowsetSharedPtr _rowset; |
160 | | |
161 | | OlapReaderStatistics _owned_stats; |
162 | | OlapReaderStatistics* _stats = nullptr; |
163 | | |
164 | | std::unique_ptr<RowwiseIterator> _iterator; |
165 | | |
166 | | StorageReadOptions _read_options; |
167 | | |
168 | | bool _empty = false; |
169 | | size_t _topn_limit = 0; |
170 | | uint64_t _merged_rows = 0; |
171 | | }; |
172 | | |
173 | | } // namespace doris |