Coverage Report

Created: 2026-03-17 00:04

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/vparquet_column_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
#include <gen_cpp/parquet_types.h>
20
#include <glog/logging.h>
21
#include <stddef.h>
22
#include <stdint.h>
23
24
#include <list>
25
#include <memory>
26
#include <ostream>
27
#include <unordered_map>
28
#include <vector>
29
30
#include "common/status.h"
31
#include "core/data_type/data_type.h"
32
#include "format/parquet/parquet_column_convert.h"
33
#include "format/parquet/parquet_common.h"
34
#include "format/parquet/vparquet_column_chunk_reader.h"
35
#include "format/table/table_format_reader.h"
36
#include "format/table/table_schema_change_helper.h"
37
#include "io/fs/buffered_reader.h"
38
#include "io/fs/file_reader_writer_fwd.h"
39
#include "parquet_column_convert.h"
40
#include "vparquet_column_chunk_reader.h"
41
42
namespace cctz {
43
class time_zone;
44
} // namespace cctz
45
46
namespace doris::io {
47
struct IOContext;
48
} // namespace doris::io
49
50
namespace doris {
51
#include "common/compile_check_begin.h"
52
struct FieldSchema;
53
template <typename T>
54
class ColumnStr;
55
using ColumnString = ColumnStr<UInt32>;
56
57
class ParquetColumnReader {
58
public:
59
    struct ColumnStatistics {
60
        ColumnStatistics()
61
140
                : page_index_read_calls(0),
62
140
                  decompress_time(0),
63
140
                  decompress_cnt(0),
64
140
                  decode_header_time(0),
65
140
                  decode_value_time(0),
66
140
                  decode_dict_time(0),
67
140
                  decode_level_time(0),
68
140
                  decode_null_map_time(0),
69
140
                  skip_page_header_num(0),
70
140
                  parse_page_header_num(0),
71
140
                  read_page_header_time(0),
72
140
                  page_read_counter(0),
73
140
                  page_cache_write_counter(0),
74
140
                  page_cache_compressed_write_counter(0),
75
140
                  page_cache_decompressed_write_counter(0),
76
140
                  page_cache_hit_counter(0),
77
140
                  page_cache_missing_counter(0),
78
140
                  page_cache_compressed_hit_counter(0),
79
140
                  page_cache_decompressed_hit_counter(0) {}
80
81
        ColumnStatistics(ColumnChunkReaderStatistics& cs, int64_t null_map_time)
82
115
                : page_index_read_calls(0),
83
115
                  decompress_time(cs.decompress_time),
84
115
                  decompress_cnt(cs.decompress_cnt),
85
115
                  decode_header_time(cs.decode_header_time),
86
115
                  decode_value_time(cs.decode_value_time),
87
115
                  decode_dict_time(cs.decode_dict_time),
88
115
                  decode_level_time(cs.decode_level_time),
89
115
                  decode_null_map_time(null_map_time),
90
115
                  skip_page_header_num(cs.skip_page_header_num),
91
115
                  parse_page_header_num(cs.parse_page_header_num),
92
115
                  read_page_header_time(cs.read_page_header_time),
93
115
                  page_read_counter(cs.page_read_counter),
94
115
                  page_cache_write_counter(cs.page_cache_write_counter),
95
115
                  page_cache_compressed_write_counter(cs.page_cache_compressed_write_counter),
96
115
                  page_cache_decompressed_write_counter(cs.page_cache_decompressed_write_counter),
97
115
                  page_cache_hit_counter(cs.page_cache_hit_counter),
98
115
                  page_cache_missing_counter(cs.page_cache_missing_counter),
99
115
                  page_cache_compressed_hit_counter(cs.page_cache_compressed_hit_counter),
100
115
                  page_cache_decompressed_hit_counter(cs.page_cache_decompressed_hit_counter) {}
101
102
        int64_t page_index_read_calls;
103
        int64_t decompress_time;
104
        int64_t decompress_cnt;
105
        int64_t decode_header_time;
106
        int64_t decode_value_time;
107
        int64_t decode_dict_time;
108
        int64_t decode_level_time;
109
        int64_t decode_null_map_time;
110
        int64_t skip_page_header_num;
111
        int64_t parse_page_header_num;
112
        int64_t read_page_header_time;
113
        int64_t page_read_counter;
114
        int64_t page_cache_write_counter;
115
        int64_t page_cache_compressed_write_counter;
116
        int64_t page_cache_decompressed_write_counter;
117
        int64_t page_cache_hit_counter;
118
        int64_t page_cache_missing_counter;
119
        int64_t page_cache_compressed_hit_counter;
120
        int64_t page_cache_decompressed_hit_counter;
121
122
161
        void merge(ColumnStatistics& col_statistics) {
123
161
            page_index_read_calls += col_statistics.page_index_read_calls;
124
161
            decompress_time += col_statistics.decompress_time;
125
161
            decompress_cnt += col_statistics.decompress_cnt;
126
161
            decode_header_time += col_statistics.decode_header_time;
127
161
            decode_value_time += col_statistics.decode_value_time;
128
161
            decode_dict_time += col_statistics.decode_dict_time;
129
161
            decode_level_time += col_statistics.decode_level_time;
130
161
            decode_null_map_time += col_statistics.decode_null_map_time;
131
161
            skip_page_header_num += col_statistics.skip_page_header_num;
132
161
            parse_page_header_num += col_statistics.parse_page_header_num;
133
161
            read_page_header_time += col_statistics.read_page_header_time;
134
161
            page_read_counter += col_statistics.page_read_counter;
135
161
            page_cache_write_counter += col_statistics.page_cache_write_counter;
136
161
            page_cache_compressed_write_counter +=
137
161
                    col_statistics.page_cache_compressed_write_counter;
138
161
            page_cache_decompressed_write_counter +=
139
161
                    col_statistics.page_cache_decompressed_write_counter;
140
161
            page_cache_hit_counter += col_statistics.page_cache_hit_counter;
141
161
            page_cache_missing_counter += col_statistics.page_cache_missing_counter;
142
161
            page_cache_compressed_hit_counter += col_statistics.page_cache_compressed_hit_counter;
143
161
            page_cache_decompressed_hit_counter +=
144
161
                    col_statistics.page_cache_decompressed_hit_counter;
145
161
        }
146
    };
147
148
    ParquetColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz,
149
                        io::IOContext* io_ctx)
150
132
            : _row_ranges(row_ranges), _total_rows(total_rows), _ctz(ctz), _io_ctx(io_ctx) {}
151
132
    virtual ~ParquetColumnReader() = default;
152
    virtual Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type,
153
                                    const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node,
154
                                    FilterMap& filter_map, size_t batch_size, size_t* read_rows,
155
                                    bool* eof, bool is_dict_filter,
156
                                    int64_t real_column_size = -1) = 0;
157
158
0
    virtual Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) {
159
0
        return Status::NotSupported("read_dict_values_to_column is not supported");
160
0
    }
161
162
    virtual Result<MutableColumnPtr> convert_dict_column_to_string_column(
163
0
            const ColumnInt32* dict_column) {
164
0
        throw Exception(
165
0
                Status::FatalError("Method convert_dict_column_to_string_column is not supported"));
166
0
    }
167
168
    static Status create(io::FileReaderSPtr file, FieldSchema* field,
169
                         const tparquet::RowGroup& row_group, const RowRanges& row_ranges,
170
                         const cctz::time_zone* ctz, io::IOContext* io_ctx,
171
                         std::unique_ptr<ParquetColumnReader>& reader, size_t max_buf_size,
172
                         std::unordered_map<int, tparquet::OffsetIndex>& col_offsets,
173
                         RuntimeState* state, bool in_collection = false,
174
                         const std::set<uint64_t>& column_ids = {},
175
                         const std::set<uint64_t>& filter_column_ids = {});
176
    virtual const std::vector<level_t>& get_rep_level() const = 0;
177
    virtual const std::vector<level_t>& get_def_level() const = 0;
178
    virtual ColumnStatistics column_statistics() = 0;
179
    virtual void close() = 0;
180
181
    virtual void reset_filter_map_index() = 0;
182
183
0
    FieldSchema* get_field_schema() const { return _field_schema; }
184
28
    void set_column_in_nested() { _in_nested = true; }
185
186
protected:
187
    void _generate_read_ranges(RowRange page_row_range, RowRanges* result_ranges) const;
188
189
    FieldSchema* _field_schema = nullptr;
190
    const RowRanges& _row_ranges;
191
    size_t _total_rows = 0;
192
    const cctz::time_zone* _ctz = nullptr;
193
    io::IOContext* _io_ctx = nullptr;
194
    int64_t _current_row_index = 0;
195
    int64_t _decode_null_map_time = 0;
196
197
    size_t _filter_map_index = 0;
198
    std::set<uint64_t> _filter_column_ids;
199
200
    // _in_nested: column in struct/map/array
201
    // IN_COLLECTION : column in map/array
202
    bool _in_nested = false;
203
};
204
205
template <bool IN_COLLECTION, bool OFFSET_INDEX>
206
class ScalarColumnReader : public ParquetColumnReader {
207
    ENABLE_FACTORY_CREATOR(ScalarColumnReader)
208
public:
209
    ScalarColumnReader(const RowRanges& row_ranges, size_t total_rows,
210
                       const tparquet::ColumnChunk& chunk_meta,
211
                       const tparquet::OffsetIndex* offset_index, const cctz::time_zone* ctz,
212
                       io::IOContext* io_ctx)
213
115
            : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx),
214
115
              _chunk_meta(chunk_meta),
215
115
              _offset_index(offset_index) {}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE
_ZN5doris18ScalarColumnReaderILb1ELb0EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE
Line
Count
Source
213
3
            : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx),
214
3
              _chunk_meta(chunk_meta),
215
3
              _offset_index(offset_index) {}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE
_ZN5doris18ScalarColumnReaderILb0ELb0EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE
Line
Count
Source
213
112
            : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx),
214
112
              _chunk_meta(chunk_meta),
215
112
              _offset_index(offset_index) {}
216
115
    ~ScalarColumnReader() override { close(); }
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EED2Ev
_ZN5doris18ScalarColumnReaderILb1ELb0EED2Ev
Line
Count
Source
216
3
    ~ScalarColumnReader() override { close(); }
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EED2Ev
_ZN5doris18ScalarColumnReaderILb0ELb0EED2Ev
Line
Count
Source
216
112
    ~ScalarColumnReader() override { close(); }
217
    Status init(io::FileReaderSPtr file, FieldSchema* field, size_t max_buf_size,
218
                RuntimeState* state);
219
    Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type,
220
                            const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node,
221
                            FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof,
222
                            bool is_dict_filter, int64_t real_column_size = -1) override;
223
    Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) override;
224
    Result<MutableColumnPtr> convert_dict_column_to_string_column(
225
            const ColumnInt32* dict_column) override;
226
13
    const std::vector<level_t>& get_rep_level() const override { return _rep_levels; }
Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb1ELb1EE13get_rep_levelEv
_ZNK5doris18ScalarColumnReaderILb1ELb0EE13get_rep_levelEv
Line
Count
Source
226
4
    const std::vector<level_t>& get_rep_level() const override { return _rep_levels; }
Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb0ELb1EE13get_rep_levelEv
_ZNK5doris18ScalarColumnReaderILb0ELb0EE13get_rep_levelEv
Line
Count
Source
226
9
    const std::vector<level_t>& get_rep_level() const override { return _rep_levels; }
227
13
    const std::vector<level_t>& get_def_level() const override { return _def_levels; }
Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb1ELb1EE13get_def_levelEv
_ZNK5doris18ScalarColumnReaderILb1ELb0EE13get_def_levelEv
Line
Count
Source
227
4
    const std::vector<level_t>& get_def_level() const override { return _def_levels; }
Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb0ELb1EE13get_def_levelEv
_ZNK5doris18ScalarColumnReaderILb0ELb0EE13get_def_levelEv
Line
Count
Source
227
9
    const std::vector<level_t>& get_def_level() const override { return _def_levels; }
228
115
    ColumnStatistics column_statistics() override {
229
115
        return ColumnStatistics(_chunk_reader->chunk_statistics(), _decode_null_map_time);
230
115
    }
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE17column_statisticsEv
_ZN5doris18ScalarColumnReaderILb1ELb0EE17column_statisticsEv
Line
Count
Source
228
3
    ColumnStatistics column_statistics() override {
229
3
        return ColumnStatistics(_chunk_reader->chunk_statistics(), _decode_null_map_time);
230
3
    }
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE17column_statisticsEv
_ZN5doris18ScalarColumnReaderILb0ELb0EE17column_statisticsEv
Line
Count
Source
228
112
    ColumnStatistics column_statistics() override {
229
112
        return ColumnStatistics(_chunk_reader->chunk_statistics(), _decode_null_map_time);
230
112
    }
231
115
    void close() override {}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE5closeEv
_ZN5doris18ScalarColumnReaderILb1ELb0EE5closeEv
Line
Count
Source
231
3
    void close() override {}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE5closeEv
_ZN5doris18ScalarColumnReaderILb0ELb0EE5closeEv
Line
Count
Source
231
112
    void close() override {}
232
233
132
    void reset_filter_map_index() override {
234
132
        _filter_map_index = 0; // nested
235
132
        _orig_filter_map_index = 0;
236
132
    }
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE22reset_filter_map_indexEv
_ZN5doris18ScalarColumnReaderILb1ELb0EE22reset_filter_map_indexEv
Line
Count
Source
233
3
    void reset_filter_map_index() override {
234
3
        _filter_map_index = 0; // nested
235
3
        _orig_filter_map_index = 0;
236
3
    }
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE22reset_filter_map_indexEv
_ZN5doris18ScalarColumnReaderILb0ELb0EE22reset_filter_map_indexEv
Line
Count
Source
233
129
    void reset_filter_map_index() override {
234
129
        _filter_map_index = 0; // nested
235
129
        _orig_filter_map_index = 0;
236
129
    }
237
238
private:
239
    tparquet::ColumnChunk _chunk_meta;
240
    const tparquet::OffsetIndex* _offset_index = nullptr;
241
    std::unique_ptr<io::BufferedFileStreamReader> _stream_reader;
242
    std::unique_ptr<ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>> _chunk_reader;
243
    // rep def levels buffer.
244
    std::vector<level_t> _rep_levels;
245
    std::vector<level_t> _def_levels;
246
247
    size_t _current_range_idx = 0;
248
249
    Status gen_nested_null_map(size_t level_start_idx, size_t level_end_idx,
250
                               std::vector<uint16_t>& null_map,
251
13
                               std::unordered_set<size_t>& ancestor_null_indices) {
252
13
        size_t has_read = level_start_idx;
253
13
        null_map.emplace_back(0);
254
13
        bool prev_is_null = false;
255
256
26
        while (has_read < level_end_idx) {
257
13
            level_t def_level = _def_levels[has_read++];
258
13
            size_t loop_read = 1;
259
19
            while (has_read < _def_levels.size() && _def_levels[has_read] == def_level) {
260
6
                has_read++;
261
6
                loop_read++;
262
6
            }
263
264
13
            if (def_level < _field_schema->repeated_parent_def_level) {
265
0
                for (size_t i = 0; i < loop_read; i++) {
266
0
                    ancestor_null_indices.insert(has_read - level_start_idx - loop_read + i);
267
0
                }
268
0
                continue;
269
0
            }
270
271
13
            bool is_null = def_level < _field_schema->definition_level;
272
273
13
            if (prev_is_null == is_null && (USHRT_MAX - null_map.back() >= loop_read)) {
274
13
                null_map.back() += loop_read;
275
13
            } else {
276
0
                if (!(prev_is_null ^ is_null)) {
277
0
                    null_map.emplace_back(0);
278
0
                }
279
0
                size_t remaining = loop_read;
280
0
                while (remaining > USHRT_MAX) {
281
0
                    null_map.emplace_back(USHRT_MAX);
282
0
                    null_map.emplace_back(0);
283
0
                    remaining -= USHRT_MAX;
284
0
                }
285
0
                null_map.emplace_back((u_short)remaining);
286
0
                prev_is_null = is_null;
287
0
            }
288
13
        }
289
13
        return Status::OK();
290
13
    }
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE
_ZN5doris18ScalarColumnReaderILb1ELb0EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE
Line
Count
Source
251
3
                               std::unordered_set<size_t>& ancestor_null_indices) {
252
3
        size_t has_read = level_start_idx;
253
3
        null_map.emplace_back(0);
254
3
        bool prev_is_null = false;
255
256
6
        while (has_read < level_end_idx) {
257
3
            level_t def_level = _def_levels[has_read++];
258
3
            size_t loop_read = 1;
259
9
            while (has_read < _def_levels.size() && _def_levels[has_read] == def_level) {
260
6
                has_read++;
261
6
                loop_read++;
262
6
            }
263
264
3
            if (def_level < _field_schema->repeated_parent_def_level) {
265
0
                for (size_t i = 0; i < loop_read; i++) {
266
0
                    ancestor_null_indices.insert(has_read - level_start_idx - loop_read + i);
267
0
                }
268
0
                continue;
269
0
            }
270
271
3
            bool is_null = def_level < _field_schema->definition_level;
272
273
3
            if (prev_is_null == is_null && (USHRT_MAX - null_map.back() >= loop_read)) {
274
3
                null_map.back() += loop_read;
275
3
            } else {
276
0
                if (!(prev_is_null ^ is_null)) {
277
0
                    null_map.emplace_back(0);
278
0
                }
279
0
                size_t remaining = loop_read;
280
0
                while (remaining > USHRT_MAX) {
281
0
                    null_map.emplace_back(USHRT_MAX);
282
0
                    null_map.emplace_back(0);
283
                    remaining -= USHRT_MAX;
284
0
                }
285
0
                null_map.emplace_back((u_short)remaining);
286
0
                prev_is_null = is_null;
287
0
            }
288
3
        }
289
3
        return Status::OK();
290
3
    }
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE
_ZN5doris18ScalarColumnReaderILb0ELb0EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE
Line
Count
Source
251
10
                               std::unordered_set<size_t>& ancestor_null_indices) {
252
10
        size_t has_read = level_start_idx;
253
10
        null_map.emplace_back(0);
254
10
        bool prev_is_null = false;
255
256
20
        while (has_read < level_end_idx) {
257
10
            level_t def_level = _def_levels[has_read++];
258
10
            size_t loop_read = 1;
259
10
            while (has_read < _def_levels.size() && _def_levels[has_read] == def_level) {
260
0
                has_read++;
261
0
                loop_read++;
262
0
            }
263
264
10
            if (def_level < _field_schema->repeated_parent_def_level) {
265
0
                for (size_t i = 0; i < loop_read; i++) {
266
0
                    ancestor_null_indices.insert(has_read - level_start_idx - loop_read + i);
267
0
                }
268
0
                continue;
269
0
            }
270
271
10
            bool is_null = def_level < _field_schema->definition_level;
272
273
10
            if (prev_is_null == is_null && (USHRT_MAX - null_map.back() >= loop_read)) {
274
10
                null_map.back() += loop_read;
275
10
            } else {
276
0
                if (!(prev_is_null ^ is_null)) {
277
0
                    null_map.emplace_back(0);
278
0
                }
279
0
                size_t remaining = loop_read;
280
0
                while (remaining > USHRT_MAX) {
281
0
                    null_map.emplace_back(USHRT_MAX);
282
0
                    null_map.emplace_back(0);
283
                    remaining -= USHRT_MAX;
284
0
                }
285
0
                null_map.emplace_back((u_short)remaining);
286
0
                prev_is_null = is_null;
287
0
            }
288
10
        }
289
10
        return Status::OK();
290
10
    }
291
292
    Status gen_filter_map(FilterMap& filter_map, size_t filter_loc, size_t level_start_idx,
293
                          size_t level_end_idx, std::vector<uint8_t>& nested_filter_map_data,
294
0
                          std::unique_ptr<FilterMap>* nested_filter_map) {
295
0
        nested_filter_map_data.resize(level_end_idx - level_start_idx);
296
0
        for (size_t idx = level_start_idx; idx < level_end_idx; idx++) {
297
0
            if (idx != level_start_idx && _rep_levels[idx] == 0) {
298
0
                filter_loc++;
299
0
            }
300
0
            nested_filter_map_data[idx - level_start_idx] =
301
0
                    filter_map.filter_map_data()[filter_loc];
302
0
        }
303
304
0
        auto new_filter = std::make_unique<FilterMap>();
305
0
        RETURN_IF_ERROR(new_filter->init(nested_filter_map_data.data(),
306
0
                                         nested_filter_map_data.size(), false));
307
0
        *nested_filter_map = std::move(new_filter);
308
309
0
        return Status::OK();
310
0
    }
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb0EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb0EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE
311
312
    std::unique_ptr<parquet::PhysicalToLogicalConverter> _converter = nullptr;
313
    std::unique_ptr<std::vector<uint8_t>> _nested_filter_map_data = nullptr;
314
    size_t _orig_filter_map_index = 0;
315
316
    Status _skip_values(size_t num_values);
317
    Status _read_values(size_t num_values, ColumnPtr& doris_column, DataTypePtr& type,
318
                        FilterMap& filter_map, bool is_dict_filter);
319
    Status _read_nested_column(ColumnPtr& doris_column, DataTypePtr& type, FilterMap& filter_map,
320
                               size_t batch_size, size_t* read_rows, bool* eof,
321
                               bool is_dict_filter);
322
    Status _try_load_dict_page(bool* loaded, bool* has_dict);
323
};
324
325
class ArrayColumnReader : public ParquetColumnReader {
326
    ENABLE_FACTORY_CREATOR(ArrayColumnReader)
327
public:
328
    ArrayColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz,
329
                      io::IOContext* io_ctx)
330
2
            : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {}
331
2
    ~ArrayColumnReader() override { close(); }
332
    Status init(std::unique_ptr<ParquetColumnReader> element_reader, FieldSchema* field);
333
    Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type,
334
                            const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node,
335
                            FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof,
336
                            bool is_dict_filter, int64_t real_column_size = -1) override;
337
0
    const std::vector<level_t>& get_rep_level() const override {
338
0
        return _element_reader->get_rep_level();
339
0
    }
340
0
    const std::vector<level_t>& get_def_level() const override {
341
0
        return _element_reader->get_def_level();
342
0
    }
343
2
    ColumnStatistics column_statistics() override { return _element_reader->column_statistics(); }
344
2
    void close() override {}
345
346
2
    void reset_filter_map_index() override { _element_reader->reset_filter_map_index(); }
347
348
private:
349
    std::unique_ptr<ParquetColumnReader> _element_reader;
350
};
351
352
class MapColumnReader : public ParquetColumnReader {
353
    ENABLE_FACTORY_CREATOR(MapColumnReader)
354
public:
355
    MapColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz,
356
                    io::IOContext* io_ctx)
357
0
            : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {}
358
0
    ~MapColumnReader() override { close(); }
359
360
    Status init(std::unique_ptr<ParquetColumnReader> key_reader,
361
                std::unique_ptr<ParquetColumnReader> value_reader, FieldSchema* field);
362
    Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type,
363
                            const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node,
364
                            FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof,
365
                            bool is_dict_filter, int64_t real_column_size = -1) override;
366
367
0
    const std::vector<level_t>& get_rep_level() const override {
368
0
        return _key_reader->get_rep_level();
369
0
    }
370
0
    const std::vector<level_t>& get_def_level() const override {
371
0
        return _key_reader->get_def_level();
372
0
    }
373
374
0
    ColumnStatistics column_statistics() override {
375
0
        ColumnStatistics kst = _key_reader->column_statistics();
376
0
        ColumnStatistics vst = _value_reader->column_statistics();
377
0
        kst.merge(vst);
378
0
        return kst;
379
0
    }
380
381
0
    void close() override {}
382
383
0
    void reset_filter_map_index() override {
384
0
        _key_reader->reset_filter_map_index();
385
0
        _value_reader->reset_filter_map_index();
386
0
    }
387
388
private:
389
    std::unique_ptr<ParquetColumnReader> _key_reader;
390
    std::unique_ptr<ParquetColumnReader> _value_reader;
391
};
392
393
class StructColumnReader : public ParquetColumnReader {
394
    ENABLE_FACTORY_CREATOR(StructColumnReader)
395
public:
396
    StructColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz,
397
                       io::IOContext* io_ctx)
398
11
            : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {}
399
11
    ~StructColumnReader() override { close(); }
400
401
    Status init(
402
            std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>&& child_readers,
403
            FieldSchema* field);
404
    Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type,
405
                            const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node,
406
                            FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof,
407
                            bool is_dict_filter, int64_t real_column_size = -1) override;
408
409
17
    const std::vector<level_t>& get_rep_level() const override {
410
17
        if (!_read_column_names.empty()) {
411
            // can't use _child_readers[*_read_column_names.begin()]
412
            // because the operator[] of std::unordered_map is not const :(
413
            /*
414
             * Considering the issue in the `_read_nested_column` function where data may span across pages, leading
415
             * to missing definition and repetition levels, when filling the null_map of the struct later, it is
416
             * crucial to use the definition and repetition levels from the first read column,
417
             * that is `_read_column_names.front()`.
418
             */
419
17
            return _child_readers.find(_read_column_names.front())->second->get_rep_level();
420
17
        }
421
0
        return _child_readers.begin()->second->get_rep_level();
422
17
    }
423
424
17
    const std::vector<level_t>& get_def_level() const override {
425
17
        if (!_read_column_names.empty()) {
426
17
            return _child_readers.find(_read_column_names.front())->second->get_def_level();
427
17
        }
428
0
        return _child_readers.begin()->second->get_def_level();
429
17
    }
430
431
11
    ColumnStatistics column_statistics() override {
432
11
        ColumnStatistics st;
433
22
        for (const auto& column_name : _read_column_names) {
434
22
            auto reader = _child_readers.find(column_name);
435
22
            if (reader != _child_readers.end()) {
436
22
                ColumnStatistics cst = reader->second->column_statistics();
437
22
                st.merge(cst);
438
22
            }
439
22
        }
440
11
        return st;
441
11
    }
442
443
11
    void close() override {}
444
445
11
    void reset_filter_map_index() override {
446
26
        for (const auto& reader : _child_readers) {
447
26
            reader.second->reset_filter_map_index();
448
26
        }
449
11
    }
450
451
private:
452
    std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _child_readers;
453
    std::vector<std::string> _read_column_names;
454
    //Need to use vector instead of set,see `get_rep_level()` for the reason.
455
};
456
457
// A special reader that skips actual reading but provides empty data with correct structure
458
// This is used when a column is not needed but its structure is required (e.g., for map keys)
459
class SkipReadingReader : public ParquetColumnReader {
460
public:
461
    SkipReadingReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz,
462
                      io::IOContext* io_ctx, FieldSchema* field_schema)
463
4
            : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {
464
4
        _field_schema = field_schema; // Use inherited member from base class
465
4
        VLOG_DEBUG << "[ParquetReader] Created SkipReadingReader for field: "
466
0
                   << _field_schema->name;
467
4
    }
468
469
    Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type,
470
                            const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node,
471
                            FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof,
472
4
                            bool is_dict_filter, int64_t real_column_size = -1) override {
473
4
        VLOG_DEBUG << "[ParquetReader] SkipReadingReader::read_column_data for field: "
474
0
                   << _field_schema->name << ", batch_size: " << batch_size;
475
4
        DCHECK(real_column_size >= 0); // real_column_size for filtered column size.
476
477
        // Simulate reading without actually reading data
478
        // Fill with default/null values based on column type
479
4
        MutableColumnPtr data_column = doris_column->assume_mutable();
480
481
4
        if (real_column_size > 0) {
482
4
            if (doris_column->is_nullable()) {
483
4
                auto* nullable_column = static_cast<ColumnNullable*>(data_column.get());
484
4
                nullable_column->insert_many_defaults(real_column_size);
485
4
            } else {
486
                // For non-nullable columns, insert appropriate default values
487
0
                for (size_t i = 0; i < real_column_size; ++i) {
488
0
                    data_column->insert_default();
489
0
                }
490
0
            }
491
4
        }
492
493
4
        *read_rows = batch_size; // Indicate we "read" batch_size rows
494
4
        *eof = false;            // We can always provide more empty data
495
496
4
        VLOG_DEBUG << "[ParquetReader] SkipReadingReader generated " << batch_size
497
0
                   << " default values for field: " << _field_schema->name;
498
499
4
        return Status::OK();
500
4
    }
501
502
    static std::unique_ptr<SkipReadingReader> create_unique(const RowRanges& row_ranges,
503
                                                            size_t total_rows, cctz::time_zone* ctz,
504
                                                            io::IOContext* io_ctx,
505
0
                                                            FieldSchema* field_schema) {
506
0
        return std::make_unique<SkipReadingReader>(row_ranges, total_rows, ctz, io_ctx,
507
0
                                                   field_schema);
508
0
    }
509
510
    // These methods should not be called for SkipReadingReader
511
    // If they are called, it indicates a logic error in the code
512
0
    const std::vector<level_t>& get_rep_level() const override {
513
0
        LOG(FATAL) << "get_rep_level() should not be called on SkipReadingReader for field: "
514
0
                   << _field_schema->name
515
0
                   << ". This indicates the SkipReadingReader was incorrectly used as a reference "
516
0
                      "column.";
517
0
        __builtin_unreachable();
518
0
    }
519
520
0
    const std::vector<level_t>& get_def_level() const override {
521
0
        LOG(FATAL) << "get_def_level() should not be called on SkipReadingReader for field: "
522
0
                   << _field_schema->name
523
0
                   << ". This indicates the SkipReadingReader was incorrectly used as a reference "
524
0
                      "column.";
525
0
        __builtin_unreachable();
526
0
    }
527
528
    // Implement required pure virtual methods from base class
529
0
    ColumnStatistics column_statistics() override {
530
0
        return ColumnStatistics(); // Return empty statistics
531
0
    }
532
533
0
    void close() override {
534
        // Nothing to close for skip reading
535
0
    }
536
537
4
    void reset_filter_map_index() override { _filter_map_index = 0; }
538
};
539
540
#include "common/compile_check_end.h"
541
542
}; // namespace doris