Coverage Report

Created: 2026-03-16 21:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/group_commit/wal/wal_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/group_commit/wal/wal_writer.h"
19
20
#include <crc32c/crc32c.h>
21
#include <gen_cpp/AgentService_types.h>
22
#include <gen_cpp/FrontendService_types.h>
23
24
#include "common/config.h"
25
#include "common/status.h"
26
#include "io/fs/encrypted_fs_factory.h"
27
#include "io/fs/file_system.h"
28
#include "io/fs/file_writer.h"
29
#include "io/fs/local_file_system.h"
30
#include "io/fs/path.h"
31
#include "load/group_commit/wal/wal_manager.h"
32
#include "storage/storage_engine.h"
33
#include "util/thrift_rpc_helper.h"
34
35
namespace doris {
36
37
const char* k_wal_magic = "WAL1";
38
const uint32_t k_wal_magic_length = 4;
39
40
1
WalWriter::WalWriter(const std::string& file_name) : _file_name(file_name) {}
41
42
1
WalWriter::~WalWriter() {}
43
44
1
Status determine_wal_fs(int64_t db_id, int64_t tb_id, io::FileSystemSPtr& fs) {
45
1
    if (!config::enable_wal_tde) {
46
1
        fs = io::global_local_filesystem();
47
1
        return Status::OK();
48
1
    }
49
50
#ifndef BE_TEST
51
    TNetworkAddress master_addr = ExecEnv::GetInstance()->cluster_info()->master_fe_addr;
52
    TGetTableTDEInfoRequest req;
53
    req.__set_db_id(db_id);
54
    req.__set_table_id(tb_id);
55
    TGetTableTDEInfoResult ret;
56
    RETURN_IF_ERROR(ThriftRpcHelper::rpc<FrontendServiceClient>(
57
            master_addr.hostname, master_addr.port,
58
            [&req, &ret](FrontendServiceConnection& client) {
59
                client->getTableTDEInfo(ret, req);
60
            }));
61
    if (auto st = Status::create(ret.status); !st) {
62
        return st;
63
    }
64
    auto encrypt_algorithm = [&ret]() -> EncryptionAlgorithmPB {
65
        switch (ret.algorithm) {
66
        case doris::TEncryptionAlgorithm::AES256:
67
            return EncryptionAlgorithmPB::AES_256_CTR;
68
        case doris::TEncryptionAlgorithm::SM4:
69
            return EncryptionAlgorithmPB::SM4_128_CTR;
70
        default:
71
            return EncryptionAlgorithmPB::PLAINTEXT;
72
        }
73
    }();
74
75
    auto local_fs = io::global_local_filesystem();
76
    fs = io::make_file_system(local_fs, encrypt_algorithm);
77
#else
78
0
    fs = io::global_local_filesystem();
79
0
#endif
80
81
0
    return Status::OK();
82
1
}
83
84
1
Status WalWriter::init(const io::FileSystemSPtr& fs) {
85
1
    io::Path wal_path = _file_name;
86
1
    auto parent_path = wal_path.parent_path();
87
1
    bool exists = false;
88
1
    RETURN_IF_ERROR(fs->exists(parent_path, &exists));
89
1
    if (!exists) {
90
0
        RETURN_IF_ERROR(fs->create_directory(parent_path));
91
0
    }
92
1
    RETURN_IF_ERROR(fs->create_file(_file_name, &_file_writer));
93
1
    LOG(INFO) << "create wal " << _file_name;
94
1
    return Status::OK();
95
1
}
96
97
1
Status WalWriter::finalize() {
98
1
    if (!_file_writer) {
99
0
        return Status::InternalError("wal writer is null,fail to close file={}", _file_name);
100
0
    }
101
1
    auto st = _file_writer->close();
102
1
    if (!st.ok()) {
103
0
        LOG(WARNING) << "fail to close wal " << _file_name;
104
0
    }
105
1
    return Status::OK();
106
1
}
107
108
2
Status WalWriter::append_blocks(const PBlockArray& blocks) {
109
2
    if (!_file_writer) {
110
0
        return Status::InternalError("wal writer is null,fail to write file={}", _file_name);
111
0
    }
112
2
    size_t total_size = 0;
113
2
    size_t offset = 0;
114
3
    for (const auto& block : blocks) {
115
3
        uint8_t len_buf[sizeof(uint64_t)];
116
3
        uint64_t block_length = block->ByteSizeLong();
117
3
        total_size += LENGTH_SIZE + block_length + CHECKSUM_SIZE;
118
3
        encode_fixed64_le(len_buf, block_length);
119
3
        RETURN_IF_ERROR(_file_writer->append({len_buf, sizeof(uint64_t)}));
120
3
        offset += LENGTH_SIZE;
121
122
3
        std::string content = block->SerializeAsString();
123
3
        RETURN_IF_ERROR(_file_writer->append(content));
124
3
        offset += block_length;
125
126
3
        uint8_t checksum_buf[sizeof(uint32_t)];
127
3
        uint32_t checksum = crc32c::Crc32c(content.data(), block_length);
128
3
        encode_fixed32_le(checksum_buf, checksum);
129
3
        RETURN_IF_ERROR(_file_writer->append({checksum_buf, sizeof(uint32_t)}));
130
3
        offset += CHECKSUM_SIZE;
131
3
    }
132
2
    if (offset != total_size) {
133
0
        return Status::InternalError(
134
0
                "failed to write block to wal expected= " + std::to_string(total_size) +
135
0
                ",actually=" + std::to_string(offset));
136
0
    }
137
2
    return Status::OK();
138
2
}
139
140
0
Status WalWriter::append_header(std::string col_ids) {
141
0
    if (!_file_writer) {
142
0
        return Status::InternalError("wal writer is null,fail to write file={}", _file_name);
143
0
    }
144
0
    size_t total_size = 0;
145
0
    uint64_t length = col_ids.size();
146
0
    total_size += k_wal_magic_length;
147
0
    total_size += VERSION_SIZE;
148
0
    total_size += LENGTH_SIZE;
149
0
    total_size += length;
150
0
    size_t offset = 0;
151
0
    RETURN_IF_ERROR(_file_writer->append({k_wal_magic, k_wal_magic_length}));
152
0
    offset += k_wal_magic_length;
153
154
0
    uint8_t version_buf[sizeof(uint32_t)];
155
0
    encode_fixed32_le(version_buf, WAL_VERSION);
156
0
    RETURN_IF_ERROR(_file_writer->append({version_buf, sizeof(uint32_t)}));
157
0
    offset += VERSION_SIZE;
158
0
    uint8_t len_buf[sizeof(uint64_t)];
159
0
    encode_fixed64_le(len_buf, length);
160
0
    RETURN_IF_ERROR(_file_writer->append({len_buf, sizeof(uint64_t)}));
161
0
    offset += LENGTH_SIZE;
162
0
    RETURN_IF_ERROR(_file_writer->append(col_ids));
163
0
    offset += length;
164
0
    if (offset != total_size) {
165
0
        return Status::InternalError(
166
0
                "failed to write header to wal expected= " + std::to_string(total_size) +
167
0
                ",actually=" + std::to_string(offset));
168
0
    }
169
0
    return Status::OK();
170
0
}
171
172
} // namespace doris