/root/doris/be/src/exprs/vexpr_context.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <glog/logging.h> |
21 | | |
22 | | #include <algorithm> |
23 | | #include <cstddef> |
24 | | #include <memory> |
25 | | #include <unordered_map> |
26 | | #include <utility> |
27 | | #include <vector> |
28 | | |
29 | | #include "common/factory_creator.h" |
30 | | #include "common/status.h" |
31 | | #include "core/block/block.h" |
32 | | #include "core/block/column_with_type_and_name.h" |
33 | | #include "core/column/column.h" |
34 | | #include "exec/runtime_filter/runtime_filter_selectivity.h" |
35 | | #include "exprs/function_context.h" |
36 | | #include "exprs/vexpr_fwd.h" |
37 | | #include "runtime/runtime_state.h" |
38 | | #include "storage/index/ann/ann_range_search_runtime.h" |
39 | | #include "storage/index/ann/ann_search_params.h" |
40 | | #include "storage/index/inverted/inverted_index_reader.h" |
41 | | #include "storage/segment/column_reader.h" |
42 | | |
43 | | namespace doris { |
44 | | class RowDescriptor; |
45 | | class RuntimeState; |
46 | | } // namespace doris |
47 | | |
48 | | namespace doris::segment_v2 { |
49 | | class Segment; |
50 | | class ColumnIterator; |
51 | | } // namespace doris::segment_v2 |
52 | | |
53 | | namespace doris { |
54 | | |
55 | | class ScoreRuntime; |
56 | | using ScoreRuntimeSPtr = std::shared_ptr<ScoreRuntime>; |
57 | | |
58 | | class IndexExecContext { |
59 | | public: |
60 | | IndexExecContext(const std::vector<ColumnId>& col_ids, |
61 | | const std::vector<std::unique_ptr<segment_v2::IndexIterator>>& index_iterators, |
62 | | const std::vector<IndexFieldNameAndTypePair>& storage_name_and_type_vec, |
63 | | std::unordered_map<ColumnId, std::unordered_map<const VExpr*, bool>>& |
64 | | common_expr_index_status, |
65 | | ScoreRuntimeSPtr score_runtime, segment_v2::Segment* segment, |
66 | | const segment_v2::ColumnIteratorOptions& column_iter_opts) |
67 | 5.63k | : _col_ids(col_ids), |
68 | 5.63k | _index_iterators(index_iterators), |
69 | 5.63k | _storage_name_and_type(storage_name_and_type_vec), |
70 | 5.63k | _expr_index_status(common_expr_index_status), |
71 | 5.63k | _score_runtime(std::move(score_runtime)), |
72 | 5.63k | _segment(segment), |
73 | 5.63k | _column_iter_opts(column_iter_opts) {} |
74 | | |
75 | 3 | segment_v2::IndexIterator* get_inverted_index_iterator_by_column_id(int column_index) const { |
76 | 3 | if (column_index < 0 || column_index >= _col_ids.size()) { |
77 | 0 | return nullptr; |
78 | 0 | } |
79 | 3 | const auto& column_id = _col_ids[column_index]; |
80 | 3 | if (column_id >= _index_iterators.size()) { |
81 | 0 | return nullptr; |
82 | 0 | } |
83 | 3 | if (!_index_iterators[column_id]) { |
84 | 1 | return nullptr; |
85 | 1 | } |
86 | 2 | return _index_iterators[column_id].get(); |
87 | 3 | } |
88 | | |
89 | 0 | segment_v2::IndexIterator* get_inverted_index_iterator_by_id(ColumnId column_id) const { |
90 | 0 | if (column_id >= _index_iterators.size()) { |
91 | 0 | return nullptr; |
92 | 0 | } |
93 | 0 | if (!_index_iterators[column_id]) { |
94 | 0 | return nullptr; |
95 | 0 | } |
96 | 0 | return _index_iterators[column_id].get(); |
97 | 0 | } |
98 | | |
99 | | const IndexFieldNameAndTypePair* get_storage_name_and_type_by_column_id( |
100 | 3 | int column_index) const { |
101 | 3 | if (column_index < 0 || column_index >= _col_ids.size()) { |
102 | 0 | return nullptr; |
103 | 0 | } |
104 | 3 | const auto& column_id = _col_ids[column_index]; |
105 | 3 | if (column_id >= _storage_name_and_type.size()) { |
106 | 2 | return nullptr; |
107 | 2 | } |
108 | 1 | return &_storage_name_and_type[column_id]; |
109 | 3 | } |
110 | | |
111 | 0 | const IndexFieldNameAndTypePair* get_storage_name_and_type_by_id(ColumnId column_id) const { |
112 | 0 | if (column_id >= _storage_name_and_type.size()) { |
113 | 0 | return nullptr; |
114 | 0 | } |
115 | 0 | return &_storage_name_and_type[column_id]; |
116 | 0 | } |
117 | | |
118 | 0 | int column_index_by_id(ColumnId column_id) const { |
119 | 0 | for (int i = 0; i < _col_ids.size(); ++i) { |
120 | 0 | if (_col_ids[i] == column_id) { |
121 | 0 | return i; |
122 | 0 | } |
123 | 0 | } |
124 | 0 | return -1; |
125 | 0 | } |
126 | | |
127 | 0 | bool get_column_id(int column_index, ColumnId* column_id) const { |
128 | 0 | if (column_id == nullptr) { |
129 | 0 | return false; |
130 | 0 | } |
131 | 0 | if (column_index < 0 || column_index >= _col_ids.size()) { |
132 | 0 | return false; |
133 | 0 | } |
134 | 0 | *column_id = _col_ids[column_index]; |
135 | 0 | return true; |
136 | 0 | } |
137 | | |
138 | 0 | segment_v2::Segment* segment() const { return _segment; } |
139 | | |
140 | 0 | const segment_v2::ColumnIteratorOptions& column_iter_opts() const { return _column_iter_opts; } |
141 | | |
142 | 1 | bool has_index_result_for_expr(const VExpr* expr) const { |
143 | 1 | return _index_result_bitmap.contains(expr); |
144 | 1 | } |
145 | | |
146 | | void set_index_result_for_expr(const VExpr* expr, |
147 | 1 | segment_v2::InvertedIndexResultBitmap bitmap) { |
148 | 1 | _index_result_bitmap[expr] = std::move(bitmap); |
149 | 1 | } |
150 | | |
151 | | std::unordered_map<const VExpr*, segment_v2::InvertedIndexResultBitmap>& |
152 | 0 | get_index_result_bitmap() { |
153 | 0 | return _index_result_bitmap; |
154 | 0 | } |
155 | | |
156 | 2 | std::unordered_map<const VExpr*, ColumnPtr>& get_index_result_column() { |
157 | 2 | return _index_result_column; |
158 | 2 | } |
159 | | |
160 | 0 | const segment_v2::InvertedIndexResultBitmap* get_index_result_for_expr(const VExpr* expr) { |
161 | 0 | auto iter = _index_result_bitmap.find(expr); |
162 | 0 | if (iter == _index_result_bitmap.end()) { |
163 | 0 | return nullptr; |
164 | 0 | } |
165 | 0 | return &iter->second; |
166 | 0 | } |
167 | | |
168 | 1 | void set_index_result_column_for_expr(const VExpr* expr, ColumnPtr column) { |
169 | 1 | _index_result_column[expr] = std::move(column); |
170 | 1 | } |
171 | | |
172 | 0 | void set_true_for_index_status(const VExpr* expr, int column_index) { |
173 | 0 | if (column_index < 0 || column_index >= _col_ids.size()) { |
174 | 0 | return; |
175 | 0 | } |
176 | 0 | const auto& column_id = _col_ids[column_index]; |
177 | 0 | if (_expr_index_status.contains(column_id)) { |
178 | 0 | if (_expr_index_status[column_id].contains(expr)) { |
179 | 0 | _expr_index_status[column_id][expr] = true; |
180 | 0 | } |
181 | 0 | } |
182 | 0 | } |
183 | | |
184 | 0 | ScoreRuntimeSPtr get_score_runtime() const { return _score_runtime; } |
185 | | |
186 | 0 | void set_analyzer_ctx_for_expr(const VExpr* expr, InvertedIndexAnalyzerCtxSPtr analyzer_ctx) { |
187 | 0 | if (expr == nullptr || analyzer_ctx == nullptr) { |
188 | 0 | return; |
189 | 0 | } |
190 | 0 | _expr_analyzer_ctx[expr] = std::move(analyzer_ctx); |
191 | 0 | } |
192 | | |
193 | 0 | const InvertedIndexAnalyzerCtx* get_analyzer_ctx_for_expr(const VExpr* expr) const { |
194 | 0 | auto iter = _expr_analyzer_ctx.find(expr); |
195 | 0 | if (iter == _expr_analyzer_ctx.end()) { |
196 | 0 | return nullptr; |
197 | 0 | } |
198 | 0 | return iter->second.get(); |
199 | 0 | } |
200 | | |
201 | | private: |
202 | | // A reference to a vector of column IDs for the current expression's output columns. |
203 | | const std::vector<ColumnId>& _col_ids; |
204 | | |
205 | | // A reference to a vector of unique pointers to index iterators. |
206 | | const std::vector<std::unique_ptr<segment_v2::IndexIterator>>& _index_iterators; |
207 | | |
208 | | // A reference to a vector of storage name and type pairs related to schema. |
209 | | const std::vector<IndexFieldNameAndTypePair>& _storage_name_and_type; |
210 | | |
211 | | // A map of expressions to their corresponding inverted index result bitmaps. |
212 | | std::unordered_map<const VExpr*, segment_v2::InvertedIndexResultBitmap> _index_result_bitmap; |
213 | | |
214 | | // A map of expressions to their corresponding result columns. |
215 | | std::unordered_map<const VExpr*, ColumnPtr> _index_result_column; |
216 | | |
217 | | // Per-expression analyzer context for inverted index evaluation. |
218 | | std::unordered_map<const VExpr*, InvertedIndexAnalyzerCtxSPtr> _expr_analyzer_ctx; |
219 | | |
220 | | // A reference to a map of common expressions to their inverted index evaluation status. |
221 | | std::unordered_map<ColumnId, std::unordered_map<const VExpr*, bool>>& _expr_index_status; |
222 | | |
223 | | ScoreRuntimeSPtr _score_runtime; |
224 | | |
225 | | segment_v2::Segment* _segment = nullptr; // Ref |
226 | | segment_v2::ColumnIteratorOptions _column_iter_opts; |
227 | | }; |
228 | | |
229 | | class VExprContext { |
230 | | ENABLE_FACTORY_CREATOR(VExprContext); |
231 | | |
232 | | public: |
233 | 655k | VExprContext(VExprSPtr expr) : _root(std::move(expr)) {} |
234 | | ~VExprContext(); |
235 | | [[nodiscard]] Status prepare(RuntimeState* state, const RowDescriptor& row_desc); |
236 | | [[nodiscard]] Status open(RuntimeState* state); |
237 | | [[nodiscard]] Status clone(RuntimeState* state, VExprContextSPtr& new_ctx); |
238 | | [[nodiscard]] Status execute(Block* block, int* result_column_id); |
239 | | [[nodiscard]] Status execute(const Block* block, ColumnPtr& result_column); |
240 | | [[nodiscard]] Status execute(const Block* block, ColumnWithTypeAndName& result_data); |
241 | | [[nodiscard]] DataTypePtr execute_type(const Block* block); |
242 | | [[nodiscard]] const std::string& expr_name() const; |
243 | | [[nodiscard]] bool is_blockable() const; |
244 | | |
245 | | [[nodiscard]] Status execute_const_expr(ColumnWithTypeAndName& result); |
246 | | |
247 | | double execute_cost() const; |
248 | | |
249 | 721k | VExprSPtr root() { return _root; } |
250 | 0 | void set_root(const VExprSPtr& expr) { _root = expr; } |
251 | 7 | void set_index_context(std::shared_ptr<IndexExecContext> index_context) { |
252 | 7 | _index_context = std::move(index_context); |
253 | 7 | } |
254 | | |
255 | 46 | std::shared_ptr<IndexExecContext> get_index_context() const { return _index_context; } |
256 | | |
257 | | /// Creates a FunctionContext, and returns the index that's passed to fn_context() to |
258 | | /// retrieve the created context. Exprs that need a FunctionContext should call this in |
259 | | /// Prepare() and save the returned index. 'varargs_buffer_size', if specified, is the |
260 | | /// size of the varargs buffer in the created FunctionContext (see udf-internal.h). |
261 | | int register_function_context(RuntimeState* state, const DataTypePtr& return_type, |
262 | | const std::vector<DataTypePtr>& arg_types); |
263 | | |
264 | | /// Retrieves a registered FunctionContext. 'i' is the index returned by the call to |
265 | | /// register_function_context(). This should only be called by VExprs. |
266 | 223 | FunctionContext* fn_context(int i) { |
267 | 223 | if (i < 0 || i >= _fn_contexts.size()) { |
268 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
269 | 0 | "fn_context index invalid, index={}, _fn_contexts.size()={}", i, |
270 | 0 | _fn_contexts.size()); |
271 | 0 | } |
272 | 223 | return _fn_contexts[i].get(); |
273 | 223 | } |
274 | | |
275 | | // execute expr with inverted index which column a, b has inverted indexes |
276 | | // but some situation although column b has indexes, but apply index is not useful, we should |
277 | | // skip this expr, just do not apply index anymore. |
278 | | [[nodiscard]] Status evaluate_inverted_index(uint32_t segment_num_rows); |
279 | | |
280 | | bool all_expr_inverted_index_evaluated(); |
281 | | |
282 | | Status execute_filter(const Block* block, uint8_t* __restrict result_filter_data, size_t rows, |
283 | | bool accept_null, bool* can_filter_all); |
284 | | |
285 | | [[nodiscard]] static Status filter_block(VExprContext* vexpr_ctx, Block* block); |
286 | | |
287 | | [[nodiscard]] static Status filter_block(const VExprContextSPtrs& expr_contexts, Block* block, |
288 | | size_t column_to_keep); |
289 | | |
290 | | [[nodiscard]] static Status execute_conjuncts(const VExprContextSPtrs& ctxs, |
291 | | const std::vector<IColumn::Filter*>* filters, |
292 | | bool accept_null, const Block* block, |
293 | | IColumn::Filter* result_filter, |
294 | | bool* can_filter_all); |
295 | | |
296 | | [[nodiscard]] static Status execute_conjuncts(const VExprContextSPtrs& conjuncts, |
297 | | const Block* block, ColumnUInt8& null_map, |
298 | | IColumn::Filter& result_filter); |
299 | | |
300 | | static Status execute_conjuncts(const VExprContextSPtrs& ctxs, |
301 | | const std::vector<IColumn::Filter*>* filters, Block* block, |
302 | | IColumn::Filter* result_filter, bool* can_filter_all); |
303 | | |
304 | | [[nodiscard]] static Status execute_conjuncts_and_filter_block( |
305 | | const VExprContextSPtrs& ctxs, Block* block, std::vector<uint32_t>& columns_to_filter, |
306 | | int column_to_keep); |
307 | | |
308 | | static Status execute_conjuncts_and_filter_block(const VExprContextSPtrs& ctxs, Block* block, |
309 | | std::vector<uint32_t>& columns_to_filter, |
310 | | int column_to_keep, IColumn::Filter& filter); |
311 | | |
312 | | [[nodiscard]] static Status get_output_block_after_execute_exprs(const VExprContextSPtrs&, |
313 | | const Block&, Block*, |
314 | | bool do_projection = false); |
315 | | |
316 | 6 | int get_last_result_column_id() const { |
317 | 6 | DCHECK(_last_result_column_id != -1); |
318 | 6 | return _last_result_column_id; |
319 | 6 | } |
320 | | |
321 | 29 | RuntimeFilterSelectivity& get_runtime_filter_selectivity() { |
322 | 29 | if (!_rf_selectivity) { |
323 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, "RuntimeFilterSelectivity is null"); |
324 | 0 | } |
325 | 29 | return *_rf_selectivity; |
326 | 29 | } |
327 | | |
328 | 0 | FunctionContext::FunctionStateScope get_function_state_scope() const { |
329 | 0 | return _is_clone ? FunctionContext::THREAD_LOCAL : FunctionContext::FRAGMENT_LOCAL; |
330 | 0 | } |
331 | | |
332 | | void clone_fn_contexts(VExprContext* other); |
333 | | |
334 | 0 | VExprContext& operator=(const VExprContext& other) { |
335 | 0 | if (this == &other) { |
336 | 0 | return *this; |
337 | 0 | } |
338 | 0 |
|
339 | 0 | _root = other._root; |
340 | 0 | _is_clone = other._is_clone; |
341 | 0 | _prepared = other._prepared; |
342 | 0 | _opened = other._opened; |
343 | 0 |
|
344 | 0 | for (const auto& fn : other._fn_contexts) { |
345 | 0 | _fn_contexts.emplace_back(fn->clone()); |
346 | 0 | } |
347 | 0 |
|
348 | 0 | _last_result_column_id = other._last_result_column_id; |
349 | 0 | _depth_num = other._depth_num; |
350 | 0 | return *this; |
351 | 0 | } |
352 | | |
353 | 0 | VExprContext& operator=(VExprContext&& other) { |
354 | 0 | _root = other._root; |
355 | 0 | other._root = nullptr; |
356 | 0 | _is_clone = other._is_clone; |
357 | 0 | _prepared = other._prepared; |
358 | 0 | _opened = other._opened; |
359 | 0 | _fn_contexts = std::move(other._fn_contexts); |
360 | 0 | _last_result_column_id = other._last_result_column_id; |
361 | 0 | _depth_num = other._depth_num; |
362 | 0 | return *this; |
363 | 0 | } |
364 | | |
365 | 111 | [[nodiscard]] static size_t get_memory_usage(const VExprContextSPtrs& contexts) { |
366 | 111 | size_t usage = 0; |
367 | 111 | std::for_each(contexts.cbegin(), contexts.cend(), |
368 | 111 | [&usage](auto&& context) { usage += context->_memory_usage; }); |
369 | 111 | return usage; |
370 | 111 | } |
371 | | |
372 | 0 | [[nodiscard]] size_t get_memory_usage() const { return _memory_usage; } |
373 | | |
374 | | void prepare_ann_range_search(const doris::VectorSearchUserParams& params); |
375 | | |
376 | | Status evaluate_ann_range_search( |
377 | | const std::vector<std::unique_ptr<segment_v2::IndexIterator>>& cid_to_index_iterators, |
378 | | const std::vector<ColumnId>& idx_to_cid, |
379 | | const std::vector<std::unique_ptr<segment_v2::ColumnIterator>>& column_iterators, |
380 | | const std::unordered_map<VExprContext*, std::unordered_map<ColumnId, VExpr*>>& |
381 | | common_expr_to_slotref_map, |
382 | | roaring::Roaring& row_bitmap, segment_v2::AnnIndexStats& ann_index_stats); |
383 | | |
384 | | uint64_t get_digest(uint64_t seed) const; |
385 | | |
386 | | private: |
387 | | // Close method is called in vexpr context dector, not need call expicility |
388 | | void close(); |
389 | | |
390 | | static void _reset_memory_usage(const VExprContextSPtrs& contexts); |
391 | | |
392 | | friend class VExpr; |
393 | | |
394 | | /// The expr tree this context is for. |
395 | | VExprSPtr _root; |
396 | | |
397 | | /// True if this context came from a Clone() call. Used to manage FunctionStateScope. |
398 | | bool _is_clone = false; |
399 | | |
400 | | /// Variables keeping track of current state. |
401 | | bool _prepared = false; |
402 | | bool _opened = false; |
403 | | |
404 | | /// FunctionContexts for each registered expression. The FunctionContexts are created |
405 | | /// and owned by this VExprContext. |
406 | | std::vector<std::unique_ptr<FunctionContext>> _fn_contexts; |
407 | | |
408 | | int _last_result_column_id = -1; |
409 | | |
410 | | /// The depth of expression-tree. |
411 | | int _depth_num = 0; |
412 | | |
413 | | std::shared_ptr<IndexExecContext> _index_context; |
414 | | size_t _memory_usage = 0; |
415 | | |
416 | | segment_v2::AnnRangeSearchRuntime _ann_range_search_runtime; |
417 | | bool _suitable_for_ann_index = true; |
418 | | |
419 | | std::unique_ptr<RuntimeFilterSelectivity> _rf_selectivity = |
420 | | std::make_unique<RuntimeFilterSelectivity>(); |
421 | | }; |
422 | | } // namespace doris |