Coverage Report

Created: 2026-06-13 01:53

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/vparquet_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/parquet_types.h>
#include <stddef.h>
#include <stdint.h>

#include <functional>
#include <list>
#include <memory>
#include <set>
#include <string>
#include <tuple>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "common/status.h"
#include "format/parquet/parquet_common.h"
#include "format/parquet/parquet_predicate.h"
#include "format/parquet/vparquet_column_reader.h"
#include "format/parquet/vparquet_group_reader.h"
#include "format/table/table_format_reader.h"
#include "format/table/table_schema_change_helper.h"
#include "io/file_factory.h"
#include "io/fs/file_meta_cache.h"
#include "io/fs/file_reader.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "runtime/runtime_profile.h"
#include "storage/olap_scan_common.h"
#include "util/obj_lru_cache.h"
46
47
// Forward declarations of types referenced by the declarations that follow.
namespace cctz {
class time_zone;
} // namespace cctz

namespace doris {
namespace io {
class FileSystem;
struct IOContext;
} // namespace io

class Block;
class FileMetaData;
class PageIndex;
class RowDescriptor;
class RuntimeState;
class ShardedKVCache;
class SlotDescriptor;
class TFileRangeDesc;
class TFileScanRangeParams;
class TupleDescriptor;
class VExprContext;
struct RowLineageColumns;
} // namespace doris
69
70
namespace doris {
71
72
/// Parquet-specific initialization context.
73
/// Extends ReaderInitContext with predicate pushdown fields.
74
struct ParquetInitContext final : public ReaderInitContext {
75
    // Safe defaults for standalone readers (delete file readers, push handler)
76
    // that don't have conjuncts/predicates. Dereferenced by _do_init_reader.
77
    static inline const VExprContextSPtrs EMPTY_CONJUNCTS {};
78
    static inline phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>
79
            EMPTY_SLOT_PREDICATES {};
80
81
    const VExprContextSPtrs* conjuncts = &EMPTY_CONJUNCTS;
82
    phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>*
83
            slot_id_to_predicates = &EMPTY_SLOT_PREDICATES;
84
    const std::unordered_map<std::string, int>* colname_to_slot_id = nullptr;
85
    const VExprContextSPtrs* not_single_slot_filter_conjuncts = nullptr;
86
    const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts = nullptr;
87
    bool filter_groups = true;
88
};
89
90
class ParquetReader : public TableFormatReader {
91
    ENABLE_FACTORY_CREATOR(ParquetReader);
92
93
public:
94
    struct ReaderStatistics {
95
        int32_t filtered_row_groups = 0;
96
        int32_t filtered_row_groups_by_min_max = 0;
97
        int32_t filtered_row_groups_by_expr_zonemap = 0;
98
        int32_t filtered_row_groups_by_bloom_filter = 0;
99
        int32_t read_row_groups = 0;
100
        int64_t filtered_group_rows = 0;
101
        int64_t filtered_page_rows = 0;
102
        int64_t lazy_read_filtered_rows = 0;
103
        int64_t read_rows = 0;
104
        int64_t filtered_bytes = 0;
105
        int64_t column_read_time = 0;
106
        int64_t parse_meta_time = 0;
107
        int64_t parse_footer_time = 0;
108
        int64_t file_footer_read_calls = 0;
109
        int64_t file_footer_hit_cache = 0;
110
        int64_t file_reader_create_time = 0;
111
        int64_t open_file_num = 0;
112
        int64_t row_group_filter_time = 0;
113
        int64_t page_index_filter_time = 0;
114
        int64_t read_page_index_time = 0;
115
        int64_t parse_page_index_time = 0;
116
        int64_t predicate_filter_time = 0;
117
        int64_t dict_filter_rewrite_time = 0;
118
        int64_t bloom_filter_read_time = 0;
119
        int64_t expr_zonemap_unusable_evals = 0;
120
        int64_t in_zonemap_point_check_count = 0;
121
        int64_t in_zonemap_range_only_count = 0;
122
    };
123
124
    ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
125
                  const TFileRangeDesc& range, size_t batch_size, const cctz::time_zone* ctz,
126
                  io::IOContext* io_ctx, RuntimeState* state, FileMetaCache* meta_cache = nullptr,
127
                  bool enable_lazy_mat = true);
128
129
    ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
130
                  const TFileRangeDesc& range, size_t batch_size, const cctz::time_zone* ctz,
131
                  std::shared_ptr<io::IOContext> io_ctx_holder, RuntimeState* state,
132
                  FileMetaCache* meta_cache = nullptr, bool enable_lazy_mat = true);
133
134
    ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
135
                  io::IOContext* io_ctx, RuntimeState* state, FileMetaCache* meta_cache = nullptr,
136
                  bool enable_lazy_mat = true);
137
138
    ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
139
                  std::shared_ptr<io::IOContext> io_ctx_holder, RuntimeState* state,
140
                  FileMetaCache* meta_cache = nullptr, bool enable_lazy_mat = true);
141
142
    ~ParquetReader() override;
143
#ifdef BE_TEST
144
    // for unit test
145
    void set_file_reader(io::FileReaderSPtr file_reader);
146
    void set_conjuncts_for_test(const VExprContextSPtrs& conjuncts) {
147
        _lazy_read_ctx.conjuncts = conjuncts;
148
    }
149
#endif
150
151
    // Override to build table_info_node from Parquet file metadata using by_parquet_name.
152
    // Subclasses (HiveParquetReader, etc.) call GenericReader::on_before_init_reader directly,
153
    // so this override only applies to plain ParquetReader (TVF, load).
154
    Status on_before_init_reader(ReaderInitContext* ctx) override;
155
156
    void set_batch_size(size_t batch_size) override;
157
158
    Status close() override;
159
160
    // set the delete rows in current parquet file
161
2
    void set_delete_rows(const std::vector<int64_t>* delete_rows) { _delete_rows = delete_rows; }
162
163
0
    int64_t size() const { return _file_reader->size(); }
164
165
    Status _get_columns_impl(std::unordered_map<std::string, DataTypePtr>* name_to_type) override;
166
167
    Status init_schema_reader() override;
168
169
    Status get_parsed_schema(std::vector<std::string>* col_names,
170
                             std::vector<DataTypePtr>* col_types) override;
171
172
0
    ReaderStatistics& reader_statistics() { return _reader_statistics; }
173
174
2
    const tparquet::FileMetaData* get_meta_data() const { return _t_metadata; }
175
176
    Status get_file_metadata_schema(const FieldDescriptor** ptr);
177
178
    void set_create_row_id_column_iterator_func(
179
14
            std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()> create_func) {
180
14
        _create_topn_row_id_column_iterator = create_func;
181
14
    }
182
183
    /// Access current batch row positions (delegates to RowGroupReader).
184
    /// Used by IcebergReaderMixin to build $row_id column.
185
5
    const std::vector<segment_v2::rowid_t>& current_batch_row_positions() const {
186
5
        return _current_group_reader->current_batch_row_positions();
187
5
    }
188
189
    Status fill_topn_row_id(
190
            std::shared_ptr<segment_v2::RowIdColumnIteratorV2> _row_id_column_iterator,
191
5
            std::string col_name, Block* block, size_t rows) {
192
5
        int col_pos = block->get_position_by_name(col_name);
193
5
        DCHECK(col_pos >= 0);
194
5
        if (col_pos < 0) {
195
0
            return Status::InternalError("Column {} not found in block", col_name);
196
0
        }
197
5
        auto column_guard = block->mutate_column_scoped(col_pos);
198
5
        auto& col = column_guard.mutable_column();
199
5
        const auto& row_ids = this->current_batch_row_positions();
200
5
        RETURN_IF_ERROR(
201
5
                _row_id_column_iterator->read_by_rowids(row_ids.data(), row_ids.size(), col));
202
203
5
        return Status::OK();
204
5
    }
205
206
14
    bool count_read_rows() override { return true; }
207
208
    void set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) override;
209
210
0
    bool supports_count_pushdown() const override { return true; }
211
212
    int64_t get_total_rows() const override;
213
214
4
    bool has_delete_operations() const override {
215
4
        return _delete_rows != nullptr && !_delete_rows->empty();
216
4
    }
217
218
    /// Disable row-group range filtering (needed when reading delete files
219
    /// whose TFileRangeDesc has size=-1).
220
0
    void set_filter_groups(bool v) { _filter_groups = v; }
221
222
protected:
223
    // ---- Unified init_reader(ReaderInitContext*) overrides ----
224
    Status _open_file_reader(ReaderInitContext* ctx) override;
225
    Status _do_init_reader(ReaderInitContext* ctx) override;
226
227
    void _collect_profile_before_close() override;
228
229
    // Core block reading implementation
230
    Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override;
231
232
    // Parquet fills partition/missing columns per-batch internally via RowGroupReader,
233
    // so suppress TableFormatReader's default on_after_read_block fill.
234
87
    Status on_after_read_block(Block* /*block*/, size_t* /*read_rows*/) override {
235
87
        return Status::OK();
236
87
    }
237
238
    // Protected accessors so CRTP mixin subclasses can reach private members
239
0
    io::IOContext* get_io_ctx() const { return _io_ctx; }
240
0
    std::unordered_map<std::string, uint32_t>*& col_name_to_block_idx_ref() {
241
0
        return _col_name_to_block_idx;
242
0
    }
243
42
    RuntimeProfile* get_profile() const { return _profile; }
244
28
    RuntimeState* get_state() const { return _state; }
245
1
    const TFileScanRangeParams& get_scan_params() const { return _scan_params; }
246
1
    const TFileRangeDesc& get_scan_range() const { return _scan_range; }
247
0
    const TupleDescriptor* get_tuple_descriptor() const { return _tuple_descriptor; }
248
0
    const RowDescriptor* get_row_descriptor() const { return _row_descriptor; }
249
0
    const FileMetaData* get_file_metadata() const { return _file_metadata; }
250
251
private:
252
    struct ParquetProfile {
253
        RuntimeProfile::Counter* filtered_row_groups = nullptr;
254
        RuntimeProfile::Counter* filtered_row_groups_by_min_max = nullptr;
255
        RuntimeProfile::Counter* filtered_row_groups_by_expr_zonemap = nullptr;
256
        RuntimeProfile::Counter* filtered_row_groups_by_bloom_filter = nullptr;
257
        RuntimeProfile::Counter* to_read_row_groups = nullptr;
258
        RuntimeProfile::Counter* total_row_groups = nullptr;
259
        RuntimeProfile::Counter* filtered_group_rows = nullptr;
260
        RuntimeProfile::Counter* filtered_page_rows = nullptr;
261
        RuntimeProfile::Counter* lazy_read_filtered_rows = nullptr;
262
        RuntimeProfile::Counter* filtered_bytes = nullptr;
263
        RuntimeProfile::Counter* raw_rows_read = nullptr;
264
        RuntimeProfile::Counter* column_read_time = nullptr;
265
        RuntimeProfile::Counter* parse_meta_time = nullptr;
266
        RuntimeProfile::Counter* parse_footer_time = nullptr;
267
        RuntimeProfile::Counter* file_reader_create_time = nullptr;
268
        RuntimeProfile::Counter* open_file_num = nullptr;
269
        RuntimeProfile::Counter* row_group_filter_time = nullptr;
270
        RuntimeProfile::Counter* page_index_read_calls = nullptr;
271
        RuntimeProfile::Counter* page_index_filter_time = nullptr;
272
        RuntimeProfile::Counter* read_page_index_time = nullptr;
273
        RuntimeProfile::Counter* parse_page_index_time = nullptr;
274
        RuntimeProfile::Counter* file_footer_read_calls = nullptr;
275
        RuntimeProfile::Counter* file_footer_hit_cache = nullptr;
276
        RuntimeProfile::Counter* decompress_time = nullptr;
277
        RuntimeProfile::Counter* decompress_cnt = nullptr;
278
        RuntimeProfile::Counter* page_read_counter = nullptr;
279
        RuntimeProfile::Counter* page_cache_write_counter = nullptr;
280
        RuntimeProfile::Counter* page_cache_compressed_write_counter = nullptr;
281
        RuntimeProfile::Counter* page_cache_decompressed_write_counter = nullptr;
282
        RuntimeProfile::Counter* page_cache_hit_counter = nullptr;
283
        RuntimeProfile::Counter* page_cache_missing_counter = nullptr;
284
        RuntimeProfile::Counter* page_cache_compressed_hit_counter = nullptr;
285
        RuntimeProfile::Counter* page_cache_decompressed_hit_counter = nullptr;
286
        RuntimeProfile::Counter* decode_header_time = nullptr;
287
        RuntimeProfile::Counter* read_page_header_time = nullptr;
288
        RuntimeProfile::Counter* decode_value_time = nullptr;
289
        RuntimeProfile::Counter* decode_dict_time = nullptr;
290
        RuntimeProfile::Counter* decode_level_time = nullptr;
291
        RuntimeProfile::Counter* decode_null_map_time = nullptr;
292
        RuntimeProfile::Counter* skip_page_header_num = nullptr;
293
        RuntimeProfile::Counter* parse_page_header_num = nullptr;
294
        RuntimeProfile::Counter* predicate_filter_time = nullptr;
295
        RuntimeProfile::Counter* dict_filter_rewrite_time = nullptr;
296
        RuntimeProfile::Counter* convert_time = nullptr;
297
        RuntimeProfile::Counter* bloom_filter_read_time = nullptr;
298
        RuntimeProfile::Counter* expr_zonemap_unusable = nullptr;
299
        RuntimeProfile::Counter* in_zonemap_point_check = nullptr;
300
        RuntimeProfile::Counter* in_zonemap_range_only = nullptr;
301
    };
302
303
    // ---- set_fill_columns sub-functions ----
304
    void _collect_predicate_columns_from_conjuncts(
305
            std::unordered_map<std::string, std::pair<uint32_t, int>>& predicate_columns);
306
    void _classify_columns_for_lazy_read(
307
            const std::unordered_map<std::string, std::pair<uint32_t, int>>& predicate_columns,
308
            const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
309
                    partition_columns,
310
            const std::unordered_map<std::string, VExprContextSPtr>& missing_columns);
311
312
    Status _open_file();
313
    void _init_profile();
314
    void _close_internal();
315
    Status _next_row_group_reader();
316
    RowGroupReader::PositionDeleteContext _get_position_delete_ctx(
317
            const tparquet::RowGroup& row_group,
318
            const RowGroupReader::RowGroupIndex& row_group_index);
319
    void _init_system_properties();
320
    void _init_file_description();
321
322
    // At the beginning of reading next row group, index should be loaded and used to filter data efficiently.
323
    Status _process_page_index_filter(
324
            const tparquet::RowGroup& row_group,
325
            const RowGroupReader::RowGroupIndex& row_group_index,
326
            const std::vector<std::unique_ptr<MutilColumnBlockPredicate>>& push_down_pred,
327
            RowRanges* candidate_row_ranges);
328
    Status _process_expr_zonemap_page_filter(
329
            ParquetPredicate::CachedPageIndexStat* cached_page_index,
330
            RowRanges* candidate_row_ranges, bool* filtered_row_group_by_expr_zonemap);
331
    bool _expr_zonemap_page_slot_index(const VExprContextSPtr& conjunct, int* cid) const;
332
    bool _has_expr_zonemap_page_filter() const;
333
334
    // check this range contain this row group.
335
    bool _is_misaligned_range_group(const tparquet::RowGroup& row_group) const;
336
337
    // Row Group min-max Filter
338
    Status _process_column_stat_filter(
339
            const tparquet::RowGroup& row_group,
340
            const std::vector<std::unique_ptr<MutilColumnBlockPredicate>>& push_down_pred,
341
            bool* filter_group, bool* filtered_by_min_max, bool* filtered_by_bloom_filter);
342
    Status _process_expr_zonemap_filter(const tparquet::RowGroup& row_group, bool* filter_group);
343
344
    /*
345
     * 1. row group min-max filter
346
     * 2. row group bloom filter
347
     * 3. page index min-max filter
348
     *
349
     * return Status && row_ranges (lines to be read)
350
     */
351
    Status _process_min_max_bloom_filter(
352
            const RowGroupReader::RowGroupIndex& row_group_index,
353
            const tparquet::RowGroup& row_group,
354
            const std::vector<std::unique_ptr<MutilColumnBlockPredicate>>& push_down_pred,
355
            RowRanges* row_ranges);
356
357
    int64_t _get_column_start_offset(
358
            const tparquet::ColumnMetaData& column_init_column_readers) const;
359
0
    std::string _meta_cache_key(const std::string& path) { return "meta_" + path; }
360
    std::vector<io::PrefetchRange> _generate_random_access_ranges(
361
            const RowGroupReader::RowGroupIndex& group, size_t* avg_io_size);
362
    void _collect_profile();
363
364
19
    Status _set_read_one_line_impl() override { return Status::OK(); }
365
366
    bool _exists_in_file(const std::string& expr_name) const;
367
    bool _type_matches(const int cid) const;
368
    void _init_read_columns(const std::vector<std::string>& column_names);
369
370
    io::FileSystemProperties _system_properties;
371
    io::FileDescription _file_description;
372
373
    // the following fields are for parquet meta data cache.
374
    // if _meta_cache is not null, the _file_metadata will be got from _meta_cache,
375
    // and it is owned by _meta_cache_handle.
376
    // if _meta_cache is null, _file_metadata will be managed by _file_metadata_ptr,
377
    // which will be released when deconstructing.
378
    // ATTN: these fields must be before _file_reader, to make sure they will be released
379
    // after _file_reader. Otherwise, there may be heap-use-after-free bug.
380
    ObjLRUCache::CacheHandle _meta_cache_handle;
381
    std::unique_ptr<FileMetaData> _file_metadata_ptr;
382
    const tparquet::FileMetaData* _t_metadata = nullptr;
383
384
    // _tracing_file_reader wraps _file_reader.
385
    // _file_reader is original file reader.
386
    // _tracing_file_reader is tracing file reader with io context.
387
    // If io_ctx is null, _tracing_file_reader will be the same as file_reader.
388
    io::FileReaderSPtr _file_reader = nullptr;
389
    io::FileReaderSPtr _tracing_file_reader = nullptr;
390
    std::unique_ptr<RowGroupReader> _current_group_reader;
391
392
    RowGroupReader::RowGroupIndex _current_row_group_index {-1, 0, 0};
393
    // read to the end of current reader
394
    bool _row_group_eof = true;
395
    size_t _total_groups = 0; // num of groups(stripes) of a parquet(orc) file
396
397
    std::shared_ptr<ConditionCacheContext> _condition_cache_ctx;
398
399
    // Through this node, you can find the file column based on the table column.
400
    std::shared_ptr<TableSchemaChangeHelper::Node> _table_info_node_ptr =
401
            TableSchemaChangeHelper::ConstNode::get_instance();
402
403
    //sequence in file, need to read
404
    std::vector<std::string> _read_table_columns;
405
    std::vector<std::string> _read_file_columns;
406
    // The set of file columns to be read; only columns within this set will be filtered using the min-max predicate.
407
    std::set<std::string> _read_table_columns_set;
408
    // Deleted rows will be marked by Iceberg/Paimon. So we should filter deleted rows when reading it.
409
    const std::vector<int64_t>* _delete_rows = nullptr;
410
    int64_t _delete_rows_index = 0;
411
412
    // parquet file reader object
413
    RuntimeProfile* _profile = nullptr;
414
    const TFileScanRangeParams& _scan_params;
415
    const TFileRangeDesc& _scan_range;
416
    size_t _batch_size;
417
    // Bytes-per-row estimate from the previous batch, used to pre-shrink _batch_size
418
    // before reading so that oversized blocks are prevented from the current call onward.
419
    // Zero means no prior data (first batch).
420
    size_t _load_bytes_per_row = 0;
421
    int64_t _range_start_offset;
422
    int64_t _range_size;
423
    const cctz::time_zone* _ctz = nullptr;
424
425
    std::unordered_map<int, tparquet::OffsetIndex> _col_offsets;
426
427
    ReaderStatistics _reader_statistics;
428
    ParquetColumnReader::ColumnStatistics _column_statistics;
429
    ParquetProfile _parquet_profile;
430
    bool _closed = false;
431
    io::IOContext* _io_ctx = nullptr;
432
    std::shared_ptr<io::IOContext> _io_ctx_holder;
433
    RuntimeState* _state = nullptr;
434
    const TupleDescriptor* _tuple_descriptor = nullptr;
435
    const RowDescriptor* _row_descriptor = nullptr;
436
    const FileMetaData* _file_metadata = nullptr;
437
    // Pointer to external column name to block index mapping (from FileScanner)
438
    std::unordered_map<std::string, uint32_t>* _col_name_to_block_idx = nullptr;
439
    bool _enable_lazy_mat = true;
440
    bool _enable_filter_by_min_max = true;
441
    bool _enable_filter_by_bloom_filter = true;
442
    const std::unordered_map<std::string, int>* _colname_to_slot_id = nullptr;
443
    const VExprContextSPtrs* _not_single_slot_filter_conjuncts = nullptr;
444
    const std::unordered_map<int, VExprContextSPtrs>* _slot_id_to_filter_conjuncts = nullptr;
445
    std::unordered_map<tparquet::Type::type, bool> _ignored_stats;
446
0
    size_t get_batch_size() const override { return _batch_size; }
447
448
protected:
449
    // Used for column lazy read. Protected so Iceberg/Paimon subclasses can
450
    // register synthesized columns in on_before_init_reader.
451
    RowGroupReader::LazyReadContext _lazy_read_ctx;
452
    bool _filter_groups = true;
453
454
    std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()>
455
            _create_topn_row_id_column_iterator;
456
457
private:
458
    std::set<uint64_t> _column_ids;
459
    std::set<uint64_t> _filter_column_ids;
460
461
    std::vector<std::unique_ptr<MutilColumnBlockPredicate>> _push_down_predicates;
462
    Arena _arena;
463
};
464
465
} // namespace doris