/root/doris/be/src/olap/file_header.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <stdio.h> |
21 | | #include <sys/stat.h> |
22 | | |
23 | | #include <memory> |
24 | | #include <string> |
25 | | #include <vector> |
26 | | |
27 | | #include "io/fs/file_reader.h" |
28 | | #include "io/fs/file_writer.h" |
29 | | #include "io/fs/local_file_system.h" |
30 | | #include "olap/lru_cache.h" |
31 | | #include "olap/olap_common.h" |
32 | | #include "olap/olap_define.h" |
33 | | #include "olap/utils.h" |
34 | | #include "util/debug_util.h" |
35 | | |
36 | | namespace doris { |
37 | | |
// Legacy fixed header layout. FileHeader::deserialize() falls back to this layout when the
// magic number of FixedFileHeaderV2 is not found at the start of the file.
//
// The struct is read from disk as raw bytes, so it is packed and its size is part of the
// on-disk format.
using FixedFileHeader = struct _FixedFileHeader {
    // the length of the entire file
    uint32_t file_length;
    // Checksum of the file's contents except the FileHeader
    uint32_t checksum;
    // Length of the protobuf section
    uint32_t protobuf_length;
    // Checksum of the protobuf section
    uint32_t protobuf_checksum;
} __attribute__((packed));

// Guard the on-disk layout: four packed 32-bit fields.
static_assert(sizeof(FixedFileHeader) == 16,
              "FixedFileHeader is an on-disk format and must stay 16 bytes");
48 | | |
// Current fixed header layout. It starts with a magic number, which is how
// FileHeader::deserialize() tells it apart from the legacy FixedFileHeader.
//
// The struct is written to and read from disk as raw bytes, so it is packed and its size is
// part of the on-disk format.
using FixedFileHeaderV2 = struct _FixedFileHeaderV2 {
    // Marker identifying this layout
    uint64_t magic_number;
    // Format version stamped by the writer
    uint32_t version;
    // the length of the entire file
    uint64_t file_length;
    // Checksum of the file's contents except the FileHeader
    uint32_t checksum;
    // Length of the protobuf section
    uint64_t protobuf_length;
    // Checksum of the protobuf section
    uint32_t protobuf_checksum;
} __attribute__((packed));

// Guard the on-disk layout: 8 + 4 + 8 + 4 + 8 + 4 packed bytes.
static_assert(sizeof(FixedFileHeaderV2) == 36,
              "FixedFileHeaderV2 is an on-disk format and must stay 36 bytes");
61 | | |
62 | | template <typename MessageType, typename ExtraType = uint32_t> |
63 | | class FileHeader { |
64 | | public: |
65 | 33 | FileHeader(const std::string& file_path) : _file_path(file_path) { |
66 | 33 | memset(&_fixed_file_header, 0, sizeof(_fixed_file_header)); |
67 | 33 | memset(&_extra_fixed_header, 0, sizeof(_extra_fixed_header)); |
68 | 33 | _fixed_file_header_size = sizeof(_fixed_file_header); |
69 | 33 | } |
70 | 33 | ~FileHeader() = default; |
71 | | |
72 | | // To calculate the length of the proto part, it needs to be called after the proto is operated, |
73 | | // and prepare must be called before calling serialize |
74 | | Status prepare(); |
75 | | |
76 | | // call prepare() first, serialize() will write fixed header and protobuffer. |
77 | | // Write the header to the starting position of the incoming file handle |
78 | | Status serialize(); |
79 | | |
80 | | // read from file, validate file length, signature and alder32 of protobuffer. |
81 | | // Read the header from the beginning of the incoming file handle |
82 | | Status deserialize(); |
83 | | |
84 | | // Check the validity of Header |
85 | | // it is actually call deserialize(). |
86 | | Status validate(); |
87 | | |
88 | 5 | uint64_t file_length() const { return _fixed_file_header.file_length; } |
89 | | uint32_t checksum() const { return _fixed_file_header.checksum; } |
90 | | const ExtraType& extra() const { return _extra_fixed_header; } |
91 | | ExtraType* mutable_extra() { return &_extra_fixed_header; } |
92 | 5 | const MessageType& message() const { return _proto; } |
93 | 28 | MessageType* mutable_message() { return &_proto; } |
94 | 28 | uint64_t size() const { |
95 | 28 | return _fixed_file_header_size + sizeof(_extra_fixed_header) + |
96 | 28 | _fixed_file_header.protobuf_length; |
97 | 28 | } |
98 | | |
99 | | void set_file_length(uint64_t file_length) { _fixed_file_header.file_length = file_length; } |
100 | | void set_checksum(uint32_t checksum) { _fixed_file_header.checksum = checksum; } |
101 | | |
102 | | private: |
103 | | std::string _file_path; |
104 | | FixedFileHeaderV2 _fixed_file_header; |
105 | | uint32_t _fixed_file_header_size; |
106 | | |
107 | | std::string _proto_string; |
108 | | ExtraType _extra_fixed_header; |
109 | | MessageType _proto; |
110 | | }; |
111 | | |
112 | | // FileHeader implementation |
113 | | template <typename MessageType, typename ExtraType> |
114 | 28 | Status FileHeader<MessageType, ExtraType>::prepare() { |
115 | 28 | try { |
116 | 28 | if (!_proto.SerializeToString(&_proto_string)) { |
117 | 0 | return Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>( |
118 | 0 | "serialize file header to string error. [path={}]", _file_path); |
119 | 0 | } |
120 | 28 | } catch (...) { |
121 | 0 | return Status::Error<ErrorCode::SERIALIZE_PROTOBUF_ERROR>( |
122 | 0 | "serialize file header to string error. [path={}]", _file_path); |
123 | 0 | } |
124 | | |
125 | 28 | _fixed_file_header.protobuf_checksum = |
126 | 28 | olap_adler32(olap_adler32_init(), _proto_string.c_str(), _proto_string.size()); |
127 | | |
128 | 28 | _fixed_file_header.checksum = 0; |
129 | 28 | _fixed_file_header.protobuf_length = _proto_string.size(); |
130 | 28 | _fixed_file_header.file_length = size(); |
131 | 28 | _fixed_file_header.version = OLAP_DATA_VERSION_APPLIED; |
132 | 28 | _fixed_file_header.magic_number = OLAP_FIX_HEADER_MAGIC_NUMBER; |
133 | | |
134 | 28 | return Status::OK(); |
135 | 28 | } |
136 | | |
137 | | template <typename MessageType, typename ExtraType> |
138 | 28 | Status FileHeader<MessageType, ExtraType>::serialize() { |
139 | | // write to file |
140 | 28 | io::FileWriterPtr file_writer; |
141 | 28 | RETURN_IF_ERROR(io::global_local_filesystem()->create_file(_file_path, &file_writer)); |
142 | 28 | RETURN_IF_ERROR( |
143 | 28 | file_writer->append({(const uint8_t*)&_fixed_file_header, _fixed_file_header_size})); |
144 | 28 | RETURN_IF_ERROR(file_writer->append( |
145 | 28 | {(const uint8_t*)&_extra_fixed_header, sizeof(_extra_fixed_header)})); |
146 | 28 | RETURN_IF_ERROR(file_writer->append({_proto_string})); |
147 | 28 | return file_writer->close(); |
148 | 28 | } |
149 | | |
150 | | template <typename MessageType, typename ExtraType> |
151 | 5 | Status FileHeader<MessageType, ExtraType>::deserialize() { |
152 | 5 | io::FileReaderSPtr file_reader; |
153 | 5 | RETURN_IF_ERROR(io::global_local_filesystem()->open_file(_file_path, &file_reader)); |
154 | 5 | off_t real_file_length = 0; |
155 | 5 | uint32_t real_protobuf_checksum = 0; |
156 | 5 | size_t bytes_read = 0; |
157 | 5 | RETURN_IF_ERROR(file_reader->read_at( |
158 | 5 | 0, {(const uint8_t*)&_fixed_file_header, _fixed_file_header_size}, &bytes_read)); |
159 | 5 | DCHECK(_fixed_file_header_size == bytes_read) |
160 | 0 | << " deserialize read bytes dismatch, request bytes " << _fixed_file_header_size |
161 | 0 | << " actual read " << bytes_read; |
162 | | |
163 | | //Status read_at(size_t offset, Slice result, size_t* bytes_read, |
164 | | // const IOContext* io_ctx = nullptr); |
165 | | |
166 | 5 | if (_fixed_file_header.magic_number != OLAP_FIX_HEADER_MAGIC_NUMBER) { |
167 | 0 | VLOG_TRACE << "old fix header found, magic num=" << _fixed_file_header.magic_number; |
168 | 0 | FixedFileHeader tmp_header; |
169 | 0 | RETURN_IF_ERROR(file_reader->read_at(0, {(const uint8_t*)&tmp_header, sizeof(tmp_header)}, |
170 | 0 | &bytes_read)); |
171 | 0 | DCHECK(sizeof(tmp_header) == bytes_read) |
172 | 0 | << " deserialize read bytes dismatch, request bytes " << sizeof(tmp_header) |
173 | 0 | << " actual read " << bytes_read; |
174 | 0 | _fixed_file_header.file_length = tmp_header.file_length; |
175 | 0 | _fixed_file_header.checksum = tmp_header.checksum; |
176 | 0 | _fixed_file_header.protobuf_length = tmp_header.protobuf_length; |
177 | 0 | _fixed_file_header.protobuf_checksum = tmp_header.protobuf_checksum; |
178 | 0 | _fixed_file_header.magic_number = OLAP_FIX_HEADER_MAGIC_NUMBER; |
179 | 0 | _fixed_file_header.version = OLAP_DATA_VERSION_APPLIED; |
180 | 0 | _fixed_file_header_size = sizeof(tmp_header); |
181 | 0 | } |
182 | | |
183 | 5 | VLOG_NOTICE << "fix head loaded. file_length=" << _fixed_file_header.file_length |
184 | 0 | << ", checksum=" << _fixed_file_header.checksum |
185 | 0 | << ", protobuf_length=" << _fixed_file_header.protobuf_length |
186 | 0 | << ", magic_number=" << _fixed_file_header.magic_number |
187 | 0 | << ", version=" << _fixed_file_header.version; |
188 | | |
189 | 5 | RETURN_IF_ERROR(file_reader->read_at( |
190 | 5 | _fixed_file_header_size, |
191 | 5 | {(const uint8_t*)&_extra_fixed_header, sizeof(_extra_fixed_header)}, &bytes_read)); |
192 | | |
193 | 5 | std::unique_ptr<char[]> buf(new (std::nothrow) char[_fixed_file_header.protobuf_length]); |
194 | 5 | if (nullptr == buf) { |
195 | 0 | char errmsg[64]; |
196 | 0 | return Status::Error<ErrorCode::MEM_ALLOC_FAILED>( |
197 | 0 | "malloc protobuf buf error. file={}, error={}", file_reader->path().native(), |
198 | 0 | strerror_r(errno, errmsg, 64)); |
199 | 0 | } |
200 | 5 | RETURN_IF_ERROR(file_reader->read_at(_fixed_file_header_size + sizeof(_extra_fixed_header), |
201 | 5 | {buf.get(), _fixed_file_header.protobuf_length}, |
202 | 5 | &bytes_read)); |
203 | 5 | real_file_length = file_reader->size(); |
204 | | |
205 | 5 | if (file_length() != static_cast<uint64_t>(real_file_length)) { |
206 | 0 | return Status::InternalError( |
207 | 0 | "file length is not match. file={}, file_length={}, real_file_length={}", |
208 | 0 | file_reader->path().native(), file_length(), real_file_length); |
209 | 0 | } |
210 | | |
211 | | // check proto checksum |
212 | 5 | real_protobuf_checksum = |
213 | 5 | olap_adler32(olap_adler32_init(), buf.get(), _fixed_file_header.protobuf_length); |
214 | | |
215 | 5 | if (real_protobuf_checksum != _fixed_file_header.protobuf_checksum) { |
216 | | // When compiling using gcc there woule be error like: |
217 | | // Cannot bind packed field '_FixedFileHeaderV2::protobuf_checksum' to 'unsigned int&' |
218 | | // so we need to using unary operator+ to evaluate one value to pass |
219 | | // to status to successfully compile. |
220 | 0 | return Status::InternalError("checksum is not match. file={}, expect={}, actual={}", |
221 | 0 | file_reader->path().native(), |
222 | 0 | +_fixed_file_header.protobuf_checksum, real_protobuf_checksum); |
223 | 0 | } |
224 | | |
225 | 5 | try { |
226 | 5 | std::string protobuf_str(buf.get(), _fixed_file_header.protobuf_length); |
227 | | |
228 | 5 | if (!_proto.ParseFromString(protobuf_str)) { |
229 | 0 | return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>( |
230 | 0 | "fail to parse file content to protobuf object. file={}", |
231 | 0 | file_reader->path().native()); |
232 | 0 | } |
233 | 5 | } catch (...) { |
234 | 0 | LOG(WARNING) << "fail to load protobuf. file='" << file_reader->path().native(); |
235 | 0 | return Status::Error<ErrorCode::PARSE_PROTOBUF_ERROR>("fail to load protobuf. file={}", |
236 | 0 | file_reader->path().native()); |
237 | 0 | } |
238 | | |
239 | 5 | return Status::OK(); |
240 | 5 | } |
241 | | |
242 | | template <typename MessageType, typename ExtraType> |
243 | | Status FileHeader<MessageType, ExtraType>::validate() { |
244 | | return deserialize(); |
245 | | } |
246 | | |
247 | | } // namespace doris |