be/src/format/parquet/parquet_predicate.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <gen_cpp/parquet_types.h> |
21 | | |
22 | | #include <cmath> |
23 | | #include <cstring> |
24 | | #include <vector> |
25 | | |
26 | | #include "cctz/time_zone.h" |
27 | | #include "core/data_type/data_type_decimal.h" |
28 | | #include "core/data_type/primitive_type.h" |
29 | | #include "exec/common/endian.h" |
30 | | #include "format/format_common.h" |
31 | | #include "format/parquet/parquet_block_split_bloom_filter.h" |
32 | | #include "format/parquet/parquet_column_convert.h" |
33 | | #include "format/parquet/parquet_common.h" |
34 | | #include "format/parquet/schema_desc.h" |
35 | | #include "storage/olap_scan_common.h" |
36 | | #include "storage/segment/row_ranges.h" |
37 | | #include "util/timezone_utils.h" |
38 | | |
39 | | namespace doris { |
40 | | #include "common/compile_check_begin.h" |
41 | | class ParquetPredicate { |
42 | | private: |
// Returns true when `byte` is a 7-bit ASCII code unit, i.e. its high bit is clear.
static inline bool _is_ascii(uint8_t byte) {
    return (byte & 0x80) == 0;
}
44 | | |
// Returns the number of leading bytes that `encoding_min` and `encoding_max` have in common.
// The scan never goes past the end of the shorter string.
static int _common_prefix(const std::string& encoding_min, const std::string& encoding_max) {
    const size_t limit = std::min(encoding_min.size(), encoding_max.size());
    size_t shared = 0;
    for (; shared < limit; ++shared) {
        if (encoding_min[shared] != encoding_max[shared]) {
            break;
        }
    }
    return static_cast<int>(shared);
}
54 | | |
55 | 2.36k | static bool _try_read_old_utf8_stats(std::string& encoding_min, std::string& encoding_max) { |
56 | 2.36k | if (encoding_min == encoding_max) { |
57 | | // If min = max, then there is a single value only |
58 | | // No need to modify, just use min |
59 | 705 | encoding_max = encoding_min; |
60 | 705 | return true; |
61 | 1.66k | } else { |
62 | 1.66k | int common_prefix_length = _common_prefix(encoding_min, encoding_max); |
63 | | |
64 | | // For min we can retain all-ASCII, because this produces a strictly lower value. |
65 | 1.66k | int min_good_length = common_prefix_length; |
66 | 16.1k | while (min_good_length < encoding_min.size() && |
67 | 16.1k | _is_ascii(static_cast<uint8_t>(encoding_min[min_good_length]))) { |
68 | 14.4k | min_good_length++; |
69 | 14.4k | } |
70 | | |
71 | | // For max we can be sure only of the part matching the min. When they differ, we can consider only one next, and only if both are ASCII |
72 | 1.66k | int max_good_length = common_prefix_length; |
73 | 1.66k | if (max_good_length < encoding_max.size() && max_good_length < encoding_min.size() && |
74 | 1.66k | _is_ascii(static_cast<uint8_t>(encoding_min[max_good_length])) && |
75 | 1.66k | _is_ascii(static_cast<uint8_t>(encoding_max[max_good_length]))) { |
76 | 1.48k | max_good_length++; |
77 | 1.48k | } |
78 | | // Incrementing 127 would overflow. Incrementing within non-ASCII can have side-effects. |
79 | 1.66k | while (max_good_length > 0 && |
80 | 1.66k | (static_cast<uint8_t>(encoding_max[max_good_length - 1]) == 127 || |
81 | 1.57k | !_is_ascii(static_cast<uint8_t>(encoding_max[max_good_length - 1])))) { |
82 | 5 | max_good_length--; |
83 | 5 | } |
84 | 1.66k | if (max_good_length == 0) { |
85 | | // We can return just min bound, but code downstream likely expects both are present or both are absent. |
86 | 88 | return false; |
87 | 88 | } |
88 | | |
89 | 1.57k | encoding_min.resize(min_good_length); |
90 | 1.57k | encoding_max.resize(max_good_length); |
91 | 1.57k | if (max_good_length > 0) { |
92 | 1.56k | encoding_max[max_good_length - 1]++; |
93 | 1.56k | } |
94 | 1.57k | return true; |
95 | 1.66k | } |
96 | 2.36k | } |
97 | | |
98 | 270 | static SortOrder _determine_sort_order(const tparquet::SchemaElement& parquet_schema) { |
99 | 270 | tparquet::Type::type physical_type = parquet_schema.type; |
100 | 270 | const tparquet::LogicalType& logical_type = parquet_schema.logicalType; |
101 | | |
102 | | // Assume string type is SortOrder::SIGNED, use ParquetPredicate::_try_read_old_utf8_stats() to handle it. |
103 | 270 | if (logical_type.__isset.STRING && |
104 | 270 | (physical_type == tparquet::Type::BYTE_ARRAY || |
105 | 0 | physical_type == tparquet::Type::FIXED_LEN_BYTE_ARRAY)) { |
106 | 0 | return SortOrder::SIGNED; |
107 | 0 | } |
108 | | |
109 | 270 | if (logical_type.__isset.INTEGER) { |
110 | 0 | if (logical_type.INTEGER.isSigned) { |
111 | 0 | return SortOrder::SIGNED; |
112 | 0 | } else { |
113 | 0 | return SortOrder::UNSIGNED; |
114 | 0 | } |
115 | 270 | } else if (logical_type.__isset.DATE) { |
116 | 0 | return SortOrder::SIGNED; |
117 | 270 | } else if (logical_type.__isset.ENUM) { |
118 | 0 | return SortOrder::UNSIGNED; |
119 | 270 | } else if (logical_type.__isset.BSON) { |
120 | 0 | return SortOrder::UNSIGNED; |
121 | 270 | } else if (logical_type.__isset.JSON) { |
122 | 0 | return SortOrder::UNSIGNED; |
123 | 270 | } else if (logical_type.__isset.STRING) { |
124 | 0 | return SortOrder::UNSIGNED; |
125 | 270 | } else if (logical_type.__isset.DECIMAL) { |
126 | 0 | return SortOrder::UNKNOWN; |
127 | 270 | } else if (logical_type.__isset.MAP) { |
128 | 0 | return SortOrder::UNKNOWN; |
129 | 270 | } else if (logical_type.__isset.LIST) { |
130 | 0 | return SortOrder::UNKNOWN; |
131 | 270 | } else if (logical_type.__isset.TIME) { |
132 | 0 | return SortOrder::SIGNED; |
133 | 270 | } else if (logical_type.__isset.TIMESTAMP) { |
134 | 0 | return SortOrder::SIGNED; |
135 | 270 | } else if (logical_type.__isset.UNKNOWN) { |
136 | 0 | return SortOrder::UNKNOWN; |
137 | 270 | } else { |
138 | 270 | switch (physical_type) { |
139 | 0 | case tparquet::Type::BOOLEAN: |
140 | 254 | case tparquet::Type::INT32: |
141 | 254 | case tparquet::Type::INT64: |
142 | 258 | case tparquet::Type::FLOAT: |
143 | 262 | case tparquet::Type::DOUBLE: |
144 | 262 | return SortOrder::SIGNED; |
145 | 4 | case tparquet::Type::BYTE_ARRAY: |
146 | 8 | case tparquet::Type::FIXED_LEN_BYTE_ARRAY: |
147 | 8 | return SortOrder::UNSIGNED; |
148 | 0 | case tparquet::Type::INT96: |
149 | 0 | return SortOrder::UNKNOWN; |
150 | 0 | default: |
151 | 0 | return SortOrder::UNKNOWN; |
152 | 270 | } |
153 | 270 | } |
154 | 270 | } |
155 | | |
156 | | public: |
157 | | static constexpr int BLOOM_FILTER_MAX_HEADER_LENGTH = 64; |
158 | | struct ColumnStat { |
159 | | std::string encoded_min_value; |
160 | | std::string encoded_max_value; |
161 | | bool has_null; |
162 | | bool is_all_null; |
163 | | const FieldSchema* col_schema; |
164 | | const cctz::time_zone* ctz; |
165 | | std::unique_ptr<ParquetBlockSplitBloomFilter> bloom_filter; |
166 | | std::function<bool(ParquetPredicate::ColumnStat*, const int)>* get_stat_func = nullptr; |
167 | | std::function<bool(ParquetPredicate::ColumnStat*, const int)>* get_bloom_filter_func = |
168 | | nullptr; |
169 | | }; |
170 | | |
171 | 48 | static bool bloom_filter_supported(PrimitiveType type) { |
172 | | // Only support types where physical type == logical type (no conversion needed) |
173 | | // For types like DATEV2, DATETIMEV2, DECIMAL, Parquet stores them in physical format |
174 | | // (INT32, INT64, etc.) but Doris uses different internal representations. |
175 | | // Bloom filter works with physical bytes, but we only have logical type values, |
176 | | // and there's no reverse conversion (logical -> physical) available. |
177 | | // TINYINT/SMALLINT also need conversion via LittleIntPhysicalConverter. |
178 | 48 | switch (type) { |
179 | 0 | case TYPE_BOOLEAN: |
180 | 12 | case TYPE_INT: |
181 | 16 | case TYPE_BIGINT: |
182 | 20 | case TYPE_FLOAT: |
183 | 24 | case TYPE_DOUBLE: |
184 | 24 | case TYPE_CHAR: |
185 | 24 | case TYPE_VARCHAR: |
186 | 28 | case TYPE_STRING: |
187 | 28 | return true; |
188 | 20 | default: |
189 | 20 | return false; |
190 | 48 | } |
191 | 48 | } |
192 | | |
193 | | struct PageIndexStat { |
194 | | // Indicates whether the page index information in this column can be used. |
195 | | bool available = false; |
196 | | int64_t num_of_pages; |
197 | | std::vector<std::string> encoded_min_value; |
198 | | std::vector<std::string> encoded_max_value; |
199 | | std::vector<bool> has_null; |
200 | | std::vector<bool> is_all_null; |
201 | | const FieldSchema* col_schema; |
202 | | |
203 | | // Record the row range corresponding to each page. |
204 | | std::vector<segment_v2::RowRange> ranges; |
205 | | }; |
206 | | |
207 | | struct CachedPageIndexStat { |
208 | | const cctz::time_zone* ctz; |
209 | | std::map<int, PageIndexStat> stats; |
210 | | std::function<bool(PageIndexStat**, int)> get_stat_func; |
211 | | RowRange row_group_range; |
212 | | }; |
213 | | |
214 | | // The encoded Parquet min-max value is parsed into `fields`; |
215 | | // Can be used in row groups and page index statistics. |
216 | | static Status parse_min_max_value(const FieldSchema* col_schema, const std::string& encoded_min, |
217 | | const std::string& encoded_max, const cctz::time_zone& ctz, |
218 | 45.6k | Field* min_field, Field* max_field) { |
219 | 45.6k | auto logical_data_type = remove_nullable(col_schema->data_type); |
220 | 45.6k | auto converter = parquet::PhysicalToLogicalConverter::get_converter( |
221 | 45.6k | col_schema, logical_data_type, logical_data_type, &ctz); |
222 | 45.6k | ColumnPtr physical_column; |
223 | 45.6k | switch (col_schema->parquet_schema.type) { |
224 | 658 | case tparquet::Type::type::BOOLEAN: { |
225 | 658 | auto physical_col = ColumnUInt8::create(); |
226 | 658 | physical_col->get_data().data(); |
227 | 658 | physical_col->resize(2); |
228 | 658 | physical_col->get_data()[0] = *reinterpret_cast<const bool*>(encoded_min.data()); |
229 | 658 | physical_col->get_data()[1] = *reinterpret_cast<const bool*>(encoded_max.data()); |
230 | 658 | physical_column = std::move(physical_col); |
231 | 658 | break; |
232 | 0 | } |
233 | 32.0k | case tparquet::Type::type::INT32: { |
234 | 32.0k | auto physical_col = ColumnInt32::create(); |
235 | 32.0k | physical_col->resize(2); |
236 | | |
237 | 32.0k | physical_col->get_data()[0] = *reinterpret_cast<const int32_t*>(encoded_min.data()); |
238 | 32.0k | physical_col->get_data()[1] = *reinterpret_cast<const int32_t*>(encoded_max.data()); |
239 | | |
240 | 32.0k | physical_column = std::move(physical_col); |
241 | 32.0k | break; |
242 | 0 | } |
243 | 4.44k | case tparquet::Type::type::INT64: { |
244 | 4.44k | auto physical_col = ColumnInt64::create(); |
245 | 4.44k | physical_col->resize(2); |
246 | 4.44k | physical_col->get_data()[0] = *reinterpret_cast<const int64_t*>(encoded_min.data()); |
247 | 4.44k | physical_col->get_data()[1] = *reinterpret_cast<const int64_t*>(encoded_max.data()); |
248 | 4.44k | physical_column = std::move(physical_col); |
249 | 4.44k | break; |
250 | 0 | } |
251 | 414 | case tparquet::Type::type::FLOAT: { |
252 | 414 | auto physical_col = ColumnFloat32::create(); |
253 | 414 | physical_col->resize(2); |
254 | 414 | physical_col->get_data()[0] = *reinterpret_cast<const float*>(encoded_min.data()); |
255 | 414 | physical_col->get_data()[1] = *reinterpret_cast<const float*>(encoded_max.data()); |
256 | 414 | physical_column = std::move(physical_col); |
257 | 414 | break; |
258 | 0 | } |
259 | 408 | case tparquet::Type::type::DOUBLE: { |
260 | 408 | auto physical_col = ColumnFloat64 ::create(); |
261 | 408 | physical_col->resize(2); |
262 | 408 | physical_col->get_data()[0] = *reinterpret_cast<const double*>(encoded_min.data()); |
263 | 408 | physical_col->get_data()[1] = *reinterpret_cast<const double*>(encoded_max.data()); |
264 | 408 | physical_column = std::move(physical_col); |
265 | 408 | break; |
266 | 0 | } |
267 | 7.33k | case tparquet::Type::type::BYTE_ARRAY: { |
268 | 7.33k | auto physical_col = ColumnString::create(); |
269 | 7.33k | physical_col->insert_data(encoded_min.data(), encoded_min.size()); |
270 | 7.33k | physical_col->insert_data(encoded_max.data(), encoded_max.size()); |
271 | 7.33k | physical_column = std::move(physical_col); |
272 | 7.33k | break; |
273 | 0 | } |
274 | 326 | case tparquet::Type::type::FIXED_LEN_BYTE_ARRAY: { |
275 | 326 | auto physical_col = ColumnUInt8::create(); |
276 | 326 | physical_col->resize(2 * col_schema->parquet_schema.type_length); |
277 | 326 | DCHECK(col_schema->parquet_schema.type_length == encoded_min.length()); |
278 | 326 | DCHECK(col_schema->parquet_schema.type_length == encoded_max.length()); |
279 | | |
280 | 326 | auto ptr = physical_col->get_data().data(); |
281 | 326 | memcpy(ptr, encoded_min.data(), encoded_min.length()); |
282 | 326 | memcpy(ptr + encoded_min.length(), encoded_max.data(), encoded_max.length()); |
283 | 326 | physical_column = std::move(physical_col); |
284 | 326 | break; |
285 | 0 | } |
286 | 0 | case tparquet::Type::type::INT96: { |
287 | 0 | auto physical_col = ColumnInt8::create(); |
288 | 0 | physical_col->resize(2 * sizeof(ParquetInt96)); |
289 | 0 | DCHECK(sizeof(ParquetInt96) == encoded_min.length()); |
290 | 0 | DCHECK(sizeof(ParquetInt96) == encoded_max.length()); |
291 | |
|
292 | 0 | auto ptr = physical_col->get_data().data(); |
293 | 0 | memcpy(ptr, encoded_min.data(), encoded_min.length()); |
294 | 0 | memcpy(ptr + encoded_min.length(), encoded_max.data(), encoded_max.length()); |
295 | 0 | physical_column = std::move(physical_col); |
296 | 0 | break; |
297 | 0 | } |
298 | 45.6k | } |
299 | | |
300 | 45.7k | ColumnPtr logical_column; |
301 | 45.7k | if (converter->is_consistent()) { |
302 | 39.2k | logical_column = physical_column; |
303 | 39.2k | } else { |
304 | 6.47k | logical_column = logical_data_type->create_column(); |
305 | 6.47k | RETURN_IF_ERROR(converter->physical_convert(physical_column, logical_column)); |
306 | 6.47k | } |
307 | | |
308 | 45.7k | DCHECK(logical_column->size() == 2); |
309 | 45.7k | *min_field = logical_column->operator[](0); |
310 | 45.7k | *max_field = logical_column->operator[](1); |
311 | | |
312 | 45.7k | auto logical_prim_type = logical_data_type->get_primitive_type(); |
313 | | |
314 | 45.7k | if (logical_prim_type == TYPE_FLOAT) { |
315 | 411 | auto& min_value = min_field->get<TYPE_FLOAT>(); |
316 | 411 | auto& max_value = max_field->get<TYPE_FLOAT>(); |
317 | | |
318 | 411 | if (std::isnan(min_value) || std::isnan(max_value)) { |
319 | 1 | return Status::DataQualityError("Can not use this parquet min/max value."); |
320 | 1 | } |
321 | | // Updating min to -0.0 and max to +0.0 to ensure that no 0.0 values would be skipped |
322 | 410 | if (std::signbit(min_value) == 0 && min_value == 0.0F) { |
323 | 0 | min_value = -0.0F; |
324 | 0 | } |
325 | 410 | if (std::signbit(max_value) != 0 && max_value == -0.0F) { |
326 | 0 | max_value = 0.0F; |
327 | 0 | } |
328 | 45.2k | } else if (logical_prim_type == TYPE_DOUBLE) { |
329 | 408 | auto& min_value = min_field->get<TYPE_DOUBLE>(); |
330 | 408 | auto& max_value = max_field->get<TYPE_DOUBLE>(); |
331 | | |
332 | 408 | if (std::isnan(min_value) || std::isnan(max_value)) { |
333 | 0 | return Status::DataQualityError("Can not use this parquet min/max value."); |
334 | 0 | } |
335 | | // Updating min to -0.0 and max to +0.0 to ensure that no 0.0 values would be skipped |
336 | 408 | if (std::signbit(min_value) == 0 && min_value == 0.0F) { |
337 | 0 | min_value = -0.0F; |
338 | 0 | } |
339 | 408 | if (std::signbit(max_value) != 0 && max_value == -0.0F) { |
340 | 0 | max_value = 0.0F; |
341 | 0 | } |
342 | 44.8k | } else if (col_schema->parquet_schema.type == tparquet::Type::type::INT96 || |
343 | 44.8k | logical_prim_type == TYPE_DATETIMEV2) { |
344 | 1.50k | auto min_value = min_field->get<TYPE_DATETIMEV2>(); |
345 | 1.50k | auto max_value = min_field->get<TYPE_DATETIMEV2>(); |
346 | | |
347 | | // From Trino: Parquet INT96 timestamp values were compared incorrectly |
348 | | // for the purposes of producing statistics by older parquet writers, |
349 | | // so PARQUET-1065 deprecated them. The result is that any writer that produced stats |
350 | | // was producing unusable incorrect values, except the special case where min == max |
351 | | // and an incorrect ordering would not be material to the result. |
352 | | // PARQUET-1026 made binary stats available and valid in that special case. |
353 | 1.50k | if (min_value != max_value) { |
354 | 0 | return Status::DataQualityError("invalid min/max value"); |
355 | 0 | } |
356 | 1.50k | } |
357 | | |
358 | 45.7k | return Status::OK(); |
359 | 45.7k | } |
360 | | |
361 | | static Status read_column_stats(const FieldSchema* col_schema, |
362 | | const tparquet::ColumnMetaData& column_meta_data, |
363 | | std::unordered_map<tparquet::Type::type, bool>* ignored_stats, |
364 | 14.3k | const std::string& file_created_by, ColumnStat* ans_stat) { |
365 | 14.3k | auto& statistic = column_meta_data.statistics; |
366 | | |
367 | 14.3k | if (!statistic.__isset.null_count) [[unlikely]] { |
368 | 8 | return Status::DataQualityError("This parquet Column meta no set null_count."); |
369 | 8 | } |
370 | 14.3k | ans_stat->has_null = statistic.null_count > 0; |
371 | 14.3k | ans_stat->is_all_null = statistic.null_count == column_meta_data.num_values; |
372 | 14.3k | if (ans_stat->is_all_null) { |
373 | 278 | return Status::OK(); |
374 | 278 | } |
375 | 14.0k | auto prim_type = remove_nullable(col_schema->data_type)->get_primitive_type(); |
376 | | |
377 | | // Min-max of statistic is plain-encoded value |
378 | 14.0k | if (statistic.__isset.min_value && statistic.__isset.max_value) { |
379 | 13.7k | ColumnOrderName column_order = |
380 | 13.7k | col_schema->physical_type == tparquet::Type::INT96 || |
381 | 13.7k | col_schema->parquet_schema.logicalType.__isset.UNKNOWN |
382 | 13.7k | ? ColumnOrderName::UNDEFINED |
383 | 13.7k | : ColumnOrderName::TYPE_DEFINED_ORDER; |
384 | 13.7k | if ((statistic.min_value != statistic.max_value) && |
385 | 13.7k | (column_order != ColumnOrderName::TYPE_DEFINED_ORDER)) { |
386 | 0 | return Status::DataQualityError("Can not use this parquet min/max value."); |
387 | 0 | } |
388 | 13.7k | ans_stat->encoded_min_value = statistic.min_value; |
389 | 13.7k | ans_stat->encoded_max_value = statistic.max_value; |
390 | | |
391 | 13.7k | if (prim_type == TYPE_VARCHAR || prim_type == TYPE_CHAR || prim_type == TYPE_STRING) { |
392 | 2.34k | auto encoded_min_copy = ans_stat->encoded_min_value; |
393 | 2.34k | auto encoded_max_copy = ans_stat->encoded_max_value; |
394 | 2.34k | if (!_try_read_old_utf8_stats(encoded_min_copy, encoded_max_copy)) { |
395 | 84 | return Status::DataQualityError("Can not use this parquet min/max value."); |
396 | 84 | } |
397 | 2.26k | ans_stat->encoded_min_value = encoded_min_copy; |
398 | 2.26k | ans_stat->encoded_max_value = encoded_max_copy; |
399 | 2.26k | } |
400 | | |
401 | 13.7k | } else if (statistic.__isset.min && statistic.__isset.max) { |
402 | 270 | bool max_equals_min = statistic.min == statistic.max; |
403 | | |
404 | 270 | SortOrder sort_order = _determine_sort_order(col_schema->parquet_schema); |
405 | 270 | bool sort_orders_match = SortOrder::SIGNED == sort_order; |
406 | 270 | if (!sort_orders_match && !max_equals_min) { |
407 | 8 | return Status::NotSupported("Can not use this parquet min/max value."); |
408 | 8 | } |
409 | | |
410 | 262 | bool should_ignore_corrupted_stats = false; |
411 | 262 | if (ignored_stats != nullptr) { |
412 | 262 | if (ignored_stats->count(col_schema->physical_type) == 0) { |
413 | 222 | if (CorruptStatistics::should_ignore_statistics(file_created_by, |
414 | 222 | col_schema->physical_type)) { |
415 | 0 | ignored_stats->emplace(col_schema->physical_type, true); |
416 | 0 | should_ignore_corrupted_stats = true; |
417 | 222 | } else { |
418 | 222 | ignored_stats->emplace(col_schema->physical_type, false); |
419 | 222 | } |
420 | 222 | } else if (ignored_stats->at(col_schema->physical_type)) { |
421 | 0 | should_ignore_corrupted_stats = true; |
422 | 0 | } |
423 | 262 | } else if (CorruptStatistics::should_ignore_statistics(file_created_by, |
424 | 0 | col_schema->physical_type)) { |
425 | 0 | should_ignore_corrupted_stats = true; |
426 | 0 | } |
427 | | |
428 | 262 | if (should_ignore_corrupted_stats) { |
429 | 0 | return Status::DataQualityError("Error statistics, should ignore."); |
430 | 0 | } |
431 | | |
432 | 262 | ans_stat->encoded_min_value = statistic.min; |
433 | 262 | ans_stat->encoded_max_value = statistic.max; |
434 | 18.4E | } else { |
435 | 18.4E | return Status::DataQualityError("This parquet file not set min/max value"); |
436 | 18.4E | } |
437 | | |
438 | 13.9k | return Status::OK(); |
439 | 14.0k | } |
440 | | |
441 | | static Status read_bloom_filter(const tparquet::ColumnMetaData& column_meta_data, |
442 | | io::FileReaderSPtr file_reader, io::IOContext* io_ctx, |
443 | 32 | ColumnStat* ans_stat) { |
444 | 32 | size_t size; |
445 | 32 | if (!column_meta_data.__isset.bloom_filter_offset) { |
446 | 0 | return Status::NotSupported("Can not use this parquet bloom filter."); |
447 | 0 | } |
448 | | |
449 | 32 | if (column_meta_data.__isset.bloom_filter_length && |
450 | 32 | column_meta_data.bloom_filter_length > 0) { |
451 | 32 | size = column_meta_data.bloom_filter_length; |
452 | 32 | } else { |
453 | 0 | size = BLOOM_FILTER_MAX_HEADER_LENGTH; |
454 | 0 | } |
455 | 32 | size_t bytes_read = 0; |
456 | 32 | std::vector<uint8_t> header_buffer(size); |
457 | 32 | RETURN_IF_ERROR(file_reader->read_at(column_meta_data.bloom_filter_offset, |
458 | 32 | Slice(header_buffer.data(), size), &bytes_read, |
459 | 32 | io_ctx)); |
460 | | |
461 | 32 | tparquet::BloomFilterHeader t_bloom_filter_header; |
462 | 32 | uint32_t t_bloom_filter_header_size = static_cast<uint32_t>(bytes_read); |
463 | 32 | RETURN_IF_ERROR(deserialize_thrift_msg(header_buffer.data(), &t_bloom_filter_header_size, |
464 | 32 | true, &t_bloom_filter_header)); |
465 | | |
466 | | // TODO the bloom filter could be encrypted, too, so need to double check that this is NOT the case |
467 | 32 | if (!t_bloom_filter_header.algorithm.__isset.BLOCK || |
468 | 32 | !t_bloom_filter_header.compression.__isset.UNCOMPRESSED || |
469 | 32 | !t_bloom_filter_header.hash.__isset.XXHASH) { |
470 | 0 | return Status::NotSupported("Can not use this parquet bloom filter."); |
471 | 0 | } |
472 | | |
473 | 32 | ans_stat->bloom_filter = std::make_unique<ParquetBlockSplitBloomFilter>(); |
474 | | |
475 | 32 | std::vector<uint8_t> data_buffer(t_bloom_filter_header.numBytes); |
476 | 32 | RETURN_IF_ERROR(file_reader->read_at( |
477 | 32 | column_meta_data.bloom_filter_offset + t_bloom_filter_header_size, |
478 | 32 | Slice(data_buffer.data(), t_bloom_filter_header.numBytes), &bytes_read, io_ctx)); |
479 | | |
480 | 32 | RETURN_IF_ERROR(ans_stat->bloom_filter->init( |
481 | 32 | reinterpret_cast<const char*>(data_buffer.data()), t_bloom_filter_header.numBytes, |
482 | 32 | segment_v2::HashStrategyPB::XX_HASH_64)); |
483 | | |
484 | 32 | return Status::OK(); |
485 | 32 | } |
486 | | }; |
487 | | #include "common/compile_check_end.h" |
488 | | |
489 | | } // namespace doris |