Coverage Report

Created: 2026-03-12 17:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/storage/segment/column_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/Descriptors_types.h>
21
#include <gen_cpp/segment_v2.pb.h>
22
#include <sys/types.h>
23
24
#include <cstddef> // for size_t
25
#include <cstdint> // for uint32_t
26
#include <memory>  // for unique_ptr
27
#include <string>
28
#include <utility>
29
#include <vector>
30
31
#include "common/config.h"
32
#include "common/logging.h"
33
#include "common/status.h"               // for Status
34
#include "core/column/column_array.h"    // ColumnArray
35
#include "core/column/column_nullable.h" // NullMap
36
#include "core/data_type/data_type.h"
37
#include "io/cache/cached_remote_file_reader.h"
38
#include "io/fs/file_reader_writer_fwd.h"
39
#include "io/io_common.h"
40
#include "storage/index/index_reader.h"
41
#include "storage/index/ordinal_page_index.h" // for OrdinalPageIndexIterator
42
#include "storage/index/zone_map/zone_map_index.h"
43
#include "storage/olap_common.h"
44
#include "storage/predicate/column_predicate.h"
45
#include "storage/segment/common.h"
46
#include "storage/segment/page_handle.h" // for PageHandle
47
#include "storage/segment/page_pointer.h"
48
#include "storage/segment/parsed_page.h" // for ParsedPage
49
#include "storage/segment/segment_prefetcher.h"
50
#include "storage/segment/stream_reader.h"
51
#include "storage/tablet/tablet_schema.h"
52
#include "storage/types.h"
53
#include "storage/utils.h"
54
#include "util/once.h"
55
56
namespace doris {
57
#include "common/compile_check_begin.h"
58
59
class BlockCompressionCodec;
60
class AndBlockColumnPredicate;
61
class ColumnPredicate;
62
class TabletIndex;
63
class StorageReadOptions;
64
65
namespace io {
66
class FileReader;
67
} // namespace io
68
struct Slice;
69
struct StringRef;
70
71
using TColumnAccessPaths = std::vector<TColumnAccessPath>;
72
73
namespace segment_v2 {
74
// Forward declarations of segment_v2 types used in this header.
class BloomFilterIndexReader;
class ColumnIterator;
class ColumnMetaAccessor;
class EncodingInfo;
class IndexFileReader;
class IndexIterator;
class InvertedIndexIterator;
class InvertedIndexReader;
class PageDecoder;
class RowRanges;
class ZoneMapIndexReader;
85
86
struct ColumnReaderOptions {
87
    // whether verify checksum when read page
88
    bool verify_checksum = true;
89
    // for in memory olap table, use DURABLE CachePriority in page cache
90
    bool kept_in_memory = false;
91
92
    int be_exec_version = -1;
93
94
    TabletSchemaSPtr tablet_schema = nullptr;
95
};
96
97
struct ColumnIteratorOptions {
98
    bool use_page_cache = false;
99
    bool is_predicate_column = false;
100
    // for page cache allocation
101
    // page types are divided into DATA_PAGE & INDEX_PAGE
102
    // INDEX_PAGE including index_page, dict_page and short_key_page
103
    PageTypePB type = PageTypePB::UNKNOWN_PAGE_TYPE;
104
    io::FileReader* file_reader = nullptr; // Ref
105
    // reader statistics
106
    OlapReaderStatistics* stats = nullptr; // Ref
107
    io::IOContext io_ctx;
108
109
2.12M
    void sanity_check() const {
110
2.12M
        CHECK_NOTNULL(file_reader);
111
2.12M
        CHECK_NOTNULL(stats);
112
2.12M
    }
113
};
114
115
class ColumnIterator;
class FileColumnIterator;
class OffsetFileColumnIterator;

// Ownership aliases used throughout the iterator hierarchy.
using ColumnIteratorUPtr = std::unique_ptr<ColumnIterator>;
using ColumnIteratorSPtr = std::shared_ptr<ColumnIterator>;
using FileColumnIteratorUPtr = std::unique_ptr<FileColumnIterator>;
using OffsetFileColumnIteratorUPtr = std::unique_ptr<OffsetFileColumnIterator>;
123
124
// There will be concurrent users to read the same column. So
125
// we should do our best to reduce resource usage through share
126
// same information, such as OrdinalPageIndex and Page data.
127
// This will cache data shared by all reader
128
class ColumnReader : public MetadataAdder<ColumnReader>,
129
                     public std::enable_shared_from_this<ColumnReader> {
130
public:
131
    ColumnReader();
132
    // Create an initialized ColumnReader in *reader.
133
    // This should be a lightweight operation without I/O.
134
    static Status create(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
135
                         uint64_t num_rows, const io::FileReaderSPtr& file_reader,
136
                         std::shared_ptr<ColumnReader>* reader);
137
138
    static Status create_array(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
139
                               const io::FileReaderSPtr& file_reader,
140
                               std::shared_ptr<ColumnReader>* reader);
141
    static Status create_map(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
142
                             const io::FileReaderSPtr& file_reader,
143
                             std::shared_ptr<ColumnReader>* reader);
144
    static Status create_struct(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
145
                                uint64_t num_rows, const io::FileReaderSPtr& file_reader,
146
                                std::shared_ptr<ColumnReader>* reader);
147
    static Status create_agg_state(const ColumnReaderOptions& opts, const ColumnMetaPB& meta,
148
                                   uint64_t num_rows, const io::FileReaderSPtr& file_reader,
149
                                   std::shared_ptr<ColumnReader>* reader);
150
151
    enum DictEncodingType { UNKNOWN_DICT_ENCODING, PARTIAL_DICT_ENCODING, ALL_DICT_ENCODING };
152
153
    static bool is_compaction_reader_type(ReaderType type);
154
155
    ~ColumnReader() override;
156
157
    // create a new column iterator. Client should delete returned iterator
158
    virtual Status new_iterator(ColumnIteratorUPtr* iterator, const TabletColumn* col,
159
                                const StorageReadOptions*);
160
    Status new_iterator(ColumnIteratorUPtr* iterator, const TabletColumn* tablet_column);
161
    Status new_array_iterator(ColumnIteratorUPtr* iterator, const TabletColumn* tablet_column);
162
    Status new_struct_iterator(ColumnIteratorUPtr* iterator, const TabletColumn* tablet_column);
163
    Status new_map_iterator(ColumnIteratorUPtr* iterator, const TabletColumn* tablet_column);
164
    Status new_agg_state_iterator(ColumnIteratorUPtr* iterator);
165
166
    Status new_index_iterator(const std::shared_ptr<IndexFileReader>& index_file_reader,
167
                              const TabletIndex* index_meta,
168
                              std::unique_ptr<IndexIterator>* iterator);
169
170
    Status seek_at_or_before(ordinal_t ordinal, OrdinalPageIndexIterator* iter,
171
                             const ColumnIteratorOptions& iter_opts);
172
    Status get_ordinal_index_reader(OrdinalIndexReader*& reader,
173
                                    OlapReaderStatistics* index_load_stats);
174
175
    // read a page from file into a page handle
176
    Status read_page(const ColumnIteratorOptions& iter_opts, const PagePointer& pp,
177
                     PageHandle* handle, Slice* page_body, PageFooterPB* footer,
178
                     BlockCompressionCodec* codec, bool is_dict_page = false) const;
179
180
1.70M
    bool is_nullable() const { return _meta_is_nullable; }
181
182
23.6M
    const EncodingInfo* encoding_info() const { return _encoding_info; }
183
184
1.44M
    bool has_zone_map() const { return _zone_map_index != nullptr; }
185
    bool has_bloom_filter_index(bool ngram) const;
186
    // Check if this column could match `cond' using segment zone map.
187
    // Since segment zone map is stored in metadata, this function is fast without I/O.
188
    // set matched to true if segment zone map is absent or `cond' could be satisfied, false otherwise.
189
    Status match_condition(const AndBlockColumnPredicate* col_predicates, bool* matched) const;
190
191
    Status next_batch_of_zone_map(size_t* n, MutableColumnPtr& dst) const;
192
193
    // get row ranges with zone map
194
    // - cond_column is user's query predicate
195
    // - delete_condition is a delete predicate of one version
196
    Status get_row_ranges_by_zone_map(
197
            const AndBlockColumnPredicate* col_predicates,
198
            const std::vector<std::shared_ptr<const ColumnPredicate>>* delete_predicates,
199
            RowRanges* row_ranges, const ColumnIteratorOptions& iter_opts);
200
201
    // get row ranges with bloom filter index
202
    Status get_row_ranges_by_bloom_filter(const AndBlockColumnPredicate* col_predicates,
203
                                          RowRanges* row_ranges,
204
                                          const ColumnIteratorOptions& iter_opts);
205
206
402k
    PagePointer get_dict_page_pointer() const { return _meta_dict_page; }
207
208
20.4M
    bool is_empty() const { return _num_rows == 0; }
209
210
    Status prune_predicates_by_zone_map(std::vector<std::shared_ptr<ColumnPredicate>>& predicates,
211
                                        const int column_id, bool* pruned) const;
212
213
20.2M
    CompressionTypePB get_compression() const { return _meta_compression; }
214
215
1.77M
    uint64_t num_rows() const { return _num_rows; }
216
217
15.8k
    void set_dict_encoding_type(DictEncodingType type) {
218
15.9k
        static_cast<void>(_set_dict_encoding_type_once.call([&] {
219
15.9k
            _dict_encoding_type = type;
220
15.9k
            return Status::OK();
221
15.9k
        }));
222
15.8k
    }
223
224
11.9M
    DictEncodingType get_dict_encoding_type() { return _dict_encoding_type; }
225
226
19.3M
    void disable_index_meta_cache() { _use_index_page_cache = false; }
227
228
23.2k
    DataTypePtr get_vec_data_type() { return _data_type; }
229
230
39.8M
    virtual FieldType get_meta_type() { return _meta_type; }
231
232
    int64_t get_metadata_size() const override;
233
234
#ifdef BE_TEST
235
    void check_data_by_zone_map_for_test(const MutableColumnPtr& dst) const;
236
#endif
237
238
private:
239
    friend class VariantColumnReader;
240
    friend class FileColumnIterator;
241
    friend class SegmentPrefetcher;
242
243
    ColumnReader(const ColumnReaderOptions& opts, const ColumnMetaPB& meta, uint64_t num_rows,
244
                 io::FileReaderSPtr file_reader);
245
    Status init(const ColumnMetaPB* meta);
246
247
    [[nodiscard]] Status _load_zone_map_index(bool use_page_cache, bool kept_in_memory,
248
                                              const ColumnIteratorOptions& iter_opts);
249
    [[nodiscard]] Status _load_ordinal_index(bool use_page_cache, bool kept_in_memory,
250
                                             const ColumnIteratorOptions& iter_opts);
251
252
    [[nodiscard]] Status _load_index(const std::shared_ptr<IndexFileReader>& index_file_reader,
253
                                     const TabletIndex* index_meta);
254
    [[nodiscard]] Status _load_bloom_filter_index(bool use_page_cache, bool kept_in_memory,
255
                                                  const ColumnIteratorOptions& iter_opts);
256
257
    bool _zone_map_match_condition(const segment_v2::ZoneMap& zone_map,
258
                                   const AndBlockColumnPredicate* col_predicates) const;
259
260
    Status _get_filtered_pages(
261
            const AndBlockColumnPredicate* col_predicates,
262
            const std::vector<std::shared_ptr<const ColumnPredicate>>* delete_predicates,
263
            std::vector<uint32_t>* page_indexes, const ColumnIteratorOptions& iter_opts);
264
265
    Status _calculate_row_ranges(const std::vector<uint32_t>& page_indexes, RowRanges* row_ranges,
266
                                 const ColumnIteratorOptions& iter_opts);
267
268
    int64_t _meta_length;
269
    FieldType _meta_type;
270
    FieldType _meta_children_column_type;
271
    bool _meta_is_nullable;
272
    bool _use_index_page_cache;
273
    int _be_exec_version = -1;
274
275
    PagePointer _meta_dict_page;
276
    CompressionTypePB _meta_compression;
277
278
    ColumnReaderOptions _opts;
279
    uint64_t _num_rows;
280
281
    io::FileReaderSPtr _file_reader;
282
283
    DictEncodingType _dict_encoding_type;
284
285
    DataTypePtr _data_type;
286
287
    TypeInfoPtr _type_info =
288
            TypeInfoPtr(nullptr,
289
                        nullptr); // initialized in init(), may changed by subclasses.
290
    const EncodingInfo* _encoding_info =
291
            nullptr; // initialized in init(), used for create PageDecoder
292
293
    // meta for various column indexes (null if the index is absent)
294
    std::unique_ptr<ZoneMapPB> _segment_zone_map;
295
296
    mutable std::shared_mutex _load_index_lock;
297
    std::unique_ptr<ZoneMapIndexReader> _zone_map_index;
298
    std::unique_ptr<OrdinalIndexReader> _ordinal_index;
299
    std::shared_ptr<BloomFilterIndexReader> _bloom_filter_index;
300
301
    std::unordered_map<int64_t, IndexReaderPtr> _index_readers;
302
303
    std::vector<std::shared_ptr<ColumnReader>> _sub_readers;
304
305
    DorisCallOnce<Status> _set_dict_encoding_type_once;
306
};
307
308
// Base iterator to read one column data
309
class ColumnIterator {
310
public:
311
20.5M
    ColumnIterator() = default;
312
20.6M
    virtual ~ColumnIterator() = default;
313
314
40.5k
    virtual Status init(const ColumnIteratorOptions& opts) {
315
40.5k
        _opts = opts;
316
40.5k
        return Status::OK();
317
40.5k
    }
318
319
    // Seek to the given ordinal entry in the column.
320
    // Entry 0 is the first entry written to the column.
321
    // If provided seek point is past the end of the file,
322
    // then returns false.
323
    virtual Status seek_to_ordinal(ordinal_t ord) = 0;
324
325
2.03M
    Status next_batch(size_t* n, MutableColumnPtr& dst) {
326
2.03M
        bool has_null;
327
2.03M
        return next_batch(n, dst, &has_null);
328
2.03M
    }
329
330
0
    virtual Status next_batch(size_t* n, MutableColumnPtr& dst, bool* has_null) {
331
0
        return Status::NotSupported("next_batch not implement");
332
0
    }
333
334
0
    virtual Status next_batch_of_zone_map(size_t* n, MutableColumnPtr& dst) {
335
0
        return Status::NotSupported("next_batch_of_zone_map not implement");
336
0
    }
337
338
    virtual Status read_by_rowids(const rowid_t* rowids, const size_t count,
339
0
                                  MutableColumnPtr& dst) {
340
0
        return Status::NotSupported("read_by_rowids not implement");
341
0
    }
342
343
    virtual ordinal_t get_current_ordinal() const = 0;
344
345
    virtual Status get_row_ranges_by_zone_map(
346
            const AndBlockColumnPredicate* col_predicates,
347
            const std::vector<std::shared_ptr<const ColumnPredicate>>* delete_predicates,
348
9
            RowRanges* row_ranges) {
349
9
        return Status::OK();
350
9
    }
351
352
    virtual Status get_row_ranges_by_bloom_filter(const AndBlockColumnPredicate* col_predicates,
353
10
                                                  RowRanges* row_ranges) {
354
10
        return Status::OK();
355
10
    }
356
357
    virtual Status get_row_ranges_by_dict(const AndBlockColumnPredicate* col_predicates,
358
9
                                          RowRanges* row_ranges) {
359
9
        return Status::OK();
360
9
    }
361
362
2
    virtual bool is_all_dict_encoding() const { return false; }
363
364
0
    virtual Status read_null_map(size_t* n, NullMap& null_map) {
365
0
        return Status::NotSupported("read_null_map not implemented");
366
0
    }
367
368
    virtual Status set_access_paths(const TColumnAccessPaths& all_access_paths,
369
16.9k
                                    const TColumnAccessPaths& predicate_access_paths) {
370
16.9k
        if (!predicate_access_paths.empty()) {
371
240
            _reading_flag = ReadingFlag::READING_FOR_PREDICATE;
372
240
        }
373
16.9k
        return Status::OK();
374
16.9k
    }
375
376
20.4M
    void set_column_name(const std::string& column_name) { _column_name = column_name; }
377
378
22.3k
    const std::string& column_name() const { return _column_name; }
379
380
    // Since there may be multiple paths with conflicts or overlaps,
381
    // we need to define several reading flags:
382
    //
383
    // NORMAL_READING — Default value, indicating that the column should be read.
384
    // SKIP_READING — The column should not be read.
385
    // NEED_TO_READ — The column must be read.
386
    // READING_FOR_PREDICATE — The column is required for predicate evaluation.
387
    //
388
    // For example, suppose there are two paths:
389
    // - Path 1 specifies that column A needs to be read, so it is marked as NEED_TO_READ.
390
    // - Path 2 specifies that the column should not be read, but since it is already marked as NEED_TO_READ,
391
    //   it should not be changed to SKIP_READING.
392
    enum class ReadingFlag : int {
393
        NORMAL_READING,
394
        SKIP_READING,
395
        NEED_TO_READ,
396
        READING_FOR_PREDICATE
397
    };
398
186k
    void set_reading_flag(ReadingFlag flag) {
399
186k
        if (static_cast<int>(flag) > static_cast<int>(_reading_flag)) {
400
141k
            _reading_flag = flag;
401
141k
        }
402
186k
    }
403
404
749k
    ReadingFlag reading_flag() const { return _reading_flag; }
405
406
79.5k
    virtual void set_need_to_read() { set_reading_flag(ReadingFlag::NEED_TO_READ); }
407
408
80.9k
    virtual void remove_pruned_sub_iterators() {};
409
410
46.2k
    virtual Status init_prefetcher(const SegmentPrefetchParams& params) { return Status::OK(); }
411
412
    virtual void collect_prefetchers(
413
            std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>& prefetchers,
414
46.1k
            PrefetcherInitMethod init_method) {}
415
416
protected:
417
    Result<TColumnAccessPaths> _get_sub_access_paths(const TColumnAccessPaths& access_paths);
418
    ColumnIteratorOptions _opts;
419
420
    ReadingFlag _reading_flag {ReadingFlag::NORMAL_READING};
421
    std::string _column_name;
422
};
423
424
// This iterator is used to read column data from file
425
// for scalar type
426
class FileColumnIterator final : public ColumnIterator {
427
public:
428
    explicit FileColumnIterator(std::shared_ptr<ColumnReader> reader);
429
    ~FileColumnIterator() override;
430
431
    Status init(const ColumnIteratorOptions& opts) override;
432
433
    Status seek_to_ordinal(ordinal_t ord) override;
434
435
    Status seek_to_page_start();
436
437
    Status next_batch(size_t* n, MutableColumnPtr& dst, bool* has_null) override;
438
439
    Status next_batch_of_zone_map(size_t* n, MutableColumnPtr& dst) override;
440
441
    Status read_by_rowids(const rowid_t* rowids, const size_t count,
442
                          MutableColumnPtr& dst) override;
443
444
2
    ordinal_t get_current_ordinal() const override { return _current_ordinal; }
445
446
    // get row ranges by zone map
447
    // - cond_column is user's query predicate
448
    // - delete_condition is delete predicate of one version
449
    Status get_row_ranges_by_zone_map(
450
            const AndBlockColumnPredicate* col_predicates,
451
            const std::vector<std::shared_ptr<const ColumnPredicate>>* delete_predicates,
452
            RowRanges* row_ranges) override;
453
454
    Status get_row_ranges_by_bloom_filter(const AndBlockColumnPredicate* col_predicates,
455
                                          RowRanges* row_ranges) override;
456
457
    Status get_row_ranges_by_dict(const AndBlockColumnPredicate* col_predicates,
458
                                  RowRanges* row_ranges) override;
459
460
1.42M
    ParsedPage* get_current_page() { return &_page; }
461
462
0
    bool is_nullable() { return _reader->is_nullable(); }
463
464
11.6k
    bool is_all_dict_encoding() const override { return _is_all_dict_encoding; }
465
466
    Status read_null_map(size_t* n, NullMap& null_map) override;
467
468
    Status init_prefetcher(const SegmentPrefetchParams& params) override;
469
    void collect_prefetchers(
470
            std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>& prefetchers,
471
            PrefetcherInitMethod init_method) override;
472
473
private:
474
    Status _seek_to_pos_in_page(ParsedPage* page, ordinal_t offset_in_page) const;
475
    Status _load_next_page(bool* eos);
476
    Status _read_data_page(const OrdinalPageIndexIterator& iter);
477
    Status _read_dict_data();
478
    void _trigger_prefetch_if_eligible(ordinal_t ord);
479
480
    std::shared_ptr<ColumnReader> _reader = nullptr;
481
482
    // iterator owned compress codec, should NOT be shared by threads, initialized in init()
483
    BlockCompressionCodec* _compress_codec = nullptr;
484
485
    // 1. The _page represents current page.
486
    // 2. We define an operation is one seek and following read,
487
    //    If new seek is issued, the _page will be reset.
488
    ParsedPage _page;
489
490
    // keep dict page decoder
491
    std::unique_ptr<PageDecoder> _dict_decoder;
492
493
    // keep dict page handle to avoid released
494
    PageHandle _dict_page_handle;
495
496
    // page iterator used to get next page when current page is finished.
497
    // This value will be reset when a new seek is issued
498
    OrdinalPageIndexIterator _page_iter;
499
500
    // current value ordinal
501
    ordinal_t _current_ordinal = 0;
502
503
    bool _is_all_dict_encoding = false;
504
505
    std::unique_ptr<StringRef[]> _dict_word_info;
506
507
    bool _enable_prefetch {false};
508
    std::unique_ptr<SegmentPrefetcher> _prefetcher;
509
    std::shared_ptr<io::CachedRemoteFileReader> _cached_remote_file_reader {nullptr};
510
};
511
512
class EmptyFileColumnIterator final : public ColumnIterator {
513
public:
514
22.5k
    Status seek_to_ordinal(ordinal_t ord) override { return Status::OK(); }
515
0
    ordinal_t get_current_ordinal() const override { return 0; }
516
};
517
518
// This iterator make offset operation write once for
519
class OffsetFileColumnIterator final : public ColumnIterator {
520
public:
521
98.8k
    explicit OffsetFileColumnIterator(FileColumnIteratorUPtr offset_reader) {
522
98.8k
        _offset_iterator = std::move(offset_reader);
523
98.8k
    }
524
525
99.3k
    ~OffsetFileColumnIterator() override = default;
526
527
    Status init(const ColumnIteratorOptions& opts) override;
528
529
    Status next_batch(size_t* n, MutableColumnPtr& dst, bool* has_null) override;
530
0
    ordinal_t get_current_ordinal() const override {
531
0
        return _offset_iterator->get_current_ordinal();
532
0
    }
533
362k
    Status seek_to_ordinal(ordinal_t ord) override {
534
362k
        RETURN_IF_ERROR(_offset_iterator->seek_to_ordinal(ord));
535
362k
        return Status::OK();
536
362k
    }
537
538
    Status _peek_one_offset(ordinal_t* offset);
539
540
    Status _calculate_offsets(ssize_t start, ColumnArray::ColumnOffsets& column_offsets);
541
542
    Status read_by_rowids(const rowid_t* rowids, const size_t count,
543
29.8k
                          MutableColumnPtr& dst) override {
544
29.8k
        return _offset_iterator->read_by_rowids(rowids, count, dst);
545
29.8k
    }
546
547
    Status init_prefetcher(const SegmentPrefetchParams& params) override;
548
    void collect_prefetchers(
549
            std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>& prefetchers,
550
            PrefetcherInitMethod init_method) override;
551
552
private:
553
    std::unique_ptr<FileColumnIterator> _offset_iterator;
554
    // reuse a tiny column for peek to avoid frequent allocations
555
    MutableColumnPtr _peek_tmp_col;
556
};
557
558
// This iterator is used to read map value column
559
class MapFileColumnIterator final : public ColumnIterator {
560
public:
561
    explicit MapFileColumnIterator(std::shared_ptr<ColumnReader> reader,
562
                                   ColumnIteratorUPtr null_iterator,
563
                                   OffsetFileColumnIteratorUPtr offsets_iterator,
564
                                   ColumnIteratorUPtr key_iterator,
565
                                   ColumnIteratorUPtr val_iterator);
566
567
32.4k
    ~MapFileColumnIterator() override = default;
568
569
    Status init(const ColumnIteratorOptions& opts) override;
570
571
    Status next_batch(size_t* n, MutableColumnPtr& dst, bool* has_null) override;
572
573
    Status read_by_rowids(const rowid_t* rowids, const size_t count,
574
                          MutableColumnPtr& dst) override;
575
576
    Status seek_to_ordinal(ordinal_t ord) override;
577
578
0
    ordinal_t get_current_ordinal() const override {
579
0
        return _offsets_iterator->get_current_ordinal();
580
0
    }
581
    Status init_prefetcher(const SegmentPrefetchParams& params) override;
582
    void collect_prefetchers(
583
            std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>& prefetchers,
584
            PrefetcherInitMethod init_method) override;
585
586
    Status set_access_paths(const TColumnAccessPaths& all_access_paths,
587
                            const TColumnAccessPaths& predicate_access_paths) override;
588
589
    void set_need_to_read() override;
590
591
    void remove_pruned_sub_iterators() override;
592
593
    Status read_null_map(size_t* n, NullMap& null_map) override;
594
595
private:
596
    std::shared_ptr<ColumnReader> _map_reader = nullptr;
597
    ColumnIteratorUPtr _null_iterator;
598
    OffsetFileColumnIteratorUPtr _offsets_iterator; //OffsetFileIterator
599
    ColumnIteratorUPtr _key_iterator;
600
    ColumnIteratorUPtr _val_iterator;
601
};
602
603
class StructFileColumnIterator final : public ColumnIterator {
604
public:
605
    explicit StructFileColumnIterator(std::shared_ptr<ColumnReader> reader,
606
                                      ColumnIteratorUPtr null_iterator,
607
                                      std::vector<ColumnIteratorUPtr>&& sub_column_iterators);
608
609
7.72k
    ~StructFileColumnIterator() override = default;
610
611
    Status init(const ColumnIteratorOptions& opts) override;
612
613
    Status next_batch(size_t* n, MutableColumnPtr& dst, bool* has_null) override;
614
615
    Status read_by_rowids(const rowid_t* rowids, const size_t count,
616
                          MutableColumnPtr& dst) override;
617
618
    Status seek_to_ordinal(ordinal_t ord) override;
619
620
0
    ordinal_t get_current_ordinal() const override {
621
0
        return _sub_column_iterators[0]->get_current_ordinal();
622
0
    }
623
624
    Status set_access_paths(const TColumnAccessPaths& all_access_paths,
625
                            const TColumnAccessPaths& predicate_access_paths) override;
626
627
    void set_need_to_read() override;
628
629
    void remove_pruned_sub_iterators() override;
630
631
    Status init_prefetcher(const SegmentPrefetchParams& params) override;
632
    void collect_prefetchers(
633
            std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>& prefetchers,
634
            PrefetcherInitMethod init_method) override;
635
636
    Status read_null_map(size_t* n, NullMap& null_map) override;
637
638
private:
639
    std::shared_ptr<ColumnReader> _struct_reader = nullptr;
640
    ColumnIteratorUPtr _null_iterator;
641
    std::vector<ColumnIteratorUPtr> _sub_column_iterators;
642
};
643
644
class ArrayFileColumnIterator final : public ColumnIterator {
645
public:
646
    explicit ArrayFileColumnIterator(std::shared_ptr<ColumnReader> reader,
647
                                     OffsetFileColumnIteratorUPtr offset_reader,
648
                                     ColumnIteratorUPtr item_iterator,
649
                                     ColumnIteratorUPtr null_iterator);
650
651
66.8k
    ~ArrayFileColumnIterator() override = default;
652
653
    Status init(const ColumnIteratorOptions& opts) override;
654
655
    Status next_batch(size_t* n, MutableColumnPtr& dst, bool* has_null) override;
656
657
    Status read_by_rowids(const rowid_t* rowids, const size_t count,
658
                          MutableColumnPtr& dst) override;
659
660
    Status seek_to_ordinal(ordinal_t ord) override;
661
662
0
    ordinal_t get_current_ordinal() const override {
663
0
        return _offset_iterator->get_current_ordinal();
664
0
    }
665
666
    Status set_access_paths(const TColumnAccessPaths& all_access_paths,
667
                            const TColumnAccessPaths& predicate_access_paths) override;
668
    void set_need_to_read() override;
669
670
    void remove_pruned_sub_iterators() override;
671
672
    Status init_prefetcher(const SegmentPrefetchParams& params) override;
673
    void collect_prefetchers(
674
            std::map<PrefetcherInitMethod, std::vector<SegmentPrefetcher*>>& prefetchers,
675
            PrefetcherInitMethod init_method) override;
676
677
    Status read_null_map(size_t* n, NullMap& null_map) override;
678
679
private:
680
    std::shared_ptr<ColumnReader> _array_reader = nullptr;
681
    std::unique_ptr<OffsetFileColumnIterator> _offset_iterator;
682
    std::unique_ptr<ColumnIterator> _null_iterator;
683
    std::unique_ptr<ColumnIterator> _item_iterator;
684
685
    Status _seek_by_offsets(ordinal_t ord);
686
};
687
688
class RowIdColumnIterator : public ColumnIterator {
689
public:
690
    RowIdColumnIterator() = delete;
691
    RowIdColumnIterator(int64_t tid, RowsetId rid, int32_t segid)
692
11
            : _tablet_id(tid), _rowset_id(rid), _segment_id(segid) {}
693
694
7
    Status seek_to_ordinal(ordinal_t ord_idx) override {
695
7
        _current_rowid = cast_set<uint32_t>(ord_idx);
696
7
        return Status::OK();
697
7
    }
698
699
0
    Status next_batch(size_t* n, MutableColumnPtr& dst) {
700
0
        bool has_null;
701
0
        return next_batch(n, dst, &has_null);
702
0
    }
703
704
7
    Status next_batch(size_t* n, MutableColumnPtr& dst, bool* has_null) override {
705
28
        for (size_t i = 0; i < *n; ++i) {
706
21
            const auto row_id = cast_set<uint32_t>(_current_rowid + i);
707
21
            GlobalRowLoacation location(_tablet_id, _rowset_id, _segment_id, row_id);
708
21
            dst->insert_data(reinterpret_cast<const char*>(&location), sizeof(GlobalRowLoacation));
709
21
        }
710
7
        _current_rowid += *n;
711
7
        return Status::OK();
712
7
    }
713
714
    Status read_by_rowids(const rowid_t* rowids, const size_t count,
715
4
                          MutableColumnPtr& dst) override {
716
8
        for (size_t i = 0; i < count; ++i) {
717
4
            rowid_t row_id = rowids[i];
718
4
            GlobalRowLoacation location(_tablet_id, _rowset_id, _segment_id, row_id);
719
4
            dst->insert_data(reinterpret_cast<const char*>(&location), sizeof(GlobalRowLoacation));
720
4
        }
721
4
        return Status::OK();
722
4
    }
723
724
0
    ordinal_t get_current_ordinal() const override { return _current_rowid; }
725
726
private:
727
    rowid_t _current_rowid = 0;
728
    int64_t _tablet_id = 0;
729
    RowsetId _rowset_id;
730
    int32_t _segment_id = 0;
731
};
732
733
// Add new RowIdColumnIteratorV2
734
class RowIdColumnIteratorV2 : public ColumnIterator {
735
public:
736
    RowIdColumnIteratorV2(uint8_t version, int64_t backend_id, uint32_t file_id)
737
7.49k
            : _version(version), _backend_id(backend_id), _file_id(file_id) {}
738
739
2.05k
    Status seek_to_ordinal(ordinal_t ord_idx) override {
740
2.05k
        _current_rowid = cast_set<uint32_t>(ord_idx);
741
2.05k
        return Status::OK();
742
2.05k
    }
743
744
15
    Status next_batch(size_t* n, MutableColumnPtr& dst) {
745
15
        bool has_null;
746
15
        return next_batch(n, dst, &has_null);
747
15
    }
748
749
    Status next_batch(size_t* n, MutableColumnPtr& dst, bool* has_null) override;
750
751
    Status read_by_rowids(const rowid_t* rowids, const size_t count,
752
                          MutableColumnPtr& dst) override;
753
754
0
    ordinal_t get_current_ordinal() const override { return _current_rowid; }
755
756
private:
757
    uint32_t _current_rowid = 0;
758
    uint8_t _version;
759
    int64_t _backend_id;
760
    uint32_t _file_id;
761
};
762
763
// This iterator is used to read default value column
764
class DefaultValueColumnIterator : public ColumnIterator {
765
public:
766
    DefaultValueColumnIterator(bool has_default_value, std::string default_value, bool is_nullable,
767
                               TypeInfoPtr type_info, int precision, int scale, int len)
768
10.3k
            : _has_default_value(has_default_value),
769
10.3k
              _default_value(std::move(default_value)),
770
10.3k
              _is_nullable(is_nullable),
771
10.3k
              _type_info(std::move(type_info)),
772
10.3k
              _precision(precision),
773
10.3k
              _scale(scale),
774
10.3k
              _len(len) {}
775
776
    Status init(const ColumnIteratorOptions& opts) override;
777
778
2.67k
    Status seek_to_ordinal(ordinal_t ord_idx) override {
779
2.67k
        _current_rowid = ord_idx;
780
2.67k
        return Status::OK();
781
2.67k
    }
782
783
0
    Status next_batch(size_t* n, MutableColumnPtr& dst) {
784
0
        bool has_null;
785
0
        return next_batch(n, dst, &has_null);
786
0
    }
787
788
    Status next_batch(size_t* n, MutableColumnPtr& dst, bool* has_null) override;
789
790
0
    Status next_batch_of_zone_map(size_t* n, MutableColumnPtr& dst) override {
791
0
        return next_batch(n, dst);
792
0
    }
793
794
    Status read_by_rowids(const rowid_t* rowids, const size_t count,
795
                          MutableColumnPtr& dst) override;
796
797
0
    ordinal_t get_current_ordinal() const override { return _current_rowid; }
798
799
private:
800
    void _insert_many_default(MutableColumnPtr& dst, size_t n);
801
802
    bool _has_default_value;
803
    std::string _default_value;
804
    bool _is_nullable;
805
    TypeInfoPtr _type_info;
806
    int _precision;
807
    int _scale;
808
    const int _len;
809
    Field _default_value_field;
810
811
    // current rowid
812
    ordinal_t _current_rowid = 0;
813
};
814
815
} // namespace segment_v2
816
#include "common/compile_check_end.h"
817
} // namespace doris