be/src/storage/tablet/tablet_reader.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <gen_cpp/Descriptors_types.h> |
21 | | #include <gen_cpp/PaloInternalService_types.h> |
22 | | #include <gen_cpp/PlanNodes_types.h> |
23 | | #include <stddef.h> |
24 | | #include <stdint.h> |
25 | | |
26 | | #include <memory> |
27 | | #include <set> |
28 | | #include <string> |
29 | | #include <unordered_set> |
30 | | #include <utility> |
31 | | #include <vector> |
32 | | |
33 | | #include "agent/be_exec_version_manager.h" |
34 | | #include "common/status.h" |
35 | | #include "exprs/function_filter.h" |
36 | | #include "io/io_common.h" |
37 | | #include "storage/delete/delete_handler.h" |
38 | | #include "storage/iterators.h" |
39 | | #include "storage/olap_common.h" |
40 | | #include "storage/olap_tuple.h" |
41 | | #include "storage/predicate/filter_olap_param.h" |
42 | | #include "storage/row_cursor.h" |
43 | | #include "storage/rowid_conversion.h" |
44 | | #include "storage/rowset/rowset.h" |
45 | | #include "storage/rowset/rowset_meta.h" |
46 | | #include "storage/rowset/rowset_reader.h" |
47 | | #include "storage/rowset/rowset_reader_context.h" |
48 | | #include "storage/tablet/base_tablet.h" |
49 | | #include "storage/tablet/tablet_fwd.h" |
50 | | |
51 | | namespace doris { |
52 | | |
53 | | class RuntimeState; |
54 | | class BitmapFilterFuncBase; |
55 | | class BloomFilterFuncBase; |
56 | | class ColumnPredicate; |
57 | | class DeleteBitmap; |
58 | | class HybridSetBase; |
59 | | class RuntimeProfile; |
60 | | |
61 | | class VCollectIterator; |
62 | | class Block; |
63 | | class VExpr; |
64 | | class Arena; |
65 | | class VExprContext; |
66 | | |
67 | | // Used to compare row with input scan key. Scan key only contains key columns, |
68 | | // row contains all key columns, which is superset of key columns. |
69 | | // So we should compare the common prefix columns of lhs and rhs. |
70 | | // |
71 | | // NOTE: if you are not sure if you can use it, please don't use this function. |
72 | 0 | inline int compare_row_key(const RowCursor& lhs, const RowCursor& rhs) { |
73 | 0 | auto cmp_cids = std::min(lhs.field_count(), rhs.field_count()); |
74 | 0 | for (uint32_t cid = 0; cid < cmp_cids; ++cid) { |
75 | 0 | const auto& lf = lhs.field(cid); |
76 | 0 | const auto& rf = rhs.field(cid); |
77 | | // Handle nulls: null < non-null |
78 | 0 | if (lf.is_null() != rf.is_null()) { |
79 | 0 | return lf.is_null() ? -1 : 1; |
80 | 0 | } |
81 | 0 | if (lf.is_null()) { |
82 | 0 | continue; // both null |
83 | 0 | } |
84 | 0 | auto cmp = lf <=> rf; |
85 | 0 | if (cmp < 0) return -1; |
86 | 0 | if (cmp > 0) return 1; |
87 | 0 | } |
88 | 0 | return 0; |
89 | 0 | } |
90 | | |
91 | | class TabletReader { |
92 | | struct KeysParam { |
93 | | std::vector<RowCursor> start_keys; |
94 | | std::vector<RowCursor> end_keys; |
95 | | bool start_key_include = false; |
96 | | bool end_key_include = false; |
97 | | }; |
98 | | |
99 | | public: |
100 | | // Params for Reader, |
101 | | // mainly include tablet, data version and fetch range. |
102 | | struct ReaderParams { |
103 | 0 | bool has_single_version() const { |
104 | 0 | return (rs_splits.size() == 1 && |
105 | 0 | rs_splits[0].rs_reader->rowset()->start_version() == 0 && |
106 | 0 | !rs_splits[0].rs_reader->rowset()->rowset_meta()->is_segments_overlapping()) || |
107 | 0 | (rs_splits.size() == 2 && |
108 | 0 | rs_splits[0].rs_reader->rowset()->rowset_meta()->num_rows() == 0 && |
109 | 0 | rs_splits[1].rs_reader->rowset()->start_version() == 2 && |
110 | 0 | !rs_splits[1].rs_reader->rowset()->rowset_meta()->is_segments_overlapping()); |
111 | 0 | } |
112 | | |
113 | 3 | int get_be_exec_version() const { |
114 | 3 | if (runtime_state) { |
115 | 0 | return runtime_state->be_exec_version(); |
116 | 0 | } |
117 | 3 | return BeExecVersionManager::get_newest_version(); |
118 | 3 | } |
119 | | |
120 | 351 | void set_read_source(TabletReadSource read_source, bool skip_delete_bitmap = false) { |
121 | 351 | rs_splits = std::move(read_source.rs_splits); |
122 | 351 | delete_predicates = std::move(read_source.delete_predicates); |
123 | | #ifndef BE_TEST |
124 | | if (tablet->enable_unique_key_merge_on_write() && !skip_delete_bitmap) { |
125 | | delete_bitmap = std::move(read_source.delete_bitmap); |
126 | | } |
127 | | #endif |
128 | 351 | } |
129 | | |
130 | | BaseTabletSPtr tablet; |
131 | | TabletSchemaSPtr tablet_schema; |
132 | | ReaderType reader_type = ReaderType::READER_QUERY; |
133 | | bool direct_mode = false; |
134 | | bool aggregation = false; |
135 | | // for compaction, schema_change, check_sum: we don't use page cache |
136 | | // for query and config::disable_storage_page_cache is false, we use page cache |
137 | | bool use_page_cache = false; |
138 | | Version version = Version(-1, 0); |
139 | | |
140 | | std::vector<OlapTuple> start_key; |
141 | | std::vector<OlapTuple> end_key; |
142 | | bool start_key_include = false; |
143 | | bool end_key_include = false; |
144 | | |
145 | | std::vector<std::shared_ptr<ColumnPredicate>> predicates; |
146 | | std::vector<FunctionFilter> function_filters; |
147 | | std::vector<RowsetMetaSharedPtr> delete_predicates; |
148 | | // slots that cast may be eliminated in storage layer |
149 | | std::map<std::string, DataTypePtr> target_cast_type_for_variants; |
150 | | |
151 | | std::map<int32_t, TColumnAccessPaths> all_access_paths; |
152 | | std::map<int32_t, TColumnAccessPaths> predicate_access_paths; |
153 | | |
154 | | std::vector<RowSetSplits> rs_splits; |
155 | | // For unique key table with merge-on-write |
156 | | DeleteBitmapPtr delete_bitmap = nullptr; |
157 | | |
158 | | // return_columns is init from query schema |
159 | | std::vector<ColumnId> return_columns; |
160 | | // output_columns only contain columns in OrderByExprs and outputExprs |
161 | | std::set<int32_t> output_columns; |
162 | | RuntimeProfile* profile = nullptr; |
163 | | RuntimeState* runtime_state = nullptr; |
164 | | |
165 | | // use only in vec exec engine |
166 | | std::vector<ColumnId>* origin_return_columns = nullptr; |
167 | | std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set = nullptr; |
168 | | TPushAggOp::type push_down_agg_type_opt = TPushAggOp::NONE; |
169 | | std::vector<VExprSPtr> remaining_conjunct_roots; |
170 | | VExprContextSPtrs common_expr_ctxs_push_down; |
171 | | |
172 | | // used for compaction to record row ids |
173 | | bool record_rowids = false; |
174 | | RowIdConversion* rowid_conversion = nullptr; |
175 | | std::vector<int> topn_filter_source_node_ids; |
176 | | int topn_filter_target_node_id = -1; |
177 | | // used for special optimization for query : ORDER BY key LIMIT n |
178 | | bool read_orderby_key = false; |
179 | | // used for special optimization for query : ORDER BY key DESC LIMIT n |
180 | | bool read_orderby_key_reverse = false; |
181 | | // num of columns for orderby key |
182 | | size_t read_orderby_key_num_prefix_columns = 0; |
183 | | // limit of rows for read_orderby_key |
184 | | size_t read_orderby_key_limit = 0; |
185 | | // filter_block arguments |
186 | | VExprContextSPtrs filter_block_conjuncts; |
187 | | |
188 | | // for vertical compaction |
189 | | bool is_key_column_group = false; |
190 | | std::vector<uint32_t> key_group_cluster_key_idxes; |
191 | | |
192 | | // For sparse column compaction optimization |
193 | | // When true, use optimized path for sparse wide tables |
194 | | bool enable_sparse_optimization = false; |
195 | | |
196 | | bool is_segcompaction = false; |
197 | | |
198 | | // Enable value predicate pushdown for MOR tables |
199 | | bool enable_mor_value_predicate_pushdown = false; |
200 | | |
201 | | std::vector<RowwiseIteratorUPtr>* segment_iters_ptr = nullptr; |
202 | | |
203 | | void check_validation() const; |
204 | | |
205 | | int64_t batch_size = -1; |
206 | | |
207 | | std::map<ColumnId, VExprContextSPtr> virtual_column_exprs; |
208 | | std::map<ColumnId, size_t> vir_cid_to_idx_in_block; |
209 | | std::map<size_t, DataTypePtr> vir_col_idx_to_type; |
210 | | |
211 | | std::shared_ptr<ScoreRuntime> score_runtime; |
212 | | CollectionStatisticsPtr collection_statistics; |
213 | | std::shared_ptr<segment_v2::AnnTopNRuntime> ann_topn_runtime; |
214 | | |
215 | | uint64_t condition_cache_digest = 0; |
216 | | }; |
217 | | |
218 | 381 | TabletReader() = default; |
219 | | |
220 | 381 | virtual ~TabletReader() = default; |
221 | | |
222 | | TabletReader(const TabletReader&) = delete; |
223 | | void operator=(const TabletReader&) = delete; |
224 | | |
225 | | // Initialize TabletReader with tablet, data version and fetch range. |
226 | | virtual Status init(const ReaderParams& read_params); |
227 | | |
228 | | // Read next block with aggregation. |
229 | | // Return OK and set `*eof` to false when next block is read |
230 | | // Return OK and set `*eof` to true when no more rows can be read. |
231 | | // Return others when unexpected error happens. |
232 | 0 | virtual Status next_block_with_aggregation(Block* block, bool* eof) { |
233 | 0 | return Status::Error<ErrorCode::READER_INITIALIZE_ERROR>( |
234 | 0 | "TabletReader not support next_block_with_aggregation"); |
235 | 0 | } |
236 | | |
237 | 48 | virtual uint64_t merged_rows() const { return _merged_rows; } |
238 | | |
239 | 143 | uint64_t filtered_rows() const { |
240 | 143 | return _stats.rows_del_filtered + _stats.rows_del_by_bitmap + |
241 | 143 | _stats.rows_conditions_filtered + _stats.rows_vec_del_cond_filtered + |
242 | 143 | _stats.rows_vec_cond_filtered + _stats.rows_short_circuit_cond_filtered; |
243 | 143 | } |
244 | | |
245 | 0 | void set_batch_size(int batch_size) { _reader_context.batch_size = batch_size; } |
246 | | |
247 | 294 | int batch_size() const { return _reader_context.batch_size; } |
248 | | |
249 | 1.08k | const OlapReaderStatistics& stats() const { return _stats; } |
250 | 0 | OlapReaderStatistics* mutable_stats() { return &_stats; } |
251 | | |
252 | 0 | virtual void update_profile(RuntimeProfile* profile) {} |
253 | | static Status init_reader_params_and_create_block( |
254 | | TabletSharedPtr tablet, ReaderType reader_type, |
255 | | const std::vector<RowsetSharedPtr>& input_rowsets, |
256 | | TabletReader::ReaderParams* reader_params, Block* block); |
257 | | |
258 | | protected: |
259 | | friend class VCollectIterator; |
260 | | friend class DeleteHandler; |
261 | | |
262 | | Status _init_params(const ReaderParams& read_params); |
263 | | |
264 | | Status _capture_rs_readers(const ReaderParams& read_params); |
265 | | |
266 | | Status _init_keys_param(const ReaderParams& read_params); |
267 | | |
268 | | Status _init_orderby_keys_param(const ReaderParams& read_params); |
269 | | |
270 | | Status _init_conditions_param(const ReaderParams& read_params); |
271 | | |
272 | | virtual std::shared_ptr<ColumnPredicate> _parse_to_predicate( |
273 | | const FunctionFilter& function_filter); |
274 | | |
275 | | Status _init_delete_condition(const ReaderParams& read_params); |
276 | | |
277 | | Status _init_return_columns(const ReaderParams& read_params); |
278 | | |
279 | 1.23k | const BaseTabletSPtr& tablet() { return _tablet; } |
280 | | // If original column is a variant type column, and it's predicate is normalized |
281 | | // so in order to get the real type of column predicate, we need to reset type |
282 | | // according to the related type in `target_cast_type_for_variants`.Since variant is not |
283 | | // an predicate applicable type.Otherwise return the original tablet column. |
284 | | // Eg. `where cast(v:a as bigint) > 1` will elimate cast, and materialize this variant column |
285 | | // to type bigint |
286 | | TabletColumn materialize_column(const TabletColumn& orig); |
287 | | |
288 | 240 | const TabletSchema& tablet_schema() { return *_tablet_schema; } |
289 | | |
290 | | Arena _predicate_arena; |
291 | | std::vector<ColumnId> _return_columns; |
292 | | |
293 | | // used for special optimization for query : ORDER BY key [ASC|DESC] LIMIT n |
294 | | // columns for orderby keys |
295 | | std::vector<uint32_t> _orderby_key_columns; |
296 | | // only use in outer join which change the column nullable which must keep same in |
297 | | // vec query engine |
298 | | std::unordered_set<uint32_t>* _tablet_columns_convert_to_null_set = nullptr; |
299 | | |
300 | | BaseTabletSPtr _tablet; |
301 | | RowsetReaderContext _reader_context; |
302 | | TabletSchemaSPtr _tablet_schema; |
303 | | KeysParam _keys_param; |
304 | | std::vector<bool> _is_lower_keys_included; |
305 | | std::vector<bool> _is_upper_keys_included; |
306 | | std::vector<std::shared_ptr<ColumnPredicate>> _col_predicates; |
307 | | std::vector<std::shared_ptr<ColumnPredicate>> _value_col_predicates; |
308 | | DeleteHandler _delete_handler; |
309 | | |
310 | | // Indicates whether the tablets has do a aggregation in storage engine. |
311 | | bool _aggregation = false; |
312 | | // for agg query, we don't need to finalize when scan agg object data |
313 | | ReaderType _reader_type = ReaderType::READER_QUERY; |
314 | | bool _next_delete_flag = false; |
315 | | bool _delete_sign_available = false; |
316 | | bool _filter_delete = false; |
317 | | int32_t _sequence_col_idx = -1; |
318 | | bool _direct_mode = false; |
319 | | |
320 | | std::vector<uint32_t> _key_cids; |
321 | | std::vector<uint32_t> _value_cids; |
322 | | |
323 | | uint64_t _merged_rows = 0; |
324 | | OlapReaderStatistics _stats; |
325 | | }; |
326 | | |
327 | | } // namespace doris |