be/src/format_v2/parquet/parquet_statistics.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // http://www.apache.org/licenses/LICENSE-2.0 |
9 | | // Unless required by applicable law or agreed to in writing, |
10 | | // software distributed under the License is distributed on an |
11 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
12 | | // KIND, either express or implied. See the License for the |
13 | | // specific language governing permissions and limitations |
14 | | // under the License. |
15 | | |
16 | | #include "format_v2/parquet/parquet_statistics.h" |
17 | | |
18 | | #include <parquet/api/reader.h> |
19 | | #include <parquet/bloom_filter.h> |
20 | | #include <parquet/bloom_filter_reader.h> |
21 | | #include <parquet/column_page.h> |
22 | | #include <parquet/encoding.h> |
23 | | #include <parquet/page_index.h> |
24 | | #include <parquet/statistics.h> |
25 | | #include <parquet/types.h> |
26 | | |
27 | | #include <algorithm> |
28 | | #include <cstddef> |
29 | | #include <cstring> |
30 | | #include <exception> |
31 | | #include <map> |
32 | | #include <memory> |
33 | | #include <set> |
34 | | #include <string> |
35 | | #include <utility> |
36 | | #include <vector> |
37 | | |
38 | | #include "common/config.h" |
39 | | #include "core/data_type/data_type.h" |
40 | | #include "core/data_type/data_type_nullable.h" |
41 | | #include "core/data_type_serde/data_type_serde.h" |
42 | | #include "core/field.h" |
43 | | #include "format_v2/parquet/parquet_column_schema.h" |
44 | | #include "runtime/runtime_profile.h" |
45 | | #include "storage/index/zone_map/zone_map_index.h" |
46 | | #include "storage/predicate/accept_null_predicate.h" |
47 | | #include "storage/predicate/column_predicate.h" |
48 | | |
49 | | namespace doris::format::parquet { |
50 | | |
51 | | namespace { |
52 | | |
// Outcome of a row-group pruning attempt: which mechanism (if any) excluded the row group.
enum class ParquetRowGroupPruneReason {
    // Cannot prune; the row group must be read.
    NONE,
    // Excluded by min/max statistics.
    STATISTICS,
    // Excluded by the dictionary.
    DICTIONARY,
    // Excluded by the bloom filter.
    BLOOM_FILTER,
};
59 | | |
60 | 59 | PrimitiveType physical_filter_type(const ParquetColumnSchema& column_schema) { |
61 | 59 | if (column_schema.type == nullptr) { |
62 | 0 | return INVALID_TYPE; |
63 | 0 | } |
64 | 59 | switch (remove_nullable(column_schema.type)->get_primitive_type()) { |
65 | 4 | case TYPE_BOOLEAN: |
66 | 41 | case TYPE_INT: |
67 | 41 | case TYPE_BIGINT: |
68 | 41 | case TYPE_FLOAT: |
69 | 41 | case TYPE_DOUBLE: |
70 | 58 | case TYPE_STRING: |
71 | 58 | return remove_nullable(column_schema.type)->get_primitive_type(); |
72 | 1 | default: |
73 | 1 | return INVALID_TYPE; |
74 | 59 | } |
75 | 59 | } |
76 | | |
77 | 432 | DecodedTimeUnit decoded_time_unit(ParquetTimeUnit time_unit) { |
78 | 432 | switch (time_unit) { |
79 | 0 | case ParquetTimeUnit::MILLIS: |
80 | 0 | return DecodedTimeUnit::MILLIS; |
81 | 2 | case ParquetTimeUnit::MICROS: |
82 | 2 | return DecodedTimeUnit::MICROS; |
83 | 0 | case ParquetTimeUnit::NANOS: |
84 | 0 | return DecodedTimeUnit::NANOS; |
85 | 430 | default: |
86 | 430 | return DecodedTimeUnit::UNKNOWN; |
87 | 432 | } |
88 | 432 | } |
89 | | |
90 | | Status read_decoded_field(const ParquetColumnSchema& column_schema, DecodedColumnView view, |
91 | 432 | Field* field, const cctz::time_zone* timezone) { |
92 | 432 | DORIS_CHECK(column_schema.type != nullptr); |
93 | 432 | DORIS_CHECK(field != nullptr); |
94 | 432 | constexpr uint8_t not_null = 0; |
95 | 432 | view.row_count = 1; |
96 | 432 | view.null_map = ¬_null; |
97 | 432 | view.time_unit = decoded_time_unit(column_schema.type_descriptor.time_unit); |
98 | 432 | view.logical_integer_bit_width = column_schema.type_descriptor.integer_bit_width; |
99 | 432 | view.logical_integer_is_signed = !column_schema.type_descriptor.is_unsigned_integer; |
100 | 432 | view.decimal_precision = column_schema.type_descriptor.decimal_precision; |
101 | 432 | view.decimal_scale = column_schema.type_descriptor.decimal_scale; |
102 | 432 | view.fixed_length = column_schema.type_descriptor.fixed_length; |
103 | 432 | view.timestamp_is_adjusted_to_utc = column_schema.type_descriptor.timestamp_is_adjusted_to_utc; |
104 | 432 | view.timezone = timezone; |
105 | 432 | return column_schema.type->get_serde()->read_field_from_decoded_value(*column_schema.type, |
106 | 432 | field, view); |
107 | 432 | } |
108 | | |
109 | | template <typename NativeType> |
110 | | bool set_decoded_field(const ParquetColumnSchema& column_schema, DecodedValueKind value_kind, |
111 | 426 | const NativeType& value, Field* field, const cctz::time_zone* timezone) { |
112 | 426 | DecodedColumnView view; |
113 | 426 | view.value_kind = value_kind; |
114 | 426 | view.values = reinterpret_cast<const uint8_t*>(&value); |
115 | 426 | return read_decoded_field(column_schema, view, field, timezone).ok(); |
116 | 426 | } Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_117set_decoded_fieldIbEEbRKNS1_19ParquetColumnSchemaENS_16DecodedValueKindERKT_PNS_5FieldEPKN4cctz9time_zoneE parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_117set_decoded_fieldIiEEbRKNS1_19ParquetColumnSchemaENS_16DecodedValueKindERKT_PNS_5FieldEPKN4cctz9time_zoneE Line | Count | Source | 111 | 424 | const NativeType& value, Field* field, const cctz::time_zone* timezone) { | 112 | 424 | DecodedColumnView view; | 113 | 424 | view.value_kind = value_kind; | 114 | 424 | view.values = reinterpret_cast<const uint8_t*>(&value); | 115 | 424 | return read_decoded_field(column_schema, view, field, timezone).ok(); | 116 | 424 | } |
parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_117set_decoded_fieldIlEEbRKNS1_19ParquetColumnSchemaENS_16DecodedValueKindERKT_PNS_5FieldEPKN4cctz9time_zoneE Line | Count | Source | 111 | 2 | const NativeType& value, Field* field, const cctz::time_zone* timezone) { | 112 | 2 | DecodedColumnView view; | 113 | 2 | view.value_kind = value_kind; | 114 | 2 | view.values = reinterpret_cast<const uint8_t*>(&value); | 115 | 2 | return read_decoded_field(column_schema, view, field, timezone).ok(); | 116 | 2 | } |
Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_117set_decoded_fieldIfEEbRKNS1_19ParquetColumnSchemaENS_16DecodedValueKindERKT_PNS_5FieldEPKN4cctz9time_zoneE Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_117set_decoded_fieldIdEEbRKNS1_19ParquetColumnSchemaENS_16DecodedValueKindERKT_PNS_5FieldEPKN4cctz9time_zoneE |
117 | | |
118 | | template <typename ParquetDType> |
119 | | bool set_decoded_min_max(const std::shared_ptr<::parquet::Statistics>& statistics, |
120 | | const ParquetColumnSchema& column_schema, DecodedValueKind value_kind, |
121 | | ParquetColumnStatistics* column_statistics, |
122 | 69 | const cctz::time_zone* timezone) { |
123 | 69 | auto typed_statistics = |
124 | 69 | std::static_pointer_cast<::parquet::TypedStatistics<ParquetDType>>(statistics); |
125 | 69 | if (!set_decoded_field(column_schema, value_kind, typed_statistics->min(), |
126 | 69 | &column_statistics->min_value, timezone) || |
127 | 69 | !set_decoded_field(column_schema, value_kind, typed_statistics->max(), |
128 | 69 | &column_statistics->max_value, timezone)) { |
129 | 0 | return false; |
130 | 0 | } |
131 | 69 | return true; |
132 | 69 | } Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_119set_decoded_min_maxIN7parquet12PhysicalTypeILNS4_4Type4typeE0EEEEEbRKSt10shared_ptrINS4_10StatisticsEERKNS1_19ParquetColumnSchemaENS_16DecodedValueKindEPNS1_23ParquetColumnStatisticsEPKN4cctz9time_zoneE parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_119set_decoded_min_maxIN7parquet12PhysicalTypeILNS4_4Type4typeE1EEEEEbRKSt10shared_ptrINS4_10StatisticsEERKNS1_19ParquetColumnSchemaENS_16DecodedValueKindEPNS1_23ParquetColumnStatisticsEPKN4cctz9time_zoneE Line | Count | Source | 122 | 68 | const cctz::time_zone* timezone) { | 123 | 68 | auto typed_statistics = | 124 | 68 | std::static_pointer_cast<::parquet::TypedStatistics<ParquetDType>>(statistics); | 125 | 68 | if (!set_decoded_field(column_schema, value_kind, typed_statistics->min(), | 126 | 68 | &column_statistics->min_value, timezone) || | 127 | 68 | !set_decoded_field(column_schema, value_kind, typed_statistics->max(), | 128 | 68 | &column_statistics->max_value, timezone)) { | 129 | 0 | return false; | 130 | 0 | } | 131 | 68 | return true; | 132 | 68 | } |
parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_119set_decoded_min_maxIN7parquet12PhysicalTypeILNS4_4Type4typeE2EEEEEbRKSt10shared_ptrINS4_10StatisticsEERKNS1_19ParquetColumnSchemaENS_16DecodedValueKindEPNS1_23ParquetColumnStatisticsEPKN4cctz9time_zoneE Line | Count | Source | 122 | 1 | const cctz::time_zone* timezone) { | 123 | 1 | auto typed_statistics = | 124 | 1 | std::static_pointer_cast<::parquet::TypedStatistics<ParquetDType>>(statistics); | 125 | 1 | if (!set_decoded_field(column_schema, value_kind, typed_statistics->min(), | 126 | 1 | &column_statistics->min_value, timezone) || | 127 | 1 | !set_decoded_field(column_schema, value_kind, typed_statistics->max(), | 128 | 1 | &column_statistics->max_value, timezone)) { | 129 | 0 | return false; | 130 | 0 | } | 131 | 1 | return true; | 132 | 1 | } |
Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_119set_decoded_min_maxIN7parquet12PhysicalTypeILNS4_4Type4typeE4EEEEEbRKSt10shared_ptrINS4_10StatisticsEERKNS1_19ParquetColumnSchemaENS_16DecodedValueKindEPNS1_23ParquetColumnStatisticsEPKN4cctz9time_zoneE Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_119set_decoded_min_maxIN7parquet12PhysicalTypeILNS4_4Type4typeE5EEEEEbRKSt10shared_ptrINS4_10StatisticsEERKNS1_19ParquetColumnSchemaENS_16DecodedValueKindEPNS1_23ParquetColumnStatisticsEPKN4cctz9time_zoneE |
133 | | |
134 | | bool set_decoded_binary_field(const ParquetColumnSchema& column_schema, DecodedValueKind value_kind, |
135 | | const StringRef& value, Field* field, |
136 | 6 | const cctz::time_zone* timezone) { |
137 | 6 | std::vector<StringRef> binary_values {value}; |
138 | 6 | DecodedColumnView view; |
139 | 6 | view.value_kind = value_kind; |
140 | 6 | view.binary_values = &binary_values; |
141 | 6 | return read_decoded_field(column_schema, view, field, timezone).ok(); |
142 | 6 | } |
143 | | |
144 | | bool set_string_min_max(const std::shared_ptr<::parquet::Statistics>& statistics, |
145 | | const ParquetColumnSchema& column_schema, |
146 | | ParquetColumnStatistics* column_statistics, |
147 | 3 | const cctz::time_zone* timezone) { |
148 | 3 | switch (statistics->physical_type()) { |
149 | 3 | case ::parquet::Type::BYTE_ARRAY: { |
150 | 3 | auto typed_statistics = |
151 | 3 | std::static_pointer_cast<::parquet::TypedStatistics<::parquet::ByteArrayType>>( |
152 | 3 | statistics); |
153 | 3 | const auto min = ::parquet::ByteArrayToString(typed_statistics->min()); |
154 | 3 | const auto max = ::parquet::ByteArrayToString(typed_statistics->max()); |
155 | 3 | if (!set_decoded_binary_field(column_schema, DecodedValueKind::BINARY, |
156 | 3 | StringRef(min.data(), min.size()), |
157 | 3 | &column_statistics->min_value, timezone) || |
158 | 3 | !set_decoded_binary_field(column_schema, DecodedValueKind::BINARY, |
159 | 3 | StringRef(max.data(), max.size()), |
160 | 3 | &column_statistics->max_value, timezone)) { |
161 | 0 | return false; |
162 | 0 | } |
163 | 3 | return true; |
164 | 3 | } |
165 | 0 | case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: { |
166 | 0 | if (column_schema.descriptor == nullptr || column_schema.descriptor->type_length() <= 0) { |
167 | 0 | return false; |
168 | 0 | } |
169 | 0 | auto typed_statistics = |
170 | 0 | std::static_pointer_cast<::parquet::TypedStatistics<::parquet::FLBAType>>( |
171 | 0 | statistics); |
172 | 0 | const int type_length = column_schema.descriptor->type_length(); |
173 | 0 | const std::string min(reinterpret_cast<const char*>(typed_statistics->min().ptr), |
174 | 0 | type_length); |
175 | 0 | const std::string max(reinterpret_cast<const char*>(typed_statistics->max().ptr), |
176 | 0 | type_length); |
177 | 0 | if (!set_decoded_binary_field(column_schema, DecodedValueKind::FIXED_BINARY, |
178 | 0 | StringRef(min.data(), min.size()), |
179 | 0 | &column_statistics->min_value, timezone) || |
180 | 0 | !set_decoded_binary_field(column_schema, DecodedValueKind::FIXED_BINARY, |
181 | 0 | StringRef(max.data(), max.size()), |
182 | 0 | &column_statistics->max_value, timezone)) { |
183 | 0 | return false; |
184 | 0 | } |
185 | 0 | return true; |
186 | 0 | } |
187 | 0 | default: |
188 | 0 | return false; |
189 | 3 | } |
190 | 3 | } |
191 | | |
192 | 244 | bool is_null_only_predicate(const ColumnPredicate& predicate) { |
193 | 244 | return predicate.type() == PredicateType::IS_NULL || |
194 | 244 | predicate.type() == PredicateType::IS_NOT_NULL; |
195 | 244 | } |
196 | | |
197 | 31 | bool is_supported_dictionary_predicate(const ColumnPredicate& predicate) { |
198 | 31 | switch (predicate.type()) { |
199 | 21 | case PredicateType::EQ: |
200 | 31 | case PredicateType::IN_LIST: |
201 | 31 | return true; |
202 | 0 | default: |
203 | 0 | return false; |
204 | 31 | } |
205 | 31 | } |
206 | | |
207 | 48 | bool is_bloom_filter_prunable_predicate(const ColumnPredicate& predicate) { |
208 | 48 | if (dynamic_cast<const AcceptNullPredicate*>(&predicate) != nullptr || |
209 | 48 | is_null_only_predicate(predicate)) { |
210 | 4 | return false; |
211 | 4 | } |
212 | 44 | return predicate.can_do_bloom_filter(false); |
213 | 48 | } |
214 | | |
// Reads a value of type T from the first sizeof(T) bytes at `data`.
// The bytes are copied rather than accessed through a cast pointer, so `data` does not need to
// be aligned for T.
template <typename T>
T load_predicate_value(const char* data) {
    T decoded;
    std::memcpy(&decoded, data, sizeof(decoded));
    return decoded;
}
221 | | |
222 | | class ArrowParquetBloomFilterAdapter final : public segment_v2::BloomFilter { |
223 | | public: |
224 | | ArrowParquetBloomFilterAdapter(const ParquetColumnSchema& column_schema, |
225 | | const ::parquet::BloomFilter& bloom_filter) |
226 | 9 | : _column_schema(column_schema), _bloom_filter(bloom_filter) {} |
227 | | |
228 | 0 | void add_bytes(const char* buf, size_t size) override { DORIS_CHECK(false); } |
229 | | |
230 | 10 | bool test_bytes(const char* buf, size_t size) const override { |
231 | 10 | if (buf == nullptr) { |
232 | 0 | return true; |
233 | 0 | } |
234 | 10 | switch (physical_filter_type(_column_schema)) { |
235 | 2 | case TYPE_BOOLEAN: |
236 | 2 | return test_boolean(buf, size); |
237 | 6 | case TYPE_INT: |
238 | 6 | return test_int32(buf, size); |
239 | 0 | case TYPE_BIGINT: |
240 | 0 | return test_int64(buf, size); |
241 | 0 | case TYPE_FLOAT: |
242 | 0 | return test_float(buf, size); |
243 | 0 | case TYPE_DOUBLE: |
244 | 0 | return test_double(buf, size); |
245 | 2 | case TYPE_STRING: |
246 | 2 | return test_string(buf, size); |
247 | 0 | default: |
248 | 0 | return true; |
249 | 10 | } |
250 | 10 | } |
251 | | |
252 | 0 | void set_has_null(bool has_null) override { DORIS_CHECK(!has_null); } |
253 | 0 | bool has_null() const override { return false; } |
254 | 0 | void add_hash(uint64_t hash) override { DORIS_CHECK(false); } |
255 | 0 | bool test_hash(uint64_t hash) const override { return _bloom_filter.FindHash(hash); } |
256 | | |
257 | | private: |
258 | 2 | bool test_boolean(const char* buf, size_t size) const { |
259 | 2 | if (size == sizeof(bool)) { |
260 | 2 | const int32_t value = load_predicate_value<bool>(buf) ? 1 : 0; |
261 | 2 | return _bloom_filter.FindHash(_bloom_filter.Hash(value)); |
262 | 2 | } |
263 | 0 | if (size == sizeof(int32_t)) { |
264 | 0 | const int32_t value = load_predicate_value<int32_t>(buf); |
265 | 0 | return _bloom_filter.FindHash(_bloom_filter.Hash(value != 0 ? 1 : 0)); |
266 | 0 | } |
267 | 0 | return true; |
268 | 0 | } |
269 | | |
270 | 6 | bool test_int32(const char* buf, size_t size) const { |
271 | 6 | if (size == sizeof(int8_t)) { |
272 | 0 | return find_int32(static_cast<int32_t>(load_predicate_value<int8_t>(buf))); |
273 | 0 | } |
274 | 6 | if (size == sizeof(int16_t)) { |
275 | 0 | return find_int32(static_cast<int32_t>(load_predicate_value<int16_t>(buf))); |
276 | 0 | } |
277 | 6 | if (size == sizeof(int32_t)) { |
278 | 6 | return find_int32(load_predicate_value<int32_t>(buf)); |
279 | 6 | } |
280 | 0 | return true; |
281 | 6 | } |
282 | | |
283 | 0 | bool test_int64(const char* buf, size_t size) const { |
284 | 0 | if (size != sizeof(int64_t)) { |
285 | 0 | return true; |
286 | 0 | } |
287 | 0 | const int64_t value = load_predicate_value<int64_t>(buf); |
288 | 0 | return _bloom_filter.FindHash(_bloom_filter.Hash(value)); |
289 | 0 | } |
290 | | |
291 | 0 | bool test_float(const char* buf, size_t size) const { |
292 | 0 | if (size != sizeof(float)) { |
293 | 0 | return true; |
294 | 0 | } |
295 | 0 | const float value = load_predicate_value<float>(buf); |
296 | 0 | return _bloom_filter.FindHash(_bloom_filter.Hash(value)); |
297 | 0 | } |
298 | | |
299 | 0 | bool test_double(const char* buf, size_t size) const { |
300 | 0 | if (size != sizeof(double)) { |
301 | 0 | return true; |
302 | 0 | } |
303 | 0 | const double value = load_predicate_value<double>(buf); |
304 | 0 | return _bloom_filter.FindHash(_bloom_filter.Hash(value)); |
305 | 0 | } |
306 | | |
307 | 2 | bool test_string(const char* buf, size_t size) const { |
308 | 2 | ::parquet::ByteArray value(static_cast<uint32_t>(size), |
309 | 2 | reinterpret_cast<const uint8_t*>(buf)); |
310 | 2 | return _bloom_filter.FindHash(_bloom_filter.Hash(&value)); |
311 | 2 | } |
312 | | |
313 | 6 | bool find_int32(int32_t value) const { |
314 | 6 | return _bloom_filter.FindHash(_bloom_filter.Hash(value)); |
315 | 6 | } |
316 | | |
317 | | const ParquetColumnSchema& _column_schema; |
318 | | const ::parquet::BloomFilter& _bloom_filter; |
319 | | }; |
320 | | |
321 | | const ParquetColumnSchema* resolve_predicate_leaf_schema( |
322 | | const std::vector<std::unique_ptr<ParquetColumnSchema>>& schema, |
323 | | const format::FileColumnPredicateFilter& column_filter); |
324 | | |
325 | 49 | bool bloom_filter_supported(const ParquetColumnSchema& column_schema) { |
326 | 49 | switch (physical_filter_type(column_schema)) { |
327 | 2 | case TYPE_BOOLEAN: |
328 | 33 | case TYPE_INT: |
329 | 33 | case TYPE_BIGINT: |
330 | 33 | case TYPE_FLOAT: |
331 | 33 | case TYPE_DOUBLE: |
332 | 48 | case TYPE_STRING: |
333 | 48 | return true; |
334 | 1 | default: |
335 | 1 | return false; |
336 | 49 | } |
337 | 49 | } |
338 | | |
339 | | bool bloom_filter_excludes(const ParquetColumnSchema& column_schema, |
340 | | const format::FileColumnPredicateFilter& column_filter, |
341 | 10 | const ::parquet::BloomFilter& bloom_filter) { |
342 | 10 | if (!bloom_filter_supported(column_schema)) { |
343 | 1 | return false; |
344 | 1 | } |
345 | 9 | ArrowParquetBloomFilterAdapter adapter(column_schema, bloom_filter); |
346 | 9 | for (const auto& column_predicate : column_filter.predicates) { |
347 | 9 | if (column_predicate == nullptr || !is_bloom_filter_prunable_predicate(*column_predicate)) { |
348 | 1 | return false; |
349 | 1 | } |
350 | 8 | if (!column_predicate->evaluate_and(&adapter)) { |
351 | 4 | return true; |
352 | 4 | } |
353 | 8 | } |
354 | 4 | return false; |
355 | 9 | } |
356 | | |
357 | | struct RowGroupBloomFilterCache { |
358 | | ::parquet::BloomFilterReader* bloom_filter_reader = nullptr; |
359 | | std::map<int, std::unique_ptr<::parquet::BloomFilter>> column_bloom_filters; |
360 | | std::set<int> loaded_columns; |
361 | | |
362 | | ::parquet::BloomFilter* get(int row_group_idx, int leaf_column_id, |
363 | 13 | ParquetPruningStats* pruning_stats) { |
364 | 13 | if (bloom_filter_reader == nullptr || leaf_column_id < 0) { |
365 | 5 | return nullptr; |
366 | 5 | } |
367 | 8 | if (loaded_columns.find(leaf_column_id) == loaded_columns.end()) { |
368 | 8 | loaded_columns.insert(leaf_column_id); |
369 | 8 | try { |
370 | 8 | std::shared_ptr<::parquet::RowGroupBloomFilterReader> row_group_reader; |
371 | 8 | if (pruning_stats != nullptr) { |
372 | 8 | SCOPED_RAW_TIMER(&pruning_stats->bloom_filter_read_time); |
373 | 8 | row_group_reader = bloom_filter_reader->RowGroup(row_group_idx); |
374 | 8 | if (row_group_reader != nullptr) { |
375 | 8 | column_bloom_filters[leaf_column_id] = |
376 | 8 | row_group_reader->GetColumnBloomFilter(leaf_column_id); |
377 | 8 | } |
378 | 8 | } else { |
379 | 0 | row_group_reader = bloom_filter_reader->RowGroup(row_group_idx); |
380 | 0 | if (row_group_reader != nullptr) { |
381 | 0 | column_bloom_filters[leaf_column_id] = |
382 | 0 | row_group_reader->GetColumnBloomFilter(leaf_column_id); |
383 | 0 | } |
384 | 0 | } |
385 | 8 | } catch (const ::parquet::ParquetException&) { |
386 | 0 | return nullptr; |
387 | 0 | } catch (const std::exception&) { |
388 | 0 | return nullptr; |
389 | 0 | } |
390 | 8 | } |
391 | 8 | auto it = column_bloom_filters.find(leaf_column_id); |
392 | 8 | return it == column_bloom_filters.end() ? nullptr : it->second.get(); |
393 | 8 | } |
394 | | }; |
395 | | |
396 | | ParquetRowGroupPruneReason bloom_filter_prune_reason( |
397 | | int row_group_idx, const std::vector<std::unique_ptr<ParquetColumnSchema>>& schema, |
398 | | const format::FileColumnPredicateFilter& column_filter, |
399 | 39 | RowGroupBloomFilterCache* bloom_filter_cache, ParquetPruningStats* pruning_stats) { |
400 | 39 | if (bloom_filter_cache == nullptr || column_filter.predicates.empty()) { |
401 | 0 | return ParquetRowGroupPruneReason::NONE; |
402 | 0 | } |
403 | 39 | const auto* column_schema = resolve_predicate_leaf_schema(schema, column_filter); |
404 | 39 | if (column_schema == nullptr || !bloom_filter_supported(*column_schema)) { |
405 | 0 | return ParquetRowGroupPruneReason::NONE; |
406 | 0 | } |
407 | 39 | for (const auto& column_predicate : column_filter.predicates) { |
408 | 39 | if (column_predicate == nullptr || !is_bloom_filter_prunable_predicate(*column_predicate)) { |
409 | 26 | return ParquetRowGroupPruneReason::NONE; |
410 | 26 | } |
411 | 39 | } |
412 | 13 | auto* bloom_filter = |
413 | 13 | bloom_filter_cache->get(row_group_idx, column_schema->leaf_column_id, pruning_stats); |
414 | 13 | if (bloom_filter == nullptr) { |
415 | 13 | return ParquetRowGroupPruneReason::NONE; |
416 | 13 | } |
417 | 0 | return bloom_filter_excludes(*column_schema, column_filter, *bloom_filter) |
418 | 0 | ? ParquetRowGroupPruneReason::BLOOM_FILTER |
419 | 0 | : ParquetRowGroupPruneReason::NONE; |
420 | 13 | } |
421 | | |
422 | 27 | bool is_dictionary_data_encoding(::parquet::Encoding::type encoding) { |
423 | 27 | return encoding == ::parquet::Encoding::PLAIN_DICTIONARY || |
424 | 27 | encoding == ::parquet::Encoding::RLE_DICTIONARY; |
425 | 27 | } |
426 | | |
427 | 0 | bool is_level_encoding(::parquet::Encoding::type encoding) { |
428 | 0 | return encoding == ::parquet::Encoding::RLE || encoding == ::parquet::Encoding::BIT_PACKED; |
429 | 0 | } |
430 | | |
431 | 54 | bool is_data_page_type(::parquet::PageType::type page_type) { |
432 | 54 | return page_type == ::parquet::PageType::DATA_PAGE || |
433 | 54 | page_type == ::parquet::PageType::DATA_PAGE_V2; |
434 | 54 | } |
435 | | |
436 | 31 | bool is_dictionary_encoded_chunk(const ::parquet::ColumnChunkMetaData& column_metadata) { |
437 | 31 | if (!column_metadata.has_dictionary_page()) { |
438 | 4 | return false; |
439 | 4 | } |
440 | | |
441 | 27 | const auto& encoding_stats = column_metadata.encoding_stats(); |
442 | 27 | if (!encoding_stats.empty()) { |
443 | 27 | bool has_dictionary_data_page = false; |
444 | 54 | for (const auto& encoding_stat : encoding_stats) { |
445 | 54 | if (!is_data_page_type(encoding_stat.page_type) || encoding_stat.count <= 0) { |
446 | 27 | continue; |
447 | 27 | } |
448 | 27 | if (!is_dictionary_data_encoding(encoding_stat.encoding)) { |
449 | 0 | return false; |
450 | 0 | } |
451 | 27 | has_dictionary_data_page = true; |
452 | 27 | } |
453 | 27 | return has_dictionary_data_page; |
454 | 27 | } |
455 | | |
456 | 0 | bool has_dictionary_encoding = false; |
457 | 0 | for (const auto encoding : column_metadata.encodings()) { |
458 | 0 | if (is_dictionary_data_encoding(encoding)) { |
459 | 0 | has_dictionary_encoding = true; |
460 | 0 | continue; |
461 | 0 | } |
462 | 0 | if (!is_level_encoding(encoding)) { |
463 | 0 | return false; |
464 | 0 | } |
465 | 0 | } |
466 | 0 | return has_dictionary_encoding; |
467 | 0 | } |
468 | | |
469 | | bool supports_dictionary_pruning(const ParquetColumnSchema& column_schema, |
470 | | const ::parquet::ColumnChunkMetaData& column_metadata, |
471 | 57 | const format::FileColumnPredicateFilter& column_filter) { |
472 | 57 | if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE || |
473 | 57 | column_schema.descriptor == nullptr || column_schema.type == nullptr) { |
474 | 0 | return false; |
475 | 0 | } |
476 | 57 | if (!column_schema.type_descriptor.is_string_like) { |
477 | 26 | return false; |
478 | 26 | } |
479 | 31 | if (column_metadata.type() != ::parquet::Type::BYTE_ARRAY && |
480 | 31 | column_metadata.type() != ::parquet::Type::FIXED_LEN_BYTE_ARRAY) { |
481 | 0 | return false; |
482 | 0 | } |
483 | 31 | for (const auto& column_predicate : column_filter.predicates) { |
484 | 31 | if (column_predicate == nullptr || !is_supported_dictionary_predicate(*column_predicate)) { |
485 | 0 | return false; |
486 | 0 | } |
487 | 31 | } |
488 | 31 | return true; |
489 | 31 | } |
490 | | |
491 | | struct OwnedDictionaryWords { |
492 | | std::vector<std::string> values; |
493 | | std::vector<StringRef> refs; |
494 | | |
495 | 27 | void clear() { |
496 | 27 | values.clear(); |
497 | 27 | refs.clear(); |
498 | 27 | } |
499 | | |
500 | 27 | void build_refs() { |
501 | 27 | refs.reserve(values.size()); |
502 | 35 | for (const auto& value : values) { |
503 | 35 | refs.emplace_back(value.data(), value.size()); |
504 | 35 | } |
505 | 27 | } |
506 | | }; |
507 | | |
508 | | bool read_dictionary_words(::parquet::ParquetFileReader* file_reader, int row_group_idx, |
509 | | int leaf_column_id, const ParquetColumnSchema& column_schema, |
510 | 27 | OwnedDictionaryWords* dict_words) { |
511 | 27 | DORIS_CHECK(dict_words != nullptr); |
512 | 27 | dict_words->clear(); |
513 | 27 | if (file_reader == nullptr || leaf_column_id < 0) { |
514 | 0 | return false; |
515 | 0 | } |
516 | | |
517 | 27 | auto row_group_reader = file_reader->RowGroup(row_group_idx); |
518 | 27 | if (row_group_reader == nullptr) { |
519 | 0 | return false; |
520 | 0 | } |
521 | 27 | auto page_reader = row_group_reader->GetColumnPageReader(leaf_column_id); |
522 | 27 | if (page_reader == nullptr) { |
523 | 0 | return false; |
524 | 0 | } |
525 | | |
526 | 27 | std::shared_ptr<::parquet::Page> page; |
527 | 27 | try { |
528 | 27 | page = page_reader->NextPage(); |
529 | 27 | } catch (const ::parquet::ParquetException&) { |
530 | 0 | return false; |
531 | 0 | } catch (const std::exception&) { |
532 | 0 | return false; |
533 | 0 | } |
534 | 27 | if (page == nullptr || page->type() != ::parquet::PageType::DICTIONARY_PAGE) { |
535 | 0 | return false; |
536 | 0 | } |
537 | 27 | const auto* dictionary_page = static_cast<const ::parquet::DictionaryPage*>(page.get()); |
538 | 27 | if (dictionary_page->encoding() != ::parquet::Encoding::PLAIN && |
539 | 27 | dictionary_page->encoding() != ::parquet::Encoding::PLAIN_DICTIONARY) { |
540 | 0 | return false; |
541 | 0 | } |
542 | 27 | const int32_t dictionary_length = dictionary_page->num_values(); |
543 | 27 | if (dictionary_length <= 0) { |
544 | 0 | return false; |
545 | 0 | } |
546 | 27 | const auto* dictionary_data = dictionary_page->data(); |
547 | 27 | const int dictionary_size = dictionary_page->size(); |
548 | | |
549 | 27 | dict_words->values.reserve(static_cast<size_t>(dictionary_length)); |
550 | 27 | if (column_schema.descriptor->physical_type() == ::parquet::Type::BYTE_ARRAY) { |
551 | 27 | auto decoder = ::parquet::MakeTypedDecoder<::parquet::ByteArrayType>( |
552 | 27 | ::parquet::Encoding::PLAIN, column_schema.descriptor); |
553 | 27 | decoder->SetData(dictionary_length, dictionary_data, dictionary_size); |
554 | 27 | std::vector<::parquet::ByteArray> byte_array_values(static_cast<size_t>(dictionary_length)); |
555 | 27 | if (decoder->Decode(byte_array_values.data(), dictionary_length) != dictionary_length) { |
556 | 0 | return false; |
557 | 0 | } |
558 | 62 | for (int32_t dict_idx = 0; dict_idx < dictionary_length; ++dict_idx) { |
559 | 35 | dict_words->values.emplace_back( |
560 | 35 | reinterpret_cast<const char*>(byte_array_values[dict_idx].ptr), |
561 | 35 | byte_array_values[dict_idx].len); |
562 | 35 | } |
563 | 27 | dict_words->build_refs(); |
564 | 27 | return true; |
565 | 27 | } |
566 | 0 | if (column_schema.descriptor->physical_type() == ::parquet::Type::FIXED_LEN_BYTE_ARRAY) { |
567 | 0 | const int type_length = column_schema.descriptor->type_length(); |
568 | 0 | if (type_length <= 0) { |
569 | 0 | return false; |
570 | 0 | } |
571 | 0 | auto decoder = ::parquet::MakeTypedDecoder<::parquet::FLBAType>(::parquet::Encoding::PLAIN, |
572 | 0 | column_schema.descriptor); |
573 | 0 | decoder->SetData(dictionary_length, dictionary_data, dictionary_size); |
574 | 0 | std::vector<::parquet::FixedLenByteArray> flba_values( |
575 | 0 | static_cast<size_t>(dictionary_length)); |
576 | 0 | if (decoder->Decode(flba_values.data(), dictionary_length) != dictionary_length) { |
577 | 0 | return false; |
578 | 0 | } |
579 | 0 | for (int32_t dict_idx = 0; dict_idx < dictionary_length; ++dict_idx) { |
580 | 0 | dict_words->values.emplace_back( |
581 | 0 | reinterpret_cast<const char*>(flba_values[dict_idx].ptr), type_length); |
582 | 0 | } |
583 | 0 | dict_words->build_refs(); |
584 | 0 | return true; |
585 | 0 | } |
586 | 0 | return false; |
587 | 0 | } |
588 | | |
589 | 197 | segment_v2::ZoneMap to_column_predicate_statistics(const ParquetColumnStatistics& statistics) { |
590 | 197 | segment_v2::ZoneMap predicate_statistics; |
591 | 197 | predicate_statistics.min_value = statistics.min_value; |
592 | 197 | predicate_statistics.max_value = statistics.max_value; |
593 | 197 | predicate_statistics.has_null = statistics.has_null; |
594 | 197 | predicate_statistics.has_not_null = statistics.has_not_null; |
595 | 197 | return predicate_statistics; |
596 | 197 | } |
597 | | |
598 | | const ParquetColumnSchema* resolve_predicate_leaf_schema( |
599 | | const std::vector<std::unique_ptr<ParquetColumnSchema>>& schema, |
600 | 150 | const format::FileColumnPredicateFilter& column_filter) { |
601 | 150 | const auto file_column_id = column_filter.file_column_id; |
602 | 150 | if (!file_column_id.is_valid() || file_column_id.value() >= static_cast<int>(schema.size())) { |
603 | 0 | return nullptr; |
604 | 0 | } |
605 | 150 | const ParquetColumnSchema* column_schema = schema[file_column_id.value()].get(); |
606 | 150 | if (column_schema == nullptr) { |
607 | 0 | return nullptr; |
608 | 0 | } |
609 | 150 | if (column_schema->kind != ParquetColumnSchemaKind::PRIMITIVE || |
610 | 150 | column_schema->leaf_column_id < 0 || column_schema->max_repetition_level > 0) { |
611 | 11 | return nullptr; |
612 | 11 | } |
613 | 139 | return column_schema; |
614 | 150 | } |
615 | | |
616 | | bool check_statistics(const format::FileColumnPredicateFilter& column_filter, |
617 | 228 | const ParquetColumnStatistics& statistics) { |
618 | 228 | if (!statistics.has_any_statistics()) { |
619 | 31 | return false; |
620 | 31 | } |
621 | | |
622 | 197 | for (const auto& column_predicate : column_filter.predicates) { |
623 | 197 | if (is_null_only_predicate(*column_predicate)) { |
624 | 6 | if (!statistics.has_null_count) { |
625 | 0 | continue; |
626 | 0 | } |
627 | 191 | } else if (!statistics.has_any_statistics()) { |
628 | 0 | continue; |
629 | 0 | } |
630 | 197 | if (!column_predicate->evaluate_and(to_column_predicate_statistics(statistics))) { |
631 | 87 | return true; |
632 | 87 | } |
633 | 197 | } |
634 | 110 | return false; |
635 | 197 | } |
636 | | |
637 | | } // namespace |
638 | | |
639 | | ParquetColumnStatistics ParquetStatisticsUtils::TransformColumnStatistics( |
640 | | const ParquetColumnSchema& column_schema, |
641 | 109 | const std::shared_ptr<::parquet::Statistics>& statistics, const cctz::time_zone* timezone) { |
642 | 109 | ParquetColumnStatistics result; |
643 | 109 | if (statistics == nullptr) { |
644 | 33 | return result; |
645 | 33 | } |
646 | | |
647 | 76 | result.has_null = statistics->HasNullCount() && statistics->null_count() > 0; |
648 | 76 | result.has_not_null = statistics->num_values() > 0 || statistics->HasMinMax(); |
649 | 76 | result.has_null_count = statistics->HasNullCount(); |
650 | 76 | if (!result.has_not_null || !statistics->HasMinMax()) { |
651 | 4 | return result; |
652 | 4 | } |
653 | | |
654 | 72 | DORIS_CHECK(column_schema.type != nullptr); |
655 | 72 | switch (statistics->physical_type()) { |
656 | 0 | case ::parquet::Type::BOOLEAN: |
657 | 0 | result.has_min_max = set_decoded_min_max<::parquet::BooleanType>( |
658 | 0 | statistics, column_schema, DecodedValueKind::BOOL, &result, timezone); |
659 | 0 | return result; |
660 | 68 | case ::parquet::Type::INT32: |
661 | 68 | result.has_min_max = set_decoded_min_max<::parquet::Int32Type>( |
662 | 68 | statistics, column_schema, decoded_value_kind(column_schema.type_descriptor), |
663 | 68 | &result, timezone); |
664 | 68 | return result; |
665 | 1 | case ::parquet::Type::INT64: |
666 | 1 | result.has_min_max = set_decoded_min_max<::parquet::Int64Type>( |
667 | 1 | statistics, column_schema, decoded_value_kind(column_schema.type_descriptor), |
668 | 1 | &result, timezone); |
669 | 1 | return result; |
670 | 0 | case ::parquet::Type::FLOAT: |
671 | 0 | result.has_min_max = set_decoded_min_max<::parquet::FloatType>( |
672 | 0 | statistics, column_schema, DecodedValueKind::FLOAT, &result, timezone); |
673 | 0 | return result; |
674 | 0 | case ::parquet::Type::DOUBLE: |
675 | 0 | result.has_min_max = set_decoded_min_max<::parquet::DoubleType>( |
676 | 0 | statistics, column_schema, DecodedValueKind::DOUBLE, &result, timezone); |
677 | 0 | return result; |
678 | 3 | case ::parquet::Type::BYTE_ARRAY: |
679 | 3 | case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: |
680 | 3 | result.has_min_max = set_string_min_max(statistics, column_schema, &result, timezone); |
681 | 3 | return result; |
682 | 0 | default: |
683 | 0 | return result; |
684 | 72 | } |
685 | 72 | } |
686 | | |
687 | | namespace { |
688 | | |
689 | | ParquetRowGroupPruneReason row_group_prune_reason( |
690 | | const ::parquet::RowGroupMetaData& row_group, ::parquet::ParquetFileReader* file_reader, |
691 | | int row_group_idx, const std::vector<std::unique_ptr<ParquetColumnSchema>>& schema, |
692 | | const format::FileColumnPredicateFilter& column_filter, |
693 | | RowGroupBloomFilterCache* bloom_filter_cache, ParquetPruningStats* pruning_stats, |
694 | 93 | const cctz::time_zone* timezone) { |
695 | 93 | if (column_filter.predicates.empty()) { |
696 | 0 | return ParquetRowGroupPruneReason::NONE; |
697 | 0 | } |
698 | 93 | const auto* column_schema = resolve_predicate_leaf_schema(schema, column_filter); |
699 | 93 | if (column_schema == nullptr) { |
700 | 9 | return ParquetRowGroupPruneReason::NONE; |
701 | 9 | } |
702 | 93 | DCHECK_LT(column_schema->leaf_column_id, row_group.num_columns()); |
703 | 84 | auto column_chunk = row_group.ColumnChunk(column_schema->leaf_column_id); |
704 | 84 | if (column_chunk == nullptr) { |
705 | 0 | return ParquetRowGroupPruneReason::NONE; |
706 | 0 | } |
707 | 84 | if (check_statistics(column_filter, |
708 | 84 | ParquetStatisticsUtils::TransformColumnStatistics( |
709 | 84 | *column_schema, column_chunk->statistics(), timezone))) { |
710 | 27 | return ParquetRowGroupPruneReason::STATISTICS; |
711 | 27 | } |
712 | 57 | if (!supports_dictionary_pruning(*column_schema, *column_chunk, column_filter) || |
713 | 57 | !is_dictionary_encoded_chunk(*column_chunk)) { |
714 | 30 | return bloom_filter_prune_reason(row_group_idx, schema, column_filter, bloom_filter_cache, |
715 | 30 | pruning_stats); |
716 | 30 | } |
717 | 27 | OwnedDictionaryWords dict_words; |
718 | 27 | if (!read_dictionary_words(file_reader, row_group_idx, column_schema->leaf_column_id, |
719 | 27 | *column_schema, &dict_words)) { |
720 | 0 | return bloom_filter_prune_reason(row_group_idx, schema, column_filter, bloom_filter_cache, |
721 | 0 | pruning_stats); |
722 | 0 | } |
723 | 27 | for (const auto& column_predicate : column_filter.predicates) { |
724 | 27 | if (!column_predicate->evaluate_and(dict_words.refs.data(), dict_words.refs.size())) { |
725 | 18 | return ParquetRowGroupPruneReason::DICTIONARY; |
726 | 18 | } |
727 | 27 | } |
728 | 9 | return bloom_filter_prune_reason(row_group_idx, schema, column_filter, bloom_filter_cache, |
729 | 9 | pruning_stats); |
730 | 27 | } |
731 | | |
732 | | void init_bloom_filter_cache(::parquet::ParquetFileReader* file_reader, bool enable_bloom_filter, |
733 | 198 | RowGroupBloomFilterCache* bloom_filter_cache) { |
734 | 198 | DORIS_CHECK(bloom_filter_cache != nullptr); |
735 | 198 | if (!enable_bloom_filter || file_reader == nullptr) { |
736 | 42 | return; |
737 | 42 | } |
738 | 156 | try { |
739 | 156 | bloom_filter_cache->bloom_filter_reader = &file_reader->GetBloomFilterReader(); |
740 | 156 | } catch (const ::parquet::ParquetException&) { |
741 | 0 | bloom_filter_cache->bloom_filter_reader = nullptr; |
742 | 0 | } catch (const std::exception&) { |
743 | 0 | bloom_filter_cache->bloom_filter_reader = nullptr; |
744 | 0 | } |
745 | 156 | } |
746 | | |
747 | | Status select_row_groups(const ::parquet::FileMetaData& metadata, |
748 | | ::parquet::ParquetFileReader* file_reader, |
749 | | const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, |
750 | | const format::FileScanRequest& request, |
751 | | const std::vector<int>* candidate_row_groups, |
752 | | std::vector<int>* selected_row_groups, bool enable_bloom_filter, |
753 | 124 | ParquetPruningStats* pruning_stats, const cctz::time_zone* timezone) { |
754 | 124 | int64_t row_group_filter_time_sink = 0; |
755 | 124 | SCOPED_RAW_TIMER(pruning_stats == nullptr ? &row_group_filter_time_sink |
756 | 124 | : &pruning_stats->row_group_filter_time); |
757 | 124 | if (selected_row_groups == nullptr) { |
758 | 0 | return Status::InvalidArgument("selected_row_groups is null"); |
759 | 0 | } |
760 | 124 | selected_row_groups->clear(); |
761 | | |
762 | 124 | const int num_row_groups = metadata.num_row_groups(); |
763 | 124 | if (pruning_stats != nullptr) { |
764 | 124 | pruning_stats->total_row_groups = num_row_groups; |
765 | 124 | } |
766 | 124 | const auto candidate_size = candidate_row_groups == nullptr |
767 | 124 | ? static_cast<size_t>(num_row_groups) |
768 | 124 | : candidate_row_groups->size(); |
769 | 124 | selected_row_groups->reserve(candidate_size); |
770 | 322 | for (size_t candidate_idx = 0; candidate_idx < candidate_size; ++candidate_idx) { |
771 | 198 | const int row_group_idx = candidate_row_groups == nullptr |
772 | 198 | ? static_cast<int>(candidate_idx) |
773 | 198 | : (*candidate_row_groups)[candidate_idx]; |
774 | 198 | DORIS_CHECK(row_group_idx >= 0); |
775 | 198 | DORIS_CHECK(row_group_idx < num_row_groups); |
776 | 198 | auto row_group = metadata.RowGroup(row_group_idx); |
777 | 198 | if (row_group == nullptr) { |
778 | 0 | selected_row_groups->push_back(row_group_idx); |
779 | 0 | continue; |
780 | 0 | } |
781 | 198 | bool drop = false; |
782 | 198 | RowGroupBloomFilterCache bloom_filter_cache; |
783 | 198 | init_bloom_filter_cache(file_reader, enable_bloom_filter, &bloom_filter_cache); |
784 | 198 | for (const auto& column_filter : request.column_predicate_filters) { |
785 | 93 | const auto prune_reason = row_group_prune_reason( |
786 | 93 | *row_group, file_reader, row_group_idx, file_schema, column_filter, |
787 | 93 | &bloom_filter_cache, pruning_stats, timezone); |
788 | 93 | if (prune_reason == ParquetRowGroupPruneReason::NONE) { |
789 | 48 | continue; |
790 | 48 | } |
791 | 45 | drop = true; |
792 | 45 | if (pruning_stats != nullptr) { |
793 | 45 | pruning_stats->filtered_group_rows += row_group->num_rows(); |
794 | 45 | if (prune_reason == ParquetRowGroupPruneReason::STATISTICS) { |
795 | 27 | ++pruning_stats->filtered_row_groups_by_statistics; |
796 | 27 | } else if (prune_reason == ParquetRowGroupPruneReason::DICTIONARY) { |
797 | 18 | ++pruning_stats->filtered_row_groups_by_dictionary; |
798 | 18 | } else if (prune_reason == ParquetRowGroupPruneReason::BLOOM_FILTER) { |
799 | 0 | ++pruning_stats->filtered_row_groups_by_bloom_filter; |
800 | 0 | } |
801 | 45 | break; |
802 | 45 | } |
803 | 0 | break; |
804 | 45 | } |
805 | 198 | if (drop) { |
806 | 45 | continue; |
807 | 45 | } |
808 | 153 | selected_row_groups->push_back(row_group_idx); |
809 | 153 | } |
810 | 124 | return Status::OK(); |
811 | 124 | } |
812 | | |
813 | | } // namespace |
814 | | |
815 | | bool ParquetStatisticsUtils::BloomFilterExcludes( |
816 | | const ParquetColumnSchema& column_schema, |
817 | | const format::FileColumnPredicateFilter& column_filter, |
818 | 10 | const ::parquet::BloomFilter& bloom_filter) { |
819 | 10 | return bloom_filter_excludes(column_schema, column_filter, bloom_filter); |
820 | 10 | } |
821 | | |
822 | | Status select_row_groups_by_statistics( |
823 | | const ::parquet::FileMetaData& metadata, ::parquet::ParquetFileReader* file_reader, |
824 | | const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, |
825 | | const format::FileScanRequest& request, const std::vector<int>* candidate_row_groups, |
826 | | std::vector<int>* selected_row_groups, bool enable_bloom_filter, |
827 | 124 | ParquetPruningStats* pruning_stats, const cctz::time_zone* timezone) { |
828 | 124 | return select_row_groups(metadata, file_reader, file_schema, request, candidate_row_groups, |
829 | 124 | selected_row_groups, enable_bloom_filter, pruning_stats, timezone); |
830 | 124 | } |
831 | | |
832 | | namespace { |
833 | | |
834 | | template <typename ParquetDType> |
835 | | bool set_page_decoded_min_max(const std::shared_ptr<::parquet::ColumnIndex>& column_index, |
836 | | const ParquetColumnSchema& column_schema, size_t page_idx, |
837 | | DecodedValueKind value_kind, ParquetColumnStatistics* page_statistics, |
838 | 144 | const cctz::time_zone* timezone) { |
839 | 144 | const auto typed_index = |
840 | 144 | std::static_pointer_cast<::parquet::TypedColumnIndex<ParquetDType>>(column_index); |
841 | 144 | if (page_idx >= typed_index->min_values().size() || |
842 | 144 | page_idx >= typed_index->max_values().size()) { |
843 | 0 | return false; |
844 | 0 | } |
845 | 144 | if (!set_decoded_field(column_schema, value_kind, typed_index->min_values()[page_idx], |
846 | 144 | &page_statistics->min_value, timezone) || |
847 | 144 | !set_decoded_field(column_schema, value_kind, typed_index->max_values()[page_idx], |
848 | 144 | &page_statistics->max_value, timezone)) { |
849 | 0 | return false; |
850 | 0 | } |
851 | 144 | page_statistics->has_min_max = true; |
852 | 144 | return true; |
853 | 144 | } Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_124set_page_decoded_min_maxIN7parquet12PhysicalTypeILNS4_4Type4typeE0EEEEEbRKSt10shared_ptrINS4_11ColumnIndexEERKNS1_19ParquetColumnSchemaEmNS_16DecodedValueKindEPNS1_23ParquetColumnStatisticsEPKN4cctz9time_zoneE parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_124set_page_decoded_min_maxIN7parquet12PhysicalTypeILNS4_4Type4typeE1EEEEEbRKSt10shared_ptrINS4_11ColumnIndexEERKNS1_19ParquetColumnSchemaEmNS_16DecodedValueKindEPNS1_23ParquetColumnStatisticsEPKN4cctz9time_zoneE Line | Count | Source | 838 | 144 | const cctz::time_zone* timezone) { | 839 | 144 | const auto typed_index = | 840 | 144 | std::static_pointer_cast<::parquet::TypedColumnIndex<ParquetDType>>(column_index); | 841 | 144 | if (page_idx >= typed_index->min_values().size() || | 842 | 144 | page_idx >= typed_index->max_values().size()) { | 843 | 0 | return false; | 844 | 0 | } | 845 | 144 | if (!set_decoded_field(column_schema, value_kind, typed_index->min_values()[page_idx], | 846 | 144 | &page_statistics->min_value, timezone) || | 847 | 144 | !set_decoded_field(column_schema, value_kind, typed_index->max_values()[page_idx], | 848 | 144 | &page_statistics->max_value, timezone)) { | 849 | 0 | return false; | 850 | 0 | } | 851 | 144 | page_statistics->has_min_max = true; | 852 | 144 | return true; | 853 | 144 | } |
Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_124set_page_decoded_min_maxIN7parquet12PhysicalTypeILNS4_4Type4typeE2EEEEEbRKSt10shared_ptrINS4_11ColumnIndexEERKNS1_19ParquetColumnSchemaEmNS_16DecodedValueKindEPNS1_23ParquetColumnStatisticsEPKN4cctz9time_zoneE Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_124set_page_decoded_min_maxIN7parquet12PhysicalTypeILNS4_4Type4typeE4EEEEEbRKSt10shared_ptrINS4_11ColumnIndexEERKNS1_19ParquetColumnSchemaEmNS_16DecodedValueKindEPNS1_23ParquetColumnStatisticsEPKN4cctz9time_zoneE Unexecuted instantiation: parquet_statistics.cpp:_ZN5doris6format7parquet12_GLOBAL__N_124set_page_decoded_min_maxIN7parquet12PhysicalTypeILNS4_4Type4typeE5EEEEEbRKSt10shared_ptrINS4_11ColumnIndexEERKNS1_19ParquetColumnSchemaEmNS_16DecodedValueKindEPNS1_23ParquetColumnStatisticsEPKN4cctz9time_zoneE |
854 | | |
855 | | bool set_page_string_min_max(const std::shared_ptr<::parquet::ColumnIndex>& column_index, |
856 | | const ParquetColumnSchema& column_schema, size_t page_idx, |
857 | | ParquetColumnStatistics* page_statistics, |
858 | 0 | const cctz::time_zone* timezone) { |
859 | 0 | switch (column_schema.descriptor->physical_type()) { |
860 | 0 | case ::parquet::Type::BYTE_ARRAY: { |
861 | 0 | const auto typed_index = |
862 | 0 | std::static_pointer_cast<::parquet::ByteArrayColumnIndex>(column_index); |
863 | 0 | if (page_idx >= typed_index->min_values().size() || |
864 | 0 | page_idx >= typed_index->max_values().size()) { |
865 | 0 | return false; |
866 | 0 | } |
867 | 0 | const auto min = ::parquet::ByteArrayToString(typed_index->min_values()[page_idx]); |
868 | 0 | const auto max = ::parquet::ByteArrayToString(typed_index->max_values()[page_idx]); |
869 | 0 | if (!set_decoded_binary_field(column_schema, DecodedValueKind::BINARY, |
870 | 0 | StringRef(min.data(), min.size()), |
871 | 0 | &page_statistics->min_value, timezone) || |
872 | 0 | !set_decoded_binary_field(column_schema, DecodedValueKind::BINARY, |
873 | 0 | StringRef(max.data(), max.size()), |
874 | 0 | &page_statistics->max_value, timezone)) { |
875 | 0 | return false; |
876 | 0 | } |
877 | 0 | page_statistics->has_min_max = true; |
878 | 0 | return true; |
879 | 0 | } |
880 | 0 | case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: { |
881 | 0 | const int type_length = column_schema.descriptor->type_length(); |
882 | 0 | if (type_length <= 0) { |
883 | 0 | return false; |
884 | 0 | } |
885 | 0 | const auto typed_index = std::static_pointer_cast<::parquet::FLBAColumnIndex>(column_index); |
886 | 0 | if (page_idx >= typed_index->min_values().size() || |
887 | 0 | page_idx >= typed_index->max_values().size()) { |
888 | 0 | return false; |
889 | 0 | } |
890 | 0 | const std::string min( |
891 | 0 | reinterpret_cast<const char*>(typed_index->min_values()[page_idx].ptr), |
892 | 0 | type_length); |
893 | 0 | const std::string max( |
894 | 0 | reinterpret_cast<const char*>(typed_index->max_values()[page_idx].ptr), |
895 | 0 | type_length); |
896 | 0 | if (!set_decoded_binary_field(column_schema, DecodedValueKind::FIXED_BINARY, |
897 | 0 | StringRef(min.data(), min.size()), |
898 | 0 | &page_statistics->min_value, timezone) || |
899 | 0 | !set_decoded_binary_field(column_schema, DecodedValueKind::FIXED_BINARY, |
900 | 0 | StringRef(max.data(), max.size()), |
901 | 0 | &page_statistics->max_value, timezone)) { |
902 | 0 | return false; |
903 | 0 | } |
904 | 0 | page_statistics->has_min_max = true; |
905 | 0 | return true; |
906 | 0 | } |
907 | 0 | default: |
908 | 0 | return false; |
909 | 0 | } |
910 | 0 | } |
911 | | |
912 | | bool set_page_min_max(const std::shared_ptr<::parquet::ColumnIndex>& column_index, |
913 | | const ParquetColumnSchema& column_schema, size_t page_idx, |
914 | 144 | ParquetColumnStatistics* page_statistics, const cctz::time_zone* timezone) { |
915 | 144 | DORIS_CHECK(column_schema.type != nullptr); |
916 | 144 | switch (column_schema.descriptor->physical_type()) { |
917 | 0 | case ::parquet::Type::BOOLEAN: |
918 | 0 | return set_page_decoded_min_max<::parquet::BooleanType>(column_index, column_schema, |
919 | 0 | page_idx, DecodedValueKind::BOOL, |
920 | 0 | page_statistics, timezone); |
921 | 144 | case ::parquet::Type::INT32: |
922 | 144 | return set_page_decoded_min_max<::parquet::Int32Type>( |
923 | 144 | column_index, column_schema, page_idx, |
924 | 144 | decoded_value_kind(column_schema.type_descriptor), page_statistics, timezone); |
925 | 0 | case ::parquet::Type::INT64: |
926 | 0 | return set_page_decoded_min_max<::parquet::Int64Type>( |
927 | 0 | column_index, column_schema, page_idx, |
928 | 0 | decoded_value_kind(column_schema.type_descriptor), page_statistics, timezone); |
929 | 0 | case ::parquet::Type::FLOAT: |
930 | 0 | return set_page_decoded_min_max<::parquet::FloatType>(column_index, column_schema, page_idx, |
931 | 0 | DecodedValueKind::FLOAT, |
932 | 0 | page_statistics, timezone); |
933 | 0 | case ::parquet::Type::DOUBLE: |
934 | 0 | return set_page_decoded_min_max<::parquet::DoubleType>(column_index, column_schema, |
935 | 0 | page_idx, DecodedValueKind::DOUBLE, |
936 | 0 | page_statistics, timezone); |
937 | 0 | case ::parquet::Type::BYTE_ARRAY: |
938 | 0 | case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: |
939 | 0 | return set_page_string_min_max(column_index, column_schema, page_idx, page_statistics, |
940 | 0 | timezone); |
941 | 0 | default: |
942 | 0 | return false; |
943 | 144 | } |
944 | 144 | } |
945 | | |
946 | | bool build_page_statistics(const std::shared_ptr<::parquet::ColumnIndex>& column_index, |
947 | | const ParquetColumnSchema& column_schema, size_t page_idx, |
948 | | ParquetColumnStatistics* page_statistics, |
949 | 144 | const cctz::time_zone* timezone) { |
950 | 144 | DORIS_CHECK(page_statistics != nullptr); |
951 | 144 | *page_statistics = ParquetColumnStatistics {}; |
952 | | |
953 | 144 | const auto& null_pages = column_index->null_pages(); |
954 | 144 | if (!column_index->has_null_counts() || page_idx >= null_pages.size() || |
955 | 144 | page_idx >= column_index->null_counts().size()) { |
956 | 0 | return false; |
957 | 0 | } |
958 | | |
959 | 144 | page_statistics->has_null_count = true; |
960 | 144 | page_statistics->has_null = column_index->null_counts()[page_idx] > 0; |
961 | 144 | page_statistics->has_not_null = !null_pages[page_idx]; |
962 | 144 | if (!page_statistics->has_not_null) { |
963 | 0 | return true; |
964 | 0 | } |
965 | 144 | return set_page_min_max(column_index, column_schema, page_idx, page_statistics, timezone); |
966 | 144 | } |
967 | | |
968 | | std::vector<RowRange> intersect_ranges(const std::vector<RowRange>& left, |
969 | 9 | const std::vector<RowRange>& right) { |
970 | 9 | std::vector<RowRange> result; |
971 | 9 | size_t left_idx = 0; |
972 | 9 | size_t right_idx = 0; |
973 | 18 | while (left_idx < left.size() && right_idx < right.size()) { |
974 | 9 | const int64_t left_start = left[left_idx].start; |
975 | 9 | const int64_t left_end = left_start + left[left_idx].length; |
976 | 9 | const int64_t right_start = right[right_idx].start; |
977 | 9 | const int64_t right_end = right_start + right[right_idx].length; |
978 | 9 | const int64_t start = std::max(left_start, right_start); |
979 | 9 | const int64_t end = std::min(left_end, right_end); |
980 | 9 | if (start < end) { |
981 | 8 | result.push_back(RowRange {start, end - start}); |
982 | 8 | } |
983 | 9 | if (left_end < right_end) { |
984 | 0 | ++left_idx; |
985 | 9 | } else { |
986 | 9 | ++right_idx; |
987 | 9 | } |
988 | 9 | } |
989 | 9 | return result; |
990 | 9 | } |
991 | | |
992 | 7 | int64_t count_range_rows(const std::vector<RowRange>& ranges) { |
993 | 7 | int64_t rows = 0; |
994 | 7 | for (const auto& range : ranges) { |
995 | 7 | rows += range.length; |
996 | 7 | } |
997 | 7 | return rows; |
998 | 7 | } |
999 | | |
1000 | | RowRange page_row_range(const ::parquet::OffsetIndex& offset_index, size_t page_idx, |
1001 | 272 | int64_t row_group_rows) { |
1002 | 272 | const auto& page_locations = offset_index.page_locations(); |
1003 | 272 | const int64_t start = page_locations[page_idx].first_row_index; |
1004 | 272 | const int64_t end = page_idx + 1 == page_locations.size() |
1005 | 272 | ? row_group_rows |
1006 | 272 | : page_locations[page_idx + 1].first_row_index; |
1007 | 272 | DORIS_CHECK(start >= 0); |
1008 | 272 | DORIS_CHECK(end >= start); |
1009 | 272 | DORIS_CHECK(end <= row_group_rows); |
1010 | 272 | return RowRange {start, end - start}; |
1011 | 272 | } |
1012 | | |
1013 | 144 | void append_row_range(const RowRange& range, std::vector<RowRange>* ranges) { |
1014 | 144 | if (range.length == 0) { |
1015 | 0 | return; |
1016 | 0 | } |
1017 | 144 | if (!ranges->empty()) { |
1018 | 127 | auto& previous = ranges->back(); |
1019 | 127 | if (previous.start + previous.length == range.start) { |
1020 | 125 | previous.length += range.length; |
1021 | 125 | return; |
1022 | 125 | } |
1023 | 127 | } |
1024 | 19 | ranges->push_back(range); |
1025 | 19 | } |
1026 | | |
1027 | | bool select_ranges_for_filter(const std::shared_ptr<::parquet::RowGroupPageIndexReader>& row_group, |
1028 | | const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, |
1029 | | const format::FileColumnPredicateFilter& column_filter, |
1030 | | int64_t row_group_rows, std::vector<RowRange>* ranges, |
1031 | 10 | const cctz::time_zone* timezone) { |
1032 | 10 | if (column_filter.predicates.empty()) { |
1033 | 0 | return false; |
1034 | 0 | } |
1035 | 10 | const auto* column_schema = resolve_predicate_leaf_schema(file_schema, column_filter); |
1036 | 10 | if (column_schema == nullptr || column_schema->descriptor == nullptr) { |
1037 | 1 | return false; |
1038 | 1 | } |
1039 | | |
1040 | 9 | std::shared_ptr<::parquet::ColumnIndex> column_index; |
1041 | 9 | std::shared_ptr<::parquet::OffsetIndex> offset_index; |
1042 | 9 | try { |
1043 | 9 | column_index = row_group->GetColumnIndex(column_schema->leaf_column_id); |
1044 | 9 | offset_index = row_group->GetOffsetIndex(column_schema->leaf_column_id); |
1045 | 9 | } catch (const ::parquet::ParquetException&) { |
1046 | 0 | return false; |
1047 | 0 | } catch (const std::exception&) { |
1048 | 0 | return false; |
1049 | 0 | } |
1050 | 9 | if (column_index == nullptr || offset_index == nullptr || |
1051 | 9 | column_index->null_pages().size() != offset_index->page_locations().size()) { |
1052 | 0 | return false; |
1053 | 0 | } |
1054 | | |
1055 | 9 | ranges->clear(); |
1056 | 9 | const auto page_count = offset_index->page_locations().size(); |
1057 | 153 | for (size_t page_idx = 0; page_idx < page_count; ++page_idx) { |
1058 | 144 | ParquetColumnStatistics page_statistics; |
1059 | 144 | if (!build_page_statistics(column_index, *column_schema, page_idx, &page_statistics, |
1060 | 144 | timezone)) { |
1061 | 0 | ranges->clear(); |
1062 | 0 | return false; |
1063 | 0 | } |
1064 | 144 | const RowRange row_range = page_row_range(*offset_index, page_idx, row_group_rows); |
1065 | 144 | if (check_statistics(column_filter, page_statistics)) { |
1066 | 60 | continue; |
1067 | 60 | } |
1068 | 84 | append_row_range(row_range, ranges); |
1069 | 84 | } |
1070 | 9 | return true; |
1071 | 9 | } |
1072 | | |
1073 | 128 | bool ranges_intersect(const std::vector<RowRange>& ranges, const RowRange& range) { |
1074 | 128 | const int64_t range_end = range.start + range.length; |
1075 | 128 | for (const auto& selected_range : ranges) { |
1076 | 128 | const int64_t selected_end = selected_range.start + selected_range.length; |
1077 | 128 | if (selected_end <= range.start) { |
1078 | 8 | continue; |
1079 | 8 | } |
1080 | 120 | if (selected_range.start >= range_end) { |
1081 | 52 | return false; |
1082 | 52 | } |
1083 | 68 | return true; |
1084 | 120 | } |
1085 | 8 | return false; |
1086 | 128 | } |
1087 | | |
1088 | | void collect_leaf_schemas(const ParquetColumnSchema& column_schema, |
1089 | | const format::LocalColumnIndex* projection, |
1090 | 3 | std::vector<const ParquetColumnSchema*>* leaf_schemas) { |
1091 | 3 | if (column_schema.kind == ParquetColumnSchemaKind::PRIMITIVE) { |
1092 | 3 | leaf_schemas->push_back(&column_schema); |
1093 | 3 | return; |
1094 | 3 | } |
1095 | 0 | for (const auto& child_schema : column_schema.children) { |
1096 | 0 | if (!format::is_child_projected(projection, child_schema->local_id)) { |
1097 | 0 | continue; |
1098 | 0 | } |
1099 | 0 | const auto* child_projection = |
1100 | 0 | format::find_child_projection(projection, child_schema->local_id); |
1101 | 0 | collect_leaf_schemas(*child_schema, child_projection, leaf_schemas); |
1102 | 0 | } |
1103 | 0 | } |
1104 | | |
1105 | | void collect_request_leaf_schemas( |
1106 | | const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, |
1107 | | const format::FileScanRequest& request, |
1108 | 7 | std::vector<const ParquetColumnSchema*>* leaf_schemas) { |
1109 | 7 | std::set<int> seen_leaf_ids; |
1110 | 7 | auto collect_projection = [&](const format::LocalColumnIndex& projection) { |
1111 | 3 | const int32_t local_id = projection.local_id(); |
1112 | 3 | if (local_id < 0 || local_id >= static_cast<int32_t>(file_schema.size())) { |
1113 | 0 | return; |
1114 | 0 | } |
1115 | 3 | std::vector<const ParquetColumnSchema*> projection_leaf_schemas; |
1116 | 3 | collect_leaf_schemas(*file_schema[local_id], &projection, &projection_leaf_schemas); |
1117 | 3 | for (const auto* leaf_schema : projection_leaf_schemas) { |
1118 | 3 | DORIS_CHECK(leaf_schema != nullptr); |
1119 | 3 | if (seen_leaf_ids.insert(leaf_schema->leaf_column_id).second) { |
1120 | 3 | leaf_schemas->push_back(leaf_schema); |
1121 | 3 | } |
1122 | 3 | } |
1123 | 3 | }; |
1124 | 7 | for (const auto& projection : request.predicate_columns) { |
1125 | 1 | collect_projection(projection); |
1126 | 1 | } |
1127 | 7 | for (const auto& projection : request.non_predicate_columns) { |
1128 | 2 | collect_projection(projection); |
1129 | 2 | } |
1130 | 8 | for (const auto& column_filter : request.column_predicate_filters) { |
1131 | 8 | const auto* leaf_schema = resolve_predicate_leaf_schema(file_schema, column_filter); |
1132 | 8 | if (leaf_schema == nullptr) { |
1133 | 1 | continue; |
1134 | 1 | } |
1135 | 7 | if (seen_leaf_ids.insert(leaf_schema->leaf_column_id).second) { |
1136 | 5 | leaf_schemas->push_back(leaf_schema); |
1137 | 5 | } |
1138 | 7 | } |
1139 | 7 | } |
1140 | | |
1141 | | bool build_page_skip_plan_for_leaf( |
1142 | | const std::shared_ptr<::parquet::RowGroupPageIndexReader>& row_group, |
1143 | | const ParquetColumnSchema& column_schema, const std::vector<RowRange>& selected_ranges, |
1144 | 8 | int64_t row_group_rows, ParquetPageSkipPlan* page_skip_plan) { |
1145 | 8 | DORIS_CHECK(page_skip_plan != nullptr); |
1146 | 8 | *page_skip_plan = ParquetPageSkipPlan {}; |
1147 | | // OffsetIndex first_row_index is row-based only for non-repeated leaves. LIST/MAP/repeated |
1148 | | // leaves need repetition-level-aware range mapping and are intentionally left out for now. |
1149 | 8 | if (column_schema.kind != ParquetColumnSchemaKind::PRIMITIVE || |
1150 | 8 | column_schema.descriptor == nullptr || column_schema.leaf_column_id < 0 || |
1151 | 8 | column_schema.descriptor->max_repetition_level() != 0) { |
1152 | 0 | return false; |
1153 | 0 | } |
1154 | | |
1155 | 8 | std::shared_ptr<::parquet::OffsetIndex> offset_index; |
1156 | 8 | try { |
1157 | 8 | offset_index = row_group->GetOffsetIndex(column_schema.leaf_column_id); |
1158 | 8 | } catch (const ::parquet::ParquetException&) { |
1159 | 0 | return false; |
1160 | 0 | } catch (const std::exception&) { |
1161 | 0 | return false; |
1162 | 0 | } |
1163 | 8 | if (offset_index == nullptr) { |
1164 | 0 | return false; |
1165 | 0 | } |
1166 | | |
1167 | 8 | const auto page_count = offset_index->page_locations().size(); |
1168 | 8 | page_skip_plan->leaf_column_id = column_schema.leaf_column_id; |
1169 | 8 | page_skip_plan->skipped_pages.resize(page_count); |
1170 | 8 | page_skip_plan->skipped_page_compressed_sizes.resize(page_count); |
1171 | 8 | const auto& page_locations = offset_index->page_locations(); |
1172 | 136 | for (size_t page_idx = 0; page_idx < page_count; ++page_idx) { |
1173 | 128 | const RowRange row_range = page_row_range(*offset_index, page_idx, row_group_rows); |
1174 | 128 | if (row_range.length == 0 || ranges_intersect(selected_ranges, row_range)) { |
1175 | 68 | continue; |
1176 | 68 | } |
1177 | 60 | page_skip_plan->skipped_pages[page_idx] = 1; |
1178 | 60 | page_skip_plan->skipped_page_compressed_sizes[page_idx] = |
1179 | 60 | page_locations[page_idx].compressed_page_size; |
1180 | 60 | append_row_range(row_range, &page_skip_plan->skipped_ranges); |
1181 | 60 | } |
1182 | 8 | if (page_skip_plan->empty()) { |
1183 | 0 | *page_skip_plan = ParquetPageSkipPlan {}; |
1184 | 0 | return false; |
1185 | 0 | } |
1186 | 8 | return true; |
1187 | 8 | } |
1188 | | |
1189 | | void build_page_skip_plans(const std::shared_ptr<::parquet::RowGroupPageIndexReader>& row_group, |
1190 | | const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, |
1191 | | const format::FileScanRequest& request, |
1192 | | const std::vector<RowRange>& selected_ranges, int64_t row_group_rows, |
1193 | 7 | std::map<int, ParquetPageSkipPlan>* page_skip_plans) { |
1194 | 7 | DORIS_CHECK(page_skip_plans != nullptr); |
1195 | 7 | page_skip_plans->clear(); |
1196 | 7 | std::vector<const ParquetColumnSchema*> leaf_schemas; |
1197 | 7 | collect_request_leaf_schemas(file_schema, request, &leaf_schemas); |
1198 | 8 | for (const auto* leaf_schema : leaf_schemas) { |
1199 | 8 | DORIS_CHECK(leaf_schema != nullptr); |
1200 | 8 | ParquetPageSkipPlan page_skip_plan; |
1201 | 8 | if (build_page_skip_plan_for_leaf(row_group, *leaf_schema, selected_ranges, row_group_rows, |
1202 | 8 | &page_skip_plan)) { |
1203 | 8 | page_skip_plans->emplace(page_skip_plan.leaf_column_id, std::move(page_skip_plan)); |
1204 | 8 | } |
1205 | 8 | } |
1206 | 7 | } |
1207 | | |
1208 | | } // namespace |
1209 | | |
1210 | | Status select_row_group_ranges_by_page_index( |
1211 | | ::parquet::ParquetFileReader* file_reader, |
1212 | | const std::vector<std::unique_ptr<ParquetColumnSchema>>& file_schema, |
1213 | | const format::FileScanRequest& request, int row_group_idx, int64_t row_group_rows, |
1214 | | std::vector<RowRange>* selected_ranges, std::map<int, ParquetPageSkipPlan>* page_skip_plans, |
1215 | 146 | ParquetPruningStats* pruning_stats, const cctz::time_zone* timezone) { |
1216 | 146 | int64_t page_index_filter_time_sink = 0; |
1217 | 146 | SCOPED_RAW_TIMER(pruning_stats == nullptr ? &page_index_filter_time_sink |
1218 | 146 | : &pruning_stats->page_index_filter_time); |
1219 | 146 | DORIS_CHECK(selected_ranges != nullptr); |
1220 | 146 | selected_ranges->clear(); |
1221 | 146 | if (page_skip_plans != nullptr) { |
1222 | 146 | page_skip_plans->clear(); |
1223 | 146 | } |
1224 | 146 | if (row_group_rows <= 0) { |
1225 | 0 | return Status::OK(); |
1226 | 0 | } |
1227 | 146 | selected_ranges->push_back(RowRange {0, row_group_rows}); |
1228 | 146 | if (!config::enable_parquet_page_index || request.column_predicate_filters.empty() || |
1229 | 146 | file_reader == nullptr) { |
1230 | 108 | return Status::OK(); |
1231 | 108 | } |
1232 | | |
1233 | 38 | std::shared_ptr<::parquet::PageIndexReader> page_index_reader; |
1234 | 38 | std::shared_ptr<::parquet::RowGroupPageIndexReader> row_group_index_reader; |
1235 | 38 | try { |
1236 | 38 | if (pruning_stats != nullptr) { |
1237 | 38 | ++pruning_stats->page_index_read_calls; |
1238 | 38 | } |
1239 | 38 | { |
1240 | 38 | int64_t read_page_index_time_sink = 0; |
1241 | 38 | SCOPED_RAW_TIMER(pruning_stats == nullptr ? &read_page_index_time_sink |
1242 | 38 | : &pruning_stats->read_page_index_time); |
1243 | 38 | page_index_reader = file_reader->GetPageIndexReader(); |
1244 | 38 | if (page_index_reader == nullptr) { |
1245 | 0 | return Status::OK(); |
1246 | 0 | } |
1247 | 38 | row_group_index_reader = page_index_reader->RowGroup(row_group_idx); |
1248 | 38 | } |
1249 | 38 | } catch (const ::parquet::ParquetException&) { |
1250 | 0 | return Status::OK(); |
1251 | 0 | } catch (const std::exception&) { |
1252 | 0 | return Status::OK(); |
1253 | 0 | } |
1254 | 38 | if (row_group_index_reader == nullptr) { |
1255 | 30 | return Status::OK(); |
1256 | 30 | } |
1257 | | |
1258 | 10 | for (const auto& column_filter : request.column_predicate_filters) { |
1259 | 10 | std::vector<RowRange> filter_ranges; |
1260 | 10 | if (!select_ranges_for_filter(row_group_index_reader, file_schema, column_filter, |
1261 | 10 | row_group_rows, &filter_ranges, timezone)) { |
1262 | 1 | continue; |
1263 | 1 | } |
1264 | 9 | *selected_ranges = intersect_ranges(*selected_ranges, filter_ranges); |
1265 | 9 | if (selected_ranges->empty()) { |
1266 | 1 | if (page_skip_plans != nullptr) { |
1267 | 1 | page_skip_plans->clear(); |
1268 | 1 | } |
1269 | 1 | if (pruning_stats != nullptr) { |
1270 | 1 | pruning_stats->filtered_page_rows += row_group_rows; |
1271 | 1 | ++pruning_stats->filtered_row_groups_by_page_index; |
1272 | 1 | } |
1273 | 1 | return Status::OK(); |
1274 | 1 | } |
1275 | 9 | } |
1276 | 7 | if (page_skip_plans != nullptr) { |
1277 | 7 | build_page_skip_plans(row_group_index_reader, file_schema, request, *selected_ranges, |
1278 | 7 | row_group_rows, page_skip_plans); |
1279 | 7 | } |
1280 | 7 | if (pruning_stats != nullptr) { |
1281 | 7 | const int64_t selected_rows = count_range_rows(*selected_ranges); |
1282 | 7 | DORIS_CHECK(selected_rows <= row_group_rows); |
1283 | 7 | pruning_stats->filtered_page_rows += row_group_rows - selected_rows; |
1284 | 7 | } |
1285 | 7 | return Status::OK(); |
1286 | 8 | } |
1287 | | |
1288 | | } // namespace doris::format::parquet |