be/src/load/group_commit/wal/wal_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "load/group_commit/wal/wal_writer.h" |
19 | | |
20 | | #include <crc32c/crc32c.h> |
21 | | #include <gen_cpp/AgentService_types.h> |
22 | | #include <gen_cpp/FrontendService_types.h> |
23 | | |
24 | | #include "common/config.h" |
25 | | #include "common/status.h" |
26 | | #include "io/fs/encrypted_fs_factory.h" |
27 | | #include "io/fs/file_system.h" |
28 | | #include "io/fs/file_writer.h" |
29 | | #include "io/fs/local_file_system.h" |
30 | | #include "io/fs/path.h" |
31 | | #include "load/group_commit/wal/wal_manager.h" |
32 | | #include "storage/storage_engine.h" |
33 | | #include "util/thrift_rpc_helper.h" |
34 | | |
35 | | namespace doris { |
36 | | |
// Magic bytes written at the very start of a WAL header by
// WalWriter::append_header().
const char* k_wal_magic {"WAL1"};
// Number of bytes of k_wal_magic that are written (no trailing NUL).
const uint32_t k_wal_magic_length {4};
39 | | |
40 | 1 | WalWriter::WalWriter(const std::string& file_name) : _file_name(file_name) {} |
41 | | |
42 | 1 | WalWriter::~WalWriter() {} |
43 | | |
44 | 1 | Status determine_wal_fs(int64_t db_id, int64_t tb_id, io::FileSystemSPtr& fs) { |
45 | 1 | if (!config::enable_wal_tde) { |
46 | 1 | fs = io::global_local_filesystem(); |
47 | 1 | return Status::OK(); |
48 | 1 | } |
49 | | |
50 | | #ifndef BE_TEST |
51 | | TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr; |
52 | | TGetTableTDEInfoRequest req; |
53 | | req.__set_db_id(db_id); |
54 | | req.__set_table_id(tb_id); |
55 | | TGetTableTDEInfoResult ret; |
56 | | RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>( |
57 | | master_addr.hostname, master_addr.port, |
58 | | [&req, &ret](FrontendServiceConnection& client) { |
59 | | client->getTableTDEInfo(ret, req); |
60 | | })); |
61 | | if (auto st = Status::create(ret.status); !st) { |
62 | | return st; |
63 | | } |
64 | | auto encrypt_algorithm = [&ret]() -> EncryptionAlgorithmPB { |
65 | | switch (ret.algorithm) { |
66 | | case doris::TEncryptionAlgorithm::AES256: |
67 | | return EncryptionAlgorithmPB::AES_256_CTR; |
68 | | case doris::TEncryptionAlgorithm::SM4: |
69 | | return EncryptionAlgorithmPB::SM4_128_CTR; |
70 | | default: |
71 | | return EncryptionAlgorithmPB::PLAINTEXT; |
72 | | } |
73 | | }(); |
74 | | |
75 | | auto local_fs = io::global_local_filesystem(); |
76 | | fs = io::make_file_system(local_fs, encrypt_algorithm); |
77 | | #else |
78 | 0 | fs = io::global_local_filesystem(); |
79 | 0 | #endif |
80 | |
|
81 | 0 | return Status::OK(); |
82 | 1 | } |
83 | | |
84 | 1 | Status WalWriter::init(const io::FileSystemSPtr& fs) { |
85 | 1 | io::Path wal_path = _file_name; |
86 | 1 | auto parent_path = wal_path.parent_path(); |
87 | 1 | bool exists = false; |
88 | 1 | RETURN_IF_ERROR(fs->exists(parent_path, &exists)); |
89 | 1 | if (!exists) { |
90 | 0 | RETURN_IF_ERROR(fs->create_directory(parent_path)); |
91 | 0 | } |
92 | 1 | RETURN_IF_ERROR(fs->create_file(_file_name, &_file_writer)); |
93 | 1 | LOG(INFO) << "create wal " << _file_name; |
94 | 1 | return Status::OK(); |
95 | 1 | } |
96 | | |
97 | 1 | Status WalWriter::finalize() { |
98 | 1 | if (!_file_writer) { |
99 | 0 | return Status::InternalError("wal writer is null,fail to close file={}", _file_name); |
100 | 0 | } |
101 | 1 | auto st = _file_writer->close(); |
102 | 1 | if (!st.ok()) { |
103 | 0 | LOG(WARNING) << "fail to close wal " << _file_name; |
104 | 0 | } |
105 | 1 | return Status::OK(); |
106 | 1 | } |
107 | | |
108 | 2 | Status WalWriter::append_blocks(const PBlockArray& blocks) { |
109 | 2 | if (!_file_writer) { |
110 | 0 | return Status::InternalError("wal writer is null,fail to write file={}", _file_name); |
111 | 0 | } |
112 | 2 | size_t total_size = 0; |
113 | 2 | size_t offset = 0; |
114 | 3 | for (const auto& block : blocks) { |
115 | 3 | uint8_t len_buf[sizeof(uint64_t)]; |
116 | 3 | uint64_t block_length = block->ByteSizeLong(); |
117 | 3 | total_size += LENGTH_SIZE + block_length + CHECKSUM_SIZE; |
118 | 3 | encode_fixed64_le(len_buf, block_length); |
119 | 3 | RETURN_IF_ERROR(_file_writer->append({len_buf, sizeof(uint64_t)})); |
120 | 3 | offset += LENGTH_SIZE; |
121 | | |
122 | 3 | std::string content = block->SerializeAsString(); |
123 | 3 | RETURN_IF_ERROR(_file_writer->append(content)); |
124 | 3 | offset += block_length; |
125 | | |
126 | 3 | uint8_t checksum_buf[sizeof(uint32_t)]; |
127 | 3 | uint32_t checksum = crc32c::Crc32c(content.data(), block_length); |
128 | 3 | encode_fixed32_le(checksum_buf, checksum); |
129 | 3 | RETURN_IF_ERROR(_file_writer->append({checksum_buf, sizeof(uint32_t)})); |
130 | 3 | offset += CHECKSUM_SIZE; |
131 | 3 | } |
132 | 2 | if (offset != total_size) { |
133 | 0 | return Status::InternalError( |
134 | 0 | "failed to write block to wal expected= " + std::to_string(total_size) + |
135 | 0 | ",actually=" + std::to_string(offset)); |
136 | 0 | } |
137 | 2 | return Status::OK(); |
138 | 2 | } |
139 | | |
140 | 0 | Status WalWriter::append_header(std::string col_ids) { |
141 | 0 | if (!_file_writer) { |
142 | 0 | return Status::InternalError("wal writer is null,fail to write file={}", _file_name); |
143 | 0 | } |
144 | 0 | size_t total_size = 0; |
145 | 0 | uint64_t length = col_ids.size(); |
146 | 0 | total_size += k_wal_magic_length; |
147 | 0 | total_size += VERSION_SIZE; |
148 | 0 | total_size += LENGTH_SIZE; |
149 | 0 | total_size += length; |
150 | 0 | size_t offset = 0; |
151 | 0 | RETURN_IF_ERROR(_file_writer->append({k_wal_magic, k_wal_magic_length})); |
152 | 0 | offset += k_wal_magic_length; |
153 | |
|
154 | 0 | uint8_t version_buf[sizeof(uint32_t)]; |
155 | 0 | encode_fixed32_le(version_buf, WAL_VERSION); |
156 | 0 | RETURN_IF_ERROR(_file_writer->append({version_buf, sizeof(uint32_t)})); |
157 | 0 | offset += VERSION_SIZE; |
158 | 0 | uint8_t len_buf[sizeof(uint64_t)]; |
159 | 0 | encode_fixed64_le(len_buf, length); |
160 | 0 | RETURN_IF_ERROR(_file_writer->append({len_buf, sizeof(uint64_t)})); |
161 | 0 | offset += LENGTH_SIZE; |
162 | 0 | RETURN_IF_ERROR(_file_writer->append(col_ids)); |
163 | 0 | offset += length; |
164 | 0 | if (offset != total_size) { |
165 | 0 | return Status::InternalError( |
166 | 0 | "failed to write header to wal expected= " + std::to_string(total_size) + |
167 | 0 | ",actually=" + std::to_string(offset)); |
168 | 0 | } |
169 | 0 | return Status::OK(); |
170 | 0 | } |
171 | | |
172 | | } // namespace doris |