be/src/load/channel/load_stream_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "load/channel/load_stream_writer.h" |
19 | | |
20 | | #include <brpc/controller.h> |
21 | | #include <butil/errno.h> |
22 | | #include <fmt/format.h> |
23 | | #include <gen_cpp/internal_service.pb.h> |
24 | | #include <gen_cpp/olap_file.pb.h> |
25 | | |
26 | | #include <filesystem> |
27 | | #include <ostream> |
28 | | #include <string> |
29 | | #include <utility> |
30 | | |
31 | | #include "bvar/bvar.h" |
32 | | #include "cloud/config.h" |
33 | | #include "common/compiler_util.h" // IWYU pragma: keep |
34 | | #include "common/config.h" |
35 | | #include "common/logging.h" |
36 | | #include "common/status.h" |
37 | | #include "core/block/block.h" |
38 | | #include "io/fs/file_writer.h" // IWYU pragma: keep |
39 | | #include "load/channel/load_channel_mgr.h" |
40 | | #include "load/memtable/memtable.h" |
41 | | #include "load/memtable/memtable_flush_executor.h" |
42 | | #include "runtime/exec_env.h" |
43 | | #include "runtime/memory/mem_tracker.h" |
44 | | #include "service/backend_options.h" |
45 | | #include "storage/data_dir.h" |
46 | | #include "storage/index/inverted/inverted_index_desc.h" |
47 | | #include "storage/olap_define.h" |
48 | | #include "storage/rowset/beta_rowset.h" |
49 | | #include "storage/rowset/beta_rowset_writer.h" |
50 | | #include "storage/rowset/rowset_factory.h" |
51 | | #include "storage/rowset/rowset_meta.h" |
52 | | #include "storage/rowset/rowset_writer.h" |
53 | | #include "storage/rowset/rowset_writer_context.h" |
54 | | #include "storage/rowset_builder.h" |
55 | | #include "storage/schema.h" |
56 | | #include "storage/schema_change/schema_change.h" |
57 | | #include "storage/segment/segment.h" |
58 | | #include "storage/storage_engine.h" |
59 | | #include "storage/tablet/tablet_manager.h" |
60 | | #include "storage/tablet_info.h" |
61 | | #include "storage/txn/txn_manager.h" |
62 | | #include "util/brpc_client_cache.h" |
63 | | #include "util/brpc_closure.h" |
64 | | #include "util/debug_points.h" |
65 | | #include "util/mem_info.h" |
66 | | #include "util/stopwatch.hpp" |
67 | | #include "util/time.h" |
68 | | |
69 | | namespace doris { |
70 | | #include "common/compile_check_begin.h" |
71 | | using namespace ErrorCode; |
72 | | |
73 | | bvar::Adder<int64_t> g_load_stream_writer_cnt("load_stream_writer_count"); |
74 | | bvar::Adder<int64_t> g_load_stream_file_writer_cnt("load_stream_file_writer_count"); |
75 | | |
76 | | LoadStreamWriter::LoadStreamWriter(WriteRequest* context, RuntimeProfile* profile) |
77 | 14 | : _req(*context), _rowset_writer(nullptr) { |
78 | 14 | g_load_stream_writer_cnt << 1; |
79 | | // TODO(plat1ko): CloudStorageEngine |
80 | 14 | _rowset_builder = std::make_unique<RowsetBuilder>( |
81 | 14 | ExecEnv::GetInstance()->storage_engine().to_local(), *context, profile); |
82 | 14 | _resource_ctx = thread_context()->resource_ctx(); // from load stream |
83 | 14 | } |
84 | | |
85 | 14 | LoadStreamWriter::~LoadStreamWriter() { |
86 | 14 | g_load_stream_file_writer_cnt << -_segment_file_writers.size(); |
87 | 14 | g_load_stream_file_writer_cnt << -_inverted_file_writers.size(); |
88 | 14 | g_load_stream_writer_cnt << -1; |
89 | 14 | } |
90 | | |
91 | 14 | Status LoadStreamWriter::init() { |
92 | 14 | DBUG_EXECUTE_IF("LoadStreamWriter.init.failure", |
93 | 14 | { return Status::InternalError("fault injection"); }); |
94 | 14 | RETURN_IF_ERROR(_rowset_builder->init()); |
95 | 13 | _rowset_writer = _rowset_builder->rowset_writer(); |
96 | 13 | _is_init = true; |
97 | 13 | return Status::OK(); |
98 | 14 | } |
99 | | |
100 | | Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf, |
101 | 26 | FileType file_type) { |
102 | 26 | SCOPED_ATTACH_TASK(_resource_ctx); |
103 | 26 | io::FileWriter* file_writer = nullptr; |
104 | 26 | auto& file_writers = |
105 | 26 | file_type == FileType::SEGMENT_FILE ? _segment_file_writers : _inverted_file_writers; |
106 | 26 | { |
107 | 26 | std::lock_guard lock_guard(_lock); |
108 | 26 | DCHECK(_is_init); |
109 | 26 | if (segid >= file_writers.size()) { |
110 | 39 | for (size_t i = file_writers.size(); i <= segid; i++) { |
111 | 24 | Status st; |
112 | 24 | io::FileWriterPtr seg_file_writer; |
113 | 24 | st = _rowset_writer->create_file_writer(static_cast<uint32_t>(i), seg_file_writer, |
114 | 24 | file_type); |
115 | 24 | DBUG_EXECUTE_IF("LoadStreamWriter.append_data.create_file_writer_failed", |
116 | 24 | { st = Status::InternalError("fault injection"); }); |
117 | 24 | if (!st.ok()) { |
118 | 0 | _is_canceled = true; |
119 | 0 | return st; |
120 | 0 | } |
121 | 24 | file_writers.push_back(std::move(seg_file_writer)); |
122 | 24 | g_load_stream_file_writer_cnt << 1; |
123 | 24 | } |
124 | 15 | } |
125 | | |
126 | | // TODO: IOBuf to Slice |
127 | 26 | file_writer = file_writers[segid].get(); |
128 | 26 | } |
129 | 26 | DBUG_EXECUTE_IF("LoadStreamWriter.append_data.null_file_writer", { file_writer = nullptr; }); |
130 | 26 | VLOG_DEBUG << " file_writer " << file_writer << "seg id " << segid; |
131 | 26 | if (file_writer == nullptr) { |
132 | 0 | return Status::Corruption("append_data failed, file writer {} is destoryed", segid); |
133 | 0 | } |
134 | 26 | DBUG_EXECUTE_IF("LoadStreamWriter.append_data.wrong_offset", { offset++; }); |
135 | 26 | if (file_writer->bytes_appended() != offset) { |
136 | 1 | return Status::Corruption( |
137 | 1 | "append_data out-of-order in segment={}, expected offset={}, actual={}", |
138 | 1 | file_writer->path().native(), offset, file_writer->bytes_appended()); |
139 | 1 | } |
140 | 25 | return file_writer->append(buf.to_string()); |
141 | 26 | } |
142 | | |
143 | 21 | Status LoadStreamWriter::close_writer(uint32_t segid, FileType file_type) { |
144 | 21 | SCOPED_ATTACH_TASK(_resource_ctx); |
145 | 21 | io::FileWriter* file_writer = nullptr; |
146 | 21 | auto& file_writers = |
147 | 21 | file_type == FileType::SEGMENT_FILE ? _segment_file_writers : _inverted_file_writers; |
148 | 21 | { |
149 | 21 | std::lock_guard lock_guard(_lock); |
150 | 21 | if (!_is_init) { |
151 | 0 | return Status::Corruption("close_writer failed, LoadStreamWriter is not inited"); |
152 | 0 | } |
153 | 21 | DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.bad_segid", |
154 | 21 | { segid = static_cast<uint32_t>(file_writers.size()); }); |
155 | 21 | if (segid >= file_writers.size()) { |
156 | 0 | return Status::Corruption( |
157 | 0 | "close_writer failed, file {} is never opened, file type is {}", segid, |
158 | 0 | file_type); |
159 | 0 | } |
160 | 21 | file_writer = file_writers[segid].get(); |
161 | 21 | } |
162 | | |
163 | 21 | DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.null_file_writer", { file_writer = nullptr; }); |
164 | 21 | if (file_writer == nullptr) { |
165 | 0 | return Status::Corruption( |
166 | 0 | "close_writer failed, file writer {} is destoryed, fiel type is {}", segid, |
167 | 0 | file_type); |
168 | 0 | } |
169 | 21 | auto st = file_writer->close(); |
170 | 21 | if (!st.ok()) { |
171 | 0 | _is_canceled = true; |
172 | 0 | return st; |
173 | 0 | } |
174 | 21 | LOG(INFO) << "file " << segid << " path " << file_writer->path().native() << " closed, written " |
175 | 21 | << file_writer->bytes_appended() << " bytes" |
176 | 21 | << ", file type is " << file_type; |
177 | | // Allow the index file to be empty when creating an index on a variant-type column. |
178 | 21 | if (file_writer->bytes_appended() == 0 && file_type != FileType::INVERTED_INDEX_FILE) { |
179 | 1 | return Status::Corruption("file {} closed with 0 bytes, file type is {}", |
180 | 1 | file_writer->path().native(), file_type); |
181 | 1 | } |
182 | 20 | return Status::OK(); |
183 | 21 | } |
184 | | |
185 | 0 | Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& stat) { |
186 | 0 | SCOPED_ATTACH_TASK(_resource_ctx); |
187 | 0 | size_t segment_file_size = 0; |
188 | 0 | size_t inverted_file_size = 0; |
189 | 0 | { |
190 | 0 | std::lock_guard lock_guard(_lock); |
191 | 0 | if (!_is_init) { |
192 | 0 | return Status::Corruption("add_segment failed, LoadStreamWriter is not inited"); |
193 | 0 | } |
194 | 0 | DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.bad_segid", |
195 | 0 | { segid = static_cast<uint32_t>(_segment_file_writers.size()); }); |
196 | 0 | RETURN_IF_ERROR(_calc_file_size(segid, FileType::SEGMENT_FILE, &segment_file_size)); |
197 | 0 | if (_inverted_file_writers.size() > 0) { |
198 | 0 | RETURN_IF_ERROR( |
199 | 0 | _calc_file_size(segid, FileType::INVERTED_INDEX_FILE, &inverted_file_size)); |
200 | 0 | } |
201 | 0 | } |
202 | | |
203 | 0 | DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.size_not_match", { segment_file_size++; }); |
204 | 0 | if (segment_file_size != stat.data_size) { |
205 | 0 | return Status::Corruption( |
206 | 0 | "add_segment failed, segment stat {} does not match, file size={}, inverted file " |
207 | 0 | "size={}, stat.data_size={}, tablet id={}", |
208 | 0 | segid, segment_file_size, inverted_file_size, stat.data_size, _req.tablet_id); |
209 | 0 | } |
210 | | |
211 | 0 | return _rowset_writer->add_segment(segid, stat); |
212 | 0 | } |
213 | | |
214 | 0 | Status LoadStreamWriter::_calc_file_size(uint32_t segid, FileType file_type, size_t* file_size) { |
215 | 0 | io::FileWriter* file_writer = nullptr; |
216 | 0 | auto& file_writers = |
217 | 0 | (file_type == FileType::SEGMENT_FILE) ? _segment_file_writers : _inverted_file_writers; |
218 | |
|
219 | 0 | DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.unknown_segment", |
220 | 0 | { segid = static_cast<uint32_t>(file_writers.size()); }); |
221 | 0 | if (segid >= file_writers.size()) { |
222 | 0 | return Status::Corruption("calc file size failed, file {} is never opened, file type is {}", |
223 | 0 | segid, file_type); |
224 | 0 | } |
225 | 0 | file_writer = file_writers[segid].get(); |
226 | 0 | DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.null_file_writer", |
227 | 0 | { file_writer = nullptr; }); |
228 | 0 | if (file_writer == nullptr) { |
229 | 0 | return Status::Corruption( |
230 | 0 | "calc file size failed, file writer {} is destoryed, file type is {}", segid, |
231 | 0 | file_type); |
232 | 0 | } |
233 | 0 | DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.file_not_closed", { |
234 | 0 | io::FileWriterPtr fwriter; |
235 | 0 | static_cast<void>(_rowset_writer->create_file_writer( |
236 | 0 | static_cast<uint32_t>(file_writers.size()), fwriter, FileType::SEGMENT_FILE)); |
237 | 0 | file_writers.push_back(std::move(fwriter)); |
238 | 0 | file_writer = file_writers.back().get(); |
239 | 0 | }); |
240 | 0 | if (file_writer->state() != io::FileWriter::State::CLOSED) { |
241 | 0 | return Status::Corruption("calc file size failed, file {} is not closed", |
242 | 0 | file_writer->path().native()); |
243 | 0 | } |
244 | 0 | *file_size = file_writer->bytes_appended(); |
245 | 0 | return Status::OK(); |
246 | 0 | } |
247 | | |
248 | 9 | Status LoadStreamWriter::_pre_close() { |
249 | 9 | SCOPED_ATTACH_TASK(_resource_ctx); |
250 | 9 | if (!_is_init) { |
251 | | // if this delta writer is not initialized, but close() is called. |
252 | | // which means this tablet has no data loaded, but at least one tablet |
253 | | // in same partition has data loaded. |
254 | | // so we have to also init this LoadStreamWriter, so that it can create an empty rowset |
255 | | // for this tablet when being closed. |
256 | 0 | RETURN_IF_ERROR(init()); |
257 | 0 | } |
258 | | |
259 | 9 | DCHECK(_is_init) |
260 | 0 | << "rowset builder is supposed be to initialized before close_wait() being called"; |
261 | | |
262 | 9 | DBUG_EXECUTE_IF("LoadStreamWriter.close.cancelled", { _is_canceled = true; }); |
263 | 9 | if (_is_canceled) { |
264 | 0 | return Status::InternalError("flush segment failed"); |
265 | 0 | } |
266 | 9 | DBUG_EXECUTE_IF("LoadStreamWriter.close.inverted_writers_size_not_match", { |
267 | 9 | io::FileWriterPtr file_writer; |
268 | 9 | static_cast<void>(_rowset_writer->create_file_writer( |
269 | 9 | static_cast<uint32_t>(_inverted_file_writers.size()), file_writer, |
270 | 9 | FileType::INVERTED_INDEX_FILE)); |
271 | 9 | _inverted_file_writers.push_back(std::move(file_writer)); |
272 | 9 | }); |
273 | 9 | if (_inverted_file_writers.size() > 0 && |
274 | 9 | _inverted_file_writers.size() != _segment_file_writers.size()) { |
275 | 0 | return Status::Corruption( |
276 | 0 | "LoadStreamWriter close failed, inverted file writer size is {}," |
277 | 0 | "segment file writer size is {}", |
278 | 0 | _inverted_file_writers.size(), _segment_file_writers.size()); |
279 | 0 | } |
280 | 9 | DBUG_EXECUTE_IF("LoadStreamWriter.close.file_not_closed", { |
281 | 9 | io::FileWriterPtr file_writer; |
282 | 9 | static_cast<void>(_rowset_writer->create_file_writer( |
283 | 9 | static_cast<uint32_t>(_segment_file_writers.size()), file_writer, |
284 | 9 | FileType::SEGMENT_FILE)); |
285 | 9 | _segment_file_writers.push_back(std::move(file_writer)); |
286 | 9 | }); |
287 | 20 | for (const auto& writer : _segment_file_writers) { |
288 | 20 | if (writer->state() != io::FileWriter::State::CLOSED) { |
289 | 1 | return Status::Corruption("LoadStreamWriter close failed, segment {} is not closed", |
290 | 1 | writer->path().native()); |
291 | 1 | } |
292 | 20 | } |
293 | | |
294 | 8 | DBUG_EXECUTE_IF("LoadStreamWriter.close.inverted_file_not_closed", { |
295 | 8 | io::FileWriterPtr file_writer; |
296 | 8 | static_cast<void>(_rowset_writer->create_file_writer( |
297 | 8 | static_cast<uint32_t>(_inverted_file_writers.size()), file_writer, |
298 | 8 | FileType::INVERTED_INDEX_FILE)); |
299 | 8 | _inverted_file_writers.push_back(std::move(file_writer)); |
300 | 8 | }); |
301 | 8 | for (const auto& writer : _inverted_file_writers) { |
302 | 0 | if (writer->state() != io::FileWriter::State::CLOSED) { |
303 | 0 | return Status::Corruption( |
304 | 0 | "LoadStreamWriter close failed, inverted file {} is not closed", |
305 | 0 | writer->path().native()); |
306 | 0 | } |
307 | 0 | } |
308 | | |
309 | 8 | RETURN_IF_ERROR(_rowset_builder->build_rowset()); |
310 | 8 | RETURN_IF_ERROR(_rowset_builder->submit_calc_delete_bitmap_task()); |
311 | 8 | _pre_closed = true; |
312 | 8 | return Status::OK(); |
313 | 8 | } |
314 | | |
315 | 8 | Status LoadStreamWriter::close() { |
316 | 8 | std::lock_guard<std::mutex> l(_lock); |
317 | 8 | if (!_pre_closed) { |
318 | 0 | RETURN_IF_ERROR(_pre_close()); |
319 | 0 | } |
320 | 8 | RETURN_IF_ERROR(_rowset_builder->wait_calc_delete_bitmap()); |
321 | | // FIXME(plat1ko): No `commit_txn` operation in cloud mode, need better abstractions |
322 | 8 | RETURN_IF_ERROR(static_cast<RowsetBuilder*>(_rowset_builder.get())->commit_txn()); |
323 | | |
324 | 8 | return Status::OK(); |
325 | 8 | } |
326 | | |
327 | | #include "common/compile_check_end.h" |
328 | | } // namespace doris |