be/src/format/parquet/vparquet_column_reader.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | #include <gen_cpp/parquet_types.h> |
20 | | #include <glog/logging.h> |
21 | | #include <stddef.h> |
22 | | #include <stdint.h> |
23 | | |
24 | | #include <list> |
25 | | #include <memory> |
26 | | #include <ostream> |
27 | | #include <unordered_map> |
28 | | #include <vector> |
29 | | |
30 | | #include "common/status.h" |
31 | | #include "core/data_type/data_type.h" |
32 | | #include "format/parquet/parquet_column_convert.h" |
33 | | #include "format/parquet/parquet_common.h" |
34 | | #include "format/parquet/vparquet_column_chunk_reader.h" |
35 | | #include "format/table/table_format_reader.h" |
36 | | #include "format/table/table_schema_change_helper.h" |
37 | | #include "io/fs/buffered_reader.h" |
38 | | #include "io/fs/file_reader_writer_fwd.h" |
39 | | #include "parquet_column_convert.h" |
40 | | #include "vparquet_column_chunk_reader.h" |
41 | | |
42 | | namespace cctz { |
43 | | class time_zone; |
44 | | } // namespace cctz |
45 | | |
46 | | namespace doris::io { |
47 | | struct IOContext; |
48 | | } // namespace doris::io |
49 | | |
50 | | namespace doris { |
51 | | #include "common/compile_check_begin.h" |
52 | | struct FieldSchema; |
53 | | template <typename T> |
54 | | class ColumnStr; |
55 | | using ColumnString = ColumnStr<UInt32>; |
56 | | |
57 | | class ParquetColumnReader { |
58 | | public: |
59 | | struct ColumnStatistics { |
60 | | ColumnStatistics() |
61 | 140 | : page_index_read_calls(0), |
62 | 140 | decompress_time(0), |
63 | 140 | decompress_cnt(0), |
64 | 140 | decode_header_time(0), |
65 | 140 | decode_value_time(0), |
66 | 140 | decode_dict_time(0), |
67 | 140 | decode_level_time(0), |
68 | 140 | decode_null_map_time(0), |
69 | 140 | skip_page_header_num(0), |
70 | 140 | parse_page_header_num(0), |
71 | 140 | read_page_header_time(0), |
72 | 140 | page_read_counter(0), |
73 | 140 | page_cache_write_counter(0), |
74 | 140 | page_cache_compressed_write_counter(0), |
75 | 140 | page_cache_decompressed_write_counter(0), |
76 | 140 | page_cache_hit_counter(0), |
77 | 140 | page_cache_missing_counter(0), |
78 | 140 | page_cache_compressed_hit_counter(0), |
79 | 140 | page_cache_decompressed_hit_counter(0) {} |
80 | | |
81 | | ColumnStatistics(ColumnChunkReaderStatistics& cs, int64_t null_map_time) |
82 | 115 | : page_index_read_calls(0), |
83 | 115 | decompress_time(cs.decompress_time), |
84 | 115 | decompress_cnt(cs.decompress_cnt), |
85 | 115 | decode_header_time(cs.decode_header_time), |
86 | 115 | decode_value_time(cs.decode_value_time), |
87 | 115 | decode_dict_time(cs.decode_dict_time), |
88 | 115 | decode_level_time(cs.decode_level_time), |
89 | 115 | decode_null_map_time(null_map_time), |
90 | 115 | skip_page_header_num(cs.skip_page_header_num), |
91 | 115 | parse_page_header_num(cs.parse_page_header_num), |
92 | 115 | read_page_header_time(cs.read_page_header_time), |
93 | 115 | page_read_counter(cs.page_read_counter), |
94 | 115 | page_cache_write_counter(cs.page_cache_write_counter), |
95 | 115 | page_cache_compressed_write_counter(cs.page_cache_compressed_write_counter), |
96 | 115 | page_cache_decompressed_write_counter(cs.page_cache_decompressed_write_counter), |
97 | 115 | page_cache_hit_counter(cs.page_cache_hit_counter), |
98 | 115 | page_cache_missing_counter(cs.page_cache_missing_counter), |
99 | 115 | page_cache_compressed_hit_counter(cs.page_cache_compressed_hit_counter), |
100 | 115 | page_cache_decompressed_hit_counter(cs.page_cache_decompressed_hit_counter) {} |
101 | | |
102 | | int64_t page_index_read_calls; |
103 | | int64_t decompress_time; |
104 | | int64_t decompress_cnt; |
105 | | int64_t decode_header_time; |
106 | | int64_t decode_value_time; |
107 | | int64_t decode_dict_time; |
108 | | int64_t decode_level_time; |
109 | | int64_t decode_null_map_time; |
110 | | int64_t skip_page_header_num; |
111 | | int64_t parse_page_header_num; |
112 | | int64_t read_page_header_time; |
113 | | int64_t page_read_counter; |
114 | | int64_t page_cache_write_counter; |
115 | | int64_t page_cache_compressed_write_counter; |
116 | | int64_t page_cache_decompressed_write_counter; |
117 | | int64_t page_cache_hit_counter; |
118 | | int64_t page_cache_missing_counter; |
119 | | int64_t page_cache_compressed_hit_counter; |
120 | | int64_t page_cache_decompressed_hit_counter; |
121 | | |
122 | 161 | void merge(ColumnStatistics& col_statistics) { |
123 | 161 | page_index_read_calls += col_statistics.page_index_read_calls; |
124 | 161 | decompress_time += col_statistics.decompress_time; |
125 | 161 | decompress_cnt += col_statistics.decompress_cnt; |
126 | 161 | decode_header_time += col_statistics.decode_header_time; |
127 | 161 | decode_value_time += col_statistics.decode_value_time; |
128 | 161 | decode_dict_time += col_statistics.decode_dict_time; |
129 | 161 | decode_level_time += col_statistics.decode_level_time; |
130 | 161 | decode_null_map_time += col_statistics.decode_null_map_time; |
131 | 161 | skip_page_header_num += col_statistics.skip_page_header_num; |
132 | 161 | parse_page_header_num += col_statistics.parse_page_header_num; |
133 | 161 | read_page_header_time += col_statistics.read_page_header_time; |
134 | 161 | page_read_counter += col_statistics.page_read_counter; |
135 | 161 | page_cache_write_counter += col_statistics.page_cache_write_counter; |
136 | 161 | page_cache_compressed_write_counter += |
137 | 161 | col_statistics.page_cache_compressed_write_counter; |
138 | 161 | page_cache_decompressed_write_counter += |
139 | 161 | col_statistics.page_cache_decompressed_write_counter; |
140 | 161 | page_cache_hit_counter += col_statistics.page_cache_hit_counter; |
141 | 161 | page_cache_missing_counter += col_statistics.page_cache_missing_counter; |
142 | 161 | page_cache_compressed_hit_counter += col_statistics.page_cache_compressed_hit_counter; |
143 | 161 | page_cache_decompressed_hit_counter += |
144 | 161 | col_statistics.page_cache_decompressed_hit_counter; |
145 | 161 | } |
146 | | }; |
147 | | |
148 | | ParquetColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz, |
149 | | io::IOContext* io_ctx) |
150 | 132 | : _row_ranges(row_ranges), _total_rows(total_rows), _ctz(ctz), _io_ctx(io_ctx) {} |
151 | 132 | virtual ~ParquetColumnReader() = default; |
152 | | virtual Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
153 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
154 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, |
155 | | bool* eof, bool is_dict_filter, |
156 | | int64_t real_column_size = -1) = 0; |
157 | | |
158 | 0 | virtual Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) { |
159 | 0 | return Status::NotSupported("read_dict_values_to_column is not supported"); |
160 | 0 | } |
161 | | |
162 | | virtual Result<MutableColumnPtr> convert_dict_column_to_string_column( |
163 | 0 | const ColumnInt32* dict_column) { |
164 | 0 | throw Exception( |
165 | 0 | Status::FatalError("Method convert_dict_column_to_string_column is not supported")); |
166 | 0 | } |
167 | | |
168 | | static Status create(io::FileReaderSPtr file, FieldSchema* field, |
169 | | const tparquet::RowGroup& row_group, const RowRanges& row_ranges, |
170 | | const cctz::time_zone* ctz, io::IOContext* io_ctx, |
171 | | std::unique_ptr<ParquetColumnReader>& reader, size_t max_buf_size, |
172 | | std::unordered_map<int, tparquet::OffsetIndex>& col_offsets, |
173 | | RuntimeState* state, bool in_collection = false, |
174 | | const std::set<uint64_t>& column_ids = {}, |
175 | | const std::set<uint64_t>& filter_column_ids = {}); |
176 | | virtual const std::vector<level_t>& get_rep_level() const = 0; |
177 | | virtual const std::vector<level_t>& get_def_level() const = 0; |
178 | | virtual ColumnStatistics column_statistics() = 0; |
179 | | virtual void close() = 0; |
180 | | |
181 | | virtual void reset_filter_map_index() = 0; |
182 | | |
183 | 0 | FieldSchema* get_field_schema() const { return _field_schema; } |
184 | 28 | void set_column_in_nested() { _in_nested = true; } |
185 | | |
186 | | protected: |
187 | | void _generate_read_ranges(RowRange page_row_range, RowRanges* result_ranges) const; |
188 | | |
189 | | FieldSchema* _field_schema = nullptr; |
190 | | const RowRanges& _row_ranges; |
191 | | size_t _total_rows = 0; |
192 | | const cctz::time_zone* _ctz = nullptr; |
193 | | io::IOContext* _io_ctx = nullptr; |
194 | | int64_t _current_row_index = 0; |
195 | | int64_t _decode_null_map_time = 0; |
196 | | |
197 | | size_t _filter_map_index = 0; |
198 | | std::set<uint64_t> _filter_column_ids; |
199 | | |
200 | | // _in_nested: column in struct/map/array |
201 | | // IN_COLLECTION : column in map/array |
202 | | bool _in_nested = false; |
203 | | }; |
204 | | |
// Reads one physical (leaf) parquet column chunk.
// IN_COLLECTION:  the column lives inside a map/array, so repetition levels matter.
// OFFSET_INDEX:   a page OffsetIndex is available for page skipping.
template <bool IN_COLLECTION, bool OFFSET_INDEX>
class ScalarColumnReader : public ParquetColumnReader {
    ENABLE_FACTORY_CREATOR(ScalarColumnReader)
public:
    ScalarColumnReader(const RowRanges& row_ranges, size_t total_rows,
                       const tparquet::ColumnChunk& chunk_meta,
                       const tparquet::OffsetIndex* offset_index, const cctz::time_zone* ctz,
                       io::IOContext* io_ctx)
            : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx),
              _chunk_meta(chunk_meta),
              _offset_index(offset_index) {}
    ~ScalarColumnReader() override { close(); }
    Status init(io::FileReaderSPtr file, FieldSchema* field, size_t max_buf_size,
                RuntimeState* state);
    Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type,
                            const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node,
                            FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof,
                            bool is_dict_filter, int64_t real_column_size = -1) override;
    Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) override;
    Result<MutableColumnPtr> convert_dict_column_to_string_column(
            const ColumnInt32* dict_column) override;
    const std::vector<level_t>& get_rep_level() const override { return _rep_levels; }
    const std::vector<level_t>& get_def_level() const override { return _def_levels; }
    ColumnStatistics column_statistics() override {
        return ColumnStatistics(_chunk_reader->chunk_statistics(), _decode_null_map_time);
    }
    void close() override {}

    void reset_filter_map_index() override {
        _filter_map_index = 0; // nested
        _orig_filter_map_index = 0;
    }

private:
    tparquet::ColumnChunk _chunk_meta;
    const tparquet::OffsetIndex* _offset_index = nullptr;
    std::unique_ptr<io::BufferedFileStreamReader> _stream_reader;
    std::unique_ptr<ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>> _chunk_reader;
    // rep def levels buffer.
    std::vector<level_t> _rep_levels;
    std::vector<level_t> _def_levels;

    size_t _current_range_idx = 0;

    // Builds a run-length null map for levels [level_start_idx, level_end_idx).
    // `null_map` encodes alternating runs of not-null / null value counts
    // (starting in the "not null" state); each entry is capped at USHRT_MAX and
    // a zero-length run is inserted when the same state must continue past the
    // cap or when the first run is null. Level indices whose definition level
    // falls below `repeated_parent_def_level` belong to a null ancestor
    // (e.g. a null array) and are reported via `ancestor_null_indices`
    // instead of the null map.
    Status gen_nested_null_map(size_t level_start_idx, size_t level_end_idx,
                               std::vector<uint16_t>& null_map,
                               std::unordered_set<size_t>& ancestor_null_indices) {
        size_t has_read = level_start_idx;
        null_map.emplace_back(0);
        bool prev_is_null = false;

        while (has_read < level_end_idx) {
            // Collapse a run of identical definition levels into one step.
            level_t def_level = _def_levels[has_read++];
            size_t loop_read = 1;
            while (has_read < _def_levels.size() && _def_levels[has_read] == def_level) {
                has_read++;
                loop_read++;
            }

            if (def_level < _field_schema->repeated_parent_def_level) {
                // An ancestor (array/map) is null here: record the positions and
                // keep them out of this column's null map.
                for (size_t i = 0; i < loop_read; i++) {
                    ancestor_null_indices.insert(has_read - level_start_idx - loop_read + i);
                }
                continue;
            }

            bool is_null = def_level < _field_schema->definition_level;

            if (prev_is_null == is_null && (USHRT_MAX - null_map.back() >= loop_read)) {
                // Same state and no uint16 overflow: extend the current run.
                null_map.back() += loop_read;
            } else {
                if (!(prev_is_null ^ is_null)) {
                    // Same state but the run counter would overflow: emit a
                    // zero-length run of the opposite state to keep alternation.
                    null_map.emplace_back(0);
                }
                // Emit the run, splitting into USHRT_MAX-sized chunks separated
                // by zero-length opposite-state runs.
                size_t remaining = loop_read;
                while (remaining > USHRT_MAX) {
                    null_map.emplace_back(USHRT_MAX);
                    null_map.emplace_back(0);
                    remaining -= USHRT_MAX;
                }
                null_map.emplace_back((u_short)remaining);
                prev_is_null = is_null;
            }
        }
        return Status::OK();
    }

    // Expands a row-level filter map into a level-granularity filter map for
    // [level_start_idx, level_end_idx): every level entry inherits the filter
    // value of the row it belongs to; `_rep_levels[idx] == 0` marks the start
    // of a new top-level row, advancing `filter_loc`.
    Status gen_filter_map(FilterMap& filter_map, size_t filter_loc, size_t level_start_idx,
                          size_t level_end_idx, std::vector<uint8_t>& nested_filter_map_data,
                          std::unique_ptr<FilterMap>* nested_filter_map) {
        nested_filter_map_data.resize(level_end_idx - level_start_idx);
        for (size_t idx = level_start_idx; idx < level_end_idx; idx++) {
            if (idx != level_start_idx && _rep_levels[idx] == 0) {
                filter_loc++;
            }
            nested_filter_map_data[idx - level_start_idx] =
                    filter_map.filter_map_data()[filter_loc];
        }

        auto new_filter = std::make_unique<FilterMap>();
        RETURN_IF_ERROR(new_filter->init(nested_filter_map_data.data(),
                                         nested_filter_map_data.size(), false));
        *nested_filter_map = std::move(new_filter);

        return Status::OK();
    }

    std::unique_ptr<parquet::PhysicalToLogicalConverter> _converter = nullptr;
    std::unique_ptr<std::vector<uint8_t>> _nested_filter_map_data = nullptr;
    size_t _orig_filter_map_index = 0;

    Status _skip_values(size_t num_values);
    Status _read_values(size_t num_values, ColumnPtr& doris_column, DataTypePtr& type,
                        FilterMap& filter_map, bool is_dict_filter);
    Status _read_nested_column(ColumnPtr& doris_column, DataTypePtr& type, FilterMap& filter_map,
                               size_t batch_size, size_t* read_rows, bool* eof,
                               bool is_dict_filter);
    Status _try_load_dict_page(bool* loaded, bool* has_dict);
};
324 | | |
// Reads a parquet LIST column by delegating to the reader of its single
// element column; levels and statistics are those of the element reader.
class ArrayColumnReader : public ParquetColumnReader {
    ENABLE_FACTORY_CREATOR(ArrayColumnReader)
public:
    ArrayColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz,
                      io::IOContext* io_ctx)
            : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {}
    ~ArrayColumnReader() override { close(); }
    Status init(std::unique_ptr<ParquetColumnReader> element_reader, FieldSchema* field);
    Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type,
                            const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node,
                            FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof,
                            bool is_dict_filter, int64_t real_column_size = -1) override;
    // Rep/def levels of the array are exactly those of the element column.
    const std::vector<level_t>& get_rep_level() const override {
        return _element_reader->get_rep_level();
    }
    const std::vector<level_t>& get_def_level() const override {
        return _element_reader->get_def_level();
    }
    ColumnStatistics column_statistics() override { return _element_reader->column_statistics(); }
    void close() override {}

    void reset_filter_map_index() override { _element_reader->reset_filter_map_index(); }

private:
    std::unique_ptr<ParquetColumnReader> _element_reader;
};
351 | | |
// Reads a parquet MAP column through its key and value child readers.
// Levels come from the key reader; statistics merge both children.
class MapColumnReader : public ParquetColumnReader {
    ENABLE_FACTORY_CREATOR(MapColumnReader)
public:
    MapColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz,
                    io::IOContext* io_ctx)
            : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {}
    ~MapColumnReader() override { close(); }

    Status init(std::unique_ptr<ParquetColumnReader> key_reader,
                std::unique_ptr<ParquetColumnReader> value_reader, FieldSchema* field);
    Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type,
                            const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node,
                            FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof,
                            bool is_dict_filter, int64_t real_column_size = -1) override;

    // Keys and values share the same repetition structure, so the key reader's
    // levels stand in for the whole map.
    const std::vector<level_t>& get_rep_level() const override {
        return _key_reader->get_rep_level();
    }
    const std::vector<level_t>& get_def_level() const override {
        return _key_reader->get_def_level();
    }

    ColumnStatistics column_statistics() override {
        ColumnStatistics kst = _key_reader->column_statistics();
        ColumnStatistics vst = _value_reader->column_statistics();
        kst.merge(vst);
        return kst;
    }

    void close() override {}

    void reset_filter_map_index() override {
        _key_reader->reset_filter_map_index();
        _value_reader->reset_filter_map_index();
    }

private:
    std::unique_ptr<ParquetColumnReader> _key_reader;
    std::unique_ptr<ParquetColumnReader> _value_reader;
};
392 | | |
// Reads a parquet STRUCT (group) column through per-field child readers.
// Only the fields listed in _read_column_names are actually read.
class StructColumnReader : public ParquetColumnReader {
    ENABLE_FACTORY_CREATOR(StructColumnReader)
public:
    StructColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz,
                       io::IOContext* io_ctx)
            : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {}
    ~StructColumnReader() override { close(); }

    Status init(
            std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>&& child_readers,
            FieldSchema* field);
    Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type,
                            const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node,
                            FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof,
                            bool is_dict_filter, int64_t real_column_size = -1) override;

    const std::vector<level_t>& get_rep_level() const override {
        if (!_read_column_names.empty()) {
            // can't use _child_readers[*_read_column_names.begin()]
            // because the operator[] of std::unordered_map is not const :(
            /*
             * Considering the issue in the `_read_nested_column` function where data may span across pages, leading
             * to missing definition and repetition levels, when filling the null_map of the struct later, it is
             * crucial to use the definition and repetition levels from the first read column,
             * that is `_read_column_names.front()`.
             */
            return _child_readers.find(_read_column_names.front())->second->get_rep_level();
        }
        // No column was read: fall back to any child's (empty) levels.
        return _child_readers.begin()->second->get_rep_level();
    }

    const std::vector<level_t>& get_def_level() const override {
        // Same front-column rule as get_rep_level() — see the comment there.
        if (!_read_column_names.empty()) {
            return _child_readers.find(_read_column_names.front())->second->get_def_level();
        }
        return _child_readers.begin()->second->get_def_level();
    }

    // Sums statistics over the children that were actually read.
    ColumnStatistics column_statistics() override {
        ColumnStatistics st;
        for (const auto& column_name : _read_column_names) {
            auto reader = _child_readers.find(column_name);
            if (reader != _child_readers.end()) {
                ColumnStatistics cst = reader->second->column_statistics();
                st.merge(cst);
            }
        }
        return st;
    }

    void close() override {}

    void reset_filter_map_index() override {
        for (const auto& reader : _child_readers) {
            reader.second->reset_filter_map_index();
        }
    }

private:
    std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _child_readers;
    std::vector<std::string> _read_column_names;
    //Need to use vector instead of set,see `get_rep_level()` for the reason.
};
456 | | |
457 | | // A special reader that skips actual reading but provides empty data with correct structure |
458 | | // This is used when a column is not needed but its structure is required (e.g., for map keys) |
459 | | class SkipReadingReader : public ParquetColumnReader { |
460 | | public: |
461 | | SkipReadingReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz, |
462 | | io::IOContext* io_ctx, FieldSchema* field_schema) |
463 | 4 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) { |
464 | 4 | _field_schema = field_schema; // Use inherited member from base class |
465 | 4 | VLOG_DEBUG << "[ParquetReader] Created SkipReadingReader for field: " |
466 | 0 | << _field_schema->name; |
467 | 4 | } |
468 | | |
469 | | Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
470 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
471 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, |
472 | 4 | bool is_dict_filter, int64_t real_column_size = -1) override { |
473 | 4 | VLOG_DEBUG << "[ParquetReader] SkipReadingReader::read_column_data for field: " |
474 | 0 | << _field_schema->name << ", batch_size: " << batch_size; |
475 | 4 | DCHECK(real_column_size >= 0); // real_column_size for filtered column size. |
476 | | |
477 | | // Simulate reading without actually reading data |
478 | | // Fill with default/null values based on column type |
479 | 4 | MutableColumnPtr data_column = doris_column->assume_mutable(); |
480 | | |
481 | 4 | if (real_column_size > 0) { |
482 | 4 | if (doris_column->is_nullable()) { |
483 | 4 | auto* nullable_column = static_cast<ColumnNullable*>(data_column.get()); |
484 | 4 | nullable_column->insert_many_defaults(real_column_size); |
485 | 4 | } else { |
486 | | // For non-nullable columns, insert appropriate default values |
487 | 0 | for (size_t i = 0; i < real_column_size; ++i) { |
488 | 0 | data_column->insert_default(); |
489 | 0 | } |
490 | 0 | } |
491 | 4 | } |
492 | | |
493 | 4 | *read_rows = batch_size; // Indicate we "read" batch_size rows |
494 | 4 | *eof = false; // We can always provide more empty data |
495 | | |
496 | 4 | VLOG_DEBUG << "[ParquetReader] SkipReadingReader generated " << batch_size |
497 | 0 | << " default values for field: " << _field_schema->name; |
498 | | |
499 | 4 | return Status::OK(); |
500 | 4 | } |
501 | | |
502 | | static std::unique_ptr<SkipReadingReader> create_unique(const RowRanges& row_ranges, |
503 | | size_t total_rows, cctz::time_zone* ctz, |
504 | | io::IOContext* io_ctx, |
505 | 0 | FieldSchema* field_schema) { |
506 | 0 | return std::make_unique<SkipReadingReader>(row_ranges, total_rows, ctz, io_ctx, |
507 | 0 | field_schema); |
508 | 0 | } |
509 | | |
510 | | // These methods should not be called for SkipReadingReader |
511 | | // If they are called, it indicates a logic error in the code |
512 | 0 | const std::vector<level_t>& get_rep_level() const override { |
513 | 0 | LOG(FATAL) << "get_rep_level() should not be called on SkipReadingReader for field: " |
514 | 0 | << _field_schema->name |
515 | 0 | << ". This indicates the SkipReadingReader was incorrectly used as a reference " |
516 | 0 | "column."; |
517 | 0 | __builtin_unreachable(); |
518 | 0 | } |
519 | | |
520 | 0 | const std::vector<level_t>& get_def_level() const override { |
521 | 0 | LOG(FATAL) << "get_def_level() should not be called on SkipReadingReader for field: " |
522 | 0 | << _field_schema->name |
523 | 0 | << ". This indicates the SkipReadingReader was incorrectly used as a reference " |
524 | 0 | "column."; |
525 | 0 | __builtin_unreachable(); |
526 | 0 | } |
527 | | |
528 | | // Implement required pure virtual methods from base class |
529 | 0 | ColumnStatistics column_statistics() override { |
530 | 0 | return ColumnStatistics(); // Return empty statistics |
531 | 0 | } |
532 | | |
533 | 0 | void close() override { |
534 | | // Nothing to close for skip reading |
535 | 0 | } |
536 | | |
537 | 4 | void reset_filter_map_index() override { _filter_map_index = 0; } |
538 | | }; |
539 | | |
540 | | #include "common/compile_check_end.h" |
541 | | |
542 | | }; // namespace doris |