/root/doris/be/src/olap/tablet_reader.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <gen_cpp/PaloInternalService_types.h> |
21 | | #include <gen_cpp/PlanNodes_types.h> |
22 | | #include <stddef.h> |
23 | | #include <stdint.h> |
24 | | |
25 | | #include <memory> |
26 | | #include <set> |
27 | | #include <string> |
28 | | #include <unordered_set> |
29 | | #include <utility> |
30 | | #include <vector> |
31 | | |
32 | | #include "agent/be_exec_version_manager.h" |
33 | | #include "common/status.h" |
34 | | #include "exprs/function_filter.h" |
35 | | #include "gutil/strings/substitute.h" |
36 | | #include "io/io_common.h" |
37 | | #include "olap/base_tablet.h" |
38 | | #include "olap/delete_handler.h" |
39 | | #include "olap/iterators.h" |
40 | | #include "olap/olap_common.h" |
41 | | #include "olap/olap_tuple.h" |
42 | | #include "olap/row_cursor.h" |
43 | | #include "olap/rowid_conversion.h" |
44 | | #include "olap/rowset/rowset.h" |
45 | | #include "olap/rowset/rowset_meta.h" |
46 | | #include "olap/rowset/rowset_reader.h" |
47 | | #include "olap/rowset/rowset_reader_context.h" |
48 | | #include "olap/tablet_fwd.h" |
49 | | |
50 | | namespace doris { |
51 | | |
52 | | class RuntimeState; |
53 | | class BitmapFilterFuncBase; |
54 | | class BloomFilterFuncBase; |
55 | | class ColumnPredicate; |
56 | | class DeleteBitmap; |
57 | | class HybridSetBase; |
58 | | class RuntimeProfile; |
59 | | |
60 | | namespace vectorized { |
61 | | class VCollectIterator; |
62 | | class Block; |
63 | | class VExpr; |
64 | | class Arena; |
65 | | class VExprContext; |
66 | | } // namespace vectorized |
67 | | |
68 | | // Compares a row with a scan key over the leading columns present in both schemas |
69 | | // (a row may carry more columns than the key). Returns the first non-zero cell comparison, else 0. |
70 | | // NOTE: if you are not sure if you can use it, please don't use this function. |
71 | 0 | inline int compare_row_key(const RowCursor& lhs, const RowCursor& rhs) { |
72 | 0 | const auto shared_cols = std::min(lhs.schema()->num_column_ids(), rhs.schema()->num_column_ids()); |
73 | 0 | uint32_t idx = 0; |
74 | 0 | while (idx < shared_cols) { |
75 | 0 | const auto order = lhs.schema()->column(idx)->compare_cell(lhs.cell(idx), rhs.cell(idx)); |
76 | 0 | if (order != 0) { |
77 | 0 | return order; |
78 | 0 | } |
79 | 0 | ++idx; |
80 | 0 | } |
81 | 0 | return 0; |
82 | 0 | } |
83 | | |
84 | | class TabletReader { |
85 | | struct KeysParam { |
86 | | std::string to_string() const; |
87 | | // Key ranges; the *_include flags presumably mark each bound as inclusive (confirm in _init_keys_param). |
88 | | std::vector<RowCursor> start_keys; |
89 | | std::vector<RowCursor> end_keys; |
90 | | bool start_key_include{false}; |
91 | | bool end_key_include{false}; |
92 | | }; |
93 | | |
94 | | public: |
95 | | // Params for Reader, |
96 | | // mainly include tablet, data version and fetch range. |
97 | | struct ReaderParams { |
98 | 0 | bool has_single_version() const { // one rowset starting at version 0, or an empty rowset plus one starting at version 2 |
99 | 0 | if (rs_splits.size() == 1) { |
100 | 0 | const auto& only = rs_splits[0].rs_reader; |
101 | 0 | return only->rowset()->start_version() == 0 && !only->rowset()->rowset_meta()->is_segments_overlapping(); |
102 | 0 | } |
103 | 0 | return rs_splits.size() == 2 && // in both cases the data-bearing rowset must have non-overlapping segments |
104 | 0 | rs_splits[0].rs_reader->rowset()->rowset_meta()->num_rows() == 0 && |
105 | 0 | rs_splits[1].rs_reader->rowset()->start_version() == 2 && !rs_splits[1].rs_reader->rowset()->rowset_meta()->is_segments_overlapping(); |
106 | 0 | } |
107 | | |
108 | 3 | int get_be_exec_version() const { |
109 | | // Use the version recorded in runtime_state when one is attached; |
110 | | // otherwise fall back to the newest version known to BeExecVersionManager. |
111 | 3 | return runtime_state != nullptr ? runtime_state->be_exec_version() |
112 | 3 | : BeExecVersionManager::get_newest_version(); |
113 | 3 | } |
114 | | |
115 | 331 | void set_read_source(TabletReadSource read_source, bool skip_delete_bitmap = false) { |
116 | | // Adopt the splits and delete predicates; take the delete bitmap only for merge-on-write tablets when not skipped. |
117 | 331 | rs_splits = std::move(read_source.rs_splits); |
118 | 331 | delete_predicates = std::move(read_source.delete_predicates); |
119 | 331 | const bool take_bitmap = tablet->enable_unique_key_merge_on_write() && !skip_delete_bitmap; |
120 | 331 | if (take_bitmap) { delete_bitmap = std::move(read_source.delete_bitmap); } |
121 | 331 | } |
122 | | |
123 | | BaseTabletSPtr tablet; |
124 | | TabletSchemaSPtr tablet_schema; |
125 | | ReaderType reader_type = ReaderType::READER_QUERY; |
126 | | bool direct_mode = false; |
127 | | bool aggregation = false; |
128 | | // compaction, schema_change and check_sum do not use the page cache; |
129 | | // queries use it when config::disable_storage_page_cache is false |
130 | | bool use_page_cache = false; |
131 | | Version version = Version(-1, 0); |
132 | | // scan range bounds; the *_include flags presumably mark each bound as inclusive (confirm with callers) |
133 | | std::vector<OlapTuple> start_key; |
134 | | std::vector<OlapTuple> end_key; |
135 | | bool start_key_include = false; |
136 | | bool end_key_include = false; |
137 | | |
138 | | std::vector<TCondition> conditions; |
139 | | std::vector<std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>> bloom_filters; |
140 | | std::vector<std::pair<std::string, std::shared_ptr<BitmapFilterFuncBase>>> bitmap_filters; |
141 | | std::vector<std::pair<std::string, std::shared_ptr<HybridSetBase>>> in_filters; |
142 | | std::vector<FunctionFilter> function_filters; |
143 | | std::vector<RowsetMetaSharedPtr> delete_predicates; |
144 | | // slots whose cast may be eliminated in the storage layer |
145 | | std::map<std::string, TypeDescriptor> target_cast_type_for_variants; |
146 | | |
147 | | std::vector<RowSetSplits> rs_splits; |
148 | | // For unique key table with merge-on-write |
149 | | std::shared_ptr<DeleteBitmap> delete_bitmap = nullptr; |
150 | | |
151 | | // return_columns is initialized from the query schema |
152 | | std::vector<uint32_t> return_columns; |
153 | | // output_columns only contains columns in OrderByExprs and outputExprs |
154 | | std::set<int32_t> output_columns; |
155 | | RuntimeProfile* profile = nullptr; |
156 | | RuntimeState* runtime_state = nullptr; |
157 | | |
158 | | // used only in the vec exec engine |
159 | | std::vector<uint32_t>* origin_return_columns = nullptr; |
160 | | std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set = nullptr; |
161 | | TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE; |
162 | | vectorized::VExpr* remaining_vconjunct_root = nullptr; |
163 | | std::vector<vectorized::VExprSPtr> remaining_conjunct_roots; |
164 | | vectorized::VExprContextSPtrs common_expr_ctxs_push_down; |
165 | | |
166 | | // used for compaction to record row ids; rowid_conversion stays null unless the caller sets it |
167 | | bool record_rowids = false; |
168 | | RowIdConversion* rowid_conversion = nullptr; |
169 | | std::vector<int> topn_filter_source_node_ids; |
170 | | int topn_filter_target_node_id = -1; |
171 | | // used for special optimization for query : ORDER BY key LIMIT n |
172 | | bool read_orderby_key = false; |
173 | | // used for special optimization for query : ORDER BY key DESC LIMIT n |
174 | | bool read_orderby_key_reverse = false; |
175 | | // number of prefix columns in the orderby key |
176 | | size_t read_orderby_key_num_prefix_columns = 0; |
177 | | // limit of rows for read_orderby_key |
178 | | size_t read_orderby_key_limit = 0; |
179 | | // filter_block arguments |
180 | | vectorized::VExprContextSPtrs filter_block_conjuncts; |
181 | | |
182 | | // for vertical compaction |
183 | | bool is_key_column_group = false; |
184 | | std::vector<uint32_t> key_group_cluster_key_idxes; |
185 | | |
186 | | bool is_segcompaction = false; |
187 | | |
188 | | std::vector<RowwiseIteratorUPtr>* segment_iters_ptr = nullptr; |
189 | | |
190 | | void check_validation() const; |
191 | | |
192 | | std::string to_string() const; |
193 | | |
194 | | int64_t batch_size = -1; |
195 | | }; |
196 | | |
197 | 372 | TabletReader() = default; |
198 | | |
199 | | virtual ~TabletReader(); |
200 | | |
201 | | TabletReader(const TabletReader&) = delete; |
202 | | void operator=(const TabletReader&) = delete; |
203 | | |
204 | | // Initialize TabletReader with tablet, data version and fetch range. |
205 | | virtual Status init(const ReaderParams& read_params); |
206 | | |
207 | | // Read next block with aggregation. Contract for readers that override this: |
208 | | // Return OK and set `*eof` to false when next block is read |
209 | | // Return OK and set `*eof` to true when no more rows can be read. |
210 | | // Return others on unexpected errors. This base version always returns READER_INITIALIZE_ERROR. |
211 | 0 | virtual Status next_block_with_aggregation(vectorized::Block* block, bool* eof) { |
212 | 0 | return Status::Error<ErrorCode::READER_INITIALIZE_ERROR>( |
213 | 0 | "TabletReader not support next_block_with_aggregation"); |
214 | 0 | } |
215 | | |
216 | 48 | virtual uint64_t merged_rows() const { return _merged_rows; } |
217 | | // Sum of the six filtered-row counters kept in _stats. |
218 | 137 | uint64_t filtered_rows() const { |
219 | 137 | const OlapReaderStatistics& s = _stats; |
220 | 137 | return s.rows_del_filtered + s.rows_del_by_bitmap + s.rows_conditions_filtered + |
221 | 137 | s.rows_vec_del_cond_filtered + s.rows_vec_cond_filtered + s.rows_short_circuit_cond_filtered; |
222 | 137 | } |
223 | | // The batch size is stored in, and read back from, the rowset reader context. |
224 | 0 | void set_batch_size(int batch_size) { _reader_context.batch_size = batch_size; } |
225 | | |
226 | 294 | int batch_size() const { return _reader_context.batch_size; } |
227 | | // Read-only and mutable access to the reader statistics. |
228 | 663 | const OlapReaderStatistics& stats() const { return _stats; } |
229 | 0 | OlapReaderStatistics* mutable_stats() { return &_stats; } |
230 | | // Base implementation ignores the profile and returns false. |
231 | 0 | virtual bool update_profile(RuntimeProfile* profile) { return false; } |
232 | | static Status init_reader_params_and_create_block( |
233 | | TabletSharedPtr tablet, ReaderType reader_type, |
234 | | const std::vector<RowsetSharedPtr>& input_rowsets, |
235 | | TabletReader::ReaderParams* reader_params, vectorized::Block* block); |
236 | | |
237 | | protected: |
238 | | friend class vectorized::VCollectIterator; |
239 | | friend class DeleteHandler; |
240 | | |
241 | | Status _init_params(const ReaderParams& read_params); |
242 | | |
243 | | Status _capture_rs_readers(const ReaderParams& read_params); |
244 | | |
245 | | bool _optimize_for_single_rowset(const std::vector<RowsetReaderSharedPtr>& rs_readers); |
246 | | |
247 | | Status _init_keys_param(const ReaderParams& read_params); |
248 | | |
249 | | Status _init_orderby_keys_param(const ReaderParams& read_params); |
250 | | |
251 | | Status _init_conditions_param(const ReaderParams& read_params); |
252 | | |
253 | | ColumnPredicate* _parse_to_predicate( |
254 | | const std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>& bloom_filter); |
255 | | |
256 | | ColumnPredicate* _parse_to_predicate( |
257 | | const std::pair<std::string, std::shared_ptr<BitmapFilterFuncBase>>& bitmap_filter); |
258 | | |
259 | | ColumnPredicate* _parse_to_predicate( |
260 | | const std::pair<std::string, std::shared_ptr<HybridSetBase>>& in_filter); |
261 | | |
262 | | virtual ColumnPredicate* _parse_to_predicate(const FunctionFilter& function_filter); |
263 | | |
264 | | Status _init_delete_condition(const ReaderParams& read_params); |
265 | | |
266 | | Status _init_return_columns(const ReaderParams& read_params); |
267 | | |
268 | 1.20k | const BaseTabletSPtr& tablet() { return _tablet; } |
269 | | // If the original column is a variant column whose predicate has been normalized, |
270 | | // reset its type to the matching entry in `target_cast_type_for_variants` so the |
271 | | // predicate sees the real type, since variant is not a predicate-applicable type. |
272 | | // Otherwise return the original tablet column. |
273 | | // E.g. `where cast(v:a as bigint) > 1` will eliminate the cast and materialize this |
274 | | // variant column as type bigint. |
275 | | TabletColumn materialize_column(const TabletColumn& orig); |
276 | | |
277 | 240 | const TabletSchema& tablet_schema() { return *_tablet_schema; } |
278 | | |
279 | | std::unique_ptr<vectorized::Arena> _predicate_arena; |
280 | | std::vector<uint32_t> _return_columns; |
281 | | // used for special optimization for query : ORDER BY key [ASC|DESC] LIMIT n |
282 | | // columns for orderby keys |
283 | | std::vector<uint32_t> _orderby_key_columns; |
284 | | // only used in outer joins, which change column nullability; this must be kept the same |
285 | | // as in the vec query engine |
286 | | std::unordered_set<uint32_t>* _tablet_columns_convert_to_null_set = nullptr; |
287 | | |
288 | | BaseTabletSPtr _tablet; |
289 | | RowsetReaderContext _reader_context; |
290 | | TabletSchemaSPtr _tablet_schema; |
291 | | KeysParam _keys_param; |
292 | | std::vector<bool> _is_lower_keys_included; |
293 | | std::vector<bool> _is_upper_keys_included; |
294 | | std::vector<ColumnPredicate*> _col_predicates; |
295 | | std::vector<ColumnPredicate*> _value_col_predicates; |
296 | | DeleteHandler _delete_handler; |
297 | | |
298 | | bool _aggregation = false; |
299 | | // for agg query, we don't need to finalize when scan agg object data |
300 | | ReaderType _reader_type = ReaderType::READER_QUERY; |
301 | | bool _next_delete_flag = false; |
302 | | bool _delete_sign_available = false; |
303 | | bool _filter_delete = false; |
304 | | int32_t _sequence_col_idx = -1; |
305 | | bool _direct_mode = false; |
306 | | |
307 | | std::vector<uint32_t> _key_cids; |
308 | | std::vector<uint32_t> _value_cids; |
309 | | |
310 | | uint64_t _merged_rows = 0; |
311 | | OlapReaderStatistics _stats; |
312 | | }; |
313 | | |
314 | | } // namespace doris |