Coverage Report

Created: 2026-04-16 16:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/parquet/parquet_thrift_util.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/parquet_types.h>
21
22
#include <cstdint>
23
24
#include "common/logging.h"
25
#include "common/status.h"
26
#include "core/custom_allocator.h"
27
#include "format/parquet/vparquet_file_metadata.h"
28
#include "io/fs/file_reader.h"
29
#include "io/io_common.h"
30
#include "storage/iterators.h"
31
#include "util/coding.h"
32
#include "util/thrift_util.h"
33
34
namespace doris {
35
constexpr uint8_t PARQUET_VERSION_NUMBER[4] = {'P', 'A', 'R', '1'};
36
constexpr uint32_t PARQUET_FOOTER_SIZE = 8;
37
constexpr size_t INIT_META_SIZE = 48 * 1024; // 48k
38
39
static Status parse_thrift_footer(io::FileReaderSPtr file,
40
                                  std::unique_ptr<FileMetaData>* file_metadata, size_t* meta_size,
41
                                  io::IOContext* io_ctx, const bool enable_mapping_varbinary,
42
104
                                  const bool enable_mapping_timestamp_tz) {
43
104
    size_t file_size = file->size();
44
104
    size_t bytes_read = std::min(file_size, INIT_META_SIZE);
45
104
    std::vector<uint8_t> footer(bytes_read);
46
104
    RETURN_IF_ERROR(file->read_at(file_size - bytes_read, Slice(footer.data(), bytes_read),
47
104
                                  &bytes_read, io_ctx));
48
49
    // validate magic
50
104
    uint8_t* magic_ptr = footer.data() + bytes_read - 4;
51
104
    if (bytes_read < PARQUET_FOOTER_SIZE) {
52
0
        return Status::Corruption(
53
0
                "Read parquet file footer fail, bytes read: {}, file size: {}, path: {}",
54
0
                bytes_read, file_size, file->path().native());
55
104
    } else if (memcmp(magic_ptr, PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) != 0) {
56
0
        return Status::Corruption(
57
0
                "Invalid magic number in parquet file, bytes read: {}, file size: {}, path: {}, "
58
0
                "read magic: {}",
59
0
                bytes_read, file_size, file->path().native(),
60
0
                std::string((char*)magic_ptr, sizeof(PARQUET_VERSION_NUMBER)));
61
0
    }
62
63
    // get metadata_size
64
104
    uint32_t metadata_size = decode_fixed32_le(footer.data() + bytes_read - PARQUET_FOOTER_SIZE);
65
104
    if (metadata_size > file_size - PARQUET_FOOTER_SIZE) {
66
0
        return Status::Corruption("Parquet footer size({}) is large than file size({})",
67
0
                                  metadata_size, file_size);
68
0
    }
69
104
    DorisUniqueBufferPtr<uint8_t> new_buff;
70
104
    uint8_t* meta_ptr;
71
104
    if (metadata_size > bytes_read - PARQUET_FOOTER_SIZE) {
72
0
        new_buff = make_unique_buffer<uint8_t>(metadata_size);
73
0
        RETURN_IF_ERROR(file->read_at(file_size - PARQUET_FOOTER_SIZE - metadata_size,
74
0
                                      Slice(new_buff.get(), metadata_size), &bytes_read, io_ctx));
75
0
        meta_ptr = new_buff.get();
76
104
    } else {
77
104
        meta_ptr = footer.data() + bytes_read - PARQUET_FOOTER_SIZE - metadata_size;
78
104
    }
79
80
104
    tparquet::FileMetaData t_metadata;
81
    // deserialize footer
82
104
    RETURN_IF_ERROR(deserialize_thrift_msg(meta_ptr, &metadata_size, true, &t_metadata));
83
104
    *file_metadata = std::make_unique<FileMetaData>(t_metadata, metadata_size);
84
104
    RETURN_IF_ERROR(
85
104
            (*file_metadata)->init_schema(enable_mapping_varbinary, enable_mapping_timestamp_tz));
86
104
    *meta_size = PARQUET_FOOTER_SIZE + metadata_size;
87
104
    return Status::OK();
88
104
}
parquet_expr_test.cpp:_ZN5dorisL19parse_thrift_footerESt10shared_ptrINS_2io10FileReaderEEPSt10unique_ptrINS_12FileMetaDataESt14default_deleteIS5_EEPmPNS1_9IOContextEbb
Line
Count
Source
42
28
                                  const bool enable_mapping_timestamp_tz) {
43
28
    size_t file_size = file->size();
44
28
    size_t bytes_read = std::min(file_size, INIT_META_SIZE);
45
28
    std::vector<uint8_t> footer(bytes_read);
46
28
    RETURN_IF_ERROR(file->read_at(file_size - bytes_read, Slice(footer.data(), bytes_read),
47
28
                                  &bytes_read, io_ctx));
48
49
    // validate magic
50
28
    uint8_t* magic_ptr = footer.data() + bytes_read - 4;
51
28
    if (bytes_read < PARQUET_FOOTER_SIZE) {
52
0
        return Status::Corruption(
53
0
                "Read parquet file footer fail, bytes read: {}, file size: {}, path: {}",
54
0
                bytes_read, file_size, file->path().native());
55
28
    } else if (memcmp(magic_ptr, PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) != 0) {
56
0
        return Status::Corruption(
57
0
                "Invalid magic number in parquet file, bytes read: {}, file size: {}, path: {}, "
58
0
                "read magic: {}",
59
0
                bytes_read, file_size, file->path().native(),
60
0
                std::string((char*)magic_ptr, sizeof(PARQUET_VERSION_NUMBER)));
61
0
    }
62
63
    // get metadata_size
64
28
    uint32_t metadata_size = decode_fixed32_le(footer.data() + bytes_read - PARQUET_FOOTER_SIZE);
65
28
    if (metadata_size > file_size - PARQUET_FOOTER_SIZE) {
66
0
        return Status::Corruption("Parquet footer size({}) is large than file size({})",
67
0
                                  metadata_size, file_size);
68
0
    }
69
28
    DorisUniqueBufferPtr<uint8_t> new_buff;
70
28
    uint8_t* meta_ptr;
71
28
    if (metadata_size > bytes_read - PARQUET_FOOTER_SIZE) {
72
0
        new_buff = make_unique_buffer<uint8_t>(metadata_size);
73
0
        RETURN_IF_ERROR(file->read_at(file_size - PARQUET_FOOTER_SIZE - metadata_size,
74
0
                                      Slice(new_buff.get(), metadata_size), &bytes_read, io_ctx));
75
0
        meta_ptr = new_buff.get();
76
28
    } else {
77
28
        meta_ptr = footer.data() + bytes_read - PARQUET_FOOTER_SIZE - metadata_size;
78
28
    }
79
80
28
    tparquet::FileMetaData t_metadata;
81
    // deserialize footer
82
28
    RETURN_IF_ERROR(deserialize_thrift_msg(meta_ptr, &metadata_size, true, &t_metadata));
83
28
    *file_metadata = std::make_unique<FileMetaData>(t_metadata, metadata_size);
84
28
    RETURN_IF_ERROR(
85
28
            (*file_metadata)->init_schema(enable_mapping_varbinary, enable_mapping_timestamp_tz));
86
28
    *meta_size = PARQUET_FOOTER_SIZE + metadata_size;
87
28
    return Status::OK();
88
28
}
parquet_thrift_test.cpp:_ZN5dorisL19parse_thrift_footerESt10shared_ptrINS_2io10FileReaderEEPSt10unique_ptrINS_12FileMetaDataESt14default_deleteIS5_EEPmPNS1_9IOContextEbb
Line
Count
Source
42
4
                                  const bool enable_mapping_timestamp_tz) {
43
4
    size_t file_size = file->size();
44
4
    size_t bytes_read = std::min(file_size, INIT_META_SIZE);
45
4
    std::vector<uint8_t> footer(bytes_read);
46
4
    RETURN_IF_ERROR(file->read_at(file_size - bytes_read, Slice(footer.data(), bytes_read),
47
4
                                  &bytes_read, io_ctx));
48
49
    // validate magic
50
4
    uint8_t* magic_ptr = footer.data() + bytes_read - 4;
51
4
    if (bytes_read < PARQUET_FOOTER_SIZE) {
52
0
        return Status::Corruption(
53
0
                "Read parquet file footer fail, bytes read: {}, file size: {}, path: {}",
54
0
                bytes_read, file_size, file->path().native());
55
4
    } else if (memcmp(magic_ptr, PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) != 0) {
56
0
        return Status::Corruption(
57
0
                "Invalid magic number in parquet file, bytes read: {}, file size: {}, path: {}, "
58
0
                "read magic: {}",
59
0
                bytes_read, file_size, file->path().native(),
60
0
                std::string((char*)magic_ptr, sizeof(PARQUET_VERSION_NUMBER)));
61
0
    }
62
63
    // get metadata_size
64
4
    uint32_t metadata_size = decode_fixed32_le(footer.data() + bytes_read - PARQUET_FOOTER_SIZE);
65
4
    if (metadata_size > file_size - PARQUET_FOOTER_SIZE) {
66
0
        return Status::Corruption("Parquet footer size({}) is large than file size({})",
67
0
                                  metadata_size, file_size);
68
0
    }
69
4
    DorisUniqueBufferPtr<uint8_t> new_buff;
70
4
    uint8_t* meta_ptr;
71
4
    if (metadata_size > bytes_read - PARQUET_FOOTER_SIZE) {
72
0
        new_buff = make_unique_buffer<uint8_t>(metadata_size);
73
0
        RETURN_IF_ERROR(file->read_at(file_size - PARQUET_FOOTER_SIZE - metadata_size,
74
0
                                      Slice(new_buff.get(), metadata_size), &bytes_read, io_ctx));
75
0
        meta_ptr = new_buff.get();
76
4
    } else {
77
4
        meta_ptr = footer.data() + bytes_read - PARQUET_FOOTER_SIZE - metadata_size;
78
4
    }
79
80
4
    tparquet::FileMetaData t_metadata;
81
    // deserialize footer
82
4
    RETURN_IF_ERROR(deserialize_thrift_msg(meta_ptr, &metadata_size, true, &t_metadata));
83
4
    *file_metadata = std::make_unique<FileMetaData>(t_metadata, metadata_size);
84
4
    RETURN_IF_ERROR(
85
4
            (*file_metadata)->init_schema(enable_mapping_varbinary, enable_mapping_timestamp_tz));
86
4
    *meta_size = PARQUET_FOOTER_SIZE + metadata_size;
87
4
    return Status::OK();
88
4
}
vparquet_reader.cpp:_ZN5dorisL19parse_thrift_footerESt10shared_ptrINS_2io10FileReaderEEPSt10unique_ptrINS_12FileMetaDataESt14default_deleteIS5_EEPmPNS1_9IOContextEbb
Line
Count
Source
42
72
                                  const bool enable_mapping_timestamp_tz) {
43
72
    size_t file_size = file->size();
44
72
    size_t bytes_read = std::min(file_size, INIT_META_SIZE);
45
72
    std::vector<uint8_t> footer(bytes_read);
46
72
    RETURN_IF_ERROR(file->read_at(file_size - bytes_read, Slice(footer.data(), bytes_read),
47
72
                                  &bytes_read, io_ctx));
48
49
    // validate magic
50
72
    uint8_t* magic_ptr = footer.data() + bytes_read - 4;
51
72
    if (bytes_read < PARQUET_FOOTER_SIZE) {
52
0
        return Status::Corruption(
53
0
                "Read parquet file footer fail, bytes read: {}, file size: {}, path: {}",
54
0
                bytes_read, file_size, file->path().native());
55
72
    } else if (memcmp(magic_ptr, PARQUET_VERSION_NUMBER, sizeof(PARQUET_VERSION_NUMBER)) != 0) {
56
0
        return Status::Corruption(
57
0
                "Invalid magic number in parquet file, bytes read: {}, file size: {}, path: {}, "
58
0
                "read magic: {}",
59
0
                bytes_read, file_size, file->path().native(),
60
0
                std::string((char*)magic_ptr, sizeof(PARQUET_VERSION_NUMBER)));
61
0
    }
62
63
    // get metadata_size
64
72
    uint32_t metadata_size = decode_fixed32_le(footer.data() + bytes_read - PARQUET_FOOTER_SIZE);
65
72
    if (metadata_size > file_size - PARQUET_FOOTER_SIZE) {
66
0
        return Status::Corruption("Parquet footer size({}) is large than file size({})",
67
0
                                  metadata_size, file_size);
68
0
    }
69
72
    DorisUniqueBufferPtr<uint8_t> new_buff;
70
72
    uint8_t* meta_ptr;
71
72
    if (metadata_size > bytes_read - PARQUET_FOOTER_SIZE) {
72
0
        new_buff = make_unique_buffer<uint8_t>(metadata_size);
73
0
        RETURN_IF_ERROR(file->read_at(file_size - PARQUET_FOOTER_SIZE - metadata_size,
74
0
                                      Slice(new_buff.get(), metadata_size), &bytes_read, io_ctx));
75
0
        meta_ptr = new_buff.get();
76
72
    } else {
77
72
        meta_ptr = footer.data() + bytes_read - PARQUET_FOOTER_SIZE - metadata_size;
78
72
    }
79
80
72
    tparquet::FileMetaData t_metadata;
81
    // deserialize footer
82
72
    RETURN_IF_ERROR(deserialize_thrift_msg(meta_ptr, &metadata_size, true, &t_metadata));
83
72
    *file_metadata = std::make_unique<FileMetaData>(t_metadata, metadata_size);
84
72
    RETURN_IF_ERROR(
85
72
            (*file_metadata)->init_schema(enable_mapping_varbinary, enable_mapping_timestamp_tz));
86
72
    *meta_size = PARQUET_FOOTER_SIZE + metadata_size;
87
72
    return Status::OK();
88
72
}
Unexecuted instantiation: parquet_metadata_reader.cpp:_ZN5dorisL19parse_thrift_footerESt10shared_ptrINS_2io10FileReaderEEPSt10unique_ptrINS_12FileMetaDataESt14default_deleteIS5_EEPmPNS1_9IOContextEbb
89
90
} // namespace doris