Coverage Report

Created: 2024-11-20 12:56

/root/doris/be/src/olap/file_header.h
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <stdio.h>
21
#include <sys/stat.h>
22
23
#include <memory>
24
#include <string>
25
#include <vector>
26
27
#include "io/fs/file_reader.h"
28
#include "io/fs/file_writer.h"
29
#include "io/fs/local_file_system.h"
30
#include "olap/lru_cache.h"
31
#include "olap/olap_common.h"
32
#include "olap/olap_define.h"
33
#include "olap/utils.h"
34
#include "util/debug_util.h"
35
36
namespace doris {
37
38
// Legacy fixed-size header layout. FileHeader::deserialize() falls back to this
// layout when the magic number of the newer layout is not found at the start of
// the file. The struct is packed because it is read from disk byte-for-byte.
using FixedFileHeader = struct _FixedFileHeader {
    // Length of the entire file
    uint32_t file_length;
    // Checksum of the file's contents except the FileHeader
    uint32_t checksum;
    // Length of the protobuf section
    uint32_t protobuf_length;
    // Checksum of the protobuf section
    uint32_t protobuf_checksum;
} __attribute__((packed));

// This struct mirrors bytes stored on disk: any change in size silently breaks
// reading of existing files, so lock the layout at compile time.
static_assert(sizeof(FixedFileHeader) == 16,
              "FixedFileHeader is an on-disk format and must stay 16 bytes");
48
49
// Current fixed-size header layout. It starts with a magic number that
// FileHeader::deserialize() uses to tell it apart from the legacy layout.
// The struct is packed because it is written to and read from disk byte-for-byte.
using FixedFileHeaderV2 = struct _FixedFileHeaderV2 {
    // Marker identifying this layout; compared against OLAP_FIX_HEADER_MAGIC_NUMBER
    uint64_t magic_number;
    // Format version; FileHeader::prepare() stores OLAP_DATA_VERSION_APPLIED here
    uint32_t version;
    // Length of the entire file
    uint64_t file_length;
    // Checksum of the file's contents except the FileHeader
    uint32_t checksum;
    // Length of the protobuf section
    uint64_t protobuf_length;
    // Checksum of the protobuf section
    uint32_t protobuf_checksum;
} __attribute__((packed));

// This struct mirrors bytes stored on disk: any change in size silently breaks
// reading of existing files, so lock the layout at compile time.
static_assert(sizeof(FixedFileHeaderV2) == 36,
              "FixedFileHeaderV2 is an on-disk format and must stay 36 bytes");
61
62
template <typename MessageType, typename ExtraType = uint32_t>
63
class FileHeader {
64
public:
65
13
    FileHeader(const std::string& file_path) : _file_path(file_path) {
66
13
        memset(&_fixed_file_header, 0, sizeof(_fixed_file_header));
67
13
        memset(&_extra_fixed_header, 0, sizeof(_extra_fixed_header));
68
13
        _fixed_file_header_size = sizeof(_fixed_file_header);
69
13
    }
70
13
    ~FileHeader() = default;
71
72
    // To calculate the length of the proto part, it needs to be called after the proto is operated,
73
    // and prepare must be called before calling serialize
74
    Status prepare();
75
76
    // call prepare() first, serialize() will write fixed header and protobuffer.
77
    // Write the header to the starting position of the incoming file handle
78
    Status serialize();
79
80
    // read from file, validate file length, signature and alder32 of protobuffer.
81
    // Read the header from the beginning of the incoming file handle
82
    Status deserialize();
83
84
    // Check the validity of Header
85
    // it is actually call deserialize().
86
    Status validate();
87
88
5
    uint64_t file_length() const { return _fixed_file_header.file_length; }
89
    uint32_t checksum() const { return _fixed_file_header.checksum; }
90
    const ExtraType& extra() const { return _extra_fixed_header; }
91
    ExtraType* mutable_extra() { return &_extra_fixed_header; }
92
5
    const MessageType& message() const { return _proto; }
93
8
    MessageType* mutable_message() { return &_proto; }
94
8
    uint64_t size() const {
95
8
        return _fixed_file_header_size + sizeof(_extra_fixed_header) +
96
8
               _fixed_file_header.protobuf_length;
97
8
    }
98
99
    void set_file_length(uint64_t file_length) { _fixed_file_header.file_length = file_length; }
100
    void set_checksum(uint32_t checksum) { _fixed_file_header.checksum = checksum; }
101
102
private:
103
    std::string _file_path;
104
    FixedFileHeaderV2 _fixed_file_header;
105
    uint32_t _fixed_file_header_size;
106
107
    std::string _proto_string;
108
    ExtraType _extra_fixed_header;
109
    MessageType _proto;
110
};
111
112
// FileHeader implementation
113
template <typename MessageType, typename ExtraType>
114
8
Status FileHeader<MessageType, ExtraType>::prepare() {
115
8
    try {
116
8
        if (!_proto.SerializeToString(&_proto_string)) {
117
0
            return Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>(
118
0
                    "serialize file header to string error. [path={}]", _file_path);
119
0
        }
120
8
    } catch (...) {
121
0
        return Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>(
122
0
                "serialize file header to string error. [path={}]", _file_path);
123
0
    }
124
125
8
    _fixed_file_header.protobuf_checksum =
126
8
            olap_adler32(olap_adler32_init(), _proto_string.c_str(), _proto_string.size());
127
128
8
    _fixed_file_header.checksum = 0;
129
8
    _fixed_file_header.protobuf_length = _proto_string.size();
130
8
    _fixed_file_header.file_length = size();
131
8
    _fixed_file_header.version = OLAP_DATA_VERSION_APPLIED;
132
8
    _fixed_file_header.magic_number = OLAP_FIX_HEADER_MAGIC_NUMBER;
133
134
8
    return Status::OK();
135
8
}
136
137
template <typename MessageType, typename ExtraType>
138
8
Status FileHeader<MessageType, ExtraType>::serialize() {
139
    // write to file
140
8
    io::FileWriterPtr file_writer;
141
8
    RETURN_IF_ERROR(io::global_local_filesystem()->create_file(_file_path, &file_writer));
142
8
    RETURN_IF_ERROR(file_writer->write_at(
143
8
            0, {(const uint8_t*)&_fixed_file_header, _fixed_file_header_size}));
144
8
    RETURN_IF_ERROR(file_writer->write_at(
145
8
            _fixed_file_header_size,
146
8
            {(const uint8_t*)&_extra_fixed_header, sizeof(_extra_fixed_header)}));
147
8
    RETURN_IF_ERROR(file_writer->write_at(_fixed_file_header_size + sizeof(_extra_fixed_header),
148
8
                                          {_proto_string}));
149
8
    return file_writer->close();
150
8
}
151
152
template <typename MessageType, typename ExtraType>
153
5
Status FileHeader<MessageType, ExtraType>::deserialize() {
154
5
    io::FileReaderSPtr file_reader;
155
5
    RETURN_IF_ERROR(io::global_local_filesystem()->open_file(_file_path, &file_reader));
156
5
    off_t real_file_length = 0;
157
5
    uint32_t real_protobuf_checksum = 0;
158
5
    size_t bytes_read = 0;
159
5
    RETURN_IF_ERROR(file_reader->read_at(
160
5
            0, {(const uint8_t*)&_fixed_file_header, _fixed_file_header_size}, &bytes_read));
161
5
    DCHECK(_fixed_file_header_size == bytes_read)
162
0
            << " deserialize read bytes dismatch, request bytes " << _fixed_file_header_size
163
0
            << " actual read " << bytes_read;
164
165
    //Status read_at(size_t offset, Slice result, size_t* bytes_read,
166
    //             const IOContext* io_ctx = nullptr);
167
168
5
    if (_fixed_file_header.magic_number != OLAP_FIX_HEADER_MAGIC_NUMBER) {
169
0
        VLOG_TRACE << "old fix header found, magic num=" << _fixed_file_header.magic_number;
170
0
        FixedFileHeader tmp_header;
171
0
        RETURN_IF_ERROR(file_reader->read_at(0, {(const uint8_t*)&tmp_header, sizeof(tmp_header)},
172
0
                                             &bytes_read));
173
0
        DCHECK(sizeof(tmp_header) == bytes_read)
174
0
                << " deserialize read bytes dismatch, request bytes " << sizeof(tmp_header)
175
0
                << " actual read " << bytes_read;
176
0
        _fixed_file_header.file_length = tmp_header.file_length;
177
0
        _fixed_file_header.checksum = tmp_header.checksum;
178
0
        _fixed_file_header.protobuf_length = tmp_header.protobuf_length;
179
0
        _fixed_file_header.protobuf_checksum = tmp_header.protobuf_checksum;
180
0
        _fixed_file_header.magic_number = OLAP_FIX_HEADER_MAGIC_NUMBER;
181
0
        _fixed_file_header.version = OLAP_DATA_VERSION_APPLIED;
182
0
        _fixed_file_header_size = sizeof(tmp_header);
183
0
    }
184
185
5
    VLOG_NOTICE << "fix head loaded. file_length=" << _fixed_file_header.file_length
186
0
                << ", checksum=" << _fixed_file_header.checksum
187
0
                << ", protobuf_length=" << _fixed_file_header.protobuf_length
188
0
                << ", magic_number=" << _fixed_file_header.magic_number
189
0
                << ", version=" << _fixed_file_header.version;
190
191
5
    RETURN_IF_ERROR(file_reader->read_at(
192
5
            _fixed_file_header_size,
193
5
            {(const uint8_t*)&_extra_fixed_header, sizeof(_extra_fixed_header)}, &bytes_read));
194
195
5
    std::unique_ptr<char[]> buf(new (std::nothrow) char[_fixed_file_header.protobuf_length]);
196
5
    if (nullptr == buf) {
197
0
        char errmsg[64];
198
0
        return Status::Error<ErrorCode::MEM_ALLOC_FAILED>(
199
0
                "malloc protobuf buf error. file={}, error={}", file_reader->path().native(),
200
0
                strerror_r(errno, errmsg, 64));
201
0
    }
202
5
    RETURN_IF_ERROR(file_reader->read_at(_fixed_file_header_size + sizeof(_extra_fixed_header),
203
5
                                         {buf.get(), _fixed_file_header.protobuf_length},
204
5
                                         &bytes_read));
205
5
    real_file_length = file_reader->size();
206
207
5
    if (file_length() != static_cast<uint64_t>(real_file_length)) {
208
0
        return Status::Error<ErrorCode::FILE_DATA_ERROR>(
209
0
                "file length is not match. file={}, file_length={}, real_file_length={}",
210
0
                file_reader->path().native(), file_length(), real_file_length);
211
0
    }
212
213
    // check proto checksum
214
5
    real_protobuf_checksum =
215
5
            olap_adler32(olap_adler32_init(), buf.get(), _fixed_file_header.protobuf_length);
216
217
5
    if (real_protobuf_checksum != _fixed_file_header.protobuf_checksum) {
218
        // When compiling using gcc there woule be error like:
219
        // Cannot bind packed field '_FixedFileHeaderV2::protobuf_checksum' to 'unsigned int&'
220
        // so we need to using unary operator+ to evaluate one value to pass
221
        // to status to successfully compile.
222
0
        return Status::Error<ErrorCode::CHECKSUM_ERROR>(
223
0
                "checksum is not match. file={}, expect={}, actual={}",
224
0
                file_reader->path().native(), +_fixed_file_header.protobuf_checksum,
225
0
                real_protobuf_checksum);
226
0
    }
227
228
5
    try {
229
5
        std::string protobuf_str(buf.get(), _fixed_file_header.protobuf_length);
230
231
5
        if (!_proto.ParseFromString(protobuf_str)) {
232
0
            return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>(
233
0
                    "fail to parse file content to protobuf object. file={}",
234
0
                    file_reader->path().native());
235
0
        }
236
5
    } catch (...) {
237
0
        LOG(WARNING) << "fail to load protobuf. file='" << file_reader->path().native();
238
0
        return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>("fail to load protobuf. file={}",
239
0
                                                              file_reader->path().native());
240
0
    }
241
242
5
    return Status::OK();
243
5
}
244
245
template <typename MessageType, typename ExtraType>
246
Status FileHeader<MessageType, ExtraType>::validate() {
247
    return deserialize();
248
}
249
250
} // namespace doris