be/src/format/parquet/vparquet_column_reader.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | #include <gen_cpp/parquet_types.h> |
20 | | #include <glog/logging.h> |
21 | | |
22 | | #include <cstddef> |
23 | | #include <cstdint> |
24 | | #include <list> |
25 | | #include <memory> |
26 | | #include <ostream> |
27 | | #include <set> |
28 | | #include <string> |
29 | | #include <unordered_map> |
30 | | #include <vector> |
31 | | |
32 | | #include "common/status.h" |
33 | | #include "core/data_type/data_type.h" |
34 | | #include "format/generic_reader.h" |
35 | | #include "format/parquet/parquet_column_convert.h" |
36 | | #include "format/parquet/parquet_common.h" |
37 | | #include "format/parquet/vparquet_column_chunk_reader.h" |
38 | | #include "format/table/table_schema_change_helper.h" |
39 | | #include "io/fs/buffered_reader.h" |
40 | | #include "io/fs/file_reader_writer_fwd.h" |
41 | | #include "parquet_column_convert.h" |
42 | | #include "vparquet_column_chunk_reader.h" |
43 | | |
44 | | namespace cctz { |
45 | | class time_zone; |
46 | | } // namespace cctz |
47 | | |
48 | | namespace doris::io { |
49 | | struct IOContext; |
50 | | } // namespace doris::io |
51 | | |
52 | | namespace doris { |
53 | | class Field; |
54 | | struct FieldSchema; |
55 | | class IColumn; |
56 | | class ColumnVariant; |
57 | | template <typename T> |
58 | | class ColumnStr; |
59 | | using ColumnString = ColumnStr<UInt32>; |
60 | | |
61 | | #ifdef BE_TEST |
62 | | namespace parquet_variant_reader_test { |
63 | | bool can_direct_read_typed_value_for_test(const FieldSchema& typed_value_field); |
64 | | bool can_use_direct_typed_only_value_for_test(const FieldSchema& variant_field, |
65 | | const std::set<uint64_t>& column_ids); |
66 | | Status append_direct_typed_column_to_batch_for_test(const FieldSchema& typed_value_field, |
67 | | const IColumn& typed_value_column, size_t start, |
68 | | size_t rows, ColumnVariant* batch); |
69 | | Status read_variant_row_for_test(const FieldSchema& variant_field, const Field& field, |
70 | | bool output_nullable, Field* result, bool* sql_null); |
71 | | Status variant_to_json_for_test(const FieldSchema& variant_field, const Field& field, |
72 | | const std::string& inherited_metadata, std::string* json, |
73 | | bool* present); |
74 | | } // namespace parquet_variant_reader_test |
75 | | #endif |
76 | | |
77 | | class ParquetColumnReader { |
78 | | public: |
79 | | struct ColumnStatistics { |
80 | | ColumnStatistics() |
81 | 152 | : page_index_read_calls(0), |
82 | 152 | decompress_time(0), |
83 | 152 | decompress_cnt(0), |
84 | 152 | decode_header_time(0), |
85 | 152 | decode_value_time(0), |
86 | 152 | decode_dict_time(0), |
87 | 152 | decode_level_time(0), |
88 | 152 | decode_null_map_time(0), |
89 | 152 | convert_time(0), |
90 | 152 | skip_page_header_num(0), |
91 | 152 | parse_page_header_num(0), |
92 | 152 | read_page_header_time(0), |
93 | 152 | page_read_counter(0), |
94 | 152 | page_cache_write_counter(0), |
95 | 152 | page_cache_compressed_write_counter(0), |
96 | 152 | page_cache_decompressed_write_counter(0), |
97 | 152 | page_cache_hit_counter(0), |
98 | 152 | page_cache_missing_counter(0), |
99 | 152 | page_cache_compressed_hit_counter(0), |
100 | 152 | page_cache_decompressed_hit_counter(0), |
101 | 152 | variant_direct_typed_value_read_rows(0), |
102 | 152 | variant_rowwise_read_rows(0) {} |
103 | | |
104 | | ColumnStatistics(ColumnChunkReaderStatistics& cs, int64_t null_map_time, |
105 | | int64_t convert_time_) |
106 | 119 | : page_index_read_calls(0), |
107 | 119 | decompress_time(cs.decompress_time), |
108 | 119 | decompress_cnt(cs.decompress_cnt), |
109 | 119 | decode_header_time(cs.decode_header_time), |
110 | 119 | decode_value_time(cs.decode_value_time), |
111 | 119 | decode_dict_time(cs.decode_dict_time), |
112 | 119 | decode_level_time(cs.decode_level_time), |
113 | 119 | decode_null_map_time(null_map_time), |
114 | 119 | convert_time(convert_time_), |
115 | 119 | skip_page_header_num(cs.skip_page_header_num), |
116 | 119 | parse_page_header_num(cs.parse_page_header_num), |
117 | 119 | read_page_header_time(cs.read_page_header_time), |
118 | 119 | page_read_counter(cs.page_read_counter), |
119 | 119 | page_cache_write_counter(cs.page_cache_write_counter), |
120 | 119 | page_cache_compressed_write_counter(cs.page_cache_compressed_write_counter), |
121 | 119 | page_cache_decompressed_write_counter(cs.page_cache_decompressed_write_counter), |
122 | 119 | page_cache_hit_counter(cs.page_cache_hit_counter), |
123 | 119 | page_cache_missing_counter(cs.page_cache_missing_counter), |
124 | 119 | page_cache_compressed_hit_counter(cs.page_cache_compressed_hit_counter), |
125 | 119 | page_cache_decompressed_hit_counter(cs.page_cache_decompressed_hit_counter), |
126 | 119 | variant_direct_typed_value_read_rows(0), |
127 | 119 | variant_rowwise_read_rows(0) {} |
128 | | |
129 | | int64_t page_index_read_calls; |
130 | | int64_t decompress_time; |
131 | | int64_t decompress_cnt; |
132 | | int64_t decode_header_time; |
133 | | int64_t decode_value_time; |
134 | | int64_t decode_dict_time; |
135 | | int64_t decode_level_time; |
136 | | int64_t decode_null_map_time; |
137 | | int64_t convert_time; |
138 | | int64_t skip_page_header_num; |
139 | | int64_t parse_page_header_num; |
140 | | int64_t read_page_header_time; |
141 | | int64_t page_read_counter; |
142 | | int64_t page_cache_write_counter; |
143 | | int64_t page_cache_compressed_write_counter; |
144 | | int64_t page_cache_decompressed_write_counter; |
145 | | int64_t page_cache_hit_counter; |
146 | | int64_t page_cache_missing_counter; |
147 | | int64_t page_cache_compressed_hit_counter; |
148 | | int64_t page_cache_decompressed_hit_counter; |
149 | | int64_t variant_direct_typed_value_read_rows; |
150 | | int64_t variant_rowwise_read_rows; |
151 | | |
152 | 168 | void merge(ColumnStatistics& col_statistics) { |
153 | 168 | page_index_read_calls += col_statistics.page_index_read_calls; |
154 | 168 | decompress_time += col_statistics.decompress_time; |
155 | 168 | decompress_cnt += col_statistics.decompress_cnt; |
156 | 168 | decode_header_time += col_statistics.decode_header_time; |
157 | 168 | decode_value_time += col_statistics.decode_value_time; |
158 | 168 | decode_dict_time += col_statistics.decode_dict_time; |
159 | 168 | decode_level_time += col_statistics.decode_level_time; |
160 | 168 | decode_null_map_time += col_statistics.decode_null_map_time; |
161 | 168 | convert_time += col_statistics.convert_time; |
162 | 168 | skip_page_header_num += col_statistics.skip_page_header_num; |
163 | 168 | parse_page_header_num += col_statistics.parse_page_header_num; |
164 | 168 | read_page_header_time += col_statistics.read_page_header_time; |
165 | 168 | page_read_counter += col_statistics.page_read_counter; |
166 | 168 | page_cache_write_counter += col_statistics.page_cache_write_counter; |
167 | 168 | page_cache_compressed_write_counter += |
168 | 168 | col_statistics.page_cache_compressed_write_counter; |
169 | 168 | page_cache_decompressed_write_counter += |
170 | 168 | col_statistics.page_cache_decompressed_write_counter; |
171 | 168 | page_cache_hit_counter += col_statistics.page_cache_hit_counter; |
172 | 168 | page_cache_missing_counter += col_statistics.page_cache_missing_counter; |
173 | 168 | page_cache_compressed_hit_counter += col_statistics.page_cache_compressed_hit_counter; |
174 | 168 | page_cache_decompressed_hit_counter += |
175 | 168 | col_statistics.page_cache_decompressed_hit_counter; |
176 | 168 | variant_direct_typed_value_read_rows += |
177 | 168 | col_statistics.variant_direct_typed_value_read_rows; |
178 | 168 | variant_rowwise_read_rows += col_statistics.variant_rowwise_read_rows; |
179 | 168 | } |
180 | | }; |
181 | | |
182 | | ParquetColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz, |
183 | | io::IOContext* io_ctx) |
184 | 136 | : _row_ranges(row_ranges), _total_rows(total_rows), _ctz(ctz), _io_ctx(io_ctx) {} |
185 | 136 | virtual ~ParquetColumnReader() = default; |
186 | | virtual Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
187 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
188 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, |
189 | | bool* eof, bool is_dict_filter, |
190 | | int64_t real_column_size = -1) = 0; |
191 | | |
192 | 0 | virtual Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) { |
193 | 0 | return Status::NotSupported("read_dict_values_to_column is not supported"); |
194 | 0 | } |
195 | | |
196 | | virtual Result<MutableColumnPtr> convert_dict_column_to_string_column( |
197 | 0 | const ColumnInt32* dict_column) { |
198 | 0 | throw Exception( |
199 | 0 | Status::FatalError("Method convert_dict_column_to_string_column is not supported")); |
200 | 0 | } |
201 | | |
202 | | static Status create(io::FileReaderSPtr file, FieldSchema* field, |
203 | | const tparquet::RowGroup& row_group, const RowRanges& row_ranges, |
204 | | const cctz::time_zone* ctz, io::IOContext* io_ctx, |
205 | | std::unique_ptr<ParquetColumnReader>& reader, size_t max_buf_size, |
206 | | std::unordered_map<int, tparquet::OffsetIndex>& col_offsets, |
207 | | RuntimeState* state, bool in_collection = false, |
208 | | const std::set<uint64_t>& column_ids = {}, |
209 | | const std::set<uint64_t>& filter_column_ids = {}); |
210 | | virtual const std::vector<level_t>& get_rep_level() const = 0; |
211 | | virtual const std::vector<level_t>& get_def_level() const = 0; |
212 | | virtual ColumnStatistics column_statistics() = 0; |
213 | | virtual void close() = 0; |
214 | | |
215 | | virtual void reset_filter_map_index() = 0; |
216 | | |
217 | 0 | FieldSchema* get_field_schema() const { return _field_schema; } |
218 | 28 | void set_column_in_nested() { _in_nested = true; } |
219 | | |
220 | | protected: |
221 | | void _generate_read_ranges(RowRange page_row_range, RowRanges* result_ranges) const; |
222 | | |
223 | | FieldSchema* _field_schema = nullptr; |
224 | | const RowRanges& _row_ranges; |
225 | | size_t _total_rows = 0; |
226 | | const cctz::time_zone* _ctz = nullptr; |
227 | | io::IOContext* _io_ctx = nullptr; |
228 | | int64_t _current_row_index = 0; |
229 | | int64_t _decode_null_map_time = 0; |
230 | | |
231 | | size_t _filter_map_index = 0; |
232 | | std::set<uint64_t> _filter_column_ids; |
233 | | |
234 | | // _in_nested: column in struct/map/array |
235 | | // IN_COLLECTION : column in map/array |
236 | | bool _in_nested = false; |
237 | | }; |
238 | | |
239 | | template <bool IN_COLLECTION, bool OFFSET_INDEX> |
240 | | class ScalarColumnReader : public ParquetColumnReader { |
241 | | ENABLE_FACTORY_CREATOR(ScalarColumnReader) |
242 | | public: |
243 | | ScalarColumnReader(const RowRanges& row_ranges, size_t total_rows, |
244 | | const tparquet::ColumnChunk& chunk_meta, |
245 | | const tparquet::OffsetIndex* offset_index, const cctz::time_zone* ctz, |
246 | | io::IOContext* io_ctx) |
247 | 119 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx), |
248 | 119 | _chunk_meta(chunk_meta), |
249 | 119 | _offset_index(offset_index) {}Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE _ZN5doris18ScalarColumnReaderILb1ELb0EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE Line | Count | Source | 247 | 3 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx), | 248 | 3 | _chunk_meta(chunk_meta), | 249 | 3 | _offset_index(offset_index) {} |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE _ZN5doris18ScalarColumnReaderILb0ELb0EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE Line | Count | Source | 247 | 116 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx), | 248 | 116 | _chunk_meta(chunk_meta), | 249 | 116 | _offset_index(offset_index) {} |
|
250 | 119 | ~ScalarColumnReader() override { close(); }Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EED2Ev _ZN5doris18ScalarColumnReaderILb1ELb0EED2Ev Line | Count | Source | 250 | 3 | ~ScalarColumnReader() override { close(); } |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EED2Ev _ZN5doris18ScalarColumnReaderILb0ELb0EED2Ev Line | Count | Source | 250 | 116 | ~ScalarColumnReader() override { close(); } |
|
251 | | Status init(io::FileReaderSPtr file, FieldSchema* field, size_t max_buf_size, |
252 | | RuntimeState* state); |
253 | | Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
254 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
255 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, |
256 | | bool is_dict_filter, int64_t real_column_size = -1) override; |
257 | | Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) override; |
258 | | Result<MutableColumnPtr> convert_dict_column_to_string_column( |
259 | | const ColumnInt32* dict_column) override; |
260 | 13 | const std::vector<level_t>& get_rep_level() const override { return _rep_levels; }Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb1ELb1EE13get_rep_levelEv _ZNK5doris18ScalarColumnReaderILb1ELb0EE13get_rep_levelEv Line | Count | Source | 260 | 4 | const std::vector<level_t>& get_rep_level() const override { return _rep_levels; } |
Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb0ELb1EE13get_rep_levelEv _ZNK5doris18ScalarColumnReaderILb0ELb0EE13get_rep_levelEv Line | Count | Source | 260 | 9 | const std::vector<level_t>& get_rep_level() const override { return _rep_levels; } |
|
261 | 13 | const std::vector<level_t>& get_def_level() const override { return _def_levels; }Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb1ELb1EE13get_def_levelEv _ZNK5doris18ScalarColumnReaderILb1ELb0EE13get_def_levelEv Line | Count | Source | 261 | 4 | const std::vector<level_t>& get_def_level() const override { return _def_levels; } |
Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb0ELb1EE13get_def_levelEv _ZNK5doris18ScalarColumnReaderILb0ELb0EE13get_def_levelEv Line | Count | Source | 261 | 9 | const std::vector<level_t>& get_def_level() const override { return _def_levels; } |
|
262 | 119 | ColumnStatistics column_statistics() override { |
263 | 119 | return ColumnStatistics(_chunk_reader->chunk_statistics(), _decode_null_map_time, |
264 | 119 | _convert_time); |
265 | 119 | } Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE17column_statisticsEv _ZN5doris18ScalarColumnReaderILb1ELb0EE17column_statisticsEv Line | Count | Source | 262 | 3 | ColumnStatistics column_statistics() override { | 263 | 3 | return ColumnStatistics(_chunk_reader->chunk_statistics(), _decode_null_map_time, | 264 | 3 | _convert_time); | 265 | 3 | } |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE17column_statisticsEv _ZN5doris18ScalarColumnReaderILb0ELb0EE17column_statisticsEv Line | Count | Source | 262 | 116 | ColumnStatistics column_statistics() override { | 263 | 116 | return ColumnStatistics(_chunk_reader->chunk_statistics(), _decode_null_map_time, | 264 | 116 | _convert_time); | 265 | 116 | } |
|
266 | 119 | void close() override {}Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE5closeEv _ZN5doris18ScalarColumnReaderILb1ELb0EE5closeEv Line | Count | Source | 266 | 3 | void close() override {} |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE5closeEv _ZN5doris18ScalarColumnReaderILb0ELb0EE5closeEv Line | Count | Source | 266 | 116 | void close() override {} |
|
267 | | |
268 | 224 | void reset_filter_map_index() override { |
269 | 224 | _filter_map_index = 0; // nested |
270 | 224 | _orig_filter_map_index = 0; |
271 | 224 | } Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE22reset_filter_map_indexEv _ZN5doris18ScalarColumnReaderILb1ELb0EE22reset_filter_map_indexEv Line | Count | Source | 268 | 3 | void reset_filter_map_index() override { | 269 | 3 | _filter_map_index = 0; // nested | 270 | 3 | _orig_filter_map_index = 0; | 271 | 3 | } |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE22reset_filter_map_indexEv _ZN5doris18ScalarColumnReaderILb0ELb0EE22reset_filter_map_indexEv Line | Count | Source | 268 | 221 | void reset_filter_map_index() override { | 269 | 221 | _filter_map_index = 0; // nested | 270 | 221 | _orig_filter_map_index = 0; | 271 | 221 | } |
|
272 | | |
273 | | private: |
274 | | tparquet::ColumnChunk _chunk_meta; |
275 | | const tparquet::OffsetIndex* _offset_index = nullptr; |
276 | | std::unique_ptr<io::BufferedFileStreamReader> _stream_reader; |
277 | | std::unique_ptr<ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>> _chunk_reader; |
278 | | // rep def levels buffer. |
279 | | std::vector<level_t> _rep_levels; |
280 | | std::vector<level_t> _def_levels; |
281 | | |
282 | | size_t _current_range_idx = 0; |
283 | | |
284 | | Status gen_nested_null_map(size_t level_start_idx, size_t level_end_idx, |
285 | | std::vector<uint16_t>& null_map, |
286 | 13 | std::unordered_set<size_t>& ancestor_null_indices) { |
287 | 13 | size_t has_read = level_start_idx; |
288 | 13 | null_map.emplace_back(0); |
289 | 13 | bool prev_is_null = false; |
290 | | |
291 | 26 | while (has_read < level_end_idx) { |
292 | 13 | level_t def_level = _def_levels[has_read++]; |
293 | 13 | size_t loop_read = 1; |
294 | 19 | while (has_read < _def_levels.size() && _def_levels[has_read] == def_level) { |
295 | 6 | has_read++; |
296 | 6 | loop_read++; |
297 | 6 | } |
298 | | |
299 | 13 | if (def_level < _field_schema->repeated_parent_def_level) { |
300 | 0 | for (size_t i = 0; i < loop_read; i++) { |
301 | 0 | ancestor_null_indices.insert(has_read - level_start_idx - loop_read + i); |
302 | 0 | } |
303 | 0 | continue; |
304 | 0 | } |
305 | | |
306 | 13 | bool is_null = def_level < _field_schema->definition_level; |
307 | | |
308 | 13 | if (prev_is_null == is_null && (USHRT_MAX - null_map.back() >= loop_read)) { |
309 | 13 | null_map.back() += loop_read; |
310 | 13 | } else { |
311 | 0 | if (!(prev_is_null ^ is_null)) { |
312 | 0 | null_map.emplace_back(0); |
313 | 0 | } |
314 | 0 | size_t remaining = loop_read; |
315 | 0 | while (remaining > USHRT_MAX) { |
316 | 0 | null_map.emplace_back(USHRT_MAX); |
317 | 0 | null_map.emplace_back(0); |
318 | 0 | remaining -= USHRT_MAX; |
319 | 0 | } |
320 | 0 | null_map.emplace_back((u_short)remaining); |
321 | 0 | prev_is_null = is_null; |
322 | 0 | } |
323 | 13 | } |
324 | 13 | return Status::OK(); |
325 | 13 | } Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE _ZN5doris18ScalarColumnReaderILb1ELb0EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE Line | Count | Source | 286 | 3 | std::unordered_set<size_t>& ancestor_null_indices) { | 287 | 3 | size_t has_read = level_start_idx; | 288 | 3 | null_map.emplace_back(0); | 289 | 3 | bool prev_is_null = false; | 290 | | | 291 | 6 | while (has_read < level_end_idx) { | 292 | 3 | level_t def_level = _def_levels[has_read++]; | 293 | 3 | size_t loop_read = 1; | 294 | 9 | while (has_read < _def_levels.size() && _def_levels[has_read] == def_level) { | 295 | 6 | has_read++; | 296 | 6 | loop_read++; | 297 | 6 | } | 298 | | | 299 | 3 | if (def_level < _field_schema->repeated_parent_def_level) { | 300 | 0 | for (size_t i = 0; i < loop_read; i++) { | 301 | 0 | ancestor_null_indices.insert(has_read - level_start_idx - loop_read + i); | 302 | 0 | } | 303 | 0 | continue; | 304 | 0 | } | 305 | | | 306 | 3 | bool is_null = def_level < _field_schema->definition_level; | 307 | | | 308 | 3 | if (prev_is_null == is_null && (USHRT_MAX - null_map.back() >= loop_read)) { | 309 | 3 | null_map.back() += loop_read; | 310 | 3 | } else { | 311 | 0 | if (!(prev_is_null ^ is_null)) { | 312 | 0 | null_map.emplace_back(0); | 313 | 0 | } | 314 | 0 | size_t remaining = loop_read; | 315 | 0 | while (remaining > USHRT_MAX) { | 316 | 0 | null_map.emplace_back(USHRT_MAX); | 317 | 0 | null_map.emplace_back(0); | 318 | | remaining -= USHRT_MAX; | 319 | 0 | } | 320 | 0 | null_map.emplace_back((u_short)remaining); | 321 | 0 | prev_is_null = is_null; | 322 | 0 | } | 323 | 3 | } | 324 | 3 | return Status::OK(); | 325 | 3 | } |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE _ZN5doris18ScalarColumnReaderILb0ELb0EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE Line | Count | Source | 286 | 10 | std::unordered_set<size_t>& ancestor_null_indices) { | 287 | 10 | size_t has_read = level_start_idx; | 288 | 10 | null_map.emplace_back(0); | 289 | 10 | bool prev_is_null = false; | 290 | | | 291 | 20 | while (has_read < level_end_idx) { | 292 | 10 | level_t def_level = _def_levels[has_read++]; | 293 | 10 | size_t loop_read = 1; | 294 | 10 | while (has_read < _def_levels.size() && _def_levels[has_read] == def_level) { | 295 | 0 | has_read++; | 296 | 0 | loop_read++; | 297 | 0 | } | 298 | | | 299 | 10 | if (def_level < _field_schema->repeated_parent_def_level) { | 300 | 0 | for (size_t i = 0; i < loop_read; i++) { | 301 | 0 | ancestor_null_indices.insert(has_read - level_start_idx - loop_read + i); | 302 | 0 | } | 303 | 0 | continue; | 304 | 0 | } | 305 | | | 306 | 10 | bool is_null = def_level < _field_schema->definition_level; | 307 | | | 308 | 10 | if (prev_is_null == is_null && (USHRT_MAX - null_map.back() >= loop_read)) { | 309 | 10 | null_map.back() += loop_read; | 310 | 10 | } else { | 311 | 0 | if (!(prev_is_null ^ is_null)) { | 312 | 0 | null_map.emplace_back(0); | 313 | 0 | } | 314 | 0 | size_t remaining = loop_read; | 315 | 0 | while (remaining > USHRT_MAX) { | 316 | 0 | null_map.emplace_back(USHRT_MAX); | 317 | 0 | null_map.emplace_back(0); | 318 | | remaining -= USHRT_MAX; | 319 | 0 | } | 320 | 0 | null_map.emplace_back((u_short)remaining); | 321 | 0 | prev_is_null = is_null; | 322 | 0 | } | 323 | 10 | } | 324 | 10 | return Status::OK(); | 325 | 10 | } |
|
326 | | |
327 | | Status gen_filter_map(FilterMap& filter_map, size_t filter_loc, size_t level_start_idx, |
328 | | size_t level_end_idx, std::vector<uint8_t>& nested_filter_map_data, |
329 | 0 | std::unique_ptr<FilterMap>* nested_filter_map) { |
330 | 0 | nested_filter_map_data.resize(level_end_idx - level_start_idx); |
331 | 0 | for (size_t idx = level_start_idx; idx < level_end_idx; idx++) { |
332 | 0 | if (idx != level_start_idx && _rep_levels[idx] == 0) { |
333 | 0 | filter_loc++; |
334 | 0 | } |
335 | 0 | nested_filter_map_data[idx - level_start_idx] = |
336 | 0 | filter_map.filter_map_data()[filter_loc]; |
337 | 0 | } |
338 | |
|
339 | 0 | auto new_filter = std::make_unique<FilterMap>(); |
340 | 0 | RETURN_IF_ERROR(new_filter->init(nested_filter_map_data.data(), |
341 | 0 | nested_filter_map_data.size(), false)); |
342 | 0 | *nested_filter_map = std::move(new_filter); |
343 | |
|
344 | 0 | return Status::OK(); |
345 | 0 | } Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb0EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb0EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE |
346 | | |
347 | | std::unique_ptr<parquet::PhysicalToLogicalConverter> _converter = nullptr; |
348 | | std::unique_ptr<std::vector<uint8_t>> _nested_filter_map_data = nullptr; |
349 | | size_t _orig_filter_map_index = 0; |
350 | | int64_t _convert_time = 0; |
351 | | |
352 | | Status _skip_values(size_t num_values); |
353 | | Status _read_values(size_t num_values, ColumnPtr& doris_column, DataTypePtr& type, |
354 | | FilterMap& filter_map, bool is_dict_filter); |
355 | | Status _read_nested_column(ColumnPtr& doris_column, DataTypePtr& type, FilterMap& filter_map, |
356 | | size_t batch_size, size_t* read_rows, bool* eof, |
357 | | bool is_dict_filter); |
358 | | Status _try_load_dict_page(bool* loaded, bool* has_dict); |
359 | | }; |
360 | | |
361 | | class ArrayColumnReader : public ParquetColumnReader { |
362 | | ENABLE_FACTORY_CREATOR(ArrayColumnReader) |
363 | | public: |
364 | | ArrayColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz, |
365 | | io::IOContext* io_ctx) |
366 | 2 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {} |
367 | 2 | ~ArrayColumnReader() override { close(); } |
368 | | Status init(std::unique_ptr<ParquetColumnReader> element_reader, FieldSchema* field); |
369 | | Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
370 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
371 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, |
372 | | bool is_dict_filter, int64_t real_column_size = -1) override; |
373 | 0 | const std::vector<level_t>& get_rep_level() const override { |
374 | 0 | return _element_reader->get_rep_level(); |
375 | 0 | } |
376 | 0 | const std::vector<level_t>& get_def_level() const override { |
377 | 0 | return _element_reader->get_def_level(); |
378 | 0 | } |
379 | 2 | ColumnStatistics column_statistics() override { return _element_reader->column_statistics(); } |
380 | 2 | void close() override {} |
381 | | |
382 | 2 | void reset_filter_map_index() override { _element_reader->reset_filter_map_index(); } |
383 | | |
384 | | private: |
385 | | std::unique_ptr<ParquetColumnReader> _element_reader; |
386 | | }; |
387 | | |
388 | | class MapColumnReader : public ParquetColumnReader { |
389 | | ENABLE_FACTORY_CREATOR(MapColumnReader) |
390 | | public: |
391 | | MapColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz, |
392 | | io::IOContext* io_ctx) |
393 | 0 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {} |
394 | 0 | ~MapColumnReader() override { close(); } |
395 | | |
396 | | Status init(std::unique_ptr<ParquetColumnReader> key_reader, |
397 | | std::unique_ptr<ParquetColumnReader> value_reader, FieldSchema* field); |
398 | | Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
399 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
400 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, |
401 | | bool is_dict_filter, int64_t real_column_size = -1) override; |
402 | | |
403 | 0 | const std::vector<level_t>& get_rep_level() const override { |
404 | 0 | return _key_reader->get_rep_level(); |
405 | 0 | } |
406 | 0 | const std::vector<level_t>& get_def_level() const override { |
407 | 0 | return _key_reader->get_def_level(); |
408 | 0 | } |
409 | | |
410 | 0 | ColumnStatistics column_statistics() override { |
411 | 0 | ColumnStatistics kst = _key_reader->column_statistics(); |
412 | 0 | ColumnStatistics vst = _value_reader->column_statistics(); |
413 | 0 | kst.merge(vst); |
414 | 0 | return kst; |
415 | 0 | } |
416 | | |
417 | 0 | void close() override {} |
418 | | |
419 | 0 | void reset_filter_map_index() override { |
420 | 0 | _key_reader->reset_filter_map_index(); |
421 | 0 | _value_reader->reset_filter_map_index(); |
422 | 0 | } |
423 | | |
424 | | private: |
425 | | std::unique_ptr<ParquetColumnReader> _key_reader; |
426 | | std::unique_ptr<ParquetColumnReader> _value_reader; |
427 | | }; |
428 | | |
429 | | class StructColumnReader : public ParquetColumnReader { |
430 | | ENABLE_FACTORY_CREATOR(StructColumnReader) |
431 | | public: |
432 | | StructColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz, |
433 | | io::IOContext* io_ctx) |
434 | 11 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {} |
435 | 11 | ~StructColumnReader() override { close(); } |
436 | | |
437 | | Status init( |
438 | | std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>&& child_readers, |
439 | | FieldSchema* field); |
440 | | Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
441 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
442 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, |
443 | | bool is_dict_filter, int64_t real_column_size = -1) override; |
444 | | |
445 | 17 | const std::vector<level_t>& get_rep_level() const override { |
446 | 17 | if (!_read_column_names.empty()) { |
447 | | // can't use _child_readers[*_read_column_names.begin()] |
448 | | // because the operator[] of std::unordered_map is not const :( |
449 | | /* |
450 | | * Considering the issue in the `_read_nested_column` function where data may span across pages, leading |
451 | | * to missing definition and repetition levels, when filling the null_map of the struct later, it is |
452 | | * crucial to use the definition and repetition levels from the first read column, |
453 | | * that is `_read_column_names.front()`. |
454 | | */ |
455 | 17 | return _child_readers.find(_read_column_names.front())->second->get_rep_level(); |
456 | 17 | } |
457 | 0 | return _child_readers.begin()->second->get_rep_level(); |
458 | 17 | } |
459 | | |
460 | 17 | const std::vector<level_t>& get_def_level() const override { |
461 | 17 | if (!_read_column_names.empty()) { |
462 | 17 | return _child_readers.find(_read_column_names.front())->second->get_def_level(); |
463 | 17 | } |
464 | 0 | return _child_readers.begin()->second->get_def_level(); |
465 | 17 | } |
466 | | |
467 | 11 | ColumnStatistics column_statistics() override { |
468 | 11 | ColumnStatistics st; |
469 | 22 | for (const auto& column_name : _read_column_names) { |
470 | 22 | auto reader = _child_readers.find(column_name); |
471 | 22 | if (reader != _child_readers.end()) { |
472 | 22 | ColumnStatistics cst = reader->second->column_statistics(); |
473 | 22 | st.merge(cst); |
474 | 22 | } |
475 | 22 | } |
476 | 11 | return st; |
477 | 11 | } |
478 | | |
479 | 11 | void close() override {} |
480 | | |
481 | 11 | void reset_filter_map_index() override { |
482 | 26 | for (const auto& reader : _child_readers) { |
483 | 26 | reader.second->reset_filter_map_index(); |
484 | 26 | } |
485 | 11 | } |
486 | | |
private:
    // Child column readers, looked up by column name.
    std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _child_readers;
    // Names of the child columns that were read. This must be a vector rather than a set
    // because order matters: get_rep_level()/get_def_level() take their levels from the
    // first read column, `_read_column_names.front()`.
    std::vector<std::string> _read_column_names;
};
492 | | |
493 | | class VariantColumnReader : public ParquetColumnReader { |
494 | | ENABLE_FACTORY_CREATOR(VariantColumnReader) |
495 | | public: |
496 | | VariantColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz, |
497 | | io::IOContext* io_ctx) |
498 | 0 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {} |
499 | 0 | ~VariantColumnReader() override { close(); } |
500 | | |
501 | | Status init(io::FileReaderSPtr file, FieldSchema* field, const tparquet::RowGroup& row_group, |
502 | | size_t max_buf_size, std::unordered_map<int, tparquet::OffsetIndex>& col_offsets, |
503 | | RuntimeState* state, bool in_collection, const std::set<uint64_t>& column_ids, |
504 | | const std::set<uint64_t>& filter_column_ids); |
505 | | Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
506 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
507 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, |
508 | | bool is_dict_filter, int64_t real_column_size = -1) override; |
509 | | |
510 | 0 | const std::vector<level_t>& get_rep_level() const override { |
511 | 0 | return _struct_reader->get_rep_level(); |
512 | 0 | } |
513 | 0 | const std::vector<level_t>& get_def_level() const override { |
514 | 0 | return _struct_reader->get_def_level(); |
515 | 0 | } |
516 | 0 | ColumnStatistics column_statistics() override { |
517 | 0 | auto statistics = _struct_reader->column_statistics(); |
518 | 0 | statistics.merge(_variant_statistics); |
519 | 0 | return statistics; |
520 | 0 | } |
521 | 0 | void close() override {} |
522 | | |
523 | 0 | void reset_filter_map_index() override { _struct_reader->reset_filter_map_index(); } |
524 | | |
525 | | private: |
526 | | std::unique_ptr<FieldSchema> _variant_struct_field; |
527 | | std::unique_ptr<ParquetColumnReader> _struct_reader; |
528 | | std::set<uint64_t> _column_ids; |
529 | | ColumnStatistics _variant_statistics; |
530 | | }; |
531 | | |
532 | | // A special reader that skips actual reading but provides empty data with correct structure |
533 | | // This is used when a column is not needed but its structure is required (e.g., for map keys) |
534 | | class SkipReadingReader : public ParquetColumnReader { |
535 | | public: |
536 | | SkipReadingReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz, |
537 | | io::IOContext* io_ctx, FieldSchema* field_schema) |
538 | 4 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) { |
539 | 4 | _field_schema = field_schema; // Use inherited member from base class |
540 | 4 | VLOG_DEBUG << "[ParquetReader] Created SkipReadingReader for field: " |
541 | 0 | << _field_schema->name; |
542 | 4 | } |
543 | | |
544 | | Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
545 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
546 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, |
547 | 4 | bool is_dict_filter, int64_t real_column_size = -1) override { |
548 | 4 | VLOG_DEBUG << "[ParquetReader] SkipReadingReader::read_column_data for field: " |
549 | 0 | << _field_schema->name << ", batch_size: " << batch_size; |
550 | 4 | DCHECK(real_column_size >= 0); // real_column_size for filtered column size. |
551 | | |
552 | | // Simulate reading without actually reading data |
553 | | // Fill with default/null values based on column type |
554 | 4 | MutableColumnPtr data_column = doris_column->assume_mutable(); |
555 | | |
556 | 4 | if (real_column_size > 0) { |
557 | 4 | if (doris_column->is_nullable()) { |
558 | 4 | auto* nullable_column = static_cast<ColumnNullable*>(data_column.get()); |
559 | 4 | nullable_column->insert_many_defaults(real_column_size); |
560 | 4 | } else { |
561 | | // For non-nullable columns, insert appropriate default values |
562 | 0 | for (size_t i = 0; i < real_column_size; ++i) { |
563 | 0 | data_column->insert_default(); |
564 | 0 | } |
565 | 0 | } |
566 | 4 | } |
567 | | |
568 | 4 | *read_rows = batch_size; // Indicate we "read" batch_size rows |
569 | 4 | *eof = false; // We can always provide more empty data |
570 | | |
571 | 4 | VLOG_DEBUG << "[ParquetReader] SkipReadingReader generated " << batch_size |
572 | 0 | << " default values for field: " << _field_schema->name; |
573 | | |
574 | 4 | return Status::OK(); |
575 | 4 | } |
576 | | |
577 | | static std::unique_ptr<SkipReadingReader> create_unique(const RowRanges& row_ranges, |
578 | | size_t total_rows, cctz::time_zone* ctz, |
579 | | io::IOContext* io_ctx, |
580 | 0 | FieldSchema* field_schema) { |
581 | 0 | return std::make_unique<SkipReadingReader>(row_ranges, total_rows, ctz, io_ctx, |
582 | 0 | field_schema); |
583 | 0 | } |
584 | | |
585 | | // These methods should not be called for SkipReadingReader |
586 | | // If they are called, it indicates a logic error in the code |
587 | 0 | const std::vector<level_t>& get_rep_level() const override { |
588 | 0 | LOG(FATAL) << "get_rep_level() should not be called on SkipReadingReader for field: " |
589 | 0 | << _field_schema->name |
590 | 0 | << ". This indicates the SkipReadingReader was incorrectly used as a reference " |
591 | 0 | "column."; |
592 | 0 | __builtin_unreachable(); |
593 | 0 | } |
594 | | |
595 | 0 | const std::vector<level_t>& get_def_level() const override { |
596 | 0 | LOG(FATAL) << "get_def_level() should not be called on SkipReadingReader for field: " |
597 | 0 | << _field_schema->name |
598 | 0 | << ". This indicates the SkipReadingReader was incorrectly used as a reference " |
599 | 0 | "column."; |
600 | 0 | __builtin_unreachable(); |
601 | 0 | } |
602 | | |
603 | | // Implement required pure virtual methods from base class |
604 | 0 | ColumnStatistics column_statistics() override { return {}; } |
605 | | |
606 | 0 | void close() override { |
607 | | // Nothing to close for skip reading |
608 | 0 | } |
609 | | |
610 | 4 | void reset_filter_map_index() override { _filter_map_index = 0; } |
611 | | }; |
612 | | |
613 | | }; // namespace doris |