be/src/format/parquet/vparquet_column_reader.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | #include <gen_cpp/parquet_types.h> |
20 | | #include <glog/logging.h> |
21 | | #include <stddef.h> |
22 | | #include <stdint.h> |
23 | | |
24 | | #include <list> |
25 | | #include <memory> |
26 | | #include <ostream> |
27 | | #include <unordered_map> |
28 | | #include <vector> |
29 | | |
30 | | #include "common/status.h" |
31 | | #include "core/data_type/data_type.h" |
32 | | #include "format/parquet/parquet_column_convert.h" |
33 | | #include "format/parquet/parquet_common.h" |
34 | | #include "format/parquet/vparquet_column_chunk_reader.h" |
35 | | #include "format/table/table_format_reader.h" |
36 | | #include "io/fs/buffered_reader.h" |
37 | | #include "io/fs/file_reader_writer_fwd.h" |
38 | | |
39 | | namespace cctz { |
40 | | class time_zone; |
41 | | } // namespace cctz |
42 | | |
43 | | namespace doris::io { |
44 | | struct IOContext; |
45 | | } // namespace doris::io |
46 | | |
47 | | namespace doris { |
48 | | #include "common/compile_check_begin.h" |
49 | | struct FieldSchema; |
50 | | template <typename T> |
51 | | class ColumnStr; |
52 | | using ColumnString = ColumnStr<UInt32>; |
53 | | |
54 | | class ParquetColumnReader { |
55 | | public: |
56 | | struct ColumnStatistics { |
57 | | ColumnStatistics() |
58 | 125 | : page_index_read_calls(0), |
59 | 125 | decompress_time(0), |
60 | 125 | decompress_cnt(0), |
61 | 125 | decode_header_time(0), |
62 | 125 | decode_value_time(0), |
63 | 125 | decode_dict_time(0), |
64 | 125 | decode_level_time(0), |
65 | 125 | decode_null_map_time(0), |
66 | 125 | skip_page_header_num(0), |
67 | 125 | parse_page_header_num(0), |
68 | 125 | read_page_header_time(0), |
69 | 125 | page_read_counter(0), |
70 | 125 | page_cache_write_counter(0), |
71 | 125 | page_cache_compressed_write_counter(0), |
72 | 125 | page_cache_decompressed_write_counter(0), |
73 | 125 | page_cache_hit_counter(0), |
74 | 125 | page_cache_missing_counter(0), |
75 | 125 | page_cache_compressed_hit_counter(0), |
76 | 125 | page_cache_decompressed_hit_counter(0) {} |
77 | | |
78 | | ColumnStatistics(ColumnChunkReaderStatistics& cs, int64_t null_map_time) |
79 | 110 | : page_index_read_calls(0), |
80 | 110 | decompress_time(cs.decompress_time), |
81 | 110 | decompress_cnt(cs.decompress_cnt), |
82 | 110 | decode_header_time(cs.decode_header_time), |
83 | 110 | decode_value_time(cs.decode_value_time), |
84 | 110 | decode_dict_time(cs.decode_dict_time), |
85 | 110 | decode_level_time(cs.decode_level_time), |
86 | 110 | decode_null_map_time(null_map_time), |
87 | 110 | skip_page_header_num(cs.skip_page_header_num), |
88 | 110 | parse_page_header_num(cs.parse_page_header_num), |
89 | 110 | read_page_header_time(cs.read_page_header_time), |
90 | 110 | page_read_counter(cs.page_read_counter), |
91 | 110 | page_cache_write_counter(cs.page_cache_write_counter), |
92 | 110 | page_cache_compressed_write_counter(cs.page_cache_compressed_write_counter), |
93 | 110 | page_cache_decompressed_write_counter(cs.page_cache_decompressed_write_counter), |
94 | 110 | page_cache_hit_counter(cs.page_cache_hit_counter), |
95 | 110 | page_cache_missing_counter(cs.page_cache_missing_counter), |
96 | 110 | page_cache_compressed_hit_counter(cs.page_cache_compressed_hit_counter), |
97 | 110 | page_cache_decompressed_hit_counter(cs.page_cache_decompressed_hit_counter) {} |
98 | | |
99 | | int64_t page_index_read_calls; |
100 | | int64_t decompress_time; |
101 | | int64_t decompress_cnt; |
102 | | int64_t decode_header_time; |
103 | | int64_t decode_value_time; |
104 | | int64_t decode_dict_time; |
105 | | int64_t decode_level_time; |
106 | | int64_t decode_null_map_time; |
107 | | int64_t skip_page_header_num; |
108 | | int64_t parse_page_header_num; |
109 | | int64_t read_page_header_time; |
110 | | int64_t page_read_counter; |
111 | | int64_t page_cache_write_counter; |
112 | | int64_t page_cache_compressed_write_counter; |
113 | | int64_t page_cache_decompressed_write_counter; |
114 | | int64_t page_cache_hit_counter; |
115 | | int64_t page_cache_missing_counter; |
116 | | int64_t page_cache_compressed_hit_counter; |
117 | | int64_t page_cache_decompressed_hit_counter; |
118 | | |
119 | 155 | void merge(ColumnStatistics& col_statistics) { |
120 | 155 | page_index_read_calls += col_statistics.page_index_read_calls; |
121 | 155 | decompress_time += col_statistics.decompress_time; |
122 | 155 | decompress_cnt += col_statistics.decompress_cnt; |
123 | 155 | decode_header_time += col_statistics.decode_header_time; |
124 | 155 | decode_value_time += col_statistics.decode_value_time; |
125 | 155 | decode_dict_time += col_statistics.decode_dict_time; |
126 | 155 | decode_level_time += col_statistics.decode_level_time; |
127 | 155 | decode_null_map_time += col_statistics.decode_null_map_time; |
128 | 155 | skip_page_header_num += col_statistics.skip_page_header_num; |
129 | 155 | parse_page_header_num += col_statistics.parse_page_header_num; |
130 | 155 | read_page_header_time += col_statistics.read_page_header_time; |
131 | 155 | page_read_counter += col_statistics.page_read_counter; |
132 | 155 | page_cache_write_counter += col_statistics.page_cache_write_counter; |
133 | 155 | page_cache_compressed_write_counter += |
134 | 155 | col_statistics.page_cache_compressed_write_counter; |
135 | 155 | page_cache_decompressed_write_counter += |
136 | 155 | col_statistics.page_cache_decompressed_write_counter; |
137 | 155 | page_cache_hit_counter += col_statistics.page_cache_hit_counter; |
138 | 155 | page_cache_missing_counter += col_statistics.page_cache_missing_counter; |
139 | 155 | page_cache_compressed_hit_counter += col_statistics.page_cache_compressed_hit_counter; |
140 | 155 | page_cache_decompressed_hit_counter += |
141 | 155 | col_statistics.page_cache_decompressed_hit_counter; |
142 | 155 | } |
143 | | }; |
144 | | |
145 | | ParquetColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz, |
146 | | io::IOContext* io_ctx) |
147 | 130 | : _row_ranges(row_ranges), _total_rows(total_rows), _ctz(ctz), _io_ctx(io_ctx) {} |
148 | 130 | virtual ~ParquetColumnReader() = default; |
149 | | virtual Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
150 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
151 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, |
152 | | bool* eof, bool is_dict_filter, |
153 | | int64_t real_column_size = -1) = 0; |
154 | | |
155 | 0 | virtual Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) { |
156 | 0 | return Status::NotSupported("read_dict_values_to_column is not supported"); |
157 | 0 | } |
158 | | |
159 | | virtual Result<MutableColumnPtr> convert_dict_column_to_string_column( |
160 | 0 | const ColumnInt32* dict_column) { |
161 | 0 | throw Exception( |
162 | 0 | Status::FatalError("Method convert_dict_column_to_string_column is not supported")); |
163 | 0 | } |
164 | | |
165 | | static Status create(io::FileReaderSPtr file, FieldSchema* field, |
166 | | const tparquet::RowGroup& row_group, const RowRanges& row_ranges, |
167 | | const cctz::time_zone* ctz, io::IOContext* io_ctx, |
168 | | std::unique_ptr<ParquetColumnReader>& reader, size_t max_buf_size, |
169 | | std::unordered_map<int, tparquet::OffsetIndex>& col_offsets, |
170 | | RuntimeState* state, bool in_collection = false, |
171 | | const std::set<uint64_t>& column_ids = {}, |
172 | | const std::set<uint64_t>& filter_column_ids = {}); |
173 | | virtual const std::vector<level_t>& get_rep_level() const = 0; |
174 | | virtual const std::vector<level_t>& get_def_level() const = 0; |
175 | | virtual ColumnStatistics column_statistics() = 0; |
176 | | virtual void close() = 0; |
177 | | |
178 | | virtual void reset_filter_map_index() = 0; |
179 | | |
180 | 0 | FieldSchema* get_field_schema() const { return _field_schema; } |
181 | 26 | void set_column_in_nested() { _in_nested = true; } |
182 | | |
183 | | protected: |
184 | | void _generate_read_ranges(RowRange page_row_range, RowRanges* result_ranges) const; |
185 | | |
186 | | FieldSchema* _field_schema = nullptr; |
187 | | const RowRanges& _row_ranges; |
188 | | size_t _total_rows = 0; |
189 | | const cctz::time_zone* _ctz = nullptr; |
190 | | io::IOContext* _io_ctx = nullptr; |
191 | | int64_t _current_row_index = 0; |
192 | | int64_t _decode_null_map_time = 0; |
193 | | |
194 | | size_t _filter_map_index = 0; |
195 | | std::set<uint64_t> _filter_column_ids; |
196 | | |
197 | | // _in_nested: column in struct/map/array |
198 | | // IN_COLLECTION : column in map/array |
199 | | bool _in_nested = false; |
200 | | }; |
201 | | |
202 | | template <bool IN_COLLECTION, bool OFFSET_INDEX> |
203 | | class ScalarColumnReader : public ParquetColumnReader { |
204 | | ENABLE_FACTORY_CREATOR(ScalarColumnReader) |
205 | | public: |
206 | | ScalarColumnReader(const RowRanges& row_ranges, size_t total_rows, |
207 | | const tparquet::ColumnChunk& chunk_meta, |
208 | | const tparquet::OffsetIndex* offset_index, const cctz::time_zone* ctz, |
209 | | io::IOContext* io_ctx) |
210 | 110 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx), |
211 | 110 | _chunk_meta(chunk_meta), |
212 | 110 | _offset_index(offset_index) {}Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE _ZN5doris18ScalarColumnReaderILb1ELb0EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE Line | Count | Source | 210 | 2 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx), | 211 | 2 | _chunk_meta(chunk_meta), | 212 | 2 | _offset_index(offset_index) {} |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE _ZN5doris18ScalarColumnReaderILb0ELb0EEC2ERKNS_10segment_v29RowRangesEmRKN8tparquet11ColumnChunkEPKNS6_11OffsetIndexEPKN4cctz9time_zoneEPNS_2io9IOContextE Line | Count | Source | 210 | 108 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx), | 211 | 108 | _chunk_meta(chunk_meta), | 212 | 108 | _offset_index(offset_index) {} |
|
213 | 110 | ~ScalarColumnReader() override { close(); }Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EED2Ev _ZN5doris18ScalarColumnReaderILb1ELb0EED2Ev Line | Count | Source | 213 | 2 | ~ScalarColumnReader() override { close(); } |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EED2Ev _ZN5doris18ScalarColumnReaderILb0ELb0EED2Ev Line | Count | Source | 213 | 108 | ~ScalarColumnReader() override { close(); } |
|
214 | | Status init(io::FileReaderSPtr file, FieldSchema* field, size_t max_buf_size, |
215 | | RuntimeState* state); |
216 | | Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
217 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
218 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, |
219 | | bool is_dict_filter, int64_t real_column_size = -1) override; |
220 | | Status read_dict_values_to_column(MutableColumnPtr& doris_column, bool* has_dict) override; |
221 | | Result<MutableColumnPtr> convert_dict_column_to_string_column( |
222 | | const ColumnInt32* dict_column) override; |
223 | 12 | const std::vector<level_t>& get_rep_level() const override { return _rep_levels; }Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb1ELb1EE13get_rep_levelEv _ZNK5doris18ScalarColumnReaderILb1ELb0EE13get_rep_levelEv Line | Count | Source | 223 | 4 | const std::vector<level_t>& get_rep_level() const override { return _rep_levels; } |
Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb0ELb1EE13get_rep_levelEv _ZNK5doris18ScalarColumnReaderILb0ELb0EE13get_rep_levelEv Line | Count | Source | 223 | 8 | const std::vector<level_t>& get_rep_level() const override { return _rep_levels; } |
|
224 | 12 | const std::vector<level_t>& get_def_level() const override { return _def_levels; }Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb1ELb1EE13get_def_levelEv _ZNK5doris18ScalarColumnReaderILb1ELb0EE13get_def_levelEv Line | Count | Source | 224 | 4 | const std::vector<level_t>& get_def_level() const override { return _def_levels; } |
Unexecuted instantiation: _ZNK5doris18ScalarColumnReaderILb0ELb1EE13get_def_levelEv _ZNK5doris18ScalarColumnReaderILb0ELb0EE13get_def_levelEv Line | Count | Source | 224 | 8 | const std::vector<level_t>& get_def_level() const override { return _def_levels; } |
|
225 | 110 | ColumnStatistics column_statistics() override { |
226 | 110 | return ColumnStatistics(_chunk_reader->chunk_statistics(), _decode_null_map_time); |
227 | 110 | } Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE17column_statisticsEv _ZN5doris18ScalarColumnReaderILb1ELb0EE17column_statisticsEv Line | Count | Source | 225 | 2 | ColumnStatistics column_statistics() override { | 226 | 2 | return ColumnStatistics(_chunk_reader->chunk_statistics(), _decode_null_map_time); | 227 | 2 | } |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE17column_statisticsEv _ZN5doris18ScalarColumnReaderILb0ELb0EE17column_statisticsEv Line | Count | Source | 225 | 108 | ColumnStatistics column_statistics() override { | 226 | 108 | return ColumnStatistics(_chunk_reader->chunk_statistics(), _decode_null_map_time); | 227 | 108 | } |
|
228 | 110 | void close() override {}Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE5closeEv _ZN5doris18ScalarColumnReaderILb1ELb0EE5closeEv Line | Count | Source | 228 | 2 | void close() override {} |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE5closeEv _ZN5doris18ScalarColumnReaderILb0ELb0EE5closeEv Line | Count | Source | 228 | 108 | void close() override {} |
|
229 | | |
230 | 127 | void reset_filter_map_index() override { |
231 | 127 | _filter_map_index = 0; // nested |
232 | 127 | _orig_filter_map_index = 0; |
233 | 127 | } Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE22reset_filter_map_indexEv _ZN5doris18ScalarColumnReaderILb1ELb0EE22reset_filter_map_indexEv Line | Count | Source | 230 | 2 | void reset_filter_map_index() override { | 231 | 2 | _filter_map_index = 0; // nested | 232 | 2 | _orig_filter_map_index = 0; | 233 | 2 | } |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE22reset_filter_map_indexEv _ZN5doris18ScalarColumnReaderILb0ELb0EE22reset_filter_map_indexEv Line | Count | Source | 230 | 125 | void reset_filter_map_index() override { | 231 | 125 | _filter_map_index = 0; // nested | 232 | 125 | _orig_filter_map_index = 0; | 233 | 125 | } |
|
234 | | |
235 | | private: |
236 | | tparquet::ColumnChunk _chunk_meta; |
237 | | const tparquet::OffsetIndex* _offset_index = nullptr; |
238 | | std::unique_ptr<io::BufferedFileStreamReader> _stream_reader; |
239 | | std::unique_ptr<ColumnChunkReader<IN_COLLECTION, OFFSET_INDEX>> _chunk_reader; |
240 | | // rep def levels buffer. |
241 | | std::vector<level_t> _rep_levels; |
242 | | std::vector<level_t> _def_levels; |
243 | | |
244 | | size_t _current_range_idx = 0; |
245 | | |
246 | | Status gen_nested_null_map(size_t level_start_idx, size_t level_end_idx, |
247 | | std::vector<uint16_t>& null_map, |
248 | 8 | std::unordered_set<size_t>& ancestor_null_indices) { |
249 | 8 | size_t has_read = level_start_idx; |
250 | 8 | null_map.emplace_back(0); |
251 | 8 | bool prev_is_null = false; |
252 | | |
253 | 16 | while (has_read < level_end_idx) { |
254 | 8 | level_t def_level = _def_levels[has_read++]; |
255 | 8 | size_t loop_read = 1; |
256 | 12 | while (has_read < _def_levels.size() && _def_levels[has_read] == def_level) { |
257 | 4 | has_read++; |
258 | 4 | loop_read++; |
259 | 4 | } |
260 | | |
261 | 8 | if (def_level < _field_schema->repeated_parent_def_level) { |
262 | 0 | for (size_t i = 0; i < loop_read; i++) { |
263 | 0 | ancestor_null_indices.insert(has_read - level_start_idx - loop_read + i); |
264 | 0 | } |
265 | 0 | continue; |
266 | 0 | } |
267 | | |
268 | 8 | bool is_null = def_level < _field_schema->definition_level; |
269 | | |
270 | 8 | if (prev_is_null == is_null && (USHRT_MAX - null_map.back() >= loop_read)) { |
271 | 8 | null_map.back() += loop_read; |
272 | 8 | } else { |
273 | 0 | if (!(prev_is_null ^ is_null)) { |
274 | 0 | null_map.emplace_back(0); |
275 | 0 | } |
276 | 0 | size_t remaining = loop_read; |
277 | 0 | while (remaining > USHRT_MAX) { |
278 | 0 | null_map.emplace_back(USHRT_MAX); |
279 | 0 | null_map.emplace_back(0); |
280 | 0 | remaining -= USHRT_MAX; |
281 | 0 | } |
282 | 0 | null_map.emplace_back((u_short)remaining); |
283 | 0 | prev_is_null = is_null; |
284 | 0 | } |
285 | 8 | } |
286 | 8 | return Status::OK(); |
287 | 8 | } Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE _ZN5doris18ScalarColumnReaderILb1ELb0EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE Line | Count | Source | 248 | 2 | std::unordered_set<size_t>& ancestor_null_indices) { | 249 | 2 | size_t has_read = level_start_idx; | 250 | 2 | null_map.emplace_back(0); | 251 | 2 | bool prev_is_null = false; | 252 | | | 253 | 4 | while (has_read < level_end_idx) { | 254 | 2 | level_t def_level = _def_levels[has_read++]; | 255 | 2 | size_t loop_read = 1; | 256 | 6 | while (has_read < _def_levels.size() && _def_levels[has_read] == def_level) { | 257 | 4 | has_read++; | 258 | 4 | loop_read++; | 259 | 4 | } | 260 | | | 261 | 2 | if (def_level < _field_schema->repeated_parent_def_level) { | 262 | 0 | for (size_t i = 0; i < loop_read; i++) { | 263 | 0 | ancestor_null_indices.insert(has_read - level_start_idx - loop_read + i); | 264 | 0 | } | 265 | 0 | continue; | 266 | 0 | } | 267 | | | 268 | 2 | bool is_null = def_level < _field_schema->definition_level; | 269 | | | 270 | 2 | if (prev_is_null == is_null && (USHRT_MAX - null_map.back() >= loop_read)) { | 271 | 2 | null_map.back() += loop_read; | 272 | 2 | } else { | 273 | 0 | if (!(prev_is_null ^ is_null)) { | 274 | 0 | null_map.emplace_back(0); | 275 | 0 | } | 276 | 0 | size_t remaining = loop_read; | 277 | 0 | while (remaining > USHRT_MAX) { | 278 | 0 | null_map.emplace_back(USHRT_MAX); | 279 | 0 | null_map.emplace_back(0); | 280 | | remaining -= USHRT_MAX; | 281 | 0 | } | 282 | 0 | null_map.emplace_back((u_short)remaining); | 283 | 0 | prev_is_null = is_null; | 284 | 0 | } | 285 | 2 | } | 286 | 2 | return Status::OK(); | 287 | 2 | } |
Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE _ZN5doris18ScalarColumnReaderILb0ELb0EE19gen_nested_null_mapEmmRSt6vectorItSaItEERSt13unordered_setImSt4hashImESt8equal_toImESaImEE Line | Count | Source | 248 | 6 | std::unordered_set<size_t>& ancestor_null_indices) { | 249 | 6 | size_t has_read = level_start_idx; | 250 | 6 | null_map.emplace_back(0); | 251 | 6 | bool prev_is_null = false; | 252 | | | 253 | 12 | while (has_read < level_end_idx) { | 254 | 6 | level_t def_level = _def_levels[has_read++]; | 255 | 6 | size_t loop_read = 1; | 256 | 6 | while (has_read < _def_levels.size() && _def_levels[has_read] == def_level) { | 257 | 0 | has_read++; | 258 | 0 | loop_read++; | 259 | 0 | } | 260 | | | 261 | 6 | if (def_level < _field_schema->repeated_parent_def_level) { | 262 | 0 | for (size_t i = 0; i < loop_read; i++) { | 263 | 0 | ancestor_null_indices.insert(has_read - level_start_idx - loop_read + i); | 264 | 0 | } | 265 | 0 | continue; | 266 | 0 | } | 267 | | | 268 | 6 | bool is_null = def_level < _field_schema->definition_level; | 269 | | | 270 | 6 | if (prev_is_null == is_null && (USHRT_MAX - null_map.back() >= loop_read)) { | 271 | 6 | null_map.back() += loop_read; | 272 | 6 | } else { | 273 | 0 | if (!(prev_is_null ^ is_null)) { | 274 | 0 | null_map.emplace_back(0); | 275 | 0 | } | 276 | 0 | size_t remaining = loop_read; | 277 | 0 | while (remaining > USHRT_MAX) { | 278 | 0 | null_map.emplace_back(USHRT_MAX); | 279 | 0 | null_map.emplace_back(0); | 280 | | remaining -= USHRT_MAX; | 281 | 0 | } | 282 | 0 | null_map.emplace_back((u_short)remaining); | 283 | 0 | prev_is_null = is_null; | 284 | 0 | } | 285 | 6 | } | 286 | 6 | return Status::OK(); | 287 | 6 | } |
|
288 | | |
289 | | Status gen_filter_map(FilterMap& filter_map, size_t filter_loc, size_t level_start_idx, |
290 | | size_t level_end_idx, std::vector<uint8_t>& nested_filter_map_data, |
291 | 0 | std::unique_ptr<FilterMap>* nested_filter_map) { |
292 | 0 | nested_filter_map_data.resize(level_end_idx - level_start_idx); |
293 | 0 | for (size_t idx = level_start_idx; idx < level_end_idx; idx++) { |
294 | 0 | if (idx != level_start_idx && _rep_levels[idx] == 0) { |
295 | 0 | filter_loc++; |
296 | 0 | } |
297 | 0 | nested_filter_map_data[idx - level_start_idx] = |
298 | 0 | filter_map.filter_map_data()[filter_loc]; |
299 | 0 | } |
300 | |
|
301 | 0 | auto new_filter = std::make_unique<FilterMap>(); |
302 | 0 | RETURN_IF_ERROR(new_filter->init(nested_filter_map_data.data(), |
303 | 0 | nested_filter_map_data.size(), false)); |
304 | 0 | *nested_filter_map = std::move(new_filter); |
305 | |
|
306 | 0 | return Status::OK(); |
307 | 0 | } Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb1EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb1ELb0EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb1EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE Unexecuted instantiation: _ZN5doris18ScalarColumnReaderILb0ELb0EE14gen_filter_mapERNS_9FilterMapEmmmRSt6vectorIhSaIhEEPSt10unique_ptrIS2_St14default_deleteIS2_EE |
308 | | |
309 | | std::unique_ptr<parquet::PhysicalToLogicalConverter> _converter = nullptr; |
310 | | std::unique_ptr<std::vector<uint8_t>> _nested_filter_map_data = nullptr; |
311 | | size_t _orig_filter_map_index = 0; |
312 | | |
313 | | Status _skip_values(size_t num_values); |
314 | | Status _read_values(size_t num_values, ColumnPtr& doris_column, DataTypePtr& type, |
315 | | FilterMap& filter_map, bool is_dict_filter); |
316 | | Status _read_nested_column(ColumnPtr& doris_column, DataTypePtr& type, FilterMap& filter_map, |
317 | | size_t batch_size, size_t* read_rows, bool* eof, |
318 | | bool is_dict_filter); |
319 | | Status _try_load_dict_page(bool* loaded, bool* has_dict); |
320 | | }; |
321 | | |
322 | | class ArrayColumnReader : public ParquetColumnReader { |
323 | | ENABLE_FACTORY_CREATOR(ArrayColumnReader) |
324 | | public: |
325 | | ArrayColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz, |
326 | | io::IOContext* io_ctx) |
327 | 2 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {} |
328 | 2 | ~ArrayColumnReader() override { close(); } |
329 | | Status init(std::unique_ptr<ParquetColumnReader> element_reader, FieldSchema* field); |
330 | | Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
331 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
332 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, |
333 | | bool is_dict_filter, int64_t real_column_size = -1) override; |
334 | 0 | const std::vector<level_t>& get_rep_level() const override { |
335 | 0 | return _element_reader->get_rep_level(); |
336 | 0 | } |
337 | 0 | const std::vector<level_t>& get_def_level() const override { |
338 | 0 | return _element_reader->get_def_level(); |
339 | 0 | } |
340 | 2 | ColumnStatistics column_statistics() override { return _element_reader->column_statistics(); } |
341 | 2 | void close() override {} |
342 | | |
343 | 2 | void reset_filter_map_index() override { _element_reader->reset_filter_map_index(); } |
344 | | |
345 | | private: |
346 | | std::unique_ptr<ParquetColumnReader> _element_reader; |
347 | | }; |
348 | | |
349 | | class MapColumnReader : public ParquetColumnReader { |
350 | | ENABLE_FACTORY_CREATOR(MapColumnReader) |
351 | | public: |
352 | | MapColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz, |
353 | | io::IOContext* io_ctx) |
354 | 0 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {} |
355 | 0 | ~MapColumnReader() override { close(); } |
356 | | |
357 | | Status init(std::unique_ptr<ParquetColumnReader> key_reader, |
358 | | std::unique_ptr<ParquetColumnReader> value_reader, FieldSchema* field); |
359 | | Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
360 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
361 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, |
362 | | bool is_dict_filter, int64_t real_column_size = -1) override; |
363 | | |
364 | 0 | const std::vector<level_t>& get_rep_level() const override { |
365 | 0 | return _key_reader->get_rep_level(); |
366 | 0 | } |
367 | 0 | const std::vector<level_t>& get_def_level() const override { |
368 | 0 | return _key_reader->get_def_level(); |
369 | 0 | } |
370 | | |
371 | 0 | ColumnStatistics column_statistics() override { |
372 | 0 | ColumnStatistics kst = _key_reader->column_statistics(); |
373 | 0 | ColumnStatistics vst = _value_reader->column_statistics(); |
374 | 0 | kst.merge(vst); |
375 | 0 | return kst; |
376 | 0 | } |
377 | | |
378 | 0 | void close() override {} |
379 | | |
380 | 0 | void reset_filter_map_index() override { |
381 | 0 | _key_reader->reset_filter_map_index(); |
382 | 0 | _value_reader->reset_filter_map_index(); |
383 | 0 | } |
384 | | |
385 | | private: |
386 | | std::unique_ptr<ParquetColumnReader> _key_reader; |
387 | | std::unique_ptr<ParquetColumnReader> _value_reader; |
388 | | }; |
389 | | |
390 | | class StructColumnReader : public ParquetColumnReader { |
391 | | ENABLE_FACTORY_CREATOR(StructColumnReader) |
392 | | public: |
393 | | StructColumnReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz, |
394 | | io::IOContext* io_ctx) |
395 | 10 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) {} |
396 | 10 | ~StructColumnReader() override { close(); } |
397 | | |
398 | | Status init( |
399 | | std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>>&& child_readers, |
400 | | FieldSchema* field); |
401 | | Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
402 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
403 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, |
404 | | bool is_dict_filter, int64_t real_column_size = -1) override; |
405 | | |
406 | 18 | const std::vector<level_t>& get_rep_level() const override { |
407 | 18 | if (!_read_column_names.empty()) { |
408 | | // can't use _child_readers[*_read_column_names.begin()] |
409 | | // because the operator[] of std::unordered_map is not const :( |
410 | | /* |
411 | | * Considering the issue in the `_read_nested_column` function where data may span across pages, leading |
412 | | * to missing definition and repetition levels, when filling the null_map of the struct later, it is |
413 | | * crucial to use the definition and repetition levels from the first read column, |
414 | | * that is `_read_column_names.front()`. |
415 | | */ |
416 | 18 | return _child_readers.find(_read_column_names.front())->second->get_rep_level(); |
417 | 18 | } |
418 | 0 | return _child_readers.begin()->second->get_rep_level(); |
419 | 18 | } |
420 | | |
421 | 18 | const std::vector<level_t>& get_def_level() const override { |
422 | 18 | if (!_read_column_names.empty()) { |
423 | 18 | return _child_readers.find(_read_column_names.front())->second->get_def_level(); |
424 | 18 | } |
425 | 0 | return _child_readers.begin()->second->get_def_level(); |
426 | 18 | } |
427 | | |
428 | 10 | ColumnStatistics column_statistics() override { |
429 | 10 | ColumnStatistics st; |
430 | 16 | for (const auto& column_name : _read_column_names) { |
431 | 16 | auto reader = _child_readers.find(column_name); |
432 | 16 | if (reader != _child_readers.end()) { |
433 | 16 | ColumnStatistics cst = reader->second->column_statistics(); |
434 | 16 | st.merge(cst); |
435 | 16 | } |
436 | 16 | } |
437 | 10 | return st; |
438 | 10 | } |
439 | | |
440 | 10 | void close() override {} |
441 | | |
442 | 10 | void reset_filter_map_index() override { |
443 | 24 | for (const auto& reader : _child_readers) { |
444 | 24 | reader.second->reset_filter_map_index(); |
445 | 24 | } |
446 | 10 | } |
447 | | |
448 | | private: |
449 | | std::unordered_map<std::string, std::unique_ptr<ParquetColumnReader>> _child_readers; |
450 | | std::vector<std::string> _read_column_names; |
451 | | //Need to use vector instead of set,see `get_rep_level()` for the reason. |
452 | | }; |
453 | | |
454 | | // A special reader that skips actual reading but provides empty data with correct structure |
455 | | // This is used when a column is not needed but its structure is required (e.g., for map keys) |
456 | | class SkipReadingReader : public ParquetColumnReader { |
457 | | public: |
458 | | SkipReadingReader(const RowRanges& row_ranges, size_t total_rows, const cctz::time_zone* ctz, |
459 | | io::IOContext* io_ctx, FieldSchema* field_schema) |
460 | 8 | : ParquetColumnReader(row_ranges, total_rows, ctz, io_ctx) { |
461 | 8 | _field_schema = field_schema; // Use inherited member from base class |
462 | 8 | VLOG_DEBUG << "[ParquetReader] Created SkipReadingReader for field: " |
463 | 0 | << _field_schema->name; |
464 | 8 | } |
465 | | |
466 | | Status read_column_data(ColumnPtr& doris_column, const DataTypePtr& type, |
467 | | const std::shared_ptr<TableSchemaChangeHelper::Node>& root_node, |
468 | | FilterMap& filter_map, size_t batch_size, size_t* read_rows, bool* eof, |
469 | 8 | bool is_dict_filter, int64_t real_column_size = -1) override { |
470 | 8 | VLOG_DEBUG << "[ParquetReader] SkipReadingReader::read_column_data for field: " |
471 | 0 | << _field_schema->name << ", batch_size: " << batch_size; |
472 | 8 | DCHECK(real_column_size >= 0); // real_column_size for filtered column size. |
473 | | |
474 | | // Simulate reading without actually reading data |
475 | | // Fill with default/null values based on column type |
476 | 8 | MutableColumnPtr data_column = doris_column->assume_mutable(); |
477 | | |
478 | 8 | if (real_column_size > 0) { |
479 | 8 | if (doris_column->is_nullable()) { |
480 | 8 | auto* nullable_column = static_cast<ColumnNullable*>(data_column.get()); |
481 | 8 | nullable_column->insert_many_defaults(real_column_size); |
482 | 8 | } else { |
483 | | // For non-nullable columns, insert appropriate default values |
484 | 0 | for (size_t i = 0; i < real_column_size; ++i) { |
485 | 0 | data_column->insert_default(); |
486 | 0 | } |
487 | 0 | } |
488 | 8 | } |
489 | | |
490 | 8 | *read_rows = batch_size; // Indicate we "read" batch_size rows |
491 | 8 | *eof = false; // We can always provide more empty data |
492 | | |
493 | 8 | VLOG_DEBUG << "[ParquetReader] SkipReadingReader generated " << batch_size |
494 | 0 | << " default values for field: " << _field_schema->name; |
495 | | |
496 | 8 | return Status::OK(); |
497 | 8 | } |
498 | | |
499 | | static std::unique_ptr<SkipReadingReader> create_unique(const RowRanges& row_ranges, |
500 | | size_t total_rows, cctz::time_zone* ctz, |
501 | | io::IOContext* io_ctx, |
502 | 0 | FieldSchema* field_schema) { |
503 | 0 | return std::make_unique<SkipReadingReader>(row_ranges, total_rows, ctz, io_ctx, |
504 | 0 | field_schema); |
505 | 0 | } |
506 | | |
507 | | // These methods should not be called for SkipReadingReader |
508 | | // If they are called, it indicates a logic error in the code |
509 | 0 | const std::vector<level_t>& get_rep_level() const override { |
510 | 0 | LOG(FATAL) << "get_rep_level() should not be called on SkipReadingReader for field: " |
511 | 0 | << _field_schema->name |
512 | 0 | << ". This indicates the SkipReadingReader was incorrectly used as a reference " |
513 | 0 | "column."; |
514 | 0 | __builtin_unreachable(); |
515 | 0 | } |
516 | | |
517 | 0 | const std::vector<level_t>& get_def_level() const override { |
518 | 0 | LOG(FATAL) << "get_def_level() should not be called on SkipReadingReader for field: " |
519 | 0 | << _field_schema->name |
520 | 0 | << ". This indicates the SkipReadingReader was incorrectly used as a reference " |
521 | 0 | "column."; |
522 | 0 | __builtin_unreachable(); |
523 | 0 | } |
524 | | |
525 | | // Implement required pure virtual methods from base class |
526 | 0 | ColumnStatistics column_statistics() override { |
527 | 0 | return ColumnStatistics(); // Return empty statistics |
528 | 0 | } |
529 | | |
530 | 0 | void close() override { |
531 | | // Nothing to close for skip reading |
532 | 0 | } |
533 | | |
534 | 8 | void reset_filter_map_index() override { _filter_map_index = 0; } |
535 | | }; |
536 | | |
537 | | #include "common/compile_check_end.h" |
538 | | |
539 | | }; // namespace doris |