Coverage Report

Created: 2026-07-02 23:26

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format_v2/parquet/parquet_statistics.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//   http://www.apache.org/licenses/LICENSE-2.0
9
// Unless required by applicable law or agreed to in writing,
10
// software distributed under the License is distributed on an
11
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
12
// KIND, either express or implied.  See the License for the
13
// specific language governing permissions and limitations
14
// under the License.
15
16
#include "format_v2/parquet/parquet_statistics.h"
17
18
#include <parquet/api/reader.h>
19
#include <parquet/bloom_filter.h>
20
#include <parquet/bloom_filter_reader.h>
21
#include <parquet/column_page.h>
22
#include <parquet/encoding.h>
23
#include <parquet/page_index.h>
24
#include <parquet/statistics.h>
25
#include <parquet/types.h>
26
27
#include <algorithm>
28
#include <cstddef>
29
#include <cstring>
30
#include <exception>
31
#include <map>
32
#include <memory>
33
#include <set>
34
#include <string>
35
#include <utility>
36
#include <vector>
37
38
#include "common/config.h"
39
#include "core/data_type/data_type.h"
40
#include "core/data_type/data_type_nullable.h"
41
#include "core/data_type_serde/data_type_serde.h"
42
#include "core/field.h"
43
#include "format_v2/parquet/parquet_column_schema.h"
44
#include "runtime/runtime_profile.h"
45
#include "storage/index/zone_map/zone_map_index.h"
46
#include "storage/predicate/accept_null_predicate.h"
47
#include "storage/predicate/column_predicate.h"
48
49
namespace doris::format::parquet {
50
51
namespace {
52
53
enum class ParquetRowGroupPruneReason {
54
    NONE,         // cannot prune; must read
55
    STATISTICS,   // excluded by min/max statistics
56
    DICTIONARY,   // excluded by dictionary
57
    BLOOM_FILTER, // excluded by bloom filter
58
};
59
60
59
PrimitiveType physical_filter_type(const ParquetColumnSchema& column_schema) {
61
59
    if (column_schema.type == nullptr) {
62
0
        return INVALID_TYPE;
63
0
    }
64
59
    switch (remove_nullable(column_schema.type)->get_primitive_type()) {
65
4
    case TYPE_BOOLEAN:
66
41
    case TYPE_INT:
67
41
    case TYPE_BIGINT:
68
41
    case TYPE_FLOAT:
69
41
    case TYPE_DOUBLE:
70
58
    case TYPE_STRING:
71
58
        return remove_nullable(column_schema.type)->get_primitive_type();
72
1
    default:
73
1
        return INVALID_TYPE;
74
59
    }
75
59
}
76
77
432
DecodedTimeUnit decoded_time_unit(ParquetTimeUnit time_unit) {
78
432
    switch (time_unit) {
79
0
    case ParquetTimeUnit::MILLIS:
80
0
        return DecodedTimeUnit::MILLIS;
81
2
    case ParquetTimeUnit::MICROS:
82
2
        return DecodedTimeUnit::MICROS;
83
0
    case ParquetTimeUnit::NANOS:
84
0
        return DecodedTimeUnit::NANOS;
85
430
    default:
86
430
        return DecodedTimeUnit::UNKNOWN;
87
432
    }
88
432
}
89
90
Status read_decoded_field(const ParquetColumnSchema& column_schema, DecodedColumnView view,
91
432
                          Field* field, const cctz::time_zone* timezone) {
92
432
    DORIS_CHECK(column_schema.type != nullptr);
93
432
    DORIS_CHECK(field != nullptr);
94
432
    constexpr uint8_t not_null = 0;
95
432
    view.row_count = 1;
96
432
    view.null_map = &not_null;
97
432
    view.time_unit = decoded_time_unit(column_schema.type_descriptor.time_unit);
98
432
    view.logical_integer_bit_width = column_schema.type_descriptor.integer_bit_width;
99
432
    view.logical_integer_is_signed = !column_schema.type_descriptor.is_unsigned_integer;
100
432
    view.decimal_precision = column_schema.type_descriptor.decimal_precision;
101
432
    view.decimal_scale = column_schema.type_descriptor.decimal_scale;
102
432
    view.fixed_length = column_schema.type_descriptor.fixed_length;
103
432
    view.timestamp_is_adjusted_to_utc = column_schema.type_descriptor.timestamp_is_adjusted_to_utc;
104
432
    view.timezone = timezone;
105
432
    return column_schema.type->get_serde()->read_field_from_decoded_value(*column_schema.type,
106
432
                                                                          field, view);
107
432
}
108
109
template <typename NativeType>
110
bool set_decoded_field(const ParquetColumnSchema& column_schema, DecodedValueKind value_kind,
111
426
                       const NativeType& value, Field* field, const cctz::time_zone* timezone) {
112
426
    DecodedColumnView view;
113
426
    view.value_kind = value_kind;
114
426
    view.values = reinterpret_cast<const uint8_t*>(&value);
115
426
    return read_decoded_field(column_schema, view, field, timezone).ok();
116
426
}
Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_117set_decoded_fieldIbEEbRKNS1_19ParquetColumnSchemaENS_16DecodedValueKindERKT_PNS_5FieldEPKN4cctz9time_zoneE
parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_117set_decoded_fieldIiEEbRKNS1_19ParquetColumnSchemaENS_16DecodedValueKindERKT_PNS_5FieldEPKN4cctz9time_zoneE
Line
Count
Source
111
424
                       const NativeType& value, Field* field, const cctz::time_zone* timezone) {
112
424
    DecodedColumnView view;
113
424
    view.value_kind = value_kind;
114
424
    view.values = reinterpret_cast<const uint8_t*>(&value);
115
424
    return read_decoded_field(column_schema, view, field, timezone).ok();
116
424
}
parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_117set_decoded_fieldIlEEbRKNS1_19ParquetColumnSchemaENS_16DecodedValueKindERKT_PNS_5FieldEPKN4cctz9time_zoneE
Line
Count
Source
111
2
                       const NativeType& value, Field* field, const cctz::time_zone* timezone) {
112
2
    DecodedColumnView view;
113
2
    view.value_kind = value_kind;
114
2
    view.values = reinterpret_cast<const uint8_t*>(&value);
115
2
    return read_decoded_field(column_schema, view, field, timezone).ok();
116
2
}
Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_117set_decoded_fieldIfEEbRKNS1_19ParquetColumnSchemaENS_16DecodedValueKindERKT_PNS_5FieldEPKN4cctz9time_zoneE
Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_117set_decoded_fieldIdEEbRKNS1_19ParquetColumnSchemaENS_16DecodedValueKindERKT_PNS_5FieldEPKN4cctz9time_zoneE
117
118
template <typename ParquetDType>
119
bool set_decoded_min_max(const std::shared_ptr<::parquet::Statistics>& statistics,
120
                         const ParquetColumnSchema& column_schema, DecodedValueKind value_kind,
121
                         ParquetColumnStatistics* column_statistics,
122
69
                         const cctz::time_zone* timezone) {
123
69
    auto typed_statistics =
124
69
            std::static_pointer_cast<::parquet::TypedStatistics<ParquetDType>>(statistics);
125
69
    if (!set_decoded_field(column_schema, value_kind, typed_statistics->min(),
126
69
                           &column_statistics->min_value, timezone) ||
127
69
        !set_decoded_field(column_schema, value_kind, typed_statistics->max(),
128
69
                           &column_statistics->max_value, timezone)) {
129
0
        return false;
130
0
    }
131
69
    return true;
132
69
}
Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_119set_decoded_min_maxIN7parquet12PhysicalTypeILNS4_4Type4typeE0EEEEEbRKSt10shared_ptrINS4_10StatisticsEERKNS1_19ParquetColumnSchemaENS_16DecodedValueKindEPNS1_23ParquetColumnStatisticsEPKN4cctz9time_zoneE
parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_119set_decoded_min_maxIN7parquet12PhysicalTypeILNS4_4Type4typeE1EEEEEbRKSt10shared_ptrINS4_10StatisticsEERKNS1_19ParquetColumnSchemaENS_16DecodedValueKindEPNS1_23ParquetColumnStatisticsEPKN4cctz9time_zoneE
Line
Count
Source
122
68
                         const cctz::time_zone* timezone) {
123
68
    auto typed_statistics =
124
68
            std::static_pointer_cast<::parquet::TypedStatistics<ParquetDType>>(statistics);
125
68
    if (!set_decoded_field(column_schema, value_kind, typed_statistics->min(),
126
68
                           &column_statistics->min_value, timezone) ||
127
68
        !set_decoded_field(column_schema, value_kind, typed_statistics->max(),
128
68
                           &column_statistics->max_value, timezone)) {
129
0
        return false;
130
0
    }
131
68
    return true;
132
68
}
parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_119set_decoded_min_maxIN7parquet12PhysicalTypeILNS4_4Type4typeE2EEEEEbRKSt10shared_ptrINS4_10StatisticsEERKNS1_19ParquetColumnSchemaENS_16DecodedValueKindEPNS1_23ParquetColumnStatisticsEPKN4cctz9time_zoneE
Line
Count
Source
122
1
                         const cctz::time_zone* timezone) {
123
1
    auto typed_statistics =
124
1
            std::static_pointer_cast<::parquet::TypedStatistics<ParquetDType>>(statistics);
125
1
    if (!set_decoded_field(column_schema, value_kind, typed_statistics->min(),
126
1
                           &column_statistics->min_value, timezone) ||
127
1
        !set_decoded_field(column_schema, value_kind, typed_statistics->max(),
128
1
                           &column_statistics->max_value, timezone)) {
129
0
        return false;
130
0
    }
131
1
    return true;
132
1
}
Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_119set_decoded_min_maxIN7parquet12PhysicalTypeILNS4_4Type4typeE4EEEEEbRKSt10shared_ptrINS4_10StatisticsEERKNS1_19ParquetColumnSchemaENS_16DecodedValueKindEPNS1_23ParquetColumnStatisticsEPKN4cctz9time_zoneE
Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_119set_decoded_min_maxIN7parquet12PhysicalTypeILNS4_4Type4typeE5EEEEEbRKSt10shared_ptrINS4_10StatisticsEERKNS1_19ParquetColumnSchemaENS_16DecodedValueKindEPNS1_23ParquetColumnStatisticsEPKN4cctz9time_zoneE
133
134
bool set_decoded_binary_field(const ParquetColumnSchema& column_schema, DecodedValueKind value_kind,
135
                              const StringRef& value, Field* field,
136
6
                              const cctz::time_zone* timezone) {
137
6
    std::vector<StringRef> binary_values {value};
138
6
    DecodedColumnView view;
139
6
    view.value_kind = value_kind;
140
6
    view.binary_values = &binary_values;
141
6
    return read_decoded_field(column_schema, view, field, timezone).ok();
142
6
}
143
144
bool set_string_min_max(const std::shared_ptr<::parquet::Statistics>& statistics,
145
                        const ParquetColumnSchema& column_schema,
146
                        ParquetColumnStatistics* column_statistics,
147
3
                        const cctz::time_zone* timezone) {
148
3
    switch (statistics->physical_type()) {
149
3
    case ::parquet::Type::BYTE_ARRAY: {
150
3
        auto typed_statistics =
151
3
                std::static_pointer_cast<::parquet::TypedStatistics<::parquet::ByteArrayType>>(
152
3
                        statistics);
153
3
        const auto min = ::parquet::ByteArrayToString(typed_statistics->min());
154
3
        const auto max = ::parquet::ByteArrayToString(typed_statistics->max());
155
3
        if (!set_decoded_binary_field(column_schema, DecodedValueKind::BINARY,
156
3
                                      StringRef(min.data(), min.size()),
157
3
                                      &column_statistics->min_value, timezone) ||
158
3
            !set_decoded_binary_field(column_schema, DecodedValueKind::BINARY,
159
3
                                      StringRef(max.data(), max.size()),
160
3
                                      &column_statistics->max_value, timezone)) {
161
0
            return false;
162
0
        }
163
3
        return true;
164
3
    }
165
0
    case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
166
0
        if (column_schema.descriptor == nullptr || column_schema.descriptor->type_length() <= 0) {
167
0
            return false;
168
0
        }
169
0
        auto typed_statistics =
170
0
                std::static_pointer_cast<::parquet::TypedStatistics<::parquet::FLBAType>>(
171
0
                        statistics);
172
0
        const int type_length = column_schema.descriptor->type_length();
173
0
        const std::string min(reinterpret_cast<const char*>(typed_statistics->min().ptr),
174
0
                              type_length);
175
0
        const std::string max(reinterpret_cast<const char*>(typed_statistics->max().ptr),
176
0
                              type_length);
177
0
        if (!set_decoded_binary_field(column_schema, DecodedValueKind::FIXED_BINARY,
178
0
                                      StringRef(min.data(), min.size()),
179
0
                                      &column_statistics->min_value, timezone) ||
180
0
            !set_decoded_binary_field(column_schema, DecodedValueKind::FIXED_BINARY,
181
0
                                      StringRef(max.data(), max.size()),
182
0
                                      &column_statistics->max_value, timezone)) {
183
0
            return false;
184
0
        }
185
0
        return true;
186
0
    }
187
0
    default:
188
0
        return false;
189
3
    }
190
3
}
191
192
244
bool is_null_only_predicate(const ColumnPredicate& predicate) {
193
244
    return predicate.type() == PredicateType::IS_NULL ||
194
244
           predicate.type() == PredicateType::IS_NOT_NULL;
195
244
}
196
197
31
bool is_supported_dictionary_predicate(const ColumnPredicate& predicate) {
198
31
    switch (predicate.type()) {
199
21
    case PredicateType::EQ:
200
31
    case PredicateType::IN_LIST:
201
31
        return true;
202
0
    default:
203
0
        return false;
204
31
    }
205
31
}
206
207
48
bool is_bloom_filter_prunable_predicate(const ColumnPredicate& predicate) {
208
48
    if (dynamic_cast<const AcceptNullPredicate*>(&predicate) != nullptr ||
209
48
        is_null_only_predicate(predicate)) {
210
4
        return false;
211
4
    }
212
44
    return predicate.can_do_bloom_filter(false);
213
48
}
214
215
template <typename T>
216
8
T load_predicate_value(const char* data) {
217
8
    T value;
218
8
    memcpy(&value, data, sizeof(T));
219
8
    return value;
220
8
}
parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_120load_predicate_valueIbEET_PKc
Line
Count
Source
216
2
T load_predicate_value(const char* data) {
217
2
    T value;
218
2
    memcpy(&value, data, sizeof(T));
219
2
    return value;
220
2
}
parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_120load_predicate_valueIiEET_PKc
Line
Count
Source
216
6
T load_predicate_value(const char* data) {
217
6
    T value;
218
6
    memcpy(&value, data, sizeof(T));
219
6
    return value;
220
6
}
Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_120load_predicate_valueIaEET_PKc
Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_120load_predicate_valueIsEET_PKc
Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_120load_predicate_valueIlEET_PKc
Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_120load_predicate_valueIfEET_PKc
Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_120load_predicate_valueIdEET_PKc
221
222
class ArrowParquetBloomFilterAdapter final : public segment_v2::BloomFilter {
223
public:
224
    ArrowParquetBloomFilterAdapter(const ParquetColumnSchema& column_schema,
225
                                   const ::parquet::BloomFilter& bloom_filter)
226
9
            : _column_schema(column_schema), _bloom_filter(bloom_filter) {}
227
228
0
    void add_bytes(const char* buf, size_t size) override { DORIS_CHECK(false); }
229
230
10
    bool test_bytes(const char* buf, size_t size) const override {
231
10
        if (buf == nullptr) {
232
0
            return true;
233
0
        }
234
10
        switch (physical_filter_type(_column_schema)) {
235
2
        case TYPE_BOOLEAN:
236
2
            return test_boolean(buf, size);
237
6
        case TYPE_INT:
238
6
            return test_int32(buf, size);
239
0
        case TYPE_BIGINT:
240
0
            return test_int64(buf, size);
241
0
        case TYPE_FLOAT:
242
0
            return test_float(buf, size);
243
0
        case TYPE_DOUBLE:
244
0
            return test_double(buf, size);
245
2
        case TYPE_STRING:
246
2
            return test_string(buf, size);
247
0
        default:
248
0
            return true;
249
10
        }
250
10
    }
251
252
0
    void set_has_null(bool has_null) override { DORIS_CHECK(!has_null); }
253
0
    bool has_null() const override { return false; }
254
0
    void add_hash(uint64_t hash) override { DORIS_CHECK(false); }
255
0
    bool test_hash(uint64_t hash) const override { return _bloom_filter.FindHash(hash); }
256
257
private:
258
2
    bool test_boolean(const char* buf, size_t size) const {
259
2
        if (size == sizeof(bool)) {
260
2
            const int32_t value = load_predicate_value<bool>(buf) ? 1 : 0;
261
2
            return _bloom_filter.FindHash(_bloom_filter.Hash(value));
262
2
        }
263
0
        if (size == sizeof(int32_t)) {
264
0
            const int32_t value = load_predicate_value<int32_t>(buf);
265
0
            return _bloom_filter.FindHash(_bloom_filter.Hash(value != 0 ? 1 : 0));
266
0
        }
267
0
        return true;
268
0
    }
269
270
6
    bool test_int32(const char* buf, size_t size) const {
271
6
        if (size == sizeof(int8_t)) {
272
0
            return find_int32(static_cast<int32_t>(load_predicate_value<int8_t>(buf)));
273
0
        }
274
6
        if (size == sizeof(int16_t)) {
275
0
            return find_int32(static_cast<int32_t>(load_predicate_value<int16_t>(buf)));
276
0
        }
277
6
        if (size == sizeof(int32_t)) {
278
6
            return find_int32(load_predicate_value<int32_t>(buf));
279
6
        }
280
0
        return true;
281
6
    }
282
283
0
    bool test_int64(const char* buf, size_t size) const {
284
0
        if (size != sizeof(int64_t)) {
285
0
            return true;
286
0
        }
287
0
        const int64_t value = load_predicate_value<int64_t>(buf);
288
0
        return _bloom_filter.FindHash(_bloom_filter.Hash(value));
289
0
    }
290
291
0
    bool test_float(const char* buf, size_t size) const {
292
0
        if (size != sizeof(float)) {
293
0
            return true;
294
0
        }
295
0
        const float value = load_predicate_value<float>(buf);
296
0
        return _bloom_filter.FindHash(_bloom_filter.Hash(value));
297
0
    }
298
299
0
    bool test_double(const char* buf, size_t size) const {
300
0
        if (size != sizeof(double)) {
301
0
            return true;
302
0
        }
303
0
        const double value = load_predicate_value<double>(buf);
304
0
        return _bloom_filter.FindHash(_bloom_filter.Hash(value));
305
0
    }
306
307
2
    bool test_string(const char* buf, size_t size) const {
308
2
        ::parquet::ByteArray value(static_cast<uint32_t>(size),
309
2
                                   reinterpret_cast<const uint8_t*>(buf));
310
2
        return _bloom_filter.FindHash(_bloom_filter.Hash(&value));
311
2
    }
312
313
6
    bool find_int32(int32_t value) const {
314
6
        return _bloom_filter.FindHash(_bloom_filter.Hash(value));
315
6
    }
316
317
    const ParquetColumnSchema& _column_schema;
318
    const ::parquet::BloomFilter& _bloom_filter;
319
};
320
321
const ParquetColumnSchema* resolve_predicate_leaf_schema(
322
        const std::vector<std::unique_ptr<ParquetColumnSchema>>& schema,
323
        const format::FileColumnPredicateFilter& column_filter);
324
325
49
bool bloom_filter_supported(const ParquetColumnSchema& column_schema) {
326
49
    switch (physical_filter_type(column_schema)) {
327
2
    case TYPE_BOOLEAN:
328
33
    case TYPE_INT:
329
33
    case TYPE_BIGINT:
330
33
    case TYPE_FLOAT:
331
33
    case TYPE_DOUBLE:
332
48
    case TYPE_STRING:
333
48
        return true;
334
1
    default:
335
1
        return false;
336
49
    }
337
49
}
338
339
bool bloom_filter_excludes(const ParquetColumnSchema& column_schema,
340
                           const format::FileColumnPredicateFilter& column_filter,
341
10
                           const ::parquet::BloomFilter& bloom_filter) {
342
10
    if (!bloom_filter_supported(column_schema)) {
343
1
        return false;
344
1
    }
345
9
    ArrowParquetBloomFilterAdapter adapter(column_schema, bloom_filter);
346
9
    for (const auto& column_predicate : column_filter.predicates) {
347
9
        if (column_predicate == nullptr || !is_bloom_filter_prunable_predicate(*column_predicate)) {
348
1
            return false;
349
1
        }
350
8
        if (!column_predicate->evaluate_and(&adapter)) {
351
4
            return true;
352
4
        }
353
8
    }
354
4
    return false;
355
9
}
356
357
struct RowGroupBloomFilterCache {
358
    ::parquet::BloomFilterReader* bloom_filter_reader = nullptr;
359
    std::map<int, std::unique_ptr<::parquet::BloomFilter>> column_bloom_filters;
360
    std::set<int> loaded_columns;
361
362
    ::parquet::BloomFilter* get(int row_group_idx, int leaf_column_id,
363
13
                                ParquetPruningStats* pruning_stats) {
364
13
        if (bloom_filter_reader == nullptr || leaf_column_id < 0) {
365
5
            return nullptr;
366
5
        }
367
8
        if (loaded_columns.find(leaf_column_id) == loaded_columns.end()) {
368
8
            loaded_columns.insert(leaf_column_id);
369
8
            try {
370
8
                std::shared_ptr<::parquet::RowGroupBloomFilterReader> row_group_reader;
371
8
                if (pruning_stats != nullptr) {
372
8
                    SCOPED_RAW_TIMER(&pruning_stats->bloom_filter_read_time);
373
8
                    row_group_reader = bloom_filter_reader->RowGroup(row_group_idx);
374
8
                    if (row_group_reader != nullptr) {
375
8
                        column_bloom_filters[leaf_column_id] =
376
8
                                row_group_reader->GetColumnBloomFilter(leaf_column_id);
377
8
                    }
378
8
                } else {
379
0
                    row_group_reader = bloom_filter_reader->RowGroup(row_group_idx);
380
0
                    if (row_group_reader != nullptr) {
381
0
                        column_bloom_filters[leaf_column_id] =
382
0
                                row_group_reader->GetColumnBloomFilter(leaf_column_id);
383
0
                    }
384
0
                }
385
8
            } catch (const ::parquet::ParquetException&) {
386
0
                return nullptr;
387
0
            } catch (const std::exception&) {
388
0
                return nullptr;
389
0
            }
390
8
        }
391
8
        auto it = column_bloom_filters.find(leaf_column_id);
392
8
        return it == column_bloom_filters.end() ? nullptr : it->second.get();
393
8
    }
394
};
395
396
ParquetRowGroupPruneReason bloom_filter_prune_reason(
397
        int row_group_idx, const std::vector<std::unique_ptr<ParquetColumnSchema>>& schema,
398
        const format::FileColumnPredicateFilter& column_filter,
399
39
        RowGroupBloomFilterCache* bloom_filter_cache, ParquetPruningStats* pruning_stats) {
400
39
    if (bloom_filter_cache == nullptr || column_filter.predicates.empty()) {
401
0
        return ParquetRowGroupPruneReason::NONE;
402
0
    }
403
39
    const auto* column_schema = resolve_predicate_leaf_schema(schema, column_filter);
404
39
    if (column_schema == nullptr || !bloom_filter_supported(*column_schema)) {
405
0
        return ParquetRowGroupPruneReason::NONE;
406
0
    }
407
39
    for (const auto& column_predicate : column_filter.predicates) {
408
39
        if (column_predicate == nullptr || !is_bloom_filter_prunable_predicate(*column_predicate)) {
409
26
            return ParquetRowGroupPruneReason::NONE;
410
26
        }
411
39
    }
412
13
    auto* bloom_filter =
413
13
            bloom_filter_cache->get(row_group_idx, column_schema->leaf_column_id, pruning_stats);
414
13
    if (bloom_filter == nullptr) {
415
13
        return ParquetRowGroupPruneReason::NONE;
416
13
    }
417
0
    return bloom_filter_excludes(*column_schema, column_filter, *bloom_filter)
418
0
                   ? ParquetRowGroupPruneReason::BLOOM_FILTER
419
0
                   : ParquetRowGroupPruneReason::NONE;
420
13
}
421
422
27
bool is_dictionary_data_encoding(::parquet::Encoding::type encoding) {
423
27
    return encoding == ::parquet::Encoding::PLAIN_DICTIONARY ||
424
27
           encoding == ::parquet::Encoding::RLE_DICTIONARY;
425
27
}
426
427
0
bool is_level_encoding(::parquet::Encoding::type encoding) {
428
0
    return encoding == ::parquet::Encoding::RLE || encoding == ::parquet::Encoding::BIT_PACKED;
429
0
}
430
431
54
bool is_data_page_type(::parquet::PageType::type page_type) {
432
54
    return page_type == ::parquet::PageType::DATA_PAGE ||
433
54
           page_type == ::parquet::PageType::DATA_PAGE_V2;
434
54
}
435
436
31
bool is_dictionary_encoded_chunk(const ::parquet::ColumnChunkMetaData& column_metadata) {
437
31
    if (!column_metadata.has_dictionary_page()) {
438
4
        return false;
439
4
    }
440
441
27
    const auto& encoding_stats = column_metadata.encoding_stats();
442
27
    if (!encoding_stats.empty()) {
443
27
        bool has_dictionary_data_page = false;
444
54
        for (const auto& encoding_stat : encoding_stats) {
445
54
            if (!is_data_page_type(encoding_stat.page_type) || encoding_stat.count <= 0) {
446
27
                continue;
447
27
            }
448
27
            if (!is_dictionary_data_encoding(encoding_stat.encoding)) {
449
0
                return false;
450
0
            }
451
27
            has_dictionary_data_page = true;
452
27
        }
453
27
        return has_dictionary_data_page;
454
27
    }
455
456
0
    bool has_dictionary_encoding = false;
457
0
    for (const auto encoding : column_metadata.encodings()) {
458
0
        if (is_dictionary_data_encoding(encoding)) {
459
0
            has_dictionary_encoding = true;
460
0
            continue;
461
0
        }
462
0
        if (!is_level_encoding(encoding)) {
463
0
            return false;
464
0
        }
465
0
    }
466
0
    return has_dictionary_encoding;
467
0
}
468
469
bool supports_dictionary_pruning(const ParquetColumnSchema& column_schema,
470
                                 const ::parquet::ColumnChunkMetaData& column_metadata,
471
57
                                 const format::FileColumnPredicateFilter& column_filter) {
472
57
    if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE ||
473
57
        column_schema.descriptor == nullptr || column_schema.type == nullptr) {
474
0
        return false;
475
0
    }
476
57
    if (!column_schema.type_descriptor.is_string_like) {
477
26
        return false;
478
26
    }
479
31
    if (column_metadata.type() != ::parquet::Type::BYTE_ARRAY &&
480
31
        column_metadata.type() != ::parquet::Type::FIXED_LEN_BYTE_ARRAY) {
481
0
        return false;
482
0
    }
483
31
    for (const auto& column_predicate : column_filter.predicates) {
484
31
        if (column_predicate == nullptr || !is_supported_dictionary_predicate(*column_predicate)) {
485
0
            return false;
486
0
        }
487
31
    }
488
31
    return true;
489
31
}
490
491
struct OwnedDictionaryWords {
492
    std::vector<std::string> values;
493
    std::vector<StringRef> refs;
494
495
27
    void clear() {
496
27
        values.clear();
497
27
        refs.clear();
498
27
    }
499
500
27
    void build_refs() {
501
27
        refs.reserve(values.size());
502
35
        for (const auto& value : values) {
503
35
            refs.emplace_back(value.data(), value.size());
504
35
        }
505
27
    }
506
};
507
508
bool read_dictionary_words(::parquet::ParquetFileReader* file_reader, int row_group_idx,
509
                           int leaf_column_id, const ParquetColumnSchema& column_schema,
510
27
                           OwnedDictionaryWords* dict_words) {
511
27
    DORIS_CHECK(dict_words != nullptr);
512
27
    dict_words->clear();
513
27
    if (file_reader == nullptr || leaf_column_id < 0) {
514
0
        return false;
515
0
    }
516
517
27
    auto row_group_reader = file_reader->RowGroup(row_group_idx);
518
27
    if (row_group_reader == nullptr) {
519
0
        return false;
520
0
    }
521
27
    auto page_reader = row_group_reader->GetColumnPageReader(leaf_column_id);
522
27
    if (page_reader == nullptr) {
523
0
        return false;
524
0
    }
525
526
27
    std::shared_ptr<::parquet::Page> page;
527
27
    try {
528
27
        page = page_reader->NextPage();
529
27
    } catch (const ::parquet::ParquetException&) {
530
0
        return false;
531
0
    } catch (const std::exception&) {
532
0
        return false;
533
0
    }
534
27
    if (page == nullptr || page->type() != ::parquet::PageType::DICTIONARY_PAGE) {
535
0
        return false;
536
0
    }
537
27
    const auto* dictionary_page = static_cast<const ::parquet::DictionaryPage*>(page.get());
538
27
    if (dictionary_page->encoding() != ::parquet::Encoding::PLAIN &&
539
27
        dictionary_page->encoding() != ::parquet::Encoding::PLAIN_DICTIONARY) {
540
0
        return false;
541
0
    }
542
27
    const int32_t dictionary_length = dictionary_page->num_values();
543
27
    if (dictionary_length <= 0) {
544
0
        return false;
545
0
    }
546
27
    const auto* dictionary_data = dictionary_page->data();
547
27
    const int dictionary_size = dictionary_page->size();
548
549
27
    dict_words->values.reserve(static_cast<size_t>(dictionary_length));
550
27
    if (column_schema.descriptor->physical_type() == ::parquet::Type::BYTE_ARRAY) {
551
27
        auto decoder = ::parquet::MakeTypedDecoder<::parquet::ByteArrayType>(
552
27
                ::parquet::Encoding::PLAIN, column_schema.descriptor);
553
27
        decoder->SetData(dictionary_length, dictionary_data, dictionary_size);
554
27
        std::vector<::parquet::ByteArray> byte_array_values(static_cast<size_t>(dictionary_length));
555
27
        if (decoder->Decode(byte_array_values.data(), dictionary_length) != dictionary_length) {
556
0
            return false;
557
0
        }
558
62
        for (int32_t dict_idx = 0; dict_idx < dictionary_length; ++dict_idx) {
559
35
            dict_words->values.emplace_back(
560
35
                    reinterpret_cast<const char*>(byte_array_values[dict_idx].ptr),
561
35
                    byte_array_values[dict_idx].len);
562
35
        }
563
27
        dict_words->build_refs();
564
27
        return true;
565
27
    }
566
0
    if (column_schema.descriptor->physical_type() == ::parquet::Type::FIXED_LEN_BYTE_ARRAY) {
567
0
        const int type_length = column_schema.descriptor->type_length();
568
0
        if (type_length <= 0) {
569
0
            return false;
570
0
        }
571
0
        auto decoder = ::parquet::MakeTypedDecoder<::parquet::FLBAType>(::parquet::Encoding::PLAIN,
572
0
                                                                        column_schema.descriptor);
573
0
        decoder->SetData(dictionary_length, dictionary_data, dictionary_size);
574
0
        std::vector<::parquet::FixedLenByteArray> flba_values(
575
0
                static_cast<size_t>(dictionary_length));
576
0
        if (decoder->Decode(flba_values.data(), dictionary_length) != dictionary_length) {
577
0
            return false;
578
0
        }
579
0
        for (int32_t dict_idx = 0; dict_idx < dictionary_length; ++dict_idx) {
580
0
            dict_words->values.emplace_back(
581
0
                    reinterpret_cast<const char*>(flba_values[dict_idx].ptr), type_length);
582
0
        }
583
0
        dict_words->build_refs();
584
0
        return true;
585
0
    }
586
0
    return false;
587
0
}
588
589
197
segment_v2::ZoneMap to_column_predicate_statistics(const ParquetColumnStatistics& statistics) {
590
197
    segment_v2::ZoneMap predicate_statistics;
591
197
    predicate_statistics.min_value = statistics.min_value;
592
197
    predicate_statistics.max_value = statistics.max_value;
593
197
    predicate_statistics.has_null = statistics.has_null;
594
197
    predicate_statistics.has_not_null = statistics.has_not_null;
595
197
    return predicate_statistics;
596
197
}
597
598
// Maps a column predicate filter onto the Parquet leaf schema it targets.
//
// Returns nullptr in these cases:
// - the filter's file column id is invalid or outside `schema`;
// - the schema slot is empty;
// - the column is not a flat primitive leaf, i.e. it is nested, has no leaf id, or is repeated.
// Only flat primitive leaves are eligible for statistics/page-index pruning.
const ParquetColumnSchema* resolve_predicate_leaf_schema(
        const std::vector<std::unique_ptr<ParquetColumnSchema>>& schema,
        const format::FileColumnPredicateFilter& column_filter) {
    const auto file_column_id = column_filter.file_column_id;
    const bool id_in_range = file_column_id.is_valid() &&
                             file_column_id.value() < static_cast<int>(schema.size());
    if (!id_in_range) {
        return nullptr;
    }

    const ParquetColumnSchema* const candidate = schema[file_column_id.value()].get();
    if (candidate == nullptr) {
        return nullptr;
    }

    const bool is_flat_primitive_leaf = candidate->kind == ParquetColumnSchemaKind::PRIMITIVE &&
                                        candidate->leaf_column_id >= 0 &&
                                        candidate->max_repetition_level <= 0;
    return is_flat_primitive_leaf ? candidate : nullptr;
}
615
616
// Decides whether `statistics` prove that the conjunction of predicates in `column_filter`
// cannot match. When it returns true, callers skip the whole row group or page.
//
// Returns false when no statistics are available, or when every evaluated predicate may
// still match. Predicates classified by is_null_only_predicate() are only evaluated when a
// null count is known; without one, the null flags carry no information for them.
bool check_statistics(const format::FileColumnPredicateFilter& column_filter,
                      const ParquetColumnStatistics& statistics) {
    if (!statistics.has_any_statistics()) {
        return false;
    }

    // The zone map does not depend on the predicate, so build (and copy min/max) only once.
    const segment_v2::ZoneMap zone_map = to_column_predicate_statistics(statistics);
    for (const auto& column_predicate : column_filter.predicates) {
        if (is_null_only_predicate(*column_predicate) && !statistics.has_null_count) {
            continue;
        }
        if (!column_predicate->evaluate_and(zone_map)) {
            return true;
        }
    }
    return false;
}
636
637
} // namespace
638
639
// Converts Parquet column-chunk statistics into ParquetColumnStatistics.
//
// - A null `statistics` pointer yields default-constructed (empty) statistics.
// - has_null_count mirrors HasNullCount().
// - has_null is true when the null count is positive, and also when it is absent. An unknown
//   null count must not be reported as "no NULLs"; otherwise null-accepting predicates could
//   prune chunks that contain NULLs.
// - has_not_null is true when num_values() is positive or min/max are present.
// - min/max are decoded per physical type only when present. has_min_max reports whether
//   decoding succeeded. Unsupported physical types leave has_min_max unset.
ParquetColumnStatistics ParquetStatisticsUtils::TransformColumnStatistics(
        const ParquetColumnSchema& column_schema,
        const std::shared_ptr<::parquet::Statistics>& statistics, const cctz::time_zone* timezone) {
    ParquetColumnStatistics result;
    if (statistics == nullptr) {
        return result;
    }

    result.has_null_count = statistics->HasNullCount();
    // Conservative: without a null count, assume the chunk may contain NULLs.
    result.has_null = !result.has_null_count || statistics->null_count() > 0;
    result.has_not_null = statistics->num_values() > 0 || statistics->HasMinMax();
    if (!result.has_not_null || !statistics->HasMinMax()) {
        return result;
    }

    DORIS_CHECK(column_schema.type != nullptr);
    switch (statistics->physical_type()) {
    case ::parquet::Type::BOOLEAN:
        result.has_min_max = set_decoded_min_max<::parquet::BooleanType>(
                statistics, column_schema, DecodedValueKind::BOOL, &result, timezone);
        return result;
    case ::parquet::Type::INT32:
        result.has_min_max = set_decoded_min_max<::parquet::Int32Type>(
                statistics, column_schema, decoded_value_kind(column_schema.type_descriptor),
                &result, timezone);
        return result;
    case ::parquet::Type::INT64:
        result.has_min_max = set_decoded_min_max<::parquet::Int64Type>(
                statistics, column_schema, decoded_value_kind(column_schema.type_descriptor),
                &result, timezone);
        return result;
    case ::parquet::Type::FLOAT:
        result.has_min_max = set_decoded_min_max<::parquet::FloatType>(
                statistics, column_schema, DecodedValueKind::FLOAT, &result, timezone);
        return result;
    case ::parquet::Type::DOUBLE:
        result.has_min_max = set_decoded_min_max<::parquet::DoubleType>(
                statistics, column_schema, DecodedValueKind::DOUBLE, &result, timezone);
        return result;
    case ::parquet::Type::BYTE_ARRAY:
    case ::parquet::Type::FIXED_LEN_BYTE_ARRAY:
        result.has_min_max = set_string_min_max(statistics, column_schema, &result, timezone);
        return result;
    default:
        return result;
    }
}
686
687
namespace {
688
689
// Decides whether one row group can be skipped for a single column predicate filter.
//
// Pruning sources are tried in this order:
//   1. column-chunk statistics (min/max and null flags);
//   2. dictionary words, when supports_dictionary_pruning() and is_dictionary_encoded_chunk()
//      both accept the chunk and the dictionary can be read;
//   3. bloom filters, delegated to bloom_filter_prune_reason().
// Returns NONE when the filter has no predicates, does not resolve to a flat primitive leaf,
// or the chunk metadata is missing.
ParquetRowGroupPruneReason row_group_prune_reason(
        const ::parquet::RowGroupMetaData& row_group, ::parquet::ParquetFileReader* file_reader,
        int row_group_idx, const std::vector<std::unique_ptr<ParquetColumnSchema>>& schema,
        const format::FileColumnPredicateFilter& column_filter,
        RowGroupBloomFilterCache* bloom_filter_cache, ParquetPruningStats* pruning_stats,
        const cctz::time_zone* timezone) {
    if (column_filter.predicates.empty()) {
        return ParquetRowGroupPruneReason::NONE;
    }
    const ParquetColumnSchema* const leaf_schema =
            resolve_predicate_leaf_schema(schema, column_filter);
    if (leaf_schema == nullptr) {
        return ParquetRowGroupPruneReason::NONE;
    }

    const auto leaf_id = leaf_schema->leaf_column_id;
    DCHECK_LT(leaf_id, row_group.num_columns());
    const auto chunk = row_group.ColumnChunk(leaf_id);
    if (chunk == nullptr) {
        return ParquetRowGroupPruneReason::NONE;
    }

    // 1. Column-chunk statistics.
    const ParquetColumnStatistics chunk_statistics =
            ParquetStatisticsUtils::TransformColumnStatistics(*leaf_schema, chunk->statistics(),
                                                              timezone);
    if (check_statistics(column_filter, chunk_statistics)) {
        return ParquetRowGroupPruneReason::STATISTICS;
    }

    // Fallback shared by every path that cannot use the dictionary.
    auto consult_bloom_filter = [&]() {
        return bloom_filter_prune_reason(row_group_idx, schema, column_filter, bloom_filter_cache,
                                         pruning_stats);
    };

    // 2. Dictionary words.
    const bool dictionary_usable =
            supports_dictionary_pruning(*leaf_schema, *chunk, column_filter) &&
            is_dictionary_encoded_chunk(*chunk);
    if (!dictionary_usable) {
        return consult_bloom_filter();
    }
    OwnedDictionaryWords dictionary;
    if (!read_dictionary_words(file_reader, row_group_idx, leaf_id, *leaf_schema, &dictionary)) {
        return consult_bloom_filter();
    }
    const bool dictionary_excludes = std::any_of(
            column_filter.predicates.begin(), column_filter.predicates.end(),
            [&dictionary](const auto& predicate) {
                return !predicate->evaluate_and(dictionary.refs.data(), dictionary.refs.size());
            });
    if (dictionary_excludes) {
        return ParquetRowGroupPruneReason::DICTIONARY;
    }

    // 3. Bloom filter.
    return consult_bloom_filter();
}
731
732
void init_bloom_filter_cache(::parquet::ParquetFileReader* file_reader, bool enable_bloom_filter,
733
198
                             RowGroupBloomFilterCache* bloom_filter_cache) {
734
198
    DORIS_CHECK(bloom_filter_cache != nullptr);
735
198
    if (!enable_bloom_filter || file_reader == nullptr) {
736
42
        return;
737
42
    }
738
156
    try {
739
156
        bloom_filter_cache->bloom_filter_reader = &file_reader->GetBloomFilterReader();
740
156
    } catch (const ::parquet::ParquetException&) {
741
0
        bloom_filter_cache->bloom_filter_reader = nullptr;
742
0
    } catch (const std::exception&) {
743
0
        bloom_filter_cache->bloom_filter_reader = nullptr;
744
0
    }
745
156
}
746
747
// Collects the indices of row groups that survive predicate pruning into `selected_row_groups`.
//
// - When `candidate_row_groups` is non-null, only those indices are examined, in their given
//   order; otherwise every row group in `metadata` is examined.
// - Row groups with missing metadata are always kept.
// - Column filters are evaluated in order. The first one that reports a prune reason drops the
//   row group. When `pruning_stats` is non-null, the filtered row count and the per-reason
//   counter are updated, and the total row-group count and elapsed time are recorded.
Status select_row_groups(const ::parquet::FileMetaData& metadata,
                         ::parquet::ParquetFileReader* file_reader,
                         const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
                         const format::FileScanRequest& request,
                         const std::vector<int>* candidate_row_groups,
                         std::vector<int>* selected_row_groups, bool enable_bloom_filter,
                         ParquetPruningStats* pruning_stats, const cctz::time_zone* timezone) {
    int64_t row_group_filter_time_sink = 0;
    SCOPED_RAW_TIMER(pruning_stats == nullptr ? &row_group_filter_time_sink
                                              : &pruning_stats->row_group_filter_time);
    if (selected_row_groups == nullptr) {
        return Status::InvalidArgument("selected_row_groups is null");
    }
    selected_row_groups->clear();

    const int num_row_groups = metadata.num_row_groups();
    if (pruning_stats != nullptr) {
        pruning_stats->total_row_groups = num_row_groups;
    }

    const bool scan_all_row_groups = candidate_row_groups == nullptr;
    const size_t candidate_count = scan_all_row_groups ? static_cast<size_t>(num_row_groups)
                                                       : candidate_row_groups->size();
    selected_row_groups->reserve(candidate_count);

    for (size_t position = 0; position < candidate_count; ++position) {
        const int row_group_idx = scan_all_row_groups ? static_cast<int>(position)
                                                      : (*candidate_row_groups)[position];
        DORIS_CHECK(row_group_idx >= 0);
        DORIS_CHECK(row_group_idx < num_row_groups);

        const auto row_group = metadata.RowGroup(row_group_idx);
        if (row_group == nullptr) {
            // Nothing can be proven without metadata; keep the row group.
            selected_row_groups->push_back(row_group_idx);
            continue;
        }

        RowGroupBloomFilterCache bloom_filter_cache;
        init_bloom_filter_cache(file_reader, enable_bloom_filter, &bloom_filter_cache);

        auto prune_reason = ParquetRowGroupPruneReason::NONE;
        for (const auto& column_filter : request.column_predicate_filters) {
            prune_reason = row_group_prune_reason(*row_group, file_reader, row_group_idx,
                                                  file_schema, column_filter, &bloom_filter_cache,
                                                  pruning_stats, timezone);
            if (prune_reason != ParquetRowGroupPruneReason::NONE) {
                break;
            }
        }

        if (prune_reason == ParquetRowGroupPruneReason::NONE) {
            selected_row_groups->push_back(row_group_idx);
            continue;
        }
        if (pruning_stats == nullptr) {
            continue;
        }
        pruning_stats->filtered_group_rows += row_group->num_rows();
        switch (prune_reason) {
        case ParquetRowGroupPruneReason::STATISTICS:
            ++pruning_stats->filtered_row_groups_by_statistics;
            break;
        case ParquetRowGroupPruneReason::DICTIONARY:
            ++pruning_stats->filtered_row_groups_by_dictionary;
            break;
        case ParquetRowGroupPruneReason::BLOOM_FILTER:
            ++pruning_stats->filtered_row_groups_by_bloom_filter;
            break;
        default:
            break;
        }
    }
    return Status::OK();
}
812
813
} // namespace
814
815
bool ParquetStatisticsUtils::BloomFilterExcludes(
816
        const ParquetColumnSchema& column_schema,
817
        const format::FileColumnPredicateFilter& column_filter,
818
10
        const ::parquet::BloomFilter& bloom_filter) {
819
10
    return bloom_filter_excludes(column_schema, column_filter, bloom_filter);
820
10
}
821
822
// Exported wrapper around the file-local select_row_groups(). All arguments are forwarded
// unchanged, and the resulting Status is returned as-is.
Status select_row_groups_by_statistics(
        const ::parquet::FileMetaData& metadata, ::parquet::ParquetFileReader* file_reader,
        const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
        const format::FileScanRequest& request, const std::vector<int>* candidate_row_groups,
        std::vector<int>* selected_row_groups, bool enable_bloom_filter,
        ParquetPruningStats* pruning_stats, const cctz::time_zone* timezone) {
    return select_row_groups(metadata, file_reader, file_schema, request,
                             candidate_row_groups, selected_row_groups, enable_bloom_filter,
                             pruning_stats, timezone);
}
831
832
namespace {
833
834
template <typename ParquetDType>
835
bool set_page_decoded_min_max(const std::shared_ptr<::parquet::ColumnIndex>& column_index,
836
                              const ParquetColumnSchema& column_schema, size_t page_idx,
837
                              DecodedValueKind value_kind, ParquetColumnStatistics* page_statistics,
838
144
                              const cctz::time_zone* timezone) {
839
144
    const auto typed_index =
840
144
            std::static_pointer_cast<::parquet::TypedColumnIndex<ParquetDType>>(column_index);
841
144
    if (page_idx >= typed_index->min_values().size() ||
842
144
        page_idx >= typed_index->max_values().size()) {
843
0
        return false;
844
0
    }
845
144
    if (!set_decoded_field(column_schema, value_kind, typed_index->min_values()[page_idx],
846
144
                           &page_statistics->min_value, timezone) ||
847
144
        !set_decoded_field(column_schema, value_kind, typed_index->max_values()[page_idx],
848
144
                           &page_statistics->max_value, timezone)) {
849
0
        return false;
850
0
    }
851
144
    page_statistics->has_min_max = true;
852
144
    return true;
853
144
}
Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_124set_page_decoded_min_maxIN7parquet12PhysicalTypeILNS4_4Type4typeE0EEEEEbRKSt10shared_ptrINS4_11ColumnIndexEERKNS1_19ParquetColumnSchemaEmNS_16DecodedValueKindEPNS1_23ParquetColumnStatisticsEPKN4cctz9time_zoneE
parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_124set_page_decoded_min_maxIN7parquet12PhysicalTypeILNS4_4Type4typeE1EEEEEbRKSt10shared_ptrINS4_11ColumnIndexEERKNS1_19ParquetColumnSchemaEmNS_16DecodedValueKindEPNS1_23ParquetColumnStatisticsEPKN4cctz9time_zoneE
Line
Count
Source
838
144
                              const cctz::time_zone* timezone) {
839
144
    const auto typed_index =
840
144
            std::static_pointer_cast<::parquet::TypedColumnIndex<ParquetDType>>(column_index);
841
144
    if (page_idx >= typed_index->min_values().size() ||
842
144
        page_idx >= typed_index->max_values().size()) {
843
0
        return false;
844
0
    }
845
144
    if (!set_decoded_field(column_schema, value_kind, typed_index->min_values()[page_idx],
846
144
                           &page_statistics->min_value, timezone) ||
847
144
        !set_decoded_field(column_schema, value_kind, typed_index->max_values()[page_idx],
848
144
                           &page_statistics->max_value, timezone)) {
849
0
        return false;
850
0
    }
851
144
    page_statistics->has_min_max = true;
852
144
    return true;
853
144
}
Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_124set_page_decoded_min_maxIN7parquet12PhysicalTypeILNS4_4Type4typeE2EEEEEbRKSt10shared_ptrINS4_11ColumnIndexEERKNS1_19ParquetColumnSchemaEmNS_16DecodedValueKindEPNS1_23ParquetColumnStatisticsEPKN4cctz9time_zoneE
Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_124set_page_decoded_min_maxIN7parquet12PhysicalTypeILNS4_4Type4typeE4EEEEEbRKSt10shared_ptrINS4_11ColumnIndexEERKNS1_19ParquetColumnSchemaEmNS_16DecodedValueKindEPNS1_23ParquetColumnStatisticsEPKN4cctz9time_zoneE
Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_124set_page_decoded_min_maxIN7parquet12PhysicalTypeILNS4_4Type4typeE5EEEEEbRKSt10shared_ptrINS4_11ColumnIndexEERKNS1_19ParquetColumnSchemaEmNS_16DecodedValueKindEPNS1_23ParquetColumnStatisticsEPKN4cctz9time_zoneE
854
855
bool set_page_string_min_max(const std::shared_ptr<::parquet::ColumnIndex>& column_index,
856
                             const ParquetColumnSchema& column_schema, size_t page_idx,
857
                             ParquetColumnStatistics* page_statistics,
858
0
                             const cctz::time_zone* timezone) {
859
0
    switch (column_schema.descriptor->physical_type()) {
860
0
    case ::parquet::Type::BYTE_ARRAY: {
861
0
        const auto typed_index =
862
0
                std::static_pointer_cast<::parquet::ByteArrayColumnIndex>(column_index);
863
0
        if (page_idx >= typed_index->min_values().size() ||
864
0
            page_idx >= typed_index->max_values().size()) {
865
0
            return false;
866
0
        }
867
0
        const auto min = ::parquet::ByteArrayToString(typed_index->min_values()[page_idx]);
868
0
        const auto max = ::parquet::ByteArrayToString(typed_index->max_values()[page_idx]);
869
0
        if (!set_decoded_binary_field(column_schema, DecodedValueKind::BINARY,
870
0
                                      StringRef(min.data(), min.size()),
871
0
                                      &page_statistics->min_value, timezone) ||
872
0
            !set_decoded_binary_field(column_schema, DecodedValueKind::BINARY,
873
0
                                      StringRef(max.data(), max.size()),
874
0
                                      &page_statistics->max_value, timezone)) {
875
0
            return false;
876
0
        }
877
0
        page_statistics->has_min_max = true;
878
0
        return true;
879
0
    }
880
0
    case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
881
0
        const int type_length = column_schema.descriptor->type_length();
882
0
        if (type_length <= 0) {
883
0
            return false;
884
0
        }
885
0
        const auto typed_index = std::static_pointer_cast<::parquet::FLBAColumnIndex>(column_index);
886
0
        if (page_idx >= typed_index->min_values().size() ||
887
0
            page_idx >= typed_index->max_values().size()) {
888
0
            return false;
889
0
        }
890
0
        const std::string min(
891
0
                reinterpret_cast<const char*>(typed_index->min_values()[page_idx].ptr),
892
0
                type_length);
893
0
        const std::string max(
894
0
                reinterpret_cast<const char*>(typed_index->max_values()[page_idx].ptr),
895
0
                type_length);
896
0
        if (!set_decoded_binary_field(column_schema, DecodedValueKind::FIXED_BINARY,
897
0
                                      StringRef(min.data(), min.size()),
898
0
                                      &page_statistics->min_value, timezone) ||
899
0
            !set_decoded_binary_field(column_schema, DecodedValueKind::FIXED_BINARY,
900
0
                                      StringRef(max.data(), max.size()),
901
0
                                      &page_statistics->max_value, timezone)) {
902
0
            return false;
903
0
        }
904
0
        page_statistics->has_min_max = true;
905
0
        return true;
906
0
    }
907
0
    default:
908
0
        return false;
909
0
    }
910
0
}
911
912
bool set_page_min_max(const std::shared_ptr<::parquet::ColumnIndex>& column_index,
913
                      const ParquetColumnSchema& column_schema, size_t page_idx,
914
144
                      ParquetColumnStatistics* page_statistics, const cctz::time_zone* timezone) {
915
144
    DORIS_CHECK(column_schema.type != nullptr);
916
144
    switch (column_schema.descriptor->physical_type()) {
917
0
    case ::parquet::Type::BOOLEAN:
918
0
        return set_page_decoded_min_max<::parquet::BooleanType>(column_index, column_schema,
919
0
                                                                page_idx, DecodedValueKind::BOOL,
920
0
                                                                page_statistics, timezone);
921
144
    case ::parquet::Type::INT32:
922
144
        return set_page_decoded_min_max<::parquet::Int32Type>(
923
144
                column_index, column_schema, page_idx,
924
144
                decoded_value_kind(column_schema.type_descriptor), page_statistics, timezone);
925
0
    case ::parquet::Type::INT64:
926
0
        return set_page_decoded_min_max<::parquet::Int64Type>(
927
0
                column_index, column_schema, page_idx,
928
0
                decoded_value_kind(column_schema.type_descriptor), page_statistics, timezone);
929
0
    case ::parquet::Type::FLOAT:
930
0
        return set_page_decoded_min_max<::parquet::FloatType>(column_index, column_schema, page_idx,
931
0
                                                              DecodedValueKind::FLOAT,
932
0
                                                              page_statistics, timezone);
933
0
    case ::parquet::Type::DOUBLE:
934
0
        return set_page_decoded_min_max<::parquet::DoubleType>(column_index, column_schema,
935
0
                                                               page_idx, DecodedValueKind::DOUBLE,
936
0
                                                               page_statistics, timezone);
937
0
    case ::parquet::Type::BYTE_ARRAY:
938
0
    case ::parquet::Type::FIXED_LEN_BYTE_ARRAY:
939
0
        return set_page_string_min_max(column_index, column_schema, page_idx, page_statistics,
940
0
                                       timezone);
941
0
    default:
942
0
        return false;
943
144
    }
944
144
}
945
946
bool build_page_statistics(const std::shared_ptr<::parquet::ColumnIndex>& column_index,
947
                           const ParquetColumnSchema& column_schema, size_t page_idx,
948
                           ParquetColumnStatistics* page_statistics,
949
144
                           const cctz::time_zone* timezone) {
950
144
    DORIS_CHECK(page_statistics != nullptr);
951
144
    *page_statistics = ParquetColumnStatistics {};
952
953
144
    const auto& null_pages = column_index->null_pages();
954
144
    if (!column_index->has_null_counts() || page_idx >= null_pages.size() ||
955
144
        page_idx >= column_index->null_counts().size()) {
956
0
        return false;
957
0
    }
958
959
144
    page_statistics->has_null_count = true;
960
144
    page_statistics->has_null = column_index->null_counts()[page_idx] > 0;
961
144
    page_statistics->has_not_null = !null_pages[page_idx];
962
144
    if (!page_statistics->has_not_null) {
963
0
        return true;
964
0
    }
965
144
    return set_page_min_max(column_index, column_schema, page_idx, page_statistics, timezone);
966
144
}
967
968
std::vector<RowRange> intersect_ranges(const std::vector<RowRange>& left,
969
9
                                       const std::vector<RowRange>& right) {
970
9
    std::vector<RowRange> result;
971
9
    size_t left_idx = 0;
972
9
    size_t right_idx = 0;
973
18
    while (left_idx < left.size() && right_idx < right.size()) {
974
9
        const int64_t left_start = left[left_idx].start;
975
9
        const int64_t left_end = left_start + left[left_idx].length;
976
9
        const int64_t right_start = right[right_idx].start;
977
9
        const int64_t right_end = right_start + right[right_idx].length;
978
9
        const int64_t start = std::max(left_start, right_start);
979
9
        const int64_t end = std::min(left_end, right_end);
980
9
        if (start < end) {
981
8
            result.push_back(RowRange {start, end - start});
982
8
        }
983
9
        if (left_end < right_end) {
984
0
            ++left_idx;
985
9
        } else {
986
9
            ++right_idx;
987
9
        }
988
9
    }
989
9
    return result;
990
9
}
991
992
7
int64_t count_range_rows(const std::vector<RowRange>& ranges) {
993
7
    int64_t rows = 0;
994
7
    for (const auto& range : ranges) {
995
7
        rows += range.length;
996
7
    }
997
7
    return rows;
998
7
}
999
1000
// Returns the row range covered by page `page_idx` according to the OffsetIndex.
//
// A page ends where the next page's first_row_index begins; the last page ends at
// `row_group_rows`. DORIS_CHECKs enforce 0 <= start <= end <= row_group_rows.
RowRange page_row_range(const ::parquet::OffsetIndex& offset_index, size_t page_idx,
                        int64_t row_group_rows) {
    const auto& locations = offset_index.page_locations();
    const int64_t first_row = locations[page_idx].first_row_index;
    const bool is_last_page = page_idx + 1 == locations.size();
    int64_t end_row = row_group_rows;
    if (!is_last_page) {
        end_row = locations[page_idx + 1].first_row_index;
    }
    DORIS_CHECK(first_row >= 0);
    DORIS_CHECK(end_row >= first_row);
    DORIS_CHECK(end_row <= row_group_rows);
    return RowRange {first_row, end_row - first_row};
}
1012
1013
144
void append_row_range(const RowRange& range, std::vector<RowRange>* ranges) {
1014
144
    if (range.length == 0) {
1015
0
        return;
1016
0
    }
1017
144
    if (!ranges->empty()) {
1018
127
        auto& previous = ranges->back();
1019
127
        if (previous.start + previous.length == range.start) {
1020
125
            previous.length += range.length;
1021
125
            return;
1022
125
        }
1023
127
    }
1024
19
    ranges->push_back(range);
1025
19
}
1026
1027
bool select_ranges_for_filter(const std::shared_ptr<::parquet::RowGroupPageIndexReader>& row_group,
1028
                              const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
1029
                              const format::FileColumnPredicateFilter& column_filter,
1030
                              int64_t row_group_rows, std::vector<RowRange>* ranges,
1031
10
                              const cctz::time_zone* timezone) {
1032
10
    if (column_filter.predicates.empty()) {
1033
0
        return false;
1034
0
    }
1035
10
    const auto* column_schema = resolve_predicate_leaf_schema(file_schema, column_filter);
1036
10
    if (column_schema == nullptr || column_schema->descriptor == nullptr) {
1037
1
        return false;
1038
1
    }
1039
1040
9
    std::shared_ptr<::parquet::ColumnIndex> column_index;
1041
9
    std::shared_ptr<::parquet::OffsetIndex> offset_index;
1042
9
    try {
1043
9
        column_index = row_group->GetColumnIndex(column_schema->leaf_column_id);
1044
9
        offset_index = row_group->GetOffsetIndex(column_schema->leaf_column_id);
1045
9
    } catch (const ::parquet::ParquetException&) {
1046
0
        return false;
1047
0
    } catch (const std::exception&) {
1048
0
        return false;
1049
0
    }
1050
9
    if (column_index == nullptr || offset_index == nullptr ||
1051
9
        column_index->null_pages().size() != offset_index->page_locations().size()) {
1052
0
        return false;
1053
0
    }
1054
1055
9
    ranges->clear();
1056
9
    const auto page_count = offset_index->page_locations().size();
1057
153
    for (size_t page_idx = 0; page_idx < page_count; ++page_idx) {
1058
144
        ParquetColumnStatistics page_statistics;
1059
144
        if (!build_page_statistics(column_index, *column_schema, page_idx, &page_statistics,
1060
144
                                   timezone)) {
1061
0
            ranges->clear();
1062
0
            return false;
1063
0
        }
1064
144
        const RowRange row_range = page_row_range(*offset_index, page_idx, row_group_rows);
1065
144
        if (check_statistics(column_filter, page_statistics)) {
1066
60
            continue;
1067
60
        }
1068
84
        append_row_range(row_range, ranges);
1069
84
    }
1070
9
    return true;
1071
9
}
1072
1073
128
bool ranges_intersect(const std::vector<RowRange>& ranges, const RowRange& range) {
1074
128
    const int64_t range_end = range.start + range.length;
1075
128
    for (const auto& selected_range : ranges) {
1076
128
        const int64_t selected_end = selected_range.start + selected_range.length;
1077
128
        if (selected_end <= range.start) {
1078
8
            continue;
1079
8
        }
1080
120
        if (selected_range.start >= range_end) {
1081
52
            return false;
1082
52
        }
1083
68
        return true;
1084
120
    }
1085
8
    return false;
1086
128
}
1087
1088
void collect_leaf_schemas(const ParquetColumnSchema& column_schema,
1089
                          const format::LocalColumnIndex* projection,
1090
3
                          std::vector<const ParquetColumnSchema*>* leaf_schemas) {
1091
3
    if (column_schema.kind == ParquetColumnSchemaKind::PRIMITIVE) {
1092
3
        leaf_schemas->push_back(&column_schema);
1093
3
        return;
1094
3
    }
1095
0
    for (const auto& child_schema : column_schema.children) {
1096
0
        if (!format::is_child_projected(projection, child_schema->local_id)) {
1097
0
            continue;
1098
0
        }
1099
0
        const auto* child_projection =
1100
0
                format::find_child_projection(projection, child_schema->local_id);
1101
0
        collect_leaf_schemas(*child_schema, child_projection, leaf_schemas);
1102
0
    }
1103
0
}
1104
1105
void collect_request_leaf_schemas(
1106
        const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
1107
        const format::FileScanRequest& request,
1108
7
        std::vector<const ParquetColumnSchema*>* leaf_schemas) {
1109
7
    std::set<int> seen_leaf_ids;
1110
7
    auto collect_projection = [&](const format::LocalColumnIndex& projection) {
1111
3
        const int32_t local_id = projection.local_id();
1112
3
        if (local_id < 0 || local_id >= static_cast<int32_t>(file_schema.size())) {
1113
0
            return;
1114
0
        }
1115
3
        std::vector<const ParquetColumnSchema*> projection_leaf_schemas;
1116
3
        collect_leaf_schemas(*file_schema[local_id], &projection, &projection_leaf_schemas);
1117
3
        for (const auto* leaf_schema : projection_leaf_schemas) {
1118
3
            DORIS_CHECK(leaf_schema != nullptr);
1119
3
            if (seen_leaf_ids.insert(leaf_schema->leaf_column_id).second) {
1120
3
                leaf_schemas->push_back(leaf_schema);
1121
3
            }
1122
3
        }
1123
3
    };
1124
7
    for (const auto& projection : request.predicate_columns) {
1125
1
        collect_projection(projection);
1126
1
    }
1127
7
    for (const auto& projection : request.non_predicate_columns) {
1128
2
        collect_projection(projection);
1129
2
    }
1130
8
    for (const auto& column_filter : request.column_predicate_filters) {
1131
8
        const auto* leaf_schema = resolve_predicate_leaf_schema(file_schema, column_filter);
1132
8
        if (leaf_schema == nullptr) {
1133
1
            continue;
1134
1
        }
1135
7
        if (seen_leaf_ids.insert(leaf_schema->leaf_column_id).second) {
1136
5
            leaf_schemas->push_back(leaf_schema);
1137
5
        }
1138
7
    }
1139
7
}
1140
1141
bool build_page_skip_plan_for_leaf(
1142
        const std::shared_ptr<::parquet::RowGroupPageIndexReader>& row_group,
1143
        const ParquetColumnSchema& column_schema, const std::vector<RowRange>& selected_ranges,
1144
8
        int64_t row_group_rows, ParquetPageSkipPlan* page_skip_plan) {
1145
8
    DORIS_CHECK(page_skip_plan != nullptr);
1146
8
    *page_skip_plan = ParquetPageSkipPlan {};
1147
    // OffsetIndex first_row_index is row-based only for non-repeated leaves. LIST/MAP/repeated
1148
    // leaves need repetition-level-aware range mapping and are intentionally left out for now.
1149
8
    if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE ||
1150
8
        column_schema.descriptor == nullptr || column_schema.leaf_column_id < 0 ||
1151
8
        column_schema.descriptor->max_repetition_level() != 0) {
1152
0
        return false;
1153
0
    }
1154
1155
8
    std::shared_ptr<::parquet::OffsetIndex> offset_index;
1156
8
    try {
1157
8
        offset_index = row_group->GetOffsetIndex(column_schema.leaf_column_id);
1158
8
    } catch (const ::parquet::ParquetException&) {
1159
0
        return false;
1160
0
    } catch (const std::exception&) {
1161
0
        return false;
1162
0
    }
1163
8
    if (offset_index == nullptr) {
1164
0
        return false;
1165
0
    }
1166
1167
8
    const auto page_count = offset_index->page_locations().size();
1168
8
    page_skip_plan->leaf_column_id = column_schema.leaf_column_id;
1169
8
    page_skip_plan->skipped_pages.resize(page_count);
1170
8
    page_skip_plan->skipped_page_compressed_sizes.resize(page_count);
1171
8
    const auto& page_locations = offset_index->page_locations();
1172
136
    for (size_t page_idx = 0; page_idx < page_count; ++page_idx) {
1173
128
        const RowRange row_range = page_row_range(*offset_index, page_idx, row_group_rows);
1174
128
        if (row_range.length == 0 || ranges_intersect(selected_ranges, row_range)) {
1175
68
            continue;
1176
68
        }
1177
60
        page_skip_plan->skipped_pages[page_idx] = 1;
1178
60
        page_skip_plan->skipped_page_compressed_sizes[page_idx] =
1179
60
                page_locations[page_idx].compressed_page_size;
1180
60
        append_row_range(row_range, &page_skip_plan->skipped_ranges);
1181
60
    }
1182
8
    if (page_skip_plan->empty()) {
1183
0
        *page_skip_plan = ParquetPageSkipPlan {};
1184
0
        return false;
1185
0
    }
1186
8
    return true;
1187
8
}
1188
1189
// Builds page-skip plans for every leaf column referenced by `request` in one row group.
//
// `page_skip_plans` is cleared first; it must not be null.
// For each requested leaf, build_page_skip_plan_for_leaf() decides whether the leaf gets a plan,
// based on `selected_ranges` and `row_group_rows`. Only leaves for which it returns true are
// stored, keyed by the plan's leaf_column_id. If two leaves report the same id, the first entry
// is kept, because std::map::emplace does not overwrite.
void build_page_skip_plans(const std::shared_ptr<::parquet::RowGroupPageIndexReader>& row_group,
                           const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
                           const format::FileScanRequest& request,
                           const std::vector<RowRange>& selected_ranges, int64_t row_group_rows,
                           std::map<int, ParquetPageSkipPlan>* page_skip_plans) {
    DORIS_CHECK(page_skip_plans != nullptr);
    auto& plans_by_leaf = *page_skip_plans;
    plans_by_leaf.clear();

    std::vector<const ParquetColumnSchema*> requested_leaves;
    collect_request_leaf_schemas(file_schema, request, &requested_leaves);

    for (size_t leaf_pos = 0; leaf_pos < requested_leaves.size(); ++leaf_pos) {
        const ParquetColumnSchema* leaf = requested_leaves[leaf_pos];
        DORIS_CHECK(leaf != nullptr);

        ParquetPageSkipPlan plan;
        const bool has_plan = build_page_skip_plan_for_leaf(row_group, *leaf, selected_ranges,
                                                            row_group_rows, &plan);
        if (!has_plan) {
            continue;
        }
        // Read the key before the plan is moved into the map.
        const auto leaf_id = plan.leaf_column_id;
        plans_by_leaf.emplace(leaf_id, std::move(plan));
    }
}
1207
1208
} // namespace
1209
1210
// Narrows the rows of one row group using the Parquet page index (column index / offset index).
//
// On return, `*selected_ranges` holds the row ranges that may still match
// `request.column_predicate_filters`:
//   - empty when `row_group_rows <= 0`, or when page-index filtering proves no row matches;
//   - otherwise, starting from {0, row_group_rows}, it is intersected with the ranges reported
//     by every filter that select_ranges_for_filter() can evaluate.
// If page indexes are disabled, there are no predicates, `file_reader` is null, or the page
// index is missing or unreadable, the full row group stays selected.
// Exceptions from reading the page index are swallowed, and the result is always Status::OK().
//
// When `page_skip_plans` is non-null, it is cleared and then, if rows remain selected, filled
// via build_page_skip_plans(). When `pruning_stats` is non-null, timing, read-call and
// filtered-row counters are accumulated into it.
Status select_row_group_ranges_by_page_index(
        ::parquet::ParquetFileReader* file_reader,
        const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema,
        const format::FileScanRequest& request, int row_group_idx, int64_t row_group_rows,
        std::vector<RowRange>* selected_ranges, std::map<int, ParquetPageSkipPlan>* page_skip_plans,
        ParquetPruningStats* pruning_stats, const cctz::time_zone* timezone) {
    // Without stats, the elapsed time is measured into a local that is simply discarded.
    int64_t discarded_filter_time = 0;
    int64_t* const filter_time_counter = pruning_stats != nullptr
                                                 ? &pruning_stats->page_index_filter_time
                                                 : &discarded_filter_time;
    SCOPED_RAW_TIMER(filter_time_counter);

    DORIS_CHECK(selected_ranges != nullptr);
    selected_ranges->clear();
    if (page_skip_plans != nullptr) {
        page_skip_plans->clear();
    }
    if (row_group_rows <= 0) {
        return Status::OK();
    }

    // Start with every row selected; each applicable filter can only shrink this set.
    selected_ranges->push_back(RowRange {0, row_group_rows});

    const bool page_index_usable = config::enable_parquet_page_index &&
                                   !request.column_predicate_filters.empty() &&
                                   file_reader != nullptr;
    if (!page_index_usable) {
        return Status::OK();
    }

    // Declared at function scope so the file-level reader outlives the row-group reader.
    std::shared_ptr<::parquet::PageIndexReader> page_index_reader;
    std::shared_ptr<::parquet::RowGroupPageIndexReader> row_group_index_reader;
    if (pruning_stats != nullptr) {
        ++pruning_stats->page_index_read_calls;
    }
    try {
        int64_t discarded_read_time = 0;
        int64_t* const read_time_counter = pruning_stats != nullptr
                                                   ? &pruning_stats->read_page_index_time
                                                   : &discarded_read_time;
        SCOPED_RAW_TIMER(read_time_counter);
        page_index_reader = file_reader->GetPageIndexReader();
        if (page_index_reader != nullptr) {
            row_group_index_reader = page_index_reader->RowGroup(row_group_idx);
        }
    } catch (const std::exception&) {
        // ::parquet::ParquetException derives from std::exception. The page index is only an
        // optimization, so an unreadable index keeps the whole row group selected.
        return Status::OK();
    }
    if (row_group_index_reader == nullptr) {
        return Status::OK();
    }

    for (const auto& column_filter : request.column_predicate_filters) {
        std::vector<RowRange> filter_ranges;
        const bool filter_evaluated =
                select_ranges_for_filter(row_group_index_reader, file_schema, column_filter,
                                         row_group_rows, &filter_ranges, timezone);
        if (!filter_evaluated) {
            // This filter cannot be answered from the page index; it prunes nothing.
            continue;
        }
        *selected_ranges = intersect_ranges(*selected_ranges, filter_ranges);
        if (!selected_ranges->empty()) {
            continue;
        }

        // No row can match: the whole row group is pruned by the page index.
        if (page_skip_plans != nullptr) {
            page_skip_plans->clear();
        }
        if (pruning_stats != nullptr) {
            pruning_stats->filtered_page_rows += row_group_rows;
            ++pruning_stats->filtered_row_groups_by_page_index;
        }
        return Status::OK();
    }

    if (page_skip_plans != nullptr) {
        build_page_skip_plans(row_group_index_reader, file_schema, request, *selected_ranges,
                              row_group_rows, page_skip_plans);
    }
    if (pruning_stats == nullptr) {
        return Status::OK();
    }
    const int64_t kept_rows = count_range_rows(*selected_ranges);
    DORIS_CHECK(kept_rows <= row_group_rows);
    pruning_stats->filtered_page_rows += row_group_rows - kept_rows;
    return Status::OK();
}
1287
1288
} // namespace doris::format::parquet