Coverage Report

Created: 2026-04-02 17:01

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/segment_iterator.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/segment_iterator.h"
19
20
#include <assert.h>
21
#include <gen_cpp/Exprs_types.h>
22
#include <gen_cpp/Opcodes_types.h>
23
#include <gen_cpp/Types_types.h>
24
#include <gen_cpp/olap_file.pb.h>
25
26
#include <algorithm>
27
#include <boost/iterator/iterator_facade.hpp>
28
#include <cstdint>
29
#include <memory>
30
#include <numeric>
31
#include <set>
32
#include <unordered_map>
33
#include <utility>
34
#include <vector>
35
36
#include "cloud/config.h"
37
#include "common/compiler_util.h" // IWYU pragma: keep
38
#include "common/config.h"
39
#include "common/consts.h"
40
#include "common/exception.h"
41
#include "common/logging.h"
42
#include "common/metrics/doris_metrics.h"
43
#include "common/object_pool.h"
44
#include "common/status.h"
45
#include "core/assert_cast.h"
46
#include "core/block/column_with_type_and_name.h"
47
#include "core/column/column.h"
48
#include "core/column/column_const.h"
49
#include "core/column/column_nothing.h"
50
#include "core/column/column_nullable.h"
51
#include "core/column/column_string.h"
52
#include "core/column/column_variant.h"
53
#include "core/column/column_vector.h"
54
#include "core/data_type/data_type.h"
55
#include "core/data_type/data_type_factory.hpp"
56
#include "core/data_type/data_type_number.h"
57
#include "core/data_type/define_primitive_type.h"
58
#include "core/field.h"
59
#include "core/string_ref.h"
60
#include "core/typeid_cast.h"
61
#include "core/types.h"
62
#include "exprs/function/array/function_array_index.h"
63
#include "exprs/vexpr.h"
64
#include "exprs/vexpr_context.h"
65
#include "exprs/virtual_slot_ref.h"
66
#include "exprs/vliteral.h"
67
#include "exprs/vslot_ref.h"
68
#include "io/cache/cached_remote_file_reader.h"
69
#include "io/fs/file_reader.h"
70
#include "io/io_common.h"
71
#include "runtime/query_context.h"
72
#include "runtime/runtime_predicate.h"
73
#include "runtime/runtime_state.h"
74
#include "runtime/thread_context.h"
75
#include "storage/compaction/collection_similarity.h"
76
#include "storage/field.h"
77
#include "storage/id_manager.h"
78
#include "storage/index/ann/ann_index.h"
79
#include "storage/index/ann/ann_index_iterator.h"
80
#include "storage/index/ann/ann_index_reader.h"
81
#include "storage/index/ann/ann_topn_runtime.h"
82
#include "storage/index/index_file_reader.h"
83
#include "storage/index/index_iterator.h"
84
#include "storage/index/index_query_context.h"
85
#include "storage/index/index_reader_helper.h"
86
#include "storage/index/indexed_column_reader.h"
87
#include "storage/index/inverted/inverted_index_reader.h"
88
#include "storage/index/ordinal_page_index.h"
89
#include "storage/index/primary_key_index.h"
90
#include "storage/index/short_key_index.h"
91
#include "storage/iterators.h"
92
#include "storage/olap_common.h"
93
#include "storage/predicate/bloom_filter_predicate.h"
94
#include "storage/predicate/column_predicate.h"
95
#include "storage/predicate/like_column_predicate.h"
96
#include "storage/schema.h"
97
#include "storage/segment/column_reader.h"
98
#include "storage/segment/column_reader_cache.h"
99
#include "storage/segment/condition_cache.h"
100
#include "storage/segment/row_ranges.h"
101
#include "storage/segment/segment.h"
102
#include "storage/segment/segment_prefetcher.h"
103
#include "storage/segment/variant/variant_column_reader.h"
104
#include "storage/segment/virtual_column_iterator.h"
105
#include "storage/tablet/tablet_schema.h"
106
#include "storage/types.h"
107
#include "storage/utils.h"
108
#include "util/concurrency_stats.h"
109
#include "util/defer_op.h"
110
#include "util/simd/bits.h"
111
112
namespace doris {
113
using namespace ErrorCode;
114
namespace segment_v2 {
115
116
#include "common/compile_check_begin.h"
117
118
5.63k
SegmentIterator::~SegmentIterator() = default;
119
120
5.63k
void SegmentIterator::_init_row_bitmap_by_condition_cache() {
121
    // Only dispose need column predicate and expr cal in condition cache
122
5.63k
    if (!_col_predicates.empty() ||
123
5.63k
        (_enable_common_expr_pushdown && !_remaining_conjunct_roots.empty())) {
124
0
        if (_opts.condition_cache_digest) {
125
0
            auto* condition_cache = ConditionCache::instance();
126
0
            ConditionCache::CacheKey cache_key(_opts.rowset_id, _segment->id(),
127
0
                                               _opts.condition_cache_digest);
128
129
            // Increment search count when digest != 0
130
0
            DorisMetrics::instance()->condition_cache_search_count->increment(1);
131
132
0
            ConditionCacheHandle handle;
133
0
            _find_condition_cache = condition_cache->lookup(cache_key, &handle);
134
135
            // Increment hit count if cache lookup is successful
136
0
            if (_find_condition_cache) {
137
0
                DorisMetrics::instance()->condition_cache_hit_count->increment(1);
138
0
                if (_opts.runtime_state) {
139
0
                    VLOG_DEBUG << "Condition cache hit, query id: "
140
0
                               << print_id(_opts.runtime_state->query_id())
141
0
                               << ", segment id: " << _segment->id()
142
0
                               << ", cache digest: " << _opts.condition_cache_digest
143
0
                               << ", rowset id: " << _opts.rowset_id.to_string();
144
0
                }
145
0
            }
146
147
0
            auto num_rows = _segment->num_rows();
148
0
            if (_find_condition_cache) {
149
0
                const auto& filter_result = *(handle.get_filter_result());
150
0
                int64_t filtered_blocks = 0;
151
0
                for (int i = 0; i < filter_result.size(); i++) {
152
0
                    if (!filter_result[i]) {
153
0
                        _row_bitmap.removeRange(
154
0
                                i * CONDITION_CACHE_OFFSET,
155
0
                                i * CONDITION_CACHE_OFFSET + CONDITION_CACHE_OFFSET);
156
0
                        filtered_blocks++;
157
0
                    }
158
0
                }
159
                // Record condition_cache hit segment number
160
0
                _opts.stats->condition_cache_hit_seg_nums++;
161
                // Record rows filtered by condition cache hit
162
0
                _opts.stats->condition_cache_filtered_rows +=
163
0
                        filtered_blocks * SegmentIterator::CONDITION_CACHE_OFFSET;
164
0
            } else {
165
0
                _condition_cache = std::make_shared<std::vector<bool>>(
166
0
                        num_rows / CONDITION_CACHE_OFFSET + 1, false);
167
0
            }
168
0
        }
169
5.63k
    } else {
170
5.63k
        _opts.condition_cache_digest = 0;
171
5.63k
    }
172
5.63k
}
173
174
// A fast range iterator for roaring bitmap. Output ranges use closed-open form, like [from, to).
175
// Example:
176
//   input bitmap:  [0 1 4 5 6 7 10 15 16 17 18 19]
177
//   output ranges: [0,2), [4,8), [10,11), [15,20) (when max_range_size=10)
178
//   output ranges: [0,2), [4,7), [7,8), [10,11), [15,18), [18,20) (when max_range_size=3)
179
class SegmentIterator::BitmapRangeIterator {
180
public:
181
0
    BitmapRangeIterator() = default;
182
5.63k
    virtual ~BitmapRangeIterator() = default;
183
184
5.63k
    explicit BitmapRangeIterator(const roaring::Roaring& bitmap) {
185
5.63k
        roaring_init_iterator(&bitmap.roaring, &_iter);
186
5.63k
    }
187
188
0
    bool has_more_range() const { return !_eof; }
189
190
12.1k
    [[nodiscard]] static uint32_t get_batch_size() { return kBatchSize; }
191
192
    // read next range into [*from, *to) whose size <= max_range_size.
193
    // return false when there is no more range.
194
0
    virtual bool next_range(const uint32_t max_range_size, uint32_t* from, uint32_t* to) {
195
0
        if (_eof) {
196
0
            return false;
197
0
        }
198
199
0
        *from = _buf[_buf_pos];
200
0
        uint32_t range_size = 0;
201
0
        uint32_t expect_val = _buf[_buf_pos]; // this initial value just make first batch valid
202
203
        // if array is contiguous sequence then the following conditions need to be met :
204
        // a_0: x
205
        // a_1: x+1
206
        // a_2: x+2
207
        // ...
208
        // a_p: x+p
209
        // so we can just use (a_p-a_0)-p to check conditions
210
        // and should notice the previous batch needs to be continuous with the current batch
211
0
        while (!_eof && range_size + _buf_size - _buf_pos <= max_range_size &&
212
0
               expect_val == _buf[_buf_pos] &&
213
0
               _buf[_buf_size - 1] - _buf[_buf_pos] == _buf_size - 1 - _buf_pos) {
214
0
            range_size += _buf_size - _buf_pos;
215
0
            expect_val = _buf[_buf_size - 1] + 1;
216
0
            _read_next_batch();
217
0
        }
218
219
        // promise remain range not will reach next batch
220
0
        if (!_eof && range_size < max_range_size && expect_val == _buf[_buf_pos]) {
221
0
            do {
222
0
                _buf_pos++;
223
0
                range_size++;
224
0
            } while (range_size < max_range_size && _buf[_buf_pos] == _buf[_buf_pos - 1] + 1);
225
0
        }
226
0
        *to = *from + range_size;
227
0
        return true;
228
0
    }
229
230
    // read batch_size of rowids from roaring bitmap into buf array
231
23.1k
    virtual uint32_t read_batch_rowids(rowid_t* buf, uint32_t batch_size) {
232
23.1k
        return roaring::api::roaring_read_uint32_iterator(&_iter, buf, batch_size);
233
23.1k
    }
234
235
private:
236
0
    void _read_next_batch() {
237
0
        _buf_pos = 0;
238
0
        _buf_size = roaring::api::roaring_read_uint32_iterator(&_iter, _buf, kBatchSize);
239
0
        _eof = (_buf_size == 0);
240
0
    }
241
242
    static const uint32_t kBatchSize = 256;
243
    roaring::api::roaring_uint32_iterator_t _iter;
244
    uint32_t _buf[kBatchSize];
245
    uint32_t _buf_pos = 0;
246
    uint32_t _buf_size = 0;
247
    bool _eof = false;
248
};
249
250
// A backward range iterator for roaring bitmap. Output ranges use closed-open form, like [from, to).
251
// Example:
252
//   input bitmap:  [0 1 4 5 6 7 10 15 16 17 18 19]
253
//   output ranges: , [15,20), [10,11), [4,8), [0,2) (when max_range_size=10)
254
//   output ranges: [17,20), [15,17), [10,11), [5,8), [4, 5), [0,2) (when max_range_size=3)
255
class SegmentIterator::BackwardBitmapRangeIterator : public SegmentIterator::BitmapRangeIterator {
256
public:
257
0
    explicit BackwardBitmapRangeIterator(const roaring::Roaring& bitmap) {
258
0
        roaring_init_iterator_last(&bitmap.roaring, &_riter);
259
0
        _rowid_count = cast_set<uint32_t>(roaring_bitmap_get_cardinality(&bitmap.roaring));
260
0
        _rowid_left = _rowid_count;
261
0
    }
262
263
0
    bool has_more_range() const { return !_riter.has_value; }
264
265
    // read next range into [*from, *to) whose size <= max_range_size.
266
    // return false when there is no more range.
267
0
    bool next_range(const uint32_t max_range_size, uint32_t* from, uint32_t* to) override {
268
0
        if (!_riter.has_value) {
269
0
            return false;
270
0
        }
271
272
0
        uint32_t range_size = 0;
273
0
        *to = _riter.current_value + 1;
274
275
0
        do {
276
0
            *from = _riter.current_value;
277
0
            range_size++;
278
0
            roaring_previous_uint32_iterator(&_riter);
279
0
        } while (range_size < max_range_size && _riter.has_value &&
280
0
                 _riter.current_value + 1 == *from);
281
282
0
        return true;
283
0
    }
284
    /**
285
     * Reads a batch of row IDs from a roaring bitmap, starting from the end and moving backwards.
286
     * This function retrieves the last `batch_size` row IDs from the bitmap and stores them in the provided buffer.
287
     * It updates the internal state to track how many row IDs are left to read in subsequent calls.
288
     *
289
     * The row IDs are read in reverse order, but stored in the buffer maintaining their original order in the bitmap.
290
     *
291
     * Example:
292
     *   input bitmap: [0 1 4 5 6 7 10 15 16 17 18 19]
293
     *   If the bitmap has 12 elements and batch_size is set to 5, the function will first read [15, 16, 17, 18, 19]
294
     *   into the buffer, leaving 7 elements left. In the next call with batch_size 5, it will read [4, 5, 6, 7, 10].
295
     *
296
     */
297
0
    uint32_t read_batch_rowids(rowid_t* buf, uint32_t batch_size) override {
298
0
        if (!_riter.has_value || _rowid_left == 0) {
299
0
            return 0;
300
0
        }
301
302
0
        if (_rowid_count <= batch_size) {
303
0
            roaring_bitmap_to_uint32_array(_riter.parent,
304
0
                                           buf); // Fill 'buf' with '_rowid_count' elements.
305
0
            uint32_t num_read = _rowid_left;     // Save the number of row IDs read.
306
0
            _rowid_left = 0;                     // No row IDs left after this operation.
307
0
            return num_read;                     // Return the number of row IDs read.
308
0
        }
309
310
0
        uint32_t read_size = std::min(batch_size, _rowid_left);
311
0
        uint32_t num_read = 0; // Counter for the number of row IDs read.
312
313
        // Read row IDs into the buffer in reverse order.
314
0
        while (num_read < read_size && _riter.has_value) {
315
0
            buf[read_size - num_read - 1] = _riter.current_value;
316
0
            num_read++;
317
0
            _rowid_left--; // Decrement the count of remaining row IDs.
318
0
            roaring_previous_uint32_iterator(&_riter);
319
0
        }
320
321
        // Return the actual number of row IDs read.
322
0
        return num_read;
323
0
    }
324
325
private:
326
    roaring::api::roaring_uint32_iterator_t _riter;
327
    uint32_t _rowid_count;
328
    uint32_t _rowid_left;
329
};
330
331
SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr schema)
332
5.63k
        : _segment(std::move(segment)),
333
5.63k
          _schema(schema),
334
5.63k
          _column_iterators(_schema->num_columns()),
335
5.63k
          _index_iterators(_schema->num_columns()),
336
5.63k
          _cur_rowid(0),
337
5.63k
          _lazy_materialization_read(false),
338
5.63k
          _lazy_inited(false),
339
5.63k
          _inited(false),
340
5.63k
          _pool(new ObjectPool) {}
341
342
11.1k
Status SegmentIterator::init(const StorageReadOptions& opts) {
343
11.1k
    auto status = _init_impl(opts);
344
11.1k
    if (!status.ok()) {
345
0
        _segment->update_healthy_status(status);
346
0
    }
347
11.1k
    return status;
348
11.1k
}
349
350
11.1k
Status SegmentIterator::_init_impl(const StorageReadOptions& opts) {
351
    // get file handle from file descriptor of segment
352
11.1k
    if (_inited) {
353
5.49k
        return Status::OK();
354
5.49k
    }
355
5.63k
    _opts = opts;
356
5.63k
    SCOPED_RAW_TIMER(&_opts.stats->segment_iterator_init_timer_ns);
357
5.63k
    _inited = true;
358
5.63k
    _file_reader = _segment->_file_reader;
359
5.63k
    _col_predicates.clear();
360
361
5.63k
    for (const auto& predicate : opts.column_predicates) {
362
0
        if (!_segment->can_apply_predicate_safely(predicate->column_id(), *_schema,
363
0
                                                  _opts.target_cast_type_for_variants, _opts)) {
364
0
            continue;
365
0
        }
366
0
        _col_predicates.emplace_back(predicate);
367
0
    }
368
5.63k
    _tablet_id = opts.tablet_id;
369
    // Read options will not change, so that just resize here
370
5.63k
    _block_rowids.resize(_opts.block_row_max);
371
372
5.63k
    _remaining_conjunct_roots = opts.remaining_conjunct_roots;
373
374
5.63k
    if (_schema->rowid_col_idx() > 0) {
375
0
        _record_rowids = true;
376
0
    }
377
378
5.63k
    _virtual_column_exprs = _opts.virtual_column_exprs;
379
5.63k
    _vir_cid_to_idx_in_block = _opts.vir_cid_to_idx_in_block;
380
5.63k
    _score_runtime = _opts.score_runtime;
381
5.63k
    _ann_topn_runtime = _opts.ann_topn_runtime;
382
383
5.63k
    if (opts.output_columns != nullptr) {
384
1.29k
        _output_columns = *(opts.output_columns);
385
1.29k
    }
386
387
5.63k
    _storage_name_and_type.resize(_schema->columns().size());
388
5.63k
    auto storage_format = _opts.tablet_schema->get_inverted_index_storage_format();
389
34.2k
    for (int i = 0; i < _schema->columns().size(); ++i) {
390
28.6k
        const StorageField* col = _schema->column(i);
391
28.6k
        if (col) {
392
12.6k
            auto storage_type = _segment->get_data_type_of(col->get_desc(), _opts);
393
12.6k
            if (storage_type == nullptr) {
394
0
                storage_type = DataTypeFactory::instance().create_data_type(col->get_desc(),
395
0
                                                                            col->is_nullable());
396
0
            }
397
            // Currently, when writing a lucene index, the field of the document is column_name, and the column name is
398
            // bound to the index field. Since version 1.2, the data file storage has been changed from column_name to
399
            // column_unique_id, allowing the column name to be changed. Due to current limitations, previous inverted
400
            // index data cannot be used after Doris changes the column name. Column names also support Unicode
401
            // characters, which may cause other problems with indexing in non-ASCII characters.
402
            // After consideration, it was decided to change the field name from column_name to column_unique_id in
403
            // format V2, while format V1 continues to use column_name.
404
12.6k
            std::string field_name;
405
12.6k
            if (storage_format == InvertedIndexStorageFormatPB::V1) {
406
9.98k
                field_name = col->name();
407
9.98k
            } else {
408
2.68k
                if (col->is_extracted_column()) {
409
                    // variant sub col
410
                    // field_name format: parent_unique_id.sub_col_name
411
0
                    field_name = std::to_string(col->parent_unique_id()) + "." + col->name();
412
2.68k
                } else {
413
2.68k
                    field_name = std::to_string(col->unique_id());
414
2.68k
                }
415
2.68k
            }
416
12.6k
            _storage_name_and_type[i] = std::make_pair(field_name, storage_type);
417
12.6k
            if (int32_t uid = col->get_unique_id(); !_variant_sparse_column_cache.contains(uid)) {
418
12.6k
                DCHECK(uid >= 0);
419
12.6k
                _variant_sparse_column_cache.emplace(uid,
420
12.6k
                                                     std::make_unique<PathToBinaryColumnCache>());
421
12.6k
            }
422
12.6k
        }
423
28.6k
    }
424
425
5.63k
    RETURN_IF_ERROR(init_iterators());
426
427
5.63k
    RETURN_IF_ERROR(_construct_compound_expr_context());
428
5.63k
    _enable_common_expr_pushdown = !_common_expr_ctxs_push_down.empty();
429
5.63k
    VLOG_DEBUG << fmt::format(
430
0
            "Segment iterator init, virtual_column_exprs size: {}, "
431
0
            "_vir_cid_to_idx_in_block size: {}, common_expr_pushdown size: {}",
432
0
            _opts.virtual_column_exprs.size(), _opts.vir_cid_to_idx_in_block.size(),
433
0
            _common_expr_ctxs_push_down.size());
434
5.63k
    _initialize_predicate_results();
435
5.63k
    return Status::OK();
436
5.63k
}
437
438
5.63k
void SegmentIterator::_initialize_predicate_results() {
439
    // Initialize from _col_predicates
440
5.63k
    for (auto pred : _col_predicates) {
441
0
        int cid = pred->column_id();
442
0
        _column_predicate_index_exec_status[cid][pred] = false;
443
0
    }
444
445
5.63k
    _calculate_expr_in_remaining_conjunct_root();
446
5.63k
}
447
448
5.63k
Status SegmentIterator::init_iterators() {
449
5.63k
    RETURN_IF_ERROR(_init_return_column_iterators());
450
5.63k
    RETURN_IF_ERROR(_init_index_iterators());
451
5.63k
    return Status::OK();
452
5.63k
}
453
454
23.1k
Status SegmentIterator::_lazy_init(Block* block) {
455
23.1k
    if (_lazy_inited) {
456
17.4k
        return Status::OK();
457
17.4k
    }
458
5.63k
    SCOPED_RAW_TIMER(&_opts.stats->block_init_ns);
459
5.63k
    DorisMetrics::instance()->segment_read_total->increment(1);
460
5.63k
    _row_bitmap.addRange(0, _segment->num_rows());
461
5.63k
    _init_row_bitmap_by_condition_cache();
462
463
    // z-order can not use prefix index
464
5.63k
    if (_segment->_tablet_schema->sort_type() != SortType::ZORDER &&
465
5.63k
        _segment->_tablet_schema->cluster_key_uids().empty()) {
466
5.63k
        RETURN_IF_ERROR(_get_row_ranges_by_keys());
467
5.63k
    }
468
5.63k
    RETURN_IF_ERROR(_get_row_ranges_by_column_conditions());
469
5.63k
    RETURN_IF_ERROR(_vec_init_lazy_materialization());
470
    // Remove rows that have been marked deleted
471
5.63k
    if (_opts.delete_bitmap.count(segment_id()) > 0 &&
472
5.63k
        _opts.delete_bitmap.at(segment_id()) != nullptr) {
473
25
        size_t pre_size = _row_bitmap.cardinality();
474
25
        _row_bitmap -= *(_opts.delete_bitmap.at(segment_id()));
475
25
        _opts.stats->rows_del_by_bitmap += (pre_size - _row_bitmap.cardinality());
476
25
        VLOG_DEBUG << "read on segment: " << segment_id() << ", delete bitmap cardinality: "
477
0
                   << _opts.delete_bitmap.at(segment_id())->cardinality() << ", "
478
0
                   << _opts.stats->rows_del_by_bitmap << " rows deleted by bitmap";
479
25
    }
480
481
5.63k
    if (!_opts.row_ranges.is_empty()) {
482
0
        _row_bitmap &= RowRanges::ranges_to_roaring(_opts.row_ranges);
483
0
    }
484
485
5.63k
    _prepare_score_column_materialization();
486
487
5.63k
    RETURN_IF_ERROR(_apply_ann_topn_predicate());
488
489
5.63k
    if (_opts.read_orderby_key_reverse) {
490
0
        _range_iter.reset(new BackwardBitmapRangeIterator(_row_bitmap));
491
5.63k
    } else {
492
5.63k
        _range_iter.reset(new BitmapRangeIterator(_row_bitmap));
493
5.63k
    }
494
495
    // If the row bitmap size is smaller than block_row_max, there's no need to reserve that many column rows.
496
5.63k
    auto nrows_reserve_limit = std::min(_row_bitmap.cardinality(), uint64_t(_opts.block_row_max));
497
5.63k
    if (_lazy_materialization_read || _opts.record_rowids || _is_need_expr_eval) {
498
894
        _block_rowids.resize(_opts.block_row_max);
499
894
    }
500
5.63k
    _current_return_columns.resize(_schema->columns().size());
501
502
5.63k
    _vec_init_char_column_id(block);
503
18.3k
    for (size_t i = 0; i < _schema->column_ids().size(); i++) {
504
12.6k
        ColumnId cid = _schema->column_ids()[i];
505
12.6k
        const auto* column_desc = _schema->column(cid);
506
12.6k
        if (_is_pred_column[cid]) {
507
467
            auto storage_column_type = _storage_name_and_type[cid].second;
508
            // Char type is special , since char type's computational datatype is same with string,
509
            // both are DataTypeString, but DataTypeString only return FieldType::OLAP_FIELD_TYPE_STRING
510
            // in get_storage_field_type.
511
467
            RETURN_IF_CATCH_EXCEPTION(
512
                    // Here, cid will not go out of bounds
513
                    // because the size of _current_return_columns equals _schema->tablet_columns().size()
514
467
                    _current_return_columns[cid] = Schema::get_predicate_column_ptr(
515
467
                            _is_char_type[cid] ? FieldType::OLAP_FIELD_TYPE_CHAR
516
467
                                               : storage_column_type->get_storage_field_type(),
517
467
                            storage_column_type->is_nullable(), _opts.io_ctx.reader_type));
518
467
            _current_return_columns[cid]->set_rowset_segment_id(
519
467
                    {_segment->rowset_id(), _segment->id()});
520
467
            _current_return_columns[cid]->reserve(nrows_reserve_limit);
521
12.2k
        } else if (i >= block->columns()) {
522
            // This column needs to be scanned, but doesn't need to be returned upward. (delete sign)
523
            // if i >= block->columns means the column and not the pred_column means `column i` is
524
            // a delete condition column. but the column is not effective in the segment. so we just
525
            // create a column to hold the data.
526
            // a. origin data -> b. delete condition -> c. new load data
527
            // the segment of c do not effective delete condition, but it still need read the column
528
            // to match the schema.
529
            // TODO: skip read the not effective delete column to speed up segment read.
530
0
            _current_return_columns[cid] = Schema::get_data_type_ptr(*column_desc)->create_column();
531
0
            _current_return_columns[cid]->reserve(nrows_reserve_limit);
532
0
        }
533
12.6k
    }
534
535
    // Additional deleted filter condition will be materialized column be at the end of the block,
536
    // after _output_column_by_sel_idx  will be erase, we not need to filter it,
537
    // so erase it from _columns_to_filter in the first next_batch.
538
    // Eg:
539
    //      `delete from table where a = 10;`
540
    //      `select b from table;`
541
    // a column only effective in segment iterator, the block from query engine only contain the b column,
542
    // so no need to filter a column by expr.
543
5.63k
    for (auto it = _columns_to_filter.begin(); it != _columns_to_filter.end();) {
544
0
        if (*it >= block->columns()) {
545
0
            it = _columns_to_filter.erase(it);
546
0
        } else {
547
0
            ++it;
548
0
        }
549
0
    }
550
551
5.63k
    _lazy_inited = true;
552
553
5.63k
    _init_segment_prefetchers();
554
555
5.63k
    return Status::OK();
556
5.63k
}
557
558
5.63k
void SegmentIterator::_init_segment_prefetchers() {
559
5.63k
    SCOPED_RAW_TIMER(&_opts.stats->segment_iterator_init_segment_prefetchers_timer_ns);
560
5.63k
    if (!config::is_cloud_mode()) {
561
5.63k
        return;
562
5.63k
    }
563
0
    static std::vector<ReaderType> supported_reader_types {
564
0
            ReaderType::READER_QUERY, ReaderType::READER_BASE_COMPACTION,
565
0
            ReaderType::READER_CUMULATIVE_COMPACTION, ReaderType::READER_FULL_COMPACTION};
566
0
    if (std::ranges::none_of(supported_reader_types,
567
0
                             [&](ReaderType t) { return _opts.io_ctx.reader_type == t; })) {
568
0
        return;
569
0
    }
570
    // Initialize segment prefetcher for predicate and non-predicate columns
571
0
    bool is_query = (_opts.io_ctx.reader_type == ReaderType::READER_QUERY);
572
0
    bool enable_prefetch = is_query ? config::enable_query_segment_file_cache_prefetch
573
0
                                    : config::enable_compaction_segment_file_cache_prefetch;
574
0
    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
575
0
            "[verbose] SegmentIterator _init_segment_prefetchers, is_query={}, enable_prefetch={}, "
576
0
            "_row_bitmap.isEmpty()={}, row_bitmap.cardinality()={}, tablet={}, rowset={}, "
577
0
            "segment={}, predicate_column_ids={}, common_expr_column_ids={}",
578
0
            is_query, enable_prefetch, _row_bitmap.isEmpty(), _row_bitmap.cardinality(),
579
0
            _opts.tablet_id, _opts.rowset_id.to_string(), segment_id(),
580
0
            fmt::join(_predicate_column_ids, ","), fmt::join(_common_expr_column_ids, ","));
581
0
    if (enable_prefetch && !_row_bitmap.isEmpty()) {
582
0
        int window_size =
583
0
                1 + (is_query ? config::query_segment_file_cache_prefetch_block_size
584
0
                              : config::compaction_segment_file_cache_prefetch_block_size);
585
0
        LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
586
0
                "[verbose] SegmentIterator prefetch config: window_size={}", window_size);
587
0
        if (window_size > 0 &&
588
0
            !_column_iterators.empty()) { // ensure init_iterators has been called
589
0
            SegmentPrefetcherConfig prefetch_config(window_size,
590
0
                                                    config::file_cache_each_block_size);
591
0
            for (auto cid : _schema->column_ids()) {
592
0
                auto& column_iter = _column_iterators[cid];
593
0
                if (column_iter == nullptr) {
594
0
                    continue;
595
0
                }
596
0
                const auto* tablet_column = _schema->column(cid);
597
0
                SegmentPrefetchParams params {
598
0
                        .config = prefetch_config,
599
0
                        .read_options = _opts,
600
0
                };
601
0
                LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
602
0
                        "[verbose] SegmentIterator init_segment_prefetchers, "
603
0
                        "tablet={}, rowset={}, segment={}, column_id={}, col_name={}, type={}",
604
0
                        _opts.tablet_id, _opts.rowset_id.to_string(), segment_id(), cid,
605
0
                        tablet_column->name(), tablet_column->type());
606
0
                Status st = column_iter->init_prefetcher(params);
607
0
                if (!st.ok()) {
608
0
                    LOG_IF(WARNING, config::enable_segment_prefetch_verbose_log) << fmt::format(
609
0
                            "[verbose] failed to init prefetcher for column_id={}, "
610
0
                            "tablet={}, rowset={}, segment={}, error={}",
611
0
                            cid, _opts.tablet_id, _opts.rowset_id.to_string(), segment_id(),
612
0
                            st.to_string());
613
0
                }
614
0
            }
615
616
            // for compaction, it's guaranteed that all rows are read, so we can prefetch all data blocks
617
0
            PrefetcherInitMethod init_method = (is_query && _row_bitmap.cardinality() < num_rows())
618
0
                                                       ? PrefetcherInitMethod::FROM_ROWIDS
619
0
                                                       : PrefetcherInitMethod::ALL_DATA_BLOCKS;
620
0
            std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>> prefetchers;
621
0
            for (const auto& column_iter : _column_iterators) {
622
0
                if (column_iter != nullptr) {
623
0
                    column_iter->collect_prefetchers(prefetchers, init_method);
624
0
                }
625
0
            }
626
0
            for (auto& [method, prefetcher_vec] : prefetchers) {
627
0
                if (method == PrefetcherInitMethod::ALL_DATA_BLOCKS) {
628
0
                    for (auto* prefetcher : prefetcher_vec) {
629
0
                        prefetcher->build_all_data_blocks();
630
0
                    }
631
0
                } else if (method == PrefetcherInitMethod::FROM_ROWIDS && !prefetcher_vec.empty()) {
632
0
                    SegmentPrefetcher::build_blocks_by_rowids(_row_bitmap, prefetcher_vec);
633
0
                }
634
0
            }
635
0
        }
636
0
    }
637
0
}
638
639
5.63k
Status SegmentIterator::_get_row_ranges_by_keys() {
640
5.63k
    SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_keys_ns);
641
5.63k
    DorisMetrics::instance()->segment_row_total->increment(num_rows());
642
643
    // fast path for empty segment or empty key ranges
644
5.63k
    if (_row_bitmap.isEmpty() || _opts.key_ranges.empty()) {
645
5.63k
        return Status::OK();
646
5.63k
    }
647
648
    // Read & seek key columns is a waste of time when no key column in _schema
649
0
    if (std::none_of(
650
0
                _schema->columns().begin(), _schema->columns().end(), [&](const StorageField* col) {
651
0
                    return col && _opts.tablet_schema->column_by_uid(col->unique_id()).is_key();
652
0
                })) {
653
0
        return Status::OK();
654
0
    }
655
656
0
    RowRanges result_ranges;
657
0
    for (auto& key_range : _opts.key_ranges) {
658
0
        rowid_t lower_rowid = 0;
659
0
        rowid_t upper_rowid = num_rows();
660
0
        RETURN_IF_ERROR(_prepare_seek(key_range));
661
0
        if (key_range.upper_key != nullptr) {
662
            // If client want to read upper_bound, the include_upper is true. So we
663
            // should get the first ordinal at which key is larger than upper_bound.
664
            // So we call _lookup_ordinal with include_upper's negate
665
0
            RETURN_IF_ERROR(_lookup_ordinal(*key_range.upper_key, !key_range.include_upper,
666
0
                                            num_rows(), &upper_rowid));
667
0
        }
668
0
        if (upper_rowid > 0 && key_range.lower_key != nullptr) {
669
0
            RETURN_IF_ERROR(_lookup_ordinal(*key_range.lower_key, key_range.include_lower,
670
0
                                            upper_rowid, &lower_rowid));
671
0
        }
672
0
        auto row_range = RowRanges::create_single(lower_rowid, upper_rowid);
673
0
        RowRanges::ranges_union(result_ranges, row_range, &result_ranges);
674
0
    }
675
0
    size_t pre_size = _row_bitmap.cardinality();
676
0
    _row_bitmap &= RowRanges::ranges_to_roaring(result_ranges);
677
0
    _opts.stats->rows_key_range_filtered += (pre_size - _row_bitmap.cardinality());
678
679
0
    return Status::OK();
680
0
}
681
682
// Set up environment for the following seek.
683
0
Status SegmentIterator::_prepare_seek(const StorageReadOptions::KeyRange& key_range) {
684
0
    std::vector<const StorageField*> key_fields;
685
0
    std::set<uint32_t> column_set;
686
0
    if (key_range.lower_key != nullptr) {
687
0
        for (auto cid : key_range.lower_key->schema()->column_ids()) {
688
0
            column_set.emplace(cid);
689
0
            key_fields.emplace_back(key_range.lower_key->column_schema(cid));
690
0
        }
691
0
    }
692
0
    if (key_range.upper_key != nullptr) {
693
0
        for (auto cid : key_range.upper_key->schema()->column_ids()) {
694
0
            if (column_set.count(cid) == 0) {
695
0
                key_fields.emplace_back(key_range.upper_key->column_schema(cid));
696
0
                column_set.emplace(cid);
697
0
            }
698
0
        }
699
0
    }
700
0
    if (!_seek_schema) {
701
        // Schema constructors accept a vector of TabletColumnPtr. Convert
702
        // StorageField pointers to TabletColumnPtr by copying their descriptors.
703
0
        std::vector<TabletColumnPtr> cols;
704
0
        cols.reserve(key_fields.size());
705
0
        for (const StorageField* f : key_fields) {
706
0
            cols.emplace_back(std::make_shared<TabletColumn>(f->get_desc()));
707
0
        }
708
0
        _seek_schema = std::make_unique<Schema>(cols, cols.size());
709
0
    }
710
    // todo(wb) need refactor here, when using pk to search, _seek_block is useless
711
0
    if (_seek_block.size() == 0) {
712
0
        _seek_block.resize(_seek_schema->num_column_ids());
713
0
        int i = 0;
714
0
        for (auto cid : _seek_schema->column_ids()) {
715
0
            auto column_desc = _seek_schema->column(cid);
716
0
            _seek_block[i] = Schema::get_column_by_field(*column_desc);
717
0
            i++;
718
0
        }
719
0
    }
720
721
    // create used column iterator
722
0
    for (auto cid : _seek_schema->column_ids()) {
723
0
        if (_column_iterators[cid] == nullptr) {
724
            // TODO: Do we need this?
725
0
            if (_virtual_column_exprs.contains(cid)) {
726
0
                _column_iterators[cid] = std::make_unique<VirtualColumnIterator>();
727
0
                continue;
728
0
            }
729
730
0
            RETURN_IF_ERROR(_segment->new_column_iterator(_opts.tablet_schema->column(cid),
731
0
                                                          &_column_iterators[cid], &_opts,
732
0
                                                          &_variant_sparse_column_cache));
733
0
            ColumnIteratorOptions iter_opts {
734
0
                    .use_page_cache = _opts.use_page_cache,
735
0
                    .file_reader = _file_reader.get(),
736
0
                    .stats = _opts.stats,
737
0
                    .io_ctx = _opts.io_ctx,
738
0
            };
739
0
            RETURN_IF_ERROR(_column_iterators[cid]->init(iter_opts));
740
0
        }
741
0
    }
742
743
0
    return Status::OK();
744
0
}
745
746
5.63k
Status SegmentIterator::_get_row_ranges_by_column_conditions() {
747
5.63k
    SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_column_conditions_ns);
748
5.63k
    if (_row_bitmap.isEmpty()) {
749
0
        return Status::OK();
750
0
    }
751
752
5.63k
    {
753
5.63k
        if (_opts.runtime_state &&
754
5.63k
            _opts.runtime_state->query_options().enable_inverted_index_query &&
755
5.63k
            (has_index_in_iterators() || !_common_expr_ctxs_push_down.empty())) {
756
0
            SCOPED_RAW_TIMER(&_opts.stats->inverted_index_filter_timer);
757
0
            size_t input_rows = _row_bitmap.cardinality();
758
            // Only apply column-level inverted index if we have iterators
759
0
            if (has_index_in_iterators()) {
760
0
                RETURN_IF_ERROR(_apply_inverted_index());
761
0
            }
762
            // Always apply expr-level index (e.g., search expressions) if we have common_expr_pushdown
763
            // This allows search expressions with variant subcolumns to be evaluated even when
764
            // the segment doesn't have all subcolumns
765
0
            RETURN_IF_ERROR(_apply_index_expr());
766
0
            for (auto it = _common_expr_ctxs_push_down.begin();
767
0
                 it != _common_expr_ctxs_push_down.end();) {
768
0
                if ((*it)->all_expr_inverted_index_evaluated()) {
769
0
                    const auto* result = (*it)->get_index_context()->get_index_result_for_expr(
770
0
                            (*it)->root().get());
771
0
                    if (result != nullptr) {
772
0
                        _row_bitmap &= *result->get_data_bitmap();
773
0
                        auto root = (*it)->root();
774
0
                        auto iter_find = std::find(_remaining_conjunct_roots.begin(),
775
0
                                                   _remaining_conjunct_roots.end(), root);
776
0
                        if (iter_find != _remaining_conjunct_roots.end()) {
777
0
                            _remaining_conjunct_roots.erase(iter_find);
778
0
                        }
779
0
                        it = _common_expr_ctxs_push_down.erase(it);
780
0
                    }
781
0
                } else {
782
0
                    ++it;
783
0
                }
784
0
            }
785
0
            _opts.condition_cache_digest =
786
0
                    _common_expr_ctxs_push_down.empty() ? 0 : _opts.condition_cache_digest;
787
0
            _opts.stats->rows_inverted_index_filtered += (input_rows - _row_bitmap.cardinality());
788
0
            for (auto cid : _schema->column_ids()) {
789
0
                bool result_true = _check_all_conditions_passed_inverted_index_for_column(cid);
790
0
                if (result_true) {
791
0
                    _need_read_data_indices[cid] = false;
792
0
                }
793
0
            }
794
0
        }
795
5.63k
    }
796
797
5.63k
    DBUG_EXECUTE_IF("segment_iterator.inverted_index.filtered_rows", {
798
5.63k
        LOG(INFO) << "Debug Point: segment_iterator.inverted_index.filtered_rows: "
799
5.63k
                  << _opts.stats->rows_inverted_index_filtered;
800
5.63k
        auto filtered_rows = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
801
5.63k
                "segment_iterator.inverted_index.filtered_rows", "filtered_rows", -1);
802
5.63k
        if (filtered_rows != _opts.stats->rows_inverted_index_filtered) {
803
5.63k
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
804
5.63k
                    "filtered_rows: {} not equal to expected: {}",
805
5.63k
                    _opts.stats->rows_inverted_index_filtered, filtered_rows);
806
5.63k
        }
807
5.63k
    })
808
809
5.63k
    DBUG_EXECUTE_IF("segment_iterator.apply_inverted_index", {
810
5.63k
        LOG(INFO) << "Debug Point: segment_iterator.apply_inverted_index";
811
5.63k
        if (!_common_expr_ctxs_push_down.empty() || !_col_predicates.empty()) {
812
5.63k
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
813
5.63k
                    "it is failed to apply inverted index, common_expr_ctxs_push_down: {}, "
814
5.63k
                    "col_predicates: {}",
815
5.63k
                    _common_expr_ctxs_push_down.size(), _col_predicates.size());
816
5.63k
        }
817
5.63k
    })
818
819
5.63k
    if (!_row_bitmap.isEmpty() &&
820
5.63k
        (!_opts.topn_filter_source_node_ids.empty() || !_opts.col_id_to_predicates.empty() ||
821
5.63k
         _opts.delete_condition_predicates->num_of_column_predicate() > 0)) {
822
467
        RowRanges condition_row_ranges = RowRanges::create_single(_segment->num_rows());
823
467
        RETURN_IF_ERROR(_get_row_ranges_from_conditions(&condition_row_ranges));
824
467
        size_t pre_size = _row_bitmap.cardinality();
825
467
        _row_bitmap &= RowRanges::ranges_to_roaring(condition_row_ranges);
826
467
        _opts.stats->rows_conditions_filtered += (pre_size - _row_bitmap.cardinality());
827
467
    }
828
829
5.63k
    DBUG_EXECUTE_IF("bloom_filter_must_filter_data", {
830
5.63k
        if (_opts.stats->rows_bf_filtered == 0) {
831
5.63k
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
832
5.63k
                    "Bloom filter did not filter the data.");
833
5.63k
        }
834
5.63k
    })
835
836
    // TODO(hkp): calculate filter rate to decide whether to
837
    // use zone map/bloom filter/secondary index or not.
838
5.63k
    return Status::OK();
839
5.63k
}
840
841
0
bool SegmentIterator::_column_has_ann_index(int32_t cid) {
842
0
    bool has_ann_index = _index_iterators[cid] != nullptr &&
843
0
                         _index_iterators[cid]->get_reader(AnnIndexReaderType::ANN);
844
845
0
    return has_ann_index;
846
0
}
847
848
5.63k
Status SegmentIterator::_apply_ann_topn_predicate() {
849
5.63k
    if (_ann_topn_runtime == nullptr) {
850
5.63k
        return Status::OK();
851
5.63k
    }
852
853
0
    VLOG_DEBUG << fmt::format("Try apply ann topn: {}", _ann_topn_runtime->debug_string());
854
0
    size_t src_col_idx = _ann_topn_runtime->get_src_column_idx();
855
0
    ColumnId src_cid = _schema->column_id(src_col_idx);
856
0
    IndexIterator* ann_index_iterator = _index_iterators[src_cid].get();
857
0
    bool has_ann_index = _column_has_ann_index(src_cid);
858
0
    bool has_common_expr_push_down = !_common_expr_ctxs_push_down.empty();
859
0
    bool has_column_predicate = std::any_of(_is_pred_column.begin(), _is_pred_column.end(),
860
0
                                            [](bool is_pred) { return is_pred; });
861
0
    if (!has_ann_index || has_common_expr_push_down || has_column_predicate) {
862
0
        VLOG_DEBUG << fmt::format(
863
0
                "Ann topn can not be evaluated by ann index, has_ann_index: {}, "
864
0
                "has_common_expr_push_down: {}, has_column_predicate: {}",
865
0
                has_ann_index, has_common_expr_push_down, has_column_predicate);
866
        // Disable index-only scan on ann indexed column.
867
0
        _need_read_data_indices[src_cid] = true;
868
0
        _opts.stats->ann_fall_back_brute_force_cnt += 1;
869
0
        return Status::OK();
870
0
    }
871
872
    // Process asc & desc according to the type of metric
873
0
    auto index_reader = ann_index_iterator->get_reader(AnnIndexReaderType::ANN);
874
0
    auto ann_index_reader = dynamic_cast<AnnIndexReader*>(index_reader.get());
875
0
    DCHECK(ann_index_reader != nullptr);
876
0
    if (ann_index_reader->get_metric_type() == AnnIndexMetric::IP) {
877
0
        if (_ann_topn_runtime->is_asc()) {
878
0
            VLOG_DEBUG << fmt::format(
879
0
                    "Asc topn for inner product can not be evaluated by ann index");
880
            // Disable index-only scan on ann indexed column.
881
0
            _need_read_data_indices[src_cid] = true;
882
0
            _opts.stats->ann_fall_back_brute_force_cnt += 1;
883
0
            return Status::OK();
884
0
        }
885
0
    } else {
886
0
        if (!_ann_topn_runtime->is_asc()) {
887
0
            VLOG_DEBUG << fmt::format("Desc topn for l2/cosine can not be evaluated by ann index");
888
            // Disable index-only scan on ann indexed column.
889
0
            _need_read_data_indices[src_cid] = true;
890
0
            _opts.stats->ann_fall_back_brute_force_cnt += 1;
891
0
            return Status::OK();
892
0
        }
893
0
    }
894
895
0
    if (ann_index_reader->get_metric_type() != _ann_topn_runtime->get_metric_type()) {
896
0
        VLOG_DEBUG << fmt::format(
897
0
                "Ann topn metric type {} not match index metric type {}, can not be evaluated by "
898
0
                "ann index",
899
0
                metric_to_string(_ann_topn_runtime->get_metric_type()),
900
0
                metric_to_string(ann_index_reader->get_metric_type()));
901
        // Disable index-only scan on ann indexed column.
902
0
        _need_read_data_indices[src_cid] = true;
903
0
        _opts.stats->ann_fall_back_brute_force_cnt += 1;
904
0
        return Status::OK();
905
0
    }
906
907
0
    size_t pre_size = _row_bitmap.cardinality();
908
0
    size_t rows_of_segment = _segment->num_rows();
909
0
    if (static_cast<double>(pre_size) < static_cast<double>(rows_of_segment) * 0.3) {
910
0
        VLOG_DEBUG << fmt::format(
911
0
                "Ann topn predicate input rows {} < 30% of segment rows {}, will not use ann index "
912
0
                "to "
913
0
                "filter",
914
0
                pre_size, rows_of_segment);
915
        // Disable index-only scan on ann indexed column.
916
0
        _need_read_data_indices[src_cid] = true;
917
0
        _opts.stats->ann_fall_back_brute_force_cnt += 1;
918
0
        return Status::OK();
919
0
    }
920
0
    IColumn::MutablePtr result_column;
921
0
    std::unique_ptr<std::vector<uint64_t>> result_row_ids;
922
0
    segment_v2::AnnIndexStats ann_index_stats;
923
924
    // Try to load ANN index before search
925
0
    auto ann_index_iterator_casted =
926
0
            dynamic_cast<segment_v2::AnnIndexIterator*>(ann_index_iterator);
927
0
    if (ann_index_iterator_casted == nullptr) {
928
0
        VLOG_DEBUG << "Failed to cast index iterator to AnnIndexIterator, fallback to brute force";
929
0
        _need_read_data_indices[src_cid] = true;
930
0
        _opts.stats->ann_fall_back_brute_force_cnt += 1;
931
0
        return Status::OK();
932
0
    }
933
934
    // Track load index timing
935
0
    {
936
0
        SCOPED_TIMER(&(ann_index_stats.load_index_costs_ns));
937
0
        if (!ann_index_iterator_casted->try_load_index()) {
938
0
            VLOG_DEBUG << "Failed to load ANN index, fallback to brute force search";
939
0
            _need_read_data_indices[src_cid] = true;
940
0
            _opts.stats->ann_fall_back_brute_force_cnt += 1;
941
0
            return Status::OK();
942
0
        }
943
0
        double load_costs_ms =
944
0
                static_cast<double>(ann_index_stats.load_index_costs_ns.value()) / 1000000.0;
945
0
        DorisMetrics::instance()->ann_index_load_costs_ms->increment(
946
0
                static_cast<int64_t>(load_costs_ms));
947
0
    }
948
949
0
    RETURN_IF_ERROR(_ann_topn_runtime->evaluate_vector_ann_search(
950
0
            ann_index_iterator_casted, &_row_bitmap, rows_of_segment, result_column, result_row_ids,
951
0
            ann_index_stats));
952
953
0
    VLOG_DEBUG << fmt::format("Ann topn filtered {} - {} = {} rows", pre_size,
954
0
                              _row_bitmap.cardinality(), pre_size - _row_bitmap.cardinality());
955
956
0
    int64_t rows_filterd = pre_size - _row_bitmap.cardinality();
957
0
    _opts.stats->rows_ann_index_topn_filtered += rows_filterd;
958
0
    _opts.stats->ann_index_load_ns += ann_index_stats.load_index_costs_ns.value();
959
0
    _opts.stats->ann_topn_search_ns += ann_index_stats.search_costs_ns.value();
960
0
    _opts.stats->ann_ivf_on_disk_load_ns += ann_index_stats.ivf_on_disk_load_costs_ns.value();
961
0
    _opts.stats->ann_ivf_on_disk_cache_hit_cnt += ann_index_stats.ivf_on_disk_cache_hit_cnt.value();
962
0
    _opts.stats->ann_ivf_on_disk_cache_miss_cnt +=
963
0
            ann_index_stats.ivf_on_disk_cache_miss_cnt.value();
964
0
    _opts.stats->ann_index_topn_engine_search_ns += ann_index_stats.engine_search_ns.value();
965
0
    _opts.stats->ann_index_topn_result_process_ns +=
966
0
            ann_index_stats.result_process_costs_ns.value();
967
0
    _opts.stats->ann_index_topn_engine_convert_ns += ann_index_stats.engine_convert_ns.value();
968
0
    _opts.stats->ann_index_topn_engine_prepare_ns += ann_index_stats.engine_prepare_ns.value();
969
0
    _opts.stats->ann_index_topn_search_cnt += 1;
970
0
    const size_t dst_col_idx = _ann_topn_runtime->get_dest_column_idx();
971
0
    ColumnIterator* column_iter = _column_iterators[_schema->column_id(dst_col_idx)].get();
972
0
    DCHECK(column_iter != nullptr);
973
0
    VirtualColumnIterator* virtual_column_iter = dynamic_cast<VirtualColumnIterator*>(column_iter);
974
0
    DCHECK(virtual_column_iter != nullptr);
975
0
    VLOG_DEBUG << fmt::format(
976
0
            "Virtual column iterator, column_idx {}, is materialized with {} rows", dst_col_idx,
977
0
            result_row_ids->size());
978
    // reference count of result_column should be 1, so move will not issue any data copy.
979
0
    virtual_column_iter->prepare_materialization(std::move(result_column),
980
0
                                                 std::move(result_row_ids));
981
982
0
    _need_read_data_indices[src_cid] = false;
983
0
    VLOG_DEBUG << fmt::format(
984
0
            "Enable ANN index-only scan for src column cid {} (skip reading data pages)", src_cid);
985
986
0
    return Status::OK();
987
0
}
988
989
467
Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row_ranges) {
990
467
    std::set<int32_t> cids;
991
467
    for (auto& entry : _opts.col_id_to_predicates) {
992
0
        cids.insert(entry.first);
993
0
    }
994
995
467
    {
996
467
        SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_dict_ns);
997
        /// Low cardinality optimization is currently not very stable, so to prevent data corruption,
998
        /// we are temporarily disabling its use in data compaction.
999
        // TODO: enable it in not only ReaderTyper::READER_QUERY but also other reader types.
1000
467
        if (_opts.io_ctx.reader_type == ReaderType::READER_QUERY) {
1001
0
            RowRanges dict_row_ranges = RowRanges::create_single(num_rows());
1002
0
            for (auto cid : cids) {
1003
0
                if (!_segment->can_apply_predicate_safely(
1004
0
                            cid, *_schema, _opts.target_cast_type_for_variants, _opts)) {
1005
0
                    continue;
1006
0
                }
1007
0
                DCHECK(_opts.col_id_to_predicates.count(cid) > 0);
1008
0
                RETURN_IF_ERROR(_column_iterators[cid]->get_row_ranges_by_dict(
1009
0
                        _opts.col_id_to_predicates.at(cid).get(), &dict_row_ranges));
1010
1011
0
                if (dict_row_ranges.is_empty()) {
1012
0
                    break;
1013
0
                }
1014
0
            }
1015
1016
0
            if (dict_row_ranges.is_empty()) {
1017
0
                RowRanges::ranges_intersection(*condition_row_ranges, dict_row_ranges,
1018
0
                                               condition_row_ranges);
1019
0
                _opts.stats->segment_dict_filtered++;
1020
0
                _opts.stats->filtered_segment_number++;
1021
0
                return Status::OK();
1022
0
            }
1023
0
        }
1024
467
    }
1025
1026
467
    size_t pre_size = 0;
1027
467
    {
1028
467
        SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_bf_ns);
1029
        // first filter data by bloom filter index
1030
        // bloom filter index only use CondColumn
1031
467
        RowRanges bf_row_ranges = RowRanges::create_single(num_rows());
1032
467
        for (auto& cid : cids) {
1033
0
            DCHECK(_opts.col_id_to_predicates.count(cid) > 0);
1034
0
            if (!_segment->can_apply_predicate_safely(cid, *_schema,
1035
0
                                                      _opts.target_cast_type_for_variants, _opts)) {
1036
0
                continue;
1037
0
            }
1038
            // get row ranges by bf index of this column,
1039
0
            RowRanges column_bf_row_ranges = RowRanges::create_single(num_rows());
1040
0
            RETURN_IF_ERROR(_column_iterators[cid]->get_row_ranges_by_bloom_filter(
1041
0
                    _opts.col_id_to_predicates.at(cid).get(), &column_bf_row_ranges));
1042
0
            RowRanges::ranges_intersection(bf_row_ranges, column_bf_row_ranges, &bf_row_ranges);
1043
0
        }
1044
1045
467
        pre_size = condition_row_ranges->count();
1046
467
        RowRanges::ranges_intersection(*condition_row_ranges, bf_row_ranges, condition_row_ranges);
1047
467
        _opts.stats->rows_bf_filtered += (pre_size - condition_row_ranges->count());
1048
467
    }
1049
1050
0
    {
1051
467
        SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_zonemap_ns);
1052
467
        RowRanges zone_map_row_ranges = RowRanges::create_single(num_rows());
1053
        // second filter data by zone map
1054
467
        for (const auto& cid : cids) {
1055
0
            DCHECK(_opts.col_id_to_predicates.count(cid) > 0);
1056
0
            if (!_segment->can_apply_predicate_safely(cid, *_schema,
1057
0
                                                      _opts.target_cast_type_for_variants, _opts)) {
1058
0
                continue;
1059
0
            }
1060
            // do not check zonemap if predicate does not support zonemap
1061
0
            if (!_opts.col_id_to_predicates.at(cid)->support_zonemap()) {
1062
0
                VLOG_DEBUG << "skip zonemap for column " << cid;
1063
0
                continue;
1064
0
            }
1065
            // get row ranges by zone map of this column,
1066
0
            RowRanges column_row_ranges = RowRanges::create_single(num_rows());
1067
0
            RETURN_IF_ERROR(_column_iterators[cid]->get_row_ranges_by_zone_map(
1068
0
                    _opts.col_id_to_predicates.at(cid).get(),
1069
0
                    _opts.del_predicates_for_zone_map.count(cid) > 0
1070
0
                            ? &(_opts.del_predicates_for_zone_map.at(cid))
1071
0
                            : nullptr,
1072
0
                    &column_row_ranges));
1073
            // intersect different columns's row ranges to get final row ranges by zone map
1074
0
            RowRanges::ranges_intersection(zone_map_row_ranges, column_row_ranges,
1075
0
                                           &zone_map_row_ranges);
1076
0
        }
1077
1078
467
        pre_size = condition_row_ranges->count();
1079
467
        RowRanges::ranges_intersection(*condition_row_ranges, zone_map_row_ranges,
1080
467
                                       condition_row_ranges);
1081
1082
467
        size_t pre_size2 = condition_row_ranges->count();
1083
467
        RowRanges::ranges_intersection(*condition_row_ranges, zone_map_row_ranges,
1084
467
                                       condition_row_ranges);
1085
467
        _opts.stats->rows_stats_rp_filtered += (pre_size2 - condition_row_ranges->count());
1086
467
        _opts.stats->rows_stats_filtered += (pre_size - condition_row_ranges->count());
1087
467
    }
1088
1089
0
    return Status::OK();
1090
467
}
1091
1092
0
bool SegmentIterator::_is_literal_node(const TExprNodeType::type& node_type) {
1093
0
    switch (node_type) {
1094
0
    case TExprNodeType::BOOL_LITERAL:
1095
0
    case TExprNodeType::INT_LITERAL:
1096
0
    case TExprNodeType::LARGE_INT_LITERAL:
1097
0
    case TExprNodeType::FLOAT_LITERAL:
1098
0
    case TExprNodeType::DECIMAL_LITERAL:
1099
0
    case TExprNodeType::STRING_LITERAL:
1100
0
    case TExprNodeType::DATE_LITERAL:
1101
0
    case TExprNodeType::TIMEV2_LITERAL:
1102
0
        return true;
1103
0
    default:
1104
0
        return false;
1105
0
    }
1106
0
}
1107
1108
0
Status SegmentIterator::_extract_common_expr_columns(const VExprSPtr& expr) {
1109
0
    auto& children = expr->children();
1110
0
    for (int i = 0; i < children.size(); ++i) {
1111
0
        RETURN_IF_ERROR(_extract_common_expr_columns(children[i]));
1112
0
    }
1113
1114
0
    auto node_type = expr->node_type();
1115
0
    if (node_type == TExprNodeType::SLOT_REF) {
1116
0
        auto slot_expr = std::dynamic_pointer_cast<doris::VSlotRef>(expr);
1117
0
        _is_common_expr_column[_schema->column_id(slot_expr->column_id())] = true;
1118
0
        _common_expr_columns.insert(_schema->column_id(slot_expr->column_id()));
1119
0
    } else if (node_type == TExprNodeType::VIRTUAL_SLOT_REF) {
1120
0
        std::shared_ptr<VirtualSlotRef> virtual_slot_ref =
1121
0
                std::dynamic_pointer_cast<VirtualSlotRef>(expr);
1122
0
        RETURN_IF_ERROR(_extract_common_expr_columns(virtual_slot_ref->get_virtual_column_expr()));
1123
0
    }
1124
1125
0
    return Status::OK();
1126
0
}
1127
1128
0
bool SegmentIterator::_check_apply_by_inverted_index(std::shared_ptr<ColumnPredicate> pred) {
1129
0
    if (_opts.runtime_state && !_opts.runtime_state->query_options().enable_inverted_index_query) {
1130
0
        return false;
1131
0
    }
1132
0
    auto pred_column_id = pred->column_id();
1133
0
    if (_index_iterators[pred_column_id] == nullptr) {
1134
        //this column without inverted index
1135
0
        return false;
1136
0
    }
1137
1138
0
    if (_inverted_index_not_support_pred_type(pred->type())) {
1139
0
        return false;
1140
0
    }
1141
1142
0
    if (pred->type() == PredicateType::IN_LIST || pred->type() == PredicateType::NOT_IN_LIST) {
1143
        // in_list or not_in_list predicate produced by runtime filter
1144
0
        if (pred->is_runtime_filter()) {
1145
0
            return false;
1146
0
        }
1147
0
    }
1148
1149
    // UNTOKENIZED strings exceed ignore_above, they are written as null, causing range query errors
1150
0
    if (PredicateTypeTraits::is_range(pred->type()) &&
1151
0
        !IndexReaderHelper::has_bkd_index(_index_iterators[pred_column_id].get())) {
1152
0
        return false;
1153
0
    }
1154
1155
    // Function filter no apply inverted index
1156
0
    if (dynamic_cast<LikeColumnPredicate<TYPE_CHAR>*>(pred.get()) != nullptr ||
1157
0
        dynamic_cast<LikeColumnPredicate<TYPE_STRING>*>(pred.get()) != nullptr) {
1158
0
        return false;
1159
0
    }
1160
1161
0
    bool handle_by_fulltext = _column_has_fulltext_index(pred_column_id);
1162
0
    if (handle_by_fulltext) {
1163
        // when predicate is leafNode of andNode,
1164
        // can apply 'match query' and 'equal query' and 'list query' for fulltext index.
1165
0
        return pred->type() == PredicateType::MATCH || pred->type() == PredicateType::IS_NULL ||
1166
0
               pred->type() == PredicateType::IS_NOT_NULL ||
1167
0
               PredicateTypeTraits::is_equal_or_list(pred->type());
1168
0
    }
1169
1170
0
    return true;
1171
0
}
1172
1173
// TODO: optimization when all expr can not evaluate by inverted/ann index,
1174
0
Status SegmentIterator::_apply_index_expr() {
1175
0
    for (const auto& expr_ctx : _common_expr_ctxs_push_down) {
1176
0
        if (Status st = expr_ctx->evaluate_inverted_index(num_rows()); !st.ok()) {
1177
0
            if (_downgrade_without_index(st) || st.code() == ErrorCode::NOT_IMPLEMENTED_ERROR) {
1178
0
                continue;
1179
0
            } else {
1180
                // other code is not to be handled, we should just break
1181
0
                LOG(WARNING) << "failed to evaluate inverted index for expr_ctx: "
1182
0
                             << expr_ctx->root()->debug_string()
1183
0
                             << ", error msg: " << st.to_string();
1184
0
                return st;
1185
0
            }
1186
0
        }
1187
0
    }
1188
1189
    // Evaluate inverted index for virtual column MATCH expressions (projections).
1190
    // Unlike common exprs which filter rows, these only compute index result bitmaps
1191
    // for later materialization via fast_execute().
1192
0
    for (auto& [cid, expr_ctx] : _virtual_column_exprs) {
1193
0
        if (expr_ctx->get_index_context() == nullptr) {
1194
0
            continue;
1195
0
        }
1196
0
        if (Status st = expr_ctx->evaluate_inverted_index(num_rows()); !st.ok()) {
1197
0
            if (_downgrade_without_index(st) || st.code() == ErrorCode::NOT_IMPLEMENTED_ERROR) {
1198
0
                continue;
1199
0
            } else {
1200
0
                LOG(WARNING) << "failed to evaluate inverted index for virtual column expr: "
1201
0
                             << expr_ctx->root()->debug_string()
1202
0
                             << ", error msg: " << st.to_string();
1203
0
                return st;
1204
0
            }
1205
0
        }
1206
0
    }
1207
1208
    // Apply ann range search
1209
0
    for (const auto& expr_ctx : _common_expr_ctxs_push_down) {
1210
0
        segment_v2::AnnIndexStats ann_index_stats;
1211
0
        size_t origin_rows = _row_bitmap.cardinality();
1212
0
        RETURN_IF_ERROR(expr_ctx->evaluate_ann_range_search(
1213
0
                _index_iterators, _schema->column_ids(), _column_iterators,
1214
0
                _common_expr_to_slotref_map, _row_bitmap, ann_index_stats));
1215
0
        _opts.stats->rows_ann_index_range_filtered += (origin_rows - _row_bitmap.cardinality());
1216
0
        _opts.stats->ann_index_load_ns += ann_index_stats.load_index_costs_ns.value();
1217
0
        _opts.stats->ann_index_range_search_ns += ann_index_stats.search_costs_ns.value();
1218
0
        _opts.stats->ann_ivf_on_disk_load_ns += ann_index_stats.ivf_on_disk_load_costs_ns.value();
1219
0
        _opts.stats->ann_ivf_on_disk_cache_hit_cnt +=
1220
0
                ann_index_stats.ivf_on_disk_cache_hit_cnt.value();
1221
0
        _opts.stats->ann_ivf_on_disk_cache_miss_cnt +=
1222
0
                ann_index_stats.ivf_on_disk_cache_miss_cnt.value();
1223
0
        _opts.stats->ann_range_engine_search_ns += ann_index_stats.engine_search_ns.value();
1224
0
        _opts.stats->ann_range_result_convert_ns += ann_index_stats.result_process_costs_ns.value();
1225
0
        _opts.stats->ann_range_engine_convert_ns += ann_index_stats.engine_convert_ns.value();
1226
0
        _opts.stats->ann_range_pre_process_ns += ann_index_stats.engine_prepare_ns.value();
1227
0
        _opts.stats->ann_fall_back_brute_force_cnt += ann_index_stats.fall_back_brute_force_cnt;
1228
0
    }
1229
1230
0
    for (auto it = _common_expr_ctxs_push_down.begin(); it != _common_expr_ctxs_push_down.end();) {
1231
0
        if ((*it)->root()->ann_range_search_executedd()) {
1232
0
            _opts.stats->ann_index_range_search_cnt++;
1233
0
            it = _common_expr_ctxs_push_down.erase(it);
1234
0
        } else {
1235
0
            ++it;
1236
0
        }
1237
0
    }
1238
    // TODO:Do we need to remove these expr root from _remaining_conjunct_roots?
1239
1240
0
    return Status::OK();
1241
0
}
1242
1243
0
bool SegmentIterator::_downgrade_without_index(Status res, bool need_remaining) {
1244
0
    bool is_fallback =
1245
0
            _opts.runtime_state->query_options().enable_fallback_on_missing_inverted_index;
1246
0
    if ((res.code() == ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND && is_fallback) ||
1247
0
        res.code() == ErrorCode::INVERTED_INDEX_BYPASS ||
1248
0
        res.code() == ErrorCode::INVERTED_INDEX_EVALUATE_SKIPPED ||
1249
0
        (res.code() == ErrorCode::INVERTED_INDEX_NO_TERMS && need_remaining) ||
1250
0
        res.code() == ErrorCode::INVERTED_INDEX_FILE_CORRUPTED) {
1251
        // 1. INVERTED_INDEX_FILE_NOT_FOUND means index file has not been built,
1252
        //    usually occurs when creating a new index, queries can be downgraded
1253
        //    without index.
1254
        // 2. INVERTED_INDEX_BYPASS means the hit of condition by index
1255
        //    has reached the optimal limit, downgrade without index query can
1256
        //    improve query performance.
1257
        // 3. INVERTED_INDEX_EVALUATE_SKIPPED means the inverted index is not
1258
        //    suitable for executing this predicate, skipped it and filter data
1259
        //    by function later.
1260
        // 4. INVERTED_INDEX_NO_TERMS means the column has fulltext index,
1261
        //    but the column condition value no terms in specified parser,
1262
        //    such as: where A = '' and B = ','
1263
        //    the predicate of A and B need downgrade without index query.
1264
        // 5. INVERTED_INDEX_FILE_CORRUPTED means the index file is corrupted,
1265
        //    such as when index segment files are not generated
1266
        // above case can downgrade without index query
1267
0
        _opts.stats->inverted_index_downgrade_count++;
1268
0
        if (!res.is<ErrorCode::INVERTED_INDEX_BYPASS>()) {
1269
0
            LOG(INFO) << "will downgrade without index to evaluate predicate, because of res: "
1270
0
                      << res;
1271
0
        } else {
1272
0
            VLOG_DEBUG << "will downgrade without index to evaluate predicate, because of res: "
1273
0
                       << res;
1274
0
        }
1275
0
        return true;
1276
0
    }
1277
0
    return false;
1278
0
}
1279
1280
0
bool SegmentIterator::_column_has_fulltext_index(int32_t cid) {
1281
0
    bool has_fulltext_index =
1282
0
            _index_iterators[cid] != nullptr &&
1283
0
            _index_iterators[cid]->get_reader(InvertedIndexReaderType::FULLTEXT) &&
1284
0
            _index_iterators[cid]->get_reader(InvertedIndexReaderType::STRING_TYPE) == nullptr;
1285
1286
0
    return has_fulltext_index;
1287
0
}
1288
1289
0
inline bool SegmentIterator::_inverted_index_not_support_pred_type(const PredicateType& type) {
1290
0
    return type == PredicateType::BF || type == PredicateType::BITMAP_FILTER;
1291
0
}
1292
1293
Status SegmentIterator::_apply_inverted_index_on_column_predicate(
1294
        std::shared_ptr<ColumnPredicate> pred,
1295
0
        std::vector<std::shared_ptr<ColumnPredicate>>& remaining_predicates, bool* continue_apply) {
1296
0
    if (!_check_apply_by_inverted_index(pred)) {
1297
0
        remaining_predicates.emplace_back(pred);
1298
0
    } else {
1299
0
        bool need_remaining_after_evaluate = _column_has_fulltext_index(pred->column_id()) &&
1300
0
                                             PredicateTypeTraits::is_equal_or_list(pred->type());
1301
0
        Status res =
1302
0
                pred->evaluate(_storage_name_and_type[pred->column_id()],
1303
0
                               _index_iterators[pred->column_id()].get(), num_rows(), &_row_bitmap);
1304
0
        if (!res.ok()) {
1305
0
            if (_downgrade_without_index(res, need_remaining_after_evaluate)) {
1306
0
                remaining_predicates.emplace_back(pred);
1307
0
                return Status::OK();
1308
0
            }
1309
0
            LOG(WARNING) << "failed to evaluate index"
1310
0
                         << ", column predicate type: " << pred->pred_type_string(pred->type())
1311
0
                         << ", error msg: " << res;
1312
0
            return res;
1313
0
        }
1314
1315
0
        if (_row_bitmap.isEmpty()) {
1316
            // all rows have been pruned, no need to process further predicates
1317
0
            *continue_apply = false;
1318
0
        }
1319
1320
0
        if (need_remaining_after_evaluate) {
1321
0
            remaining_predicates.emplace_back(pred);
1322
0
            return Status::OK();
1323
0
        }
1324
0
        if (!pred->is_runtime_filter()) {
1325
0
            _column_predicate_index_exec_status[pred->column_id()][pred] = true;
1326
0
        }
1327
0
    }
1328
0
    return Status::OK();
1329
0
}
1330
1331
43.4k
bool SegmentIterator::_need_read_data(ColumnId cid) {
1332
43.4k
    if (_opts.runtime_state && !_opts.runtime_state->query_options().enable_no_need_read_data_opt) {
1333
0
        return true;
1334
0
    }
1335
    // only support DUP_KEYS and UNIQUE_KEYS with MOW
1336
43.4k
    if (!((_opts.tablet_schema->keys_type() == KeysType::DUP_KEYS ||
1337
43.4k
           (_opts.tablet_schema->keys_type() == KeysType::UNIQUE_KEYS &&
1338
16.3k
            _opts.enable_unique_key_merge_on_write)))) {
1339
12.0k
        return true;
1340
12.0k
    }
1341
    // this is a virtual column, we always need to read data
1342
31.3k
    if (this->_vir_cid_to_idx_in_block.contains(cid)) {
1343
0
        return true;
1344
0
    }
1345
1346
    // if there is a delete predicate, we always need to read data
1347
31.3k
    if (_has_delete_predicate(cid)) {
1348
4.03k
        return true;
1349
4.03k
    }
1350
27.2k
    if (_output_columns.count(-1)) {
1351
        // if _output_columns contains -1, it means that the light
1352
        // weight schema change may not be enabled or other reasons
1353
        // caused the column unique_id not be set, to prevent errors
1354
        // occurring, return true here that column data needs to be read
1355
0
        return true;
1356
0
    }
1357
    // Check the following conditions:
1358
    // 1. If the column represented by the unique ID is an inverted index column (indicated by '_need_read_data_indices.count(unique_id) > 0 && !_need_read_data_indices[unique_id]')
1359
    //    and it's not marked for projection in '_output_columns'.
1360
    // 2. Or, if the column is an inverted index column and it's marked for projection in '_output_columns',
1361
    //    and the operation is a push down of the 'COUNT_ON_INDEX' aggregation function.
1362
    // If any of the above conditions are met, log a debug message indicating that there's no need to read data for the indexed column.
1363
    // Then, return false.
1364
27.2k
    const auto& column = _opts.tablet_schema->column(cid);
1365
    // Different subcolumns may share the same parent_unique_id, so we choose to abandon this optimization.
1366
27.2k
    if (column.is_extracted_column() &&
1367
27.2k
        _opts.push_down_agg_type_opt != TPushAggOp::COUNT_ON_INDEX) {
1368
1
        return true;
1369
1
    }
1370
27.2k
    int32_t unique_id = column.unique_id();
1371
27.2k
    if (unique_id < 0) {
1372
1
        unique_id = column.parent_unique_id();
1373
1
    }
1374
27.2k
    if ((_need_read_data_indices.contains(cid) && !_need_read_data_indices[cid] &&
1375
27.2k
         !_output_columns.contains(unique_id)) ||
1376
27.2k
        (_need_read_data_indices.contains(cid) && !_need_read_data_indices[cid] &&
1377
27.2k
         _output_columns.count(unique_id) == 1 &&
1378
27.2k
         _opts.push_down_agg_type_opt == TPushAggOp::COUNT_ON_INDEX)) {
1379
1
        VLOG_DEBUG << "SegmentIterator no need read data for column: "
1380
0
                   << _opts.tablet_schema->column_by_uid(unique_id).name();
1381
1
        return false;
1382
1
    }
1383
27.2k
    return true;
1384
27.2k
}
1385
1386
0
Status SegmentIterator::_apply_inverted_index() {
1387
0
    std::vector<std::shared_ptr<ColumnPredicate>> remaining_predicates;
1388
0
    std::set<std::shared_ptr<ColumnPredicate>> no_need_to_pass_column_predicate_set;
1389
1390
0
    for (auto pred : _col_predicates) {
1391
0
        if (no_need_to_pass_column_predicate_set.count(pred) > 0) {
1392
0
            continue;
1393
0
        } else {
1394
0
            bool continue_apply = true;
1395
0
            RETURN_IF_ERROR(_apply_inverted_index_on_column_predicate(pred, remaining_predicates,
1396
0
                                                                      &continue_apply));
1397
0
            if (!continue_apply) {
1398
0
                break;
1399
0
            }
1400
0
        }
1401
0
    }
1402
1403
0
    _col_predicates = std::move(remaining_predicates);
1404
0
    return Status::OK();
1405
0
}
1406
1407
/**
1408
 * @brief Checks if all conditions related to a specific column have passed in both
1409
 * `_column_predicate_inverted_index_status` and `_common_expr_inverted_index_status`.
1410
 *
1411
 * This function first checks the conditions in `_column_predicate_inverted_index_status`
1412
 * for the given `ColumnId`. If all conditions pass, it sets `default_return` to `true`.
1413
 * It then checks the conditions in `_common_expr_inverted_index_status` for the same column.
1414
 *
1415
 * The function returns `true` if all conditions in both maps pass. If any condition fails
1416
 * in either map, the function immediately returns `false`. If the column does not exist
1417
 * in one of the maps, the function returns `default_return`.
1418
 *
1419
 * @param cid The ColumnId of the column to check.
1420
 * @param default_return The default value to return if the column is not found in the status maps.
1421
 * @return true if all conditions in both status maps pass, or if the column is not found
1422
 *         and `default_return` is true.
1423
 * @return false if any condition in either status map fails, or if the column is not found
1424
 *         and `default_return` is false.
1425
 */
1426
bool SegmentIterator::_check_all_conditions_passed_inverted_index_for_column(ColumnId cid,
1427
0
                                                                             bool default_return) {
1428
    // If common_expr_pushdown is disabled, we cannot guarantee that all conditions are processed by the inverted index.
1429
    // Consider a scenario where there is a column predicate and an expression involving the same column in the SQL query,
1430
    // such as 'a < 0' and 'abs(a) > 1'. This could potentially lead to errors.
1431
0
    if (_opts.runtime_state && !_opts.runtime_state->query_options().enable_common_expr_pushdown) {
1432
0
        return false;
1433
0
    }
1434
0
    auto pred_it = _column_predicate_index_exec_status.find(cid);
1435
0
    if (pred_it != _column_predicate_index_exec_status.end()) {
1436
0
        const auto& pred_map = pred_it->second;
1437
0
        bool pred_passed = std::all_of(pred_map.begin(), pred_map.end(),
1438
0
                                       [](const auto& pred_entry) { return pred_entry.second; });
1439
0
        if (!pred_passed) {
1440
0
            return false;
1441
0
        } else {
1442
0
            default_return = true;
1443
0
        }
1444
0
    }
1445
1446
0
    auto expr_it = _common_expr_index_exec_status.find(cid);
1447
0
    if (expr_it != _common_expr_index_exec_status.end()) {
1448
0
        const auto& expr_map = expr_it->second;
1449
0
        return std::all_of(expr_map.begin(), expr_map.end(),
1450
0
                           [](const auto& expr_entry) { return expr_entry.second; });
1451
0
    }
1452
0
    return default_return;
1453
0
}
1454
1455
5.63k
Status SegmentIterator::_init_return_column_iterators() {
1456
5.63k
    SCOPED_RAW_TIMER(&_opts.stats->segment_iterator_init_return_column_iterators_timer_ns);
1457
5.63k
    if (_cur_rowid >= num_rows()) {
1458
0
        return Status::OK();
1459
0
    }
1460
1461
12.6k
    for (auto cid : _schema->column_ids()) {
1462
12.6k
        if (_schema->column(cid)->name() == BeConsts::ROWID_COL) {
1463
0
            _column_iterators[cid].reset(
1464
0
                    new RowIdColumnIterator(_opts.tablet_id, _opts.rowset_id, _segment->id()));
1465
0
            continue;
1466
0
        }
1467
1468
12.6k
        if (_schema->column(cid)->name().starts_with(BeConsts::GLOBAL_ROWID_COL)) {
1469
0
            auto& id_file_map = _opts.runtime_state->get_id_file_map();
1470
0
            uint32_t file_id = id_file_map->get_file_mapping_id(std::make_shared<FileMapping>(
1471
0
                    _opts.tablet_id, _opts.rowset_id, _segment->id()));
1472
0
            _column_iterators[cid].reset(new RowIdColumnIteratorV2(
1473
0
                    IdManager::ID_VERSION, BackendOptions::get_backend_id(), file_id));
1474
0
            continue;
1475
0
        }
1476
1477
12.6k
        if (_schema->column(cid)->name().starts_with(BeConsts::VIRTUAL_COLUMN_PREFIX)) {
1478
0
            _column_iterators[cid] = std::make_unique<VirtualColumnIterator>();
1479
0
            continue;
1480
0
        }
1481
1482
12.6k
        std::set<ColumnId> del_cond_id_set;
1483
12.6k
        _opts.delete_condition_predicates->get_all_column_ids(del_cond_id_set);
1484
12.6k
        std::vector<bool> tmp_is_pred_column;
1485
12.6k
        tmp_is_pred_column.resize(_schema->columns().size(), false);
1486
12.6k
        for (auto predicate : _col_predicates) {
1487
0
            auto p_cid = predicate->column_id();
1488
0
            tmp_is_pred_column[p_cid] = true;
1489
0
        }
1490
        // handle delete_condition
1491
12.6k
        for (auto d_cid : del_cond_id_set) {
1492
1.32k
            tmp_is_pred_column[d_cid] = true;
1493
1.32k
        }
1494
1495
12.6k
        if (_column_iterators[cid] == nullptr) {
1496
12.6k
            RETURN_IF_ERROR(_segment->new_column_iterator(_opts.tablet_schema->column(cid),
1497
12.6k
                                                          &_column_iterators[cid], &_opts,
1498
12.6k
                                                          &_variant_sparse_column_cache));
1499
12.6k
            ColumnIteratorOptions iter_opts {
1500
12.6k
                    .use_page_cache = _opts.use_page_cache,
1501
                    // If the col is predicate column, then should read the last page to check
1502
                    // if the column is full dict encoding
1503
12.6k
                    .is_predicate_column = tmp_is_pred_column[cid],
1504
12.6k
                    .file_reader = _file_reader.get(),
1505
12.6k
                    .stats = _opts.stats,
1506
12.6k
                    .io_ctx = _opts.io_ctx,
1507
12.6k
            };
1508
12.6k
            RETURN_IF_ERROR(_column_iterators[cid]->init(iter_opts));
1509
12.6k
        }
1510
12.6k
    }
1511
1512
5.63k
#ifndef NDEBUG
1513
5.63k
    for (auto pair : _vir_cid_to_idx_in_block) {
1514
0
        ColumnId vir_col_cid = pair.first;
1515
0
        DCHECK(_column_iterators[vir_col_cid] != nullptr)
1516
0
                << "Virtual column iterator for " << vir_col_cid << " should not be null";
1517
0
        ColumnIterator* column_iter = _column_iterators[vir_col_cid].get();
1518
0
        DCHECK(dynamic_cast<VirtualColumnIterator*>(column_iter) != nullptr)
1519
0
                << "Virtual column iterator for " << vir_col_cid
1520
0
                << " should be VirtualColumnIterator";
1521
0
    }
1522
5.63k
#endif
1523
5.63k
    return Status::OK();
1524
5.63k
}
1525
1526
5.63k
Status SegmentIterator::_init_index_iterators() {
1527
5.63k
    SCOPED_RAW_TIMER(&_opts.stats->segment_iterator_init_index_iterators_timer_ns);
1528
5.63k
    if (_cur_rowid >= num_rows()) {
1529
0
        return Status::OK();
1530
0
    }
1531
1532
5.63k
    _index_query_context = std::make_shared<IndexQueryContext>();
1533
5.63k
    _index_query_context->io_ctx = &_opts.io_ctx;
1534
5.63k
    _index_query_context->stats = _opts.stats;
1535
5.63k
    _index_query_context->runtime_state = _opts.runtime_state;
1536
1537
5.63k
    if (_score_runtime) {
1538
0
        _index_query_context->collection_statistics = _opts.collection_statistics;
1539
0
        _index_query_context->collection_similarity = std::make_shared<CollectionSimilarity>();
1540
0
        _index_query_context->query_limit = _score_runtime->get_limit();
1541
0
        _index_query_context->is_asc = _score_runtime->is_asc();
1542
0
    }
1543
1544
    // Inverted index iterators
1545
12.6k
    for (auto cid : _schema->column_ids()) {
1546
        // Use segment’s own index_meta, for compatibility with future indexing needs to default to lowercase.
1547
12.6k
        if (_index_iterators[cid] == nullptr) {
1548
            // In the _opts.tablet_schema, the sub-column type information for the variant is FieldType::OLAP_FIELD_TYPE_VARIANT.
1549
            // This is because the sub-column is created in create_materialized_variant_column.
1550
            // We use this column to locate the metadata for the inverted index, which requires a unique_id and path.
1551
12.6k
            const auto& column = _opts.tablet_schema->column(cid);
1552
12.6k
            std::vector<const TabletIndex*> inverted_indexs;
1553
            // Keep shared_ptr alive to prevent use-after-free when accessing raw pointers
1554
12.6k
            TabletIndexes inverted_indexs_holder;
1555
            // If the column is an extracted column, we need to find the sub-column in the parent column reader.
1556
12.6k
            std::shared_ptr<ColumnReader> column_reader;
1557
12.6k
            if (column.is_extracted_column()) {
1558
0
                if (!_segment->_column_reader_cache->get_column_reader(
1559
0
                            column.parent_unique_id(), &column_reader, _opts.stats) ||
1560
0
                    column_reader == nullptr) {
1561
0
                    continue;
1562
0
                }
1563
0
                auto* variant_reader = assert_cast<VariantColumnReader*>(column_reader.get());
1564
0
                DataTypePtr data_type = _storage_name_and_type[cid].second;
1565
0
                if (data_type != nullptr &&
1566
0
                    data_type->get_primitive_type() == PrimitiveType::TYPE_VARIANT) {
1567
0
                    DataTypePtr inferred_type;
1568
0
                    Status st = variant_reader->infer_data_type_for_path(
1569
0
                            &inferred_type, column, _opts, _segment->_column_reader_cache.get());
1570
0
                    if (st.ok() && inferred_type != nullptr) {
1571
0
                        data_type = inferred_type;
1572
0
                    }
1573
0
                }
1574
0
                inverted_indexs_holder =
1575
0
                        variant_reader->find_subcolumn_tablet_indexes(column, data_type);
1576
                // Extract raw pointers from shared_ptr for iteration
1577
0
                for (const auto& index_ptr : inverted_indexs_holder) {
1578
0
                    inverted_indexs.push_back(index_ptr.get());
1579
0
                }
1580
0
            }
1581
            // If the column is not an extracted column, we can directly get the inverted index metadata from the tablet schema.
1582
12.6k
            else {
1583
12.6k
                inverted_indexs = _segment->_tablet_schema->inverted_indexs(column);
1584
12.6k
            }
1585
12.6k
            for (const auto& inverted_index : inverted_indexs) {
1586
2.56k
                RETURN_IF_ERROR(_segment->new_index_iterator(column, inverted_index, _opts,
1587
2.56k
                                                             &_index_iterators[cid]));
1588
2.56k
            }
1589
12.6k
            if (_index_iterators[cid] != nullptr) {
1590
2.55k
                _index_iterators[cid]->set_context(_index_query_context);
1591
2.55k
            }
1592
12.6k
        }
1593
12.6k
    }
1594
1595
    // Ann index iterators
1596
12.6k
    for (auto cid : _schema->column_ids()) {
1597
12.6k
        if (_index_iterators[cid] == nullptr) {
1598
10.1k
            const auto& column = _opts.tablet_schema->column(cid);
1599
10.1k
            const auto* index_meta = _segment->_tablet_schema->ann_index(column);
1600
10.1k
            if (index_meta) {
1601
1
                RETURN_IF_ERROR(_segment->new_index_iterator(column, index_meta, _opts,
1602
1
                                                             &_index_iterators[cid]));
1603
1604
1
                if (_index_iterators[cid] != nullptr) {
1605
1
                    _index_iterators[cid]->set_context(_index_query_context);
1606
1
                }
1607
1
            }
1608
10.1k
        }
1609
12.6k
    }
1610
1611
5.63k
    return Status::OK();
1612
5.63k
}
1613
1614
Status SegmentIterator::_lookup_ordinal(const RowCursor& key, bool is_include, rowid_t upper_bound,
1615
0
                                        rowid_t* rowid) {
1616
0
    if (_segment->_tablet_schema->keys_type() == UNIQUE_KEYS &&
1617
0
        _segment->get_primary_key_index() != nullptr) {
1618
0
        return _lookup_ordinal_from_pk_index(key, is_include, rowid);
1619
0
    }
1620
0
    return _lookup_ordinal_from_sk_index(key, is_include, upper_bound, rowid);
1621
0
}
1622
1623
// look up one key to get its ordinal at which can get data by using short key index.
1624
// 'upper_bound' is defined the max ordinal the function will search.
1625
// We use upper_bound to reduce search times.
1626
// If we find a valid ordinal, it will be set in rowid and with Status::OK()
1627
// If we can not find a valid key in this segment, we will set rowid to upper_bound
1628
// Otherwise return error.
1629
// 1. get [start, end) ordinal through short key index
1630
// 2. binary search to find exact ordinal that match the input condition
1631
// Make is_include template to reduce branch
1632
Status SegmentIterator::_lookup_ordinal_from_sk_index(const RowCursor& key, bool is_include,
1633
0
                                                      rowid_t upper_bound, rowid_t* rowid) {
1634
0
    const ShortKeyIndexDecoder* sk_index_decoder = _segment->get_short_key_index();
1635
0
    DCHECK(sk_index_decoder != nullptr);
1636
1637
0
    std::string index_key;
1638
0
    key.encode_key_with_padding(&index_key, _segment->_tablet_schema->num_short_key_columns(),
1639
0
                                is_include);
1640
1641
0
    const auto& key_col_ids = key.schema()->column_ids();
1642
1643
    // Clone the key once and pad CHAR fields to storage format before the binary search.
1644
    // _seek_block holds storage-format data where CHAR is zero-padded to column length,
1645
    // while RowCursor holds CHAR in compute format (unpadded). Padding once here avoids
1646
    // repeated allocation inside the comparison loop.
1647
0
    RowCursor padded_key = key.clone();
1648
0
    padded_key.pad_char_fields();
1649
1650
0
    ssize_t start_block_id = 0;
1651
0
    auto start_iter = sk_index_decoder->lower_bound(index_key);
1652
0
    if (start_iter.valid()) {
1653
        // Because previous block may contain this key, so we should set rowid to
1654
        // last block's first row.
1655
0
        start_block_id = start_iter.ordinal();
1656
0
        if (start_block_id > 0) {
1657
0
            start_block_id--;
1658
0
        }
1659
0
    } else {
1660
        // When we don't find a valid index item, which means all short key is
1661
        // smaller than input key, this means that this key may exist in the last
1662
        // row block. so we set the rowid to first row of last row block.
1663
0
        start_block_id = sk_index_decoder->num_items() - 1;
1664
0
    }
1665
0
    rowid_t start = cast_set<rowid_t>(start_block_id) * sk_index_decoder->num_rows_per_block();
1666
1667
0
    rowid_t end = upper_bound;
1668
0
    auto end_iter = sk_index_decoder->upper_bound(index_key);
1669
0
    if (end_iter.valid()) {
1670
0
        end = cast_set<rowid_t>(end_iter.ordinal()) * sk_index_decoder->num_rows_per_block();
1671
0
    }
1672
1673
    // binary search to find the exact key
1674
0
    while (start < end) {
1675
0
        rowid_t mid = (start + end) / 2;
1676
0
        RETURN_IF_ERROR(_seek_and_peek(mid));
1677
0
        int cmp = _compare_short_key_with_seek_block(padded_key, key_col_ids);
1678
0
        if (cmp > 0) {
1679
0
            start = mid + 1;
1680
0
        } else if (cmp == 0) {
1681
0
            if (is_include) {
1682
                // lower bound
1683
0
                end = mid;
1684
0
            } else {
1685
                // upper bound
1686
0
                start = mid + 1;
1687
0
            }
1688
0
        } else {
1689
0
            end = mid;
1690
0
        }
1691
0
    }
1692
1693
0
    *rowid = start;
1694
0
    return Status::OK();
1695
0
}
1696
1697
Status SegmentIterator::_lookup_ordinal_from_pk_index(const RowCursor& key, bool is_include,
1698
0
                                                      rowid_t* rowid) {
1699
0
    DCHECK(_segment->_tablet_schema->keys_type() == UNIQUE_KEYS);
1700
0
    const PrimaryKeyIndexReader* pk_index_reader = _segment->get_primary_key_index();
1701
0
    DCHECK(pk_index_reader != nullptr);
1702
1703
0
    std::string index_key;
1704
0
    key.encode_key_with_padding<true>(&index_key, _segment->_tablet_schema->num_key_columns(),
1705
0
                                      is_include);
1706
0
    if (index_key < _segment->min_key()) {
1707
0
        *rowid = 0;
1708
0
        return Status::OK();
1709
0
    } else if (index_key > _segment->max_key()) {
1710
0
        *rowid = num_rows();
1711
0
        return Status::OK();
1712
0
    }
1713
0
    bool exact_match = false;
1714
1715
0
    std::unique_ptr<segment_v2::IndexedColumnIterator> index_iterator;
1716
0
    RETURN_IF_ERROR(pk_index_reader->new_iterator(&index_iterator, _opts.stats));
1717
1718
0
    Status status = index_iterator->seek_at_or_after(&index_key, &exact_match);
1719
0
    if (UNLIKELY(!status.ok())) {
1720
0
        *rowid = num_rows();
1721
0
        if (status.is<ENTRY_NOT_FOUND>()) {
1722
0
            return Status::OK();
1723
0
        }
1724
0
        return status;
1725
0
    }
1726
0
    *rowid = cast_set<rowid_t>(index_iterator->get_current_ordinal());
1727
1728
    // The sequence column needs to be removed from primary key index when comparing key
1729
0
    bool has_seq_col = _segment->_tablet_schema->has_sequence_col();
1730
    // Used to get key range from primary key index,
1731
    // for mow with cluster key table, we should get key range from short key index.
1732
0
    DCHECK(_segment->_tablet_schema->cluster_key_uids().empty());
1733
1734
    // if full key is exact_match, the primary key without sequence column should also the same
1735
0
    if (has_seq_col && !exact_match) {
1736
0
        size_t seq_col_length =
1737
0
                _segment->_tablet_schema->column(_segment->_tablet_schema->sequence_col_idx())
1738
0
                        .length() +
1739
0
                1;
1740
0
        auto index_type = DataTypeFactory::instance().create_data_type(
1741
0
                _segment->_pk_index_reader->type_info()->type(), 1, 0);
1742
0
        auto index_column = index_type->create_column();
1743
0
        size_t num_to_read = 1;
1744
0
        size_t num_read = num_to_read;
1745
0
        RETURN_IF_ERROR(index_iterator->next_batch(&num_read, index_column));
1746
0
        DCHECK(num_to_read == num_read);
1747
1748
0
        Slice sought_key =
1749
0
                Slice(index_column->get_data_at(0).data, index_column->get_data_at(0).size);
1750
0
        Slice sought_key_without_seq =
1751
0
                Slice(sought_key.get_data(), sought_key.get_size() - seq_col_length);
1752
1753
        // compare key
1754
0
        if (Slice(index_key).compare(sought_key_without_seq) == 0) {
1755
0
            exact_match = true;
1756
0
        }
1757
0
    }
1758
1759
    // find the key in primary key index, and the is_include is false, so move
1760
    // to the next row.
1761
0
    if (exact_match && !is_include) {
1762
0
        *rowid += 1;
1763
0
    }
1764
0
    return Status::OK();
1765
0
}
1766
1767
// seek to the row and load that row to _key_cursor
1768
0
Status SegmentIterator::_seek_and_peek(rowid_t rowid) {
1769
0
    {
1770
0
        _opts.stats->block_init_seek_num += 1;
1771
0
        SCOPED_RAW_TIMER(&_opts.stats->block_init_seek_ns);
1772
0
        RETURN_IF_ERROR(_seek_columns(_seek_schema->column_ids(), rowid));
1773
0
    }
1774
0
    size_t num_rows = 1;
1775
1776
    //note(wb) reset _seek_block for memory reuse
1777
    // it is easier to use row based memory layout for clear memory
1778
0
    for (int i = 0; i < _seek_block.size(); i++) {
1779
0
        _seek_block[i]->clear();
1780
0
    }
1781
0
    RETURN_IF_ERROR(_read_columns(_seek_schema->column_ids(), _seek_block, num_rows));
1782
0
    return Status::OK();
1783
0
}
1784
1785
0
Status SegmentIterator::_seek_columns(const std::vector<ColumnId>& column_ids, rowid_t pos) {
1786
0
    for (auto cid : column_ids) {
1787
0
        if (!_need_read_data(cid)) {
1788
0
            continue;
1789
0
        }
1790
0
        RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(pos));
1791
0
    }
1792
0
    return Status::OK();
1793
0
}
1794
1795
/* ---------------------- for vectorization implementation  ---------------------- */
1796
1797
/**
1798
 *  For storage layer data type, can be measured from two perspectives:
1799
 *  1 Whether the type can be read in a fast way(batch read using SIMD)
1800
 *    Such as integer type and float type, this type can be read in SIMD way.
1801
 *    For the type string/bitmap/hll, they can not be read in batch way, so read this type data is slow.
1802
 *   If a type can be read fast, we can try to eliminate Lazy Materialization, because we think for this type, seek cost > read cost.
1803
 *   This is an estimate, if we want more precise cost, statistics collection is necessary(this is a todo).
1804
 *   In short, when returned non-pred columns contains string/hll/bitmap, we using Lazy Materialization.
1805
 *   Otherwise, we disable it.
1806
 *
1807
 *   When Lazy Materialization enable, we need to read column at least two times.
1808
 *   First time to read Pred col, second time to read non-pred.
1809
 *   Here's an interesting question to research, whether read Pred col once is the best plan.
1810
 *   (why not read Pred col twice or more?)
1811
 *
1812
 *   When Lazy Materialization disable, we just need to read once.
1813
 *
1814
 *
1815
 *  2 Whether the predicate type can be evaluate in a fast way(using SIMD to eval pred)
1816
 *    Such as integer type and float type, they can be eval fast.
1817
 *    But for BloomFilter/string/date, they eval slow.
1818
 *    If a type can be eval fast, we use vectorization to eval it.
1819
 *    Otherwise, we use short-circuit to eval it.
1820
 *
1821
 *
1822
 */
1823
1824
// todo(wb) need a UT here
1825
5.63k
Status SegmentIterator::_vec_init_lazy_materialization() {
1826
5.63k
    _is_pred_column.resize(_schema->columns().size(), false);
1827
1828
    // including short/vec/delete pred
1829
5.63k
    std::set<ColumnId> pred_column_ids;
1830
5.63k
    _lazy_materialization_read = false;
1831
1832
5.63k
    std::set<ColumnId> del_cond_id_set;
1833
5.63k
    _opts.delete_condition_predicates->get_all_column_ids(del_cond_id_set);
1834
1835
5.63k
    std::set<std::shared_ptr<const ColumnPredicate>> delete_predicate_set {};
1836
5.63k
    _opts.delete_condition_predicates->get_all_column_predicate(delete_predicate_set);
1837
5.63k
    for (auto predicate : delete_predicate_set) {
1838
467
        if (PredicateTypeTraits::is_range(predicate->type())) {
1839
327
            _delete_range_column_ids.push_back(predicate->column_id());
1840
327
        } else if (PredicateTypeTraits::is_bloom_filter(predicate->type())) {
1841
0
            _delete_bloom_filter_column_ids.push_back(predicate->column_id());
1842
0
        }
1843
467
    }
1844
1845
    // Step1: extract columns that can be lazy materialization
1846
5.63k
    if (!_col_predicates.empty() || !del_cond_id_set.empty()) {
1847
467
        std::set<ColumnId> short_cir_pred_col_id_set; // using set for distinct cid
1848
467
        std::set<ColumnId> vec_pred_col_id_set;
1849
1850
467
        for (auto predicate : _col_predicates) {
1851
0
            auto cid = predicate->column_id();
1852
0
            _is_pred_column[cid] = true;
1853
0
            pred_column_ids.insert(cid);
1854
1855
            // check pred using short eval or vec eval
1856
0
            if (_can_evaluated_by_vectorized(predicate)) {
1857
0
                vec_pred_col_id_set.insert(cid);
1858
0
                _pre_eval_block_predicate.push_back(predicate);
1859
0
            } else {
1860
0
                short_cir_pred_col_id_set.insert(cid);
1861
0
                _short_cir_eval_predicate.push_back(predicate);
1862
0
            }
1863
0
            if (predicate->is_runtime_filter()) {
1864
0
                _filter_info_id.push_back(predicate);
1865
0
            }
1866
0
        }
1867
1868
        // handle delete_condition
1869
467
        if (!del_cond_id_set.empty()) {
1870
467
            short_cir_pred_col_id_set.insert(del_cond_id_set.begin(), del_cond_id_set.end());
1871
467
            pred_column_ids.insert(del_cond_id_set.begin(), del_cond_id_set.end());
1872
1873
467
            for (auto cid : del_cond_id_set) {
1874
467
                _is_pred_column[cid] = true;
1875
467
            }
1876
467
        }
1877
1878
467
        _vec_pred_column_ids.assign(vec_pred_col_id_set.cbegin(), vec_pred_col_id_set.cend());
1879
467
        _short_cir_pred_column_ids.assign(short_cir_pred_col_id_set.cbegin(),
1880
467
                                          short_cir_pred_col_id_set.cend());
1881
467
    }
1882
1883
5.63k
    if (!_vec_pred_column_ids.empty()) {
1884
0
        _is_need_vec_eval = true;
1885
0
    }
1886
5.63k
    if (!_short_cir_pred_column_ids.empty()) {
1887
467
        _is_need_short_eval = true;
1888
467
    }
1889
1890
    // ColumnId to column index in block
1891
    // ColumnId will contail all columns in tablet schema, including virtual columns and global rowid column,
1892
5.63k
    _schema_block_id_map.resize(_schema->columns().size(), -1);
1893
    // Use cols read by query to initialize _schema_block_id_map.
1894
    // We need to know the index of each column in the block.
1895
    // There is an assumption here that the columns in the block are in the same order as in the read schema.
1896
    // TODO: A probelm is that, delete condition columns will exist in _schema->column_ids but not in block if
1897
    // delete column is not read by the query.
1898
18.3k
    for (int i = 0; i < _schema->num_column_ids(); i++) {
1899
12.6k
        auto cid = _schema->column_id(i);
1900
12.6k
        _schema_block_id_map[cid] = i;
1901
12.6k
    }
1902
1903
    // Step2: extract columns that can execute expr context
1904
5.63k
    _is_common_expr_column.resize(_schema->columns().size(), false);
1905
5.63k
    if (_enable_common_expr_pushdown && !_remaining_conjunct_roots.empty()) {
1906
0
        for (auto expr : _remaining_conjunct_roots) {
1907
0
            RETURN_IF_ERROR(_extract_common_expr_columns(expr));
1908
0
        }
1909
0
        if (!_common_expr_columns.empty()) {
1910
0
            _is_need_expr_eval = true;
1911
0
            for (auto cid : _schema->column_ids()) {
1912
                // pred column also needs to be filtered by expr, exclude additional delete condition column.
1913
                // if delete condition column not in the block, no filter is needed
1914
                // and will be removed from _columns_to_filter in the first next_batch.
1915
0
                if (_is_common_expr_column[cid] || _is_pred_column[cid]) {
1916
0
                    auto loc = _schema_block_id_map[cid];
1917
0
                    _columns_to_filter.push_back(loc);
1918
0
                }
1919
0
            }
1920
1921
0
            for (auto pair : _vir_cid_to_idx_in_block) {
1922
0
                _columns_to_filter.push_back(cast_set<ColumnId>(pair.second));
1923
0
            }
1924
0
        }
1925
0
    }
1926
1927
    // Step 3: fill non predicate columns and second read column
1928
    // if _schema columns size equal to pred_column_ids size, lazy_materialization_read is false,
1929
    // all columns are lazy materialization columns without non predicte column.
1930
    // If common expr pushdown exists, and expr column is not contained in lazy materialization columns,
1931
    // add to second read column, which will be read after lazy materialization
1932
5.63k
    if (_schema->column_ids().size() > pred_column_ids.size()) {
1933
        // pred_column_ids maybe empty, so that could not set _lazy_materialization_read = true here
1934
        // has to check there is at least one predicate column
1935
12.6k
        for (auto cid : _schema->column_ids()) {
1936
12.6k
            if (!_is_pred_column[cid]) {
1937
12.2k
                if (_is_need_vec_eval || _is_need_short_eval) {
1938
862
                    _lazy_materialization_read = true;
1939
862
                }
1940
12.2k
                if (_is_common_expr_column[cid]) {
1941
0
                    _common_expr_column_ids.push_back(cid);
1942
12.2k
                } else {
1943
12.2k
                    _non_predicate_columns.push_back(cid);
1944
12.2k
                }
1945
12.2k
            }
1946
12.6k
        }
1947
5.57k
    }
1948
1949
    // Step 4: fill first read columns
1950
5.63k
    if (_lazy_materialization_read) {
1951
        // insert pred cid to first_read_columns
1952
410
        for (auto cid : pred_column_ids) {
1953
410
            _predicate_column_ids.push_back(cid);
1954
410
        }
1955
5.22k
    } else if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) {
1956
16.5k
        for (int i = 0; i < _schema->num_column_ids(); i++) {
1957
11.3k
            auto cid = _schema->column_id(i);
1958
11.3k
            _predicate_column_ids.push_back(cid);
1959
11.3k
        }
1960
5.16k
    } else {
1961
57
        if (_is_need_vec_eval || _is_need_short_eval) {
1962
            // TODO To refactor, because we suppose lazy materialization is better performance.
1963
            // pred exits, but we can eliminate lazy materialization
1964
            // insert pred/non-pred cid to first read columns
1965
57
            std::set<ColumnId> pred_id_set;
1966
57
            pred_id_set.insert(_short_cir_pred_column_ids.begin(),
1967
57
                               _short_cir_pred_column_ids.end());
1968
57
            pred_id_set.insert(_vec_pred_column_ids.begin(), _vec_pred_column_ids.end());
1969
1970
57
            DCHECK(_common_expr_column_ids.empty());
1971
            // _non_predicate_column_ids must be empty. Otherwise _lazy_materialization_read must not false.
1972
114
            for (int i = 0; i < _schema->num_column_ids(); i++) {
1973
57
                auto cid = _schema->column_id(i);
1974
57
                if (pred_id_set.find(cid) != pred_id_set.end()) {
1975
57
                    _predicate_column_ids.push_back(cid);
1976
57
                }
1977
57
            }
1978
57
        } else if (_is_need_expr_eval) {
1979
0
            DCHECK(!_is_need_vec_eval && !_is_need_short_eval);
1980
0
            for (auto cid : _common_expr_columns) {
1981
0
                _predicate_column_ids.push_back(cid);
1982
0
            }
1983
0
        }
1984
57
    }
1985
1986
5.63k
    VLOG_DEBUG << fmt::format(
1987
0
            "Laze materialization init end. "
1988
0
            "lazy_materialization_read: {}, "
1989
0
            "_col_predicates size: {}, "
1990
0
            "_cols_read_by_column_predicate: [{}], "
1991
0
            "_non_predicate_columns: [{}], "
1992
0
            "_cols_read_by_common_expr: [{}], "
1993
0
            "columns_to_filter: [{}], "
1994
0
            "_schema_block_id_map: [{}]",
1995
0
            _lazy_materialization_read, _col_predicates.size(),
1996
0
            fmt::join(_predicate_column_ids, ","), fmt::join(_non_predicate_columns, ","),
1997
0
            fmt::join(_common_expr_column_ids, ","), fmt::join(_columns_to_filter, ","),
1998
0
            fmt::join(_schema_block_id_map, ","));
1999
5.63k
    return Status::OK();
2000
5.63k
}
2001
2002
0
bool SegmentIterator::_can_evaluated_by_vectorized(std::shared_ptr<ColumnPredicate> predicate) {
2003
0
    auto cid = predicate->column_id();
2004
0
    FieldType field_type = _schema->column(cid)->type();
2005
0
    if (field_type == FieldType::OLAP_FIELD_TYPE_VARIANT) {
2006
        // Use variant cast dst type
2007
0
        field_type = _opts.target_cast_type_for_variants[_schema->column(cid)->name()]
2008
0
                             ->get_storage_field_type();
2009
0
    }
2010
0
    switch (predicate->type()) {
2011
0
    case PredicateType::EQ:
2012
0
    case PredicateType::NE:
2013
0
    case PredicateType::LE:
2014
0
    case PredicateType::LT:
2015
0
    case PredicateType::GE:
2016
0
    case PredicateType::GT: {
2017
0
        if (field_type == FieldType::OLAP_FIELD_TYPE_VARCHAR ||
2018
0
            field_type == FieldType::OLAP_FIELD_TYPE_CHAR ||
2019
0
            field_type == FieldType::OLAP_FIELD_TYPE_STRING) {
2020
0
            return config::enable_low_cardinality_optimize &&
2021
0
                   _opts.io_ctx.reader_type == ReaderType::READER_QUERY &&
2022
0
                   _column_iterators[cid]->is_all_dict_encoding();
2023
0
        } else if (field_type == FieldType::OLAP_FIELD_TYPE_DECIMAL) {
2024
0
            return false;
2025
0
        }
2026
0
        return true;
2027
0
    }
2028
0
    default:
2029
0
        return false;
2030
0
    }
2031
0
}
2032
2033
12.6k
bool SegmentIterator::_has_char_type(const StorageField& column_desc) {
2034
12.6k
    switch (column_desc.type()) {
2035
0
    case FieldType::OLAP_FIELD_TYPE_CHAR:
2036
0
        return true;
2037
2
    case FieldType::OLAP_FIELD_TYPE_ARRAY:
2038
2
        return _has_char_type(*column_desc.get_sub_field(0));
2039
0
    case FieldType::OLAP_FIELD_TYPE_MAP:
2040
0
        return _has_char_type(*column_desc.get_sub_field(0)) ||
2041
0
               _has_char_type(*column_desc.get_sub_field(1));
2042
0
    case FieldType::OLAP_FIELD_TYPE_STRUCT:
2043
0
        for (int idx = 0; idx < column_desc.get_sub_field_count(); ++idx) {
2044
0
            if (_has_char_type(*column_desc.get_sub_field(idx))) {
2045
0
                return true;
2046
0
            }
2047
0
        }
2048
0
        return false;
2049
12.6k
    default:
2050
12.6k
        return false;
2051
12.6k
    }
2052
12.6k
};
2053
2054
5.63k
void SegmentIterator::_vec_init_char_column_id(Block* block) {
2055
5.63k
    if (!_char_type_idx.empty()) {
2056
0
        return;
2057
0
    }
2058
5.63k
    _is_char_type.resize(_schema->columns().size(), false);
2059
18.3k
    for (size_t i = 0; i < _schema->num_column_ids(); i++) {
2060
12.6k
        auto cid = _schema->column_id(i);
2061
12.6k
        const StorageField* column_desc = _schema->column(cid);
2062
2063
        // The additional deleted filter condition will be in the materialized column at the end of the block.
2064
        // After _output_column_by_sel_idx, it will be erased, so we do not need to shrink it.
2065
12.6k
        if (i < block->columns()) {
2066
12.6k
            if (_has_char_type(*column_desc)) {
2067
0
                _char_type_idx.emplace_back(i);
2068
0
            }
2069
12.6k
        }
2070
2071
12.6k
        if (column_desc->type() == FieldType::OLAP_FIELD_TYPE_CHAR) {
2072
0
            _is_char_type[cid] = true;
2073
0
        }
2074
12.6k
    }
2075
5.63k
}
2076
2077
bool SegmentIterator::_prune_column(ColumnId cid, MutableColumnPtr& column, bool fill_defaults,
2078
43.4k
                                    size_t num_of_defaults) {
2079
43.4k
    if (_need_read_data(cid)) {
2080
43.4k
        return false;
2081
43.4k
    }
2082
0
    if (!fill_defaults) {
2083
0
        return true;
2084
0
    }
2085
0
    if (column->is_nullable()) {
2086
0
        auto nullable_col_ptr = reinterpret_cast<ColumnNullable*>(column.get());
2087
0
        nullable_col_ptr->get_null_map_column().insert_many_defaults(num_of_defaults);
2088
0
        nullable_col_ptr->get_nested_column_ptr()->insert_many_defaults(num_of_defaults);
2089
0
    } else {
2090
        // assert(column->is_const());
2091
0
        column->insert_many_defaults(num_of_defaults);
2092
0
    }
2093
0
    return true;
2094
0
}
2095
2096
Status SegmentIterator::_read_columns(const std::vector<ColumnId>& column_ids,
2097
0
                                      MutableColumns& column_block, size_t nrows) {
2098
0
    for (auto cid : column_ids) {
2099
0
        auto& column = column_block[cid];
2100
0
        size_t rows_read = nrows;
2101
0
        if (_prune_column(cid, column, true, rows_read)) {
2102
0
            continue;
2103
0
        }
2104
0
        RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, column));
2105
0
        if (nrows != rows_read) {
2106
0
            return Status::Error<ErrorCode::INTERNAL_ERROR>("nrows({}) != rows_read({})", nrows,
2107
0
                                                            rows_read);
2108
0
        }
2109
0
    }
2110
0
    return Status::OK();
2111
0
}
2112
2113
Status SegmentIterator::_init_current_block(Block* block,
2114
                                            std::vector<MutableColumnPtr>& current_columns,
2115
23.1k
                                            uint32_t nrows_read_limit) {
2116
23.1k
    block->clear_column_data(_schema->num_column_ids());
2117
2118
67.5k
    for (size_t i = 0; i < _schema->num_column_ids(); i++) {
2119
44.4k
        auto cid = _schema->column_id(i);
2120
44.4k
        const auto* column_desc = _schema->column(cid);
2121
2122
44.4k
        auto file_column_type = _storage_name_and_type[cid].second;
2123
44.4k
        auto expected_type = Schema::get_data_type_ptr(*column_desc);
2124
44.4k
        if (!_is_pred_column[cid] && !file_column_type->equals(*expected_type)) {
2125
            // The storage layer type is different from schema needed type, so we use storage
2126
            // type to read columns instead of schema type for safety
2127
0
            VLOG_DEBUG << fmt::format(
2128
0
                    "Recreate column with expected type {}, file column type {}, col_name {}, "
2129
0
                    "col_path {}",
2130
0
                    block->get_by_position(i).type->get_name(), file_column_type->get_name(),
2131
0
                    column_desc->name(),
2132
0
                    column_desc->path() == nullptr ? "" : column_desc->path()->get_path());
2133
            // TODO reuse
2134
0
            current_columns[cid] = file_column_type->create_column();
2135
0
            current_columns[cid]->reserve(nrows_read_limit);
2136
44.4k
        } else {
2137
            // the column in block must clear() here to insert new data
2138
44.4k
            if (_is_pred_column[cid] ||
2139
44.4k
                i >= block->columns()) { //todo(wb) maybe we can release it after output block
2140
4.66k
                if (current_columns[cid].get() == nullptr) {
2141
0
                    return Status::InternalError(
2142
0
                            "SegmentIterator meet invalid column, id={}, name={}", cid,
2143
0
                            _schema->column(cid)->name());
2144
0
                }
2145
4.66k
                current_columns[cid]->clear();
2146
39.7k
            } else { // non-predicate column
2147
39.7k
                current_columns[cid] = std::move(*block->get_by_position(i).column).mutate();
2148
39.7k
                current_columns[cid]->reserve(nrows_read_limit);
2149
39.7k
            }
2150
44.4k
        }
2151
44.4k
    }
2152
2153
23.1k
    for (auto entry : _virtual_column_exprs) {
2154
0
        auto cid = entry.first;
2155
0
        current_columns[cid] = ColumnNothing::create(0);
2156
0
        current_columns[cid]->reserve(nrows_read_limit);
2157
0
    }
2158
2159
23.1k
    return Status::OK();
2160
23.1k
}
2161
2162
17.5k
Status SegmentIterator::_output_non_pred_columns(Block* block) {
2163
17.5k
    SCOPED_RAW_TIMER(&_opts.stats->output_col_ns);
2164
17.5k
    VLOG_DEBUG << fmt::format(
2165
0
            "Output non-predicate columns, _non_predicate_columns: [{}], "
2166
0
            "_schema_block_id_map: [{}]",
2167
0
            fmt::join(_non_predicate_columns, ","), fmt::join(_schema_block_id_map, ","));
2168
17.5k
    RETURN_IF_ERROR(_convert_to_expected_type(_non_predicate_columns));
2169
27.5k
    for (auto cid : _non_predicate_columns) {
2170
27.5k
        auto loc = _schema_block_id_map[cid];
2171
        // Whether a delete predicate column gets output depends on how the caller builds
2172
        // the block passed to next_batch(). Both calling paths now build the block with
2173
        // only the output schema (return_columns), so delete predicate columns are skipped:
2174
        //
2175
        // 1) VMergeIterator path: block_reset() builds _block using the output schema
2176
        //    (return_columns only), e.g. block has 2 columns {c1, c2}.
2177
        //    Here loc=2 for delete predicate c3, block->columns()=2, so loc < block->columns()
2178
        //    is false, and c3 is skipped.
2179
        //
2180
        // 2) VUnionIterator path: the caller's block is built with only return_columns
2181
        //    (output schema), e.g. block has 2 columns {c1, c2}.
2182
        //    Here loc=2 for c3, block->columns()=2, so loc < block->columns() is false,
2183
        //    and c3 is skipped — same behavior as the VMergeIterator path.
2184
27.5k
        if (loc < block->columns()) {
2185
27.5k
            bool column_in_block_is_nothing = check_and_get_column<const ColumnNothing>(
2186
27.5k
                    block->get_by_position(loc).column.get());
2187
27.5k
            bool column_is_normal = !_vir_cid_to_idx_in_block.contains(cid);
2188
27.5k
            bool return_column_is_nothing =
2189
27.5k
                    check_and_get_column<const ColumnNothing>(_current_return_columns[cid].get());
2190
27.5k
            VLOG_DEBUG << fmt::format(
2191
0
                    "Cid {} loc {}, column_in_block_is_nothing {}, column_is_normal {}, "
2192
0
                    "return_column_is_nothing {}",
2193
0
                    cid, loc, column_in_block_is_nothing, column_is_normal,
2194
0
                    return_column_is_nothing);
2195
2196
27.5k
            if (column_in_block_is_nothing || column_is_normal) {
2197
27.5k
                block->replace_by_position(loc, std::move(_current_return_columns[cid]));
2198
27.5k
                VLOG_DEBUG << fmt::format(
2199
0
                        "Output non-predicate column, cid: {}, loc: {}, col_name: {}, rows {}", cid,
2200
0
                        loc, _schema->column(cid)->name(),
2201
0
                        block->get_by_position(loc).column->size());
2202
27.5k
            }
2203
            // Means virtual column in block has been materialized(maybe by common expr).
2204
            // so do nothing here.
2205
27.5k
        }
2206
27.5k
    }
2207
17.5k
    return Status::OK();
2208
17.5k
}
2209
2210
/**
2211
 * Reads columns by their index, handling both continuous and discontinuous rowid scenarios.
2212
 *
2213
 * This function is designed to read a specified number of rows (up to nrows_read_limit)
2214
 * from the segment iterator, dealing with both continuous and discontinuous rowid arrays.
2215
 * It operates as follows:
2216
 *
2217
 * 1. Reads a batch of rowids (up to the specified limit), and checks if they are continuous.
2218
 *    Continuous here means that the rowids form an unbroken sequence (e.g., 1, 2, 3, 4...).
2219
 *
2220
 * 2. For each column that needs to be read (identified by _predicate_column_ids):
2221
 *    - If the rowids are continuous, the function uses seek_to_ordinal and next_batch
2222
 *      for efficient reading.
2223
 *    - If the rowids are not continuous, the function processes them in smaller batches
2224
 *      (each of size up to 256). Each batch is checked for internal continuity:
2225
 *        a. If a batch is continuous, uses seek_to_ordinal and next_batch for that batch.
2226
 *        b. If a batch is not continuous, uses read_by_rowids for individual rowids in the batch.
2227
 *
2228
 * This approach optimizes reading performance by leveraging batch processing for continuous
2229
 * rowid sequences and handling discontinuities gracefully in smaller chunks.
2230
 */
2231
23.1k
Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, uint16_t& nrows_read) {
2232
23.1k
    SCOPED_RAW_TIMER(&_opts.stats->predicate_column_read_ns);
2233
2234
23.1k
    nrows_read = (uint16_t)_range_iter->read_batch_rowids(_block_rowids.data(), nrows_read_limit);
2235
23.1k
    bool is_continuous = (nrows_read > 1) &&
2236
23.1k
                         (_block_rowids[nrows_read - 1] - _block_rowids[0] == nrows_read - 1);
2237
23.1k
    VLOG_DEBUG << fmt::format(
2238
0
            "nrows_read from range iterator: {}, is_continus {}, _cols_read_by_column_predicate "
2239
0
            "[{}]",
2240
0
            nrows_read, is_continuous, fmt::join(_predicate_column_ids, ","));
2241
2242
23.1k
    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
2243
0
            "[verbose] SegmentIterator::_read_columns_by_index read {} rowids, continuous: {}, "
2244
0
            "rowids: [{}...{}]",
2245
0
            nrows_read, is_continuous, nrows_read > 0 ? _block_rowids[0] : 0,
2246
0
            nrows_read > 0 ? _block_rowids[nrows_read - 1] : 0);
2247
39.6k
    for (auto cid : _predicate_column_ids) {
2248
39.6k
        auto& column = _current_return_columns[cid];
2249
39.6k
        VLOG_DEBUG << fmt::format("Reading column {}, col_name {}", cid,
2250
0
                                  _schema->column(cid)->name());
2251
39.6k
        if (!_virtual_column_exprs.contains(cid)) {
2252
39.6k
            if (_no_need_read_key_data(cid, column, nrows_read)) {
2253
0
                VLOG_DEBUG << fmt::format("Column {} no need to read.", cid);
2254
0
                continue;
2255
0
            }
2256
39.6k
            if (_prune_column(cid, column, true, nrows_read)) {
2257
0
                VLOG_DEBUG << fmt::format("Column {} is pruned. No need to read data.", cid);
2258
0
                continue;
2259
0
            }
2260
39.6k
            DBUG_EXECUTE_IF("segment_iterator._read_columns_by_index", {
2261
39.6k
                auto col_name = _opts.tablet_schema->column(cid).name();
2262
39.6k
                auto debug_col_name =
2263
39.6k
                        DebugPoints::instance()->get_debug_param_or_default<std::string>(
2264
39.6k
                                "segment_iterator._read_columns_by_index", "column_name", "");
2265
39.6k
                if (debug_col_name.empty() && col_name != "__DORIS_DELETE_SIGN__") {
2266
39.6k
                    return Status::Error<ErrorCode::INTERNAL_ERROR>(
2267
39.6k
                            "does not need to read data, {}", col_name);
2268
39.6k
                }
2269
39.6k
                if (debug_col_name.find(col_name) != std::string::npos) {
2270
39.6k
                    return Status::Error<ErrorCode::INTERNAL_ERROR>(
2271
39.6k
                            "does not need to read data, {}", col_name);
2272
39.6k
                }
2273
39.6k
            })
2274
39.6k
        }
2275
2276
39.6k
        if (is_continuous) {
2277
27.5k
            size_t rows_read = nrows_read;
2278
27.5k
            _opts.stats->predicate_column_read_seek_num += 1;
2279
27.5k
            if (_opts.runtime_state && _opts.runtime_state->enable_profile()) {
2280
0
                SCOPED_RAW_TIMER(&_opts.stats->predicate_column_read_seek_ns);
2281
0
                RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(_block_rowids[0]));
2282
27.5k
            } else {
2283
27.5k
                RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(_block_rowids[0]));
2284
27.5k
            }
2285
27.5k
            RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, column));
2286
27.5k
            if (rows_read != nrows_read) {
2287
0
                return Status::Error<ErrorCode::INTERNAL_ERROR>("nrows({}) != rows_read({})",
2288
0
                                                                nrows_read, rows_read);
2289
0
            }
2290
27.5k
        } else {
2291
12.1k
            const uint32_t batch_size = _range_iter->get_batch_size();
2292
12.1k
            uint32_t processed = 0;
2293
13.3k
            while (processed < nrows_read) {
2294
1.19k
                uint32_t current_batch_size = std::min(batch_size, nrows_read - processed);
2295
1.19k
                bool batch_continuous = (current_batch_size > 1) &&
2296
1.19k
                                        (_block_rowids[processed + current_batch_size - 1] -
2297
1.17k
                                                 _block_rowids[processed] ==
2298
1.17k
                                         current_batch_size - 1);
2299
2300
1.19k
                if (batch_continuous) {
2301
0
                    size_t rows_read = current_batch_size;
2302
0
                    _opts.stats->predicate_column_read_seek_num += 1;
2303
0
                    if (_opts.runtime_state && _opts.runtime_state->enable_profile()) {
2304
0
                        SCOPED_RAW_TIMER(&_opts.stats->predicate_column_read_seek_ns);
2305
0
                        RETURN_IF_ERROR(
2306
0
                                _column_iterators[cid]->seek_to_ordinal(_block_rowids[processed]));
2307
0
                    } else {
2308
0
                        RETURN_IF_ERROR(
2309
0
                                _column_iterators[cid]->seek_to_ordinal(_block_rowids[processed]));
2310
0
                    }
2311
0
                    RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, column));
2312
0
                    if (rows_read != current_batch_size) {
2313
0
                        return Status::Error<ErrorCode::INTERNAL_ERROR>(
2314
0
                                "batch nrows({}) != rows_read({})", current_batch_size, rows_read);
2315
0
                    }
2316
1.19k
                } else {
2317
1.19k
                    RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids(
2318
1.19k
                            &_block_rowids[processed], current_batch_size, column));
2319
1.19k
                }
2320
1.19k
                processed += current_batch_size;
2321
1.19k
            }
2322
12.1k
        }
2323
39.6k
    }
2324
2325
23.1k
    return Status::OK();
2326
23.1k
}
2327
void SegmentIterator::_replace_version_col_if_needed(const std::vector<ColumnId>& column_ids,
2328
26.1k
                                                     size_t num_rows) {
2329
    // Only the rowset with single version need to replace the version column.
2330
    // Doris can't determine the version before publish_version finished, so
2331
    // we can't write data to __DORIS_VERSION_COL__ in segment writer, the value
2332
    // is 0 by default.
2333
    // So we need to replace the value to real version while reading.
2334
26.1k
    if (_opts.version.first != _opts.version.second) {
2335
9.41k
        return;
2336
9.41k
    }
2337
16.7k
    int32_t version_idx = _schema->version_col_idx();
2338
16.7k
    if (std::ranges::find(column_ids, version_idx) == column_ids.end()) {
2339
16.7k
        return;
2340
16.7k
    }
2341
2342
0
    const auto* column_desc = _schema->column(version_idx);
2343
0
    auto column = Schema::get_data_type_ptr(*column_desc)->create_column();
2344
0
    DCHECK(_schema->column(version_idx)->type() == FieldType::OLAP_FIELD_TYPE_BIGINT);
2345
0
    auto* col_ptr = assert_cast<ColumnInt64*>(column.get());
2346
0
    for (size_t j = 0; j < num_rows; j++) {
2347
0
        col_ptr->insert_value(_opts.version.second);
2348
0
    }
2349
0
    _current_return_columns[version_idx] = std::move(column);
2350
0
    VLOG_DEBUG << "replaced version column in segment iterator, version_col_idx:" << version_idx;
2351
0
}
2352
2353
uint16_t SegmentIterator::_evaluate_vectorization_predicate(uint16_t* sel_rowid_idx,
2354
4.19k
                                                            uint16_t selected_size) {
2355
4.19k
    SCOPED_RAW_TIMER(&_opts.stats->vec_cond_ns);
2356
4.19k
    bool all_pred_always_true = true;
2357
4.19k
    for (const auto& pred : _pre_eval_block_predicate) {
2358
0
        if (!pred->always_true()) {
2359
0
            all_pred_always_true = false;
2360
0
        } else {
2361
0
            pred->update_filter_info(0, 0, selected_size);
2362
0
        }
2363
0
    }
2364
2365
4.19k
    const uint16_t original_size = selected_size;
2366
    //If all predicates are always_true, then return directly.
2367
4.19k
    if (all_pred_always_true || !_is_need_vec_eval) {
2368
3.91M
        for (uint16_t i = 0; i < original_size; ++i) {
2369
3.90M
            sel_rowid_idx[i] = i;
2370
3.90M
        }
2371
        // All preds are always_true, so return immediately and update the profile statistics here.
2372
4.19k
        _opts.stats->vec_cond_input_rows += original_size;
2373
4.19k
        return original_size;
2374
4.19k
    }
2375
2376
0
    _ret_flags.resize(original_size);
2377
0
    DCHECK(!_pre_eval_block_predicate.empty());
2378
0
    bool is_first = true;
2379
0
    for (auto& pred : _pre_eval_block_predicate) {
2380
0
        if (pred->always_true()) {
2381
0
            continue;
2382
0
        }
2383
0
        auto column_id = pred->column_id();
2384
0
        auto& column = _current_return_columns[column_id];
2385
0
        if (is_first) {
2386
0
            pred->evaluate_vec(*column, original_size, (bool*)_ret_flags.data());
2387
0
            is_first = false;
2388
0
        } else {
2389
0
            pred->evaluate_and_vec(*column, original_size, (bool*)_ret_flags.data());
2390
0
        }
2391
0
    }
2392
2393
0
    uint16_t new_size = 0;
2394
2395
0
    uint16_t sel_pos = 0;
2396
0
    const uint16_t sel_end = sel_pos + selected_size;
2397
0
    static constexpr size_t SIMD_BYTES = simd::bits_mask_length();
2398
0
    const uint16_t sel_end_simd = sel_pos + selected_size / SIMD_BYTES * SIMD_BYTES;
2399
2400
0
    while (sel_pos < sel_end_simd) {
2401
0
        auto mask = simd::bytes_mask_to_bits_mask(_ret_flags.data() + sel_pos);
2402
0
        if (0 == mask) {
2403
            //pass
2404
0
        } else if (simd::bits_mask_all() == mask) {
2405
0
            for (uint16_t i = 0; i < SIMD_BYTES; i++) {
2406
0
                sel_rowid_idx[new_size++] = sel_pos + i;
2407
0
            }
2408
0
        } else {
2409
0
            simd::iterate_through_bits_mask(
2410
0
                    [&](const int bit_pos) {
2411
0
                        sel_rowid_idx[new_size++] = sel_pos + (uint16_t)bit_pos;
2412
0
                    },
2413
0
                    mask);
2414
0
        }
2415
0
        sel_pos += SIMD_BYTES;
2416
0
    }
2417
2418
0
    for (; sel_pos < sel_end; sel_pos++) {
2419
0
        if (_ret_flags[sel_pos]) {
2420
0
            sel_rowid_idx[new_size++] = sel_pos;
2421
0
        }
2422
0
    }
2423
2424
0
    _opts.stats->vec_cond_input_rows += original_size;
2425
0
    _opts.stats->rows_vec_cond_filtered += original_size - new_size;
2426
0
    return new_size;
2427
4.19k
}
2428
2429
uint16_t SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_rowid_idx,
2430
4.19k
                                                            uint16_t selected_size) {
2431
4.19k
    SCOPED_RAW_TIMER(&_opts.stats->short_cond_ns);
2432
4.19k
    if (!_is_need_short_eval) {
2433
0
        return selected_size;
2434
0
    }
2435
2436
4.19k
    uint16_t original_size = selected_size;
2437
4.19k
    for (auto predicate : _short_cir_eval_predicate) {
2438
0
        auto column_id = predicate->column_id();
2439
0
        auto& short_cir_column = _current_return_columns[column_id];
2440
0
        selected_size = predicate->evaluate(*short_cir_column, vec_sel_rowid_idx, selected_size);
2441
0
    }
2442
2443
4.19k
    _opts.stats->short_circuit_cond_input_rows += original_size;
2444
4.19k
    _opts.stats->rows_short_circuit_cond_filtered += original_size - selected_size;
2445
2446
    // evaluate delete condition
2447
4.19k
    original_size = selected_size;
2448
4.19k
    selected_size = _opts.delete_condition_predicates->evaluate(_current_return_columns,
2449
4.19k
                                                                vec_sel_rowid_idx, selected_size);
2450
4.19k
    _opts.stats->rows_vec_del_cond_filtered += original_size - selected_size;
2451
4.19k
    return selected_size;
2452
4.19k
}
2453
2454
Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_column_ids,
2455
                                                std::vector<rowid_t>& rowid_vector,
2456
                                                uint16_t* sel_rowid_idx, size_t select_size,
2457
                                                MutableColumns* mutable_columns,
2458
3.05k
                                                bool init_condition_cache) {
2459
3.05k
    SCOPED_RAW_TIMER(&_opts.stats->lazy_read_ns);
2460
3.05k
    std::vector<rowid_t> rowids(select_size);
2461
2462
3.05k
    if (init_condition_cache) {
2463
0
        DCHECK(_condition_cache);
2464
0
        auto& condition_cache = *_condition_cache;
2465
0
        for (size_t i = 0; i < select_size; ++i) {
2466
0
            rowids[i] = rowid_vector[sel_rowid_idx[i]];
2467
0
            condition_cache[rowids[i] / SegmentIterator::CONDITION_CACHE_OFFSET] = true;
2468
0
        }
2469
3.05k
    } else {
2470
2.74M
        for (size_t i = 0; i < select_size; ++i) {
2471
2.74M
            rowids[i] = rowid_vector[sel_rowid_idx[i]];
2472
2.74M
        }
2473
3.05k
    }
2474
2475
3.77k
    for (auto cid : read_column_ids) {
2476
3.77k
        auto& colunm = (*mutable_columns)[cid];
2477
3.77k
        if (_no_need_read_key_data(cid, colunm, select_size)) {
2478
0
            continue;
2479
0
        }
2480
3.77k
        if (_prune_column(cid, colunm, true, select_size)) {
2481
0
            continue;
2482
0
        }
2483
2484
3.77k
        DBUG_EXECUTE_IF("segment_iterator._read_columns_by_index", {
2485
3.77k
            auto debug_col_name = DebugPoints::instance()->get_debug_param_or_default<std::string>(
2486
3.77k
                    "segment_iterator._read_columns_by_index", "column_name", "");
2487
3.77k
            if (debug_col_name.empty()) {
2488
3.77k
                return Status::Error<ErrorCode::INTERNAL_ERROR>("does not need to read data");
2489
3.77k
            }
2490
3.77k
            auto col_name = _opts.tablet_schema->column(cid).name();
2491
3.77k
            if (debug_col_name.find(col_name) != std::string::npos) {
2492
3.77k
                return Status::Error<ErrorCode::INTERNAL_ERROR>("does not need to read data, {}",
2493
3.77k
                                                                debug_col_name);
2494
3.77k
            }
2495
3.77k
        })
2496
2497
3.77k
        if (_current_return_columns[cid].get() == nullptr) {
2498
0
            return Status::InternalError(
2499
0
                    "SegmentIterator meet invalid column, return columns size {}, cid {}",
2500
0
                    _current_return_columns.size(), cid);
2501
0
        }
2502
3.77k
        RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids(rowids.data(), select_size,
2503
3.77k
                                                               _current_return_columns[cid]));
2504
3.77k
    }
2505
2506
3.05k
    return Status::OK();
2507
3.05k
}
2508
2509
23.1k
Status SegmentIterator::next_batch(Block* block) {
2510
    // Replace virtual columns with ColumnNothing at the begining of each next_batch call.
2511
23.1k
    _init_virtual_columns(block);
2512
23.1k
    auto status = [&]() {
2513
23.1k
        RETURN_IF_CATCH_EXCEPTION({
2514
23.1k
            auto res = _next_batch_internal(block);
2515
2516
23.1k
            if (res.is<END_OF_FILE>()) {
2517
                // Since we have a type check at the caller.
2518
                // So a replacement of nothing column with real column is needed.
2519
23.1k
                const auto& idx_to_datatype = _opts.vir_col_idx_to_type;
2520
23.1k
                for (const auto& pair : _vir_cid_to_idx_in_block) {
2521
23.1k
                    size_t idx = pair.second;
2522
23.1k
                    auto type = idx_to_datatype.find(idx)->second;
2523
23.1k
                    block->replace_by_position(idx, type->create_column());
2524
23.1k
                }
2525
2526
23.1k
                if (_opts.condition_cache_digest && !_find_condition_cache) {
2527
23.1k
                    auto* condition_cache = ConditionCache::instance();
2528
23.1k
                    ConditionCache::CacheKey cache_key(_opts.rowset_id, _segment->id(),
2529
23.1k
                                                       _opts.condition_cache_digest);
2530
23.1k
                    VLOG_DEBUG << "Condition cache insert, query id: "
2531
23.1k
                               << print_id(_opts.runtime_state->query_id())
2532
23.1k
                               << ", rowset id: " << _opts.rowset_id.to_string()
2533
23.1k
                               << ", segment id: " << _segment->id()
2534
23.1k
                               << ", cache digest: " << _opts.condition_cache_digest;
2535
23.1k
                    condition_cache->insert(cache_key, std::move(_condition_cache));
2536
23.1k
                }
2537
23.1k
                return res;
2538
23.1k
            }
2539
2540
23.1k
            RETURN_IF_ERROR(res);
2541
            // reverse block row order if read_orderby_key_reverse is true for key topn
2542
            // it should be processed for all success _next_batch_internal
2543
23.1k
            if (_opts.read_orderby_key_reverse) {
2544
23.1k
                size_t num_rows = block->rows();
2545
23.1k
                if (num_rows == 0) {
2546
23.1k
                    return Status::OK();
2547
23.1k
                }
2548
23.1k
                size_t num_columns = block->columns();
2549
23.1k
                IColumn::Permutation permutation;
2550
23.1k
                for (size_t i = 0; i < num_rows; ++i) permutation.emplace_back(num_rows - 1 - i);
2551
2552
23.1k
                for (size_t i = 0; i < num_columns; ++i)
2553
23.1k
                    block->get_by_position(i).column =
2554
23.1k
                            block->get_by_position(i).column->permute(permutation, num_rows);
2555
23.1k
            }
2556
2557
23.1k
            RETURN_IF_ERROR(block->check_type_and_column());
2558
2559
23.1k
            return Status::OK();
2560
23.1k
        });
2561
23.1k
    }();
2562
2563
    // if rows read by batch is 0, will return end of file, we should not remove segment cache in this situation.
2564
23.1k
    if (!status.ok() && !status.is<END_OF_FILE>()) {
2565
0
        _segment->update_healthy_status(status);
2566
0
    }
2567
23.1k
    return status;
2568
23.1k
}
2569
2570
23.1k
Status SegmentIterator::_convert_to_expected_type(const std::vector<ColumnId>& col_ids) {
2571
40.2k
    for (ColumnId i : col_ids) {
2572
40.2k
        if (!_current_return_columns[i] || _converted_column_ids[i] || _is_pred_column[i]) {
2573
467
            continue;
2574
467
        }
2575
39.7k
        const StorageField* field_type = _schema->column(i);
2576
39.7k
        DataTypePtr expected_type = Schema::get_data_type_ptr(*field_type);
2577
39.7k
        DataTypePtr file_column_type = _storage_name_and_type[i].second;
2578
39.7k
        if (!file_column_type->equals(*expected_type)) {
2579
0
            ColumnPtr expected;
2580
0
            ColumnPtr original = _current_return_columns[i]->assume_mutable()->get_ptr();
2581
0
            RETURN_IF_ERROR(variant_util::cast_column({original, file_column_type, ""},
2582
0
                                                      expected_type, &expected));
2583
0
            _current_return_columns[i] = expected->assume_mutable();
2584
0
            _converted_column_ids[i] = true;
2585
0
            VLOG_DEBUG << fmt::format(
2586
0
                    "Convert {} fom file column type {} to {}, num_rows {}",
2587
0
                    field_type->path() == nullptr ? "" : field_type->path()->get_path(),
2588
0
                    file_column_type->get_name(), expected_type->get_name(),
2589
0
                    _current_return_columns[i]->size());
2590
0
        }
2591
39.7k
    }
2592
23.1k
    return Status::OK();
2593
23.1k
}
2594
2595
Status SegmentIterator::copy_column_data_by_selector(IColumn* input_col_ptr,
2596
                                                     MutableColumnPtr& output_col,
2597
                                                     uint16_t* sel_rowid_idx, uint16_t select_size,
2598
2.83k
                                                     size_t batch_size) {
2599
2.83k
    if (output_col->is_nullable() != input_col_ptr->is_nullable()) {
2600
0
        LOG(WARNING) << "nullable mismatch for output_column: " << output_col->dump_structure()
2601
0
                     << " input_column: " << input_col_ptr->dump_structure()
2602
0
                     << " select_size: " << select_size;
2603
0
        return Status::RuntimeError("copy_column_data_by_selector nullable mismatch");
2604
0
    }
2605
2.83k
    output_col->reserve(select_size);
2606
2.83k
    return input_col_ptr->filter_by_selector(sel_rowid_idx, select_size, output_col.get());
2607
2.83k
}
2608
2609
23.1k
Status SegmentIterator::_next_batch_internal(Block* block) {
2610
23.1k
    SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().segment_iterator_next_batch);
2611
2612
23.1k
    bool is_mem_reuse = block->mem_reuse();
2613
23.1k
    DCHECK(is_mem_reuse);
2614
2615
23.1k
    RETURN_IF_ERROR(_lazy_init(block));
2616
2617
23.1k
    SCOPED_RAW_TIMER(&_opts.stats->block_load_ns);
2618
2619
    // If the row bitmap size is smaller than nrows_read_limit, there's no need to reserve that many column rows.
2620
23.1k
    uint32_t nrows_read_limit =
2621
23.1k
            std::min(cast_set<uint32_t>(_row_bitmap.cardinality()), _opts.block_row_max);
2622
23.1k
    if (_can_opt_topn_reads()) {
2623
0
        nrows_read_limit = std::min(static_cast<uint32_t>(_opts.topn_limit), nrows_read_limit);
2624
0
    }
2625
23.1k
    DBUG_EXECUTE_IF("segment_iterator.topn_opt_1", {
2626
23.1k
        if (nrows_read_limit != 1) {
2627
23.1k
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
2628
23.1k
                    "topn opt 1 execute failed: nrows_read_limit={}, _opts.topn_limit={}",
2629
23.1k
                    nrows_read_limit, _opts.topn_limit);
2630
23.1k
        }
2631
23.1k
    })
2632
2633
23.1k
    RETURN_IF_ERROR(_init_current_block(block, _current_return_columns, nrows_read_limit));
2634
23.1k
    _converted_column_ids.assign(_schema->columns().size(), false);
2635
2636
23.1k
    _selected_size = 0;
2637
23.1k
    RETURN_IF_ERROR(_read_columns_by_index(nrows_read_limit, _selected_size));
2638
23.1k
    _replace_version_col_if_needed(_predicate_column_ids, _selected_size);
2639
2640
23.1k
    _opts.stats->blocks_load += 1;
2641
23.1k
    _opts.stats->raw_rows_read += _selected_size;
2642
2643
23.1k
    if (_selected_size == 0) {
2644
5.62k
        return _process_eof(block);
2645
5.62k
    }
2646
2647
17.5k
    if (_is_need_vec_eval || _is_need_short_eval || _is_need_expr_eval) {
2648
4.19k
        _sel_rowid_idx.resize(_selected_size);
2649
2650
4.19k
        if (_is_need_vec_eval || _is_need_short_eval) {
2651
4.19k
            _convert_dict_code_for_predicate_if_necessary();
2652
2653
            // step 1: evaluate vectorization predicate
2654
4.19k
            _selected_size =
2655
4.19k
                    _evaluate_vectorization_predicate(_sel_rowid_idx.data(), _selected_size);
2656
2657
            // step 2: evaluate short circuit predicate
2658
            // todo(wb) research whether need to read short predicate after vectorization evaluation
2659
            //          to reduce cost of read short circuit columns.
2660
            //          In SSB test, it make no difference; So need more scenarios to test
2661
4.19k
            _selected_size =
2662
4.19k
                    _evaluate_short_circuit_predicate(_sel_rowid_idx.data(), _selected_size);
2663
4.19k
            VLOG_DEBUG << fmt::format("After evaluate predicates, selected size: {} ",
2664
0
                                      _selected_size);
2665
4.19k
            if (_selected_size > 0) {
2666
                // step 3.1: output short circuit and predicate column
2667
                // when lazy materialization enables, _predicate_column_ids = distinct(_short_cir_pred_column_ids + _vec_pred_column_ids)
2668
                // see _vec_init_lazy_materialization
2669
                // todo(wb) need to tell input columnids from output columnids
2670
4.04k
                RETURN_IF_ERROR(_output_column_by_sel_idx(block, _predicate_column_ids,
2671
4.04k
                                                          _sel_rowid_idx.data(), _selected_size));
2672
2673
                // step 3.2: read remaining expr column and evaluate it.
2674
4.04k
                if (_is_need_expr_eval) {
2675
                    // The predicate column contains the remaining expr column, no need second read.
2676
0
                    if (_common_expr_column_ids.size() > 0) {
2677
0
                        SCOPED_RAW_TIMER(&_opts.stats->non_predicate_read_ns);
2678
0
                        RETURN_IF_ERROR(_read_columns_by_rowids(
2679
0
                                _common_expr_column_ids, _block_rowids, _sel_rowid_idx.data(),
2680
0
                                _selected_size, &_current_return_columns));
2681
0
                        _replace_version_col_if_needed(_common_expr_column_ids, _selected_size);
2682
0
                        RETURN_IF_ERROR(_process_columns(_common_expr_column_ids, block));
2683
0
                    }
2684
2685
0
                    DCHECK(block->columns() > _schema_block_id_map[*_common_expr_columns.begin()]);
2686
0
                    RETURN_IF_ERROR(
2687
0
                            _process_common_expr(_sel_rowid_idx.data(), _selected_size, block));
2688
0
                }
2689
4.04k
            } else {
2690
150
                _fill_column_nothing();
2691
150
                if (_is_need_expr_eval) {
2692
0
                    RETURN_IF_ERROR(_process_columns(_common_expr_column_ids, block));
2693
0
                }
2694
150
            }
2695
4.19k
        } else if (_is_need_expr_eval) {
2696
0
            DCHECK(!_predicate_column_ids.empty());
2697
0
            RETURN_IF_ERROR(_process_columns(_predicate_column_ids, block));
2698
            // first read all rows are insert block, initialize sel_rowid_idx to all rows.
2699
0
            for (uint16_t i = 0; i < _selected_size; ++i) {
2700
0
                _sel_rowid_idx[i] = i;
2701
0
            }
2702
0
            RETURN_IF_ERROR(_process_common_expr(_sel_rowid_idx.data(), _selected_size, block));
2703
0
        }
2704
2705
        // step4: read non_predicate column
2706
4.19k
        if (_selected_size > 0) {
2707
4.04k
            if (!_non_predicate_columns.empty()) {
2708
3.05k
                RETURN_IF_ERROR(_read_columns_by_rowids(
2709
3.05k
                        _non_predicate_columns, _block_rowids, _sel_rowid_idx.data(),
2710
3.05k
                        _selected_size, &_current_return_columns,
2711
3.05k
                        _opts.condition_cache_digest && !_find_condition_cache));
2712
3.05k
                _replace_version_col_if_needed(_non_predicate_columns, _selected_size);
2713
3.05k
            } else {
2714
995
                if (_opts.condition_cache_digest && !_find_condition_cache) {
2715
0
                    auto& condition_cache = *_condition_cache;
2716
0
                    for (size_t i = 0; i < _selected_size; ++i) {
2717
0
                        auto rowid = _block_rowids[_sel_rowid_idx[i]];
2718
0
                        condition_cache[rowid / SegmentIterator::CONDITION_CACHE_OFFSET] = true;
2719
0
                    }
2720
0
                }
2721
995
            }
2722
4.04k
        }
2723
4.19k
    }
2724
2725
    // step5: output columns
2726
17.5k
    RETURN_IF_ERROR(_output_non_pred_columns(block));
2727
    // Convert inverted index bitmaps to result columns for virtual column exprs
2728
    // (e.g., MATCH projections). This must run before _materialization_of_virtual_column
2729
    // so that fast_execute() can find the pre-computed result columns.
2730
17.5k
    if (!_virtual_column_exprs.empty()) {
2731
0
        bool use_sel = _is_need_vec_eval || _is_need_short_eval || _is_need_expr_eval;
2732
0
        uint16_t* sel_rowid_idx = use_sel ? _sel_rowid_idx.data() : nullptr;
2733
0
        std::vector<VExprContext*> vir_ctxs;
2734
0
        vir_ctxs.reserve(_virtual_column_exprs.size());
2735
0
        for (auto& [cid, ctx] : _virtual_column_exprs) {
2736
0
            vir_ctxs.push_back(ctx.get());
2737
0
        }
2738
0
        _output_index_result_column(vir_ctxs, sel_rowid_idx, _selected_size, block);
2739
0
    }
2740
17.5k
    RETURN_IF_ERROR(_materialization_of_virtual_column(block));
2741
    // shrink char_type suffix zero data
2742
17.5k
    block->shrink_char_type_column_suffix_zero(_char_type_idx);
2743
17.5k
    return _check_output_block(block);
2744
17.5k
}
2745
2746
0
Status SegmentIterator::_process_columns(const std::vector<ColumnId>& column_ids, Block* block) {
2747
0
    RETURN_IF_ERROR(_convert_to_expected_type(column_ids));
2748
0
    for (auto cid : column_ids) {
2749
0
        auto loc = _schema_block_id_map[cid];
2750
0
        block->replace_by_position(loc, std::move(_current_return_columns[cid]));
2751
0
    }
2752
0
    return Status::OK();
2753
0
}
2754
2755
150
void SegmentIterator::_fill_column_nothing() {
2756
    // If column_predicate filters out all rows, the corresponding column in _current_return_columns[cid] must be a ColumnNothing.
2757
    // Because:
2758
    // 1. Before each batch, _init_return_columns is called to initialize _current_return_columns, and virtual columns in _current_return_columns are initialized as ColumnNothing.
2759
    // 2. When select_size == 0, the read method of VirtualColumnIterator will definitely not be called, so the corresponding Column remains a ColumnNothing
2760
150
    for (const auto pair : _vir_cid_to_idx_in_block) {
2761
0
        auto cid = pair.first;
2762
0
        auto pos = pair.second;
2763
0
        const auto* nothing_col =
2764
0
                check_and_get_column<ColumnNothing>(_current_return_columns[cid].get());
2765
0
        DCHECK(nothing_col != nullptr)
2766
0
                << fmt::format("ColumnNothing expected, but got {}, cid: {}, pos: {}",
2767
0
                               _current_return_columns[cid]->get_name(), cid, pos);
2768
0
        _current_return_columns[cid] = _opts.vir_col_idx_to_type[pos]->create_column();
2769
0
    }
2770
150
}
2771
2772
17.5k
Status SegmentIterator::_check_output_block(Block* block) {
2773
17.5k
#ifndef NDEBUG
2774
17.5k
    size_t rows = block->rows();
2775
17.5k
    size_t idx = 0;
2776
30.5k
    for (const auto& entry : *block) {
2777
30.5k
        if (!entry.column) {
2778
0
            return Status::InternalError(
2779
0
                    "Column in idx {} is null, block columns {}, normal_columns {}, "
2780
0
                    "virtual_columns {}",
2781
0
                    idx, block->columns(), _schema->num_column_ids(), _virtual_column_exprs.size());
2782
30.5k
        } else if (check_and_get_column<ColumnNothing>(entry.column.get())) {
2783
0
            if (rows > 0) {
2784
0
                std::vector<std::string> vcid_to_idx;
2785
0
                for (const auto& pair : _vir_cid_to_idx_in_block) {
2786
0
                    vcid_to_idx.push_back(fmt::format("{}-{}", pair.first, pair.second));
2787
0
                }
2788
0
                std::string vir_cid_to_idx_in_block_msg =
2789
0
                        fmt::format("_vir_cid_to_idx_in_block:[{}]", fmt::join(vcid_to_idx, ","));
2790
0
                return Status::InternalError(
2791
0
                        "Column in idx {} is nothing, block columns {}, normal_columns {}, "
2792
0
                        "vir_cid_to_idx_in_block_msg {}",
2793
0
                        idx, block->columns(), _schema->num_column_ids(),
2794
0
                        vir_cid_to_idx_in_block_msg);
2795
0
            }
2796
30.5k
        } else if (entry.column->size() != rows) {
2797
0
            return Status::InternalError(
2798
0
                    "Unmatched size {}, expected {}, column: {}, type: {}, idx_in_block: {}, "
2799
0
                    "block: {}",
2800
0
                    entry.column->size(), rows, entry.column->get_name(), entry.type->get_name(),
2801
0
                    idx, block->dump_structure());
2802
0
        }
2803
30.5k
        idx++;
2804
30.5k
    }
2805
17.5k
#endif
2806
17.5k
    return Status::OK();
2807
17.5k
}
2808
2809
0
Status SegmentIterator::_process_column_predicate() {
2810
0
    return Status::OK();
2811
0
}
2812
2813
5.62k
Status SegmentIterator::_process_eof(Block* block) {
2814
    // Convert all columns in _current_return_columns to schema column
2815
5.62k
    RETURN_IF_ERROR(_convert_to_expected_type(_schema->column_ids()));
2816
18.0k
    for (int i = 0; i < block->columns(); i++) {
2817
12.4k
        auto cid = _schema->column_id(i);
2818
12.4k
        if (!_is_pred_column[cid]) {
2819
12.1k
            block->replace_by_position(i, std::move(_current_return_columns[cid]));
2820
12.1k
        }
2821
12.4k
    }
2822
5.62k
    block->clear_column_data();
2823
    // clear and release iterators memory footprint in advance
2824
5.62k
    _column_iterators.clear();
2825
5.62k
    _index_iterators.clear();
2826
5.62k
    return Status::EndOfFile("no more data in segment");
2827
5.62k
}
2828
2829
Status SegmentIterator::_process_common_expr(uint16_t* sel_rowid_idx, uint16_t& selected_size,
2830
0
                                             Block* block) {
2831
    // Here we just use col0 as row_number indicator. when reach here, we will calculate the predicates first.
2832
    //  then use the result to reduce our data read(that is, expr push down). there's now row in block means the first
2833
    //  column is not in common expr. so it's safe to replace it temporarily to provide correct `selected_size`.
2834
0
    VLOG_DEBUG << fmt::format("Execute common expr. block rows {}, selected size {}", block->rows(),
2835
0
                              _selected_size);
2836
2837
0
    bool need_mock_col = block->rows() != selected_size;
2838
0
    MutableColumnPtr col0;
2839
0
    if (need_mock_col) {
2840
0
        col0 = std::move(*block->get_by_position(0).column).mutate();
2841
0
        block->replace_by_position(
2842
0
                0, block->get_by_position(0).type->create_column_const_with_default_value(
2843
0
                           _selected_size));
2844
0
    }
2845
2846
0
    std::vector<VExprContext*> common_ctxs;
2847
0
    common_ctxs.reserve(_common_expr_ctxs_push_down.size());
2848
0
    for (auto& ctx : _common_expr_ctxs_push_down) {
2849
0
        common_ctxs.push_back(ctx.get());
2850
0
    }
2851
0
    _output_index_result_column(common_ctxs, _sel_rowid_idx.data(), _selected_size, block);
2852
0
    block->shrink_char_type_column_suffix_zero(_char_type_idx);
2853
0
    RETURN_IF_ERROR(_execute_common_expr(_sel_rowid_idx.data(), _selected_size, block));
2854
2855
0
    if (need_mock_col) {
2856
0
        block->replace_by_position(0, std::move(col0));
2857
0
    }
2858
2859
0
    VLOG_DEBUG << fmt::format("Execute common expr end. block rows {}, selected size {}",
2860
0
                              block->rows(), _selected_size);
2861
0
    return Status::OK();
2862
0
}
2863
2864
Status SegmentIterator::_execute_common_expr(uint16_t* sel_rowid_idx, uint16_t& selected_size,
2865
0
                                             Block* block) {
2866
0
    SCOPED_RAW_TIMER(&_opts.stats->expr_filter_ns);
2867
0
    DCHECK(!_remaining_conjunct_roots.empty());
2868
0
    DCHECK(block->rows() != 0);
2869
0
    int prev_columns = block->columns();
2870
0
    uint16_t original_size = selected_size;
2871
0
    _opts.stats->expr_cond_input_rows += original_size;
2872
2873
0
    IColumn::Filter filter;
2874
0
    RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
2875
0
            _common_expr_ctxs_push_down, block, _columns_to_filter, prev_columns, filter));
2876
2877
0
    selected_size = _evaluate_common_expr_filter(sel_rowid_idx, selected_size, filter);
2878
0
    _opts.stats->rows_expr_cond_filtered += original_size - selected_size;
2879
0
    return Status::OK();
2880
0
}
2881
2882
uint16_t SegmentIterator::_evaluate_common_expr_filter(uint16_t* sel_rowid_idx,
2883
                                                       uint16_t selected_size,
2884
0
                                                       const IColumn::Filter& filter) {
2885
0
    size_t count = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size());
2886
0
    if (count == 0) {
2887
0
        return 0;
2888
0
    } else {
2889
0
        const UInt8* filt_pos = filter.data();
2890
2891
0
        uint16_t new_size = 0;
2892
0
        uint32_t sel_pos = 0;
2893
0
        const uint32_t sel_end = selected_size;
2894
0
        static constexpr size_t SIMD_BYTES = simd::bits_mask_length();
2895
0
        const uint32_t sel_end_simd = sel_pos + selected_size / SIMD_BYTES * SIMD_BYTES;
2896
2897
0
        while (sel_pos < sel_end_simd) {
2898
0
            auto mask = simd::bytes_mask_to_bits_mask(filt_pos + sel_pos);
2899
0
            if (0 == mask) {
2900
                //pass
2901
0
            } else if (simd::bits_mask_all() == mask) {
2902
0
                for (uint32_t i = 0; i < SIMD_BYTES; i++) {
2903
0
                    sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos + i];
2904
0
                }
2905
0
            } else {
2906
0
                simd::iterate_through_bits_mask(
2907
0
                        [&](const size_t bit_pos) {
2908
0
                            sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos + bit_pos];
2909
0
                        },
2910
0
                        mask);
2911
0
            }
2912
0
            sel_pos += SIMD_BYTES;
2913
0
        }
2914
2915
0
        for (; sel_pos < sel_end; sel_pos++) {
2916
0
            if (filt_pos[sel_pos]) {
2917
0
                sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos];
2918
0
            }
2919
0
        }
2920
0
        return new_size;
2921
0
    }
2922
0
}
2923
2924
void SegmentIterator::_output_index_result_column(const std::vector<VExprContext*>& expr_ctxs,
2925
                                                  uint16_t* sel_rowid_idx, uint16_t select_size,
2926
0
                                                  Block* block) {
2927
0
    SCOPED_RAW_TIMER(&_opts.stats->output_index_result_column_timer);
2928
0
    if (block->rows() == 0) {
2929
0
        return;
2930
0
    }
2931
0
    for (auto* expr_ctx_ptr : expr_ctxs) {
2932
0
        auto index_ctx = expr_ctx_ptr->get_index_context();
2933
0
        if (index_ctx == nullptr) {
2934
0
            continue;
2935
0
        }
2936
0
        for (auto& inverted_index_result_bitmap_for_expr : index_ctx->get_index_result_bitmap()) {
2937
0
            const auto* expr = inverted_index_result_bitmap_for_expr.first;
2938
0
            const auto& result_bitmap = inverted_index_result_bitmap_for_expr.second;
2939
0
            const auto& index_result_bitmap = result_bitmap.get_data_bitmap();
2940
0
            auto index_result_column = ColumnUInt8::create();
2941
0
            ColumnUInt8::Container& vec_match_pred = index_result_column->get_data();
2942
0
            vec_match_pred.resize(block->rows());
2943
0
            std::fill(vec_match_pred.begin(), vec_match_pred.end(), 0);
2944
2945
0
            const auto& null_bitmap = result_bitmap.get_null_bitmap();
2946
0
            bool has_null_bitmap = null_bitmap != nullptr && !null_bitmap->isEmpty();
2947
0
            bool expr_returns_nullable = expr->data_type()->is_nullable();
2948
2949
0
            ColumnUInt8::MutablePtr null_map_column = nullptr;
2950
0
            ColumnUInt8::Container* null_map_data = nullptr;
2951
0
            if (has_null_bitmap && expr_returns_nullable) {
2952
0
                null_map_column = ColumnUInt8::create();
2953
0
                auto& null_map_vec = null_map_column->get_data();
2954
0
                null_map_vec.resize(block->rows());
2955
0
                std::fill(null_map_vec.begin(), null_map_vec.end(), 0);
2956
0
                null_map_data = &null_map_column->get_data();
2957
0
            }
2958
2959
0
            roaring::BulkContext bulk_context;
2960
0
            for (uint32_t i = 0; i < select_size; i++) {
2961
0
                auto rowid = sel_rowid_idx ? _block_rowids[sel_rowid_idx[i]] : _block_rowids[i];
2962
0
                if (index_result_bitmap) {
2963
0
                    vec_match_pred[i] = index_result_bitmap->containsBulk(bulk_context, rowid);
2964
0
                }
2965
0
                if (null_map_data != nullptr && null_bitmap->contains(rowid)) {
2966
0
                    (*null_map_data)[i] = 1;
2967
0
                    vec_match_pred[i] = 0;
2968
0
                }
2969
0
            }
2970
2971
0
            DCHECK(block->rows() == vec_match_pred.size());
2972
2973
0
            if (null_map_column) {
2974
0
                index_ctx->set_index_result_column_for_expr(
2975
0
                        expr, ColumnNullable::create(std::move(index_result_column),
2976
0
                                                     std::move(null_map_column)));
2977
0
            } else {
2978
0
                index_ctx->set_index_result_column_for_expr(expr, std::move(index_result_column));
2979
0
            }
2980
0
        }
2981
0
    }
2982
0
}
2983
2984
4.19k
void SegmentIterator::_convert_dict_code_for_predicate_if_necessary() {
2985
4.19k
    for (auto predicate : _short_cir_eval_predicate) {
2986
0
        _convert_dict_code_for_predicate_if_necessary_impl(predicate);
2987
0
    }
2988
2989
4.19k
    for (auto predicate : _pre_eval_block_predicate) {
2990
0
        _convert_dict_code_for_predicate_if_necessary_impl(predicate);
2991
0
    }
2992
2993
4.19k
    for (auto column_id : _delete_range_column_ids) {
2994
4.05k
        _current_return_columns[column_id].get()->convert_dict_codes_if_necessary();
2995
4.05k
    }
2996
2997
4.19k
    for (auto column_id : _delete_bloom_filter_column_ids) {
2998
0
        _current_return_columns[column_id].get()->initialize_hash_values_for_runtime_filter();
2999
0
    }
3000
4.19k
}
3001
3002
void SegmentIterator::_convert_dict_code_for_predicate_if_necessary_impl(
3003
0
        std::shared_ptr<ColumnPredicate> predicate) {
3004
0
    auto& column = _current_return_columns[predicate->column_id()];
3005
0
    auto* col_ptr = column.get();
3006
3007
0
    if (PredicateTypeTraits::is_range(predicate->type())) {
3008
0
        col_ptr->convert_dict_codes_if_necessary();
3009
0
    } else if (PredicateTypeTraits::is_bloom_filter(predicate->type())) {
3010
0
        col_ptr->initialize_hash_values_for_runtime_filter();
3011
0
    }
3012
0
}
3013
3014
7.19k
Status SegmentIterator::current_block_row_locations(std::vector<RowLocation>* block_row_locations) {
3015
7.19k
    DCHECK(_opts.record_rowids);
3016
7.19k
    DCHECK_GE(_block_rowids.size(), _selected_size);
3017
7.19k
    block_row_locations->resize(_selected_size);
3018
7.19k
    uint32_t sid = segment_id();
3019
7.19k
    if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) {
3020
4.24M
        for (auto i = 0; i < _selected_size; i++) {
3021
4.23M
            (*block_row_locations)[i] = RowLocation(sid, _block_rowids[i]);
3022
4.23M
        }
3023
4.46k
    } else {
3024
2.46M
        for (auto i = 0; i < _selected_size; i++) {
3025
2.46M
            (*block_row_locations)[i] = RowLocation(sid, _block_rowids[_sel_rowid_idx[i]]);
3026
2.46M
        }
3027
2.73k
    }
3028
7.19k
    return Status::OK();
3029
7.19k
}
3030
3031
5.63k
Status SegmentIterator::_construct_compound_expr_context() {
3032
5.63k
    ColumnIteratorOptions iter_opts {
3033
5.63k
            .use_page_cache = _opts.use_page_cache,
3034
5.63k
            .file_reader = _file_reader.get(),
3035
5.63k
            .stats = _opts.stats,
3036
5.63k
            .io_ctx = _opts.io_ctx,
3037
5.63k
    };
3038
5.63k
    auto inverted_index_context = std::make_shared<IndexExecContext>(
3039
5.63k
            _schema->column_ids(), _index_iterators, _storage_name_and_type,
3040
5.63k
            _common_expr_index_exec_status, _score_runtime, _segment.get(), iter_opts);
3041
5.63k
    inverted_index_context->set_index_query_context(_index_query_context);
3042
5.63k
    for (const auto& expr_ctx : _opts.common_expr_ctxs_push_down) {
3043
0
        VExprContextSPtr context;
3044
        // _ann_range_search_runtime will do deep copy.
3045
0
        RETURN_IF_ERROR(expr_ctx->clone(_opts.runtime_state, context));
3046
0
        context->set_index_context(inverted_index_context);
3047
0
        _common_expr_ctxs_push_down.emplace_back(context);
3048
0
    }
3049
    // Clone virtual column exprs before setting IndexExecContext, because
3050
    // IndexExecContext holds segment-specific index iterator references.
3051
    // Without cloning, shared VExprContext would be overwritten per-segment
3052
    // and could point to the wrong segment's context.
3053
5.63k
    for (auto& [cid, expr_ctx] : _virtual_column_exprs) {
3054
0
        VExprContextSPtr context;
3055
0
        RETURN_IF_ERROR(expr_ctx->clone(_opts.runtime_state, context));
3056
0
        context->set_index_context(inverted_index_context);
3057
0
        expr_ctx = context;
3058
0
    }
3059
5.63k
    return Status::OK();
3060
5.63k
}
3061
3062
5.63k
void SegmentIterator::_calculate_expr_in_remaining_conjunct_root() {
3063
5.63k
    for (const auto& root_expr_ctx : _common_expr_ctxs_push_down) {
3064
0
        const auto& root_expr = root_expr_ctx->root();
3065
0
        if (root_expr == nullptr) {
3066
0
            continue;
3067
0
        }
3068
0
        _common_expr_to_slotref_map[root_expr_ctx.get()] = std::unordered_map<ColumnId, VExpr*>();
3069
3070
0
        std::stack<VExprSPtr> stack;
3071
0
        stack.emplace(root_expr);
3072
3073
0
        while (!stack.empty()) {
3074
0
            const auto& expr = stack.top();
3075
0
            stack.pop();
3076
3077
0
            for (const auto& child : expr->children()) {
3078
0
                if (child->is_virtual_slot_ref()) {
3079
                    // Expand virtual slot ref to its underlying expression tree and
3080
                    // collect real slot refs used inside. We still associate those
3081
                    // slot refs with the current parent expr node for inverted index
3082
                    // tracking, just like normal slot refs.
3083
0
                    auto* vir_slot_ref = assert_cast<VirtualSlotRef*>(child.get());
3084
0
                    auto vir_expr = vir_slot_ref->get_virtual_column_expr();
3085
0
                    if (vir_expr) {
3086
0
                        std::stack<VExprSPtr> vir_stack;
3087
0
                        vir_stack.emplace(vir_expr);
3088
3089
0
                        while (!vir_stack.empty()) {
3090
0
                            const auto& vir_node = vir_stack.top();
3091
0
                            vir_stack.pop();
3092
3093
0
                            for (const auto& vir_child : vir_node->children()) {
3094
0
                                if (vir_child->is_slot_ref()) {
3095
0
                                    auto* inner_slot_ref = assert_cast<VSlotRef*>(vir_child.get());
3096
0
                                    _common_expr_index_exec_status[_schema->column_id(
3097
0
                                            inner_slot_ref->column_id())][expr.get()] = false;
3098
0
                                    _common_expr_to_slotref_map[root_expr_ctx.get()]
3099
0
                                                               [inner_slot_ref->column_id()] =
3100
0
                                                                       expr.get();
3101
0
                                }
3102
3103
0
                                if (!vir_child->children().empty()) {
3104
0
                                    vir_stack.emplace(vir_child);
3105
0
                                }
3106
0
                            }
3107
0
                        }
3108
0
                    }
3109
0
                }
3110
                // Example: CAST(v['a'] AS VARCHAR) MATCH 'hello', do not add CAST expr to index tracking.
3111
0
                auto expr_without_cast = VExpr::expr_without_cast(child);
3112
0
                if (expr_without_cast->is_slot_ref() && expr->op() != TExprOpcode::CAST) {
3113
0
                    auto* column_slot_ref = assert_cast<VSlotRef*>(expr_without_cast.get());
3114
0
                    _common_expr_index_exec_status[_schema->column_id(column_slot_ref->column_id())]
3115
0
                                                  [expr.get()] = false;
3116
0
                    _common_expr_to_slotref_map[root_expr_ctx.get()][column_slot_ref->column_id()] =
3117
0
                            expr.get();
3118
0
                }
3119
0
            }
3120
3121
0
            const auto& children = expr->children();
3122
0
            for (int i = cast_set<int>(children.size()) - 1; i >= 0; --i) {
3123
0
                if (!children[i]->children().empty()) {
3124
0
                    stack.emplace(children[i]);
3125
0
                }
3126
0
            }
3127
0
        }
3128
0
    }
3129
5.63k
}
3130
3131
bool SegmentIterator::_no_need_read_key_data(ColumnId cid, MutableColumnPtr& column,
3132
43.4k
                                             size_t nrows_read) {
3133
43.4k
    if (_opts.runtime_state && !_opts.runtime_state->query_options().enable_no_need_read_data_opt) {
3134
0
        return false;
3135
0
    }
3136
3137
43.4k
    if (!((_opts.tablet_schema->keys_type() == KeysType::DUP_KEYS ||
3138
43.4k
           (_opts.tablet_schema->keys_type() == KeysType::UNIQUE_KEYS &&
3139
16.3k
            _opts.enable_unique_key_merge_on_write)))) {
3140
12.0k
        return false;
3141
12.0k
    }
3142
3143
31.3k
    if (_opts.push_down_agg_type_opt != TPushAggOp::COUNT_ON_INDEX) {
3144
31.3k
        return false;
3145
31.3k
    }
3146
3147
0
    if (!_opts.tablet_schema->column(cid).is_key()) {
3148
0
        return false;
3149
0
    }
3150
3151
0
    if (_has_delete_predicate(cid)) {
3152
0
        return false;
3153
0
    }
3154
3155
0
    if (!_check_all_conditions_passed_inverted_index_for_column(cid)) {
3156
0
        return false;
3157
0
    }
3158
3159
0
    if (column->is_nullable()) {
3160
0
        auto* nullable_col_ptr = reinterpret_cast<ColumnNullable*>(column.get());
3161
0
        nullable_col_ptr->get_null_map_column().insert_many_defaults(nrows_read);
3162
0
        nullable_col_ptr->get_nested_column_ptr()->insert_many_defaults(nrows_read);
3163
0
    } else {
3164
0
        column->insert_many_defaults(nrows_read);
3165
0
    }
3166
3167
0
    return true;
3168
0
}
3169
3170
31.3k
bool SegmentIterator::_has_delete_predicate(ColumnId cid) {
3171
31.3k
    std::set<uint32_t> delete_columns_set;
3172
31.3k
    _opts.delete_condition_predicates->get_all_column_ids(delete_columns_set);
3173
31.3k
    return delete_columns_set.contains(cid);
3174
31.3k
}
3175
3176
23.1k
bool SegmentIterator::_can_opt_topn_reads() {
3177
23.1k
    if (_opts.topn_limit <= 0) {
3178
23.1k
        return false;
3179
23.1k
    }
3180
3181
0
    if (_opts.delete_condition_predicates->num_of_column_predicate() > 0) {
3182
0
        return false;
3183
0
    }
3184
3185
0
    bool all_true = std::ranges::all_of(_schema->column_ids(), [this](auto cid) {
3186
0
        if (cid == _opts.tablet_schema->delete_sign_idx()) {
3187
0
            return true;
3188
0
        }
3189
0
        if (_check_all_conditions_passed_inverted_index_for_column(cid, true)) {
3190
0
            return true;
3191
0
        }
3192
0
        return false;
3193
0
    });
3194
3195
0
    DBUG_EXECUTE_IF("segment_iterator.topn_opt_1", {
3196
0
        LOG(INFO) << "col_predicates: " << _col_predicates.size() << ", all_true: " << all_true;
3197
0
    })
3198
3199
0
    DBUG_EXECUTE_IF("segment_iterator.topn_opt_2", {
3200
0
        if (all_true) {
3201
0
            return Status::Error<ErrorCode::INTERNAL_ERROR>("topn opt 2 execute failed");
3202
0
        }
3203
0
    })
3204
3205
0
    return all_true;
3206
0
}
3207
3208
// Before get next batch. make sure all virtual columns in block has type ColumnNothing.
3209
23.1k
void SegmentIterator::_init_virtual_columns(Block* block) {
3210
23.1k
    for (const auto& pair : _vir_cid_to_idx_in_block) {
3211
0
        auto& col_with_type_and_name = block->get_by_position(pair.second);
3212
0
        col_with_type_and_name.column = ColumnNothing::create(0);
3213
0
        col_with_type_and_name.type = _opts.vir_col_idx_to_type[pair.second];
3214
0
    }
3215
23.1k
}
3216
3217
17.5k
Status SegmentIterator::_materialization_of_virtual_column(Block* block) {
3218
    // Some expr can not process empty block, such as function `element_at`.
3219
    // So materialize virtual column in advance to avoid errors.
3220
17.5k
    if (block->rows() == 0) {
3221
150
        for (const auto& pair : _vir_cid_to_idx_in_block) {
3222
0
            auto& col_with_type_and_name = block->get_by_position(pair.second);
3223
0
            col_with_type_and_name.column = _opts.vir_col_idx_to_type[pair.second]->create_column();
3224
0
            col_with_type_and_name.type = _opts.vir_col_idx_to_type[pair.second];
3225
0
        }
3226
150
        return Status::OK();
3227
150
    }
3228
3229
17.3k
    for (const auto& cid_and_expr : _virtual_column_exprs) {
3230
0
        auto cid = cid_and_expr.first;
3231
0
        auto column_expr = cid_and_expr.second;
3232
0
        size_t idx_in_block = _vir_cid_to_idx_in_block[cid];
3233
0
        if (block->columns() <= idx_in_block) {
3234
0
            return Status::InternalError(
3235
0
                    "Virtual column index {} is out of range, block columns {}, "
3236
0
                    "virtual columns size {}, virtual column expr {}",
3237
0
                    idx_in_block, block->columns(), _vir_cid_to_idx_in_block.size(),
3238
0
                    column_expr->root()->debug_string());
3239
0
        } else if (block->get_by_position(idx_in_block).column.get() == nullptr) {
3240
0
            return Status::InternalError(
3241
0
                    "Virtual column index {} is null, block columns {}, virtual columns size {}, "
3242
0
                    "virtual column expr {}",
3243
0
                    idx_in_block, block->columns(), _vir_cid_to_idx_in_block.size(),
3244
0
                    column_expr->root()->debug_string());
3245
0
        }
3246
0
        block->shrink_char_type_column_suffix_zero(_char_type_idx);
3247
0
        if (check_and_get_column<const ColumnNothing>(
3248
0
                    block->get_by_position(idx_in_block).column.get())) {
3249
0
            VLOG_DEBUG << fmt::format("Virtual column is doing materialization, cid {}, col idx {}",
3250
0
                                      cid, idx_in_block);
3251
0
            ColumnPtr result_column;
3252
0
            RETURN_IF_ERROR(column_expr->execute(block, result_column));
3253
3254
0
            block->replace_by_position(idx_in_block, std::move(result_column));
3255
0
            if (block->get_by_position(idx_in_block).column->size() == 0) {
3256
0
                LOG_WARNING("Result of expr column {} is empty. cid {}, idx_in_block {}",
3257
0
                            column_expr->root()->debug_string(), cid, idx_in_block);
3258
0
            }
3259
0
        }
3260
0
    }
3261
17.3k
    return Status::OK();
3262
17.3k
}
3263
3264
5.63k
void SegmentIterator::_prepare_score_column_materialization() {
3265
5.63k
    if (_score_runtime == nullptr) {
3266
5.63k
        return;
3267
5.63k
    }
3268
3269
0
    ScoreRangeFilterPtr filter;
3270
0
    if (_score_runtime->has_score_range_filter()) {
3271
0
        const auto& range_info = _score_runtime->get_score_range_info();
3272
0
        filter = std::make_shared<ScoreRangeFilter>(range_info->op, range_info->threshold);
3273
0
    }
3274
3275
0
    IColumn::MutablePtr result_column;
3276
0
    auto result_row_ids = std::make_unique<std::vector<uint64_t>>();
3277
0
    if (_score_runtime->get_limit() > 0 && _col_predicates.empty() &&
3278
0
        _common_expr_ctxs_push_down.empty()) {
3279
0
        OrderType order_type = _score_runtime->is_asc() ? OrderType::ASC : OrderType::DESC;
3280
0
        _index_query_context->collection_similarity->get_topn_bm25_scores(
3281
0
                &_row_bitmap, result_column, result_row_ids, order_type,
3282
0
                _score_runtime->get_limit(), filter);
3283
0
    } else {
3284
0
        _index_query_context->collection_similarity->get_bm25_scores(&_row_bitmap, result_column,
3285
0
                                                                     result_row_ids, filter);
3286
0
    }
3287
0
    const size_t dst_col_idx = _score_runtime->get_dest_column_idx();
3288
0
    auto* column_iter = _column_iterators[_schema->column_id(dst_col_idx)].get();
3289
0
    auto* virtual_column_iter = dynamic_cast<VirtualColumnIterator*>(column_iter);
3290
0
    virtual_column_iter->prepare_materialization(std::move(result_column),
3291
0
                                                 std::move(result_row_ids));
3292
0
}
3293
3294
} // namespace segment_v2
3295
} // namespace doris