Coverage Report

Created: 2026-04-10 04:10

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/vparquet_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/parquet_types.h>
21
#include <stddef.h>
22
#include <stdint.h>
23
24
#include <list>
25
#include <memory>
26
#include <string>
27
#include <tuple>
28
#include <unordered_map>
29
#include <unordered_set>
30
#include <vector>
31
32
#include "common/status.h"
33
#include "format/generic_reader.h"
34
#include "format/parquet/parquet_common.h"
35
#include "format/parquet/parquet_predicate.h"
36
#include "format/parquet/vparquet_column_reader.h"
37
#include "format/parquet/vparquet_group_reader.h"
38
#include "format/table/table_format_reader.h"
39
#include "io/file_factory.h"
40
#include "io/fs/file_meta_cache.h"
41
#include "io/fs/file_reader.h"
42
#include "io/fs/file_reader_writer_fwd.h"
43
#include "runtime/runtime_profile.h"
44
#include "storage/olap_scan_common.h"
45
#include "util/obj_lru_cache.h"
46
47
namespace cctz {
48
class time_zone;
49
} // namespace cctz
50
namespace doris {
51
class RowDescriptor;
52
class RuntimeState;
53
class SlotDescriptor;
54
class TFileRangeDesc;
55
class TFileScanRangeParams;
56
class TupleDescriptor;
57
58
namespace io {
59
class FileSystem;
60
struct IOContext;
61
} // namespace io
62
class Block;
63
class FileMetaData;
64
class PageIndex;
65
class ShardedKVCache;
66
class VExprContext;
67
struct RowLineageColumns;
68
} // namespace doris
69
70
namespace doris {
71
class ParquetReader : public GenericReader {
72
    ENABLE_FACTORY_CREATOR(ParquetReader);
73
74
public:
75
    struct ReaderStatistics {
76
        int32_t filtered_row_groups = 0;
77
        int32_t filtered_row_groups_by_min_max = 0;
78
        int32_t filtered_row_groups_by_bloom_filter = 0;
79
        int32_t read_row_groups = 0;
80
        int64_t filtered_group_rows = 0;
81
        int64_t filtered_page_rows = 0;
82
        int64_t lazy_read_filtered_rows = 0;
83
        int64_t read_rows = 0;
84
        int64_t filtered_bytes = 0;
85
        int64_t column_read_time = 0;
86
        int64_t parse_meta_time = 0;
87
        int64_t parse_footer_time = 0;
88
        int64_t file_footer_read_calls = 0;
89
        int64_t file_footer_hit_cache = 0;
90
        int64_t file_reader_create_time = 0;
91
        int64_t open_file_num = 0;
92
        int64_t row_group_filter_time = 0;
93
        int64_t page_index_filter_time = 0;
94
        int64_t read_page_index_time = 0;
95
        int64_t parse_page_index_time = 0;
96
        int64_t predicate_filter_time = 0;
97
        int64_t dict_filter_rewrite_time = 0;
98
        int64_t bloom_filter_read_time = 0;
99
    };
100
101
    ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
102
                  const TFileRangeDesc& range, size_t batch_size, const cctz::time_zone* ctz,
103
                  io::IOContext* io_ctx, RuntimeState* state, FileMetaCache* meta_cache = nullptr,
104
                  bool enable_lazy_mat = true);
105
106
    ParquetReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
107
                  const TFileRangeDesc& range, size_t batch_size, const cctz::time_zone* ctz,
108
                  std::shared_ptr<io::IOContext> io_ctx_holder, RuntimeState* state,
109
                  FileMetaCache* meta_cache = nullptr, bool enable_lazy_mat = true);
110
111
    ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
112
                  io::IOContext* io_ctx, RuntimeState* state, FileMetaCache* meta_cache = nullptr,
113
                  bool enable_lazy_mat = true);
114
115
    ParquetReader(const TFileScanRangeParams& params, const TFileRangeDesc& range,
116
                  std::shared_ptr<io::IOContext> io_ctx_holder, RuntimeState* state,
117
                  FileMetaCache* meta_cache = nullptr, bool enable_lazy_mat = true);
118
119
    ~ParquetReader() override;
120
#ifdef BE_TEST
121
    // for unit test
122
    void set_file_reader(io::FileReaderSPtr file_reader);
123
#endif
124
125
    Status init_reader(
126
            const std::vector<std::string>& all_column_names,
127
            std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
128
            const VExprContextSPtrs& conjuncts,
129
            phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>&
130
                    slot_id_to_predicates,
131
            const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
132
            const std::unordered_map<std::string, int>* colname_to_slot_id,
133
            const VExprContextSPtrs* not_single_slot_filter_conjuncts,
134
            const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts,
135
            std::shared_ptr<TableSchemaChangeHelper::Node> table_info_node_ptr =
136
                    TableSchemaChangeHelper::ConstNode::get_instance(),
137
            bool filter_groups = true, const std::set<uint64_t>& column_ids = {},
138
            const std::set<uint64_t>& filter_column_ids = {});
139
140
    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
141
142
    Status close() override;
143
144
    // Set the deleted rows for the current parquet file.
145
2
    void set_delete_rows(const std::vector<int64_t>* delete_rows) { _delete_rows = delete_rows; }
146
147
0
    int64_t size() const { return _file_reader->size(); }
148
149
    Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
150
                       std::unordered_set<std::string>* missing_cols) override;
151
152
    Status init_schema_reader() override;
153
154
    Status get_parsed_schema(std::vector<std::string>* col_names,
155
                             std::vector<DataTypePtr>* col_types) override;
156
157
0
    ReaderStatistics& reader_statistics() { return _reader_statistics; }
158
159
2
    const tparquet::FileMetaData* get_meta_data() const { return _t_metadata; }
160
161
    // Partition columns are not materialized in parquet files, so they must be filled in here, along with the missing columns.
162
    Status set_fill_columns(
163
            const std::unordered_map<std::string, std::tuple<std::string, const SlotDescriptor*>>&
164
                    partition_columns,
165
            const std::unordered_map<std::string, VExprContextSPtr>& missing_columns) override;
166
167
    Status get_file_metadata_schema(const FieldDescriptor** ptr);
168
169
    void set_row_id_column_iterator(
170
13
            std::pair<std::shared_ptr<RowIdColumnIteratorV2>, int> iterator_pair) {
171
13
        _row_id_column_iterator_pair = iterator_pair;
172
13
    }
173
174
    void set_iceberg_rowid_params(const std::string& file_path, int32_t partition_spec_id,
175
                                  const std::string& partition_data_json, int row_id_column_pos);
176
177
0
    void set_row_lineage_columns(std::shared_ptr<RowLineageColumns> row_lineage_columns) {
178
0
        _row_lineage_columns = std::move(row_lineage_columns);
179
0
    }
180
181
155
    bool count_read_rows() override { return true; }
182
183
    void set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) override;
184
185
    int64_t get_total_rows() const override;
186
187
5
    bool has_delete_operations() const override {
188
5
        return _delete_rows != nullptr && !_delete_rows->empty();
189
5
    }
190
191
protected:
192
    void _collect_profile_before_close() override;
193
194
private:
195
    struct ParquetProfile {
196
        RuntimeProfile::Counter* filtered_row_groups = nullptr;
197
        RuntimeProfile::Counter* filtered_row_groups_by_min_max = nullptr;
198
        RuntimeProfile::Counter* filtered_row_groups_by_bloom_filter = nullptr;
199
        RuntimeProfile::Counter* to_read_row_groups = nullptr;
200
        RuntimeProfile::Counter* total_row_groups = nullptr;
201
        RuntimeProfile::Counter* filtered_group_rows = nullptr;
202
        RuntimeProfile::Counter* filtered_page_rows = nullptr;
203
        RuntimeProfile::Counter* lazy_read_filtered_rows = nullptr;
204
        RuntimeProfile::Counter* filtered_bytes = nullptr;
205
        RuntimeProfile::Counter* raw_rows_read = nullptr;
206
        RuntimeProfile::Counter* column_read_time = nullptr;
207
        RuntimeProfile::Counter* parse_meta_time = nullptr;
208
        RuntimeProfile::Counter* parse_footer_time = nullptr;
209
        RuntimeProfile::Counter* file_reader_create_time = nullptr;
210
        RuntimeProfile::Counter* open_file_num = nullptr;
211
        RuntimeProfile::Counter* row_group_filter_time = nullptr;
212
        RuntimeProfile::Counter* page_index_read_calls = nullptr;
213
        RuntimeProfile::Counter* page_index_filter_time = nullptr;
214
        RuntimeProfile::Counter* read_page_index_time = nullptr;
215
        RuntimeProfile::Counter* parse_page_index_time = nullptr;
216
        RuntimeProfile::Counter* file_footer_read_calls = nullptr;
217
        RuntimeProfile::Counter* file_footer_hit_cache = nullptr;
218
        RuntimeProfile::Counter* decompress_time = nullptr;
219
        RuntimeProfile::Counter* decompress_cnt = nullptr;
220
        RuntimeProfile::Counter* page_read_counter = nullptr;
221
        RuntimeProfile::Counter* page_cache_write_counter = nullptr;
222
        RuntimeProfile::Counter* page_cache_compressed_write_counter = nullptr;
223
        RuntimeProfile::Counter* page_cache_decompressed_write_counter = nullptr;
224
        RuntimeProfile::Counter* page_cache_hit_counter = nullptr;
225
        RuntimeProfile::Counter* page_cache_missing_counter = nullptr;
226
        RuntimeProfile::Counter* page_cache_compressed_hit_counter = nullptr;
227
        RuntimeProfile::Counter* page_cache_decompressed_hit_counter = nullptr;
228
        RuntimeProfile::Counter* decode_header_time = nullptr;
229
        RuntimeProfile::Counter* read_page_header_time = nullptr;
230
        RuntimeProfile::Counter* decode_value_time = nullptr;
231
        RuntimeProfile::Counter* decode_dict_time = nullptr;
232
        RuntimeProfile::Counter* decode_level_time = nullptr;
233
        RuntimeProfile::Counter* decode_null_map_time = nullptr;
234
        RuntimeProfile::Counter* skip_page_header_num = nullptr;
235
        RuntimeProfile::Counter* parse_page_header_num = nullptr;
236
        RuntimeProfile::Counter* predicate_filter_time = nullptr;
237
        RuntimeProfile::Counter* dict_filter_rewrite_time = nullptr;
238
        RuntimeProfile::Counter* bloom_filter_read_time = nullptr;
239
    };
240
241
    Status _open_file();
242
    void _init_profile();
243
    void _close_internal();
244
    Status _next_row_group_reader();
245
    RowGroupReader::PositionDeleteContext _get_position_delete_ctx(
246
            const tparquet::RowGroup& row_group,
247
            const RowGroupReader::RowGroupIndex& row_group_index);
248
    void _init_system_properties();
249
    void _init_file_description();
250
251
    // Before reading the next row group, the page index should be loaded and used to filter data efficiently.
252
    Status _process_page_index_filter(
253
            const tparquet::RowGroup& row_group,
254
            const RowGroupReader::RowGroupIndex& row_group_index,
255
            const std::vector<std::unique_ptr<MutilColumnBlockPredicate>>& push_down_pred,
256
            RowRanges* candidate_row_ranges);
257
258
    // Check whether this range contains this row group.
259
    bool _is_misaligned_range_group(const tparquet::RowGroup& row_group) const;
260
261
    // Row Group min-max Filter
262
    Status _process_column_stat_filter(
263
            const tparquet::RowGroup& row_group,
264
            const std::vector<std::unique_ptr<MutilColumnBlockPredicate>>& push_down_pred,
265
            bool* filter_group, bool* filtered_by_min_max, bool* filtered_by_bloom_filter);
266
267
    /*
268
     * 1. row group min-max filter
269
     * 2. row group bloom filter
270
     * 3. page index min-max filter
271
     *
272
     * Returns a Status; row_ranges receives the rows (lines) to be read.
273
     */
274
    Status _process_min_max_bloom_filter(
275
            const RowGroupReader::RowGroupIndex& row_group_index,
276
            const tparquet::RowGroup& row_group,
277
            const std::vector<std::unique_ptr<MutilColumnBlockPredicate>>& push_down_pred,
278
            RowRanges* row_ranges);
279
280
    int64_t _get_column_start_offset(
281
            const tparquet::ColumnMetaData& column_init_column_readers) const;
282
0
    // Builds the metadata-cache key for `path`: the literal prefix "meta_" followed by the path.
    std::string _meta_cache_key(const std::string& path) {
        std::string cache_key("meta_");
        cache_key.append(path);
        return cache_key;
    }
283
    std::vector<io::PrefetchRange> _generate_random_access_ranges(
284
            const RowGroupReader::RowGroupIndex& group, size_t* avg_io_size);
285
    void _collect_profile();
286
287
27
    Status _set_read_one_line_impl() override { return Status::OK(); }
288
289
    bool _exists_in_file(const std::string& expr_name) const;
290
    bool _type_matches(const int cid) const;
291
292
    RuntimeProfile* _profile = nullptr;
293
    const TFileScanRangeParams& _scan_params;
294
    const TFileRangeDesc& _scan_range;
295
    io::FileSystemProperties _system_properties;
296
    io::FileDescription _file_description;
297
298
    // the following fields are for parquet meta data cache.
299
    // If _meta_cache is not null, _file_metadata is obtained from _meta_cache,
300
    // and it is owned by _meta_cache_handle.
301
    // if _meta_cache is null, _file_metadata will be managed by _file_metadata_ptr,
302
    // which is released on destruction.
303
    // ATTN: these fields must be declared before _file_reader, to make sure they will be released
304
    // after _file_reader. Otherwise, there may be a heap-use-after-free bug.
305
    ObjLRUCache::CacheHandle _meta_cache_handle;
306
    std::unique_ptr<FileMetaData> _file_metadata_ptr;
307
    const FileMetaData* _file_metadata = nullptr;
308
    const tparquet::FileMetaData* _t_metadata = nullptr;
309
310
    // _tracing_file_reader wraps _file_reader.
311
    // _file_reader is original file reader.
312
    // _tracing_file_reader is tracing file reader with io context.
313
    // If io_ctx is null, _tracing_file_reader will be the same as _file_reader.
314
    io::FileReaderSPtr _file_reader = nullptr;
315
    io::FileReaderSPtr _tracing_file_reader = nullptr;
316
    std::unique_ptr<RowGroupReader> _current_group_reader;
317
318
    RowGroupReader::RowGroupIndex _current_row_group_index {-1, 0, 0};
319
    // read to the end of current reader
320
    bool _row_group_eof = true;
321
    size_t _total_groups; // num of groups(stripes) of a parquet(orc) file
322
323
    std::shared_ptr<ConditionCacheContext> _condition_cache_ctx;
324
325
    // Through this node, you can find the file column based on the table column.
326
    std::shared_ptr<TableSchemaChangeHelper::Node> _table_info_node_ptr =
327
            TableSchemaChangeHelper::ConstNode::get_instance();
328
329
    // Columns that need to be read, in file order.
330
    std::vector<std::string> _read_table_columns;
331
    std::vector<std::string> _read_file_columns;
332
    // The set of file columns to be read; only columns within this set will be filtered using the min-max predicate.
333
    std::set<std::string> _read_table_columns_set;
334
    // Rows deleted by Iceberg/Paimon are only marked as deleted, so they must be filtered out when reading the file.
335
    const std::vector<int64_t>* _delete_rows = nullptr;
336
    int64_t _delete_rows_index = 0;
337
338
    // Used for column lazy read.
339
    RowGroupReader::LazyReadContext _lazy_read_ctx;
340
341
    // parquet file reader object
342
    size_t _batch_size;
343
    // Bytes-per-row estimate from the previous batch, used to pre-shrink _batch_size
344
    // before reading so that oversized blocks are prevented from the current call onward.
345
    // Zero means no prior data (first batch).
346
    size_t _load_bytes_per_row = 0;
347
    int64_t _range_start_offset;
348
    int64_t _range_size;
349
    const cctz::time_zone* _ctz = nullptr;
350
351
    std::unordered_map<int, tparquet::OffsetIndex> _col_offsets;
352
353
    std::vector<std::string> _missing_cols;
354
    // _table_column_names = _missing_cols + _read_table_columns
355
    const std::vector<std::string>* _table_column_names = nullptr;
356
357
    ReaderStatistics _reader_statistics;
358
    ParquetColumnReader::ColumnStatistics _column_statistics;
359
    ParquetProfile _parquet_profile;
360
    bool _closed = false;
361
    io::IOContext* _io_ctx = nullptr;
362
    std::shared_ptr<io::IOContext> _io_ctx_holder;
363
    RuntimeState* _state = nullptr;
364
    bool _enable_lazy_mat = true;
365
    bool _enable_filter_by_min_max = true;
366
    bool _enable_filter_by_bloom_filter = true;
367
    const TupleDescriptor* _tuple_descriptor = nullptr;
368
    const RowDescriptor* _row_descriptor = nullptr;
369
    const std::unordered_map<std::string, int>* _colname_to_slot_id = nullptr;
370
    const VExprContextSPtrs* _not_single_slot_filter_conjuncts = nullptr;
371
    const std::unordered_map<int, VExprContextSPtrs>* _slot_id_to_filter_conjuncts = nullptr;
372
    std::unordered_map<tparquet::Type::type, bool> _ignored_stats;
373
374
    std::pair<std::shared_ptr<RowIdColumnIteratorV2>, int> _row_id_column_iterator_pair = {nullptr,
375
                                                                                           -1};
376
    std::shared_ptr<RowLineageColumns> _row_lineage_columns;
377
378
protected:
379
    bool _filter_groups = true;
380
    RowGroupReader::IcebergRowIdParams _iceberg_rowid_params;
381
382
    std::set<uint64_t> _column_ids;
383
    std::set<uint64_t> _filter_column_ids;
384
385
    std::unordered_map<std::string, uint32_t>* _col_name_to_block_idx = nullptr;
386
387
    std::vector<std::unique_ptr<MutilColumnBlockPredicate>> _push_down_predicates;
388
    Arena _arena;
389
};
390
391
} // namespace doris