be/src/storage/segment/segment_iterator.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/segment/segment_iterator.h" |
19 | | |
20 | | #include <assert.h> |
21 | | #include <gen_cpp/Exprs_types.h> |
22 | | #include <gen_cpp/Opcodes_types.h> |
23 | | #include <gen_cpp/Types_types.h> |
24 | | #include <gen_cpp/olap_file.pb.h> |
25 | | |
26 | | #include <algorithm> |
27 | | #include <boost/iterator/iterator_facade.hpp> |
28 | | #include <cstdint> |
29 | | #include <memory> |
30 | | #include <numeric> |
31 | | #include <set> |
32 | | #include <unordered_map> |
33 | | #include <utility> |
34 | | #include <vector> |
35 | | |
36 | | #include "cloud/config.h" |
37 | | #include "common/compiler_util.h" // IWYU pragma: keep |
38 | | #include "common/config.h" |
39 | | #include "common/consts.h" |
40 | | #include "common/exception.h" |
41 | | #include "common/logging.h" |
42 | | #include "common/metrics/doris_metrics.h" |
43 | | #include "common/object_pool.h" |
44 | | #include "common/status.h" |
45 | | #include "core/assert_cast.h" |
46 | | #include "core/block/column_with_type_and_name.h" |
47 | | #include "core/column/column.h" |
48 | | #include "core/column/column_const.h" |
49 | | #include "core/column/column_nothing.h" |
50 | | #include "core/column/column_nullable.h" |
51 | | #include "core/column/column_string.h" |
52 | | #include "core/column/column_variant.h" |
53 | | #include "core/column/column_vector.h" |
54 | | #include "core/data_type/data_type.h" |
55 | | #include "core/data_type/data_type_factory.hpp" |
56 | | #include "core/data_type/data_type_number.h" |
57 | | #include "core/data_type/define_primitive_type.h" |
58 | | #include "core/field.h" |
59 | | #include "core/string_ref.h" |
60 | | #include "core/typeid_cast.h" |
61 | | #include "core/types.h" |
62 | | #include "exprs/function/array/function_array_index.h" |
63 | | #include "exprs/vexpr.h" |
64 | | #include "exprs/vexpr_context.h" |
65 | | #include "exprs/virtual_slot_ref.h" |
66 | | #include "exprs/vliteral.h" |
67 | | #include "exprs/vslot_ref.h" |
68 | | #include "io/cache/cached_remote_file_reader.h" |
69 | | #include "io/fs/file_reader.h" |
70 | | #include "io/io_common.h" |
71 | | #include "runtime/query_context.h" |
72 | | #include "runtime/runtime_predicate.h" |
73 | | #include "runtime/runtime_state.h" |
74 | | #include "runtime/thread_context.h" |
75 | | #include "storage/compaction/collection_similarity.h" |
76 | | #include "storage/field.h" |
77 | | #include "storage/id_manager.h" |
78 | | #include "storage/index/ann/ann_index.h" |
79 | | #include "storage/index/ann/ann_index_reader.h" |
80 | | #include "storage/index/ann/ann_topn_runtime.h" |
81 | | #include "storage/index/index_file_reader.h" |
82 | | #include "storage/index/index_iterator.h" |
83 | | #include "storage/index/index_query_context.h" |
84 | | #include "storage/index/index_reader_helper.h" |
85 | | #include "storage/index/indexed_column_reader.h" |
86 | | #include "storage/index/inverted/inverted_index_reader.h" |
87 | | #include "storage/index/ordinal_page_index.h" |
88 | | #include "storage/index/primary_key_index.h" |
89 | | #include "storage/index/short_key_index.h" |
90 | | #include "storage/iterators.h" |
91 | | #include "storage/olap_common.h" |
92 | | #include "storage/predicate/bloom_filter_predicate.h" |
93 | | #include "storage/predicate/column_predicate.h" |
94 | | #include "storage/predicate/like_column_predicate.h" |
95 | | #include "storage/schema.h" |
96 | | #include "storage/segment/column_reader.h" |
97 | | #include "storage/segment/column_reader_cache.h" |
98 | | #include "storage/segment/condition_cache.h" |
99 | | #include "storage/segment/row_ranges.h" |
100 | | #include "storage/segment/segment.h" |
101 | | #include "storage/segment/segment_prefetcher.h" |
102 | | #include "storage/segment/variant/variant_column_reader.h" |
103 | | #include "storage/segment/virtual_column_iterator.h" |
104 | | #include "storage/tablet/tablet_schema.h" |
105 | | #include "storage/types.h" |
106 | | #include "storage/utils.h" |
107 | | #include "util/concurrency_stats.h" |
108 | | #include "util/defer_op.h" |
109 | | #include "util/simd/bits.h" |
110 | | |
111 | | namespace doris { |
112 | | using namespace ErrorCode; |
113 | | namespace segment_v2 { |
114 | | |
115 | | #include "common/compile_check_begin.h" |
116 | | |
// Out-of-line defaulted destructor: keeps destruction of members whose types may only be
// forward-declared in the header (iterators, ObjectPool, ...) inside this translation unit.
SegmentIterator::~SegmentIterator() = default;
118 | | |
// Consult the per-segment condition cache and, on a hit, pre-filter _row_bitmap with the
// cached block-level filter result. Each entry of the cached filter covers
// CONDITION_CACHE_OFFSET consecutive rows; a 'false' entry removes that whole row block.
// On a miss, allocates an empty filter vector (_condition_cache) to be filled later.
// If this read has no predicates/pushed-down exprs at all, the cache is not applicable
// and the digest is cleared so no cache entry is produced downstream.
void SegmentIterator::_init_row_bitmap_by_condition_cache() {
    // Only dispose need column predicate and expr cal in condition cache
    if (!_col_predicates.empty() ||
        (_enable_common_expr_pushdown && !_remaining_conjunct_roots.empty())) {
        // digest == 0 means "no cacheable condition"; skip lookup entirely.
        if (_opts.condition_cache_digest) {
            auto* condition_cache = ConditionCache::instance();
            ConditionCache::CacheKey cache_key(_opts.rowset_id, _segment->id(),
                                               _opts.condition_cache_digest);

            // Increment search count when digest != 0
            DorisMetrics::instance()->condition_cache_search_count->increment(1);

            ConditionCacheHandle handle;
            _find_condition_cache = condition_cache->lookup(cache_key, &handle);

            // Increment hit count if cache lookup is successful
            if (_find_condition_cache) {
                DorisMetrics::instance()->condition_cache_hit_count->increment(1);
                if (_opts.runtime_state) {
                    VLOG_DEBUG << "Condition cache hit, query id: "
                               << print_id(_opts.runtime_state->query_id())
                               << ", segment id: " << _segment->id()
                               << ", cache digest: " << _opts.condition_cache_digest
                               << ", rowset id: " << _opts.rowset_id.to_string();
                }
            }

            auto num_rows = _segment->num_rows(); // used only on the miss path below
            if (_find_condition_cache) {
                // Hit: drop every row block whose cached filter bit is false.
                const auto& filter_result = *(handle.get_filter_result());
                int64_t filtered_blocks = 0;
                for (int i = 0; i < filter_result.size(); i++) {
                    if (!filter_result[i]) {
                        // removeRange takes a [begin, end) range of row ids.
                        _row_bitmap.removeRange(
                                i * CONDITION_CACHE_OFFSET,
                                i * CONDITION_CACHE_OFFSET + CONDITION_CACHE_OFFSET);
                        filtered_blocks++;
                    }
                }
                // Record condition_cache hit segment number
                _opts.stats->condition_cache_hit_seg_nums++;
                // Record rows filtered by condition cache hit
                // (approximate: the last block may be shorter than CONDITION_CACHE_OFFSET)
                _opts.stats->condition_cache_filtered_rows +=
                        filtered_blocks * SegmentIterator::CONDITION_CACHE_OFFSET;
            } else {
                // Miss: prepare an all-false filter sized to cover every row block;
                // it will be populated while this scan evaluates the conditions.
                _condition_cache = std::make_shared<std::vector<bool>>(
                        num_rows / CONDITION_CACHE_OFFSET + 1, false);
            }
        }
    } else {
        // No predicate/expr work to cache — disable cache population for this read.
        _opts.condition_cache_digest = 0;
    }
}
172 | | |
173 | | // A fast range iterator for roaring bitmap. Output ranges use closed-open form, like [from, to). |
174 | | // Example: |
175 | | // input bitmap: [0 1 4 5 6 7 10 15 16 17 18 19] |
176 | | // output ranges: [0,2), [4,8), [10,11), [15,20) (when max_range_size=10) |
177 | | // output ranges: [0,2), [4,7), [7,8), [10,11), [15,18), [18,20) (when max_range_size=3) |
178 | | class SegmentIterator::BitmapRangeIterator { |
179 | | public: |
180 | 0 | BitmapRangeIterator() = default; |
181 | 5.63k | virtual ~BitmapRangeIterator() = default; |
182 | | |
183 | 5.63k | explicit BitmapRangeIterator(const roaring::Roaring& bitmap) { |
184 | 5.63k | roaring_init_iterator(&bitmap.roaring, &_iter); |
185 | 5.63k | } |
186 | | |
187 | 0 | bool has_more_range() const { return !_eof; } |
188 | | |
189 | 12.1k | [[nodiscard]] static uint32_t get_batch_size() { return kBatchSize; } |
190 | | |
191 | | // read next range into [*from, *to) whose size <= max_range_size. |
192 | | // return false when there is no more range. |
193 | 0 | virtual bool next_range(const uint32_t max_range_size, uint32_t* from, uint32_t* to) { |
194 | 0 | if (_eof) { |
195 | 0 | return false; |
196 | 0 | } |
197 | | |
198 | 0 | *from = _buf[_buf_pos]; |
199 | 0 | uint32_t range_size = 0; |
200 | 0 | uint32_t expect_val = _buf[_buf_pos]; // this initial value just make first batch valid |
201 | | |
202 | | // if array is contiguous sequence then the following conditions need to be met : |
203 | | // a_0: x |
204 | | // a_1: x+1 |
205 | | // a_2: x+2 |
206 | | // ... |
207 | | // a_p: x+p |
208 | | // so we can just use (a_p-a_0)-p to check conditions |
209 | | // and should notice the previous batch needs to be continuous with the current batch |
210 | 0 | while (!_eof && range_size + _buf_size - _buf_pos <= max_range_size && |
211 | 0 | expect_val == _buf[_buf_pos] && |
212 | 0 | _buf[_buf_size - 1] - _buf[_buf_pos] == _buf_size - 1 - _buf_pos) { |
213 | 0 | range_size += _buf_size - _buf_pos; |
214 | 0 | expect_val = _buf[_buf_size - 1] + 1; |
215 | 0 | _read_next_batch(); |
216 | 0 | } |
217 | | |
218 | | // promise remain range not will reach next batch |
219 | 0 | if (!_eof && range_size < max_range_size && expect_val == _buf[_buf_pos]) { |
220 | 0 | do { |
221 | 0 | _buf_pos++; |
222 | 0 | range_size++; |
223 | 0 | } while (range_size < max_range_size && _buf[_buf_pos] == _buf[_buf_pos - 1] + 1); |
224 | 0 | } |
225 | 0 | *to = *from + range_size; |
226 | 0 | return true; |
227 | 0 | } |
228 | | |
229 | | // read batch_size of rowids from roaring bitmap into buf array |
230 | 23.1k | virtual uint32_t read_batch_rowids(rowid_t* buf, uint32_t batch_size) { |
231 | 23.1k | return roaring::api::roaring_read_uint32_iterator(&_iter, buf, batch_size); |
232 | 23.1k | } |
233 | | |
234 | | private: |
235 | 0 | void _read_next_batch() { |
236 | 0 | _buf_pos = 0; |
237 | 0 | _buf_size = roaring::api::roaring_read_uint32_iterator(&_iter, _buf, kBatchSize); |
238 | 0 | _eof = (_buf_size == 0); |
239 | 0 | } |
240 | | |
241 | | static const uint32_t kBatchSize = 256; |
242 | | roaring::api::roaring_uint32_iterator_t _iter; |
243 | | uint32_t _buf[kBatchSize]; |
244 | | uint32_t _buf_pos = 0; |
245 | | uint32_t _buf_size = 0; |
246 | | bool _eof = false; |
247 | | }; |
248 | | |
249 | | // A backward range iterator for roaring bitmap. Output ranges use closed-open form, like [from, to). |
250 | | // Example: |
251 | | // input bitmap: [0 1 4 5 6 7 10 15 16 17 18 19] |
252 | | // output ranges: , [15,20), [10,11), [4,8), [0,2) (when max_range_size=10) |
253 | | // output ranges: [17,20), [15,17), [10,11), [5,8), [4, 5), [0,2) (when max_range_size=3) |
class SegmentIterator::BackwardBitmapRangeIterator : public SegmentIterator::BitmapRangeIterator {
public:
    explicit BackwardBitmapRangeIterator(const roaring::Roaring& bitmap) {
        // Reverse iterator positioned at the last (largest) row id of the bitmap.
        roaring_init_iterator_last(&bitmap.roaring, &_riter);
        _rowid_count = cast_set<uint32_t>(roaring_bitmap_get_cardinality(&bitmap.roaring));
        _rowid_left = _rowid_count;
    }

    // NOTE(review): this condition looks inverted — next_range() and read_batch_rowids()
    // both treat "!_riter.has_value" as exhausted, so "has more" should presumably be
    // "_riter.has_value". This member shadows (does not override) the non-virtual base
    // version, so calls through a BitmapRangeIterator* never reach it; confirm intent.
    bool has_more_range() const { return !_riter.has_value; }

    // read next range into [*from, *to) whose size <= max_range_size.
    // return false when there is no more range.
    // Ranges are produced back-to-front: *to is fixed from the current (largest
    // unvisited) row id, then *from walks downward while ids stay contiguous.
    bool next_range(const uint32_t max_range_size, uint32_t* from, uint32_t* to) override {
        if (!_riter.has_value) {
            return false;
        }

        uint32_t range_size = 0;
        *to = _riter.current_value + 1;

        do {
            *from = _riter.current_value;
            range_size++;
            roaring_previous_uint32_iterator(&_riter);
        } while (range_size < max_range_size && _riter.has_value &&
                 _riter.current_value + 1 == *from);

        return true;
    }
    /**
     * Reads a batch of row IDs from a roaring bitmap, starting from the end and moving backwards.
     * This function retrieves the last `batch_size` row IDs from the bitmap and stores them in the provided buffer.
     * It updates the internal state to track how many row IDs are left to read in subsequent calls.
     *
     * The row IDs are read in reverse order, but stored in the buffer maintaining their original order in the bitmap.
     *
     * Example:
     *   input bitmap:  [0 1 4 5 6 7 10 15 16 17 18 19]
     *   If the bitmap has 12 elements and batch_size is set to 5, the function will first read [15, 16, 17, 18, 19]
     *   into the buffer, leaving 7 elements left. In the next call with batch_size 5, it will read [4, 5, 6, 7, 10].
     *
     */
    uint32_t read_batch_rowids(rowid_t* buf, uint32_t batch_size) override {
        if (!_riter.has_value || _rowid_left == 0) {
            return 0;
        }

        if (_rowid_count <= batch_size) {
            // Fast path: the whole bitmap fits into a single batch. This branch can only
            // be taken on the first call (afterwards _rowid_left == 0 hits the guard
            // above), so _rowid_left == _rowid_count here and dumping the full bitmap
            // into 'buf' is consistent with the returned count.
            roaring_bitmap_to_uint32_array(_riter.parent,
                                           buf); // Fill 'buf' with '_rowid_count' elements.
            uint32_t num_read = _rowid_left; // Save the number of row IDs read.
            _rowid_left = 0;                 // No row IDs left after this operation.
            return num_read;                 // Return the number of row IDs read.
        }

        uint32_t read_size = std::min(batch_size, _rowid_left);
        uint32_t num_read = 0; // Counter for the number of row IDs read.

        // Read row IDs into the buffer in reverse order.
        // Filling buf back-to-front keeps the ascending order inside the buffer.
        while (num_read < read_size && _riter.has_value) {
            buf[read_size - num_read - 1] = _riter.current_value;
            num_read++;
            _rowid_left--; // Decrement the count of remaining row IDs.
            roaring_previous_uint32_iterator(&_riter);
        }

        // Return the actual number of row IDs read.
        return num_read;
    }

private:
    roaring::api::roaring_uint32_iterator_t _riter;
    uint32_t _rowid_count; // total cardinality of the bitmap at construction
    uint32_t _rowid_left;  // row ids not yet returned by read_batch_rowids()
};
329 | | |
330 | | SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr schema) |
331 | 5.63k | : _segment(std::move(segment)), |
332 | 5.63k | _schema(schema), |
333 | 5.63k | _column_iterators(_schema->num_columns()), |
334 | 5.63k | _index_iterators(_schema->num_columns()), |
335 | 5.63k | _cur_rowid(0), |
336 | 5.63k | _lazy_materialization_read(false), |
337 | 5.63k | _lazy_inited(false), |
338 | 5.63k | _inited(false), |
339 | 5.63k | _pool(new ObjectPool) {} |
340 | | |
341 | 11.1k | Status SegmentIterator::init(const StorageReadOptions& opts) { |
342 | 11.1k | auto status = _init_impl(opts); |
343 | 11.1k | if (!status.ok()) { |
344 | 0 | _segment->update_healthy_status(status); |
345 | 0 | } |
346 | 11.1k | return status; |
347 | 11.1k | } |
348 | | |
// One-time initialization from the read options: snapshots options, selects the
// predicates that can be applied safely to this segment, resolves per-column storage
// names/types (inverted-index field naming depends on the index storage format), and
// builds column/index iterators plus the pushed-down expression contexts.
// Idempotent: subsequent calls return OK without re-initializing.
Status SegmentIterator::_init_impl(const StorageReadOptions& opts) {
    // get file handle from file descriptor of segment
    if (_inited) {
        return Status::OK();
    }
    _opts = opts;
    SCOPED_RAW_TIMER(&_opts.stats->segment_iterator_init_timer_ns);
    _inited = true;
    _file_reader = _segment->_file_reader;
    _col_predicates.clear();

    // Keep only the predicates that can be evaluated against this segment's physical
    // layout (e.g. variant columns may need a cast that makes a predicate unsafe here).
    for (const auto& predicate : opts.column_predicates) {
        if (!_segment->can_apply_predicate_safely(predicate->column_id(), *_schema,
                                                  _opts.target_cast_type_for_variants, _opts)) {
            continue;
        }
        _col_predicates.emplace_back(predicate);
    }
    _tablet_id = opts.tablet_id;
    // Read options will not change, so that just resize here
    _block_rowids.resize(_opts.block_row_max);

    _remaining_conjunct_roots = opts.remaining_conjunct_roots;

    // A positive rowid column index means the caller wants per-row ids recorded.
    if (_schema->rowid_col_idx() > 0) {
        _record_rowids = true;
    }

    _virtual_column_exprs = _opts.virtual_column_exprs;
    _vir_cid_to_idx_in_block = _opts.vir_cid_to_idx_in_block;
    _score_runtime = _opts.score_runtime;
    _ann_topn_runtime = _opts.ann_topn_runtime;

    if (opts.output_columns != nullptr) {
        _output_columns = *(opts.output_columns);
    }

    // Resolve, per schema column, the (index field name, storage data type) pair and
    // pre-create the variant sparse-column caches keyed by column unique id.
    _storage_name_and_type.resize(_schema->columns().size());
    auto storage_format = _opts.tablet_schema->get_inverted_index_storage_format();
    for (int i = 0; i < _schema->columns().size(); ++i) {
        const StorageField* col = _schema->column(i);
        if (col) {
            auto storage_type = _segment->get_data_type_of(col->get_desc(), _opts);
            if (storage_type == nullptr) {
                // Fall back to the logical type when the segment has no stored type info.
                storage_type = DataTypeFactory::instance().create_data_type(col->get_desc(),
                                                                            col->is_nullable());
            }
            // Currently, when writing a lucene index, the field of the document is column_name, and the column name is
            // bound to the index field. Since version 1.2, the data file storage has been changed from column_name to
            // column_unique_id, allowing the column name to be changed. Due to current limitations, previous inverted
            // index data cannot be used after Doris changes the column name. Column names also support Unicode
            // characters, which may cause other problems with indexing in non-ASCII characters.
            // After consideration, it was decided to change the field name from column_name to column_unique_id in
            // format V2, while format V1 continues to use column_name.
            std::string field_name;
            if (storage_format == InvertedIndexStorageFormatPB::V1) {
                field_name = col->name();
            } else {
                if (col->is_extracted_column()) {
                    // variant sub col
                    // field_name format: parent_unique_id.sub_col_name
                    field_name = std::to_string(col->parent_unique_id()) + "." + col->name();
                } else {
                    field_name = std::to_string(col->unique_id());
                }
            }
            _storage_name_and_type[i] = std::make_pair(field_name, storage_type);
            if (int32_t uid = col->get_unique_id(); !_variant_sparse_column_cache.contains(uid)) {
                DCHECK(uid >= 0);
                _variant_sparse_column_cache.emplace(uid,
                                                     std::make_unique<PathToBinaryColumnCache>());
            }
        }
    }

    RETURN_IF_ERROR(init_iterators());

    // Build the contexts for conjuncts that are pushed down into the segment scan;
    // common-expr pushdown is enabled only when at least one such context exists.
    RETURN_IF_ERROR(_construct_compound_expr_context());
    _enable_common_expr_pushdown = !_common_expr_ctxs_push_down.empty();
    VLOG_DEBUG << fmt::format(
            "Segment iterator init, virtual_column_exprs size: {}, "
            "_vir_cid_to_idx_in_block size: {}, common_expr_pushdown size: {}",
            _opts.virtual_column_exprs.size(), _opts.vir_cid_to_idx_in_block.size(),
            _common_expr_ctxs_push_down.size());
    _initialize_predicate_results();
    return Status::OK();
}
436 | | |
437 | 5.63k | void SegmentIterator::_initialize_predicate_results() { |
438 | | // Initialize from _col_predicates |
439 | 5.63k | for (auto pred : _col_predicates) { |
440 | 0 | int cid = pred->column_id(); |
441 | 0 | _column_predicate_index_exec_status[cid][pred] = false; |
442 | 0 | } |
443 | | |
444 | 5.63k | _calculate_expr_in_remaining_conjunct_root(); |
445 | 5.63k | } |
446 | | |
447 | 5.63k | Status SegmentIterator::init_iterators() { |
448 | 5.63k | RETURN_IF_ERROR(_init_return_column_iterators()); |
449 | 5.63k | RETURN_IF_ERROR(_init_index_iterators()); |
450 | 5.63k | return Status::OK(); |
451 | 5.63k | } |
452 | | |
// Deferred, first-next_batch initialization: builds the final _row_bitmap (condition
// cache, key ranges, column conditions, delete bitmap, explicit row ranges, ANN top-n),
// chooses forward or backward range iteration, and pre-creates the return columns.
// Ordering of the pruning steps matters: each step subtracts from the bitmap produced
// by the previous ones and the per-step stats count rows removed by that step alone.
// Idempotent via _lazy_inited.
Status SegmentIterator::_lazy_init(Block* block) {
    if (_lazy_inited) {
        return Status::OK();
    }
    SCOPED_RAW_TIMER(&_opts.stats->block_init_ns);
    DorisMetrics::instance()->segment_read_total->increment(1);
    // Start from "all rows", then prune.
    _row_bitmap.addRange(0, _segment->num_rows());
    _init_row_bitmap_by_condition_cache();

    // z-order can not use prefix index
    if (_segment->_tablet_schema->sort_type() != SortType::ZORDER &&
        _segment->_tablet_schema->cluster_key_uids().empty()) {
        RETURN_IF_ERROR(_get_row_ranges_by_keys());
    }
    RETURN_IF_ERROR(_get_row_ranges_by_column_conditions());
    RETURN_IF_ERROR(_vec_init_lazy_materialization());
    // Remove rows that have been marked deleted
    if (_opts.delete_bitmap.count(segment_id()) > 0 &&
        _opts.delete_bitmap.at(segment_id()) != nullptr) {
        size_t pre_size = _row_bitmap.cardinality();
        _row_bitmap -= *(_opts.delete_bitmap.at(segment_id()));
        _opts.stats->rows_del_by_bitmap += (pre_size - _row_bitmap.cardinality());
        VLOG_DEBUG << "read on segment: " << segment_id() << ", delete bitmap cardinality: "
                   << _opts.delete_bitmap.at(segment_id())->cardinality() << ", "
                   << _opts.stats->rows_del_by_bitmap << " rows deleted by bitmap";
    }

    // Caller-supplied explicit row ranges further restrict the scan.
    if (!_opts.row_ranges.is_empty()) {
        _row_bitmap &= RowRanges::ranges_to_roaring(_opts.row_ranges);
    }

    _prepare_score_column_materialization();

    RETURN_IF_ERROR(_apply_ann_topn_predicate());

    // Direction of iteration over the final bitmap depends on ORDER BY ... DESC pushdown.
    if (_opts.read_orderby_key_reverse) {
        _range_iter.reset(new BackwardBitmapRangeIterator(_row_bitmap));
    } else {
        _range_iter.reset(new BitmapRangeIterator(_row_bitmap));
    }

    // If the row bitmap size is smaller than block_row_max, there's no need to reserve that many column rows.
    auto nrows_reserve_limit = std::min(_row_bitmap.cardinality(), uint64_t(_opts.block_row_max));
    if (_lazy_materialization_read || _opts.record_rowids || _is_need_expr_eval) {
        _block_rowids.resize(_opts.block_row_max);
    }
    _current_return_columns.resize(_schema->columns().size());

    _vec_init_char_column_id(block);
    for (size_t i = 0; i < _schema->column_ids().size(); i++) {
        ColumnId cid = _schema->column_ids()[i];
        const auto* column_desc = _schema->column(cid);
        if (_is_pred_column[cid]) {
            auto storage_column_type = _storage_name_and_type[cid].second;
            // Char type is special , since char type's computational datatype is same with string,
            // both are DataTypeString, but DataTypeString only return FieldType::OLAP_FIELD_TYPE_STRING
            // in get_storage_field_type.
            RETURN_IF_CATCH_EXCEPTION(
                    // Here, cid will not go out of bounds
                    // because the size of _current_return_columns equals _schema->tablet_columns().size()
                    _current_return_columns[cid] = Schema::get_predicate_column_ptr(
                            _is_char_type[cid] ? FieldType::OLAP_FIELD_TYPE_CHAR
                                               : storage_column_type->get_storage_field_type(),
                            storage_column_type->is_nullable(), _opts.io_ctx.reader_type));
            _current_return_columns[cid]->set_rowset_segment_id(
                    {_segment->rowset_id(), _segment->id()});
            _current_return_columns[cid]->reserve(nrows_reserve_limit);
        } else if (i >= block->columns()) {
            // This column needs to be scanned, but doesn't need to be returned upward. (delete sign)
            // if i >= block->columns means the column and not the pred_column means `column i` is
            // a delete condition column. but the column is not effective in the segment. so we just
            // create a column to hold the data.
            // a. origin data -> b. delete condition -> c. new load data
            // the segment of c do not effective delete condition, but it still need read the column
            // to match the schema.
            // TODO: skip read the not effective delete column to speed up segment read.
            _current_return_columns[cid] = Schema::get_data_type_ptr(*column_desc)->create_column();
            _current_return_columns[cid]->reserve(nrows_reserve_limit);
        }
    }

    // Additional deleted filter condition will be materialized column be at the end of the block,
    // after _output_column_by_sel_idx will be erase, we not need to filter it,
    // so erase it from _columns_to_filter in the first next_batch.
    // Eg:
    //      `delete from table where a = 10;`
    //      `select b from table;`
    // a column only effective in segment iterator, the block from query engine only contain the b column,
    // so no need to filter a column by expr.
    for (auto it = _columns_to_filter.begin(); it != _columns_to_filter.end();) {
        if (*it >= block->columns()) {
            it = _columns_to_filter.erase(it);
        } else {
            ++it;
        }
    }

    _lazy_inited = true;

    _init_segment_prefetchers();

    return Status::OK();
}
556 | | |
// Cloud-mode only: set up file-cache prefetchers on each column iterator for query and
// compaction reads. Prefetch windows come from query/compaction-specific configs; after
// per-column initialization the prefetchers are grouped by init method and primed either
// with all data blocks (compaction / full scan) or with the blocks covering _row_bitmap
// (selective query scan). No-op outside cloud mode, for unsupported reader types, when
// prefetch is disabled, or when the row bitmap is empty.
void SegmentIterator::_init_segment_prefetchers() {
    SCOPED_RAW_TIMER(&_opts.stats->segment_iterator_init_segment_prefetchers_timer_ns);
    if (!config::is_cloud_mode()) {
        return;
    }
    static std::vector<ReaderType> supported_reader_types {
            ReaderType::READER_QUERY, ReaderType::READER_BASE_COMPACTION,
            ReaderType::READER_CUMULATIVE_COMPACTION, ReaderType::READER_FULL_COMPACTION};
    if (std::ranges::none_of(supported_reader_types,
                             [&](ReaderType t) { return _opts.io_ctx.reader_type == t; })) {
        return;
    }
    // Initialize segment prefetcher for predicate and non-predicate columns
    bool is_query = (_opts.io_ctx.reader_type == ReaderType::READER_QUERY);
    bool enable_prefetch = is_query ? config::enable_query_segment_file_cache_prefetch
                                    : config::enable_compaction_segment_file_cache_prefetch;
    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
            "[verbose] SegmentIterator _init_segment_prefetchers, is_query={}, enable_prefetch={}, "
            "_row_bitmap.isEmpty()={}, row_bitmap.cardinality()={}, tablet={}, rowset={}, "
            "segment={}, predicate_column_ids={}, common_expr_column_ids={}",
            is_query, enable_prefetch, _row_bitmap.isEmpty(), _row_bitmap.cardinality(),
            _opts.tablet_id, _opts.rowset_id.to_string(), segment_id(),
            fmt::join(_predicate_column_ids, ","), fmt::join(_common_expr_column_ids, ","));
    if (enable_prefetch && !_row_bitmap.isEmpty()) {
        // +1: the configured block count is a read-ahead distance; the window also
        // includes the block currently being read.
        int window_size =
                1 + (is_query ? config::query_segment_file_cache_prefetch_block_size
                              : config::compaction_segment_file_cache_prefetch_block_size);
        LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
                "[verbose] SegmentIterator prefetch config: window_size={}", window_size);
        if (window_size > 0 &&
            !_column_iterators.empty()) { // ensure init_iterators has been called
            SegmentPrefetcherConfig prefetch_config(window_size,
                                                    config::file_cache_each_block_size);
            // Attach a prefetcher to each existing column iterator; failures are
            // logged and skipped (prefetch is best-effort, never fatal to the read).
            for (auto cid : _schema->column_ids()) {
                auto& column_iter = _column_iterators[cid];
                if (column_iter == nullptr) {
                    continue;
                }
                const auto* tablet_column = _schema->column(cid);
                SegmentPrefetchParams params {
                        .config = prefetch_config,
                        .read_options = _opts,
                };
                LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
                        "[verbose] SegmentIterator init_segment_prefetchers, "
                        "tablet={}, rowset={}, segment={}, column_id={}, col_name={}, type={}",
                        _opts.tablet_id, _opts.rowset_id.to_string(), segment_id(), cid,
                        tablet_column->name(), tablet_column->type());
                Status st = column_iter->init_prefetcher(params);
                if (!st.ok()) {
                    LOG_IF(WARNING, config::enable_segment_prefetch_verbose_log) << fmt::format(
                            "[verbose] failed to init prefetcher for column_id={}, "
                            "tablet={}, rowset={}, segment={}, error={}",
                            cid, _opts.tablet_id, _opts.rowset_id.to_string(), segment_id(),
                            st.to_string());
                }
            }

            // for compaction, it's guaranteed that all rows are read, so we can prefetch all data blocks
            PrefetcherInitMethod init_method = (is_query && _row_bitmap.cardinality() < num_rows())
                                                       ? PrefetcherInitMethod::FROM_ROWIDS
                                                       : PrefetcherInitMethod::ALL_DATA_BLOCKS;
            std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>> prefetchers;
            for (const auto& column_iter : _column_iterators) {
                if (column_iter != nullptr) {
                    column_iter->collect_prefetchers(prefetchers, init_method);
                }
            }
            for (auto& [method, prefetcher_vec] : prefetchers) {
                if (method == PrefetcherInitMethod::ALL_DATA_BLOCKS) {
                    for (auto* prefetcher : prefetcher_vec) {
                        prefetcher->build_all_data_blocks();
                    }
                } else if (method == PrefetcherInitMethod::FROM_ROWIDS && !prefetcher_vec.empty()) {
                    SegmentPrefetcher::build_blocks_by_rowids(_row_bitmap, prefetcher_vec);
                }
            }
        }
    }
}
637 | | |
// Prune _row_bitmap using the short-key / primary-key ranges in the read options.
// For each key range, binary-searches the ordinal bounds via _lookup_ordinal and
// unions the resulting [lower, upper) row ranges; the union is then intersected
// with the current row bitmap. Rows removed are accounted to rows_key_range_filtered.
Status SegmentIterator::_get_row_ranges_by_keys() {
    SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_keys_ns);
    DorisMetrics::instance()->segment_row_total->increment(num_rows());

    // fast path for empty segment or empty key ranges
    if (_row_bitmap.isEmpty() || _opts.key_ranges.empty()) {
        return Status::OK();
    }

    // Read & seek key columns is a waste of time when no key column in _schema
    if (std::none_of(
                _schema->columns().begin(), _schema->columns().end(), [&](const StorageField* col) {
                    return col && _opts.tablet_schema->column_by_uid(col->unique_id()).is_key();
                })) {
        return Status::OK();
    }

    RowRanges result_ranges;
    for (auto& key_range : _opts.key_ranges) {
        // Default to the full segment; a missing lower/upper key leaves its bound open.
        rowid_t lower_rowid = 0;
        rowid_t upper_rowid = num_rows();
        RETURN_IF_ERROR(_prepare_seek(key_range));
        if (key_range.upper_key != nullptr) {
            // If client want to read upper_bound, the include_upper is true. So we
            // should get the first ordinal at which key is larger than upper_bound.
            // So we call _lookup_ordinal with include_upper's negate
            RETURN_IF_ERROR(_lookup_ordinal(*key_range.upper_key, !key_range.include_upper,
                                            num_rows(), &upper_rowid));
        }
        // Only search the lower bound within [0, upper_rowid); skip when the upper
        // bound already excludes everything.
        if (upper_rowid > 0 && key_range.lower_key != nullptr) {
            RETURN_IF_ERROR(_lookup_ordinal(*key_range.lower_key, key_range.include_lower,
                                            upper_rowid, &lower_rowid));
        }
        auto row_range = RowRanges::create_single(lower_rowid, upper_rowid);
        RowRanges::ranges_union(result_ranges, row_range, &result_ranges);
    }
    size_t pre_size = _row_bitmap.cardinality();
    _row_bitmap &= RowRanges::ranges_to_roaring(result_ranges);
    _opts.stats->rows_key_range_filtered += (pre_size - _row_bitmap.cardinality());

    return Status::OK();
}
680 | | |
681 | | // Set up environment for the following seek.
 | | // Collects the distinct key fields referenced by the range's lower/upper
 | | // keys (column_set de-duplicates columns present in both bounds), lazily
 | | // builds _seek_schema and _seek_block on first call, and creates/inits a
 | | // column iterator for every seek column that does not have one yet.
 | | // NOTE(review): _seek_schema/_seek_block are built only on the first
 | | // invocation, which assumes every key range references the same key
 | | // fields -- confirm against callers of _prepare_seek.
682 | 0 | Status SegmentIterator::_prepare_seek(const StorageReadOptions::KeyRange& key_range) {
683 | 0 | std::vector<const StorageField*> key_fields;
684 | 0 | std::set<uint32_t> column_set;
685 | 0 | if (key_range.lower_key != nullptr) {
686 | 0 | for (auto cid : key_range.lower_key->schema()->column_ids()) {
687 | 0 | column_set.emplace(cid);
688 | 0 | key_fields.emplace_back(key_range.lower_key->column_schema(cid));
689 | 0 | }
690 | 0 | }
691 | 0 | if (key_range.upper_key != nullptr) {
692 | 0 | for (auto cid : key_range.upper_key->schema()->column_ids()) {
693 | 0 | if (column_set.count(cid) == 0) {
694 | 0 | key_fields.emplace_back(key_range.upper_key->column_schema(cid));
695 | 0 | column_set.emplace(cid);
696 | 0 | }
697 | 0 | }
698 | 0 | }
699 | 0 | if (!_seek_schema) {
700 | 0 | _seek_schema = std::make_unique<Schema>(key_fields, key_fields.size());
701 | 0 | }
702 | | // todo(wb) need refactor here, when using pk to search, _seek_block is useless
 | | // Lazily allocate one column per seek-schema column id, in column-id order.
703 | 0 | if (_seek_block.size() == 0) {
704 | 0 | _seek_block.resize(_seek_schema->num_column_ids());
705 | 0 | int i = 0;
706 | 0 | for (auto cid : _seek_schema->column_ids()) {
707 | 0 | auto column_desc = _seek_schema->column(cid);
708 | 0 | _seek_block[i] = Schema::get_column_by_field(*column_desc);
709 | 0 | i++;
710 | 0 | }
711 | 0 | }
712 | 
713 | | // create used column iterator
714 | 0 | for (auto cid : _seek_schema->column_ids()) {
715 | 0 | if (_column_iterators[cid] == nullptr) {
716 | | // TODO: Do we need this?
 | | // Virtual columns get a placeholder iterator; they have no data pages.
717 | 0 | if (_virtual_column_exprs.contains(cid)) {
718 | 0 | _column_iterators[cid] = std::make_unique<VirtualColumnIterator>();
719 | 0 | continue;
720 | 0 | }
721 | 
722 | 0 | RETURN_IF_ERROR(_segment->new_column_iterator(_opts.tablet_schema->column(cid),
723 | 0 | &_column_iterators[cid], &_opts,
724 | 0 | &_variant_sparse_column_cache));
725 | 0 | ColumnIteratorOptions iter_opts {
726 | 0 | .use_page_cache = _opts.use_page_cache,
727 | 0 | .file_reader = _file_reader.get(),
728 | 0 | .stats = _opts.stats,
729 | 0 | .io_ctx = _opts.io_ctx,
730 | 0 | };
731 | 0 | RETURN_IF_ERROR(_column_iterators[cid]->init(iter_opts));
732 | 0 | }
733 | 0 | }
734 | 
735 | 0 | return Status::OK();
736 | 0 | }
737 | | |
 | | // Second pruning stage after key-range filtering. Two phases:
 | | //  1) inverted-index / expr-index evaluation (when enabled for the query):
 | | //     intersects index result bitmaps into _row_bitmap, drops fully
 | | //     index-evaluated push-down exprs, and disables data reads for columns
 | | //     whose conditions were all satisfied by index.
 | | //  2) zone map / bloom filter / dict pruning via
 | | //     _get_row_ranges_from_conditions, intersected into _row_bitmap.
 | | // Stats are updated for each phase; DBUG_EXECUTE_IF hooks are test-only.
738 | 5.63k | Status SegmentIterator::_get_row_ranges_by_column_conditions() {
739 | 5.63k | SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_column_conditions_ns);
740 | 5.63k | if (_row_bitmap.isEmpty()) {
741 | 0 | return Status::OK();
742 | 0 | }
743 | 
744 | 5.63k | {
745 | 5.63k | if (_opts.runtime_state &&
746 | 5.63k | _opts.runtime_state->query_options().enable_inverted_index_query &&
747 | 5.63k | (has_index_in_iterators() || !_common_expr_ctxs_push_down.empty())) {
748 | 0 | SCOPED_RAW_TIMER(&_opts.stats->inverted_index_filter_timer);
749 | 0 | size_t input_rows = _row_bitmap.cardinality();
750 | | // Only apply column-level inverted index if we have iterators
751 | 0 | if (has_index_in_iterators()) {
752 | 0 | RETURN_IF_ERROR(_apply_inverted_index());
753 | 0 | }
754 | | // Always apply expr-level index (e.g., search expressions) if we have common_expr_pushdown
755 | | // This allows search expressions with variant subcolumns to be evaluated even when
756 | | // the segment doesn't have all subcolumns
757 | 0 | RETURN_IF_ERROR(_apply_index_expr());
 | | // Remove push-down exprs fully answered by the index: intersect their
 | | // result bitmap, drop them from the remaining conjuncts, and erase.
758 | 0 | for (auto it = _common_expr_ctxs_push_down.begin();
759 | 0 | it != _common_expr_ctxs_push_down.end();) {
760 | 0 | if ((*it)->all_expr_inverted_index_evaluated()) {
761 | 0 | const auto* result = (*it)->get_index_context()->get_index_result_for_expr(
762 | 0 | (*it)->root().get());
763 | 0 | if (result != nullptr) {
764 | 0 | _row_bitmap &= *result->get_data_bitmap();
765 | 0 | auto root = (*it)->root();
766 | 0 | auto iter_find = std::find(_remaining_conjunct_roots.begin(),
767 | 0 | _remaining_conjunct_roots.end(), root);
768 | 0 | if (iter_find != _remaining_conjunct_roots.end()) {
769 | 0 | _remaining_conjunct_roots.erase(iter_find);
770 | 0 | }
771 | 0 | it = _common_expr_ctxs_push_down.erase(it);
772 | 0 | }
773 | 0 | } else {
774 | 0 | ++it;
775 | 0 | }
776 | 0 | }
 | | // Keep the condition-cache digest only while push-down exprs remain;
 | | // once all are consumed by the index the cached digest is reset to 0.
777 | 0 | _opts.condition_cache_digest =
778 | 0 | _common_expr_ctxs_push_down.empty() ? 0 : _opts.condition_cache_digest;
779 | 0 | _opts.stats->rows_inverted_index_filtered += (input_rows - _row_bitmap.cardinality());
 | | // Columns whose every condition was satisfied by index need no data read.
780 | 0 | for (auto cid : _schema->column_ids()) {
781 | 0 | bool result_true = _check_all_conditions_passed_inverted_index_for_column(cid);
782 | 0 | if (result_true) {
783 | 0 | _need_read_data_indices[cid] = false;
784 | 0 | }
785 | 0 | }
786 | 0 | }
787 | 5.63k | }
788 | 
789 | 5.63k | DBUG_EXECUTE_IF("segment_iterator.inverted_index.filtered_rows", {
790 | 5.63k | LOG(INFO) << "Debug Point: segment_iterator.inverted_index.filtered_rows: "
791 | 5.63k | << _opts.stats->rows_inverted_index_filtered;
792 | 5.63k | auto filtered_rows = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
793 | 5.63k | "segment_iterator.inverted_index.filtered_rows", "filtered_rows", -1);
794 | 5.63k | if (filtered_rows != _opts.stats->rows_inverted_index_filtered) {
795 | 5.63k | return Status::Error<ErrorCode::INTERNAL_ERROR>(
796 | 5.63k | "filtered_rows: {} not equal to expected: {}",
797 | 5.63k | _opts.stats->rows_inverted_index_filtered, filtered_rows);
798 | 5.63k | }
799 | 5.63k | })
800 | 
801 | 5.63k | DBUG_EXECUTE_IF("segment_iterator.apply_inverted_index", {
802 | 5.63k | LOG(INFO) << "Debug Point: segment_iterator.apply_inverted_index";
803 | 5.63k | if (!_common_expr_ctxs_push_down.empty() || !_col_predicates.empty()) {
804 | 5.63k | return Status::Error<ErrorCode::INTERNAL_ERROR>(
805 | 5.63k | "it is failed to apply inverted index, common_expr_ctxs_push_down: {}, "
806 | 5.63k | "col_predicates: {}",
807 | 5.63k | _common_expr_ctxs_push_down.size(), _col_predicates.size());
808 | 5.63k | }
809 | 5.63k | })
810 | 
 | | // Phase 2: zone map / bloom filter / dict pruning, only when there is at
 | | // least one condition source (topn filter, column predicate, or delete
 | | // condition) and rows are still left.
811 | 5.63k | if (!_row_bitmap.isEmpty() &&
812 | 5.63k | (!_opts.topn_filter_source_node_ids.empty() || !_opts.col_id_to_predicates.empty() ||
813 | 5.63k | _opts.delete_condition_predicates->num_of_column_predicate() > 0)) {
814 | 467 | RowRanges condition_row_ranges = RowRanges::create_single(_segment->num_rows());
815 | 467 | RETURN_IF_ERROR(_get_row_ranges_from_conditions(&condition_row_ranges));
816 | 467 | size_t pre_size = _row_bitmap.cardinality();
817 | 467 | _row_bitmap &= RowRanges::ranges_to_roaring(condition_row_ranges);
818 | 467 | _opts.stats->rows_conditions_filtered += (pre_size - _row_bitmap.cardinality());
819 | 467 | }
820 | 
821 | 5.63k | DBUG_EXECUTE_IF("bloom_filter_must_filter_data", {
822 | 5.63k | if (_opts.stats->rows_bf_filtered == 0) {
823 | 5.63k | return Status::Error<ErrorCode::INTERNAL_ERROR>(
824 | 5.63k | "Bloom filter did not filter the data.");
825 | 5.63k | }
826 | 5.63k | })
827 | 
828 | | // TODO(hkp): calculate filter rate to decide whether to
829 | | // use zone map/bloom filter/secondary index or not.
830 | 5.63k | return Status::OK();
831 | 5.63k | }
832 | | |
 | | // True when the column has an index iterator whose reader set includes an
 | | // ANN index reader (AnnIndexReaderType::ANN).
833 | 0 | bool SegmentIterator::_column_has_ann_index(int32_t cid) {
834 | 0 | bool has_ann_index = _index_iterators[cid] != nullptr &&
835 | 0 | _index_iterators[cid]->get_reader(AnnIndexReaderType::ANN);
836 | 
 | 
837 | 0 | return has_ann_index;
838 | 0 | }
839 | | |
 | | // Try to answer an ANN top-n (vector search) via the ANN index instead of
 | | // scanning data. Bails out (returns OK, re-enabling data reads for the
 | | // source column) when: no ANN index on the source column; other filters
 | | // (push-down exprs / column predicates) would invalidate index-only topn;
 | | // sort direction is incompatible with the metric (IP wants DESC, L2/cosine
 | | // want ASC); metric types mismatch; or the candidate bitmap covers < 30%
 | | // of the segment (index search not worthwhile). On success the distance
 | | // result is handed to the destination VirtualColumnIterator for later
 | | // materialization, _row_bitmap is narrowed, stats are recorded, and data
 | | // reads for the source column are skipped.
840 | 5.63k | Status SegmentIterator::_apply_ann_topn_predicate() {
841 | 5.63k | if (_ann_topn_runtime == nullptr) {
842 | 5.63k | return Status::OK();
843 | 5.63k | }
844 | 
845 | 0 | VLOG_DEBUG << fmt::format("Try apply ann topn: {}", _ann_topn_runtime->debug_string());
846 | 0 | size_t src_col_idx = _ann_topn_runtime->get_src_column_idx();
847 | 0 | ColumnId src_cid = _schema->column_id(src_col_idx);
848 | 0 | IndexIterator* ann_index_iterator = _index_iterators[src_cid].get();
849 | 0 | bool has_ann_index = _column_has_ann_index(src_cid);
850 | 0 | bool has_common_expr_push_down = !_common_expr_ctxs_push_down.empty();
851 | 0 | bool has_column_predicate = std::any_of(_is_pred_column.begin(), _is_pred_column.end(),
852 | 0 | [](bool is_pred) { return is_pred; });
853 | 0 | if (!has_ann_index || has_common_expr_push_down || has_column_predicate) {
854 | 0 | VLOG_DEBUG << fmt::format(
855 | 0 | "Ann topn can not be evaluated by ann index, has_ann_index: {}, "
856 | 0 | "has_common_expr_push_down: {}, has_column_predicate: {}",
857 | 0 | has_ann_index, has_common_expr_push_down, has_column_predicate);
858 | | // Disable index-only scan on ann indexed column.
859 | 0 | _need_read_data_indices[src_cid] = true;
860 | 0 | return Status::OK();
861 | 0 | }
862 | 
863 | | // Process asc & desc according to the type of metric
 | | // IP (inner product) is a similarity: larger is better, so only DESC topn
 | | // can use the index; L2/cosine are distances, so only ASC topn can.
864 | 0 | auto index_reader = ann_index_iterator->get_reader(AnnIndexReaderType::ANN);
865 | 0 | auto ann_index_reader = dynamic_cast<AnnIndexReader*>(index_reader.get());
866 | 0 | DCHECK(ann_index_reader != nullptr);
867 | 0 | if (ann_index_reader->get_metric_type() == AnnIndexMetric::IP) {
868 | 0 | if (_ann_topn_runtime->is_asc()) {
869 | 0 | VLOG_DEBUG << fmt::format(
870 | 0 | "Asc topn for inner product can not be evaluated by ann index");
871 | | // Disable index-only scan on ann indexed column.
872 | 0 | _need_read_data_indices[src_cid] = true;
873 | 0 | return Status::OK();
874 | 0 | }
875 | 0 | } else {
876 | 0 | if (!_ann_topn_runtime->is_asc()) {
877 | 0 | VLOG_DEBUG << fmt::format("Desc topn for l2/cosine can not be evaluated by ann index");
878 | | // Disable index-only scan on ann indexed column.
879 | 0 | _need_read_data_indices[src_cid] = true;
880 | 0 | return Status::OK();
881 | 0 | }
882 | 0 | }
883 | 
884 | 0 | if (ann_index_reader->get_metric_type() != _ann_topn_runtime->get_metric_type()) {
885 | 0 | VLOG_DEBUG << fmt::format(
886 | 0 | "Ann topn metric type {} not match index metric type {}, can not be evaluated by "
887 | 0 | "ann index",
888 | 0 | metric_to_string(_ann_topn_runtime->get_metric_type()),
889 | 0 | metric_to_string(ann_index_reader->get_metric_type()));
890 | | // Disable index-only scan on ann indexed column.
891 | 0 | _need_read_data_indices[src_cid] = true;
892 | 0 | return Status::OK();
893 | 0 | }
894 | 
 | | // Heuristic cut-off: with fewer than 30% of segment rows as candidates the
 | | // index search is presumed not to pay off -- TODO confirm the threshold's
 | | // provenance (the comparison direction reads "input rows too SMALL").
895 | 0 | size_t pre_size = _row_bitmap.cardinality();
896 | 0 | size_t rows_of_segment = _segment->num_rows();
897 | 0 | if (static_cast<double>(pre_size) < static_cast<double>(rows_of_segment) * 0.3) {
898 | 0 | VLOG_DEBUG << fmt::format(
899 | 0 | "Ann topn predicate input rows {} < 30% of segment rows {}, will not use ann index "
900 | 0 | "to "
901 | 0 | "filter",
902 | 0 | pre_size, rows_of_segment);
903 | | // Disable index-only scan on ann indexed column.
904 | 0 | _need_read_data_indices[src_cid] = true;
905 | 0 | return Status::OK();
906 | 0 | }
907 | 0 | IColumn::MutablePtr result_column;
908 | 0 | std::unique_ptr<std::vector<uint64_t>> result_row_ids;
909 | 0 | segment_v2::AnnIndexStats ann_index_stats;
910 | 0 | RETURN_IF_ERROR(_ann_topn_runtime->evaluate_vector_ann_search(ann_index_iterator, &_row_bitmap,
911 | 0 | rows_of_segment, result_column,
912 | 0 | result_row_ids, ann_index_stats));
913 | 
914 | 0 | VLOG_DEBUG << fmt::format("Ann topn filtered {} - {} = {} rows", pre_size,
915 | 0 | _row_bitmap.cardinality(), pre_size - _row_bitmap.cardinality());
916 | 
 | 
917 | 0 | int64_t rows_filterd = pre_size - _row_bitmap.cardinality();
918 | 0 | _opts.stats->rows_ann_index_topn_filtered += rows_filterd;
919 | 0 | _opts.stats->ann_index_load_ns += ann_index_stats.load_index_costs_ns.value();
920 | 0 | _opts.stats->ann_topn_search_ns += ann_index_stats.search_costs_ns.value();
921 | 0 | _opts.stats->ann_index_topn_engine_search_ns += ann_index_stats.engine_search_ns.value();
922 | 0 | _opts.stats->ann_index_topn_result_process_ns +=
923 | 0 | ann_index_stats.result_process_costs_ns.value();
924 | 0 | _opts.stats->ann_index_topn_engine_convert_ns += ann_index_stats.engine_convert_ns.value();
925 | 0 | _opts.stats->ann_index_topn_engine_prepare_ns += ann_index_stats.engine_prepare_ns.value();
926 | 0 | _opts.stats->ann_index_topn_search_cnt += 1;
 | | // The destination column must be a VirtualColumnIterator; hand it the
 | | // computed distances keyed by row id so the scan can materialize them.
927 | 0 | const size_t dst_col_idx = _ann_topn_runtime->get_dest_column_idx();
928 | 0 | ColumnIterator* column_iter = _column_iterators[_schema->column_id(dst_col_idx)].get();
929 | 0 | DCHECK(column_iter != nullptr);
930 | 0 | VirtualColumnIterator* virtual_column_iter = dynamic_cast<VirtualColumnIterator*>(column_iter);
931 | 0 | DCHECK(virtual_column_iter != nullptr);
932 | 0 | VLOG_DEBUG << fmt::format(
933 | 0 | "Virtual column iterator, column_idx {}, is materialized with {} rows", dst_col_idx,
934 | 0 | result_row_ids->size());
935 | | // reference count of result_column should be 1, so move will not issue any data copy.
936 | 0 | virtual_column_iter->prepare_materialization(std::move(result_column),
937 | 0 | std::move(result_row_ids));
938 | 
 | 
939 | 0 | _need_read_data_indices[src_cid] = false;
940 | 0 | VLOG_DEBUG << fmt::format(
941 | 0 | "Enable ANN index-only scan for src column cid {} (skip reading data pages)", src_cid);
942 | 
 | 
943 | 0 | return Status::OK();
944 | 0 | }
945 | | |
 | | // Shrinks *condition_row_ranges using, in order: dictionary pruning (query
 | | // reader only), bloom-filter index, then zone maps. Each stage intersects
 | | // its per-column ranges into the output and records filtered-row stats.
 | | // Columns are skipped when can_apply_predicate_safely() says the predicate
 | | // cannot be trusted against this segment's stored type (variants).
946 | 467 | Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row_ranges) {
947 | 467 | std::set<int32_t> cids;
948 | 467 | for (auto& entry : _opts.col_id_to_predicates) {
949 | 0 | cids.insert(entry.first);
950 | 0 | }
951 | 
952 | 467 | {
953 | 467 | SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_dict_ns);
954 | | /// Low cardinality optimization is currently not very stable, so to prevent data corruption,
955 | | /// we are temporarily disabling its use in data compaction.
956 | | // TODO: enable it in not only ReaderTyper::READER_QUERY but also other reader types.
957 | 467 | if (_opts.io_ctx.reader_type == ReaderType::READER_QUERY) {
958 | 0 | RowRanges dict_row_ranges = RowRanges::create_single(num_rows());
959 | 0 | for (auto cid : cids) {
960 | 0 | if (!_segment->can_apply_predicate_safely(
961 | 0 | cid, *_schema, _opts.target_cast_type_for_variants, _opts)) {
962 | 0 | continue;
963 | 0 | }
964 | 0 | DCHECK(_opts.col_id_to_predicates.count(cid) > 0);
965 | 0 | RETURN_IF_ERROR(_column_iterators[cid]->get_row_ranges_by_dict(
966 | 0 | _opts.col_id_to_predicates.at(cid).get(), &dict_row_ranges));
967 | 
 | | // A column already proved no rows can match; stop scanning others.
968 | 0 | if (dict_row_ranges.is_empty()) {
969 | 0 | break;
970 | 0 | }
971 | 0 | }
972 | 
 | | // Dict pruning eliminated the whole segment: record it and return early.
973 | 0 | if (dict_row_ranges.is_empty()) {
974 | 0 | RowRanges::ranges_intersection(*condition_row_ranges, dict_row_ranges,
975 | 0 | condition_row_ranges);
976 | 0 | _opts.stats->segment_dict_filtered++;
977 | 0 | _opts.stats->filtered_segment_number++;
978 | 0 | return Status::OK();
979 | 0 | }
980 | 0 | }
981 | 467 | }
982 | 
983 | 467 | size_t pre_size = 0;
984 | 467 | {
985 | 467 | SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_bf_ns);
986 | | // first filter data by bloom filter index
987 | | // bloom filter index only use CondColumn
988 | 467 | RowRanges bf_row_ranges = RowRanges::create_single(num_rows());
989 | 467 | for (auto& cid : cids) {
990 | 0 | DCHECK(_opts.col_id_to_predicates.count(cid) > 0);
991 | 0 | if (!_segment->can_apply_predicate_safely(cid, *_schema,
992 | 0 | _opts.target_cast_type_for_variants, _opts)) {
993 | 0 | continue;
994 | 0 | }
995 | | // get row ranges by bf index of this column,
996 | 0 | RowRanges column_bf_row_ranges = RowRanges::create_single(num_rows());
997 | 0 | RETURN_IF_ERROR(_column_iterators[cid]->get_row_ranges_by_bloom_filter(
998 | 0 | _opts.col_id_to_predicates.at(cid).get(), &column_bf_row_ranges));
999 | 0 | RowRanges::ranges_intersection(bf_row_ranges, column_bf_row_ranges, &bf_row_ranges);
1000 | 0 | }
1001 | 
1002 | 467 | pre_size = condition_row_ranges->count();
1003 | 467 | RowRanges::ranges_intersection(*condition_row_ranges, bf_row_ranges, condition_row_ranges);
1004 | 467 | _opts.stats->rows_bf_filtered += (pre_size - condition_row_ranges->count());
1005 | 467 | }
1006 | 
1007 | 0 | {
1008 | 467 | SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_zonemap_ns);
1009 | 467 | RowRanges zone_map_row_ranges = RowRanges::create_single(num_rows());
1010 | | // second filter data by zone map
1011 | 467 | for (const auto& cid : cids) {
1012 | 0 | DCHECK(_opts.col_id_to_predicates.count(cid) > 0);
1013 | 0 | if (!_segment->can_apply_predicate_safely(cid, *_schema,
1014 | 0 | _opts.target_cast_type_for_variants, _opts)) {
1015 | 0 | continue;
1016 | 0 | }
1017 | | // do not check zonemap if predicate does not support zonemap
1018 | 0 | if (!_opts.col_id_to_predicates.at(cid)->support_zonemap()) {
1019 | 0 | VLOG_DEBUG << "skip zonemap for column " << cid;
1020 | 0 | continue;
1021 | 0 | }
1022 | | // get row ranges by zone map of this column,
1023 | 0 | RowRanges column_row_ranges = RowRanges::create_single(num_rows());
1024 | 0 | RETURN_IF_ERROR(_column_iterators[cid]->get_row_ranges_by_zone_map(
1025 | 0 | _opts.col_id_to_predicates.at(cid).get(),
1026 | 0 | _opts.del_predicates_for_zone_map.count(cid) > 0
1027 | 0 | ? &(_opts.del_predicates_for_zone_map.at(cid))
1028 | 0 | : nullptr,
1029 | 0 | &column_row_ranges));
1030 | | // intersect different columns's row ranges to get final row ranges by zone map
1031 | 0 | RowRanges::ranges_intersection(zone_map_row_ranges, column_row_ranges,
1032 | 0 | &zone_map_row_ranges);
1033 | 0 | }
1034 | 
1035 | 467 | pre_size = condition_row_ranges->count();
1036 | 467 | RowRanges::ranges_intersection(*condition_row_ranges, zone_map_row_ranges,
1037 | 467 | condition_row_ranges);
1038 | 
 | | // NOTE(review): the block below intersects with the SAME
 | | // zone_map_row_ranges a second time; intersection is idempotent, so
 | | // pre_size2 - count is always 0 and rows_stats_rp_filtered never grows.
 | | // Likely the second intersection was meant to use a different ranges set
 | | // (e.g. runtime-predicate zone-map ranges) -- verify against upstream.
1039 | 467 | size_t pre_size2 = condition_row_ranges->count();
1040 | 467 | RowRanges::ranges_intersection(*condition_row_ranges, zone_map_row_ranges,
1041 | 467 | condition_row_ranges);
1042 | 467 | _opts.stats->rows_stats_rp_filtered += (pre_size2 - condition_row_ranges->count());
1043 | 467 | _opts.stats->rows_stats_filtered += (pre_size - condition_row_ranges->count());
1044 | 467 | }
1045 | 
1046 | 0 | return Status::OK();
1047 | 467 | }
1048 | | |
 | | // True when the Thrift expr node type is one of the literal kinds listed
 | | // below (bool/int/largeint/float/decimal/string/date/timev2 literals).
1049 | 0 | bool SegmentIterator::_is_literal_node(const TExprNodeType::type& node_type) {
1050 | 0 | switch (node_type) {
1051 | 0 | case TExprNodeType::BOOL_LITERAL:
1052 | 0 | case TExprNodeType::INT_LITERAL:
1053 | 0 | case TExprNodeType::LARGE_INT_LITERAL:
1054 | 0 | case TExprNodeType::FLOAT_LITERAL:
1055 | 0 | case TExprNodeType::DECIMAL_LITERAL:
1056 | 0 | case TExprNodeType::STRING_LITERAL:
1057 | 0 | case TExprNodeType::DATE_LITERAL:
1058 | 0 | case TExprNodeType::TIMEV2_LITERAL:
1059 | 0 | return true;
1060 | 0 | default:
1061 | 0 | return false;
1062 | 0 | }
1063 | 0 | }
1064 | | |
 | | // Recursively walks a push-down expr tree and records every referenced
 | | // column: SLOT_REFs mark the column in _is_common_expr_column and
 | | // _common_expr_columns; VIRTUAL_SLOT_REFs recurse into the virtual
 | | // column's defining expression instead.
1065 | 0 | Status SegmentIterator::_extract_common_expr_columns(const VExprSPtr& expr) {
1066 | 0 | auto& children = expr->children();
1067 | 0 | for (int i = 0; i < children.size(); ++i) {
1068 | 0 | RETURN_IF_ERROR(_extract_common_expr_columns(children[i]));
1069 | 0 | }
1070 | 
1071 | 0 | auto node_type = expr->node_type();
1072 | 0 | if (node_type == TExprNodeType::SLOT_REF) {
1073 | 0 | auto slot_expr = std::dynamic_pointer_cast<doris::VSlotRef>(expr);
1074 | 0 | _is_common_expr_column[_schema->column_id(slot_expr->column_id())] = true;
1075 | 0 | _common_expr_columns.insert(_schema->column_id(slot_expr->column_id()));
1076 | 0 | } else if (node_type == TExprNodeType::VIRTUAL_SLOT_REF) {
1077 | 0 | std::shared_ptr<VirtualSlotRef> virtual_slot_ref =
1078 | 0 | std::dynamic_pointer_cast<VirtualSlotRef>(expr);
1079 | 0 | RETURN_IF_ERROR(_extract_common_expr_columns(virtual_slot_ref->get_virtual_column_expr()));
1080 | 0 | }
1081 | 
1082 | 0 | return Status::OK();
1083 | 0 | }
1084 | | |
 | | // Decides whether a column predicate can be evaluated by this column's
 | | // inverted index. Rejects when: index query disabled; no index iterator;
 | | // predicate kind unsupported (BF/bitmap); runtime-filter IN/NOT_IN lists;
 | | // range predicates without a BKD index; or LIKE predicates. For fulltext
 | | // columns only MATCH / IS (NOT) NULL / equal-or-list predicates qualify.
1085 | 0 | bool SegmentIterator::_check_apply_by_inverted_index(std::shared_ptr<ColumnPredicate> pred) {
1086 | 0 | if (_opts.runtime_state && !_opts.runtime_state->query_options().enable_inverted_index_query) {
1087 | 0 | return false;
1088 | 0 | }
1089 | 0 | auto pred_column_id = pred->column_id();
1090 | 0 | if (_index_iterators[pred_column_id] == nullptr) {
1091 | | //this column without inverted index
1092 | 0 | return false;
1093 | 0 | }
1094 | 
1095 | 0 | if (_inverted_index_not_support_pred_type(pred->type())) {
1096 | 0 | return false;
1097 | 0 | }
1098 | 
1099 | 0 | if (pred->type() == PredicateType::IN_LIST || pred->type() == PredicateType::NOT_IN_LIST) {
1100 | | // in_list or not_in_list predicate produced by runtime filter
1101 | 0 | if (pred->is_runtime_filter()) {
1102 | 0 | return false;
1103 | 0 | }
1104 | 0 | }
1105 | 
1106 | | // UNTOKENIZED strings exceed ignore_above, they are written as null, causing range query errors
1107 | 0 | if (PredicateTypeTraits::is_range(pred->type()) &&
1108 | 0 | !IndexReaderHelper::has_bkd_index(_index_iterators[pred_column_id].get())) {
1109 | 0 | return false;
1110 | 0 | }
1111 | 
1112 | | // Function filter no apply inverted index
1113 | 0 | if (dynamic_cast<LikeColumnPredicate<TYPE_CHAR>*>(pred.get()) != nullptr ||
1114 | 0 | dynamic_cast<LikeColumnPredicate<TYPE_STRING>*>(pred.get()) != nullptr) {
1115 | 0 | return false;
1116 | 0 | }
1117 | 
1118 | 0 | bool handle_by_fulltext = _column_has_fulltext_index(pred_column_id);
1119 | 0 | if (handle_by_fulltext) {
1120 | | // when predicate is leafNode of andNode,
1121 | | // can apply 'match query' and 'equal query' and 'list query' for fulltext index.
1122 | 0 | return pred->type() == PredicateType::MATCH || pred->type() == PredicateType::IS_NULL ||
1123 | 0 | pred->type() == PredicateType::IS_NOT_NULL ||
1124 | 0 | PredicateTypeTraits::is_equal_or_list(pred->type());
1125 | 0 | }
1126 | 
1127 | 0 | return true;
1128 | 0 | }
1129 | | |
1130 | | // TODO: optimization when all expr can not evaluate by inverted/ann index,
 | | // Evaluates push-down exprs against indexes in three passes:
 | | //  1) inverted-index evaluation of each common push-down expr (downgrade
 | | //     errors and NOT_IMPLEMENTED are tolerated; anything else aborts);
 | | //  2) inverted-index evaluation for virtual-column projection exprs,
 | | //     which only populate index result bitmaps (no row filtering here);
 | | //  3) ANN range search, intersecting _row_bitmap and accumulating stats,
 | | //     then erasing exprs that the range search fully executed.
1131 | 0 | Status SegmentIterator::_apply_index_expr() {
1132 | 0 | for (const auto& expr_ctx : _common_expr_ctxs_push_down) {
1133 | 0 | if (Status st = expr_ctx->evaluate_inverted_index(num_rows()); !st.ok()) {
1134 | 0 | if (_downgrade_without_index(st) || st.code() == ErrorCode::NOT_IMPLEMENTED_ERROR) {
1135 | 0 | continue;
1136 | 0 | } else {
1137 | | // other code is not to be handled, we should just break
1138 | 0 | LOG(WARNING) << "failed to evaluate inverted index for expr_ctx: "
1139 | 0 | << expr_ctx->root()->debug_string()
1140 | 0 | << ", error msg: " << st.to_string();
1141 | 0 | return st;
1142 | 0 | }
1143 | 0 | }
1144 | 0 | }
1145 | 
1146 | | // Evaluate inverted index for virtual column MATCH expressions (projections).
1147 | | // Unlike common exprs which filter rows, these only compute index result bitmaps
1148 | | // for later materialization via fast_execute().
1149 | 0 | for (auto& [cid, expr_ctx] : _virtual_column_exprs) {
1150 | 0 | if (expr_ctx->get_index_context() == nullptr) {
1151 | 0 | continue;
1152 | 0 | }
1153 | 0 | if (Status st = expr_ctx->evaluate_inverted_index(num_rows()); !st.ok()) {
1154 | 0 | if (_downgrade_without_index(st) || st.code() == ErrorCode::NOT_IMPLEMENTED_ERROR) {
1155 | 0 | continue;
1156 | 0 | } else {
1157 | 0 | LOG(WARNING) << "failed to evaluate inverted index for virtual column expr: "
1158 | 0 | << expr_ctx->root()->debug_string()
1159 | 0 | << ", error msg: " << st.to_string();
1160 | 0 | return st;
1161 | 0 | }
1162 | 0 | }
1163 | 0 | }
1164 | 
1165 | | // Apply ann range search
1166 | 0 | segment_v2::AnnIndexStats ann_index_stats;
1167 | 0 | for (const auto& expr_ctx : _common_expr_ctxs_push_down) {
1168 | 0 | size_t origin_rows = _row_bitmap.cardinality();
1169 | 0 | RETURN_IF_ERROR(expr_ctx->evaluate_ann_range_search(
1170 | 0 | _index_iterators, _schema->column_ids(), _column_iterators,
1171 | 0 | _common_expr_to_slotref_map, _row_bitmap, ann_index_stats));
1172 | 0 | _opts.stats->rows_ann_index_range_filtered += (origin_rows - _row_bitmap.cardinality());
1173 | 0 | _opts.stats->ann_index_load_ns += ann_index_stats.load_index_costs_ns.value();
1174 | 0 | _opts.stats->ann_index_range_search_ns += ann_index_stats.search_costs_ns.value();
1175 | 0 | _opts.stats->ann_range_engine_search_ns += ann_index_stats.engine_search_ns.value();
1176 | 0 | _opts.stats->ann_range_result_convert_ns += ann_index_stats.result_process_costs_ns.value();
1177 | 0 | _opts.stats->ann_range_engine_convert_ns += ann_index_stats.engine_convert_ns.value();
1178 | 0 | _opts.stats->ann_range_pre_process_ns += ann_index_stats.engine_prepare_ns.value();
1179 | 0 | }
1180 | 
 | | // NOTE(review): "executedd" looks like a typo in the callee's name
 | | // (declared elsewhere); renaming must happen at the declaration site.
1181 | 0 | for (auto it = _common_expr_ctxs_push_down.begin(); it != _common_expr_ctxs_push_down.end();) {
1182 | 0 | if ((*it)->root()->ann_range_search_executedd()) {
1183 | 0 | _opts.stats->ann_index_range_search_cnt++;
1184 | 0 | it = _common_expr_ctxs_push_down.erase(it);
1185 | 0 | } else {
1186 | 0 | ++it;
1187 | 0 | }
1188 | 0 | }
1189 | | // TODO:Do we need to remove these expr root from _remaining_conjunct_roots?
1190 | 
 | 
1191 | 0 | return Status::OK();
1192 | 0 | }
1193 | | |
 | | // Classifies an index-evaluation error: returns true when the query may
 | | // safely fall back to evaluating the predicate without the index (and
 | | // bumps the downgrade counter), false when the error must be surfaced.
 | | // need_remaining only matters for INVERTED_INDEX_NO_TERMS (see case 4).
1194 | 0 | bool SegmentIterator::_downgrade_without_index(Status res, bool need_remaining) {
1195 | 0 | bool is_fallback =
1196 | 0 | _opts.runtime_state->query_options().enable_fallback_on_missing_inverted_index;
1197 | 0 | if ((res.code() == ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND && is_fallback) ||
1198 | 0 | res.code() == ErrorCode::INVERTED_INDEX_BYPASS ||
1199 | 0 | res.code() == ErrorCode::INVERTED_INDEX_EVALUATE_SKIPPED ||
1200 | 0 | (res.code() == ErrorCode::INVERTED_INDEX_NO_TERMS && need_remaining) ||
1201 | 0 | res.code() == ErrorCode::INVERTED_INDEX_FILE_CORRUPTED) {
1202 | | // 1. INVERTED_INDEX_FILE_NOT_FOUND means index file has not been built,
1203 | | // usually occurs when creating a new index, queries can be downgraded
1204 | | // without index.
1205 | | // 2. INVERTED_INDEX_BYPASS means the hit of condition by index
1206 | | // has reached the optimal limit, downgrade without index query can
1207 | | // improve query performance.
1208 | | // 3. INVERTED_INDEX_EVALUATE_SKIPPED means the inverted index is not
1209 | | // suitable for executing this predicate, skipped it and filter data
1210 | | // by function later.
1211 | | // 4. INVERTED_INDEX_NO_TERMS means the column has fulltext index,
1212 | | // but the column condition value no terms in specified parser,
1213 | | // such as: where A = '' and B = ','
1214 | | // the predicate of A and B need downgrade without index query.
1215 | | // 5. INVERTED_INDEX_FILE_CORRUPTED means the index file is corrupted,
1216 | | // such as when index segment files are not generated
1217 | | // above case can downgrade without index query
1218 | 0 | _opts.stats->inverted_index_downgrade_count++;
 | | // BYPASS is an expected optimization path, so it logs at verbose level;
 | | // the other downgrade causes are logged at INFO for operators.
1219 | 0 | if (!res.is<ErrorCode::INVERTED_INDEX_BYPASS>()) {
1220 | 0 | LOG(INFO) << "will downgrade without index to evaluate predicate, because of res: "
1221 | 0 | << res;
1222 | 0 | } else {
1223 | 0 | VLOG_DEBUG << "will downgrade without index to evaluate predicate, because of res: "
1224 | 0 | << res;
1225 | 0 | }
1226 | 0 | return true;
1227 | 0 | }
1228 | 0 | return false;
1229 | 0 | }
1230 | | |
 | | // True when the column carries a FULLTEXT inverted-index reader and no
 | | // STRING_TYPE reader; the string-type reader takes precedence when both
 | | // exist, so that case is not treated as "fulltext".
1231 | 0 | bool SegmentIterator::_column_has_fulltext_index(int32_t cid) {
1232 | 0 | bool has_fulltext_index =
1233 | 0 | _index_iterators[cid] != nullptr &&
1234 | 0 | _index_iterators[cid]->get_reader(InvertedIndexReaderType::FULLTEXT) &&
1235 | 0 | _index_iterators[cid]->get_reader(InvertedIndexReaderType::STRING_TYPE) == nullptr;
1236 | 
 | 
1237 | 0 | return has_fulltext_index;
1238 | 0 | }
1239 | | |
 | | // Predicate kinds an inverted index can never answer: bloom-filter and
 | | // bitmap-filter predicates.
1240 | 0 | inline bool SegmentIterator::_inverted_index_not_support_pred_type(const PredicateType& type) {
1241 | 0 | return type == PredicateType::BF || type == PredicateType::BITMAP_FILTER;
1242 | 0 | }
1243 | | |
 | | // Evaluates one column predicate via its inverted index when eligible.
 | | // Ineligible or downgraded predicates are pushed to remaining_predicates
 | | // for row-level evaluation. Fulltext equal/list predicates are evaluated
 | | // by index AND kept in remaining_predicates (index result may be a
 | | // superset). Sets *continue_apply=false once _row_bitmap becomes empty so
 | | // the caller can stop applying further predicates.
 | | Status SegmentIterator::_apply_inverted_index_on_column_predicate(
 | | std::shared_ptr<ColumnPredicate> pred,
1246 | 0 | std::vector<std::shared_ptr<ColumnPredicate>>& remaining_predicates, bool* continue_apply) {
1247 | 0 | if (!_check_apply_by_inverted_index(pred)) {
1248 | 0 | remaining_predicates.emplace_back(pred);
1249 | 0 | } else {
1250 | 0 | bool need_remaining_after_evaluate = _column_has_fulltext_index(pred->column_id()) &&
1251 | 0 | PredicateTypeTraits::is_equal_or_list(pred->type());
1252 | 0 | Status res =
1253 | 0 | pred->evaluate(_storage_name_and_type[pred->column_id()],
1254 | 0 | _index_iterators[pred->column_id()].get(), num_rows(), &_row_bitmap);
1255 | 0 | if (!res.ok()) {
1256 | 0 | if (_downgrade_without_index(res, need_remaining_after_evaluate)) {
1257 | 0 | remaining_predicates.emplace_back(pred);
1258 | 0 | return Status::OK();
1259 | 0 | }
1260 | 0 | LOG(WARNING) << "failed to evaluate index"
1261 | 0 | << ", column predicate type: " << pred->pred_type_string(pred->type())
1262 | 0 | << ", error msg: " << res;
1263 | 0 | return res;
1264 | 0 | }
1265 | 
1266 | 0 | if (_row_bitmap.isEmpty()) {
1267 | | // all rows have been pruned, no need to process further predicates
1268 | 0 | *continue_apply = false;
1269 | 0 | }
1270 | 
 | 
1271 | 0 | if (need_remaining_after_evaluate) {
1272 | 0 | remaining_predicates.emplace_back(pred);
1273 | 0 | return Status::OK();
1274 | 0 | }
 | | // Record index success per predicate so later stages know this column's
 | | // condition was fully answered; runtime-filter predicates are excluded.
1275 | 0 | if (!pred->is_runtime_filter()) {
1276 | 0 | _column_predicate_index_exec_status[pred->column_id()][pred] = true;
1277 | 0 | }
1278 | 0 | }
1279 | 0 | return Status::OK();
1280 | 0 | }
1281 | | |
1282 | 43.4k | bool SegmentIterator::_need_read_data(ColumnId cid) { |
1283 | 43.4k | if (_opts.runtime_state && !_opts.runtime_state->query_options().enable_no_need_read_data_opt) { |
1284 | 0 | return true; |
1285 | 0 | } |
1286 | | // only support DUP_KEYS and UNIQUE_KEYS with MOW |
1287 | 43.4k | if (!((_opts.tablet_schema->keys_type() == KeysType::DUP_KEYS || |
1288 | 43.4k | (_opts.tablet_schema->keys_type() == KeysType::UNIQUE_KEYS && |
1289 | 16.3k | _opts.enable_unique_key_merge_on_write)))) { |
1290 | 12.0k | return true; |
1291 | 12.0k | } |
1292 | | // this is a virtual column, we always need to read data |
1293 | 31.3k | if (this->_vir_cid_to_idx_in_block.contains(cid)) { |
1294 | 0 | return true; |
1295 | 0 | } |
1296 | | |
1297 | | // if there is a delete predicate, we always need to read data |
1298 | 31.3k | if (_has_delete_predicate(cid)) { |
1299 | 4.03k | return true; |
1300 | 4.03k | } |
1301 | 27.2k | if (_output_columns.count(-1)) { |
1302 | | // if _output_columns contains -1, it means that the light |
1303 | | // weight schema change may not be enabled or other reasons |
1304 | | // caused the column unique_id not be set, to prevent errors |
1305 | | // occurring, return true here that column data needs to be read |
1306 | 0 | return true; |
1307 | 0 | } |
1308 | | // Check the following conditions: |
1309 | | // 1. If the column represented by the unique ID is an inverted index column (indicated by '_need_read_data_indices.count(unique_id) > 0 && !_need_read_data_indices[unique_id]') |
1310 | | // and it's not marked for projection in '_output_columns'. |
1311 | | // 2. Or, if the column is an inverted index column and it's marked for projection in '_output_columns', |
1312 | | // and the operation is a push down of the 'COUNT_ON_INDEX' aggregation function. |
1313 | | // If any of the above conditions are met, log a debug message indicating that there's no need to read data for the indexed column. |
1314 | | // Then, return false. |
1315 | 27.2k | const auto& column = _opts.tablet_schema->column(cid); |
1316 | | // Different subcolumns may share the same parent_unique_id, so we choose to abandon this optimization. |
1317 | 27.2k | if (column.is_extracted_column() && |
1318 | 27.2k | _opts.push_down_agg_type_opt != TPushAggOp::COUNT_ON_INDEX) { |
1319 | 1 | return true; |
1320 | 1 | } |
1321 | 27.2k | int32_t unique_id = column.unique_id(); |
1322 | 27.2k | if (unique_id < 0) { |
1323 | 1 | unique_id = column.parent_unique_id(); |
1324 | 1 | } |
1325 | 27.2k | if ((_need_read_data_indices.contains(cid) && !_need_read_data_indices[cid] && |
1326 | 27.2k | !_output_columns.contains(unique_id)) || |
1327 | 27.2k | (_need_read_data_indices.contains(cid) && !_need_read_data_indices[cid] && |
1328 | 27.2k | _output_columns.count(unique_id) == 1 && |
1329 | 27.2k | _opts.push_down_agg_type_opt == TPushAggOp::COUNT_ON_INDEX)) { |
1330 | 1 | VLOG_DEBUG << "SegmentIterator no need read data for column: " |
1331 | 0 | << _opts.tablet_schema->column_by_uid(unique_id).name(); |
1332 | 1 | return false; |
1333 | 1 | } |
1334 | 27.2k | return true; |
1335 | 27.2k | } |
1336 | | |
1337 | 0 | Status SegmentIterator::_apply_inverted_index() { |
1338 | 0 | std::vector<std::shared_ptr<ColumnPredicate>> remaining_predicates; |
1339 | 0 | std::set<std::shared_ptr<ColumnPredicate>> no_need_to_pass_column_predicate_set; |
1340 | |
|
1341 | 0 | for (auto pred : _col_predicates) { |
1342 | 0 | if (no_need_to_pass_column_predicate_set.count(pred) > 0) { |
1343 | 0 | continue; |
1344 | 0 | } else { |
1345 | 0 | bool continue_apply = true; |
1346 | 0 | RETURN_IF_ERROR(_apply_inverted_index_on_column_predicate(pred, remaining_predicates, |
1347 | 0 | &continue_apply)); |
1348 | 0 | if (!continue_apply) { |
1349 | 0 | break; |
1350 | 0 | } |
1351 | 0 | } |
1352 | 0 | } |
1353 | | |
1354 | 0 | _col_predicates = std::move(remaining_predicates); |
1355 | 0 | return Status::OK(); |
1356 | 0 | } |
1357 | | |
/**
 * @brief Checks if all conditions related to a specific column have passed in both
 * `_column_predicate_index_exec_status` and `_common_expr_index_exec_status`.
 *
 * This function first checks the conditions in `_column_predicate_index_exec_status`
 * for the given `ColumnId`. If all conditions pass, it sets `default_return` to `true`.
 * It then checks the conditions in `_common_expr_index_exec_status` for the same column.
 *
 * The function returns `true` if all conditions in both maps pass. If any condition fails
 * in either map, the function immediately returns `false`. If the column does not exist
 * in one of the maps, the function returns `default_return`.
 *
 * @param cid The ColumnId of the column to check.
 * @param default_return The default value to return if the column is not found in the status maps.
 * @return true if all conditions in both status maps pass, or if the column is not found
 *         and `default_return` is true.
 * @return false if any condition in either status map fails, or if the column is not found
 *         and `default_return` is false.
 */
1377 | | bool SegmentIterator::_check_all_conditions_passed_inverted_index_for_column(ColumnId cid, |
1378 | 0 | bool default_return) { |
1379 | | // If common_expr_pushdown is disabled, we cannot guarantee that all conditions are processed by the inverted index. |
1380 | | // Consider a scenario where there is a column predicate and an expression involving the same column in the SQL query, |
1381 | | // such as 'a < 0' and 'abs(a) > 1'. This could potentially lead to errors. |
1382 | 0 | if (_opts.runtime_state && !_opts.runtime_state->query_options().enable_common_expr_pushdown) { |
1383 | 0 | return false; |
1384 | 0 | } |
1385 | 0 | auto pred_it = _column_predicate_index_exec_status.find(cid); |
1386 | 0 | if (pred_it != _column_predicate_index_exec_status.end()) { |
1387 | 0 | const auto& pred_map = pred_it->second; |
1388 | 0 | bool pred_passed = std::all_of(pred_map.begin(), pred_map.end(), |
1389 | 0 | [](const auto& pred_entry) { return pred_entry.second; }); |
1390 | 0 | if (!pred_passed) { |
1391 | 0 | return false; |
1392 | 0 | } else { |
1393 | 0 | default_return = true; |
1394 | 0 | } |
1395 | 0 | } |
1396 | | |
1397 | 0 | auto expr_it = _common_expr_index_exec_status.find(cid); |
1398 | 0 | if (expr_it != _common_expr_index_exec_status.end()) { |
1399 | 0 | const auto& expr_map = expr_it->second; |
1400 | 0 | return std::all_of(expr_map.begin(), expr_map.end(), |
1401 | 0 | [](const auto& expr_entry) { return expr_entry.second; }); |
1402 | 0 | } |
1403 | 0 | return default_return; |
1404 | 0 | } |
1405 | | |
1406 | 5.63k | Status SegmentIterator::_init_return_column_iterators() { |
1407 | 5.63k | SCOPED_RAW_TIMER(&_opts.stats->segment_iterator_init_return_column_iterators_timer_ns); |
1408 | 5.63k | if (_cur_rowid >= num_rows()) { |
1409 | 0 | return Status::OK(); |
1410 | 0 | } |
1411 | | |
1412 | 12.6k | for (auto cid : _schema->column_ids()) { |
1413 | 12.6k | if (_schema->column(cid)->name() == BeConsts::ROWID_COL) { |
1414 | 0 | _column_iterators[cid].reset( |
1415 | 0 | new RowIdColumnIterator(_opts.tablet_id, _opts.rowset_id, _segment->id())); |
1416 | 0 | continue; |
1417 | 0 | } |
1418 | | |
1419 | 12.6k | if (_schema->column(cid)->name().starts_with(BeConsts::GLOBAL_ROWID_COL)) { |
1420 | 0 | auto& id_file_map = _opts.runtime_state->get_id_file_map(); |
1421 | 0 | uint32_t file_id = id_file_map->get_file_mapping_id(std::make_shared<FileMapping>( |
1422 | 0 | _opts.tablet_id, _opts.rowset_id, _segment->id())); |
1423 | 0 | _column_iterators[cid].reset(new RowIdColumnIteratorV2( |
1424 | 0 | IdManager::ID_VERSION, BackendOptions::get_backend_id(), file_id)); |
1425 | 0 | continue; |
1426 | 0 | } |
1427 | | |
1428 | 12.6k | if (_schema->column(cid)->name().starts_with(BeConsts::VIRTUAL_COLUMN_PREFIX)) { |
1429 | 0 | _column_iterators[cid] = std::make_unique<VirtualColumnIterator>(); |
1430 | 0 | continue; |
1431 | 0 | } |
1432 | | |
1433 | 12.6k | std::set<ColumnId> del_cond_id_set; |
1434 | 12.6k | _opts.delete_condition_predicates->get_all_column_ids(del_cond_id_set); |
1435 | 12.6k | std::vector<bool> tmp_is_pred_column; |
1436 | 12.6k | tmp_is_pred_column.resize(_schema->columns().size(), false); |
1437 | 12.6k | for (auto predicate : _col_predicates) { |
1438 | 0 | auto p_cid = predicate->column_id(); |
1439 | 0 | tmp_is_pred_column[p_cid] = true; |
1440 | 0 | } |
1441 | | // handle delete_condition |
1442 | 12.6k | for (auto d_cid : del_cond_id_set) { |
1443 | 1.32k | tmp_is_pred_column[d_cid] = true; |
1444 | 1.32k | } |
1445 | | |
1446 | 12.6k | if (_column_iterators[cid] == nullptr) { |
1447 | 12.6k | RETURN_IF_ERROR(_segment->new_column_iterator(_opts.tablet_schema->column(cid), |
1448 | 12.6k | &_column_iterators[cid], &_opts, |
1449 | 12.6k | &_variant_sparse_column_cache)); |
1450 | 12.6k | ColumnIteratorOptions iter_opts { |
1451 | 12.6k | .use_page_cache = _opts.use_page_cache, |
1452 | | // If the col is predicate column, then should read the last page to check |
1453 | | // if the column is full dict encoding |
1454 | 12.6k | .is_predicate_column = tmp_is_pred_column[cid], |
1455 | 12.6k | .file_reader = _file_reader.get(), |
1456 | 12.6k | .stats = _opts.stats, |
1457 | 12.6k | .io_ctx = _opts.io_ctx, |
1458 | 12.6k | }; |
1459 | 12.6k | RETURN_IF_ERROR(_column_iterators[cid]->init(iter_opts)); |
1460 | 12.6k | } |
1461 | 12.6k | } |
1462 | | |
1463 | 5.63k | #ifndef NDEBUG |
1464 | 5.63k | for (auto pair : _vir_cid_to_idx_in_block) { |
1465 | 0 | ColumnId vir_col_cid = pair.first; |
1466 | 0 | DCHECK(_column_iterators[vir_col_cid] != nullptr) |
1467 | 0 | << "Virtual column iterator for " << vir_col_cid << " should not be null"; |
1468 | 0 | ColumnIterator* column_iter = _column_iterators[vir_col_cid].get(); |
1469 | 0 | DCHECK(dynamic_cast<VirtualColumnIterator*>(column_iter) != nullptr) |
1470 | 0 | << "Virtual column iterator for " << vir_col_cid |
1471 | 0 | << " should be VirtualColumnIterator"; |
1472 | 0 | } |
1473 | 5.63k | #endif |
1474 | 5.63k | return Status::OK(); |
1475 | 5.63k | } |
1476 | | |
// Creates index iterators (inverted first, then ANN as a fallback) for every
// column in the read schema and attaches the shared IndexQueryContext to each.
// Returns OK immediately if the iterator is already exhausted.
Status SegmentIterator::_init_index_iterators() {
    SCOPED_RAW_TIMER(&_opts.stats->segment_iterator_init_index_iterators_timer_ns);
    if (_cur_rowid >= num_rows()) {
        return Status::OK();
    }

    // Context shared by every index iterator built below.
    _index_query_context = std::make_shared<IndexQueryContext>();
    _index_query_context->io_ctx = &_opts.io_ctx;
    _index_query_context->stats = _opts.stats;
    _index_query_context->runtime_state = _opts.runtime_state;

    if (_score_runtime) {
        // Relevance scoring additionally needs collection statistics and a
        // similarity accumulator.
        _index_query_context->collection_statistics = _opts.collection_statistics;
        _index_query_context->collection_similarity = std::make_shared<CollectionSimilarity>();
    }

    // Inverted index iterators
    for (auto cid : _schema->column_ids()) {
        // Use segment's own index_meta, for compatibility with future indexing needs to default to lowercase.
        if (_index_iterators[cid] == nullptr) {
            // In the _opts.tablet_schema, the sub-column type information for the variant is FieldType::OLAP_FIELD_TYPE_VARIANT.
            // This is because the sub-column is created in create_materialized_variant_column.
            // We use this column to locate the metadata for the inverted index, which requires a unique_id and path.
            const auto& column = _opts.tablet_schema->column(cid);
            std::vector<const TabletIndex*> inverted_indexs;
            // Keep shared_ptr alive to prevent use-after-free when accessing raw pointers
            TabletIndexes inverted_indexs_holder;
            // If the column is an extracted column, we need to find the sub-column in the parent column reader.
            std::shared_ptr<ColumnReader> column_reader;
            if (column.is_extracted_column()) {
                if (!_segment->_column_reader_cache->get_column_reader(
                            column.parent_unique_id(), &column_reader, _opts.stats) ||
                    column_reader == nullptr) {
                    // No parent reader: cannot resolve index metadata for this sub-column.
                    continue;
                }
                auto* variant_reader = assert_cast<VariantColumnReader*>(column_reader.get());
                DataTypePtr data_type = _storage_name_and_type[cid].second;
                if (data_type != nullptr &&
                    data_type->get_primitive_type() == PrimitiveType::TYPE_VARIANT) {
                    // Best-effort: refine the generic TYPE_VARIANT to the concrete
                    // type stored at this path; keep the original type on failure.
                    DataTypePtr inferred_type;
                    Status st = variant_reader->infer_data_type_for_path(
                            &inferred_type, column, _opts, _segment->_column_reader_cache.get());
                    if (st.ok() && inferred_type != nullptr) {
                        data_type = inferred_type;
                    }
                }
                inverted_indexs_holder =
                        variant_reader->find_subcolumn_tablet_indexes(column, data_type);
                // Extract raw pointers from shared_ptr for iteration
                for (const auto& index_ptr : inverted_indexs_holder) {
                    inverted_indexs.push_back(index_ptr.get());
                }
            }
            // If the column is not an extracted column, we can directly get the inverted index metadata from the tablet schema.
            else {
                inverted_indexs = _segment->_tablet_schema->inverted_indexs(column);
            }
            for (const auto& inverted_index : inverted_indexs) {
                RETURN_IF_ERROR(_segment->new_index_iterator(column, inverted_index, _opts,
                                                             &_index_iterators[cid]));
            }
            if (_index_iterators[cid] != nullptr) {
                _index_iterators[cid]->set_context(_index_query_context);
            }
        }
    }

    // Ann index iterators (only for columns that got no inverted index iterator above)
    for (auto cid : _schema->column_ids()) {
        if (_index_iterators[cid] == nullptr) {
            const auto& column = _opts.tablet_schema->column(cid);
            const auto* index_meta = _segment->_tablet_schema->ann_index(column);
            if (index_meta) {
                RETURN_IF_ERROR(_segment->new_index_iterator(column, index_meta, _opts,
                                                             &_index_iterators[cid]));

                if (_index_iterators[cid] != nullptr) {
                    _index_iterators[cid]->set_context(_index_query_context);
                }
            }
        }
    }

    return Status::OK();
}
1562 | | |
1563 | | Status SegmentIterator::_lookup_ordinal(const RowCursor& key, bool is_include, rowid_t upper_bound, |
1564 | 0 | rowid_t* rowid) { |
1565 | 0 | if (_segment->_tablet_schema->keys_type() == UNIQUE_KEYS && |
1566 | 0 | _segment->get_primary_key_index() != nullptr) { |
1567 | 0 | return _lookup_ordinal_from_pk_index(key, is_include, rowid); |
1568 | 0 | } |
1569 | 0 | return _lookup_ordinal_from_sk_index(key, is_include, upper_bound, rowid); |
1570 | 0 | } |
1571 | | |
// Looks up one key to get the ordinal at which its data can be read, using the short key index.
// 'upper_bound' defines the maximum ordinal the function will search;
// we use it to reduce the number of search steps.
// If we find a valid ordinal, it is stored in *rowid and Status::OK() is returned.
// If no valid key exists in this segment, *rowid is set to upper_bound.
// Otherwise an error is returned.
// 1. get the [start, end) ordinal range through the short key index
// 2. binary search to find the exact ordinal that matches the input condition
// TODO: make is_include a template parameter to reduce branching in the search loop.
Status SegmentIterator::_lookup_ordinal_from_sk_index(const RowCursor& key, bool is_include,
                                                      rowid_t upper_bound, rowid_t* rowid) {
    const ShortKeyIndexDecoder* sk_index_decoder = _segment->get_short_key_index();
    DCHECK(sk_index_decoder != nullptr);

    // Encode the probe key with padding so it is comparable with index entries.
    std::string index_key;
    key.encode_key_with_padding(&index_key, _segment->_tablet_schema->num_short_key_columns(),
                                is_include);

    const auto& key_col_ids = key.schema()->column_ids();
    // Materialize the probe key into short-key columns for the row-by-row
    // comparison performed inside the binary search below.
    _convert_rowcursor_to_short_key(key, key_col_ids.size());

    ssize_t start_block_id = 0;
    auto start_iter = sk_index_decoder->lower_bound(index_key);
    if (start_iter.valid()) {
        // Because previous block may contain this key, so we should set rowid to
        // last block's first row.
        start_block_id = start_iter.ordinal();
        if (start_block_id > 0) {
            start_block_id--;
        }
    } else {
        // When we don't find a valid index item, which means all short key is
        // smaller than input key, this means that this key may exist in the last
        // row block. so we set the rowid to first row of last row block.
        start_block_id = sk_index_decoder->num_items() - 1;
    }
    // Convert block ordinals to row ordinals (each index item covers one block of rows).
    rowid_t start = cast_set<rowid_t>(start_block_id) * sk_index_decoder->num_rows_per_block();

    rowid_t end = upper_bound;
    auto end_iter = sk_index_decoder->upper_bound(index_key);
    if (end_iter.valid()) {
        end = cast_set<rowid_t>(end_iter.ordinal()) * sk_index_decoder->num_rows_per_block();
    }

    // binary search to find the exact key within [start, end)
    while (start < end) {
        rowid_t mid = (start + end) / 2;
        // Load row `mid` into the seek block so it can be compared with the probe key.
        RETURN_IF_ERROR(_seek_and_peek(mid));
        int cmp = _compare_short_key_with_seek_block(key_col_ids);
        if (cmp > 0) {
            // search the upper half
            start = mid + 1;
        } else if (cmp == 0) {
            if (is_include) {
                // lower bound
                end = mid;
            } else {
                // upper bound
                start = mid + 1;
            }
        } else {
            // search the lower half
            end = mid;
        }
    }

    *rowid = start;
    return Status::OK();
}
1639 | | |
// Looks up `key` in the primary key index. On success, *rowid holds the first
// ordinal at (or after, for exclusive lookups) the key; keys outside the
// segment's [min_key, max_key] range are answered without touching the index.
Status SegmentIterator::_lookup_ordinal_from_pk_index(const RowCursor& key, bool is_include,
                                                      rowid_t* rowid) {
    DCHECK(_segment->_tablet_schema->keys_type() == UNIQUE_KEYS);
    const PrimaryKeyIndexReader* pk_index_reader = _segment->get_primary_key_index();
    DCHECK(pk_index_reader != nullptr);

    std::string index_key;
    key.encode_key_with_padding<true>(&index_key, _segment->_tablet_schema->num_key_columns(),
                                      is_include);
    // Fast path: the probe key is outside this segment's key range.
    if (index_key < _segment->min_key()) {
        *rowid = 0;
        return Status::OK();
    } else if (index_key > _segment->max_key()) {
        *rowid = num_rows();
        return Status::OK();
    }
    bool exact_match = false;

    std::unique_ptr<segment_v2::IndexedColumnIterator> index_iterator;
    RETURN_IF_ERROR(pk_index_reader->new_iterator(&index_iterator, _opts.stats));

    Status status = index_iterator->seek_at_or_after(&index_key, &exact_match);
    if (UNLIKELY(!status.ok())) {
        *rowid = num_rows();
        if (status.is<ENTRY_NOT_FOUND>()) {
            // Every indexed key is smaller than the probe: point past the last row.
            return Status::OK();
        }
        return status;
    }
    *rowid = cast_set<rowid_t>(index_iterator->get_current_ordinal());

    // The sequence column needs to be removed from primary key index when comparing key
    bool has_seq_col = _segment->_tablet_schema->has_sequence_col();
    // Used to get key range from primary key index,
    // for mow with cluster key table, we should get key range from short key index.
    DCHECK(_segment->_tablet_schema->cluster_key_uids().empty());

    // if full key is exact_match, the primary key without sequence column should also be the same
    if (has_seq_col && !exact_match) {
        // NOTE(review): the +1 presumably accounts for a marker byte stored before
        // the sequence column value in the encoded key — confirm against the encoder.
        size_t seq_col_length =
                _segment->_tablet_schema->column(_segment->_tablet_schema->sequence_col_idx())
                        .length() +
                1;
        auto index_type = DataTypeFactory::instance().create_data_type(
                _segment->_pk_index_reader->type_info()->type(), 1, 0);
        auto index_column = index_type->create_column();
        // Read the single row the iterator is positioned at so its key can be inspected.
        size_t num_to_read = 1;
        size_t num_read = num_to_read;
        RETURN_IF_ERROR(index_iterator->next_batch(&num_read, index_column));
        DCHECK(num_to_read == num_read);

        Slice sought_key =
                Slice(index_column->get_data_at(0).data, index_column->get_data_at(0).size);
        // Strip the trailing sequence-column bytes before comparing.
        Slice sought_key_without_seq =
                Slice(sought_key.get_data(), sought_key.get_size() - seq_col_length);

        // compare key
        if (Slice(index_key).compare(sought_key_without_seq) == 0) {
            exact_match = true;
        }
    }

    // find the key in primary key index, and the is_include is false, so move
    // to the next row.
    if (exact_match && !is_include) {
        *rowid += 1;
    }
    return Status::OK();
}
1709 | | |
1710 | | // seek to the row and load that row to _key_cursor |
1711 | 0 | Status SegmentIterator::_seek_and_peek(rowid_t rowid) { |
1712 | 0 | { |
1713 | 0 | _opts.stats->block_init_seek_num += 1; |
1714 | 0 | SCOPED_RAW_TIMER(&_opts.stats->block_init_seek_ns); |
1715 | 0 | RETURN_IF_ERROR(_seek_columns(_seek_schema->column_ids(), rowid)); |
1716 | 0 | } |
1717 | 0 | size_t num_rows = 1; |
1718 | | |
1719 | | //note(wb) reset _seek_block for memory reuse |
1720 | | // it is easier to use row based memory layout for clear memory |
1721 | 0 | for (int i = 0; i < _seek_block.size(); i++) { |
1722 | 0 | _seek_block[i]->clear(); |
1723 | 0 | } |
1724 | 0 | RETURN_IF_ERROR(_read_columns(_seek_schema->column_ids(), _seek_block, num_rows)); |
1725 | 0 | return Status::OK(); |
1726 | 0 | } |
1727 | | |
1728 | 0 | Status SegmentIterator::_seek_columns(const std::vector<ColumnId>& column_ids, rowid_t pos) { |
1729 | 0 | for (auto cid : column_ids) { |
1730 | 0 | if (!_need_read_data(cid)) { |
1731 | 0 | continue; |
1732 | 0 | } |
1733 | 0 | RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(pos)); |
1734 | 0 | } |
1735 | 0 | return Status::OK(); |
1736 | 0 | } |
1737 | | |
1738 | | /* ---------------------- for vectorization implementation ---------------------- */ |
1739 | | |
/**
 * For storage layer data types, cost can be measured from two perspectives:
 * 1 Whether the type can be read in a fast way (batch read using SIMD).
 *   Integer and float types can be read in a SIMD-friendly way.
 *   Types such as string/bitmap/hll cannot be read in batches, so reading them is slow.
 *   If a type can be read fast, we can try to eliminate lazy materialization, because for
 *   such types we assume seek cost > read cost.
 *   This is an estimate; for a more precise cost model, statistics collection is necessary (a todo).
 *   In short, when the returned non-predicate columns contain string/hll/bitmap, we use
 *   lazy materialization. Otherwise, we disable it.
 *
 *   When lazy materialization is enabled, we need to read columns at least twice:
 *   first the predicate columns, then the non-predicate columns.
 *   An interesting question to research is whether reading the predicate columns exactly
 *   once is the best plan (why not read them twice or more?).
 *
 *   When lazy materialization is disabled, we just need to read once.
 *
 *
 * 2 Whether the predicate type can be evaluated in a fast way (using SIMD).
 *   Integer and float types can be evaluated fast,
 *   but BloomFilter/string/date evaluate slowly.
 *   If a type can be evaluated fast, we use vectorization to evaluate it.
 *   Otherwise, we use short-circuit evaluation.
 *
 */
1766 | | |
1767 | | // todo(wb) need a UT here |
// Decides, for every column in the read schema, whether it is read eagerly
// (predicate / common-expr columns) or lazily materialized after filtering.
// Fills: _is_pred_column, _pre_eval_block_predicate / _short_cir_eval_predicate,
// _vec_pred_column_ids / _short_cir_pred_column_ids, _schema_block_id_map,
// _columns_to_filter, _non_predicate_columns, _common_expr_column_ids and
// _predicate_column_ids, plus the _lazy_materialization_read flag.
// todo(wb) need a UT here
Status SegmentIterator::_vec_init_lazy_materialization() {
    _is_pred_column.resize(_schema->columns().size(), false);

    // including short/vec/delete pred
    std::set<ColumnId> pred_column_ids;
    _lazy_materialization_read = false;

    std::set<ColumnId> del_cond_id_set;
    _opts.delete_condition_predicates->get_all_column_ids(del_cond_id_set);

    // Partition delete predicates by kind; range and bloom-filter delete
    // predicates are tracked separately.
    std::set<std::shared_ptr<const ColumnPredicate>> delete_predicate_set {};
    _opts.delete_condition_predicates->get_all_column_predicate(delete_predicate_set);
    for (auto predicate : delete_predicate_set) {
        if (PredicateTypeTraits::is_range(predicate->type())) {
            _delete_range_column_ids.push_back(predicate->column_id());
        } else if (PredicateTypeTraits::is_bloom_filter(predicate->type())) {
            _delete_bloom_filter_column_ids.push_back(predicate->column_id());
        }
    }

    // Step1: extract columns that can be lazy materialization
    if (!_col_predicates.empty() || !del_cond_id_set.empty()) {
        std::set<ColumnId> short_cir_pred_col_id_set; // using set for distinct cid
        std::set<ColumnId> vec_pred_col_id_set;

        for (auto predicate : _col_predicates) {
            auto cid = predicate->column_id();
            _is_pred_column[cid] = true;
            pred_column_ids.insert(cid);

            // check pred using short eval or vec eval
            if (_can_evaluated_by_vectorized(predicate)) {
                vec_pred_col_id_set.insert(cid);
                _pre_eval_block_predicate.push_back(predicate);
            } else {
                short_cir_pred_col_id_set.insert(cid);
                _short_cir_eval_predicate.push_back(predicate);
            }
            if (predicate->is_runtime_filter()) {
                _filter_info_id.push_back(predicate);
            }
        }

        // handle delete_condition: delete columns always use short-circuit evaluation
        if (!del_cond_id_set.empty()) {
            short_cir_pred_col_id_set.insert(del_cond_id_set.begin(), del_cond_id_set.end());
            pred_column_ids.insert(del_cond_id_set.begin(), del_cond_id_set.end());

            for (auto cid : del_cond_id_set) {
                _is_pred_column[cid] = true;
            }
        }

        _vec_pred_column_ids.assign(vec_pred_col_id_set.cbegin(), vec_pred_col_id_set.cend());
        _short_cir_pred_column_ids.assign(short_cir_pred_col_id_set.cbegin(),
                                          short_cir_pred_col_id_set.cend());
    }

    if (!_vec_pred_column_ids.empty()) {
        _is_need_vec_eval = true;
    }
    if (!_short_cir_pred_column_ids.empty()) {
        _is_need_short_eval = true;
    }

    // ColumnId to column index in block
    // ColumnId will contain all columns in tablet schema, including virtual columns and global rowid column,
    _schema_block_id_map.resize(_schema->columns().size(), -1);
    // Use cols read by query to initialize _schema_block_id_map.
    // We need to know the index of each column in the block.
    // There is an assumption here that the columns in the block are in the same order as in the read schema.
    // TODO: A problem is that, delete condition columns will exist in _schema->column_ids but not in block if
    // delete column is not read by the query.
    for (int i = 0; i < _schema->num_column_ids(); i++) {
        auto cid = _schema->column_id(i);
        _schema_block_id_map[cid] = i;
    }

    // Step2: extract columns that can execute expr context
    _is_common_expr_column.resize(_schema->columns().size(), false);
    if (_enable_common_expr_pushdown && !_remaining_conjunct_roots.empty()) {
        for (auto expr : _remaining_conjunct_roots) {
            RETURN_IF_ERROR(_extract_common_expr_columns(expr));
        }
        if (!_common_expr_columns.empty()) {
            _is_need_expr_eval = true;
            for (auto cid : _schema->column_ids()) {
                // pred column also needs to be filtered by expr, exclude additional delete condition column.
                // if delete condition column not in the block, no filter is needed
                // and will be removed from _columns_to_filter in the first next_batch.
                if (_is_common_expr_column[cid] || _is_pred_column[cid]) {
                    auto loc = _schema_block_id_map[cid];
                    _columns_to_filter.push_back(loc);
                }
            }

            // Virtual columns always take part in filtering.
            for (auto pair : _vir_cid_to_idx_in_block) {
                _columns_to_filter.push_back(cast_set<ColumnId>(pair.second));
            }
        }
    }

    // Step 3: fill non predicate columns and second read column
    // if _schema columns size equal to pred_column_ids size, lazy_materialization_read is false,
    // all columns are lazy materialization columns without non predicate column.
    // If common expr pushdown exists, and expr column is not contained in lazy materialization columns,
    // add to second read column, which will be read after lazy materialization
    if (_schema->column_ids().size() > pred_column_ids.size()) {
        // pred_column_ids maybe empty, so that could not set _lazy_materialization_read = true here
        // has to check there is at least one predicate column
        for (auto cid : _schema->column_ids()) {
            if (!_is_pred_column[cid]) {
                if (_is_need_vec_eval || _is_need_short_eval) {
                    _lazy_materialization_read = true;
                }
                if (_is_common_expr_column[cid]) {
                    _common_expr_column_ids.push_back(cid);
                } else {
                    _non_predicate_columns.push_back(cid);
                }
            }
        }
    }

    // Step 4: fill first read columns
    if (_lazy_materialization_read) {
        // insert pred cid to first_read_columns
        for (auto cid : pred_column_ids) {
            _predicate_column_ids.push_back(cid);
        }
    } else if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) {
        // No filtering at all: every schema column is read in the first pass.
        for (int i = 0; i < _schema->num_column_ids(); i++) {
            auto cid = _schema->column_id(i);
            _predicate_column_ids.push_back(cid);
        }
    } else {
        if (_is_need_vec_eval || _is_need_short_eval) {
            // TODO To refactor, because we suppose lazy materialization is better performance.
            // pred exists, but we can eliminate lazy materialization
            // insert pred/non-pred cid to first read columns
            std::set<ColumnId> pred_id_set;
            pred_id_set.insert(_short_cir_pred_column_ids.begin(),
                               _short_cir_pred_column_ids.end());
            pred_id_set.insert(_vec_pred_column_ids.begin(), _vec_pred_column_ids.end());

            DCHECK(_common_expr_column_ids.empty());
            // _non_predicate_column_ids must be empty. Otherwise _lazy_materialization_read must not false.
            for (int i = 0; i < _schema->num_column_ids(); i++) {
                auto cid = _schema->column_id(i);
                if (pred_id_set.find(cid) != pred_id_set.end()) {
                    _predicate_column_ids.push_back(cid);
                }
            }
        } else if (_is_need_expr_eval) {
            DCHECK(!_is_need_vec_eval && !_is_need_short_eval);
            for (auto cid : _common_expr_columns) {
                _predicate_column_ids.push_back(cid);
            }
        }
    }

    VLOG_DEBUG << fmt::format(
            "Laze materialization init end. "
            "lazy_materialization_read: {}, "
            "_col_predicates size: {}, "
            "_cols_read_by_column_predicate: [{}], "
            "_non_predicate_columns: [{}], "
            "_cols_read_by_common_expr: [{}], "
            "columns_to_filter: [{}], "
            "_schema_block_id_map: [{}]",
            _lazy_materialization_read, _col_predicates.size(),
            fmt::join(_predicate_column_ids, ","), fmt::join(_non_predicate_columns, ","),
            fmt::join(_common_expr_column_ids, ","), fmt::join(_columns_to_filter, ","),
            fmt::join(_schema_block_id_map, ","));
    return Status::OK();
}
1944 | | |
// Decides whether `predicate` can be evaluated by the vectorized (column-wise)
// evaluation path. Only simple comparison predicates (EQ/NE/LE/LT/GE/GT) qualify;
// string columns additionally require low-cardinality optimization to be enabled,
// a query-type reader, and the column to be fully dictionary-encoded. DECIMAL is
// explicitly excluded.
bool SegmentIterator::_can_evaluated_by_vectorized(std::shared_ptr<ColumnPredicate> predicate) {
    auto cid = predicate->column_id();
    FieldType field_type = _schema->column(cid)->type();
    if (field_type == FieldType::OLAP_FIELD_TYPE_VARIANT) {
        // Use variant cast dst type
        // NOTE(review): operator[] default-constructs the map entry when the column
        // name is absent and the result is dereferenced immediately — presumably the
        // cast target is always registered for variant columns; confirm with callers.
        field_type = _opts.target_cast_type_for_variants[_schema->column(cid)->name()]
                             ->get_storage_field_type();
    }
    switch (predicate->type()) {
    case PredicateType::EQ:
    case PredicateType::NE:
    case PredicateType::LE:
    case PredicateType::LT:
    case PredicateType::GE:
    case PredicateType::GT: {
        if (field_type == FieldType::OLAP_FIELD_TYPE_VARCHAR ||
            field_type == FieldType::OLAP_FIELD_TYPE_CHAR ||
            field_type == FieldType::OLAP_FIELD_TYPE_STRING) {
            // String comparisons are only vectorizable through dictionary codes.
            return config::enable_low_cardinality_optimize &&
                   _opts.io_ctx.reader_type == ReaderType::READER_QUERY &&
                   _column_iterators[cid]->is_all_dict_encoding();
        } else if (field_type == FieldType::OLAP_FIELD_TYPE_DECIMAL) {
            return false;
        }
        return true;
    }
    default:
        return false;
    }
}
1975 | | |
1976 | 12.6k | bool SegmentIterator::_has_char_type(const StorageField& column_desc) { |
1977 | 12.6k | switch (column_desc.type()) { |
1978 | 0 | case FieldType::OLAP_FIELD_TYPE_CHAR: |
1979 | 0 | return true; |
1980 | 2 | case FieldType::OLAP_FIELD_TYPE_ARRAY: |
1981 | 2 | return _has_char_type(*column_desc.get_sub_field(0)); |
1982 | 0 | case FieldType::OLAP_FIELD_TYPE_MAP: |
1983 | 0 | return _has_char_type(*column_desc.get_sub_field(0)) || |
1984 | 0 | _has_char_type(*column_desc.get_sub_field(1)); |
1985 | 0 | case FieldType::OLAP_FIELD_TYPE_STRUCT: |
1986 | 0 | for (int idx = 0; idx < column_desc.get_sub_field_count(); ++idx) { |
1987 | 0 | if (_has_char_type(*column_desc.get_sub_field(idx))) { |
1988 | 0 | return true; |
1989 | 0 | } |
1990 | 0 | } |
1991 | 0 | return false; |
1992 | 12.6k | default: |
1993 | 12.6k | return false; |
1994 | 12.6k | } |
1995 | 12.6k | }; |
1996 | | |
1997 | 5.63k | void SegmentIterator::_vec_init_char_column_id(Block* block) { |
1998 | 5.63k | if (!_char_type_idx.empty()) { |
1999 | 0 | return; |
2000 | 0 | } |
2001 | 5.63k | _is_char_type.resize(_schema->columns().size(), false); |
2002 | 18.3k | for (size_t i = 0; i < _schema->num_column_ids(); i++) { |
2003 | 12.6k | auto cid = _schema->column_id(i); |
2004 | 12.6k | const StorageField* column_desc = _schema->column(cid); |
2005 | | |
2006 | | // The additional deleted filter condition will be in the materialized column at the end of the block. |
2007 | | // After _output_column_by_sel_idx, it will be erased, so we do not need to shrink it. |
2008 | 12.6k | if (i < block->columns()) { |
2009 | 12.6k | if (_has_char_type(*column_desc)) { |
2010 | 0 | _char_type_idx.emplace_back(i); |
2011 | 0 | } |
2012 | 12.6k | } |
2013 | | |
2014 | 12.6k | if (column_desc->type() == FieldType::OLAP_FIELD_TYPE_CHAR) { |
2015 | 0 | _is_char_type[cid] = true; |
2016 | 0 | } |
2017 | 12.6k | } |
2018 | 5.63k | } |
2019 | | |
2020 | | bool SegmentIterator::_prune_column(ColumnId cid, MutableColumnPtr& column, bool fill_defaults, |
2021 | 43.4k | size_t num_of_defaults) { |
2022 | 43.4k | if (_need_read_data(cid)) { |
2023 | 43.4k | return false; |
2024 | 43.4k | } |
2025 | 0 | if (!fill_defaults) { |
2026 | 0 | return true; |
2027 | 0 | } |
2028 | 0 | if (column->is_nullable()) { |
2029 | 0 | auto nullable_col_ptr = reinterpret_cast<ColumnNullable*>(column.get()); |
2030 | 0 | nullable_col_ptr->get_null_map_column().insert_many_defaults(num_of_defaults); |
2031 | 0 | nullable_col_ptr->get_nested_column_ptr()->insert_many_defaults(num_of_defaults); |
2032 | 0 | } else { |
2033 | | // assert(column->is_const()); |
2034 | 0 | column->insert_many_defaults(num_of_defaults); |
2035 | 0 | } |
2036 | 0 | return true; |
2037 | 0 | } |
2038 | | |
2039 | | Status SegmentIterator::_read_columns(const std::vector<ColumnId>& column_ids, |
2040 | 0 | MutableColumns& column_block, size_t nrows) { |
2041 | 0 | for (auto cid : column_ids) { |
2042 | 0 | auto& column = column_block[cid]; |
2043 | 0 | size_t rows_read = nrows; |
2044 | 0 | if (_prune_column(cid, column, true, rows_read)) { |
2045 | 0 | continue; |
2046 | 0 | } |
2047 | 0 | RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, column)); |
2048 | 0 | if (nrows != rows_read) { |
2049 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR>("nrows({}) != rows_read({})", nrows, |
2050 | 0 | rows_read); |
2051 | 0 | } |
2052 | 0 | } |
2053 | 0 | return Status::OK(); |
2054 | 0 | } |
2055 | | |
// Prepares `current_columns` (indexed by ColumnId) for the next batch:
// - columns whose on-disk type differs from the schema type are recreated with the
//   storage type so reading is safe (converted later in _convert_to_expected_type);
// - predicate columns (and columns beyond the block width) are cleared for reuse;
// - other columns are moved out of `block` so rows can be appended in place;
// - virtual columns start as empty ColumnNothing placeholders.
Status SegmentIterator::_init_current_block(Block* block,
                                            std::vector<MutableColumnPtr>& current_columns,
                                            uint32_t nrows_read_limit) {
    block->clear_column_data(_schema->num_column_ids());

    for (size_t i = 0; i < _schema->num_column_ids(); i++) {
        auto cid = _schema->column_id(i);
        const auto* column_desc = _schema->column(cid);

        auto file_column_type = _storage_name_and_type[cid].second;
        auto expected_type = Schema::get_data_type_ptr(*column_desc);
        if (!_is_pred_column[cid] && !file_column_type->equals(*expected_type)) {
            // The storage layer type is different from schema needed type, so we use storage
            // type to read columns instead of schema type for safety
            VLOG_DEBUG << fmt::format(
                    "Recreate column with expected type {}, file column type {}, col_name {}, "
                    "col_path {}",
                    block->get_by_position(i).type->get_name(), file_column_type->get_name(),
                    column_desc->name(),
                    column_desc->path() == nullptr ? "" : column_desc->path()->get_path());
            // TODO reuse
            current_columns[cid] = file_column_type->create_column();
            current_columns[cid]->reserve(nrows_read_limit);
        } else {
            // the column in block must clear() here to insert new data
            if (_is_pred_column[cid] ||
                i >= block->columns()) { //todo(wb) maybe we can release it after output block
                if (current_columns[cid].get() == nullptr) {
                    return Status::InternalError(
                            "SegmentIterator meet invalid column, id={}, name={}", cid,
                            _schema->column(cid)->name());
                }
                current_columns[cid]->clear();
            } else { // non-predicate column
                // Take ownership of the block's column so rows can be appended directly.
                current_columns[cid] = std::move(*block->get_by_position(i).column).mutate();
                current_columns[cid]->reserve(nrows_read_limit);
            }
        }
    }

    // Virtual columns are materialized later (by expression evaluation), so they
    // begin each batch as empty ColumnNothing placeholders.
    for (auto entry : _virtual_column_exprs) {
        auto cid = entry.first;
        current_columns[cid] = ColumnNothing::create(0);
        current_columns[cid]->reserve(nrows_read_limit);
    }

    return Status::OK();
}
2104 | | |
// Moves the already-read non-predicate columns from _current_return_columns into
// the output block at their mapped positions (_schema_block_id_map), after casting
// them to the schema's expected types. Positions beyond the block width (e.g.
// delete-predicate columns not in the output schema) are skipped, as are virtual
// columns that were already materialized into the block.
Status SegmentIterator::_output_non_pred_columns(Block* block) {
    SCOPED_RAW_TIMER(&_opts.stats->output_col_ns);
    VLOG_DEBUG << fmt::format(
            "Output non-predicate columns, _non_predicate_columns: [{}], "
            "_schema_block_id_map: [{}]",
            fmt::join(_non_predicate_columns, ","), fmt::join(_schema_block_id_map, ","));
    RETURN_IF_ERROR(_convert_to_expected_type(_non_predicate_columns));
    for (auto cid : _non_predicate_columns) {
        auto loc = _schema_block_id_map[cid];
        // Whether a delete predicate column gets output depends on how the caller builds
        // the block passed to next_batch(). Both calling paths now build the block with
        // only the output schema (return_columns), so delete predicate columns are skipped:
        //
        // 1) VMergeIterator path: block_reset() builds _block using the output schema
        //    (return_columns only), e.g. block has 2 columns {c1, c2}.
        //    Here loc=2 for delete predicate c3, block->columns()=2, so loc < block->columns()
        //    is false, and c3 is skipped.
        //
        // 2) VUnionIterator path: the caller's block is built with only return_columns
        //    (output schema), e.g. block has 2 columns {c1, c2}.
        //    Here loc=2 for c3, block->columns()=2, so loc < block->columns() is false,
        //    and c3 is skipped — same behavior as the VMergeIterator path.
        if (loc < block->columns()) {
            bool column_in_block_is_nothing = check_and_get_column<const ColumnNothing>(
                    block->get_by_position(loc).column.get());
            bool column_is_normal = !_vir_cid_to_idx_in_block.contains(cid);
            bool return_column_is_nothing =
                    check_and_get_column<const ColumnNothing>(_current_return_columns[cid].get());
            VLOG_DEBUG << fmt::format(
                    "Cid {} loc {}, column_in_block_is_nothing {}, column_is_normal {}, "
                    "return_column_is_nothing {}",
                    cid, loc, column_in_block_is_nothing, column_is_normal,
                    return_column_is_nothing);

            // Replace when the slot is still a placeholder, or when this is a regular
            // (non-virtual) column.
            if (column_in_block_is_nothing || column_is_normal) {
                block->replace_by_position(loc, std::move(_current_return_columns[cid]));
                VLOG_DEBUG << fmt::format(
                        "Output non-predicate column, cid: {}, loc: {}, col_name: {}, rows {}", cid,
                        loc, _schema->column(cid)->name(),
                        block->get_by_position(loc).column->size());
            }
            // Means virtual column in block has been materialized(maybe by common expr).
            // so do nothing here.
        }
    }
    return Status::OK();
}
2152 | | |
2153 | | /** |
2154 | | * Reads columns by their index, handling both continuous and discontinuous rowid scenarios. |
2155 | | * |
2156 | | * This function is designed to read a specified number of rows (up to nrows_read_limit) |
2157 | | * from the segment iterator, dealing with both continuous and discontinuous rowid arrays. |
2158 | | * It operates as follows: |
2159 | | * |
2160 | | * 1. Reads a batch of rowids (up to the specified limit), and checks if they are continuous. |
2161 | | * Continuous here means that the rowids form an unbroken sequence (e.g., 1, 2, 3, 4...). |
2162 | | * |
2163 | | * 2. For each column that needs to be read (identified by _predicate_column_ids): |
2164 | | * - If the rowids are continuous, the function uses seek_to_ordinal and next_batch |
2165 | | * for efficient reading. |
2166 | | * - If the rowids are not continuous, the function processes them in smaller batches |
2167 | | * (each of size up to 256). Each batch is checked for internal continuity: |
2168 | | * a. If a batch is continuous, uses seek_to_ordinal and next_batch for that batch. |
2169 | | * b. If a batch is not continuous, uses read_by_rowids for individual rowids in the batch. |
2170 | | * |
2171 | | * This approach optimizes reading performance by leveraging batch processing for continuous |
2172 | | * rowid sequences and handling discontinuities gracefully in smaller chunks. |
2173 | | */ |
Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, uint16_t& nrows_read) {
    SCOPED_RAW_TIMER(&_opts.stats->predicate_column_read_ns);

    nrows_read = (uint16_t)_range_iter->read_batch_rowids(_block_rowids.data(), nrows_read_limit);
    // "Continuous" means the rowids form an unbroken run, enabling one seek plus one
    // bulk next_batch instead of per-rowid reads.
    bool is_continuous = (nrows_read > 1) &&
                         (_block_rowids[nrows_read - 1] - _block_rowids[0] == nrows_read - 1);
    VLOG_DEBUG << fmt::format(
            "nrows_read from range iterator: {}, is_continus {}, _cols_read_by_column_predicate "
            "[{}]",
            nrows_read, is_continuous, fmt::join(_predicate_column_ids, ","));

    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
            "[verbose] SegmentIterator::_read_columns_by_index read {} rowids, continuous: {}, "
            "rowids: [{}...{}]",
            nrows_read, is_continuous, nrows_read > 0 ? _block_rowids[0] : 0,
            nrows_read > 0 ? _block_rowids[nrows_read - 1] : 0);
    for (auto cid : _predicate_column_ids) {
        auto& column = _current_return_columns[cid];
        VLOG_DEBUG << fmt::format("Reading column {}, col_name {}", cid,
                                  _schema->column(cid)->name());
        // Virtual columns have no on-disk data; only real columns go through the
        // skip/prune checks below.
        if (!_virtual_column_exprs.contains(cid)) {
            if (_no_need_read_key_data(cid, column, nrows_read)) {
                VLOG_DEBUG << fmt::format("Column {} no need to read.", cid);
                continue;
            }
            if (_prune_column(cid, column, true, nrows_read)) {
                VLOG_DEBUG << fmt::format("Column {} is pruned. No need to read data.", cid);
                continue;
            }
            // Debug-point hook: lets tests assert that a given column was NOT
            // expected to reach the actual read below.
            DBUG_EXECUTE_IF("segment_iterator._read_columns_by_index", {
                auto col_name = _opts.tablet_schema->column(cid).name();
                auto debug_col_name =
                        DebugPoints::instance()->get_debug_param_or_default<std::string>(
                                "segment_iterator._read_columns_by_index", "column_name", "");
                if (debug_col_name.empty() && col_name != "__DORIS_DELETE_SIGN__") {
                    return Status::Error<ErrorCode::INTERNAL_ERROR>(
                            "does not need to read data, {}", col_name);
                }
                if (debug_col_name.find(col_name) != std::string::npos) {
                    return Status::Error<ErrorCode::INTERNAL_ERROR>(
                            "does not need to read data, {}", col_name);
                }
            })
        }

        if (is_continuous) {
            // Fast path: one seek, then one bulk read of all rowids.
            size_t rows_read = nrows_read;
            _opts.stats->predicate_column_read_seek_num += 1;
            if (_opts.runtime_state && _opts.runtime_state->enable_profile()) {
                // Timer is only scoped around the seek when profiling is enabled;
                // the seek itself is identical in both branches.
                SCOPED_RAW_TIMER(&_opts.stats->predicate_column_read_seek_ns);
                RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(_block_rowids[0]));
            } else {
                RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(_block_rowids[0]));
            }
            RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, column));
            if (rows_read != nrows_read) {
                return Status::Error<ErrorCode::INTERNAL_ERROR>("nrows({}) != rows_read({})",
                                                                nrows_read, rows_read);
            }
        } else {
            // Slow path: process the rowids in batches; a batch that happens to be
            // internally continuous still gets the seek + bulk-read treatment.
            const uint32_t batch_size = _range_iter->get_batch_size();
            uint32_t processed = 0;
            while (processed < nrows_read) {
                uint32_t current_batch_size = std::min(batch_size, nrows_read - processed);
                bool batch_continuous = (current_batch_size > 1) &&
                                        (_block_rowids[processed + current_batch_size - 1] -
                                                 _block_rowids[processed] ==
                                         current_batch_size - 1);

                if (batch_continuous) {
                    size_t rows_read = current_batch_size;
                    _opts.stats->predicate_column_read_seek_num += 1;
                    if (_opts.runtime_state && _opts.runtime_state->enable_profile()) {
                        SCOPED_RAW_TIMER(&_opts.stats->predicate_column_read_seek_ns);
                        RETURN_IF_ERROR(
                                _column_iterators[cid]->seek_to_ordinal(_block_rowids[processed]));
                    } else {
                        RETURN_IF_ERROR(
                                _column_iterators[cid]->seek_to_ordinal(_block_rowids[processed]));
                    }
                    RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, column));
                    if (rows_read != current_batch_size) {
                        return Status::Error<ErrorCode::INTERNAL_ERROR>(
                                "batch nrows({}) != rows_read({})", current_batch_size, rows_read);
                    }
                } else {
                    RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids(
                            &_block_rowids[processed], current_batch_size, column));
                }
                processed += current_batch_size;
            }
        }
    }

    return Status::OK();
}
2270 | | void SegmentIterator::_replace_version_col_if_needed(const std::vector<ColumnId>& column_ids, |
2271 | 26.1k | size_t num_rows) { |
2272 | | // Only the rowset with single version need to replace the version column. |
2273 | | // Doris can't determine the version before publish_version finished, so |
2274 | | // we can't write data to __DORIS_VERSION_COL__ in segment writer, the value |
2275 | | // is 0 by default. |
2276 | | // So we need to replace the value to real version while reading. |
2277 | 26.1k | if (_opts.version.first != _opts.version.second) { |
2278 | 9.41k | return; |
2279 | 9.41k | } |
2280 | 16.7k | int32_t version_idx = _schema->version_col_idx(); |
2281 | 16.7k | if (std::ranges::find(column_ids, version_idx) == column_ids.end()) { |
2282 | 16.7k | return; |
2283 | 16.7k | } |
2284 | | |
2285 | 0 | const auto* column_desc = _schema->column(version_idx); |
2286 | 0 | auto column = Schema::get_data_type_ptr(*column_desc)->create_column(); |
2287 | 0 | DCHECK(_schema->column(version_idx)->type() == FieldType::OLAP_FIELD_TYPE_BIGINT); |
2288 | 0 | auto* col_ptr = assert_cast<ColumnInt64*>(column.get()); |
2289 | 0 | for (size_t j = 0; j < num_rows; j++) { |
2290 | 0 | col_ptr->insert_value(_opts.version.second); |
2291 | 0 | } |
2292 | 0 | _current_return_columns[version_idx] = std::move(column); |
2293 | 0 | VLOG_DEBUG << "replaced version column in segment iterator, version_col_idx:" << version_idx; |
2294 | 0 | } |
2295 | | |
// Runs all vectorized ("pre-eval") predicates over the current batch and writes
// the indexes of surviving rows into `sel_rowid_idx`. Returns the new selection
// size. When every predicate is always_true (or vectorized eval is not needed)
// the identity selection is produced without evaluating anything.
uint16_t SegmentIterator::_evaluate_vectorization_predicate(uint16_t* sel_rowid_idx,
                                                            uint16_t selected_size) {
    SCOPED_RAW_TIMER(&_opts.stats->vec_cond_ns);
    bool all_pred_always_true = true;
    for (const auto& pred : _pre_eval_block_predicate) {
        if (!pred->always_true()) {
            all_pred_always_true = false;
        } else {
            pred->update_filter_info(0, 0, selected_size);
        }
    }

    const uint16_t original_size = selected_size;
    //If all predicates are always_true, then return directly.
    if (all_pred_always_true || !_is_need_vec_eval) {
        for (uint16_t i = 0; i < original_size; ++i) {
            sel_rowid_idx[i] = i;
        }
        // All preds are always_true, so return immediately and update the profile statistics here.
        _opts.stats->vec_cond_input_rows += original_size;
        return original_size;
    }

    // Evaluate predicates into the per-row byte flags: the first non-trivial
    // predicate writes the flags, subsequent ones AND into them.
    _ret_flags.resize(original_size);
    DCHECK(!_pre_eval_block_predicate.empty());
    bool is_first = true;
    for (auto& pred : _pre_eval_block_predicate) {
        if (pred->always_true()) {
            continue;
        }
        auto column_id = pred->column_id();
        auto& column = _current_return_columns[column_id];
        if (is_first) {
            pred->evaluate_vec(*column, original_size, (bool*)_ret_flags.data());
            is_first = false;
        } else {
            pred->evaluate_and_vec(*column, original_size, (bool*)_ret_flags.data());
        }
    }

    uint16_t new_size = 0;

    // Compact the surviving row indexes with SIMD: convert each SIMD-width chunk
    // of byte flags to a bit mask, special-casing all-zero and all-one chunks.
    uint16_t sel_pos = 0;
    const uint16_t sel_end = sel_pos + selected_size;
    static constexpr size_t SIMD_BYTES = simd::bits_mask_length();
    const uint16_t sel_end_simd = sel_pos + selected_size / SIMD_BYTES * SIMD_BYTES;

    while (sel_pos < sel_end_simd) {
        auto mask = simd::bytes_mask_to_bits_mask(_ret_flags.data() + sel_pos);
        if (0 == mask) {
            //pass
        } else if (simd::bits_mask_all() == mask) {
            // Every row in this chunk passed.
            for (uint16_t i = 0; i < SIMD_BYTES; i++) {
                sel_rowid_idx[new_size++] = sel_pos + i;
            }
        } else {
            simd::iterate_through_bits_mask(
                    [&](const int bit_pos) {
                        sel_rowid_idx[new_size++] = sel_pos + (uint16_t)bit_pos;
                    },
                    mask);
        }
        sel_pos += SIMD_BYTES;
    }

    // Scalar tail for the remainder that did not fill a whole SIMD chunk.
    for (; sel_pos < sel_end; sel_pos++) {
        if (_ret_flags[sel_pos]) {
            sel_rowid_idx[new_size++] = sel_pos;
        }
    }

    _opts.stats->vec_cond_input_rows += original_size;
    _opts.stats->rows_vec_cond_filtered += original_size - new_size;
    return new_size;
}
2371 | | |
2372 | | uint16_t SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_rowid_idx, |
2373 | 4.19k | uint16_t selected_size) { |
2374 | 4.19k | SCOPED_RAW_TIMER(&_opts.stats->short_cond_ns); |
2375 | 4.19k | if (!_is_need_short_eval) { |
2376 | 0 | return selected_size; |
2377 | 0 | } |
2378 | | |
2379 | 4.19k | uint16_t original_size = selected_size; |
2380 | 4.19k | for (auto predicate : _short_cir_eval_predicate) { |
2381 | 0 | auto column_id = predicate->column_id(); |
2382 | 0 | auto& short_cir_column = _current_return_columns[column_id]; |
2383 | 0 | selected_size = predicate->evaluate(*short_cir_column, vec_sel_rowid_idx, selected_size); |
2384 | 0 | } |
2385 | | |
2386 | 4.19k | _opts.stats->short_circuit_cond_input_rows += original_size; |
2387 | 4.19k | _opts.stats->rows_short_circuit_cond_filtered += original_size - selected_size; |
2388 | | |
2389 | | // evaluate delete condition |
2390 | 4.19k | original_size = selected_size; |
2391 | 4.19k | selected_size = _opts.delete_condition_predicates->evaluate(_current_return_columns, |
2392 | 4.19k | vec_sel_rowid_idx, selected_size); |
2393 | 4.19k | _opts.stats->rows_vec_del_cond_filtered += original_size - selected_size; |
2394 | 4.19k | return selected_size; |
2395 | 4.19k | } |
2396 | | |
2397 | | Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_column_ids, |
2398 | | std::vector<rowid_t>& rowid_vector, |
2399 | | uint16_t* sel_rowid_idx, size_t select_size, |
2400 | | MutableColumns* mutable_columns, |
2401 | 3.05k | bool init_condition_cache) { |
2402 | 3.05k | SCOPED_RAW_TIMER(&_opts.stats->lazy_read_ns); |
2403 | 3.05k | std::vector<rowid_t> rowids(select_size); |
2404 | | |
2405 | 3.05k | if (init_condition_cache) { |
2406 | 0 | DCHECK(_condition_cache); |
2407 | 0 | auto& condition_cache = *_condition_cache; |
2408 | 0 | for (size_t i = 0; i < select_size; ++i) { |
2409 | 0 | rowids[i] = rowid_vector[sel_rowid_idx[i]]; |
2410 | 0 | condition_cache[rowids[i] / SegmentIterator::CONDITION_CACHE_OFFSET] = true; |
2411 | 0 | } |
2412 | 3.05k | } else { |
2413 | 2.74M | for (size_t i = 0; i < select_size; ++i) { |
2414 | 2.74M | rowids[i] = rowid_vector[sel_rowid_idx[i]]; |
2415 | 2.74M | } |
2416 | 3.05k | } |
2417 | | |
2418 | 3.77k | for (auto cid : read_column_ids) { |
2419 | 3.77k | auto& colunm = (*mutable_columns)[cid]; |
2420 | 3.77k | if (_no_need_read_key_data(cid, colunm, select_size)) { |
2421 | 0 | continue; |
2422 | 0 | } |
2423 | 3.77k | if (_prune_column(cid, colunm, true, select_size)) { |
2424 | 0 | continue; |
2425 | 0 | } |
2426 | | |
2427 | 3.77k | DBUG_EXECUTE_IF("segment_iterator._read_columns_by_index", { |
2428 | 3.77k | auto debug_col_name = DebugPoints::instance()->get_debug_param_or_default<std::string>( |
2429 | 3.77k | "segment_iterator._read_columns_by_index", "column_name", ""); |
2430 | 3.77k | if (debug_col_name.empty()) { |
2431 | 3.77k | return Status::Error<ErrorCode::INTERNAL_ERROR>("does not need to read data"); |
2432 | 3.77k | } |
2433 | 3.77k | auto col_name = _opts.tablet_schema->column(cid).name(); |
2434 | 3.77k | if (debug_col_name.find(col_name) != std::string::npos) { |
2435 | 3.77k | return Status::Error<ErrorCode::INTERNAL_ERROR>("does not need to read data, {}", |
2436 | 3.77k | debug_col_name); |
2437 | 3.77k | } |
2438 | 3.77k | }) |
2439 | | |
2440 | 3.77k | if (_current_return_columns[cid].get() == nullptr) { |
2441 | 0 | return Status::InternalError( |
2442 | 0 | "SegmentIterator meet invalid column, return columns size {}, cid {}", |
2443 | 0 | _current_return_columns.size(), cid); |
2444 | 0 | } |
2445 | 3.77k | RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids(rowids.data(), select_size, |
2446 | 3.77k | _current_return_columns[cid])); |
2447 | 3.77k | } |
2448 | | |
2449 | 3.05k | return Status::OK(); |
2450 | 3.05k | } |
2451 | | |
// Public batch-read entry point. Wraps _next_batch_internal with exception capture,
// end-of-file post-processing (replacing virtual ColumnNothing placeholders with
// real empty columns and publishing the condition cache), optional row-order
// reversal for reverse key scans, and segment health bookkeeping on real failures.
Status SegmentIterator::next_batch(Block* block) {
    // Replace virtual columns with ColumnNothing at the beginning of each next_batch call.
    _init_virtual_columns(block);
    auto status = [&]() {
        RETURN_IF_CATCH_EXCEPTION({
            auto res = _next_batch_internal(block);

            if (res.is<END_OF_FILE>()) {
                // Since we have a type check at the caller.
                // So a replacement of nothing column with real column is needed.
                const auto& idx_to_datatype = _opts.vir_col_idx_to_type;
                for (const auto& pair : _vir_cid_to_idx_in_block) {
                    size_t idx = pair.second;
                    auto type = idx_to_datatype.find(idx)->second;
                    block->replace_by_position(idx, type->create_column());
                }

                // At EOF the per-segment condition cache (if built during this scan)
                // is published to the shared ConditionCache for later queries.
                if (_opts.condition_cache_digest && !_find_condition_cache) {
                    auto* condition_cache = ConditionCache::instance();
                    ConditionCache::CacheKey cache_key(_opts.rowset_id, _segment->id(),
                                                       _opts.condition_cache_digest);
                    VLOG_DEBUG << "Condition cache insert, query id: "
                               << print_id(_opts.runtime_state->query_id())
                               << ", rowset id: " << _opts.rowset_id.to_string()
                               << ", segment id: " << _segment->id()
                               << ", cache digest: " << _opts.condition_cache_digest;
                    condition_cache->insert(cache_key, std::move(_condition_cache));
                }
                return res;
            }

            RETURN_IF_ERROR(res);
            // reverse block row order if read_orderby_key_reverse is true for key topn
            // it should be processed for all success _next_batch_internal
            if (_opts.read_orderby_key_reverse) {
                size_t num_rows = block->rows();
                if (num_rows == 0) {
                    return Status::OK();
                }
                size_t num_columns = block->columns();
                IColumn::Permutation permutation;
                for (size_t i = 0; i < num_rows; ++i) permutation.emplace_back(num_rows - 1 - i);

                for (size_t i = 0; i < num_columns; ++i)
                    block->get_by_position(i).column =
                            block->get_by_position(i).column->permute(permutation, num_rows);
            }

            RETURN_IF_ERROR(block->check_type_and_column());

            return Status::OK();
        });
    }();

    // if rows read by batch is 0, will return end of file, we should not remove segment cache in this situation.
    if (!status.ok() && !status.is<END_OF_FILE>()) {
        _segment->update_healthy_status(status);
    }
    return status;
}
2512 | | |
2513 | 23.1k | Status SegmentIterator::_convert_to_expected_type(const std::vector<ColumnId>& col_ids) { |
2514 | 40.2k | for (ColumnId i : col_ids) { |
2515 | 40.2k | if (!_current_return_columns[i] || _converted_column_ids[i] || _is_pred_column[i]) { |
2516 | 467 | continue; |
2517 | 467 | } |
2518 | 39.7k | const StorageField* field_type = _schema->column(i); |
2519 | 39.7k | DataTypePtr expected_type = Schema::get_data_type_ptr(*field_type); |
2520 | 39.7k | DataTypePtr file_column_type = _storage_name_and_type[i].second; |
2521 | 39.7k | if (!file_column_type->equals(*expected_type)) { |
2522 | 0 | ColumnPtr expected; |
2523 | 0 | ColumnPtr original = _current_return_columns[i]->assume_mutable()->get_ptr(); |
2524 | 0 | RETURN_IF_ERROR(variant_util::cast_column({original, file_column_type, ""}, |
2525 | 0 | expected_type, &expected)); |
2526 | 0 | _current_return_columns[i] = expected->assume_mutable(); |
2527 | 0 | _converted_column_ids[i] = true; |
2528 | 0 | VLOG_DEBUG << fmt::format( |
2529 | 0 | "Convert {} fom file column type {} to {}, num_rows {}", |
2530 | 0 | field_type->path() == nullptr ? "" : field_type->path()->get_path(), |
2531 | 0 | file_column_type->get_name(), expected_type->get_name(), |
2532 | 0 | _current_return_columns[i]->size()); |
2533 | 0 | } |
2534 | 39.7k | } |
2535 | 23.1k | return Status::OK(); |
2536 | 23.1k | } |
2537 | | |
2538 | | Status SegmentIterator::copy_column_data_by_selector(IColumn* input_col_ptr, |
2539 | | MutableColumnPtr& output_col, |
2540 | | uint16_t* sel_rowid_idx, uint16_t select_size, |
2541 | 2.83k | size_t batch_size) { |
2542 | 2.83k | if (output_col->is_nullable() != input_col_ptr->is_nullable()) { |
2543 | 0 | LOG(WARNING) << "nullable mismatch for output_column: " << output_col->dump_structure() |
2544 | 0 | << " input_column: " << input_col_ptr->dump_structure() |
2545 | 0 | << " select_size: " << select_size; |
2546 | 0 | return Status::RuntimeError("copy_column_data_by_selector nullable mismatch"); |
2547 | 0 | } |
2548 | 2.83k | output_col->reserve(select_size); |
2549 | 2.83k | return input_col_ptr->filter_by_selector(sel_rowid_idx, select_size, output_col.get()); |
2550 | 2.83k | } |
2551 | | |
// Produces the next batch of rows for this segment into `block`.
//
// Pipeline per batch:
//   1. read predicate columns by index (row bitmap driven),
//   2. evaluate vectorized + short-circuit predicates,
//   3. output surviving predicate columns, run pushed-down common exprs,
//   4. lazily read non-predicate columns only for surviving rows,
//   5. output remaining columns, materialize virtual columns, final checks.
//
// Returns EndOfFile (via _process_eof) when the segment is exhausted.
Status SegmentIterator::_next_batch_internal(Block* block) {
    SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().segment_iterator_next_batch);

    bool is_mem_reuse = block->mem_reuse();
    DCHECK(is_mem_reuse);

    RETURN_IF_ERROR(_lazy_init(block));

    SCOPED_RAW_TIMER(&_opts.stats->block_load_ns);

    // If the row bitmap size is smaller than nrows_read_limit, there's no need to reserve that many column rows.
    uint32_t nrows_read_limit =
            std::min(cast_set<uint32_t>(_row_bitmap.cardinality()), _opts.block_row_max);
    if (_can_opt_topn_reads()) {
        // topn optimization: never read more rows than the topn limit needs.
        nrows_read_limit = std::min(static_cast<uint32_t>(_opts.topn_limit), nrows_read_limit);
    }
    DBUG_EXECUTE_IF("segment_iterator.topn_opt_1", {
        if (nrows_read_limit != 1) {
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
                    "topn opt 1 execute failed: nrows_read_limit={}, _opts.topn_limit={}",
                    nrows_read_limit, _opts.topn_limit);
        }
    })

    RETURN_IF_ERROR(_init_current_block(block, _current_return_columns, nrows_read_limit));
    _converted_column_ids.assign(_schema->columns().size(), false);

    _selected_size = 0;
    RETURN_IF_ERROR(_read_columns_by_index(nrows_read_limit, _selected_size));
    _replace_version_col_if_needed(_predicate_column_ids, _selected_size);

    _opts.stats->blocks_load += 1;
    _opts.stats->raw_rows_read += _selected_size;

    if (_selected_size == 0) {
        // No rows left in the segment: flush state and signal EOF.
        return _process_eof(block);
    }

    if (_is_need_vec_eval || _is_need_short_eval || _is_need_expr_eval) {
        _sel_rowid_idx.resize(_selected_size);

        if (_is_need_vec_eval || _is_need_short_eval) {
            _convert_dict_code_for_predicate_if_necessary();

            // step 1: evaluate vectorization predicate
            _selected_size =
                    _evaluate_vectorization_predicate(_sel_rowid_idx.data(), _selected_size);

            // step 2: evaluate short circuit predicate
            // todo(wb) research whether need to read short predicate after vectorization evaluation
            // to reduce cost of read short circuit columns.
            // In SSB test, it make no difference; So need more scenarios to test
            _selected_size =
                    _evaluate_short_circuit_predicate(_sel_rowid_idx.data(), _selected_size);
            VLOG_DEBUG << fmt::format("After evaluate predicates, selected size: {} ",
                                      _selected_size);
            if (_selected_size > 0) {
                // step 3.1: output short circuit and predicate column
                // when lazy materialization enables, _predicate_column_ids = distinct(_short_cir_pred_column_ids + _vec_pred_column_ids)
                // see _vec_init_lazy_materialization
                // todo(wb) need to tell input columnids from output columnids
                RETURN_IF_ERROR(_output_column_by_sel_idx(block, _predicate_column_ids,
                                                          _sel_rowid_idx.data(), _selected_size));

                // step 3.2: read remaining expr column and evaluate it.
                if (_is_need_expr_eval) {
                    // The predicate column contains the remaining expr column, no need second read.
                    if (_common_expr_column_ids.size() > 0) {
                        SCOPED_RAW_TIMER(&_opts.stats->non_predicate_read_ns);
                        RETURN_IF_ERROR(_read_columns_by_rowids(
                                _common_expr_column_ids, _block_rowids, _sel_rowid_idx.data(),
                                _selected_size, &_current_return_columns));
                        _replace_version_col_if_needed(_common_expr_column_ids, _selected_size);
                        RETURN_IF_ERROR(_process_columns(_common_expr_column_ids, block));
                    }

                    DCHECK(block->columns() > _schema_block_id_map[*_common_expr_columns.begin()]);
                    RETURN_IF_ERROR(
                            _process_common_expr(_sel_rowid_idx.data(), _selected_size, block));
                }
            } else {
                // All rows filtered out: reset virtual-column placeholders.
                _fill_column_nothing();
                if (_is_need_expr_eval) {
                    RETURN_IF_ERROR(_process_columns(_common_expr_column_ids, block));
                }
            }
        } else if (_is_need_expr_eval) {
            DCHECK(!_predicate_column_ids.empty());
            RETURN_IF_ERROR(_process_columns(_predicate_column_ids, block));
            // first read all rows are insert block, initialize sel_rowid_idx to all rows.
            for (uint16_t i = 0; i < _selected_size; ++i) {
                _sel_rowid_idx[i] = i;
            }
            RETURN_IF_ERROR(_process_common_expr(_sel_rowid_idx.data(), _selected_size, block));
        }

        // step4: read non_predicate column
        if (_selected_size > 0) {
            if (!_non_predicate_columns.empty()) {
                RETURN_IF_ERROR(_read_columns_by_rowids(
                        _non_predicate_columns, _block_rowids, _sel_rowid_idx.data(),
                        _selected_size, &_current_return_columns,
                        _opts.condition_cache_digest && !_find_condition_cache));
                _replace_version_col_if_needed(_non_predicate_columns, _selected_size);
            } else {
                // No lazy columns to read, but the condition cache still needs
                // to learn which row ranges survived the predicates.
                if (_opts.condition_cache_digest && !_find_condition_cache) {
                    auto& condition_cache = *_condition_cache;
                    for (size_t i = 0; i < _selected_size; ++i) {
                        auto rowid = _block_rowids[_sel_rowid_idx[i]];
                        condition_cache[rowid / SegmentIterator::CONDITION_CACHE_OFFSET] = true;
                    }
                }
            }
        }
    }

    // step5: output columns
    RETURN_IF_ERROR(_output_non_pred_columns(block));
    // Convert inverted index bitmaps to result columns for virtual column exprs
    // (e.g., MATCH projections). This must run before _materialization_of_virtual_column
    // so that fast_execute() can find the pre-computed result columns.
    if (!_virtual_column_exprs.empty()) {
        bool use_sel = _is_need_vec_eval || _is_need_short_eval || _is_need_expr_eval;
        uint16_t* sel_rowid_idx = use_sel ? _sel_rowid_idx.data() : nullptr;
        std::vector<VExprContext*> vir_ctxs;
        vir_ctxs.reserve(_virtual_column_exprs.size());
        for (auto& [cid, ctx] : _virtual_column_exprs) {
            vir_ctxs.push_back(ctx.get());
        }
        _output_index_result_column(vir_ctxs, sel_rowid_idx, _selected_size, block);
    }
    RETURN_IF_ERROR(_materialization_of_virtual_column(block));
    // shrink char_type suffix zero data
    block->shrink_char_type_column_suffix_zero(_char_type_idx);
    return _check_output_block(block);
}
2688 | | |
2689 | 0 | Status SegmentIterator::_process_columns(const std::vector<ColumnId>& column_ids, Block* block) { |
2690 | 0 | RETURN_IF_ERROR(_convert_to_expected_type(column_ids)); |
2691 | 0 | for (auto cid : column_ids) { |
2692 | 0 | auto loc = _schema_block_id_map[cid]; |
2693 | 0 | block->replace_by_position(loc, std::move(_current_return_columns[cid])); |
2694 | 0 | } |
2695 | 0 | return Status::OK(); |
2696 | 0 | } |
2697 | | |
2698 | 150 | void SegmentIterator::_fill_column_nothing() { |
2699 | | // If column_predicate filters out all rows, the corresponding column in _current_return_columns[cid] must be a ColumnNothing. |
2700 | | // Because: |
2701 | | // 1. Before each batch, _init_return_columns is called to initialize _current_return_columns, and virtual columns in _current_return_columns are initialized as ColumnNothing. |
2702 | | // 2. When select_size == 0, the read method of VirtualColumnIterator will definitely not be called, so the corresponding Column remains a ColumnNothing |
2703 | 150 | for (const auto pair : _vir_cid_to_idx_in_block) { |
2704 | 0 | auto cid = pair.first; |
2705 | 0 | auto pos = pair.second; |
2706 | 0 | const auto* nothing_col = |
2707 | 0 | check_and_get_column<ColumnNothing>(_current_return_columns[cid].get()); |
2708 | 0 | DCHECK(nothing_col != nullptr) |
2709 | 0 | << fmt::format("ColumnNothing expected, but got {}, cid: {}, pos: {}", |
2710 | 0 | _current_return_columns[cid]->get_name(), cid, pos); |
2711 | 0 | _current_return_columns[cid] = _opts.vir_col_idx_to_type[pos]->create_column(); |
2712 | 0 | } |
2713 | 150 | } |
2714 | | |
// Debug-only sanity check on the produced block: every column must be
// non-null, fully materialized (no ColumnNothing while rows exist), and hold
// exactly block->rows() entries. Compiles to a no-op in release builds.
Status SegmentIterator::_check_output_block(Block* block) {
#ifndef NDEBUG
    size_t rows = block->rows();
    size_t idx = 0;
    for (const auto& entry : *block) {
        if (!entry.column) {
            return Status::InternalError(
                    "Column in idx {} is null, block columns {}, normal_columns {}, "
                    "virtual_columns {}",
                    idx, block->columns(), _schema->num_column_ids(), _virtual_column_exprs.size());
        } else if (check_and_get_column<ColumnNothing>(entry.column.get())) {
            // A ColumnNothing is only legal while the block is empty; with rows
            // present it means a virtual column was never materialized.
            if (rows > 0) {
                std::vector<std::string> vcid_to_idx;
                for (const auto& pair : _vir_cid_to_idx_in_block) {
                    vcid_to_idx.push_back(fmt::format("{}-{}", pair.first, pair.second));
                }
                std::string vir_cid_to_idx_in_block_msg =
                        fmt::format("_vir_cid_to_idx_in_block:[{}]", fmt::join(vcid_to_idx, ","));
                return Status::InternalError(
                        "Column in idx {} is nothing, block columns {}, normal_columns {}, "
                        "vir_cid_to_idx_in_block_msg {}",
                        idx, block->columns(), _schema->num_column_ids(),
                        vir_cid_to_idx_in_block_msg);
            }
        } else if (entry.column->size() != rows) {
            return Status::InternalError(
                    "Unmatched size {}, expected {}, column: {}, type: {}, idx_in_block: {}, "
                    "block: {}",
                    entry.column->size(), rows, entry.column->get_name(), entry.type->get_name(),
                    idx, block->dump_structure());
        }
        idx++;
    }
#endif
    return Status::OK();
}
2751 | | |
// Currently a no-op that always succeeds; predicate evaluation happens inline
// in _next_batch_internal. Kept as a hook — presumably for future per-column
// predicate processing (TODO confirm intent with the original authors).
Status SegmentIterator::_process_column_predicate() {
    return Status::OK();
}
2755 | | |
2756 | 5.62k | Status SegmentIterator::_process_eof(Block* block) { |
2757 | | // Convert all columns in _current_return_columns to schema column |
2758 | 5.62k | RETURN_IF_ERROR(_convert_to_expected_type(_schema->column_ids())); |
2759 | 18.0k | for (int i = 0; i < block->columns(); i++) { |
2760 | 12.4k | auto cid = _schema->column_id(i); |
2761 | 12.4k | if (!_is_pred_column[cid]) { |
2762 | 12.1k | block->replace_by_position(i, std::move(_current_return_columns[cid])); |
2763 | 12.1k | } |
2764 | 12.4k | } |
2765 | 5.62k | block->clear_column_data(); |
2766 | | // clear and release iterators memory footprint in advance |
2767 | 5.62k | _column_iterators.clear(); |
2768 | 5.62k | _index_iterators.clear(); |
2769 | 5.62k | return Status::EndOfFile("no more data in segment"); |
2770 | 5.62k | } |
2771 | | |
2772 | | Status SegmentIterator::_process_common_expr(uint16_t* sel_rowid_idx, uint16_t& selected_size, |
2773 | 0 | Block* block) { |
2774 | | // Here we just use col0 as row_number indicator. when reach here, we will calculate the predicates first. |
2775 | | // then use the result to reduce our data read(that is, expr push down). there's now row in block means the first |
2776 | | // column is not in common expr. so it's safe to replace it temporarily to provide correct `selected_size`. |
2777 | 0 | VLOG_DEBUG << fmt::format("Execute common expr. block rows {}, selected size {}", block->rows(), |
2778 | 0 | _selected_size); |
2779 | |
|
2780 | 0 | bool need_mock_col = block->rows() != selected_size; |
2781 | 0 | MutableColumnPtr col0; |
2782 | 0 | if (need_mock_col) { |
2783 | 0 | col0 = std::move(*block->get_by_position(0).column).mutate(); |
2784 | 0 | block->replace_by_position( |
2785 | 0 | 0, block->get_by_position(0).type->create_column_const_with_default_value( |
2786 | 0 | _selected_size)); |
2787 | 0 | } |
2788 | |
|
2789 | 0 | std::vector<VExprContext*> common_ctxs; |
2790 | 0 | common_ctxs.reserve(_common_expr_ctxs_push_down.size()); |
2791 | 0 | for (auto& ctx : _common_expr_ctxs_push_down) { |
2792 | 0 | common_ctxs.push_back(ctx.get()); |
2793 | 0 | } |
2794 | 0 | _output_index_result_column(common_ctxs, _sel_rowid_idx.data(), _selected_size, block); |
2795 | 0 | block->shrink_char_type_column_suffix_zero(_char_type_idx); |
2796 | 0 | RETURN_IF_ERROR(_execute_common_expr(_sel_rowid_idx.data(), _selected_size, block)); |
2797 | | |
2798 | 0 | if (need_mock_col) { |
2799 | 0 | block->replace_by_position(0, std::move(col0)); |
2800 | 0 | } |
2801 | |
|
2802 | 0 | VLOG_DEBUG << fmt::format("Execute common expr end. block rows {}, selected size {}", |
2803 | 0 | block->rows(), _selected_size); |
2804 | 0 | return Status::OK(); |
2805 | 0 | } |
2806 | | |
2807 | | Status SegmentIterator::_execute_common_expr(uint16_t* sel_rowid_idx, uint16_t& selected_size, |
2808 | 0 | Block* block) { |
2809 | 0 | SCOPED_RAW_TIMER(&_opts.stats->expr_filter_ns); |
2810 | 0 | DCHECK(!_remaining_conjunct_roots.empty()); |
2811 | 0 | DCHECK(block->rows() != 0); |
2812 | 0 | int prev_columns = block->columns(); |
2813 | 0 | uint16_t original_size = selected_size; |
2814 | 0 | _opts.stats->expr_cond_input_rows += original_size; |
2815 | |
|
2816 | 0 | IColumn::Filter filter; |
2817 | 0 | RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( |
2818 | 0 | _common_expr_ctxs_push_down, block, _columns_to_filter, prev_columns, filter)); |
2819 | | |
2820 | 0 | selected_size = _evaluate_common_expr_filter(sel_rowid_idx, selected_size, filter); |
2821 | 0 | _opts.stats->rows_expr_cond_filtered += original_size - selected_size; |
2822 | 0 | return Status::OK(); |
2823 | 0 | } |
2824 | | |
// Compacts `sel_rowid_idx` in place, keeping only entries whose corresponding
// byte in `filter` is non-zero, and returns the new selected count.
// `filter` holds one byte per currently-selected row.
uint16_t SegmentIterator::_evaluate_common_expr_filter(uint16_t* sel_rowid_idx,
                                                       uint16_t selected_size,
                                                       const IColumn::Filter& filter) {
    size_t count = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size());
    if (count == 0) {
        // Everything was filtered out.
        return 0;
    } else {
        const UInt8* filt_pos = filter.data();

        uint16_t new_size = 0;
        uint32_t sel_pos = 0;
        const uint32_t sel_end = selected_size;
        static constexpr size_t SIMD_BYTES = simd::bits_mask_length();
        const uint32_t sel_end_simd = sel_pos + selected_size / SIMD_BYTES * SIMD_BYTES;

        // Walk the filter in SIMD-width chunks: skip all-zero chunks, copy
        // all-ones chunks wholesale, expand mixed chunks bit by bit.
        while (sel_pos < sel_end_simd) {
            auto mask = simd::bytes_mask_to_bits_mask(filt_pos + sel_pos);
            if (0 == mask) {
                //pass
            } else if (simd::bits_mask_all() == mask) {
                for (uint32_t i = 0; i < SIMD_BYTES; i++) {
                    sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos + i];
                }
            } else {
                simd::iterate_through_bits_mask(
                        [&](const size_t bit_pos) {
                            sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos + bit_pos];
                        },
                        mask);
            }
            sel_pos += SIMD_BYTES;
        }

        // Scalar tail for the remainder that does not fill a whole SIMD chunk.
        for (; sel_pos < sel_end; sel_pos++) {
            if (filt_pos[sel_pos]) {
                sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos];
            }
        }
        return new_size;
    }
}
2866 | | |
// For each expr context that carries an index context, converts its
// inverted-index result bitmaps into ColumnUInt8 (optionally nullable) result
// columns aligned with the current block's rows, then registers them on the
// index context so later expr execution can reuse the pre-computed results.
// `sel_rowid_idx` (may be null) maps selected rows back to _block_rowids.
void SegmentIterator::_output_index_result_column(const std::vector<VExprContext*>& expr_ctxs,
                                                  uint16_t* sel_rowid_idx, uint16_t select_size,
                                                  Block* block) {
    SCOPED_RAW_TIMER(&_opts.stats->output_index_result_column_timer);
    if (block->rows() == 0) {
        return;
    }
    for (auto* expr_ctx_ptr : expr_ctxs) {
        auto index_ctx = expr_ctx_ptr->get_index_context();
        if (index_ctx == nullptr) {
            continue;
        }
        for (auto& inverted_index_result_bitmap_for_expr : index_ctx->get_index_result_bitmap()) {
            const auto* expr = inverted_index_result_bitmap_for_expr.first;
            const auto& result_bitmap = inverted_index_result_bitmap_for_expr.second;
            const auto& index_result_bitmap = result_bitmap.get_data_bitmap();
            auto index_result_column = ColumnUInt8::create();
            ColumnUInt8::Container& vec_match_pred = index_result_column->get_data();
            vec_match_pred.resize(block->rows());
            std::fill(vec_match_pred.begin(), vec_match_pred.end(), 0);

            // A null map is built only when the index reported nulls AND the
            // expr's result type is nullable.
            const auto& null_bitmap = result_bitmap.get_null_bitmap();
            bool has_null_bitmap = null_bitmap != nullptr && !null_bitmap->isEmpty();
            bool expr_returns_nullable = expr->data_type()->is_nullable();

            ColumnUInt8::MutablePtr null_map_column = nullptr;
            ColumnUInt8::Container* null_map_data = nullptr;
            if (has_null_bitmap && expr_returns_nullable) {
                null_map_column = ColumnUInt8::create();
                auto& null_map_vec = null_map_column->get_data();
                null_map_vec.resize(block->rows());
                std::fill(null_map_vec.begin(), null_map_vec.end(), 0);
                null_map_data = &null_map_column->get_data();
            }

            roaring::BulkContext bulk_context;
            for (uint32_t i = 0; i < select_size; i++) {
                // Map the i-th output row to its segment rowid, through the
                // selector when one is provided.
                auto rowid = sel_rowid_idx ? _block_rowids[sel_rowid_idx[i]] : _block_rowids[i];
                if (index_result_bitmap) {
                    vec_match_pred[i] = index_result_bitmap->containsBulk(bulk_context, rowid);
                }
                if (null_map_data != nullptr && null_bitmap->contains(rowid)) {
                    // NULL takes precedence: mark null and clear the match bit.
                    (*null_map_data)[i] = 1;
                    vec_match_pred[i] = 0;
                }
            }

            DCHECK(block->rows() == vec_match_pred.size());

            if (null_map_column) {
                index_ctx->set_index_result_column_for_expr(
                        expr, ColumnNullable::create(std::move(index_result_column),
                                                     std::move(null_map_column)));
            } else {
                index_ctx->set_index_result_column_for_expr(expr, std::move(index_result_column));
            }
        }
    }
}
2926 | | |
2927 | 4.19k | void SegmentIterator::_convert_dict_code_for_predicate_if_necessary() { |
2928 | 4.19k | for (auto predicate : _short_cir_eval_predicate) { |
2929 | 0 | _convert_dict_code_for_predicate_if_necessary_impl(predicate); |
2930 | 0 | } |
2931 | | |
2932 | 4.19k | for (auto predicate : _pre_eval_block_predicate) { |
2933 | 0 | _convert_dict_code_for_predicate_if_necessary_impl(predicate); |
2934 | 0 | } |
2935 | | |
2936 | 4.19k | for (auto column_id : _delete_range_column_ids) { |
2937 | 4.05k | _current_return_columns[column_id].get()->convert_dict_codes_if_necessary(); |
2938 | 4.05k | } |
2939 | | |
2940 | 4.19k | for (auto column_id : _delete_bloom_filter_column_ids) { |
2941 | 0 | _current_return_columns[column_id].get()->initialize_hash_values_for_runtime_filter(); |
2942 | 0 | } |
2943 | 4.19k | } |
2944 | | |
2945 | | void SegmentIterator::_convert_dict_code_for_predicate_if_necessary_impl( |
2946 | 0 | std::shared_ptr<ColumnPredicate> predicate) { |
2947 | 0 | auto& column = _current_return_columns[predicate->column_id()]; |
2948 | 0 | auto* col_ptr = column.get(); |
2949 | |
|
2950 | 0 | if (PredicateTypeTraits::is_range(predicate->type())) { |
2951 | 0 | col_ptr->convert_dict_codes_if_necessary(); |
2952 | 0 | } else if (PredicateTypeTraits::is_bloom_filter(predicate->type())) { |
2953 | 0 | col_ptr->initialize_hash_values_for_runtime_filter(); |
2954 | 0 | } |
2955 | 0 | } |
2956 | | |
2957 | 7.19k | Status SegmentIterator::current_block_row_locations(std::vector<RowLocation>* block_row_locations) { |
2958 | 7.19k | DCHECK(_opts.record_rowids); |
2959 | 7.19k | DCHECK_GE(_block_rowids.size(), _selected_size); |
2960 | 7.19k | block_row_locations->resize(_selected_size); |
2961 | 7.19k | uint32_t sid = segment_id(); |
2962 | 7.19k | if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) { |
2963 | 4.24M | for (auto i = 0; i < _selected_size; i++) { |
2964 | 4.23M | (*block_row_locations)[i] = RowLocation(sid, _block_rowids[i]); |
2965 | 4.23M | } |
2966 | 4.46k | } else { |
2967 | 2.46M | for (auto i = 0; i < _selected_size; i++) { |
2968 | 2.46M | (*block_row_locations)[i] = RowLocation(sid, _block_rowids[_sel_rowid_idx[i]]); |
2969 | 2.46M | } |
2970 | 2.73k | } |
2971 | 7.19k | return Status::OK(); |
2972 | 7.19k | } |
2973 | | |
2974 | 5.63k | Status SegmentIterator::_construct_compound_expr_context() { |
2975 | 5.63k | ColumnIteratorOptions iter_opts { |
2976 | 5.63k | .use_page_cache = _opts.use_page_cache, |
2977 | 5.63k | .file_reader = _file_reader.get(), |
2978 | 5.63k | .stats = _opts.stats, |
2979 | 5.63k | .io_ctx = _opts.io_ctx, |
2980 | 5.63k | }; |
2981 | 5.63k | auto inverted_index_context = std::make_shared<IndexExecContext>( |
2982 | 5.63k | _schema->column_ids(), _index_iterators, _storage_name_and_type, |
2983 | 5.63k | _common_expr_index_exec_status, _score_runtime, _segment.get(), iter_opts); |
2984 | 5.63k | for (const auto& expr_ctx : _opts.common_expr_ctxs_push_down) { |
2985 | 0 | VExprContextSPtr context; |
2986 | | // _ann_range_search_runtime will do deep copy. |
2987 | 0 | RETURN_IF_ERROR(expr_ctx->clone(_opts.runtime_state, context)); |
2988 | 0 | context->set_index_context(inverted_index_context); |
2989 | 0 | _common_expr_ctxs_push_down.emplace_back(context); |
2990 | 0 | } |
2991 | | // Clone virtual column exprs before setting IndexExecContext, because |
2992 | | // IndexExecContext holds segment-specific index iterator references. |
2993 | | // Without cloning, shared VExprContext would be overwritten per-segment |
2994 | | // and could point to the wrong segment's context. |
2995 | 5.63k | for (auto& [cid, expr_ctx] : _virtual_column_exprs) { |
2996 | 0 | VExprContextSPtr context; |
2997 | 0 | RETURN_IF_ERROR(expr_ctx->clone(_opts.runtime_state, context)); |
2998 | 0 | context->set_index_context(inverted_index_context); |
2999 | 0 | expr_ctx = context; |
3000 | 0 | } |
3001 | 5.63k | return Status::OK(); |
3002 | 5.63k | } |
3003 | | |
// Walks every pushed-down common expr tree and records, per column, which expr
// nodes reference that column through a slot ref. The resulting maps
// (_common_expr_index_exec_status and _common_expr_to_slotref_map) are used
// later to track whether inverted-index evaluation fully covered an expr.
void SegmentIterator::_calculate_expr_in_remaining_conjunct_root() {
    for (const auto& root_expr_ctx : _common_expr_ctxs_push_down) {
        const auto& root_expr = root_expr_ctx->root();
        if (root_expr == nullptr) {
            continue;
        }
        _common_expr_to_slotref_map[root_expr_ctx.get()] = std::unordered_map<ColumnId, VExpr*>();

        // Iterative depth-first traversal of the expr tree.
        std::stack<VExprSPtr> stack;
        stack.emplace(root_expr);

        while (!stack.empty()) {
            const auto& expr = stack.top();
            stack.pop();

            for (const auto& child : expr->children()) {
                if (child->is_virtual_slot_ref()) {
                    // Expand virtual slot ref to its underlying expression tree and
                    // collect real slot refs used inside. We still associate those
                    // slot refs with the current parent expr node for inverted index
                    // tracking, just like normal slot refs.
                    auto* vir_slot_ref = assert_cast<VirtualSlotRef*>(child.get());
                    auto vir_expr = vir_slot_ref->get_virtual_column_expr();
                    if (vir_expr) {
                        // Nested traversal over the virtual column's own tree.
                        std::stack<VExprSPtr> vir_stack;
                        vir_stack.emplace(vir_expr);

                        while (!vir_stack.empty()) {
                            const auto& vir_node = vir_stack.top();
                            vir_stack.pop();

                            for (const auto& vir_child : vir_node->children()) {
                                if (vir_child->is_slot_ref()) {
                                    auto* inner_slot_ref = assert_cast<VSlotRef*>(vir_child.get());
                                    _common_expr_index_exec_status[_schema->column_id(
                                            inner_slot_ref->column_id())][expr.get()] = false;
                                    _common_expr_to_slotref_map[root_expr_ctx.get()]
                                                               [inner_slot_ref->column_id()] =
                                                                       expr.get();
                                }

                                if (!vir_child->children().empty()) {
                                    vir_stack.emplace(vir_child);
                                }
                            }
                        }
                    }
                }
                // Example: CAST(v['a'] AS VARCHAR) MATCH 'hello', do not add CAST expr to index tracking.
                auto expr_without_cast = VExpr::expr_without_cast(child);
                if (expr_without_cast->is_slot_ref() && expr->op() != TExprOpcode::CAST) {
                    auto* column_slot_ref = assert_cast<VSlotRef*>(expr_without_cast.get());
                    _common_expr_index_exec_status[_schema->column_id(column_slot_ref->column_id())]
                                                  [expr.get()] = false;
                    _common_expr_to_slotref_map[root_expr_ctx.get()][column_slot_ref->column_id()] =
                            expr.get();
                }
            }

            // Push interior children in reverse so they are visited in order.
            const auto& children = expr->children();
            for (int i = cast_set<int>(children.size()) - 1; i >= 0; --i) {
                if (!children[i]->children().empty()) {
                    stack.emplace(children[i]);
                }
            }
        }
    }
}
3072 | | |
3073 | | bool SegmentIterator::_no_need_read_key_data(ColumnId cid, MutableColumnPtr& column, |
3074 | 43.4k | size_t nrows_read) { |
3075 | 43.4k | if (_opts.runtime_state && !_opts.runtime_state->query_options().enable_no_need_read_data_opt) { |
3076 | 0 | return false; |
3077 | 0 | } |
3078 | | |
3079 | 43.4k | if (!((_opts.tablet_schema->keys_type() == KeysType::DUP_KEYS || |
3080 | 43.4k | (_opts.tablet_schema->keys_type() == KeysType::UNIQUE_KEYS && |
3081 | 16.3k | _opts.enable_unique_key_merge_on_write)))) { |
3082 | 12.0k | return false; |
3083 | 12.0k | } |
3084 | | |
3085 | 31.3k | if (_opts.push_down_agg_type_opt != TPushAggOp::COUNT_ON_INDEX) { |
3086 | 31.3k | return false; |
3087 | 31.3k | } |
3088 | | |
3089 | 0 | if (!_opts.tablet_schema->column(cid).is_key()) { |
3090 | 0 | return false; |
3091 | 0 | } |
3092 | | |
3093 | 0 | if (_has_delete_predicate(cid)) { |
3094 | 0 | return false; |
3095 | 0 | } |
3096 | | |
3097 | 0 | if (!_check_all_conditions_passed_inverted_index_for_column(cid)) { |
3098 | 0 | return false; |
3099 | 0 | } |
3100 | | |
3101 | 0 | if (column->is_nullable()) { |
3102 | 0 | auto* nullable_col_ptr = reinterpret_cast<ColumnNullable*>(column.get()); |
3103 | 0 | nullable_col_ptr->get_null_map_column().insert_many_defaults(nrows_read); |
3104 | 0 | nullable_col_ptr->get_nested_column_ptr()->insert_many_defaults(nrows_read); |
3105 | 0 | } else { |
3106 | 0 | column->insert_many_defaults(nrows_read); |
3107 | 0 | } |
3108 | |
|
3109 | 0 | return true; |
3110 | 0 | } |
3111 | | |
3112 | 31.3k | bool SegmentIterator::_has_delete_predicate(ColumnId cid) { |
3113 | 31.3k | std::set<uint32_t> delete_columns_set; |
3114 | 31.3k | _opts.delete_condition_predicates->get_all_column_ids(delete_columns_set); |
3115 | 31.3k | return delete_columns_set.contains(cid); |
3116 | 31.3k | } |
3117 | | |
3118 | 23.1k | bool SegmentIterator::_can_opt_topn_reads() { |
3119 | 23.1k | if (_opts.topn_limit <= 0) { |
3120 | 23.1k | return false; |
3121 | 23.1k | } |
3122 | | |
3123 | 0 | if (_opts.delete_condition_predicates->num_of_column_predicate() > 0) { |
3124 | 0 | return false; |
3125 | 0 | } |
3126 | | |
3127 | 0 | bool all_true = std::ranges::all_of(_schema->column_ids(), [this](auto cid) { |
3128 | 0 | if (cid == _opts.tablet_schema->delete_sign_idx()) { |
3129 | 0 | return true; |
3130 | 0 | } |
3131 | 0 | if (_check_all_conditions_passed_inverted_index_for_column(cid, true)) { |
3132 | 0 | return true; |
3133 | 0 | } |
3134 | 0 | return false; |
3135 | 0 | }); |
3136 | |
|
3137 | 0 | DBUG_EXECUTE_IF("segment_iterator.topn_opt_1", { |
3138 | 0 | LOG(INFO) << "col_predicates: " << _col_predicates.size() << ", all_true: " << all_true; |
3139 | 0 | }) |
3140 | |
|
3141 | 0 | DBUG_EXECUTE_IF("segment_iterator.topn_opt_2", { |
3142 | 0 | if (all_true) { |
3143 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR>("topn opt 2 execute failed"); |
3144 | 0 | } |
3145 | 0 | }) |
3146 | | |
3147 | 0 | return all_true; |
3148 | 0 | } |
3149 | | |
3150 | | // Before get next batch. make sure all virtual columns in block has type ColumnNothing. |
3151 | 23.1k | void SegmentIterator::_init_virtual_columns(Block* block) { |
3152 | 23.1k | for (const auto& pair : _vir_cid_to_idx_in_block) { |
3153 | 0 | auto& col_with_type_and_name = block->get_by_position(pair.second); |
3154 | 0 | col_with_type_and_name.column = ColumnNothing::create(0); |
3155 | 0 | col_with_type_and_name.type = _opts.vir_col_idx_to_type[pair.second]; |
3156 | 0 | } |
3157 | 23.1k | } |
3158 | | |
3159 | 17.5k | Status SegmentIterator::_materialization_of_virtual_column(Block* block) { |
3160 | 17.5k | size_t prev_block_columns = block->columns(); |
3161 | | // Some expr can not process empty block, such as function `element_at`. |
3162 | | // So materialize virtual column in advance to avoid errors. |
3163 | 17.5k | if (block->rows() == 0) { |
3164 | 150 | for (const auto& pair : _vir_cid_to_idx_in_block) { |
3165 | 0 | auto& col_with_type_and_name = block->get_by_position(pair.second); |
3166 | 0 | col_with_type_and_name.column = _opts.vir_col_idx_to_type[pair.second]->create_column(); |
3167 | 0 | col_with_type_and_name.type = _opts.vir_col_idx_to_type[pair.second]; |
3168 | 0 | } |
3169 | 150 | return Status::OK(); |
3170 | 150 | } |
3171 | | |
3172 | 17.3k | for (const auto& cid_and_expr : _virtual_column_exprs) { |
3173 | 0 | auto cid = cid_and_expr.first; |
3174 | 0 | auto column_expr = cid_and_expr.second; |
3175 | 0 | size_t idx_in_block = _vir_cid_to_idx_in_block[cid]; |
3176 | 0 | if (block->columns() <= idx_in_block) { |
3177 | 0 | return Status::InternalError( |
3178 | 0 | "Virtual column index {} is out of range, block columns {}, " |
3179 | 0 | "virtual columns size {}, virtual column expr {}", |
3180 | 0 | idx_in_block, block->columns(), _vir_cid_to_idx_in_block.size(), |
3181 | 0 | column_expr->root()->debug_string()); |
3182 | 0 | } else if (block->get_by_position(idx_in_block).column.get() == nullptr) { |
3183 | 0 | return Status::InternalError( |
3184 | 0 | "Virtual column index {} is null, block columns {}, virtual columns size {}, " |
3185 | 0 | "virtual column expr {}", |
3186 | 0 | idx_in_block, block->columns(), _vir_cid_to_idx_in_block.size(), |
3187 | 0 | column_expr->root()->debug_string()); |
3188 | 0 | } |
3189 | 0 | block->shrink_char_type_column_suffix_zero(_char_type_idx); |
3190 | 0 | if (check_and_get_column<const ColumnNothing>( |
3191 | 0 | block->get_by_position(idx_in_block).column.get())) { |
3192 | 0 | VLOG_DEBUG << fmt::format("Virtual column is doing materialization, cid {}, col idx {}", |
3193 | 0 | cid, idx_in_block); |
3194 | 0 | int result_cid = -1; |
3195 | 0 | RETURN_IF_ERROR(column_expr->execute(block, &result_cid)); |
3196 | | |
3197 | 0 | block->replace_by_position(idx_in_block, |
3198 | 0 | std::move(block->get_by_position(result_cid).column)); |
3199 | 0 | if (block->get_by_position(idx_in_block).column->size() == 0) { |
3200 | 0 | LOG_WARNING( |
3201 | 0 | "Result of expr column {} is empty. cid {}, idx_in_block {}, result_cid", |
3202 | 0 | column_expr->root()->debug_string(), cid, idx_in_block, result_cid); |
3203 | 0 | } |
3204 | 0 | } |
3205 | 0 | } |
3206 | | // During execution of expr, some columns may be added to the end of the block. |
3207 | | // Remove them to keep consistent with current block. |
3208 | 17.3k | block->erase_tail(prev_block_columns); |
3209 | 17.3k | return Status::OK(); |
3210 | 17.3k | } |
3211 | | |
// Precomputes BM25 scores for the score virtual column and hands them to that
// column's VirtualColumnIterator for later materialization. No-op when the
// query requested no scoring.
void SegmentIterator::_prepare_score_column_materialization() {
    if (_score_runtime == nullptr) {
        return;
    }

    // Optional range filter on the score value itself (e.g. score > threshold).
    ScoreRangeFilterPtr filter;
    if (_score_runtime->has_score_range_filter()) {
        const auto& range_info = _score_runtime->get_score_range_info();
        filter = std::make_shared<ScoreRangeFilter>(range_info->op, range_info->threshold);
    }

    IColumn::MutablePtr result_column;
    auto result_row_ids = std::make_unique<std::vector<uint64_t>>();
    if (_score_runtime->get_limit() > 0 && _col_predicates.empty() &&
        _common_expr_ctxs_push_down.empty()) {
        // With a limit and no other filtering, only the top-n scores matter.
        OrderType order_type = _score_runtime->is_asc() ? OrderType::ASC : OrderType::DESC;
        _index_query_context->collection_similarity->get_topn_bm25_scores(
                &_row_bitmap, result_column, result_row_ids, order_type,
                _score_runtime->get_limit(), filter);
    } else {
        // Otherwise score every row that is still in the row bitmap.
        _index_query_context->collection_similarity->get_bm25_scores(&_row_bitmap, result_column,
                                                                     result_row_ids, filter);
    }
    // Attach the computed scores to the destination column's virtual iterator;
    // it materializes them when the column is read.
    const size_t dst_col_idx = _score_runtime->get_dest_column_idx();
    auto* column_iter = _column_iterators[_schema->column_id(dst_col_idx)].get();
    auto* virtual_column_iter = dynamic_cast<VirtualColumnIterator*>(column_iter);
    virtual_column_iter->prepare_materialization(std::move(result_column),
                                                 std::move(result_row_ids));
}
3241 | | |
3242 | | } // namespace segment_v2 |
3243 | | } // namespace doris |