be/src/format/parquet/vparquet_column_reader.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
#include <gen_cpp/parquet_types.h>
#include <glog/logging.h>
#include <stddef.h>
#include <stdint.h>

#include <limits>
#include <list>
#include <memory>
#include <ostream>
#include <set>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include "common/status.h"
#include "core/data_type/data_type.h"
#include "format/generic_reader.h"
#include "format/parquet/parquet_column_convert.h"
#include "format/parquet/parquet_common.h"
#include "format/parquet/vparquet_column_chunk_reader.h"
#include "format/table/table_schema_change_helper.h"
#include "io/fs/buffered_reader.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "parquet_column_convert.h"
#include "vparquet_column_chunk_reader.h"
41 | | |
// Forward declarations: this header only handles these types through pointers
// (`const cctz::time_zone*`, `io::IOContext*`), so full definitions are not needed here.
namespace cctz {
class time_zone;
} // namespace cctz

namespace doris {
namespace io {
struct IOContext;
} // namespace io
} // namespace doris
49 | | |
50 | | namespace doris { |
51 | | struct FieldSchema; |
52 | | template <typename T> |
53 | | class ColumnStr; |
54 | | using ColumnString = ColumnStr<UInt32>; |
55 | | |
56 | | class ParquetColumnReader { |
57 | | public: |
58 | | struct ColumnStatistics { |
59 | | ColumnStatistics() |
60 | 152 | : page_index_read_calls(0), |
61 | 152 | decompress_time(0), |
62 | 152 | decompress_cnt(0), |
63 | 152 | decode_header_time(0), |
64 | 152 | decode_value_time(0), |
65 | 152 | decode_dict_time(0), |
66 | 152 | decode_level_time(0), |
67 | 152 | decode_null_map_time(0), |
68 | 152 | convert_time(0), |
69 | 152 | skip_page_header_num(0), |
70 | 152 | parse_page_header_num(0), |
71 | 152 | read_page_header_time(0), |
72 | 152 | page_read_counter(0), |
73 | 152 | page_cache_write_counter(0), |
74 | 152 | page_cache_compressed_write_counter(0), |
75 | 152 | page_cache_decompressed_write_counter(0), |
76 | 152 | page_cache_hit_counter(0), |
77 | 152 | page_cache_missing_counter(0), |
78 | 152 | page_cache_compressed_hit_counter(0), |
79 | 152 | page_cache_decompressed_hit_counter(0) {} |
80 | | |
81 | | ColumnStatistics(ColumnChunkReaderStatistics& cs, int64_t null_map_time, |
82 | | int64_t convert_time_) |
83 | 119 | : page_index_read_calls(0), |
84 | 119 | decompress_time(cs.decompress_time), |
85 | 119 | decompress_cnt(cs.decompress_cnt), |
86 | 119 | decode_header_time(cs.decode_header_time), |
87 | 119 | decode_value_time(cs.decode_value_time), |
88 | 119 | decode_dict_time(cs.decode_dict_time), |
89 | 119 | decode_level_time(cs.decode_level_time), |
90 | 119 | decode_null_map_time(null_map_time), |
91 | 119 | convert_time(convert_time_), |
92 | 119 | skip_page_header_num(cs.skip_page_header_num), |
93 | 119 | parse_page_header_num(cs.parse_page_header_num), |
94 | 119 | read_page_header_time(cs.read_page_header_time), |
95 | 119 | page_read_counter(cs.page_read_counter), |
96 | 119 | page_cache_write_counter(cs.page_cache_write_counter), |
97 | 119 | page_cache_compressed_write_counter(cs.page_cache_compressed_write_counter), |
98 | 119 | page_cache_decompressed_write_counter(cs.page_cache_decompressed_write_counter), |
99 | 119 | page_cache_hit_counter(cs.page_cache_hit_counter), |
100 | 119 | page_cache_missing_counter(cs.page_cache_missing_counter), |
101 | 119 | page_cache_compressed_hit_counter(cs.page_cache_compressed_hit_counter), |
102 | 119 | page_cache_decompressed_hit_counter(cs.page_cache_decompressed_hit_counter) {} |
103 | | |
104 | | int64_t page_index_read_calls; |
105 | | int64_t decompress_time; |
106 | | int64_t decompress_cnt; |
107 | | int64_t decode_header_time; |
108 | | int64_t decode_value_time; |
109 | | int64_t decode_dict_time; |
110 | | int64_t decode_level_time; |
111 | | int64_t decode_null_map_time; |
112 | | int64_t convert_time; |
113 | | int64_t skip_page_header_num; |
114 | | int64_t parse_page_header_num; |
115 | | int64_t read_page_header_time; |
116 | | int64_t page_read_counter; |
117 | | int64_t page_cache_write_counter; |
118 | | int64_t page_cache_compressed_write_counter; |
119 | | int64_t page_cache_decompressed_write_counter; |
120 | | int64_t page_cache_hit_counter; |
121 | | int64_t page_cache_missing_counter; |
122 | | int64_t page_cache_compressed_hit_counter; |
123 | | int64_t page_cache_decompressed_hit_counter; |
124 | | |
125 | 168 | void merge(ColumnStatistics& col_statistics) { |
126 | 168 | page_index_read_calls += col_statistics.page_index_read_calls; |
127 | 168 | decompress_time += col_statistics.decompress_time; |
128 | 168 | decompress_cnt += col_statistics.decompress_cnt; |
129 | 168 | decode_header_time += col_statistics.decode_header_time; |
130 | 168 | decode_value_time += col_statistics.decode_value_time; |
131 | 168 | decode_dict_time += col_statistics.decode_dict_time; |
132 | 168 | decode_level_time += col_statistics.decode_level_time; |
133 | 168 | decode_null_map_time += col_statistics.decode_null_map_time; |
134 | 168 | convert_time += col_statistics.convert_time; |
135 | 168 | skip_page_header_num += col_statistics.skip_page_header_num; |
136 | 168 | parse_page_header_num += col_statistics.parse_page_header_num; |
137 | 168 | read_page_header_time += col_statistics.read_page_header_time; |
138 | 168 | page_read_counter += col_statistics.page_read_counter; |
139 | 168 | page_cache_write_counter += col_statistics.page_cache_write_counter; |
140 | 168 | page_cache_compressed_write_counter += |
141 | 168 | col_statistics.page_cache_compressed_write_counter; |
142 | 168 | page_cache_decompressed_write_counter += |
143 | 168 | col_statistics.page_cache_decompressed_write_counter; |
144 | 168 | page_cache_hit_counter += col_statistics.page_cache_hit_counter; |
145 | 168 | page_cache_missing_counter += col_statistics.page_cache_missing_counter; |
146 | 168 | page_cache_compressed_hit_counter += col_statistics.page_cache_compressed_hit_counter; |
147 | 168 | page_cache_decompressed_hit_counter += |
148 | 168 | col_statistics.page_cache_decompressed_hit_counter; |
149 | 168 | } |
150 | | }; |
151 | | |
152 | | ParquetColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz, |
153 | | io::IOContext* io_ctx) |
154 | 136 | : _row_ranges(row_ranges), _total_rows(total_rows), _ctz(ctz), _io_ctx(io_ctx) {} |
155 | 136 | virtual ~ParquetColumnReader() = default; |
156 | | virtual Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
157 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
158 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, |
159 | | bool* eof, bool is_dict_filter, |
160 | | int64_t real_column_size = -1) = 0; |
161 | | |
162 | 0 | virtual Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) { |
163 | 0 | return Status::NotSupported("read_dict_values_to_column is not supported"); |
164 | 0 | } |
165 | | |
166 | | virtual Result<MutableColumnPtr> convert_dict_column_to_string_column( |
167 | 0 | const ColumnInt32* dict_column) { |
168 | 0 | throw Exception( |
169 | 0 | Status::FatalError("Method convert_dict_column_to_string_column is not supported")); |
170 | 0 | } |
171 | | |
172 | | static Status create(io::FileReaderSPtr file, FieldSchema* field, |
173 | | const tparquet::RowGroup& row_group, const RowRanges& row_ranges, |
174 | | const cctz::time_zone* ctz, io::IOContext* io_ctx, |
175 | | std::unique_ptr<ParquetColumnReader>& reader, size_t max_buf_size, |
176 | | std::unordered_map<int, tparquet::OffsetIndex>& col_offsets, |
177 | | RuntimeState* state, bool in_collection = false, |
178 | | const std::set<uint64_t>& column_ids = {}, |
179 | | const std::set<uint64_t>& filter_column_ids = {}); |
180 | | virtual const std::vector<level_t>& get_rep_level() const = 0; |
181 | | virtual const std::vector<level_t>& get_def_level() const = 0; |
182 | | virtual ColumnStatistics column_statistics() = 0; |
183 | | virtual void close() = 0; |
184 | | |
185 | | virtual void reset_filter_map_index() = 0; |
186 | | |
187 | 0 | FieldSchema* get_field_schema() const { return _field_schema; } |
188 | 28 | void set_column_in_nested() { _in_nested = true; } |
189 | | |
190 | | protected: |
191 | | void _generate_read_ranges(RowRange page_row_range, RowRanges* result_ranges) const; |
192 | | |
193 | | FieldSchema* _field_schema = nullptr; |
194 | | const RowRanges& _row_ranges; |
195 | | size_t _total_rows = 0; |
196 | | const cctz::time_zone* _ctz = nullptr; |
197 | | io::IOContext* _io_ctx = nullptr; |
198 | | int64_t _current_row_index = 0; |
199 | | int64_t _decode_null_map_time = 0; |
200 | | |
201 | | size_t _filter_map_index = 0; |
202 | | std::set<uint64_t> _filter_column_ids; |
203 | | |
204 | | // _in_nested: column in struct/map/array |
205 | | // IN_COLLECTION : column in map/array |
206 | | bool _in_nested = false; |
207 | | }; |
208 | | |
209 | | template <bool IN_COLLECTION, bool OFFSET_INDEX> |
210 | | class ScalarColumnReader : public ParquetColumnReader { |
211 | | ENABLE_FACTORY_CREATOR(ScalarColumnReader) |
212 | | public: |
213 | | ScalarColumnReader(const RowRanges& row_ranges, size_t total_rows, |
214 | | const tparquet::ColumnChunk& chunk_meta, |
215 | | const tparquet::OffsetIndex* offset_index, const cctz::time_zone* ctz, |
216 | | io::IOContext* io_ctx) |
217 | 119 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx), |
218 | 119 | _chunk_meta(chunk_meta), |
219 | 119 | _offset_index(offset_index) {}Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE _ZN5doris18ScalarColumnReaderILb1ELb0EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE Line | Count | Source | 217 | 3 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx), | 218 | 3 | _chunk_meta(chunk_meta), | 219 | 3 | _offset_index(offset_index) {} |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE _ZN5doris18ScalarColumnReaderILb0ELb0EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE Line | Count | Source | 217 | 116 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx), | 218 | 116 | _chunk_meta(chunk_meta), | 219 | 116 | _offset_index(offset_index) {} |
|
220 | 119 | ~ScalarColumnReader() override { close(); }Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EED2Ev _ZN5doris18ScalarColumnReaderILb1ELb0EED2Ev Line | Count | Source | 220 | 3 | ~ScalarColumnReader() override { close(); } |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EED2Ev _ZN5doris18ScalarColumnReaderILb0ELb0EED2Ev Line | Count | Source | 220 | 116 | ~ScalarColumnReader() override { close(); } |
|
221 | | Status init(io::FileReaderSPtr file, FieldSchema* field, size_t max_buf_size, |
222 | | RuntimeState* state); |
223 | | Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
224 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
225 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, |
226 | | bool is_dict_filter, int64_t real_column_size = -1) override; |
227 | | Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) override; |
228 | | Result<MutableColumnPtr> convert_dict_column_to_string_column( |
229 | | const ColumnInt32* dict_column) override; |
230 | 13 | const std::vector<level_t>& get_rep_level() const override { return _rep_levels; }Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb1ELb1EE13get_rep_levelEv _ZNK5doris18ScalarColumnReaderILb1ELb0EE13get_rep_levelEv Line | Count | Source | 230 | 4 | const std::vector<level_t>& get_rep_level() const override { return _rep_levels; } |
Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb0ELb1EE13get_rep_levelEv _ZNK5doris18ScalarColumnReaderILb0ELb0EE13get_rep_levelEv Line | Count | Source | 230 | 9 | const std::vector<level_t>& get_rep_level() const override { return _rep_levels; } |
|
231 | 13 | const std::vector<level_t>& get_def_level() const override { return _def_levels; }Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb1ELb1EE13get_def_levelEv _ZNK5doris18ScalarColumnReaderILb1ELb0EE13get_def_levelEv Line | Count | Source | 231 | 4 | const std::vector<level_t>& get_def_level() const override { return _def_levels; } |
Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb0ELb1EE13get_def_levelEv _ZNK5doris18ScalarColumnReaderILb0ELb0EE13get_def_levelEv Line | Count | Source | 231 | 9 | const std::vector<level_t>& get_def_level() const override { return _def_levels; } |
|
232 | 119 | ColumnStatistics column_statistics() override { |
233 | 119 | return ColumnStatistics(_chunk_reader->chunk_statistics(), _decode_null_map_time, |
234 | 119 | _convert_time); |
235 | 119 | } Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE17column_statisticsEv _ZN5doris18ScalarColumnReaderILb1ELb0EE17column_statisticsEv Line | Count | Source | 232 | 3 | ColumnStatistics column_statistics() override { | 233 | 3 | return ColumnStatistics(_chunk_reader->chunk_statistics(), _decode_null_map_time, | 234 | 3 | _convert_time); | 235 | 3 | } |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE17column_statisticsEv _ZN5doris18ScalarColumnReaderILb0ELb0EE17column_statisticsEv Line | Count | Source | 232 | 116 | ColumnStatistics column_statistics() override { | 233 | 116 | return ColumnStatistics(_chunk_reader->chunk_statistics(), _decode_null_map_time, | 234 | 116 | _convert_time); | 235 | 116 | } |
|
236 | 119 | void close() override {}Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE5closeEv _ZN5doris18ScalarColumnReaderILb1ELb0EE5closeEv Line | Count | Source | 236 | 3 | void close() override {} |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE5closeEv _ZN5doris18ScalarColumnReaderILb0ELb0EE5closeEv Line | Count | Source | 236 | 116 | void close() override {} |
|
237 | | |
238 | 224 | void reset_filter_map_index() override { |
239 | 224 | _filter_map_index = 0; // nested |
240 | 224 | _orig_filter_map_index = 0; |
241 | 224 | } Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE22reset_filter_map_indexEv _ZN5doris18ScalarColumnReaderILb1ELb0EE22reset_filter_map_indexEv Line | Count | Source | 238 | 3 | void reset_filter_map_index() override { | 239 | 3 | _filter_map_index = 0; // nested | 240 | 3 | _orig_filter_map_index = 0; | 241 | 3 | } |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE22reset_filter_map_indexEv _ZN5doris18ScalarColumnReaderILb0ELb0EE22reset_filter_map_indexEv Line | Count | Source | 238 | 221 | void reset_filter_map_index() override { | 239 | 221 | _filter_map_index = 0; // nested | 240 | 221 | _orig_filter_map_index = 0; | 241 | 221 | } |
|
242 | | |
243 | | private: |
244 | | tparquet::ColumnChunk _chunk_meta; |
245 | | const tparquet::OffsetIndex* _offset_index = nullptr; |
246 | | std::unique_ptr<io::BufferedFileStreamReader> _stream_reader; |
247 | | std::unique_ptr<ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>> _chunk_reader; |
248 | | // rep def levels buffer. |
249 | | std::vector<level_t> _rep_levels; |
250 | | std::vector<level_t> _def_levels; |
251 | | |
252 | | size_t _current_range_idx = 0; |
253 | | |
254 | | Status gen_nested_null_map(size_t level_start_idx, size_t level_end_idx, |
255 | | std::vector<uint16_t>& null_map, |
256 | 13 | std::unordered_set<size_t>& ancestor_null_indices) { |
257 | 13 | size_t has_read = level_start_idx; |
258 | 13 | null_map.emplace_back(0); |
259 | 13 | bool prev_is_null = false; |
260 | | |
261 | 26 | while (has_read < level_end_idx) { |
262 | 13 | level_t def_level = _def_levels[has_read++]; |
263 | 13 | size_t loop_read = 1; |
264 | 19 | while (has_read < _def_levels.size() && _def_levels[has_read] == def_level) { |
265 | 6 | has_read++; |
266 | 6 | loop_read++; |
267 | 6 | } |
268 | | |
269 | 13 | if (def_level < _field_schema->repeated_parent_def_level) { |
270 | 0 | for (size_t i = 0; i < loop_read; i++) { |
271 | 0 | ancestor_null_indices.insert(has_read - level_start_idx - loop_read + i); |
272 | 0 | } |
273 | 0 | continue; |
274 | 0 | } |
275 | | |
276 | 13 | bool is_null = def_level < _field_schema->definition_level; |
277 | | |
278 | 13 | if (prev_is_null == is_null && (USHRT_MAX - null_map.back() >= loop_read)) { |
279 | 13 | null_map.back() += loop_read; |
280 | 13 | } else { |
281 | 0 | if (!(prev_is_null ^ is_null)) { |
282 | 0 | null_map.emplace_back(0); |
283 | 0 | } |
284 | 0 | size_t remaining = loop_read; |
285 | 0 | while (remaining > USHRT_MAX) { |
286 | 0 | null_map.emplace_back(USHRT_MAX); |
287 | 0 | null_map.emplace_back(0); |
288 | 0 | remaining -= USHRT_MAX; |
289 | 0 | } |
290 | 0 | null_map.emplace_back((u_short)remaining); |
291 | 0 | prev_is_null = is_null; |
292 | 0 | } |
293 | 13 | } |
294 | 13 | return Status::OK(); |
295 | 13 | } Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE _ZN5doris18ScalarColumnReaderILb1ELb0EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE Line | Count | Source | 256 | 3 | std::unordered_set<size_t>& ancestor_null_indices) { | 257 | 3 | size_t has_read = level_start_idx; | 258 | 3 | null_map.emplace_back(0); | 259 | 3 | bool prev_is_null = false; | 260 | | | 261 | 6 | while (has_read < level_end_idx) { | 262 | 3 | level_t def_level = _def_levels[has_read++]; | 263 | 3 | size_t loop_read = 1; | 264 | 9 | while (has_read < _def_levels.size() && _def_levels[has_read] == def_level) { | 265 | 6 | has_read++; | 266 | 6 | loop_read++; | 267 | 6 | } | 268 | | | 269 | 3 | if (def_level < _field_schema->repeated_parent_def_level) { | 270 | 0 | for (size_t i = 0; i < loop_read; i++) { | 271 | 0 | ancestor_null_indices.insert(has_read - level_start_idx - loop_read + i); | 272 | 0 | } | 273 | 0 | continue; | 274 | 0 | } | 275 | | | 276 | 3 | bool is_null = def_level < _field_schema->definition_level; | 277 | | | 278 | 3 | if (prev_is_null == is_null && (USHRT_MAX - null_map.back() >= loop_read)) { | 279 | 3 | null_map.back() += loop_read; | 280 | 3 | } else { | 281 | 0 | if (!(prev_is_null ^ is_null)) { | 282 | 0 | null_map.emplace_back(0); | 283 | 0 | } | 284 | 0 | size_t remaining = loop_read; | 285 | 0 | while (remaining > USHRT_MAX) { | 286 | 0 | null_map.emplace_back(USHRT_MAX); | 287 | 0 | null_map.emplace_back(0); | 288 | | remaining -= USHRT_MAX; | 289 | 0 | } | 290 | 0 | null_map.emplace_back((u_short)remaining); | 291 | 0 | prev_is_null = is_null; | 292 | 0 | } | 293 | 3 | } | 294 | 3 | return Status::OK(); | 295 | 3 | } |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE _ZN5doris18ScalarColumnReaderILb0ELb0EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE Line | Count | Source | 256 | 10 | std::unordered_set<size_t>& ancestor_null_indices) { | 257 | 10 | size_t has_read = level_start_idx; | 258 | 10 | null_map.emplace_back(0); | 259 | 10 | bool prev_is_null = false; | 260 | | | 261 | 20 | while (has_read < level_end_idx) { | 262 | 10 | level_t def_level = _def_levels[has_read++]; | 263 | 10 | size_t loop_read = 1; | 264 | 10 | while (has_read < _def_levels.size() && _def_levels[has_read] == def_level) { | 265 | 0 | has_read++; | 266 | 0 | loop_read++; | 267 | 0 | } | 268 | | | 269 | 10 | if (def_level < _field_schema->repeated_parent_def_level) { | 270 | 0 | for (size_t i = 0; i < loop_read; i++) { | 271 | 0 | ancestor_null_indices.insert(has_read - level_start_idx - loop_read + i); | 272 | 0 | } | 273 | 0 | continue; | 274 | 0 | } | 275 | | | 276 | 10 | bool is_null = def_level < _field_schema->definition_level; | 277 | | | 278 | 10 | if (prev_is_null == is_null && (USHRT_MAX - null_map.back() >= loop_read)) { | 279 | 10 | null_map.back() += loop_read; | 280 | 10 | } else { | 281 | 0 | if (!(prev_is_null ^ is_null)) { | 282 | 0 | null_map.emplace_back(0); | 283 | 0 | } | 284 | 0 | size_t remaining = loop_read; | 285 | 0 | while (remaining > USHRT_MAX) { | 286 | 0 | null_map.emplace_back(USHRT_MAX); | 287 | 0 | null_map.emplace_back(0); | 288 | | remaining -= USHRT_MAX; | 289 | 0 | } | 290 | 0 | null_map.emplace_back((u_short)remaining); | 291 | 0 | prev_is_null = is_null; | 292 | 0 | } | 293 | 10 | } | 294 | 10 | return Status::OK(); | 295 | 10 | } |
|
296 | | |
297 | | Status gen_filter_map(FilterMap& filter_map, size_t filter_loc, size_t level_start_idx, |
298 | | size_t level_end_idx, std::vector<uint8_t>& nested_filter_map_data, |
299 | 0 | std::unique_ptr<FilterMap>* nested_filter_map) { |
300 | 0 | nested_filter_map_data.resize(level_end_idx - level_start_idx); |
301 | 0 | for (size_t idx = level_start_idx; idx < level_end_idx; idx++) { |
302 | 0 | if (idx != level_start_idx && _rep_levels[idx] == 0) { |
303 | 0 | filter_loc++; |
304 | 0 | } |
305 | 0 | nested_filter_map_data[idx - level_start_idx] = |
306 | 0 | filter_map.filter_map_data()[filter_loc]; |
307 | 0 | } |
308 | |
|
309 | 0 | auto new_filter = std::make_unique<FilterMap>(); |
310 | 0 | RETURN_IF_ERROR(new_filter->init(nested_filter_map_data.data(), |
311 | 0 | nested_filter_map_data.size(), false)); |
312 | 0 | *nested_filter_map = std::move(new_filter); |
313 | |
|
314 | 0 | return Status::OK(); |
315 | 0 | } Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb0EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb0EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE |
316 | | |
317 | | std::unique_ptr<parquet::PhysicalToLogicalConverter> _converter = nullptr; |
318 | | std::unique_ptr<std::vector<uint8_t>> _nested_filter_map_data = nullptr; |
319 | | size_t _orig_filter_map_index = 0; |
320 | | int64_t _convert_time = 0; |
321 | | |
322 | | Status _skip_values(size_t num_values); |
323 | | Status _read_values(size_t num_values, ColumnPtr& doris_column, DataTypePtr& type, |
324 | | FilterMap& filter_map, bool is_dict_filter); |
325 | | Status _read_nested_column(ColumnPtr& doris_column, DataTypePtr& type, FilterMap& filter_map, |
326 | | size_t batch_size, size_t* read_rows, bool* eof, |
327 | | bool is_dict_filter); |
328 | | Status _try_load_dict_page(bool* loaded, bool* has_dict); |
329 | | }; |
330 | | |
331 | | class ArrayColumnReader : public ParquetColumnReader { |
332 | | ENABLE_FACTORY_CREATOR(ArrayColumnReader) |
333 | | public: |
334 | | ArrayColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz, |
335 | | io::IOContext* io_ctx) |
336 | 2 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {} |
337 | 2 | ~ArrayColumnReader() override { close(); } |
338 | | Status init(std::unique_ptr<ParquetColumnReader> element_reader, FieldSchema* field); |
339 | | Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
340 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
341 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, |
342 | | bool is_dict_filter, int64_t real_column_size = -1) override; |
343 | 0 | const std::vector<level_t>& get_rep_level() const override { |
344 | 0 | return _element_reader->get_rep_level(); |
345 | 0 | } |
346 | 0 | const std::vector<level_t>& get_def_level() const override { |
347 | 0 | return _element_reader->get_def_level(); |
348 | 0 | } |
349 | 2 | ColumnStatistics column_statistics() override { return _element_reader->column_statistics(); } |
350 | 2 | void close() override {} |
351 | | |
352 | 2 | void reset_filter_map_index() override { _element_reader->reset_filter_map_index(); } |
353 | | |
354 | | private: |
355 | | std::unique_ptr<ParquetColumnReader> _element_reader; |
356 | | }; |
357 | | |
358 | | class MapColumnReader : public ParquetColumnReader { |
359 | | ENABLE_FACTORY_CREATOR(MapColumnReader) |
360 | | public: |
361 | | MapColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz, |
362 | | io::IOContext* io_ctx) |
363 | 0 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {} |
364 | 0 | ~MapColumnReader() override { close(); } |
365 | | |
366 | | Status init(std::unique_ptr<ParquetColumnReader> key_reader, |
367 | | std::unique_ptr<ParquetColumnReader> value_reader, FieldSchema* field); |
368 | | Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
369 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
370 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, |
371 | | bool is_dict_filter, int64_t real_column_size = -1) override; |
372 | | |
373 | 0 | const std::vector<level_t>& get_rep_level() const override { |
374 | 0 | return _key_reader->get_rep_level(); |
375 | 0 | } |
376 | 0 | const std::vector<level_t>& get_def_level() const override { |
377 | 0 | return _key_reader->get_def_level(); |
378 | 0 | } |
379 | | |
380 | 0 | ColumnStatistics column_statistics() override { |
381 | 0 | ColumnStatistics kst = _key_reader->column_statistics(); |
382 | 0 | ColumnStatistics vst = _value_reader->column_statistics(); |
383 | 0 | kst.merge(vst); |
384 | 0 | return kst; |
385 | 0 | } |
386 | | |
387 | 0 | void close() override {} |
388 | | |
389 | 0 | void reset_filter_map_index() override { |
390 | 0 | _key_reader->reset_filter_map_index(); |
391 | 0 | _value_reader->reset_filter_map_index(); |
392 | 0 | } |
393 | | |
394 | | private: |
395 | | std::unique_ptr<ParquetColumnReader> _key_reader; |
396 | | std::unique_ptr<ParquetColumnReader> _value_reader; |
397 | | }; |
398 | | |
399 | | class StructColumnReader : public ParquetColumnReader { |
400 | | ENABLE_FACTORY_CREATOR(StructColumnReader) |
401 | | public: |
402 | | StructColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz, |
403 | | io::IOContext* io_ctx) |
404 | 11 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {} |
405 | 11 | ~StructColumnReader() override { close(); } |
406 | | |
407 | | Status init( |
408 | | std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>&& child_readers, |
409 | | FieldSchema* field); |
410 | | Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
411 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
412 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, |
413 | | bool is_dict_filter, int64_t real_column_size = -1) override; |
414 | | |
415 | 17 | const std::vector<level_t>& get_rep_level() const override { |
416 | 17 | if (!_read_column_names.empty()) { |
417 | | // can't use _child_readers[*_read_column_names.begin()] |
418 | | // because the operator[] of std::unordered_map is not const :( |
419 | | /* |
420 | | * Considering the issue in the `_read_nested_column` function where data may span across pages, leading |
421 | | * to missing definition and repetition levels, when filling the null_map of the struct later, it is |
422 | | * crucial to use the definition and repetition levels from the first read column, |
423 | | * that is `_read_column_names.front()`. |
424 | | */ |
425 | 17 | return _child_readers.find(_read_column_names.front())->second->get_rep_level(); |
426 | 17 | } |
427 | 0 | return _child_readers.begin()->second->get_rep_level(); |
428 | 17 | } |
429 | | |
430 | 17 | const std::vector<level_t>& get_def_level() const override { |
431 | 17 | if (!_read_column_names.empty()) { |
432 | 17 | return _child_readers.find(_read_column_names.front())->second->get_def_level(); |
433 | 17 | } |
434 | 0 | return _child_readers.begin()->second->get_def_level(); |
435 | 17 | } |
436 | | |
437 | 11 | ColumnStatistics column_statistics() override { |
438 | 11 | ColumnStatistics st; |
439 | 22 | for (const auto& column_name : _read_column_names) { |
440 | 22 | auto reader = _child_readers.find(column_name); |
441 | 22 | if (reader != _child_readers.end()) { |
442 | 22 | ColumnStatistics cst = reader->second->column_statistics(); |
443 | 22 | st.merge(cst); |
444 | 22 | } |
445 | 22 | } |
446 | 11 | return st; |
447 | 11 | } |
448 | | |
449 | 11 | void close() override {} |
450 | | |
451 | 11 | void reset_filter_map_index() override { |
452 | 26 | for (const auto& reader : _child_readers) { |
453 | 26 | reader.second->reset_filter_map_index(); |
454 | 26 | } |
455 | 11 | } |
456 | | |
457 | | private: |
458 | | std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _child_readers; |
459 | | std::vector<std::string> _read_column_names; |
460 | | //Need to use vector instead of set,see `get_rep_level()` for the reason. |
461 | | }; |
462 | | |
463 | | // A special reader that skips actual reading but provides empty data with correct structure |
464 | | // This is used when a column is not needed but its structure is required (e.g., for map keys) |
465 | | class SkipReadingReader : public ParquetColumnReader { |
466 | | public: |
467 | | SkipReadingReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz, |
468 | | io::IOContext* io_ctx, FieldSchema* field_schema) |
469 | 4 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) { |
470 | 4 | _field_schema = field_schema; // Use inherited member from base class |
471 | 4 | VLOG_DEBUG << "[ParquetReader] Created SkipReadingReader for field: " |
472 | 0 | << _field_schema->name; |
473 | 4 | } |
474 | | |
475 | | Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
476 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
477 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, |
478 | 4 | bool is_dict_filter, int64_t real_column_size = -1) override { |
479 | 4 | VLOG_DEBUG << "[ParquetReader] SkipReadingReader::read_column_data for field: " |
480 | 0 | << _field_schema->name << ", batch_size: " << batch_size; |
481 | 4 | DCHECK(real_column_size >= 0); // real_column_size for filtered column size. |
482 | | |
483 | | // Simulate reading without actually reading data |
484 | | // Fill with default/null values based on column type |
485 | 4 | MutableColumnPtr data_column = doris_column->assume_mutable(); |
486 | | |
487 | 4 | if (real_column_size > 0) { |
488 | 4 | if (doris_column->is_nullable()) { |
489 | 4 | auto* nullable_column = static_cast<ColumnNullable*>(data_column.get()); |
490 | 4 | nullable_column->insert_many_defaults(real_column_size); |
491 | 4 | } else { |
492 | | // For non-nullable columns, insert appropriate default values |
493 | 0 | for (size_t i = 0; i < real_column_size; ++i) { |
494 | 0 | data_column->insert_default(); |
495 | 0 | } |
496 | 0 | } |
497 | 4 | } |
498 | | |
499 | 4 | *read_rows = batch_size; // Indicate we "read" batch_size rows |
500 | 4 | *eof = false; // We can always provide more empty data |
501 | | |
502 | 4 | VLOG_DEBUG << "[ParquetReader] SkipReadingReader generated " << batch_size |
503 | 0 | << " default values for field: " << _field_schema->name; |
504 | | |
505 | 4 | return Status::OK(); |
506 | 4 | } |
507 | | |
508 | | static std::unique_ptr<SkipReadingReader> create_unique(const RowRanges& row_ranges, |
509 | | size_t total_rows, cctz::time_zone* ctz, |
510 | | io::IOContext* io_ctx, |
511 | 0 | FieldSchema* field_schema) { |
512 | 0 | return std::make_unique<SkipReadingReader>(row_ranges, total_rows, ctz, io_ctx, |
513 | 0 | field_schema); |
514 | 0 | } |
515 | | |
516 | | // These methods should not be called for SkipReadingReader |
517 | | // If they are called, it indicates a logic error in the code |
518 | 0 | const std::vector<level_t>& get_rep_level() const override { |
519 | 0 | LOG(FATAL) << "get_rep_level() should not be called on SkipReadingReader for field: " |
520 | 0 | << _field_schema->name |
521 | 0 | << ". This indicates the SkipReadingReader was incorrectly used as a reference " |
522 | 0 | "column."; |
523 | 0 | __builtin_unreachable(); |
524 | 0 | } |
525 | | |
526 | 0 | const std::vector<level_t>& get_def_level() const override { |
527 | 0 | LOG(FATAL) << "get_def_level() should not be called on SkipReadingReader for field: " |
528 | 0 | << _field_schema->name |
529 | 0 | << ". This indicates the SkipReadingReader was incorrectly used as a reference " |
530 | 0 | "column."; |
531 | 0 | __builtin_unreachable(); |
532 | 0 | } |
533 | | |
534 | | // Implement required pure virtual methods from base class |
535 | 0 | ColumnStatistics column_statistics() override { |
536 | 0 | return ColumnStatistics(); // Return empty statistics |
537 | 0 | } |
538 | | |
539 | 0 | void close() override { |
540 | | // Nothing to close for skip reading |
541 | 0 | } |
542 | | |
543 | 4 | void reset_filter_map_index() override { _filter_map_index = 0; } |
544 | | }; |
545 | | |
546 | | }; // namespace doris |