be/src/format_v2/parquet/parquet_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format_v2/parquet/parquet_reader.h" |
19 | | |
20 | | #include <algorithm> |
21 | | #include <map> |
22 | | #include <memory> |
23 | | #include <optional> |
24 | | #include <ranges> |
25 | | #include <utility> |
26 | | #include <vector> |
27 | | |
28 | | #include "core/assert_cast.h" |
29 | | #include "core/block/block.h" |
30 | | #include "core/data_type/data_type_array.h" |
31 | | #include "core/data_type/data_type_factory.hpp" |
32 | | #include "core/data_type/data_type_map.h" |
33 | | #include "core/data_type/data_type_nullable.h" |
34 | | #include "core/data_type/data_type_struct.h" |
35 | | #include "format_v2/column_mapper.h" |
36 | | #include "format_v2/parquet/parquet_column_schema.h" |
37 | | #include "format_v2/parquet/parquet_file_context.h" |
38 | | #include "format_v2/parquet/parquet_scan.h" |
39 | | #include "format_v2/parquet/parquet_statistics.h" |
40 | | #include "format_v2/parquet/reader/column_reader.h" |
41 | | #include "runtime/runtime_state.h" |
42 | | |
43 | | namespace doris::format::parquet { |
44 | | |
45 | | struct ParquetReaderScanState { |
46 | | ParquetFileContext file_context; |
47 | | std::vector<std::unique_ptr<ParquetColumnSchema>> file_schema; |
48 | | RowGroupScanPlan scan_plan; |
49 | | ParquetScanScheduler scheduler; |
50 | | const cctz::time_zone* timezone = nullptr; |
51 | | bool enable_bloom_filter = false; |
52 | | }; |
53 | | |
54 | 0 | DataTypePtr nullable_like_original(const DataTypePtr& type, DataTypePtr nested_type) { |
55 | 0 | return type != nullptr && type->is_nullable() ? make_nullable(nested_type) : nested_type; |
56 | 0 | } |
57 | | |
58 | 0 | int timestamp_tz_scale(const ParquetTypeDescriptor& type_descriptor) { |
59 | 0 | switch (type_descriptor.time_unit) { |
60 | 0 | case ParquetTimeUnit::MILLIS: |
61 | 0 | return 3; |
62 | 0 | case ParquetTimeUnit::MICROS: |
63 | 0 | case ParquetTimeUnit::UNKNOWN: |
64 | 0 | default: |
65 | 0 | return 6; |
66 | 0 | } |
67 | 0 | } |
68 | | |
69 | 1 | bool should_map_to_timestamp_tz(const ParquetColumnSchema& column_schema) { |
70 | 1 | const auto& type_descriptor = column_schema.type_descriptor; |
71 | 1 | return type_descriptor.is_timestamp && type_descriptor.timestamp_is_adjusted_to_utc; |
72 | 1 | } |
73 | | |
74 | 1 | DataTypePtr apply_timestamp_tz_mapping(ParquetColumnSchema* column_schema) { |
75 | 1 | DORIS_CHECK(column_schema != nullptr); |
76 | 1 | if (column_schema->kind == ParquetColumnSchemaKind::PRIMITIVE) { |
77 | 1 | if (should_map_to_timestamp_tz(*column_schema)) { |
78 | 0 | const bool nullable = |
79 | 0 | column_schema->type != nullptr && column_schema->type->is_nullable(); |
80 | 0 | const auto scale = timestamp_tz_scale(column_schema->type_descriptor); |
81 | 0 | column_schema->type = DataTypeFactory::instance().create_data_type(TYPE_TIMESTAMPTZ, |
82 | 0 | nullable, 0, scale); |
83 | 0 | column_schema->type_descriptor.doris_type = column_schema->type; |
84 | 0 | } |
85 | 1 | return column_schema->type; |
86 | 1 | } |
87 | | |
88 | 0 | std::vector<DataTypePtr> child_types; |
89 | 0 | child_types.reserve(column_schema->children.size()); |
90 | 0 | for (auto& child : column_schema->children) { |
91 | 0 | child_types.push_back(apply_timestamp_tz_mapping(child.get())); |
92 | 0 | } |
93 | |
|
94 | 0 | if (column_schema->kind == ParquetColumnSchemaKind::LIST) { |
95 | 0 | DORIS_CHECK(child_types.size() == 1); |
96 | 0 | column_schema->type = nullable_like_original( |
97 | 0 | column_schema->type, std::make_shared<DataTypeArray>(child_types[0])); |
98 | 0 | } else if (column_schema->kind == ParquetColumnSchemaKind::MAP) { |
99 | 0 | DORIS_CHECK(child_types.size() == 2); |
100 | 0 | column_schema->type = nullable_like_original( |
101 | 0 | column_schema->type, std::make_shared<DataTypeMap>(make_nullable(child_types[0]), |
102 | 0 | make_nullable(child_types[1]))); |
103 | 0 | } else if (column_schema->kind == ParquetColumnSchemaKind::STRUCT) { |
104 | 0 | Strings child_names; |
105 | 0 | child_names.reserve(column_schema->children.size()); |
106 | 0 | for (const auto& child : column_schema->children) { |
107 | 0 | child_names.push_back(child->name); |
108 | 0 | } |
109 | 0 | column_schema->type = nullable_like_original( |
110 | 0 | column_schema->type, std::make_shared<DataTypeStruct>(child_types, child_names)); |
111 | 0 | } |
112 | 0 | return column_schema->type; |
113 | 1 | } |
114 | | |
115 | | static Status find_projected_minmax_leaf(const ParquetColumnSchema& column_schema, |
116 | | const format::LocalColumnIndex& projection, |
117 | 14 | const ParquetColumnSchema** leaf_schema) { |
118 | 14 | DORIS_CHECK(leaf_schema != nullptr); |
119 | 14 | if (projection.project_all_children || projection.children.empty()) { |
120 | 12 | if (column_schema.leaf_column_id < 0) { |
121 | 2 | return Status::NotSupported( |
122 | 2 | "Parquet aggregate pushdown only supports primitive column {}", |
123 | 2 | column_schema.name); |
124 | 2 | } |
125 | 10 | if (column_schema.max_repetition_level > 0) { |
126 | 0 | return Status::NotSupported( |
127 | 0 | "Parquet aggregate pushdown does not support repeated column {}", |
128 | 0 | column_schema.name); |
129 | 0 | } |
130 | 10 | *leaf_schema = &column_schema; |
131 | 10 | return Status::OK(); |
132 | 10 | } |
133 | 2 | if (projection.children.size() != 1) { |
134 | 0 | return Status::NotSupported( |
135 | 0 | "Parquet aggregate pushdown only supports a single nested leaf under column {}", |
136 | 0 | column_schema.name); |
137 | 0 | } |
138 | 2 | const auto& child_projection = projection.children[0]; |
139 | 2 | const auto child_schema_it = |
140 | 2 | std::ranges::find_if(column_schema.children, [&](const auto& child_schema) { |
141 | 2 | return child_schema->local_id == child_projection.local_id(); |
142 | 2 | }); |
143 | 2 | if (child_schema_it != column_schema.children.end()) { |
144 | 2 | return find_projected_minmax_leaf(**child_schema_it, child_projection, leaf_schema); |
145 | 2 | } |
146 | 0 | return Status::InvalidArgument("Invalid parquet aggregate projection local id {} for column {}", |
147 | 0 | child_projection.local_id(), column_schema.name); |
148 | 2 | } |
149 | | |
150 | | void ParquetReader::_fill_column_definition(const ParquetColumnSchema& column_schema, |
151 | 237 | format::ColumnDefinition* field) const { |
152 | 237 | if (column_schema.parquet_field_id >= 0) { |
153 | 95 | field->identifier = Field::create_field<TYPE_INT>(column_schema.parquet_field_id); |
154 | 142 | } else { |
155 | 142 | field->identifier = Field::create_field<TYPE_STRING>(column_schema.name); |
156 | 142 | } |
157 | 237 | field->local_id = column_schema.local_id; |
158 | 237 | field->name = column_schema.name; |
159 | 237 | field->type = column_schema.type != nullptr && !column_schema.type->is_nullable() |
160 | 237 | ? make_nullable(column_schema.type) |
161 | 237 | : column_schema.type; |
162 | 237 | field->children.clear(); |
163 | 237 | field->children.reserve(column_schema.children.size()); |
164 | 237 | for (const auto& child : column_schema.children) { |
165 | 26 | format::ColumnDefinition child_field; |
166 | 26 | _fill_column_definition(*child, &child_field); |
167 | 26 | field->children.push_back(std::move(child_field)); |
168 | 26 | } |
169 | 237 | } |
170 | | |
171 | | ParquetReader::ParquetReader(std::shared_ptr<io::FileSystemProperties>& system_properties, |
172 | | std::unique_ptr<io::FileDescription>& file_description, |
173 | | std::shared_ptr<io::IOContext> io_ctx, RuntimeProfile* profile, |
174 | | std::optional<format::GlobalRowIdContext> global_rowid_context, |
175 | | bool enable_mapping_timestamp_tz) |
176 | 103 | : FileReader(system_properties, file_description, io_ctx, profile), |
177 | 103 | _global_rowid_context(global_rowid_context), |
178 | 103 | _enable_mapping_timestamp_tz(enable_mapping_timestamp_tz) {} |
179 | | |
180 | 103 | ParquetReader::~ParquetReader() = default; |
181 | | |
182 | 102 | Status ParquetReader::init(RuntimeState* state) { |
183 | 102 | RETURN_IF_ERROR(format::FileReader::init(state)); |
184 | 102 | if (_profile != nullptr) { |
185 | 23 | COUNTER_UPDATE(_parquet_profile.file_reader_create_time, |
186 | 23 | _reader_statistics.file_reader_create_time); |
187 | 23 | COUNTER_UPDATE(_parquet_profile.open_file_num, _reader_statistics.open_file_num); |
188 | 23 | } |
189 | 102 | _state = std::make_unique<ParquetReaderScanState>(); |
190 | 102 | _state->enable_bloom_filter = |
191 | 102 | state != nullptr && state->query_options().enable_parquet_filter_by_bloom_filter; |
192 | 102 | if (state != nullptr) { |
193 | 102 | _state->timezone = &state->timezone_obj(); |
194 | 102 | _state->scheduler.set_timezone(&state->timezone_obj()); |
195 | 102 | _state->scheduler.set_enable_strict_mode(state->enable_strict_mode()); |
196 | 102 | } |
197 | | // Open parquet file and parse metadata to get file schema. |
198 | 102 | RETURN_IF_ERROR(_state->file_context.open(_tracing_file_reader, _io_ctx.get())); |
199 | | // Build file schema from parquet metadata. |
200 | | // A file reader may expose raw file identifiers, such as Parquet field_id, through ColumnDefinition::identifier |
201 | 102 | RETURN_IF_ERROR( |
202 | 102 | build_parquet_column_schema(*_state->file_context.schema, &_state->file_schema)); |
203 | 102 | if (_enable_mapping_timestamp_tz) { |
204 | 1 | for (auto& column_schema : _state->file_schema) { |
205 | 1 | apply_timestamp_tz_mapping(column_schema.get()); |
206 | 1 | } |
207 | 1 | } |
208 | 102 | return Status::OK(); |
209 | 102 | } |
210 | | |
211 | 97 | Status ParquetReader::get_schema(std::vector<format::ColumnDefinition>* file_schema) const { |
212 | 97 | if (file_schema == nullptr) { |
213 | 0 | return Status::InvalidArgument("file_schema is null"); |
214 | 0 | } |
215 | 97 | file_schema->clear(); |
216 | 97 | if (_state == nullptr || _state->file_context.schema == nullptr) { |
217 | 0 | return Status::Uninitialized("ParquetReader is not open"); |
218 | 0 | } |
219 | | |
220 | 97 | file_schema->reserve(_state->file_schema.size()); |
221 | 308 | for (size_t column_idx = 0; column_idx < _state->file_schema.size(); ++column_idx) { |
222 | 211 | format::ColumnDefinition field; |
223 | 211 | _fill_column_definition(*_state->file_schema[column_idx], &field); |
224 | 211 | DORIS_CHECK(field.local_id == static_cast<int32_t>(column_idx)); |
225 | 211 | file_schema->push_back(std::move(field)); |
226 | 211 | } |
227 | 97 | if (_global_rowid_context.has_value()) { |
228 | 2 | file_schema->push_back(format::global_rowid_column_definition()); |
229 | 2 | } |
230 | 97 | return Status::OK(); |
231 | 97 | } |
232 | | |
233 | | std::unique_ptr<format::TableColumnMapper> ParquetReader::create_column_mapper( |
234 | 58 | format::TableColumnMapperOptions options) const { |
235 | 58 | return std::make_unique<format::ParquetColumnMapper>(std::move(options)); |
236 | 58 | } |
237 | | |
238 | 98 | Status ParquetReader::open(std::shared_ptr<format::FileScanRequest> request) { |
239 | 98 | if (_state == nullptr || _state->file_context.metadata == nullptr || |
240 | 98 | _state->file_context.schema == nullptr) { |
241 | 0 | return Status::Uninitialized("ParquetReader is not open"); |
242 | 0 | } |
243 | 98 | auto request_snapshot = request; |
244 | 98 | DORIS_CHECK(request_snapshot != nullptr); |
245 | 98 | RETURN_IF_ERROR(format::FileReader::open(std::move(request))); |
246 | | |
247 | 98 | const int num_fields = static_cast<int>(_state->file_schema.size()); |
248 | 98 | for (const auto& column_filter : request_snapshot->column_predicate_filters) { |
249 | 17 | const auto file_column_id = column_filter.effective_file_column_id(); |
250 | 17 | if (!file_column_id.is_valid() || file_column_id.value() >= num_fields) { |
251 | 0 | return Status::InvalidArgument("Invalid parquet filter top-level local id {}", |
252 | 0 | file_column_id.value()); |
253 | 0 | } |
254 | 17 | } |
255 | | |
256 | | // `local_positions.empty()` means all columns are needed by table reader |
257 | | // TODO(gabriel): It will happen only for TVF `select *` query. |
258 | 98 | if (request_snapshot->local_positions.empty()) { |
259 | 26 | for (const auto& col : request_snapshot->predicate_columns) { |
260 | 9 | request_snapshot->local_positions.emplace(col.column_id(), |
261 | 9 | format::LocalIndex(col.column_id().value())); |
262 | 9 | } |
263 | 26 | for (const auto& col : request_snapshot->non_predicate_columns) { |
264 | 11 | request_snapshot->local_positions.emplace(col.column_id(), |
265 | 11 | format::LocalIndex(col.column_id().value())); |
266 | 11 | } |
267 | 26 | } |
268 | | |
269 | 98 | for (const auto& col : request_snapshot->predicate_columns) { |
270 | 42 | DORIS_CHECK(request_snapshot->local_positions.count(col.column_id()) > 0); |
271 | 42 | const auto local_id = col.local_id(); |
272 | 42 | if (local_id == format::ROW_POSITION_COLUMN_ID || |
273 | 42 | local_id == format::GLOBAL_ROWID_COLUMN_ID) { |
274 | 11 | continue; |
275 | 11 | } |
276 | 31 | DORIS_CHECK(local_id >= 0 && local_id < num_fields); |
277 | 31 | } |
278 | 105 | for (const auto& col : request_snapshot->non_predicate_columns) { |
279 | 105 | DORIS_CHECK(request_snapshot->local_positions.count(col.column_id()) > 0); |
280 | 105 | const auto local_id = col.local_id(); |
281 | 105 | if (local_id == format::ROW_POSITION_COLUMN_ID || |
282 | 105 | local_id == format::GLOBAL_ROWID_COLUMN_ID) { |
283 | 14 | continue; |
284 | 14 | } |
285 | 91 | DORIS_CHECK(local_id >= 0 && local_id < num_fields); |
286 | 91 | } |
287 | | |
288 | 98 | RowGroupScanPlan row_group_plan; |
289 | 98 | ParquetScanRange scan_range; |
290 | 98 | scan_range.start_offset = _file_description->range_start_offset; |
291 | 98 | scan_range.size = _file_description->range_size; |
292 | 98 | scan_range.file_size = _file_description->file_size; |
293 | | // Get selected ranges in row groups according to metadata (Row-Group level index and Page Index including Zonemap, Dictionary, Bloom Filter). |
294 | 98 | RETURN_IF_ERROR(plan_parquet_row_groups( |
295 | 98 | *_state->file_context.metadata, _state->file_context.file_reader.get(), |
296 | 98 | _state->file_schema, *request_snapshot, scan_range, _state->enable_bloom_filter, |
297 | 98 | &row_group_plan, _state->timezone)); |
298 | 98 | if (_profile != nullptr) { |
299 | 23 | _parquet_profile.update_pruning_stats(row_group_plan.pruning_stats); |
300 | 23 | } |
301 | 98 | _state->scan_plan = row_group_plan; |
302 | 98 | _state->scheduler.set_page_skip_profile(_parquet_profile.page_skip_profile()); |
303 | 98 | _state->scheduler.set_global_rowid_context(_global_rowid_context); |
304 | 98 | _state->scheduler.set_scan_profile(_parquet_profile.scan_profile()); |
305 | 98 | _state->scheduler.set_plan(std::move(row_group_plan)); |
306 | 98 | _eof = _state->scheduler.empty(); |
307 | 98 | return Status::OK(); |
308 | 98 | } |
309 | | |
310 | 132 | Status ParquetReader::get_block(Block* file_block, size_t* rows, bool* eof) { |
311 | 132 | if (_state == nullptr || _state->file_context.file_reader == nullptr || |
312 | 132 | _state->file_context.schema == nullptr) { |
313 | 0 | return Status::Uninitialized("ParquetReader is not open"); |
314 | 0 | } |
315 | 132 | *rows = 0; |
316 | 132 | if (_eof) { |
317 | 1 | *eof = true; |
318 | 1 | return Status::OK(); |
319 | 1 | } |
320 | 131 | auto request_snapshot = _request; |
321 | 131 | if (request_snapshot == nullptr) { |
322 | 0 | return Status::Cancelled("ParquetReader is closed"); |
323 | 0 | } |
324 | | |
325 | 131 | const auto predicate_filtered_rows_before = _state->scheduler.predicate_filtered_rows(); |
326 | 131 | RETURN_IF_ERROR(_state->scheduler.read_next_batch(_state->file_context, _state->file_schema, |
327 | 131 | *request_snapshot, file_block, rows, eof)); |
328 | 131 | if (_io_ctx != nullptr) { |
329 | 36 | _io_ctx->predicate_filtered_rows += |
330 | 36 | _state->scheduler.predicate_filtered_rows() - predicate_filtered_rows_before; |
331 | 36 | } |
332 | 131 | _eof = *eof; |
333 | 131 | return Status::OK(); |
334 | 131 | } |
335 | | |
336 | 2 | void ParquetReader::set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) { |
337 | 2 | if (_state == nullptr) { |
338 | 0 | return; |
339 | 0 | } |
340 | 2 | _state->scheduler.set_condition_cache_context(std::move(ctx)); |
341 | 2 | if (_io_ctx != nullptr) { |
342 | | // Condition-cache HIT filters row ranges before batch reading, so skipped rows never belong |
343 | | // to a later get_block() batch. Report the plan-level skipped rows at the same point where |
344 | | // the scan plan is rewritten. |
345 | 1 | _io_ctx->condition_cache_filtered_rows += _state->scheduler.condition_cache_filtered_rows(); |
346 | 1 | } |
347 | 2 | } |
348 | | |
349 | 0 | int64_t ParquetReader::get_total_rows() const { |
350 | 0 | if (_state == nullptr) { |
351 | 0 | return 0; |
352 | 0 | } |
353 | 0 | int64_t rows = 0; |
354 | 0 | for (const auto& row_group_plan : _state->scan_plan.row_groups) { |
355 | 0 | rows += row_group_plan.row_group_rows; |
356 | 0 | } |
357 | 0 | return rows; |
358 | 0 | } |
359 | | |
360 | | Status ParquetReader::get_aggregate_result(const format::FileAggregateRequest& request, |
361 | 16 | format::FileAggregateResult* result) { |
362 | 16 | DORIS_CHECK(result != nullptr); |
363 | 16 | if (_state == nullptr || _state->file_context.metadata == nullptr || |
364 | 16 | _state->file_context.schema == nullptr) { |
365 | 0 | return Status::Uninitialized("ParquetReader is not open"); |
366 | 0 | } |
367 | 16 | result->count = 0; |
368 | 16 | result->columns.clear(); |
369 | 16 | if (request.agg_type != TPushAggOp::type::COUNT && |
370 | 16 | request.agg_type != TPushAggOp::type::MINMAX) { |
371 | 1 | return Status::NotSupported("Unsupported parquet aggregate pushdown type {}", |
372 | 1 | request.agg_type); |
373 | 1 | } |
374 | | |
375 | | // Aggregate row count in all selected row groups. For MIN/MAX aggregate, this is used to determine whether there is no row group selected. |
376 | 30 | for (const auto& row_group_plan : _state->scan_plan.row_groups) { |
377 | 30 | auto row_group_metadata = |
378 | 30 | _state->file_context.metadata->RowGroup(row_group_plan.row_group_id); |
379 | 30 | DORIS_CHECK(row_group_metadata != nullptr); |
380 | 30 | result->count += row_group_metadata->num_rows(); |
381 | 30 | } |
382 | 15 | if (request.agg_type == TPushAggOp::type::COUNT) { |
383 | 4 | return Status::OK(); |
384 | 4 | } |
385 | | |
386 | 11 | result->columns.resize(request.columns.size()); |
387 | 20 | for (size_t request_column_idx = 0; request_column_idx < request.columns.size(); |
388 | 13 | ++request_column_idx) { |
389 | 13 | const auto file_column_id = request.columns[request_column_idx].projection.local_id(); |
390 | 13 | if (file_column_id < 0 || |
391 | 13 | file_column_id >= static_cast<int32_t>(_state->file_schema.size())) { |
392 | 1 | return Status::InvalidArgument("Invalid parquet aggregate column id {}", |
393 | 1 | file_column_id); |
394 | 1 | } |
395 | 12 | const auto& column_schema = _state->file_schema[file_column_id]; |
396 | 12 | DORIS_CHECK(column_schema != nullptr); |
397 | 12 | const ParquetColumnSchema* leaf_schema = nullptr; |
398 | 12 | RETURN_IF_ERROR(find_projected_minmax_leaf( |
399 | 12 | *column_schema, request.columns[request_column_idx].projection, &leaf_schema)); |
400 | 10 | DORIS_CHECK(leaf_schema != nullptr); |
401 | | |
402 | 10 | auto& aggregate_column = result->columns[request_column_idx]; |
403 | 10 | aggregate_column.projection = request.columns[request_column_idx].projection; |
404 | 19 | for (const auto& row_group_plan : _state->scan_plan.row_groups) { |
405 | 19 | auto row_group_metadata = |
406 | 19 | _state->file_context.metadata->RowGroup(row_group_plan.row_group_id); |
407 | 19 | DORIS_CHECK(row_group_metadata != nullptr); |
408 | 19 | auto column_chunk = row_group_metadata->ColumnChunk(leaf_schema->leaf_column_id); |
409 | 19 | DORIS_CHECK(column_chunk != nullptr); |
410 | 19 | const auto statistics = ParquetStatisticsUtils::TransformColumnStatistics( |
411 | 19 | *leaf_schema, column_chunk->statistics(), _state->timezone); |
412 | 19 | if (!statistics.has_min_max) { |
413 | 1 | return Status::NotSupported("Missing parquet min/max statistics for column {}", |
414 | 1 | leaf_schema->name); |
415 | 1 | } |
416 | 18 | if (!aggregate_column.has_min || statistics.min_value < aggregate_column.min_value) { |
417 | 9 | aggregate_column.min_value = statistics.min_value; |
418 | 9 | aggregate_column.has_min = true; |
419 | 9 | } |
420 | 18 | if (!aggregate_column.has_max || aggregate_column.max_value < statistics.max_value) { |
421 | 18 | aggregate_column.max_value = statistics.max_value; |
422 | 18 | aggregate_column.has_max = true; |
423 | 18 | } |
424 | 18 | } |
425 | 9 | if (!aggregate_column.has_min || !aggregate_column.has_max) { |
426 | 0 | return Status::NotSupported("No parquet row group selected for min/max pushdown"); |
427 | 0 | } |
428 | 9 | } |
429 | 7 | return Status::OK(); |
430 | 11 | } |
431 | | |
432 | 65 | Status ParquetReader::close() { |
433 | 65 | if (_state != nullptr) { |
434 | 65 | RETURN_IF_ERROR(_state->file_context.close()); |
435 | 65 | } |
436 | 65 | return FileReader::close(); |
437 | 65 | } |
438 | | |
439 | 102 | void ParquetReader::_init_profile() { |
440 | 102 | _parquet_profile.init(_profile); |
441 | 102 | } |
442 | | |
443 | | } // namespace doris::format::parquet |