be/src/format/parquet/vparquet_column_reader.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | #include <gen_cpp/parquet_types.h> |
20 | | #include <glog/logging.h> |
21 | | #include <stddef.h> |
22 | | #include <stdint.h> |
23 | | |
24 | | #include <list> |
25 | | #include <memory> |
26 | | #include <ostream> |
27 | | #include <unordered_map> |
28 | | #include <vector> |
29 | | |
30 | | #include "common/status.h" |
31 | | #include "core/data_type/data_type.h" |
32 | | #include "format/generic_reader.h" |
33 | | #include "format/parquet/parquet_column_convert.h" |
34 | | #include "format/parquet/parquet_common.h" |
35 | | #include "format/parquet/vparquet_column_chunk_reader.h" |
36 | | #include "format/table/table_schema_change_helper.h" |
37 | | #include "io/fs/buffered_reader.h" |
38 | | #include "io/fs/file_reader_writer_fwd.h" |
39 | | #include "parquet_column_convert.h" |
40 | | #include "vparquet_column_chunk_reader.h" |
41 | | |
42 | | namespace cctz { |
43 | | class time_zone; |
44 | | } // namespace cctz |
45 | | |
46 | | namespace doris::io { |
47 | | struct IOContext; |
48 | | } // namespace doris::io |
49 | | |
50 | | namespace doris { |
51 | | struct FieldSchema; |
52 | | template <typename T> |
53 | | class ColumnStr; |
54 | | using ColumnString = ColumnStr<UInt32>; |
55 | | |
56 | | class ParquetColumnReader { |
57 | | public: |
58 | | struct ColumnStatistics { |
59 | | ColumnStatistics() |
60 | 152 | : page_index_read_calls(0), |
61 | 152 | decompress_time(0), |
62 | 152 | decompress_cnt(0), |
63 | 152 | decode_header_time(0), |
64 | 152 | decode_value_time(0), |
65 | 152 | decode_dict_time(0), |
66 | 152 | decode_level_time(0), |
67 | 152 | decode_null_map_time(0), |
68 | 152 | skip_page_header_num(0), |
69 | 152 | parse_page_header_num(0), |
70 | 152 | read_page_header_time(0), |
71 | 152 | page_read_counter(0), |
72 | 152 | page_cache_write_counter(0), |
73 | 152 | page_cache_compressed_write_counter(0), |
74 | 152 | page_cache_decompressed_write_counter(0), |
75 | 152 | page_cache_hit_counter(0), |
76 | 152 | page_cache_missing_counter(0), |
77 | 152 | page_cache_compressed_hit_counter(0), |
78 | 152 | page_cache_decompressed_hit_counter(0) {} |
79 | | |
80 | | ColumnStatistics(ColumnChunkReaderStatistics& cs, int64_t null_map_time) |
81 | 119 | : page_index_read_calls(0), |
82 | 119 | decompress_time(cs.decompress_time), |
83 | 119 | decompress_cnt(cs.decompress_cnt), |
84 | 119 | decode_header_time(cs.decode_header_time), |
85 | 119 | decode_value_time(cs.decode_value_time), |
86 | 119 | decode_dict_time(cs.decode_dict_time), |
87 | 119 | decode_level_time(cs.decode_level_time), |
88 | 119 | decode_null_map_time(null_map_time), |
89 | 119 | skip_page_header_num(cs.skip_page_header_num), |
90 | 119 | parse_page_header_num(cs.parse_page_header_num), |
91 | 119 | read_page_header_time(cs.read_page_header_time), |
92 | 119 | page_read_counter(cs.page_read_counter), |
93 | 119 | page_cache_write_counter(cs.page_cache_write_counter), |
94 | 119 | page_cache_compressed_write_counter(cs.page_cache_compressed_write_counter), |
95 | 119 | page_cache_decompressed_write_counter(cs.page_cache_decompressed_write_counter), |
96 | 119 | page_cache_hit_counter(cs.page_cache_hit_counter), |
97 | 119 | page_cache_missing_counter(cs.page_cache_missing_counter), |
98 | 119 | page_cache_compressed_hit_counter(cs.page_cache_compressed_hit_counter), |
99 | 119 | page_cache_decompressed_hit_counter(cs.page_cache_decompressed_hit_counter) {} |
100 | | |
101 | | int64_t page_index_read_calls; |
102 | | int64_t decompress_time; |
103 | | int64_t decompress_cnt; |
104 | | int64_t decode_header_time; |
105 | | int64_t decode_value_time; |
106 | | int64_t decode_dict_time; |
107 | | int64_t decode_level_time; |
108 | | int64_t decode_null_map_time; |
109 | | int64_t skip_page_header_num; |
110 | | int64_t parse_page_header_num; |
111 | | int64_t read_page_header_time; |
112 | | int64_t page_read_counter; |
113 | | int64_t page_cache_write_counter; |
114 | | int64_t page_cache_compressed_write_counter; |
115 | | int64_t page_cache_decompressed_write_counter; |
116 | | int64_t page_cache_hit_counter; |
117 | | int64_t page_cache_missing_counter; |
118 | | int64_t page_cache_compressed_hit_counter; |
119 | | int64_t page_cache_decompressed_hit_counter; |
120 | | |
121 | 168 | void merge(ColumnStatistics& col_statistics) { |
122 | 168 | page_index_read_calls += col_statistics.page_index_read_calls; |
123 | 168 | decompress_time += col_statistics.decompress_time; |
124 | 168 | decompress_cnt += col_statistics.decompress_cnt; |
125 | 168 | decode_header_time += col_statistics.decode_header_time; |
126 | 168 | decode_value_time += col_statistics.decode_value_time; |
127 | 168 | decode_dict_time += col_statistics.decode_dict_time; |
128 | 168 | decode_level_time += col_statistics.decode_level_time; |
129 | 168 | decode_null_map_time += col_statistics.decode_null_map_time; |
130 | 168 | skip_page_header_num += col_statistics.skip_page_header_num; |
131 | 168 | parse_page_header_num += col_statistics.parse_page_header_num; |
132 | 168 | read_page_header_time += col_statistics.read_page_header_time; |
133 | 168 | page_read_counter += col_statistics.page_read_counter; |
134 | 168 | page_cache_write_counter += col_statistics.page_cache_write_counter; |
135 | 168 | page_cache_compressed_write_counter += |
136 | 168 | col_statistics.page_cache_compressed_write_counter; |
137 | 168 | page_cache_decompressed_write_counter += |
138 | 168 | col_statistics.page_cache_decompressed_write_counter; |
139 | 168 | page_cache_hit_counter += col_statistics.page_cache_hit_counter; |
140 | 168 | page_cache_missing_counter += col_statistics.page_cache_missing_counter; |
141 | 168 | page_cache_compressed_hit_counter += col_statistics.page_cache_compressed_hit_counter; |
142 | 168 | page_cache_decompressed_hit_counter += |
143 | 168 | col_statistics.page_cache_decompressed_hit_counter; |
144 | 168 | } |
145 | | }; |
146 | | |
147 | | ParquetColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz, |
148 | | io::IOContext* io_ctx) |
149 | 136 | : _row_ranges(row_ranges), _total_rows(total_rows), _ctz(ctz), _io_ctx(io_ctx) {} |
150 | 136 | virtual ~ParquetColumnReader() = default; |
151 | | virtual Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
152 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
153 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, |
154 | | bool* eof, bool is_dict_filter, |
155 | | int64_t real_column_size = -1) = 0; |
156 | | |
157 | 0 | virtual Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) { |
158 | 0 | return Status::NotSupported("read_dict_values_to_column is not supported"); |
159 | 0 | } |
160 | | |
161 | | virtual Result<MutableColumnPtr> convert_dict_column_to_string_column( |
162 | 0 | const ColumnInt32* dict_column) { |
163 | 0 | throw Exception( |
164 | 0 | Status::FatalError("Method convert_dict_column_to_string_column is not supported")); |
165 | 0 | } |
166 | | |
167 | | static Status create(io::FileReaderSPtr file, FieldSchema* field, |
168 | | const tparquet::RowGroup& row_group, const RowRanges& row_ranges, |
169 | | const cctz::time_zone* ctz, io::IOContext* io_ctx, |
170 | | std::unique_ptr<ParquetColumnReader>& reader, size_t max_buf_size, |
171 | | std::unordered_map<int, tparquet::OffsetIndex>& col_offsets, |
172 | | RuntimeState* state, bool in_collection = false, |
173 | | const std::set<uint64_t>& column_ids = {}, |
174 | | const std::set<uint64_t>& filter_column_ids = {}); |
175 | | virtual const std::vector<level_t>& get_rep_level() const = 0; |
176 | | virtual const std::vector<level_t>& get_def_level() const = 0; |
177 | | virtual ColumnStatistics column_statistics() = 0; |
178 | | virtual void close() = 0; |
179 | | |
180 | | virtual void reset_filter_map_index() = 0; |
181 | | |
182 | 0 | FieldSchema* get_field_schema() const { return _field_schema; } |
183 | 28 | void set_column_in_nested() { _in_nested = true; } |
184 | | |
185 | | protected: |
186 | | void _generate_read_ranges(RowRange page_row_range, RowRanges* result_ranges) const; |
187 | | |
188 | | FieldSchema* _field_schema = nullptr; |
189 | | const RowRanges& _row_ranges; |
190 | | size_t _total_rows = 0; |
191 | | const cctz::time_zone* _ctz = nullptr; |
192 | | io::IOContext* _io_ctx = nullptr; |
193 | | int64_t _current_row_index = 0; |
194 | | int64_t _decode_null_map_time = 0; |
195 | | |
196 | | size_t _filter_map_index = 0; |
197 | | std::set<uint64_t> _filter_column_ids; |
198 | | |
199 | | // _in_nested: column in struct/map/array |
200 | | // IN_COLLECTION : column in map/array |
201 | | bool _in_nested = false; |
202 | | }; |
203 | | |
204 | | template <bool IN_COLLECTION, bool OFFSET_INDEX> |
205 | | class ScalarColumnReader : public ParquetColumnReader { |
206 | | ENABLE_FACTORY_CREATOR(ScalarColumnReader) |
207 | | public: |
208 | | ScalarColumnReader(const RowRanges& row_ranges, size_t total_rows, |
209 | | const tparquet::ColumnChunk& chunk_meta, |
210 | | const tparquet::OffsetIndex* offset_index, const cctz::time_zone* ctz, |
211 | | io::IOContext* io_ctx) |
212 | 119 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx), |
213 | 119 | _chunk_meta(chunk_meta), |
214 | 119 | _offset_index(offset_index) {}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE
_ZN5doris18ScalarColumnReaderILb1ELb0EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE
Line | Count | Source |
212 | 3 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx), |
213 | 3 | _chunk_meta(chunk_meta), |
214 | 3 | _offset_index(offset_index) {} |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE
_ZN5doris18ScalarColumnReaderILb0ELb0EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE
Line | Count | Source |
212 | 116 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx), |
213 | 116 | _chunk_meta(chunk_meta), |
214 | 116 | _offset_index(offset_index) {} |
|
215 | 119 | ~ScalarColumnReader() override { close(); }
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EED2Ev
_ZN5doris18ScalarColumnReaderILb1ELb0EED2Ev
Line | Count | Source |
215 | 3 | ~ScalarColumnReader() override { close(); } |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EED2Ev
_ZN5doris18ScalarColumnReaderILb0ELb0EED2Ev
Line | Count | Source |
215 | 116 | ~ScalarColumnReader() override { close(); } |
|
216 | | Status init(io::FileReaderSPtr file, FieldSchema* field, size_t max_buf_size, |
217 | | RuntimeState* state); |
218 | | Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
219 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
220 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, |
221 | | bool is_dict_filter, int64_t real_column_size = -1) override; |
222 | | Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) override; |
223 | | Result<MutableColumnPtr> convert_dict_column_to_string_column( |
224 | | const ColumnInt32* dict_column) override; |
225 | 13 | const std::vector<level_t>& get_rep_level() const override { return _rep_levels; }
Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb1ELb1EE13get_rep_levelEv
_ZNK5doris18ScalarColumnReaderILb1ELb0EE13get_rep_levelEv
Line | Count | Source |
225 | 4 | const std::vector<level_t>& get_rep_level() const override { return _rep_levels; } |
Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb0ELb1EE13get_rep_levelEv
_ZNK5doris18ScalarColumnReaderILb0ELb0EE13get_rep_levelEv
Line | Count | Source |
225 | 9 | const std::vector<level_t>& get_rep_level() const override { return _rep_levels; } |
|
226 | 13 | const std::vector<level_t>& get_def_level() const override { return _def_levels; }
Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb1ELb1EE13get_def_levelEv
_ZNK5doris18ScalarColumnReaderILb1ELb0EE13get_def_levelEv
Line | Count | Source |
226 | 4 | const std::vector<level_t>& get_def_level() const override { return _def_levels; } |
Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb0ELb1EE13get_def_levelEv
_ZNK5doris18ScalarColumnReaderILb0ELb0EE13get_def_levelEv
Line | Count | Source |
226 | 9 | const std::vector<level_t>& get_def_level() const override { return _def_levels; } |
|
227 | 119 | ColumnStatistics column_statistics() override { |
228 | 119 | return ColumnStatistics(_chunk_reader->chunk_statistics(), _decode_null_map_time); |
229 | 119 | }
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE17column_statisticsEv
_ZN5doris18ScalarColumnReaderILb1ELb0EE17column_statisticsEv
Line | Count | Source |
227 | 3 | ColumnStatistics column_statistics() override { |
228 | 3 | return ColumnStatistics(_chunk_reader->chunk_statistics(), _decode_null_map_time); |
229 | 3 | } |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE17column_statisticsEv
_ZN5doris18ScalarColumnReaderILb0ELb0EE17column_statisticsEv
Line | Count | Source |
227 | 116 | ColumnStatistics column_statistics() override { |
228 | 116 | return ColumnStatistics(_chunk_reader->chunk_statistics(), _decode_null_map_time); |
229 | 116 | } |
|
230 | 119 | void close() override {}
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE5closeEv
_ZN5doris18ScalarColumnReaderILb1ELb0EE5closeEv
Line | Count | Source |
230 | 3 | void close() override {} |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE5closeEv
_ZN5doris18ScalarColumnReaderILb0ELb0EE5closeEv
Line | Count | Source |
230 | 116 | void close() override {} |
|
231 | | |
232 | 136 | void reset_filter_map_index() override { |
233 | 136 | _filter_map_index = 0; // nested |
234 | 136 | _orig_filter_map_index = 0; |
235 | 136 | }
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE22reset_filter_map_indexEv
_ZN5doris18ScalarColumnReaderILb1ELb0EE22reset_filter_map_indexEv
Line | Count | Source |
232 | 3 | void reset_filter_map_index() override { |
233 | 3 | _filter_map_index = 0; // nested |
234 | 3 | _orig_filter_map_index = 0; |
235 | 3 | } |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE22reset_filter_map_indexEv
_ZN5doris18ScalarColumnReaderILb0ELb0EE22reset_filter_map_indexEv
Line | Count | Source |
232 | 133 | void reset_filter_map_index() override { |
233 | 133 | _filter_map_index = 0; // nested |
234 | 133 | _orig_filter_map_index = 0; |
235 | 133 | } |
|
236 | | |
237 | | private: |
238 | | tparquet::ColumnChunk _chunk_meta; |
239 | | const tparquet::OffsetIndex* _offset_index = nullptr; |
240 | | std::unique_ptr<io::BufferedFileStreamReader> _stream_reader; |
241 | | std::unique_ptr<ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>> _chunk_reader; |
242 | | // rep def levels buffer. |
243 | | std::vector<level_t> _rep_levels; |
244 | | std::vector<level_t> _def_levels; |
245 | | |
246 | | size_t _current_range_idx = 0; |
247 | | |
248 | | Status gen_nested_null_map(size_t level_start_idx, size_t level_end_idx, |
249 | | std::vector<uint16_t>& null_map, |
250 | 13 | std::unordered_set<size_t>& ancestor_null_indices) { |
251 | 13 | size_t has_read = level_start_idx; |
252 | 13 | null_map.emplace_back(0); |
253 | 13 | bool prev_is_null = false; |
254 | | |
255 | 26 | while (has_read < level_end_idx) { |
256 | 13 | level_t def_level = _def_levels[has_read++]; |
257 | 13 | size_t loop_read = 1; |
258 | 19 | while (has_read < _def_levels.size() && _def_levels[has_read] == def_level) { |
259 | 6 | has_read++; |
260 | 6 | loop_read++; |
261 | 6 | } |
262 | | |
263 | 13 | if (def_level < _field_schema->repeated_parent_def_level) { |
264 | 0 | for (size_t i = 0; i < loop_read; i++) { |
265 | 0 | ancestor_null_indices.insert(has_read - level_start_idx - loop_read + i); |
266 | 0 | } |
267 | 0 | continue; |
268 | 0 | } |
269 | | |
270 | 13 | bool is_null = def_level < _field_schema->definition_level; |
271 | | |
272 | 13 | if (prev_is_null == is_null && (USHRT_MAX - null_map.back() >= loop_read)) { |
273 | 13 | null_map.back() += loop_read; |
274 | 13 | } else { |
275 | 0 | if (!(prev_is_null ^ is_null)) { |
276 | 0 | null_map.emplace_back(0); |
277 | 0 | } |
278 | 0 | size_t remaining = loop_read; |
279 | 0 | while (remaining > USHRT_MAX) { |
280 | 0 | null_map.emplace_back(USHRT_MAX); |
281 | 0 | null_map.emplace_back(0); |
282 | 0 | remaining -= USHRT_MAX; |
283 | 0 | } |
284 | 0 | null_map.emplace_back((u_short)remaining); |
285 | 0 | prev_is_null = is_null; |
286 | 0 | } |
287 | 13 | } |
288 | 13 | return Status::OK(); |
289 | 13 | }
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE
_ZN5doris18ScalarColumnReaderILb1ELb0EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE
Line | Count | Source |
250 | 3 | std::unordered_set<size_t>& ancestor_null_indices) { |
251 | 3 | size_t has_read = level_start_idx; |
252 | 3 | null_map.emplace_back(0); |
253 | 3 | bool prev_is_null = false; |
254 | | |
255 | 6 | while (has_read < level_end_idx) { |
256 | 3 | level_t def_level = _def_levels[has_read++]; |
257 | 3 | size_t loop_read = 1; |
258 | 9 | while (has_read < _def_levels.size() && _def_levels[has_read] == def_level) { |
259 | 6 | has_read++; |
260 | 6 | loop_read++; |
261 | 6 | } |
262 | | |
263 | 3 | if (def_level < _field_schema->repeated_parent_def_level) { |
264 | 0 | for (size_t i = 0; i < loop_read; i++) { |
265 | 0 | ancestor_null_indices.insert(has_read - level_start_idx - loop_read + i); |
266 | 0 | } |
267 | 0 | continue; |
268 | 0 | } |
269 | | |
270 | 3 | bool is_null = def_level < _field_schema->definition_level; |
271 | | |
272 | 3 | if (prev_is_null == is_null && (USHRT_MAX - null_map.back() >= loop_read)) { |
273 | 3 | null_map.back() += loop_read; |
274 | 3 | } else { |
275 | 0 | if (!(prev_is_null ^ is_null)) { |
276 | 0 | null_map.emplace_back(0); |
277 | 0 | } |
278 | 0 | size_t remaining = loop_read; |
279 | 0 | while (remaining > USHRT_MAX) { |
280 | 0 | null_map.emplace_back(USHRT_MAX); |
281 | 0 | null_map.emplace_back(0); |
282 | | remaining -= USHRT_MAX; |
283 | 0 | } |
284 | 0 | null_map.emplace_back((u_short)remaining); |
285 | 0 | prev_is_null = is_null; |
286 | 0 | } |
287 | 3 | } |
288 | 3 | return Status::OK(); |
289 | 3 | } |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE
_ZN5doris18ScalarColumnReaderILb0ELb0EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE
Line | Count | Source |
250 | 10 | std::unordered_set<size_t>& ancestor_null_indices) { |
251 | 10 | size_t has_read = level_start_idx; |
252 | 10 | null_map.emplace_back(0); |
253 | 10 | bool prev_is_null = false; |
254 | | |
255 | 20 | while (has_read < level_end_idx) { |
256 | 10 | level_t def_level = _def_levels[has_read++]; |
257 | 10 | size_t loop_read = 1; |
258 | 10 | while (has_read < _def_levels.size() && _def_levels[has_read] == def_level) { |
259 | 0 | has_read++; |
260 | 0 | loop_read++; |
261 | 0 | } |
262 | | |
263 | 10 | if (def_level < _field_schema->repeated_parent_def_level) { |
264 | 0 | for (size_t i = 0; i < loop_read; i++) { |
265 | 0 | ancestor_null_indices.insert(has_read - level_start_idx - loop_read + i); |
266 | 0 | } |
267 | 0 | continue; |
268 | 0 | } |
269 | | |
270 | 10 | bool is_null = def_level < _field_schema->definition_level; |
271 | | |
272 | 10 | if (prev_is_null == is_null && (USHRT_MAX - null_map.back() >= loop_read)) { |
273 | 10 | null_map.back() += loop_read; |
274 | 10 | } else { |
275 | 0 | if (!(prev_is_null ^ is_null)) { |
276 | 0 | null_map.emplace_back(0); |
277 | 0 | } |
278 | 0 | size_t remaining = loop_read; |
279 | 0 | while (remaining > USHRT_MAX) { |
280 | 0 | null_map.emplace_back(USHRT_MAX); |
281 | 0 | null_map.emplace_back(0); |
282 | | remaining -= USHRT_MAX; |
283 | 0 | } |
284 | 0 | null_map.emplace_back((u_short)remaining); |
285 | 0 | prev_is_null = is_null; |
286 | 0 | } |
287 | 10 | } |
288 | 10 | return Status::OK(); |
289 | 10 | } |
|
290 | | |
291 | | Status gen_filter_map(FilterMap& filter_map, size_t filter_loc, size_t level_start_idx, |
292 | | size_t level_end_idx, std::vector<uint8_t>& nested_filter_map_data, |
293 | 0 | std::unique_ptr<FilterMap>* nested_filter_map) { |
294 | 0 | nested_filter_map_data.resize(level_end_idx - level_start_idx); |
295 | 0 | for (size_t idx = level_start_idx; idx < level_end_idx; idx++) { |
296 | 0 | if (idx != level_start_idx && _rep_levels[idx] == 0) { |
297 | 0 | filter_loc++; |
298 | 0 | } |
299 | 0 | nested_filter_map_data[idx - level_start_idx] = |
300 | 0 | filter_map.filter_map_data()[filter_loc]; |
301 | 0 | } |
302 | |
|
303 | 0 | auto new_filter = std::make_unique<FilterMap>(); |
304 | 0 | RETURN_IF_ERROR(new_filter->init(nested_filter_map_data.data(), |
305 | 0 | nested_filter_map_data.size(), false)); |
306 | 0 | *nested_filter_map = std::move(new_filter); |
307 | |
|
308 | 0 | return Status::OK(); |
309 | 0 | }
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb0EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb0EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE
|
310 | | |
311 | | std::unique_ptr<parquet::PhysicalToLogicalConverter> _converter = nullptr; |
312 | | std::unique_ptr<std::vector<uint8_t>> _nested_filter_map_data = nullptr; |
313 | | size_t _orig_filter_map_index = 0; |
314 | | |
315 | | Status _skip_values(size_t num_values); |
316 | | Status _read_values(size_t num_values, ColumnPtr& doris_column, DataTypePtr& type, |
317 | | FilterMap& filter_map, bool is_dict_filter); |
318 | | Status _read_nested_column(ColumnPtr& doris_column, DataTypePtr& type, FilterMap& filter_map, |
319 | | size_t batch_size, size_t* read_rows, bool* eof, |
320 | | bool is_dict_filter); |
321 | | Status _try_load_dict_page(bool* loaded, bool* has_dict); |
322 | | }; |
323 | | |
324 | | class ArrayColumnReader : public ParquetColumnReader { |
325 | | ENABLE_FACTORY_CREATOR(ArrayColumnReader) |
326 | | public: |
327 | | ArrayColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz, |
328 | | io::IOContext* io_ctx) |
329 | 2 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {} |
330 | 2 | ~ArrayColumnReader() override { close(); } |
331 | | Status init(std::unique_ptr<ParquetColumnReader> element_reader, FieldSchema* field); |
332 | | Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
333 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
334 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, |
335 | | bool is_dict_filter, int64_t real_column_size = -1) override; |
336 | 0 | const std::vector<level_t>& get_rep_level() const override { |
337 | 0 | return _element_reader->get_rep_level(); |
338 | 0 | } |
339 | 0 | const std::vector<level_t>& get_def_level() const override { |
340 | 0 | return _element_reader->get_def_level(); |
341 | 0 | } |
342 | 2 | ColumnStatistics column_statistics() override { return _element_reader->column_statistics(); } |
343 | 2 | void close() override {} |
344 | | |
345 | 2 | void reset_filter_map_index() override { _element_reader->reset_filter_map_index(); } |
346 | | |
347 | | private: |
348 | | std::unique_ptr<ParquetColumnReader> _element_reader; |
349 | | }; |
350 | | |
351 | | class MapColumnReader : public ParquetColumnReader { |
352 | | ENABLE_FACTORY_CREATOR(MapColumnReader) |
353 | | public: |
354 | | MapColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz, |
355 | | io::IOContext* io_ctx) |
356 | 0 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {} |
357 | 0 | ~MapColumnReader() override { close(); } |
358 | | |
359 | | Status init(std::unique_ptr<ParquetColumnReader> key_reader, |
360 | | std::unique_ptr<ParquetColumnReader> value_reader, FieldSchema* field); |
361 | | Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
362 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
363 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, |
364 | | bool is_dict_filter, int64_t real_column_size = -1) override; |
365 | | |
366 | 0 | const std::vector<level_t>& get_rep_level() const override { |
367 | 0 | return _key_reader->get_rep_level(); |
368 | 0 | } |
369 | 0 | const std::vector<level_t>& get_def_level() const override { |
370 | 0 | return _key_reader->get_def_level(); |
371 | 0 | } |
372 | | |
373 | 0 | ColumnStatistics column_statistics() override { |
374 | 0 | ColumnStatistics kst = _key_reader->column_statistics(); |
375 | 0 | ColumnStatistics vst = _value_reader->column_statistics(); |
376 | 0 | kst.merge(vst); |
377 | 0 | return kst; |
378 | 0 | } |
379 | | |
380 | 0 | void close() override {} |
381 | | |
382 | 0 | void reset_filter_map_index() override { |
383 | 0 | _key_reader->reset_filter_map_index(); |
384 | 0 | _value_reader->reset_filter_map_index(); |
385 | 0 | } |
386 | | |
387 | | private: |
388 | | std::unique_ptr<ParquetColumnReader> _key_reader; |
389 | | std::unique_ptr<ParquetColumnReader> _value_reader; |
390 | | }; |
391 | | |
392 | | class StructColumnReader : public ParquetColumnReader { |
393 | | ENABLE_FACTORY_CREATOR(StructColumnReader) |
394 | | public: |
395 | | StructColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz, |
396 | | io::IOContext* io_ctx) |
397 | 11 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {} |
398 | 11 | ~StructColumnReader() override { close(); } |
399 | | |
400 | | Status init( |
401 | | std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>&& child_readers, |
402 | | FieldSchema* field); |
403 | | Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
404 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
405 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, |
406 | | bool is_dict_filter, int64_t real_column_size = -1) override; |
407 | | |
408 | 17 | const std::vector<level_t>& get_rep_level() const override { |
409 | 17 | if (!_read_column_names.empty()) { |
410 | | // can't use _child_readers[*_read_column_names.begin()] |
411 | | // because the operator[] of std::unordered_map is not const :( |
412 | | /* |
413 | | * Considering the issue in the `_read_nested_column` function where data may span across pages, leading |
414 | | * to missing definition and repetition levels, when filling the null_map of the struct later, it is |
415 | | * crucial to use the definition and repetition levels from the first read column, |
416 | | * that is `_read_column_names.front()`. |
417 | | */ |
418 | 17 | return _child_readers.find(_read_column_names.front())->second->get_rep_level(); |
419 | 17 | } |
420 | 0 | return _child_readers.begin()->second->get_rep_level(); |
421 | 17 | } |
422 | | |
423 | 17 | const std::vector<level_t>& get_def_level() const override { |
424 | 17 | if (!_read_column_names.empty()) { |
425 | 17 | return _child_readers.find(_read_column_names.front())->second->get_def_level(); |
426 | 17 | } |
427 | 0 | return _child_readers.begin()->second->get_def_level(); |
428 | 17 | } |
429 | | |
430 | 11 | ColumnStatistics column_statistics() override { |
431 | 11 | ColumnStatistics st; |
432 | 22 | for (const auto& column_name : _read_column_names) { |
433 | 22 | auto reader = _child_readers.find(column_name); |
434 | 22 | if (reader != _child_readers.end()) { |
435 | 22 | ColumnStatistics cst = reader->second->column_statistics(); |
436 | 22 | st.merge(cst); |
437 | 22 | } |
438 | 22 | } |
439 | 11 | return st; |
440 | 11 | } |
441 | | |
442 | 11 | void close() override {} |
443 | | |
444 | 11 | void reset_filter_map_index() override { |
445 | 26 | for (const auto& reader : _child_readers) { |
446 | 26 | reader.second->reset_filter_map_index(); |
447 | 26 | } |
448 | 11 | } |
449 | | |
450 | | private: |
451 | | std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _child_readers; |
452 | | std::vector<std::string> _read_column_names; |
453 | | //Need to use vector instead of set,see `get_rep_level()` for the reason. |
454 | | }; |
455 | | |
456 | | // A special reader that skips actual reading but provides empty data with correct structure |
457 | | // This is used when a column is not needed but its structure is required (e.g., for map keys) |
458 | | class SkipReadingReader : public ParquetColumnReader { |
459 | | public: |
460 | | SkipReadingReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz, |
461 | | io::IOContext* io_ctx, FieldSchema* field_schema) |
462 | 4 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) { |
463 | 4 | _field_schema = field_schema; // Use inherited member from base class |
464 | 4 | VLOG_DEBUG << "[ParquetReader] Created SkipReadingReader for field: " |
465 | 0 | << _field_schema->name; |
466 | 4 | } |
467 | | |
468 | | Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
469 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
470 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, |
471 | 4 | bool is_dict_filter, int64_t real_column_size = -1) override { |
472 | 4 | VLOG_DEBUG << "[ParquetReader] SkipReadingReader::read_column_data for field: " |
473 | 0 | << _field_schema->name << ", batch_size: " << batch_size; |
474 | 4 | DCHECK(real_column_size >= 0); // real_column_size for filtered column size. |
475 | | |
476 | | // Simulate reading without actually reading data |
477 | | // Fill with default/null values based on column type |
478 | 4 | MutableColumnPtr data_column = doris_column->assume_mutable(); |
479 | | |
480 | 4 | if (real_column_size > 0) { |
481 | 4 | if (doris_column->is_nullable()) { |
482 | 4 | auto* nullable_column = static_cast<ColumnNullable*>(data_column.get()); |
483 | 4 | nullable_column->insert_many_defaults(real_column_size); |
484 | 4 | } else { |
485 | | // For non-nullable columns, insert appropriate default values |
486 | 0 | for (size_t i = 0; i < real_column_size; ++i) { |
487 | 0 | data_column->insert_default(); |
488 | 0 | } |
489 | 0 | } |
490 | 4 | } |
491 | | |
492 | 4 | *read_rows = batch_size; // Indicate we "read" batch_size rows |
493 | 4 | *eof = false; // We can always provide more empty data |
494 | | |
495 | 4 | VLOG_DEBUG << "[ParquetReader] SkipReadingReader generated " << batch_size |
496 | 0 | << " default values for field: " << _field_schema->name; |
497 | | |
498 | 4 | return Status::OK(); |
499 | 4 | } |
500 | | |
501 | | static std::unique_ptr<SkipReadingReader> create_unique(const RowRanges& row_ranges, |
502 | | size_t total_rows, cctz::time_zone* ctz, |
503 | | io::IOContext* io_ctx, |
504 | 0 | FieldSchema* field_schema) { |
505 | 0 | return std::make_unique<SkipReadingReader>(row_ranges, total_rows, ctz, io_ctx, |
506 | 0 | field_schema); |
507 | 0 | } |
508 | | |
509 | | // These methods should not be called for SkipReadingReader |
510 | | // If they are called, it indicates a logic error in the code |
511 | 0 | const std::vector<level_t>& get_rep_level() const override { |
512 | 0 | LOG(FATAL) << "get_rep_level() should not be called on SkipReadingReader for field: " |
513 | 0 | << _field_schema->name |
514 | 0 | << ". This indicates the SkipReadingReader was incorrectly used as a reference " |
515 | 0 | "column."; |
516 | 0 | __builtin_unreachable(); |
517 | 0 | } |
518 | | |
519 | 0 | const std::vector<level_t>& get_def_level() const override { |
520 | 0 | LOG(FATAL) << "get_def_level() should not be called on SkipReadingReader for field: " |
521 | 0 | << _field_schema->name |
522 | 0 | << ". This indicates the SkipReadingReader was incorrectly used as a reference " |
523 | 0 | "column."; |
524 | 0 | __builtin_unreachable(); |
525 | 0 | } |
526 | | |
527 | | // Implement required pure virtual methods from base class |
528 | 0 | ColumnStatistics column_statistics() override { |
529 | 0 | return ColumnStatistics(); // Return empty statistics |
530 | 0 | } |
531 | | |
532 | 0 | void close() override { |
533 | | // Nothing to close for skip reading |
534 | 0 | } |
535 | | |
536 | 4 | void reset_filter_map_index() override { _filter_map_index = 0; } |
537 | | }; |
538 | | |
539 | | }; // namespace doris |