Coverage Report

Created: 2026-07-01 18:10

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/segment_iterator.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "storage/segment/segment_iterator.h"
19
20
#include <assert.h>
21
#include <gen_cpp/Exprs_types.h>
22
#include <gen_cpp/Opcodes_types.h>
23
#include <gen_cpp/Types_types.h>
24
#include <gen_cpp/olap_file.pb.h>
25
26
#include <algorithm>
27
#include <boost/iterator/iterator_facade.hpp>
28
#include <cstdint>
29
#include <memory>
30
#include <numeric>
31
#include <optional>
32
#include <set>
33
#include <unordered_map>
34
#include <utility>
35
#include <vector>
36
37
#include "cloud/config.h"
38
#include "common/compiler_util.h" // IWYU pragma: keep
39
#include "common/config.h"
40
#include "common/consts.h"
41
#include "common/exception.h"
42
#include "common/logging.h"
43
#include "common/metrics/doris_metrics.h"
44
#include "common/object_pool.h"
45
#include "common/status.h"
46
#include "core/assert_cast.h"
47
#include "core/block/column_with_type_and_name.h"
48
#include "core/column/column.h"
49
#include "core/column/column_const.h"
50
#include "core/column/column_nothing.h"
51
#include "core/column/column_nullable.h"
52
#include "core/column/column_string.h"
53
#include "core/column/column_variant.h"
54
#include "core/column/column_vector.h"
55
#include "core/data_type/data_type.h"
56
#include "core/data_type/data_type_factory.hpp"
57
#include "core/data_type/data_type_number.h"
58
#include "core/data_type/define_primitive_type.h"
59
#include "core/field.h"
60
#include "core/string_ref.h"
61
#include "core/typeid_cast.h"
62
#include "core/types.h"
63
#include "exprs/expr_zonemap_filter.h"
64
#include "exprs/function/array/function_array_index.h"
65
#include "exprs/runtime_filter_expr.h"
66
#include "exprs/vexpr.h"
67
#include "exprs/vexpr_context.h"
68
#include "exprs/virtual_slot_ref.h"
69
#include "exprs/vliteral.h"
70
#include "exprs/vslot_ref.h"
71
#include "io/cache/cached_remote_file_reader.h"
72
#include "io/fs/file_reader.h"
73
#include "io/io_common.h"
74
#include "runtime/descriptors.h"
75
#include "runtime/query_context.h"
76
#include "runtime/runtime_predicate.h"
77
#include "runtime/runtime_state.h"
78
#include "runtime/thread_context.h"
79
#include "storage/binlog.h"
80
#include "storage/compaction/collection_similarity.h"
81
#include "storage/id_manager.h"
82
#include "storage/index/ann/ann_index.h"
83
#include "storage/index/ann/ann_index_iterator.h"
84
#include "storage/index/ann/ann_index_reader.h"
85
#include "storage/index/ann/ann_topn_runtime.h"
86
#include "storage/index/index_file_reader.h"
87
#include "storage/index/index_iterator.h"
88
#include "storage/index/index_query_context.h"
89
#include "storage/index/index_reader_helper.h"
90
#include "storage/index/indexed_column_reader.h"
91
#include "storage/index/inverted/inverted_index_reader.h"
92
#include "storage/index/ordinal_page_index.h"
93
#include "storage/index/primary_key_index.h"
94
#include "storage/index/short_key_index.h"
95
#include "storage/index/zone_map/zone_map_index.h"
96
#include "storage/index/zone_map/zonemap_eval_context.h"
97
#include "storage/iterators.h"
98
#include "storage/olap_common.h"
99
#include "storage/predicate/bloom_filter_predicate.h"
100
#include "storage/predicate/column_predicate.h"
101
#include "storage/predicate/like_column_predicate.h"
102
#include "storage/schema.h"
103
#include "storage/segment/column_reader.h"
104
#include "storage/segment/column_reader_cache.h"
105
#include "storage/segment/condition_cache.h"
106
#include "storage/segment/row_ranges.h"
107
#include "storage/segment/segment.h"
108
#include "storage/segment/segment_prefetcher.h"
109
#include "storage/segment/variant/variant_column_reader.h"
110
#include "storage/segment/virtual_column_iterator.h"
111
#include "storage/tablet/tablet_schema.h"
112
#include "storage/types.h"
113
#include "storage/utils.h"
114
#include "util/concurrency_stats.h"
115
#include "util/defer_op.h"
116
#include "util/json/path_in_data.h"
117
#include "util/simd/bits.h"
118
119
namespace doris {
120
using namespace ErrorCode;
121
namespace segment_v2 {
122
namespace {
123
124
// Resolves the tablet-schema column id that backs `slot` and writes it to `*cid`.
//
// Lookup strategy:
//   - VARIANT slots: by a PathInData built from the lower-cased name of the column that
//     carries the slot's unique id, combined with the slot's column paths.
//   - all other slots: by column unique id when it is non-negative, otherwise by name.
//
// Returns InternalError (and leaves `*cid` untouched) when the lookup yields a negative
// index. `slot` and `cid` are dereferenced unconditionally, so both must be non-null.
Status tablet_column_id_by_slot(const TabletSchemaSPtr& tablet_schema, const SlotDescriptor* slot,
                                ColumnId* cid) {
    const auto lookup_field_index = [&]() -> int32_t {
        if (slot->type()->get_primitive_type() == PrimitiveType::TYPE_VARIANT) {
            return tablet_schema->field_index(PathInData(
                    tablet_schema->column_by_uid(slot->col_unique_id()).name_lower_case(),
                    slot->column_paths()));
        }
        if (slot->col_unique_id() >= 0) {
            return tablet_schema->field_index(slot->col_unique_id());
        }
        return tablet_schema->field_index(slot->col_name());
    };

    const int32_t resolved_index = lookup_field_index();
    if (resolved_index >= 0) {
        *cid = resolved_index;
        return Status::OK();
    }
    return Status::InternalError(
            "field name is invalid. field={}, field_name_to_index={}, col_unique_id={}",
            slot->col_name(), tablet_schema->get_all_field_names(), slot->col_unique_id());
}
143
144
// Walks the expression tree rooted at `expr` and points every VSlotRef / VirtualSlotRef at
// the position its column occupies in the reader schema.
//
// `cid_to_pos` maps a tablet column id to its position in the reader block. For each slot
// reference the slot descriptor is fetched from the runtime state's descriptor table, the
// tablet column id is resolved, and the reference's column id is overwritten with the
// mapped position. Returns InternalError when the slot descriptor is missing, the column
// id cannot be resolved, or the column id is absent from `cid_to_pos`.
Status rebind_storage_expr_to_reader_schema(
        const StorageReadOptions& opts, const VExprSPtr& expr,
        const std::unordered_map<ColumnId, size_t>& cid_to_pos) {
    DORIS_CHECK(expr != nullptr);

    if (expr->is_slot_ref()) {
        auto real_ref = std::static_pointer_cast<VSlotRef>(expr);
        auto* slot_desc = opts.runtime_state->desc_tbl().get_slot_descriptor(real_ref->slot_id());
        if (slot_desc == nullptr) {
            return Status::InternalError("slot {} is not found in descriptor table",
                                         real_ref->slot_id());
        }

        ColumnId tablet_cid = 0;
        RETURN_IF_ERROR(tablet_column_id_by_slot(opts.tablet_schema, slot_desc, &tablet_cid));
        const auto found = cid_to_pos.find(tablet_cid);
        if (found == cid_to_pos.end()) {
            return Status::InternalError("slot {} column {} with cid {} is not in reader schema",
                                         real_ref->slot_id(), slot_desc->col_name(), tablet_cid);
        }
        real_ref->set_column_id(cast_set<int>(found->second));
    } else if (expr->is_virtual_slot_ref()) {
        auto virtual_ref = std::static_pointer_cast<VirtualSlotRef>(expr);
        auto* slot_desc =
                opts.runtime_state->desc_tbl().get_slot_descriptor(virtual_ref->slot_id());
        if (slot_desc == nullptr) {
            return Status::InternalError("slot {} is not found in descriptor table",
                                         virtual_ref->slot_id());
        }

        ColumnId tablet_cid = 0;
        RETURN_IF_ERROR(tablet_column_id_by_slot(opts.tablet_schema, slot_desc, &tablet_cid));
        const auto found = cid_to_pos.find(tablet_cid);
        if (found == cid_to_pos.end()) {
            return Status::InternalError(
                    "virtual slot {} column {} with cid {} is not in reader schema",
                    virtual_ref->slot_id(), slot_desc->col_name(), tablet_cid);
        }
        virtual_ref->set_column_id(cast_set<int>(found->second));

        // The virtual slot owns an output position in the reader block, and the expression
        // that materializes it can itself reference real slots. Both need rebinding so the
        // virtual expression is evaluated against the block layout SegmentIterator uses.
        RETURN_IF_ERROR(rebind_storage_expr_to_reader_schema(
                opts, virtual_ref->get_virtual_column_expr(), cid_to_pos));
    }

    // Descend into the children regardless of the node kind handled above.
    for (const auto& sub_expr : expr->children()) {
        RETURN_IF_ERROR(rebind_storage_expr_to_reader_schema(opts, sub_expr, cid_to_pos));
    }
    return Status::OK();
}
196
197
} // namespace
198
199
// Rebinds the slot references of all storage expressions (`common_exprs` and the values of
// `virtual_exprs`) to positions in the reader `schema`.
//
// Background: storage exprs are prepared with a RowDescriptor, so the column_id stored in a
// VSlotRef / VirtualSlotRef is the scan tuple ordinal. SegmentIterator evaluates cloned
// exprs on a block laid out by the reader schema. AGG_KEYS and non-MOW UNIQUE_KEYS readers
// may widen that schema (for example by adding every key column before merging or
// aggregating), so the scan tuple ordinal does not always equal the block ordinal.
//
// DUP_KEYS and merge-on-write UNIQUE_KEYS use direct readers for query scans; their reader
// block keeps the scan tuple layout, so this function returns early for them and avoids the
// per-segment tree traversal. It also returns early when there is nothing to rebind.
Status rebind_storage_exprs_to_reader_schema(const StorageReadOptions& opts, const Schema& schema,
                                             const VExprContextSPtrs& common_exprs,
                                             std::map<ColumnId, VExprContextSPtr>& virtual_exprs) {
    const bool nothing_to_rebind = common_exprs.empty() && virtual_exprs.empty();
    if (nothing_to_rebind) {
        return Status::OK();
    }
    DORIS_CHECK(opts.runtime_state != nullptr);
    DORIS_CHECK(opts.tablet_schema != nullptr);

    const auto keys_type = opts.tablet_schema->keys_type();
    const bool is_mow_unique =
            keys_type == KeysType::UNIQUE_KEYS && opts.enable_unique_key_merge_on_write;
    if (keys_type == KeysType::DUP_KEYS || is_mow_unique) {
        return Status::OK();
    }

    // For merge/agg readers the reader schema is the source of truth: build the mapping
    // tablet column id -> position in the reader block.
    std::unordered_map<ColumnId, size_t> cid_to_pos;
    const size_t reader_column_count = schema.num_column_ids();
    for (size_t reader_pos = 0; reader_pos < reader_column_count; ++reader_pos) {
        cid_to_pos.emplace(schema.column_id(cast_set<int>(reader_pos)), reader_pos);
    }

    for (const auto& expr_ctx : common_exprs) {
        RETURN_IF_ERROR(rebind_storage_expr_to_reader_schema(opts, expr_ctx->root(), cid_to_pos));
    }
    for (const auto& [unused_cid, expr_ctx] : virtual_exprs) {
        RETURN_IF_ERROR(rebind_storage_expr_to_reader_schema(opts, expr_ctx->root(), cid_to_pos));
    }
    return Status::OK();
}
237
238
3.04k
// Defaulted destructor, defined out-of-line in this translation unit.
// NOTE(review): presumably kept here so that members holding the nested iterator classes
// defined in this file are destroyed where those types are complete — confirm against the header.
SegmentIterator::~SegmentIterator() = default;
239
240
3.03k
void SegmentIterator::_init_row_bitmap_by_condition_cache() {
241
    // Only dispose need column predicate and expr cal in condition cache
242
3.03k
    if (!_col_predicates.empty() || !_common_expr_ctxs_push_down.empty()) {
243
84
        if (_opts.condition_cache_digest) {
244
0
            auto* condition_cache = ConditionCache::instance();
245
0
            ConditionCache::CacheKey cache_key(_opts.rowset_id, _segment->id(),
246
0
                                               _opts.condition_cache_digest);
247
248
            // Increment search count when digest != 0
249
0
            DorisMetrics::instance()->condition_cache_search_count->increment(1);
250
251
0
            ConditionCacheHandle handle;
252
0
            _find_condition_cache = condition_cache->lookup(cache_key, &handle);
253
254
            // Increment hit count if cache lookup is successful
255
0
            if (_find_condition_cache) {
256
0
                DorisMetrics::instance()->condition_cache_hit_count->increment(1);
257
0
                if (_opts.runtime_state) {
258
0
                    VLOG_DEBUG << "Condition cache hit, query id: "
259
0
                               << print_id(_opts.runtime_state->query_id())
260
0
                               << ", segment id: " << _segment->id()
261
0
                               << ", cache digest: " << _opts.condition_cache_digest
262
0
                               << ", rowset id: " << _opts.rowset_id.to_string();
263
0
                }
264
0
            }
265
266
0
            auto num_rows = _segment->num_rows();
267
0
            if (_find_condition_cache) {
268
0
                const auto& filter_result = *(handle.get_filter_result());
269
0
                int64_t filtered_blocks = 0;
270
0
                for (int i = 0; i < filter_result.size(); i++) {
271
0
                    if (!filter_result[i]) {
272
0
                        _row_bitmap.removeRange(
273
0
                                i * CONDITION_CACHE_OFFSET,
274
0
                                i * CONDITION_CACHE_OFFSET + CONDITION_CACHE_OFFSET);
275
0
                        filtered_blocks++;
276
0
                    }
277
0
                }
278
                // Record condition_cache hit segment number
279
0
                _opts.stats->condition_cache_hit_seg_nums++;
280
                // Record rows filtered by condition cache hit
281
0
                _opts.stats->condition_cache_filtered_rows +=
282
0
                        filtered_blocks * SegmentIterator::CONDITION_CACHE_OFFSET;
283
0
            } else {
284
0
                _condition_cache = std::make_shared<std::vector<bool>>(
285
0
                        num_rows / CONDITION_CACHE_OFFSET + 1, false);
286
0
            }
287
0
        }
288
2.94k
    } else {
289
2.94k
        _opts.condition_cache_digest = 0;
290
2.94k
    }
291
3.03k
}
292
293
// A fast range iterator for roaring bitmap. Output ranges use closed-open form, like [from, to).
294
// Example:
295
//   input bitmap:  [0 1 4 5 6 7 10 15 16 17 18 19]
296
//   output ranges: [0,2), [4,8), [10,11), [15,20) (when max_range_size=10)
297
//   output ranges: [0,2), [4,7), [7,8), [10,11), [15,18), [18,20) (when max_range_size=3)
298
class SegmentIterator::BitmapRangeIterator {
public:
    // Two consumption styles are offered on top of the same roaring iterator:
    // next_range(), which goes through the internal `_buf`, and read_batch_rowids(),
    // which reads straight into the caller's buffer. Both advance `_iter`, so a single
    // instance should be driven through only one of them.

    // Leaves the roaring iterator value-initialized; used by subclasses that bring their
    // own iterator.
    BitmapRangeIterator() = default;
    virtual ~BitmapRangeIterator() = default;

    // Positions the internal iterator at the first value of `bitmap`. No values are read
    // here; `bitmap` must outlive this object.
    explicit BitmapRangeIterator(const roaring::Roaring& bitmap) {
        roaring_init_iterator(&bitmap.roaring, &_iter);
    }

    // True until next_range() has observed the end of the bitmap. Because the first batch
    // is loaded lazily, this still reports true for an empty bitmap before the first
    // next_range() call.
    bool has_more_range() const { return !_eof; }

    [[nodiscard]] static uint32_t get_batch_size() { return kBatchSize; }

    // read next range into [*from, *to) whose size <= max_range_size.
    // return false when there is no more range.
    virtual bool next_range(const uint32_t max_range_size, uint32_t* from, uint32_t* to) {
        // The constructor does not prefetch, so the first call arrives with an empty
        // buffer. Load it now; otherwise the code below would read uninitialized `_buf`
        // entries and index `_buf[_buf_size - 1]` with `_buf_size == 0`.
        // After any _read_next_batch() either `_buf_size > 0` or `_eof` is set, so this
        // condition is only true before the first load.
        if (!_eof && _buf_size == 0) {
            _read_next_batch();
        }
        if (_eof) {
            return false;
        }

        *from = _buf[_buf_pos];
        uint32_t range_size = 0;
        uint32_t expect_val = _buf[_buf_pos]; // this initial value just make first batch valid

        // if array is contiguous sequence then the following conditions need to be met :
        // a_0: x
        // a_1: x+1
        // a_2: x+2
        // ...
        // a_p: x+p
        // so we can just use (a_p-a_0)-p to check conditions
        // and should notice the previous batch needs to be continuous with the current batch
        while (!_eof && range_size + _buf_size - _buf_pos <= max_range_size &&
               expect_val == _buf[_buf_pos] &&
               _buf[_buf_size - 1] - _buf[_buf_pos] == _buf_size - 1 - _buf_pos) {
            range_size += _buf_size - _buf_pos;
            expect_val = _buf[_buf_size - 1] + 1;
            _read_next_batch();
        }

        // The loop above stopped either because the rest of the current batch does not fit
        // into max_range_size or because the batch contains a gap, so the walk below ends
        // before it runs past the current batch.
        if (!_eof && range_size < max_range_size && expect_val == _buf[_buf_pos]) {
            do {
                _buf_pos++;
                range_size++;
            } while (range_size < max_range_size && _buf[_buf_pos] == _buf[_buf_pos - 1] + 1);
        }
        *to = *from + range_size;
        return true;
    }

    // read batch_size of rowids from roaring bitmap into buf array
    virtual uint32_t read_batch_rowids(rowid_t* buf, uint32_t batch_size) {
        return roaring::api::roaring_read_uint32_iterator(&_iter, buf, batch_size);
    }

private:
    // Refills `_buf` with up to kBatchSize values and flags end-of-bitmap when none remain.
    void _read_next_batch() {
        _buf_pos = 0;
        _buf_size = roaring::api::roaring_read_uint32_iterator(&_iter, _buf, kBatchSize);
        _eof = (_buf_size == 0);
    }

    static const uint32_t kBatchSize = 256;
    roaring::api::roaring_uint32_iterator_t _iter {};
    uint32_t _buf[kBatchSize];
    uint32_t _buf_pos = 0;
    uint32_t _buf_size = 0;
    bool _eof = false;
};
368
369
// A backward range iterator for roaring bitmap. Output ranges use closed-open form, like [from, to).
370
// Example:
371
//   input bitmap:  [0 1 4 5 6 7 10 15 16 17 18 19]
372
//   output ranges: [15,20), [10,11), [4,8), [0,2) (when max_range_size=10)
373
//   output ranges: [17,20), [15,17), [10,11), [5,8), [4, 5), [0,2) (when max_range_size=3)
374
class SegmentIterator::BackwardBitmapRangeIterator : public SegmentIterator::BitmapRangeIterator {
public:
    // Positions the reverse iterator at the last value of `bitmap` and records the bitmap's
    // cardinality as the number of row ids still to be returned by read_batch_rowids().
    // `bitmap` must outlive this object.
    explicit BackwardBitmapRangeIterator(const roaring::Roaring& bitmap) {
        roaring_init_iterator_last(&bitmap.roaring, &_riter);
        _rowid_count = cast_set<uint32_t>(roaring_bitmap_get_cardinality(&bitmap.roaring));
        _rowid_left = _rowid_count;
    }

    // True while the reverse iterator still points at a value, i.e. exactly when
    // next_range() would return true. This is a non-virtual function that hides the base
    // class version, so it is only selected through the derived static type.
    bool has_more_range() const { return _riter.has_value; }

    // read next range into [*from, *to) whose size <= max_range_size.
    // return false when there is no more range.
    bool next_range(const uint32_t max_range_size, uint32_t* from, uint32_t* to) override {
        if (!_riter.has_value) {
            return false;
        }

        uint32_t range_size = 0;
        *to = _riter.current_value + 1;

        // Walk backwards while the values stay consecutive and the size limit allows it;
        // `*from` always holds the smallest value accepted so far.
        do {
            *from = _riter.current_value;
            range_size++;
            roaring_previous_uint32_iterator(&_riter);
        } while (range_size < max_range_size && _riter.has_value &&
                 _riter.current_value + 1 == *from);

        return true;
    }
    /**
     * Reads a batch of row IDs from a roaring bitmap, starting from the end and moving backwards.
     * This function retrieves the last `batch_size` row IDs from the bitmap and stores them in the provided buffer.
     * It updates the internal state to track how many row IDs are left to read in subsequent calls.
     *
     * The row IDs are read in reverse order, but stored in the buffer maintaining their original order in the bitmap.
     *
     * Example:
     *   input bitmap: [0 1 4 5 6 7 10 15 16 17 18 19]
     *   If the bitmap has 12 elements and batch_size is set to 5, the function will first read [15, 16, 17, 18, 19]
     *   into the buffer, leaving 7 elements left. In the next call with batch_size 5, it will read [4, 5, 6, 7, 10].
     *
     */
    uint32_t read_batch_rowids(rowid_t* buf, uint32_t batch_size) override {
        if (!_riter.has_value || _rowid_left == 0) {
            return 0;
        }

        if (_rowid_count <= batch_size) {
            roaring_bitmap_to_uint32_array(_riter.parent,
                                           buf); // Fill 'buf' with '_rowid_count' elements.
            uint32_t num_read = _rowid_left;     // Save the number of row IDs read.
            _rowid_left = 0;                     // No row IDs left after this operation.
            return num_read;                     // Return the number of row IDs read.
        }

        uint32_t read_size = std::min(batch_size, _rowid_left);
        uint32_t num_read = 0; // Counter for the number of row IDs read.

        // Read row IDs into the buffer in reverse order.
        while (num_read < read_size && _riter.has_value) {
            buf[read_size - num_read - 1] = _riter.current_value;
            num_read++;
            _rowid_left--; // Decrement the count of remaining row IDs.
            roaring_previous_uint32_iterator(&_riter);
        }

        // Return the actual number of row IDs read.
        return num_read;
    }

private:
    roaring::api::roaring_uint32_iterator_t _riter {};
    // Cardinality of the bitmap captured at construction.
    uint32_t _rowid_count = 0;
    // Row ids not yet handed out by read_batch_rowids().
    uint32_t _rowid_left = 0;
};
449
450
// Builds an iterator over `segment` that produces the columns described by `schema`.
//
// Both shared pointers are taken by value and moved into the members, so no extra
// reference-count round trip is paid. The per-column iterator containers are sized from
// `_schema` (the member, not the moved-from parameter), which relies on `_schema` being
// initialized before them. All lifecycle flags start out false; the real setup happens
// later in init() and _lazy_init().
SegmentIterator::SegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr schema)
        : _segment(std::move(segment)),
          _schema(std::move(schema)),
          _column_iterators(_schema->num_columns()),
          _index_iterators(_schema->num_columns()),
          _cur_rowid(0),
          _lazy_materialization_read(false),
          _lazy_inited(false),
          _inited(false),
          _pool(new ObjectPool) {}
460
461
5.92k
// Initializes the iterator with `opts` by delegating to _init_impl(). When that fails, the
// failure is reported to the segment through update_healthy_status() before the status is
// handed back to the caller unchanged.
Status SegmentIterator::init(const StorageReadOptions& opts) {
    Status init_status = _init_impl(opts);
    if (init_status.ok()) {
        return init_status;
    }
    _segment->update_healthy_status(init_status);
    return init_status;
}
468
469
3.03k
std::unique_ptr<AdaptiveBlockSizePredictor> SegmentIterator::_make_block_size_predictor() const {
470
3.03k
    if (!config::enable_adaptive_batch_size || _opts.preferred_block_size_bytes == 0) {
471
0
        return nullptr;
472
0
    }
473
474
    // Collect per-column raw byte metadata from the segment footer for the columns
475
    // this iterator will actually output (defined by _schema, which is built from
476
    // _opts.return_columns).
477
3.03k
    std::vector<AdaptiveBlockSizePredictor::ColumnMetadata> col_metadata;
478
3.03k
    uint32_t seg_rows = _segment->num_rows();
479
3.03k
    uint64_t total_raw_bytes = 0;
480
3.03k
    double metadata_hint_bytes_per_row = 0.0;
481
3.03k
    if (seg_rows > 0) {
482
3.03k
        const auto& ts = _segment->tablet_schema();
483
3.03k
        if (ts) {
484
7.50k
            for (ColumnId cid : _schema->column_ids()) {
485
7.50k
                if (static_cast<size_t>(cid) < ts->num_columns()) {
486
7.37k
                    int32_t uid = ts->column(cid).unique_id();
487
7.37k
                    uint64_t raw_bytes = _segment->column_raw_data_bytes(uid);
488
7.37k
                    if (uid >= 0 && raw_bytes > 0) {
489
7.12k
                        total_raw_bytes += raw_bytes;
490
7.12k
                    }
491
7.37k
                }
492
7.50k
            }
493
3.03k
            metadata_hint_bytes_per_row = total_raw_bytes / static_cast<double>(seg_rows);
494
3.03k
        }
495
3.03k
    }
496
497
3.03k
    return std::make_unique<AdaptiveBlockSizePredictor>(
498
3.03k
            _opts.preferred_block_size_bytes, metadata_hint_bytes_per_row,
499
3.03k
            AdaptiveBlockSizePredictor::kDefaultProbeRows, _opts.block_row_max);
500
3.03k
}
501
502
5.92k
// One-time initialization from the read options. Later calls return OK immediately.
//
// Steps, in order: copy the options, keep the predicates the segment reports as safe to
// apply, size the row-id buffer, create the block size predictor, copy the virtual-column /
// score / ANN settings, compute the storage (field name, data type) pair of every schema
// column, create the column and index iterators, build the compound expr context and
// initialize the predicate result bookkeeping.
//
// Note that `_inited` is set before the fallible steps run, so a failed first call is not
// retried by a second call.
Status SegmentIterator::_init_impl(const StorageReadOptions& opts) {
    if (_inited) {
        return Status::OK();
    }
    _opts = opts;
    SCOPED_RAW_TIMER(&_opts.stats->segment_iterator_init_timer_ns);
    _inited = true;
    _file_reader = _segment->_file_reader;

    // Keep only the predicates the segment says can be applied safely.
    _col_predicates.clear();
    for (const auto& candidate : opts.column_predicates) {
        const bool applicable = _segment->can_apply_predicate_safely(
                candidate->column_id(), *_schema, _opts.target_cast_type_for_variants, _opts);
        if (applicable) {
            _col_predicates.emplace_back(candidate);
        }
    }
    _tablet_id = opts.tablet_id;
    // The read options do not change afterwards, so the buffer is sized once here.
    _block_rowids.resize(_opts.block_row_max);

    // Adaptive batch size: remember the initial row limit, then create the predictor
    // (nullptr when the feature is disabled).
    _initial_block_row_max = _opts.block_row_max;
    _block_size_predictor = _make_block_size_predictor();

    if (_schema->rowid_col_idx() > 0) {
        _record_rowids = true;
    }

    _virtual_column_exprs = _opts.virtual_column_exprs;
    _vir_cid_to_idx_in_block = _opts.vir_cid_to_idx_in_block;
    _score_runtime = _opts.score_runtime;
    _ann_topn_runtime = _opts.ann_topn_runtime;

    if (opts.output_columns != nullptr) {
        _output_columns = *(opts.output_columns);
    }

    _storage_name_and_type.resize(_schema->columns().size());
    const auto index_storage_format = _opts.tablet_schema->get_inverted_index_storage_format();
    for (int col_idx = 0; col_idx < _schema->columns().size(); ++col_idx) {
        const TabletColumn* column = _schema->column(col_idx);
        if (column == nullptr) {
            continue;
        }

        auto storage_type = _segment->get_data_type_of(*column, _opts);
        if (storage_type == nullptr) {
            storage_type =
                    DataTypeFactory::instance().create_data_type(*column, column->is_nullable());
        }

        // Index field naming:
        // A lucene index document uses a field name that is bound to the column. Format V1
        // uses the column name. Since version 1.2 data files are keyed by column unique id
        // so that columns can be renamed, but V1 inverted index data cannot follow a
        // rename, and Unicode column names may cause further trouble. Format V2 therefore
        // uses the column unique id; a variant sub column uses
        // "<parent_unique_id>.<sub_col_name>".
        std::string index_field_name;
        if (index_storage_format == InvertedIndexStorageFormatPB::V1) {
            index_field_name = column->name();
        } else if (column->is_extracted_column()) {
            index_field_name = std::to_string(column->parent_unique_id()) + "." + column->name();
        } else {
            index_field_name = std::to_string(column->unique_id());
        }
        _storage_name_and_type[col_idx] = std::make_pair(index_field_name, storage_type);

        // One sparse-column cache per top-level column: extracted columns share the entry
        // of their parent.
        if (int32_t uid = column->is_extracted_column() ? column->parent_unique_id()
                                                        : column->unique_id();
            !_variant_sparse_column_cache.contains(uid)) {
            DCHECK(uid >= 0);
            _variant_sparse_column_cache.emplace(uid,
                                                 std::make_unique<PathToBinaryColumnCache>());
        }
    }

    RETURN_IF_ERROR(init_iterators());

    RETURN_IF_ERROR(_construct_compound_expr_context());
    VLOG_DEBUG << fmt::format(
            "Segment iterator init, virtual_column_exprs size: {}, "
            "_vir_cid_to_idx_in_block size: {}, common_expr_pushdown size: {}",
            _opts.virtual_column_exprs.size(), _opts.vir_cid_to_idx_in_block.size(),
            _common_expr_ctxs_push_down.size());
    _initialize_predicate_results();
    return Status::OK();
}
592
593
3.03k
void SegmentIterator::_initialize_predicate_results() {
594
    // Initialize from _col_predicates
595
3.03k
    for (auto pred : _col_predicates) {
596
64
        int cid = pred->column_id();
597
64
        _column_predicate_index_exec_status[cid][pred] = false;
598
64
    }
599
600
3.03k
    _calculate_common_expr_index_exec_status();
601
3.03k
}
602
603
3.03k
// Creates the iterators for the returned columns first and the index iterators second.
// The first failing step's status is returned; OK when both succeed.
Status SegmentIterator::init_iterators() {
    RETURN_IF_ERROR(_init_return_column_iterators());
    return _init_index_iterators();
}
608
609
13.3k
// Deferred initialization that runs once, on first use with an output `block`.
//
// It seeds `_row_bitmap` with every row of the segment and then narrows it step by step:
// condition cache, key ranges (skipped for z-order tables and tables with cluster keys),
// column conditions, the delete bitmap and the explicit row ranges from the options. After
// that it sets up score materialization and the ANN top-n predicate, creates the range
// iterator, allocates the per-column buffers and initializes the segment prefetchers.
Status SegmentIterator::_lazy_init(Block* block) {
    if (_lazy_inited) {
        return Status::OK();
    }
    SCOPED_RAW_TIMER(&_opts.stats->block_init_ns);
    DorisMetrics::instance()->segment_read_total->increment(1);
    _row_bitmap.addRange(0, _segment->num_rows());
    _init_row_bitmap_by_condition_cache();

    // z-order can not use prefix index
    const bool prefix_index_usable = _segment->_tablet_schema->sort_type() != SortType::ZORDER &&
                                     _segment->_tablet_schema->cluster_key_uids().empty();
    if (prefix_index_usable) {
        RETURN_IF_ERROR(_get_row_ranges_by_keys());
    }
    RETURN_IF_ERROR(_get_row_ranges_by_column_conditions());
    RETURN_IF_ERROR(_vec_init_lazy_materialization());

    // Drop the rows that the delete bitmap marks as deleted for this segment.
    if (const auto deleted_it = _opts.delete_bitmap.find(segment_id());
        deleted_it != _opts.delete_bitmap.end() && deleted_it->second != nullptr) {
        const size_t rows_before = _row_bitmap.cardinality();
        _row_bitmap -= *(deleted_it->second);
        _opts.stats->rows_del_by_bitmap += (rows_before - _row_bitmap.cardinality());
        VLOG_DEBUG << "read on segment: " << segment_id() << ", delete bitmap cardinality: "
                   << deleted_it->second->cardinality() << ", "
                   << _opts.stats->rows_del_by_bitmap << " rows deleted by bitmap";
    }

    if (!_opts.row_ranges.is_empty()) {
        _row_bitmap &= RowRanges::ranges_to_roaring(_opts.row_ranges);
    }

    _prepare_score_column_materialization();

    RETURN_IF_ERROR(_apply_ann_topn_predicate());

    if (_opts.read_orderby_key_reverse) {
        _range_iter.reset(new BackwardBitmapRangeIterator(_row_bitmap));
    } else {
        _range_iter.reset(new BitmapRangeIterator(_row_bitmap));
    }

    // Reserve for _initial_block_row_max, the limit captured before any adaptive
    // prediction: the predictor may raise block_row_max again on later batches, up to that
    // ceiling. Reserving for the current (possibly reduced) _opts.block_row_max would cause
    // a heap-buffer-overflow once a later prediction is larger.
    const auto nrows_reserve_limit =
            std::min(_row_bitmap.cardinality(), static_cast<uint64_t>(_initial_block_row_max));
    const bool needs_rowid_buffer =
            _lazy_materialization_read || _opts.record_rowids || _is_need_expr_eval;
    if (needs_rowid_buffer) {
        _block_rowids.resize(_initial_block_row_max);
    }
    _current_return_columns.resize(_schema->columns().size());

    for (size_t pos = 0; pos < _schema->column_ids().size(); pos++) {
        const ColumnId cid = _schema->column_ids()[pos];
        if (_is_pred_column[cid]) {
            auto storage_column_type = _storage_name_and_type[cid].second;
            RETURN_IF_CATCH_EXCEPTION(
                    // cid stays in bounds here: _current_return_columns was resized to
                    // _schema->columns().size() just above.
                    _current_return_columns[cid] = Schema::get_predicate_column_ptr(
                            storage_column_type, _opts.io_ctx.reader_type));
            _current_return_columns[cid]->set_rowset_segment_id(
                    {_segment->rowset_id(), _segment->id()});
            _current_return_columns[cid]->reserve(nrows_reserve_limit);
            continue;
        }
        if (pos >= block->columns()) {
            // This column has to be scanned but is not returned upward (delete sign).
            // A position beyond the block's columns on a non-predicate column means the
            // column belongs to a delete condition that is not effective in this segment:
            //   a. origin data -> b. delete condition -> c. new load data
            // The segment of c is not affected by the delete condition, but the column
            // still has to be read to match the schema, so create a column to hold it.
            // TODO: skip read the not effective delete column to speed up segment read.
            const auto* column_desc = _schema->column(cid);
            _current_return_columns[cid] = Schema::get_data_type_ptr(*column_desc)->create_column();
            _current_return_columns[cid]->reserve(nrows_reserve_limit);
        }
    }

    // Columns that exist only for an additional delete condition are materialized at the
    // end of the block and are erased after _output_column_by_sel_idx, so they never need
    // filtering: drop them from _columns_to_filter on the first next_batch.
    // Eg:
    //      `delete from table where a = 10;`
    //      `select b from table;`
    // Column a is only effective inside the segment iterator; the block from the query
    // engine contains just column b, so column a must not be filtered by expr.
    auto filter_it = _columns_to_filter.begin();
    while (filter_it != _columns_to_filter.end()) {
        if (*filter_it >= block->columns()) {
            filter_it = _columns_to_filter.erase(filter_it);
        } else {
            ++filter_it;
        }
    }

    _lazy_inited = true;

    _init_segment_prefetchers();

    return Status::OK();
}
710
711
3.03k
void SegmentIterator::_init_segment_prefetchers() {
712
3.03k
    SCOPED_RAW_TIMER(&_opts.stats->segment_iterator_init_segment_prefetchers_timer_ns);
713
3.03k
    if (!config::is_cloud_mode()) {
714
3.03k
        return;
715
3.03k
    }
716
0
    static std::vector<ReaderType> supported_reader_types {
717
0
            ReaderType::READER_QUERY, ReaderType::READER_BASE_COMPACTION,
718
0
            ReaderType::READER_CUMULATIVE_COMPACTION, ReaderType::READER_FULL_COMPACTION};
719
0
    if (std::ranges::none_of(supported_reader_types,
720
0
                             [&](ReaderType t) { return _opts.io_ctx.reader_type == t; })) {
721
0
        return;
722
0
    }
723
    // Initialize segment prefetcher for predicate and non-predicate columns
724
0
    bool is_query = (_opts.io_ctx.reader_type == ReaderType::READER_QUERY);
725
0
    bool enable_prefetch = is_query ? config::enable_query_segment_file_cache_prefetch
726
0
                                    : config::enable_compaction_segment_file_cache_prefetch;
727
0
    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
728
0
            "[verbose] SegmentIterator _init_segment_prefetchers, is_query={}, "
729
0
            "enable_prefetch={}, "
730
0
            "_row_bitmap.isEmpty()={}, row_bitmap.cardinality()={}, tablet={}, rowset={}, "
731
0
            "segment={}, predicate_column_ids={}, common_expr_column_ids={}",
732
0
            is_query, enable_prefetch, _row_bitmap.isEmpty(), _row_bitmap.cardinality(),
733
0
            _opts.tablet_id, _opts.rowset_id.to_string(), segment_id(),
734
0
            fmt::join(_predicate_column_ids, ","), fmt::join(_common_expr_column_ids, ","));
735
0
    if (enable_prefetch && !_row_bitmap.isEmpty()) {
736
0
        int window_size =
737
0
                1 + (is_query ? config::query_segment_file_cache_prefetch_block_size
738
0
                              : config::compaction_segment_file_cache_prefetch_block_size);
739
0
        LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
740
0
                "[verbose] SegmentIterator prefetch config: window_size={}", window_size);
741
0
        if (window_size > 0 &&
742
0
            !_column_iterators.empty()) { // ensure init_iterators has been called
743
0
            SegmentPrefetcherConfig prefetch_config(window_size,
744
0
                                                    config::file_cache_each_block_size);
745
0
            for (auto cid : _schema->column_ids()) {
746
0
                auto& column_iter = _column_iterators[cid];
747
0
                if (column_iter == nullptr) {
748
0
                    continue;
749
0
                }
750
0
                const auto* tablet_column = _schema->column(cid);
751
0
                SegmentPrefetchParams params {
752
0
                        .config = prefetch_config,
753
0
                        .read_options = _opts,
754
0
                };
755
0
                LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
756
0
                        "[verbose] SegmentIterator init_segment_prefetchers, "
757
0
                        "tablet={}, rowset={}, segment={}, column_id={}, col_name={}, type={}",
758
0
                        _opts.tablet_id, _opts.rowset_id.to_string(), segment_id(), cid,
759
0
                        tablet_column->name(), tablet_column->type());
760
0
                Status st = column_iter->init_prefetcher(params);
761
0
                if (!st.ok()) {
762
0
                    LOG_IF(WARNING, config::enable_segment_prefetch_verbose_log) << fmt::format(
763
0
                            "[verbose] failed to init prefetcher for column_id={}, "
764
0
                            "tablet={}, rowset={}, segment={}, error={}",
765
0
                            cid, _opts.tablet_id, _opts.rowset_id.to_string(), segment_id(),
766
0
                            st.to_string());
767
0
                }
768
0
            }
769
770
            // for compaction, it's guaranteed that all rows are read, so we can prefetch all data blocks
771
0
            PrefetcherInitMethod init_method = (is_query && _row_bitmap.cardinality() < num_rows())
772
0
                                                       ? PrefetcherInitMethod::FROM_ROWIDS
773
0
                                                       : PrefetcherInitMethod::ALL_DATA_BLOCKS;
774
0
            std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>> prefetchers;
775
0
            for (const auto& column_iter : _column_iterators) {
776
0
                if (column_iter != nullptr) {
777
0
                    column_iter->collect_prefetchers(prefetchers, init_method);
778
0
                }
779
0
            }
780
0
            for (auto& [method, prefetcher_vec] : prefetchers) {
781
0
                if (method == PrefetcherInitMethod::ALL_DATA_BLOCKS) {
782
0
                    for (auto* prefetcher : prefetcher_vec) {
783
0
                        prefetcher->build_all_data_blocks();
784
0
                    }
785
0
                } else if (method == PrefetcherInitMethod::FROM_ROWIDS && !prefetcher_vec.empty()) {
786
0
                    SegmentPrefetcher::build_blocks_by_rowids(_row_bitmap, prefetcher_vec);
787
0
                }
788
0
            }
789
0
        }
790
0
    }
791
0
}
792
793
3.02k
// Narrows _row_bitmap to the rows covered by _opts.key_ranges.
//
// Each key range is translated into a [lower_rowid, upper_rowid) row range via
// _lookup_ordinal; the union of all of them is intersected with _row_bitmap and
// the number of removed rows is added to stats->rows_key_range_filtered.
// Returns the first error reported by _prepare_seek / _lookup_ordinal.
Status SegmentIterator::_get_row_ranges_by_keys() {
    SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_keys_ns);
    DorisMetrics::instance()->segment_row_total->increment(num_rows());

    // fast path for empty segment or empty key ranges
    if (_row_bitmap.isEmpty() || _opts.key_ranges.empty()) {
        return Status::OK();
    }

    // Read & seek key columns is a waste of time when no key column in _schema
    const auto& schema_columns = _schema->columns();
    const bool schema_has_key_column = std::any_of(
            schema_columns.begin(), schema_columns.end(), [&](const TabletColumnPtr& col) {
                return col && _opts.tablet_schema->column_by_uid(col->unique_id()).is_key();
            });
    if (!schema_has_key_column) {
        return Status::OK();
    }

    RowRanges result_ranges;
    for (auto& key_range : _opts.key_ranges) {
        rowid_t lower_rowid = 0;
        rowid_t upper_rowid = num_rows();
        RETURN_IF_ERROR(_prepare_seek(key_range));

        if (key_range.upper_key != nullptr) {
            // If client want to read upper_bound, the include_upper is true. So we
            // should get the first ordinal at which key is larger than upper_bound.
            // So we call _lookup_ordinal with include_upper's negate
            RETURN_IF_ERROR(_lookup_ordinal(*key_range.upper_key, !key_range.include_upper,
                                            num_rows(), &upper_rowid));
        }
        // The lower bound is only searched below the upper bound found above.
        if (upper_rowid > 0 && key_range.lower_key != nullptr) {
            RETURN_IF_ERROR(_lookup_ordinal(*key_range.lower_key, key_range.include_lower,
                                            upper_rowid, &lower_rowid));
        }

        auto single_range = RowRanges::create_single(lower_rowid, upper_rowid);
        RowRanges::ranges_union(result_ranges, single_range, &result_ranges);
    }

    const size_t rows_before = _row_bitmap.cardinality();
    _row_bitmap &= RowRanges::ranges_to_roaring(result_ranges);
    _opts.stats->rows_key_range_filtered += (rows_before - _row_bitmap.cardinality());

    return Status::OK();
}
836
837
// Set up environment for the following seek.
838
0
// Prepares the state needed to seek by `key_range`:
//   1. builds _seek_schema (once) from the columns of the lower and upper keys,
//   2. allocates one column per seek-schema column in _seek_block (once),
//   3. creates and initializes any column iterator the seek schema needs that
//      does not exist yet.
// Returns the first error from creating or initializing a column iterator.
Status SegmentIterator::_prepare_seek(const StorageReadOptions::KeyRange& key_range) {
    // Columns referenced by the lower key first, then those only the upper key adds.
    std::vector<const TabletColumn*> key_columns;
    std::set<uint32_t> seen_cids;
    if (key_range.lower_key != nullptr) {
        for (auto cid : key_range.lower_key->schema()->column_ids()) {
            seen_cids.emplace(cid);
            key_columns.emplace_back(key_range.lower_key->column(cid));
        }
    }
    if (key_range.upper_key != nullptr) {
        for (auto cid : key_range.upper_key->schema()->column_ids()) {
            // emplace() reports whether the id was newly inserted.
            if (seen_cids.emplace(cid).second) {
                key_columns.emplace_back(key_range.upper_key->column(cid));
            }
        }
    }

    if (!_seek_schema) {
        std::vector<TabletColumnPtr> cols;
        cols.reserve(key_columns.size());
        for (const TabletColumn* key_column : key_columns) {
            cols.emplace_back(std::make_shared<TabletColumn>(*key_column));
        }
        // The seek schema numbers its columns 0..N-1.
        std::vector<uint32_t> column_ids(cols.size());
        std::iota(column_ids.begin(), column_ids.end(), 0);
        _seek_schema = std::make_unique<Schema>(cols, column_ids);
    }

    // todo(wb) need refactor here, when using pk to search, _seek_block is useless
    if (_seek_block.size() == 0) {
        _seek_block.resize(_seek_schema->num_column_ids());
        int slot = 0;
        for (auto cid : _seek_schema->column_ids()) {
            auto column_desc = _seek_schema->column(cid);
            _seek_block[slot++] = Schema::get_data_type_ptr(*column_desc)->create_column();
        }
    }

    // create used column iterator
    for (auto cid : _seek_schema->column_ids()) {
        if (_column_iterators[cid] != nullptr) {
            continue;
        }
        // TODO: Do we need this?
        if (_virtual_column_exprs.contains(cid)) {
            _column_iterators[cid] = std::make_unique<VirtualColumnIterator>();
            continue;
        }

        RETURN_IF_ERROR(_segment->new_column_iterator(_opts.tablet_schema->column(cid),
                                                      &_column_iterators[cid], &_opts,
                                                      &_variant_sparse_column_cache));
        ColumnIteratorOptions iter_opts {
                .use_page_cache = _opts.use_page_cache,
                .file_reader = _file_reader.get(),
                .stats = _opts.stats,
                .io_ctx = _opts.io_ctx,
        };
        RETURN_IF_ERROR(_column_iterators[cid]->init(iter_opts));
    }

    return Status::OK();
}
900
901
3.03k
// Shrinks _row_bitmap using index-level information, in two stages:
//   1. inverted index: column-level indexes (_apply_inverted_index) and
//      expr-level indexes (_apply_index_expr). Push-down exprs whose result is
//      fully available from the index are applied to _row_bitmap and removed
//      from _common_expr_ctxs_push_down.
//   2. the remaining conditions via _get_row_ranges_from_conditions.
// Filtered row counts are accumulated into _opts.stats.
// Returns the first error from any of the stages, or from an enabled debug point.
Status SegmentIterator::_get_row_ranges_by_column_conditions() {
    SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_column_conditions_ns);
    if (_row_bitmap.isEmpty()) {
        return Status::OK();
    }

    if (_opts.runtime_state &&
        _opts.runtime_state->query_options().enable_inverted_index_query &&
        (has_index_in_iterators() || !_common_expr_ctxs_push_down.empty())) {
        SCOPED_RAW_TIMER(&_opts.stats->inverted_index_filter_timer);
        size_t input_rows = _row_bitmap.cardinality();
        // Only apply column-level inverted index if we have iterators
        if (has_index_in_iterators()) {
            RETURN_IF_ERROR(_apply_inverted_index());
        }
        // Always apply expr-level index (e.g., search expressions) if we have common_expr_pushdown
        // This allows search expressions with variant subcolumns to be evaluated even when
        // the segment doesn't have all subcolumns
        RETURN_IF_ERROR(_apply_index_expr());
        for (auto it = _common_expr_ctxs_push_down.begin();
             it != _common_expr_ctxs_push_down.end();) {
            if (!(*it)->all_expr_inverted_index_evaluated()) {
                ++it;
                continue;
            }
            const auto* result =
                    (*it)->get_index_context()->get_index_result_for_expr((*it)->root().get());
            if (result == nullptr) {
                // No index result is recorded for the root expr, so the expr cannot be
                // dropped. Keep it in the push-down list and move on; without advancing
                // here the loop would never terminate.
                ++it;
                continue;
            }
            _row_bitmap &= *result->get_data_bitmap();
            it = _common_expr_ctxs_push_down.erase(it);
        }
        // The digest is reset to 0 once no push-down expr is left.
        _opts.condition_cache_digest =
                _common_expr_ctxs_push_down.empty() ? 0 : _opts.condition_cache_digest;
        _opts.stats->rows_inverted_index_filtered += (input_rows - _row_bitmap.cardinality());
        // Columns whose conditions were all answered by the index need no data read.
        for (auto cid : _schema->column_ids()) {
            bool result_true = _check_all_conditions_passed_inverted_index_for_column(cid);
            if (result_true) {
                _need_read_data_indices[cid] = false;
            }
        }
    }

    DBUG_EXECUTE_IF("segment_iterator.inverted_index.filtered_rows", {
        LOG(INFO) << "Debug Point: segment_iterator.inverted_index.filtered_rows: "
                  << _opts.stats->rows_inverted_index_filtered;
        auto filtered_rows = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
                "segment_iterator.inverted_index.filtered_rows", "filtered_rows", -1);
        if (filtered_rows != _opts.stats->rows_inverted_index_filtered) {
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
                    "filtered_rows: {} not equal to expected: {}",
                    _opts.stats->rows_inverted_index_filtered, filtered_rows);
        }
    })

    DBUG_EXECUTE_IF("segment_iterator.apply_inverted_index", {
        LOG(INFO) << "Debug Point: segment_iterator.apply_inverted_index";
        if (!_common_expr_ctxs_push_down.empty() || !_col_predicates.empty()) {
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
                    "it is failed to apply inverted index, common_expr_ctxs_push_down: {}, "
                    "col_predicates: {}",
                    _common_expr_ctxs_push_down.size(), _col_predicates.size());
        }
    })

    if (!_row_bitmap.isEmpty() &&
        (!_opts.topn_filter_source_node_ids.empty() || !_opts.col_id_to_predicates.empty() ||
         _opts.delete_condition_predicates->num_of_column_predicate() > 0 ||
         !_common_expr_ctxs_push_down.empty())) {
        RowRanges condition_row_ranges = RowRanges::create_single(_segment->num_rows());
        RETURN_IF_ERROR(_get_row_ranges_from_conditions(&condition_row_ranges));
        size_t pre_size = _row_bitmap.cardinality();
        _row_bitmap &= RowRanges::ranges_to_roaring(condition_row_ranges);
        _opts.stats->rows_conditions_filtered += (pre_size - _row_bitmap.cardinality());
    }

    DBUG_EXECUTE_IF("bloom_filter_must_filter_data", {
        if (_opts.stats->rows_bf_filtered == 0) {
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
                    "Bloom filter did not filter the data.");
        }
    })

    // TODO(hkp): calculate filter rate to decide whether to
    // use zone map/bloom filter/secondary index or not.
    return Status::OK();
}
990
991
0
bool SegmentIterator::_column_has_ann_index(int32_t cid) {
992
0
    bool has_ann_index = _index_iterators[cid] != nullptr &&
993
0
                         _index_iterators[cid]->get_reader(AnnIndexReaderType::ANN);
994
995
0
    return has_ann_index;
996
0
}
997
998
3.03k
// Tries to answer an ANN top-n through the ANN index of the source column.
//
// When the index can be used, _row_bitmap is narrowed by the search, the
// result column is handed to the virtual column iterator of the destination
// column, and data reads of the source column are disabled (index-only scan).
//
// In every case where the index cannot be used (no ANN index, other predicates
// present, order direction or metric mismatch, too few candidate rows, cast or
// load failure) the function falls back to brute force: it forces data reads
// of the source column, bumps ann_fall_back_brute_force_cnt and returns OK.
//
// Returns an error only when the ANN search itself fails or when the
// destination column iterator is not a VirtualColumnIterator.
Status SegmentIterator::_apply_ann_topn_predicate() {
    if (_ann_topn_runtime == nullptr) {
        return Status::OK();
    }

    VLOG_DEBUG << fmt::format("Try apply ann topn: {}", _ann_topn_runtime->debug_string());
    size_t src_col_idx = _ann_topn_runtime->get_src_column_idx();
    ColumnId src_cid = _schema->column_id(src_col_idx);

    // Single exit path shared by all "cannot use the ANN index" cases:
    // disable index-only scan on the ann indexed column and count the fallback.
    auto fall_back_to_brute_force = [this, src_cid]() {
        _need_read_data_indices[src_cid] = true;
        _opts.stats->ann_fall_back_brute_force_cnt += 1;
        return Status::OK();
    };

    IndexIterator* ann_index_iterator = _index_iterators[src_cid].get();
    bool has_ann_index = _column_has_ann_index(src_cid);
    bool has_common_expr_push_down = !_common_expr_ctxs_push_down.empty();
    bool has_column_predicate = std::any_of(_is_pred_column.begin(), _is_pred_column.end(),
                                            [](bool is_pred) { return is_pred; });
    if (!has_ann_index || has_common_expr_push_down || has_column_predicate) {
        VLOG_DEBUG << fmt::format(
                "Ann topn can not be evaluated by ann index, has_ann_index: {}, "
                "has_common_expr_push_down: {}, has_column_predicate: {}",
                has_ann_index, has_common_expr_push_down, has_column_predicate);
        return fall_back_to_brute_force();
    }

    // Process asc & desc according to the type of metric
    auto index_reader = ann_index_iterator->get_reader(AnnIndexReaderType::ANN);
    auto ann_index_reader = dynamic_cast<AnnIndexReader*>(index_reader.get());
    DCHECK(ann_index_reader != nullptr);
    if (ann_index_reader == nullptr) {
        // DCHECK is compiled out of release builds; do not dereference a failed cast.
        return fall_back_to_brute_force();
    }
    const bool is_inner_product = ann_index_reader->get_metric_type() == AnnIndexMetric::IP;
    if (is_inner_product) {
        if (_ann_topn_runtime->is_asc()) {
            VLOG_DEBUG << fmt::format(
                    "Asc topn for inner product can not be evaluated by ann index");
            return fall_back_to_brute_force();
        }
    } else {
        if (!_ann_topn_runtime->is_asc()) {
            VLOG_DEBUG << fmt::format("Desc topn for l2/cosine can not be evaluated by ann index");
            return fall_back_to_brute_force();
        }
    }

    if (ann_index_reader->get_metric_type() != _ann_topn_runtime->get_metric_type()) {
        VLOG_DEBUG << fmt::format(
                "Ann topn metric type {} not match index metric type {}, can not be evaluated "
                "by "
                "ann index",
                metric_to_string(_ann_topn_runtime->get_metric_type()),
                metric_to_string(ann_index_reader->get_metric_type()));
        return fall_back_to_brute_force();
    }

    size_t pre_size = _row_bitmap.cardinality();
    size_t rows_of_segment = _segment->num_rows();
    const auto& user_params = _ann_topn_runtime->user_params();
    if (user_params.should_fallback_ann_index_by_small_candidate(pre_size, rows_of_segment)) {
        VLOG_DEBUG << fmt::format(
                "Ann topn predicate input rows {} reach small candidate threshold, "
                "rows_of_segment: {}, absolute_threshold: {}, percent_threshold: {}, "
                "will not use ann index to filter",
                pre_size, rows_of_segment, user_params.ann_index_candidate_rows_threshold,
                user_params.ann_index_candidate_rows_percent_threshold);
        _opts.stats->ann_topn_fallback_by_small_candidate_cnt += 1;
        _opts.stats->ann_topn_fallback_small_candidate_rows += pre_size;
        return fall_back_to_brute_force();
    }
    IColumn::MutablePtr result_column;
    std::shared_ptr<std::vector<uint64_t>> result_row_ids;
    segment_v2::AnnIndexStats ann_index_stats;

    // Try to load ANN index before search
    auto ann_index_iterator_casted =
            dynamic_cast<segment_v2::AnnIndexIterator*>(ann_index_iterator);
    if (ann_index_iterator_casted == nullptr) {
        VLOG_DEBUG << "Failed to cast index iterator to AnnIndexIterator, fallback to brute force";
        return fall_back_to_brute_force();
    }

    // Track load index timing. The timer is confined to its own scope so that it
    // has finished before load_index_costs_ns is read for the metric below
    // (the scoped timer is expected to publish its elapsed time when it is destroyed).
    bool index_loaded = false;
    {
        SCOPED_TIMER(&(ann_index_stats.load_index_costs_ns));
        index_loaded = ann_index_iterator_casted->try_load_index();
    }
    if (!index_loaded) {
        VLOG_DEBUG << "Failed to load ANN index, fallback to brute force search";
        return fall_back_to_brute_force();
    }
    const double load_costs_ms =
            static_cast<double>(ann_index_stats.load_index_costs_ns.value()) / 1000000.0;
    DorisMetrics::instance()->ann_index_load_costs_ms->increment(
            static_cast<int64_t>(load_costs_ms));

    // The result cache is on unless the query options explicitly turn it off.
    bool enable_ann_index_result_cache =
            !_opts.runtime_state ||
            !_opts.runtime_state->query_options().__isset.enable_ann_index_result_cache ||
            _opts.runtime_state->query_options().enable_ann_index_result_cache;
    RETURN_IF_ERROR(_ann_topn_runtime->evaluate_vector_ann_search(
            ann_index_iterator_casted, &_row_bitmap, rows_of_segment, enable_ann_index_result_cache,
            result_column, result_row_ids, ann_index_stats));

    VLOG_DEBUG << fmt::format("Ann topn filtered {} - {} = {} rows", pre_size,
                              _row_bitmap.cardinality(), pre_size - _row_bitmap.cardinality());

    // Publish the per-search statistics.
    int64_t rows_filtered = pre_size - _row_bitmap.cardinality();
    _opts.stats->rows_ann_index_topn_filtered += rows_filtered;
    _opts.stats->ann_index_load_ns += ann_index_stats.load_index_costs_ns.value();
    _opts.stats->ann_topn_search_ns += ann_index_stats.search_costs_ns.value();
    _opts.stats->ann_ivf_on_disk_load_ns += ann_index_stats.ivf_on_disk_load_costs_ns.value();
    _opts.stats->ann_ivf_on_disk_cache_hit_cnt += ann_index_stats.ivf_on_disk_cache_hit_cnt.value();
    _opts.stats->ann_ivf_on_disk_cache_miss_cnt +=
            ann_index_stats.ivf_on_disk_cache_miss_cnt.value();
    _opts.stats->ann_index_topn_engine_search_ns += ann_index_stats.engine_search_ns.value();
    _opts.stats->ann_index_topn_result_process_ns +=
            ann_index_stats.result_process_costs_ns.value();
    _opts.stats->ann_index_topn_engine_convert_ns += ann_index_stats.engine_convert_ns.value();
    _opts.stats->ann_index_topn_engine_prepare_ns += ann_index_stats.engine_prepare_ns.value();
    _opts.stats->ann_index_topn_search_cnt += 1;
    _opts.stats->ann_index_cache_hits += ann_index_stats.topn_cache_hits.value();

    const size_t dst_col_idx = _ann_topn_runtime->get_dest_column_idx();
    ColumnIterator* column_iter = _column_iterators[_schema->column_id(dst_col_idx)].get();
    DCHECK(column_iter != nullptr);
    VirtualColumnIterator* virtual_column_iter = dynamic_cast<VirtualColumnIterator*>(column_iter);
    DCHECK(virtual_column_iter != nullptr);
    if (virtual_column_iter == nullptr) {
        // Release builds skip the DCHECKs above; report the problem instead of
        // dereferencing a null pointer.
        return Status::Error<ErrorCode::INTERNAL_ERROR>(
                "ann topn dest column {} is not backed by a virtual column iterator",
                dst_col_idx);
    }
    VLOG_DEBUG << fmt::format(
            "Virtual column iterator, column_idx {}, is materialized with {} rows", dst_col_idx,
            result_row_ids->size());
    // reference count of result_column should be 1, so move will not issue any data copy.
    virtual_column_iter->prepare_materialization(std::move(result_column), result_row_ids);

    _need_read_data_indices[src_cid] = false;
    VLOG_DEBUG << fmt::format(
            "Enable ANN index-only scan for src column cid {} (skip reading data pages)", src_cid);

    return Status::OK();
}
1147
1148
537
// Intersects `condition_row_ranges` with the row ranges that can still match
// the column predicates in _opts.col_id_to_predicates, using in order:
//   1. dictionaries (query readers only),
//   2. bloom filter indexes,
//   3. zone maps,
//   4. zone maps applied to the push-down exprs.
// Each stage is timed separately and adds the rows it removed to _opts.stats.
// If the dictionary stage leaves nothing, the function returns right after it.
//
// condition_row_ranges: in/out; narrowed in place.
// Returns the first error reported by a column iterator or the expr zone map step.
Status SegmentIterator::_get_row_ranges_from_conditions(RowRanges* condition_row_ranges) {
    // Ordered set of the column ids that carry a predicate.
    std::set<int32_t> cids;
    for (const auto& entry : _opts.col_id_to_predicates) {
        cids.insert(entry.first);
    }

    // Shared applicability check used by every stage below.
    auto predicate_applicable = [this](int32_t cid) {
        return _segment->can_apply_predicate_safely(cid, *_schema,
                                                    _opts.target_cast_type_for_variants, _opts);
    };

    {
        SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_dict_ns);
        /// Low cardinality optimization is currently not very stable, so to prevent data corruption,
        /// we are temporarily disabling its use in data compaction.
        // TODO: enable it in not only ReaderTyper::READER_QUERY but also other reader types.
        if (_opts.io_ctx.reader_type == ReaderType::READER_QUERY) {
            RowRanges dict_row_ranges = RowRanges::create_single(num_rows());
            for (auto cid : cids) {
                if (!predicate_applicable(cid)) {
                    continue;
                }
                DCHECK(_opts.col_id_to_predicates.count(cid) > 0);
                RETURN_IF_ERROR(_column_iterators[cid]->get_row_ranges_by_dict(
                        _opts.col_id_to_predicates.at(cid).get(), &dict_row_ranges));

                // Nothing can match any more; no point in asking further columns.
                if (dict_row_ranges.is_empty()) {
                    break;
                }
            }

            if (dict_row_ranges.is_empty()) {
                RowRanges::ranges_intersection(*condition_row_ranges, dict_row_ranges,
                                               condition_row_ranges);
                _opts.stats->segment_dict_filtered++;
                _opts.stats->filtered_segment_number++;
                return Status::OK();
            }
        }
    }

    {
        SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_bf_ns);
        // first filter data by bloom filter index
        // bloom filter index only use CondColumn
        RowRanges bf_row_ranges = RowRanges::create_single(num_rows());
        for (auto& cid : cids) {
            DCHECK(_opts.col_id_to_predicates.count(cid) > 0);
            if (!predicate_applicable(cid)) {
                continue;
            }
            // get row ranges by bf index of this column,
            RowRanges column_bf_row_ranges = RowRanges::create_single(num_rows());
            RETURN_IF_ERROR(_column_iterators[cid]->get_row_ranges_by_bloom_filter(
                    _opts.col_id_to_predicates.at(cid).get(), &column_bf_row_ranges));
            RowRanges::ranges_intersection(bf_row_ranges, column_bf_row_ranges, &bf_row_ranges);
        }

        const size_t rows_before_bf = condition_row_ranges->count();
        RowRanges::ranges_intersection(*condition_row_ranges, bf_row_ranges, condition_row_ranges);
        _opts.stats->rows_bf_filtered += (rows_before_bf - condition_row_ranges->count());
    }

    {
        SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_zonemap_ns);
        RowRanges zone_map_row_ranges = RowRanges::create_single(num_rows());
        // second filter data by zone map
        for (const auto& cid : cids) {
            DCHECK(_opts.col_id_to_predicates.count(cid) > 0);
            if (!predicate_applicable(cid)) {
                continue;
            }
            if (_segment->is_tso_placeholder_col(cid, *_schema, _opts)) {
                // skip untrustworthy tso placeholder zonemap
                // if possible already be pruned as a whole before,
                // so just skip
                continue;
            }
            // do not check zonemap if predicate does not support zonemap
            if (!_opts.col_id_to_predicates.at(cid)->support_zonemap()) {
                VLOG_DEBUG << "skip zonemap for column " << cid;
                continue;
            }
            // get row ranges by zone map of this column,
            RowRanges column_row_ranges = RowRanges::create_single(num_rows());
            RETURN_IF_ERROR(_column_iterators[cid]->get_row_ranges_by_zone_map(
                    _opts.col_id_to_predicates.at(cid).get(),
                    _opts.del_predicates_for_zone_map.count(cid) > 0
                            ? &(_opts.del_predicates_for_zone_map.at(cid))
                            : nullptr,
                    &column_row_ranges));
            // intersect different columns's row ranges to get final row ranges by zone map
            RowRanges::ranges_intersection(zone_map_row_ranges, column_row_ranges,
                                           &zone_map_row_ranges);
        }

        const size_t rows_before_zonemap = condition_row_ranges->count();
        RowRanges::ranges_intersection(*condition_row_ranges, zone_map_row_ranges,
                                       condition_row_ranges);

        // NOTE(review): this intersects with the same zone_map_row_ranges a second
        // time, so rows_stats_rp_filtered presumably always grows by 0 here.
        // Looks like a leftover of a separate runtime-predicate pass; confirm before removing.
        const size_t rows_before_second_pass = condition_row_ranges->count();
        RowRanges::ranges_intersection(*condition_row_ranges, zone_map_row_ranges,
                                       condition_row_ranges);
        _opts.stats->rows_stats_rp_filtered +=
                (rows_before_second_pass - condition_row_ranges->count());
        _opts.stats->rows_stats_filtered += (rows_before_zonemap - condition_row_ranges->count());
    }

    {
        SCOPED_RAW_TIMER(&_opts.stats->generate_row_ranges_by_zonemap_ns);
        if (!_common_expr_ctxs_push_down.empty()) {
            const auto pre_expr_zonemap_size = condition_row_ranges->count();
            RETURN_IF_ERROR(_apply_expr_zonemap_to_row_ranges(_common_expr_ctxs_push_down, 0,
                                                              condition_row_ranges));
            _opts.stats->rows_stats_filtered +=
                    (pre_expr_zonemap_size - condition_row_ranges->count());
        }
    }

    return Status::OK();
}
1267
1268
0
// Reports whether `node_type` is one of the literal expression node kinds handled
// here: BOOL, INT, LARGE_INT, FLOAT, DECIMAL, STRING, DATE or TIMEV2 literal.
// Any other node type yields false.
bool SegmentIterator::_is_literal_node(const TExprNodeType::type& node_type) {
    const bool is_number_like_literal = node_type == TExprNodeType::BOOL_LITERAL ||
                                        node_type == TExprNodeType::INT_LITERAL ||
                                        node_type == TExprNodeType::LARGE_INT_LITERAL ||
                                        node_type == TExprNodeType::FLOAT_LITERAL ||
                                        node_type == TExprNodeType::DECIMAL_LITERAL;
    const bool is_string_or_time_literal = node_type == TExprNodeType::STRING_LITERAL ||
                                           node_type == TExprNodeType::DATE_LITERAL ||
                                           node_type == TExprNodeType::TIMEV2_LITERAL;
    return is_number_like_literal || is_string_or_time_literal;
}
1283
1284
14
// Walks the expression tree rooted at `expr` (children first) and records the
// columns it references:
//   - a SLOT_REF node marks its schema column in `_is_common_expr_column` and adds
//     it to `_common_expr_columns`;
//   - a VIRTUAL_SLOT_REF node is expanded by recursing into the expression returned
//     by get_virtual_column_expr().
// Returns the first non-OK status produced by a recursive call, otherwise OK.
Status SegmentIterator::_extract_common_expr_columns(const VExprSPtr& expr) {
    // Range-for instead of an `int` index avoids the signed/unsigned comparison
    // against children.size().
    for (const auto& child : expr->children()) {
        RETURN_IF_ERROR(_extract_common_expr_columns(child));
    }

    const auto node_type = expr->node_type();
    if (node_type == TExprNodeType::SLOT_REF) {
        auto slot_expr = std::dynamic_pointer_cast<doris::VSlotRef>(expr);
        DCHECK(slot_expr != nullptr) << "SLOT_REF node is expected to be a VSlotRef";
        // Resolve the schema column id once and reuse it for both bookkeeping containers.
        const auto cid = _schema->column_id(slot_expr->column_id());
        _is_common_expr_column[cid] = true;
        _common_expr_columns.insert(cid);
    } else if (node_type == TExprNodeType::VIRTUAL_SLOT_REF) {
        std::shared_ptr<VirtualSlotRef> virtual_slot_ref =
                std::dynamic_pointer_cast<VirtualSlotRef>(expr);
        DCHECK(virtual_slot_ref != nullptr)
                << "VIRTUAL_SLOT_REF node is expected to be a VirtualSlotRef";
        RETURN_IF_ERROR(_extract_common_expr_columns(virtual_slot_ref->get_virtual_column_expr()));
    }

    return Status::OK();
}
1303
1304
45
bool SegmentIterator::_check_apply_by_inverted_index(std::shared_ptr<ColumnPredicate> pred) {
1305
45
    if (_opts.runtime_state && !_opts.runtime_state->query_options().enable_inverted_index_query) {
1306
0
        return false;
1307
0
    }
1308
45
    auto pred_column_id = pred->column_id();
1309
45
    if (_index_iterators[pred_column_id] == nullptr) {
1310
        //this column without inverted index
1311
0
        return false;
1312
0
    }
1313
1314
45
    if (_inverted_index_not_support_pred_type(pred->type())) {
1315
0
        return false;
1316
0
    }
1317
1318
45
    if (pred->type() == PredicateType::IN_LIST || pred->type() == PredicateType::NOT_IN_LIST) {
1319
        // in_list or not_in_list predicate produced by runtime filter
1320
0
        if (pred->is_runtime_filter()) {
1321
0
            return false;
1322
0
        }
1323
0
    }
1324
1325
    // UNTOKENIZED strings exceed ignore_above, they are written as null, causing range query errors
1326
45
    if (PredicateTypeTraits::is_range(pred->type()) &&
1327
45
        !IndexReaderHelper::has_bkd_index(_index_iterators[pred_column_id].get())) {
1328
0
        return false;
1329
0
    }
1330
1331
    // Function filter no apply inverted index
1332
45
    if (dynamic_cast<LikeColumnPredicate*>(pred.get()) != nullptr) {
1333
0
        return false;
1334
0
    }
1335
1336
45
    bool handle_by_fulltext = _column_has_fulltext_index(pred_column_id);
1337
45
    if (handle_by_fulltext) {
1338
        // when predicate is leafNode of andNode,
1339
        // can apply 'match query' and 'equal query' and 'list query' for fulltext index.
1340
0
        return pred->type() == PredicateType::MATCH || pred->type() == PredicateType::IS_NULL ||
1341
0
               pred->type() == PredicateType::IS_NOT_NULL ||
1342
0
               PredicateTypeTraits::is_equal_or_list(pred->type());
1343
0
    }
1344
1345
45
    return true;
1346
45
}
1347
1348
// TODO: optimize the case where no expr can be evaluated by the inverted/ann index.
1349
67
// Pushes expression evaluation down to the index layer in three passes:
//   1. evaluate_inverted_index() on every pushed-down common expression;
//   2. evaluate_inverted_index() on every virtual column expression that has an
//      index context;
//   3. evaluate_ann_range_search() on every pushed-down common expression, folding
//      the per-call ANN statistics into `_opts.stats`.
// In passes 1 and 2 a failure is skipped when _downgrade_without_index() accepts it
// or when the code is NOT_IMPLEMENTED_ERROR; any other failure is logged and returned.
Status SegmentIterator::_apply_index_expr() {
    // The ANN result cache stays enabled unless the query options explicitly set the flag.
    bool enable_ann_index_result_cache = true;
    if (_opts.runtime_state != nullptr) {
        const auto& query_options = _opts.runtime_state->query_options();
        if (query_options.__isset.enable_ann_index_result_cache) {
            enable_ann_index_result_cache = query_options.enable_ann_index_result_cache;
        }
    }

    for (const auto& expr_ctx : _common_expr_ctxs_push_down) {
        Status st = expr_ctx->evaluate_inverted_index(num_rows());
        if (st.ok()) {
            continue;
        }
        if (_downgrade_without_index(st) || st.code() == ErrorCode::NOT_IMPLEMENTED_ERROR) {
            continue;
        }
        // any other code is not handled here: report it and stop
        LOG(WARNING) << "failed to evaluate inverted index for expr_ctx: "
                     << expr_ctx->root()->debug_string() << ", error msg: " << st.to_string();
        return st;
    }

    // Evaluate inverted index for virtual column MATCH expressions (projections).
    // Unlike common exprs which filter rows, these only compute index result bitmaps
    // for later materialization via fast_execute().
    for (auto& [cid, expr_ctx] : _virtual_column_exprs) {
        if (expr_ctx->get_index_context() == nullptr) {
            continue;
        }
        Status st = expr_ctx->evaluate_inverted_index(num_rows());
        if (st.ok()) {
            continue;
        }
        if (_downgrade_without_index(st) || st.code() == ErrorCode::NOT_IMPLEMENTED_ERROR) {
            continue;
        }
        LOG(WARNING) << "failed to evaluate inverted index for virtual column expr: "
                     << expr_ctx->root()->debug_string() << ", error msg: " << st.to_string();
        return st;
    }

    // Apply ann range search
    for (const auto& expr_ctx : _common_expr_ctxs_push_down) {
        segment_v2::AnnIndexStats ann_index_stats;
        const size_t rows_before = _row_bitmap.cardinality();
        bool ann_range_search_executed = false;
        RETURN_IF_ERROR(expr_ctx->evaluate_ann_range_search(
                _index_iterators, _schema->column_ids(), _column_iterators,
                _common_expr_to_slotref_map, num_rows(), _row_bitmap, ann_index_stats,
                enable_ann_index_result_cache, &ann_range_search_executed));

        auto& stats = *_opts.stats;
        if (ann_range_search_executed) {
            stats.ann_index_range_search_cnt++;
        }
        stats.rows_ann_index_range_filtered += (rows_before - _row_bitmap.cardinality());
        stats.ann_index_load_ns += ann_index_stats.load_index_costs_ns.value();
        stats.ann_index_range_search_ns += ann_index_stats.search_costs_ns.value();
        stats.ann_ivf_on_disk_load_ns += ann_index_stats.ivf_on_disk_load_costs_ns.value();
        stats.ann_ivf_on_disk_cache_hit_cnt += ann_index_stats.ivf_on_disk_cache_hit_cnt.value();
        stats.ann_ivf_on_disk_cache_miss_cnt += ann_index_stats.ivf_on_disk_cache_miss_cnt.value();
        stats.ann_range_engine_search_ns += ann_index_stats.engine_search_ns.value();
        stats.ann_range_result_convert_ns += ann_index_stats.result_process_costs_ns.value();
        stats.ann_range_engine_convert_ns += ann_index_stats.engine_convert_ns.value();
        stats.ann_range_pre_process_ns += ann_index_stats.engine_prepare_ns.value();
        stats.ann_fall_back_brute_force_cnt += ann_index_stats.fall_back_brute_force_cnt;
        stats.ann_range_fallback_by_small_candidate_cnt +=
                ann_index_stats.range_fallback_by_small_candidate_cnt;
        stats.ann_range_fallback_small_candidate_rows +=
                ann_index_stats.range_fallback_small_candidate_rows;
        stats.ann_index_range_cache_hits += ann_index_stats.range_cache_hits.value();
    }

    return Status::OK();
}
1422
1423
0
// Decides whether a failed index evaluation (`res`) may be tolerated by evaluating
// the predicate without the index.
//
// @param res            status returned by the index evaluation.
// @param need_remaining whether INVERTED_INDEX_NO_TERMS counts as downgradable.
// @return true when the caller should fall back to evaluation without the index; in
//         that case `inverted_index_downgrade_count` is incremented and the reason is
//         logged. Returns false for every other error code.
bool SegmentIterator::_downgrade_without_index(Status res, bool need_remaining) {
    // Other code in this file treats `_opts.runtime_state` as nullable, so it must not
    // be dereferenced unconditionally here.
    // NOTE(review): with no runtime state, fallback on a missing index file is assumed
    // to be allowed — confirm this matches the default of the query option.
    const bool is_fallback =
            _opts.runtime_state == nullptr ||
            _opts.runtime_state->query_options().enable_fallback_on_missing_inverted_index;

    // 1. INVERTED_INDEX_FILE_NOT_FOUND means index file has not been built,
    //    usually occurs when creating a new index, queries can be downgraded
    //    without index.
    // 2. INVERTED_INDEX_BYPASS means the hit of condition by index
    //    has reached the optimal limit, downgrade without index query can
    //    improve query performance.
    // 3. INVERTED_INDEX_EVALUATE_SKIPPED means the inverted index is not
    //    suitable for executing this predicate, skipped it and filter data
    //    by function later.
    // 4. INVERTED_INDEX_NO_TERMS means the column has fulltext index,
    //    but the column condition value no terms in specified parser,
    //    such as: where A = '' and B = ','
    //    the predicate of A and B need downgrade without index query.
    // 5. INVERTED_INDEX_FILE_CORRUPTED means the index file is corrupted,
    //    such as when index segment files are not generated
    // above case can downgrade without index query
    const auto code = res.code();
    const bool can_downgrade =
            (code == ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND && is_fallback) ||
            code == ErrorCode::INVERTED_INDEX_BYPASS ||
            code == ErrorCode::INVERTED_INDEX_EVALUATE_SKIPPED ||
            (code == ErrorCode::INVERTED_INDEX_NO_TERMS && need_remaining) ||
            code == ErrorCode::INVERTED_INDEX_FILE_CORRUPTED;
    if (!can_downgrade) {
        return false;
    }

    _opts.stats->inverted_index_downgrade_count++;
    // BYPASS is only logged at debug verbosity; every other reason is logged at INFO.
    if (!res.is<ErrorCode::INVERTED_INDEX_BYPASS>()) {
        LOG(INFO) << "will downgrade without index to evaluate predicate, because of res: "
                  << res;
    } else {
        VLOG_DEBUG << "will downgrade without index to evaluate predicate, because of res: "
                   << res;
    }
    return true;
}
1459
1460
90
bool SegmentIterator::_column_has_fulltext_index(int32_t cid) {
1461
90
    bool has_fulltext_index =
1462
90
            _index_iterators[cid] != nullptr &&
1463
90
            _index_iterators[cid]->get_reader(InvertedIndexReaderType::FULLTEXT) &&
1464
90
            _index_iterators[cid]->get_reader(InvertedIndexReaderType::STRING_TYPE) == nullptr;
1465
1466
90
    return has_fulltext_index;
1467
90
}
1468
1469
45
inline bool SegmentIterator::_inverted_index_not_support_pred_type(const PredicateType& type) {
1470
45
    return type == PredicateType::BF;
1471
45
}
1472
1473
// Tries to evaluate `pred` through the index iterator of its column, narrowing
// `_row_bitmap` in place.
//
// @param pred                 the column predicate to apply.
// @param remaining_predicates receives `pred` when it still has to be evaluated later:
//                             it is not index-applicable, the index evaluation was
//                             downgraded, or it is an equal/list predicate on a
//                             fulltext-only column.
// @param continue_apply       set to false when `_row_bitmap` becomes empty after a
//                             successful evaluation; otherwise left untouched.
// @return OK, or the evaluation error when it cannot be downgraded.
Status SegmentIterator::_apply_inverted_index_on_column_predicate(
        std::shared_ptr<ColumnPredicate> pred,
        std::vector<std::shared_ptr<ColumnPredicate>>& remaining_predicates, bool* continue_apply) {
    if (!_check_apply_by_inverted_index(pred)) {
        remaining_predicates.emplace_back(pred);
        return Status::OK();
    }

    const auto cid = pred->column_id();
    const bool keep_for_later_eval = _column_has_fulltext_index(cid) &&
                                     PredicateTypeTraits::is_equal_or_list(pred->type());

    Status eval_status = pred->evaluate(_storage_name_and_type[cid], _index_iterators[cid].get(),
                                        num_rows(), &_row_bitmap);
    if (!eval_status.ok()) {
        if (!_downgrade_without_index(eval_status, keep_for_later_eval)) {
            LOG(WARNING) << "failed to evaluate index"
                         << ", column predicate type: " << pred->pred_type_string(pred->type())
                         << ", error msg: " << eval_status;
            return eval_status;
        }
        remaining_predicates.emplace_back(pred);
        return Status::OK();
    }

    if (_row_bitmap.isEmpty()) {
        // all rows have been pruned, no need to process further predicates
        *continue_apply = false;
    }

    if (keep_for_later_eval) {
        remaining_predicates.emplace_back(pred);
    } else if (!pred->is_runtime_filter()) {
        _column_predicate_index_exec_status[cid][pred] = true;
    }
    return Status::OK();
}
1510
1511
28.0k
// Tells whether the data of column `cid` has to be read, or whether reading can be
// skipped. Returns true (read) unless every check below allows skipping.
bool SegmentIterator::_need_read_data(ColumnId cid) {
    if (_opts.runtime_state != nullptr &&
        !_opts.runtime_state->query_options().enable_no_need_read_data_opt) {
        return true;
    }

    // only support DUP_KEYS and UNIQUE_KEYS with MOW
    const auto keys_type = _opts.tablet_schema->keys_type();
    const bool is_dup_keys = keys_type == KeysType::DUP_KEYS;
    const bool is_mow_unique_keys =
            keys_type == KeysType::UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write;
    if (!is_dup_keys && !is_mow_unique_keys) {
        return true;
    }

    // this is a virtual column, we always need to read data
    if (this->_vir_cid_to_idx_in_block.contains(cid)) {
        return true;
    }

    // if there is a delete predicate, we always need to read data
    if (_has_delete_predicate(cid)) {
        return true;
    }

    // if _output_columns contains -1, it means that the light
    // weight schema change may not be enabled or other reasons
    // caused the column unique_id not be set, to prevent errors
    // occurring, return true here that column data needs to be read
    if (_output_columns.contains(-1)) {
        return true;
    }

    const auto& column = _opts.tablet_schema->column(cid);
    const bool is_count_on_index = _opts.push_down_agg_type_opt == TPushAggOp::COUNT_ON_INDEX;
    // Different subcolumns may share the same parent_unique_id, so we choose to abandon this optimization.
    if (column.is_extracted_column() && !is_count_on_index) {
        return true;
    }

    // A negative unique id is replaced by the parent's unique id.
    const int32_t own_unique_id = column.unique_id();
    const int32_t unique_id = own_unique_id >= 0 ? own_unique_id : column.parent_unique_id();

    // Reading can only be skipped when `_need_read_data_indices` has an entry for
    // `cid` and that entry is false.
    const auto flag_it = _need_read_data_indices.find(cid);
    const bool flagged_as_not_needed =
            flag_it != _need_read_data_indices.end() && !flag_it->second;
    if (!flagged_as_not_needed) {
        return true;
    }

    // With that flag set, the read is skipped when either
    //   1. the column is not in `_output_columns`, or
    //   2. it is in `_output_columns` and COUNT_ON_INDEX is pushed down.
    const bool not_projected = !_output_columns.contains(unique_id);
    const bool projected_for_count_on_index =
            _output_columns.count(unique_id) == 1 && is_count_on_index;
    if (not_projected || projected_for_count_on_index) {
        VLOG_DEBUG << "SegmentIterator no need read data for column: "
                   << _opts.tablet_schema->column_by_uid(unique_id).name();
        return false;
    }
    return true;
}
1565
1566
63
// Applies every predicate in `_col_predicates` through the index layer via
// _apply_inverted_index_on_column_predicate(). Afterwards `_col_predicates` is
// replaced by the predicates that helper collected as still needing evaluation.
// Iteration stops early when the helper reports that nothing further should be
// applied. On error the status is returned and `_col_predicates` is left unchanged.
Status SegmentIterator::_apply_inverted_index() {
    std::vector<std::shared_ptr<ColumnPredicate>> remaining_predicates;

    // Iterate by const reference: a by-value loop variable would copy the shared_ptr
    // (and touch its reference count) on every iteration.
    for (const auto& pred : _col_predicates) {
        bool continue_apply = true;
        RETURN_IF_ERROR(_apply_inverted_index_on_column_predicate(pred, remaining_predicates,
                                                                  &continue_apply));
        if (!continue_apply) {
            break;
        }
    }

    _col_predicates = std::move(remaining_predicates);
    return Status::OK();
}
1586
1587
/**
1588
 * @brief Checks if all conditions related to a specific column have passed in both
1589
 * `_column_predicate_inverted_index_status` and `_common_expr_inverted_index_status`.
1590
 *
1591
 * This function first checks the conditions in `_column_predicate_inverted_index_status`
1592
 * for the given `ColumnId`. If all conditions pass, it sets `default_return` to `true`.
1593
 * It then checks the conditions in `_common_expr_inverted_index_status` for the same column.
1594
 *
1595
 * The function returns `true` if all conditions in both maps pass. If any condition fails
1596
 * in either map, the function immediately returns `false`. If the column does not exist
1597
 * in one of the maps, the function returns `default_return`.
1598
 *
1599
 * @param cid The ColumnId of the column to check.
1600
 * @param default_return The default value to return if the column is not found in the status maps.
1601
 * @return true if all conditions in both status maps pass, or if the column is not found
1602
 *         and `default_return` is true.
1603
 * @return false if any condition in either status map fails, or if the column is not found
1604
 *         and `default_return` is false.
1605
 */
1606
bool SegmentIterator::_check_all_conditions_passed_inverted_index_for_column(ColumnId cid,
1607
145
                                                                             bool default_return) {
1608
145
    auto pred_it = _column_predicate_index_exec_status.find(cid);
1609
145
    if (pred_it != _column_predicate_index_exec_status.end()) {
1610
47
        const auto& pred_map = pred_it->second;
1611
47
        bool pred_passed = std::all_of(pred_map.begin(), pred_map.end(),
1612
47
                                       [](const auto& pred_entry) { return pred_entry.second; });
1613
47
        if (!pred_passed) {
1614
1
            return false;
1615
46
        } else {
1616
46
            default_return = true;
1617
46
        }
1618
47
    }
1619
1620
144
    auto expr_it = _common_expr_index_exec_status.find(cid);
1621
144
    if (expr_it != _common_expr_index_exec_status.end()) {
1622
18
        const auto& expr_map = expr_it->second;
1623
18
        return std::all_of(expr_map.begin(), expr_map.end(),
1624
18
                           [](const auto& expr_entry) { return expr_entry.second; });
1625
18
    }
1626
126
    return default_return;
1627
144
}
1628
1629
3.03k
// Creates and initialises a column iterator for every column id of `_schema` that
// does not have one yet. Does nothing when `_cur_rowid` is already past the last row.
//   - the ROWID column gets a RowIdColumnIterator;
//   - columns whose name starts with GLOBAL_ROWID_COL get a RowIdColumnIteratorV2;
//   - columns whose name starts with VIRTUAL_COLUMN_PREFIX get a VirtualColumnIterator;
//   - every other column gets an iterator from the segment, initialised with
//     ColumnIteratorOptions.
// Returns the first error reported while creating or initialising an iterator.
Status SegmentIterator::_init_return_column_iterators() {
    SCOPED_RAW_TIMER(&_opts.stats->segment_iterator_init_return_column_iterators_timer_ns);
    if (_cur_rowid >= num_rows()) {
        return Status::OK();
    }

    // Per-column flag: true when the column is referenced by a column predicate or by
    // a delete condition. The flags do not depend on the column being visited, so they
    // are built once (lazily, on first use) instead of once per column.
    std::vector<bool> is_pred_column;
    bool pred_flags_built = false;
    const auto build_pred_flags = [&]() {
        std::set<ColumnId> del_cond_id_set;
        _opts.delete_condition_predicates->get_all_column_ids(del_cond_id_set);
        is_pred_column.assign(_schema->columns().size(), false);
        for (const auto& predicate : _col_predicates) {
            is_pred_column[predicate->column_id()] = true;
        }
        // handle delete_condition
        for (auto d_cid : del_cond_id_set) {
            is_pred_column[d_cid] = true;
        }
        pred_flags_built = true;
    };

    for (auto cid : _schema->column_ids()) {
        if (_schema->column(cid)->name() == BeConsts::ROWID_COL) {
            _column_iterators[cid].reset(
                    new RowIdColumnIterator(_opts.tablet_id, _opts.rowset_id, _segment->id()));
            continue;
        }

        if (_schema->column(cid)->name().starts_with(BeConsts::GLOBAL_ROWID_COL)) {
            auto& id_file_map = _opts.runtime_state->get_id_file_map();
            uint32_t file_id = id_file_map->get_file_mapping_id(std::make_shared<FileMapping>(
                    _opts.tablet_id, _opts.rowset_id, _segment->id()));
            _column_iterators[cid].reset(new RowIdColumnIteratorV2(
                    IdManager::ID_VERSION, BackendOptions::get_backend_id(), file_id));
            continue;
        }

        if (_schema->column(cid)->name().starts_with(BeConsts::VIRTUAL_COLUMN_PREFIX)) {
            _column_iterators[cid] = std::make_unique<VirtualColumnIterator>();
            continue;
        }

        if (_column_iterators[cid] != nullptr) {
            // an iterator already exists for this column
            continue;
        }

        if (!pred_flags_built) {
            build_pred_flags();
        }

        RETURN_IF_ERROR(_segment->new_column_iterator(_opts.tablet_schema->column(cid),
                                                      &_column_iterators[cid], &_opts,
                                                      &_variant_sparse_column_cache));
        ColumnIteratorOptions iter_opts {
                .use_page_cache = _opts.use_page_cache,
                // If the col is predicate column, then should read the last page to check
                // if the column is full dict encoding
                .is_predicate_column = is_pred_column[cid],
                .file_reader = _file_reader.get(),
                .stats = _opts.stats,
                .io_ctx = _opts.io_ctx,
        };
        RETURN_IF_ERROR(_column_iterators[cid]->init(iter_opts));
    }

#ifndef NDEBUG
    // Debug-only sanity check: every virtual column must have a VirtualColumnIterator.
    for (const auto& pair : _vir_cid_to_idx_in_block) {
        ColumnId vir_col_cid = pair.first;
        DCHECK(_column_iterators[vir_col_cid] != nullptr)
                << "Virtual column iterator for " << vir_col_cid << " should not be null";
        ColumnIterator* column_iter = _column_iterators[vir_col_cid].get();
        DCHECK(dynamic_cast<VirtualColumnIterator*>(column_iter) != nullptr)
                << "Virtual column iterator for " << vir_col_cid
                << " should be VirtualColumnIterator";
    }
#endif
    return Status::OK();
}
1699
1700
3.03k
// Builds the shared IndexQueryContext and then creates index iterators for every
// column id of `_schema` that does not have one yet: first from the inverted index
// metadata, then (for columns still without an iterator) from the ANN index metadata.
// Every iterator created gets the shared query context attached. Does nothing when
// `_cur_rowid` is already past the last row.
Status SegmentIterator::_init_index_iterators() {
    SCOPED_RAW_TIMER(&_opts.stats->segment_iterator_init_index_iterators_timer_ns);
    if (_cur_rowid >= num_rows()) {
        return Status::OK();
    }

    _index_query_context = std::make_shared<IndexQueryContext>();
    _index_query_context->io_ctx = &_opts.io_ctx;
    _index_query_context->stats = _opts.stats;
    _index_query_context->runtime_state = _opts.runtime_state;

    // Scoring-related fields are only filled in when a score runtime is present.
    if (_score_runtime) {
        _index_query_context->collection_statistics = _opts.collection_statistics;
        _index_query_context->collection_similarity = std::make_shared<CollectionSimilarity>();
        _index_query_context->query_limit = _score_runtime->get_limit();
        _index_query_context->is_asc = _score_runtime->is_asc();
    }

    // Inverted index iterators
    for (auto cid : _schema->column_ids()) {
        // Use the segment's own index_meta, for compatibility with future indexing
        // needs to default to lowercase.
        if (_index_iterators[cid] != nullptr) {
            continue;
        }

        // In the _opts.tablet_schema, the sub-column type information for the variant is FieldType::OLAP_FIELD_TYPE_VARIANT.
        // This is because the sub-column is created in create_materialized_variant_column.
        // We use this column to locate the metadata for the inverted index, which requires a unique_id and path.
        const auto& column = _opts.tablet_schema->column(cid);
        std::vector<const TabletIndex*> index_metas;
        // Keep shared_ptr alive to prevent use-after-free when accessing raw pointers
        TabletIndexes index_meta_owners;
        std::shared_ptr<ColumnReader> parent_reader;
        const bool is_extracted = column.is_extracted_column();

        if (!is_extracted) {
            // Not an extracted column: take the inverted index metadata directly from
            // the tablet schema.
            index_metas = _segment->_tablet_schema->inverted_indexs(column);
        } else {
            // Extracted column: the sub-column has to be located through the parent
            // column reader.
            if (!_segment->_column_reader_cache->get_column_reader(
                        column.parent_unique_id(), &parent_reader, _opts.stats) ||
                parent_reader == nullptr) {
                continue;
            }
            auto* variant_reader = assert_cast<VariantColumnReader*>(parent_reader.get());
            DataTypePtr data_type = _storage_name_and_type[cid].second;
            if (data_type != nullptr &&
                data_type->get_primitive_type() == PrimitiveType::TYPE_VARIANT) {
                // Prefer the type inferred for this path when inference succeeds.
                DataTypePtr inferred_type;
                Status st = variant_reader->infer_data_type_for_path(
                        &inferred_type, column, _opts, _segment->_column_reader_cache.get());
                if (st.ok() && inferred_type != nullptr) {
                    data_type = inferred_type;
                }
            }
            index_meta_owners =
                    variant_reader->find_subcolumn_tablet_indexes(column, data_type, _opts.stats);
            // Extract raw pointers from shared_ptr for iteration
            for (const auto& owner : index_meta_owners) {
                index_metas.push_back(owner.get());
            }
        }

        // Record a diagnostic when an extracted column has no candidate index.
        if (is_extracted && index_metas.empty() && _opts.stats != nullptr) {
            const auto relative_path = column.path_info_ptr()->copy_pop_front().get_path();
            const auto diagnostic = fmt::format(
                    "[VariantSearchBinding] phase=init_index_iterators "
                    "result=no_candidate tablet_id={} rowset_id={} segment_id={} cid={} "
                    "logical_path={} relative_path={} materialized_column={}",
                    _tablet_id, _segment->rowset_id().to_string(), _segment->id(), cid,
                    column.path_info_ptr()->get_path(), relative_path, column.name());
            VLOG_DEBUG << diagnostic;
            _opts.stats->inverted_index_stats.add_binding_diagnostic(diagnostic);
        }

        for (const auto& index_meta : index_metas) {
            const bool had_iterator = _index_iterators[cid] != nullptr;
            RETURN_IF_ERROR(_segment->new_index_iterator(column, index_meta, _opts,
                                                         &_index_iterators[cid]));
            const bool is_variant_related = is_extracted || column.is_variant_type();
            if (!is_variant_related || _opts.stats == nullptr) {
                continue;
            }
            const auto diagnostic = fmt::format(
                    "[VariantSearchBinding] phase=init_index_iterators "
                    "result={} tablet_id={} rowset_id={} segment_id={} cid={} "
                    "logical_path={} materialized_column={} index_id={} suffix={} "
                    "field_pattern={} iterator_state={}",
                    _index_iterators[cid] == nullptr ? "no_iterator" : "accepted", _tablet_id,
                    _segment->rowset_id().to_string(), _segment->id(), cid,
                    column.has_path_info() ? column.path_info_ptr()->get_path() : column.name(),
                    column.name(), index_meta->index_id(), index_meta->get_index_suffix(),
                    index_meta->field_pattern(), had_iterator ? "preserved" : "created");
            VLOG_DEBUG << diagnostic;
            _opts.stats->inverted_index_stats.add_binding_diagnostic(diagnostic);
        }

        if (_index_iterators[cid] != nullptr) {
            _index_iterators[cid]->set_context(_index_query_context);
        }
    }

    // Ann index iterators
    for (auto cid : _schema->column_ids()) {
        if (_index_iterators[cid] != nullptr) {
            continue;
        }
        const auto& column = _opts.tablet_schema->column(cid);
        const auto* ann_meta = _segment->_tablet_schema->ann_index(column);
        if (ann_meta == nullptr) {
            continue;
        }
        RETURN_IF_ERROR(
                _segment->new_index_iterator(column, ann_meta, _opts, &_index_iterators[cid]));

        if (_index_iterators[cid] != nullptr) {
            _index_iterators[cid]->set_context(_index_query_context);
        }
    }

    return Status::OK();
}
1816
1817
// Looks up the ordinal for `key`, writing the result to `rowid`. The primary key
// index is used when the tablet uses UNIQUE_KEYS and the segment has a primary key
// index; otherwise the lookup goes through the short key index, bounded by
// `upper_bound`.
Status SegmentIterator::_lookup_ordinal(const RowCursor& key, bool is_include, rowid_t upper_bound,
                                        rowid_t* rowid) {
    const bool use_pk_index = _segment->_tablet_schema->keys_type() == UNIQUE_KEYS &&
                              _segment->get_primary_key_index() != nullptr;
    if (!use_pk_index) {
        return _lookup_ordinal_from_sk_index(key, is_include, upper_bound, rowid);
    }
    return _lookup_ordinal_from_pk_index(key, is_include, rowid);
}
1825
1826
// look up one key to get its ordinal at which can get data by using short key index.
1827
// 'upper_bound' is defined the max ordinal the function will search.
1828
// We use upper_bound to reduce search times.
1829
// If we find a valid ordinal, it will be set in rowid and with Status::OK()
1830
// If we can not find a valid key in this segment, we will set rowid to upper_bound
1831
// Otherwise return error.
1832
// 1. get [start, end) ordinal through short key index
1833
// 2. binary search to find exact ordinal that match the input condition
1834
// Make is_include template to reduce branch
1835
Status SegmentIterator::_lookup_ordinal_from_sk_index(const RowCursor& key, bool is_include,
1836
0
                                                      rowid_t upper_bound, rowid_t* rowid) {
1837
0
    const ShortKeyIndexDecoder* sk_index_decoder = _segment->get_short_key_index();
1838
0
    DCHECK(sk_index_decoder != nullptr);
1839
1840
0
    std::string index_key;
1841
0
    key.encode_key_with_padding(&index_key, _segment->_tablet_schema->num_short_key_columns(),
1842
0
                                is_include);
1843
1844
0
    const auto& key_col_ids = key.schema()->column_ids();
1845
1846
0
    ssize_t start_block_id = 0;
1847
0
    auto start_iter = sk_index_decoder->lower_bound(index_key);
1848
0
    if (start_iter.valid()) {
1849
        // Because previous block may contain this key, so we should set rowid to
1850
        // last block's first row.
1851
0
        start_block_id = start_iter.ordinal();
1852
0
        if (start_block_id > 0) {
1853
0
            start_block_id--;
1854
0
        }
1855
0
    } else {
1856
        // When we don't find a valid index item, which means all short key is
1857
        // smaller than input key, this means that this key may exist in the last
1858
        // row block. so we set the rowid to first row of last row block.
1859
0
        start_block_id = sk_index_decoder->num_items() - 1;
1860
0
    }
1861
0
    rowid_t start = cast_set<rowid_t>(start_block_id) * sk_index_decoder->num_rows_per_block();
1862
1863
0
    rowid_t end = upper_bound;
1864
0
    auto end_iter = sk_index_decoder->upper_bound(index_key);
1865
0
    if (end_iter.valid()) {
1866
0
        end = cast_set<rowid_t>(end_iter.ordinal()) * sk_index_decoder->num_rows_per_block();
1867
0
    }
1868
1869
    // binary search to find the exact key
1870
0
    while (start < end) {
1871
0
        rowid_t mid = (start + end) / 2;
1872
0
        RETURN_IF_ERROR(_seek_and_peek(mid));
1873
0
        int cmp = _compare_short_key_with_seek_block(key, key_col_ids);
1874
0
        if (cmp > 0) {
1875
0
            start = mid + 1;
1876
0
        } else if (cmp == 0) {
1877
0
            if (is_include) {
1878
                // lower bound
1879
0
                end = mid;
1880
0
            } else {
1881
                // upper bound
1882
0
                start = mid + 1;
1883
0
            }
1884
0
        } else {
1885
0
            end = mid;
1886
0
        }
1887
0
    }
1888
1889
0
    *rowid = start;
1890
0
    return Status::OK();
1891
0
}
1892
1893
// Resolves `key` to a row ordinal through the primary key index and stores it in `rowid`.
//
// - A key smaller than the segment's min key resolves to ordinal 0.
// - A key larger than the segment's max key resolves to num_rows().
// - Otherwise the index is positioned at the first entry at or after the key. If the seek
//   reports ENTRY_NOT_FOUND, `rowid` is num_rows() and OK is returned; any other seek
//   failure is propagated (with `rowid` also set to num_rows()).
// - When the key matches exactly and `is_include` is false, the ordinal is advanced by one
//   so that the matching row is excluded.
//
// For tables with a sequence column, the stored index key carries a sequence suffix, so an
// inexact full-key match is re-checked after stripping that suffix.
Status SegmentIterator::_lookup_ordinal_from_pk_index(const RowCursor& key, bool is_include,
                                                      rowid_t* rowid) {
    DCHECK(_segment->_tablet_schema->keys_type() == UNIQUE_KEYS);
    const PrimaryKeyIndexReader* pk_index_reader = _segment->get_primary_key_index();
    DCHECK(pk_index_reader != nullptr);

    std::string index_key;
    key.encode_key_with_padding<true>(&index_key, _segment->_tablet_schema->num_key_columns(),
                                      is_include);
    if (index_key < _segment->min_key()) {
        *rowid = 0;
        return Status::OK();
    } else if (index_key > _segment->max_key()) {
        *rowid = num_rows();
        return Status::OK();
    }
    bool exact_match = false;

    std::unique_ptr<segment_v2::IndexedColumnIterator> index_iterator;
    RETURN_IF_ERROR(pk_index_reader->new_iterator(&index_iterator, _opts.stats));

    Status status = index_iterator->seek_at_or_after(&index_key, &exact_match);
    if (UNLIKELY(!status.ok())) {
        *rowid = num_rows();
        if (status.is<ENTRY_NOT_FOUND>()) {
            return Status::OK();
        }
        return status;
    }
    *rowid = cast_set<rowid_t>(index_iterator->get_current_ordinal());

    // The sequence column needs to be removed from primary key index when comparing key
    bool has_seq_col = _segment->_tablet_schema->has_sequence_col();
    // Used to get key range from primary key index,
    // for mow with cluster key table, we should get key range from short key index.
    DCHECK(_segment->_tablet_schema->cluster_key_uids().empty());

    // If the full key is an exact match, the primary key without the sequence column is
    // necessarily the same, so the re-check is only needed for inexact matches.
    if (has_seq_col && !exact_match) {
        size_t seq_col_length =
                _segment->_tablet_schema->column(_segment->_tablet_schema->sequence_col_idx())
                        .length() +
                1;
        auto index_type = DataTypeFactory::instance().create_data_type(
                _segment->_pk_index_reader->type(), 1, 0);
        auto index_column = index_type->create_column();
        size_t num_to_read = 1;
        size_t num_read = num_to_read;
        RETURN_IF_ERROR(index_iterator->next_batch(&num_read, index_column));
        // A debug-only assertion is not enough here: reading row 0 of an empty column
        // would be an out-of-bounds access in release builds.
        if (UNLIKELY(num_read != num_to_read)) {
            return Status::InternalError(
                    "failed to read sought key from primary key index, expected {} row(s), "
                    "got {}",
                    num_to_read, num_read);
        }

        const auto sought_ref = index_column->get_data_at(0);
        // Guard the subtraction below against unsigned underflow on a malformed entry.
        if (UNLIKELY(sought_ref.size < seq_col_length)) {
            return Status::InternalError(
                    "primary key index entry is shorter than the sequence column suffix, "
                    "entry size {}, sequence suffix size {}",
                    sought_ref.size, seq_col_length);
        }
        Slice sought_key = Slice(sought_ref.data, sought_ref.size);
        Slice sought_key_without_seq =
                Slice(sought_key.get_data(), sought_key.get_size() - seq_col_length);

        // compare key
        if (Slice(index_key).compare(sought_key_without_seq) == 0) {
            exact_match = true;
        }
    }

    // The key was found in the primary key index and is_include is false, so move
    // to the next row.
    if (exact_match && !is_include) {
        *rowid += 1;
    }
    return Status::OK();
}
1962
1963
// seek to the row and load that row to _key_cursor
1964
0
Status SegmentIterator::_seek_and_peek(rowid_t rowid) {
1965
0
    {
1966
0
        _opts.stats->block_init_seek_num += 1;
1967
0
        SCOPED_RAW_TIMER(&_opts.stats->block_init_seek_ns);
1968
0
        RETURN_IF_ERROR(_seek_columns(_seek_schema->column_ids(), rowid));
1969
0
    }
1970
0
    size_t num_rows = 1;
1971
1972
    //note(wb) reset _seek_block for memory reuse
1973
    // it is easier to use row based memory layout for clear memory
1974
0
    for (int i = 0; i < _seek_block.size(); i++) {
1975
0
        _seek_block[i]->clear();
1976
0
    }
1977
0
    RETURN_IF_ERROR(_read_columns(_seek_schema->column_ids(), _seek_block, num_rows));
1978
0
    return Status::OK();
1979
0
}
1980
1981
0
// Positions the iterator of every column in `column_ids` at ordinal `pos`.
// Columns for which _need_read_data() is false are left untouched.
Status SegmentIterator::_seek_columns(const std::vector<ColumnId>& column_ids, rowid_t pos) {
    for (const auto cid : column_ids) {
        if (_need_read_data(cid)) {
            RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(pos));
        }
    }
    return Status::OK();
}
1990
1991
/* ---------------------- for vectorization implementation  ---------------------- */
1992
1993
/**
1994
 *  For storage layer data type, can be measured from two perspectives:
1995
 *  1 Whether the type can be read in a fast way(batch read using SIMD)
1996
 *    Such as integer type and float type, this type can be read in SIMD way.
1997
 *    Types such as string/bitmap/hll cannot be read in batches, so reading data of these types is slow.
1998
 *   If a type can be read fast, we can try to eliminate Lazy Materialization, because we think for this type, seek cost > read cost.
1999
 *   This is an estimate, if we want more precise cost, statistics collection is necessary(this is a todo).
2000
 *   In short, when the returned non-pred columns contain string/hll/bitmap, we use Lazy Materialization.
2001
 *   Otherwise, we disable it.
2002
 *
2003
 *   When Lazy Materialization enable, we need to read column at least two times.
2004
 *   First time to read Pred col, second time to read non-pred.
2005
 *   Here's an interesting question to research, whether read Pred col once is the best plan.
2006
 *   (why not read Pred col twice or more?)
2007
 *
2008
 *   When Lazy Materialization disable, we just need to read once.
2009
 *
2010
 *
2011
 *  2 Whether the predicate type can be evaluate in a fast way(using SIMD to eval pred)
2012
 *    Such as integer type and float type, they can be eval fast.
2013
 *    But for BloomFilter/string/date, they eval slow.
2014
 *    If a type can be eval fast, we use vectorization to eval it.
2015
 *    Otherwise, we use short-circuit to eval it.
2016
 *
2017
 *
2018
 */
2019
2020
// todo(wb) need a UT here
2021
3.03k
Status SegmentIterator::_vec_init_lazy_materialization() {
2022
3.03k
    _is_pred_column.resize(_schema->columns().size(), false);
2023
2024
    // including short/vec/delete pred
2025
3.03k
    std::set<ColumnId> pred_column_ids;
2026
3.03k
    _lazy_materialization_read = false;
2027
2028
3.03k
    std::set<ColumnId> del_cond_id_set;
2029
3.03k
    _opts.delete_condition_predicates->get_all_column_ids(del_cond_id_set);
2030
2031
3.03k
    std::set<std::shared_ptr<const ColumnPredicate>> delete_predicate_set {};
2032
3.03k
    _opts.delete_condition_predicates->get_all_column_predicate(delete_predicate_set);
2033
3.03k
    for (auto predicate : delete_predicate_set) {
2034
467
        if (PredicateTypeTraits::is_range(predicate->type())) {
2035
327
            _delete_range_column_ids.push_back(predicate->column_id());
2036
327
        } else if (PredicateTypeTraits::is_bloom_filter(predicate->type())) {
2037
0
            _delete_bloom_filter_column_ids.push_back(predicate->column_id());
2038
0
        }
2039
467
    }
2040
2041
    // Step1: extract columns that can be lazy materialization
2042
3.03k
    if (!_col_predicates.empty() || !del_cond_id_set.empty()) {
2043
486
        std::set<ColumnId> short_cir_pred_col_id_set; // using set for distinct cid
2044
486
        std::set<ColumnId> vec_pred_col_id_set;
2045
2046
486
        for (auto predicate : _col_predicates) {
2047
19
            auto cid = predicate->column_id();
2048
19
            _is_pred_column[cid] = true;
2049
19
            pred_column_ids.insert(cid);
2050
2051
            // check pred using short eval or vec eval
2052
19
            if (_can_evaluated_by_vectorized(predicate)) {
2053
19
                vec_pred_col_id_set.insert(cid);
2054
19
                _pre_eval_block_predicate.push_back(predicate);
2055
19
            } else {
2056
0
                short_cir_pred_col_id_set.insert(cid);
2057
0
                _short_cir_eval_predicate.push_back(predicate);
2058
0
            }
2059
19
            if (predicate->is_runtime_filter()) {
2060
0
                _filter_info_id.push_back(predicate);
2061
0
            }
2062
19
        }
2063
2064
        // handle delete_condition
2065
486
        if (!del_cond_id_set.empty()) {
2066
467
            short_cir_pred_col_id_set.insert(del_cond_id_set.begin(), del_cond_id_set.end());
2067
467
            pred_column_ids.insert(del_cond_id_set.begin(), del_cond_id_set.end());
2068
2069
467
            for (auto cid : del_cond_id_set) {
2070
467
                _is_pred_column[cid] = true;
2071
467
            }
2072
467
        }
2073
2074
486
        _vec_pred_column_ids.assign(vec_pred_col_id_set.cbegin(), vec_pred_col_id_set.cend());
2075
486
        _short_cir_pred_column_ids.assign(short_cir_pred_col_id_set.cbegin(),
2076
486
                                          short_cir_pred_col_id_set.cend());
2077
486
    }
2078
2079
3.03k
    if (!_vec_pred_column_ids.empty()) {
2080
19
        _is_need_vec_eval = true;
2081
19
    }
2082
3.03k
    if (!_short_cir_pred_column_ids.empty()) {
2083
467
        _is_need_short_eval = true;
2084
467
    }
2085
2086
    // ColumnId to column index in block
2087
    // ColumnId will contail all columns in tablet schema, including virtual columns and global rowid column,
2088
3.03k
    _schema_block_id_map.resize(_schema->columns().size(), -1);
2089
    // Use cols read by query to initialize _schema_block_id_map.
2090
    // We need to know the index of each column in the block.
2091
    // There is an assumption here that the columns in the block are in the same order as in the read schema.
2092
    // TODO: A probelm is that, delete condition columns will exist in _schema->column_ids but not in block if
2093
    // delete column is not read by the query.
2094
10.5k
    for (int i = 0; i < _schema->num_column_ids(); i++) {
2095
7.50k
        auto cid = _schema->column_id(i);
2096
7.50k
        _schema_block_id_map[cid] = i;
2097
7.50k
    }
2098
2099
    // Step2: extract columns that can execute expr context
2100
3.03k
    _is_common_expr_column.resize(_schema->columns().size(), false);
2101
3.03k
    if (!_common_expr_ctxs_push_down.empty()) {
2102
6
        for (const auto& expr_ctx : _common_expr_ctxs_push_down) {
2103
6
            RETURN_IF_ERROR(_extract_common_expr_columns(expr_ctx->root()));
2104
6
        }
2105
6
        if (!_common_expr_columns.empty()) {
2106
6
            _is_need_expr_eval = true;
2107
12
            for (auto cid : _schema->column_ids()) {
2108
                // pred column also needs to be filtered by expr, exclude additional delete condition column.
2109
                // if delete condition column not in the block, no filter is needed
2110
                // and will be removed from _columns_to_filter in the first next_batch.
2111
12
                if (_is_common_expr_column[cid] || _is_pred_column[cid]) {
2112
6
                    auto loc = _schema_block_id_map[cid];
2113
6
                    _columns_to_filter.push_back(loc);
2114
6
                }
2115
12
            }
2116
2117
6
            for (auto pair : _vir_cid_to_idx_in_block) {
2118
0
                _columns_to_filter.push_back(cast_set<ColumnId>(pair.second));
2119
0
            }
2120
6
        }
2121
6
    }
2122
2123
    // Step 3: fill non predicate columns and second read column
2124
    // if _schema columns size equal to pred_column_ids size, lazy_materialization_read is false,
2125
    // all columns are lazy materialization columns without non predicte column.
2126
    // If common expr pushdown exists, and expr column is not contained in lazy materialization columns,
2127
    // add to second read column, which will be read after lazy materialization
2128
3.03k
    if (_schema->column_ids().size() > pred_column_ids.size()) {
2129
        // pred_column_ids maybe empty, so that could not set _lazy_materialization_read = true here
2130
        // has to check there is at least one predicate column
2131
7.44k
        for (auto cid : _schema->column_ids()) {
2132
7.44k
            if (!_is_pred_column[cid]) {
2133
7.01k
                if (_is_need_vec_eval || _is_need_short_eval) {
2134
881
                    _lazy_materialization_read = true;
2135
881
                }
2136
7.01k
                if (_is_common_expr_column[cid]) {
2137
6
                    _common_expr_column_ids.push_back(cid);
2138
7.00k
                } else {
2139
7.00k
                    _non_predicate_columns.push_back(cid);
2140
7.00k
                }
2141
7.01k
            }
2142
7.44k
        }
2143
2.97k
    }
2144
2145
    // Step 4: fill first read columns
2146
3.03k
    if (_lazy_materialization_read) {
2147
        // insert pred cid to first_read_columns
2148
429
        for (auto cid : pred_column_ids) {
2149
429
            _predicate_column_ids.push_back(cid);
2150
429
        }
2151
2.60k
    } else if (!_is_need_vec_eval && !_is_need_short_eval && !_is_need_expr_eval) {
2152
8.65k
        for (int i = 0; i < _schema->num_column_ids(); i++) {
2153
6.12k
            auto cid = _schema->column_id(i);
2154
6.12k
            _predicate_column_ids.push_back(cid);
2155
6.12k
        }
2156
2.53k
    } else {
2157
63
        if (_is_need_vec_eval || _is_need_short_eval) {
2158
            // TODO To refactor, because we suppose lazy materialization is better performance.
2159
            // pred exits, but we can eliminate lazy materialization
2160
            // insert pred/non-pred cid to first read columns
2161
57
            std::set<ColumnId> pred_id_set;
2162
57
            pred_id_set.insert(_short_cir_pred_column_ids.begin(),
2163
57
                               _short_cir_pred_column_ids.end());
2164
57
            pred_id_set.insert(_vec_pred_column_ids.begin(), _vec_pred_column_ids.end());
2165
2166
57
            DCHECK(_common_expr_column_ids.empty());
2167
            // _non_predicate_column_ids must be empty. Otherwise _lazy_materialization_read must not false.
2168
114
            for (int i = 0; i < _schema->num_column_ids(); i++) {
2169
57
                auto cid = _schema->column_id(i);
2170
57
                if (pred_id_set.find(cid) != pred_id_set.end()) {
2171
57
                    _predicate_column_ids.push_back(cid);
2172
57
                }
2173
57
            }
2174
57
        } else if (_is_need_expr_eval) {
2175
6
            DCHECK(!_is_need_vec_eval && !_is_need_short_eval);
2176
6
            for (auto cid : _common_expr_columns) {
2177
6
                _predicate_column_ids.push_back(cid);
2178
6
            }
2179
6
        }
2180
63
    }
2181
2182
3.03k
    VLOG_DEBUG << fmt::format(
2183
0
            "Laze materialization init end. "
2184
0
            "lazy_materialization_read: {}, "
2185
0
            "_col_predicates size: {}, "
2186
0
            "_cols_read_by_column_predicate: [{}], "
2187
0
            "_non_predicate_columns: [{}], "
2188
0
            "_cols_read_by_common_expr: [{}], "
2189
0
            "columns_to_filter: [{}], "
2190
0
            "_schema_block_id_map: [{}]",
2191
0
            _lazy_materialization_read, _col_predicates.size(),
2192
0
            fmt::join(_predicate_column_ids, ","), fmt::join(_non_predicate_columns, ","),
2193
0
            fmt::join(_common_expr_column_ids, ","), fmt::join(_columns_to_filter, ","),
2194
0
            fmt::join(_schema_block_id_map, ","));
2195
3.03k
    return Status::OK();
2196
3.03k
}
2197
2198
19
bool SegmentIterator::_can_evaluated_by_vectorized(std::shared_ptr<ColumnPredicate> predicate) {
2199
19
    auto cid = predicate->column_id();
2200
19
    FieldType field_type = _schema->column(cid)->type();
2201
19
    if (field_type == FieldType::OLAP_FIELD_TYPE_VARIANT) {
2202
        // Use variant cast dst type
2203
0
        field_type = _opts.target_cast_type_for_variants[_schema->column(cid)->name()]
2204
0
                             ->get_storage_field_type();
2205
0
    }
2206
19
    switch (predicate->type()) {
2207
9
    case PredicateType::EQ:
2208
9
    case PredicateType::NE:
2209
9
    case PredicateType::LE:
2210
9
    case PredicateType::LT:
2211
9
    case PredicateType::GE:
2212
19
    case PredicateType::GT: {
2213
19
        if (field_type == FieldType::OLAP_FIELD_TYPE_VARCHAR ||
2214
19
            field_type == FieldType::OLAP_FIELD_TYPE_CHAR ||
2215
19
            field_type == FieldType::OLAP_FIELD_TYPE_STRING) {
2216
5
            return config::enable_low_cardinality_optimize &&
2217
5
                   _opts.io_ctx.reader_type == ReaderType::READER_QUERY &&
2218
5
                   _column_iterators[cid]->is_all_dict_encoding();
2219
14
        } else if (field_type == FieldType::OLAP_FIELD_TYPE_DECIMAL) {
2220
0
            return false;
2221
0
        }
2222
14
        return true;
2223
19
    }
2224
0
    default:
2225
0
        return false;
2226
19
    }
2227
19
}
2228
2229
bool SegmentIterator::_prune_column(ColumnId cid, MutableColumnPtr& column, bool fill_defaults,
2230
28.0k
                                    size_t num_of_defaults) {
2231
28.0k
    if (_need_read_data(cid)) {
2232
28.0k
        return false;
2233
28.0k
    }
2234
38
    if (!fill_defaults) {
2235
0
        return true;
2236
0
    }
2237
38
    if (is_column_nullable(*column)) {
2238
8
        auto nullable_col_ptr = reinterpret_cast<ColumnNullable*>(column.get());
2239
8
        nullable_col_ptr->get_null_map_column().insert_many_defaults(num_of_defaults);
2240
8
        nullable_col_ptr->get_nested_column_ptr()->insert_many_defaults(num_of_defaults);
2241
30
    } else {
2242
        // assert(column->is_const());
2243
30
        column->insert_many_defaults(num_of_defaults);
2244
30
    }
2245
38
    return true;
2246
38
}
2247
2248
// Reads exactly `nrows` rows of every column in `column_ids` into `column_block`,
// which is indexed by column id. Columns that _prune_column() reports as pruned are
// filled with defaults instead of being read. Returns INTERNAL_ERROR when a column
// iterator yields a different number of rows than requested.
Status SegmentIterator::_read_columns(const std::vector<ColumnId>& column_ids,
                                      MutableColumns& column_block, size_t nrows) {
    for (const auto cid : column_ids) {
        auto& target = column_block[cid];
        if (_prune_column(cid, target, true, nrows)) {
            continue;
        }

        size_t rows_read = nrows;
        RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, target));
        if (rows_read == nrows) {
            continue;
        }
        return Status::Error<ErrorCode::INTERNAL_ERROR>("nrows({}) != rows_read({})", nrows,
                                                        rows_read);
    }
    return Status::OK();
}
2264
2265
// Prepares `current_columns` (indexed by column id) for reading the next batch.
// For each column id of the read schema one of three things happens:
//   - a non-predicate column whose storage type differs from the schema type gets a
//     freshly created column of the storage type;
//   - a predicate column, or a column with no counterpart in `block`, is cleared in
//     place (an error is returned if it was never allocated);
//   - any other column takes over the column currently held by `block` at the same
//     position.
// Virtual columns are reset to empty ColumnNothing placeholders.
Status SegmentIterator::_init_current_block(Block* block,
                                            std::vector<MutableColumnPtr>& current_columns,
                                            uint32_t nrows_read_limit) {
    block->clear_column_data(_schema->num_column_ids());

    for (size_t i = 0; i < _schema->num_column_ids(); i++) {
        const auto cid = _schema->column_id(i);
        const auto* column_desc = _schema->column(cid);
        const bool is_pred_column = _is_pred_column[cid];

        auto file_column_type = _storage_name_and_type[cid].second;
        auto expected_type = Schema::get_data_type_ptr(*column_desc);

        if (!is_pred_column && !file_column_type->equals(*expected_type)) {
            // The storage layer type is different from the type the schema needs, so
            // the storage type is used to read the column instead, for safety.
            VLOG_DEBUG << fmt::format(
                    "Recreate column with expected type {}, file column type {}, col_name {}, "
                    "col_path {}",
                    block->get_by_position(i).type->get_name(), file_column_type->get_name(),
                    column_desc->name(),
                    column_desc->path_info_ptr() == nullptr
                            ? ""
                            : column_desc->path_info_ptr()->get_path());
            // TODO reuse
            current_columns[cid] = file_column_type->create_column();
            current_columns[cid]->reserve(nrows_read_limit);
            continue;
        }

        // The column must be cleared here so that new data can be inserted.
        //todo(wb) maybe we can release it after output block
        if (is_pred_column || i >= block->columns()) {
            if (current_columns[cid].get() == nullptr) {
                return Status::InternalError(
                        "SegmentIterator meet invalid column, id={}, name={}", cid,
                        _schema->column(cid)->name());
            }
            current_columns[cid]->clear();
            continue;
        }

        // non-predicate column: reuse the column owned by the block
        current_columns[cid] = std::move(*block->get_by_position(i).column).mutate();
        current_columns[cid]->reserve(nrows_read_limit);
    }

    for (const auto& virtual_entry : _virtual_column_exprs) {
        const auto cid = virtual_entry.first;
        current_columns[cid] = ColumnNothing::create(0);
        current_columns[cid]->reserve(nrows_read_limit);
    }

    return Status::OK();
}
2315
2316
10.3k
// Converts the non-predicate columns to their expected types and moves them from
// _current_return_columns into `block` at the positions given by _schema_block_id_map.
Status SegmentIterator::_output_non_pred_columns(Block* block) {
    SCOPED_RAW_TIMER(&_opts.stats->output_col_ns);
    VLOG_DEBUG << fmt::format(
            "Output non-predicate columns, _non_predicate_columns: [{}], "
            "_schema_block_id_map: [{}]",
            fmt::join(_non_predicate_columns, ","), fmt::join(_schema_block_id_map, ","));
    RETURN_IF_ERROR(_convert_to_expected_type(_non_predicate_columns));

    for (auto cid : _non_predicate_columns) {
        auto loc = _schema_block_id_map[cid];
        // Whether a delete predicate column is output depends on how the caller built
        // the block passed to next_batch(). Both calling paths build the block from the
        // output schema (return_columns) only, so delete predicate columns are skipped:
        //
        // 1) VMergeIterator path: block_reset() builds _block from the output schema,
        //    e.g. the block has 2 columns {c1, c2}. For a delete predicate column c3,
        //    loc=2 and block->columns()=2, so loc < block->columns() is false and c3
        //    is skipped.
        //
        // 2) VUnionIterator path: the caller's block is likewise built from
        //    return_columns only, e.g. {c1, c2}. Again loc=2 for c3 is not smaller than
        //    block->columns()=2, so c3 is skipped exactly as on the VMergeIterator path.
        if (!(loc < block->columns())) {
            continue;
        }

        const bool block_column_is_placeholder =
                check_and_get_column<const ColumnNothing>(
                        block->get_by_position(loc).column.get()) != nullptr;
        const bool is_regular_column = !_vir_cid_to_idx_in_block.contains(cid);
        const bool pending_column_is_placeholder =
                check_and_get_column<const ColumnNothing>(_current_return_columns[cid].get()) !=
                nullptr;
        VLOG_DEBUG << fmt::format(
                "Cid {} loc {}, column_in_block_is_nothing {}, column_is_normal {}, "
                "return_column_is_nothing {}",
                cid, loc, block_column_is_placeholder, is_regular_column,
                pending_column_is_placeholder);

        if (!block_column_is_placeholder && !is_regular_column) {
            // The virtual column in the block has already been materialized (maybe by a
            // common expr), so there is nothing to do here.
            continue;
        }

        block->replace_by_position(loc, std::move(_current_return_columns[cid]));
        VLOG_DEBUG << fmt::format(
                "Output non-predicate column, cid: {}, loc: {}, col_name: {}, rows {}", cid,
                loc, _schema->column(cid)->name(), block->get_by_position(loc).column->size());
    }
    return Status::OK();
}
2363
2364
/**
2365
 * Reads columns by their index, handling both continuous and discontinuous rowid scenarios.
2366
 *
2367
 * This function is designed to read a specified number of rows (up to nrows_read_limit)
2368
 * from the segment iterator, dealing with both continuous and discontinuous rowid arrays.
2369
 * It operates as follows:
2370
 *
2371
 * 1. Reads a batch of rowids (up to the specified limit), and checks if they are continuous.
2372
 *    Continuous here means that the rowids form an unbroken sequence (e.g., 1, 2, 3, 4...).
2373
 *
2374
 * 2. For each column that needs to be read (identified by _predicate_column_ids):
2375
 *    - If the rowids are continuous, the function uses seek_to_ordinal and next_batch
2376
 *      for efficient reading.
2377
 *    - If the rowids are not continuous, the function processes them in smaller batches
2378
 *      (each of size up to 256). Each batch is checked for internal continuity:
2379
 *        a. If a batch is continuous, uses seek_to_ordinal and next_batch for that batch.
2380
 *        b. If a batch is not continuous, uses read_by_rowids for individual rowids in the batch.
2381
 *
2382
 * This approach optimizes reading performance by leveraging batch processing for continuous
2383
 * rowid sequences and handling discontinuities gracefully in smaller chunks.
2384
 */
2385
13.3k
Status SegmentIterator::_read_columns_by_index(uint32_t nrows_read_limit, uint16_t& nrows_read) {
2386
13.3k
    SCOPED_RAW_TIMER(&_opts.stats->predicate_column_read_ns);
2387
2388
13.3k
    nrows_read = (uint16_t)_range_iter->read_batch_rowids(_block_rowids.data(), nrows_read_limit);
2389
13.3k
    bool is_continuous = (nrows_read > 1) &&
2390
13.3k
                         (_block_rowids[nrows_read - 1] - _block_rowids[0] == nrows_read - 1);
2391
13.3k
    VLOG_DEBUG << fmt::format(
2392
0
            "nrows_read from range iterator: {}, is_continus {}, "
2393
0
            "_cols_read_by_column_predicate "
2394
0
            "[{}]",
2395
0
            nrows_read, is_continuous, fmt::join(_predicate_column_ids, ","));
2396
2397
13.3k
    LOG_IF(INFO, config::enable_segment_prefetch_verbose_log) << fmt::format(
2398
0
            "[verbose] SegmentIterator::_read_columns_by_index read {} rowids, continuous: {}, "
2399
0
            "rowids: [{}...{}]",
2400
0
            nrows_read, is_continuous, nrows_read > 0 ? _block_rowids[0] : 0,
2401
0
            nrows_read > 0 ? _block_rowids[nrows_read - 1] : 0);
2402
25.9k
    for (auto cid : _predicate_column_ids) {
2403
25.9k
        auto& column = _current_return_columns[cid];
2404
25.9k
        VLOG_DEBUG << fmt::format("Reading column {}, col_name {}", cid,
2405
0
                                  _schema->column(cid)->name());
2406
25.9k
        if (!_virtual_column_exprs.contains(cid)) {
2407
25.9k
            if (_no_need_read_key_data(cid, column, nrows_read)) {
2408
0
                VLOG_DEBUG << fmt::format("Column {} no need to read.", cid);
2409
0
                continue;
2410
0
            }
2411
25.9k
            if (_prune_column(cid, column, true, nrows_read)) {
2412
38
                VLOG_DEBUG << fmt::format("Column {} is pruned. No need to read data.", cid);
2413
38
                continue;
2414
38
            }
2415
25.8k
            DBUG_EXECUTE_IF("segment_iterator._read_columns_by_index", {
2416
25.8k
                auto col_name = _opts.tablet_schema->column(cid).name();
2417
25.8k
                auto debug_col_name =
2418
25.8k
                        DebugPoints::instance()->get_debug_param_or_default<std::string>(
2419
25.8k
                                "segment_iterator._read_columns_by_index", "column_name", "");
2420
25.8k
                if (debug_col_name.empty() && col_name != "__DORIS_DELETE_SIGN__") {
2421
25.8k
                    return Status::Error<ErrorCode::INTERNAL_ERROR>(
2422
25.8k
                            "does not need to read data, {}", col_name);
2423
25.8k
                }
2424
25.8k
                if (debug_col_name.find(col_name) != std::string::npos) {
2425
25.8k
                    return Status::Error<ErrorCode::INTERNAL_ERROR>(
2426
25.8k
                            "does not need to read data, {}", col_name);
2427
25.8k
                }
2428
25.8k
            })
2429
25.8k
        }
2430
2431
25.8k
        if (is_continuous) {
2432
18.8k
            size_t rows_read = nrows_read;
2433
18.8k
            _opts.stats->predicate_column_read_seek_num += 1;
2434
18.8k
            if (_opts.runtime_state && _opts.runtime_state->enable_profile()) {
2435
0
                SCOPED_RAW_TIMER(&_opts.stats->predicate_column_read_seek_ns);
2436
0
                RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(_block_rowids[0]));
2437
18.8k
            } else {
2438
18.8k
                RETURN_IF_ERROR(_column_iterators[cid]->seek_to_ordinal(_block_rowids[0]));
2439
18.8k
            }
2440
18.8k
            RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, column));
2441
18.8k
            if (rows_read != nrows_read) {
2442
0
                return Status::Error<ErrorCode::INTERNAL_ERROR>("nrows({}) != rows_read({})",
2443
0
                                                                nrows_read, rows_read);
2444
0
            }
2445
18.8k
        } else {
2446
7.03k
            const uint32_t batch_size = _range_iter->get_batch_size();
2447
7.03k
            uint32_t processed = 0;
2448
8.34k
            while (processed < nrows_read) {
2449
1.31k
                uint32_t current_batch_size = std::min(batch_size, nrows_read - processed);
2450
1.31k
                bool batch_continuous = (current_batch_size > 1) &&
2451
1.31k
                                        (_block_rowids[processed + current_batch_size - 1] -
2452
1.20k
                                                 _block_rowids[processed] ==
2453
1.20k
                                         current_batch_size - 1);
2454
2455
1.31k
                if (batch_continuous) {
2456
0
                    size_t rows_read = current_batch_size;
2457
0
                    _opts.stats->predicate_column_read_seek_num += 1;
2458
0
                    if (_opts.runtime_state && _opts.runtime_state->enable_profile()) {
2459
0
                        SCOPED_RAW_TIMER(&_opts.stats->predicate_column_read_seek_ns);
2460
0
                        RETURN_IF_ERROR(
2461
0
                                _column_iterators[cid]->seek_to_ordinal(_block_rowids[processed]));
2462
0
                    } else {
2463
0
                        RETURN_IF_ERROR(
2464
0
                                _column_iterators[cid]->seek_to_ordinal(_block_rowids[processed]));
2465
0
                    }
2466
0
                    RETURN_IF_ERROR(_column_iterators[cid]->next_batch(&rows_read, column));
2467
0
                    if (rows_read != current_batch_size) {
2468
0
                        return Status::Error<ErrorCode::INTERNAL_ERROR>(
2469
0
                                "batch nrows({}) != rows_read({})", current_batch_size, rows_read);
2470
0
                    }
2471
1.31k
                } else {
2472
1.31k
                    RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids(
2473
1.31k
                            &_block_rowids[processed], current_batch_size, column));
2474
1.31k
                }
2475
1.31k
                processed += current_batch_size;
2476
1.31k
            }
2477
7.03k
        }
2478
25.8k
    }
2479
2480
13.3k
    return Status::OK();
2481
13.3k
}
2482
void SegmentIterator::_replace_version_col_if_needed(const std::vector<ColumnId>& column_ids,
2483
14.7k
                                                     size_t num_rows) {
2484
    // Only the rowset with single version need to replace the version column.
2485
    // Doris can't determine the version before publish_version finished, so
2486
    // we can't write data to __DORIS_VERSION_COL__ in segment writer, the value
2487
    // is 0 by default.
2488
    // So we need to replace the value to real version while reading.
2489
14.7k
    if (_opts.version.first != _opts.version.second) {
2490
6.54k
        return;
2491
6.54k
    }
2492
8.21k
    int32_t version_idx = _schema->version_col_idx();
2493
8.21k
    if (std::ranges::find(column_ids, version_idx) == column_ids.end()) {
2494
8.21k
        return;
2495
8.21k
    }
2496
2497
0
    const auto* column_desc = _schema->column(version_idx);
2498
0
    auto column = Schema::get_data_type_ptr(*column_desc)->create_column();
2499
0
    DCHECK(_schema->column(version_idx)->type() == FieldType::OLAP_FIELD_TYPE_BIGINT);
2500
0
    auto* col_ptr = assert_cast<ColumnInt64*>(column.get());
2501
0
    for (size_t j = 0; j < num_rows; j++) {
2502
0
        col_ptr->insert_value(_opts.version.second);
2503
0
    }
2504
0
    _current_return_columns[version_idx] = std::move(column);
2505
0
    VLOG_DEBUG << "replaced version column in segment iterator, version_col_idx:" << version_idx;
2506
0
}
2507
2508
void SegmentIterator::_update_lsn_col_if_needed(const std::vector<ColumnId>& column_ids,
2509
14.7k
                                                size_t num_rows) {
2510
    // | commit tso(64) | auto-inc row_id(64) |
2511
14.7k
    if (_opts.version.first != _opts.version.second) {
2512
6.54k
        return;
2513
6.54k
    }
2514
2515
8.21k
    if (_opts.io_ctx.reader_type != ReaderType::READER_BINLOG &&
2516
8.21k
        _opts.io_ctx.reader_type != ReaderType::READER_BINLOG_COMPACTION) {
2517
8.21k
        return;
2518
8.21k
    }
2519
2520
0
    int32_t lsn_col_idx = _schema->lsn_col_idx();
2521
0
    if (lsn_col_idx < 0 || std::ranges::find(column_ids, lsn_col_idx) == column_ids.end()) {
2522
0
        return;
2523
0
    }
2524
2525
0
    DCHECK_EQ(_opts.commit_tso.start_tso(), _opts.commit_tso.end_tso());
2526
0
    const Int64 commit_tso = _opts.commit_tso.end_tso() == -1 ? 0 : _opts.commit_tso.end_tso();
2527
2528
0
    if (_is_pred_column[lsn_col_idx]) {
2529
0
        auto* lsn_column = assert_cast<ColumnInt128*>(_current_return_columns[lsn_col_idx].get());
2530
0
        std::vector<Int128> binlog_lsns;
2531
0
        binlog_lsns.reserve(num_rows);
2532
0
        for (size_t j = 0; j < num_rows; j++) {
2533
0
            const Int128 row_id = lsn_column->get_data()[j];
2534
0
            binlog_lsns.emplace_back(make_row_binlog_lsn(commit_tso, row_id));
2535
0
        }
2536
0
        lsn_column->clear();
2537
0
        for (const auto& binlog_lsn : binlog_lsns) {
2538
0
            lsn_column->insert_data(reinterpret_cast<const char*>(&binlog_lsn), 0);
2539
0
        }
2540
0
        return;
2541
0
    }
2542
2543
0
    auto* lsn_column = assert_cast<ColumnInt128*>(_current_return_columns[lsn_col_idx].get());
2544
0
    const auto* column_desc = _schema->column(lsn_col_idx);
2545
0
    auto column = Schema::get_data_type_ptr(*column_desc)->create_column();
2546
0
    DCHECK(column_desc->type() == FieldType::OLAP_FIELD_TYPE_LARGEINT);
2547
0
    auto* col_ptr = assert_cast<ColumnInt128*>(column.get());
2548
2549
0
    for (size_t j = 0; j < num_rows; j++) {
2550
0
        const Int128 row_id = lsn_column->get_element(j);
2551
0
        col_ptr->insert_value(make_row_binlog_lsn(commit_tso, row_id));
2552
0
    }
2553
0
    _current_return_columns[lsn_col_idx] = std::move(column);
2554
0
}
2555
2556
void SegmentIterator::_update_tso_col_if_needed(const std::vector<ColumnId>& column_ids,
2557
14.7k
                                                size_t num_rows) {
2558
    // use physical time part of commit tso to replace timestamp col
2559
14.7k
    if (_opts.version.first != _opts.version.second) {
2560
6.54k
        return;
2561
6.54k
    }
2562
2563
8.21k
    if (_opts.io_ctx.reader_type != ReaderType::READER_BINLOG &&
2564
8.21k
        _opts.io_ctx.reader_type != ReaderType::READER_BINLOG_COMPACTION) {
2565
8.21k
        return;
2566
8.21k
    }
2567
2568
0
    int32_t tso_col_idx = _schema->tso_col_idx();
2569
0
    if (tso_col_idx < 0 || std::ranges::find(column_ids, tso_col_idx) == column_ids.end()) {
2570
0
        return;
2571
0
    }
2572
2573
0
    DCHECK_EQ(_opts.commit_tso.start_tso(), _opts.commit_tso.end_tso());
2574
0
    Int64 commit_tso = _opts.commit_tso.end_tso() == -1 ? 0 : _opts.commit_tso.end_tso();
2575
2576
0
    if (_is_pred_column[tso_col_idx]) {
2577
        // Nullable predicate column is represented as ColumnNullable(predicate_col)
2578
0
        if (auto* tso_nullable = check_and_get_column<ColumnNullable>(
2579
0
                    _current_return_columns[tso_col_idx].get())) {
2580
0
            _current_return_columns[tso_col_idx]->clear();
2581
0
            auto value = commit_tso;
2582
0
            for (size_t j = 0; j < num_rows; j++) {
2583
0
                tso_nullable->get_nested_column_ptr()->insert_data(
2584
0
                        reinterpret_cast<const char*>(&value), 0);
2585
0
                tso_nullable->get_null_map_data().emplace_back(0);
2586
0
            }
2587
0
            return;
2588
0
        }
2589
2590
0
        auto* tso_column = assert_cast<ColumnInt64*>(_current_return_columns[tso_col_idx].get());
2591
0
        tso_column->clear();
2592
0
        auto value = commit_tso;
2593
0
        for (size_t j = 0; j < num_rows; j++) {
2594
0
            tso_column->insert_data(reinterpret_cast<const char*>(&value), 0);
2595
0
        }
2596
0
        return;
2597
0
    }
2598
2599
0
    const auto* column_desc = _schema->column(tso_col_idx);
2600
0
    auto column = Schema::get_data_type_ptr(*column_desc)->create_column();
2601
0
    DCHECK(column_desc->type() == FieldType::OLAP_FIELD_TYPE_BIGINT);
2602
2603
0
    if (auto* tso_nullable = check_and_get_column<ColumnNullable>(column.get())) {
2604
0
        auto* col_ptr = assert_cast<ColumnInt64*>(&tso_nullable->get_nested_column());
2605
0
        for (size_t j = 0; j < num_rows; j++) {
2606
0
            col_ptr->insert_value(commit_tso);
2607
0
            tso_nullable->get_null_map_data().emplace_back(0);
2608
0
        }
2609
0
    } else {
2610
0
        auto* col_ptr = assert_cast<ColumnInt64*>(column.get());
2611
0
        for (size_t j = 0; j < num_rows; j++) {
2612
0
            col_ptr->insert_value(commit_tso);
2613
0
        }
2614
0
    }
2615
0
    _current_return_columns[tso_col_idx] = std::move(column);
2616
0
}
2617
2618
uint16_t SegmentIterator::_evaluate_vectorization_predicate(uint16_t* sel_rowid_idx,
2619
1.72k
                                                            uint16_t selected_size) {
2620
1.72k
    SCOPED_RAW_TIMER(&_opts.stats->vec_cond_ns);
2621
1.72k
    bool all_pred_always_true = true;
2622
1.72k
    for (const auto& pred : _pre_eval_block_predicate) {
2623
28
        if (!pred->always_true()) {
2624
28
            all_pred_always_true = false;
2625
28
        } else {
2626
0
            pred->update_filter_info(0, 0, selected_size);
2627
0
        }
2628
28
    }
2629
2630
1.72k
    const uint16_t original_size = selected_size;
2631
    //If all predicates are always_true, then return directly.
2632
1.72k
    if (all_pred_always_true || !_is_need_vec_eval) {
2633
3.90M
        for (uint16_t i = 0; i < original_size; ++i) {
2634
3.90M
            sel_rowid_idx[i] = i;
2635
3.90M
        }
2636
        // All preds are always_true, so return immediately and update the profile statistics here.
2637
1.69k
        _opts.stats->vec_cond_input_rows += original_size;
2638
1.69k
        return original_size;
2639
1.69k
    }
2640
2641
28
    _ret_flags.resize(original_size);
2642
28
    DCHECK(!_pre_eval_block_predicate.empty());
2643
28
    bool is_first = true;
2644
28
    for (auto& pred : _pre_eval_block_predicate) {
2645
28
        if (pred->always_true()) {
2646
0
            continue;
2647
0
        }
2648
28
        auto column_id = pred->column_id();
2649
28
        auto& column = _current_return_columns[column_id];
2650
28
        if (is_first) {
2651
28
            pred->evaluate_vec(*column, original_size, (bool*)_ret_flags.data());
2652
28
            is_first = false;
2653
28
        } else {
2654
0
            pred->evaluate_and_vec(*column, original_size, (bool*)_ret_flags.data());
2655
0
        }
2656
28
    }
2657
2658
28
    uint16_t new_size = 0;
2659
2660
28
    uint16_t sel_pos = 0;
2661
28
    const uint16_t sel_end = sel_pos + selected_size;
2662
28
    static constexpr size_t SIMD_BYTES = simd::bits_mask_length();
2663
28
    const uint16_t sel_end_simd = sel_pos + selected_size / SIMD_BYTES * SIMD_BYTES;
2664
2665
540
    while (sel_pos < sel_end_simd) {
2666
512
        auto mask = simd::bytes_mask_to_bits_mask(_ret_flags.data() + sel_pos);
2667
512
        if (0 == mask) {
2668
            //pass
2669
256
        } else if (simd::bits_mask_all() == mask) {
2670
8.44k
            for (uint16_t i = 0; i < SIMD_BYTES; i++) {
2671
8.19k
                sel_rowid_idx[new_size++] = sel_pos + i;
2672
8.19k
            }
2673
256
        } else {
2674
0
            simd::iterate_through_bits_mask(
2675
0
                    [&](const int bit_pos) {
2676
0
                        sel_rowid_idx[new_size++] = sel_pos + (uint16_t)bit_pos;
2677
0
                    },
2678
0
                    mask);
2679
0
        }
2680
512
        sel_pos += SIMD_BYTES;
2681
512
    }
2682
2683
55
    for (; sel_pos < sel_end; sel_pos++) {
2684
27
        if (_ret_flags[sel_pos]) {
2685
20
            sel_rowid_idx[new_size++] = sel_pos;
2686
20
        }
2687
27
    }
2688
2689
28
    _opts.stats->vec_cond_input_rows += original_size;
2690
28
    _opts.stats->rows_vec_cond_filtered += original_size - new_size;
2691
28
    return new_size;
2692
1.72k
}
2693
2694
uint16_t SegmentIterator::_evaluate_short_circuit_predicate(uint16_t* vec_sel_rowid_idx,
2695
1.72k
                                                            uint16_t selected_size) {
2696
1.72k
    SCOPED_RAW_TIMER(&_opts.stats->short_cond_ns);
2697
1.72k
    if (!_is_need_short_eval) {
2698
28
        return selected_size;
2699
28
    }
2700
2701
1.69k
    uint16_t original_size = selected_size;
2702
1.69k
    for (auto predicate : _short_cir_eval_predicate) {
2703
0
        auto column_id = predicate->column_id();
2704
0
        auto& short_cir_column = _current_return_columns[column_id];
2705
0
        selected_size = predicate->evaluate(*short_cir_column, vec_sel_rowid_idx, selected_size);
2706
0
    }
2707
2708
1.69k
    _opts.stats->short_circuit_cond_input_rows += original_size;
2709
1.69k
    _opts.stats->rows_short_circuit_cond_filtered += original_size - selected_size;
2710
2711
    // evaluate delete condition
2712
1.69k
    original_size = selected_size;
2713
1.69k
    selected_size = _opts.delete_condition_predicates->evaluate(_current_return_columns,
2714
1.69k
                                                                vec_sel_rowid_idx, selected_size);
2715
1.69k
    _opts.stats->rows_vec_del_cond_filtered += original_size - selected_size;
2716
1.69k
    return selected_size;
2717
1.72k
}
2718
2719
1
// Truncates every column of `block` that holds more than `rows` rows down to `rows`.
// Null column pointers and columns that are already short enough are left untouched.
static void shrink_materialized_block_columns(Block* block, size_t rows) {
    for (auto& entry : *block) {
        if (!entry.column || entry.column->size() <= rows) {
            continue;
        }
        entry.column = entry.column->shrink(rows);
    }
}
2726
2727
static void slice_materialized_block_columns(Block* block, size_t offset, size_t rows,
2728
1
                                             size_t original_rows) {
2729
1
    for (auto& entry : *block) {
2730
1
        if (!entry.column || entry.column->size() == 0) {
2731
0
            continue;
2732
0
        }
2733
1
        DORIS_CHECK(entry.column->size() == original_rows);
2734
1
        entry.column = entry.column->cut(offset, rows);
2735
1
    }
2736
1
}
2737
2738
1.73k
// Caps the current selection to the rows still allowed by `_opts.read_limit`.
//
// A read_limit of 0 disables the limit. Otherwise `selected_size` is reduced to the
// remaining budget (read_limit - _rows_returned) and the columns already materialized in
// `block` are trimmed to match:
//   * forward reads keep the first rows of the selection,
//   * reverse-key reads keep the last rows, moving them to the front of `_sel_rowid_idx`.
//
// @param block          block whose already materialized columns are trimmed
// @param selected_size  in/out number of selected rows
// @return always Status::OK()
Status SegmentIterator::_apply_read_limit_to_selected_rows(Block* block, uint16_t& selected_size) {
    if (_opts.read_limit == 0) {
        return Status::OK();
    }
    DORIS_CHECK(_rows_returned <= _opts.read_limit);

    size_t remaining = _opts.read_limit - _rows_returned;
    if (remaining == 0) {
        // Limit already reached: nothing from this batch may be returned.
        selected_size = 0;
        shrink_materialized_block_columns(block, 0);
        return Status::OK();
    }
    if (selected_size <= remaining) {
        // The whole selection fits into the remaining budget.
        return Status::OK();
    }

    if (!_opts.read_orderby_key_reverse) {
        selected_size = cast_set<uint16_t>(remaining);
        shrink_materialized_block_columns(block, selected_size);
        return Status::OK();
    }

    // Reverse read: keep the tail of the selection.
    const auto original_size = selected_size;
    const auto offset = original_size - remaining;
    // offset > 0 here, so the destination starts before the source range and a forward
    // copy is well defined.
    auto* sel_begin = _sel_rowid_idx.data();
    std::copy(sel_begin + offset, sel_begin + offset + remaining, sel_begin);
    selected_size = cast_set<uint16_t>(remaining);
    slice_materialized_block_columns(block, offset, remaining, original_size);
    return Status::OK();
}
2765
2766
// Reads the given columns for the selected rows only (lazy materialization).
//
// @param read_column_ids       columns to read
// @param rowid_vector          row ids of the rows read for the current batch
// @param sel_rowid_idx         indexes into `rowid_vector` of the selected rows
// @param select_size           number of valid entries in `sel_rowid_idx`
// @param mutable_columns       destination columns, indexed by column id
// @param init_condition_cache  when true, every selected row marks its slot
//                              (rowid / CONDITION_CACHE_OFFSET) in `_condition_cache`
// @return the first error reported by a column iterator, an internal error when a
//         destination column is missing, otherwise OK
Status SegmentIterator::_read_columns_by_rowids(std::vector<ColumnId>& read_column_ids,
                                                std::vector<rowid_t>& rowid_vector,
                                                uint16_t* sel_rowid_idx, size_t select_size,
                                                MutableColumns* mutable_columns,
                                                bool init_condition_cache) {
    SCOPED_RAW_TIMER(&_opts.stats->lazy_read_ns);

    // Translate the selection indexes into segment row ids.
    std::vector<rowid_t> rowids(select_size);
    for (size_t i = 0; i < select_size; ++i) {
        rowids[i] = rowid_vector[sel_rowid_idx[i]];
    }

    if (init_condition_cache) {
        DCHECK(_condition_cache);
        auto& condition_cache = *_condition_cache;
        for (const rowid_t rowid : rowids) {
            condition_cache[rowid / SegmentIterator::CONDITION_CACHE_OFFSET] = true;
        }
    }

    for (auto cid : read_column_ids) {
        auto& target_column = (*mutable_columns)[cid];
        // Either helper may report that this column does not need to be read from storage.
        if (_no_need_read_key_data(cid, target_column, select_size)) {
            continue;
        }
        if (_prune_column(cid, target_column, true, select_size)) {
            continue;
        }

        DBUG_EXECUTE_IF("segment_iterator._read_columns_by_index", {
            auto debug_col_name = DebugPoints::instance()->get_debug_param_or_default<std::string>(
                    "segment_iterator._read_columns_by_index", "column_name", "");
            if (debug_col_name.empty()) {
                return Status::Error<ErrorCode::INTERNAL_ERROR>("does not need to read data");
            }
            auto col_name = _opts.tablet_schema->column(cid).name();
            if (debug_col_name.find(col_name) != std::string::npos) {
                return Status::Error<ErrorCode::INTERNAL_ERROR>("does not need to read data, {}",
                                                                debug_col_name);
            }
        })

        if (_current_return_columns[cid].get() == nullptr) {
            return Status::InternalError(
                    "SegmentIterator meet invalid column, return columns size {}, cid {}",
                    _current_return_columns.size(), cid);
        }
        RETURN_IF_ERROR(_column_iterators[cid]->read_by_rowids(rowids.data(), select_size,
                                                               _current_return_columns[cid]));
    }

    return Status::OK();
}
2820
2821
13.3k
// Public entry point: fills `block` with the next batch of rows of this segment.
//
// Work done around _next_batch_internal():
//   1. choose the batch row count (adaptive predictor, capped by the initial maximum) and
//      record it in the reader statistics;
//   2. on END_OF_FILE, replace virtual ColumnNothing placeholders by empty columns of the
//      real type and publish the condition cache if this iterator built one;
//   3. reverse the row order for reverse-key reads;
//   4. validate the block and feed the batch back into the size predictor.
// Exceptions thrown inside are converted to a Status by RETURN_IF_CATCH_EXCEPTION.
//
// @param block  output block
// @return OK, END_OF_FILE when the segment is exhausted, or the first error. Any error
//         other than END_OF_FILE is also reported to the segment's health status.
Status SegmentIterator::next_batch(Block* block) {
    // Replace virtual columns with ColumnNothing at the beginning of each next_batch call.
    _init_virtual_columns(block);
    auto status = [&]() {
        RETURN_IF_CATCH_EXCEPTION({
            // Adaptive batch size: predict how many rows this batch should read.
            if (_block_size_predictor) {
                auto predicted = static_cast<uint32_t>(_block_size_predictor->predict_next_rows());
                _opts.block_row_max = std::min(predicted, _initial_block_row_max);
                _opts.stats->adaptive_batch_size_predict_min_rows =
                        std::min(_opts.stats->adaptive_batch_size_predict_min_rows,
                                 static_cast<int64_t>(predicted));
                _opts.stats->adaptive_batch_size_predict_max_rows =
                        std::max(_opts.stats->adaptive_batch_size_predict_max_rows,
                                 static_cast<int64_t>(predicted));
            } else {
                // No predictor — record the fixed batch size using min/max so we don't
                // clobber values already accumulated by other segment iterators that
                // share the same OlapReaderStatistics.
                _opts.stats->adaptive_batch_size_predict_min_rows =
                        std::min(_opts.stats->adaptive_batch_size_predict_min_rows,
                                 static_cast<int64_t>(_opts.block_row_max));
                _opts.stats->adaptive_batch_size_predict_max_rows =
                        std::max(_opts.stats->adaptive_batch_size_predict_max_rows,
                                 static_cast<int64_t>(_opts.block_row_max));
            }

            auto res = _next_batch_internal(block);

            if (res.is<END_OF_FILE>()) {
                // Since we have a type check at the caller.
                // So a replacement of nothing column with real column is needed.
                const auto& idx_to_datatype = _opts.vir_col_idx_to_type;
                for (const auto& pair : _vir_cid_to_idx_in_block) {
                    size_t idx = pair.second;
                    // Bind by reference: no need to copy the data type handle.
                    const auto& type = idx_to_datatype.find(idx)->second;
                    block->replace_by_position(idx, type->create_column());
                }

                if (_opts.condition_cache_digest && !_find_condition_cache) {
                    auto* condition_cache = ConditionCache::instance();
                    ConditionCache::CacheKey cache_key(_opts.rowset_id, _segment->id(),
                                                       _opts.condition_cache_digest);
                    // runtime_state is treated as optional elsewhere in this class, so do
                    // not dereference it blindly just to build a debug message.
                    VLOG_DEBUG << "Condition cache insert, query id: "
                               << (_opts.runtime_state
                                           ? print_id(_opts.runtime_state->query_id())
                                           : std::string("unknown"))
                               << ", rowset id: " << _opts.rowset_id.to_string()
                               << ", segment id: " << _segment->id()
                               << ", cache digest: " << _opts.condition_cache_digest;
                    condition_cache->insert(cache_key, std::move(_condition_cache));
                }
                return res;
            }

            RETURN_IF_ERROR(res);
            // reverse block row order if read_orderby_key_reverse is true for key topn
            // it should be processed for all success _next_batch_internal
            if (_opts.read_orderby_key_reverse) {
                size_t num_rows = block->rows();
                if (num_rows == 0) {
                    return Status::OK();
                }
                size_t num_columns = block->columns();
                IColumn::Permutation permutation;
                // The final size is known up front; reserve once instead of growing
                // the permutation repeatedly.
                permutation.reserve(num_rows);
                for (size_t i = 0; i < num_rows; ++i) {
                    permutation.emplace_back(num_rows - 1 - i);
                }

                for (size_t i = 0; i < num_columns; ++i) {
                    block->get_by_position(i).column =
                            block->get_by_position(i).column->permute(permutation, num_rows);
                }
            }

            RETURN_IF_ERROR(block->check_type_and_column());

            // Adaptive batch size: update EWMA estimate from the completed batch.
            // block->bytes() is accurate here: predicates have been applied and non-predicate
            // columns have been filled for surviving rows by _next_batch_internal.
            if (_block_size_predictor && block->rows() > 0) {
                _block_size_predictor->update(*block);
            }

            return Status::OK();
        });
    }();

    // if rows read by batch is 0, will return end of file, we should not remove segment cache in this situation.
    if (!status.ok() && !status.is<END_OF_FILE>()) {
        _segment->update_healthy_status(status);
    }
    return status;
}
2910
2911
13.3k
// Casts columns whose on-disk type differs from the type expected by the read schema.
//
// A column is skipped when it has no data column, was already converted during this
// batch, or is a predicate column. Converted columns are flagged in
// `_converted_column_ids` so that they are not cast twice.
//
// @param col_ids  column ids to inspect
// @return the error of the failed cast, otherwise OK
Status SegmentIterator::_convert_to_expected_type(const std::vector<ColumnId>& col_ids) {
    for (ColumnId cid : col_ids) {
        if (!_current_return_columns[cid] || _converted_column_ids[cid] || _is_pred_column[cid]) {
            continue;
        }

        const TabletColumn* column_desc = _schema->column(cid);
        DataTypePtr expected_type = Schema::get_data_type_ptr(*column_desc);
        DataTypePtr file_column_type = _storage_name_and_type[cid].second;
        if (file_column_type->equals(*expected_type)) {
            // Stored type already matches; nothing to do for this column.
            continue;
        }

        ColumnPtr converted;
        ColumnPtr source = _current_return_columns[cid]->assert_mutable()->get_ptr();
        RETURN_IF_ERROR(variant_util::cast_column({source, file_column_type, ""},
                                                  expected_type, &converted));
        _current_return_columns[cid] = converted->assert_mutable();
        _converted_column_ids[cid] = true;
        VLOG_DEBUG << fmt::format("Convert {} fom file column type {} to {}, num_rows {}",
                                  column_desc->path_info_ptr() == nullptr
                                          ? ""
                                          : column_desc->path_info_ptr()->get_path(),
                                  file_column_type->get_name(), expected_type->get_name(),
                                  _current_return_columns[cid]->size());
    }
    return Status::OK();
}
2936
2937
// Copies the rows of `input_col_ptr` listed in `sel_rowid_idx` into `output_col`.
//
// @param input_col_ptr  source column
// @param output_col     destination column; must agree with the source on nullability
// @param sel_rowid_idx  indexes of the rows to copy
// @param select_size    number of valid entries in `sel_rowid_idx`
// @param batch_size     not used by this function
// @return RuntimeError when exactly one of the two columns is nullable, otherwise the
//         status of filter_by_selector()
Status SegmentIterator::copy_column_data_by_selector(IColumn* input_col_ptr,
                                                     MutableColumnPtr& output_col,
                                                     uint16_t* sel_rowid_idx, uint16_t select_size,
                                                     size_t batch_size) {
    const bool nullability_matches =
            is_column_nullable(*output_col) == is_column_nullable(*input_col_ptr);
    if (!nullability_matches) {
        LOG(WARNING) << "nullable mismatch for output_column: " << output_col->dump_structure()
                     << " input_column: " << input_col_ptr->dump_structure()
                     << " select_size: " << select_size;
        return Status::RuntimeError("copy_column_data_by_selector nullable mismatch");
    }

    output_col->reserve(select_size);
    return input_col_ptr->filter_by_selector(sel_rowid_idx, select_size, output_col.get());
}
2950
2951
13.3k
// Reads one batch of rows into `block`.
//
// Pipeline:
//   1. read the predicate columns for up to `nrows_read_limit` rows;
//   2. evaluate vectorized, short-circuit and delete predicates, then the remaining
//      common expressions, producing a selection vector;
//   3. apply the pushed-down read limit to the selection;
//   4. read the non-predicate columns for the surviving rows only;
//   5. move all columns into `block` and materialize virtual columns.
//
// @param block  output block
// @return the result of _process_eof() when the limit is exhausted or no row could be
//         read, the first error encountered, otherwise the status of
//         _check_output_block()
Status SegmentIterator::_next_batch_internal(Block* block) {
    SCOPED_CONCURRENCY_COUNT(ConcurrencyStatsManager::instance().segment_iterator_next_batch);

    bool is_mem_reuse = block->mem_reuse();
    DCHECK(is_mem_reuse);

    RETURN_IF_ERROR(_lazy_init(block));

    SCOPED_RAW_TIMER(&_opts.stats->block_load_ns);

    // The pushed-down limit has already been satisfied by earlier batches.
    if (_opts.read_limit > 0 && _rows_returned >= _opts.read_limit) {
        return _process_eof(block);
    }

    // If the row bitmap size is smaller than nrows_read_limit, there's no need to reserve that many column rows.
    uint32_t nrows_read_limit =
            std::min(cast_set<uint32_t>(_row_bitmap.cardinality()), _opts.block_row_max);
    if (_can_opt_limit_reads()) {
        // No SegmentIterator-side conjunct remains to be evaluated, so LIMIT is equivalent before
        // and after filtering. Cap the first read directly; this is the no-conjunct fast path that
        // avoids reading rows past the pushed-down local LIMIT.
        size_t cap = 0;
        if (_opts.read_limit > _rows_returned) {
            cap = _opts.read_limit - _rows_returned;
        }
        if (cap < nrows_read_limit) {
            nrows_read_limit = static_cast<uint32_t>(cap);
        }
    }
    DBUG_EXECUTE_IF("segment_iterator.topn_opt_1", {
        if (nrows_read_limit != 1) {
            return Status::Error<ErrorCode::INTERNAL_ERROR>(
                    "topn opt 1 execute failed: nrows_read_limit={}, "
                    "_opts.read_limit={}",
                    nrows_read_limit, _opts.read_limit);
        }
    })

    RETURN_IF_ERROR(_init_current_block(block, _current_return_columns, nrows_read_limit));
    _converted_column_ids.assign(_schema->columns().size(), false);

    // Step 0: read the predicate columns and patch the hidden version/LSN/TSO columns.
    _selected_size = 0;
    RETURN_IF_ERROR(_read_columns_by_index(nrows_read_limit, _selected_size));
    _replace_version_col_if_needed(_predicate_column_ids, _selected_size);
    _update_lsn_col_if_needed(_predicate_column_ids, _selected_size);
    _update_tso_col_if_needed(_predicate_column_ids, _selected_size);

    _opts.stats->blocks_load += 1;
    _opts.stats->raw_rows_read += _selected_size;

    if (_selected_size == 0) {
        return _process_eof(block);
    }

    if (_is_need_vec_eval || _is_need_short_eval || _is_need_expr_eval) {
        _sel_rowid_idx.resize(_selected_size);

        if (_is_need_vec_eval || _is_need_short_eval) {
            _convert_dict_code_for_predicate_if_necessary();

            // step 1: evaluate vectorization predicate
            _selected_size =
                    _evaluate_vectorization_predicate(_sel_rowid_idx.data(), _selected_size);

            // step 2: evaluate short circuit predicate
            // todo(wb) research whether need to read short predicate after vectorization evaluation
            //          to reduce cost of read short circuit columns.
            //          In SSB test, it make no difference; So need more scenarios to test
            _selected_size =
                    _evaluate_short_circuit_predicate(_sel_rowid_idx.data(), _selected_size);
            VLOG_DEBUG << fmt::format("After evaluate predicates, selected size: {} ",
                                      _selected_size);
            if (_selected_size > 0) {
                // step 3.1: output short circuit and predicate column
                // when lazy materialization enables, _predicate_column_ids = distinct(_short_cir_pred_column_ids + _vec_pred_column_ids)
                // see _vec_init_lazy_materialization
                // todo(wb) need to tell input columnids from output columnids
                RETURN_IF_ERROR(_output_column_by_sel_idx(block, _predicate_column_ids,
                                                          _sel_rowid_idx.data(), _selected_size));

                // step 3.2: read remaining expr column and evaluate it.
                if (_is_need_expr_eval) {
                    // The predicate column contains the remaining expr column, no need second read.
                    if (!_common_expr_column_ids.empty()) {
                        SCOPED_RAW_TIMER(&_opts.stats->non_predicate_read_ns);
                        RETURN_IF_ERROR(_read_columns_by_rowids(
                                _common_expr_column_ids, _block_rowids, _sel_rowid_idx.data(),
                                _selected_size, &_current_return_columns));
                        _replace_version_col_if_needed(_common_expr_column_ids, _selected_size);
                        _update_lsn_col_if_needed(_common_expr_column_ids, _selected_size);
                        _update_tso_col_if_needed(_common_expr_column_ids, _selected_size);
                        RETURN_IF_ERROR(_process_columns(_common_expr_column_ids, block));
                    }

                    DCHECK(block->columns() > _schema_block_id_map[*_common_expr_columns.begin()]);
                    RETURN_IF_ERROR(
                            _process_common_expr(_sel_rowid_idx.data(), _selected_size, block));
                }
            } else {
                // Every row was filtered out: give virtual columns a real (empty) type.
                _fill_column_nothing();
                if (_is_need_expr_eval) {
                    RETURN_IF_ERROR(_process_columns(_common_expr_column_ids, block));
                }
            }
        } else if (_is_need_expr_eval) {
            DCHECK(!_predicate_column_ids.empty());
            RETURN_IF_ERROR(_process_columns(_predicate_column_ids, block));
            // first read all rows are insert block, initialize sel_rowid_idx to all rows.
            auto* sel_begin = _sel_rowid_idx.data();
            std::iota(sel_begin, sel_begin + _selected_size, static_cast<uint16_t>(0));
            RETURN_IF_ERROR(_process_common_expr(_sel_rowid_idx.data(), _selected_size, block));
        }

        RETURN_IF_ERROR(_apply_read_limit_to_selected_rows(block, _selected_size));

        // step4: read non_predicate column
        if (_selected_size > 0) {
            const bool build_condition_cache =
                    _opts.condition_cache_digest && !_find_condition_cache;
            if (!_non_predicate_columns.empty()) {
                RETURN_IF_ERROR(_read_columns_by_rowids(
                        _non_predicate_columns, _block_rowids, _sel_rowid_idx.data(),
                        _selected_size, &_current_return_columns, build_condition_cache));
                _replace_version_col_if_needed(_non_predicate_columns, _selected_size);
                _update_lsn_col_if_needed(_non_predicate_columns, _selected_size);
                _update_tso_col_if_needed(_non_predicate_columns, _selected_size);
            } else if (build_condition_cache) {
                // No column left to read, but the surviving rows still have to be
                // recorded in the condition cache.
                auto& condition_cache = *_condition_cache;
                for (size_t i = 0; i < _selected_size; ++i) {
                    auto rowid = _block_rowids[_sel_rowid_idx[i]];
                    condition_cache[rowid / SegmentIterator::CONDITION_CACHE_OFFSET] = true;
                }
            }
        }
    }

    // step5: output columns
    RETURN_IF_ERROR(_output_non_pred_columns(block));
    // Convert inverted index bitmaps to result columns for virtual column exprs
    // (e.g., MATCH projections). This must run before _materialization_of_virtual_column
    // so that fast_execute() can find the pre-computed result columns.
    if (!_virtual_column_exprs.empty()) {
        bool use_sel = _is_need_vec_eval || _is_need_short_eval || _is_need_expr_eval;
        uint16_t* sel_rowid_idx = use_sel ? _sel_rowid_idx.data() : nullptr;
        std::vector<VExprContext*> vir_ctxs;
        vir_ctxs.reserve(_virtual_column_exprs.size());
        for (auto& [cid, ctx] : _virtual_column_exprs) {
            vir_ctxs.push_back(ctx.get());
        }
        _output_index_result_column(vir_ctxs, sel_rowid_idx, _selected_size, block);
    }
    RETURN_IF_ERROR(_materialization_of_virtual_column(block));
    // Track how many rows have been handed out so far for the pushed-down limit.
    if (_opts.read_limit > 0) {
        _rows_returned += block->rows();
    }
    return _check_output_block(block);
}
3107
3108
6
// Converts the given columns to their expected types and moves them from
// `_current_return_columns` into `block`, at the positions given by `_schema_block_id_map`.
//
// @param column_ids  columns to convert and hand over
// @param block       destination block
// @return the error of a failed type conversion, otherwise OK
Status SegmentIterator::_process_columns(const std::vector<ColumnId>& column_ids, Block* block) {
    RETURN_IF_ERROR(_convert_to_expected_type(column_ids));
    for (auto cid : column_ids) {
        // The column is moved out; _current_return_columns[cid] is left in a moved-from state.
        block->replace_by_position(_schema_block_id_map[cid],
                                   std::move(_current_return_columns[cid]));
    }
    return Status::OK();
}
3116
3117
38
void SegmentIterator::_fill_column_nothing() {
3118
    // If column_predicate filters out all rows, the corresponding column in _current_return_columns[cid] must be a ColumnNothing.
3119
    // Because:
3120
    // 1. Before each batch, _init_return_columns is called to initialize _current_return_columns, and virtual columns in _current_return_columns are initialized as ColumnNothing.
3121
    // 2. When select_size == 0, the read method of VirtualColumnIterator will definitely not be called, so the corresponding Column remains a ColumnNothing
3122
38
    for (const auto pair : _vir_cid_to_idx_in_block) {
3123
0
        auto cid = pair.first;
3124
0
        auto pos = pair.second;
3125
0
        [[maybe_unused]] const auto* nothing_col =
3126
0
                assert_cast<const ColumnNothing*>(_current_return_columns[cid].get());
3127
0
        _current_return_columns[cid] = _opts.vir_col_idx_to_type[pos]->create_column();
3128
0
    }
3129
38
}
3130
3131
10.3k
// Sanity check of the output block; the checks are compiled only when NDEBUG
// is not defined. For every column of `block` it reports an InternalError if
//   - the column pointer is null, or
//   - the column is a ColumnNothing while the block has rows, or
//   - the column is not a ColumnNothing and its size differs from block->rows().
// Returns Status::OK() otherwise (and always when NDEBUG is defined).
Status SegmentIterator::_check_output_block(Block* block) {
#ifndef NDEBUG
    const size_t expected_rows = block->rows();
    size_t position = 0;
    for (const auto& entry : *block) {
        if (!entry.column) {
            return Status::InternalError(
                    "Column in idx {} is null, block columns {}, normal_columns {}, "
                    "virtual_columns {}",
                    position, block->columns(), _schema->num_column_ids(),
                    _virtual_column_exprs.size());
        }

        const bool is_nothing = check_and_get_column<ColumnNothing>(entry.column.get()) != nullptr;
        if (is_nothing) {
            // A ColumnNothing is tolerated only for an empty block.
            if (expected_rows > 0) {
                std::vector<std::string> vcid_to_idx;
                for (const auto& [vcid, block_idx] : _vir_cid_to_idx_in_block) {
                    vcid_to_idx.push_back(fmt::format("{}-{}", vcid, block_idx));
                }
                std::string vir_cid_to_idx_in_block_msg =
                        fmt::format("_vir_cid_to_idx_in_block:[{}]", fmt::join(vcid_to_idx, ","));
                return Status::InternalError(
                        "Column in idx {} is nothing, block columns {}, normal_columns {}, "
                        "vir_cid_to_idx_in_block_msg {}",
                        position, block->columns(), _schema->num_column_ids(),
                        vir_cid_to_idx_in_block_msg);
            }
        } else if (entry.column->size() != expected_rows) {
            return Status::InternalError(
                    "Unmatched size {}, expected {}, column: {}, type: {}, idx_in_block: {}, "
                    "block: {}",
                    entry.column->size(), expected_rows, entry.column->get_name(),
                    entry.type->get_name(), position, block->dump_structure());
        }
        ++position;
    }
#endif
    return Status::OK();
}
3167
3168
0
// Currently a no-op: performs no work and always reports success.
Status SegmentIterator::_process_column_predicate() {
    return Status::OK();
}
3171
3172
3.02k
// End-of-segment handling: hands the non-predicate return columns to `block`,
// clears the block's column data, drops the column and index iterators, and
// returns Status::EndOfFile.
Status SegmentIterator::_process_eof(Block* block) {
    // Convert all columns in _current_return_columns to schema columns.
    RETURN_IF_ERROR(_convert_to_expected_type(_schema->column_ids()));

    for (int pos = 0; pos < block->columns(); ++pos) {
        const auto cid = _schema->column_id(pos);
        if (_is_pred_column[cid]) {
            continue;
        }
        block->replace_by_position(pos, std::move(_current_return_columns[cid]));
    }
    block->clear_column_data();

    // Clear the iterators now to release their memory footprint in advance.
    _column_iterators.clear();
    _index_iterators.clear();
    return Status::EndOfFile("no more data in segment");
}
3187
3188
// Evaluates the pushed-down common expressions for the rows listed in
// sel_rowid_idx[0, selected_size).
//
// @param sel_rowid_idx  selection vector; forwarded to
//                       _output_index_result_column and _execute_common_expr.
// @param selected_size  in/out: number of selected rows; _execute_common_expr
//                       overwrites it with the number of rows that pass.
// @param block          block the expressions are executed on.
//
// The function consistently works on its parameters rather than on the
// _sel_rowid_idx / _selected_size members, so it honors its own signature.
// NOTE(review): callers are expected to pass those members, in which case the
// behavior is unchanged — confirm against the call sites.
Status SegmentIterator::_process_common_expr(uint16_t* sel_rowid_idx, uint16_t& selected_size,
                                             Block* block) {
    // Column 0 is only used as a row-count indicator here. Predicates were
    // evaluated first and their result is used to reduce the data that is read
    // (expr push down). If the block's row count differs from `selected_size`,
    // the first column is not used by the common exprs, so it is safe to
    // replace it temporarily to make the block report `selected_size` rows.
    VLOG_DEBUG << fmt::format("Execute common expr. block rows {}, selected size {}", block->rows(),
                              selected_size);

    const bool need_mock_col = block->rows() != selected_size;
    MutableColumnPtr col0;
    if (need_mock_col) {
        // Stash the real column 0 and install a const column of the right size.
        col0 = std::move(*block->get_by_position(0).column).mutate();
        block->replace_by_position(
                0, block->get_by_position(0).type->create_column_const_with_default_value(
                           selected_size));
    }

    std::vector<VExprContext*> common_ctxs;
    common_ctxs.reserve(_common_expr_ctxs_push_down.size());
    for (auto& ctx : _common_expr_ctxs_push_down) {
        common_ctxs.push_back(ctx.get());
    }
    _output_index_result_column(common_ctxs, sel_rowid_idx, selected_size, block);
    RETURN_IF_ERROR(_execute_common_expr(sel_rowid_idx, selected_size, block));

    if (need_mock_col) {
        // Restore the original column 0.
        block->replace_by_position(0, std::move(col0));
    }

    VLOG_DEBUG << fmt::format("Execute common expr end. block rows {}, selected size {}",
                              block->rows(), selected_size);
    return Status::OK();
}
3221
3222
// Executes the pushed-down conjuncts on `block`, filters the columns listed in
// _columns_to_filter, and compacts `sel_rowid_idx` to the surviving rows.
// On return `selected_size` holds the number of rows that passed. The elapsed
// time and the input / filtered row counts are added to _opts.stats.
Status SegmentIterator::_execute_common_expr(uint16_t* sel_rowid_idx, uint16_t& selected_size,
                                             Block* block) {
    SCOPED_RAW_TIMER(&_opts.stats->expr_filter_ns);
    DCHECK(!_common_expr_ctxs_push_down.empty());
    DCHECK(block->rows() != 0);

    const int column_count_before = block->columns();
    const uint16_t rows_before = selected_size;
    _opts.stats->expr_cond_input_rows += rows_before;

    IColumn::Filter keep_mask;
    RETURN_IF_ERROR(VExprContext::execute_conjuncts_and_filter_block(
            _common_expr_ctxs_push_down, block, _columns_to_filter, column_count_before,
            keep_mask));

    selected_size = _evaluate_common_expr_filter(sel_rowid_idx, selected_size, keep_mask);
    _opts.stats->rows_expr_cond_filtered += rows_before - selected_size;
    return Status::OK();
}
3239
3240
uint16_t SegmentIterator::_evaluate_common_expr_filter(uint16_t* sel_rowid_idx,
3241
                                                       uint16_t selected_size,
3242
6
                                                       const IColumn::Filter& filter) {
3243
6
    size_t count = filter.size() - simd::count_zero_num((int8_t*)filter.data(), filter.size());
3244
6
    if (count == 0) {
3245
0
        return 0;
3246
6
    } else {
3247
6
        const UInt8* filt_pos = filter.data();
3248
3249
6
        uint16_t new_size = 0;
3250
6
        uint32_t sel_pos = 0;
3251
6
        const uint32_t sel_end = selected_size;
3252
6
        static constexpr size_t SIMD_BYTES = simd::bits_mask_length();
3253
6
        const uint32_t sel_end_simd = sel_pos + selected_size / SIMD_BYTES * SIMD_BYTES;
3254
3255
6
        while (sel_pos < sel_end_simd) {
3256
0
            auto mask = simd::bytes_mask_to_bits_mask(filt_pos + sel_pos);
3257
0
            if (0 == mask) {
3258
                //pass
3259
0
            } else if (simd::bits_mask_all() == mask) {
3260
0
                for (uint32_t i = 0; i < SIMD_BYTES; i++) {
3261
0
                    sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos + i];
3262
0
                }
3263
0
            } else {
3264
0
                simd::iterate_through_bits_mask(
3265
0
                        [&](const size_t bit_pos) {
3266
0
                            sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos + bit_pos];
3267
0
                        },
3268
0
                        mask);
3269
0
            }
3270
0
            sel_pos += SIMD_BYTES;
3271
0
        }
3272
3273
23
        for (; sel_pos < sel_end; sel_pos++) {
3274
17
            if (filt_pos[sel_pos]) {
3275
7
                sel_rowid_idx[new_size++] = sel_rowid_idx[sel_pos];
3276
7
            }
3277
17
        }
3278
6
        return new_size;
3279
6
    }
3280
6
}
3281
3282
void SegmentIterator::_output_index_result_column(const std::vector<VExprContext*>& expr_ctxs,
3283
                                                  uint16_t* sel_rowid_idx, uint16_t select_size,
3284
6
                                                  Block* block) {
3285
6
    SCOPED_RAW_TIMER(&_opts.stats->output_index_result_column_timer);
3286
6
    if (block->rows() == 0) {
3287
0
        return;
3288
0
    }
3289
6
    for (auto* expr_ctx_ptr : expr_ctxs) {
3290
6
        auto index_ctx = expr_ctx_ptr->get_index_context();
3291
6
        if (index_ctx == nullptr) {
3292
0
            continue;
3293
0
        }
3294
6
        for (auto& inverted_index_result_bitmap_for_expr : index_ctx->get_index_result_bitmap()) {
3295
0
            const auto* expr = inverted_index_result_bitmap_for_expr.first;
3296
0
            const auto& result_bitmap = inverted_index_result_bitmap_for_expr.second;
3297
0
            const auto& index_result_bitmap = result_bitmap.get_data_bitmap();
3298
0
            auto index_result_column = ColumnUInt8::create();
3299
0
            ColumnUInt8::Container& vec_match_pred = index_result_column->get_data();
3300
0
            vec_match_pred.resize(block->rows());
3301
0
            std::fill(vec_match_pred.begin(), vec_match_pred.end(), 0);
3302
3303
0
            const auto& null_bitmap = result_bitmap.get_null_bitmap();
3304
0
            bool has_null_bitmap = null_bitmap != nullptr && !null_bitmap->isEmpty();
3305
0
            bool expr_returns_nullable = expr->data_type()->is_nullable();
3306
3307
0
            ColumnUInt8::MutablePtr null_map_column = nullptr;
3308
0
            ColumnUInt8::Container* null_map_data = nullptr;
3309
0
            if (has_null_bitmap && expr_returns_nullable) {
3310
0
                null_map_column = ColumnUInt8::create();
3311
0
                auto& null_map_vec = null_map_column->get_data();
3312
0
                null_map_vec.resize(block->rows());
3313
0
                std::fill(null_map_vec.begin(), null_map_vec.end(), 0);
3314
0
                null_map_data = &null_map_column->get_data();
3315
0
            }
3316
3317
0
            roaring::BulkContext bulk_context;
3318
0
            for (uint32_t i = 0; i < select_size; i++) {
3319
0
                auto rowid = sel_rowid_idx ? _block_rowids[sel_rowid_idx[i]] : _block_rowids[i];
3320
0
                if (index_result_bitmap) {
3321
0
                    vec_match_pred[i] = index_result_bitmap->containsBulk(bulk_context, rowid);
3322
0
                }
3323
0
                if (null_map_data != nullptr && null_bitmap->contains(rowid)) {
3324
0
                    (*null_map_data)[i] = 1;
3325
0
                    vec_match_pred[i] = 0;
3326
0
                }
3327
0
            }
3328
3329
0
            DCHECK(block->rows() == vec_match_pred.size());
3330
3331
0
            if (null_map_column) {
3332
0
                index_ctx->set_index_result_column_for_expr(
3333
0
                        expr, ColumnNullable::create(std::move(index_result_column),
3334
0
                                                     std::move(null_map_column)));
3335
0
            } else {
3336
0
                index_ctx->set_index_result_column_for_expr(expr, std::move(index_result_column));
3337
0
            }
3338
0
        }
3339
6
    }
3340
6
}
3341
3342
1.72k
void SegmentIterator::_convert_dict_code_for_predicate_if_necessary() {
3343
1.72k
    for (auto predicate : _short_cir_eval_predicate) {
3344
0
        _convert_dict_code_for_predicate_if_necessary_impl(predicate);
3345
0
    }
3346
3347
1.72k
    for (auto predicate : _pre_eval_block_predicate) {
3348
28
        _convert_dict_code_for_predicate_if_necessary_impl(predicate);
3349
28
    }
3350
3351
1.72k
    for (auto column_id : _delete_range_column_ids) {
3352
1.55k
        _current_return_columns[column_id].get()->convert_dict_codes_if_necessary();
3353
1.55k
    }
3354
3355
1.72k
    for (auto column_id : _delete_bloom_filter_column_ids) {
3356
0
        _current_return_columns[column_id].get()->initialize_hash_values_for_runtime_filter();
3357
0
    }
3358
1.72k
}
3359
3360
void SegmentIterator::_convert_dict_code_for_predicate_if_necessary_impl(
3361
28
        std::shared_ptr<ColumnPredicate> predicate) {
3362
28
    auto& column = _current_return_columns[predicate->column_id()];
3363
28
    auto* col_ptr = column.get();
3364
3365
28
    if (PredicateTypeTraits::is_range(predicate->type())) {
3366
22
        col_ptr->convert_dict_codes_if_necessary();
3367
22
    } else if (PredicateTypeTraits::is_bloom_filter(predicate->type())) {
3368
0
        col_ptr->initialize_hash_values_for_runtime_filter();
3369
0
    }
3370
28
}
3371
3372
2.93k
// Fills `block_row_locations` with one RowLocation(segment id, row id) per
// selected row of the current batch. The vector is resized to _selected_size.
// Row ids are taken through _sel_rowid_idx when any of the vec / short-circuit /
// expr evaluation flags is set, and directly from _block_rowids otherwise.
Status SegmentIterator::current_block_row_locations(std::vector<RowLocation>* block_row_locations) {
    DCHECK(_opts.record_rowids);
    DCHECK_GE(_block_rowids.size(), _selected_size);

    block_row_locations->resize(_selected_size);
    auto& locations = *block_row_locations;
    const uint32_t sid = segment_id();

    const bool uses_selection_vector = _is_need_vec_eval || _is_need_short_eval || _is_need_expr_eval;
    if (uses_selection_vector) {
        for (auto i = 0; i < _selected_size; ++i) {
            locations[i] = RowLocation(sid, _block_rowids[_sel_rowid_idx[i]]);
        }
    } else {
        for (auto i = 0; i < _selected_size; ++i) {
            locations[i] = RowLocation(sid, _block_rowids[i]);
        }
    }
    return Status::OK();
}
3388
3389
3.03k
// Builds one IndexExecContext for this iterator and attaches it to
//  - a clone of every expression in _opts.common_expr_ctxs_push_down (the
//    clones are appended to _common_expr_ctxs_push_down), and
//  - a clone of every virtual column expression (replacing the entry in
//    _virtual_column_exprs),
// then calls rebind_storage_exprs_to_reader_schema() on both sets.
Status SegmentIterator::_construct_compound_expr_context() {
    ColumnIteratorOptions iter_opts {
            .use_page_cache = _opts.use_page_cache,
            .file_reader = _file_reader.get(),
            .stats = _opts.stats,
            .io_ctx = _opts.io_ctx,
    };
    auto index_exec_ctx = std::make_shared<IndexExecContext>(
            _schema->column_ids(), _index_iterators, _storage_name_and_type,
            _common_expr_index_exec_status, _score_runtime, _segment.get(), iter_opts);
    index_exec_ctx->set_index_query_context(_index_query_context);

    for (const auto& pushed_down_ctx : _opts.common_expr_ctxs_push_down) {
        VExprContextSPtr cloned;
        // _ann_range_search_runtime will do deep copy.
        RETURN_IF_ERROR(pushed_down_ctx->clone(_opts.runtime_state, cloned));
        cloned->set_index_context(index_exec_ctx);
        _common_expr_ctxs_push_down.emplace_back(cloned);
    }

    // Clone virtual column exprs before setting IndexExecContext, because
    // IndexExecContext holds segment-specific index iterator references.
    // Without cloning, a shared VExprContext would be overwritten per-segment
    // and could point to the wrong segment's context.
    for (auto& [cid, vir_expr_ctx] : _virtual_column_exprs) {
        VExprContextSPtr cloned;
        RETURN_IF_ERROR(vir_expr_ctx->clone(_opts.runtime_state, cloned));
        cloned->set_index_context(index_exec_ctx);
        vir_expr_ctx = cloned;
    }

    RETURN_IF_ERROR(rebind_storage_exprs_to_reader_schema(
            _opts, *_schema, _common_expr_ctxs_push_down, _virtual_column_exprs));
    return Status::OK();
}
3421
3422
// Narrows `row_ranges` using page-level zone maps and single-slot conjuncts.
//
// Conjuncts are grouped by the slot index reported by
// expr_zonemap::single_slot_zonemap_index(); conjuncts for which it returns a
// negative index are ignored. For each slot whose column has page zone maps,
// every page is evaluated against the slot's conjuncts; pages that evaluate to
// kNoMatch are dropped and the remaining page ranges (clamped to start at
// `min_rowid`) are intersected into `row_ranges`.
//
// @param conjuncts   candidate filter expressions.
// @param min_rowid   pages ending at or before this row id are skipped.
// @param row_ranges  in/out; must not be null.
// @return OK, or the first error from a reader / zone-map call.
Status SegmentIterator::_apply_expr_zonemap_to_row_ranges(const VExprContextSPtrs& conjuncts,
                                                          rowid_t min_rowid,
                                                          RowRanges* row_ranges) {
    DORIS_CHECK(row_ranges != nullptr);
    const bool nothing_to_do =
            !expr_zonemap::is_expr_zonemap_filter_enabled(_opts.runtime_state) ||
            conjuncts.empty() || row_ranges->is_empty();
    if (nothing_to_do) {
        return Status::OK();
    }

    std::unordered_map<int, VExprContextSPtrs> ctxs_by_slot;
    for (const auto& conjunct : conjuncts) {
        const auto slot_index = expr_zonemap::single_slot_zonemap_index(conjunct);
        if (slot_index < 0) {
            continue;
        }
        ctxs_by_slot[slot_index].emplace_back(conjunct);
    }
    // Page zone maps are stored per column. Multi-slot expressions need page alignment across
    // multiple column readers and are therefore left to segment-level pruning for now.
    if (ctxs_by_slot.empty()) {
        return Status::OK();
    }

    ColumnIteratorOptions iter_opts {
            .use_page_cache = _opts.use_page_cache,
            .file_reader = _file_reader.get(),
            .stats = _opts.stats,
            .io_ctx = _opts.io_ctx,
    };

    for (const auto& [slot_index, slot_conjuncts] : ctxs_by_slot) {
        // Skip slots that cannot be mapped to a usable column with page zone maps.
        const auto schema_pos = cast_set<size_t>(slot_index);
        if (schema_pos >= _schema->num_column_ids()) {
            continue;
        }
        const auto cid = _schema->column_id(schema_pos);
        if (!_segment->can_apply_predicate_safely(cid, *_schema,
                                                  _opts.target_cast_type_for_variants, _opts)) {
            continue;
        }
        const auto* tablet_column = _schema->column(cid);
        if (tablet_column == nullptr) {
            continue;
        }

        std::shared_ptr<ColumnReader> reader;
        Status reader_status = _segment->get_column_reader(*tablet_column, &reader, _opts.stats);
        if (reader_status.is<ErrorCode::NOT_FOUND>()) {
            // A missing reader is not an error here; the slot is simply skipped.
            continue;
        }
        RETURN_IF_ERROR(reader_status);
        if (reader == nullptr || !reader->has_zone_map()) {
            continue;
        }

        const std::vector<ZoneMapPB>* page_zone_maps = nullptr;
        RETURN_IF_ERROR(reader->get_page_zone_maps(iter_opts, &page_zone_maps));
        if (page_zone_maps == nullptr || page_zone_maps->empty()) {
            continue;
        }

        auto data_type = _segment->get_data_type_of(*tablet_column, _opts);
        if (data_type == nullptr) {
            continue;
        }

        RowRanges column_ranges;
        ZoneMapEvalStats page_stats;
        for (uint32_t page_index = 0; page_index < page_zone_maps->size(); ++page_index) {
            RowRange page_range;
            RETURN_IF_ERROR(reader->get_row_range_for_page(page_index, iter_opts, &page_range));
            if (!page_range.is_valid() || page_range.to() <= min_rowid) {
                continue;
            }

            ZoneMapEvalContext eval_ctx;
            ZoneMapEvalContext::SlotZoneMap slot_zone_map;
            slot_zone_map.data_type = data_type;
            ZoneMap page_zone_map;
            RETURN_IF_ERROR(
                    ZoneMap::from_proto((*page_zone_maps)[page_index], data_type, page_zone_map));
            slot_zone_map.zone_map = std::make_shared<ZoneMap>(std::move(page_zone_map));
            eval_ctx.slots.emplace(slot_index, std::move(slot_zone_map));

            const auto verdict = VExprContext::evaluate_zonemap_filter(slot_conjuncts, eval_ctx);
            page_stats.merge_page_eval_stats(eval_ctx.stats);
            if (verdict == ZoneMapFilterResult::kNoMatch) {
                ++_opts.stats->expr_zonemap_filtered_pages;
            } else {
                column_ranges.add(
                        RowRange(std::max<int64_t>(page_range.from(), min_rowid), page_range.to()));
            }
        }
        page_stats.accumulate_to(_opts.stats);

        RowRanges::ranges_intersection(*row_ranges, column_ranges, row_ranges);
        if (row_ranges->is_empty()) {
            // Nothing left to prune.
            return Status::OK();
        }
    }
    return Status::OK();
}
3515
3516
3.03k
void SegmentIterator::_calculate_common_expr_index_exec_status() {
3517
3.03k
    for (const auto& root_expr_ctx : _common_expr_ctxs_push_down) {
3518
21
        const auto& root_expr = root_expr_ctx->root();
3519
21
        if (root_expr == nullptr) {
3520
0
            continue;
3521
0
        }
3522
21
        _common_expr_to_slotref_map[root_expr_ctx.get()] = std::unordered_map<ColumnId, VExpr*>();
3523
3524
21
        std::stack<VExprSPtr> stack;
3525
21
        stack.emplace(root_expr);
3526
3527
42
        while (!stack.empty()) {
3528
21
            const auto& expr = stack.top();
3529
21
            stack.pop();
3530
3531
30
            for (const auto& child : expr->children()) {
3532
30
                if (child->is_virtual_slot_ref()) {
3533
                    // Expand virtual slot ref to its underlying expression tree and
3534
                    // collect real slot refs used inside. We still associate those
3535
                    // slot refs with the current parent expr node for inverted index
3536
                    // tracking, just like normal slot refs.
3537
0
                    auto* vir_slot_ref = assert_cast<VirtualSlotRef*>(child.get());
3538
0
                    auto vir_expr = vir_slot_ref->get_virtual_column_expr();
3539
0
                    if (vir_expr) {
3540
0
                        std::stack<VExprSPtr> vir_stack;
3541
0
                        vir_stack.emplace(vir_expr);
3542
3543
0
                        while (!vir_stack.empty()) {
3544
0
                            const auto& vir_node = vir_stack.top();
3545
0
                            vir_stack.pop();
3546
3547
0
                            for (const auto& vir_child : vir_node->children()) {
3548
0
                                if (vir_child->is_slot_ref()) {
3549
0
                                    auto* inner_slot_ref = assert_cast<VSlotRef*>(vir_child.get());
3550
0
                                    _common_expr_index_exec_status[_schema->column_id(
3551
0
                                            inner_slot_ref->column_id())][expr.get()] = false;
3552
0
                                    _common_expr_to_slotref_map[root_expr_ctx.get()]
3553
0
                                                               [inner_slot_ref->column_id()] =
3554
0
                                                                       expr.get();
3555
0
                                }
3556
3557
0
                                if (!vir_child->children().empty()) {
3558
0
                                    vir_stack.emplace(vir_child);
3559
0
                                }
3560
0
                            }
3561
0
                        }
3562
0
                    }
3563
0
                }
3564
                // Example: CAST(v['a'] AS VARCHAR) MATCH 'hello', do not add CAST expr to index tracking.
3565
30
                auto expr_without_cast = VExpr::expr_without_cast(child);
3566
30
                if (expr_without_cast->is_slot_ref() && expr->op() != TExprOpcode::CAST) {
3567
20
                    auto* column_slot_ref = assert_cast<VSlotRef*>(expr_without_cast.get());
3568
20
                    _common_expr_index_exec_status[_schema->column_id(column_slot_ref->column_id())]
3569
20
                                                  [expr.get()] = false;
3570
20
                    _common_expr_to_slotref_map[root_expr_ctx.get()][column_slot_ref->column_id()] =
3571
20
                            expr.get();
3572
20
                }
3573
30
            }
3574
3575
21
            const auto& children = expr->children();
3576
51
            for (int i = cast_set<int>(children.size()) - 1; i >= 0; --i) {
3577
30
                if (!children[i]->children().empty()) {
3578
0
                    stack.emplace(children[i]);
3579
0
                }
3580
30
            }
3581
21
        }
3582
21
    }
3583
3.03k
}
3584
3585
bool SegmentIterator::_no_need_read_key_data(ColumnId cid, MutableColumnPtr& column,
3586
28.0k
                                             size_t nrows_read) {
3587
28.0k
    if (_opts.runtime_state && !_opts.runtime_state->query_options().enable_no_need_read_data_opt) {
3588
0
        return false;
3589
0
    }
3590
3591
28.0k
    if (!((_opts.tablet_schema->keys_type() == KeysType::DUP_KEYS ||
3592
28.0k
           (_opts.tablet_schema->keys_type() == KeysType::UNIQUE_KEYS &&
3593
11.1k
            _opts.enable_unique_key_merge_on_write)))) {
3594
7.58k
        return false;
3595
7.58k
    }
3596
3597
20.4k
    if (_opts.push_down_agg_type_opt != TPushAggOp::COUNT_ON_INDEX) {
3598
20.4k
        return false;
3599
20.4k
    }
3600
3601
20
    if (!_opts.tablet_schema->column(cid).is_key()) {
3602
14
        return false;
3603
14
    }
3604
3605
6
    if (_has_delete_predicate(cid)) {
3606
0
        return false;
3607
0
    }
3608
3609
6
    if (!_check_all_conditions_passed_inverted_index_for_column(cid)) {
3610
6
        return false;
3611
6
    }
3612
3613
0
    if (is_column_nullable(*column)) {
3614
0
        auto* nullable_col_ptr = reinterpret_cast<ColumnNullable*>(column.get());
3615
0
        nullable_col_ptr->get_null_map_column().insert_many_defaults(nrows_read);
3616
0
        nullable_col_ptr->get_nested_column_ptr()->insert_many_defaults(nrows_read);
3617
0
    } else {
3618
0
        column->insert_many_defaults(nrows_read);
3619
0
    }
3620
3621
0
    return true;
3622
6
}
3623
3624
20.4k
// Returns true when `cid` is among the column ids reported by
// _opts.delete_condition_predicates->get_all_column_ids().
bool SegmentIterator::_has_delete_predicate(ColumnId cid) {
    std::set<uint32_t> columns_with_delete_predicate;
    _opts.delete_condition_predicates->get_all_column_ids(columns_with_delete_predicate);
    return columns_with_delete_predicate.find(cid) != columns_with_delete_predicate.end();
}
3629
3630
13.3k
bool SegmentIterator::_can_opt_limit_reads() {
3631
13.3k
    if (_opts.read_limit == 0) {
3632
13.3k
        return false;
3633
13.3k
    }
3634
3635
    // If SegmentIterator still needs to evaluate predicates/common exprs, LIMIT must be applied to
3636
    // post-filter rows by _apply_read_limit_to_selected_rows(); capping the raw read here could
3637
    // return fewer rows than the query LIMIT.
3638
7
    if (_is_need_vec_eval || _is_need_short_eval || _is_need_expr_eval) {
3639
3
        return false;
3640
3
    }
3641
3642
4
    if (_opts.delete_condition_predicates->num_of_column_predicate() > 0) {
3643
1
        return false;
3644
1
    }
3645
3646
3
    bool all_true = std::ranges::all_of(_schema->column_ids(), [this](auto cid) {
3647
3
        if (cid == _opts.tablet_schema->delete_sign_idx()) {
3648
0
            return true;
3649
0
        }
3650
3
        if (_check_all_conditions_passed_inverted_index_for_column(cid, true)) {
3651
2
            return true;
3652
2
        }
3653
1
        return false;
3654
3
    });
3655
3656
3
    DBUG_EXECUTE_IF("segment_iterator.topn_opt_1", {
3657
3
        LOG(INFO) << "col_predicates: " << _col_predicates.size() << ", all_true: " << all_true;
3658
3
    })
3659
3660
3
    DBUG_EXECUTE_IF("segment_iterator.topn_opt_2", {
3661
3
        if (all_true) {
3662
3
            return Status::Error<ErrorCode::INTERNAL_ERROR>("topn opt 2 execute failed");
3663
3
        }
3664
3
    })
3665
3666
3
    return all_true;
3667
3
}
3668
3669
// Before get next batch. make sure all virtual columns in block has type ColumnNothing.
3670
13.3k
void SegmentIterator::_init_virtual_columns(Block* block) {
3671
13.3k
    for (const auto& pair : _vir_cid_to_idx_in_block) {
3672
0
        auto& col_with_type_and_name = block->get_by_position(pair.second);
3673
0
        col_with_type_and_name.column = ColumnNothing::create(0);
3674
0
        col_with_type_and_name.type = _opts.vir_col_idx_to_type[pair.second];
3675
0
    }
3676
13.3k
}
3677
3678
10.3k
// Materializes the virtual columns of `block`.
//
// For an empty block every virtual column slot is replaced by an empty column
// of its declared type. Otherwise each virtual column that is still a
// ColumnNothing is computed by executing its expression on the block.
// Returns InternalError when a virtual column's block index is out of range
// or its column pointer is null, and forwards expression execution errors.
Status SegmentIterator::_materialization_of_virtual_column(Block* block) {
    // Some expr can not process empty block, such as function `element_at`.
    // So materialize virtual column in advance to avoid errors.
    if (block->rows() == 0) {
        for (const auto& [cid, idx_in_block] : _vir_cid_to_idx_in_block) {
            auto& slot = block->get_by_position(idx_in_block);
            slot.column = _opts.vir_col_idx_to_type[idx_in_block]->create_column();
            slot.type = _opts.vir_col_idx_to_type[idx_in_block];
        }
        return Status::OK();
    }

    for (const auto& cid_and_expr : _virtual_column_exprs) {
        auto cid = cid_and_expr.first;
        auto column_expr = cid_and_expr.second;
        size_t idx_in_block = _vir_cid_to_idx_in_block[cid];

        if (block->columns() <= idx_in_block) {
            return Status::InternalError(
                    "Virtual column index {} is out of range, block columns {}, "
                    "virtual columns size {}, virtual column expr {}",
                    idx_in_block, block->columns(), _vir_cid_to_idx_in_block.size(),
                    column_expr->root()->debug_string());
        }
        if (block->get_by_position(idx_in_block).column.get() == nullptr) {
            return Status::InternalError(
                    "Virtual column index {} is null, block columns {}, virtual columns size "
                    "{}, "
                    "virtual column expr {}",
                    idx_in_block, block->columns(), _vir_cid_to_idx_in_block.size(),
                    column_expr->root()->debug_string());
        }

        // Only columns that are still ColumnNothing need to be computed.
        const bool still_nothing = check_and_get_column<const ColumnNothing>(
                                           block->get_by_position(idx_in_block).column.get()) !=
                                   nullptr;
        if (!still_nothing) {
            continue;
        }

        VLOG_DEBUG << fmt::format("Virtual column is doing materialization, cid {}, col idx {}",
                                  cid, idx_in_block);
        ColumnPtr result_column;
        RETURN_IF_ERROR(column_expr->execute(block, result_column));

        block->replace_by_position(idx_in_block, std::move(result_column));
        if (block->get_by_position(idx_in_block).column->size() == 0) {
            LOG_WARNING("Result of expr column {} is empty. cid {}, idx_in_block {}",
                        column_expr->root()->debug_string(), cid, idx_in_block);
        }
    }
    return Status::OK();
}
3724
3725
3.03k
void SegmentIterator::_prepare_score_column_materialization() {
3726
3.03k
    if (_score_runtime == nullptr) {
3727
3.03k
        return;
3728
3.03k
    }
3729
3730
0
    ScoreRangeFilterPtr filter;
3731
0
    if (_score_runtime->has_score_range_filter()) {
3732
0
        const auto& range_info = _score_runtime->get_score_range_info();
3733
0
        filter = std::make_shared<ScoreRangeFilter>(range_info->op, range_info->threshold);
3734
0
    }
3735
3736
0
    IColumn::MutablePtr result_column;
3737
0
    auto result_row_ids = std::make_unique<std::vector<uint64_t>>();
3738
0
    if (_score_runtime->get_limit() > 0 && _col_predicates.empty() &&
3739
0
        _common_expr_ctxs_push_down.empty()) {
3740
0
        OrderType order_type = _score_runtime->is_asc() ? OrderType::ASC : OrderType::DESC;
3741
0
        _index_query_context->collection_similarity->get_topn_bm25_scores(
3742
0
                &_row_bitmap, result_column, result_row_ids, order_type,
3743
0
                _score_runtime->get_limit(), filter);
3744
0
    } else {
3745
0
        _index_query_context->collection_similarity->get_bm25_scores(&_row_bitmap, result_column,
3746
0
                                                                     result_row_ids, filter);
3747
0
    }
3748
0
    const size_t dst_col_idx = _score_runtime->get_dest_column_idx();
3749
0
    auto* column_iter = _column_iterators[_schema->column_id(dst_col_idx)].get();
3750
0
    auto* virtual_column_iter = dynamic_cast<VirtualColumnIterator*>(column_iter);
3751
0
    virtual_column_iter->prepare_materialization(
3752
0
            std::move(result_column),
3753
0
            std::shared_ptr<std::vector<uint64_t>>(std::move(result_row_ids)));
3754
0
}
3755
3756
} // namespace segment_v2
3757
} // namespace doris