/root/doris/be/src/olap/tablet_reader.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <gen_cpp/PaloInternalService_types.h> |
21 | | #include <gen_cpp/PlanNodes_types.h> |
22 | | #include <stddef.h> |
23 | | #include <stdint.h> |
24 | | |
25 | | #include <memory> |
26 | | #include <set> |
27 | | #include <string> |
28 | | #include <unordered_set> |
29 | | #include <utility> |
30 | | #include <vector> |
31 | | |
32 | | #include "agent/be_exec_version_manager.h" |
33 | | #include "common/status.h" |
34 | | #include "exprs/function_filter.h" |
35 | | #include "gutil/strings/substitute.h" |
36 | | #include "io/io_common.h" |
37 | | #include "olap/delete_handler.h" |
38 | | #include "olap/iterators.h" |
39 | | #include "olap/olap_common.h" |
40 | | #include "olap/olap_tuple.h" |
41 | | #include "olap/row_cursor.h" |
42 | | #include "olap/rowid_conversion.h" |
43 | | #include "olap/rowset/rowset.h" |
44 | | #include "olap/rowset/rowset_meta.h" |
45 | | #include "olap/rowset/rowset_reader.h" |
46 | | #include "olap/rowset/rowset_reader_context.h" |
47 | | #include "olap/tablet_fwd.h" |
48 | | |
49 | | namespace doris { |
50 | | |
51 | | class RuntimeState; |
52 | | class BitmapFilterFuncBase; |
53 | | class BloomFilterFuncBase; |
54 | | class ColumnPredicate; |
55 | | class DeleteBitmap; |
56 | | class HybridSetBase; |
57 | | class RuntimeProfile; |
58 | | |
59 | | namespace vectorized { |
60 | | class VCollectIterator; |
61 | | class Block; |
62 | | class VExpr; |
63 | | class Arena; |
64 | | class VExprContext; |
65 | | } // namespace vectorized |
66 | | |
67 | | // Used to compare row with input scan key. Scan key only contains key columns, |
68 | | // row contains all key columns, which is superset of key columns. |
69 | | // So we should compare the common prefix columns of lhs and rhs. |
70 | | // |
71 | | // NOTE: if you are not sure if you can use it, please don't use this function. |
72 | 0 | inline int compare_row_key(const RowCursor& lhs, const RowCursor& rhs) { |
73 | 0 | auto cmp_cids = std::min(lhs.schema()->num_column_ids(), rhs.schema()->num_column_ids()); |
74 | 0 | for (uint32_t cid = 0; cid < cmp_cids; ++cid) { |
75 | 0 | auto res = lhs.schema()->column(cid)->compare_cell(lhs.cell(cid), rhs.cell(cid)); |
76 | 0 | if (res != 0) { |
77 | 0 | return res; |
78 | 0 | } |
79 | 0 | } |
80 | 0 | return 0; |
81 | 0 | } |
82 | | |
83 | | class TabletReader { |
84 | | struct KeysParam { |
85 | | std::string to_string() const; |
86 | | |
87 | | std::vector<RowCursor> start_keys; |
88 | | std::vector<RowCursor> end_keys; |
89 | | bool start_key_include = false; |
90 | | bool end_key_include = false; |
91 | | }; |
92 | | |
93 | | public: |
94 | | struct ReadSource { |
95 | | std::vector<RowSetSplits> rs_splits; |
96 | | std::vector<RowsetMetaSharedPtr> delete_predicates; |
97 | | // Fill delete predicates with `rs_splits` |
98 | | void fill_delete_predicates(); |
99 | | }; |
100 | | // Params for Reader, |
101 | | // mainly include tablet, data version and fetch range. |
102 | | struct ReaderParams { |
103 | 0 | bool has_single_version() const { |
104 | 0 | return (rs_splits.size() == 1 && |
105 | 0 | rs_splits[0].rs_reader->rowset()->start_version() == 0 && |
106 | 0 | !rs_splits[0].rs_reader->rowset()->rowset_meta()->is_segments_overlapping()) || |
107 | 0 | (rs_splits.size() == 2 && |
108 | 0 | rs_splits[0].rs_reader->rowset()->rowset_meta()->num_rows() == 0 && |
109 | 0 | rs_splits[1].rs_reader->rowset()->start_version() == 2 && |
110 | 0 | !rs_splits[1].rs_reader->rowset()->rowset_meta()->is_segments_overlapping()); |
111 | 0 | } |
112 | | |
113 | 3 | int get_be_exec_version() const { |
114 | 3 | if (runtime_state) { |
115 | 0 | return runtime_state->be_exec_version(); |
116 | 0 | } |
117 | 3 | return BeExecVersionManager::get_newest_version(); |
118 | 3 | } |
119 | | |
120 | 141 | void set_read_source(ReadSource read_source) { |
121 | 141 | rs_splits = std::move(read_source.rs_splits); |
122 | 141 | delete_predicates = std::move(read_source.delete_predicates); |
123 | 141 | } |
124 | | |
125 | | BaseTabletSPtr tablet; |
126 | | TabletSchemaSPtr tablet_schema; |
127 | | ReaderType reader_type = ReaderType::READER_QUERY; |
128 | | bool direct_mode = false; |
129 | | bool aggregation = false; |
130 | | // for compaction, schema_change, check_sum: we don't use page cache |
131 | | // for query and config::disable_storage_page_cache is false, we use page cache |
132 | | bool use_page_cache = false; |
133 | | Version version = Version(-1, 0); |
134 | | |
135 | | std::vector<OlapTuple> start_key; |
136 | | std::vector<OlapTuple> end_key; |
137 | | bool start_key_include = false; |
138 | | bool end_key_include = false; |
139 | | |
140 | | std::vector<TCondition> conditions; |
141 | | std::vector<std::pair<string, std::shared_ptr<BloomFilterFuncBase>>> bloom_filters; |
142 | | std::vector<std::pair<string, std::shared_ptr<BitmapFilterFuncBase>>> bitmap_filters; |
143 | | std::vector<std::pair<string, std::shared_ptr<HybridSetBase>>> in_filters; |
144 | | std::vector<FunctionFilter> function_filters; |
145 | | std::vector<RowsetMetaSharedPtr> delete_predicates; |
146 | | // slots that cast may be eliminated in storage layer |
147 | | std::map<std::string, TypeDescriptor> target_cast_type_for_variants; |
148 | | |
149 | | std::vector<RowSetSplits> rs_splits; |
150 | | // For unique key table with merge-on-write |
151 | | DeleteBitmap* delete_bitmap = nullptr; |
152 | | |
153 | | // return_columns is init from query schema |
154 | | std::vector<uint32_t> return_columns; |
155 | | // output_columns only contain columns in OrderByExprs and outputExprs |
156 | | std::set<int32_t> output_columns; |
157 | | RuntimeProfile* profile = nullptr; |
158 | | RuntimeState* runtime_state = nullptr; |
159 | | |
160 | | // use only in vec exec engine |
161 | | std::vector<uint32_t>* origin_return_columns = nullptr; |
162 | | std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set = nullptr; |
163 | | TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE; |
164 | | vectorized::VExpr* remaining_vconjunct_root = nullptr; |
165 | | std::vector<vectorized::VExprSPtr> remaining_conjunct_roots; |
166 | | vectorized::VExprContextSPtrs common_expr_ctxs_push_down; |
167 | | |
168 | | // used for compaction to record row ids |
169 | | bool record_rowids = false; |
170 | | RowIdConversion* rowid_conversion = nullptr; |
171 | | std::vector<int> topn_filter_source_node_ids; |
172 | | int topn_filter_target_node_id = -1; |
173 | | // used for special optimization for query : ORDER BY key LIMIT n |
174 | | bool read_orderby_key = false; |
175 | | // used for special optimization for query : ORDER BY key DESC LIMIT n |
176 | | bool read_orderby_key_reverse = false; |
177 | | // num of columns for orderby key |
178 | | size_t read_orderby_key_num_prefix_columns = 0; |
179 | | // limit of rows for read_orderby_key |
180 | | size_t read_orderby_key_limit = 0; |
181 | | // filter_block arguments |
182 | | vectorized::VExprContextSPtrs filter_block_conjuncts; |
183 | | |
184 | | // for vertical compaction |
185 | | bool is_key_column_group = false; |
186 | | std::vector<uint32_t> key_group_cluster_key_idxes; |
187 | | |
188 | | bool is_segcompaction = false; |
189 | | |
190 | | std::vector<RowwiseIteratorUPtr>* segment_iters_ptr = nullptr; |
191 | | |
192 | | void check_validation() const; |
193 | | |
194 | | std::string to_string() const; |
195 | | |
196 | | int64_t batch_size = -1; |
197 | | }; |
198 | | |
199 | 163 | TabletReader() = default; |
200 | | |
201 | | virtual ~TabletReader(); |
202 | | |
203 | | TabletReader(const TabletReader&) = delete; |
204 | | void operator=(const TabletReader&) = delete; |
205 | | |
206 | | // Initialize TabletReader with tablet, data version and fetch range. |
207 | | virtual Status init(const ReaderParams& read_params); |
208 | | |
209 | | // Read next block with aggregation. |
210 | | // Return OK and set `*eof` to false when next block is read |
211 | | // Return OK and set `*eof` to true when no more rows can be read. |
212 | | // Return others when unexpected error happens. |
213 | 0 | virtual Status next_block_with_aggregation(vectorized::Block* block, bool* eof) { |
214 | 0 | return Status::Error<ErrorCode::READER_INITIALIZE_ERROR>( |
215 | 0 | "TabletReader not support next_block_with_aggregation"); |
216 | 0 | } |
217 | | |
218 | 48 | virtual uint64_t merged_rows() const { return _merged_rows; } |
219 | | |
220 | 108 | uint64_t filtered_rows() const { |
221 | 108 | return _stats.rows_del_filtered + _stats.rows_del_by_bitmap + |
222 | 108 | _stats.rows_conditions_filtered + _stats.rows_vec_del_cond_filtered + |
223 | 108 | _stats.rows_vec_cond_filtered + _stats.rows_short_circuit_cond_filtered; |
224 | 108 | } |
225 | | |
226 | 0 | void set_batch_size(int batch_size) { _reader_context.batch_size = batch_size; } |
227 | | |
228 | 294 | int batch_size() const { return _reader_context.batch_size; } |
229 | | |
230 | 0 | const OlapReaderStatistics& stats() const { return _stats; } |
231 | 0 | OlapReaderStatistics* mutable_stats() { return &_stats; } |
232 | | |
233 | 0 | virtual bool update_profile(RuntimeProfile* profile) { return false; } |
234 | | static Status init_reader_params_and_create_block( |
235 | | TabletSharedPtr tablet, ReaderType reader_type, |
236 | | const std::vector<RowsetSharedPtr>& input_rowsets, |
237 | | TabletReader::ReaderParams* reader_params, vectorized::Block* block); |
238 | | |
239 | | protected: |
240 | | friend class vectorized::VCollectIterator; |
241 | | friend class DeleteHandler; |
242 | | |
243 | | Status _init_params(const ReaderParams& read_params); |
244 | | |
245 | | Status _capture_rs_readers(const ReaderParams& read_params); |
246 | | |
247 | | bool _optimize_for_single_rowset(const std::vector<RowsetReaderSharedPtr>& rs_readers); |
248 | | |
249 | | Status _init_keys_param(const ReaderParams& read_params); |
250 | | |
251 | | Status _init_orderby_keys_param(const ReaderParams& read_params); |
252 | | |
253 | | Status _init_conditions_param(const ReaderParams& read_params); |
254 | | |
255 | | ColumnPredicate* _parse_to_predicate( |
256 | | const std::pair<std::string, std::shared_ptr<BloomFilterFuncBase>>& bloom_filter); |
257 | | |
258 | | ColumnPredicate* _parse_to_predicate( |
259 | | const std::pair<std::string, std::shared_ptr<BitmapFilterFuncBase>>& bitmap_filter); |
260 | | |
261 | | ColumnPredicate* _parse_to_predicate( |
262 | | const std::pair<std::string, std::shared_ptr<HybridSetBase>>& in_filter); |
263 | | |
264 | | virtual ColumnPredicate* _parse_to_predicate(const FunctionFilter& function_filter); |
265 | | |
266 | | Status _init_delete_condition(const ReaderParams& read_params); |
267 | | |
268 | | Status _init_return_columns(const ReaderParams& read_params); |
269 | | |
270 | 545 | const BaseTabletSPtr& tablet() { return _tablet; } |
271 | | // If original column is a variant type column, and it's predicate is normalized |
272 | | // so in order to get the real type of column predicate, we need to reset type |
273 | | // according to the related type in `target_cast_type_for_variants`.Since variant is not |
274 | | // an predicate applicable type.Otherwise return the original tablet column. |
275 | | // Eg. `where cast(v:a as bigint) > 1` will elimate cast, and materialize this variant column |
276 | | // to type bigint |
277 | | TabletColumn materialize_column(const TabletColumn& orig); |
278 | | |
279 | 240 | const TabletSchema& tablet_schema() { return *_tablet_schema; } |
280 | | |
281 | | std::unique_ptr<vectorized::Arena> _predicate_arena; |
282 | | std::vector<uint32_t> _return_columns; |
283 | | // used for special optimization for query : ORDER BY key [ASC|DESC] LIMIT n |
284 | | // columns for orderby keys |
285 | | std::vector<uint32_t> _orderby_key_columns; |
286 | | // only use in outer join which change the column nullable which must keep same in |
287 | | // vec query engine |
288 | | std::unordered_set<uint32_t>* _tablet_columns_convert_to_null_set = nullptr; |
289 | | |
290 | | BaseTabletSPtr _tablet; |
291 | | RowsetReaderContext _reader_context; |
292 | | TabletSchemaSPtr _tablet_schema; |
293 | | KeysParam _keys_param; |
294 | | std::vector<bool> _is_lower_keys_included; |
295 | | std::vector<bool> _is_upper_keys_included; |
296 | | std::vector<ColumnPredicate*> _col_predicates; |
297 | | std::vector<ColumnPredicate*> _value_col_predicates; |
298 | | DeleteHandler _delete_handler; |
299 | | |
300 | | bool _aggregation = false; |
301 | | // for agg query, we don't need to finalize when scan agg object data |
302 | | ReaderType _reader_type = ReaderType::READER_QUERY; |
303 | | bool _next_delete_flag = false; |
304 | | bool _delete_sign_available = false; |
305 | | bool _filter_delete = false; |
306 | | int32_t _sequence_col_idx = -1; |
307 | | bool _direct_mode = false; |
308 | | |
309 | | std::vector<uint32_t> _key_cids; |
310 | | std::vector<uint32_t> _value_cids; |
311 | | |
312 | | uint64_t _merged_rows = 0; |
313 | | OlapReaderStatistics _stats; |
314 | | }; |
315 | | |
316 | | } // namespace doris |