Coverage Report

Created: 2025-07-30 19:44

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/olap/file_header.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <stdio.h>
21
#include <sys/stat.h>
22
23
#include <memory>
24
#include <string>
25
#include <vector>
26
27
#include "io/fs/file_reader.h"
28
#include "io/fs/file_writer.h"
29
#include "io/fs/local_file_system.h"
30
#include "olap/lru_cache.h"
31
#include "olap/olap_common.h"
32
#include "olap/olap_define.h"
33
#include "olap/utils.h"
34
#include "util/debug_util.h"
35
36
namespace doris {
37
38
using FixedFileHeader = struct _FixedFileHeader {
39
    // the length of the entire file
40
    uint32_t file_length;
41
    // Checksum of the file's contents except the FileHeader
42
    uint32_t checksum;
43
    // Protobuf length of section
44
    uint32_t protobuf_length;
45
    // Checksum of Protobuf part
46
    uint32_t protobuf_checksum;
47
} __attribute__((packed));
48
49
using FixedFileHeaderV2 = struct _FixedFileHeaderV2 {
50
    uint64_t magic_number;
51
    uint32_t version;
52
    // the length of the entire file
53
    uint64_t file_length;
54
    // Checksum of the file's contents except the FileHeader
55
    uint32_t checksum;
56
    // Protobuf length of section
57
    uint64_t protobuf_length;
58
    // Checksum of Protobuf part
59
    uint32_t protobuf_checksum;
60
} __attribute__((packed));
61
62
template <typename MessageType, typename ExtraType = uint32_t>
63
class FileHeader {
64
public:
65
254
    FileHeader(const std::string& file_path) : _file_path(file_path) {
66
254
        memset(&_fixed_file_header, 0, sizeof(_fixed_file_header));
67
254
        memset(&_extra_fixed_header, 0, sizeof(_extra_fixed_header));
68
254
        _fixed_file_header_size = sizeof(_fixed_file_header);
69
254
    }
70
254
    ~FileHeader() = default;
71
72
    // To calculate the length of the proto part, it needs to be called after the proto is operated,
73
    // and prepare must be called before calling serialize
74
    Status prepare();
75
76
    // call prepare() first, serialize() will write fixed header and protobuffer.
77
    // Write the header to the starting position of the incoming file handle
78
    Status serialize();
79
80
    // read from file, validate file length, signature and alder32 of protobuffer.
81
    // Read the header from the beginning of the incoming file handle
82
    Status deserialize();
83
84
    // Check the validity of Header
85
    // it is actually call deserialize().
86
    Status validate();
87
88
    // write to memory
89
    Status serialize_to_memory(uint8_t* buffer, size_t buffer_size);
90
91
    // read from memory
92
    Status deserialize_from_memory(const uint8_t* buffer, size_t buffer_size);
93
94
10
    uint64_t file_length() const { return _fixed_file_header.file_length; }
95
    uint32_t checksum() const { return _fixed_file_header.checksum; }
96
    const ExtraType& extra() const { return _extra_fixed_header; }
97
    ExtraType* mutable_extra() { return &_extra_fixed_header; }
98
12
    const MessageType& message() const { return _proto; }
99
238
    MessageType* mutable_message() { return &_proto; }
100
247
    uint64_t size() const {
101
247
        return _fixed_file_header_size + sizeof(_extra_fixed_header) +
102
247
               _fixed_file_header.protobuf_length;
103
247
    }
104
105
    void set_file_length(uint64_t file_length) { _fixed_file_header.file_length = file_length; }
106
    void set_checksum(uint32_t checksum) { _fixed_file_header.checksum = checksum; }
107
108
private:
109
    std::string _file_path;
110
    FixedFileHeaderV2 _fixed_file_header;
111
    uint32_t _fixed_file_header_size;
112
113
    std::string _proto_string;
114
    ExtraType _extra_fixed_header;
115
    MessageType _proto;
116
};
117
118
// FileHeader implementation
119
template <typename MessageType, typename ExtraType>
120
238
Status FileHeader<MessageType, ExtraType>::prepare() {
121
238
    try {
122
238
        if (!_proto.SerializeToString(&_proto_string)) {
123
0
            return Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>(
124
0
                    "serialize file header to string error. [path={}]", _file_path);
125
0
        }
126
238
    } catch (...) {
127
0
        return Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>(
128
0
                "serialize file header to string error. [path={}]", _file_path);
129
0
    }
130
131
238
    _fixed_file_header.protobuf_checksum =
132
238
            olap_adler32(olap_adler32_init(), _proto_string.c_str(), _proto_string.size());
133
134
238
    _fixed_file_header.checksum = 0;
135
238
    _fixed_file_header.protobuf_length = _proto_string.size();
136
238
    _fixed_file_header.file_length = size();
137
238
    _fixed_file_header.version = OLAP_DATA_VERSION_APPLIED;
138
238
    _fixed_file_header.magic_number = OLAP_FIX_HEADER_MAGIC_NUMBER;
139
140
238
    return Status::OK();
141
238
}
142
143
template <typename MessageType, typename ExtraType>
144
234
Status FileHeader<MessageType, ExtraType>::serialize() {
145
    // write to file
146
234
    io::FileWriterPtr file_writer;
147
234
    RETURN_IF_ERROR(io::global_local_filesystem()->create_file(_file_path, &file_writer));
148
234
    RETURN_IF_ERROR(
149
234
            file_writer->append({(const uint8_t*)&_fixed_file_header, _fixed_file_header_size}));
150
234
    RETURN_IF_ERROR(file_writer->append(
151
234
            {(const uint8_t*)&_extra_fixed_header, sizeof(_extra_fixed_header)}));
152
234
    RETURN_IF_ERROR(file_writer->append({_proto_string}));
153
234
    return file_writer->close();
154
234
}
155
156
template <typename MessageType, typename ExtraType>
157
Status FileHeader<MessageType, ExtraType>::serialize_to_memory(uint8_t* buffer,
158
4
                                                               size_t buffer_size) {
159
4
    if (buffer_size < size()) {
160
1
        return Status::Error<ErrorCode::INVALID_ARGUMENT>(
161
1
                "buffer size is too small. required={}, provided={}", size(), buffer_size);
162
1
    }
163
164
3
    uint8_t* ptr = buffer;
165
3
    memcpy(ptr, &_fixed_file_header, _fixed_file_header_size);
166
3
    ptr += _fixed_file_header_size;
167
168
3
    memcpy(ptr, &_extra_fixed_header, sizeof(_extra_fixed_header));
169
3
    ptr += sizeof(_extra_fixed_header);
170
171
3
    memcpy(ptr, _proto_string.data(), _proto_string.size());
172
3
    return Status::OK();
173
4
}
174
175
template <typename MessageType, typename ExtraType>
176
10
Status FileHeader<MessageType, ExtraType>::deserialize() {
177
10
    io::FileReaderSPtr file_reader;
178
10
    RETURN_IF_ERROR(io::global_local_filesystem()->open_file(_file_path, &file_reader));
179
10
    off_t real_file_length = 0;
180
10
    uint32_t real_protobuf_checksum = 0;
181
10
    size_t bytes_read = 0;
182
10
    RETURN_IF_ERROR(file_reader->read_at(
183
10
            0, {(const uint8_t*)&_fixed_file_header, _fixed_file_header_size}, &bytes_read));
184
10
    DCHECK(_fixed_file_header_size == bytes_read)
185
0
            << " deserialize read bytes dismatch, request bytes " << _fixed_file_header_size
186
0
            << " actual read " << bytes_read;
187
188
    //Status read_at(size_t offset, Slice result, size_t* bytes_read,
189
    //             const IOContext* io_ctx = nullptr);
190
191
10
    if (_fixed_file_header.magic_number != OLAP_FIX_HEADER_MAGIC_NUMBER) {
192
0
        VLOG_TRACE << "old fix header found, magic num=" << _fixed_file_header.magic_number;
193
0
        FixedFileHeader tmp_header;
194
0
        RETURN_IF_ERROR(file_reader->read_at(0, {(const uint8_t*)&tmp_header, sizeof(tmp_header)},
195
0
                                             &bytes_read));
196
0
        DCHECK(sizeof(tmp_header) == bytes_read)
197
0
                << " deserialize read bytes dismatch, request bytes " << sizeof(tmp_header)
198
0
                << " actual read " << bytes_read;
199
0
        _fixed_file_header.file_length = tmp_header.file_length;
200
0
        _fixed_file_header.checksum = tmp_header.checksum;
201
0
        _fixed_file_header.protobuf_length = tmp_header.protobuf_length;
202
0
        _fixed_file_header.protobuf_checksum = tmp_header.protobuf_checksum;
203
0
        _fixed_file_header.magic_number = OLAP_FIX_HEADER_MAGIC_NUMBER;
204
0
        _fixed_file_header.version = OLAP_DATA_VERSION_APPLIED;
205
0
        _fixed_file_header_size = sizeof(tmp_header);
206
0
    }
207
208
10
    VLOG_NOTICE << "fix head loaded. file_length=" << _fixed_file_header.file_length
209
0
                << ", checksum=" << _fixed_file_header.checksum
210
0
                << ", protobuf_length=" << _fixed_file_header.protobuf_length
211
0
                << ", magic_number=" << _fixed_file_header.magic_number
212
0
                << ", version=" << _fixed_file_header.version;
213
214
10
    RETURN_IF_ERROR(file_reader->read_at(
215
10
            _fixed_file_header_size,
216
10
            {(const uint8_t*)&_extra_fixed_header, sizeof(_extra_fixed_header)}, &bytes_read));
217
218
10
    std::unique_ptr<char[]> buf(new (std::nothrow) char[_fixed_file_header.protobuf_length]);
219
10
    if (nullptr == buf) {
220
0
        char errmsg[64];
221
0
        return Status::Error<ErrorCode::MEM_ALLOC_FAILED>(
222
0
                "malloc protobuf buf error. file={}, error={}", file_reader->path().native(),
223
0
                strerror_r(errno, errmsg, 64));
224
0
    }
225
10
    RETURN_IF_ERROR(file_reader->read_at(_fixed_file_header_size + sizeof(_extra_fixed_header),
226
10
                                         {buf.get(), _fixed_file_header.protobuf_length},
227
10
                                         &bytes_read));
228
10
    real_file_length = file_reader->size();
229
230
10
    if (file_length() != static_cast<uint64_t>(real_file_length)) {
231
0
        return Status::InternalError(
232
0
                "file length is not match. file={}, file_length={}, real_file_length={}",
233
0
                file_reader->path().native(), file_length(), real_file_length);
234
0
    }
235
236
    // check proto checksum
237
10
    real_protobuf_checksum =
238
10
            olap_adler32(olap_adler32_init(), buf.get(), _fixed_file_header.protobuf_length);
239
240
10
    if (real_protobuf_checksum != _fixed_file_header.protobuf_checksum) {
241
        // When compiling using gcc there woule be error like:
242
        // Cannot bind packed field '_FixedFileHeaderV2::protobuf_checksum' to 'unsigned int&'
243
        // so we need to using unary operator+ to evaluate one value to pass
244
        // to status to successfully compile.
245
0
        return Status::InternalError("checksum is not match. file={}, expect={}, actual={}",
246
0
                                     file_reader->path().native(),
247
0
                                     +_fixed_file_header.protobuf_checksum, real_protobuf_checksum);
248
0
    }
249
250
10
    try {
251
10
        std::string protobuf_str(buf.get(), _fixed_file_header.protobuf_length);
252
253
10
        if (!_proto.ParseFromString(protobuf_str)) {
254
0
            return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>(
255
0
                    "fail to parse file content to protobuf object. file={}",
256
0
                    file_reader->path().native());
257
0
        }
258
10
    } catch (...) {
259
0
        LOG(WARNING) << "fail to load protobuf. file='" << file_reader->path().native();
260
0
        return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>("fail to load protobuf. file={}",
261
0
                                                              file_reader->path().native());
262
0
    }
263
264
10
    return Status::OK();
265
10
}
266
267
template <typename MessageType, typename ExtraType>
268
Status FileHeader<MessageType, ExtraType>::deserialize_from_memory(const uint8_t* buffer,
269
6
                                                                   size_t buffer_size) {
270
6
    if (buffer_size < sizeof(FixedFileHeaderV2)) {
271
1
        return Status::Error<ErrorCode::INVALID_ARGUMENT>(
272
1
                "buffer size is too small to contain a valid header. provided={}", buffer_size);
273
1
    }
274
275
5
    const uint8_t* ptr = buffer;
276
5
    memcpy(&_fixed_file_header, ptr, _fixed_file_header_size);
277
5
    ptr += _fixed_file_header_size;
278
279
5
    if (_fixed_file_header.magic_number != OLAP_FIX_HEADER_MAGIC_NUMBER) {
280
1
        VLOG_TRACE << "old fix header found, magic num=" << _fixed_file_header.magic_number;
281
1
        if (buffer_size < sizeof(FixedFileHeader)) {
282
0
            return Status::Error<ErrorCode::INVALID_ARGUMENT>(
283
0
                    "buffer size is too small to contain a valid old header. provided={}",
284
0
                    buffer_size);
285
0
        }
286
1
        FixedFileHeader tmp_header;
287
1
        memcpy(&tmp_header, buffer, sizeof(tmp_header));
288
1
        _fixed_file_header.file_length = tmp_header.file_length;
289
1
        _fixed_file_header.checksum = tmp_header.checksum;
290
1
        _fixed_file_header.protobuf_length = tmp_header.protobuf_length;
291
1
        _fixed_file_header.protobuf_checksum = tmp_header.protobuf_checksum;
292
1
        _fixed_file_header.magic_number = OLAP_FIX_HEADER_MAGIC_NUMBER;
293
1
        _fixed_file_header.version = OLAP_DATA_VERSION_APPLIED;
294
1
        _fixed_file_header_size = sizeof(tmp_header);
295
1
        ptr = buffer + sizeof(tmp_header);
296
1
    }
297
298
5
    VLOG_NOTICE << "fix head loaded. file_length=" << _fixed_file_header.file_length
299
0
                << ", checksum=" << _fixed_file_header.checksum
300
0
                << ", protobuf_length=" << _fixed_file_header.protobuf_length
301
0
                << ", magic_number=" << _fixed_file_header.magic_number
302
0
                << ", version=" << _fixed_file_header.version;
303
304
5
    if (buffer_size < _fixed_file_header_size + sizeof(_extra_fixed_header) +
305
5
                              _fixed_file_header.protobuf_length) {
306
0
        return Status::Error<ErrorCode::INVALID_ARGUMENT>(
307
0
                "buffer size is too small to contain the entire header and protobuf. required={}, "
308
0
                "provided={}",
309
0
                _fixed_file_header_size + sizeof(_extra_fixed_header) +
310
0
                        _fixed_file_header.protobuf_length,
311
0
                buffer_size);
312
0
    }
313
314
5
    memcpy(&_extra_fixed_header, ptr, sizeof(_extra_fixed_header));
315
5
    ptr += sizeof(_extra_fixed_header);
316
317
5
    std::unique_ptr<char[]> buf(new (std::nothrow) char[_fixed_file_header.protobuf_length]);
318
5
    if (nullptr == buf) {
319
0
        char errmsg[64];
320
0
        return Status::Error<ErrorCode::MEM_ALLOC_FAILED>("malloc protobuf buf error. error={}",
321
0
                                                          strerror_r(errno, errmsg, 64));
322
0
    }
323
5
    memcpy(buf.get(), ptr, _fixed_file_header.protobuf_length);
324
325
    // check proto checksum
326
5
    uint32_t real_protobuf_checksum =
327
5
            olap_adler32(olap_adler32_init(), buf.get(), _fixed_file_header.protobuf_length);
328
329
5
    if (real_protobuf_checksum != _fixed_file_header.protobuf_checksum) {
330
3
        return Status::InternalError("checksum is not match. expect={}, actual={}",
331
3
                                     +_fixed_file_header.protobuf_checksum, real_protobuf_checksum);
332
3
    }
333
334
2
    try {
335
2
        std::string protobuf_str(buf.get(), _fixed_file_header.protobuf_length);
336
337
2
        if (!_proto.ParseFromString(protobuf_str)) {
338
0
            return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>(
339
0
                    "fail to parse buffer content to protobuf object.");
340
0
        }
341
2
    } catch (...) {
342
0
        LOG(WARNING) << "fail to load protobuf from buffer.";
343
0
        return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>("fail to load protobuf from buffer.");
344
0
    }
345
346
2
    return Status::OK();
347
2
}
348
349
template <typename MessageType, typename ExtraType>
350
Status FileHeader<MessageType, ExtraType>::validate() {
351
    return deserialize();
352
}
353
354
} // namespace doris