Coverage Report

Created: 2026-05-16 22:15

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/vparquet_column_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
#include <gen_cpp/parquet_types.h>
20
#include <glog/logging.h>
21
22
#include <cstddef>
23
#include <cstdint>
24
#include <list>
25
#include <memory>
26
#include <ostream>
27
#include <set>
28
#include <string>
29
#include <unordered_map>
30
#include <vector>
31
32
#include "common/status.h"
33
#include "core/data_type/data_type.h"
34
#include "format/generic_reader.h"
35
#include "format/parquet/parquet_column_convert.h"
36
#include "format/parquet/parquet_common.h"
37
#include "format/parquet/vparquet_column_chunk_reader.h"
38
#include "format/table/table_schema_change_helper.h"
39
#include "io/fs/buffered_reader.h"
40
#include "io/fs/file_reader_writer_fwd.h"
41
#include "parquet_column_convert.h"
42
#include "vparquet_column_chunk_reader.h"
43
44
namespace cctz {
45
class time_zone;
46
} // namespace cctz
47
48
namespace doris::io {
49
struct IOContext;
50
} // namespace doris::io
51
52
namespace doris {
53
class Field;
54
struct FieldSchema;
55
class IColumn;
56
class ColumnVariant;
57
template <typename T>
58
class ColumnStr;
59
using ColumnString = ColumnStr<UInt32>;
60
61
#ifdef BE_TEST
62
namespace parquet_variant_reader_test {
63
bool can_direct_read_typed_value_for_test(const FieldSchema& typed_value_field);
64
bool can_use_direct_typed_only_value_for_test(const FieldSchema& variant_field,
65
                                              const std::set<uint64_t>& column_ids);
66
Status append_direct_typed_column_to_batch_for_test(const FieldSchema& typed_value_field,
67
                                                    const IColumn& typed_value_column, size_t start,
68
                                                    size_t rows, ColumnVariant* batch);
69
Status read_variant_row_for_test(const FieldSchema& variant_field, const Field& field,
70
                                 bool output_nullable, Field* result, bool* sql_null);
71
Status variant_to_json_for_test(const FieldSchema& variant_field, const Field& field,
72
                                const std::string& inherited_metadata, std::string* json,
73
                                bool* present);
74
} // namespace parquet_variant_reader_test
75
#endif
76
77
class ParquetColumnReader {
78
public:
79
    struct ColumnStatistics {
80
        ColumnStatistics()
81
152
                : page_index_read_calls(0),
82
152
                  decompress_time(0),
83
152
                  decompress_cnt(0),
84
152
                  decode_header_time(0),
85
152
                  decode_value_time(0),
86
152
                  decode_dict_time(0),
87
152
                  decode_level_time(0),
88
152
                  decode_null_map_time(0),
89
152
                  convert_time(0),
90
152
                  skip_page_header_num(0),
91
152
                  parse_page_header_num(0),
92
152
                  read_page_header_time(0),
93
152
                  page_read_counter(0),
94
152
                  page_cache_write_counter(0),
95
152
                  page_cache_compressed_write_counter(0),
96
152
                  page_cache_decompressed_write_counter(0),
97
152
                  page_cache_hit_counter(0),
98
152
                  page_cache_missing_counter(0),
99
152
                  page_cache_compressed_hit_counter(0),
100
152
                  page_cache_decompressed_hit_counter(0),
101
152
                  variant_direct_typed_value_read_rows(0),
102
152
                  variant_rowwise_read_rows(0) {}
103
104
        ColumnStatistics(ColumnChunkReaderStatistics& cs, int64_t null_map_time,
105
                         int64_t convert_time_)
106
119
                : page_index_read_calls(0),
107
119
                  decompress_time(cs.decompress_time),
108
119
                  decompress_cnt(cs.decompress_cnt),
109
119
                  decode_header_time(cs.decode_header_time),
110
119
                  decode_value_time(cs.decode_value_time),
111
119
                  decode_dict_time(cs.decode_dict_time),
112
119
                  decode_level_time(cs.decode_level_time),
113
119
                  decode_null_map_time(null_map_time),
114
119
                  convert_time(convert_time_),
115
119
                  skip_page_header_num(cs.skip_page_header_num),
116
119
                  parse_page_header_num(cs.parse_page_header_num),
117
119
                  read_page_header_time(cs.read_page_header_time),
118
119
                  page_read_counter(cs.page_read_counter),
119
119
                  page_cache_write_counter(cs.page_cache_write_counter),
120
119
                  page_cache_compressed_write_counter(cs.page_cache_compressed_write_counter),
121
119
                  page_cache_decompressed_write_counter(cs.page_cache_decompressed_write_counter),
122
119
                  page_cache_hit_counter(cs.page_cache_hit_counter),
123
119
                  page_cache_missing_counter(cs.page_cache_missing_counter),
124
119
                  page_cache_compressed_hit_counter(cs.page_cache_compressed_hit_counter),
125
119
                  page_cache_decompressed_hit_counter(cs.page_cache_decompressed_hit_counter),
126
119
                  variant_direct_typed_value_read_rows(0),
127
119
                  variant_rowwise_read_rows(0) {}
128
129
        int64_t page_index_read_calls;
130
        int64_t decompress_time;
131
        int64_t decompress_cnt;
132
        int64_t decode_header_time;
133
        int64_t decode_value_time;
134
        int64_t decode_dict_time;
135
        int64_t decode_level_time;
136
        int64_t decode_null_map_time;
137
        int64_t convert_time;
138
        int64_t skip_page_header_num;
139
        int64_t parse_page_header_num;
140
        int64_t read_page_header_time;
141
        int64_t page_read_counter;
142
        int64_t page_cache_write_counter;
143
        int64_t page_cache_compressed_write_counter;
144
        int64_t page_cache_decompressed_write_counter;
145
        int64_t page_cache_hit_counter;
146
        int64_t page_cache_missing_counter;
147
        int64_t page_cache_compressed_hit_counter;
148
        int64_t page_cache_decompressed_hit_counter;
149
        int64_t variant_direct_typed_value_read_rows;
150
        int64_t variant_rowwise_read_rows;
151
152
168
        void merge(ColumnStatistics& col_statistics) {
153
168
            page_index_read_calls += col_statistics.page_index_read_calls;
154
168
            decompress_time += col_statistics.decompress_time;
155
168
            decompress_cnt += col_statistics.decompress_cnt;
156
168
            decode_header_time += col_statistics.decode_header_time;
157
168
            decode_value_time += col_statistics.decode_value_time;
158
168
            decode_dict_time += col_statistics.decode_dict_time;
159
168
            decode_level_time += col_statistics.decode_level_time;
160
168
            decode_null_map_time += col_statistics.decode_null_map_time;
161
168
            convert_time += col_statistics.convert_time;
162
168
            skip_page_header_num += col_statistics.skip_page_header_num;
163
168
            parse_page_header_num += col_statistics.parse_page_header_num;
164
168
            read_page_header_time += col_statistics.read_page_header_time;
165
168
            page_read_counter += col_statistics.page_read_counter;
166
168
            page_cache_write_counter += col_statistics.page_cache_write_counter;
167
168
            page_cache_compressed_write_counter +=
168
168
                    col_statistics.page_cache_compressed_write_counter;
169
168
            page_cache_decompressed_write_counter +=
170
168
                    col_statistics.page_cache_decompressed_write_counter;
171
168
            page_cache_hit_counter += col_statistics.page_cache_hit_counter;
172
168
            page_cache_missing_counter += col_statistics.page_cache_missing_counter;
173
168
            page_cache_compressed_hit_counter += col_statistics.page_cache_compressed_hit_counter;
174
168
            page_cache_decompressed_hit_counter +=
175
168
                    col_statistics.page_cache_decompressed_hit_counter;
176
168
            variant_direct_typed_value_read_rows +=
177
168
                    col_statistics.variant_direct_typed_value_read_rows;
178
168
            variant_rowwise_read_rows += col_statistics.variant_rowwise_read_rows;
179
168
        }
180
    };
181
182
    ParquetColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz,
183
                        io::IOContext* io_ctx)
184
136
            : _row_ranges(row_ranges), _total_rows(total_rows), _ctz(ctz), _io_ctx(io_ctx) {}
185
136
    virtual ~ParquetColumnReader() = default;
186
    virtual Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type,
187
                                    const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node,
188
                                    FilterMap& filter_map, size_t batch_size, size_t* read_rows,
189
                                    bool* eof, bool is_dict_filter,
190
                                    int64_t real_column_size = -1) = 0;
191
192
0
    virtual Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) {
193
0
        return Status::NotSupported("read_dict_values_to_column is not supported");
194
0
    }
195
196
    virtual Result<MutableColumnPtr> convert_dict_column_to_string_column(
197
0
            const ColumnInt32* dict_column) {
198
0
        throw Exception(
199
0
                Status::FatalError("Method convert_dict_column_to_string_column is not supported"));
200
0
    }
201
202
    static Status create(io::FileReaderSPtr file, FieldSchema* field,
203
                         const tparquet::RowGroup& row_group, const RowRanges& row_ranges,
204
                         const cctz::time_zone* ctz, io::IOContext* io_ctx,
205
                         std::unique_ptr<ParquetColumnReader>& reader, size_t max_buf_size,
206
                         std::unordered_map<int, tparquet::OffsetIndex>& col_offsets,
207
                         RuntimeState* state, bool in_collection = false,
208
                         const std::set<uint64_t>& column_ids = {},
209
                         const std::set<uint64_t>& filter_column_ids = {});
210
    virtual const std::vector<level_t>& get_rep_level() const = 0;
211
    virtual const std::vector<level_t>& get_def_level() const = 0;
212
    virtual ColumnStatistics column_statistics() = 0;
213
    virtual void close() = 0;
214
215
    virtual void reset_filter_map_index() = 0;
216
217
0
    FieldSchema* get_field_schema() const { return _field_schema; }
218
28
    void set_column_in_nested() { _in_nested = true; }
219
220
protected:
221
    void _generate_read_ranges(RowRange page_row_range, RowRanges* result_ranges) const;
222
223
    FieldSchema* _field_schema = nullptr;
224
    const RowRanges& _row_ranges;
225
    size_t _total_rows = 0;
226
    const cctz::time_zone* _ctz = nullptr;
227
    io::IOContext* _io_ctx = nullptr;
228
    int64_t _current_row_index = 0;
229
    int64_t _decode_null_map_time = 0;
230
231
    size_t _filter_map_index = 0;
232
    std::set<uint64_t> _filter_column_ids;
233
234
    // _in_nested: column in struct/map/array
235
    // IN_COLLECTION : column in map/array
236
    bool _in_nested = false;
237
};
238
239
template <bool IN_COLLECTION, bool OFFSET_INDEX>
240
class ScalarColumnReader : public ParquetColumnReader {
241
    ENABLE_FACTORY_CREATOR(ScalarColumnReader)
242
public:
243
    ScalarColumnReader(const RowRanges& row_ranges, size_t total_rows,
244
                       const tparquet::ColumnChunk& chunk_meta,
245
                       const tparquet::OffsetIndex* offset_index, const cctz::time_zone* ctz,
246
                       io::IOContext* io_ctx)
247
119
            : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx),
248
119
              _chunk_meta(chunk_meta),
249
119
              _offset_index(offset_index) {}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE
_ZN5doris18ScalarColumnReaderILb1ELb0EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE
Line
Count
Source
247
3
            : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx),
248
3
              _chunk_meta(chunk_meta),
249
3
              _offset_index(offset_index) {}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE
_ZN5doris18ScalarColumnReaderILb0ELb0EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE
Line
Count
Source
247
116
            : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx),
248
116
              _chunk_meta(chunk_meta),
249
116
              _offset_index(offset_index) {}
250
119
    ~ScalarColumnReader() override { close(); }
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EED2Ev
_ZN5doris18ScalarColumnReaderILb1ELb0EED2Ev
Line
Count
Source
250
3
    ~ScalarColumnReader() override { close(); }
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EED2Ev
_ZN5doris18ScalarColumnReaderILb0ELb0EED2Ev
Line
Count
Source
250
116
    ~ScalarColumnReader() override { close(); }
251
    Status init(io::FileReaderSPtr file, FieldSchema* field, size_t max_buf_size,
252
                RuntimeState* state);
253
    Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type,
254
                            const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node,
255
                            FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof,
256
                            bool is_dict_filter, int64_t real_column_size = -1) override;
257
    Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) override;
258
    Result<MutableColumnPtr> convert_dict_column_to_string_column(
259
            const ColumnInt32* dict_column) override;
260
13
    const std::vector<level_t>& get_rep_level() const override { return _rep_levels; }
Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb1ELb1EE13get_rep_levelEv
_ZNK5doris18ScalarColumnReaderILb1ELb0EE13get_rep_levelEv
Line
Count
Source
260
4
    const std::vector<level_t>& get_rep_level() const override { return _rep_levels; }
Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb0ELb1EE13get_rep_levelEv
_ZNK5doris18ScalarColumnReaderILb0ELb0EE13get_rep_levelEv
Line
Count
Source
260
9
    const std::vector<level_t>& get_rep_level() const override { return _rep_levels; }
261
13
    const std::vector<level_t>& get_def_level() const override { return _def_levels; }
Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb1ELb1EE13get_def_levelEv
_ZNK5doris18ScalarColumnReaderILb1ELb0EE13get_def_levelEv
Line
Count
Source
261
4
    const std::vector<level_t>& get_def_level() const override { return _def_levels; }
Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb0ELb1EE13get_def_levelEv
_ZNK5doris18ScalarColumnReaderILb0ELb0EE13get_def_levelEv
Line
Count
Source
261
9
    const std::vector<level_t>& get_def_level() const override { return _def_levels; }
262
119
    ColumnStatistics column_statistics() override {
263
119
        return ColumnStatistics(_chunk_reader->chunk_statistics(), _decode_null_map_time,
264
119
                                _convert_time);
265
119
    }
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE17column_statisticsEv
_ZN5doris18ScalarColumnReaderILb1ELb0EE17column_statisticsEv
Line
Count
Source
262
3
    ColumnStatistics column_statistics() override {
263
3
        return ColumnStatistics(_chunk_reader->chunk_statistics(), _decode_null_map_time,
264
3
                                _convert_time);
265
3
    }
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE17column_statisticsEv
_ZN5doris18ScalarColumnReaderILb0ELb0EE17column_statisticsEv
Line
Count
Source
262
116
    ColumnStatistics column_statistics() override {
263
116
        return ColumnStatistics(_chunk_reader->chunk_statistics(), _decode_null_map_time,
264
116
                                _convert_time);
265
116
    }
266
119
    void close() override {}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE5closeEv
_ZN5doris18ScalarColumnReaderILb1ELb0EE5closeEv
Line
Count
Source
266
3
    void close() override {}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE5closeEv
_ZN5doris18ScalarColumnReaderILb0ELb0EE5closeEv
Line
Count
Source
266
116
    void close() override {}
267
268
224
    void reset_filter_map_index() override {
269
224
        _filter_map_index = 0; // nested
270
224
        _orig_filter_map_index = 0;
271
224
    }
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE22reset_filter_map_indexEv
_ZN5doris18ScalarColumnReaderILb1ELb0EE22reset_filter_map_indexEv
Line
Count
Source
268
3
    void reset_filter_map_index() override {
269
3
        _filter_map_index = 0; // nested
270
3
        _orig_filter_map_index = 0;
271
3
    }
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE22reset_filter_map_indexEv
_ZN5doris18ScalarColumnReaderILb0ELb0EE22reset_filter_map_indexEv
Line
Count
Source
268
221
    void reset_filter_map_index() override {
269
221
        _filter_map_index = 0; // nested
270
221
        _orig_filter_map_index = 0;
271
221
    }
272
273
private:
274
    tparquet::ColumnChunk _chunk_meta;
275
    const tparquet::OffsetIndex* _offset_index = nullptr;
276
    std::unique_ptr<io::BufferedFileStreamReader> _stream_reader;
277
    std::unique_ptr<ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>> _chunk_reader;
278
    // rep def levels buffer.
279
    std::vector<level_t> _rep_levels;
280
    std::vector<level_t> _def_levels;
281
282
    size_t _current_range_idx = 0;
283
284
    Status gen_nested_null_map(size_t level_start_idx, size_t level_end_idx,
285
                               std::vector<uint16_t>& null_map,
286
13
                               std::unordered_set<size_t>& ancestor_null_indices) {
287
13
        size_t has_read = level_start_idx;
288
13
        null_map.emplace_back(0);
289
13
        bool prev_is_null = false;
290
291
26
        while (has_read < level_end_idx) {
292
13
            level_t def_level = _def_levels[has_read++];
293
13
            size_t loop_read = 1;
294
19
            while (has_read < _def_levels.size() && _def_levels[has_read] == def_level) {
295
6
                has_read++;
296
6
                loop_read++;
297
6
            }
298
299
13
            if (def_level < _field_schema->repeated_parent_def_level) {
300
0
                for (size_t i = 0; i < loop_read; i++) {
301
0
                    ancestor_null_indices.insert(has_read - level_start_idx - loop_read + i);
302
0
                }
303
0
                continue;
304
0
            }
305
306
13
            bool is_null = def_level < _field_schema->definition_level;
307
308
13
            if (prev_is_null == is_null && (USHRT_MAX - null_map.back() >= loop_read)) {
309
13
                null_map.back() += loop_read;
310
13
            } else {
311
0
                if (!(prev_is_null ^ is_null)) {
312
0
                    null_map.emplace_back(0);
313
0
                }
314
0
                size_t remaining = loop_read;
315
0
                while (remaining > USHRT_MAX) {
316
0
                    null_map.emplace_back(USHRT_MAX);
317
0
                    null_map.emplace_back(0);
318
0
                    remaining -= USHRT_MAX;
319
0
                }
320
0
                null_map.emplace_back((u_short)remaining);
321
0
                prev_is_null = is_null;
322
0
            }
323
13
        }
324
13
        return Status::OK();
325
13
    }
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE
_ZN5doris18ScalarColumnReaderILb1ELb0EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE
Line
Count
Source
286
3
                               std::unordered_set<size_t>& ancestor_null_indices) {
287
3
        size_t has_read = level_start_idx;
288
3
        null_map.emplace_back(0);
289
3
        bool prev_is_null = false;
290
291
6
        while (has_read < level_end_idx) {
292
3
            level_t def_level = _def_levels[has_read++];
293
3
            size_t loop_read = 1;
294
9
            while (has_read < _def_levels.size() && _def_levels[has_read] == def_level) {
295
6
                has_read++;
296
6
                loop_read++;
297
6
            }
298
299
3
            if (def_level < _field_schema->repeated_parent_def_level) {
300
0
                for (size_t i = 0; i < loop_read; i++) {
301
0
                    ancestor_null_indices.insert(has_read - level_start_idx - loop_read + i);
302
0
                }
303
0
                continue;
304
0
            }
305
306
3
            bool is_null = def_level < _field_schema->definition_level;
307
308
3
            if (prev_is_null == is_null && (USHRT_MAX - null_map.back() >= loop_read)) {
309
3
                null_map.back() += loop_read;
310
3
            } else {
311
0
                if (!(prev_is_null ^ is_null)) {
312
0
                    null_map.emplace_back(0);
313
0
                }
314
0
                size_t remaining = loop_read;
315
0
                while (remaining > USHRT_MAX) {
316
0
                    null_map.emplace_back(USHRT_MAX);
317
0
                    null_map.emplace_back(0);
318
                    remaining -= USHRT_MAX;
319
0
                }
320
0
                null_map.emplace_back((u_short)remaining);
321
0
                prev_is_null = is_null;
322
0
            }
323
3
        }
324
3
        return Status::OK();
325
3
    }
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE
_ZN5doris18ScalarColumnReaderILb0ELb0EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE
Line
Count
Source
286
10
                               std::unordered_set<size_t>& ancestor_null_indices) {
287
10
        size_t has_read = level_start_idx;
288
10
        null_map.emplace_back(0);
289
10
        bool prev_is_null = false;
290
291
20
        while (has_read < level_end_idx) {
292
10
            level_t def_level = _def_levels[has_read++];
293
10
            size_t loop_read = 1;
294
10
            while (has_read < _def_levels.size() && _def_levels[has_read] == def_level) {
295
0
                has_read++;
296
0
                loop_read++;
297
0
            }
298
299
10
            if (def_level < _field_schema->repeated_parent_def_level) {
300
0
                for (size_t i = 0; i < loop_read; i++) {
301
0
                    ancestor_null_indices.insert(has_read - level_start_idx - loop_read + i);
302
0
                }
303
0
                continue;
304
0
            }
305
306
10
            bool is_null = def_level < _field_schema->definition_level;
307
308
10
            if (prev_is_null == is_null && (USHRT_MAX - null_map.back() >= loop_read)) {
309
10
                null_map.back() += loop_read;
310
10
            } else {
311
0
                if (!(prev_is_null ^ is_null)) {
312
0
                    null_map.emplace_back(0);
313
0
                }
314
0
                size_t remaining = loop_read;
315
0
                while (remaining > USHRT_MAX) {
316
0
                    null_map.emplace_back(USHRT_MAX);
317
0
                    null_map.emplace_back(0);
318
                    remaining -= USHRT_MAX;
319
0
                }
320
0
                null_map.emplace_back((u_short)remaining);
321
0
                prev_is_null = is_null;
322
0
            }
323
10
        }
324
10
        return Status::OK();
325
10
    }
326
327
    Status gen_filter_map(FilterMap& filter_map, size_t filter_loc, size_t level_start_idx,
328
                          size_t level_end_idx, std::vector<uint8_t>& nested_filter_map_data,
329
0
                          std::unique_ptr<FilterMap>* nested_filter_map) {
330
0
        nested_filter_map_data.resize(level_end_idx - level_start_idx);
331
0
        for (size_t idx = level_start_idx; idx < level_end_idx; idx++) {
332
0
            if (idx != level_start_idx && _rep_levels[idx] == 0) {
333
0
                filter_loc++;
334
0
            }
335
0
            nested_filter_map_data[idx - level_start_idx] =
336
0
                    filter_map.filter_map_data()[filter_loc];
337
0
        }
338
339
0
        auto new_filter = std::make_unique<FilterMap>();
340
0
        RETURN_IF_ERROR(new_filter->init(nested_filter_map_data.data(),
341
0
                                         nested_filter_map_data.size(), false));
342
0
        *nested_filter_map = std::move(new_filter);
343
344
0
        return Status::OK();
345
0
    }
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb0EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb0EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE
346
347
    std::unique_ptr<parquet::PhysicalToLogicalConverter> _converter = nullptr;
348
    std::unique_ptr<std::vector<uint8_t>> _nested_filter_map_data = nullptr;
349
    size_t _orig_filter_map_index = 0;
350
    int64_t _convert_time = 0;
351
352
    Status _skip_values(size_t num_values);
353
    Status _read_values(size_t num_values, ColumnPtr& doris_column, DataTypePtr& type,
354
                        FilterMap& filter_map, bool is_dict_filter);
355
    Status _read_nested_column(ColumnPtr& doris_column, DataTypePtr& type, FilterMap& filter_map,
356
                               size_t batch_size, size_t* read_rows, bool* eof,
357
                               bool is_dict_filter);
358
    Status _try_load_dict_page(bool* loaded, bool* has_dict);
359
};
360
361
class ArrayColumnReader : public ParquetColumnReader {
362
    ENABLE_FACTORY_CREATOR(ArrayColumnReader)
363
public:
364
    ArrayColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz,
365
                      io::IOContext* io_ctx)
366
2
            : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {}
367
2
    ~ArrayColumnReader() override { close(); }
368
    Status init(std::unique_ptr<ParquetColumnReader> element_reader, FieldSchema* field);
369
    Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type,
370
                            const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node,
371
                            FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof,
372
                            bool is_dict_filter, int64_t real_column_size = -1) override;
373
0
    const std::vector<level_t>& get_rep_level() const override {
374
0
        return _element_reader->get_rep_level();
375
0
    }
376
0
    const std::vector<level_t>& get_def_level() const override {
377
0
        return _element_reader->get_def_level();
378
0
    }
379
2
    ColumnStatistics column_statistics() override { return _element_reader->column_statistics(); }
380
2
    void close() override {}
381
382
2
    void reset_filter_map_index() override { _element_reader->reset_filter_map_index(); }
383
384
private:
385
    std::unique_ptr<ParquetColumnReader> _element_reader;
386
};
387
388
class MapColumnReader : public ParquetColumnReader {
389
    ENABLE_FACTORY_CREATOR(MapColumnReader)
390
public:
391
    MapColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz,
392
                    io::IOContext* io_ctx)
393
0
            : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {}
394
0
    ~MapColumnReader() override { close(); }
395
396
    Status init(std::unique_ptr<ParquetColumnReader> key_reader,
397
                std::unique_ptr<ParquetColumnReader> value_reader, FieldSchema* field);
398
    Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type,
399
                            const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node,
400
                            FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof,
401
                            bool is_dict_filter, int64_t real_column_size = -1) override;
402
403
0
    const std::vector<level_t>& get_rep_level() const override {
404
0
        return _key_reader->get_rep_level();
405
0
    }
406
0
    const std::vector<level_t>& get_def_level() const override {
407
0
        return _key_reader->get_def_level();
408
0
    }
409
410
0
    ColumnStatistics column_statistics() override {
411
0
        ColumnStatistics kst = _key_reader->column_statistics();
412
0
        ColumnStatistics vst = _value_reader->column_statistics();
413
0
        kst.merge(vst);
414
0
        return kst;
415
0
    }
416
417
0
    void close() override {}
418
419
0
    void reset_filter_map_index() override {
420
0
        _key_reader->reset_filter_map_index();
421
0
        _value_reader->reset_filter_map_index();
422
0
    }
423
424
private:
425
    std::unique_ptr<ParquetColumnReader> _key_reader;
426
    std::unique_ptr<ParquetColumnReader> _value_reader;
427
};
428
429
class StructColumnReader : public ParquetColumnReader {
430
    ENABLE_FACTORY_CREATOR(StructColumnReader)
431
public:
432
    StructColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz,
433
                       io::IOContext* io_ctx)
434
11
            : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {}
435
11
    ~StructColumnReader() override { close(); }
436
437
    Status init(
438
            std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>&& child_readers,
439
            FieldSchema* field);
440
    Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type,
441
                            const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node,
442
                            FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof,
443
                            bool is_dict_filter, int64_t real_column_size = -1) override;
444
445
17
    const std::vector<level_t>& get_rep_level() const override {
446
17
        if (!_read_column_names.empty()) {
447
            // can't use _child_readers[*_read_column_names.begin()]
448
            // because the operator[] of std::unordered_map is not const :(
449
            /*
450
             * Considering the issue in the `_read_nested_column` function where data may span across pages, leading
451
             * to missing definition and repetition levels, when filling the null_map of the struct later, it is
452
             * crucial to use the definition and repetition levels from the first read column,
453
             * that is `_read_column_names.front()`.
454
             */
455
17
            return _child_readers.find(_read_column_names.front())->second->get_rep_level();
456
17
        }
457
0
        return _child_readers.begin()->second->get_rep_level();
458
17
    }
459
460
17
    const std::vector<level_t>& get_def_level() const override {
461
17
        if (!_read_column_names.empty()) {
462
17
            return _child_readers.find(_read_column_names.front())->second->get_def_level();
463
17
        }
464
0
        return _child_readers.begin()->second->get_def_level();
465
17
    }
466
467
11
    ColumnStatistics column_statistics() override {
468
11
        ColumnStatistics st;
469
22
        for (const auto& column_name : _read_column_names) {
470
22
            auto reader = _child_readers.find(column_name);
471
22
            if (reader != _child_readers.end()) {
472
22
                ColumnStatistics cst = reader->second->column_statistics();
473
22
                st.merge(cst);
474
22
            }
475
22
        }
476
11
        return st;
477
11
    }
478
479
11
    void close() override {}
480
481
11
    void reset_filter_map_index() override {
482
26
        for (const auto& reader : _child_readers) {
483
26
            reader.second->reset_filter_map_index();
484
26
        }
485
11
    }
486
487
private:
488
    std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _child_readers;
489
    std::vector<std::string> _read_column_names;
490
    //Need to use vector instead of set,see `get_rep_level()` for the reason.
491
};
492
493
class VariantColumnReader : public ParquetColumnReader {
494
    ENABLE_FACTORY_CREATOR(VariantColumnReader)
495
public:
496
    VariantColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz,
497
                        io::IOContext* io_ctx)
498
0
            : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {}
499
0
    ~VariantColumnReader() override { close(); }
500
501
    Status init(io::FileReaderSPtr file, FieldSchema* field, const tparquet::RowGroup& row_group,
502
                size_t max_buf_size, std::unordered_map<int, tparquet::OffsetIndex>& col_offsets,
503
                RuntimeState* state, bool in_collection, const std::set<uint64_t>& column_ids,
504
                const std::set<uint64_t>& filter_column_ids);
505
    Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type,
506
                            const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node,
507
                            FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof,
508
                            bool is_dict_filter, int64_t real_column_size = -1) override;
509
510
0
    const std::vector<level_t>& get_rep_level() const override {
511
0
        return _struct_reader->get_rep_level();
512
0
    }
513
0
    const std::vector<level_t>& get_def_level() const override {
514
0
        return _struct_reader->get_def_level();
515
0
    }
516
0
    ColumnStatistics column_statistics() override {
517
0
        auto statistics = _struct_reader->column_statistics();
518
0
        statistics.merge(_variant_statistics);
519
0
        return statistics;
520
0
    }
521
0
    void close() override {}
522
523
0
    void reset_filter_map_index() override { _struct_reader->reset_filter_map_index(); }
524
525
private:
526
    std::unique_ptr<FieldSchema> _variant_struct_field;
527
    std::unique_ptr<ParquetColumnReader> _struct_reader;
528
    std::set<uint64_t> _column_ids;
529
    ColumnStatistics _variant_statistics;
530
};
531
532
// A special reader that skips actual reading but provides empty data with correct structure
533
// This is used when a column is not needed but its structure is required (e.g., for map keys)
534
class SkipReadingReader : public ParquetColumnReader {
535
public:
536
    SkipReadingReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz,
537
                      io::IOContext* io_ctx, FieldSchema* field_schema)
538
4
            : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {
539
4
        _field_schema = field_schema; // Use inherited member from base class
540
4
        VLOG_DEBUG << "[ParquetReader] Created SkipReadingReader for field: "
541
0
                   << _field_schema->name;
542
4
    }
543
544
    Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type,
545
                            const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node,
546
                            FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof,
547
4
                            bool is_dict_filter, int64_t real_column_size = -1) override {
548
4
        VLOG_DEBUG << "[ParquetReader] SkipReadingReader::read_column_data for field: "
549
0
                   << _field_schema->name << ", batch_size: " << batch_size;
550
4
        DCHECK(real_column_size >= 0); // real_column_size for filtered column size.
551
552
        // Simulate reading without actually reading data
553
        // Fill with default/null values based on column type
554
4
        MutableColumnPtr data_column = doris_column->assume_mutable();
555
556
4
        if (real_column_size > 0) {
557
4
            if (doris_column->is_nullable()) {
558
4
                auto* nullable_column = static_cast<ColumnNullable*>(data_column.get());
559
4
                nullable_column->insert_many_defaults(real_column_size);
560
4
            } else {
561
                // For non-nullable columns, insert appropriate default values
562
0
                for (size_t i = 0; i < real_column_size; ++i) {
563
0
                    data_column->insert_default();
564
0
                }
565
0
            }
566
4
        }
567
568
4
        *read_rows = batch_size; // Indicate we "read" batch_size rows
569
4
        *eof = false;            // We can always provide more empty data
570
571
4
        VLOG_DEBUG << "[ParquetReader] SkipReadingReader generated " << batch_size
572
0
                   << " default values for field: " << _field_schema->name;
573
574
4
        return Status::OK();
575
4
    }
576
577
    static std::unique_ptr<SkipReadingReader> create_unique(const RowRanges& row_ranges,
578
                                                            size_t total_rows, cctz::time_zone* ctz,
579
                                                            io::IOContext* io_ctx,
580
0
                                                            FieldSchema* field_schema) {
581
0
        return std::make_unique<SkipReadingReader>(row_ranges, total_rows, ctz, io_ctx,
582
0
                                                   field_schema);
583
0
    }
584
585
    // These methods should not be called for SkipReadingReader
586
    // If they are called, it indicates a logic error in the code
587
0
    const std::vector<level_t>& get_rep_level() const override {
588
0
        LOG(FATAL) << "get_rep_level() should not be called on SkipReadingReader for field: "
589
0
                   << _field_schema->name
590
0
                   << ". This indicates the SkipReadingReader was incorrectly used as a reference "
591
0
                      "column.";
592
0
        __builtin_unreachable();
593
0
    }
594
595
0
    const std::vector<level_t>& get_def_level() const override {
596
0
        LOG(FATAL) << "get_def_level() should not be called on SkipReadingReader for field: "
597
0
                   << _field_schema->name
598
0
                   << ". This indicates the SkipReadingReader was incorrectly used as a reference "
599
0
                      "column.";
600
0
        __builtin_unreachable();
601
0
    }
602
603
    // Implement required pure virtual methods from base class
604
0
    ColumnStatistics column_statistics() override { return {}; }
605
606
0
    void close() override {
607
        // Nothing to close for skip reading
608
0
    }
609
610
4
    void reset_filter_map_index() override { _filter_map_index = 0; }
611
};
612
613
}; // namespace doris