be/src/format/parquet/parquet_thrift_util.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <gen_cpp/parquet_types.h> |
21 | | |
22 | | #include <cstdint> |
23 | | |
24 | | #include "common/logging.h" |
25 | | #include "common/status.h" |
26 | | #include "core/custom_allocator.h" |
27 | | #include "format/parquet/vparquet_file_metadata.h" |
28 | | #include "io/fs/file_reader.h" |
29 | | #include "io/io_common.h" |
30 | | #include "storage/iterators.h" |
31 | | #include "util/coding.h" |
32 | | #include "util/thrift_util.h" |
33 | | |
34 | | namespace doris { |
35 | | #include "common/compile_check_begin.h" |
// Magic bytes "PAR1". parse_thrift_footer compares them against the last four bytes it read
// from the tail of the file.
36 | | constexpr uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'}; |
// Size in bytes of the fixed trailer at the end of the file. parse_thrift_footer decodes a
// little-endian uint32 metadata length from the first four of these bytes and expects the
// magic in the last four.
37 | | constexpr uint32_t PARQUET_FOOTER_SIZE = 8; |
// Upper bound on the first read from the tail of the file. Metadata that does not fit in this
// window is fetched by parse_thrift_footer with a second, exactly sized read.
38 | | constexpr size_t INIT_META_SIZE = 48 * 1024; // 48k |
39 | | |
40 | | static Status parse_thrift_footer(io::FileReaderSPtr file, |
41 | | std::unique_ptr<FileMetaData>* file_metadata, size_t* meta_size, |
42 | | io::IOContext* io_ctx, const bool enable_mapping_varbinary, |
43 | 99 | const bool enable_mapping_timestamp_tz) { |
44 | 99 | size_t file_size = file->size(); |
45 | 99 | size_t bytes_read = std::min(file_size, INIT_META_SIZE); |
46 | 99 | std::vector<uint8_t> footer(bytes_read); |
47 | 99 | RETURN_IF_ERROR(file->read_at(file_size - bytes_read, Slice(footer.data(), bytes_read), |
48 | 99 | &bytes_read, io_ctx)); |
49 | | |
50 | | // validate magic |
51 | 99 | uint8_t* magic_ptr = footer.data() + bytes_read - 4; |
52 | 99 | if (bytes_read < PARQUET_FOOTER_SIZE) { |
53 | 0 | return Status::Corruption( |
54 | 0 | "Read parquet file footer fail, bytes read: {}, file size: {}, path: {}", |
55 | 0 | bytes_read, file_size, file->path().native()); |
56 | 99 | } else if (memcmp(magic_ptr, PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) != 0) { |
57 | 0 | return Status::Corruption( |
58 | 0 | "Invalid magic number in parquet file, bytes read: {}, file size: {}, path: {}, " |
59 | 0 | "read magic: {}", |
60 | 0 | bytes_read, file_size, file->path().native(), |
61 | 0 | std::string((char*)magic_ptr, sizeof(PARQUET_VERSION_NUMBER))); |
62 | 0 | } |
63 | | |
64 | | // get metadata_size |
65 | 99 | uint32_t metadata_size = decode_fixed32_le(footer.data() + bytes_read - PARQUET_FOOTER_SIZE); |
66 | 99 | if (metadata_size > file_size - PARQUET_FOOTER_SIZE) { |
67 | 0 | return Status::Corruption("Parquet footer size({}) is large than file size({})", |
68 | 0 | metadata_size, file_size); |
69 | 0 | } |
70 | 99 | DorisUniqueBufferPtr<uint8_t> new_buff; |
71 | 99 | uint8_t* meta_ptr; |
72 | 99 | if (metadata_size > bytes_read - PARQUET_FOOTER_SIZE) { |
73 | 0 | new_buff = make_unique_buffer<uint8_t>(metadata_size); |
74 | 0 | RETURN_IF_ERROR(file->read_at(file_size - PARQUET_FOOTER_SIZE - metadata_size, |
75 | 0 | Slice(new_buff.get(), metadata_size), &bytes_read, io_ctx)); |
76 | 0 | meta_ptr = new_buff.get(); |
77 | 99 | } else { |
78 | 99 | meta_ptr = footer.data() + bytes_read - PARQUET_FOOTER_SIZE - metadata_size; |
79 | 99 | } |
80 | | |
81 | 99 | tparquet::FileMetaData t_metadata; |
82 | | // deserialize footer |
83 | 99 | RETURN_IF_ERROR(deserialize_thrift_msg(meta_ptr, &metadata_size, true, &t_metadata)); |
84 | 99 | *file_metadata = std::make_unique<FileMetaData>(t_metadata, metadata_size); |
85 | 99 | RETURN_IF_ERROR( |
86 | 99 | (*file_metadata)->init_schema(enable_mapping_varbinary, enable_mapping_timestamp_tz)); |
87 | 99 | *meta_size = PARQUET_FOOTER_SIZE + metadata_size; |
88 | 99 | return Status::OK(); |
89 | 99 | } parquet_expr_test.cpp:_ZN5dorisL19parse_thrift_footerESt10shared_ptrINS_2io10FileReaderEEPSt10unique_ptrINS_12FileMetaDataESt14default_deleteIS5_EEPmPNS1_9IOContextEbb Line | Count | Source | 43 | 27 | const bool enable_mapping_timestamp_tz) { | 44 | 27 | size_t file_size = file->size(); | 45 | 27 | size_t bytes_read = std::min(file_size, INIT_META_SIZE); | 46 | 27 | std::vector<uint8_t> footer(bytes_read); | 47 | 27 | RETURN_IF_ERROR(file->read_at(file_size - bytes_read, Slice(footer.data(), bytes_read), | 48 | 27 | &bytes_read, io_ctx)); | 49 | | | 50 | | // validate magic | 51 | 27 | uint8_t* magic_ptr = footer.data() + bytes_read - 4; | 52 | 27 | if (bytes_read < PARQUET_FOOTER_SIZE) { | 53 | 0 | return Status::Corruption( | 54 | 0 | "Read parquet file footer fail, bytes read: {}, file size: {}, path: {}", | 55 | 0 | bytes_read, file_size, file->path().native()); | 56 | 27 | } else if (memcmp(magic_ptr, PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) != 0) { | 57 | 0 | return Status::Corruption( | 58 | 0 | "Invalid magic number in parquet file, bytes read: {}, file size: {}, path: {}, " | 59 | 0 | "read magic: {}", | 60 | 0 | bytes_read, file_size, file->path().native(), | 61 | 0 | std::string((char*)magic_ptr, sizeof(PARQUET_VERSION_NUMBER))); | 62 | 0 | } | 63 | | | 64 | | // get metadata_size | 65 | 27 | uint32_t metadata_size = decode_fixed32_le(footer.data() + bytes_read - PARQUET_FOOTER_SIZE); | 66 | 27 | if (metadata_size > file_size - PARQUET_FOOTER_SIZE) { | 67 | 0 | return Status::Corruption("Parquet footer size({}) is large than file size({})", | 68 | 0 | metadata_size, file_size); | 69 | 0 | } | 70 | 27 | DorisUniqueBufferPtr<uint8_t> new_buff; | 71 | 27 | uint8_t* meta_ptr; | 72 | 27 | if (metadata_size > bytes_read - PARQUET_FOOTER_SIZE) { | 73 | 0 | new_buff = make_unique_buffer<uint8_t>(metadata_size); | 74 | 0 | RETURN_IF_ERROR(file->read_at(file_size - PARQUET_FOOTER_SIZE - metadata_size, | 75 | 0 | Slice(new_buff.get(), 
metadata_size), &bytes_read, io_ctx)); | 76 | 0 | meta_ptr = new_buff.get(); | 77 | 27 | } else { | 78 | 27 | meta_ptr = footer.data() + bytes_read - PARQUET_FOOTER_SIZE - metadata_size; | 79 | 27 | } | 80 | | | 81 | 27 | tparquet::FileMetaData t_metadata; | 82 | | // deserialize footer | 83 | 27 | RETURN_IF_ERROR(deserialize_thrift_msg(meta_ptr, &metadata_size, true, &t_metadata)); | 84 | 27 | *file_metadata = std::make_unique<FileMetaData>(t_metadata, metadata_size); | 85 | 27 | RETURN_IF_ERROR( | 86 | 27 | (*file_metadata)->init_schema(enable_mapping_varbinary, enable_mapping_timestamp_tz)); | 87 | 27 | *meta_size = PARQUET_FOOTER_SIZE + metadata_size; | 88 | 27 | return Status::OK(); | 89 | 27 | } |
parquet_thrift_test.cpp:_ZN5dorisL19parse_thrift_footerESt10shared_ptrINS_2io10FileReaderEEPSt10unique_ptrINS_12FileMetaDataESt14default_deleteIS5_EEPmPNS1_9IOContextEbb Line | Count | Source | 43 | 4 | const bool enable_mapping_timestamp_tz) { | 44 | 4 | size_t file_size = file->size(); | 45 | 4 | size_t bytes_read = std::min(file_size, INIT_META_SIZE); | 46 | 4 | std::vector<uint8_t> footer(bytes_read); | 47 | 4 | RETURN_IF_ERROR(file->read_at(file_size - bytes_read, Slice(footer.data(), bytes_read), | 48 | 4 | &bytes_read, io_ctx)); | 49 | | | 50 | | // validate magic | 51 | 4 | uint8_t* magic_ptr = footer.data() + bytes_read - 4; | 52 | 4 | if (bytes_read < PARQUET_FOOTER_SIZE) { | 53 | 0 | return Status::Corruption( | 54 | 0 | "Read parquet file footer fail, bytes read: {}, file size: {}, path: {}", | 55 | 0 | bytes_read, file_size, file->path().native()); | 56 | 4 | } else if (memcmp(magic_ptr, PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) != 0) { | 57 | 0 | return Status::Corruption( | 58 | 0 | "Invalid magic number in parquet file, bytes read: {}, file size: {}, path: {}, " | 59 | 0 | "read magic: {}", | 60 | 0 | bytes_read, file_size, file->path().native(), | 61 | 0 | std::string((char*)magic_ptr, sizeof(PARQUET_VERSION_NUMBER))); | 62 | 0 | } | 63 | | | 64 | | // get metadata_size | 65 | 4 | uint32_t metadata_size = decode_fixed32_le(footer.data() + bytes_read - PARQUET_FOOTER_SIZE); | 66 | 4 | if (metadata_size > file_size - PARQUET_FOOTER_SIZE) { | 67 | 0 | return Status::Corruption("Parquet footer size({}) is large than file size({})", | 68 | 0 | metadata_size, file_size); | 69 | 0 | } | 70 | 4 | DorisUniqueBufferPtr<uint8_t> new_buff; | 71 | 4 | uint8_t* meta_ptr; | 72 | 4 | if (metadata_size > bytes_read - PARQUET_FOOTER_SIZE) { | 73 | 0 | new_buff = make_unique_buffer<uint8_t>(metadata_size); | 74 | 0 | RETURN_IF_ERROR(file->read_at(file_size - PARQUET_FOOTER_SIZE - metadata_size, | 75 | 0 | Slice(new_buff.get(), metadata_size), 
&bytes_read, io_ctx)); | 76 | 0 | meta_ptr = new_buff.get(); | 77 | 4 | } else { | 78 | 4 | meta_ptr = footer.data() + bytes_read - PARQUET_FOOTER_SIZE - metadata_size; | 79 | 4 | } | 80 | | | 81 | 4 | tparquet::FileMetaData t_metadata; | 82 | | // deserialize footer | 83 | 4 | RETURN_IF_ERROR(deserialize_thrift_msg(meta_ptr, &metadata_size, true, &t_metadata)); | 84 | 4 | *file_metadata = std::make_unique<FileMetaData>(t_metadata, metadata_size); | 85 | 4 | RETURN_IF_ERROR( | 86 | 4 | (*file_metadata)->init_schema(enable_mapping_varbinary, enable_mapping_timestamp_tz)); | 87 | 4 | *meta_size = PARQUET_FOOTER_SIZE + metadata_size; | 88 | 4 | return Status::OK(); | 89 | 4 | } |
vparquet_reader.cpp:_ZN5dorisL19parse_thrift_footerESt10shared_ptrINS_2io10FileReaderEEPSt10unique_ptrINS_12FileMetaDataESt14default_deleteIS5_EEPmPNS1_9IOContextEbb Line | Count | Source | 43 | 68 | const bool enable_mapping_timestamp_tz) { | 44 | 68 | size_t file_size = file->size(); | 45 | 68 | size_t bytes_read = std::min(file_size, INIT_META_SIZE); | 46 | 68 | std::vector<uint8_t> footer(bytes_read); | 47 | 68 | RETURN_IF_ERROR(file->read_at(file_size - bytes_read, Slice(footer.data(), bytes_read), | 48 | 68 | &bytes_read, io_ctx)); | 49 | | | 50 | | // validate magic | 51 | 68 | uint8_t* magic_ptr = footer.data() + bytes_read - 4; | 52 | 68 | if (bytes_read < PARQUET_FOOTER_SIZE) { | 53 | 0 | return Status::Corruption( | 54 | 0 | "Read parquet file footer fail, bytes read: {}, file size: {}, path: {}", | 55 | 0 | bytes_read, file_size, file->path().native()); | 56 | 68 | } else if (memcmp(magic_ptr, PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) != 0) { | 57 | 0 | return Status::Corruption( | 58 | 0 | "Invalid magic number in parquet file, bytes read: {}, file size: {}, path: {}, " | 59 | 0 | "read magic: {}", | 60 | 0 | bytes_read, file_size, file->path().native(), | 61 | 0 | std::string((char*)magic_ptr, sizeof(PARQUET_VERSION_NUMBER))); | 62 | 0 | } | 63 | | | 64 | | // get metadata_size | 65 | 68 | uint32_t metadata_size = decode_fixed32_le(footer.data() + bytes_read - PARQUET_FOOTER_SIZE); | 66 | 68 | if (metadata_size > file_size - PARQUET_FOOTER_SIZE) { | 67 | 0 | return Status::Corruption("Parquet footer size({}) is large than file size({})", | 68 | 0 | metadata_size, file_size); | 69 | 0 | } | 70 | 68 | DorisUniqueBufferPtr<uint8_t> new_buff; | 71 | 68 | uint8_t* meta_ptr; | 72 | 68 | if (metadata_size > bytes_read - PARQUET_FOOTER_SIZE) { | 73 | 0 | new_buff = make_unique_buffer<uint8_t>(metadata_size); | 74 | 0 | RETURN_IF_ERROR(file->read_at(file_size - PARQUET_FOOTER_SIZE - metadata_size, | 75 | 0 | Slice(new_buff.get(), metadata_size), 
&bytes_read, io_ctx)); | 76 | 0 | meta_ptr = new_buff.get(); | 77 | 68 | } else { | 78 | 68 | meta_ptr = footer.data() + bytes_read - PARQUET_FOOTER_SIZE - metadata_size; | 79 | 68 | } | 80 | | | 81 | 68 | tparquet::FileMetaData t_metadata; | 82 | | // deserialize footer | 83 | 68 | RETURN_IF_ERROR(deserialize_thrift_msg(meta_ptr, &metadata_size, true, &t_metadata)); | 84 | 68 | *file_metadata = std::make_unique<FileMetaData>(t_metadata, metadata_size); | 85 | 68 | RETURN_IF_ERROR( | 86 | 68 | (*file_metadata)->init_schema(enable_mapping_varbinary, enable_mapping_timestamp_tz)); | 87 | 68 | *meta_size = PARQUET_FOOTER_SIZE + metadata_size; | 88 | 68 | return Status::OK(); | 89 | 68 | } |
Unexecuted instantiation: parquet_metadata_reader.cpp:_ZN5dorisL19parse_thrift_footerESt10shared_ptrINS_2io10FileReaderEEPSt10unique_ptrINS_12FileMetaDataESt14default_deleteIS5_EEPmPNS1_9IOContextEbb |
90 | | #include "common/compile_check_end.h" |
91 | | |
92 | | } // namespace doris |