Coverage Report

Created: 2026-03-15 01:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/parquet_predicate.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/parquet_types.h>
21
22
#include <cmath>
23
#include <cstring>
24
#include <vector>
25
26
#include "cctz/time_zone.h"
27
#include "core/data_type/data_type_decimal.h"
28
#include "core/data_type/primitive_type.h"
29
#include "exec/common/endian.h"
30
#include "format/format_common.h"
31
#include "format/parquet/parquet_block_split_bloom_filter.h"
32
#include "format/parquet/parquet_column_convert.h"
33
#include "format/parquet/parquet_common.h"
34
#include "format/parquet/schema_desc.h"
35
#include "storage/olap_scan_common.h"
36
#include "storage/segment/row_ranges.h"
37
#include "util/timezone_utils.h"
38
39
namespace doris {
40
#include "common/compile_check_begin.h"
41
class ParquetPredicate {
42
private:
43
51
    // True when `byte` is a 7-bit ASCII code unit, i.e. its high bit is clear.
    static inline bool _is_ascii(uint8_t byte) { return (byte & 0x80U) == 0; }
44
45
14
    // Returns the number of leading bytes that `encoding_min` and `encoding_max` share.
    // The count is kept in size_t while scanning so it is never compared against the
    // unsigned string sizes as a signed value; it is narrowed once on return.
    static int _common_prefix(const std::string& encoding_min, const std::string& encoding_max) {
        const size_t limit = std::min(encoding_min.size(), encoding_max.size());
        size_t shared = 0;
        while (shared < limit && encoding_min[shared] == encoding_max[shared]) {
            ++shared;
        }
        return static_cast<int>(shared);
    }
54
55
15
    static bool _try_read_old_utf8_stats(std::string& encoding_min, std::string& encoding_max) {
56
15
        if (encoding_min == encoding_max) {
57
            // If min = max, then there is a single value only
58
            // No need to modify, just use min
59
1
            encoding_max = encoding_min;
60
1
            return true;
61
14
        } else {
62
14
            int common_prefix_length = _common_prefix(encoding_min, encoding_max);
63
64
            // For min we can retain all-ASCII, because this produces a strictly lower value.
65
14
            int min_good_length = common_prefix_length;
66
28
            while (min_good_length < encoding_min.size() &&
67
28
                   _is_ascii(static_cast<uint8_t>(encoding_min[min_good_length]))) {
68
14
                min_good_length++;
69
14
            }
70
71
            // For max we can be sure only of the part matching the min. When they differ, we can consider only one next, and only if both are ASCII
72
14
            int max_good_length = common_prefix_length;
73
14
            if (max_good_length < encoding_max.size() && max_good_length < encoding_min.size() &&
74
14
                _is_ascii(static_cast<uint8_t>(encoding_min[max_good_length])) &&
75
14
                _is_ascii(static_cast<uint8_t>(encoding_max[max_good_length]))) {
76
7
                max_good_length++;
77
7
            }
78
            // Incrementing 127 would overflow. Incrementing within non-ASCII can have side-effects.
79
19
            while (max_good_length > 0 &&
80
19
                   (static_cast<uint8_t>(encoding_max[max_good_length - 1]) == 127 ||
81
15
                    !_is_ascii(static_cast<uint8_t>(encoding_max[max_good_length - 1])))) {
82
5
                max_good_length--;
83
5
            }
84
14
            if (max_good_length == 0) {
85
                // We can return just min bound, but code downstream likely expects both are present or both are absent.
86
4
                return false;
87
4
            }
88
89
10
            encoding_min.resize(min_good_length);
90
10
            encoding_max.resize(max_good_length);
91
10
            if (max_good_length > 0) {
92
10
                encoding_max[max_good_length - 1]++;
93
10
            }
94
10
            return true;
95
14
        }
96
15
    }
97
98
0
    // Derives the sort order associated with a Parquet schema element from its logical
    // type annotation, falling back to the physical type when no annotation is set.
    static SortOrder _determine_sort_order(const tparquet::SchemaElement& parquet_schema) {
        const tparquet::Type::type physical_type = parquet_schema.type;
        const tparquet::LogicalType& logical_type = parquet_schema.logicalType;
        const auto& annotated = logical_type.__isset;

        // Assume string type is SortOrder::SIGNED, use ParquetPredicate::_try_read_old_utf8_stats() to handle it.
        const bool is_binary = physical_type == tparquet::Type::BYTE_ARRAY ||
                               physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY;
        if (annotated.STRING && is_binary) {
            return SortOrder::SIGNED;
        }

        // The checks below run in a fixed order; the first annotation that is set wins.
        if (annotated.INTEGER) {
            return logical_type.INTEGER.isSigned ? SortOrder::SIGNED : SortOrder::UNSIGNED;
        }
        if (annotated.DATE) {
            return SortOrder::SIGNED;
        }
        if (annotated.ENUM || annotated.BSON || annotated.JSON || annotated.STRING) {
            return SortOrder::UNSIGNED;
        }
        if (annotated.DECIMAL || annotated.MAP || annotated.LIST) {
            return SortOrder::UNKNOWN;
        }
        if (annotated.TIME || annotated.TIMESTAMP) {
            return SortOrder::SIGNED;
        }
        if (annotated.UNKNOWN) {
            return SortOrder::UNKNOWN;
        }

        // No logical annotation: decide from the physical type alone.
        switch (physical_type) {
        case tparquet::Type::BOOLEAN:
        case tparquet::Type::INT32:
        case tparquet::Type::INT64:
        case tparquet::Type::FLOAT:
        case tparquet::Type::DOUBLE:
            return SortOrder::SIGNED;
        case tparquet::Type::BYTE_ARRAY:
        case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
            return SortOrder::UNSIGNED;
        case tparquet::Type::INT96:
        default:
            return SortOrder::UNKNOWN;
        }
    }
155
156
public:
157
    // Number of bytes read_bloom_filter() fetches to parse the bloom filter header when the
    // column metadata carries no positive bloom_filter_length.
    static constexpr int BLOOM_FILTER_MAX_HEADER_LENGTH = 64;
158
    struct ColumnStat {
159
        std::string encoded_min_value;
160
        std::string encoded_max_value;
161
        bool has_null;
162
        bool is_all_null;
163
        const FieldSchema* col_schema;
164
        const cctz::time_zone* ctz;
165
        std::unique_ptr<ParquetBlockSplitBloomFilter> bloom_filter;
166
        std::function<bool(ParquetPredicate::ColumnStat*, const int)>* get_stat_func = nullptr;
167
        std::function<bool(ParquetPredicate::ColumnStat*, const int)>* get_bloom_filter_func =
168
                nullptr;
169
    };
170
171
0
    // Tells whether a Parquet bloom filter can be probed with values of `type`.
    //
    // Only types whose physical and logical representation coincide are accepted (no
    // conversion needed). For types like DATEV2, DATETIMEV2 and DECIMAL, Parquet stores a
    // physical format (INT32, INT64, ...) while Doris uses a different internal
    // representation. The bloom filter works on physical bytes, but only logical values are
    // available here and there is no reverse (logical -> physical) conversion.
    // TINYINT/SMALLINT would also need conversion via LittleIntPhysicalConverter.
    static bool bloom_filter_supported(PrimitiveType type) {
        return type == TYPE_BOOLEAN || type == TYPE_INT || type == TYPE_BIGINT ||
               type == TYPE_FLOAT || type == TYPE_DOUBLE || type == TYPE_CHAR ||
               type == TYPE_VARCHAR || type == TYPE_STRING;
    }
192
193
    struct PageIndexStat {
194
        // Indicates whether the page index information in this column can be used.
195
        bool available = false;
196
        int64_t num_of_pages;
197
        std::vector<std::string> encoded_min_value;
198
        std::vector<std::string> encoded_max_value;
199
        std::vector<bool> has_null;
200
        std::vector<bool> is_all_null;
201
        const FieldSchema* col_schema;
202
203
        // Record the row range corresponding to each page.
204
        std::vector<segment_v2::RowRange> ranges;
205
    };
206
207
    struct CachedPageIndexStat {
208
        const cctz::time_zone* ctz;
209
        std::map<int, PageIndexStat> stats;
210
        std::function<bool(PageIndexStat**, int)> get_stat_func;
211
        RowRange row_group_range;
212
    };
213
214
    // The encoded Parquet min-max value is parsed into `fields`;
215
    // Can be used in row groups and page index statistics.
216
    static Status parse_min_max_value(const FieldSchema* col_schema, const std::string& encoded_min,
217
                                      const std::string& encoded_max, const cctz::time_zone& ctz,
218
74
                                      Field* min_field, Field* max_field) {
219
74
        auto logical_data_type = remove_nullable(col_schema->data_type);
220
74
        auto converter = parquet::PhysicalToLogicalConverter::get_converter(
221
74
                col_schema, logical_data_type, logical_data_type, &ctz);
222
74
        ColumnPtr physical_column;
223
74
        switch (col_schema->parquet_schema.type) {
224
2
        case tparquet::Type::type::BOOLEAN: {
225
2
            auto physical_col = ColumnUInt8::create();
226
2
            physical_col->get_data().data();
227
2
            physical_col->resize(2);
228
2
            physical_col->get_data()[0] = *reinterpret_cast<const bool*>(encoded_min.data());
229
2
            physical_col->get_data()[1] = *reinterpret_cast<const bool*>(encoded_max.data());
230
2
            physical_column = std::move(physical_col);
231
2
            break;
232
0
        }
233
39
        case tparquet::Type::type::INT32: {
234
39
            auto physical_col = ColumnInt32::create();
235
39
            physical_col->resize(2);
236
237
39
            physical_col->get_data()[0] = *reinterpret_cast<const int32_t*>(encoded_min.data());
238
39
            physical_col->get_data()[1] = *reinterpret_cast<const int32_t*>(encoded_max.data());
239
240
39
            physical_column = std::move(physical_col);
241
39
            break;
242
0
        }
243
17
        case tparquet::Type::type::INT64: {
244
17
            auto physical_col = ColumnInt64::create();
245
17
            physical_col->resize(2);
246
17
            physical_col->get_data()[0] = *reinterpret_cast<const int64_t*>(encoded_min.data());
247
17
            physical_col->get_data()[1] = *reinterpret_cast<const int64_t*>(encoded_max.data());
248
17
            physical_column = std::move(physical_col);
249
17
            break;
250
0
        }
251
14
        case tparquet::Type::type::FLOAT: {
252
14
            auto physical_col = ColumnFloat32::create();
253
14
            physical_col->resize(2);
254
14
            physical_col->get_data()[0] = *reinterpret_cast<const float*>(encoded_min.data());
255
14
            physical_col->get_data()[1] = *reinterpret_cast<const float*>(encoded_max.data());
256
14
            physical_column = std::move(physical_col);
257
14
            break;
258
0
        }
259
0
        case tparquet::Type::type::DOUBLE: {
260
0
            auto physical_col = ColumnFloat64 ::create();
261
0
            physical_col->resize(2);
262
0
            physical_col->get_data()[0] = *reinterpret_cast<const double*>(encoded_min.data());
263
0
            physical_col->get_data()[1] = *reinterpret_cast<const double*>(encoded_max.data());
264
0
            physical_column = std::move(physical_col);
265
0
            break;
266
0
        }
267
0
        case tparquet::Type::type::BYTE_ARRAY: {
268
0
            auto physical_col = ColumnString::create();
269
0
            physical_col->insert_data(encoded_min.data(), encoded_min.size());
270
0
            physical_col->insert_data(encoded_max.data(), encoded_max.size());
271
0
            physical_column = std::move(physical_col);
272
0
            break;
273
0
        }
274
2
        case tparquet::Type::type::FIXED_LEN_BYTE_ARRAY: {
275
2
            auto physical_col = ColumnUInt8::create();
276
2
            physical_col->resize(2 * col_schema->parquet_schema.type_length);
277
2
            DCHECK(col_schema->parquet_schema.type_length == encoded_min.length());
278
2
            DCHECK(col_schema->parquet_schema.type_length == encoded_max.length());
279
280
2
            auto ptr = physical_col->get_data().data();
281
2
            memcpy(ptr, encoded_min.data(), encoded_min.length());
282
2
            memcpy(ptr + encoded_min.length(), encoded_max.data(), encoded_max.length());
283
2
            physical_column = std::move(physical_col);
284
2
            break;
285
0
        }
286
0
        case tparquet::Type::type::INT96: {
287
0
            auto physical_col = ColumnInt8::create();
288
0
            physical_col->resize(2 * sizeof(ParquetInt96));
289
0
            DCHECK(sizeof(ParquetInt96) == encoded_min.length());
290
0
            DCHECK(sizeof(ParquetInt96) == encoded_max.length());
291
292
0
            auto ptr = physical_col->get_data().data();
293
0
            memcpy(ptr, encoded_min.data(), encoded_min.length());
294
0
            memcpy(ptr + encoded_min.length(), encoded_max.data(), encoded_max.length());
295
0
            physical_column = std::move(physical_col);
296
0
            break;
297
0
        }
298
74
        }
299
300
74
        ColumnPtr logical_column;
301
74
        if (converter->is_consistent()) {
302
70
            logical_column = physical_column;
303
70
        } else {
304
4
            logical_column = logical_data_type->create_column();
305
4
            RETURN_IF_ERROR(converter->physical_convert(physical_column, logical_column));
306
4
        }
307
308
74
        DCHECK(logical_column->size() == 2);
309
74
        *min_field = logical_column->operator[](0);
310
74
        *max_field = logical_column->operator[](1);
311
312
74
        auto logical_prim_type = logical_data_type->get_primitive_type();
313
314
74
        if (logical_prim_type == TYPE_FLOAT) {
315
11
            auto& min_value = min_field->get<TYPE_FLOAT>();
316
11
            auto& max_value = max_field->get<TYPE_FLOAT>();
317
318
11
            if (std::isnan(min_value) || std::isnan(max_value)) {
319
1
                return Status::DataQualityError("Can not use this parquet min/max value.");
320
1
            }
321
            // Updating min to -0.0 and max to +0.0 to ensure that no 0.0 values would be skipped
322
10
            if (std::signbit(min_value) == 0 && min_value == 0.0F) {
323
0
                min_value = -0.0F;
324
0
            }
325
10
            if (std::signbit(max_value) != 0 && max_value == -0.0F) {
326
0
                max_value = 0.0F;
327
0
            }
328
63
        } else if (logical_prim_type == TYPE_DOUBLE) {
329
0
            auto& min_value = min_field->get<TYPE_DOUBLE>();
330
0
            auto& max_value = max_field->get<TYPE_DOUBLE>();
331
332
0
            if (std::isnan(min_value) || std::isnan(max_value)) {
333
0
                return Status::DataQualityError("Can not use this parquet min/max value.");
334
0
            }
335
            // Updating min to -0.0 and max to +0.0 to ensure that no 0.0 values would be skipped
336
0
            if (std::signbit(min_value) == 0 && min_value == 0.0F) {
337
0
                min_value = -0.0F;
338
0
            }
339
0
            if (std::signbit(max_value) != 0 && max_value == -0.0F) {
340
0
                max_value = 0.0F;
341
0
            }
342
63
        } else if (col_schema->parquet_schema.type == tparquet::Type::type::INT96 ||
343
63
                   logical_prim_type == TYPE_DATETIMEV2) {
344
1
            auto min_value = min_field->get<TYPE_DATETIMEV2>();
345
1
            auto max_value = min_field->get<TYPE_DATETIMEV2>();
346
347
            // From Trino: Parquet INT96 timestamp values were compared incorrectly
348
            // for the purposes of producing statistics by older parquet writers,
349
            // so PARQUET-1065 deprecated them. The result is that any writer that produced stats
350
            // was producing unusable incorrect values, except the special case where min == max
351
            // and an incorrect ordering would not be material to the result.
352
            // PARQUET-1026 made binary stats available and valid in that special case.
353
1
            if (min_value != max_value) {
354
0
                return Status::DataQualityError("invalid min/max value");
355
0
            }
356
1
        }
357
358
73
        return Status::OK();
359
74
    }
360
361
    // Extracts null information and the encoded min/max bounds of a column chunk from its
    // metadata into `ans_stat`.
    //
    // @param col_schema        schema of the column
    // @param column_meta_data  thrift metadata holding the statistics
    // @param ignored_stats     optional cache (may be nullptr), keyed by physical type, of
    //                          whether CorruptStatistics says this file's legacy statistics
    //                          must be ignored; filled in on first use
    // @param file_created_by   writer identification passed to CorruptStatistics
    // @param ans_stat          receives has_null / is_all_null and, unless the column is all
    //                          null, encoded_min_value / encoded_max_value
    // @return OK on success; DataQualityError when null_count or the bounds are missing, the
    //         bounds are unusable, or the statistics are known to be corrupt; NotSupported
    //         when legacy bounds differ and their sort order is not signed.
    static Status read_column_stats(const FieldSchema* col_schema,
                                    const tparquet::ColumnMetaData& column_meta_data,
                                    std::unordered_map<tparquet::Type::type, bool>* ignored_stats,
                                    const std::string& file_created_by, ColumnStat* ans_stat) {
        const auto& statistic = column_meta_data.statistics;

        if (!statistic.__isset.null_count) [[unlikely]] {
            return Status::DataQualityError("This parquet Column meta no set null_count.");
        }
        ans_stat->has_null = statistic.null_count > 0;
        ans_stat->is_all_null = statistic.null_count == column_meta_data.num_values;
        if (ans_stat->is_all_null) {
            // No non-null value exists, so there are no bounds to read.
            return Status::OK();
        }

        const auto prim_type = remove_nullable(col_schema->data_type)->get_primitive_type();
        const bool is_string_type =
                prim_type == TYPE_VARCHAR || prim_type == TYPE_CHAR || prim_type == TYPE_STRING;

        // Min-max of statistic is plain-encoded value
        if (statistic.__isset.min_value && statistic.__isset.max_value) {
            const ColumnOrderName column_order =
                    col_schema->physical_type == tparquet::Type::INT96 ||
                                    col_schema->parquet_schema.logicalType.__isset.UNKNOWN
                            ? ColumnOrderName::UNDEFINED
                            : ColumnOrderName::TYPE_DEFINED_ORDER;
            // With an undefined order only min == max can be trusted.
            if ((statistic.min_value != statistic.max_value) &&
                (column_order != ColumnOrderName::TYPE_DEFINED_ORDER)) {
                return Status::DataQualityError("Can not use this parquet min/max value.");
            }
            ans_stat->encoded_min_value = statistic.min_value;
            ans_stat->encoded_max_value = statistic.max_value;

            // _try_read_old_utf8_stats() leaves its arguments untouched when it returns
            // false, so it can work directly on the output strings.
            if (is_string_type && !_try_read_old_utf8_stats(ans_stat->encoded_min_value,
                                                            ans_stat->encoded_max_value)) {
                return Status::DataQualityError("Can not use this parquet min/max value.");
            }
        } else if (statistic.__isset.min && statistic.__isset.max) {
            const bool max_equals_min = statistic.min == statistic.max;

            const SortOrder sort_order = _determine_sort_order(col_schema->parquet_schema);
            const bool sort_orders_match = SortOrder::SIGNED == sort_order;
            if (!sort_orders_match && !max_equals_min) {
                return Status::NotSupported("Can not use this parquet min/max value.");
            }

            // Ask CorruptStatistics at most once per physical type when a cache is supplied.
            bool should_ignore_corrupted_stats = false;
            if (ignored_stats == nullptr) {
                should_ignore_corrupted_stats = CorruptStatistics::should_ignore_statistics(
                        file_created_by, col_schema->physical_type);
            } else if (auto cached = ignored_stats->find(col_schema->physical_type);
                       cached != ignored_stats->end()) {
                should_ignore_corrupted_stats = cached->second;
            } else {
                should_ignore_corrupted_stats = CorruptStatistics::should_ignore_statistics(
                        file_created_by, col_schema->physical_type);
                ignored_stats->emplace(col_schema->physical_type, should_ignore_corrupted_stats);
            }

            if (should_ignore_corrupted_stats) {
                return Status::DataQualityError("Error statistics, should ignore.");
            }

            ans_stat->encoded_min_value = statistic.min;
            ans_stat->encoded_max_value = statistic.max;

            // _determine_sort_order() reports string columns as SIGNED on the assumption
            // that _try_read_old_utf8_stats() sanitises their bounds, so apply it here as
            // well. It only loosens the bounds (or rejects them); equal bounds pass through.
            if (is_string_type && !_try_read_old_utf8_stats(ans_stat->encoded_min_value,
                                                            ans_stat->encoded_max_value)) {
                return Status::DataQualityError("Can not use this parquet min/max value.");
            }
        } else {
            return Status::DataQualityError("This parquet file not set min/max value");
        }

        return Status::OK();
    }
440
441
    // Reads the bloom filter of a column chunk from `file_reader` into
    // `ans_stat->bloom_filter`.
    //
    // @param column_meta_data  thrift metadata providing bloom_filter_offset and, optionally,
    //                          bloom_filter_length
    // @param file_reader       reader for the file that contains the filter
    // @param io_ctx            I/O context forwarded to every read
    // @param ans_stat          receives the bloom filter; it is only assigned once the
    //                          filter bytes have been read completely
    // @return OK on success; NotSupported when the column has no bloom filter, the header
    //         describes an unsupported algorithm / compression / hash, the header reports a
    //         non-positive size, or fewer bytes than announced could be read. Read,
    //         deserialization and init errors are propagated.
    static Status read_bloom_filter(const tparquet::ColumnMetaData& column_meta_data,
                                    io::FileReaderSPtr file_reader, io::IOContext* io_ctx,
                                    ColumnStat* ans_stat) {
        if (!column_meta_data.__isset.bloom_filter_offset) {
            return Status::NotSupported("Can not use this parquet bloom filter.");
        }

        // Without a recorded positive length, read a fixed-size window for the header.
        size_t size = BLOOM_FILTER_MAX_HEADER_LENGTH;
        if (column_meta_data.__isset.bloom_filter_length &&
            column_meta_data.bloom_filter_length > 0) {
            size = static_cast<size_t>(column_meta_data.bloom_filter_length);
        }
        size_t bytes_read = 0;
        std::vector<uint8_t> header_buffer(size);
        RETURN_IF_ERROR(file_reader->read_at(column_meta_data.bloom_filter_offset,
                                             Slice(header_buffer.data(), size), &bytes_read,
                                             io_ctx));

        tparquet::BloomFilterHeader t_bloom_filter_header;
        // In: number of bytes available for parsing. It is used below as the size of the
        // serialized header, i.e. the offset of the filter bytes.
        uint32_t t_bloom_filter_header_size = static_cast<uint32_t>(bytes_read);
        RETURN_IF_ERROR(deserialize_thrift_msg(header_buffer.data(), &t_bloom_filter_header_size,
                                               true, &t_bloom_filter_header));

        // TODO the bloom filter could be encrypted, too, so need to double check that this is NOT the case
        if (!t_bloom_filter_header.algorithm.__isset.BLOCK ||
            !t_bloom_filter_header.compression.__isset.UNCOMPRESSED ||
            !t_bloom_filter_header.hash.__isset.XXHASH) {
            return Status::NotSupported("Can not use this parquet bloom filter.");
        }

        // numBytes comes straight from the file; a non-positive value must not reach the
        // buffer allocation below.
        if (t_bloom_filter_header.numBytes <= 0) {
            return Status::NotSupported("Can not use this parquet bloom filter.");
        }
        const auto bitset_size = static_cast<size_t>(t_bloom_filter_header.numBytes);

        std::vector<uint8_t> data_buffer(bitset_size);
        bytes_read = 0;
        RETURN_IF_ERROR(file_reader->read_at(
                column_meta_data.bloom_filter_offset + t_bloom_filter_header_size,
                Slice(data_buffer.data(), t_bloom_filter_header.numBytes), &bytes_read, io_ctx));
        // A short read would leave part of the buffer zero-filled, and those bytes would be
        // taken for filter bits.
        if (bytes_read != bitset_size) {
            return Status::NotSupported("Can not use this parquet bloom filter.");
        }

        ans_stat->bloom_filter = std::make_unique<ParquetBlockSplitBloomFilter>();
        RETURN_IF_ERROR(ans_stat->bloom_filter->init(
                reinterpret_cast<const char*>(data_buffer.data()), t_bloom_filter_header.numBytes,
                segment_v2::HashStrategyPB::XX_HASH_64));

        return Status::OK();
    }
486
};
487
#include "common/compile_check_end.h"
488
489
} // namespace doris