Coverage Report

Created: 2026-07-04 02:10

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/parquet_predicate.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/parquet_types.h>
21
22
#include <cmath>
23
#include <cstring>
24
#include <vector>
25
26
#include "cctz/time_zone.h"
27
#include "core/data_type/data_type_decimal.h"
28
#include "core/data_type/primitive_type.h"
29
#include "exec/common/endian.h"
30
#include "format/format_common.h"
31
#include "format/parquet/parquet_block_split_bloom_filter.h"
32
#include "format/parquet/parquet_column_convert.h"
33
#include "format/parquet/parquet_common.h"
34
#include "format/parquet/schema_desc.h"
35
#include "storage/olap_scan_common.h"
36
#include "storage/segment/row_ranges.h"
37
#include "util/timezone_utils.h"
38
39
namespace doris {
40
class ParquetPredicate {
41
private:
42
51
    // True when `byte` is a 7-bit ASCII code unit, i.e. its most significant bit is clear.
    static inline bool _is_ascii(uint8_t byte) {
        return (byte & 0x80U) == 0;
    }
43
44
14
    // Returns the number of leading bytes that `encoding_min` and `encoding_max` have in common.
    // The comparison is byte-wise and stops at the end of the shorter string.
    static int _common_prefix(const std::string& encoding_min, const std::string& encoding_max) {
        const size_t limit = std::min(encoding_min.size(), encoding_max.size());
        // Index with size_t so the bound check never mixes signed and unsigned operands.
        size_t shared = 0;
        while (shared < limit && encoding_min[shared] == encoding_max[shared]) {
            ++shared;
        }
        return static_cast<int>(shared);
    }
53
54
15
    static bool _try_read_old_utf8_stats(std::string& encoding_min, std::string& encoding_max) {
55
15
        if (encoding_min == encoding_max) {
56
            // If min = max, then there is a single value only
57
            // No need to modify, just use min
58
1
            encoding_max = encoding_min;
59
1
            return true;
60
14
        } else {
61
14
            int common_prefix_length = _common_prefix(encoding_min, encoding_max);
62
63
            // For min we can retain all-ASCII, because this produces a strictly lower value.
64
14
            int min_good_length = common_prefix_length;
65
28
            while (min_good_length < encoding_min.size() &&
66
28
                   _is_ascii(static_cast<uint8_t>(encoding_min[min_good_length]))) {
67
14
                min_good_length++;
68
14
            }
69
70
            // For max we can be sure only of the part matching the min. When they differ, we can consider only one next, and only if both are ASCII
71
14
            int max_good_length = common_prefix_length;
72
14
            if (max_good_length < encoding_max.size() && max_good_length < encoding_min.size() &&
73
14
                _is_ascii(static_cast<uint8_t>(encoding_min[max_good_length])) &&
74
14
                _is_ascii(static_cast<uint8_t>(encoding_max[max_good_length]))) {
75
7
                max_good_length++;
76
7
            }
77
            // Incrementing 127 would overflow. Incrementing within non-ASCII can have side-effects.
78
19
            while (max_good_length > 0 &&
79
19
                   (static_cast<uint8_t>(encoding_max[max_good_length - 1]) == 127 ||
80
15
                    !_is_ascii(static_cast<uint8_t>(encoding_max[max_good_length - 1])))) {
81
5
                max_good_length--;
82
5
            }
83
14
            if (max_good_length == 0) {
84
                // We can return just min bound, but code downstream likely expects both are present or both are absent.
85
4
                return false;
86
4
            }
87
88
10
            encoding_min.resize(min_good_length);
89
10
            encoding_max.resize(max_good_length);
90
10
            if (max_good_length > 0) {
91
10
                encoding_max[max_good_length - 1]++;
92
10
            }
93
10
            return true;
94
14
        }
95
15
    }
96
97
0
    // Maps a Parquet schema element to the sort order assumed for its `min`/`max` statistics.
    // Logical type annotations are examined first, in a fixed priority order; the physical type
    // is consulted only when no logical annotation is set.
    static SortOrder _determine_sort_order(const tparquet::SchemaElement& parquet_schema) {
        const tparquet::Type::type physical = parquet_schema.type;
        const tparquet::LogicalType& logical = parquet_schema.logicalType;
        const auto& annotated = logical.__isset;

        // Assume string type is SortOrder::SIGNED, use ParquetPredicate::_try_read_old_utf8_stats() to handle it.
        const bool is_binary = physical == tparquet::Type::BYTE_ARRAY ||
                               physical == tparquet::Type::FIXED_LEN_BYTE_ARRAY;
        if (annotated.STRING && is_binary) {
            return SortOrder::SIGNED;
        }

        // The checks below keep the first-match-wins order of the annotations.
        if (annotated.INTEGER) {
            return logical.INTEGER.isSigned ? SortOrder::SIGNED : SortOrder::UNSIGNED;
        }
        if (annotated.DATE) {
            return SortOrder::SIGNED;
        }
        if (annotated.ENUM || annotated.BSON || annotated.JSON || annotated.STRING) {
            return SortOrder::UNSIGNED;
        }
        if (annotated.DECIMAL || annotated.MAP || annotated.LIST) {
            return SortOrder::UNKNOWN;
        }
        if (annotated.TIME || annotated.TIMESTAMP) {
            return SortOrder::SIGNED;
        }
        if (annotated.UNKNOWN) {
            return SortOrder::UNKNOWN;
        }

        // No logical annotation: decide from the physical type alone.
        switch (physical) {
        case tparquet::Type::BOOLEAN:
        case tparquet::Type::INT32:
        case tparquet::Type::INT64:
        case tparquet::Type::FLOAT:
        case tparquet::Type::DOUBLE:
            return SortOrder::SIGNED;
        case tparquet::Type::BYTE_ARRAY:
        case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
            return SortOrder::UNSIGNED;
        case tparquet::Type::INT96:
        default:
            return SortOrder::UNKNOWN;
        }
    }
154
155
public:
156
    // Number of bytes read_bloom_filter() fetches for the bloom filter header when the column
    // metadata carries no positive bloom_filter_length.
    static constexpr int BLOOM_FILTER_MAX_HEADER_LENGTH = 64;
157
    struct ColumnStat {
158
        std::string encoded_min_value;
159
        std::string encoded_max_value;
160
        bool has_null;
161
        bool is_all_null;
162
        const FieldSchema* col_schema;
163
        const cctz::time_zone* ctz;
164
        std::unique_ptr<ParquetBlockSplitBloomFilter> bloom_filter;
165
        std::function<bool(ParquetPredicate::ColumnStat*, const int)>* get_stat_func = nullptr;
166
        std::function<bool(ParquetPredicate::ColumnStat*, const int)>* get_bloom_filter_func =
167
                nullptr;
168
    };
169
170
8
    // Tells whether a bloom filter may be consulted for a column of primitive type `type`.
    //
    // Only support types where physical type == logical type (no conversion needed).
    // For types like DATEV2, DATETIMEV2, DECIMAL, Parquet stores them in physical format
    // (INT32, INT64, etc.) but Doris uses different internal representations.
    // Bloom filter works with physical bytes, but we only have logical type values,
    // and there's no reverse conversion (logical -> physical) available.
    // TINYINT/SMALLINT also need conversion via LittleIntPhysicalConverter.
    static bool bloom_filter_supported(PrimitiveType type) {
        const bool is_plain_numeric = type == TYPE_BOOLEAN || type == TYPE_INT ||
                                      type == TYPE_BIGINT || type == TYPE_FLOAT ||
                                      type == TYPE_DOUBLE;
        const bool is_string_like =
                type == TYPE_CHAR || type == TYPE_VARCHAR || type == TYPE_STRING;
        return is_plain_numeric || is_string_like;
    }
191
192
    struct PageIndexStat {
193
        // Indicates whether the page index information in this column can be used.
194
        bool available = false;
195
        int64_t num_of_pages;
196
        std::vector<std::string> encoded_min_value;
197
        std::vector<std::string> encoded_max_value;
198
        std::vector<bool> has_null;
199
        std::vector<bool> is_all_null;
200
        const FieldSchema* col_schema;
201
202
        // Record the row range corresponding to each page.
203
        std::vector<segment_v2::RowRange> ranges;
204
    };
205
206
    struct CachedPageIndexStat {
207
        const cctz::time_zone* ctz;
208
        std::map<int, PageIndexStat> stats;
209
        std::function<bool(PageIndexStat**, int)> get_stat_func;
210
        RowRange row_group_range;
211
    };
212
213
    // The encoded Parquet min-max value is parsed into `fields`;
214
    // Can be used in row groups and page index statistics.
215
    static Status parse_min_max_value(const FieldSchema* col_schema, const std::string& encoded_min,
216
                                      const std::string& encoded_max, const cctz::time_zone& ctz,
217
5.91k
                                      Field* min_field, Field* max_field) {
218
5.91k
        auto logical_data_type = remove_nullable(col_schema->data_type);
219
5.91k
        auto converter = parquet::PhysicalToLogicalConverter::get_converter(
220
5.91k
                col_schema, logical_data_type, logical_data_type, &ctz);
221
5.91k
        ColumnPtr physical_column;
222
5.91k
        switch (col_schema->parquet_schema.type) {
223
2
        case tparquet::Type::type::BOOLEAN: {
224
2
            auto physical_col = ColumnUInt8::create();
225
2
            physical_col->get_data().data();
226
2
            physical_col->resize(2);
227
2
            physical_col->get_data()[0] = *reinterpret_cast<const bool*>(encoded_min.data());
228
2
            physical_col->get_data()[1] = *reinterpret_cast<const bool*>(encoded_max.data());
229
2
            physical_column = std::move(physical_col);
230
2
            break;
231
0
        }
232
5.86k
        case tparquet::Type::type::INT32: {
233
5.86k
            auto physical_col = ColumnInt32::create();
234
5.86k
            physical_col->resize(2);
235
236
5.86k
            physical_col->get_data()[0] = *reinterpret_cast<const int32_t*>(encoded_min.data());
237
5.86k
            physical_col->get_data()[1] = *reinterpret_cast<const int32_t*>(encoded_max.data());
238
239
5.86k
            physical_column = std::move(physical_col);
240
5.86k
            break;
241
0
        }
242
27
        case tparquet::Type::type::INT64: {
243
27
            auto physical_col = ColumnInt64::create();
244
27
            physical_col->resize(2);
245
27
            physical_col->get_data()[0] = *reinterpret_cast<const int64_t*>(encoded_min.data());
246
27
            physical_col->get_data()[1] = *reinterpret_cast<const int64_t*>(encoded_max.data());
247
27
            physical_column = std::move(physical_col);
248
27
            break;
249
0
        }
250
14
        case tparquet::Type::type::FLOAT: {
251
14
            auto physical_col = ColumnFloat32::create();
252
14
            physical_col->resize(2);
253
14
            physical_col->get_data()[0] = *reinterpret_cast<const float*>(encoded_min.data());
254
14
            physical_col->get_data()[1] = *reinterpret_cast<const float*>(encoded_max.data());
255
14
            physical_column = std::move(physical_col);
256
14
            break;
257
0
        }
258
0
        case tparquet::Type::type::DOUBLE: {
259
0
            auto physical_col = ColumnFloat64 ::create();
260
0
            physical_col->resize(2);
261
0
            physical_col->get_data()[0] = *reinterpret_cast<const double*>(encoded_min.data());
262
0
            physical_col->get_data()[1] = *reinterpret_cast<const double*>(encoded_max.data());
263
0
            physical_column = std::move(physical_col);
264
0
            break;
265
0
        }
266
3
        case tparquet::Type::type::BYTE_ARRAY: {
267
3
            auto physical_col = ColumnString::create();
268
3
            physical_col->insert_data(encoded_min.data(), encoded_min.size());
269
3
            physical_col->insert_data(encoded_max.data(), encoded_max.size());
270
3
            physical_column = std::move(physical_col);
271
3
            break;
272
0
        }
273
2
        case tparquet::Type::type::FIXED_LEN_BYTE_ARRAY: {
274
2
            auto physical_col = ColumnUInt8::create();
275
2
            physical_col->resize(2 * col_schema->parquet_schema.type_length);
276
2
            DCHECK(col_schema->parquet_schema.type_length == encoded_min.length());
277
2
            DCHECK(col_schema->parquet_schema.type_length == encoded_max.length());
278
279
2
            auto ptr = physical_col->get_data().data();
280
2
            memcpy(ptr, encoded_min.data(), encoded_min.length());
281
2
            memcpy(ptr + encoded_min.length(), encoded_max.data(), encoded_max.length());
282
2
            physical_column = std::move(physical_col);
283
2
            break;
284
0
        }
285
0
        case tparquet::Type::type::INT96: {
286
0
            auto physical_col = ColumnInt8::create();
287
0
            physical_col->resize(2 * sizeof(ParquetInt96));
288
0
            DCHECK(sizeof(ParquetInt96) == encoded_min.length());
289
0
            DCHECK(sizeof(ParquetInt96) == encoded_max.length());
290
291
0
            auto ptr = physical_col->get_data().data();
292
0
            memcpy(ptr, encoded_min.data(), encoded_min.length());
293
0
            memcpy(ptr + encoded_min.length(), encoded_max.data(), encoded_max.length());
294
0
            physical_column = std::move(physical_col);
295
0
            break;
296
0
        }
297
5.91k
        }
298
299
5.91k
        ColumnPtr logical_column;
300
5.91k
        if (converter->is_consistent()) {
301
5.90k
            logical_column = physical_column;
302
5.90k
        } else {
303
4
            logical_column = logical_data_type->create_column();
304
4
            RETURN_IF_ERROR(converter->physical_convert(physical_column, logical_column));
305
4
        }
306
307
5.91k
        DCHECK(logical_column->size() == 2);
308
5.91k
        *min_field = logical_column->operator[](0);
309
5.91k
        *max_field = logical_column->operator[](1);
310
311
5.91k
        auto logical_prim_type = logical_data_type->get_primitive_type();
312
313
5.91k
        if (logical_prim_type == TYPE_FLOAT) {
314
11
            auto& min_value = min_field->get<TYPE_FLOAT>();
315
11
            auto& max_value = max_field->get<TYPE_FLOAT>();
316
317
11
            if (std::isnan(min_value) || std::isnan(max_value)) {
318
1
                return Status::DataQualityError("Can not use this parquet min/max value.");
319
1
            }
320
            // Updating min to -0.0 and max to +0.0 to ensure that no 0.0 values would be skipped
321
10
            if (std::signbit(min_value) == 0 && min_value == 0.0F) {
322
0
                min_value = -0.0F;
323
0
            }
324
10
            if (std::signbit(max_value) != 0 && max_value == -0.0F) {
325
0
                max_value = 0.0F;
326
0
            }
327
5.90k
        } else if (logical_prim_type == TYPE_DOUBLE) {
328
0
            auto& min_value = min_field->get<TYPE_DOUBLE>();
329
0
            auto& max_value = max_field->get<TYPE_DOUBLE>();
330
331
0
            if (std::isnan(min_value) || std::isnan(max_value)) {
332
0
                return Status::DataQualityError("Can not use this parquet min/max value.");
333
0
            }
334
            // Updating min to -0.0 and max to +0.0 to ensure that no 0.0 values would be skipped
335
0
            if (std::signbit(min_value) == 0 && min_value == 0.0F) {
336
0
                min_value = -0.0F;
337
0
            }
338
0
            if (std::signbit(max_value) != 0 && max_value == -0.0F) {
339
0
                max_value = 0.0F;
340
0
            }
341
5.90k
        } else if (col_schema->parquet_schema.type == tparquet::Type::type::INT96 ||
342
5.90k
                   logical_prim_type == TYPE_DATETIMEV2) {
343
1
            auto min_value = min_field->get<TYPE_DATETIMEV2>();
344
1
            auto max_value = min_field->get<TYPE_DATETIMEV2>();
345
346
            // From Trino: Parquet INT96 timestamp values were compared incorrectly
347
            // for the purposes of producing statistics by older parquet writers,
348
            // so PARQUET-1065 deprecated them. The result is that any writer that produced stats
349
            // was producing unusable incorrect values, except the special case where min == max
350
            // and an incorrect ordering would not be material to the result.
351
            // PARQUET-1026 made binary stats available and valid in that special case.
352
1
            if (min_value != max_value) {
353
0
                return Status::DataQualityError("invalid min/max value");
354
0
            }
355
1
        }
356
357
5.91k
        return Status::OK();
358
5.91k
    }
359
360
    // Extracts null information and the encoded min/max bounds of one column from its Parquet
    // column metadata into `ans_stat`.
    //
    // col_schema        schema of the column the statistics belong to.
    // column_meta_data  Parquet column metadata holding the statistics.
    // ignored_stats     optional per-physical-type cache of the
    //                   CorruptStatistics::should_ignore_statistics() result; may be nullptr,
    //                   in which case the check is evaluated on every call.
    // file_created_by   writer identification passed to that check.
    // ans_stat          output. has_null / is_all_null are always written once null_count is
    //                   known; the encoded bounds are written only when the column is not all
    //                   null and the statistics are accepted.
    //
    // Returns OK for an all-null column (without touching the bounds) and when bounds were
    // stored. Returns DataQualityError or NotSupported when the statistics are missing,
    // ordered in an unsupported way, flagged as corrupt, or cannot be trimmed to safe string
    // bounds.
    static Status read_column_stats(const FieldSchema* col_schema,
                                    const tparquet::ColumnMetaData& column_meta_data,
                                    std::unordered_map<tparquet::Type::type, bool>* ignored_stats,
                                    const std::string& file_created_by, ColumnStat* ans_stat) {
        const auto& statistic = column_meta_data.statistics;

        if (!statistic.__isset.null_count) [[unlikely]] {
            return Status::DataQualityError("This parquet Column meta no set null_count.");
        }
        ans_stat->has_null = statistic.null_count > 0;
        ans_stat->is_all_null = statistic.null_count == column_meta_data.num_values;
        if (ans_stat->is_all_null) {
            return Status::OK();
        }

        // Min-max of statistic is plain-encoded value
        if (statistic.__isset.min_value && statistic.__isset.max_value) {
            // INT96 columns and columns annotated UNKNOWN have no defined order, so their
            // bounds are usable only when they are identical.
            const bool order_undefined = col_schema->physical_type == tparquet::Type::INT96 ||
                                         col_schema->parquet_schema.logicalType.__isset.UNKNOWN;
            if (order_undefined && statistic.min_value != statistic.max_value) {
                return Status::DataQualityError("Can not use this parquet min/max value.");
            }
            ans_stat->encoded_min_value = statistic.min_value;
            ans_stat->encoded_max_value = statistic.max_value;
        } else if (statistic.__isset.min && statistic.__isset.max) {
            const bool max_equals_min = statistic.min == statistic.max;
            const bool sort_orders_match =
                    SortOrder::SIGNED == _determine_sort_order(col_schema->parquet_schema);
            if (!sort_orders_match && !max_equals_min) {
                return Status::NotSupported("Can not use this parquet min/max value.");
            }

            // Evaluate the corrupt-statistics check once per physical type when the caller
            // supplies a cache, otherwise on every call.
            bool should_ignore_corrupted_stats = false;
            if (ignored_stats != nullptr) {
                auto cached = ignored_stats->find(col_schema->physical_type);
                if (cached == ignored_stats->end()) {
                    cached = ignored_stats
                                     ->emplace(col_schema->physical_type,
                                               CorruptStatistics::should_ignore_statistics(
                                                       file_created_by, col_schema->physical_type))
                                     .first;
                }
                should_ignore_corrupted_stats = cached->second;
            } else {
                should_ignore_corrupted_stats = CorruptStatistics::should_ignore_statistics(
                        file_created_by, col_schema->physical_type);
            }
            if (should_ignore_corrupted_stats) {
                return Status::DataQualityError("Error statistics, should ignore.");
            }

            ans_stat->encoded_min_value = statistic.min;
            ans_stat->encoded_max_value = statistic.max;
        } else {
            return Status::DataQualityError("This parquet file not set min/max value");
        }

        // String bounds are trimmed to their trustworthy part for both statistic flavours.
        // _determine_sort_order() accepts `min`/`max` statistics of string columns on the
        // assumption that _try_read_old_utf8_stats() post-processes them, so that branch must
        // pass through here as well. Trimming only widens the range or rejects it.
        const auto prim_type = remove_nullable(col_schema->data_type)->get_primitive_type();
        if (prim_type == TYPE_VARCHAR || prim_type == TYPE_CHAR || prim_type == TYPE_STRING) {
            // On failure the helper leaves both strings unmodified.
            if (!_try_read_old_utf8_stats(ans_stat->encoded_min_value,
                                          ans_stat->encoded_max_value)) {
                return Status::DataQualityError("Can not use this parquet min/max value.");
            }
        }

        return Status::OK();
    }
439
440
    // Loads the bloom filter of one column from the file into `ans_stat->bloom_filter`.
    //
    // column_meta_data  Parquet column metadata; supplies bloom_filter_offset and, optionally,
    //                   bloom_filter_length.
    // file_reader       reader positioned on the Parquet file.
    // io_ctx            IO context forwarded to the reads.
    // ans_stat          output; `bloom_filter` is set only after the filter was read and
    //                   initialized completely.
    //
    // Returns NotSupported when the column has no bloom filter, when the header describes an
    // algorithm / compression / hash other than BLOCK / UNCOMPRESSED / XXHASH, or when the
    // bitset size is invalid or the bitset could not be read in full. Read, deserialization and
    // init errors are propagated.
    static Status read_bloom_filter(const tparquet::ColumnMetaData& column_meta_data,
                                    io::FileReaderSPtr file_reader, io::IOContext* io_ctx,
                                    ColumnStat* ans_stat) {
        if (!column_meta_data.__isset.bloom_filter_offset) {
            return Status::NotSupported("Can not use this parquet bloom filter.");
        }

        // Read bloom_filter_length bytes when the metadata provides it, otherwise a fixed-size
        // window that is assumed to be large enough for the header.
        const bool has_length = column_meta_data.__isset.bloom_filter_length &&
                                column_meta_data.bloom_filter_length > 0;
        const size_t size = has_length ? static_cast<size_t>(column_meta_data.bloom_filter_length)
                                       : static_cast<size_t>(BLOOM_FILTER_MAX_HEADER_LENGTH);
        size_t bytes_read = 0;
        std::vector<uint8_t> header_buffer(size);
        RETURN_IF_ERROR(file_reader->read_at(column_meta_data.bloom_filter_offset,
                                             Slice(header_buffer.data(), size), &bytes_read,
                                             io_ctx));

        // In: number of bytes available in the buffer. It is then used below as the serialized
        // header length, i.e. the offset of the bitset behind the header.
        tparquet::BloomFilterHeader t_bloom_filter_header;
        uint32_t t_bloom_filter_header_size = static_cast<uint32_t>(bytes_read);
        RETURN_IF_ERROR(deserialize_thrift_msg(header_buffer.data(), &t_bloom_filter_header_size,
                                               true, &t_bloom_filter_header));

        // TODO the bloom filter could be encrypted, too, so need to double check that this is NOT the case
        if (!t_bloom_filter_header.algorithm.__isset.BLOCK ||
            !t_bloom_filter_header.compression.__isset.UNCOMPRESSED ||
            !t_bloom_filter_header.hash.__isset.XXHASH) {
            return Status::NotSupported("Can not use this parquet bloom filter.");
        }

        // From here on a failure must not leave a half-built filter behind.
        ans_stat->bloom_filter.reset();

        // numBytes comes from the file: reject non-positive sizes before allocating.
        if (t_bloom_filter_header.numBytes <= 0) {
            return Status::NotSupported("Can not use this parquet bloom filter.");
        }
        const auto bitset_size = static_cast<size_t>(t_bloom_filter_header.numBytes);

        std::vector<uint8_t> data_buffer(bitset_size);
        RETURN_IF_ERROR(file_reader->read_at(
                column_meta_data.bloom_filter_offset + t_bloom_filter_header_size,
                Slice(data_buffer.data(), bitset_size), &bytes_read, io_ctx));
        // A truncated bitset would be zero-padded and could report present values as absent.
        if (bytes_read != bitset_size) {
            return Status::NotSupported("Can not use this parquet bloom filter.");
        }

        auto bloom_filter = std::make_unique<ParquetBlockSplitBloomFilter>();
        RETURN_IF_ERROR(bloom_filter->init(reinterpret_cast<const char*>(data_buffer.data()),
                                           t_bloom_filter_header.numBytes,
                                           segment_v2::HashStrategyPB::XX_HASH_64));
        ans_stat->bloom_filter = std::move(bloom_filter);

        return Status::OK();
    }
485
};
486
487
} // namespace doris