Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <glog/logging.h> |
21 | | |
22 | | #include <algorithm> |
23 | | #include <cstddef> |
24 | | #include <memory> |
25 | | #include <optional> |
26 | | #include <unordered_map> |
27 | | #include <utility> |
28 | | #include <vector> |
29 | | |
30 | | #include "common/factory_creator.h" |
31 | | #include "common/status.h" |
32 | | #include "core/block/block.h" |
33 | | #include "core/block/column_with_type_and_name.h" |
34 | | #include "core/column/column.h" |
35 | | #include "exec/runtime_filter/runtime_filter_selectivity.h" |
36 | | #include "exprs/function_context.h" |
37 | | #include "exprs/vexpr_fwd.h" |
38 | | #include "runtime/runtime_state.h" |
39 | | #include "runtime/scan_filter_profile.h" |
40 | | #include "storage/index/ann/ann_range_search_runtime.h" |
41 | | #include "storage/index/ann/ann_search_params.h" |
42 | | #include "storage/index/inverted/inverted_index_reader.h" |
43 | | #include "storage/segment/column_reader.h" |
44 | | |
// Forward declarations: these types are only named in this header's
// declarations, so their full definitions are not needed at this point.
namespace doris {

class RowDescriptor;
class RuntimeState;

namespace segment_v2 {
class Segment;
class ColumnIterator;
} // namespace segment_v2

} // namespace doris
54 | | |
55 | | namespace doris { |
56 | | |
// Declared here without a definition; only the shared-pointer alias below needs the name.
class ScoreRuntime;

// Shared-ownership handle to a ScoreRuntime.
using ScoreRuntimeSPtr = std::shared_ptr<ScoreRuntime>;
59 | | |
60 | | class IndexExecContext { |
61 | | public: |
62 | | IndexExecContext(const std::vector<ColumnId>& col_ids, |
63 | | const std::vector<std::unique_ptr<segment_v2::IndexIterator>>& index_iterators, |
64 | | const std::vector<IndexFieldNameAndTypePair>& storage_name_and_type_vec, |
65 | | std::unordered_map<ColumnId, std::unordered_map<const VExpr*, bool>>& |
66 | | common_expr_index_status, |
67 | | ScoreRuntimeSPtr score_runtime, segment_v2::Segment* segment, |
68 | | const segment_v2::ColumnIteratorOptions& column_iter_opts) |
69 | 2.09M | : _col_ids(col_ids), |
70 | 2.09M | _index_iterators(index_iterators), |
71 | 2.09M | _storage_name_and_type(storage_name_and_type_vec), |
72 | 2.09M | _expr_index_status(common_expr_index_status), |
73 | 2.09M | _score_runtime(std::move(score_runtime)), |
74 | 2.09M | _segment(segment), |
75 | 2.09M | _column_iter_opts(column_iter_opts) {} |
76 | | |
77 | 16.8k | segment_v2::IndexIterator* get_inverted_index_iterator_by_column_id(int column_index) const { |
78 | 16.8k | if (column_index < 0 || column_index >= _col_ids.size()) { |
79 | 0 | return nullptr; |
80 | 0 | } |
81 | 16.8k | const auto& column_id = _col_ids[column_index]; |
82 | 16.8k | if (column_id >= _index_iterators.size()) { |
83 | 0 | return nullptr; |
84 | 0 | } |
85 | 16.8k | if (!_index_iterators[column_id]) { |
86 | 4.75k | return nullptr; |
87 | 4.75k | } |
88 | 12.0k | return _index_iterators[column_id].get(); |
89 | 16.8k | } |
90 | | |
91 | 14 | segment_v2::IndexIterator* get_inverted_index_iterator_by_id(ColumnId column_id) const { |
92 | 14 | if (column_id >= _index_iterators.size()) { |
93 | 0 | return nullptr; |
94 | 0 | } |
95 | 14 | if (!_index_iterators[column_id]) { |
96 | 14 | return nullptr; |
97 | 14 | } |
98 | 0 | return _index_iterators[column_id].get(); |
99 | 14 | } |
100 | | |
101 | | const IndexFieldNameAndTypePair* get_storage_name_and_type_by_column_id( |
102 | 16.5k | int column_index) const { |
103 | 16.5k | if (column_index < 0 || column_index >= _col_ids.size()) { |
104 | 0 | return nullptr; |
105 | 0 | } |
106 | 16.5k | const auto& column_id = _col_ids[column_index]; |
107 | 16.5k | if (column_id >= _storage_name_and_type.size()) { |
108 | 2 | return nullptr; |
109 | 2 | } |
110 | 16.5k | return &_storage_name_and_type[column_id]; |
111 | 16.5k | } |
112 | | |
113 | 28 | const IndexFieldNameAndTypePair* get_storage_name_and_type_by_id(ColumnId column_id) const { |
114 | 28 | if (column_id >= _storage_name_and_type.size()) { |
115 | 0 | return nullptr; |
116 | 0 | } |
117 | 28 | return &_storage_name_and_type[column_id]; |
118 | 28 | } |
119 | | |
120 | 0 | int column_index_by_id(ColumnId column_id) const { |
121 | 0 | for (int i = 0; i < _col_ids.size(); ++i) { |
122 | 0 | if (_col_ids[i] == column_id) { |
123 | 0 | return i; |
124 | 0 | } |
125 | 0 | } |
126 | 0 | return -1; |
127 | 0 | } |
128 | | |
129 | 0 | bool get_column_id(int column_index, ColumnId* column_id) const { |
130 | 0 | if (column_id == nullptr) { |
131 | 0 | return false; |
132 | 0 | } |
133 | 0 | if (column_index < 0 || column_index >= _col_ids.size()) { |
134 | 0 | return false; |
135 | 0 | } |
136 | 0 | *column_id = _col_ids[column_index]; |
137 | 0 | return true; |
138 | 0 | } |
139 | | |
140 | 28 | segment_v2::Segment* segment() const { return _segment; } |
141 | | |
142 | 0 | const segment_v2::ColumnIteratorOptions& column_iter_opts() const { return _column_iter_opts; } |
143 | | |
144 | 27.5k | bool has_index_result_for_expr(const VExpr* expr) const { |
145 | 27.5k | return _index_result_bitmap.contains(expr); |
146 | 27.5k | } |
147 | | |
148 | | void set_index_result_for_expr(const VExpr* expr, |
149 | 11.0k | segment_v2::InvertedIndexResultBitmap bitmap) { |
150 | 11.0k | _index_result_bitmap[expr] = std::move(bitmap); |
151 | 11.0k | } |
152 | | |
153 | | std::unordered_map<const VExpr*, segment_v2::InvertedIndexResultBitmap>& |
154 | 15.2k | get_index_result_bitmap() { |
155 | 15.2k | return _index_result_bitmap; |
156 | 15.2k | } |
157 | | |
158 | 24.0k | std::unordered_map<const VExpr*, ColumnPtr>& get_index_result_column() { |
159 | 24.0k | return _index_result_column; |
160 | 24.0k | } |
161 | | |
162 | 13.2k | const segment_v2::InvertedIndexResultBitmap* get_index_result_for_expr(const VExpr* expr) { |
163 | 13.2k | auto iter = _index_result_bitmap.find(expr); |
164 | 13.2k | if (iter == _index_result_bitmap.end()) { |
165 | 1.00k | return nullptr; |
166 | 1.00k | } |
167 | 12.2k | return &iter->second; |
168 | 13.2k | } |
169 | | |
170 | 1.43k | void set_index_result_column_for_expr(const VExpr* expr, ColumnPtr column) { |
171 | 1.43k | _index_result_column[expr] = std::move(column); |
172 | 1.43k | } |
173 | | |
174 | 9.61k | void set_true_for_index_status(const VExpr* expr, int column_index) { |
175 | 9.61k | if (column_index < 0 || column_index >= _col_ids.size()) { |
176 | 0 | return; |
177 | 0 | } |
178 | 9.61k | const auto& column_id = _col_ids[column_index]; |
179 | 9.67k | if (_expr_index_status.contains(column_id)) { |
180 | 9.67k | if (_expr_index_status[column_id].contains(expr)) { |
181 | 9.67k | _expr_index_status[column_id][expr] = true; |
182 | 9.67k | } |
183 | 9.67k | } |
184 | 9.61k | } |
185 | | |
186 | 3.24k | ScoreRuntimeSPtr get_score_runtime() const { return _score_runtime; } |
187 | | |
188 | 5.15k | void set_analyzer_ctx_for_expr(const VExpr* expr, InvertedIndexAnalyzerCtxSPtr analyzer_ctx) { |
189 | 5.15k | if (expr == nullptr || analyzer_ctx == nullptr) { |
190 | 0 | return; |
191 | 0 | } |
192 | 5.15k | _expr_analyzer_ctx[expr] = std::move(analyzer_ctx); |
193 | 5.15k | } |
194 | | |
195 | 9.83k | const InvertedIndexAnalyzerCtx* get_analyzer_ctx_for_expr(const VExpr* expr) const { |
196 | 9.83k | auto iter = _expr_analyzer_ctx.find(expr); |
197 | 9.83k | if (iter == _expr_analyzer_ctx.end()) { |
198 | 5.29k | return nullptr; |
199 | 5.29k | } |
200 | 4.53k | return iter->second.get(); |
201 | 9.83k | } |
202 | | |
203 | 2.09M | void set_index_query_context(segment_v2::IndexQueryContextPtr index_query_context) { |
204 | 2.09M | _index_query_context = index_query_context; |
205 | 2.09M | } |
206 | | |
207 | 1.33k | const segment_v2::IndexQueryContextPtr& get_index_query_context() const { |
208 | 1.33k | return _index_query_context; |
209 | 1.33k | } |
210 | | |
211 | | private: |
212 | | // A reference to a vector of column IDs for the current expression's output columns. |
213 | | const std::vector<ColumnId>& _col_ids; |
214 | | |
215 | | // A reference to a vector of unique pointers to index iterators. |
216 | | const std::vector<std::unique_ptr<segment_v2::IndexIterator>>& _index_iterators; |
217 | | |
218 | | // A reference to a vector of storage name and type pairs related to schema. |
219 | | const std::vector<IndexFieldNameAndTypePair>& _storage_name_and_type; |
220 | | |
221 | | // A map of expressions to their corresponding inverted index result bitmaps. |
222 | | std::unordered_map<const VExpr*, segment_v2::InvertedIndexResultBitmap> _index_result_bitmap; |
223 | | |
224 | | // A map of expressions to their corresponding result columns. |
225 | | std::unordered_map<const VExpr*, ColumnPtr> _index_result_column; |
226 | | |
227 | | // Per-expression analyzer context for inverted index evaluation. |
228 | | std::unordered_map<const VExpr*, InvertedIndexAnalyzerCtxSPtr> _expr_analyzer_ctx; |
229 | | |
230 | | // A reference to a map of common expressions to their inverted index evaluation status. |
231 | | std::unordered_map<ColumnId, std::unordered_map<const VExpr*, bool>>& _expr_index_status; |
232 | | |
233 | | ScoreRuntimeSPtr _score_runtime; |
234 | | |
235 | | segment_v2::Segment* _segment = nullptr; // Ref |
236 | | segment_v2::ColumnIteratorOptions _column_iter_opts; |
237 | | segment_v2::IndexQueryContextPtr _index_query_context; |
238 | | }; |
239 | | |
240 | | class VExprContext { |
241 | | ENABLE_FACTORY_CREATOR(VExprContext); |
242 | | |
243 | | public: |
244 | 29.0M | VExprContext(VExprSPtr expr) : _root(std::move(expr)) {} |
245 | | ~VExprContext(); |
246 | | [[nodiscard]] Status prepare(RuntimeState* state, const RowDescriptor& row_desc); |
247 | | [[nodiscard]] Status open(RuntimeState* state); |
248 | | [[nodiscard]] Status clone(RuntimeState* state, VExprContextSPtr& new_ctx); |
249 | | [[nodiscard]] Status execute(Block* block, int* result_column_id); |
250 | | [[nodiscard]] Status execute(const Block* block, ColumnPtr& result_column); |
251 | | [[nodiscard]] Status execute(const Block* block, ColumnWithTypeAndName& result_data); |
252 | | [[nodiscard]] DataTypePtr execute_type(const Block* block); |
253 | | [[nodiscard]] const std::string& expr_name() const; |
254 | | [[nodiscard]] bool is_blockable() const; |
255 | | |
256 | | [[nodiscard]] Status execute_const_expr(ColumnWithTypeAndName& result); |
257 | | |
258 | | double execute_cost() const; |
259 | | |
260 | 15.4M | VExprSPtr root() { return _root; } |
261 | 28.6k | void set_root(const VExprSPtr& expr) { _root = expr; } |
262 | 19.6k | void set_index_context(std::shared_ptr<IndexExecContext> index_context) { |
263 | 19.6k | _index_context = std::move(index_context); |
264 | 19.6k | } |
265 | | |
266 | 1.03M | std::shared_ptr<IndexExecContext> get_index_context() const { return _index_context; } |
267 | | |
268 | 2.64k | void attach_scan_filter(ScanFilterHandle handle) { _scan_filter_handle = std::move(handle); } |
269 | | |
270 | 129k | const ScanFilterHandle& scan_filter_handle() const { return _scan_filter_handle; } |
271 | | |
272 | | /// Creates a FunctionContext, and returns the index that's passed to fn_context() to |
273 | | /// retrieve the created context. Exprs that need a FunctionContext should call this in |
274 | | /// Prepare() and save the returned index. 'varargs_buffer_size', if specified, is the |
275 | | /// size of the varargs buffer in the created FunctionContext (see udf-internal.h). |
276 | | int register_function_context(RuntimeState* state, const DataTypePtr& return_type, |
277 | | const std::vector<DataTypePtr>& arg_types); |
278 | | |
279 | | /// Retrieves a registered FunctionContext. 'i' is the index returned by the call to |
280 | | /// register_function_context(). This should only be called by VExprs. |
281 | 6.77M | FunctionContext* fn_context(int i) { |
282 | 6.77M | if (i < 0 || i >= _fn_contexts.size()) { |
283 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, |
284 | 0 | "fn_context index invalid, index={}, _fn_contexts.size()={}", i, |
285 | 0 | _fn_contexts.size()); |
286 | 0 | } |
287 | 6.77M | return _fn_contexts[i].get(); |
288 | 6.77M | } |
289 | | |
290 | | // execute expr with inverted index which column a, b has inverted indexes |
291 | | // but some situation although column b has indexes, but apply index is not useful, we should |
292 | | // skip this expr, just do not apply index anymore. |
293 | | [[nodiscard]] Status evaluate_inverted_index(uint32_t segment_num_rows); |
294 | | |
295 | | bool all_expr_inverted_index_evaluated(); |
296 | | |
297 | | Status execute_filter(const Block* block, uint8_t* __restrict result_filter_data, size_t rows, |
298 | | bool accept_null, bool* can_filter_all); |
299 | | |
300 | | [[nodiscard]] static Status filter_block(VExprContext* vexpr_ctx, Block* block); |
301 | | |
302 | | [[nodiscard]] static Status filter_block( |
303 | | const VExprContextSPtrs& expr_contexts, Block* block, size_t column_to_keep, |
304 | | std::optional<ScanFilterStage> scan_filter_stage = std::nullopt); |
305 | | |
306 | | [[nodiscard]] static Status execute_conjuncts( |
307 | | const VExprContextSPtrs& ctxs, const std::vector<IColumn::Filter*>* filters, |
308 | | bool accept_null, const Block* block, IColumn::Filter* result_filter, |
309 | | bool* can_filter_all, std::optional<ScanFilterStage> scan_filter_stage = std::nullopt); |
310 | | |
311 | | [[nodiscard]] static Status execute_conjuncts(const VExprContextSPtrs& conjuncts, |
312 | | const Block* block, ColumnUInt8& null_map, |
313 | | IColumn::Filter& result_filter); |
314 | | |
315 | | static Status execute_conjuncts( |
316 | | const VExprContextSPtrs& ctxs, const std::vector<IColumn::Filter*>* filters, |
317 | | Block* block, IColumn::Filter* result_filter, bool* can_filter_all, |
318 | | std::optional<ScanFilterStage> scan_filter_stage = std::nullopt); |
319 | | |
320 | | [[nodiscard]] static Status execute_conjuncts_and_filter_block( |
321 | | const VExprContextSPtrs& ctxs, Block* block, std::vector<uint32_t>& columns_to_filter, |
322 | | int column_to_keep, std::optional<ScanFilterStage> scan_filter_stage = std::nullopt); |
323 | | |
324 | | static Status execute_conjuncts_and_filter_block( |
325 | | const VExprContextSPtrs& ctxs, Block* block, std::vector<uint32_t>& columns_to_filter, |
326 | | int column_to_keep, IColumn::Filter& filter, |
327 | | std::optional<ScanFilterStage> scan_filter_stage = std::nullopt); |
328 | | |
329 | | [[nodiscard]] static Status get_output_block_after_execute_exprs(const VExprContextSPtrs&, |
330 | | const Block&, Block*, |
331 | | bool do_projection = false); |
332 | | |
333 | 18.2k | int get_last_result_column_id() const { |
334 | 18.2k | DCHECK(_last_result_column_id != -1); |
335 | 18.2k | return _last_result_column_id; |
336 | 18.2k | } |
337 | | |
338 | 149k | RuntimeFilterSelectivity& get_runtime_filter_selectivity() { |
339 | 149k | if (!_rf_selectivity) { |
340 | 0 | throw Exception(ErrorCode::INTERNAL_ERROR, "RuntimeFilterSelectivity is null"); |
341 | 0 | } |
342 | 149k | return *_rf_selectivity; |
343 | 149k | } |
344 | | |
345 | 0 | FunctionContext::FunctionStateScope get_function_state_scope() const { |
346 | 0 | return _is_clone ? FunctionContext::THREAD_LOCAL : FunctionContext::FRAGMENT_LOCAL; |
347 | 0 | } |
348 | | |
349 | | void clone_fn_contexts(VExprContext* other); |
350 | | |
351 | 0 | VExprContext& operator=(const VExprContext& other) { |
352 | 0 | if (this == &other) { |
353 | 0 | return *this; |
354 | 0 | } |
355 | 0 |
|
356 | 0 | _root = other._root; |
357 | 0 | _is_clone = other._is_clone; |
358 | 0 | _prepared = other._prepared; |
359 | 0 | _opened = other._opened; |
360 | 0 |
|
361 | 0 | for (const auto& fn : other._fn_contexts) { |
362 | 0 | _fn_contexts.emplace_back(fn->clone()); |
363 | 0 | } |
364 | 0 |
|
365 | 0 | _last_result_column_id = other._last_result_column_id; |
366 | 0 | _depth_num = other._depth_num; |
367 | 0 | _scan_filter_handle = other._scan_filter_handle; |
368 | 0 | return *this; |
369 | 0 | } |
370 | | |
371 | 0 | VExprContext& operator=(VExprContext&& other) { |
372 | 0 | _root = other._root; |
373 | 0 | other._root = nullptr; |
374 | 0 | _is_clone = other._is_clone; |
375 | 0 | _prepared = other._prepared; |
376 | 0 | _opened = other._opened; |
377 | 0 | _fn_contexts = std::move(other._fn_contexts); |
378 | 0 | _last_result_column_id = other._last_result_column_id; |
379 | 0 | _depth_num = other._depth_num; |
380 | 0 | _scan_filter_handle = other._scan_filter_handle; |
381 | 0 | return *this; |
382 | 0 | } |
383 | | |
384 | 623k | [[nodiscard]] static size_t get_memory_usage(const VExprContextSPtrs& contexts) { |
385 | 623k | size_t usage = 0; |
386 | 623k | std::for_each(contexts.cbegin(), contexts.cend(), |
387 | 623k | [&usage](auto&& context) { usage += context->_memory_usage; }); |
388 | 623k | return usage; |
389 | 623k | } |
390 | | |
391 | 0 | [[nodiscard]] size_t get_memory_usage() const { return _memory_usage; } |
392 | | |
393 | | void prepare_ann_range_search(const doris::VectorSearchUserParams& params); |
394 | | |
395 | | Status evaluate_ann_range_search( |
396 | | const std::vector<std::unique_ptr<segment_v2::IndexIterator>>& cid_to_index_iterators, |
397 | | const std::vector<ColumnId>& idx_to_cid, |
398 | | const std::vector<std::unique_ptr<segment_v2::ColumnIterator>>& column_iterators, |
399 | | const std::unordered_map<VExprContext*, std::unordered_map<ColumnId, VExpr*>>& |
400 | | common_expr_to_slotref_map, |
401 | | roaring::Roaring& row_bitmap, segment_v2::AnnIndexStats& ann_index_stats, |
402 | | bool enable_result_cache, bool* ann_range_search_executed); |
403 | | |
404 | | uint64_t get_digest(uint64_t seed) const; |
405 | | |
406 | | private: |
407 | | // Close method is called in vexpr context dector, not need call expicility |
408 | | void close(); |
409 | | |
410 | | static void _reset_memory_usage(const VExprContextSPtrs& contexts); |
411 | | |
412 | | friend class VExpr; |
413 | | |
414 | | /// The expr tree this context is for. |
415 | | VExprSPtr _root; |
416 | | |
417 | | /// True if this context came from a Clone() call. Used to manage FunctionStateScope. |
418 | | bool _is_clone = false; |
419 | | |
420 | | /// Variables keeping track of current state. |
421 | | bool _prepared = false; |
422 | | bool _opened = false; |
423 | | |
424 | | /// FunctionContexts for each registered expression. The FunctionContexts are created |
425 | | /// and owned by this VExprContext. |
426 | | std::vector<std::unique_ptr<FunctionContext>> _fn_contexts; |
427 | | |
428 | | int _last_result_column_id = -1; |
429 | | |
430 | | /// The depth of expression-tree. |
431 | | int _depth_num = 0; |
432 | | |
433 | | std::shared_ptr<IndexExecContext> _index_context; |
434 | | ScanFilterHandle _scan_filter_handle; |
435 | | size_t _memory_usage = 0; |
436 | | |
437 | | segment_v2::AnnRangeSearchRuntime _ann_range_search_runtime; |
438 | | bool _suitable_for_ann_index = true; |
439 | | |
440 | | std::unique_ptr<RuntimeFilterSelectivity> _rf_selectivity = |
441 | | std::make_unique<RuntimeFilterSelectivity>(); |
442 | | }; |
443 | | } // namespace doris |