Coverage Report

Created: 2026-06-25 12:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format_v2/parquet/parquet_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format_v2/parquet/parquet_reader.h"
19
20
#include <algorithm>
21
#include <map>
22
#include <memory>
23
#include <optional>
24
#include <ranges>
25
#include <utility>
26
#include <vector>
27
28
#include "core/assert_cast.h"
29
#include "core/block/block.h"
30
#include "core/data_type/data_type_array.h"
31
#include "core/data_type/data_type_factory.hpp"
32
#include "core/data_type/data_type_map.h"
33
#include "core/data_type/data_type_nullable.h"
34
#include "core/data_type/data_type_struct.h"
35
#include "format_v2/column_mapper.h"
36
#include "format_v2/parquet/parquet_column_schema.h"
37
#include "format_v2/parquet/parquet_file_context.h"
38
#include "format_v2/parquet/parquet_scan.h"
39
#include "format_v2/parquet/parquet_statistics.h"
40
#include "format_v2/parquet/reader/column_reader.h"
41
#include "runtime/runtime_state.h"
42
43
namespace doris::format::parquet {
44
45
struct ParquetReaderScanState {
46
    ParquetFileContext file_context;
47
    std::vector<std::unique_ptr<ParquetColumnSchema>> file_schema;
48
    RowGroupScanPlan scan_plan;
49
    ParquetScanScheduler scheduler;
50
    const cctz::time_zone* timezone = nullptr;
51
    bool enable_bloom_filter = false;
52
};
53
54
0
DataTypePtr nullable_like_original(const DataTypePtr& type, DataTypePtr nested_type) {
55
0
    return type != nullptr && type->is_nullable() ? make_nullable(nested_type) : nested_type;
56
0
}
57
58
0
int timestamp_tz_scale(const ParquetTypeDescriptor& type_descriptor) {
59
0
    switch (type_descriptor.time_unit) {
60
0
    case ParquetTimeUnit::MILLIS:
61
0
        return 3;
62
0
    case ParquetTimeUnit::MICROS:
63
0
    case ParquetTimeUnit::UNKNOWN:
64
0
    default:
65
0
        return 6;
66
0
    }
67
0
}
68
69
1
bool should_map_to_timestamp_tz(const ParquetColumnSchema& column_schema) {
70
1
    const auto& type_descriptor = column_schema.type_descriptor;
71
1
    return type_descriptor.is_timestamp && type_descriptor.timestamp_is_adjusted_to_utc;
72
1
}
73
74
1
DataTypePtr apply_timestamp_tz_mapping(ParquetColumnSchema* column_schema) {
75
1
    DORIS_CHECK(column_schema != nullptr);
76
1
    if (column_schema->kind == ParquetColumnSchemaKind::PRIMITIVE) {
77
1
        if (should_map_to_timestamp_tz(*column_schema)) {
78
0
            const bool nullable =
79
0
                    column_schema->type != nullptr && column_schema->type->is_nullable();
80
0
            const auto scale = timestamp_tz_scale(column_schema->type_descriptor);
81
0
            column_schema->type = DataTypeFactory::instance().create_data_type(TYPE_TIMESTAMPTZ,
82
0
                                                                               nullable, 0, scale);
83
0
            column_schema->type_descriptor.doris_type = column_schema->type;
84
0
        }
85
1
        return column_schema->type;
86
1
    }
87
88
0
    std::vector<DataTypePtr> child_types;
89
0
    child_types.reserve(column_schema->children.size());
90
0
    for (auto& child : column_schema->children) {
91
0
        child_types.push_back(apply_timestamp_tz_mapping(child.get()));
92
0
    }
93
94
0
    if (column_schema->kind == ParquetColumnSchemaKind::LIST) {
95
0
        DORIS_CHECK(child_types.size() == 1);
96
0
        column_schema->type = nullable_like_original(
97
0
                column_schema->type, std::make_shared<DataTypeArray>(child_types[0]));
98
0
    } else if (column_schema->kind == ParquetColumnSchemaKind::MAP) {
99
0
        DORIS_CHECK(child_types.size() == 2);
100
0
        column_schema->type = nullable_like_original(
101
0
                column_schema->type, std::make_shared<DataTypeMap>(make_nullable(child_types[0]),
102
0
                                                                   make_nullable(child_types[1])));
103
0
    } else if (column_schema->kind == ParquetColumnSchemaKind::STRUCT) {
104
0
        Strings child_names;
105
0
        child_names.reserve(column_schema->children.size());
106
0
        for (const auto& child : column_schema->children) {
107
0
            child_names.push_back(child->name);
108
0
        }
109
0
        column_schema->type = nullable_like_original(
110
0
                column_schema->type, std::make_shared<DataTypeStruct>(child_types, child_names));
111
0
    }
112
0
    return column_schema->type;
113
1
}
114
115
static Status find_projected_minmax_leaf(const ParquetColumnSchema& column_schema,
116
                                         const format::LocalColumnIndex& projection,
117
14
                                         const ParquetColumnSchema** leaf_schema) {
118
14
    DORIS_CHECK(leaf_schema != nullptr);
119
14
    if (projection.project_all_children || projection.children.empty()) {
120
12
        if (column_schema.leaf_column_id < 0) {
121
2
            return Status::NotSupported(
122
2
                    "Parquet aggregate pushdown only supports primitive column {}",
123
2
                    column_schema.name);
124
2
        }
125
10
        if (column_schema.max_repetition_level > 0) {
126
0
            return Status::NotSupported(
127
0
                    "Parquet aggregate pushdown does not support repeated column {}",
128
0
                    column_schema.name);
129
0
        }
130
10
        *leaf_schema = &column_schema;
131
10
        return Status::OK();
132
10
    }
133
2
    if (projection.children.size() != 1) {
134
0
        return Status::NotSupported(
135
0
                "Parquet aggregate pushdown only supports a single nested leaf under column {}",
136
0
                column_schema.name);
137
0
    }
138
2
    const auto& child_projection = projection.children[0];
139
2
    const auto child_schema_it =
140
2
            std::ranges::find_if(column_schema.children, [&](const auto& child_schema) {
141
2
                return child_schema->local_id == child_projection.local_id();
142
2
            });
143
2
    if (child_schema_it != column_schema.children.end()) {
144
2
        return find_projected_minmax_leaf(**child_schema_it, child_projection, leaf_schema);
145
2
    }
146
0
    return Status::InvalidArgument("Invalid parquet aggregate projection local id {} for column {}",
147
0
                                   child_projection.local_id(), column_schema.name);
148
2
}
149
150
void ParquetReader::_fill_column_definition(const ParquetColumnSchema& column_schema,
151
237
                                            format::ColumnDefinition* field) const {
152
237
    if (column_schema.parquet_field_id >= 0) {
153
95
        field->identifier = Field::create_field<TYPE_INT>(column_schema.parquet_field_id);
154
142
    } else {
155
142
        field->identifier = Field::create_field<TYPE_STRING>(column_schema.name);
156
142
    }
157
237
    field->local_id = column_schema.local_id;
158
237
    field->name = column_schema.name;
159
237
    field->type = column_schema.type != nullptr && !column_schema.type->is_nullable()
160
237
                          ? make_nullable(column_schema.type)
161
237
                          : column_schema.type;
162
237
    field->children.clear();
163
237
    field->children.reserve(column_schema.children.size());
164
237
    for (const auto& child : column_schema.children) {
165
26
        format::ColumnDefinition child_field;
166
26
        _fill_column_definition(*child, &child_field);
167
26
        field->children.push_back(std::move(child_field));
168
26
    }
169
237
}
170
171
ParquetReader::ParquetReader(std::shared_ptr<io::FileSystemProperties>& system_properties,
172
                             std::unique_ptr<io::FileDescription>& file_description,
173
                             std::shared_ptr<io::IOContext> io_ctx, RuntimeProfile* profile,
174
                             std::optional<format::GlobalRowIdContext> global_rowid_context,
175
                             bool enable_mapping_timestamp_tz)
176
103
        : FileReader(system_properties, file_description, io_ctx, profile),
177
103
          _global_rowid_context(global_rowid_context),
178
103
          _enable_mapping_timestamp_tz(enable_mapping_timestamp_tz) {}
179
180
103
ParquetReader::~ParquetReader() = default;
181
182
102
Status ParquetReader::init(RuntimeState* state) {
183
102
    RETURN_IF_ERROR(format::FileReader::init(state));
184
102
    if (_profile != nullptr) {
185
23
        COUNTER_UPDATE(_parquet_profile.file_reader_create_time,
186
23
                       _reader_statistics.file_reader_create_time);
187
23
        COUNTER_UPDATE(_parquet_profile.open_file_num, _reader_statistics.open_file_num);
188
23
    }
189
102
    _state = std::make_unique<ParquetReaderScanState>();
190
102
    _state->enable_bloom_filter =
191
102
            state != nullptr && state->query_options().enable_parquet_filter_by_bloom_filter;
192
102
    if (state != nullptr) {
193
102
        _state->timezone = &state->timezone_obj();
194
102
        _state->scheduler.set_timezone(&state->timezone_obj());
195
102
        _state->scheduler.set_enable_strict_mode(state->enable_strict_mode());
196
102
    }
197
    // Open parquet file and parse metadata to get file schema.
198
102
    RETURN_IF_ERROR(_state->file_context.open(_tracing_file_reader, _io_ctx.get()));
199
    // Build file schema from parquet metadata.
200
    // A file reader may expose raw file identifiers, such as Parquet field_id, through ColumnDefinition::identifier
201
102
    RETURN_IF_ERROR(
202
102
            build_parquet_column_schema(*_state->file_context.schema, &_state->file_schema));
203
102
    if (_enable_mapping_timestamp_tz) {
204
1
        for (auto& column_schema : _state->file_schema) {
205
1
            apply_timestamp_tz_mapping(column_schema.get());
206
1
        }
207
1
    }
208
102
    return Status::OK();
209
102
}
210
211
97
Status ParquetReader::get_schema(std::vector<format::ColumnDefinition>* file_schema) const {
212
97
    if (file_schema == nullptr) {
213
0
        return Status::InvalidArgument("file_schema is null");
214
0
    }
215
97
    file_schema->clear();
216
97
    if (_state == nullptr || _state->file_context.schema == nullptr) {
217
0
        return Status::Uninitialized("ParquetReader is not open");
218
0
    }
219
220
97
    file_schema->reserve(_state->file_schema.size());
221
308
    for (size_t column_idx = 0; column_idx < _state->file_schema.size(); ++column_idx) {
222
211
        format::ColumnDefinition field;
223
211
        _fill_column_definition(*_state->file_schema[column_idx], &field);
224
211
        DORIS_CHECK(field.local_id == static_cast<int32_t>(column_idx));
225
211
        file_schema->push_back(std::move(field));
226
211
    }
227
97
    if (_global_rowid_context.has_value()) {
228
2
        file_schema->push_back(format::global_rowid_column_definition());
229
2
    }
230
97
    return Status::OK();
231
97
}
232
233
std::unique_ptr<format::TableColumnMapper> ParquetReader::create_column_mapper(
234
58
        format::TableColumnMapperOptions options) const {
235
58
    return std::make_unique<format::ParquetColumnMapper>(std::move(options));
236
58
}
237
238
98
Status ParquetReader::open(std::shared_ptr<format::FileScanRequest> request) {
239
98
    if (_state == nullptr || _state->file_context.metadata == nullptr ||
240
98
        _state->file_context.schema == nullptr) {
241
0
        return Status::Uninitialized("ParquetReader is not open");
242
0
    }
243
98
    auto request_snapshot = request;
244
98
    DORIS_CHECK(request_snapshot != nullptr);
245
98
    RETURN_IF_ERROR(format::FileReader::open(std::move(request)));
246
247
98
    const int num_fields = static_cast<int>(_state->file_schema.size());
248
98
    for (const auto& column_filter : request_snapshot->column_predicate_filters) {
249
17
        const auto file_column_id = column_filter.effective_file_column_id();
250
17
        if (!file_column_id.is_valid() || file_column_id.value() >= num_fields) {
251
0
            return Status::InvalidArgument("Invalid parquet filter top-level local id {}",
252
0
                                           file_column_id.value());
253
0
        }
254
17
    }
255
256
    // `local_positions.empty()` means all columns are needed by table reader
257
    // TODO(gabriel): It will happen only for TVF `select *` query.
258
98
    if (request_snapshot->local_positions.empty()) {
259
26
        for (const auto& col : request_snapshot->predicate_columns) {
260
9
            request_snapshot->local_positions.emplace(col.column_id(),
261
9
                                                      format::LocalIndex(col.column_id().value()));
262
9
        }
263
26
        for (const auto& col : request_snapshot->non_predicate_columns) {
264
11
            request_snapshot->local_positions.emplace(col.column_id(),
265
11
                                                      format::LocalIndex(col.column_id().value()));
266
11
        }
267
26
    }
268
269
98
    for (const auto& col : request_snapshot->predicate_columns) {
270
42
        DORIS_CHECK(request_snapshot->local_positions.count(col.column_id()) > 0);
271
42
        const auto local_id = col.local_id();
272
42
        if (local_id == format::ROW_POSITION_COLUMN_ID ||
273
42
            local_id == format::GLOBAL_ROWID_COLUMN_ID) {
274
11
            continue;
275
11
        }
276
31
        DORIS_CHECK(local_id >= 0 && local_id < num_fields);
277
31
    }
278
105
    for (const auto& col : request_snapshot->non_predicate_columns) {
279
105
        DORIS_CHECK(request_snapshot->local_positions.count(col.column_id()) > 0);
280
105
        const auto local_id = col.local_id();
281
105
        if (local_id == format::ROW_POSITION_COLUMN_ID ||
282
105
            local_id == format::GLOBAL_ROWID_COLUMN_ID) {
283
14
            continue;
284
14
        }
285
91
        DORIS_CHECK(local_id >= 0 && local_id < num_fields);
286
91
    }
287
288
98
    RowGroupScanPlan row_group_plan;
289
98
    ParquetScanRange scan_range;
290
98
    scan_range.start_offset = _file_description->range_start_offset;
291
98
    scan_range.size = _file_description->range_size;
292
98
    scan_range.file_size = _file_description->file_size;
293
    // Get selected ranges in row groups according to metadata (Row-Group level index and Page Index including Zonemap, Dictionary, Bloom Filter).
294
98
    RETURN_IF_ERROR(plan_parquet_row_groups(
295
98
            *_state->file_context.metadata, _state->file_context.file_reader.get(),
296
98
            _state->file_schema, *request_snapshot, scan_range, _state->enable_bloom_filter,
297
98
            &row_group_plan, _state->timezone));
298
98
    if (_profile != nullptr) {
299
23
        _parquet_profile.update_pruning_stats(row_group_plan.pruning_stats);
300
23
    }
301
98
    _state->scan_plan = row_group_plan;
302
98
    _state->scheduler.set_page_skip_profile(_parquet_profile.page_skip_profile());
303
98
    _state->scheduler.set_global_rowid_context(_global_rowid_context);
304
98
    _state->scheduler.set_scan_profile(_parquet_profile.scan_profile());
305
98
    _state->scheduler.set_plan(std::move(row_group_plan));
306
98
    _eof = _state->scheduler.empty();
307
98
    return Status::OK();
308
98
}
309
310
132
Status ParquetReader::get_block(Block* file_block, size_t* rows, bool* eof) {
311
132
    if (_state == nullptr || _state->file_context.file_reader == nullptr ||
312
132
        _state->file_context.schema == nullptr) {
313
0
        return Status::Uninitialized("ParquetReader is not open");
314
0
    }
315
132
    *rows = 0;
316
132
    if (_eof) {
317
1
        *eof = true;
318
1
        return Status::OK();
319
1
    }
320
131
    auto request_snapshot = _request;
321
131
    if (request_snapshot == nullptr) {
322
0
        return Status::Cancelled("ParquetReader is closed");
323
0
    }
324
325
131
    const auto predicate_filtered_rows_before = _state->scheduler.predicate_filtered_rows();
326
131
    RETURN_IF_ERROR(_state->scheduler.read_next_batch(_state->file_context, _state->file_schema,
327
131
                                                      *request_snapshot, file_block, rows, eof));
328
131
    if (_io_ctx != nullptr) {
329
36
        _io_ctx->predicate_filtered_rows +=
330
36
                _state->scheduler.predicate_filtered_rows() - predicate_filtered_rows_before;
331
36
    }
332
131
    _eof = *eof;
333
131
    return Status::OK();
334
131
}
335
336
2
void ParquetReader::set_condition_cache_context(std::shared_ptr<ConditionCacheContext> ctx) {
337
2
    if (_state == nullptr) {
338
0
        return;
339
0
    }
340
2
    _state->scheduler.set_condition_cache_context(std::move(ctx));
341
2
    if (_io_ctx != nullptr) {
342
        // Condition-cache HIT filters row ranges before batch reading, so skipped rows never belong
343
        // to a later get_block() batch. Report the plan-level skipped rows at the same point where
344
        // the scan plan is rewritten.
345
1
        _io_ctx->condition_cache_filtered_rows += _state->scheduler.condition_cache_filtered_rows();
346
1
    }
347
2
}
348
349
0
int64_t ParquetReader::get_total_rows() const {
350
0
    if (_state == nullptr) {
351
0
        return 0;
352
0
    }
353
0
    int64_t rows = 0;
354
0
    for (const auto& row_group_plan : _state->scan_plan.row_groups) {
355
0
        rows += row_group_plan.row_group_rows;
356
0
    }
357
0
    return rows;
358
0
}
359
360
Status ParquetReader::get_aggregate_result(const format::FileAggregateRequest& request,
361
16
                                           format::FileAggregateResult* result) {
362
16
    DORIS_CHECK(result != nullptr);
363
16
    if (_state == nullptr || _state->file_context.metadata == nullptr ||
364
16
        _state->file_context.schema == nullptr) {
365
0
        return Status::Uninitialized("ParquetReader is not open");
366
0
    }
367
16
    result->count = 0;
368
16
    result->columns.clear();
369
16
    if (request.agg_type != TPushAggOp::type::COUNT &&
370
16
        request.agg_type != TPushAggOp::type::MINMAX) {
371
1
        return Status::NotSupported("Unsupported parquet aggregate pushdown type {}",
372
1
                                    request.agg_type);
373
1
    }
374
375
    // Aggregate row count in all selected row groups. For MIN/MAX aggregate, this is used to determine whether there is no row group selected.
376
30
    for (const auto& row_group_plan : _state->scan_plan.row_groups) {
377
30
        auto row_group_metadata =
378
30
                _state->file_context.metadata->RowGroup(row_group_plan.row_group_id);
379
30
        DORIS_CHECK(row_group_metadata != nullptr);
380
30
        result->count += row_group_metadata->num_rows();
381
30
    }
382
15
    if (request.agg_type == TPushAggOp::type::COUNT) {
383
4
        return Status::OK();
384
4
    }
385
386
11
    result->columns.resize(request.columns.size());
387
20
    for (size_t request_column_idx = 0; request_column_idx < request.columns.size();
388
13
         ++request_column_idx) {
389
13
        const auto file_column_id = request.columns[request_column_idx].projection.local_id();
390
13
        if (file_column_id < 0 ||
391
13
            file_column_id >= static_cast<int32_t>(_state->file_schema.size())) {
392
1
            return Status::InvalidArgument("Invalid parquet aggregate column id {}",
393
1
                                           file_column_id);
394
1
        }
395
12
        const auto& column_schema = _state->file_schema[file_column_id];
396
12
        DORIS_CHECK(column_schema != nullptr);
397
12
        const ParquetColumnSchema* leaf_schema = nullptr;
398
12
        RETURN_IF_ERROR(find_projected_minmax_leaf(
399
12
                *column_schema, request.columns[request_column_idx].projection, &leaf_schema));
400
10
        DORIS_CHECK(leaf_schema != nullptr);
401
402
10
        auto& aggregate_column = result->columns[request_column_idx];
403
10
        aggregate_column.projection = request.columns[request_column_idx].projection;
404
19
        for (const auto& row_group_plan : _state->scan_plan.row_groups) {
405
19
            auto row_group_metadata =
406
19
                    _state->file_context.metadata->RowGroup(row_group_plan.row_group_id);
407
19
            DORIS_CHECK(row_group_metadata != nullptr);
408
19
            auto column_chunk = row_group_metadata->ColumnChunk(leaf_schema->leaf_column_id);
409
19
            DORIS_CHECK(column_chunk != nullptr);
410
19
            const auto statistics = ParquetStatisticsUtils::TransformColumnStatistics(
411
19
                    *leaf_schema, column_chunk->statistics(), _state->timezone);
412
19
            if (!statistics.has_min_max) {
413
1
                return Status::NotSupported("Missing parquet min/max statistics for column {}",
414
1
                                            leaf_schema->name);
415
1
            }
416
18
            if (!aggregate_column.has_min || statistics.min_value < aggregate_column.min_value) {
417
9
                aggregate_column.min_value = statistics.min_value;
418
9
                aggregate_column.has_min = true;
419
9
            }
420
18
            if (!aggregate_column.has_max || aggregate_column.max_value < statistics.max_value) {
421
18
                aggregate_column.max_value = statistics.max_value;
422
18
                aggregate_column.has_max = true;
423
18
            }
424
18
        }
425
9
        if (!aggregate_column.has_min || !aggregate_column.has_max) {
426
0
            return Status::NotSupported("No parquet row group selected for min/max pushdown");
427
0
        }
428
9
    }
429
7
    return Status::OK();
430
11
}
431
432
65
Status ParquetReader::close() {
433
65
    if (_state != nullptr) {
434
65
        RETURN_IF_ERROR(_state->file_context.close());
435
65
    }
436
65
    return FileReader::close();
437
65
}
438
439
102
void ParquetReader::_init_profile() {
440
102
    _parquet_profile.init(_profile);
441
102
}
442
443
} // namespace doris::format::parquet