be/src/format_v2/parquet/parquet_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format_v2/parquet/parquet_reader.h" |
19 | | |
20 | | #include <algorithm> |
21 | | #include <map> |
22 | | #include <memory> |
23 | | #include <optional> |
24 | | #include <ranges> |
25 | | #include <unordered_set> |
26 | | #include <utility> |
27 | | #include <vector> |
28 | | |
29 | | #include "core/assert_cast.h" |
30 | | #include "core/block/block.h" |
31 | | #include "core/data_type/data_type_array.h" |
32 | | #include "core/data_type/data_type_factory.hpp" |
33 | | #include "core/data_type/data_type_map.h" |
34 | | #include "core/data_type/data_type_nullable.h" |
35 | | #include "core/data_type/data_type_struct.h" |
36 | | #include "format_v2/column_mapper.h" |
37 | | #include "format_v2/parquet/parquet_column_schema.h" |
38 | | #include "format_v2/parquet/parquet_file_context.h" |
39 | | #include "format_v2/parquet/parquet_scan.h" |
40 | | #include "format_v2/parquet/parquet_statistics.h" |
41 | | #include "format_v2/parquet/reader/column_reader.h" |
42 | | #include "runtime/runtime_state.h" |
43 | | |
44 | | namespace doris::format::parquet { |
45 | | |
46 | | struct ParquetReaderScanState { |
47 | | ParquetFileContext file_context; |
48 | | std::vector<std::unique_ptr<ParquetColumnSchema>> file_schema; |
49 | | RowGroupScanPlan scan_plan; |
50 | | ParquetScanScheduler scheduler; |
51 | | const cctz::time_zone* timezone = nullptr; |
52 | | bool enable_bloom_filter = false; |
53 | | bool enable_page_cache = false; |
54 | | bool enable_strict_mode = false; |
55 | | }; |
56 | | |
57 | 155 | int64_t column_chunk_start_offset(const ::parquet::ColumnChunkMetaData& column_metadata) { |
58 | 155 | return column_metadata.has_dictionary_page() |
59 | 155 | ? cast_set<int64_t>(column_metadata.dictionary_page_offset()) |
60 | 155 | : cast_set<int64_t>(column_metadata.data_page_offset()); |
61 | 155 | } |
62 | | |
63 | | void collect_all_leaf_column_ids(const ParquetColumnSchema& column_schema, |
64 | 142 | std::unordered_set<int>* leaf_column_ids) { |
65 | 142 | DORIS_CHECK(leaf_column_ids != nullptr); |
66 | 142 | if (column_schema.kind == ParquetColumnSchemaKind::PRIMITIVE) { |
67 | 133 | if (column_schema.leaf_column_id >= 0) { |
68 | 133 | leaf_column_ids->insert(column_schema.leaf_column_id); |
69 | 133 | } |
70 | 133 | return; |
71 | 133 | } |
72 | 13 | for (const auto& child : column_schema.children) { |
73 | 13 | DORIS_CHECK(child != nullptr); |
74 | 13 | collect_all_leaf_column_ids(*child, leaf_column_ids); |
75 | 13 | } |
76 | 9 | } |
77 | | |
78 | | void collect_projected_leaf_column_ids(const ParquetColumnSchema& column_schema, |
79 | | const format::LocalColumnIndex& projection, |
80 | 135 | std::unordered_set<int>* leaf_column_ids) { |
81 | 135 | DORIS_CHECK(leaf_column_ids != nullptr); |
82 | 135 | if (projection.project_all_children || projection.children.empty()) { |
83 | 129 | collect_all_leaf_column_ids(column_schema, leaf_column_ids); |
84 | 129 | return; |
85 | 129 | } |
86 | 7 | for (const auto& child_projection : projection.children) { |
87 | 7 | const auto child_it = |
88 | 11 | std::ranges::find_if(column_schema.children, [&](const auto& child_schema) { |
89 | 11 | return child_schema->local_id == child_projection.local_id(); |
90 | 11 | }); |
91 | 7 | DORIS_CHECK(child_it != column_schema.children.end()); |
92 | 7 | collect_projected_leaf_column_ids(**child_it, child_projection, leaf_column_ids); |
93 | 7 | } |
94 | 6 | } |
95 | | |
96 | | void collect_request_leaf_column_ids( |
97 | | const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, |
98 | 104 | const format::FileScanRequest& request, std::unordered_set<int>* leaf_column_ids) { |
99 | 104 | DORIS_CHECK(leaf_column_ids != nullptr); |
100 | 153 | auto collect_scan_column = [&](const format::LocalColumnIndex& projection) { |
101 | 153 | const auto local_id = projection.local_id(); |
102 | 153 | if (local_id == format::ROW_POSITION_COLUMN_ID || |
103 | 153 | local_id == format::GLOBAL_ROWID_COLUMN_ID) { |
104 | 25 | return; |
105 | 25 | } |
106 | 128 | DORIS_CHECK(local_id >= 0 && local_id < static_cast<int32_t>(file_schema.size())); |
107 | 128 | DORIS_CHECK(file_schema[local_id] != nullptr); |
108 | 128 | collect_projected_leaf_column_ids(*file_schema[local_id], projection, leaf_column_ids); |
109 | 128 | }; |
110 | 104 | for (const auto& column : request.predicate_columns) { |
111 | 42 | collect_scan_column(column); |
112 | 42 | } |
113 | 111 | for (const auto& column : request.non_predicate_columns) { |
114 | 111 | collect_scan_column(column); |
115 | 111 | } |
116 | 104 | } |
117 | | |
118 | | std::vector<ParquetPageCacheRange> build_page_cache_ranges( |
119 | | const ::parquet::FileMetaData& metadata, |
120 | | const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, |
121 | 104 | const format::FileScanRequest& request, const RowGroupScanPlan& row_group_plan) { |
122 | 104 | std::unordered_set<int> leaf_column_ids; |
123 | 104 | collect_request_leaf_column_ids(file_schema, request, &leaf_column_ids); |
124 | 104 | std::vector<ParquetPageCacheRange> ranges; |
125 | 104 | ranges.reserve(row_group_plan.row_groups.size() * leaf_column_ids.size()); |
126 | 125 | for (const auto& row_group_plan_item : row_group_plan.row_groups) { |
127 | 125 | auto row_group_metadata = metadata.RowGroup(row_group_plan_item.row_group_id); |
128 | 125 | DORIS_CHECK(row_group_metadata != nullptr); |
129 | 155 | for (const auto leaf_column_id : leaf_column_ids) { |
130 | 155 | DORIS_CHECK(leaf_column_id >= 0 && leaf_column_id < row_group_metadata->num_columns()); |
131 | 155 | auto column_metadata = row_group_metadata->ColumnChunk(leaf_column_id); |
132 | 155 | DORIS_CHECK(column_metadata != nullptr); |
133 | 155 | const int64_t offset = column_chunk_start_offset(*column_metadata); |
134 | 155 | const int64_t size = column_metadata->total_compressed_size(); |
135 | 155 | DORIS_CHECK(offset >= 0); |
136 | 155 | DORIS_CHECK(size >= 0); |
137 | 155 | if (size > 0) { |
138 | 155 | ranges.push_back(ParquetPageCacheRange {.offset = offset, .size = size}); |
139 | 155 | } |
140 | 155 | } |
141 | 125 | } |
142 | 104 | return ranges; |
143 | 104 | } |
144 | | |
145 | | const ParquetColumnSchema& projected_root_schema( |
146 | | const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, |
147 | 3 | const format::LocalColumnIndex& projection) { |
148 | 3 | const auto local_id = projection.local_id(); |
149 | 3 | DORIS_CHECK(local_id >= 0 && local_id < static_cast<int32_t>(file_schema.size())); |
150 | 3 | DORIS_CHECK(file_schema[local_id] != nullptr); |
151 | 3 | return *file_schema[local_id]; |
152 | 3 | } |
153 | | |
154 | | int64_t count_loaded_non_null_values(const ParquetColumnSchema& root_schema, |
155 | | const ParquetColumnReader& shape_reader, |
156 | 3 | int64_t expected_rows) { |
157 | 3 | const auto& def_levels = shape_reader.nested_definition_levels(); |
158 | 3 | const auto& rep_levels = shape_reader.nested_repetition_levels(); |
159 | 3 | const int64_t levels_written = shape_reader.nested_levels_written(); |
160 | 3 | DORIS_CHECK(levels_written >= expected_rows); |
161 | 3 | if (root_schema.max_repetition_level == 0) { |
162 | 1 | DORIS_CHECK(levels_written == expected_rows); |
163 | 1 | const int16_t non_null_definition_level = root_schema.nullable_definition_level; |
164 | 1 | int64_t count = 0; |
165 | 6 | for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) { |
166 | 5 | count += def_levels[level_idx] >= non_null_definition_level ? 1 : 0; |
167 | 5 | } |
168 | 1 | return count; |
169 | 1 | } |
170 | | |
171 | | // For repeated encodings, one top-level row starts when the leaf repetition level moves above |
172 | | // no higher than the top-level container's repeated boundary. Empty MAP/LIST rows have no |
173 | | // entries but still carry a level slot; they are non-NULL and must be counted by count(col). |
174 | 2 | const int16_t non_null_definition_level = |
175 | 2 | static_cast<int16_t>(root_schema.definition_level - 1); |
176 | 2 | int64_t counted_rows = 0; |
177 | 2 | int64_t non_null_rows = 0; |
178 | 13 | for (int64_t level_idx = 0; level_idx < levels_written && counted_rows < expected_rows; |
179 | 11 | ++level_idx) { |
180 | 11 | if (rep_levels[level_idx] >= root_schema.repetition_level) { |
181 | 1 | continue; |
182 | 1 | } |
183 | 10 | ++counted_rows; |
184 | 10 | non_null_rows += def_levels[level_idx] >= non_null_definition_level ? 1 : 0; |
185 | 10 | } |
186 | 2 | DORIS_CHECK(counted_rows == expected_rows); |
187 | 2 | return non_null_rows; |
188 | 3 | } |
189 | | |
190 | 0 | DataTypePtr nullable_like_original(const DataTypePtr& type, DataTypePtr nested_type) { |
191 | 0 | return type != nullptr && type->is_nullable() ? make_nullable(nested_type) : nested_type; |
192 | 0 | } |
193 | | |
194 | 1 | int timestamp_tz_scale(const ParquetTypeDescriptor& type_descriptor) { |
195 | 1 | switch (type_descriptor.time_unit) { |
196 | 0 | case ParquetTimeUnit::MILLIS: |
197 | 0 | return 3; |
198 | 0 | case ParquetTimeUnit::MICROS: |
199 | 1 | case ParquetTimeUnit::UNKNOWN: |
200 | 1 | default: |
201 | 1 | return 6; |
202 | 1 | } |
203 | 1 | } |
204 | | |
205 | 1 | bool should_map_to_timestamp_tz(const ParquetColumnSchema& column_schema) { |
206 | 1 | const auto& type_descriptor = column_schema.type_descriptor; |
207 | 1 | return type_descriptor.physical_type == ::parquet::Type::INT96 || |
208 | 1 | (type_descriptor.is_timestamp && type_descriptor.timestamp_is_adjusted_to_utc); |
209 | 1 | } |
210 | | |
211 | 1 | DataTypePtr apply_timestamp_tz_mapping(ParquetColumnSchema* column_schema) { |
212 | 1 | DORIS_CHECK(column_schema != nullptr); |
213 | 1 | if (column_schema->kind == ParquetColumnSchemaKind::PRIMITIVE) { |
214 | 1 | if (should_map_to_timestamp_tz(*column_schema)) { |
215 | 1 | const bool nullable = |
216 | 1 | column_schema->type != nullptr && column_schema->type->is_nullable(); |
217 | 1 | const auto scale = timestamp_tz_scale(column_schema->type_descriptor); |
218 | 1 | column_schema->type = DataTypeFactory::instance().create_data_type(TYPE_TIMESTAMPTZ, |
219 | 1 | nullable, 0, scale); |
220 | 1 | column_schema->type_descriptor.doris_type = column_schema->type; |
221 | 1 | } |
222 | 1 | return column_schema->type; |
223 | 1 | } |
224 | | |
225 | 0 | std::vector<DataTypePtr> child_types; |
226 | 0 | child_types.reserve(column_schema->children.size()); |
227 | 0 | for (auto& child : column_schema->children) { |
228 | 0 | child_types.push_back(apply_timestamp_tz_mapping(child.get())); |
229 | 0 | } |
230 | |
|
231 | 0 | if (column_schema->kind == ParquetColumnSchemaKind::LIST) { |
232 | 0 | DORIS_CHECK(child_types.size() == 1); |
233 | 0 | column_schema->type = nullable_like_original( |
234 | 0 | column_schema->type, std::make_shared<DataTypeArray>(child_types[0])); |
235 | 0 | } else if (column_schema->kind == ParquetColumnSchemaKind::MAP) { |
236 | 0 | DORIS_CHECK(child_types.size() == 2); |
237 | 0 | column_schema->type = nullable_like_original( |
238 | 0 | column_schema->type, std::make_shared<DataTypeMap>(make_nullable(child_types[0]), |
239 | 0 | make_nullable(child_types[1]))); |
240 | 0 | } else if (column_schema->kind == ParquetColumnSchemaKind::STRUCT) { |
241 | 0 | Strings child_names; |
242 | 0 | child_names.reserve(column_schema->children.size()); |
243 | 0 | for (const auto& child : column_schema->children) { |
244 | 0 | child_names.push_back(child->name); |
245 | 0 | } |
246 | 0 | column_schema->type = nullable_like_original( |
247 | 0 | column_schema->type, std::make_shared<DataTypeStruct>(child_types, child_names)); |
248 | 0 | } |
249 | 0 | return column_schema->type; |
250 | 1 | } |
251 | | |
252 | | static Status find_projected_minmax_leaf(const ParquetColumnSchema& column_schema, |
253 | | const format::LocalColumnIndex& projection, |
254 | 14 | const ParquetColumnSchema** leaf_schema) { |
255 | 14 | DORIS_CHECK(leaf_schema != nullptr); |
256 | 14 | if (projection.project_all_children || projection.children.empty()) { |
257 | 12 | if (column_schema.leaf_column_id < 0) { |
258 | 2 | return Status::NotSupported( |
259 | 2 | "Parquet aggregate pushdown only supports primitive column {}", |
260 | 2 | column_schema.name); |
261 | 2 | } |
262 | 10 | if (column_schema.max_repetition_level > 0) { |
263 | 0 | return Status::NotSupported( |
264 | 0 | "Parquet aggregate pushdown does not support repeated column {}", |
265 | 0 | column_schema.name); |
266 | 0 | } |
267 | 10 | *leaf_schema = &column_schema; |
268 | 10 | return Status::OK(); |
269 | 10 | } |
270 | 2 | if (projection.children.size() != 1) { |
271 | 0 | return Status::NotSupported( |
272 | 0 | "Parquet aggregate pushdown only supports a single nested leaf under column {}", |
273 | 0 | column_schema.name); |
274 | 0 | } |
275 | 2 | const auto& child_projection = projection.children[0]; |
276 | 2 | const auto child_schema_it = |
277 | 2 | std::ranges::find_if(column_schema.children, [&](const auto& child_schema) { |
278 | 2 | return child_schema->local_id == child_projection.local_id(); |
279 | 2 | }); |
280 | 2 | if (child_schema_it != column_schema.children.end()) { |
281 | 2 | return find_projected_minmax_leaf(**child_schema_it, child_projection, leaf_schema); |
282 | 2 | } |
283 | 0 | return Status::InvalidArgument("Invalid parquet aggregate projection local id {} for column {}", |
284 | 0 | child_projection.local_id(), column_schema.name); |
285 | 2 | } |
286 | | |
287 | | void ParquetReader::_fill_column_definition(const ParquetColumnSchema& column_schema, |
288 | 244 | format::ColumnDefinition* field) const { |
289 | 244 | if (column_schema.parquet_field_id >= 0) { |
290 | 95 | field->identifier = Field::create_field<TYPE_INT>(column_schema.parquet_field_id); |
291 | 149 | } else { |
292 | 149 | field->identifier = Field::create_field<TYPE_STRING>(column_schema.name); |
293 | 149 | } |
294 | 244 | field->local_id = column_schema.local_id; |
295 | 244 | field->name = column_schema.name; |
296 | 244 | field->type = column_schema.type != nullptr && !column_schema.type->is_nullable() |
297 | 244 | ? make_nullable(column_schema.type) |
298 | 244 | : column_schema.type; |
299 | 244 | field->children.clear(); |
300 | 244 | field->children.reserve(column_schema.children.size()); |
301 | 244 | for (const auto& child : column_schema.children) { |
302 | 26 | format::ColumnDefinition child_field; |
303 | 26 | _fill_column_definition(*child, &child_field); |
304 | 26 | field->children.push_back(std::move(child_field)); |
305 | 26 | } |
306 | 244 | } |
307 | | |
308 | | ParquetReader::ParquetReader(std::shared_ptr<io::FileSystemProperties>& system_properties, |
309 | | std::unique_ptr<io::FileDescription>& file_description, |
310 | | std::shared_ptr<io::IOContext> io_ctx, RuntimeProfile* profile, |
311 | | std::optional<format::GlobalRowIdContext> global_rowid_context, |
312 | | bool enable_mapping_timestamp_tz) |
313 | 109 | : FileReader(system_properties, file_description, io_ctx, profile), |
314 | 109 | _global_rowid_context(global_rowid_context), |
315 | 109 | _enable_mapping_timestamp_tz(enable_mapping_timestamp_tz) {} |
316 | | |
317 | 109 | ParquetReader::~ParquetReader() = default; |
318 | | |
319 | 108 | Status ParquetReader::init(RuntimeState* state) { |
320 | 108 | RETURN_IF_ERROR(format::FileReader::init(state)); |
321 | 108 | if (_profile != nullptr) { |
322 | 28 | COUNTER_UPDATE(_parquet_profile.file_reader_create_time, |
323 | 28 | _reader_statistics.file_reader_create_time); |
324 | 28 | COUNTER_UPDATE(_parquet_profile.open_file_num, _reader_statistics.open_file_num); |
325 | 28 | } |
326 | 108 | _state = std::make_unique<ParquetReaderScanState>(); |
327 | 108 | _state->enable_bloom_filter = |
328 | 108 | state != nullptr && state->query_options().enable_parquet_filter_by_bloom_filter; |
329 | 108 | _state->enable_page_cache = |
330 | 108 | state != nullptr && state->query_options().enable_parquet_file_page_cache; |
331 | 108 | if (state != nullptr) { |
332 | 108 | _state->timezone = &state->timezone_obj(); |
333 | 108 | _state->enable_strict_mode = state->enable_strict_mode(); |
334 | 108 | _state->scheduler.set_timezone(&state->timezone_obj()); |
335 | 108 | _state->scheduler.set_enable_strict_mode(_state->enable_strict_mode); |
336 | 108 | } |
337 | 108 | int64_t merge_read_slice_size = -1; |
338 | 108 | if (state != nullptr && state->query_options().__isset.merge_read_slice_size) { |
339 | 108 | merge_read_slice_size = state->query_options().merge_read_slice_size; |
340 | 108 | } |
341 | 108 | _state->scheduler.set_merge_read_options(_profile, merge_read_slice_size); |
342 | 108 | _state->scheduler.set_batch_size(_batch_size); |
343 | | // Open parquet file and parse metadata to get file schema. |
344 | 108 | RETURN_IF_ERROR(_state->file_context.open(_tracing_file_reader, _io_ctx.get(), |
345 | 108 | _state->enable_page_cache, *_file_description)); |
346 | | // Build file schema from parquet metadata. |
347 | | // A file reader may expose raw file identifiers, such as Parquet field_id, through ColumnDefinition::identifier |
348 | 108 | RETURN_IF_ERROR( |
349 | 108 | build_parquet_column_schema(*_state->file_context.schema, &_state->file_schema)); |
350 | 108 | if (_enable_mapping_timestamp_tz) { |
351 | 1 | for (auto& column_schema : _state->file_schema) { |
352 | 1 | apply_timestamp_tz_mapping(column_schema.get()); |
353 | 1 | } |
354 | 1 | } |
355 | 108 | return Status::OK(); |
356 | 108 | } |
357 | | |
358 | 1 | void ParquetReader::set_batch_size(size_t batch_size) { |
359 | 1 | _batch_size = std::max<size_t>(1, batch_size); |
360 | 1 | if (_state != nullptr) { |
361 | 0 | _state->scheduler.set_batch_size(_batch_size); |
362 | 0 | } |
363 | 1 | } |
364 | | |
365 | 100 | Status ParquetReader::get_schema(std::vector<format::ColumnDefinition>* file_schema) const { |
366 | 100 | if (file_schema == nullptr) { |
367 | 0 | return Status::InvalidArgument("file_schema is null"); |
368 | 0 | } |
369 | 100 | file_schema->clear(); |
370 | 100 | if (_state == nullptr || _state->file_context.schema == nullptr) { |
371 | 0 | return Status::Uninitialized("ParquetReader is not open"); |
372 | 0 | } |
373 | | |
374 | 100 | file_schema->reserve(_state->file_schema.size()); |
375 | 318 | for (size_t column_idx = 0; column_idx < _state->file_schema.size(); ++column_idx) { |
376 | 218 | format::ColumnDefinition field; |
377 | 218 | _fill_column_definition(*_state->file_schema[column_idx], &field); |
378 | 218 | DORIS_CHECK(field.local_id == static_cast<int32_t>(column_idx)); |
379 | 218 | file_schema->push_back(std::move(field)); |
380 | 218 | } |
381 | 100 | if (_global_rowid_context.has_value()) { |
382 | 2 | file_schema->push_back(format::global_rowid_column_definition()); |
383 | 2 | } |
384 | 100 | return Status::OK(); |
385 | 100 | } |
386 | | |
387 | | std::unique_ptr<format::TableColumnMapper> ParquetReader::create_column_mapper( |
388 | 58 | format::TableColumnMapperOptions options) const { |
389 | 58 | return std::make_unique<format::ParquetColumnMapper>(std::move(options)); |
390 | 58 | } |
391 | | |
392 | 104 | Status ParquetReader::open(std::shared_ptr<format::FileScanRequest> request) { |
393 | 104 | if (_state == nullptr || _state->file_context.metadata == nullptr || |
394 | 104 | _state->file_context.schema == nullptr) { |
395 | 0 | return Status::Uninitialized("ParquetReader is not open"); |
396 | 0 | } |
397 | 104 | auto request_snapshot = request; |
398 | 104 | DORIS_CHECK(request_snapshot != nullptr); |
399 | 104 | RETURN_IF_ERROR(format::FileReader::open(std::move(request))); |
400 | | |
401 | 104 | const int num_fields = static_cast<int>(_state->file_schema.size()); |
402 | 104 | for (const auto& column_filter : request_snapshot->column_predicate_filters) { |
403 | 17 | const auto file_column_id = column_filter.file_column_id; |
404 | 17 | if (!file_column_id.is_valid() || file_column_id.value() >= num_fields) { |
405 | 0 | return Status::InvalidArgument("Invalid parquet filter top-level local id {}", |
406 | 0 | file_column_id.value()); |
407 | 0 | } |
408 | 17 | } |
409 | | |
410 | | // `local_positions.empty()` means all columns are needed by table reader |
411 | | // TODO(gabriel): It will happen only for TVF `select *` query. |
412 | 104 | if (request_snapshot->local_positions.empty()) { |
413 | 32 | for (const auto& col : request_snapshot->predicate_columns) { |
414 | 9 | request_snapshot->local_positions.emplace(col.column_id(), |
415 | 9 | format::LocalIndex(col.column_id().value())); |
416 | 9 | } |
417 | 32 | for (const auto& col : request_snapshot->non_predicate_columns) { |
418 | 17 | request_snapshot->local_positions.emplace(col.column_id(), |
419 | 17 | format::LocalIndex(col.column_id().value())); |
420 | 17 | } |
421 | 32 | } |
422 | | |
423 | 104 | for (const auto& col : request_snapshot->predicate_columns) { |
424 | 42 | DORIS_CHECK(request_snapshot->local_positions.count(col.column_id()) > 0); |
425 | 42 | const auto local_id = col.local_id(); |
426 | 42 | if (local_id == format::ROW_POSITION_COLUMN_ID || |
427 | 42 | local_id == format::GLOBAL_ROWID_COLUMN_ID) { |
428 | 11 | continue; |
429 | 11 | } |
430 | 31 | DORIS_CHECK(local_id >= 0 && local_id < num_fields); |
431 | 31 | } |
432 | 111 | for (const auto& col : request_snapshot->non_predicate_columns) { |
433 | 111 | DORIS_CHECK(request_snapshot->local_positions.count(col.column_id()) > 0); |
434 | 111 | const auto local_id = col.local_id(); |
435 | 111 | if (local_id == format::ROW_POSITION_COLUMN_ID || |
436 | 111 | local_id == format::GLOBAL_ROWID_COLUMN_ID) { |
437 | 14 | continue; |
438 | 14 | } |
439 | 97 | DORIS_CHECK(local_id >= 0 && local_id < num_fields); |
440 | 97 | } |
441 | | |
442 | 104 | RowGroupScanPlan row_group_plan; |
443 | 104 | ParquetScanRange scan_range; |
444 | 104 | scan_range.start_offset = _file_description->range_start_offset; |
445 | 104 | scan_range.size = _file_description->range_size; |
446 | 104 | scan_range.file_size = _file_description->file_size; |
447 | | // Get selected ranges in row groups according to metadata (Row-Group level index and Page Index including Zonemap, Dictionary, Bloom Filter). |
448 | 104 | RETURN_IF_ERROR(plan_parquet_row_groups( |
449 | 104 | *_state->file_context.metadata, _state->file_context.file_reader.get(), |
450 | 104 | _state->file_schema, *request_snapshot, scan_range, _state->enable_bloom_filter, |
451 | 104 | &row_group_plan, _state->timezone)); |
452 | 104 | if (_profile != nullptr) { |
453 | 28 | _parquet_profile.update_pruning_stats(row_group_plan.pruning_stats); |
454 | 28 | } |
455 | 104 | if (_state->enable_page_cache) { |
456 | 104 | _state->file_context.register_page_cache_ranges( |
457 | 104 | build_page_cache_ranges(*_state->file_context.metadata, _state->file_schema, |
458 | 104 | *request_snapshot, row_group_plan)); |
459 | 104 | } |
460 | 104 | _state->scan_plan = row_group_plan; |
461 | 104 | _state->scheduler.set_page_skip_profile(_parquet_profile.page_skip_profile()); |
462 | 104 | _state->scheduler.set_global_rowid_context(_global_rowid_context); |
463 | 104 | _state->scheduler.set_scan_profile(_parquet_profile.scan_profile()); |
464 | 104 | _state->scheduler.set_plan(std::move(row_group_plan)); |
465 | 104 | _eof = _state->scheduler.empty(); |
466 | 104 | return Status::OK(); |
467 | 104 | } |
468 | | |
469 | 142 | Status ParquetReader::get_block(Block* file_block, size_t* rows, bool* eof) { |
470 | 142 | if (_state == nullptr || _state->file_context.file_reader == nullptr || |
471 | 142 | _state->file_context.schema == nullptr) { |
472 | 0 | return Status::Uninitialized("ParquetReader is not open"); |
473 | 0 | } |
474 | 142 | *rows = 0; |
475 | 142 | if (_eof) { |
476 | 1 | *eof = true; |
477 | 1 | return Status::OK(); |
478 | 1 | } |
479 | 141 | auto request_snapshot = _request; |
480 | 141 | if (request_snapshot == nullptr) { |
481 | 0 | return Status::Cancelled("ParquetReader is closed"); |
482 | 0 | } |
483 | | |
484 | 141 | const auto predicate_filtered_rows_before = _state->scheduler.predicate_filtered_rows(); |
485 | 141 | RETURN_IF_ERROR(_state->scheduler.read_next_batch(_state->file_context, _state->file_schema, |
486 | 141 | *request_snapshot, file_block, rows, eof)); |
487 | 141 | _sync_page_cache_profile(); |
488 | 141 | if (_io_ctx != nullptr) { |
489 | 36 | _io_ctx->predicate_filtered_rows += |
490 | 36 | _state->scheduler.predicate_filtered_rows() - predicate_filtered_rows_before; |
491 | 36 | } |
492 | 141 | _eof = *eof; |
493 | 141 | return Status::OK(); |
494 | 141 | } |
495 | | |
496 | 206 | void ParquetReader::_sync_page_cache_profile() { |
497 | 206 | if (_profile == nullptr || _state == nullptr) { |
498 | 142 | return; |
499 | 142 | } |
500 | 64 | const auto stats = _state->file_context.page_cache_stats(); |
501 | 64 | COUNTER_UPDATE(_parquet_profile.page_read_counter, |
502 | 64 | stats.read_count - _reported_page_cache_stats.read_count); |
503 | 64 | COUNTER_UPDATE(_parquet_profile.page_cache_write_counter, |
504 | 64 | stats.write_count - _reported_page_cache_stats.write_count); |
505 | 64 | COUNTER_UPDATE( |
506 | 64 | _parquet_profile.page_cache_compressed_write_counter, |
507 | 64 | stats.compressed_write_count - _reported_page_cache_stats.compressed_write_count); |
508 | 64 | COUNTER_UPDATE(_parquet_profile.page_cache_hit_counter, |
509 | 64 | stats.hit_count - _reported_page_cache_stats.hit_count); |
510 | 64 | COUNTER_UPDATE(_parquet_profile.page_cache_missing_counter, |
511 | 64 | stats.miss_count - _reported_page_cache_stats.miss_count); |
512 | 64 | COUNTER_UPDATE(_parquet_profile.page_cache_compressed_hit_counter, |
513 | 64 | stats.compressed_hit_count - _reported_page_cache_stats.compressed_hit_count); |
514 | 64 | _reported_page_cache_stats = stats; |
515 | 64 | } |
516 | | |
517 | 2 | void ParquetReader::set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) { |
518 | 2 | if (_state == nullptr) { |
519 | 0 | return; |
520 | 0 | } |
521 | 2 | _state->scheduler.set_condition_cache_context(std::move(ctx)); |
522 | 2 | if (_io_ctx != nullptr) { |
523 | | // Condition-cache HIT filters row ranges before batch reading, so skipped rows never belong |
524 | | // to a later get_block() batch. Report the plan-level skipped rows at the same point where |
525 | | // the scan plan is rewritten. |
526 | 1 | _io_ctx->condition_cache_filtered_rows += _state->scheduler.condition_cache_filtered_rows(); |
527 | 1 | } |
528 | 2 | } |
529 | | |
530 | 0 | int64_t ParquetReader::get_total_rows() const { |
531 | 0 | if (_state == nullptr) { |
532 | 0 | return 0; |
533 | 0 | } |
534 | 0 | int64_t rows = 0; |
535 | 0 | for (const auto& row_group_plan : _state->scan_plan.row_groups) { |
536 | 0 | rows += row_group_plan.row_group_rows; |
537 | 0 | } |
538 | 0 | return rows; |
539 | 0 | } |
540 | | |
541 | | Status ParquetReader::get_aggregate_result(const format::FileAggregateRequest& request, |
542 | 19 | format::FileAggregateResult* result) { |
543 | 19 | DORIS_CHECK(result != nullptr); |
544 | 19 | if (_state == nullptr || _state->file_context.metadata == nullptr || |
545 | 19 | _state->file_context.schema == nullptr) { |
546 | 0 | return Status::Uninitialized("ParquetReader is not open"); |
547 | 0 | } |
548 | 19 | result->count = 0; |
549 | 19 | result->columns.clear(); |
550 | 19 | if (request.agg_type != TPushAggOp::type::COUNT && |
551 | 19 | request.agg_type != TPushAggOp::type::MINMAX) { |
552 | 1 | return Status::NotSupported("Unsupported parquet aggregate pushdown type {}", |
553 | 1 | request.agg_type); |
554 | 1 | } |
555 | | |
556 | | // Aggregate row count in all selected row groups. For MIN/MAX aggregate, this is used to determine whether there is no row group selected. |
557 | 33 | for (const auto& row_group_plan : _state->scan_plan.row_groups) { |
558 | 33 | auto row_group_metadata = |
559 | 33 | _state->file_context.metadata->RowGroup(row_group_plan.row_group_id); |
560 | 33 | DORIS_CHECK(row_group_metadata != nullptr); |
561 | 33 | result->count += row_group_metadata->num_rows(); |
562 | 33 | } |
563 | 18 | if (request.agg_type == TPushAggOp::type::COUNT) { |
564 | 7 | if (request.columns.empty()) { |
565 | 4 | return Status::OK(); |
566 | 4 | } |
567 | 3 | if (request.columns.size() != 1) { |
568 | 0 | return Status::NotSupported("Parquet COUNT pushdown only supports one count column"); |
569 | 0 | } |
570 | 3 | const auto& count_projection = request.columns[0].projection; |
571 | 3 | const auto& root_schema = projected_root_schema(_state->file_schema, count_projection); |
572 | 3 | result->count = 0; |
573 | 3 | for (const auto& row_group_plan : _state->scan_plan.row_groups) { |
574 | 3 | std::shared_ptr<::parquet::RowGroupReader> row_group; |
575 | 3 | try { |
576 | 3 | row_group = _state->file_context.file_reader->RowGroup(row_group_plan.row_group_id); |
577 | 3 | } catch (const ::parquet::ParquetException& e) { |
578 | 0 | return Status::Corruption("Failed to open parquet row group {}: {}", |
579 | 0 | row_group_plan.row_group_id, e.what()); |
580 | 0 | } catch (const std::exception& e) { |
581 | 0 | return Status::InternalError("Failed to open parquet row group {}: {}", |
582 | 0 | row_group_plan.row_group_id, e.what()); |
583 | 0 | } |
584 | | |
585 | 3 | ParquetColumnReaderFactory column_reader_factory( |
586 | 3 | row_group, _state->file_context.schema->num_columns(), |
587 | 3 | &row_group_plan.page_skip_plans, _parquet_profile.page_skip_profile(), |
588 | 3 | _state->timezone, _state->enable_strict_mode, |
589 | 3 | _parquet_profile.scan_profile().column_reader_profile); |
590 | 3 | std::unique_ptr<ParquetColumnReader> shape_reader; |
591 | 3 | RETURN_IF_ERROR(column_reader_factory.create_count_shape_reader( |
592 | 3 | root_schema, &count_projection, &shape_reader)); |
593 | 3 | DORIS_CHECK(shape_reader != nullptr); |
594 | | |
595 | 3 | int64_t row_group_cursor = 0; |
596 | 3 | for (const auto& selected_range : row_group_plan.selected_ranges) { |
597 | 3 | DORIS_CHECK(selected_range.start >= row_group_cursor); |
598 | 3 | RETURN_IF_ERROR(shape_reader->skip(selected_range.start - row_group_cursor)); |
599 | 3 | row_group_cursor = selected_range.start; |
600 | | |
601 | 3 | int64_t range_rows_read = 0; |
602 | 6 | while (range_rows_read < selected_range.length) { |
603 | 3 | const int64_t batch_rows = |
604 | 3 | std::min<int64_t>(_batch_size, selected_range.length - range_rows_read); |
605 | | // COUNT(col) only needs the top-level NULL state. The shape reader loads |
606 | | // def/rep levels from one representative leaf and does not build value_indices |
607 | | // or values_column. MAP chooses the key leaf; ARRAY/STRUCT may choose a string |
608 | | // leaf, but the levels-only protocol still avoids Doris-side string |
609 | | // materialization for that leaf. |
610 | 3 | RETURN_IF_ERROR(shape_reader->load_nested_levels_batch(batch_rows)); |
611 | 3 | result->count += |
612 | 3 | count_loaded_non_null_values(root_schema, *shape_reader, batch_rows); |
613 | 3 | range_rows_read += batch_rows; |
614 | 3 | row_group_cursor += batch_rows; |
615 | 3 | } |
616 | 3 | } |
617 | 3 | } |
618 | 3 | return Status::OK(); |
619 | 3 | } |
620 | | |
621 | 11 | result->columns.resize(request.columns.size()); |
622 | 20 | for (size_t request_column_idx = 0; request_column_idx < request.columns.size(); |
623 | 13 | ++request_column_idx) { |
624 | 13 | const auto file_column_id = request.columns[request_column_idx].projection.local_id(); |
625 | 13 | if (file_column_id < 0 || |
626 | 13 | file_column_id >= static_cast<int32_t>(_state->file_schema.size())) { |
627 | 1 | return Status::InvalidArgument("Invalid parquet aggregate column id {}", |
628 | 1 | file_column_id); |
629 | 1 | } |
630 | 12 | const auto& column_schema = _state->file_schema[file_column_id]; |
631 | 12 | DORIS_CHECK(column_schema != nullptr); |
632 | 12 | const ParquetColumnSchema* leaf_schema = nullptr; |
633 | 12 | RETURN_IF_ERROR(find_projected_minmax_leaf( |
634 | 12 | *column_schema, request.columns[request_column_idx].projection, &leaf_schema)); |
635 | 10 | DORIS_CHECK(leaf_schema != nullptr); |
636 | | |
637 | 10 | auto& aggregate_column = result->columns[request_column_idx]; |
638 | 10 | aggregate_column.projection = request.columns[request_column_idx].projection; |
639 | 19 | for (const auto& row_group_plan : _state->scan_plan.row_groups) { |
640 | 19 | auto row_group_metadata = |
641 | 19 | _state->file_context.metadata->RowGroup(row_group_plan.row_group_id); |
642 | 19 | DORIS_CHECK(row_group_metadata != nullptr); |
643 | 19 | auto column_chunk = row_group_metadata->ColumnChunk(leaf_schema->leaf_column_id); |
644 | 19 | DORIS_CHECK(column_chunk != nullptr); |
645 | 19 | const auto statistics = ParquetStatisticsUtils::TransformColumnStatistics( |
646 | 19 | *leaf_schema, column_chunk->statistics(), _state->timezone); |
647 | 19 | if (!statistics.has_min_max) { |
648 | 1 | return Status::NotSupported("Missing parquet min/max statistics for column {}", |
649 | 1 | leaf_schema->name); |
650 | 1 | } |
651 | 18 | if (!aggregate_column.has_min || statistics.min_value < aggregate_column.min_value) { |
652 | 9 | aggregate_column.min_value = statistics.min_value; |
653 | 9 | aggregate_column.has_min = true; |
654 | 9 | } |
655 | 18 | if (!aggregate_column.has_max || aggregate_column.max_value < statistics.max_value) { |
656 | 18 | aggregate_column.max_value = statistics.max_value; |
657 | 18 | aggregate_column.has_max = true; |
658 | 18 | } |
659 | 18 | } |
660 | 9 | if (!aggregate_column.has_min || !aggregate_column.has_max) { |
661 | 0 | return Status::NotSupported("No parquet row group selected for min/max pushdown"); |
662 | 0 | } |
663 | 9 | } |
664 | 7 | return Status::OK(); |
665 | 11 | } |
666 | | |
667 | 65 | Status ParquetReader::close() { |
668 | 65 | if (_state != nullptr) { |
669 | 65 | _sync_page_cache_profile(); |
670 | 65 | RETURN_IF_ERROR(_state->file_context.close()); |
671 | 65 | } |
672 | 65 | return FileReader::close(); |
673 | 65 | } |
674 | | |
675 | 108 | void ParquetReader::_init_profile() { |
676 | 108 | _parquet_profile.init(_profile); |
677 | 108 | } |
678 | | |
679 | | } // namespace doris::format::parquet |