Coverage Report

Created: 2024-11-20 12:30

/root/doris/be/src/olap/file_header.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <stdio.h>
21
#include <sys/stat.h>
22
23
#include <memory>
24
#include <string>
25
#include <vector>
26
27
#include "io/fs/file_reader.h"
28
#include "io/fs/file_writer.h"
29
#include "io/fs/local_file_system.h"
30
#include "olap/lru_cache.h"
31
#include "olap/olap_common.h"
32
#include "olap/olap_define.h"
33
#include "olap/utils.h"
34
#include "util/debug_util.h"
35
36
namespace doris {
37
38
// Legacy fixed header layout (no magic number, no version, 32-bit lengths).
// FileHeader::deserialize() falls back to this layout when the magic number of
// the V2 header does not match. The struct is read from disk as raw bytes, so
// field order, field widths and packing must never change.
struct _FixedFileHeader {
    // the length of the entire file
    uint32_t file_length;
    // Checksum of the file's contents except the FileHeader
    uint32_t checksum;
    // Protobuf length of section
    uint32_t protobuf_length;
    // Checksum of Protobuf part
    uint32_t protobuf_checksum;
} __attribute__((packed));
using FixedFileHeader = _FixedFileHeader;

// Lock in the on-disk size so an accidental layout change fails at compile time.
static_assert(sizeof(FixedFileHeader) == 16,
              "FixedFileHeader is an on-disk layout and must stay 16 bytes");
48
49
// Current (V2) fixed header layout: a magic number and version followed by
// 64-bit lengths. FileHeader writes and reads this struct as raw bytes, so
// field order, field widths and packing must never change. Because the struct
// is packed, its fields cannot be bound to references; copy them to a local
// (or apply unary '+') before passing them to functions taking references.
struct _FixedFileHeaderV2 {
    uint64_t magic_number;
    uint32_t version;
    // the length of the entire file
    uint64_t file_length;
    // Checksum of the file's contents except the FileHeader
    uint32_t checksum;
    // Protobuf length of section
    uint64_t protobuf_length;
    // Checksum of Protobuf part
    uint32_t protobuf_checksum;
} __attribute__((packed));
using FixedFileHeaderV2 = _FixedFileHeaderV2;

// Without packing this struct would be padded to 48 bytes; the on-disk format
// is the packed 36-byte form, so fail the build if that ever changes.
static_assert(sizeof(FixedFileHeaderV2) == 36,
              "FixedFileHeaderV2 is an on-disk layout and must stay 36 bytes");
61
62
template <typename MessageType, typename ExtraType = uint32_t>
63
class FileHeader {
64
public:
65
13
    FileHeader(const std::string& file_path) : _file_path(file_path) {
66
13
        memset(&_fixed_file_header, 0, sizeof(_fixed_file_header));
67
13
        memset(&_extra_fixed_header, 0, sizeof(_extra_fixed_header));
68
13
        _fixed_file_header_size = sizeof(_fixed_file_header);
69
13
    }
70
13
    ~FileHeader() = default;
71
72
    // To calculate the length of the proto part, it needs to be called after the proto is operated,
73
    // and prepare must be called before calling serialize
74
    Status prepare();
75
76
    // call prepare() first, serialize() will write fixed header and protobuffer.
77
    // Write the header to the starting position of the incoming file handle
78
    Status serialize();
79
80
    // read from file, validate file length, signature and alder32 of protobuffer.
81
    // Read the header from the beginning of the incoming file handle
82
    Status deserialize();
83
84
    // Check the validity of Header
85
    // it is actually call deserialize().
86
    Status validate();
87
88
5
    uint64_t file_length() const { return _fixed_file_header.file_length; }
89
    uint32_t checksum() const { return _fixed_file_header.checksum; }
90
    const ExtraType& extra() const { return _extra_fixed_header; }
91
    ExtraType* mutable_extra() { return &_extra_fixed_header; }
92
5
    const MessageType& message() const { return _proto; }
93
8
    MessageType* mutable_message() { return &_proto; }
94
8
    uint64_t size() const {
95
8
        return _fixed_file_header_size + sizeof(_extra_fixed_header) +
96
8
               _fixed_file_header.protobuf_length;
97
8
    }
98
99
    void set_file_length(uint64_t file_length) { _fixed_file_header.file_length = file_length; }
100
    void set_checksum(uint32_t checksum) { _fixed_file_header.checksum = checksum; }
101
102
private:
103
    std::string _file_path;
104
    FixedFileHeaderV2 _fixed_file_header;
105
    uint32_t _fixed_file_header_size;
106
107
    std::string _proto_string;
108
    ExtraType _extra_fixed_header;
109
    MessageType _proto;
110
};
111
112
// FileHeader implementation
113
template <typename MessageType, typename ExtraType>
114
8
Status FileHeader<MessageType, ExtraType>::prepare() {
115
8
    try {
116
8
        if (!_proto.SerializeToString(&_proto_string)) {
117
0
            return Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>(
118
0
                    "serialize file header to string error. [path={}]", _file_path);
119
0
        }
120
8
    } catch (...) {
121
0
        return Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>(
122
0
                "serialize file header to string error. [path={}]", _file_path);
123
0
    }
124
125
8
    _fixed_file_header.protobuf_checksum =
126
8
            olap_adler32(olap_adler32_init(), _proto_string.c_str(), _proto_string.size());
127
128
8
    _fixed_file_header.checksum = 0;
129
8
    _fixed_file_header.protobuf_length = _proto_string.size();
130
8
    _fixed_file_header.file_length = size();
131
8
    _fixed_file_header.version = OLAP_DATA_VERSION_APPLIED;
132
8
    _fixed_file_header.magic_number = OLAP_FIX_HEADER_MAGIC_NUMBER;
133
134
8
    return Status::OK();
135
8
}
136
137
template <typename MessageType, typename ExtraType>
138
8
Status FileHeader<MessageType, ExtraType>::serialize() {
139
    // write to file
140
8
    io::FileWriterPtr file_writer;
141
8
    RETURN_IF_ERROR(io::global_local_filesystem()->create_file(_file_path, &file_writer));
142
8
    RETURN_IF_ERROR(
143
8
            file_writer->append({(const uint8_t*)&_fixed_file_header, _fixed_file_header_size}));
144
8
    RETURN_IF_ERROR(file_writer->append(
145
8
            {(const uint8_t*)&_extra_fixed_header, sizeof(_extra_fixed_header)}));
146
8
    RETURN_IF_ERROR(file_writer->append({_proto_string}));
147
8
    return file_writer->close();
148
8
}
149
150
template <typename MessageType, typename ExtraType>
151
5
Status FileHeader<MessageType, ExtraType>::deserialize() {
152
5
    io::FileReaderSPtr file_reader;
153
5
    RETURN_IF_ERROR(io::global_local_filesystem()->open_file(_file_path, &file_reader));
154
5
    off_t real_file_length = 0;
155
5
    uint32_t real_protobuf_checksum = 0;
156
5
    size_t bytes_read = 0;
157
5
    RETURN_IF_ERROR(file_reader->read_at(
158
5
            0, {(const uint8_t*)&_fixed_file_header, _fixed_file_header_size}, &bytes_read));
159
5
    DCHECK(_fixed_file_header_size == bytes_read)
160
0
            << " deserialize read bytes dismatch, request bytes " << _fixed_file_header_size
161
0
            << " actual read " << bytes_read;
162
163
    //Status read_at(size_t offset, Slice result, size_t* bytes_read,
164
    //             const IOContext* io_ctx = nullptr);
165
166
5
    if (_fixed_file_header.magic_number != OLAP_FIX_HEADER_MAGIC_NUMBER) {
167
0
        VLOG_TRACE << "old fix header found, magic num=" << _fixed_file_header.magic_number;
168
0
        FixedFileHeader tmp_header;
169
0
        RETURN_IF_ERROR(file_reader->read_at(0, {(const uint8_t*)&tmp_header, sizeof(tmp_header)},
170
0
                                             &bytes_read));
171
0
        DCHECK(sizeof(tmp_header) == bytes_read)
172
0
                << " deserialize read bytes dismatch, request bytes " << sizeof(tmp_header)
173
0
                << " actual read " << bytes_read;
174
0
        _fixed_file_header.file_length = tmp_header.file_length;
175
0
        _fixed_file_header.checksum = tmp_header.checksum;
176
0
        _fixed_file_header.protobuf_length = tmp_header.protobuf_length;
177
0
        _fixed_file_header.protobuf_checksum = tmp_header.protobuf_checksum;
178
0
        _fixed_file_header.magic_number = OLAP_FIX_HEADER_MAGIC_NUMBER;
179
0
        _fixed_file_header.version = OLAP_DATA_VERSION_APPLIED;
180
0
        _fixed_file_header_size = sizeof(tmp_header);
181
0
    }
182
183
5
    VLOG_NOTICE << "fix head loaded. file_length=" << _fixed_file_header.file_length
184
0
                << ", checksum=" << _fixed_file_header.checksum
185
0
                << ", protobuf_length=" << _fixed_file_header.protobuf_length
186
0
                << ", magic_number=" << _fixed_file_header.magic_number
187
0
                << ", version=" << _fixed_file_header.version;
188
189
5
    RETURN_IF_ERROR(file_reader->read_at(
190
5
            _fixed_file_header_size,
191
5
            {(const uint8_t*)&_extra_fixed_header, sizeof(_extra_fixed_header)}, &bytes_read));
192
193
5
    std::unique_ptr<char[]> buf(new (std::nothrow) char[_fixed_file_header.protobuf_length]);
194
5
    if (nullptr == buf) {
195
0
        char errmsg[64];
196
0
        return Status::Error<ErrorCode::MEM_ALLOC_FAILED>(
197
0
                "malloc protobuf buf error. file={}, error={}", file_reader->path().native(),
198
0
                strerror_r(errno, errmsg, 64));
199
0
    }
200
5
    RETURN_IF_ERROR(file_reader->read_at(_fixed_file_header_size + sizeof(_extra_fixed_header),
201
5
                                         {buf.get(), _fixed_file_header.protobuf_length},
202
5
                                         &bytes_read));
203
5
    real_file_length = file_reader->size();
204
205
5
    if (file_length() != static_cast<uint64_t>(real_file_length)) {
206
0
        return Status::InternalError(
207
0
                "file length is not match. file={}, file_length={}, real_file_length={}",
208
0
                file_reader->path().native(), file_length(), real_file_length);
209
0
    }
210
211
    // check proto checksum
212
5
    real_protobuf_checksum =
213
5
            olap_adler32(olap_adler32_init(), buf.get(), _fixed_file_header.protobuf_length);
214
215
5
    if (real_protobuf_checksum != _fixed_file_header.protobuf_checksum) {
216
        // When compiling using gcc there woule be error like:
217
        // Cannot bind packed field '_FixedFileHeaderV2::protobuf_checksum' to 'unsigned int&'
218
        // so we need to using unary operator+ to evaluate one value to pass
219
        // to status to successfully compile.
220
0
        return Status::InternalError("checksum is not match. file={}, expect={}, actual={}",
221
0
                                     file_reader->path().native(),
222
0
                                     +_fixed_file_header.protobuf_checksum, real_protobuf_checksum);
223
0
    }
224
225
5
    try {
226
5
        std::string protobuf_str(buf.get(), _fixed_file_header.protobuf_length);
227
228
5
        if (!_proto.ParseFromString(protobuf_str)) {
229
0
            return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>(
230
0
                    "fail to parse file content to protobuf object. file={}",
231
0
                    file_reader->path().native());
232
0
        }
233
5
    } catch (...) {
234
0
        LOG(WARNING) << "fail to load protobuf. file='" << file_reader->path().native();
235
0
        return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>("fail to load protobuf. file={}",
236
0
                                                              file_reader->path().native());
237
0
    }
238
239
5
    return Status::OK();
240
5
}
241
242
template <typename MessageType, typename ExtraType>
243
Status FileHeader<MessageType, ExtraType>::validate() {
244
    return deserialize();
245
}
246
247
} // namespace doris