be/src/load/channel/load_stream_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "load/channel/load_stream_writer.h" |
19 | | |
20 | | #include <brpc/controller.h> |
21 | | #include <butil/errno.h> |
22 | | #include <fmt/format.h> |
23 | | #include <gen_cpp/internal_service.pb.h> |
24 | | #include <gen_cpp/olap_file.pb.h> |
25 | | |
26 | | #include <filesystem> |
27 | | #include <ostream> |
28 | | #include <string> |
29 | | #include <utility> |
30 | | |
31 | | #include "bvar/bvar.h" |
32 | | #include "cloud/config.h" |
33 | | #include "common/compiler_util.h" // IWYU pragma: keep |
34 | | #include "common/config.h" |
35 | | #include "common/logging.h" |
36 | | #include "common/status.h" |
37 | | #include "core/block/block.h" |
38 | | #include "io/fs/file_writer.h" // IWYU pragma: keep |
39 | | #include "load/channel/load_channel_mgr.h" |
40 | | #include "load/memtable/memtable.h" |
41 | | #include "load/memtable/memtable_flush_executor.h" |
42 | | #include "runtime/exec_env.h" |
43 | | #include "runtime/memory/mem_tracker.h" |
44 | | #include "service/backend_options.h" |
45 | | #include "storage/data_dir.h" |
46 | | #include "storage/index/inverted/inverted_index_desc.h" |
47 | | #include "storage/olap_define.h" |
48 | | #include "storage/rowset/beta_rowset.h" |
49 | | #include "storage/rowset/beta_rowset_writer.h" |
50 | | #include "storage/rowset/rowset_factory.h" |
51 | | #include "storage/rowset/rowset_meta.h" |
52 | | #include "storage/rowset/rowset_writer.h" |
53 | | #include "storage/rowset/rowset_writer_context.h" |
54 | | #include "storage/rowset_builder.h" |
55 | | #include "storage/schema.h" |
56 | | #include "storage/schema_change/schema_change.h" |
57 | | #include "storage/segment/segment.h" |
58 | | #include "storage/storage_engine.h" |
59 | | #include "storage/tablet/tablet_manager.h" |
60 | | #include "storage/tablet_info.h" |
61 | | #include "storage/txn/txn_manager.h" |
62 | | #include "util/brpc_client_cache.h" |
63 | | #include "util/brpc_closure.h" |
64 | | #include "util/debug_points.h" |
65 | | #include "util/mem_info.h" |
66 | | #include "util/stopwatch.hpp" |
67 | | #include "util/time.h" |
68 | | |
69 | | namespace doris { |
70 | | using namespace ErrorCode; |
71 | | |
72 | | bvar::Adder<int64_t> g_load_stream_writer_cnt("load_stream_writer_count"); |
73 | | bvar::Adder<int64_t> g_load_stream_file_writer_cnt("load_stream_file_writer_count"); |
74 | | |
75 | | LoadStreamWriter::LoadStreamWriter(WriteRequest* context, RuntimeProfile* profile) |
76 | 13.4k | : _req(*context), _rowset_writer(nullptr) { |
77 | 13.4k | g_load_stream_writer_cnt << 1; |
78 | | // TODO(plat1ko): CloudStorageEngine |
79 | 13.4k | _rowset_builder = std::make_unique<RowsetBuilder>( |
80 | 13.4k | ExecEnv::GetInstance()->storage_engine().to_local(), *context, profile); |
81 | 13.4k | _resource_ctx = thread_context()->resource_ctx(); // from load stream |
82 | 13.4k | } |
83 | | |
84 | 13.4k | LoadStreamWriter::~LoadStreamWriter() { |
85 | 13.4k | g_load_stream_file_writer_cnt << -_segment_file_writers.size(); |
86 | 13.4k | g_load_stream_file_writer_cnt << -_inverted_file_writers.size(); |
87 | 13.4k | g_load_stream_writer_cnt << -1; |
88 | 13.4k | } |
89 | | |
90 | 13.4k | Status LoadStreamWriter::init() { |
91 | 13.4k | DBUG_EXECUTE_IF("LoadStreamWriter.init.failure", |
92 | 13.4k | { return Status::InternalError("fault injection"); }); |
93 | 13.4k | RETURN_IF_ERROR(_rowset_builder->init()); |
94 | 13.4k | _rowset_writer = _rowset_builder->rowset_writer(); |
95 | 13.4k | _is_init = true; |
96 | 13.4k | return Status::OK(); |
97 | 13.4k | } |
98 | | |
99 | | Status LoadStreamWriter::append_data(uint32_t segid, uint64_t offset, butil::IOBuf buf, |
100 | 106k | FileType file_type) { |
101 | 106k | SCOPED_ATTACH_TASK(_resource_ctx); |
102 | 106k | io::FileWriter* file_writer = nullptr; |
103 | 106k | auto& file_writers = |
104 | 18.4E | file_type == FileType::SEGMENT_FILE ? _segment_file_writers : _inverted_file_writers; |
105 | 106k | { |
106 | 106k | std::lock_guard lock_guard(_lock); |
107 | 106k | DCHECK(_is_init); |
108 | 106k | if (segid >= file_writers.size()) { |
109 | 7.44k | for (size_t i = file_writers.size(); i <= segid; i++) { |
110 | 3.72k | Status st; |
111 | 3.72k | io::FileWriterPtr seg_file_writer; |
112 | 3.72k | st = _rowset_writer->create_file_writer(static_cast<uint32_t>(i), seg_file_writer, |
113 | 3.72k | file_type); |
114 | 3.72k | DBUG_EXECUTE_IF("LoadStreamWriter.append_data.create_file_writer_failed", |
115 | 3.72k | { st = Status::InternalError("fault injection"); }); |
116 | 3.72k | if (!st.ok()) { |
117 | 0 | _is_canceled = true; |
118 | 0 | return st; |
119 | 0 | } |
120 | 3.72k | file_writers.push_back(std::move(seg_file_writer)); |
121 | 3.72k | g_load_stream_file_writer_cnt << 1; |
122 | 3.72k | } |
123 | 3.71k | } |
124 | | |
125 | | // TODO: IOBuf to Slice |
126 | 106k | file_writer = file_writers[segid].get(); |
127 | 106k | } |
128 | 106k | DBUG_EXECUTE_IF("LoadStreamWriter.append_data.null_file_writer", { file_writer = nullptr; }); |
129 | 18.4E | VLOG_DEBUG << " file_writer " << file_writer << "seg id " << segid; |
130 | 106k | if (file_writer == nullptr) { |
131 | 0 | return Status::Corruption("append_data failed, file writer {} is destoryed", segid); |
132 | 0 | } |
133 | 106k | DBUG_EXECUTE_IF("LoadStreamWriter.append_data.wrong_offset", { offset++; }); |
134 | 106k | if (file_writer->bytes_appended() != offset) { |
135 | 1 | return Status::Corruption( |
136 | 1 | "append_data out-of-order in segment={}, expected offset={}, actual={}", |
137 | 1 | file_writer->path().native(), offset, file_writer->bytes_appended()); |
138 | 1 | } |
139 | 106k | return file_writer->append(buf.to_string()); |
140 | 106k | } |
141 | | |
142 | 3.72k | Status LoadStreamWriter::close_writer(uint32_t segid, FileType file_type) { |
143 | 3.72k | SCOPED_ATTACH_TASK(_resource_ctx); |
144 | 3.72k | io::FileWriter* file_writer = nullptr; |
145 | 3.72k | auto& file_writers = |
146 | 3.72k | file_type == FileType::SEGMENT_FILE ? _segment_file_writers : _inverted_file_writers; |
147 | 3.72k | { |
148 | 3.72k | std::lock_guard lock_guard(_lock); |
149 | 3.72k | if (!_is_init) { |
150 | 0 | return Status::Corruption("close_writer failed, LoadStreamWriter is not inited"); |
151 | 0 | } |
152 | 3.72k | DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.bad_segid", |
153 | 3.72k | { segid = static_cast<uint32_t>(file_writers.size()); }); |
154 | 3.72k | if (segid >= file_writers.size()) { |
155 | 0 | return Status::Corruption( |
156 | 0 | "close_writer failed, file {} is never opened, file type is {}", segid, |
157 | 0 | file_type); |
158 | 0 | } |
159 | 3.72k | file_writer = file_writers[segid].get(); |
160 | 3.72k | } |
161 | | |
162 | 3.72k | DBUG_EXECUTE_IF("LoadStreamWriter.close_writer.null_file_writer", { file_writer = nullptr; }); |
163 | 3.72k | if (file_writer == nullptr) { |
164 | 0 | return Status::Corruption( |
165 | 0 | "close_writer failed, file writer {} is destoryed, fiel type is {}", segid, |
166 | 0 | file_type); |
167 | 0 | } |
168 | 3.72k | auto st = file_writer->close(); |
169 | 3.72k | if (!st.ok()) { |
170 | 0 | _is_canceled = true; |
171 | 0 | return st; |
172 | 0 | } |
173 | 3.72k | LOG(INFO) << "file " << segid << " path " << file_writer->path().native() << " closed, written " |
174 | 3.72k | << file_writer->bytes_appended() << " bytes" |
175 | 3.72k | << ", file type is " << file_type; |
176 | | // Allow the index file to be empty when creating an index on a variant-type column. |
177 | 3.72k | if (file_writer->bytes_appended() == 0 && file_type != FileType::INVERTED_INDEX_FILE) { |
178 | 1 | return Status::Corruption("file {} closed with 0 bytes, file type is {}", |
179 | 1 | file_writer->path().native(), file_type); |
180 | 1 | } |
181 | 3.72k | return Status::OK(); |
182 | 3.72k | } |
183 | | |
184 | 3.70k | Status LoadStreamWriter::add_segment(uint32_t segid, const SegmentStatistics& stat) { |
185 | 3.70k | SCOPED_ATTACH_TASK(_resource_ctx); |
186 | 3.70k | size_t segment_file_size = 0; |
187 | 3.70k | size_t inverted_file_size = 0; |
188 | 3.70k | { |
189 | 3.70k | std::lock_guard lock_guard(_lock); |
190 | 3.70k | if (!_is_init) { |
191 | 0 | return Status::Corruption("add_segment failed, LoadStreamWriter is not inited"); |
192 | 0 | } |
193 | 3.70k | DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.bad_segid", |
194 | 3.70k | { segid = static_cast<uint32_t>(_segment_file_writers.size()); }); |
195 | 3.70k | RETURN_IF_ERROR(_calc_file_size(segid, FileType::SEGMENT_FILE, &segment_file_size)); |
196 | 3.70k | if (_inverted_file_writers.size() > 0) { |
197 | 0 | RETURN_IF_ERROR( |
198 | 0 | _calc_file_size(segid, FileType::INVERTED_INDEX_FILE, &inverted_file_size)); |
199 | 0 | } |
200 | 3.70k | } |
201 | | |
202 | 3.70k | DBUG_EXECUTE_IF("LoadStreamWriter.add_segment.size_not_match", { segment_file_size++; }); |
203 | 3.70k | if (segment_file_size != stat.data_size) { |
204 | 0 | return Status::Corruption( |
205 | 0 | "add_segment failed, segment stat {} does not match, file size={}, inverted file " |
206 | 0 | "size={}, stat.data_size={}, tablet id={}", |
207 | 0 | segid, segment_file_size, inverted_file_size, stat.data_size, _req.tablet_id); |
208 | 0 | } |
209 | | |
210 | 3.70k | return _rowset_writer->add_segment(segid, stat); |
211 | 3.70k | } |
212 | | |
213 | 3.70k | Status LoadStreamWriter::_calc_file_size(uint32_t segid, FileType file_type, size_t* file_size) { |
214 | 3.70k | io::FileWriter* file_writer = nullptr; |
215 | 3.70k | auto& file_writers = |
216 | 3.70k | (file_type == FileType::SEGMENT_FILE) ? _segment_file_writers : _inverted_file_writers; |
217 | | |
218 | 3.70k | DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.unknown_segment", |
219 | 3.70k | { segid = static_cast<uint32_t>(file_writers.size()); }); |
220 | 3.70k | if (segid >= file_writers.size()) { |
221 | 0 | return Status::Corruption("calc file size failed, file {} is never opened, file type is {}", |
222 | 0 | segid, file_type); |
223 | 0 | } |
224 | 3.70k | file_writer = file_writers[segid].get(); |
225 | 3.70k | DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.null_file_writer", |
226 | 3.70k | { file_writer = nullptr; }); |
227 | 3.70k | if (file_writer == nullptr) { |
228 | 0 | return Status::Corruption( |
229 | 0 | "calc file size failed, file writer {} is destoryed, file type is {}", segid, |
230 | 0 | file_type); |
231 | 0 | } |
232 | 3.70k | DBUG_EXECUTE_IF("LoadStreamWriter._calc_file_size.file_not_closed", { |
233 | 3.70k | io::FileWriterPtr fwriter; |
234 | 3.70k | static_cast<void>(_rowset_writer->create_file_writer( |
235 | 3.70k | static_cast<uint32_t>(file_writers.size()), fwriter, FileType::SEGMENT_FILE)); |
236 | 3.70k | file_writers.push_back(std::move(fwriter)); |
237 | 3.70k | file_writer = file_writers.back().get(); |
238 | 3.70k | }); |
239 | 3.70k | if (file_writer->state() != io::FileWriter::State::CLOSED) { |
240 | 0 | return Status::Corruption("calc file size failed, file {} is not closed", |
241 | 0 | file_writer->path().native()); |
242 | 0 | } |
243 | 3.70k | *file_size = file_writer->bytes_appended(); |
244 | 3.70k | return Status::OK(); |
245 | 3.70k | } |
246 | | |
247 | 13.4k | Status LoadStreamWriter::_pre_close() { |
248 | 13.4k | SCOPED_ATTACH_TASK(_resource_ctx); |
249 | 13.4k | if (!_is_init) { |
250 | | // if this delta writer is not initialized, but close() is called. |
251 | | // which means this tablet has no data loaded, but at least one tablet |
252 | | // in same partition has data loaded. |
253 | | // so we have to also init this LoadStreamWriter, so that it can create an empty rowset |
254 | | // for this tablet when being closed. |
255 | 0 | RETURN_IF_ERROR(init()); |
256 | 0 | } |
257 | | |
258 | 13.4k | DCHECK(_is_init) |
259 | 0 | << "rowset builder is supposed be to initialized before close_wait() being called"; |
260 | | |
261 | 13.4k | DBUG_EXECUTE_IF("LoadStreamWriter.close.cancelled", { _is_canceled = true; }); |
262 | 13.4k | if (_is_canceled) { |
263 | 0 | return Status::InternalError("flush segment failed"); |
264 | 0 | } |
265 | 13.4k | DBUG_EXECUTE_IF("LoadStreamWriter.close.inverted_writers_size_not_match", { |
266 | 13.4k | io::FileWriterPtr file_writer; |
267 | 13.4k | static_cast<void>(_rowset_writer->create_file_writer( |
268 | 13.4k | static_cast<uint32_t>(_inverted_file_writers.size()), file_writer, |
269 | 13.4k | FileType::INVERTED_INDEX_FILE)); |
270 | 13.4k | _inverted_file_writers.push_back(std::move(file_writer)); |
271 | 13.4k | }); |
272 | 13.4k | if (_inverted_file_writers.size() > 0 && |
273 | 13.4k | _inverted_file_writers.size() != _segment_file_writers.size()) { |
274 | 0 | return Status::Corruption( |
275 | 0 | "LoadStreamWriter close failed, inverted file writer size is {}," |
276 | 0 | "segment file writer size is {}", |
277 | 0 | _inverted_file_writers.size(), _segment_file_writers.size()); |
278 | 0 | } |
279 | 13.4k | DBUG_EXECUTE_IF("LoadStreamWriter.close.file_not_closed", { |
280 | 13.4k | io::FileWriterPtr file_writer; |
281 | 13.4k | static_cast<void>(_rowset_writer->create_file_writer( |
282 | 13.4k | static_cast<uint32_t>(_segment_file_writers.size()), file_writer, |
283 | 13.4k | FileType::SEGMENT_FILE)); |
284 | 13.4k | _segment_file_writers.push_back(std::move(file_writer)); |
285 | 13.4k | }); |
286 | 13.4k | for (const auto& writer : _segment_file_writers) { |
287 | 3.72k | if (writer->state() != io::FileWriter::State::CLOSED) { |
288 | 1 | return Status::Corruption("LoadStreamWriter close failed, segment {} is not closed", |
289 | 1 | writer->path().native()); |
290 | 1 | } |
291 | 3.72k | } |
292 | | |
293 | 13.4k | DBUG_EXECUTE_IF("LoadStreamWriter.close.inverted_file_not_closed", { |
294 | 13.4k | io::FileWriterPtr file_writer; |
295 | 13.4k | static_cast<void>(_rowset_writer->create_file_writer( |
296 | 13.4k | static_cast<uint32_t>(_inverted_file_writers.size()), file_writer, |
297 | 13.4k | FileType::INVERTED_INDEX_FILE)); |
298 | 13.4k | _inverted_file_writers.push_back(std::move(file_writer)); |
299 | 13.4k | }); |
300 | 13.4k | for (const auto& writer : _inverted_file_writers) { |
301 | 0 | if (writer->state() != io::FileWriter::State::CLOSED) { |
302 | 0 | return Status::Corruption( |
303 | 0 | "LoadStreamWriter close failed, inverted file {} is not closed", |
304 | 0 | writer->path().native()); |
305 | 0 | } |
306 | 0 | } |
307 | | |
308 | 13.4k | RETURN_IF_ERROR(_rowset_builder->build_rowset()); |
309 | 13.4k | RETURN_IF_ERROR(_rowset_builder->submit_calc_delete_bitmap_task()); |
310 | 13.4k | _pre_closed = true; |
311 | 13.4k | return Status::OK(); |
312 | 13.4k | } |
313 | | |
314 | 13.4k | Status LoadStreamWriter::close() { |
315 | 13.4k | std::lock_guard<std::mutex> l(_lock); |
316 | 13.4k | if (!_pre_closed) { |
317 | 0 | RETURN_IF_ERROR(_pre_close()); |
318 | 0 | } |
319 | 13.4k | RETURN_IF_ERROR(_rowset_builder->wait_calc_delete_bitmap()); |
320 | | // FIXME(plat1ko): No `commit_txn` operation in cloud mode, need better abstractions |
321 | 13.4k | RETURN_IF_ERROR(static_cast<RowsetBuilder*>(_rowset_builder.get())->commit_txn()); |
322 | | |
323 | 13.4k | return Status::OK(); |
324 | 13.4k | } |
325 | | |
326 | | } // namespace doris |