Coverage Report

Created: 2026-07-04 04:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format_v2/parquet/parquet_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format_v2/parquet/parquet_reader.h"
19
20
#include <algorithm>
21
#include <map>
22
#include <memory>
23
#include <optional>
24
#include <ranges>
25
#include <unordered_set>
26
#include <utility>
27
#include <vector>
28
29
#include "core/assert_cast.h"
30
#include "core/block/block.h"
31
#include "core/data_type/data_type_array.h"
32
#include "core/data_type/data_type_factory.hpp"
33
#include "core/data_type/data_type_map.h"
34
#include "core/data_type/data_type_nullable.h"
35
#include "core/data_type/data_type_struct.h"
36
#include "format_v2/column_mapper.h"
37
#include "format_v2/parquet/parquet_column_schema.h"
38
#include "format_v2/parquet/parquet_file_context.h"
39
#include "format_v2/parquet/parquet_scan.h"
40
#include "format_v2/parquet/parquet_statistics.h"
41
#include "format_v2/parquet/reader/column_reader.h"
42
#include "runtime/runtime_state.h"
43
44
namespace doris::format::parquet {
45
46
struct ParquetReaderScanState {
47
    ParquetFileContext file_context;
48
    std::vector<std::unique_ptr<ParquetColumnSchema>> file_schema;
49
    RowGroupScanPlan scan_plan;
50
    ParquetScanScheduler scheduler;
51
    const cctz::time_zone* timezone = nullptr;
52
    bool enable_bloom_filter = false;
53
    bool enable_page_cache = false;
54
    bool enable_strict_mode = false;
55
};
56
57
155
// Returns the file offset of the first page of a column chunk: the dictionary page when the chunk
// has a usable one, otherwise the first data page.
//
// Some writers (PARQUET-816, older parquet-mr releases) record dictionary_page_offset as 0, or as a
// value that does not precede the first data page, even though no such dictionary page exists.
// Trusting that value would make the registered page-cache range start at the wrong position. The
// dictionary offset is therefore used only when it is positive and strictly before
// data_page_offset, which matches the guard Apache Arrow applies when computing column chunk
// ranges.
int64_t column_chunk_start_offset(const ::parquet::ColumnChunkMetaData& column_metadata) {
    const auto data_page_offset = cast_set<int64_t>(column_metadata.data_page_offset());
    if (!column_metadata.has_dictionary_page()) {
        return data_page_offset;
    }
    const auto dictionary_page_offset =
            cast_set<int64_t>(column_metadata.dictionary_page_offset());
    if (dictionary_page_offset > 0 && dictionary_page_offset < data_page_offset) {
        return dictionary_page_offset;
    }
    return data_page_offset;
}
62
63
void collect_all_leaf_column_ids(const ParquetColumnSchema& column_schema,
64
142
                                 std::unordered_set<int>* leaf_column_ids) {
65
142
    DORIS_CHECK(leaf_column_ids != nullptr);
66
142
    if (column_schema.kind == ParquetColumnSchemaKind::PRIMITIVE) {
67
133
        if (column_schema.leaf_column_id >= 0) {
68
133
            leaf_column_ids->insert(column_schema.leaf_column_id);
69
133
        }
70
133
        return;
71
133
    }
72
13
    for (const auto& child : column_schema.children) {
73
13
        DORIS_CHECK(child != nullptr);
74
13
        collect_all_leaf_column_ids(*child, leaf_column_ids);
75
13
    }
76
9
}
77
78
void collect_projected_leaf_column_ids(const ParquetColumnSchema& column_schema,
79
                                       const format::LocalColumnIndex& projection,
80
135
                                       std::unordered_set<int>* leaf_column_ids) {
81
135
    DORIS_CHECK(leaf_column_ids != nullptr);
82
135
    if (projection.project_all_children || projection.children.empty()) {
83
129
        collect_all_leaf_column_ids(column_schema, leaf_column_ids);
84
129
        return;
85
129
    }
86
7
    for (const auto& child_projection : projection.children) {
87
7
        const auto child_it =
88
11
                std::ranges::find_if(column_schema.children, [&](const auto& child_schema) {
89
11
                    return child_schema->local_id == child_projection.local_id();
90
11
                });
91
7
        DORIS_CHECK(child_it != column_schema.children.end());
92
7
        collect_projected_leaf_column_ids(**child_it, child_projection, leaf_column_ids);
93
7
    }
94
6
}
95
96
void collect_request_leaf_column_ids(
97
        const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
98
104
        const format::FileScanRequest& request, std::unordered_set<int>* leaf_column_ids) {
99
104
    DORIS_CHECK(leaf_column_ids != nullptr);
100
153
    auto collect_scan_column = [&](const format::LocalColumnIndex& projection) {
101
153
        const auto local_id = projection.local_id();
102
153
        if (local_id == format::ROW_POSITION_COLUMN_ID ||
103
153
            local_id == format::GLOBAL_ROWID_COLUMN_ID) {
104
25
            return;
105
25
        }
106
128
        DORIS_CHECK(local_id >= 0 && local_id < static_cast<int32_t>(file_schema.size()));
107
128
        DORIS_CHECK(file_schema[local_id] != nullptr);
108
128
        collect_projected_leaf_column_ids(*file_schema[local_id], projection, leaf_column_ids);
109
128
    };
110
104
    for (const auto& column : request.predicate_columns) {
111
42
        collect_scan_column(column);
112
42
    }
113
111
    for (const auto& column : request.non_predicate_columns) {
114
111
        collect_scan_column(column);
115
111
    }
116
104
}
117
118
std::vector<ParquetPageCacheRange> build_page_cache_ranges(
119
        const ::parquet::FileMetaData& metadata,
120
        const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
121
104
        const format::FileScanRequest& request, const RowGroupScanPlan& row_group_plan) {
122
104
    std::unordered_set<int> leaf_column_ids;
123
104
    collect_request_leaf_column_ids(file_schema, request, &leaf_column_ids);
124
104
    std::vector<ParquetPageCacheRange> ranges;
125
104
    ranges.reserve(row_group_plan.row_groups.size() * leaf_column_ids.size());
126
125
    for (const auto& row_group_plan_item : row_group_plan.row_groups) {
127
125
        auto row_group_metadata = metadata.RowGroup(row_group_plan_item.row_group_id);
128
125
        DORIS_CHECK(row_group_metadata != nullptr);
129
155
        for (const auto leaf_column_id : leaf_column_ids) {
130
155
            DORIS_CHECK(leaf_column_id >= 0 && leaf_column_id < row_group_metadata->num_columns());
131
155
            auto column_metadata = row_group_metadata->ColumnChunk(leaf_column_id);
132
155
            DORIS_CHECK(column_metadata != nullptr);
133
155
            const int64_t offset = column_chunk_start_offset(*column_metadata);
134
155
            const int64_t size = column_metadata->total_compressed_size();
135
155
            DORIS_CHECK(offset >= 0);
136
155
            DORIS_CHECK(size >= 0);
137
155
            if (size > 0) {
138
155
                ranges.push_back(ParquetPageCacheRange {.offset = offset, .size = size});
139
155
            }
140
155
        }
141
125
    }
142
104
    return ranges;
143
104
}
144
145
// Returns the top-level file-schema node addressed by projection.local_id(). That id must index a
// non-null entry of file_schema; otherwise DORIS_CHECK fails.
const ParquetColumnSchema& projected_root_schema(
        const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
        const format::LocalColumnIndex& projection) {
    const auto local_id = projection.local_id();
    DORIS_CHECK(local_id >= 0 && local_id < static_cast<int32_t>(file_schema.size()));
    DORIS_CHECK(file_schema[local_id] != nullptr);
    const ParquetColumnSchema* root_node = file_schema[local_id].get();
    return *root_node;
}
153
154
int64_t count_loaded_non_null_values(const ParquetColumnSchema& root_schema,
155
                                     const ParquetColumnReader& shape_reader,
156
3
                                     int64_t expected_rows) {
157
3
    const auto& def_levels = shape_reader.nested_definition_levels();
158
3
    const auto& rep_levels = shape_reader.nested_repetition_levels();
159
3
    const int64_t levels_written = shape_reader.nested_levels_written();
160
3
    DORIS_CHECK(levels_written >= expected_rows);
161
3
    if (root_schema.max_repetition_level == 0) {
162
1
        DORIS_CHECK(levels_written == expected_rows);
163
1
        const int16_t non_null_definition_level = root_schema.nullable_definition_level;
164
1
        int64_t count = 0;
165
6
        for (int64_t level_idx = 0; level_idx < levels_written; ++level_idx) {
166
5
            count += def_levels[level_idx] >= non_null_definition_level ? 1 : 0;
167
5
        }
168
1
        return count;
169
1
    }
170
171
    // For repeated encodings, one top-level row starts when the leaf repetition level moves above
172
    // no higher than the top-level container's repeated boundary. Empty MAP/LIST rows have no
173
    // entries but still carry a level slot; they are non-NULL and must be counted by count(col).
174
2
    const int16_t non_null_definition_level =
175
2
            static_cast<int16_t>(root_schema.definition_level - 1);
176
2
    int64_t counted_rows = 0;
177
2
    int64_t non_null_rows = 0;
178
13
    for (int64_t level_idx = 0; level_idx < levels_written && counted_rows < expected_rows;
179
11
         ++level_idx) {
180
11
        if (rep_levels[level_idx] >= root_schema.repetition_level) {
181
1
            continue;
182
1
        }
183
10
        ++counted_rows;
184
10
        non_null_rows += def_levels[level_idx] >= non_null_definition_level ? 1 : 0;
185
10
    }
186
2
    DORIS_CHECK(counted_rows == expected_rows);
187
2
    return non_null_rows;
188
3
}
189
190
0
// Carries the nullability of `type` over to `nested_type`. When `type` is non-null and nullable,
// the result is make_nullable(nested_type); otherwise nested_type is returned as is.
DataTypePtr nullable_like_original(const DataTypePtr& type, DataTypePtr nested_type) {
    const bool keep_plain = type == nullptr || !type->is_nullable();
    if (keep_plain) {
        return nested_type;
    }
    return make_nullable(nested_type);
}
193
194
1
int timestamp_tz_scale(const ParquetTypeDescriptor& type_descriptor) {
195
1
    switch (type_descriptor.time_unit) {
196
0
    case ParquetTimeUnit::MILLIS:
197
0
        return 3;
198
0
    case ParquetTimeUnit::MICROS:
199
1
    case ParquetTimeUnit::UNKNOWN:
200
1
    default:
201
1
        return 6;
202
1
    }
203
1
}
204
205
1
bool should_map_to_timestamp_tz(const ParquetColumnSchema& column_schema) {
206
1
    const auto& type_descriptor = column_schema.type_descriptor;
207
1
    return type_descriptor.physical_type == ::parquet::Type::INT96 ||
208
1
           (type_descriptor.is_timestamp && type_descriptor.timestamp_is_adjusted_to_utc);
209
1
}
210
211
1
// Rewrites, in place, the Doris types of column_schema and its descendants so that every primitive
// leaf accepted by should_map_to_timestamp_tz() becomes TIMESTAMPTZ (keeping the leaf's
// nullability). Returns the resulting type of column_schema.
//
// LIST/MAP/STRUCT nodes are rebuilt from their children's new types only when at least one child
// type actually changed. Subtrees without such leaves keep exactly the type the schema builder
// produced, instead of being re-derived. The re-derived MAP form, for example, forces nullable
// keys and values, which may differ from the original type.
DataTypePtr apply_timestamp_tz_mapping(ParquetColumnSchema* column_schema) {
    DORIS_CHECK(column_schema != nullptr);
    if (column_schema->kind == ParquetColumnSchemaKind::PRIMITIVE) {
        if (should_map_to_timestamp_tz(*column_schema)) {
            const bool nullable =
                    column_schema->type != nullptr && column_schema->type->is_nullable();
            const auto scale = timestamp_tz_scale(column_schema->type_descriptor);
            column_schema->type = DataTypeFactory::instance().create_data_type(TYPE_TIMESTAMPTZ,
                                                                               nullable, 0, scale);
            column_schema->type_descriptor.doris_type = column_schema->type;
        }
        return column_schema->type;
    }

    // Map the children first and remember whether any of their type objects was replaced.
    std::vector<DataTypePtr> child_types;
    child_types.reserve(column_schema->children.size());
    bool any_child_changed = false;
    for (auto& child : column_schema->children) {
        const DataTypePtr type_before = child != nullptr ? child->type : nullptr;
        child_types.push_back(apply_timestamp_tz_mapping(child.get()));
        if (child_types.back() != type_before) {
            any_child_changed = true;
        }
    }
    if (!any_child_changed) {
        // No timestamp leaf below this node: leave its type untouched.
        return column_schema->type;
    }

    if (column_schema->kind == ParquetColumnSchemaKind::LIST) {
        DORIS_CHECK(child_types.size() == 1);
        column_schema->type = nullable_like_original(
                column_schema->type, std::make_shared<DataTypeArray>(child_types[0]));
    } else if (column_schema->kind == ParquetColumnSchemaKind::MAP) {
        DORIS_CHECK(child_types.size() == 2);
        column_schema->type = nullable_like_original(
                column_schema->type, std::make_shared<DataTypeMap>(make_nullable(child_types[0]),
                                                                   make_nullable(child_types[1])));
    } else if (column_schema->kind == ParquetColumnSchemaKind::STRUCT) {
        Strings child_names;
        child_names.reserve(column_schema->children.size());
        for (const auto& child : column_schema->children) {
            child_names.push_back(child->name);
        }
        column_schema->type = nullable_like_original(
                column_schema->type, std::make_shared<DataTypeStruct>(child_types, child_names));
    }
    return column_schema->type;
}
251
252
// Follows `projection` down column_schema to the single leaf used for MIN/MAX aggregate pushdown
// and stores it in *leaf_schema.
// Returns:
//   - NotSupported when the selected node has no leaf column id, is repeated, or when more than
//     one nested child is projected;
//   - InvalidArgument when a projected child local id does not exist under column_schema.
static Status find_projected_minmax_leaf(const ParquetColumnSchema& column_schema,
                                         const format::LocalColumnIndex& projection,
                                         const ParquetColumnSchema** leaf_schema) {
    DORIS_CHECK(leaf_schema != nullptr);
    const bool stops_here = projection.project_all_children || projection.children.empty();
    if (!stops_here) {
        if (projection.children.size() != 1) {
            return Status::NotSupported(
                    "Parquet aggregate pushdown only supports a single nested leaf under column {}",
                    column_schema.name);
        }
        const auto& nested_projection = projection.children[0];
        const auto target_local_id = nested_projection.local_id();
        for (const auto& candidate : column_schema.children) {
            if (candidate->local_id == target_local_id) {
                return find_projected_minmax_leaf(*candidate, nested_projection, leaf_schema);
            }
        }
        return Status::InvalidArgument(
                "Invalid parquet aggregate projection local id {} for column {}", target_local_id,
                column_schema.name);
    }

    if (column_schema.leaf_column_id < 0) {
        return Status::NotSupported("Parquet aggregate pushdown only supports primitive column {}",
                                    column_schema.name);
    }
    if (column_schema.max_repetition_level > 0) {
        return Status::NotSupported(
                "Parquet aggregate pushdown does not support repeated column {}",
                column_schema.name);
    }
    *leaf_schema = &column_schema;
    return Status::OK();
}
286
287
void ParquetReader::_fill_column_definition(const ParquetColumnSchema& column_schema,
288
244
                                            format::ColumnDefinition* field) const {
289
244
    if (column_schema.parquet_field_id >= 0) {
290
95
        field->identifier = Field::create_field<TYPE_INT>(column_schema.parquet_field_id);
291
149
    } else {
292
149
        field->identifier = Field::create_field<TYPE_STRING>(column_schema.name);
293
149
    }
294
244
    field->local_id = column_schema.local_id;
295
244
    field->name = column_schema.name;
296
244
    field->type = column_schema.type != nullptr && !column_schema.type->is_nullable()
297
244
                          ? make_nullable(column_schema.type)
298
244
                          : column_schema.type;
299
244
    field->children.clear();
300
244
    field->children.reserve(column_schema.children.size());
301
244
    for (const auto& child : column_schema.children) {
302
26
        format::ColumnDefinition child_field;
303
26
        _fill_column_definition(*child, &child_field);
304
26
        field->children.push_back(std::move(child_field));
305
26
    }
306
244
}
307
308
// Forwards the file/system description, IO context and profile to format::FileReader and stores
// the parquet-specific options: the optional global-row-id context (exposed as an extra virtual
// column by get_schema()) and the INT96/UTC-timestamp -> TIMESTAMPTZ mapping switch used by init().
// `global_rowid_context` is a by-value sink parameter, so it is moved into the member rather than
// copied.
ParquetReader::ParquetReader(std::shared_ptr<io::FileSystemProperties>& system_properties,
                             std::unique_ptr<io::FileDescription>& file_description,
                             std::shared_ptr<io::IOContext> io_ctx, RuntimeProfile* profile,
                             std::optional<format::GlobalRowIdContext> global_rowid_context,
                             bool enable_mapping_timestamp_tz)
        : FileReader(system_properties, file_description, io_ctx, profile),
          _global_rowid_context(std::move(global_rowid_context)),
          _enable_mapping_timestamp_tz(enable_mapping_timestamp_tz) {}
316
317
109
// Defaulted here, in the translation unit that defines ParquetReaderScanState, so that destroying
// the owning `_state` pointer sees the complete type. NOTE(review): presumably the header only
// forward-declares ParquetReaderScanState; confirm before moving this definition to the header.
ParquetReader::~ParquetReader() = default;
318
319
108
// Opens the underlying file, parses the parquet footer and builds the Doris-side file schema.
// When `state` is non-null, the bloom-filter, page-cache, strict-mode and merge-read-slice-size
// options and the session timezone are captured from it. When timestamp-tz mapping is enabled,
// INT96 and UTC-adjusted timestamp columns are retyped as TIMESTAMPTZ.
Status ParquetReader::init(RuntimeState* state) {
    RETURN_IF_ERROR(format::FileReader::init(state));
    if (_profile != nullptr) {
        COUNTER_UPDATE(_parquet_profile.file_reader_create_time,
                       _reader_statistics.file_reader_create_time);
        COUNTER_UPDATE(_parquet_profile.open_file_num, _reader_statistics.open_file_num);
    }

    _state = std::make_unique<ParquetReaderScanState>();
    auto& scan_state = *_state;
    // -1 is passed when the option is not set.
    int64_t merge_read_slice_size = -1;
    if (state != nullptr) {
        const auto& options = state->query_options();
        scan_state.enable_bloom_filter = options.enable_parquet_filter_by_bloom_filter;
        scan_state.enable_page_cache = options.enable_parquet_file_page_cache;
        scan_state.timezone = &state->timezone_obj();
        scan_state.enable_strict_mode = state->enable_strict_mode();
        scan_state.scheduler.set_timezone(&state->timezone_obj());
        scan_state.scheduler.set_enable_strict_mode(scan_state.enable_strict_mode);
        if (options.__isset.merge_read_slice_size) {
            merge_read_slice_size = options.merge_read_slice_size;
        }
    }
    scan_state.scheduler.set_merge_read_options(_profile, merge_read_slice_size);
    scan_state.scheduler.set_batch_size(_batch_size);

    // Open the parquet file and parse its metadata to obtain the file schema.
    RETURN_IF_ERROR(scan_state.file_context.open(_tracing_file_reader, _io_ctx.get(),
                                                 scan_state.enable_page_cache, *_file_description));
    // Build the Doris-side schema tree from the parquet schema. Raw file identifiers such as the
    // parquet field_id are later exposed through ColumnDefinition::identifier.
    RETURN_IF_ERROR(build_parquet_column_schema(*scan_state.file_context.schema,
                                                &scan_state.file_schema));
    if (!_enable_mapping_timestamp_tz) {
        return Status::OK();
    }
    for (auto& root_schema : scan_state.file_schema) {
        apply_timestamp_tz_mapping(root_schema.get());
    }
    return Status::OK();
}
357
358
1
void ParquetReader::set_batch_size(size_t batch_size) {
359
1
    _batch_size = std::max<size_t>(1, batch_size);
360
1
    if (_state != nullptr) {
361
0
        _state->scheduler.set_batch_size(_batch_size);
362
0
    }
363
1
}
364
365
100
// Fills *file_schema with one ColumnDefinition per top-level parquet column, in file order. When a
// global row-id context was supplied, the global row-id column definition is appended after them.
// Returns InvalidArgument for a null output pointer and Uninitialized before init().
Status ParquetReader::get_schema(std::vector<format::ColumnDefinition>* file_schema) const {
    if (file_schema == nullptr) {
        return Status::InvalidArgument("file_schema is null");
    }
    file_schema->clear();
    const bool is_open = _state != nullptr && _state->file_context.schema != nullptr;
    if (!is_open) {
        return Status::Uninitialized("ParquetReader is not open");
    }

    const auto& root_schemas = _state->file_schema;
    file_schema->reserve(root_schemas.size());
    size_t column_idx = 0;
    for (const auto& root_schema : root_schemas) {
        format::ColumnDefinition field;
        _fill_column_definition(*root_schema, &field);
        // Top-level local ids must equal their position in the file schema.
        DORIS_CHECK(field.local_id == static_cast<int32_t>(column_idx));
        file_schema->push_back(std::move(field));
        ++column_idx;
    }
    if (_global_rowid_context.has_value()) {
        file_schema->push_back(format::global_rowid_column_definition());
    }
    return Status::OK();
}
386
387
std::unique_ptr<format::TableColumnMapper> ParquetReader::create_column_mapper(
388
58
        format::TableColumnMapperOptions options) const {
389
58
    return std::make_unique<format::ParquetColumnMapper>(std::move(options));
390
58
}
391
392
104
// Prepares a scan for `request`:
//   1. validates predicate-filter and scan column ids against the file schema;
//   2. if request->local_positions is empty, maps every requested column to its own column id;
//   3. plans the row groups and row ranges to read from file metadata (row-group statistics,
//      page index and, when enabled, bloom filters);
//   4. when the page cache is enabled, registers the needed column chunks with it;
//   5. hands the plan to the scan scheduler and sets _eof if nothing is left to read.
// Returns Uninitialized before init(), and InvalidArgument for an out-of-range filter column id.
Status ParquetReader::open(std::shared_ptr<format::FileScanRequest> request) {
    const bool initialized = _state != nullptr && _state->file_context.metadata != nullptr &&
                             _state->file_context.schema != nullptr;
    if (!initialized) {
        return Status::Uninitialized("ParquetReader is not open");
    }
    auto request_snapshot = request;
    DORIS_CHECK(request_snapshot != nullptr);
    RETURN_IF_ERROR(format::FileReader::open(std::move(request)));

    const int num_fields = static_cast<int>(_state->file_schema.size());
    for (const auto& column_filter : request_snapshot->column_predicate_filters) {
        const auto& filter_column_id = column_filter.file_column_id;
        if (filter_column_id.is_valid() && filter_column_id.value() < num_fields) {
            continue;
        }
        return Status::InvalidArgument("Invalid parquet filter top-level local id {}",
                                       filter_column_id.value());
    }

    // An empty `local_positions` means the table reader needs all columns: map each requested
    // column to its own id.
    // TODO(gabriel): It will happen only for TVF `select *` query.
    auto& local_positions = request_snapshot->local_positions;
    if (local_positions.empty()) {
        for (const auto& col : request_snapshot->predicate_columns) {
            local_positions.emplace(col.column_id(), format::LocalIndex(col.column_id().value()));
        }
        for (const auto& col : request_snapshot->non_predicate_columns) {
            local_positions.emplace(col.column_id(), format::LocalIndex(col.column_id().value()));
        }
    }

    // Every scan column needs a local position. Every scan column except the row-position and
    // global-row-id virtual columns must also address an existing top-level file column.
    auto check_scan_column = [&](const auto& col) {
        DORIS_CHECK(request_snapshot->local_positions.count(col.column_id()) > 0);
        const auto local_id = col.local_id();
        const bool is_virtual_column = local_id == format::ROW_POSITION_COLUMN_ID ||
                                       local_id == format::GLOBAL_ROWID_COLUMN_ID;
        if (!is_virtual_column) {
            DORIS_CHECK(local_id >= 0 && local_id < num_fields);
        }
    };
    for (const auto& col : request_snapshot->predicate_columns) {
        check_scan_column(col);
    }
    for (const auto& col : request_snapshot->non_predicate_columns) {
        check_scan_column(col);
    }

    ParquetScanRange scan_range;
    scan_range.start_offset = _file_description->range_start_offset;
    scan_range.size = _file_description->range_size;
    scan_range.file_size = _file_description->file_size;

    // Select row groups and row ranges from metadata: row-group level index and page index
    // (zone map, dictionary, bloom filter).
    RowGroupScanPlan row_group_plan;
    RETURN_IF_ERROR(plan_parquet_row_groups(
            *_state->file_context.metadata, _state->file_context.file_reader.get(),
            _state->file_schema, *request_snapshot, scan_range, _state->enable_bloom_filter,
            &row_group_plan, _state->timezone));
    if (_profile != nullptr) {
        _parquet_profile.update_pruning_stats(row_group_plan.pruning_stats);
    }
    if (_state->enable_page_cache) {
        auto cache_ranges = build_page_cache_ranges(*_state->file_context.metadata,
                                                    _state->file_schema, *request_snapshot,
                                                    row_group_plan);
        _state->file_context.register_page_cache_ranges(std::move(cache_ranges));
    }

    // get_total_rows() and the aggregate pushdown path read this copy; the scheduler gets the
    // original below.
    _state->scan_plan = row_group_plan;
    auto& scheduler = _state->scheduler;
    scheduler.set_page_skip_profile(_parquet_profile.page_skip_profile());
    scheduler.set_global_rowid_context(_global_rowid_context);
    scheduler.set_scan_profile(_parquet_profile.scan_profile());
    scheduler.set_plan(std::move(row_group_plan));
    _eof = scheduler.empty();
    return Status::OK();
}
468
469
142
// Reads the next batch of planned rows into file_block.
// *rows receives the number of rows produced, and *eof becomes true once the scan is exhausted.
// The growth of the scheduler's predicate_filtered_rows() during this call is added to the IO
// context, if there is one. Returns Uninitialized before init() and Cancelled if the request has
// been released.
Status ParquetReader::get_block(Block* file_block, size_t* rows, bool* eof) {
    const bool ready = _state != nullptr && _state->file_context.file_reader != nullptr &&
                       _state->file_context.schema != nullptr;
    if (!ready) {
        return Status::Uninitialized("ParquetReader is not open");
    }
    *rows = 0;
    if (_eof) {
        *eof = true;
        return Status::OK();
    }
    auto request_snapshot = _request;
    if (request_snapshot == nullptr) {
        return Status::Cancelled("ParquetReader is closed");
    }

    auto& scheduler = _state->scheduler;
    const auto filtered_before = scheduler.predicate_filtered_rows();
    RETURN_IF_ERROR(scheduler.read_next_batch(_state->file_context, _state->file_schema,
                                              *request_snapshot, file_block, rows, eof));
    _sync_page_cache_profile();
    if (_io_ctx != nullptr) {
        const auto filtered_after = scheduler.predicate_filtered_rows();
        _io_ctx->predicate_filtered_rows += filtered_after - filtered_before;
    }
    _eof = *eof;
    return Status::OK();
}
495
496
206
void ParquetReader::_sync_page_cache_profile() {
497
206
    if (_profile == nullptr || _state == nullptr) {
498
142
        return;
499
142
    }
500
64
    const auto stats = _state->file_context.page_cache_stats();
501
64
    COUNTER_UPDATE(_parquet_profile.page_read_counter,
502
64
                   stats.read_count - _reported_page_cache_stats.read_count);
503
64
    COUNTER_UPDATE(_parquet_profile.page_cache_write_counter,
504
64
                   stats.write_count - _reported_page_cache_stats.write_count);
505
64
    COUNTER_UPDATE(
506
64
            _parquet_profile.page_cache_compressed_write_counter,
507
64
            stats.compressed_write_count - _reported_page_cache_stats.compressed_write_count);
508
64
    COUNTER_UPDATE(_parquet_profile.page_cache_hit_counter,
509
64
                   stats.hit_count - _reported_page_cache_stats.hit_count);
510
64
    COUNTER_UPDATE(_parquet_profile.page_cache_missing_counter,
511
64
                   stats.miss_count - _reported_page_cache_stats.miss_count);
512
64
    COUNTER_UPDATE(_parquet_profile.page_cache_compressed_hit_counter,
513
64
                   stats.compressed_hit_count - _reported_page_cache_stats.compressed_hit_count);
514
64
    _reported_page_cache_stats = stats;
515
64
}
516
517
2
void ParquetReader::set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) {
518
2
    if (_state == nullptr) {
519
0
        return;
520
0
    }
521
2
    _state->scheduler.set_condition_cache_context(std::move(ctx));
522
2
    if (_io_ctx != nullptr) {
523
        // Condition-cache HIT filters row ranges before batch reading, so skipped rows never belong
524
        // to a later get_block() batch. Report the plan-level skipped rows at the same point where
525
        // the scan plan is rewritten.
526
1
        _io_ctx->condition_cache_filtered_rows += _state->scheduler.condition_cache_filtered_rows();
527
1
    }
528
2
}
529
530
0
int64_t ParquetReader::get_total_rows() const {
531
0
    if (_state == nullptr) {
532
0
        return 0;
533
0
    }
534
0
    int64_t rows = 0;
535
0
    for (const auto& row_group_plan : _state->scan_plan.row_groups) {
536
0
        rows += row_group_plan.row_group_rows;
537
0
    }
538
0
    return rows;
539
0
}
540
541
Status ParquetReader::get_aggregate_result(const format::FileAggregateRequest& request,
542
19
                                           format::FileAggregateResult* result) {
543
19
    DORIS_CHECK(result != nullptr);
544
19
    if (_state == nullptr || _state->file_context.metadata == nullptr ||
545
19
        _state->file_context.schema == nullptr) {
546
0
        return Status::Uninitialized("ParquetReader is not open");
547
0
    }
548
19
    result->count = 0;
549
19
    result->columns.clear();
550
19
    if (request.agg_type != TPushAggOp::type::COUNT &&
551
19
        request.agg_type != TPushAggOp::type::MINMAX) {
552
1
        return Status::NotSupported("Unsupported parquet aggregate pushdown type {}",
553
1
                                    request.agg_type);
554
1
    }
555
556
    // Aggregate row count in all selected row groups. For MIN/MAX aggregate, this is used to determine whether there is no row group selected.
557
33
    for (const auto& row_group_plan : _state->scan_plan.row_groups) {
558
33
        auto row_group_metadata =
559
33
                _state->file_context.metadata->RowGroup(row_group_plan.row_group_id);
560
33
        DORIS_CHECK(row_group_metadata != nullptr);
561
33
        result->count += row_group_metadata->num_rows();
562
33
    }
563
18
    if (request.agg_type == TPushAggOp::type::COUNT) {
564
7
        if (request.columns.empty()) {
565
4
            return Status::OK();
566
4
        }
567
3
        if (request.columns.size() != 1) {
568
0
            return Status::NotSupported("Parquet COUNT pushdown only supports one count column");
569
0
        }
570
3
        const auto& count_projection = request.columns[0].projection;
571
3
        const auto& root_schema = projected_root_schema(_state->file_schema, count_projection);
572
3
        result->count = 0;
573
3
        for (const auto& row_group_plan : _state->scan_plan.row_groups) {
574
3
            std::shared_ptr<::parquet::RowGroupReader> row_group;
575
3
            try {
576
3
                row_group = _state->file_context.file_reader->RowGroup(row_group_plan.row_group_id);
577
3
            } catch (const ::parquet::ParquetException& e) {
578
0
                return Status::Corruption("Failed to open parquet row group {}: {}",
579
0
                                          row_group_plan.row_group_id, e.what());
580
0
            } catch (const std::exception& e) {
581
0
                return Status::InternalError("Failed to open parquet row group {}: {}",
582
0
                                             row_group_plan.row_group_id, e.what());
583
0
            }
584
585
3
            ParquetColumnReaderFactory column_reader_factory(
586
3
                    row_group, _state->file_context.schema->num_columns(),
587
3
                    &row_group_plan.page_skip_plans, _parquet_profile.page_skip_profile(),
588
3
                    _state->timezone, _state->enable_strict_mode,
589
3
                    _parquet_profile.scan_profile().column_reader_profile);
590
3
            std::unique_ptr<ParquetColumnReader> shape_reader;
591
3
            RETURN_IF_ERROR(column_reader_factory.create_count_shape_reader(
592
3
                    root_schema, &count_projection, &shape_reader));
593
3
            DORIS_CHECK(shape_reader != nullptr);
594
595
3
            int64_t row_group_cursor = 0;
596
3
            for (const auto& selected_range : row_group_plan.selected_ranges) {
597
3
                DORIS_CHECK(selected_range.start >= row_group_cursor);
598
3
                RETURN_IF_ERROR(shape_reader->skip(selected_range.start - row_group_cursor));
599
3
                row_group_cursor = selected_range.start;
600
601
3
                int64_t range_rows_read = 0;
602
6
                while (range_rows_read < selected_range.length) {
603
3
                    const int64_t batch_rows =
604
3
                            std::min<int64_t>(_batch_size, selected_range.length - range_rows_read);
605
                    // COUNT(col) only needs the top-level NULL state. The shape reader loads
606
                    // def/rep levels from one representative leaf and does not build value_indices
607
                    // or values_column. MAP chooses the key leaf; ARRAY/STRUCT may choose a string
608
                    // leaf, but the levels-only protocol still avoids Doris-side string
609
                    // materialization for that leaf.
610
3
                    RETURN_IF_ERROR(shape_reader->load_nested_levels_batch(batch_rows));
611
3
                    result->count +=
612
3
                            count_loaded_non_null_values(root_schema, *shape_reader, batch_rows);
613
3
                    range_rows_read += batch_rows;
614
3
                    row_group_cursor += batch_rows;
615
3
                }
616
3
            }
617
3
        }
618
3
        return Status::OK();
619
3
    }
620
621
11
    result->columns.resize(request.columns.size());
622
20
    for (size_t request_column_idx = 0; request_column_idx < request.columns.size();
623
13
         ++request_column_idx) {
624
13
        const auto file_column_id = request.columns[request_column_idx].projection.local_id();
625
13
        if (file_column_id < 0 ||
626
13
            file_column_id >= static_cast<int32_t>(_state->file_schema.size())) {
627
1
            return Status::InvalidArgument("Invalid parquet aggregate column id {}",
628
1
                                           file_column_id);
629
1
        }
630
12
        const auto& column_schema = _state->file_schema[file_column_id];
631
12
        DORIS_CHECK(column_schema != nullptr);
632
12
        const ParquetColumnSchema* leaf_schema = nullptr;
633
12
        RETURN_IF_ERROR(find_projected_minmax_leaf(
634
12
                *column_schema, request.columns[request_column_idx].projection, &leaf_schema));
635
10
        DORIS_CHECK(leaf_schema != nullptr);
636
637
10
        auto& aggregate_column = result->columns[request_column_idx];
638
10
        aggregate_column.projection = request.columns[request_column_idx].projection;
639
19
        for (const auto& row_group_plan : _state->scan_plan.row_groups) {
640
19
            auto row_group_metadata =
641
19
                    _state->file_context.metadata->RowGroup(row_group_plan.row_group_id);
642
19
            DORIS_CHECK(row_group_metadata != nullptr);
643
19
            auto column_chunk = row_group_metadata->ColumnChunk(leaf_schema->leaf_column_id);
644
19
            DORIS_CHECK(column_chunk != nullptr);
645
19
            const auto statistics = ParquetStatisticsUtils::TransformColumnStatistics(
646
19
                    *leaf_schema, column_chunk->statistics(), _state->timezone);
647
19
            if (!statistics.has_min_max) {
648
1
                return Status::NotSupported("Missing parquet min/max statistics for column {}",
649
1
                                            leaf_schema->name);
650
1
            }
651
18
            if (!aggregate_column.has_min || statistics.min_value < aggregate_column.min_value) {
652
9
                aggregate_column.min_value = statistics.min_value;
653
9
                aggregate_column.has_min = true;
654
9
            }
655
18
            if (!aggregate_column.has_max || aggregate_column.max_value < statistics.max_value) {
656
18
                aggregate_column.max_value = statistics.max_value;
657
18
                aggregate_column.has_max = true;
658
18
            }
659
18
        }
660
9
        if (!aggregate_column.has_min || !aggregate_column.has_max) {
661
0
            return Status::NotSupported("No parquet row group selected for min/max pushdown");
662
0
        }
663
9
    }
664
7
    return Status::OK();
665
11
}
666
667
65
// Closes the reader.
//
// When scan state exists, page-cache profile counters are synced and the
// parquet file context is closed first. If closing the file context reports an
// error, that error is returned right away (via RETURN_IF_ERROR) and
// FileReader::close() is not reached. Otherwise, and also when no scan state
// was ever created, the base-class close result is returned.
// NOTE(review): `_state` is not reset here, so a second close() call would
// repeat the profile sync and file-context close — confirm that is harmless.
Status ParquetReader::close() {
    if (_state == nullptr) {
        // Nothing parquet-specific to release; only the base reader needs closing.
        return FileReader::close();
    }

    _sync_page_cache_profile();
    RETURN_IF_ERROR(_state->file_context.close());
    return FileReader::close();
}
674
675
108
void ParquetReader::_init_profile() {
676
108
    _parquet_profile.init(_profile);
677
108
}
678
679
} // namespace doris::format::parquet