be/src/exec/sink/writer/vwal_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/sink/writer/vwal_writer.h" |
19 | | |
20 | | #include <gen_cpp/FrontendService.h> |
21 | | #include <gen_cpp/data.pb.h> |
22 | | |
23 | | #include <sstream> |
24 | | |
25 | | #include "io/fs/encrypted_fs_factory.h" |
26 | | #include "util/debug_points.h" |
27 | | |
28 | | namespace doris { |
29 | | |
30 | | VWalWriter::VWalWriter(int64_t db_id, int64_t tb_id, int64_t wal_id, |
31 | | const std::string& import_label, WalManager* wal_manager, |
32 | | std::vector<TSlotDescriptor>& slot_desc, int be_exe_version) |
33 | 73 | : _db_id(db_id), |
34 | 73 | _tb_id(tb_id), |
35 | 73 | _wal_id(wal_id), |
36 | 73 | _label(import_label), |
37 | 73 | _wal_manager(wal_manager), |
38 | 73 | _slot_descs(slot_desc), |
39 | 73 | _be_exe_version(be_exe_version) {} |
40 | | |
41 | 73 | VWalWriter::~VWalWriter() {} |
42 | | |
43 | 73 | Status VWalWriter::init() { |
44 | 73 | io::FileSystemSPtr wal_fs = io::global_local_filesystem(); |
45 | 73 | #ifndef BE_TEST |
46 | 73 | if (config::group_commit_wait_replay_wal_finish) { |
47 | 0 | std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>(); |
48 | 0 | std::shared_ptr<std::condition_variable> cv = std::make_shared<std::condition_variable>(); |
49 | 0 | auto add_st = _wal_manager->add_wal_cv_map(_wal_id, lock, cv); |
50 | 0 | if (!add_st.ok()) { |
51 | 0 | LOG(WARNING) << "fail to add wal_id " << _wal_id << " to wal_cv_map"; |
52 | 0 | } |
53 | 0 | } |
54 | 73 | RETURN_IF_ERROR(determine_wal_fs(_db_id, _tb_id, wal_fs)); |
55 | 73 | #endif |
56 | 73 | RETURN_IF_ERROR(_create_wal_writer(_wal_id, wal_fs, _wal_writer)); |
57 | 73 | _wal_manager->add_wal_queue(_tb_id, _wal_id); |
58 | 73 | std::stringstream ss; |
59 | 326 | for (auto slot_desc : _slot_descs) { |
60 | 326 | if (slot_desc.col_unique_id < 0) { |
61 | 0 | continue; |
62 | 0 | } |
63 | 326 | ss << std::to_string(slot_desc.col_unique_id) << ","; |
64 | 326 | } |
65 | 73 | std::string col_ids = ss.str().substr(0, ss.str().size() - 1); |
66 | 73 | RETURN_IF_ERROR(_wal_writer->append_header(col_ids)); |
67 | 73 | return Status::OK(); |
68 | 73 | } |
69 | | |
70 | 582 | Status VWalWriter::write_wal(Block* block) { |
71 | 582 | DBUG_EXECUTE_IF("VWalWriter.write_wal.fail", |
72 | 582 | { return Status::InternalError("Failed to write wal!"); }); |
73 | 582 | PBlock pblock; |
74 | 582 | size_t uncompressed_bytes = 0, compressed_bytes = 0; |
75 | 582 | int64_t compressed_time = 0; |
76 | 582 | RETURN_IF_ERROR(block->serialize(_be_exe_version, &pblock, &uncompressed_bytes, |
77 | 582 | &compressed_bytes, &compressed_time, |
78 | 582 | segment_v2::CompressionTypePB::NO_COMPRESSION)); |
79 | 582 | RETURN_IF_ERROR(_wal_writer->append_blocks(std::vector<PBlock*> {&pblock})); |
80 | 582 | return Status::OK(); |
81 | 582 | } |
82 | | |
83 | 73 | Status VWalWriter::close() { |
84 | 73 | if (config::group_commit_wait_replay_wal_finish) { |
85 | 0 | std::string wal_path; |
86 | 0 | RETURN_IF_ERROR(_wal_manager->get_wal_path(_wal_id, wal_path)); |
87 | 0 | LOG(INFO) << "close file " << wal_path; |
88 | 0 | RETURN_IF_ERROR(_wal_manager->add_recover_wal(_db_id, _tb_id, _wal_id, wal_path)); |
89 | 0 | RETURN_IF_ERROR(_wal_manager->wait_replay_wal_finish(_wal_id)); |
90 | 0 | } |
91 | 73 | if (_wal_writer != nullptr) { |
92 | 73 | RETURN_IF_ERROR(_wal_writer->finalize()); |
93 | 73 | } |
94 | 73 | return Status::OK(); |
95 | 73 | } |
96 | | |
97 | | Status VWalWriter::_create_wal_writer(int64_t wal_id, const io::FileSystemSPtr& fs, |
98 | 73 | std::shared_ptr<WalWriter>& wal_writer) { |
99 | 73 | std::string wal_path; |
100 | 73 | RETURN_IF_ERROR(_wal_manager->get_wal_path(wal_id, wal_path)); |
101 | 73 | wal_writer = std::make_shared<WalWriter>(wal_path); |
102 | 73 | RETURN_IF_ERROR(wal_writer->init(fs)); |
103 | 73 | return Status::OK(); |
104 | 73 | } |
105 | | } // namespace doris |