be/src/exprs/vexpr_context.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <glog/logging.h> |
21 | | |
22 | | #include <algorithm> |
23 | | #include <cstddef> |
24 | | #include <memory> |
25 | | #include <string> |
26 | | #include <unordered_map> |
27 | | #include <utility> |
28 | | #include <vector> |
29 | | |
30 | | #include "common/factory_creator.h" |
31 | | #include "common/status.h" |
32 | | #include "core/block/block.h" |
33 | | #include "core/block/column_with_type_and_name.h" |
34 | | #include "core/column/column.h" |
35 | | #include "exec/runtime_filter/runtime_filter_selectivity.h" |
36 | | #include "exprs/function_context.h" |
37 | | #include "exprs/vexpr_fwd.h" |
38 | | #include "runtime/runtime_state.h" |
39 | | #include "storage/index/ann/ann_range_search_runtime.h" |
40 | | #include "storage/index/ann/ann_search_params.h" |
41 | | #include "storage/index/inverted/inverted_index_reader.h" |
42 | | #include "storage/segment/column_reader.h" |
43 | | |
44 | | namespace doris { |
45 | | class RowDescriptor; |
46 | | class RuntimeState; |
47 | | } // namespace doris |
48 | | |
49 | | namespace doris::segment_v2 { |
50 | | class Segment; |
51 | | class ColumnIterator; |
52 | | } // namespace doris::segment_v2 |
53 | | |
54 | | namespace doris { |
55 | | |
56 | | class ScoreRuntime; |
57 | | class LambdaExecutionContext; |
58 | | using ScoreRuntimeSPtr = std::shared_ptr<ScoreRuntime>; |
59 | | |
60 | | class IndexExecContext { |
61 | | public: |
62 | | IndexExecContext(const std::vector<ColumnId>& col_ids, |
63 | | const std::vector<std::unique_ptr<segment_v2::IndexIterator>>& index_iterators, |
64 | | const std::vector<IndexFieldNameAndTypePair>& storage_name_and_type_vec, |
65 | | std::unordered_map<ColumnId, std::unordered_map<const VExpr*, bool>>& |
66 | | common_expr_index_status, |
67 | | ScoreRuntimeSPtr score_runtime, segment_v2::Segment* segment, |
68 | | const segment_v2::ColumnIteratorOptions& column_iter_opts) |
69 | 1.72M | : _col_ids(col_ids), |
70 | 1.72M | _index_iterators(index_iterators), |
71 | 1.72M | _storage_name_and_type(storage_name_and_type_vec), |
72 | 1.72M | _expr_index_status(common_expr_index_status), |
73 | 1.72M | _score_runtime(std::move(score_runtime)), |
74 | 1.72M | _segment(segment), |
75 | 1.72M | _column_iter_opts(column_iter_opts) {} |
76 | | |
77 | 16.0k | segment_v2::IndexIterator* get_inverted_index_iterator_by_column_id(int column_index) const { |
78 | 16.0k | if (column_index < 0 || column_index >= _col_ids.size()) { |
79 | 0 | return nullptr; |
80 | 0 | } |
81 | 16.0k | const auto& column_id = _col_ids[column_index]; |
82 | 16.0k | if (column_id >= _index_iterators.size()) { |
83 | 0 | return nullptr; |
84 | 0 | } |
85 | 16.0k | if (!_index_iterators[column_id]) { |
86 | 4.44k | return nullptr; |
87 | 4.44k | } |
88 | 11.6k | return _index_iterators[column_id].get(); |
89 | 16.0k | } |
90 | | |
91 | 14 | segment_v2::IndexIterator* get_inverted_index_iterator_by_id(ColumnId column_id) const { |
92 | 14 | if (column_id >= _index_iterators.size()) { |
93 | 0 | return nullptr; |
94 | 0 | } |
95 | 14 | if (!_index_iterators[column_id]) { |
96 | 14 | return nullptr; |
97 | 14 | } |
98 | 0 | return _index_iterators[column_id].get(); |
99 | 14 | } |
100 | | |
101 | | const IndexFieldNameAndTypePair* get_storage_name_and_type_by_column_id( |
102 | 16.4k | int column_index) const { |
103 | 16.4k | if (column_index < 0 || column_index >= _col_ids.size()) { |
104 | 0 | return nullptr; |
105 | 0 | } |
106 | 16.4k | const auto& column_id = _col_ids[column_index]; |
107 | 16.4k | if (column_id >= _storage_name_and_type.size()) { |
108 | 2 | return nullptr; |
109 | 2 | } |
110 | 16.4k | return &_storage_name_and_type[column_id]; |
111 | 16.4k | } |
112 | | |
113 | 28 | const IndexFieldNameAndTypePair* get_storage_name_and_type_by_id(ColumnId column_id) const { |
114 | 28 | if (column_id >= _storage_name_and_type.size()) { |
115 | 0 | return nullptr; |
116 | 0 | } |
117 | 28 | return &_storage_name_and_type[column_id]; |
118 | 28 | } |
119 | | |
120 | 0 | int column_index_by_id(ColumnId column_id) const { |
121 | 0 | for (int i = 0; i < _col_ids.size(); ++i) { |
122 | 0 | if (_col_ids[i] == column_id) { |
123 | 0 | return i; |
124 | 0 | } |
125 | 0 | } |
126 | 0 | return -1; |
127 | 0 | } |
128 | | |
129 | 0 | bool get_column_id(int column_index, ColumnId* column_id) const { |
130 | 0 | if (column_id == nullptr) { |
131 | 0 | return false; |
132 | 0 | } |
133 | 0 | if (column_index < 0 || column_index >= _col_ids.size()) { |
134 | 0 | return false; |
135 | 0 | } |
136 | 0 | *column_id = _col_ids[column_index]; |
137 | 0 | return true; |
138 | 0 | } |
139 | | |
140 | 28 | segment_v2::Segment* segment() const { return _segment; } |
141 | | |
142 | 0 | const segment_v2::ColumnIteratorOptions& column_iter_opts() const { return _column_iter_opts; } |
143 | | |
144 | 26.6k | bool has_index_result_for_expr(const VExpr* expr) const { |
145 | 26.6k | return _index_result_bitmap.contains(expr); |
146 | 26.6k | } |
147 | | |
148 | | void set_index_result_for_expr(const VExpr* expr, |
149 | 10.6k | segment_v2::InvertedIndexResultBitmap bitmap) { |
150 | 10.6k | _index_result_bitmap[expr] = std::move(bitmap); |
151 | 10.6k | } |
152 | | |
153 | | std::unordered_map<const VExpr*, segment_v2::InvertedIndexResultBitmap>& |
154 | 15.2k | get_index_result_bitmap() { |
155 | 15.2k | return _index_result_bitmap; |
156 | 15.2k | } |
157 | | |
158 | 23.0k | std::unordered_map<const VExpr*, ColumnPtr>& get_index_result_column() { |
159 | 23.0k | return _index_result_column; |
160 | 23.0k | } |
161 | | |
162 | 10.6k | const segment_v2::InvertedIndexResultBitmap* get_index_result_for_expr(const VExpr* expr) { |
163 | 10.6k | auto iter = _index_result_bitmap.find(expr); |
164 | 10.6k | if (iter == _index_result_bitmap.end()) { |
165 | 0 | return nullptr; |
166 | 0 | } |
167 | 10.6k | return &iter->second; |
168 | 10.6k | } |
169 | | |
170 | 1.39k | void set_index_result_column_for_expr(const VExpr* expr, ColumnPtr column) { |
171 | 1.39k | _index_result_column[expr] = std::move(column); |
172 | 1.39k | } |
173 | | |
174 | 9.26k | void set_true_for_index_status(const VExpr* expr, int column_index) { |
175 | 9.26k | if (column_index < 0 || column_index >= _col_ids.size()) { |
176 | 0 | return; |
177 | 0 | } |
178 | 9.26k | const auto& column_id = _col_ids[column_index]; |
179 | 9.36k | if (_expr_index_status.contains(column_id)) { |
180 | 9.36k | if (_expr_index_status[column_id].contains(expr)) { |
181 | 9.35k | _expr_index_status[column_id][expr] = true; |
182 | 9.35k | } |
183 | 9.36k | } |
184 | 9.26k | } |
185 | | |
186 | 3.20k | ScoreRuntimeSPtr get_score_runtime() const { return _score_runtime; } |
187 | | |
188 | 4.96k | void set_analyzer_ctx_for_expr(const VExpr* expr, InvertedIndexAnalyzerCtxSPtr analyzer_ctx) { |
189 | 4.97k | if (expr == nullptr || analyzer_ctx == nullptr) { |
190 | 0 | return; |
191 | 0 | } |
192 | 4.96k | _expr_analyzer_ctx[expr] = std::move(analyzer_ctx); |
193 | 4.96k | } |
194 | | |
195 | 9.40k | const InvertedIndexAnalyzerCtx* get_analyzer_ctx_for_expr(const VExpr* expr) const { |
196 | 9.40k | auto iter = _expr_analyzer_ctx.find(expr); |
197 | 9.40k | if (iter == _expr_analyzer_ctx.end()) { |
198 | 5.05k | return nullptr; |
199 | 5.05k | } |
200 | 4.34k | return iter->second.get(); |
201 | 9.40k | } |
202 | | |
203 | 1.73M | void set_index_query_context(segment_v2::IndexQueryContextPtr index_query_context) { |
204 | 1.73M | _index_query_context = index_query_context; |
205 | 1.73M | } |
206 | | |
207 | 1.28k | const segment_v2::IndexQueryContextPtr& get_index_query_context() const { |
208 | 1.28k | return _index_query_context; |
209 | 1.28k | } |
210 | | |
211 | | private: |
212 | | // A reference to a vector of column IDs for the current expression's output columns. |
213 | | const std::vector<ColumnId>& _col_ids; |
214 | | |
215 | | // A reference to a vector of unique pointers to index iterators. |
216 | | const std::vector<std::unique_ptr<segment_v2::IndexIterator>>& _index_iterators; |
217 | | |
218 | | // A reference to a vector of storage name and type pairs related to schema. |
219 | | const std::vector<IndexFieldNameAndTypePair>& _storage_name_and_type; |
220 | | |
221 | | // A map of expressions to their corresponding inverted index result bitmaps. |
222 | | std::unordered_map<const VExpr*, segment_v2::InvertedIndexResultBitmap> _index_result_bitmap; |
223 | | |
224 | | // A map of expressions to their corresponding result columns. |
225 | | std::unordered_map<const VExpr*, ColumnPtr> _index_result_column; |
226 | | |
227 | | // Per-expression analyzer context for inverted index evaluation. |
228 | | std::unordered_map<const VExpr*, InvertedIndexAnalyzerCtxSPtr> _expr_analyzer_ctx; |
229 | | |
230 | | // A reference to a map of common expressions to their inverted index evaluation status. |
231 | | std::unordered_map<ColumnId, std::unordered_map<const VExpr*, bool>>& _expr_index_status; |
232 | | |
233 | | ScoreRuntimeSPtr _score_runtime; |
234 | | |
235 | | segment_v2::Segment* _segment = nullptr; // Ref |
236 | | segment_v2::ColumnIteratorOptions _column_iter_opts; |
237 | | segment_v2::IndexQueryContextPtr _index_query_context; |
238 | | }; |
239 | | |
240 | | class VExprContext { |
241 | | ENABLE_FACTORY_CREATOR(VExprContext); |
242 | | |
243 | | public: |
244 | | VExprContext(VExprSPtr expr); |
245 | | ~VExprContext(); |
246 | | [[nodiscard]] Status prepare(RuntimeState* state, const RowDescriptor& row_desc); |
247 | | [[nodiscard]] Status open(RuntimeState* state); |
248 | | [[nodiscard]] Status clone(RuntimeState* state, VExprContextSPtr& new_ctx); |
249 | | [[nodiscard]] Status execute(Block* block, int* result_column_id); |
250 | | [[nodiscard]] Status execute(const Block* block, ColumnPtr& result_column); |
251 | | [[nodiscard]] Status execute(const Block* block, ColumnWithTypeAndName& result_data); |
252 | | [[nodiscard]] DataTypePtr execute_type(const Block* block); |
253 | | [[nodiscard]] const std::string& expr_name() const; |
254 | | [[nodiscard]] bool is_blockable() const; |
255 | | |
256 | | [[nodiscard]] Status execute_const_expr(ColumnWithTypeAndName& result); |
257 | | |
258 | | double execute_cost() const; |
259 | | |
260 | 12.7M | VExprSPtr root() { return _root; } |
261 | 19.5k | void set_root(const VExprSPtr& expr) { _root = expr; } |
262 | 19.1k | void set_index_context(std::shared_ptr<IndexExecContext> index_context) { |
263 | 19.1k | _index_context = std::move(index_context); |
264 | 19.1k | } |
265 | | |
266 | 763k | std::shared_ptr<IndexExecContext> get_index_context() const { return _index_context; } |
267 | | |
268 | | LambdaExecutionContext& lambda_execution_context(); |
269 | | |
270 | | const LambdaExecutionContext& lambda_execution_context() const; |
271 | | |
272 | | /// Creates a FunctionContext, and returns the index that's passed to fn_context() to |
273 | | /// retrieve the created context. Exprs that need a FunctionContext should call this in |
274 | | /// Prepare() and save the returned index. 'varargs_buffer_size', if specified, is the |
275 | | /// size of the varargs buffer in the created FunctionContext (see udf-internal.h). |
276 | | int register_function_context(RuntimeState* state, const DataTypePtr& return_type, |
277 | | const std::vector<DataTypePtr>& arg_types); |
278 | | |
279 | | /// Retrieves a registered FunctionContext. 'i' is the index returned by the call to |
280 | | /// register_function_context(). This should only be called by VExprs. |
281 | 5.63M | FunctionContext* fn_context(int i) { |
282 | 5.63M | if (i < 0 || i >= _fn_contexts.size()) { |
283 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
284 | 0 | "fn_context index invalid, index={}, _fn_contexts.size()={}", i, |
285 | 0 | _fn_contexts.size()); |
286 | 0 | } |
287 | 5.63M | return _fn_contexts[i].get(); |
288 | 5.63M | } |
289 | | |
290 | | // execute expr with inverted index which column a, b has inverted indexes |
291 | | // but some situation although column b has indexes, but apply index is not useful, we should |
292 | | // skip this expr, just do not apply index anymore. |
293 | | [[nodiscard]] Status evaluate_inverted_index(uint32_t segment_num_rows); |
294 | | |
295 | | bool all_expr_inverted_index_evaluated(); |
296 | | |
297 | | Status execute_filter(const Block* block, uint8_t* __restrict result_filter_data, size_t rows, |
298 | | bool accept_null, bool* can_filter_all); |
299 | | |
300 | | [[nodiscard]] static Status filter_block(VExprContext* vexpr_ctx, Block* block); |
301 | | |
302 | | [[nodiscard]] static Status filter_block(const VExprContextSPtrs& expr_contexts, Block* block, |
303 | | size_t column_to_keep); |
304 | | |
305 | | [[nodiscard]] static Status execute_conjuncts(const VExprContextSPtrs& ctxs, |
306 | | const std::vector<IColumn::Filter*>* filters, |
307 | | bool accept_null, const Block* block, |
308 | | IColumn::Filter* result_filter, |
309 | | bool* can_filter_all); |
310 | | |
311 | | [[nodiscard]] static Status execute_conjuncts(const VExprContextSPtrs& conjuncts, |
312 | | const Block* block, ColumnUInt8& null_map, |
313 | | IColumn::Filter& result_filter); |
314 | | |
315 | | static Status execute_conjuncts(const VExprContextSPtrs& ctxs, |
316 | | const std::vector<IColumn::Filter*>* filters, Block* block, |
317 | | IColumn::Filter* result_filter, bool* can_filter_all); |
318 | | |
319 | | [[nodiscard]] static Status execute_conjuncts_and_filter_block( |
320 | | const VExprContextSPtrs& ctxs, Block* block, std::vector<uint32_t>& columns_to_filter, |
321 | | int column_to_keep); |
322 | | |
323 | | static Status execute_conjuncts_and_filter_block(const VExprContextSPtrs& ctxs, Block* block, |
324 | | std::vector<uint32_t>& columns_to_filter, |
325 | | int column_to_keep, IColumn::Filter& filter); |
326 | | |
327 | | [[nodiscard]] static Status get_output_block_after_execute_exprs(const VExprContextSPtrs&, |
328 | | const Block&, Block*, |
329 | | bool do_projection = false); |
330 | | |
331 | 16.0k | int get_last_result_column_id() const { |
332 | 16.0k | DCHECK(_last_result_column_id != -1); |
333 | 16.0k | return _last_result_column_id; |
334 | 16.0k | } |
335 | | |
336 | 90.6k | RuntimeFilterSelectivity& get_runtime_filter_selectivity() { |
337 | 90.6k | if (!_rf_selectivity) { |
338 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, "RuntimeFilterSelectivity is null"); |
339 | 0 | } |
340 | 90.6k | return *_rf_selectivity; |
341 | 90.6k | } |
342 | | |
343 | 0 | FunctionContext::FunctionStateScope get_function_state_scope() const { |
344 | 0 | return _is_clone ? FunctionContext::THREAD_LOCAL : FunctionContext::FRAGMENT_LOCAL; |
345 | 0 | } |
346 | | |
347 | | void clone_fn_contexts(VExprContext* other); |
348 | | |
349 | | VExprContext& operator=(const VExprContext& other); |
350 | | |
351 | | VExprContext& operator=(VExprContext&& other); |
352 | | |
353 | 592k | [[nodiscard]] static size_t get_memory_usage(const VExprContextSPtrs& contexts) { |
354 | 592k | size_t usage = 0; |
355 | 592k | std::for_each(contexts.cbegin(), contexts.cend(), |
356 | 592k | [&usage](auto&& context) { usage += context->_memory_usage; }); |
357 | 592k | return usage; |
358 | 592k | } |
359 | | |
360 | 0 | [[nodiscard]] size_t get_memory_usage() const { return _memory_usage; } |
361 | | |
362 | | void prepare_ann_range_search(const doris::VectorSearchUserParams& params); |
363 | | |
364 | | Status evaluate_ann_range_search( |
365 | | const std::vector<std::unique_ptr<segment_v2::IndexIterator>>& cid_to_index_iterators, |
366 | | const std::vector<ColumnId>& idx_to_cid, |
367 | | const std::vector<std::unique_ptr<segment_v2::ColumnIterator>>& column_iterators, |
368 | | const std::unordered_map<VExprContext*, std::unordered_map<ColumnId, VExpr*>>& |
369 | | common_expr_to_slotref_map, |
370 | | size_t rows_of_segment, roaring::Roaring& row_bitmap, |
371 | | segment_v2::AnnIndexStats& ann_index_stats, bool enable_result_cache, |
372 | | bool* ann_range_search_executed); |
373 | | |
374 | | uint64_t get_digest(uint64_t seed) const; |
375 | | |
376 | | private: |
377 | | // Close method is called in vexpr context dector, not need call expicility |
378 | | void close(); |
379 | | |
380 | | static void _reset_memory_usage(const VExprContextSPtrs& contexts); |
381 | | |
382 | | friend class VExpr; |
383 | | |
384 | | /// The expr tree this context is for. |
385 | | VExprSPtr _root; |
386 | | |
387 | | /// True if this context came from a Clone() call. Used to manage FunctionStateScope. |
388 | | bool _is_clone = false; |
389 | | |
390 | | /// Variables keeping track of current state. |
391 | | bool _prepared = false; |
392 | | bool _opened = false; |
393 | | |
394 | | /// FunctionContexts for each registered expression. The FunctionContexts are created |
395 | | /// and owned by this VExprContext. |
396 | | std::vector<std::unique_ptr<FunctionContext>> _fn_contexts; |
397 | | |
398 | | int _last_result_column_id = -1; |
399 | | |
400 | | /// The depth of expression-tree. |
401 | | int _depth_num = 0; |
402 | | |
403 | | std::shared_ptr<IndexExecContext> _index_context; |
404 | | size_t _memory_usage = 0; |
405 | | |
406 | | segment_v2::AnnRangeSearchRuntime _ann_range_search_runtime; |
407 | | bool _suitable_for_ann_index = true; |
408 | | |
409 | | std::unique_ptr<LambdaExecutionContext> _lambda_execution_context; |
410 | | |
411 | | std::unique_ptr<RuntimeFilterSelectivity> _rf_selectivity = |
412 | | std::make_unique<RuntimeFilterSelectivity>(); |
413 | | }; |
414 | | } // namespace doris |