be/src/storage/segment/segment_iterator.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/segment/segment_iterator.h" |
19 | | |
20 | | #include <assert.h> |
21 | | #include <gen_cpp/Exprs_types.h> |
22 | | #include <gen_cpp/Opcodes_types.h> |
23 | | #include <gen_cpp/Types_types.h> |
24 | | #include <gen_cpp/olap_file.pb.h> |
25 | | |
26 | | #include <algorithm> |
27 | | #include <boost/iterator/iterator_facade.hpp> |
28 | | #include <cstdint> |
29 | | #include <memory> |
30 | | #include <numeric> |
31 | | #include <set> |
32 | | #include <unordered_map> |
33 | | #include <utility> |
34 | | #include <vector> |
35 | | |
36 | | #include "cloud/config.h" |
37 | | #include "common/compiler_util.h" // IWYU pragma: keep |
38 | | #include "common/config.h" |
39 | | #include "common/consts.h" |
40 | | #include "common/exception.h" |
41 | | #include "common/logging.h" |
42 | | #include "common/metrics/doris_metrics.h" |
43 | | #include "common/object_pool.h" |
44 | | #include "common/status.h" |
45 | | #include "core/assert_cast.h" |
46 | | #include "core/block/column_with_type_and_name.h" |
47 | | #include "core/column/column.h" |
48 | | #include "core/column/column_const.h" |
49 | | #include "core/column/column_nothing.h" |
50 | | #include "core/column/column_nullable.h" |
51 | | #include "core/column/column_string.h" |
52 | | #include "core/column/column_variant.h" |
53 | | #include "core/column/column_vector.h" |
54 | | #include "core/data_type/data_type.h" |
55 | | #include "core/data_type/data_type_factory.hpp" |
56 | | #include "core/data_type/data_type_number.h" |
57 | | #include "core/data_type/define_primitive_type.h" |
58 | | #include "core/field.h" |
59 | | #include "core/string_ref.h" |
60 | | #include "core/typeid_cast.h" |
61 | | #include "core/types.h" |
62 | | #include "exprs/function/array/function_array_index.h" |
63 | | #include "exprs/vexpr.h" |
64 | | #include "exprs/vexpr_context.h" |
65 | | #include "exprs/virtual_slot_ref.h" |
66 | | #include "exprs/vliteral.h" |
67 | | #include "exprs/vslot_ref.h" |
68 | | #include "io/cache/cached_remote_file_reader.h" |
69 | | #include "io/fs/file_reader.h" |
70 | | #include "io/io_common.h" |
71 | | #include "runtime/query_context.h" |
72 | | #include "runtime/runtime_predicate.h" |
73 | | #include "runtime/runtime_state.h" |
74 | | #include "runtime/thread_context.h" |
75 | | #include "storage/compaction/collection_similarity.h" |
76 | | #include "storage/field.h" |
77 | | #include "storage/id_manager.h" |
78 | | #include "storage/index/ann/ann_index.h" |
79 | | #include "storage/index/ann/ann_index_iterator.h" |
80 | | #include "storage/index/ann/ann_index_reader.h" |
81 | | #include "storage/index/ann/ann_topn_runtime.h" |
82 | | #include "storage/index/index_file_reader.h" |
83 | | #include "storage/index/index_iterator.h" |
84 | | #include "storage/index/index_query_context.h" |
85 | | #include "storage/index/index_reader_helper.h" |
86 | | #include "storage/index/indexed_column_reader.h" |
87 | | #include "storage/index/inverted/inverted_index_reader.h" |
88 | | #include "storage/index/ordinal_page_index.h" |
89 | | #include "storage/index/primary_key_index.h" |
90 | | #include "storage/index/short_key_index.h" |
91 | | #include "storage/iterators.h" |
92 | | #include "storage/olap_common.h" |
93 | | #include "storage/predicate/bloom_filter_predicate.h" |
94 | | #include "storage/predicate/column_predicate.h" |
95 | | #include "storage/predicate/like_column_predicate.h" |
96 | | #include "storage/schema.h" |
97 | | #include "storage/segment/column_reader.h" |
98 | | #include "storage/segment/column_reader_cache.h" |
99 | | #include "storage/segment/condition_cache.h" |
100 | | #include "storage/segment/row_ranges.h" |
101 | | #include "storage/segment/segment.h" |
102 | | #include "storage/segment/segment_prefetcher.h" |
103 | | #include "storage/segment/variant/variant_column_reader.h" |
104 | | #include "storage/segment/virtual_column_iterator.h" |
105 | | #include "storage/tablet/tablet_schema.h" |
106 | | #include "storage/types.h" |
107 | | #include "storage/utils.h" |
108 | | #include "util/concurrency_stats.h" |
109 | | #include "util/defer_op.h" |
110 | | #include "util/simd/bits.h" |
111 | | |
112 | | namespace doris { |
113 | | using namespace ErrorCode; |
114 | | namespace segment_v2 { |
115 | | |
// Out-of-line defaulted destructor: defining it here (instead of in the
// header) lets members that hold smart pointers to types only
// forward-declared in the header be destroyed where the full definitions are
// visible — presumably the iterator/pool members; TODO confirm against header.
SegmentIterator::~SegmentIterator() = default;
117 | | |
118 | 2.82k | void SegmentIterator::_init_row_bitmap_by_condition_cache() { |
119 | | // Only dispose need column predicate and expr cal in condition cache |
120 | 2.82k | if (!_col_predicates.empty() || |
121 | 2.82k | (_enable_common_expr_pushdown && !_remaining_conjunct_roots.empty())) { |
122 | 0 | if (_opts.condition_cache_digest) { |
123 | 0 | auto* condition_cache = ConditionCache::instance(); |
124 | 0 | ConditionCache::CacheKey cache_key(_opts.rowset_id, _segment->id(), |
125 | 0 | _opts.condition_cache_digest); |
126 | | |
127 | | // Increment search count when digest != 0 |
128 | 0 | DorisMetrics::instance()->condition_cache_search_count->increment(1); |
129 | |
|
130 | 0 | ConditionCacheHandle handle; |
131 | 0 | _find_condition_cache = condition_cache->lookup(cache_key, &handle); |
132 | | |
133 | | // Increment hit count if cache lookup is successful |
134 | 0 | if (_find_condition_cache) { |
135 | 0 | DorisMetrics::instance()->condition_cache_hit_count->increment(1); |
136 | 0 | if (_opts.runtime_state) { |
137 | 0 | VLOG_DEBUG << "Condition cache hit, query id: " |
138 | 0 | << print_id(_opts.runtime_state->query_id()) |
139 | 0 | << ", segment id: " << _segment->id() |
140 | 0 | << ", cache digest: " << _opts.condition_cache_digest |
141 | 0 | << ", rowset id: " << _opts.rowset_id.to_string(); |
142 | 0 | } |
143 | 0 | } |
144 | |
|
145 | 0 | auto num_rows = _segment->num_rows(); |
146 | 0 | if (_find_condition_cache) { |
147 | 0 | const auto& filter_result = *(handle.get_filter_result()); |
148 | 0 | int64_t filtered_blocks = 0; |
149 | 0 | for (int i = 0; i < filter_result.size(); i++) { |
150 | 0 | if (!filter_result[i]) { |
151 | 0 | _row_bitmap.removeRange( |
152 | 0 | i * CONDITION_CACHE_OFFSET, |
153 | 0 | i * CONDITION_CACHE_OFFSET + CONDITION_CACHE_OFFSET); |
154 | 0 | filtered_blocks++; |
155 | 0 | } |
156 | 0 | } |
157 | | // Record condition_cache hit segment number |
158 | 0 | _opts.stats->condition_cache_hit_seg_nums++; |
159 | | // Record rows filtered by condition cache hit |
160 | 0 | _opts.stats->condition_cache_filtered_rows += |
161 | 0 | filtered_blocks * SegmentIterator::CONDITION_CACHE_OFFSET; |
162 | 0 | } else { |
163 | 0 | _condition_cache = std::make_shared<std::vector<bool>>( |
164 | 0 | num_rows / CONDITION_CACHE_OFFSET + 1, false); |
165 | 0 | } |
166 | 0 | } |
167 | 2.82k | } else { |
168 | 2.82k | _opts.condition_cache_digest = 0; |
169 | 2.82k | } |
170 | 2.82k | } |
171 | | |
172 | | // A fast range iterator for roaring bitmap. Output ranges use closed-open form, like [from, to). |
173 | | // Example: |
174 | | // input bitmap: [0 1 4 5 6 7 10 15 16 17 18 19] |
175 | | // output ranges: [0,2), [4,8), [10,11), [15,20) (when max_range_size=10) |
176 | | // output ranges: [0,2), [4,7), [7,8), [10,11), [15,18), [18,20) (when max_range_size=3) |
177 | | class SegmentIterator::BitmapRangeIterator { |
178 | | public: |
179 | 0 | BitmapRangeIterator() = default; |
180 | 2.82k | virtual ~BitmapRangeIterator() = default; |
181 | | |
182 | 2.82k | explicit BitmapRangeIterator(const roaring::Roaring& bitmap) { |
183 | 2.82k | roaring_init_iterator(&bitmap.roaring, &_iter); |
184 | 2.82k | } |
185 | | |
186 | 0 | bool has_more_range() const { return !_eof; } |
187 | | |
188 | 6.50k | [[nodiscard]] static uint32_t get_batch_size() { return kBatchSize; } |
189 | | |
190 | | // read next range into [*from, *to) whose size <= max_range_size. |
191 | | // return false when there is no more range. |
192 | 0 | virtual bool next_range(const uint32_t max_range_size, uint32_t* from, uint32_t* to) { |
193 | 0 | if (_eof) { |
194 | 0 | return false; |
195 | 0 | } |
196 | | |
197 | 0 | *from = _buf[_buf_pos]; |
198 | 0 | uint32_t range_size = 0; |
199 | 0 | uint32_t expect_val = _buf[_buf_pos]; // this initial value just make first batch valid |
200 | | |
201 | | // if array is contiguous sequence then the following conditions need to be met : |
202 | | // a_0: x |
203 | | // a_1: x+1 |
204 | | // a_2: x+2 |
205 | | // ... |
206 | | // a_p: x+p |
207 | | // so we can just use (a_p-a_0)-p to check conditions |
208 | | // and should notice the previous batch needs to be continuous with the current batch |
209 | 0 | while (!_eof && range_size + _buf_size - _buf_pos <= max_range_size && |
210 | 0 | expect_val == _buf[_buf_pos] && |
211 | 0 | _buf[_buf_size - 1] - _buf[_buf_pos] == _buf_size - 1 - _buf_pos) { |
212 | 0 | range_size += _buf_size - _buf_pos; |
213 | 0 | expect_val = _buf[_buf_size - 1] + 1; |
214 | 0 | _read_next_batch(); |
215 | 0 | } |
216 | | |
217 | | // promise remain range not will reach next batch |
218 | 0 | if (!_eof && range_size < max_range_size && expect_val == _buf[_buf_pos]) { |
219 | 0 | do { |
220 | 0 | _buf_pos++; |
221 | 0 | range_size++; |
222 | 0 | } while (range_size < max_range_size && _buf[_buf_pos] == _buf[_buf_pos - 1] + 1); |
223 | 0 | } |
224 | 0 | *to = *from + range_size; |
225 | 0 | return true; |
226 | 0 | } |
227 | | |
228 | | // read batch_size of rowids from roaring bitmap into buf array |
229 | 12.9k | virtual uint32_t read_batch_rowids(rowid_t* buf, uint32_t batch_size) { |
230 | 12.9k | return roaring::api::roaring_read_uint32_iterator(&_iter, buf, batch_size); |
231 | 12.9k | } |
232 | | |
233 | | private: |
234 | 0 | void _read_next_batch() { |
235 | 0 | _buf_pos = 0; |
236 | 0 | _buf_size = roaring::api::roaring_read_uint32_iterator(&_iter, _buf, kBatchSize); |
237 | 0 | _eof = (_buf_size == 0); |
238 | 0 | } |
239 | | |
240 | | static const uint32_t kBatchSize = 256; |
241 | | roaring::api::roaring_uint32_iterator_t _iter; |
242 | | uint32_t _buf[kBatchSize]; |
243 | | uint32_t _buf_pos = 0; |
244 | | uint32_t _buf_size = 0; |
245 | | bool _eof = false; |
246 | | }; |
247 | | |
248 | | // A backward range iterator for roaring bitmap. Output ranges use closed-open form, like [from, to). |
249 | | // Example: |
250 | | // input bitmap: [0 1 4 5 6 7 10 15 16 17 18 19] |
251 | | // output ranges: , [15,20), [10,11), [4,8), [0,2) (when max_range_size=10) |
252 | | // output ranges: [17,20), [15,17), [10,11), [5,8), [4, 5), [0,2) (when max_range_size=3) |
253 | | class SegmentIterator::BackwardBitmapRangeIterator : public SegmentIterator::BitmapRangeIterator { |
254 | | public: |
255 | 0 | explicit BackwardBitmapRangeIterator(const roaring::Roaring& bitmap) { |
256 | 0 | roaring_init_iterator_last(&bitmap.roaring, &_riter); |
257 | 0 | _rowid_count = cast_set<uint32_t>(roaring_bitmap_get_cardinality(&bitmap.roaring)); |
258 | 0 | _rowid_left = _rowid_count; |
259 | 0 | } |
260 | | |
261 | 0 | bool has_more_range() const { return !_riter.has_value; } |
262 | | |
263 | | // read next range into [*from, *to) whose size <= max_range_size. |
264 | | // return false when there is no more range. |
265 | 0 | bool next_range(const uint32_t max_range_size, uint32_t* from, uint32_t* to) override { |
266 | 0 | if (!_riter.has_value) { |
267 | 0 | return false; |
268 | 0 | } |
269 | | |
270 | 0 | uint32_t range_size = 0; |
271 | 0 | *to = _riter.current_value + 1; |
272 | |
|
273 | 0 | do { |
274 | 0 | *from = _riter.current_value; |
275 | 0 | range_size++; |
276 | 0 | roaring_previous_uint32_iterator(&_riter); |
277 | 0 | } while (range_size < max_range_size && _riter.has_value && |
278 | 0 | _riter.current_value + 1 == *from); |
279 | |
|
280 | 0 | return true; |
281 | 0 | } |
282 | | /** |
283 | | * Reads a batch of row IDs from a roaring bitmap, starting from the end and moving backwards. |
284 | | * This function retrieves the last `batch_size` row IDs from the bitmap and stores them in the provided buffer. |
285 | | * It updates the internal state to track how many row IDs are left to read in subsequent calls. |
286 | | * |
287 | | * The row IDs are read in reverse order, but stored in the buffer maintaining their original order in the bitmap. |
288 | | * |
289 | | * Example: |
290 | | * input bitmap: [0 1 4 5 6 7 10 15 16 17 18 19] |
291 | | * If the bitmap has 12 elements and batch_size is set to 5, the function will first read [15, 16, 17, 18, 19] |
292 | | * into the buffer, leaving 7 elements left. In the next call with batch_size 5, it will read [4, 5, 6, 7, 10]. |
293 | | * |
294 | | */ |
295 | 0 | uint32_t read_batch_rowids(rowid_t* buf, uint32_t batch_size) override { |
296 | 0 | if (!_riter.has_value || _rowid_left == 0) { |
297 | 0 | return 0; |
298 | 0 | } |
299 | | |
300 | 0 | if (_rowid_count <= batch_size) { |
301 | 0 | roaring_bitmap_to_uint32_array(_riter.parent, |
302 | 0 | buf); // Fill 'buf' with '_rowid_count' elements. |
303 | 0 | uint32_t num_read = _rowid_left; // Save the number of row IDs read. |
304 | 0 | _rowid_left = 0; // No row IDs left after this operation. |
305 | 0 | return num_read; // Return the number of row IDs read. |
306 | 0 | } |
307 | | |
308 | 0 | uint32_t read_size = std::min(batch_size, _rowid_left); |
309 | 0 | uint32_t num_read = 0; // Counter for the number of row IDs read. |
310 | | |
311 | | // Read row IDs into the buffer in reverse order. |
312 | 0 | while (num_read < read_size && _riter.has_value) { |
313 | 0 | buf[read_size - num_read - 1] = _riter.current_value; |
314 | 0 | num_read++; |
315 | 0 | _rowid_left--; // Decrement the count of remaining row IDs. |
316 | 0 | roaring_previous_uint32_iterator(&_riter); |
317 | 0 | } |
318 | | |
319 | | // Return the actual number of row IDs read. |
320 | 0 | return num_read; |
321 | 0 | } |
322 | | |
323 | | private: |
324 | | roaring::api::roaring_uint32_iterator_t _riter; |
325 | | uint32_t _rowid_count; |
326 | | uint32_t _rowid_left; |
327 | | }; |
328 | | |
329 | | SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr schema) |
330 | 2.83k | : _segment(std::move(segment)), |
331 | 2.83k | _schema(schema), |
332 | 2.83k | _column_iterators(_schema->num_columns()), |
333 | 2.83k | _index_iterators(_schema->num_columns()), |
334 | 2.83k | _cur_rowid(0), |
335 | 2.83k | _lazy_materialization_read(false), |
336 | 2.83k | _lazy_inited(false), |
337 | 2.83k | _inited(false), |
338 | 2.83k | _pool(new ObjectPool) {} |
339 | | |
340 | 5.50k | Status SegmentIterator::init(const StorageReadOptions& opts) { |
341 | 5.50k | auto status = _init_impl(opts); |
342 | 5.50k | if (!status.ok()) { |
343 | 0 | _segment->update_healthy_status(status); |
344 | 0 | } |
345 | 5.50k | return status; |
346 | 5.50k | } |
347 | | |
348 | 2.82k | std::unique_ptr<AdaptiveBlockSizePredictor> SegmentIterator::_make_block_size_predictor() const { |
349 | 2.82k | if (!config::enable_adaptive_batch_size || _opts.preferred_block_size_bytes == 0) { |
350 | 0 | return nullptr; |
351 | 0 | } |
352 | | |
353 | | // Collect per-column raw byte metadata from the segment footer for the columns |
354 | | // this iterator will actually output (defined by _schema, which is built from |
355 | | // _opts.return_columns). |
356 | 2.82k | std::vector<AdaptiveBlockSizePredictor::ColumnMetadata> col_metadata; |
357 | 2.82k | uint32_t seg_rows = _segment->num_rows(); |
358 | 2.82k | uint64_t total_raw_bytes = 0; |
359 | 2.82k | double metadata_hint_bytes_per_row = 0.0; |
360 | 2.82k | if (seg_rows > 0) { |
361 | 2.82k | const auto& ts = _segment->tablet_schema(); |
362 | 2.82k | if (ts) { |
363 | 7.05k | for (ColumnId cid : _schema->column_ids()) { |
364 | 7.05k | if (static_cast<size_t>(cid) < ts->num_columns()) { |
365 | 7.05k | int32_t uid = ts->column(cid).unique_id(); |
366 | 7.05k | uint64_t raw_bytes = _segment->column_raw_data_bytes(uid); |
367 | 7.05k | if (uid >= 0 && raw_bytes > 0) { |
368 | 6.94k | total_raw_bytes += raw_bytes; |
369 | 6.94k | } |
370 | 7.05k | } |
371 | 7.05k | } |
372 | 2.82k | metadata_hint_bytes_per_row = total_raw_bytes / static_cast<double>(seg_rows); |
373 | 2.82k | } |
374 | 2.82k | } |
375 | | |
376 | 2.82k | return std::make_unique<AdaptiveBlockSizePredictor>( |
377 | 2.82k | _opts.preferred_block_size_bytes, metadata_hint_bytes_per_row, |
378 | 2.82k | AdaptiveBlockSizePredictor::kDefaultProbeRows, _opts.block_row_max); |
379 | 2.82k | } |
380 | | |
// One-time eager initialization (idempotent: returns immediately once
// _inited is set). Snapshots the read options, filters the caller's column
// predicates down to those that can be applied safely against this segment,
// sizes the per-batch row-id buffer, creates the block-size predictor,
// resolves per-column storage name/type pairs (used for index lookups), and
// finally creates the column/index iterators and the pushed-down expression
// contexts. Row-range pruning is deferred to _lazy_init().
Status SegmentIterator::_init_impl(const StorageReadOptions& opts) {
    // get file handle from file descriptor of segment
    if (_inited) {
        return Status::OK();
    }
    _opts = opts;
    SCOPED_RAW_TIMER(&_opts.stats->segment_iterator_init_timer_ns);
    _inited = true;
    _file_reader = _segment->_file_reader;
    _col_predicates.clear();

    // Keep only predicates that are safe to evaluate against this segment's
    // stored representation (relevant for variant columns with target casts).
    for (const auto& predicate : opts.column_predicates) {
        if (!_segment->can_apply_predicate_safely(predicate->column_id(), *_schema,
                                                  _opts.target_cast_type_for_variants, _opts)) {
            continue;
        }
        _col_predicates.emplace_back(predicate);
    }
    _tablet_id = opts.tablet_id;
    // Read options will not change, so that just resize here
    _block_rowids.resize(_opts.block_row_max);

    // Adaptive batch size: snapshot the initial row limit and create predictor if enabled.
    _initial_block_row_max = _opts.block_row_max;
    _block_size_predictor = _make_block_size_predictor();

    _remaining_conjunct_roots = opts.remaining_conjunct_roots;

    // A positive rowid column index means the caller wants per-row rowids
    // recorded alongside the data.
    if (_schema->rowid_col_idx() > 0) {
        _record_rowids = true;
    }

    _virtual_column_exprs = _opts.virtual_column_exprs;
    _vir_cid_to_idx_in_block = _opts.vir_cid_to_idx_in_block;
    _score_runtime = _opts.score_runtime;
    _ann_topn_runtime = _opts.ann_topn_runtime;

    if (opts.output_columns != nullptr) {
        _output_columns = *(opts.output_columns);
    }

    // Resolve, per schema column, the (index field name, storage data type)
    // pair used for inverted-index lookups.
    // NOTE(review): the loop below compares `int i` with the unsigned
    // _schema->columns().size() — harmless for realistic column counts, but a
    // signed/unsigned mismatch nonetheless.
    _storage_name_and_type.resize(_schema->columns().size());
    auto storage_format = _opts.tablet_schema->get_inverted_index_storage_format();
    for (int i = 0; i < _schema->columns().size(); ++i) {
        const StorageField* col = _schema->column(i);
        if (col) {
            auto storage_type = _segment->get_data_type_of(col->get_desc(), _opts);
            if (storage_type == nullptr) {
                storage_type = DataTypeFactory::instance().create_data_type(col->get_desc(),
                                                                            col->is_nullable());
            }
            // Currently, when writing a lucene index, the field of the document is column_name, and the column name is
            // bound to the index field. Since version 1.2, the data file storage has been changed from column_name to
            // column_unique_id, allowing the column name to be changed. Due to current limitations, previous inverted
            // index data cannot be used after Doris changes the column name. Column names also support Unicode
            // characters, which may cause other problems with indexing in non-ASCII characters.
            // After consideration, it was decided to change the field name from column_name to column_unique_id in
            // format V2, while format V1 continues to use column_name.
            std::string field_name;
            if (storage_format == InvertedIndexStorageFormatPB::V1) {
                field_name = col->name();
            } else {
                if (col->is_extracted_column()) {
                    // variant sub col
                    // field_name format: parent_unique_id.sub_col_name
                    field_name = std::to_string(col->parent_unique_id()) + "." + col->name();
                } else {
                    field_name = std::to_string(col->unique_id());
                }
            }
            _storage_name_and_type[i] = std::make_pair(field_name, storage_type);
            // One sparse-column cache per unique column id (shared across all
            // variant sub-columns of the same parent).
            if (int32_t uid = col->get_unique_id(); !_variant_sparse_column_cache.contains(uid)) {
                DCHECK(uid >= 0);
                _variant_sparse_column_cache.emplace(uid,
                                                     std::make_unique<PathToBinaryColumnCache>());
            }
        }
    }

    RETURN_IF_ERROR(init_iterators());

    RETURN_IF_ERROR(_construct_compound_expr_context());
    _enable_common_expr_pushdown = !_common_expr_ctxs_push_down.empty();
    VLOG_DEBUG << fmt::format(
            "Segment iterator init, virtual_column_exprs size: {}, "
            "_vir_cid_to_idx_in_block size: {}, common_expr_pushdown size: {}",
            _opts.virtual_column_exprs.size(), _opts.vir_cid_to_idx_in_block.size(),
            _common_expr_ctxs_push_down.size());
    _initialize_predicate_results();
    return Status::OK();
}
472 | | |
473 | 2.82k | void SegmentIterator::_initialize_predicate_results() { |
474 | | // Initialize from _col_predicates |
475 | 2.82k | for (auto pred : _col_predicates) { |
476 | 0 | int cid = pred->column_id(); |
477 | 0 | _column_predicate_index_exec_status[cid][pred] = false; |
478 | 0 | } |
479 | | |
480 | 2.82k | _calculate_expr_in_remaining_conjunct_root(); |
481 | 2.82k | } |
482 | | |
// Creates the per-column iterators for the returned columns and the index
// iterators used for predicate evaluation. Called once from _init_impl();
// order matters — column iterators must exist before index iterators are set
// up. (Presumably; TODO confirm against _init_index_iterators.)
Status SegmentIterator::init_iterators() {
    RETURN_IF_ERROR(_init_return_column_iterators());
    RETURN_IF_ERROR(_init_index_iterators());
    return Status::OK();
}
488 | | |
// Deferred per-scan initialization, run on the first next_batch (idempotent
// via _lazy_inited). Builds the candidate row bitmap and prunes it in order:
// condition cache, short-key ranges, column conditions/indexes, lazy
// materialization planning, delete bitmap, then explicit row ranges. Finally
// creates the (forward or backward) range iterator over the surviving rows,
// pre-allocates the per-column output buffers, and starts the prefetchers.
Status SegmentIterator::_lazy_init(Block* block) {
    if (_lazy_inited) {
        return Status::OK();
    }
    SCOPED_RAW_TIMER(&_opts.stats->block_init_ns);
    DorisMetrics::instance()->segment_read_total->increment(1);
    // Start with every row as a candidate, then subtract.
    _row_bitmap.addRange(0, _segment->num_rows());
    _init_row_bitmap_by_condition_cache();

    // z-order can not use prefix index
    if (_segment->_tablet_schema->sort_type() != SortType::ZORDER &&
        _segment->_tablet_schema->cluster_key_uids().empty()) {
        RETURN_IF_ERROR(_get_row_ranges_by_keys());
    }
    RETURN_IF_ERROR(_get_row_ranges_by_column_conditions());
    RETURN_IF_ERROR(_vec_init_lazy_materialization());
    // Remove rows that have been marked deleted
    if (_opts.delete_bitmap.count(segment_id()) > 0 &&
        _opts.delete_bitmap.at(segment_id()) != nullptr) {
        size_t pre_size = _row_bitmap.cardinality();
        _row_bitmap -= *(_opts.delete_bitmap.at(segment_id()));
        _opts.stats->rows_del_by_bitmap += (pre_size - _row_bitmap.cardinality());
        VLOG_DEBUG << "read on segment: " << segment_id() << ", delete bitmap cardinality: "
                   << _opts.delete_bitmap.at(segment_id())->cardinality() << ", "
                   << _opts.stats->rows_del_by_bitmap << " rows deleted by bitmap";
    }

    // Restrict to caller-supplied row ranges, if any.
    if (!_opts.row_ranges.is_empty()) {
        _row_bitmap &= RowRanges::ranges_to_roaring(_opts.row_ranges);
    }

    _prepare_score_column_materialization();

    RETURN_IF_ERROR(_apply_ann_topn_predicate());

    // ORDER BY ... DESC on the key reads the bitmap back-to-front.
    if (_opts.read_orderby_key_reverse) {
        _range_iter.reset(new BackwardBitmapRangeIterator(_row_bitmap));
    } else {
        _range_iter.reset(new BitmapRangeIterator(_row_bitmap));
    }

    // Reserve columns for _initial_block_row_max (the original max before any adaptive
    // prediction) because the predictor may increase block_row_max on subsequent batches
    // up to this ceiling. Using the current (possibly reduced) _opts.block_row_max would
    // cause heap-buffer-overflow if a later prediction is larger.
    auto nrows_reserve_limit =
            std::min(_row_bitmap.cardinality(), uint64_t(_initial_block_row_max));
    if (_lazy_materialization_read || _opts.record_rowids || _is_need_expr_eval) {
        _block_rowids.resize(_initial_block_row_max);
    }
    _current_return_columns.resize(_schema->columns().size());

    _vec_init_char_column_id(block);
    for (size_t i = 0; i < _schema->column_ids().size(); i++) {
        ColumnId cid = _schema->column_ids()[i];
        const auto* column_desc = _schema->column(cid);
        if (_is_pred_column[cid]) {
            auto storage_column_type = _storage_name_and_type[cid].second;
            // Char type is special , since char type's computational datatype is same with string,
            // both are DataTypeString, but DataTypeString only return FieldType::OLAP_FIELD_TYPE_STRING
            // in get_storage_field_type.
            RETURN_IF_CATCH_EXCEPTION(
                    // Here, cid will not go out of bounds
                    // because the size of _current_return_columns equals _schema->tablet_columns().size()
                    _current_return_columns[cid] = Schema::get_predicate_column_ptr(
                            _is_char_type[cid] ? FieldType::OLAP_FIELD_TYPE_CHAR
                                               : storage_column_type->get_storage_field_type(),
                            storage_column_type->is_nullable(), _opts.io_ctx.reader_type));
            _current_return_columns[cid]->set_rowset_segment_id(
                    {_segment->rowset_id(), _segment->id()});
            _current_return_columns[cid]->reserve(nrows_reserve_limit);
        } else if (i >= block->columns()) {
            // This column needs to be scanned, but doesn't need to be returned upward. (delete sign)
            // if i >= block->columns means the column and not the pred_column means `column i` is
            // a delete condition column. but the column is not effective in the segment. so we just
            // create a column to hold the data.
            // a. origin data -> b. delete condition -> c. new load data
            // the segment of c do not effective delete condition, but it still need read the column
            // to match the schema.
            // TODO: skip read the not effective delete column to speed up segment read.
            _current_return_columns[cid] = Schema::get_data_type_ptr(*column_desc)->create_column();
            _current_return_columns[cid]->reserve(nrows_reserve_limit);
        }
    }

    // Additional deleted filter condition will be materialized column be at the end of the block,
    // after _output_column_by_sel_idx will be erase, we not need to filter it,
    // so erase it from _columns_to_filter in the first next_batch.
    // Eg:
    //      `delete from table where a = 10;`
    //      `select b from table;`
    // a column only effective in segment iterator, the block from query engine only contain the b column,
    // so no need to filter a column by expr.
    for (auto it = _columns_to_filter.begin(); it != _columns_to_filter.end();) {
        if (*it >= block->columns()) {
            it = _columns_to_filter.erase(it);
        } else {
            ++it;
        }
    }

    _lazy_inited = true;

    _init_segment_prefetchers();

    return Status::OK();
}
596 | | |
// Sets up file-cache prefetching for the columns this scan will read.
// Cloud-mode only, and only for query / compaction reader types; gated by the
// per-reader-type prefetch config flags. For each column iterator a
// prefetcher is initialized, then all prefetchers are seeded either from the
// surviving row ids (selective queries) or with every data block
// (compactions, or queries that read the whole segment).
void SegmentIterator::_init_segment_prefetchers() {
    SCOPED_RAW_TIMER(&_opts.stats->segment_iterator_init_segment_prefetchers_timer_ns);
    if (!config::is_cloud_mode()) {
        return;
    }
    static std::vector<ReaderType> supported_reader_types {
            ReaderType::READER_QUERY, ReaderType::READER_BASE_COMPACTION,
            ReaderType::READER_CUMULATIVE_COMPACTION, ReaderType::READER_FULL_COMPACTION};
    if (std::ranges::none_of(supported_reader_types,
                             [&](ReaderType t) { return _opts.io_ctx.reader_type == t; })) {
        return;
    }
    // Initialize segment prefetcher for predicate and non-predicate columns
    bool is_query = (_opts.io_ctx.reader_type == ReaderType::READER_QUERY);
    bool enable_prefetch = is_query ? config::enable_query_segment_file_cache_prefetch
                                    : config::enable_compaction_segment_file_cache_prefetch;
    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
            "[verbose] SegmentIterator _init_segment_prefetchers, is_query={}, enable_prefetch={}, "
            "_row_bitmap.isEmpty()={}, row_bitmap.cardinality()={}, tablet={}, rowset={}, "
            "segment={}, predicate_column_ids={}, common_expr_column_ids={}",
            is_query, enable_prefetch, _row_bitmap.isEmpty(), _row_bitmap.cardinality(),
            _opts.tablet_id, _opts.rowset_id.to_string(), segment_id(),
            fmt::join(_predicate_column_ids, ","), fmt::join(_common_expr_column_ids, ","));
    if (enable_prefetch && !_row_bitmap.isEmpty()) {
        // +1: the configured block size plus the block currently being read.
        // (Presumably — TODO confirm the window semantics in SegmentPrefetcher.)
        int window_size =
                1 + (is_query ? config::query_segment_file_cache_prefetch_block_size
                              : config::compaction_segment_file_cache_prefetch_block_size);
        LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
                "[verbose] SegmentIterator prefetch config: window_size={}", window_size);
        if (window_size > 0 &&
            !_column_iterators.empty()) { // ensure init_iterators has been called
            SegmentPrefetcherConfig prefetch_config(window_size,
                                                    config::file_cache_each_block_size);
            for (auto cid : _schema->column_ids()) {
                auto& column_iter = _column_iterators[cid];
                if (column_iter == nullptr) {
                    continue;
                }
                const auto* tablet_column = _schema->column(cid);
                SegmentPrefetchParams params {
                        .config = prefetch_config,
                        .read_options = _opts,
                };
                LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
                        "[verbose] SegmentIterator init_segment_prefetchers, "
                        "tablet={}, rowset={}, segment={}, column_id={}, col_name={}, type={}",
                        _opts.tablet_id, _opts.rowset_id.to_string(), segment_id(), cid,
                        tablet_column->name(), tablet_column->type());
                // Prefetch init failure is non-fatal: the scan proceeds
                // without prefetching for this column.
                Status st = column_iter->init_prefetcher(params);
                if (!st.ok()) {
                    LOG_IF(WARNING, config::enable_segment_prefetch_verbose_log) << fmt::format(
                            "[verbose] failed to init prefetcher for column_id={}, "
                            "tablet={}, rowset={}, segment={}, error={}",
                            cid, _opts.tablet_id, _opts.rowset_id.to_string(), segment_id(),
                            st.to_string());
                }
            }

            // for compaction, it's guaranteed that all rows are read, so we can prefetch all data blocks
            PrefetcherInitMethod init_method = (is_query && _row_bitmap.cardinality() < num_rows())
                                                       ? PrefetcherInitMethod::FROM_ROWIDS
                                                       : PrefetcherInitMethod::ALL_DATA_BLOCKS;
            std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>> prefetchers;
            for (const auto& column_iter : _column_iterators) {
                if (column_iter != nullptr) {
                    column_iter->collect_prefetchers(prefetchers, init_method);
                }
            }
            for (auto& [method, prefetcher_vec] : prefetchers) {
                if (method == PrefetcherInitMethod::ALL_DATA_BLOCKS) {
                    for (auto* prefetcher : prefetcher_vec) {
                        prefetcher->build_all_data_blocks();
                    }
                } else if (method == PrefetcherInitMethod::FROM_ROWIDS && !prefetcher_vec.empty()) {
                    SegmentPrefetcher::build_blocks_by_rowids(_row_bitmap, prefetcher_vec);
                }
            }
        }
    }
}
677 | | |
678 | 2.82k | Status SegmentIterator::_get_row_ranges_by_keys() { |
679 | 2.82k | SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_keys_ns); |
680 | 2.82k | DorisMetrics::instance()->segment_row_total->increment(num_rows()); |
681 | | |
682 | | // fast path for empty segment or empty key ranges |
683 | 2.82k | if (_row_bitmap.isEmpty() || _opts.key_ranges.empty()) { |
684 | 2.82k | return Status::OK(); |
685 | 2.82k | } |
686 | | |
687 | | // Read & seek key columns is a waste of time when no key column in _schema |
688 | 0 | if (std::none_of( |
689 | 0 | _schema->columns().begin(), _schema->columns().end(), [&](const StorageField* col) { |
690 | 0 | return col && _opts.tablet_schema->column_by_uid(col->unique_id()).is_key(); |
691 | 0 | })) { |
692 | 0 | return Status::OK(); |
693 | 0 | } |
694 | | |
695 | 0 | RowRanges result_ranges; |
696 | 0 | for (auto& key_range : _opts.key_ranges) { |
697 | 0 | rowid_t lower_rowid = 0; |
698 | 0 | rowid_t upper_rowid = num_rows(); |
699 | 0 | RETURN_IF_ERROR(_prepare_seek(key_range)); |
700 | 0 | if (key_range.upper_key != nullptr) { |
701 | | // If client want to read upper_bound, the include_upper is true. So we |
702 | | // should get the first ordinal at which key is larger than upper_bound. |
703 | | // So we call _lookup_ordinal with include_upper's negate |
704 | 0 | RETURN_IF_ERROR(_lookup_ordinal(*key_range.upper_key, !key_range.include_upper, |
705 | 0 | num_rows(), &upper_rowid)); |
706 | 0 | } |
707 | 0 | if (upper_rowid > 0 && key_range.lower_key != nullptr) { |
708 | 0 | RETURN_IF_ERROR(_lookup_ordinal(*key_range.lower_key, key_range.include_lower, |
709 | 0 | upper_rowid, &lower_rowid)); |
710 | 0 | } |
711 | 0 | auto row_range = RowRanges::create_single(lower_rowid, upper_rowid); |
712 | 0 | RowRanges::ranges_union(result_ranges, row_range, &result_ranges); |
713 | 0 | } |
714 | 0 | size_t pre_size = _row_bitmap.cardinality(); |
715 | 0 | _row_bitmap &= RowRanges::ranges_to_roaring(result_ranges); |
716 | 0 | _opts.stats->rows_key_range_filtered += (pre_size - _row_bitmap.cardinality()); |
717 | |
|
718 | 0 | return Status::OK(); |
719 | 0 | } |
720 | | |
721 | | // Set up environment for the following seek. |
722 | 0 | Status SegmentIterator::_prepare_seek(const StorageReadOptions::KeyRange& key_range) { |
723 | 0 | std::vector<const StorageField*> key_fields; |
724 | 0 | std::set<uint32_t> column_set; |
725 | 0 | if (key_range.lower_key != nullptr) { |
726 | 0 | for (auto cid : key_range.lower_key->schema()->column_ids()) { |
727 | 0 | column_set.emplace(cid); |
728 | 0 | key_fields.emplace_back(key_range.lower_key->column_schema(cid)); |
729 | 0 | } |
730 | 0 | } |
731 | 0 | if (key_range.upper_key != nullptr) { |
732 | 0 | for (auto cid : key_range.upper_key->schema()->column_ids()) { |
733 | 0 | if (column_set.count(cid) == 0) { |
734 | 0 | key_fields.emplace_back(key_range.upper_key->column_schema(cid)); |
735 | 0 | column_set.emplace(cid); |
736 | 0 | } |
737 | 0 | } |
738 | 0 | } |
739 | 0 | if (!_seek_schema) { |
740 | | // Schema constructors accept a vector of TabletColumnPtr. Convert |
741 | | // StorageField pointers to TabletColumnPtr by copying their descriptors. |
742 | 0 | std::vector<TabletColumnPtr> cols; |
743 | 0 | cols.reserve(key_fields.size()); |
744 | 0 | for (const StorageField* f : key_fields) { |
745 | 0 | cols.emplace_back(std::make_shared<TabletColumn>(f->get_desc())); |
746 | 0 | } |
747 | 0 | _seek_schema = std::make_unique<Schema>(cols, cols.size()); |
748 | 0 | } |
749 | | // todo(wb) need refactor here, when using pk to search, _seek_block is useless |
750 | 0 | if (_seek_block.size() == 0) { |
751 | 0 | _seek_block.resize(_seek_schema->num_column_ids()); |
752 | 0 | int i = 0; |
753 | 0 | for (auto cid : _seek_schema->column_ids()) { |
754 | 0 | auto column_desc = _seek_schema->column(cid); |
755 | 0 | _seek_block[i] = Schema::get_column_by_field(*column_desc); |
756 | 0 | i++; |
757 | 0 | } |
758 | 0 | } |
759 | | |
760 | | // create used column iterator |
761 | 0 | for (auto cid : _seek_schema->column_ids()) { |
762 | 0 | if (_column_iterators[cid] == nullptr) { |
763 | | // TODO: Do we need this? |
764 | 0 | if (_virtual_column_exprs.contains(cid)) { |
765 | 0 | _column_iterators[cid] = std::make_unique<VirtualColumnIterator>(); |
766 | 0 | continue; |
767 | 0 | } |
768 | | |
769 | 0 | RETURN_IF_ERROR(_segment->new_column_iterator(_opts.tablet_schema->column(cid), |
770 | 0 | &_column_iterators[cid], &_opts, |
771 | 0 | &_variant_sparse_column_cache)); |
772 | 0 | ColumnIteratorOptions iter_opts { |
773 | 0 | .use_page_cache = _opts.use_page_cache, |
774 | 0 | .file_reader = _file_reader.get(), |
775 | 0 | .stats = _opts.stats, |
776 | 0 | .io_ctx = _opts.io_ctx, |
777 | 0 | }; |
778 | 0 | RETURN_IF_ERROR(_column_iterators[cid]->init(iter_opts)); |
779 | 0 | } |
780 | 0 | } |
781 | | |
782 | 0 | return Status::OK(); |
783 | 0 | } |
784 | | |
785 | 2.82k | Status SegmentIterator::_get_row_ranges_by_column_conditions() { |
786 | 2.82k | SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_column_conditions_ns); |
787 | 2.82k | if (_row_bitmap.isEmpty()) { |
788 | 0 | return Status::OK(); |
789 | 0 | } |
790 | | |
791 | 2.82k | { |
792 | 2.82k | if (_opts.runtime_state && |
793 | 2.82k | _opts.runtime_state->query_options().enable_inverted_index_query && |
794 | 2.82k | (has_index_in_iterators() || !_common_expr_ctxs_push_down.empty())) { |
795 | 0 | SCOPED_RAW_TIMER(&_opts.stats->inverted_index_filter_timer); |
796 | 0 | size_t input_rows = _row_bitmap.cardinality(); |
797 | | // Only apply column-level inverted index if we have iterators |
798 | 0 | if (has_index_in_iterators()) { |
799 | 0 | RETURN_IF_ERROR(_apply_inverted_index()); |
800 | 0 | } |
801 | | // Always apply expr-level index (e.g., search expressions) if we have common_expr_pushdown |
802 | | // This allows search expressions with variant subcolumns to be evaluated even when |
803 | | // the segment doesn't have all subcolumns |
804 | 0 | RETURN_IF_ERROR(_apply_index_expr()); |
805 | 0 | for (auto it = _common_expr_ctxs_push_down.begin(); |
806 | 0 | it != _common_expr_ctxs_push_down.end();) { |
807 | 0 | if ((*it)->all_expr_inverted_index_evaluated()) { |
808 | 0 | const auto* result = (*it)->get_index_context()->get_index_result_for_expr( |
809 | 0 | (*it)->root().get()); |
810 | 0 | if (result != nullptr) { |
811 | 0 | _row_bitmap &= *result->get_data_bitmap(); |
812 | 0 | auto root = (*it)->root(); |
813 | 0 | auto iter_find = std::find(_remaining_conjunct_roots.begin(), |
814 | 0 | _remaining_conjunct_roots.end(), root); |
815 | 0 | if (iter_find != _remaining_conjunct_roots.end()) { |
816 | 0 | _remaining_conjunct_roots.erase(iter_find); |
817 | 0 | } |
818 | 0 | it = _common_expr_ctxs_push_down.erase(it); |
819 | 0 | } |
820 | 0 | } else { |
821 | 0 | ++it; |
822 | 0 | } |
823 | 0 | } |
824 | 0 | _opts.condition_cache_digest = |
825 | 0 | _common_expr_ctxs_push_down.empty() ? 0 : _opts.condition_cache_digest; |
826 | 0 | _opts.stats->rows_inverted_index_filtered += (input_rows - _row_bitmap.cardinality()); |
827 | 0 | for (auto cid : _schema->column_ids()) { |
828 | 0 | bool result_true = _check_all_conditions_passed_inverted_index_for_column(cid); |
829 | 0 | if (result_true) { |
830 | 0 | _need_read_data_indices[cid] = false; |
831 | 0 | } |
832 | 0 | } |
833 | 0 | } |
834 | 2.82k | } |
835 | | |
836 | 2.82k | DBUG_EXECUTE_IF("segment_iterator.inverted_index.filtered_rows", { |
837 | 2.82k | LOG(INFO) << "Debug Point: segment_iterator.inverted_index.filtered_rows: " |
838 | 2.82k | << _opts.stats->rows_inverted_index_filtered; |
839 | 2.82k | auto filtered_rows = DebugPoints::instance()->get_debug_param_or_default<int32_t>( |
840 | 2.82k | "segment_iterator.inverted_index.filtered_rows", "filtered_rows", -1); |
841 | 2.82k | if (filtered_rows != _opts.stats->rows_inverted_index_filtered) { |
842 | 2.82k | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
843 | 2.82k | "filtered_rows: {} not equal to expected: {}", |
844 | 2.82k | _opts.stats->rows_inverted_index_filtered, filtered_rows); |
845 | 2.82k | } |
846 | 2.82k | }) |
847 | | |
848 | 2.82k | DBUG_EXECUTE_IF("segment_iterator.apply_inverted_index", { |
849 | 2.82k | LOG(INFO) << "Debug Point: segment_iterator.apply_inverted_index"; |
850 | 2.82k | if (!_common_expr_ctxs_push_down.empty() || !_col_predicates.empty()) { |
851 | 2.82k | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
852 | 2.82k | "it is failed to apply inverted index, common_expr_ctxs_push_down: {}, " |
853 | 2.82k | "col_predicates: {}", |
854 | 2.82k | _common_expr_ctxs_push_down.size(), _col_predicates.size()); |
855 | 2.82k | } |
856 | 2.82k | }) |
857 | | |
858 | 2.82k | if (!_row_bitmap.isEmpty() && |
859 | 2.82k | (!_opts.topn_filter_source_node_ids.empty() || !_opts.col_id_to_predicates.empty() || |
860 | 2.82k | _opts.delete_condition_predicates->num_of_column_predicate() > 0)) { |
861 | 467 | RowRanges condition_row_ranges = RowRanges::create_single(_segment->num_rows()); |
862 | 467 | RETURN_IF_ERROR(_get_row_ranges_from_conditions(&condition_row_ranges)); |
863 | 467 | size_t pre_size = _row_bitmap.cardinality(); |
864 | 467 | _row_bitmap &= RowRanges::ranges_to_roaring(condition_row_ranges); |
865 | 467 | _opts.stats->rows_conditions_filtered += (pre_size - _row_bitmap.cardinality()); |
866 | 467 | } |
867 | | |
868 | 2.82k | DBUG_EXECUTE_IF("bloom_filter_must_filter_data", { |
869 | 2.82k | if (_opts.stats->rows_bf_filtered == 0) { |
870 | 2.82k | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
871 | 2.82k | "Bloom filter did not filter the data."); |
872 | 2.82k | } |
873 | 2.82k | }) |
874 | | |
875 | | // TODO(hkp): calculate filter rate to decide whether to |
876 | | // use zone map/bloom filter/secondary index or not. |
877 | 2.82k | return Status::OK(); |
878 | 2.82k | } |
879 | | |
880 | 0 | bool SegmentIterator::_column_has_ann_index(int32_t cid) { |
881 | 0 | bool has_ann_index = _index_iterators[cid] != nullptr && |
882 | 0 | _index_iterators[cid]->get_reader(AnnIndexReaderType::ANN); |
883 | |
|
884 | 0 | return has_ann_index; |
885 | 0 | } |
886 | | |
887 | 2.82k | Status SegmentIterator::_apply_ann_topn_predicate() { |
888 | 2.82k | if (_ann_topn_runtime == nullptr) { |
889 | 2.82k | return Status::OK(); |
890 | 2.82k | } |
891 | | |
892 | 0 | VLOG_DEBUG << fmt::format("Try apply ann topn: {}", _ann_topn_runtime->debug_string()); |
893 | 0 | size_t src_col_idx = _ann_topn_runtime->get_src_column_idx(); |
894 | 0 | ColumnId src_cid = _schema->column_id(src_col_idx); |
895 | 0 | IndexIterator* ann_index_iterator = _index_iterators[src_cid].get(); |
896 | 0 | bool has_ann_index = _column_has_ann_index(src_cid); |
897 | 0 | bool has_common_expr_push_down = !_common_expr_ctxs_push_down.empty(); |
898 | 0 | bool has_column_predicate = std::any_of(_is_pred_column.begin(), _is_pred_column.end(), |
899 | 0 | [](bool is_pred) { return is_pred; }); |
900 | 0 | if (!has_ann_index || has_common_expr_push_down || has_column_predicate) { |
901 | 0 | VLOG_DEBUG << fmt::format( |
902 | 0 | "Ann topn can not be evaluated by ann index, has_ann_index: {}, " |
903 | 0 | "has_common_expr_push_down: {}, has_column_predicate: {}", |
904 | 0 | has_ann_index, has_common_expr_push_down, has_column_predicate); |
905 | | // Disable index-only scan on ann indexed column. |
906 | 0 | _need_read_data_indices[src_cid] = true; |
907 | 0 | _opts.stats->ann_fall_back_brute_force_cnt += 1; |
908 | 0 | return Status::OK(); |
909 | 0 | } |
910 | | |
911 | | // Process asc & desc according to the type of metric |
912 | 0 | auto index_reader = ann_index_iterator->get_reader(AnnIndexReaderType::ANN); |
913 | 0 | auto ann_index_reader = dynamic_cast<AnnIndexReader*>(index_reader.get()); |
914 | 0 | DCHECK(ann_index_reader != nullptr); |
915 | 0 | if (ann_index_reader->get_metric_type() == AnnIndexMetric::IP) { |
916 | 0 | if (_ann_topn_runtime->is_asc()) { |
917 | 0 | VLOG_DEBUG << fmt::format( |
918 | 0 | "Asc topn for inner product can not be evaluated by ann index"); |
919 | | // Disable index-only scan on ann indexed column. |
920 | 0 | _need_read_data_indices[src_cid] = true; |
921 | 0 | _opts.stats->ann_fall_back_brute_force_cnt += 1; |
922 | 0 | return Status::OK(); |
923 | 0 | } |
924 | 0 | } else { |
925 | 0 | if (!_ann_topn_runtime->is_asc()) { |
926 | 0 | VLOG_DEBUG << fmt::format("Desc topn for l2/cosine can not be evaluated by ann index"); |
927 | | // Disable index-only scan on ann indexed column. |
928 | 0 | _need_read_data_indices[src_cid] = true; |
929 | 0 | _opts.stats->ann_fall_back_brute_force_cnt += 1; |
930 | 0 | return Status::OK(); |
931 | 0 | } |
932 | 0 | } |
933 | | |
934 | 0 | if (ann_index_reader->get_metric_type() != _ann_topn_runtime->get_metric_type()) { |
935 | 0 | VLOG_DEBUG << fmt::format( |
936 | 0 | "Ann topn metric type {} not match index metric type {}, can not be evaluated by " |
937 | 0 | "ann index", |
938 | 0 | metric_to_string(_ann_topn_runtime->get_metric_type()), |
939 | 0 | metric_to_string(ann_index_reader->get_metric_type())); |
940 | | // Disable index-only scan on ann indexed column. |
941 | 0 | _need_read_data_indices[src_cid] = true; |
942 | 0 | _opts.stats->ann_fall_back_brute_force_cnt += 1; |
943 | 0 | return Status::OK(); |
944 | 0 | } |
945 | | |
946 | 0 | size_t pre_size = _row_bitmap.cardinality(); |
947 | 0 | size_t rows_of_segment = _segment->num_rows(); |
948 | 0 | if (static_cast<double>(pre_size) < static_cast<double>(rows_of_segment) * 0.3) { |
949 | 0 | VLOG_DEBUG << fmt::format( |
950 | 0 | "Ann topn predicate input rows {} < 30% of segment rows {}, will not use ann index " |
951 | 0 | "to " |
952 | 0 | "filter", |
953 | 0 | pre_size, rows_of_segment); |
954 | | // Disable index-only scan on ann indexed column. |
955 | 0 | _need_read_data_indices[src_cid] = true; |
956 | 0 | _opts.stats->ann_fall_back_brute_force_cnt += 1; |
957 | 0 | return Status::OK(); |
958 | 0 | } |
959 | 0 | IColumn::MutablePtr result_column; |
960 | 0 | std::shared_ptr<std::vector<uint64_t>> result_row_ids; |
961 | 0 | segment_v2::AnnIndexStats ann_index_stats; |
962 | | |
963 | | // Try to load ANN index before search |
964 | 0 | auto ann_index_iterator_casted = |
965 | 0 | dynamic_cast<segment_v2::AnnIndexIterator*>(ann_index_iterator); |
966 | 0 | if (ann_index_iterator_casted == nullptr) { |
967 | 0 | VLOG_DEBUG << "Failed to cast index iterator to AnnIndexIterator, fallback to brute force"; |
968 | 0 | _need_read_data_indices[src_cid] = true; |
969 | 0 | _opts.stats->ann_fall_back_brute_force_cnt += 1; |
970 | 0 | return Status::OK(); |
971 | 0 | } |
972 | | |
973 | | // Track load index timing |
974 | 0 | { |
975 | 0 | SCOPED_TIMER(&(ann_index_stats.load_index_costs_ns)); |
976 | 0 | if (!ann_index_iterator_casted->try_load_index()) { |
977 | 0 | VLOG_DEBUG << "Failed to load ANN index, fallback to brute force search"; |
978 | 0 | _need_read_data_indices[src_cid] = true; |
979 | 0 | _opts.stats->ann_fall_back_brute_force_cnt += 1; |
980 | 0 | return Status::OK(); |
981 | 0 | } |
982 | 0 | double load_costs_ms = |
983 | 0 | static_cast<double>(ann_index_stats.load_index_costs_ns.value()) / 1000000.0; |
984 | 0 | DorisMetrics::instance()->ann_index_load_costs_ms->increment( |
985 | 0 | static_cast<int64_t>(load_costs_ms)); |
986 | 0 | } |
987 | | |
988 | 0 | RETURN_IF_ERROR(_ann_topn_runtime->evaluate_vector_ann_search( |
989 | 0 | ann_index_iterator_casted, &_row_bitmap, rows_of_segment, result_column, result_row_ids, |
990 | 0 | ann_index_stats)); |
991 | | |
992 | 0 | VLOG_DEBUG << fmt::format("Ann topn filtered {} - {} = {} rows", pre_size, |
993 | 0 | _row_bitmap.cardinality(), pre_size - _row_bitmap.cardinality()); |
994 | |
|
995 | 0 | int64_t rows_filterd = pre_size - _row_bitmap.cardinality(); |
996 | 0 | _opts.stats->rows_ann_index_topn_filtered += rows_filterd; |
997 | 0 | _opts.stats->ann_index_load_ns += ann_index_stats.load_index_costs_ns.value(); |
998 | 0 | _opts.stats->ann_topn_search_ns += ann_index_stats.search_costs_ns.value(); |
999 | 0 | _opts.stats->ann_ivf_on_disk_load_ns += ann_index_stats.ivf_on_disk_load_costs_ns.value(); |
1000 | 0 | _opts.stats->ann_ivf_on_disk_cache_hit_cnt += ann_index_stats.ivf_on_disk_cache_hit_cnt.value(); |
1001 | 0 | _opts.stats->ann_ivf_on_disk_cache_miss_cnt += |
1002 | 0 | ann_index_stats.ivf_on_disk_cache_miss_cnt.value(); |
1003 | 0 | _opts.stats->ann_index_topn_engine_search_ns += ann_index_stats.engine_search_ns.value(); |
1004 | 0 | _opts.stats->ann_index_topn_result_process_ns += |
1005 | 0 | ann_index_stats.result_process_costs_ns.value(); |
1006 | 0 | _opts.stats->ann_index_topn_engine_convert_ns += ann_index_stats.engine_convert_ns.value(); |
1007 | 0 | _opts.stats->ann_index_topn_engine_prepare_ns += ann_index_stats.engine_prepare_ns.value(); |
1008 | 0 | _opts.stats->ann_index_topn_search_cnt += 1; |
1009 | 0 | const size_t dst_col_idx = _ann_topn_runtime->get_dest_column_idx(); |
1010 | 0 | ColumnIterator* column_iter = _column_iterators[_schema->column_id(dst_col_idx)].get(); |
1011 | 0 | DCHECK(column_iter != nullptr); |
1012 | 0 | VirtualColumnIterator* virtual_column_iter = dynamic_cast<VirtualColumnIterator*>(column_iter); |
1013 | 0 | DCHECK(virtual_column_iter != nullptr); |
1014 | 0 | VLOG_DEBUG << fmt::format( |
1015 | 0 | "Virtual column iterator, column_idx {}, is materialized with {} rows", dst_col_idx, |
1016 | 0 | result_row_ids->size()); |
1017 | | // reference count of result_column should be 1, so move will not issue any data copy. |
1018 | 0 | virtual_column_iter->prepare_materialization(std::move(result_column), result_row_ids); |
1019 | |
|
1020 | 0 | _need_read_data_indices[src_cid] = false; |
1021 | 0 | VLOG_DEBUG << fmt::format( |
1022 | 0 | "Enable ANN index-only scan for src column cid {} (skip reading data pages)", src_cid); |
1023 | |
|
1024 | 0 | return Status::OK(); |
1025 | 0 | } |
1026 | | |
1027 | 467 | Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row_ranges) { |
1028 | 467 | std::set<int32_t> cids; |
1029 | 467 | for (auto& entry : _opts.col_id_to_predicates) { |
1030 | 0 | cids.insert(entry.first); |
1031 | 0 | } |
1032 | | |
1033 | 467 | { |
1034 | 467 | SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_dict_ns); |
1035 | | /// Low cardinality optimization is currently not very stable, so to prevent data corruption, |
1036 | | /// we are temporarily disabling its use in data compaction. |
1037 | | // TODO: enable it in not only ReaderTyper::READER_QUERY but also other reader types. |
1038 | 467 | if (_opts.io_ctx.reader_type == ReaderType::READER_QUERY) { |
1039 | 0 | RowRanges dict_row_ranges = RowRanges::create_single(num_rows()); |
1040 | 0 | for (auto cid : cids) { |
1041 | 0 | if (!_segment->can_apply_predicate_safely( |
1042 | 0 | cid, *_schema, _opts.target_cast_type_for_variants, _opts)) { |
1043 | 0 | continue; |
1044 | 0 | } |
1045 | 0 | DCHECK(_opts.col_id_to_predicates.count(cid) > 0); |
1046 | 0 | RETURN_IF_ERROR(_column_iterators[cid]->get_row_ranges_by_dict( |
1047 | 0 | _opts.col_id_to_predicates.at(cid).get(), &dict_row_ranges)); |
1048 | | |
1049 | 0 | if (dict_row_ranges.is_empty()) { |
1050 | 0 | break; |
1051 | 0 | } |
1052 | 0 | } |
1053 | | |
1054 | 0 | if (dict_row_ranges.is_empty()) { |
1055 | 0 | RowRanges::ranges_intersection(*condition_row_ranges, dict_row_ranges, |
1056 | 0 | condition_row_ranges); |
1057 | 0 | _opts.stats->segment_dict_filtered++; |
1058 | 0 | _opts.stats->filtered_segment_number++; |
1059 | 0 | return Status::OK(); |
1060 | 0 | } |
1061 | 0 | } |
1062 | 467 | } |
1063 | | |
1064 | 467 | size_t pre_size = 0; |
1065 | 467 | { |
1066 | 467 | SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_bf_ns); |
1067 | | // first filter data by bloom filter index |
1068 | | // bloom filter index only use CondColumn |
1069 | 467 | RowRanges bf_row_ranges = RowRanges::create_single(num_rows()); |
1070 | 467 | for (auto& cid : cids) { |
1071 | 0 | DCHECK(_opts.col_id_to_predicates.count(cid) > 0); |
1072 | 0 | if (!_segment->can_apply_predicate_safely(cid, *_schema, |
1073 | 0 | _opts.target_cast_type_for_variants, _opts)) { |
1074 | 0 | continue; |
1075 | 0 | } |
1076 | | // get row ranges by bf index of this column, |
1077 | 0 | RowRanges column_bf_row_ranges = RowRanges::create_single(num_rows()); |
1078 | 0 | RETURN_IF_ERROR(_column_iterators[cid]->get_row_ranges_by_bloom_filter( |
1079 | 0 | _opts.col_id_to_predicates.at(cid).get(), &column_bf_row_ranges)); |
1080 | 0 | RowRanges::ranges_intersection(bf_row_ranges, column_bf_row_ranges, &bf_row_ranges); |
1081 | 0 | } |
1082 | | |
1083 | 467 | pre_size = condition_row_ranges->count(); |
1084 | 467 | RowRanges::ranges_intersection(*condition_row_ranges, bf_row_ranges, condition_row_ranges); |
1085 | 467 | _opts.stats->rows_bf_filtered += (pre_size - condition_row_ranges->count()); |
1086 | 467 | } |
1087 | | |
1088 | 0 | { |
1089 | 467 | SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_zonemap_ns); |
1090 | 467 | RowRanges zone_map_row_ranges = RowRanges::create_single(num_rows()); |
1091 | | // second filter data by zone map |
1092 | 467 | for (const auto& cid : cids) { |
1093 | 0 | DCHECK(_opts.col_id_to_predicates.count(cid) > 0); |
1094 | 0 | if (!_segment->can_apply_predicate_safely(cid, *_schema, |
1095 | 0 | _opts.target_cast_type_for_variants, _opts)) { |
1096 | 0 | continue; |
1097 | 0 | } |
1098 | | // do not check zonemap if predicate does not support zonemap |
1099 | 0 | if (!_opts.col_id_to_predicates.at(cid)->support_zonemap()) { |
1100 | 0 | VLOG_DEBUG << "skip zonemap for column " << cid; |
1101 | 0 | continue; |
1102 | 0 | } |
1103 | | // get row ranges by zone map of this column, |
1104 | 0 | RowRanges column_row_ranges = RowRanges::create_single(num_rows()); |
1105 | 0 | RETURN_IF_ERROR(_column_iterators[cid]->get_row_ranges_by_zone_map( |
1106 | 0 | _opts.col_id_to_predicates.at(cid).get(), |
1107 | 0 | _opts.del_predicates_for_zone_map.count(cid) > 0 |
1108 | 0 | ? &(_opts.del_predicates_for_zone_map.at(cid)) |
1109 | 0 | : nullptr, |
1110 | 0 | &column_row_ranges)); |
1111 | | // intersect different columns's row ranges to get final row ranges by zone map |
1112 | 0 | RowRanges::ranges_intersection(zone_map_row_ranges, column_row_ranges, |
1113 | 0 | &zone_map_row_ranges); |
1114 | 0 | } |
1115 | | |
1116 | 467 | pre_size = condition_row_ranges->count(); |
1117 | 467 | RowRanges::ranges_intersection(*condition_row_ranges, zone_map_row_ranges, |
1118 | 467 | condition_row_ranges); |
1119 | | |
1120 | 467 | size_t pre_size2 = condition_row_ranges->count(); |
1121 | 467 | RowRanges::ranges_intersection(*condition_row_ranges, zone_map_row_ranges, |
1122 | 467 | condition_row_ranges); |
1123 | 467 | _opts.stats->rows_stats_rp_filtered += (pre_size2 - condition_row_ranges->count()); |
1124 | 467 | _opts.stats->rows_stats_filtered += (pre_size - condition_row_ranges->count()); |
1125 | 467 | } |
1126 | | |
1127 | 0 | return Status::OK(); |
1128 | 467 | } |
1129 | | |
1130 | 0 | bool SegmentIterator::_is_literal_node(const TExprNodeType::type& node_type) { |
1131 | 0 | switch (node_type) { |
1132 | 0 | case TExprNodeType::BOOL_LITERAL: |
1133 | 0 | case TExprNodeType::INT_LITERAL: |
1134 | 0 | case TExprNodeType::LARGE_INT_LITERAL: |
1135 | 0 | case TExprNodeType::FLOAT_LITERAL: |
1136 | 0 | case TExprNodeType::DECIMAL_LITERAL: |
1137 | 0 | case TExprNodeType::STRING_LITERAL: |
1138 | 0 | case TExprNodeType::DATE_LITERAL: |
1139 | 0 | case TExprNodeType::TIMEV2_LITERAL: |
1140 | 0 | return true; |
1141 | 0 | default: |
1142 | 0 | return false; |
1143 | 0 | } |
1144 | 0 | } |
1145 | | |
1146 | 0 | Status SegmentIterator::_extract_common_expr_columns(const VExprSPtr& expr) { |
1147 | 0 | auto& children = expr->children(); |
1148 | 0 | for (int i = 0; i < children.size(); ++i) { |
1149 | 0 | RETURN_IF_ERROR(_extract_common_expr_columns(children[i])); |
1150 | 0 | } |
1151 | | |
1152 | 0 | auto node_type = expr->node_type(); |
1153 | 0 | if (node_type == TExprNodeType::SLOT_REF) { |
1154 | 0 | auto slot_expr = std::dynamic_pointer_cast<doris::VSlotRef>(expr); |
1155 | 0 | _is_common_expr_column[_schema->column_id(slot_expr->column_id())] = true; |
1156 | 0 | _common_expr_columns.insert(_schema->column_id(slot_expr->column_id())); |
1157 | 0 | } else if (node_type == TExprNodeType::VIRTUAL_SLOT_REF) { |
1158 | 0 | std::shared_ptr<VirtualSlotRef> virtual_slot_ref = |
1159 | 0 | std::dynamic_pointer_cast<VirtualSlotRef>(expr); |
1160 | 0 | RETURN_IF_ERROR(_extract_common_expr_columns(virtual_slot_ref->get_virtual_column_expr())); |
1161 | 0 | } |
1162 | | |
1163 | 0 | return Status::OK(); |
1164 | 0 | } |
1165 | | |
1166 | 0 | bool SegmentIterator::_check_apply_by_inverted_index(std::shared_ptr<ColumnPredicate> pred) { |
1167 | 0 | if (_opts.runtime_state && !_opts.runtime_state->query_options().enable_inverted_index_query) { |
1168 | 0 | return false; |
1169 | 0 | } |
1170 | 0 | auto pred_column_id = pred->column_id(); |
1171 | 0 | if (_index_iterators[pred_column_id] == nullptr) { |
1172 | | //this column without inverted index |
1173 | 0 | return false; |
1174 | 0 | } |
1175 | | |
1176 | 0 | if (_inverted_index_not_support_pred_type(pred->type())) { |
1177 | 0 | return false; |
1178 | 0 | } |
1179 | | |
1180 | 0 | if (pred->type() == PredicateType::IN_LIST || pred->type() == PredicateType::NOT_IN_LIST) { |
1181 | | // in_list or not_in_list predicate produced by runtime filter |
1182 | 0 | if (pred->is_runtime_filter()) { |
1183 | 0 | return false; |
1184 | 0 | } |
1185 | 0 | } |
1186 | | |
1187 | | // UNTOKENIZED strings exceed ignore_above, they are written as null, causing range query errors |
1188 | 0 | if (PredicateTypeTraits::is_range(pred->type()) && |
1189 | 0 | !IndexReaderHelper::has_bkd_index(_index_iterators[pred_column_id].get())) { |
1190 | 0 | return false; |
1191 | 0 | } |
1192 | | |
1193 | | // Function filter no apply inverted index |
1194 | 0 | if (dynamic_cast<LikeColumnPredicate<TYPE_CHAR>*>(pred.get()) != nullptr || |
1195 | 0 | dynamic_cast<LikeColumnPredicate<TYPE_STRING>*>(pred.get()) != nullptr) { |
1196 | 0 | return false; |
1197 | 0 | } |
1198 | | |
1199 | 0 | bool handle_by_fulltext = _column_has_fulltext_index(pred_column_id); |
1200 | 0 | if (handle_by_fulltext) { |
1201 | | // when predicate is leafNode of andNode, |
1202 | | // can apply 'match query' and 'equal query' and 'list query' for fulltext index. |
1203 | 0 | return pred->type() == PredicateType::MATCH || pred->type() == PredicateType::IS_NULL || |
1204 | 0 | pred->type() == PredicateType::IS_NOT_NULL || |
1205 | 0 | PredicateTypeTraits::is_equal_or_list(pred->type()); |
1206 | 0 | } |
1207 | | |
1208 | 0 | return true; |
1209 | 0 | } |
1210 | | |
1211 | | // TODO: optimization when all expr can not evaluate by inverted/ann index, |
// Evaluate index-based filtering for pushed-down expressions:
//   1. inverted-index evaluation of every common pushdown expr (tolerating
//      known downgrade errors),
//   2. inverted-index evaluation of virtual-column MATCH projections (bitmaps
//      only, no row filtering),
//   3. ANN range search over the pushdown exprs, accumulating its stats and
//      removing exprs that were fully executed by the ANN index.
// Returns the first non-downgradable evaluation error; OK otherwise.
Status SegmentIterator::_apply_index_expr() {
    for (const auto& expr_ctx : _common_expr_ctxs_push_down) {
        if (Status st = expr_ctx->evaluate_inverted_index(num_rows()); !st.ok()) {
            // Downgradable errors (index missing/bypassed/skipped/no-terms/
            // corrupted — see _downgrade_without_index) and NOT_IMPLEMENTED
            // mean "evaluate this expr without the index later"; skip it.
            if (_downgrade_without_index(st) || st.code() == ErrorCode::NOT_IMPLEMENTED_ERROR) {
                continue;
            } else {
                // other code is not to be handled, we should just break
                LOG(WARNING) << "failed to evaluate inverted index for expr_ctx: "
                             << expr_ctx->root()->debug_string()
                             << ", error msg: " << st.to_string();
                return st;
            }
        }
    }

    // Evaluate inverted index for virtual column MATCH expressions (projections).
    // Unlike common exprs which filter rows, these only compute index result bitmaps
    // for later materialization via fast_execute().
    for (auto& [cid, expr_ctx] : _virtual_column_exprs) {
        // Exprs without an index context have nothing to evaluate here.
        if (expr_ctx->get_index_context() == nullptr) {
            continue;
        }
        if (Status st = expr_ctx->evaluate_inverted_index(num_rows()); !st.ok()) {
            // Same downgrade policy as for the common pushdown exprs above.
            if (_downgrade_without_index(st) || st.code() == ErrorCode::NOT_IMPLEMENTED_ERROR) {
                continue;
            } else {
                LOG(WARNING) << "failed to evaluate inverted index for virtual column expr: "
                             << expr_ctx->root()->debug_string()
                             << ", error msg: " << st.to_string();
                return st;
            }
        }
    }

    // Apply ann range search. Each evaluation may shrink _row_bitmap; the
    // per-search AnnIndexStats are folded into the reader-level counters.
    for (const auto& expr_ctx : _common_expr_ctxs_push_down) {
        segment_v2::AnnIndexStats ann_index_stats;
        size_t origin_rows = _row_bitmap.cardinality();
        RETURN_IF_ERROR(expr_ctx->evaluate_ann_range_search(
                _index_iterators, _schema->column_ids(), _column_iterators,
                _common_expr_to_slotref_map, _row_bitmap, ann_index_stats));
        _opts.stats->rows_ann_index_range_filtered += (origin_rows - _row_bitmap.cardinality());
        _opts.stats->ann_index_load_ns += ann_index_stats.load_index_costs_ns.value();
        _opts.stats->ann_index_range_search_ns += ann_index_stats.search_costs_ns.value();
        _opts.stats->ann_ivf_on_disk_load_ns += ann_index_stats.ivf_on_disk_load_costs_ns.value();
        _opts.stats->ann_ivf_on_disk_cache_hit_cnt +=
                ann_index_stats.ivf_on_disk_cache_hit_cnt.value();
        _opts.stats->ann_ivf_on_disk_cache_miss_cnt +=
                ann_index_stats.ivf_on_disk_cache_miss_cnt.value();
        _opts.stats->ann_range_engine_search_ns += ann_index_stats.engine_search_ns.value();
        _opts.stats->ann_range_result_convert_ns += ann_index_stats.result_process_costs_ns.value();
        _opts.stats->ann_range_engine_convert_ns += ann_index_stats.engine_convert_ns.value();
        _opts.stats->ann_range_pre_process_ns += ann_index_stats.engine_prepare_ns.value();
        _opts.stats->ann_fall_back_brute_force_cnt += ann_index_stats.fall_back_brute_force_cnt;
    }

    // Drop exprs the ANN range search fully executed — they must not be
    // re-evaluated row-by-row later.
    for (auto it = _common_expr_ctxs_push_down.begin(); it != _common_expr_ctxs_push_down.end();) {
        // NOTE(review): 'executedd' is the spelling of the project API method;
        // do not "correct" it here without renaming the declaration.
        if ((*it)->root()->ann_range_search_executedd()) {
            _opts.stats->ann_index_range_search_cnt++;
            it = _common_expr_ctxs_push_down.erase(it);
        } else {
            ++it;
        }
    }
    // TODO:Do we need to remove these expr root from _remaining_conjunct_roots?

    return Status::OK();
}
1280 | | |
1281 | 0 | bool SegmentIterator::_downgrade_without_index(Status res, bool need_remaining) { |
1282 | 0 | bool is_fallback = |
1283 | 0 | _opts.runtime_state->query_options().enable_fallback_on_missing_inverted_index; |
1284 | 0 | if ((res.code() == ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND && is_fallback) || |
1285 | 0 | res.code() == ErrorCode::INVERTED_INDEX_BYPASS || |
1286 | 0 | res.code() == ErrorCode::INVERTED_INDEX_EVALUATE_SKIPPED || |
1287 | 0 | (res.code() == ErrorCode::INVERTED_INDEX_NO_TERMS && need_remaining) || |
1288 | 0 | res.code() == ErrorCode::INVERTED_INDEX_FILE_CORRUPTED) { |
1289 | | // 1. INVERTED_INDEX_FILE_NOT_FOUND means index file has not been built, |
1290 | | // usually occurs when creating a new index, queries can be downgraded |
1291 | | // without index. |
1292 | | // 2. INVERTED_INDEX_BYPASS means the hit of condition by index |
1293 | | // has reached the optimal limit, downgrade without index query can |
1294 | | // improve query performance. |
1295 | | // 3. INVERTED_INDEX_EVALUATE_SKIPPED means the inverted index is not |
1296 | | // suitable for executing this predicate, skipped it and filter data |
1297 | | // by function later. |
1298 | | // 4. INVERTED_INDEX_NO_TERMS means the column has fulltext index, |
1299 | | // but the column condition value no terms in specified parser, |
1300 | | // such as: where A = '' and B = ',' |
1301 | | // the predicate of A and B need downgrade without index query. |
1302 | | // 5. INVERTED_INDEX_FILE_CORRUPTED means the index file is corrupted, |
1303 | | // such as when index segment files are not generated |
1304 | | // above case can downgrade without index query |
1305 | 0 | _opts.stats->inverted_index_downgrade_count++; |
1306 | 0 | if (!res.is<ErrorCode::INVERTED_INDEX_BYPASS>()) { |
1307 | 0 | LOG(INFO) << "will downgrade without index to evaluate predicate, because of res: " |
1308 | 0 | << res; |
1309 | 0 | } else { |
1310 | 0 | VLOG_DEBUG << "will downgrade without index to evaluate predicate, because of res: " |
1311 | 0 | << res; |
1312 | 0 | } |
1313 | 0 | return true; |
1314 | 0 | } |
1315 | 0 | return false; |
1316 | 0 | } |
1317 | | |
1318 | 0 | bool SegmentIterator::_column_has_fulltext_index(int32_t cid) { |
1319 | 0 | bool has_fulltext_index = |
1320 | 0 | _index_iterators[cid] != nullptr && |
1321 | 0 | _index_iterators[cid]->get_reader(InvertedIndexReaderType::FULLTEXT) && |
1322 | 0 | _index_iterators[cid]->get_reader(InvertedIndexReaderType::STRING_TYPE) == nullptr; |
1323 | |
|
1324 | 0 | return has_fulltext_index; |
1325 | 0 | } |
1326 | | |
1327 | 0 | inline bool SegmentIterator::_inverted_index_not_support_pred_type(const PredicateType& type) { |
1328 | 0 | return type == PredicateType::BF || type == PredicateType::BITMAP_FILTER; |
1329 | 0 | } |
1330 | | |
1331 | | Status SegmentIterator::_apply_inverted_index_on_column_predicate( |
1332 | | std::shared_ptr<ColumnPredicate> pred, |
1333 | 0 | std::vector<std::shared_ptr<ColumnPredicate>>& remaining_predicates, bool* continue_apply) { |
1334 | 0 | if (!_check_apply_by_inverted_index(pred)) { |
1335 | 0 | remaining_predicates.emplace_back(pred); |
1336 | 0 | } else { |
1337 | 0 | bool need_remaining_after_evaluate = _column_has_fulltext_index(pred->column_id()) && |
1338 | 0 | PredicateTypeTraits::is_equal_or_list(pred->type()); |
1339 | 0 | Status res = |
1340 | 0 | pred->evaluate(_storage_name_and_type[pred->column_id()], |
1341 | 0 | _index_iterators[pred->column_id()].get(), num_rows(), &_row_bitmap); |
1342 | 0 | if (!res.ok()) { |
1343 | 0 | if (_downgrade_without_index(res, need_remaining_after_evaluate)) { |
1344 | 0 | remaining_predicates.emplace_back(pred); |
1345 | 0 | return Status::OK(); |
1346 | 0 | } |
1347 | 0 | LOG(WARNING) << "failed to evaluate index" |
1348 | 0 | << ", column predicate type: " << pred->pred_type_string(pred->type()) |
1349 | 0 | << ", error msg: " << res; |
1350 | 0 | return res; |
1351 | 0 | } |
1352 | | |
1353 | 0 | if (_row_bitmap.isEmpty()) { |
1354 | | // all rows have been pruned, no need to process further predicates |
1355 | 0 | *continue_apply = false; |
1356 | 0 | } |
1357 | |
|
1358 | 0 | if (need_remaining_after_evaluate) { |
1359 | 0 | remaining_predicates.emplace_back(pred); |
1360 | 0 | return Status::OK(); |
1361 | 0 | } |
1362 | 0 | if (!pred->is_runtime_filter()) { |
1363 | 0 | _column_predicate_index_exec_status[pred->column_id()][pred] = true; |
1364 | 0 | } |
1365 | 0 | } |
1366 | 0 | return Status::OK(); |
1367 | 0 | } |
1368 | | |
1369 | 27.1k | bool SegmentIterator::_need_read_data(ColumnId cid) { |
1370 | 27.1k | if (_opts.runtime_state && !_opts.runtime_state->query_options().enable_no_need_read_data_opt) { |
1371 | 0 | return true; |
1372 | 0 | } |
1373 | | // only support DUP_KEYS and UNIQUE_KEYS with MOW |
1374 | 27.1k | if (!((_opts.tablet_schema->keys_type() == KeysType::DUP_KEYS || |
1375 | 27.1k | (_opts.tablet_schema->keys_type() == KeysType::UNIQUE_KEYS && |
1376 | 11.1k | _opts.enable_unique_key_merge_on_write)))) { |
1377 | 7.58k | return true; |
1378 | 7.58k | } |
1379 | | // this is a virtual column, we always need to read data |
1380 | 19.5k | if (this->_vir_cid_to_idx_in_block.contains(cid)) { |
1381 | 0 | return true; |
1382 | 0 | } |
1383 | | |
1384 | | // if there is a delete predicate, we always need to read data |
1385 | 19.5k | if (_has_delete_predicate(cid)) { |
1386 | 1.74k | return true; |
1387 | 1.74k | } |
1388 | 17.8k | if (_output_columns.count(-1)) { |
1389 | | // if _output_columns contains -1, it means that the light |
1390 | | // weight schema change may not be enabled or other reasons |
1391 | | // caused the column unique_id not be set, to prevent errors |
1392 | | // occurring, return true here that column data needs to be read |
1393 | 0 | return true; |
1394 | 0 | } |
1395 | | // Check the following conditions: |
1396 | | // 1. If the column represented by the unique ID is an inverted index column (indicated by '_need_read_data_indices.count(unique_id) > 0 && !_need_read_data_indices[unique_id]') |
1397 | | // and it's not marked for projection in '_output_columns'. |
1398 | | // 2. Or, if the column is an inverted index column and it's marked for projection in '_output_columns', |
1399 | | // and the operation is a push down of the 'COUNT_ON_INDEX' aggregation function. |
1400 | | // If any of the above conditions are met, log a debug message indicating that there's no need to read data for the indexed column. |
1401 | | // Then, return false. |
1402 | 17.8k | const auto& column = _opts.tablet_schema->column(cid); |
1403 | | // Different subcolumns may share the same parent_unique_id, so we choose to abandon this optimization. |
1404 | 17.8k | if (column.is_extracted_column() && |
1405 | 17.8k | _opts.push_down_agg_type_opt != TPushAggOp::COUNT_ON_INDEX) { |
1406 | 13 | return true; |
1407 | 13 | } |
1408 | 17.8k | int32_t unique_id = column.unique_id(); |
1409 | 17.8k | if (unique_id < 0) { |
1410 | 1 | unique_id = column.parent_unique_id(); |
1411 | 1 | } |
1412 | 17.8k | if ((_need_read_data_indices.contains(cid) && !_need_read_data_indices[cid] && |
1413 | 17.8k | !_output_columns.contains(unique_id)) || |
1414 | 17.8k | (_need_read_data_indices.contains(cid) && !_need_read_data_indices[cid] && |
1415 | 17.8k | _output_columns.count(unique_id) == 1 && |
1416 | 17.8k | _opts.push_down_agg_type_opt == TPushAggOp::COUNT_ON_INDEX)) { |
1417 | 1 | VLOG_DEBUG << "SegmentIterator no need read data for column: " |
1418 | 0 | << _opts.tablet_schema->column_by_uid(unique_id).name(); |
1419 | 1 | return false; |
1420 | 1 | } |
1421 | 17.8k | return true; |
1422 | 17.8k | } |
1423 | | |
1424 | 0 | Status SegmentIterator::_apply_inverted_index() { |
1425 | 0 | std::vector<std::shared_ptr<ColumnPredicate>> remaining_predicates; |
1426 | 0 | std::set<std::shared_ptr<ColumnPredicate>> no_need_to_pass_column_predicate_set; |
1427 | |
|
1428 | 0 | for (auto pred : _col_predicates) { |
1429 | 0 | if (no_need_to_pass_column_predicate_set.count(pred) > 0) { |
1430 | 0 | continue; |
1431 | 0 | } else { |
1432 | 0 | bool continue_apply = true; |
1433 | 0 | RETURN_IF_ERROR(_apply_inverted_index_on_column_predicate(pred, remaining_predicates, |
1434 | 0 | &continue_apply)); |
1435 | 0 | if (!continue_apply) { |
1436 | 0 | break; |
1437 | 0 | } |
1438 | 0 | } |
1439 | 0 | } |
1440 | | |
1441 | 0 | _col_predicates = std::move(remaining_predicates); |
1442 | 0 | return Status::OK(); |
1443 | 0 | } |
1444 | | |
1445 | | /** |
1446 | | * @brief Checks if all conditions related to a specific column have passed in both |
1447 | | * `_column_predicate_inverted_index_status` and `_common_expr_inverted_index_status`. |
1448 | | * |
1449 | | * This function first checks the conditions in `_column_predicate_inverted_index_status` |
1450 | | * for the given `ColumnId`. If all conditions pass, it sets `default_return` to `true`. |
1451 | | * It then checks the conditions in `_common_expr_inverted_index_status` for the same column. |
1452 | | * |
1453 | | * The function returns `true` if all conditions in both maps pass. If any condition fails |
1454 | | * in either map, the function immediately returns `false`. If the column does not exist |
1455 | | * in one of the maps, the function returns `default_return`. |
1456 | | * |
1457 | | * @param cid The ColumnId of the column to check. |
1458 | | * @param default_return The default value to return if the column is not found in the status maps. |
1459 | | * @return true if all conditions in both status maps pass, or if the column is not found |
1460 | | * and `default_return` is true. |
1461 | | * @return false if any condition in either status map fails, or if the column is not found |
1462 | | * and `default_return` is false. |
1463 | | */ |
1464 | | bool SegmentIterator::_check_all_conditions_passed_inverted_index_for_column(ColumnId cid, |
1465 | 3 | bool default_return) { |
1466 | 3 | auto pred_it = _column_predicate_index_exec_status.find(cid); |
1467 | 3 | if (pred_it != _column_predicate_index_exec_status.end()) { |
1468 | 2 | const auto& pred_map = pred_it->second; |
1469 | 2 | bool pred_passed = std::all_of(pred_map.begin(), pred_map.end(), |
1470 | 2 | [](const auto& pred_entry) { return pred_entry.second; }); |
1471 | 2 | if (!pred_passed) { |
1472 | 1 | return false; |
1473 | 1 | } else { |
1474 | 1 | default_return = true; |
1475 | 1 | } |
1476 | 2 | } |
1477 | | |
1478 | 2 | auto expr_it = _common_expr_index_exec_status.find(cid); |
1479 | 2 | if (expr_it != _common_expr_index_exec_status.end()) { |
1480 | 0 | const auto& expr_map = expr_it->second; |
1481 | 0 | return std::all_of(expr_map.begin(), expr_map.end(), |
1482 | 0 | [](const auto& expr_entry) { return expr_entry.second; }); |
1483 | 0 | } |
1484 | 2 | return default_return; |
1485 | 2 | } |
1486 | | |
1487 | 2.82k | Status SegmentIterator::_init_return_column_iterators() { |
1488 | 2.82k | SCOPED_RAW_TIMER(&_opts.stats->segment_iterator_init_return_column_iterators_timer_ns); |
1489 | 2.82k | if (_cur_rowid >= num_rows()) { |
1490 | 0 | return Status::OK(); |
1491 | 0 | } |
1492 | | |
1493 | 7.05k | for (auto cid : _schema->column_ids()) { |
1494 | 7.05k | if (_schema->column(cid)->name() == BeConsts::ROWID_COL) { |
1495 | 0 | _column_iterators[cid].reset( |
1496 | 0 | new RowIdColumnIterator(_opts.tablet_id, _opts.rowset_id, _segment->id())); |
1497 | 0 | continue; |
1498 | 0 | } |
1499 | | |
1500 | 7.05k | if (_schema->column(cid)->name().starts_with(BeConsts::GLOBAL_ROWID_COL)) { |
1501 | 0 | auto& id_file_map = _opts.runtime_state->get_id_file_map(); |
1502 | 0 | uint32_t file_id = id_file_map->get_file_mapping_id(std::make_shared<FileMapping>( |
1503 | 0 | _opts.tablet_id, _opts.rowset_id, _segment->id())); |
1504 | 0 | _column_iterators[cid].reset(new RowIdColumnIteratorV2( |
1505 | 0 | IdManager::ID_VERSION, BackendOptions::get_backend_id(), file_id)); |
1506 | 0 | continue; |
1507 | 0 | } |
1508 | | |
1509 | 7.05k | if (_schema->column(cid)->name().starts_with(BeConsts::VIRTUAL_COLUMN_PREFIX)) { |
1510 | 0 | _column_iterators[cid] = std::make_unique<VirtualColumnIterator>(); |
1511 | 0 | continue; |
1512 | 0 | } |
1513 | | |
1514 | 7.05k | std::set<ColumnId> del_cond_id_set; |
1515 | 7.05k | _opts.delete_condition_predicates->get_all_column_ids(del_cond_id_set); |
1516 | 7.05k | std::vector<bool> tmp_is_pred_column; |
1517 | 7.05k | tmp_is_pred_column.resize(_schema->columns().size(), false); |
1518 | 7.05k | for (auto predicate : _col_predicates) { |
1519 | 0 | auto p_cid = predicate->column_id(); |
1520 | 0 | tmp_is_pred_column[p_cid] = true; |
1521 | 0 | } |
1522 | | // handle delete_condition |
1523 | 7.05k | for (auto d_cid : del_cond_id_set) { |
1524 | 1.32k | tmp_is_pred_column[d_cid] = true; |
1525 | 1.32k | } |
1526 | | |
1527 | 7.05k | if (_column_iterators[cid] == nullptr) { |
1528 | 7.05k | RETURN_IF_ERROR(_segment->new_column_iterator(_opts.tablet_schema->column(cid), |
1529 | 7.05k | &_column_iterators[cid], &_opts, |
1530 | 7.05k | &_variant_sparse_column_cache)); |
1531 | 7.05k | ColumnIteratorOptions iter_opts { |
1532 | 7.05k | .use_page_cache = _opts.use_page_cache, |
1533 | | // If the col is predicate column, then should read the last page to check |
1534 | | // if the column is full dict encoding |
1535 | 7.05k | .is_predicate_column = tmp_is_pred_column[cid], |
1536 | 7.05k | .file_reader = _file_reader.get(), |
1537 | 7.05k | .stats = _opts.stats, |
1538 | 7.05k | .io_ctx = _opts.io_ctx, |
1539 | 7.05k | }; |
1540 | 7.05k | RETURN_IF_ERROR(_column_iterators[cid]->init(iter_opts)); |
1541 | 7.05k | } |
1542 | 7.05k | } |
1543 | | |
1544 | 2.82k | #ifndef NDEBUG |
1545 | 2.82k | for (auto pair : _vir_cid_to_idx_in_block) { |
1546 | 0 | ColumnId vir_col_cid = pair.first; |
1547 | 0 | DCHECK(_column_iterators[vir_col_cid] != nullptr) |
1548 | 0 | << "Virtual column iterator for " << vir_col_cid << " should not be null"; |
1549 | 0 | ColumnIterator* column_iter = _column_iterators[vir_col_cid].get(); |
1550 | 0 | DCHECK(dynamic_cast<VirtualColumnIterator*>(column_iter) != nullptr) |
1551 | 0 | << "Virtual column iterator for " << vir_col_cid |
1552 | 0 | << " should be VirtualColumnIterator"; |
1553 | 0 | } |
1554 | 2.82k | #endif |
1555 | 2.82k | return Status::OK(); |
1556 | 2.82k | } |
1557 | | |
// Creates one index iterator per column in the read schema and binds each to
// a shared IndexQueryContext. Inverted indexes are resolved first; columns
// that still have no iterator afterwards are then checked for an ANN index.
Status SegmentIterator::_init_index_iterators() {
    SCOPED_RAW_TIMER(&_opts.stats->segment_iterator_init_index_iterators_timer_ns);
    // Nothing left to read from this segment, so no iterators are needed.
    if (_cur_rowid >= num_rows()) {
        return Status::OK();
    }

    // Context shared by every index iterator created below.
    _index_query_context = std::make_shared<IndexQueryContext>();
    _index_query_context->io_ctx = &_opts.io_ctx;
    _index_query_context->stats = _opts.stats;
    _index_query_context->runtime_state = _opts.runtime_state;

    // Scoring state is only attached when this query ranks results by score.
    if (_score_runtime) {
        _index_query_context->collection_statistics = _opts.collection_statistics;
        _index_query_context->collection_similarity = std::make_shared<CollectionSimilarity>();
        _index_query_context->query_limit = _score_runtime->get_limit();
        _index_query_context->is_asc = _score_runtime->is_asc();
    }

    // Inverted index iterators
    for (auto cid : _schema->column_ids()) {
        // Use segment's own index_meta, for compatibility with future indexing needs to default to lowercase.
        if (_index_iterators[cid] == nullptr) {
            // In the _opts.tablet_schema, the sub-column type information for the variant is FieldType::OLAP_FIELD_TYPE_VARIANT.
            // This is because the sub-column is created in create_materialized_variant_column.
            // We use this column to locate the metadata for the inverted index, which requires a unique_id and path.
            const auto& column = _opts.tablet_schema->column(cid);
            std::vector<const TabletIndex*> inverted_indexs;
            // Keep shared_ptr alive to prevent use-after-free when accessing raw pointers
            TabletIndexes inverted_indexs_holder;
            // If the column is an extracted column, we need to find the sub-column in the parent column reader.
            std::shared_ptr<ColumnReader> column_reader;
            if (column.is_extracted_column()) {
                // Without a parent reader there is no index metadata to find;
                // skip the column rather than fail.
                if (!_segment->_column_reader_cache->get_column_reader(
                            column.parent_unique_id(), &column_reader, _opts.stats) ||
                    column_reader == nullptr) {
                    continue;
                }
                auto* variant_reader = assert_cast<VariantColumnReader*>(column_reader.get());
                DataTypePtr data_type = _storage_name_and_type[cid].second;
                if (data_type != nullptr &&
                    data_type->get_primitive_type() == PrimitiveType::TYPE_VARIANT) {
                    // Best effort: prefer the concrete type inferred for this
                    // variant path; on failure keep the declared variant type.
                    DataTypePtr inferred_type;
                    Status st = variant_reader->infer_data_type_for_path(
                            &inferred_type, column, _opts, _segment->_column_reader_cache.get());
                    if (st.ok() && inferred_type != nullptr) {
                        data_type = inferred_type;
                    }
                }
                inverted_indexs_holder =
                        variant_reader->find_subcolumn_tablet_indexes(column, data_type);
                // Extract raw pointers from shared_ptr for iteration
                for (const auto& index_ptr : inverted_indexs_holder) {
                    inverted_indexs.push_back(index_ptr.get());
                }
            }
            // If the column is not an extracted column, we can directly get the inverted index metadata from the tablet schema.
            else {
                inverted_indexs = _segment->_tablet_schema->inverted_indexs(column);
            }
            for (const auto& inverted_index : inverted_indexs) {
                RETURN_IF_ERROR(_segment->new_index_iterator(column, inverted_index, _opts,
                                                             &_index_iterators[cid]));
            }
            if (_index_iterators[cid] != nullptr) {
                _index_iterators[cid]->set_context(_index_query_context);
            }
        }
    }

    // Ann index iterators
    for (auto cid : _schema->column_ids()) {
        // Only consider an ANN index when no inverted index claimed the column.
        if (_index_iterators[cid] == nullptr) {
            const auto& column = _opts.tablet_schema->column(cid);
            const auto* index_meta = _segment->_tablet_schema->ann_index(column);
            if (index_meta) {
                RETURN_IF_ERROR(_segment->new_index_iterator(column, index_meta, _opts,
                                                             &_index_iterators[cid]));

                if (_index_iterators[cid] != nullptr) {
                    _index_iterators[cid]->set_context(_index_query_context);
                }
            }
        }
    }

    return Status::OK();
}
1645 | | |
1646 | | Status SegmentIterator::_lookup_ordinal(const RowCursor& key, bool is_include, rowid_t upper_bound, |
1647 | 0 | rowid_t* rowid) { |
1648 | 0 | if (_segment->_tablet_schema->keys_type() == UNIQUE_KEYS && |
1649 | 0 | _segment->get_primary_key_index() != nullptr) { |
1650 | 0 | return _lookup_ordinal_from_pk_index(key, is_include, rowid); |
1651 | 0 | } |
1652 | 0 | return _lookup_ordinal_from_sk_index(key, is_include, upper_bound, rowid); |
1653 | 0 | } |
1654 | | |
1655 | | // look up one key to get its ordinal at which can get data by using short key index. |
1656 | | // 'upper_bound' is defined the max ordinal the function will search. |
1657 | | // We use upper_bound to reduce search times. |
1658 | | // If we find a valid ordinal, it will be set in rowid and with Status::OK() |
1659 | | // If we can not find a valid key in this segment, we will set rowid to upper_bound |
1660 | | // Otherwise return error. |
1661 | | // 1. get [start, end) ordinal through short key index |
1662 | | // 2. binary search to find exact ordinal that match the input condition |
1663 | | // Make is_include template to reduce branch |
Status SegmentIterator::_lookup_ordinal_from_sk_index(const RowCursor& key, bool is_include,
                                                      rowid_t upper_bound, rowid_t* rowid) {
    const ShortKeyIndexDecoder* sk_index_decoder = _segment->get_short_key_index();
    DCHECK(sk_index_decoder != nullptr);

    // Encode the search key in index format, padded to the number of short key
    // columns of this tablet.
    std::string index_key;
    key.encode_key_with_padding(&index_key, _segment->_tablet_schema->num_short_key_columns(),
                                is_include);

    const auto& key_col_ids = key.schema()->column_ids();

    // Clone the key once and pad CHAR fields to storage format before the binary search.
    // _seek_block holds storage-format data where CHAR is zero-padded to column length,
    // while RowCursor holds CHAR in compute format (unpadded). Padding once here avoids
    // repeated allocation inside the comparison loop.
    RowCursor padded_key = key.clone();
    padded_key.pad_char_fields();

    // Derive the initial [start, end) row range from the short key index; the
    // exact boundary is then found by binary search within that range.
    ssize_t start_block_id = 0;
    auto start_iter = sk_index_decoder->lower_bound(index_key);
    if (start_iter.valid()) {
        // Because previous block may contain this key, so we should set rowid to
        // last block's first row.
        start_block_id = start_iter.ordinal();
        if (start_block_id > 0) {
            start_block_id--;
        }
    } else {
        // When we don't find a valid index item, which means all short key is
        // smaller than input key, this means that this key may exist in the last
        // row block. so we set the rowid to first row of last row block.
        start_block_id = sk_index_decoder->num_items() - 1;
    }
    rowid_t start = cast_set<rowid_t>(start_block_id) * sk_index_decoder->num_rows_per_block();

    // Upper search bound: the first row of the block past the last block that
    // may contain the key, clamped to the caller-provided upper_bound when the
    // index has no such block.
    rowid_t end = upper_bound;
    auto end_iter = sk_index_decoder->upper_bound(index_key);
    if (end_iter.valid()) {
        end = cast_set<rowid_t>(end_iter.ordinal()) * sk_index_decoder->num_rows_per_block();
    }

    // binary search to find the exact key
    while (start < end) {
        rowid_t mid = (start + end) / 2;
        // Seek to row `mid` and load its short key columns into _seek_block.
        RETURN_IF_ERROR(_seek_and_peek(mid));
        int cmp = _compare_short_key_with_seek_block(padded_key, key_col_ids);
        if (cmp > 0) {
            start = mid + 1;
        } else if (cmp == 0) {
            if (is_include) {
                // lower bound
                end = mid;
            } else {
                // upper bound
                start = mid + 1;
            }
        } else {
            end = mid;
        }
    }

    // `start` is now the first row at-or-after (lower bound) or strictly after
    // (upper bound) the search key; equal to `end`/upper_bound when not found.
    *rowid = start;
    return Status::OK();
}
1728 | | |
// Looks up the ordinal of `key` via the primary key index. On return, *rowid
// is the position of the first row >= key (is_include) or > key (!is_include),
// or num_rows() when all keys are smaller.
Status SegmentIterator::_lookup_ordinal_from_pk_index(const RowCursor& key, bool is_include,
                                                      rowid_t* rowid) {
    DCHECK(_segment->_tablet_schema->keys_type() == UNIQUE_KEYS);
    const PrimaryKeyIndexReader* pk_index_reader = _segment->get_primary_key_index();
    DCHECK(pk_index_reader != nullptr);

    // Encode the full key in primary key index format.
    std::string index_key;
    key.encode_key_with_padding<true>(&index_key, _segment->_tablet_schema->num_key_columns(),
                                      is_include);
    // Fast path: key falls outside this segment's [min_key, max_key] range.
    if (index_key < _segment->min_key()) {
        *rowid = 0;
        return Status::OK();
    } else if (index_key > _segment->max_key()) {
        *rowid = num_rows();
        return Status::OK();
    }
    bool exact_match = false;

    std::unique_ptr<segment_v2::IndexedColumnIterator> index_iterator;
    RETURN_IF_ERROR(pk_index_reader->new_iterator(&index_iterator, _opts.stats));

    Status status = index_iterator->seek_at_or_after(&index_key, &exact_match);
    if (UNLIKELY(!status.ok())) {
        *rowid = num_rows();
        // ENTRY_NOT_FOUND means every stored key is smaller than the search
        // key, which is not an error for this lookup.
        if (status.is<ENTRY_NOT_FOUND>()) {
            return Status::OK();
        }
        return status;
    }
    *rowid = cast_set<rowid_t>(index_iterator->get_current_ordinal());

    // The sequence column needs to be removed from primary key index when comparing key
    bool has_seq_col = _segment->_tablet_schema->has_sequence_col();
    // Used to get key range from primary key index,
    // for mow with cluster key table, we should get key range from short key index.
    DCHECK(_segment->_tablet_schema->cluster_key_uids().empty());

    // if full key is exact_match, the primary key without sequence column should also the same
    if (has_seq_col && !exact_match) {
        // Length of the sequence column suffix in the stored key; the +1
        // presumably covers a marker byte in the encoding — confirm against
        // the key encoding scheme.
        size_t seq_col_length =
                _segment->_tablet_schema->column(_segment->_tablet_schema->sequence_col_idx())
                        .length() +
                1;
        auto index_type = DataTypeFactory::instance().create_data_type(
                _segment->_pk_index_reader->type_info()->type(), 1, 0);
        auto index_column = index_type->create_column();
        // Read the single key the iterator currently points at.
        size_t num_to_read = 1;
        size_t num_read = num_to_read;
        RETURN_IF_ERROR(index_iterator->next_batch(&num_read, index_column));
        DCHECK(num_to_read == num_read);

        Slice sought_key =
                Slice(index_column->get_data_at(0).data, index_column->get_data_at(0).size);
        // Strip the sequence column suffix before comparing.
        Slice sought_key_without_seq =
                Slice(sought_key.get_data(), sought_key.get_size() - seq_col_length);

        // compare key
        if (Slice(index_key).compare(sought_key_without_seq) == 0) {
            exact_match = true;
        }
    }

    // find the key in primary key index, and the is_include is false, so move
    // to the next row.
    if (exact_match && !is_include) {
        *rowid += 1;
    }
    return Status::OK();
}
1798 | | |
1799 | | // seek to the row and load that row to _key_cursor |
1800 | 0 | Status SegmentIterator::_seek_and_peek(rowid_t rowid) { |
1801 | 0 | { |
1802 | 0 | _opts.stats->block_init_seek_num += 1; |
1803 | 0 | SCOPED_RAW_TIMER(&_opts.stats->block_init_seek_ns); |
1804 | 0 | RETURN_IF_ERROR(_seek_columns(_seek_schema->column_ids(), rowid)); |
1805 | 0 | } |
1806 | 0 | size_t num_rows = 1; |
1807 | | |
1808 | | //note(wb) reset _seek_block for memory reuse |
1809 | | // it is easier to use row based memory layout for clear memory |
1810 | 0 | for (int i = 0; i < _seek_block.size(); i++) { |
1811 | 0 | _seek_block[i]->clear(); |
1812 | 0 | } |
1813 | 0 | RETURN_IF_ERROR(_read_columns(_seek_schema->column_ids(), _seek_block, num_rows)); |
1814 | 0 | return Status::OK(); |
1815 | 0 | } |
1816 | | |
1817 | 0 | Status SegmentIterator::_seek_columns(const std::vector<ColumnId>& column_ids, rowid_t pos) { |
1818 | 0 | for (auto cid : column_ids) { |
1819 | 0 | if (!_need_read_data(cid)) { |
1820 | 0 | continue; |
1821 | 0 | } |
1822 | 0 | RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(pos)); |
1823 | 0 | } |
1824 | 0 | return Status::OK(); |
1825 | 0 | } |
1826 | | |
1827 | | /* ---------------------- for vectorization implementation ---------------------- */ |
1828 | | |
1829 | | /** |
1830 | | * For storage layer data type, can be measured from two perspectives: |
1831 | | * 1 Whether the type can be read in a fast way(batch read using SIMD) |
1832 | | * Such as integer type and float type, this type can be read in SIMD way. |
1833 | | * For the type string/bitmap/hll, they can not be read in batch way, so read this type data is slow. |
1834 | | * If a type can be read fast, we can try to eliminate Lazy Materialization, because we think for this type, seek cost > read cost. |
1835 | | * This is an estimate, if we want more precise cost, statistics collection is necessary(this is a todo). |
1836 | | * In short, when returned non-pred columns contains string/hll/bitmap, we using Lazy Materialization. |
1837 | | * Otherwise, we disable it. |
1838 | | * |
1839 | | * When Lazy Materialization enable, we need to read column at least two times. |
1840 | | * First time to read Pred col, second time to read non-pred. |
1841 | | * Here's an interesting question to research, whether read Pred col once is the best plan. |
1842 | | * (why not read Pred col twice or more?) |
1843 | | * |
1844 | | * When Lazy Materialization disable, we just need to read once. |
1845 | | * |
1846 | | * |
 * 2 Whether the predicate type can be evaluated in a fast way (using SIMD to eval pred)
1848 | | * Such as integer type and float type, they can be eval fast. |
1849 | | * But for BloomFilter/string/date, they eval slow. |
1850 | | * If a type can be eval fast, we use vectorization to eval it. |
1851 | | * Otherwise, we use short-circuit to eval it. |
1852 | | * |
1853 | | * |
1854 | | */ |
1855 | | |
1856 | | // todo(wb) need a UT here |
1857 | 2.82k | Status SegmentIterator::_vec_init_lazy_materialization() { |
1858 | 2.82k | _is_pred_column.resize(_schema->columns().size(), false); |
1859 | | |
1860 | | // including short/vec/delete pred |
1861 | 2.82k | std::set<ColumnId> pred_column_ids; |
1862 | 2.82k | _lazy_materialization_read = false; |
1863 | | |
1864 | 2.82k | std::set<ColumnId> del_cond_id_set; |
1865 | 2.82k | _opts.delete_condition_predicates->get_all_column_ids(del_cond_id_set); |
1866 | | |
1867 | 2.82k | std::set<std::shared_ptr<const ColumnPredicate>> delete_predicate_set {}; |
1868 | 2.82k | _opts.delete_condition_predicates->get_all_column_predicate(delete_predicate_set); |
1869 | 2.82k | for (auto predicate : delete_predicate_set) { |
1870 | 467 | if (PredicateTypeTraits::is_range(predicate->type())) { |
1871 | 327 | _delete_range_column_ids.push_back(predicate->column_id()); |
1872 | 327 | } else if (PredicateTypeTraits::is_bloom_filter(predicate->type())) { |
1873 | 0 | _delete_bloom_filter_column_ids.push_back(predicate->column_id()); |
1874 | 0 | } |
1875 | 467 | } |
1876 | | |
1877 | | // Step1: extract columns that can be lazy materialization |
1878 | 2.82k | if (!_col_predicates.empty() || !del_cond_id_set.empty()) { |
1879 | 467 | std::set<ColumnId> short_cir_pred_col_id_set; // using set for distinct cid |
1880 | 467 | std::set<ColumnId> vec_pred_col_id_set; |
1881 | | |
1882 | 467 | for (auto predicate : _col_predicates) { |
1883 | 0 | auto cid = predicate->column_id(); |
1884 | 0 | _is_pred_column[cid] = true; |
1885 | 0 | pred_column_ids.insert(cid); |
1886 | | |
1887 | | // check pred using short eval or vec eval |
1888 | 0 | if (_can_evaluated_by_vectorized(predicate)) { |
1889 | 0 | vec_pred_col_id_set.insert(cid); |
1890 | 0 | _pre_eval_block_predicate.push_back(predicate); |
1891 | 0 | } else { |
1892 | 0 | short_cir_pred_col_id_set.insert(cid); |
1893 | 0 | _short_cir_eval_predicate.push_back(predicate); |
1894 | 0 | } |
1895 | 0 | if (predicate->is_runtime_filter()) { |
1896 | 0 | _filter_info_id.push_back(predicate); |
1897 | 0 | } |
1898 | 0 | } |
1899 | | |
1900 | | // handle delete_condition |
1901 | 467 | if (!del_cond_id_set.empty()) { |
1902 | 467 | short_cir_pred_col_id_set.insert(del_cond_id_set.begin(), del_cond_id_set.end()); |
1903 | 467 | pred_column_ids.insert(del_cond_id_set.begin(), del_cond_id_set.end()); |
1904 | | |
1905 | 467 | for (auto cid : del_cond_id_set) { |
1906 | 467 | _is_pred_column[cid] = true; |
1907 | 467 | } |
1908 | 467 | } |
1909 | | |
1910 | 467 | _vec_pred_column_ids.assign(vec_pred_col_id_set.cbegin(), vec_pred_col_id_set.cend()); |
1911 | 467 | _short_cir_pred_column_ids.assign(short_cir_pred_col_id_set.cbegin(), |
1912 | 467 | short_cir_pred_col_id_set.cend()); |
1913 | 467 | } |
1914 | | |
1915 | 2.82k | if (!_vec_pred_column_ids.empty()) { |
1916 | 0 | _is_need_vec_eval = true; |
1917 | 0 | } |
1918 | 2.82k | if (!_short_cir_pred_column_ids.empty()) { |
1919 | 467 | _is_need_short_eval = true; |
1920 | 467 | } |
1921 | | |
1922 | | // ColumnId to column index in block |
1923 | | // ColumnId will contail all columns in tablet schema, including virtual columns and global rowid column, |
1924 | 2.82k | _schema_block_id_map.resize(_schema->columns().size(), -1); |
1925 | | // Use cols read by query to initialize _schema_block_id_map. |
1926 | | // We need to know the index of each column in the block. |
1927 | | // There is an assumption here that the columns in the block are in the same order as in the read schema. |
1928 | | // TODO: A probelm is that, delete condition columns will exist in _schema->column_ids but not in block if |
1929 | | // delete column is not read by the query. |
1930 | 9.87k | for (int i = 0; i < _schema->num_column_ids(); i++) { |
1931 | 7.05k | auto cid = _schema->column_id(i); |
1932 | 7.05k | _schema_block_id_map[cid] = i; |
1933 | 7.05k | } |
1934 | | |
1935 | | // Step2: extract columns that can execute expr context |
1936 | 2.82k | _is_common_expr_column.resize(_schema->columns().size(), false); |
1937 | 2.82k | if (_enable_common_expr_pushdown && !_remaining_conjunct_roots.empty()) { |
1938 | 0 | for (auto expr : _remaining_conjunct_roots) { |
1939 | 0 | RETURN_IF_ERROR(_extract_common_expr_columns(expr)); |
1940 | 0 | } |
1941 | 0 | if (!_common_expr_columns.empty()) { |
1942 | 0 | _is_need_expr_eval = true; |
1943 | 0 | for (auto cid : _schema->column_ids()) { |
1944 | | // pred column also needs to be filtered by expr, exclude additional delete condition column. |
1945 | | // if delete condition column not in the block, no filter is needed |
1946 | | // and will be removed from _columns_to_filter in the first next_batch. |
1947 | 0 | if (_is_common_expr_column[cid] || _is_pred_column[cid]) { |
1948 | 0 | auto loc = _schema_block_id_map[cid]; |
1949 | 0 | _columns_to_filter.push_back(loc); |
1950 | 0 | } |
1951 | 0 | } |
1952 | |
|
1953 | 0 | for (auto pair : _vir_cid_to_idx_in_block) { |
1954 | 0 | _columns_to_filter.push_back(cast_set<ColumnId>(pair.second)); |
1955 | 0 | } |
1956 | 0 | } |
1957 | 0 | } |
1958 | | |
1959 | | // Step 3: fill non predicate columns and second read column |
1960 | | // if _schema columns size equal to pred_column_ids size, lazy_materialization_read is false, |
1961 | | // all columns are lazy materialization columns without non predicte column. |
1962 | | // If common expr pushdown exists, and expr column is not contained in lazy materialization columns, |
1963 | | // add to second read column, which will be read after lazy materialization |
1964 | 2.82k | if (_schema->column_ids().size() > pred_column_ids.size()) { |
1965 | | // pred_column_ids maybe empty, so that could not set _lazy_materialization_read = true here |
1966 | | // has to check there is at least one predicate column |
1967 | 6.99k | for (auto cid : _schema->column_ids()) { |
1968 | 6.99k | if (!_is_pred_column[cid]) { |
1969 | 6.58k | if (_is_need_vec_eval || _is_need_short_eval) { |
1970 | 862 | _lazy_materialization_read = true; |
1971 | 862 | } |
1972 | 6.58k | if (_is_common_expr_column[cid]) { |
1973 | 0 | _common_expr_column_ids.push_back(cid); |
1974 | 6.58k | } else { |
1975 | 6.58k | _non_predicate_columns.push_back(cid); |
1976 | 6.58k | } |
1977 | 6.58k | } |
1978 | 6.99k | } |
1979 | 2.76k | } |
1980 | | |
1981 | | // Step 4: fill first read columns |
1982 | 2.82k | if (_lazy_materialization_read) { |
1983 | | // insert pred cid to first_read_columns |
1984 | 410 | for (auto cid : pred_column_ids) { |
1985 | 410 | _predicate_column_ids.push_back(cid); |
1986 | 410 | } |
1987 | 2.41k | } else if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) { |
1988 | 8.08k | for (int i = 0; i < _schema->num_column_ids(); i++) { |
1989 | 5.72k | auto cid = _schema->column_id(i); |
1990 | 5.72k | _predicate_column_ids.push_back(cid); |
1991 | 5.72k | } |
1992 | 2.35k | } else { |
1993 | 57 | if (_is_need_vec_eval || _is_need_short_eval) { |
1994 | | // TODO To refactor, because we suppose lazy materialization is better performance. |
1995 | | // pred exits, but we can eliminate lazy materialization |
1996 | | // insert pred/non-pred cid to first read columns |
1997 | 57 | std::set<ColumnId> pred_id_set; |
1998 | 57 | pred_id_set.insert(_short_cir_pred_column_ids.begin(), |
1999 | 57 | _short_cir_pred_column_ids.end()); |
2000 | 57 | pred_id_set.insert(_vec_pred_column_ids.begin(), _vec_pred_column_ids.end()); |
2001 | | |
2002 | 57 | DCHECK(_common_expr_column_ids.empty()); |
2003 | | // _non_predicate_column_ids must be empty. Otherwise _lazy_materialization_read must not false. |
2004 | 114 | for (int i = 0; i < _schema->num_column_ids(); i++) { |
2005 | 57 | auto cid = _schema->column_id(i); |
2006 | 57 | if (pred_id_set.find(cid) != pred_id_set.end()) { |
2007 | 57 | _predicate_column_ids.push_back(cid); |
2008 | 57 | } |
2009 | 57 | } |
2010 | 57 | } else if (_is_need_expr_eval) { |
2011 | 0 | DCHECK(!_is_need_vec_eval && !_is_need_short_eval); |
2012 | 0 | for (auto cid : _common_expr_columns) { |
2013 | 0 | _predicate_column_ids.push_back(cid); |
2014 | 0 | } |
2015 | 0 | } |
2016 | 57 | } |
2017 | | |
2018 | 2.82k | VLOG_DEBUG << fmt::format( |
2019 | 0 | "Laze materialization init end. " |
2020 | 0 | "lazy_materialization_read: {}, " |
2021 | 0 | "_col_predicates size: {}, " |
2022 | 0 | "_cols_read_by_column_predicate: [{}], " |
2023 | 0 | "_non_predicate_columns: [{}], " |
2024 | 0 | "_cols_read_by_common_expr: [{}], " |
2025 | 0 | "columns_to_filter: [{}], " |
2026 | 0 | "_schema_block_id_map: [{}]", |
2027 | 0 | _lazy_materialization_read, _col_predicates.size(), |
2028 | 0 | fmt::join(_predicate_column_ids, ","), fmt::join(_non_predicate_columns, ","), |
2029 | 0 | fmt::join(_common_expr_column_ids, ","), fmt::join(_columns_to_filter, ","), |
2030 | 0 | fmt::join(_schema_block_id_map, ",")); |
2031 | 2.82k | return Status::OK(); |
2032 | 2.82k | } |
2033 | | |
// Decide whether `predicate` can be evaluated by the vectorized (whole-batch)
// path instead of the row-at-a-time short-circuit path.
// Only the simple comparison predicate types qualify; string columns
// additionally require that every data page is dictionary-encoded so the
// comparison can run on dictionary codes.
bool SegmentIterator::_can_evaluated_by_vectorized(std::shared_ptr<ColumnPredicate> predicate) {
    auto cid = predicate->column_id();
    FieldType field_type = _schema->column(cid)->type();
    if (field_type == FieldType::OLAP_FIELD_TYPE_VARIANT) {
        // Use variant cast dst type
        field_type = _opts.target_cast_type_for_variants[_schema->column(cid)->name()]
                             ->get_storage_field_type();
    }
    switch (predicate->type()) {
    case PredicateType::EQ:
    case PredicateType::NE:
    case PredicateType::LE:
    case PredicateType::LT:
    case PredicateType::GE:
    case PredicateType::GT: {
        if (field_type == FieldType::OLAP_FIELD_TYPE_VARCHAR ||
            field_type == FieldType::OLAP_FIELD_TYPE_CHAR ||
            field_type == FieldType::OLAP_FIELD_TYPE_STRING) {
            // Vectorized string comparison relies on the low-cardinality
            // (dictionary) optimization and only applies to query readers.
            return config::enable_low_cardinality_optimize &&
                   _opts.io_ctx.reader_type == ReaderType::READER_QUERY &&
                   _column_iterators[cid]->is_all_dict_encoding();
        } else if (field_type == FieldType::OLAP_FIELD_TYPE_DECIMAL) {
            // Decimal comparisons stay on the short-circuit path.
            return false;
        }
        return true;
    }
    default:
        return false;
    }
}
2064 | | |
2065 | 7.06k | bool SegmentIterator::_has_char_type(const StorageField& column_desc) { |
2066 | 7.06k | switch (column_desc.type()) { |
2067 | 0 | case FieldType::OLAP_FIELD_TYPE_CHAR: |
2068 | 0 | return true; |
2069 | 2 | case FieldType::OLAP_FIELD_TYPE_ARRAY: |
2070 | 2 | return _has_char_type(*column_desc.get_sub_field(0)); |
2071 | 2 | case FieldType::OLAP_FIELD_TYPE_MAP: |
2072 | 2 | return _has_char_type(*column_desc.get_sub_field(0)) || |
2073 | 2 | _has_char_type(*column_desc.get_sub_field(1)); |
2074 | 0 | case FieldType::OLAP_FIELD_TYPE_STRUCT: |
2075 | 0 | for (int idx = 0; idx < column_desc.get_sub_field_count(); ++idx) { |
2076 | 0 | if (_has_char_type(*column_desc.get_sub_field(idx))) { |
2077 | 0 | return true; |
2078 | 0 | } |
2079 | 0 | } |
2080 | 0 | return false; |
2081 | 7.05k | default: |
2082 | 7.05k | return false; |
2083 | 7.06k | } |
2084 | 7.06k | }; |
2085 | | |
// Collect the block positions of columns containing CHAR data (directly or
// nested) into _char_type_idx, and mark top-level CHAR columns in
// _is_char_type. Skips recomputation once _char_type_idx is non-empty.
void SegmentIterator::_vec_init_char_column_id(Block* block) {
    if (!_char_type_idx.empty()) {
        // Already initialized on a previous call (with at least one CHAR hit).
        return;
    }
    _is_char_type.resize(_schema->columns().size(), false);
    for (size_t i = 0; i < _schema->num_column_ids(); i++) {
        auto cid = _schema->column_id(i);
        const StorageField* column_desc = _schema->column(cid);

        // The additional deleted filter condition will be in the materialized column at the end of the block.
        // After _output_column_by_sel_idx, it will be erased, so we do not need to shrink it.
        if (i < block->columns()) {
            if (_has_char_type(*column_desc)) {
                _char_type_idx.emplace_back(i);
            }
        }

        // _is_char_type only tracks top-level CHAR columns (no nested lookup).
        if (column_desc->type() == FieldType::OLAP_FIELD_TYPE_CHAR) {
            _is_char_type[cid] = true;
        }
    }
}
2108 | | |
2109 | | bool SegmentIterator::_prune_column(ColumnId cid, MutableColumnPtr& column, bool fill_defaults, |
2110 | 27.1k | size_t num_of_defaults) { |
2111 | 27.1k | if (_need_read_data(cid)) { |
2112 | 27.1k | return false; |
2113 | 27.1k | } |
2114 | 0 | if (!fill_defaults) { |
2115 | 0 | return true; |
2116 | 0 | } |
2117 | 0 | if (column->is_nullable()) { |
2118 | 0 | auto nullable_col_ptr = reinterpret_cast<ColumnNullable*>(column.get()); |
2119 | 0 | nullable_col_ptr->get_null_map_column().insert_many_defaults(num_of_defaults); |
2120 | 0 | nullable_col_ptr->get_nested_column_ptr()->insert_many_defaults(num_of_defaults); |
2121 | 0 | } else { |
2122 | | // assert(column->is_const()); |
2123 | 0 | column->insert_many_defaults(num_of_defaults); |
2124 | 0 | } |
2125 | 0 | return true; |
2126 | 0 | } |
2127 | | |
2128 | | Status SegmentIterator::_read_columns(const std::vector<ColumnId>& column_ids, |
2129 | 0 | MutableColumns& column_block, size_t nrows) { |
2130 | 0 | for (auto cid : column_ids) { |
2131 | 0 | auto& column = column_block[cid]; |
2132 | 0 | size_t rows_read = nrows; |
2133 | 0 | if (_prune_column(cid, column, true, rows_read)) { |
2134 | 0 | continue; |
2135 | 0 | } |
2136 | 0 | RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, column)); |
2137 | 0 | if (nrows != rows_read) { |
2138 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR>("nrows({}) != rows_read({})", nrows, |
2139 | 0 | rows_read); |
2140 | 0 | } |
2141 | 0 | } |
2142 | 0 | return Status::OK(); |
2143 | 0 | } |
2144 | | |
// Prepare `current_columns` (the per-batch return columns) for one next_batch
// call: clear data left over from the previous batch and make every column
// ready to accept up to `nrows_read_limit` rows. Predicate columns are
// reused; non-predicate columns are taken over (mutated) from `block`.
Status SegmentIterator::_init_current_block(Block* block,
                                            std::vector<MutableColumnPtr>& current_columns,
                                            uint32_t nrows_read_limit) {
    block->clear_column_data(_schema->num_column_ids());

    for (size_t i = 0; i < _schema->num_column_ids(); i++) {
        auto cid = _schema->column_id(i);
        const auto* column_desc = _schema->column(cid);

        auto file_column_type = _storage_name_and_type[cid].second;
        auto expected_type = Schema::get_data_type_ptr(*column_desc);
        if (!_is_pred_column[cid] && !file_column_type->equals(*expected_type)) {
            // The storage layer type is different from schema needed type, so we use storage
            // type to read columns instead of schema type for safety
            VLOG_DEBUG << fmt::format(
                    "Recreate column with expected type {}, file column type {}, col_name {}, "
                    "col_path {}",
                    block->get_by_position(i).type->get_name(), file_column_type->get_name(),
                    column_desc->name(),
                    column_desc->path() == nullptr ? "" : column_desc->path()->get_path());
            // TODO reuse
            current_columns[cid] = file_column_type->create_column();
            current_columns[cid]->reserve(nrows_read_limit);
        } else {
            // the column in block must clear() here to insert new data
            if (_is_pred_column[cid] ||
                i >= block->columns()) { //todo(wb) maybe we can release it after output block
                // Predicate columns (and columns beyond the block's width) are
                // owned by this iterator across batches; just reset contents.
                if (current_columns[cid].get() == nullptr) {
                    return Status::InternalError(
                            "SegmentIterator meet invalid column, id={}, name={}", cid,
                            _schema->column(cid)->name());
                }
                current_columns[cid]->clear();
            } else { // non-predicate column
                // Take the column out of the caller's block and fill it in
                // place; _output_non_pred_columns moves it back afterwards.
                current_columns[cid] = std::move(*block->get_by_position(i).column).mutate();
                current_columns[cid]->reserve(nrows_read_limit);
            }
        }
    }

    // Virtual columns start each batch as empty ColumnNothing placeholders;
    // they are materialized later (e.g. by common expression evaluation).
    for (auto entry : _virtual_column_exprs) {
        auto cid = entry.first;
        current_columns[cid] = ColumnNothing::create(0);
        current_columns[cid]->reserve(nrows_read_limit);
    }

    return Status::OK();
}
2193 | | |
// Move the lazily-read non-predicate columns from _current_return_columns
// back into the caller's `block`, converting them to the expected schema
// types first. Columns whose block position lies beyond block->columns()
// (delete-predicate columns not requested by the query) are skipped.
Status SegmentIterator::_output_non_pred_columns(Block* block) {
    SCOPED_RAW_TIMER(&_opts.stats->output_col_ns);
    VLOG_DEBUG << fmt::format(
            "Output non-predicate columns, _non_predicate_columns: [{}], "
            "_schema_block_id_map: [{}]",
            fmt::join(_non_predicate_columns, ","), fmt::join(_schema_block_id_map, ","));
    RETURN_IF_ERROR(_convert_to_expected_type(_non_predicate_columns));
    for (auto cid : _non_predicate_columns) {
        auto loc = _schema_block_id_map[cid];
        // Whether a delete predicate column gets output depends on how the caller builds
        // the block passed to next_batch(). Both calling paths now build the block with
        // only the output schema (return_columns), so delete predicate columns are skipped:
        //
        // 1) VMergeIterator path: block_reset() builds _block using the output schema
        //    (return_columns only), e.g. block has 2 columns {c1, c2}.
        //    Here loc=2 for delete predicate c3, block->columns()=2, so loc < block->columns()
        //    is false, and c3 is skipped.
        //
        // 2) VUnionIterator path: the caller's block is built with only return_columns
        //    (output schema), e.g. block has 2 columns {c1, c2}.
        //    Here loc=2 for c3, block->columns()=2, so loc < block->columns() is false,
        //    and c3 is skipped — same behavior as the VMergeIterator path.
        if (loc < block->columns()) {
            // A virtual column that is no longer ColumnNothing in the block was
            // already materialized (e.g. by a common expr) — don't overwrite it.
            bool column_in_block_is_nothing = check_and_get_column<const ColumnNothing>(
                    block->get_by_position(loc).column.get());
            bool column_is_normal = !_vir_cid_to_idx_in_block.contains(cid);
            bool return_column_is_nothing =
                    check_and_get_column<const ColumnNothing>(_current_return_columns[cid].get());
            VLOG_DEBUG << fmt::format(
                    "Cid {} loc {}, column_in_block_is_nothing {}, column_is_normal {}, "
                    "return_column_is_nothing {}",
                    cid, loc, column_in_block_is_nothing, column_is_normal,
                    return_column_is_nothing);

            if (column_in_block_is_nothing || column_is_normal) {
                block->replace_by_position(loc, std::move(_current_return_columns[cid]));
                VLOG_DEBUG << fmt::format(
                        "Output non-predicate column, cid: {}, loc: {}, col_name: {}, rows {}", cid,
                        loc, _schema->column(cid)->name(),
                        block->get_by_position(loc).column->size());
            }
            // Means virtual column in block has been materialized(maybe by common expr).
            // so do nothing here.
        }
    }
    return Status::OK();
}
2241 | | |
2242 | | /** |
2243 | | * Reads columns by their index, handling both continuous and discontinuous rowid scenarios. |
2244 | | * |
2245 | | * This function is designed to read a specified number of rows (up to nrows_read_limit) |
2246 | | * from the segment iterator, dealing with both continuous and discontinuous rowid arrays. |
2247 | | * It operates as follows: |
2248 | | * |
2249 | | * 1. Reads a batch of rowids (up to the specified limit), and checks if they are continuous. |
2250 | | * Continuous here means that the rowids form an unbroken sequence (e.g., 1, 2, 3, 4...). |
2251 | | * |
2252 | | * 2. For each column that needs to be read (identified by _predicate_column_ids): |
2253 | | * - If the rowids are continuous, the function uses seek_to_ordinal and next_batch |
2254 | | * for efficient reading. |
2255 | | * - If the rowids are not continuous, the function processes them in smaller batches |
2256 | | * (each of size up to 256). Each batch is checked for internal continuity: |
2257 | | * a. If a batch is continuous, uses seek_to_ordinal and next_batch for that batch. |
2258 | | * b. If a batch is not continuous, uses read_by_rowids for individual rowids in the batch. |
2259 | | * |
2260 | | * This approach optimizes reading performance by leveraging batch processing for continuous |
2261 | | * rowid sequences and handling discontinuities gracefully in smaller chunks. |
2262 | | */ |
// See the algorithm description in the comment block above. Reads up to
// `nrows_read_limit` rowids from the range iterator into _block_rowids and
// fills every predicate column, choosing seek+next_batch for continuous runs
// and read_by_rowids for scattered ones. `nrows_read` returns the actual count.
Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, uint16_t& nrows_read) {
    SCOPED_RAW_TIMER(&_opts.stats->predicate_column_read_ns);

    nrows_read = (uint16_t)_range_iter->read_batch_rowids(_block_rowids.data(), nrows_read_limit);
    // Continuous iff the rowids form an unbroken ascending run (first..last).
    bool is_continuous = (nrows_read > 1) &&
                         (_block_rowids[nrows_read - 1] - _block_rowids[0] == nrows_read - 1);
    VLOG_DEBUG << fmt::format(
            "nrows_read from range iterator: {}, is_continus {}, _cols_read_by_column_predicate "
            "[{}]",
            nrows_read, is_continuous, fmt::join(_predicate_column_ids, ","));

    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
            "[verbose] SegmentIterator::_read_columns_by_index read {} rowids, continuous: {}, "
            "rowids: [{}...{}]",
            nrows_read, is_continuous, nrows_read > 0 ? _block_rowids[0] : 0,
            nrows_read > 0 ? _block_rowids[nrows_read - 1] : 0);
    for (auto cid : _predicate_column_ids) {
        auto& column = _current_return_columns[cid];
        VLOG_DEBUG << fmt::format("Reading column {}, col_name {}", cid,
                                  _schema->column(cid)->name());
        // Virtual columns have no physical data; skip the prune/skip checks.
        if (!_virtual_column_exprs.contains(cid)) {
            if (_no_need_read_key_data(cid, column, nrows_read)) {
                VLOG_DEBUG << fmt::format("Column {} no need to read.", cid);
                continue;
            }
            if (_prune_column(cid, column, true, nrows_read)) {
                VLOG_DEBUG << fmt::format("Column {} is pruned. No need to read data.", cid);
                continue;
            }
            // Debug point: fail if a column that should have been skipped is read.
            DBUG_EXECUTE_IF("segment_iterator._read_columns_by_index", {
                auto col_name = _opts.tablet_schema->column(cid).name();
                auto debug_col_name =
                        DebugPoints::instance()->get_debug_param_or_default<std::string>(
                                "segment_iterator._read_columns_by_index", "column_name", "");
                if (debug_col_name.empty() && col_name != "__DORIS_DELETE_SIGN__") {
                    return Status::Error<ErrorCode::INTERNAL_ERROR>(
                            "does not need to read data, {}", col_name);
                }
                if (debug_col_name.find(col_name) != std::string::npos) {
                    return Status::Error<ErrorCode::INTERNAL_ERROR>(
                            "does not need to read data, {}", col_name);
                }
            })
        }

        if (is_continuous) {
            // Single seek + bulk next_batch for the whole run.
            size_t rows_read = nrows_read;
            _opts.stats->predicate_column_read_seek_num += 1;
            if (_opts.runtime_state && _opts.runtime_state->enable_profile()) {
                SCOPED_RAW_TIMER(&_opts.stats->predicate_column_read_seek_ns);
                RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(_block_rowids[0]));
            } else {
                RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(_block_rowids[0]));
            }
            RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, column));
            if (rows_read != nrows_read) {
                return Status::Error<ErrorCode::INTERNAL_ERROR>("nrows({}) != rows_read({})",
                                                                nrows_read, rows_read);
            }
        } else {
            // Process in sub-batches; each sub-batch may itself be continuous.
            const uint32_t batch_size = _range_iter->get_batch_size();
            uint32_t processed = 0;
            while (processed < nrows_read) {
                uint32_t current_batch_size = std::min(batch_size, nrows_read - processed);
                bool batch_continuous = (current_batch_size > 1) &&
                                        (_block_rowids[processed + current_batch_size - 1] -
                                                 _block_rowids[processed] ==
                                         current_batch_size - 1);

                if (batch_continuous) {
                    size_t rows_read = current_batch_size;
                    _opts.stats->predicate_column_read_seek_num += 1;
                    if (_opts.runtime_state && _opts.runtime_state->enable_profile()) {
                        SCOPED_RAW_TIMER(&_opts.stats->predicate_column_read_seek_ns);
                        RETURN_IF_ERROR(
                                _column_iterators[cid]->seek_to_ordinal(_block_rowids[processed]));
                    } else {
                        RETURN_IF_ERROR(
                                _column_iterators[cid]->seek_to_ordinal(_block_rowids[processed]));
                    }
                    RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, column));
                    if (rows_read != current_batch_size) {
                        return Status::Error<ErrorCode::INTERNAL_ERROR>(
                                "batch nrows({}) != rows_read({})", current_batch_size, rows_read);
                    }
                } else {
                    RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids(
                            &_block_rowids[processed], current_batch_size, column));
                }
                processed += current_batch_size;
            }
        }
    }

    return Status::OK();
}
2359 | | void SegmentIterator::_replace_version_col_if_needed(const std::vector<ColumnId>& column_ids, |
2360 | 14.3k | size_t num_rows) { |
2361 | | // Only the rowset with single version need to replace the version column. |
2362 | | // Doris can't determine the version before publish_version finished, so |
2363 | | // we can't write data to __DORIS_VERSION_COL__ in segment writer, the value |
2364 | | // is 0 by default. |
2365 | | // So we need to replace the value to real version while reading. |
2366 | 14.3k | if (_opts.version.first != _opts.version.second) { |
2367 | 6.49k | return; |
2368 | 6.49k | } |
2369 | 7.81k | int32_t version_idx = _schema->version_col_idx(); |
2370 | 7.81k | if (std::ranges::find(column_ids, version_idx) == column_ids.end()) { |
2371 | 7.81k | return; |
2372 | 7.81k | } |
2373 | | |
2374 | 0 | const auto* column_desc = _schema->column(version_idx); |
2375 | 0 | auto column = Schema::get_data_type_ptr(*column_desc)->create_column(); |
2376 | 0 | DCHECK(_schema->column(version_idx)->type() == FieldType::OLAP_FIELD_TYPE_BIGINT); |
2377 | 0 | auto* col_ptr = assert_cast<ColumnInt64*>(column.get()); |
2378 | 0 | for (size_t j = 0; j < num_rows; j++) { |
2379 | 0 | col_ptr->insert_value(_opts.version.second); |
2380 | 0 | } |
2381 | 0 | _current_return_columns[version_idx] = std::move(column); |
2382 | 0 | VLOG_DEBUG << "replaced version column in segment iterator, version_col_idx:" << version_idx; |
2383 | 0 | } |
2384 | | |
// Evaluate all vectorized predicates over _current_return_columns and write
// the indexes of surviving rows into `sel_rowid_idx`. Returns the number of
// selected rows (<= selected_size).
uint16_t SegmentIterator::_evaluate_vectorization_predicate(uint16_t* sel_rowid_idx,
                                                            uint16_t selected_size) {
    SCOPED_RAW_TIMER(&_opts.stats->vec_cond_ns);
    bool all_pred_always_true = true;
    for (const auto& pred : _pre_eval_block_predicate) {
        if (!pred->always_true()) {
            all_pred_always_true = false;
        } else {
            // Still account the rows for this predicate's filter statistics.
            pred->update_filter_info(0, 0, selected_size);
        }
    }

    const uint16_t original_size = selected_size;
    //If all predicates are always_true, then return directly.
    if (all_pred_always_true || !_is_need_vec_eval) {
        // Identity selection: every input row survives.
        for (uint16_t i = 0; i < original_size; ++i) {
            sel_rowid_idx[i] = i;
        }
        // All preds are always_true, so return immediately and update the profile statistics here.
        _opts.stats->vec_cond_input_rows += original_size;
        return original_size;
    }

    // Evaluate each remaining predicate into a per-row boolean flag array:
    // the first predicate writes flags, subsequent ones AND into them.
    _ret_flags.resize(original_size);
    DCHECK(!_pre_eval_block_predicate.empty());
    bool is_first = true;
    for (auto& pred : _pre_eval_block_predicate) {
        if (pred->always_true()) {
            continue;
        }
        auto column_id = pred->column_id();
        auto& column = _current_return_columns[column_id];
        if (is_first) {
            pred->evaluate_vec(*column, original_size, (bool*)_ret_flags.data());
            is_first = false;
        } else {
            pred->evaluate_and_vec(*column, original_size, (bool*)_ret_flags.data());
        }
    }

    uint16_t new_size = 0;

    // Compact the boolean flags into selected row indexes, SIMD_BYTES flags at
    // a time via a bit mask, with special cases for all-zero / all-one masks.
    uint16_t sel_pos = 0;
    const uint16_t sel_end = sel_pos + selected_size;
    static constexpr size_t SIMD_BYTES = simd::bits_mask_length();
    const uint16_t sel_end_simd = sel_pos + selected_size / SIMD_BYTES * SIMD_BYTES;

    while (sel_pos < sel_end_simd) {
        auto mask = simd::bytes_mask_to_bits_mask(_ret_flags.data() + sel_pos);
        if (0 == mask) {
            //pass
        } else if (simd::bits_mask_all() == mask) {
            for (uint16_t i = 0; i < SIMD_BYTES; i++) {
                sel_rowid_idx[new_size++] = sel_pos + i;
            }
        } else {
            simd::iterate_through_bits_mask(
                    [&](const int bit_pos) {
                        sel_rowid_idx[new_size++] = sel_pos + (uint16_t)bit_pos;
                    },
                    mask);
        }
        sel_pos += SIMD_BYTES;
    }

    // Scalar tail for the remaining < SIMD_BYTES flags.
    for (; sel_pos < sel_end; sel_pos++) {
        if (_ret_flags[sel_pos]) {
            sel_rowid_idx[new_size++] = sel_pos;
        }
    }

    _opts.stats->vec_cond_input_rows += original_size;
    _opts.stats->rows_vec_cond_filtered += original_size - new_size;
    return new_size;
}
2460 | | |
2461 | | uint16_t SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_rowid_idx, |
2462 | 1.69k | uint16_t selected_size) { |
2463 | 1.69k | SCOPED_RAW_TIMER(&_opts.stats->short_cond_ns); |
2464 | 1.69k | if (!_is_need_short_eval) { |
2465 | 0 | return selected_size; |
2466 | 0 | } |
2467 | | |
2468 | 1.69k | uint16_t original_size = selected_size; |
2469 | 1.69k | for (auto predicate : _short_cir_eval_predicate) { |
2470 | 0 | auto column_id = predicate->column_id(); |
2471 | 0 | auto& short_cir_column = _current_return_columns[column_id]; |
2472 | 0 | selected_size = predicate->evaluate(*short_cir_column, vec_sel_rowid_idx, selected_size); |
2473 | 0 | } |
2474 | | |
2475 | 1.69k | _opts.stats->short_circuit_cond_input_rows += original_size; |
2476 | 1.69k | _opts.stats->rows_short_circuit_cond_filtered += original_size - selected_size; |
2477 | | |
2478 | | // evaluate delete condition |
2479 | 1.69k | original_size = selected_size; |
2480 | 1.69k | selected_size = _opts.delete_condition_predicates->evaluate(_current_return_columns, |
2481 | 1.69k | vec_sel_rowid_idx, selected_size); |
2482 | 1.69k | _opts.stats->rows_vec_del_cond_filtered += original_size - selected_size; |
2483 | 1.69k | return selected_size; |
2484 | 1.69k | } |
2485 | | |
2486 | 1 | static void shrink_materialized_block_columns(Block* block, size_t rows) { |
2487 | 2 | for (auto& entry : *block) { |
2488 | 2 | if (entry.column && entry.column->size() > rows) { |
2489 | 1 | entry.column = entry.column->shrink(rows); |
2490 | 1 | } |
2491 | 2 | } |
2492 | 1 | } |
2493 | | |
2494 | | static void slice_materialized_block_columns(Block* block, size_t offset, size_t rows, |
2495 | 1 | size_t original_rows) { |
2496 | 1 | for (auto& entry : *block) { |
2497 | 1 | if (!entry.column || entry.column->size() == 0) { |
2498 | 0 | continue; |
2499 | 0 | } |
2500 | 1 | DORIS_CHECK(entry.column->size() == original_rows); |
2501 | 1 | entry.column = entry.column->cut(offset, rows); |
2502 | 1 | } |
2503 | 1 | } |
2504 | | |
// Cap the rows selected for this batch so that the total rows returned never
// exceeds _opts.read_limit. For reverse ORDER BY reads the LAST `remaining`
// selected rows are kept; otherwise the first `remaining` are kept.
Status SegmentIterator::_apply_read_limit_to_selected_rows(Block* block, uint16_t& selected_size) {
    if (_opts.read_limit == 0) {
        // A limit of 0 means "no limit".
        return Status::OK();
    }
    DORIS_CHECK(_rows_returned <= _opts.read_limit);
    size_t remaining = _opts.read_limit - _rows_returned;
    if (remaining == 0) {
        // Limit already reached: select nothing and empty the block columns.
        selected_size = 0;
        shrink_materialized_block_columns(block, 0);
        return Status::OK();
    }
    if (selected_size > remaining) {
        if (_opts.read_orderby_key_reverse) {
            // Keep the tail: shift the last `remaining` selector entries to the
            // front and cut the materialized columns to the same tail window.
            // NOTE(review): the selector VALUES are not rebased by `offset`
            // after the cut — presumably they are positions that stay aligned
            // with the sliced columns here; verify against the callers.
            const auto original_size = selected_size;
            const auto offset = original_size - remaining;
            for (size_t i = 0; i < remaining; ++i) {
                _sel_rowid_idx[i] = _sel_rowid_idx[offset + i];
            }
            selected_size = cast_set<uint16_t>(remaining);
            slice_materialized_block_columns(block, offset, remaining, original_size);
            return Status::OK();
        }
        // Forward read: keep the first `remaining` rows.
        selected_size = cast_set<uint16_t>(remaining);
        shrink_materialized_block_columns(block, selected_size);
    }
    return Status::OK();
}
2532 | | |
2533 | | Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_column_ids, |
2534 | | std::vector<rowid_t>& rowid_vector, |
2535 | | uint16_t* sel_rowid_idx, size_t select_size, |
2536 | | MutableColumns* mutable_columns, |
2537 | 1.38k | bool init_condition_cache) { |
2538 | 1.38k | SCOPED_RAW_TIMER(&_opts.stats->lazy_read_ns); |
2539 | 1.38k | std::vector<rowid_t> rowids(select_size); |
2540 | | |
2541 | 1.38k | if (init_condition_cache) { |
2542 | 0 | DCHECK(_condition_cache); |
2543 | 0 | auto& condition_cache = *_condition_cache; |
2544 | 0 | for (size_t i = 0; i < select_size; ++i) { |
2545 | 0 | rowids[i] = rowid_vector[sel_rowid_idx[i]]; |
2546 | 0 | condition_cache[rowids[i] / SegmentIterator::CONDITION_CACHE_OFFSET] = true; |
2547 | 0 | } |
2548 | 1.38k | } else { |
2549 | 2.74M | for (size_t i = 0; i < select_size; ++i) { |
2550 | 2.74M | rowids[i] = rowid_vector[sel_rowid_idx[i]]; |
2551 | 2.74M | } |
2552 | 1.38k | } |
2553 | | |
2554 | 2.10k | for (auto cid : read_column_ids) { |
2555 | 2.10k | auto& colunm = (*mutable_columns)[cid]; |
2556 | 2.10k | if (_no_need_read_key_data(cid, colunm, select_size)) { |
2557 | 0 | continue; |
2558 | 0 | } |
2559 | 2.10k | if (_prune_column(cid, colunm, true, select_size)) { |
2560 | 0 | continue; |
2561 | 0 | } |
2562 | | |
2563 | 2.10k | DBUG_EXECUTE_IF("segment_iterator._read_columns_by_index", { |
2564 | 2.10k | auto debug_col_name = DebugPoints::instance()->get_debug_param_or_default<std::string>( |
2565 | 2.10k | "segment_iterator._read_columns_by_index", "column_name", ""); |
2566 | 2.10k | if (debug_col_name.empty()) { |
2567 | 2.10k | return Status::Error<ErrorCode::INTERNAL_ERROR>("does not need to read data"); |
2568 | 2.10k | } |
2569 | 2.10k | auto col_name = _opts.tablet_schema->column(cid).name(); |
2570 | 2.10k | if (debug_col_name.find(col_name) != std::string::npos) { |
2571 | 2.10k | return Status::Error<ErrorCode::INTERNAL_ERROR>("does not need to read data, {}", |
2572 | 2.10k | debug_col_name); |
2573 | 2.10k | } |
2574 | 2.10k | }) |
2575 | | |
2576 | 2.10k | if (_current_return_columns[cid].get() == nullptr) { |
2577 | 0 | return Status::InternalError( |
2578 | 0 | "SegmentIterator meet invalid column, return columns size {}, cid {}", |
2579 | 0 | _current_return_columns.size(), cid); |
2580 | 0 | } |
2581 | 2.10k | RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids(rowids.data(), select_size, |
2582 | 2.10k | _current_return_columns[cid])); |
2583 | 2.10k | } |
2584 | | |
2585 | 1.38k | return Status::OK(); |
2586 | 1.38k | } |
2587 | | |
// Public entry point for reading one batch of rows from this segment.
// Responsibilities, in order: reset virtual columns, (optionally) pick an adaptive
// batch size, delegate to _next_batch_internal, then post-process the block
// (virtual-column replacement on EOF, condition-cache publication, reverse-order
// permutation for reverse key reads, and predictor feedback).
Status SegmentIterator::next_batch(Block* block) {
    // Replace virtual columns with ColumnNothing at the begining of each next_batch call.
    _init_virtual_columns(block);
    auto status = [&]() {
        RETURN_IF_CATCH_EXCEPTION({
            // Adaptive batch size: predict how many rows this batch should read.
            if (_block_size_predictor) {
                auto predicted = static_cast<uint32_t>(_block_size_predictor->predict_next_rows());
                // Never exceed the configured initial cap, only shrink below it.
                _opts.block_row_max = std::min(predicted, _initial_block_row_max);
                _opts.stats->adaptive_batch_size_predict_min_rows =
                        std::min(_opts.stats->adaptive_batch_size_predict_min_rows,
                                 static_cast<int64_t>(predicted));
                _opts.stats->adaptive_batch_size_predict_max_rows =
                        std::max(_opts.stats->adaptive_batch_size_predict_max_rows,
                                 static_cast<int64_t>(predicted));
            } else {
                // No predictor — record the fixed batch size using min/max so we don't
                // clobber values already accumulated by other segment iterators that
                // share the same OlapReaderStatistics.
                _opts.stats->adaptive_batch_size_predict_min_rows =
                        std::min(_opts.stats->adaptive_batch_size_predict_min_rows,
                                 static_cast<int64_t>(_opts.block_row_max));
                _opts.stats->adaptive_batch_size_predict_max_rows =
                        std::max(_opts.stats->adaptive_batch_size_predict_max_rows,
                                 static_cast<int64_t>(_opts.block_row_max));
            }

            auto res = _next_batch_internal(block);

            if (res.is<END_OF_FILE>()) {
                // Since we have a type check at the caller.
                // So a replacement of nothing column with real column is needed.
                const auto& idx_to_datatype = _opts.vir_col_idx_to_type;
                for (const auto& pair : _vir_cid_to_idx_in_block) {
                    size_t idx = pair.second;
                    auto type = idx_to_datatype.find(idx)->second;
                    block->replace_by_position(idx, type->create_column());
                }

                // Publish the per-segment condition cache once the whole segment has
                // been consumed (only when it wasn't already found in the cache).
                if (_opts.condition_cache_digest && !_find_condition_cache) {
                    auto* condition_cache = ConditionCache::instance();
                    ConditionCache::CacheKey cache_key(_opts.rowset_id, _segment->id(),
                                                       _opts.condition_cache_digest);
                    VLOG_DEBUG << "Condition cache insert, query id: "
                               << print_id(_opts.runtime_state->query_id())
                               << ", rowset id: " << _opts.rowset_id.to_string()
                               << ", segment id: " << _segment->id()
                               << ", cache digest: " << _opts.condition_cache_digest;
                    condition_cache->insert(cache_key, std::move(_condition_cache));
                }
                return res;
            }

            RETURN_IF_ERROR(res);
            // reverse block row order if read_orderby_key_reverse is true for key topn
            // it should be processed for all success _next_batch_internal
            if (_opts.read_orderby_key_reverse) {
                size_t num_rows = block->rows();
                if (num_rows == 0) {
                    return Status::OK();
                }
                size_t num_columns = block->columns();
                IColumn::Permutation permutation;
                for (size_t i = 0; i < num_rows; ++i) permutation.emplace_back(num_rows - 1 - i);

                for (size_t i = 0; i < num_columns; ++i)
                    block->get_by_position(i).column =
                            block->get_by_position(i).column->permute(permutation, num_rows);
            }

            RETURN_IF_ERROR(block->check_type_and_column());

            // Adaptive batch size: update EWMA estimate from the completed batch.
            // block->bytes() is accurate here: predicates have been applied and non-predicate
            // columns have been filled for surviving rows by _next_batch_internal.
            if (_block_size_predictor && block->rows() > 0) {
                _block_size_predictor->update(*block);
            }

            return Status::OK();
        });
    }();

    // if rows read by batch is 0, will return end of file, we should not remove segment cache in this situation.
    if (!status.ok() && !status.is<END_OF_FILE>()) {
        _segment->update_healthy_status(status);
    }
    return status;
}
2677 | | |
2678 | 12.9k | Status SegmentIterator::_convert_to_expected_type(const std::vector<ColumnId>& col_ids) { |
2679 | 26.3k | for (ColumnId i : col_ids) { |
2680 | 26.3k | if (!_current_return_columns[i] || _converted_column_ids[i] || _is_pred_column[i]) { |
2681 | 467 | continue; |
2682 | 467 | } |
2683 | 25.9k | const StorageField* field_type = _schema->column(i); |
2684 | 25.9k | DataTypePtr expected_type = Schema::get_data_type_ptr(*field_type); |
2685 | 25.9k | DataTypePtr file_column_type = _storage_name_and_type[i].second; |
2686 | 25.9k | if (!file_column_type->equals(*expected_type)) { |
2687 | 0 | ColumnPtr expected; |
2688 | 0 | ColumnPtr original = _current_return_columns[i]->assume_mutable()->get_ptr(); |
2689 | 0 | RETURN_IF_ERROR(variant_util::cast_column({original, file_column_type, ""}, |
2690 | 0 | expected_type, &expected)); |
2691 | 0 | _current_return_columns[i] = expected->assume_mutable(); |
2692 | 0 | _converted_column_ids[i] = true; |
2693 | 0 | VLOG_DEBUG << fmt::format( |
2694 | 0 | "Convert {} fom file column type {} to {}, num_rows {}", |
2695 | 0 | field_type->path() == nullptr ? "" : field_type->path()->get_path(), |
2696 | 0 | file_column_type->get_name(), expected_type->get_name(), |
2697 | 0 | _current_return_columns[i]->size()); |
2698 | 0 | } |
2699 | 25.9k | } |
2700 | 12.9k | return Status::OK(); |
2701 | 12.9k | } |
2702 | | |
2703 | | Status SegmentIterator::copy_column_data_by_selector(IColumn* input_col_ptr, |
2704 | | MutableColumnPtr& output_col, |
2705 | | uint16_t* sel_rowid_idx, uint16_t select_size, |
2706 | 1.39k | size_t batch_size) { |
2707 | 1.39k | if (output_col->is_nullable() != input_col_ptr->is_nullable()) { |
2708 | 0 | LOG(WARNING) << "nullable mismatch for output_column: " << output_col->dump_structure() |
2709 | 0 | << " input_column: " << input_col_ptr->dump_structure() |
2710 | 0 | << " select_size: " << select_size; |
2711 | 0 | return Status::RuntimeError("copy_column_data_by_selector nullable mismatch"); |
2712 | 0 | } |
2713 | 1.39k | output_col->reserve(select_size); |
2714 | 1.39k | return input_col_ptr->filter_by_selector(sel_rowid_idx, select_size, output_col.get()); |
2715 | 1.39k | } |
2716 | | |
// Core batch-read pipeline: reads predicate columns first, evaluates vectorized /
// short-circuit / pushed-down expressions over them, then lazily materializes the
// remaining columns only for surviving rows, and finally assembles the output
// block (including virtual columns). Returns EndOfFile via _process_eof when no
// more rows are available or the read limit has been reached.
Status SegmentIterator::_next_batch_internal(Block* block) {
    SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().segment_iterator_next_batch);

    bool is_mem_reuse = block->mem_reuse();
    DCHECK(is_mem_reuse);

    RETURN_IF_ERROR(_lazy_init(block));

    SCOPED_RAW_TIMER(&_opts.stats->block_load_ns);

    // Pushed-down LIMIT already satisfied: finish up and report EOF.
    if (_opts.read_limit > 0 && _rows_returned >= _opts.read_limit) {
        return _process_eof(block);
    }

    // If the row bitmap size is smaller than nrows_read_limit, there's no need to reserve that many column rows.
    uint32_t nrows_read_limit =
            std::min(cast_set<uint32_t>(_row_bitmap.cardinality()), _opts.block_row_max);
    if (_can_opt_limit_reads()) {
        // No SegmentIterator-side conjunct remains to be evaluated, so LIMIT is equivalent before
        // and after filtering. Cap the first read directly; this is the no-conjunct fast path that
        // avoids reading rows past the pushed-down local LIMIT.
        size_t cap = (_opts.read_limit > _rows_returned) ? (_opts.read_limit - _rows_returned) : 0;
        if (cap < nrows_read_limit) {
            nrows_read_limit = static_cast<uint32_t>(cap);
        }
    }
    DBUG_EXECUTE_IF("segment_iterator.topn_opt_1", {
        if (nrows_read_limit != 1) {
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
                    "topn opt 1 execute failed: nrows_read_limit={}, "
                    "_opts.read_limit={}",
                    nrows_read_limit, _opts.read_limit);
        }
    })

    RETURN_IF_ERROR(_init_current_block(block, _current_return_columns, nrows_read_limit));
    _converted_column_ids.assign(_schema->columns().size(), false);

    // First pass: read predicate columns by index; _selected_size is the number of
    // rows actually read into this batch.
    _selected_size = 0;
    RETURN_IF_ERROR(_read_columns_by_index(nrows_read_limit, _selected_size));
    _replace_version_col_if_needed(_predicate_column_ids, _selected_size);

    _opts.stats->blocks_load += 1;
    _opts.stats->raw_rows_read += _selected_size;

    if (_selected_size == 0) {
        return _process_eof(block);
    }

    if (_is_need_vec_eval || _is_need_short_eval || _is_need_expr_eval) {
        _sel_rowid_idx.resize(_selected_size);

        if (_is_need_vec_eval || _is_need_short_eval) {
            _convert_dict_code_for_predicate_if_necessary();

            // step 1: evaluate vectorization predicate
            _selected_size =
                    _evaluate_vectorization_predicate(_sel_rowid_idx.data(), _selected_size);

            // step 2: evaluate short circuit predicate
            // todo(wb) research whether need to read short predicate after vectorization evaluation
            // to reduce cost of read short circuit columns.
            // In SSB test, it make no difference; So need more scenarios to test
            _selected_size =
                    _evaluate_short_circuit_predicate(_sel_rowid_idx.data(), _selected_size);
            VLOG_DEBUG << fmt::format("After evaluate predicates, selected size: {} ",
                                      _selected_size);
            if (_selected_size > 0) {
                // step 3.1: output short circuit and predicate column
                // when lazy materialization enables, _predicate_column_ids = distinct(_short_cir_pred_column_ids + _vec_pred_column_ids)
                // see _vec_init_lazy_materialization
                // todo(wb) need to tell input columnids from output columnids
                RETURN_IF_ERROR(_output_column_by_sel_idx(block, _predicate_column_ids,
                                                          _sel_rowid_idx.data(), _selected_size));

                // step 3.2: read remaining expr column and evaluate it.
                if (_is_need_expr_eval) {
                    // The predicate column contains the remaining expr column, no need second read.
                    if (_common_expr_column_ids.size() > 0) {
                        SCOPED_RAW_TIMER(&_opts.stats->non_predicate_read_ns);
                        RETURN_IF_ERROR(_read_columns_by_rowids(
                                _common_expr_column_ids, _block_rowids, _sel_rowid_idx.data(),
                                _selected_size, &_current_return_columns));
                        _replace_version_col_if_needed(_common_expr_column_ids, _selected_size);
                        RETURN_IF_ERROR(_process_columns(_common_expr_column_ids, block));
                    }

                    DCHECK(block->columns() > _schema_block_id_map[*_common_expr_columns.begin()]);
                    RETURN_IF_ERROR(
                            _process_common_expr(_sel_rowid_idx.data(), _selected_size, block));
                }
            } else {
                // All rows filtered out — replace virtual ColumnNothing placeholders
                // with empty real-typed columns so the output block stays well-formed.
                _fill_column_nothing();
                if (_is_need_expr_eval) {
                    RETURN_IF_ERROR(_process_columns(_common_expr_column_ids, block));
                }
            }
        } else if (_is_need_expr_eval) {
            DCHECK(!_predicate_column_ids.empty());
            RETURN_IF_ERROR(_process_columns(_predicate_column_ids, block));
            // first read all rows are insert block, initialize sel_rowid_idx to all rows.
            for (uint16_t i = 0; i < _selected_size; ++i) {
                _sel_rowid_idx[i] = i;
            }
            RETURN_IF_ERROR(_process_common_expr(_sel_rowid_idx.data(), _selected_size, block));
        }

        RETURN_IF_ERROR(_apply_read_limit_to_selected_rows(block, _selected_size));

        // step4: read non_predicate column
        if (_selected_size > 0) {
            if (!_non_predicate_columns.empty()) {
                RETURN_IF_ERROR(_read_columns_by_rowids(
                        _non_predicate_columns, _block_rowids, _sel_rowid_idx.data(),
                        _selected_size, &_current_return_columns,
                        _opts.condition_cache_digest && !_find_condition_cache));
                _replace_version_col_if_needed(_non_predicate_columns, _selected_size);
            } else {
                // No lazy-read columns, but surviving rows must still be marked in
                // the condition cache when we are building one.
                if (_opts.condition_cache_digest && !_find_condition_cache) {
                    auto& condition_cache = *_condition_cache;
                    for (size_t i = 0; i < _selected_size; ++i) {
                        auto rowid = _block_rowids[_sel_rowid_idx[i]];
                        condition_cache[rowid / SegmentIterator::CONDITION_CACHE_OFFSET] = true;
                    }
                }
            }
        }
    }

    // step5: output columns
    RETURN_IF_ERROR(_output_non_pred_columns(block));
    // Convert inverted index bitmaps to result columns for virtual column exprs
    // (e.g., MATCH projections). This must run before _materialization_of_virtual_column
    // so that fast_execute() can find the pre-computed result columns.
    if (!_virtual_column_exprs.empty()) {
        bool use_sel = _is_need_vec_eval || _is_need_short_eval || _is_need_expr_eval;
        uint16_t* sel_rowid_idx = use_sel ? _sel_rowid_idx.data() : nullptr;
        std::vector<VExprContext*> vir_ctxs;
        vir_ctxs.reserve(_virtual_column_exprs.size());
        for (auto& [cid, ctx] : _virtual_column_exprs) {
            vir_ctxs.push_back(ctx.get());
        }
        _output_index_result_column(vir_ctxs, sel_rowid_idx, _selected_size, block);
    }
    RETURN_IF_ERROR(_materialization_of_virtual_column(block));
    // shrink char_type suffix zero data
    block->shrink_char_type_column_suffix_zero(_char_type_idx);
    if (_opts.read_limit > 0) {
        _rows_returned += block->rows();
    }
    return _check_output_block(block);
}
2869 | | |
2870 | 0 | Status SegmentIterator::_process_columns(const std::vector<ColumnId>& column_ids, Block* block) { |
2871 | 0 | RETURN_IF_ERROR(_convert_to_expected_type(column_ids)); |
2872 | 0 | for (auto cid : column_ids) { |
2873 | 0 | auto loc = _schema_block_id_map[cid]; |
2874 | 0 | block->replace_by_position(loc, std::move(_current_return_columns[cid])); |
2875 | 0 | } |
2876 | 0 | return Status::OK(); |
2877 | 0 | } |
2878 | | |
2879 | 30 | void SegmentIterator::_fill_column_nothing() { |
2880 | | // If column_predicate filters out all rows, the corresponding column in _current_return_columns[cid] must be a ColumnNothing. |
2881 | | // Because: |
2882 | | // 1. Before each batch, _init_return_columns is called to initialize _current_return_columns, and virtual columns in _current_return_columns are initialized as ColumnNothing. |
2883 | | // 2. When select_size == 0, the read method of VirtualColumnIterator will definitely not be called, so the corresponding Column remains a ColumnNothing |
2884 | 30 | for (const auto pair : _vir_cid_to_idx_in_block) { |
2885 | 0 | auto cid = pair.first; |
2886 | 0 | auto pos = pair.second; |
2887 | 0 | const auto* nothing_col = |
2888 | 0 | check_and_get_column<ColumnNothing>(_current_return_columns[cid].get()); |
2889 | 0 | DCHECK(nothing_col != nullptr) |
2890 | 0 | << fmt::format("ColumnNothing expected, but got {}, cid: {}, pos: {}", |
2891 | 0 | _current_return_columns[cid]->get_name(), cid, pos); |
2892 | 0 | _current_return_columns[cid] = _opts.vir_col_idx_to_type[pos]->create_column(); |
2893 | 0 | } |
2894 | 30 | } |
2895 | | |
2896 | 10.1k | Status SegmentIterator::_check_output_block(Block* block) { |
2897 | 10.1k | #ifndef NDEBUG |
2898 | 10.1k | size_t rows = block->rows(); |
2899 | 10.1k | size_t idx = 0; |
2900 | 20.7k | for (const auto& entry : *block) { |
2901 | 20.7k | if (!entry.column) { |
2902 | 0 | return Status::InternalError( |
2903 | 0 | "Column in idx {} is null, block columns {}, normal_columns {}, " |
2904 | 0 | "virtual_columns {}", |
2905 | 0 | idx, block->columns(), _schema->num_column_ids(), _virtual_column_exprs.size()); |
2906 | 20.7k | } else if (check_and_get_column<ColumnNothing>(entry.column.get())) { |
2907 | 0 | if (rows > 0) { |
2908 | 0 | std::vector<std::string> vcid_to_idx; |
2909 | 0 | for (const auto& pair : _vir_cid_to_idx_in_block) { |
2910 | 0 | vcid_to_idx.push_back(fmt::format("{}-{}", pair.first, pair.second)); |
2911 | 0 | } |
2912 | 0 | std::string vir_cid_to_idx_in_block_msg = |
2913 | 0 | fmt::format("_vir_cid_to_idx_in_block:[{}]", fmt::join(vcid_to_idx, ",")); |
2914 | 0 | return Status::InternalError( |
2915 | 0 | "Column in idx {} is nothing, block columns {}, normal_columns {}, " |
2916 | 0 | "vir_cid_to_idx_in_block_msg {}", |
2917 | 0 | idx, block->columns(), _schema->num_column_ids(), |
2918 | 0 | vir_cid_to_idx_in_block_msg); |
2919 | 0 | } |
2920 | 20.7k | } else if (entry.column->size() != rows) { |
2921 | 0 | return Status::InternalError( |
2922 | 0 | "Unmatched size {}, expected {}, column: {}, type: {}, idx_in_block: {}, " |
2923 | 0 | "block: {}", |
2924 | 0 | entry.column->size(), rows, entry.column->get_name(), entry.type->get_name(), |
2925 | 0 | idx, block->dump_structure()); |
2926 | 0 | } |
2927 | 20.7k | idx++; |
2928 | 20.7k | } |
2929 | 10.1k | #endif |
2930 | 10.1k | return Status::OK(); |
2931 | 10.1k | } |
2932 | | |
// Currently a no-op placeholder: column-predicate processing happens elsewhere
// (see _evaluate_vectorization_predicate / _evaluate_short_circuit_predicate).
Status SegmentIterator::_process_column_predicate() {
    return Status::OK();
}
2936 | | |
2937 | 2.81k | Status SegmentIterator::_process_eof(Block* block) { |
2938 | | // Convert all columns in _current_return_columns to schema column |
2939 | 2.81k | RETURN_IF_ERROR(_convert_to_expected_type(_schema->column_ids())); |
2940 | 9.66k | for (int i = 0; i < block->columns(); i++) { |
2941 | 6.84k | auto cid = _schema->column_id(i); |
2942 | 6.84k | if (!_is_pred_column[cid]) { |
2943 | 6.56k | block->replace_by_position(i, std::move(_current_return_columns[cid])); |
2944 | 6.56k | } |
2945 | 6.84k | } |
2946 | 2.81k | block->clear_column_data(); |
2947 | | // clear and release iterators memory footprint in advance |
2948 | 2.81k | _column_iterators.clear(); |
2949 | 2.81k | _index_iterators.clear(); |
2950 | 2.81k | return Status::EndOfFile("no more data in segment"); |
2951 | 2.81k | } |
2952 | | |
2953 | | Status SegmentIterator::_process_common_expr(uint16_t* sel_rowid_idx, uint16_t& selected_size, |
2954 | 0 | Block* block) { |
2955 | | // Here we just use col0 as row_number indicator. when reach here, we will calculate the predicates first. |
2956 | | // then use the result to reduce our data read(that is, expr push down). there's now row in block means the first |
2957 | | // column is not in common expr. so it's safe to replace it temporarily to provide correct `selected_size`. |
2958 | 0 | VLOG_DEBUG << fmt::format("Execute common expr. block rows {}, selected size {}", block->rows(), |
2959 | 0 | _selected_size); |
2960 | |
|
2961 | 0 | bool need_mock_col = block->rows() != selected_size; |
2962 | 0 | MutableColumnPtr col0; |
2963 | 0 | if (need_mock_col) { |
2964 | 0 | col0 = std::move(*block->get_by_position(0).column).mutate(); |
2965 | 0 | block->replace_by_position( |
2966 | 0 | 0, block->get_by_position(0).type->create_column_const_with_default_value( |
2967 | 0 | _selected_size)); |
2968 | 0 | } |
2969 | |
|
2970 | 0 | std::vector<VExprContext*> common_ctxs; |
2971 | 0 | common_ctxs.reserve(_common_expr_ctxs_push_down.size()); |
2972 | 0 | for (auto& ctx : _common_expr_ctxs_push_down) { |
2973 | 0 | common_ctxs.push_back(ctx.get()); |
2974 | 0 | } |
2975 | 0 | _output_index_result_column(common_ctxs, _sel_rowid_idx.data(), _selected_size, block); |
2976 | 0 | block->shrink_char_type_column_suffix_zero(_char_type_idx); |
2977 | 0 | RETURN_IF_ERROR(_execute_common_expr(_sel_rowid_idx.data(), _selected_size, block)); |
2978 | | |
2979 | 0 | if (need_mock_col) { |
2980 | 0 | block->replace_by_position(0, std::move(col0)); |
2981 | 0 | } |
2982 | |
|
2983 | 0 | VLOG_DEBUG << fmt::format("Execute common expr end. block rows {}, selected size {}", |
2984 | 0 | block->rows(), _selected_size); |
2985 | 0 | return Status::OK(); |
2986 | 0 | } |
2987 | | |
2988 | | Status SegmentIterator::_execute_common_expr(uint16_t* sel_rowid_idx, uint16_t& selected_size, |
2989 | 0 | Block* block) { |
2990 | 0 | SCOPED_RAW_TIMER(&_opts.stats->expr_filter_ns); |
2991 | 0 | DCHECK(!_remaining_conjunct_roots.empty()); |
2992 | 0 | DCHECK(block->rows() != 0); |
2993 | 0 | int prev_columns = block->columns(); |
2994 | 0 | uint16_t original_size = selected_size; |
2995 | 0 | _opts.stats->expr_cond_input_rows += original_size; |
2996 | |
|
2997 | 0 | IColumn::Filter filter; |
2998 | 0 | RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block( |
2999 | 0 | _common_expr_ctxs_push_down, block, _columns_to_filter, prev_columns, filter)); |
3000 | | |
3001 | 0 | selected_size = _evaluate_common_expr_filter(sel_rowid_idx, selected_size, filter); |
3002 | 0 | _opts.stats->rows_expr_cond_filtered += original_size - selected_size; |
3003 | 0 | return Status::OK(); |
3004 | 0 | } |
3005 | | |
// Compacts `sel_rowid_idx` in place according to `filter` (one byte per selected
// row): entries whose filter byte is non-zero are kept, in order. Returns the
// new number of selected rows. Uses a SIMD bit-mask fast path for full chunks
// and a scalar tail loop for the remainder.
uint16_t SegmentIterator::_evaluate_common_expr_filter(uint16_t* sel_rowid_idx,
                                                       uint16_t selected_size,
                                                       const IColumn::Filter& filter) {
    // Fast exit: if nothing survives the filter, skip the compaction entirely.
    size_t count = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size());
    if (count == 0) {
        return 0;
    } else {
        const UInt8* filt_pos = filter.data();

        uint16_t new_size = 0;
        uint32_t sel_pos = 0;
        const uint32_t sel_end = selected_size;
        static constexpr size_t SIMD_BYTES = simd::bits_mask_length();
        // Process whole SIMD_BYTES-wide chunks first; the tail is handled below.
        const uint32_t sel_end_simd = sel_pos + selected_size / SIMD_BYTES * SIMD_BYTES;

        while (sel_pos < sel_end_simd) {
            auto mask = simd::bytes_mask_to_bits_mask(filt_pos + sel_pos);
            if (0 == mask) {
                //pass
            } else if (simd::bits_mask_all() == mask) {
                // Whole chunk survives: bulk-copy all SIMD_BYTES entries.
                for (uint32_t i = 0; i < SIMD_BYTES; i++) {
                    sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos + i];
                }
            } else {
                // Mixed chunk: visit only the set bits.
                simd::iterate_through_bits_mask(
                        [&](const size_t bit_pos) {
                            sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos + bit_pos];
                        },
                        mask);
            }
            sel_pos += SIMD_BYTES;
        }

        // Scalar tail: remaining entries that didn't fill a full SIMD chunk.
        for (; sel_pos < sel_end; sel_pos++) {
            if (filt_pos[sel_pos]) {
                sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos];
            }
        }
        return new_size;
    }
}
3047 | | |
// For every expression context that carries inverted-index results, converts the
// per-expression result bitmap (plus optional null bitmap) into a ColumnUInt8
// (wrapped in ColumnNullable when the expr is nullable and nulls exist) sized to
// the current block, and stores it back on the index context so fast_execute()
// can pick it up instead of re-evaluating the expression.
void SegmentIterator::_output_index_result_column(const std::vector<VExprContext*>& expr_ctxs,
                                                  uint16_t* sel_rowid_idx, uint16_t select_size,
                                                  Block* block) {
    SCOPED_RAW_TIMER(&_opts.stats->output_index_result_column_timer);
    if (block->rows() == 0) {
        return;
    }
    for (auto* expr_ctx_ptr : expr_ctxs) {
        auto index_ctx = expr_ctx_ptr->get_index_context();
        if (index_ctx == nullptr) {
            continue;
        }
        for (auto& inverted_index_result_bitmap_for_expr : index_ctx->get_index_result_bitmap()) {
            const auto* expr = inverted_index_result_bitmap_for_expr.first;
            const auto& result_bitmap = inverted_index_result_bitmap_for_expr.second;
            const auto& index_result_bitmap = result_bitmap.get_data_bitmap();
            auto index_result_column = ColumnUInt8::create();
            ColumnUInt8::Container& vec_match_pred = index_result_column->get_data();
            vec_match_pred.resize(block->rows());
            std::fill(vec_match_pred.begin(), vec_match_pred.end(), 0);

            // Build a null map only when both conditions hold: the index produced
            // nulls AND the expression's result type is nullable.
            const auto& null_bitmap = result_bitmap.get_null_bitmap();
            bool has_null_bitmap = null_bitmap != nullptr && !null_bitmap->isEmpty();
            bool expr_returns_nullable = expr->data_type()->is_nullable();

            ColumnUInt8::MutablePtr null_map_column = nullptr;
            ColumnUInt8::Container* null_map_data = nullptr;
            if (has_null_bitmap && expr_returns_nullable) {
                null_map_column = ColumnUInt8::create();
                auto& null_map_vec = null_map_column->get_data();
                null_map_vec.resize(block->rows());
                std::fill(null_map_vec.begin(), null_map_vec.end(), 0);
                null_map_data = &null_map_column->get_data();
            }

            // Probe the bitmap once per output row; sel_rowid_idx (when present)
            // maps output positions back to the original block rowids.
            roaring::BulkContext bulk_context;
            for (uint32_t i = 0; i < select_size; i++) {
                auto rowid = sel_rowid_idx ? _block_rowids[sel_rowid_idx[i]] : _block_rowids[i];
                if (index_result_bitmap) {
                    vec_match_pred[i] = index_result_bitmap->containsBulk(bulk_context, rowid);
                }
                // A null rowid overrides any match: mark null, clear the match bit.
                if (null_map_data != nullptr && null_bitmap->contains(rowid)) {
                    (*null_map_data)[i] = 1;
                    vec_match_pred[i] = 0;
                }
            }

            DCHECK(block->rows() == vec_match_pred.size());

            if (null_map_column) {
                index_ctx->set_index_result_column_for_expr(
                        expr, ColumnNullable::create(std::move(index_result_column),
                                                     std::move(null_map_column)));
            } else {
                index_ctx->set_index_result_column_for_expr(expr, std::move(index_result_column));
            }
        }
    }
}
3107 | | |
3108 | 1.69k | void SegmentIterator::_convert_dict_code_for_predicate_if_necessary() { |
3109 | 1.69k | for (auto predicate : _short_cir_eval_predicate) { |
3110 | 0 | _convert_dict_code_for_predicate_if_necessary_impl(predicate); |
3111 | 0 | } |
3112 | | |
3113 | 1.69k | for (auto predicate : _pre_eval_block_predicate) { |
3114 | 0 | _convert_dict_code_for_predicate_if_necessary_impl(predicate); |
3115 | 0 | } |
3116 | | |
3117 | 1.69k | for (auto column_id : _delete_range_column_ids) { |
3118 | 1.55k | _current_return_columns[column_id].get()->convert_dict_codes_if_necessary(); |
3119 | 1.55k | } |
3120 | | |
3121 | 1.69k | for (auto column_id : _delete_bloom_filter_column_ids) { |
3122 | 0 | _current_return_columns[column_id].get()->initialize_hash_values_for_runtime_filter(); |
3123 | 0 | } |
3124 | 1.69k | } |
3125 | | |
3126 | | void SegmentIterator::_convert_dict_code_for_predicate_if_necessary_impl( |
3127 | 0 | std::shared_ptr<ColumnPredicate> predicate) { |
3128 | 0 | auto& column = _current_return_columns[predicate->column_id()]; |
3129 | 0 | auto* col_ptr = column.get(); |
3130 | |
|
3131 | 0 | if (PredicateTypeTraits::is_range(predicate->type())) { |
3132 | 0 | col_ptr->convert_dict_codes_if_necessary(); |
3133 | 0 | } else if (PredicateTypeTraits::is_bloom_filter(predicate->type())) { |
3134 | 0 | col_ptr->initialize_hash_values_for_runtime_filter(); |
3135 | 0 | } |
3136 | 0 | } |
3137 | | |
3138 | 2.93k | Status SegmentIterator::current_block_row_locations(std::vector<RowLocation>* block_row_locations) { |
3139 | 2.93k | DCHECK(_opts.record_rowids); |
3140 | 2.93k | DCHECK_GE(_block_rowids.size(), _selected_size); |
3141 | 2.93k | block_row_locations->resize(_selected_size); |
3142 | 2.93k | uint32_t sid = segment_id(); |
3143 | 2.93k | if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) { |
3144 | 4.24M | for (auto i = 0; i < _selected_size; i++) { |
3145 | 4.23M | (*block_row_locations)[i] = RowLocation(sid, _block_rowids[i]); |
3146 | 4.23M | } |
3147 | 1.76k | } else { |
3148 | 2.46M | for (auto i = 0; i < _selected_size; i++) { |
3149 | 2.46M | (*block_row_locations)[i] = RowLocation(sid, _block_rowids[_sel_rowid_idx[i]]); |
3150 | 2.46M | } |
3151 | 1.16k | } |
3152 | 2.93k | return Status::OK(); |
3153 | 2.93k | } |
3154 | | |
// Builds this segment's IndexExecContext and clones every pushed-down common
// expression context and virtual-column expression context against it, so each
// segment evaluates expressions with its own index iterators and file reader.
Status SegmentIterator::_construct_compound_expr_context() {
    ColumnIteratorOptions iter_opts {
            .use_page_cache = _opts.use_page_cache,
            .file_reader = _file_reader.get(),
            .stats = _opts.stats,
            .io_ctx = _opts.io_ctx,
    };
    auto inverted_index_context = std::make_shared<IndexExecContext>(
            _schema->column_ids(), _index_iterators, _storage_name_and_type,
            _common_expr_index_exec_status, _score_runtime, _segment.get(), iter_opts);
    inverted_index_context->set_index_query_context(_index_query_context);
    // Clone (not share) each pushed-down context so per-segment index state does
    // not leak between segments.
    for (const auto& expr_ctx : _opts.common_expr_ctxs_push_down) {
        VExprContextSPtr context;
        // _ann_range_search_runtime will do deep copy.
        RETURN_IF_ERROR(expr_ctx->clone(_opts.runtime_state, context));
        context->set_index_context(inverted_index_context);
        _common_expr_ctxs_push_down.emplace_back(context);
    }
    // Clone virtual column exprs before setting IndexExecContext, because
    // IndexExecContext holds segment-specific index iterator references.
    // Without cloning, shared VExprContext would be overwritten per-segment
    // and could point to the wrong segment's context.
    for (auto& [cid, expr_ctx] : _virtual_column_exprs) {
        VExprContextSPtr context;
        RETURN_IF_ERROR(expr_ctx->clone(_opts.runtime_state, context));
        context->set_index_context(inverted_index_context);
        expr_ctx = context;
    }
    return Status::OK();
}
3185 | | |
3186 | 2.82k | void SegmentIterator::_calculate_expr_in_remaining_conjunct_root() { |
3187 | 2.82k | for (const auto& root_expr_ctx : _common_expr_ctxs_push_down) { |
3188 | 0 | const auto& root_expr = root_expr_ctx->root(); |
3189 | 0 | if (root_expr == nullptr) { |
3190 | 0 | continue; |
3191 | 0 | } |
3192 | 0 | _common_expr_to_slotref_map[root_expr_ctx.get()] = std::unordered_map<ColumnId, VExpr*>(); |
3193 | |
|
3194 | 0 | std::stack<VExprSPtr> stack; |
3195 | 0 | stack.emplace(root_expr); |
3196 | |
|
3197 | 0 | while (!stack.empty()) { |
3198 | 0 | const auto& expr = stack.top(); |
3199 | 0 | stack.pop(); |
3200 | |
|
3201 | 0 | for (const auto& child : expr->children()) { |
3202 | 0 | if (child->is_virtual_slot_ref()) { |
3203 | | // Expand virtual slot ref to its underlying expression tree and |
3204 | | // collect real slot refs used inside. We still associate those |
3205 | | // slot refs with the current parent expr node for inverted index |
3206 | | // tracking, just like normal slot refs. |
3207 | 0 | auto* vir_slot_ref = assert_cast<VirtualSlotRef*>(child.get()); |
3208 | 0 | auto vir_expr = vir_slot_ref->get_virtual_column_expr(); |
3209 | 0 | if (vir_expr) { |
3210 | 0 | std::stack<VExprSPtr> vir_stack; |
3211 | 0 | vir_stack.emplace(vir_expr); |
3212 | |
|
3213 | 0 | while (!vir_stack.empty()) { |
3214 | 0 | const auto& vir_node = vir_stack.top(); |
3215 | 0 | vir_stack.pop(); |
3216 | |
|
3217 | 0 | for (const auto& vir_child : vir_node->children()) { |
3218 | 0 | if (vir_child->is_slot_ref()) { |
3219 | 0 | auto* inner_slot_ref = assert_cast<VSlotRef*>(vir_child.get()); |
3220 | 0 | _common_expr_index_exec_status[_schema->column_id( |
3221 | 0 | inner_slot_ref->column_id())][expr.get()] = false; |
3222 | 0 | _common_expr_to_slotref_map[root_expr_ctx.get()] |
3223 | 0 | [inner_slot_ref->column_id()] = |
3224 | 0 | expr.get(); |
3225 | 0 | } |
3226 | |
|
3227 | 0 | if (!vir_child->children().empty()) { |
3228 | 0 | vir_stack.emplace(vir_child); |
3229 | 0 | } |
3230 | 0 | } |
3231 | 0 | } |
3232 | 0 | } |
3233 | 0 | } |
3234 | | // Example: CAST(v['a'] AS VARCHAR) MATCH 'hello', do not add CAST expr to index tracking. |
3235 | 0 | auto expr_without_cast = VExpr::expr_without_cast(child); |
3236 | 0 | if (expr_without_cast->is_slot_ref() && expr->op() != TExprOpcode::CAST) { |
3237 | 0 | auto* column_slot_ref = assert_cast<VSlotRef*>(expr_without_cast.get()); |
3238 | 0 | _common_expr_index_exec_status[_schema->column_id(column_slot_ref->column_id())] |
3239 | 0 | [expr.get()] = false; |
3240 | 0 | _common_expr_to_slotref_map[root_expr_ctx.get()][column_slot_ref->column_id()] = |
3241 | 0 | expr.get(); |
3242 | 0 | } |
3243 | 0 | } |
3244 | |
|
3245 | 0 | const auto& children = expr->children(); |
3246 | 0 | for (int i = cast_set<int>(children.size()) - 1; i >= 0; --i) { |
3247 | 0 | if (!children[i]->children().empty()) { |
3248 | 0 | stack.emplace(children[i]); |
3249 | 0 | } |
3250 | 0 | } |
3251 | 0 | } |
3252 | 0 | } |
3253 | 2.82k | } |
3254 | | |
3255 | | bool SegmentIterator::_no_need_read_key_data(ColumnId cid, MutableColumnPtr& column, |
3256 | 27.1k | size_t nrows_read) { |
3257 | 27.1k | if (_opts.runtime_state && !_opts.runtime_state->query_options().enable_no_need_read_data_opt) { |
3258 | 0 | return false; |
3259 | 0 | } |
3260 | | |
3261 | 27.1k | if (!((_opts.tablet_schema->keys_type() == KeysType::DUP_KEYS || |
3262 | 27.1k | (_opts.tablet_schema->keys_type() == KeysType::UNIQUE_KEYS && |
3263 | 11.1k | _opts.enable_unique_key_merge_on_write)))) { |
3264 | 7.58k | return false; |
3265 | 7.58k | } |
3266 | | |
3267 | 19.5k | if (_opts.push_down_agg_type_opt != TPushAggOp::COUNT_ON_INDEX) { |
3268 | 19.5k | return false; |
3269 | 19.5k | } |
3270 | | |
3271 | 0 | if (!_opts.tablet_schema->column(cid).is_key()) { |
3272 | 0 | return false; |
3273 | 0 | } |
3274 | | |
3275 | 0 | if (_has_delete_predicate(cid)) { |
3276 | 0 | return false; |
3277 | 0 | } |
3278 | | |
3279 | 0 | if (!_check_all_conditions_passed_inverted_index_for_column(cid)) { |
3280 | 0 | return false; |
3281 | 0 | } |
3282 | | |
3283 | 0 | if (column->is_nullable()) { |
3284 | 0 | auto* nullable_col_ptr = reinterpret_cast<ColumnNullable*>(column.get()); |
3285 | 0 | nullable_col_ptr->get_null_map_column().insert_many_defaults(nrows_read); |
3286 | 0 | nullable_col_ptr->get_nested_column_ptr()->insert_many_defaults(nrows_read); |
3287 | 0 | } else { |
3288 | 0 | column->insert_many_defaults(nrows_read); |
3289 | 0 | } |
3290 | |
|
3291 | 0 | return true; |
3292 | 0 | } |
3293 | | |
3294 | 19.5k | bool SegmentIterator::_has_delete_predicate(ColumnId cid) { |
3295 | 19.5k | std::set<uint32_t> delete_columns_set; |
3296 | 19.5k | _opts.delete_condition_predicates->get_all_column_ids(delete_columns_set); |
3297 | 19.5k | return delete_columns_set.contains(cid); |
3298 | 19.5k | } |
3299 | | |
3300 | 12.9k | bool SegmentIterator::_can_opt_limit_reads() { |
3301 | 12.9k | if (_opts.read_limit == 0) { |
3302 | 12.9k | return false; |
3303 | 12.9k | } |
3304 | | |
3305 | | // If SegmentIterator still needs to evaluate predicates/common exprs, LIMIT must be applied to |
3306 | | // post-filter rows by _apply_read_limit_to_selected_rows(); capping the raw read here could |
3307 | | // return fewer rows than the query LIMIT. |
3308 | 7 | if (_is_need_vec_eval || _is_need_short_eval || _is_need_expr_eval) { |
3309 | 3 | return false; |
3310 | 3 | } |
3311 | | |
3312 | 4 | if (_opts.delete_condition_predicates->num_of_column_predicate() > 0) { |
3313 | 1 | return false; |
3314 | 1 | } |
3315 | | |
3316 | 3 | bool all_true = std::ranges::all_of(_schema->column_ids(), [this](auto cid) { |
3317 | 3 | if (cid == _opts.tablet_schema->delete_sign_idx()) { |
3318 | 0 | return true; |
3319 | 0 | } |
3320 | 3 | if (_check_all_conditions_passed_inverted_index_for_column(cid, true)) { |
3321 | 2 | return true; |
3322 | 2 | } |
3323 | 1 | return false; |
3324 | 3 | }); |
3325 | | |
3326 | 3 | DBUG_EXECUTE_IF("segment_iterator.topn_opt_1", { |
3327 | 3 | LOG(INFO) << "col_predicates: " << _col_predicates.size() << ", all_true: " << all_true; |
3328 | 3 | }) |
3329 | | |
3330 | 3 | DBUG_EXECUTE_IF("segment_iterator.topn_opt_2", { |
3331 | 3 | if (all_true) { |
3332 | 3 | return Status::Error<ErrorCode::INTERNAL_ERROR>("topn opt 2 execute failed"); |
3333 | 3 | } |
3334 | 3 | }) |
3335 | | |
3336 | 3 | return all_true; |
3337 | 3 | } |
3338 | | |
3339 | | // Before get next batch. make sure all virtual columns in block has type ColumnNothing. |
3340 | 12.9k | void SegmentIterator::_init_virtual_columns(Block* block) { |
3341 | 12.9k | for (const auto& pair : _vir_cid_to_idx_in_block) { |
3342 | 0 | auto& col_with_type_and_name = block->get_by_position(pair.second); |
3343 | 0 | col_with_type_and_name.column = ColumnNothing::create(0); |
3344 | 0 | col_with_type_and_name.type = _opts.vir_col_idx_to_type[pair.second]; |
3345 | 0 | } |
3346 | 12.9k | } |
3347 | | |
3348 | 10.1k | Status SegmentIterator::_materialization_of_virtual_column(Block* block) { |
3349 | | // Some expr can not process empty block, such as function `element_at`. |
3350 | | // So materialize virtual column in advance to avoid errors. |
3351 | 10.1k | if (block->rows() == 0) { |
3352 | 30 | for (const auto& pair : _vir_cid_to_idx_in_block) { |
3353 | 0 | auto& col_with_type_and_name = block->get_by_position(pair.second); |
3354 | 0 | col_with_type_and_name.column = _opts.vir_col_idx_to_type[pair.second]->create_column(); |
3355 | 0 | col_with_type_and_name.type = _opts.vir_col_idx_to_type[pair.second]; |
3356 | 0 | } |
3357 | 30 | return Status::OK(); |
3358 | 30 | } |
3359 | | |
3360 | 10.0k | for (const auto& cid_and_expr : _virtual_column_exprs) { |
3361 | 0 | auto cid = cid_and_expr.first; |
3362 | 0 | auto column_expr = cid_and_expr.second; |
3363 | 0 | size_t idx_in_block = _vir_cid_to_idx_in_block[cid]; |
3364 | 0 | if (block->columns() <= idx_in_block) { |
3365 | 0 | return Status::InternalError( |
3366 | 0 | "Virtual column index {} is out of range, block columns {}, " |
3367 | 0 | "virtual columns size {}, virtual column expr {}", |
3368 | 0 | idx_in_block, block->columns(), _vir_cid_to_idx_in_block.size(), |
3369 | 0 | column_expr->root()->debug_string()); |
3370 | 0 | } else if (block->get_by_position(idx_in_block).column.get() == nullptr) { |
3371 | 0 | return Status::InternalError( |
3372 | 0 | "Virtual column index {} is null, block columns {}, virtual columns size {}, " |
3373 | 0 | "virtual column expr {}", |
3374 | 0 | idx_in_block, block->columns(), _vir_cid_to_idx_in_block.size(), |
3375 | 0 | column_expr->root()->debug_string()); |
3376 | 0 | } |
3377 | 0 | block->shrink_char_type_column_suffix_zero(_char_type_idx); |
3378 | 0 | if (check_and_get_column<const ColumnNothing>( |
3379 | 0 | block->get_by_position(idx_in_block).column.get())) { |
3380 | 0 | VLOG_DEBUG << fmt::format("Virtual column is doing materialization, cid {}, col idx {}", |
3381 | 0 | cid, idx_in_block); |
3382 | 0 | ColumnPtr result_column; |
3383 | 0 | RETURN_IF_ERROR(column_expr->execute(block, result_column)); |
3384 | | |
3385 | 0 | block->replace_by_position(idx_in_block, std::move(result_column)); |
3386 | 0 | if (block->get_by_position(idx_in_block).column->size() == 0) { |
3387 | 0 | LOG_WARNING("Result of expr column {} is empty. cid {}, idx_in_block {}", |
3388 | 0 | column_expr->root()->debug_string(), cid, idx_in_block); |
3389 | 0 | } |
3390 | 0 | } |
3391 | 0 | } |
3392 | 10.0k | return Status::OK(); |
3393 | 10.0k | } |
3394 | | |
3395 | 2.82k | void SegmentIterator::_prepare_score_column_materialization() { |
3396 | 2.82k | if (_score_runtime == nullptr) { |
3397 | 2.82k | return; |
3398 | 2.82k | } |
3399 | | |
3400 | 0 | ScoreRangeFilterPtr filter; |
3401 | 0 | if (_score_runtime->has_score_range_filter()) { |
3402 | 0 | const auto& range_info = _score_runtime->get_score_range_info(); |
3403 | 0 | filter = std::make_shared<ScoreRangeFilter>(range_info->op, range_info->threshold); |
3404 | 0 | } |
3405 | |
|
3406 | 0 | IColumn::MutablePtr result_column; |
3407 | 0 | auto result_row_ids = std::make_unique<std::vector<uint64_t>>(); |
3408 | 0 | if (_score_runtime->get_limit() > 0 && _col_predicates.empty() && |
3409 | 0 | _common_expr_ctxs_push_down.empty()) { |
3410 | 0 | OrderType order_type = _score_runtime->is_asc() ? OrderType::ASC : OrderType::DESC; |
3411 | 0 | _index_query_context->collection_similarity->get_topn_bm25_scores( |
3412 | 0 | &_row_bitmap, result_column, result_row_ids, order_type, |
3413 | 0 | _score_runtime->get_limit(), filter); |
3414 | 0 | } else { |
3415 | 0 | _index_query_context->collection_similarity->get_bm25_scores(&_row_bitmap, result_column, |
3416 | 0 | result_row_ids, filter); |
3417 | 0 | } |
3418 | 0 | const size_t dst_col_idx = _score_runtime->get_dest_column_idx(); |
3419 | 0 | auto* column_iter = _column_iterators[_schema->column_id(dst_col_idx)].get(); |
3420 | 0 | auto* virtual_column_iter = dynamic_cast<VirtualColumnIterator*>(column_iter); |
3421 | 0 | virtual_column_iter->prepare_materialization(std::move(result_column), |
3422 | 0 | std::move(result_row_ids)); |
3423 | 0 | } |
3424 | | |
3425 | | } // namespace segment_v2 |
3426 | | } // namespace doris |