/root/doris/be/src/olap/file_header.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <stdio.h> |
21 | | #include <sys/stat.h> |
22 | | |
23 | | #include <memory> |
24 | | #include <string> |
25 | | #include <vector> |
26 | | |
27 | | #include "io/fs/file_reader.h" |
28 | | #include "io/fs/file_writer.h" |
29 | | #include "io/fs/local_file_system.h" |
30 | | #include "olap/lru_cache.h" |
31 | | #include "olap/olap_common.h" |
32 | | #include "olap/olap_define.h" |
33 | | #include "olap/utils.h" |
34 | | #include "util/debug_util.h" |
35 | | |
36 | | namespace doris { |
37 | | |
// Legacy (pre-V2) fixed header. It is still recognised when reading files written by old
// versions: the readers fall back to it when the V2 magic number is absent.
using FixedFileHeader = struct _FixedFileHeader {
    // the length of the entire file
    uint32_t file_length;
    // Checksum of the file's contents except the FileHeader
    uint32_t checksum;
    // Protobuf length of section
    uint32_t protobuf_length;
    // Checksum of Protobuf part
    uint32_t protobuf_checksum;
} __attribute__((packed));

// The struct is copied byte-for-byte to and from disk, so its size is part of the file format.
static_assert(sizeof(FixedFileHeader) == 16, "FixedFileHeader is an on-disk layout; do not change");
48 | | |
// Current fixed header layout, identified on disk by its leading magic number.
using FixedFileHeaderV2 = struct _FixedFileHeaderV2 {
    uint64_t magic_number;
    uint32_t version;
    // the length of the entire file
    uint64_t file_length;
    // Checksum of the file's contents except the FileHeader
    uint32_t checksum;
    // Protobuf length of section
    uint64_t protobuf_length;
    // Checksum of Protobuf part
    uint32_t protobuf_checksum;
} __attribute__((packed));

// The struct is copied byte-for-byte to and from disk, so its size is part of the file format.
static_assert(sizeof(FixedFileHeaderV2) == 36,
              "FixedFileHeaderV2 is an on-disk layout; do not change");
61 | | |
62 | | template <typename MessageType, typename ExtraType = uint32_t> |
63 | | class FileHeader { |
64 | | public: |
65 | 254 | FileHeader(const std::string& file_path) : _file_path(file_path) { |
66 | 254 | memset(&_fixed_file_header, 0, sizeof(_fixed_file_header)); |
67 | 254 | memset(&_extra_fixed_header, 0, sizeof(_extra_fixed_header)); |
68 | 254 | _fixed_file_header_size = sizeof(_fixed_file_header); |
69 | 254 | } |
70 | 254 | ~FileHeader() = default; |
71 | | |
72 | | // To calculate the length of the proto part, it needs to be called after the proto is operated, |
73 | | // and prepare must be called before calling serialize |
74 | | Status prepare(); |
75 | | |
76 | | // call prepare() first, serialize() will write fixed header and protobuffer. |
77 | | // Write the header to the starting position of the incoming file handle |
78 | | Status serialize(); |
79 | | |
80 | | // read from file, validate file length, signature and alder32 of protobuffer. |
81 | | // Read the header from the beginning of the incoming file handle |
82 | | Status deserialize(); |
83 | | |
84 | | // Check the validity of Header |
85 | | // it is actually call deserialize(). |
86 | | Status validate(); |
87 | | |
88 | | // write to memory |
89 | | Status serialize_to_memory(uint8_t* buffer, size_t buffer_size); |
90 | | |
91 | | // read from memory |
92 | | Status deserialize_from_memory(const uint8_t* buffer, size_t buffer_size); |
93 | | |
94 | 10 | uint64_t file_length() const { return _fixed_file_header.file_length; } |
95 | | uint32_t checksum() const { return _fixed_file_header.checksum; } |
96 | | const ExtraType& extra() const { return _extra_fixed_header; } |
97 | | ExtraType* mutable_extra() { return &_extra_fixed_header; } |
98 | 12 | const MessageType& message() const { return _proto; } |
99 | 238 | MessageType* mutable_message() { return &_proto; } |
100 | 247 | uint64_t size() const { |
101 | 247 | return _fixed_file_header_size + sizeof(_extra_fixed_header) + |
102 | 247 | _fixed_file_header.protobuf_length; |
103 | 247 | } |
104 | | |
105 | | void set_file_length(uint64_t file_length) { _fixed_file_header.file_length = file_length; } |
106 | | void set_checksum(uint32_t checksum) { _fixed_file_header.checksum = checksum; } |
107 | | |
108 | | private: |
109 | | std::string _file_path; |
110 | | FixedFileHeaderV2 _fixed_file_header; |
111 | | uint32_t _fixed_file_header_size; |
112 | | |
113 | | std::string _proto_string; |
114 | | ExtraType _extra_fixed_header; |
115 | | MessageType _proto; |
116 | | }; |
117 | | |
118 | | // FileHeader implementation |
119 | | template <typename MessageType, typename ExtraType> |
120 | 238 | Status FileHeader<MessageType, ExtraType>::prepare() { |
121 | 238 | try { |
122 | 238 | if (!_proto.SerializeToString(&_proto_string)) { |
123 | 0 | return Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>( |
124 | 0 | "serialize file header to string error. [path={}]", _file_path); |
125 | 0 | } |
126 | 238 | } catch (...) { |
127 | 0 | return Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>( |
128 | 0 | "serialize file header to string error. [path={}]", _file_path); |
129 | 0 | } |
130 | | |
131 | 238 | _fixed_file_header.protobuf_checksum = |
132 | 238 | olap_adler32(olap_adler32_init(), _proto_string.c_str(), _proto_string.size()); |
133 | | |
134 | 238 | _fixed_file_header.checksum = 0; |
135 | 238 | _fixed_file_header.protobuf_length = _proto_string.size(); |
136 | 238 | _fixed_file_header.file_length = size(); |
137 | 238 | _fixed_file_header.version = OLAP_DATA_VERSION_APPLIED; |
138 | 238 | _fixed_file_header.magic_number = OLAP_FIX_HEADER_MAGIC_NUMBER; |
139 | | |
140 | 238 | return Status::OK(); |
141 | 238 | } |
142 | | |
143 | | template <typename MessageType, typename ExtraType> |
144 | 234 | Status FileHeader<MessageType, ExtraType>::serialize() { |
145 | | // write to file |
146 | 234 | io::FileWriterPtr file_writer; |
147 | 234 | RETURN_IF_ERROR(io::global_local_filesystem()->create_file(_file_path, &file_writer)); |
148 | 234 | RETURN_IF_ERROR( |
149 | 234 | file_writer->append({(const uint8_t*)&_fixed_file_header, _fixed_file_header_size})); |
150 | 234 | RETURN_IF_ERROR(file_writer->append( |
151 | 234 | {(const uint8_t*)&_extra_fixed_header, sizeof(_extra_fixed_header)})); |
152 | 234 | RETURN_IF_ERROR(file_writer->append({_proto_string})); |
153 | 234 | return file_writer->close(); |
154 | 234 | } |
155 | | |
156 | | template <typename MessageType, typename ExtraType> |
157 | | Status FileHeader<MessageType, ExtraType>::serialize_to_memory(uint8_t* buffer, |
158 | 4 | size_t buffer_size) { |
159 | 4 | if (buffer_size < size()) { |
160 | 1 | return Status::Error<ErrorCode::INVALID_ARGUMENT>( |
161 | 1 | "buffer size is too small. required={}, provided={}", size(), buffer_size); |
162 | 1 | } |
163 | | |
164 | 3 | uint8_t* ptr = buffer; |
165 | 3 | memcpy(ptr, &_fixed_file_header, _fixed_file_header_size); |
166 | 3 | ptr += _fixed_file_header_size; |
167 | | |
168 | 3 | memcpy(ptr, &_extra_fixed_header, sizeof(_extra_fixed_header)); |
169 | 3 | ptr += sizeof(_extra_fixed_header); |
170 | | |
171 | 3 | memcpy(ptr, _proto_string.data(), _proto_string.size()); |
172 | 3 | return Status::OK(); |
173 | 4 | } |
174 | | |
175 | | template <typename MessageType, typename ExtraType> |
176 | 10 | Status FileHeader<MessageType, ExtraType>::deserialize() { |
177 | 10 | io::FileReaderSPtr file_reader; |
178 | 10 | RETURN_IF_ERROR(io::global_local_filesystem()->open_file(_file_path, &file_reader)); |
179 | 10 | off_t real_file_length = 0; |
180 | 10 | uint32_t real_protobuf_checksum = 0; |
181 | 10 | size_t bytes_read = 0; |
182 | 10 | RETURN_IF_ERROR(file_reader->read_at( |
183 | 10 | 0, {(const uint8_t*)&_fixed_file_header, _fixed_file_header_size}, &bytes_read)); |
184 | 10 | DCHECK(_fixed_file_header_size == bytes_read) |
185 | 0 | << " deserialize read bytes dismatch, request bytes " << _fixed_file_header_size |
186 | 0 | << " actual read " << bytes_read; |
187 | | |
188 | | //Status read_at(size_t offset, Slice result, size_t* bytes_read, |
189 | | // const IOContext* io_ctx = nullptr); |
190 | | |
191 | 10 | if (_fixed_file_header.magic_number != OLAP_FIX_HEADER_MAGIC_NUMBER) { |
192 | 0 | VLOG_TRACE << "old fix header found, magic num=" << _fixed_file_header.magic_number; |
193 | 0 | FixedFileHeader tmp_header; |
194 | 0 | RETURN_IF_ERROR(file_reader->read_at(0, {(const uint8_t*)&tmp_header, sizeof(tmp_header)}, |
195 | 0 | &bytes_read)); |
196 | 0 | DCHECK(sizeof(tmp_header) == bytes_read) |
197 | 0 | << " deserialize read bytes dismatch, request bytes " << sizeof(tmp_header) |
198 | 0 | << " actual read " << bytes_read; |
199 | 0 | _fixed_file_header.file_length = tmp_header.file_length; |
200 | 0 | _fixed_file_header.checksum = tmp_header.checksum; |
201 | 0 | _fixed_file_header.protobuf_length = tmp_header.protobuf_length; |
202 | 0 | _fixed_file_header.protobuf_checksum = tmp_header.protobuf_checksum; |
203 | 0 | _fixed_file_header.magic_number = OLAP_FIX_HEADER_MAGIC_NUMBER; |
204 | 0 | _fixed_file_header.version = OLAP_DATA_VERSION_APPLIED; |
205 | 0 | _fixed_file_header_size = sizeof(tmp_header); |
206 | 0 | } |
207 | | |
208 | 10 | VLOG_NOTICE << "fix head loaded. file_length=" << _fixed_file_header.file_length |
209 | 0 | << ", checksum=" << _fixed_file_header.checksum |
210 | 0 | << ", protobuf_length=" << _fixed_file_header.protobuf_length |
211 | 0 | << ", magic_number=" << _fixed_file_header.magic_number |
212 | 0 | << ", version=" << _fixed_file_header.version; |
213 | | |
214 | 10 | RETURN_IF_ERROR(file_reader->read_at( |
215 | 10 | _fixed_file_header_size, |
216 | 10 | {(const uint8_t*)&_extra_fixed_header, sizeof(_extra_fixed_header)}, &bytes_read)); |
217 | | |
218 | 10 | std::unique_ptr<char[]> buf(new (std::nothrow) char[_fixed_file_header.protobuf_length]); |
219 | 10 | if (nullptr == buf) { |
220 | 0 | char errmsg[64]; |
221 | 0 | return Status::Error<ErrorCode::MEM_ALLOC_FAILED>( |
222 | 0 | "malloc protobuf buf error. file={}, error={}", file_reader->path().native(), |
223 | 0 | strerror_r(errno, errmsg, 64)); |
224 | 0 | } |
225 | 10 | RETURN_IF_ERROR(file_reader->read_at(_fixed_file_header_size + sizeof(_extra_fixed_header), |
226 | 10 | {buf.get(), _fixed_file_header.protobuf_length}, |
227 | 10 | &bytes_read)); |
228 | 10 | real_file_length = file_reader->size(); |
229 | | |
230 | 10 | if (file_length() != static_cast<uint64_t>(real_file_length)) { |
231 | 0 | return Status::InternalError( |
232 | 0 | "file length is not match. file={}, file_length={}, real_file_length={}", |
233 | 0 | file_reader->path().native(), file_length(), real_file_length); |
234 | 0 | } |
235 | | |
236 | | // check proto checksum |
237 | 10 | real_protobuf_checksum = |
238 | 10 | olap_adler32(olap_adler32_init(), buf.get(), _fixed_file_header.protobuf_length); |
239 | | |
240 | 10 | if (real_protobuf_checksum != _fixed_file_header.protobuf_checksum) { |
241 | | // When compiling using gcc there woule be error like: |
242 | | // Cannot bind packed field '_FixedFileHeaderV2::protobuf_checksum' to 'unsigned int&' |
243 | | // so we need to using unary operator+ to evaluate one value to pass |
244 | | // to status to successfully compile. |
245 | 0 | return Status::InternalError("checksum is not match. file={}, expect={}, actual={}", |
246 | 0 | file_reader->path().native(), |
247 | 0 | +_fixed_file_header.protobuf_checksum, real_protobuf_checksum); |
248 | 0 | } |
249 | | |
250 | 10 | try { |
251 | 10 | std::string protobuf_str(buf.get(), _fixed_file_header.protobuf_length); |
252 | | |
253 | 10 | if (!_proto.ParseFromString(protobuf_str)) { |
254 | 0 | return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>( |
255 | 0 | "fail to parse file content to protobuf object. file={}", |
256 | 0 | file_reader->path().native()); |
257 | 0 | } |
258 | 10 | } catch (...) { |
259 | 0 | LOG(WARNING) << "fail to load protobuf. file='" << file_reader->path().native(); |
260 | 0 | return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>("fail to load protobuf. file={}", |
261 | 0 | file_reader->path().native()); |
262 | 0 | } |
263 | | |
264 | 10 | return Status::OK(); |
265 | 10 | } |
266 | | |
267 | | template <typename MessageType, typename ExtraType> |
268 | | Status FileHeader<MessageType, ExtraType>::deserialize_from_memory(const uint8_t* buffer, |
269 | 6 | size_t buffer_size) { |
270 | 6 | if (buffer_size < sizeof(FixedFileHeaderV2)) { |
271 | 1 | return Status::Error<ErrorCode::INVALID_ARGUMENT>( |
272 | 1 | "buffer size is too small to contain a valid header. provided={}", buffer_size); |
273 | 1 | } |
274 | | |
275 | 5 | const uint8_t* ptr = buffer; |
276 | 5 | memcpy(&_fixed_file_header, ptr, _fixed_file_header_size); |
277 | 5 | ptr += _fixed_file_header_size; |
278 | | |
279 | 5 | if (_fixed_file_header.magic_number != OLAP_FIX_HEADER_MAGIC_NUMBER) { |
280 | 1 | VLOG_TRACE << "old fix header found, magic num=" << _fixed_file_header.magic_number; |
281 | 1 | if (buffer_size < sizeof(FixedFileHeader)) { |
282 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT>( |
283 | 0 | "buffer size is too small to contain a valid old header. provided={}", |
284 | 0 | buffer_size); |
285 | 0 | } |
286 | 1 | FixedFileHeader tmp_header; |
287 | 1 | memcpy(&tmp_header, buffer, sizeof(tmp_header)); |
288 | 1 | _fixed_file_header.file_length = tmp_header.file_length; |
289 | 1 | _fixed_file_header.checksum = tmp_header.checksum; |
290 | 1 | _fixed_file_header.protobuf_length = tmp_header.protobuf_length; |
291 | 1 | _fixed_file_header.protobuf_checksum = tmp_header.protobuf_checksum; |
292 | 1 | _fixed_file_header.magic_number = OLAP_FIX_HEADER_MAGIC_NUMBER; |
293 | 1 | _fixed_file_header.version = OLAP_DATA_VERSION_APPLIED; |
294 | 1 | _fixed_file_header_size = sizeof(tmp_header); |
295 | 1 | ptr = buffer + sizeof(tmp_header); |
296 | 1 | } |
297 | | |
298 | 5 | VLOG_NOTICE << "fix head loaded. file_length=" << _fixed_file_header.file_length |
299 | 0 | << ", checksum=" << _fixed_file_header.checksum |
300 | 0 | << ", protobuf_length=" << _fixed_file_header.protobuf_length |
301 | 0 | << ", magic_number=" << _fixed_file_header.magic_number |
302 | 0 | << ", version=" << _fixed_file_header.version; |
303 | | |
304 | 5 | if (buffer_size < _fixed_file_header_size + sizeof(_extra_fixed_header) + |
305 | 5 | _fixed_file_header.protobuf_length) { |
306 | 0 | return Status::Error<ErrorCode::INVALID_ARGUMENT>( |
307 | 0 | "buffer size is too small to contain the entire header and protobuf. required={}, " |
308 | 0 | "provided={}", |
309 | 0 | _fixed_file_header_size + sizeof(_extra_fixed_header) + |
310 | 0 | _fixed_file_header.protobuf_length, |
311 | 0 | buffer_size); |
312 | 0 | } |
313 | | |
314 | 5 | memcpy(&_extra_fixed_header, ptr, sizeof(_extra_fixed_header)); |
315 | 5 | ptr += sizeof(_extra_fixed_header); |
316 | | |
317 | 5 | std::unique_ptr<char[]> buf(new (std::nothrow) char[_fixed_file_header.protobuf_length]); |
318 | 5 | if (nullptr == buf) { |
319 | 0 | char errmsg[64]; |
320 | 0 | return Status::Error<ErrorCode::MEM_ALLOC_FAILED>("malloc protobuf buf error. error={}", |
321 | 0 | strerror_r(errno, errmsg, 64)); |
322 | 0 | } |
323 | 5 | memcpy(buf.get(), ptr, _fixed_file_header.protobuf_length); |
324 | | |
325 | | // check proto checksum |
326 | 5 | uint32_t real_protobuf_checksum = |
327 | 5 | olap_adler32(olap_adler32_init(), buf.get(), _fixed_file_header.protobuf_length); |
328 | | |
329 | 5 | if (real_protobuf_checksum != _fixed_file_header.protobuf_checksum) { |
330 | 3 | return Status::InternalError("checksum is not match. expect={}, actual={}", |
331 | 3 | +_fixed_file_header.protobuf_checksum, real_protobuf_checksum); |
332 | 3 | } |
333 | | |
334 | 2 | try { |
335 | 2 | std::string protobuf_str(buf.get(), _fixed_file_header.protobuf_length); |
336 | | |
337 | 2 | if (!_proto.ParseFromString(protobuf_str)) { |
338 | 0 | return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>( |
339 | 0 | "fail to parse buffer content to protobuf object."); |
340 | 0 | } |
341 | 2 | } catch (...) { |
342 | 0 | LOG(WARNING) << "fail to load protobuf from buffer."; |
343 | 0 | return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>("fail to load protobuf from buffer."); |
344 | 0 | } |
345 | | |
346 | 2 | return Status::OK(); |
347 | 2 | } |
348 | | |
349 | | template <typename MessageType, typename ExtraType> |
350 | | Status FileHeader<MessageType, ExtraType>::validate() { |
351 | | return deserialize(); |
352 | | } |
353 | | |
354 | | } // namespace doris |