be/src/storage/rowset/beta_rowset_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/rowset/beta_rowset_writer.h" |
19 | | |
20 | | #include <assert.h> |
21 | | // IWYU pragma: no_include <bthread/errno.h> |
22 | | #include <errno.h> // IWYU pragma: keep |
23 | | #include <fmt/format.h> |
24 | | #include <stdio.h> |
25 | | |
26 | | #include <chrono> |
27 | | #include <ctime> // time |
28 | | #include <filesystem> |
29 | | #include <memory> |
30 | | #include <mutex> |
31 | | #include <sstream> |
32 | | #include <string> |
33 | | #include <thread> |
34 | | #include <utility> |
35 | | #include <vector> |
36 | | |
37 | | // IWYU pragma: no_include <opentelemetry/common/threadlocal.h> |
38 | | #include "common/cast_set.h" |
39 | | #include "common/compiler_util.h" // IWYU pragma: keep |
40 | | #include "common/config.h" |
41 | | #include "common/logging.h" |
42 | | #include "common/status.h" |
43 | | #include "core/block/block.h" |
44 | | #include "core/column/column.h" |
45 | | #include "core/data_type/data_type_factory.hpp" |
46 | | #include "io/fs/file_reader.h" |
47 | | #include "io/fs/file_system.h" |
48 | | #include "io/fs/file_writer.h" |
49 | | #include "runtime/thread_context.h" |
50 | | #include "storage/index/inverted/inverted_index_cache.h" |
51 | | #include "storage/index/inverted/inverted_index_desc.h" |
52 | | #include "storage/olap_define.h" |
53 | | #include "storage/rowset/beta_rowset.h" |
54 | | #include "storage/rowset/rowset_factory.h" |
55 | | #include "storage/rowset/rowset_writer.h" |
56 | | #include "storage/rowset/segcompaction.h" |
57 | | #include "storage/schema_change/schema_change.h" |
58 | | #include "storage/segment/segment.h" |
59 | | #include "storage/segment/segment_writer.h" |
60 | | #include "storage/storage_engine.h" |
61 | | #include "storage/tablet/tablet_schema.h" |
62 | | #include "util/debug_points.h" |
63 | | #include "util/pretty_printer.h" |
64 | | #include "util/slice.h" |
65 | | #include "util/stopwatch.hpp" |
66 | | #include "util/time.h" |
67 | | |
68 | | namespace doris { |
69 | | using namespace ErrorCode; |
70 | | |
71 | | namespace { |
72 | | |
73 | 699 | bool is_segment_overlapping(const std::vector<KeyBoundsPB>& segments_encoded_key_bounds) { |
74 | 699 | std::string_view last; |
75 | 1.78k | for (auto&& segment_encode_key : segments_encoded_key_bounds) { |
76 | 1.78k | auto&& cur_min = segment_encode_key.min_key(); |
77 | 1.78k | auto&& cur_max = segment_encode_key.max_key(); |
78 | 1.78k | if (cur_min <= last) { |
79 | 258 | return true; |
80 | 258 | } |
81 | 1.52k | last = cur_max; |
82 | 1.52k | } |
83 | 441 | return false; |
84 | 699 | } |
85 | | |
86 | 1.21k | bool copy_key_bounds_with_truncation(const KeyBoundsPB& src, KeyBoundsPB* dst) { |
87 | 1.21k | DCHECK(dst != nullptr); |
88 | 1.21k | if (config::random_segments_key_bounds_truncation) { |
89 | 0 | dst->CopyFrom(src); |
90 | 0 | return false; |
91 | 0 | } |
92 | 1.21k | const int32_t truncation_threshold = config::segments_key_bounds_truncation_threshold; |
93 | 1.21k | if (truncation_threshold <= 0) { |
94 | 355 | dst->CopyFrom(src); |
95 | 355 | return false; |
96 | 355 | } |
97 | 858 | const size_t truncation_size = cast_set<size_t>(truncation_threshold); |
98 | | |
99 | 858 | bool truncated = false; |
100 | 1.71k | auto copy_key = [&](const std::string& key, std::string* stored_key) { |
101 | 1.71k | if (key.size() > truncation_size) { |
102 | 1.39k | stored_key->assign(key.data(), truncation_size); |
103 | 1.39k | truncated = true; |
104 | 1.39k | return; |
105 | 1.39k | } |
106 | 321 | stored_key->assign(key.data(), key.size()); |
107 | 321 | }; |
108 | 858 | copy_key(src.min_key(), dst->mutable_min_key()); |
109 | 858 | copy_key(src.max_key(), dst->mutable_max_key()); |
110 | 858 | return truncated; |
111 | 1.21k | } |
112 | | |
113 | | SegmentStatistics copy_segment_statistics_with_truncated_key_bounds(const SegmentStatistics& src, |
114 | 1.20k | bool& key_bounds_truncated) { |
115 | 1.20k | SegmentStatistics dst; |
116 | 1.20k | dst.row_num = src.row_num; |
117 | 1.20k | dst.data_size = src.data_size; |
118 | 1.20k | dst.index_size = src.index_size; |
119 | 1.20k | key_bounds_truncated = copy_key_bounds_with_truncation(src.key_bounds, &dst.key_bounds); |
120 | 1.20k | return dst; |
121 | 1.20k | } |
122 | | |
123 | | void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta, |
124 | 24 | const RowsetMeta& spec_rowset_meta) { |
125 | 24 | rowset_meta.set_num_rows(spec_rowset_meta.num_rows()); |
126 | 24 | rowset_meta.set_total_disk_size(spec_rowset_meta.total_disk_size()); |
127 | 24 | rowset_meta.set_data_disk_size(spec_rowset_meta.data_disk_size()); |
128 | 24 | rowset_meta.set_index_disk_size(spec_rowset_meta.index_disk_size()); |
129 | | // TODO write zonemap to meta |
130 | 24 | rowset_meta.set_empty(spec_rowset_meta.num_rows() == 0); |
131 | 24 | rowset_meta.set_creation_time(time(nullptr)); |
132 | 24 | rowset_meta.set_num_segments(spec_rowset_meta.num_segments()); |
133 | 24 | rowset_meta.set_segments_overlap(spec_rowset_meta.segments_overlap()); |
134 | 24 | rowset_meta.set_rowset_state(spec_rowset_meta.rowset_state()); |
135 | 24 | rowset_meta.set_segments_key_bounds_truncated( |
136 | 24 | spec_rowset_meta.is_segments_key_bounds_truncated()); |
137 | 24 | rowset_meta.set_db_id(spec_rowset_meta.db_id()); |
138 | 24 | rowset_meta.set_table_id(spec_rowset_meta.table_id()); |
139 | 24 | std::vector<KeyBoundsPB> segments_key_bounds; |
140 | 24 | spec_rowset_meta.get_segments_key_bounds(&segments_key_bounds); |
141 | | // Preserve source layout: if source was aggregated (size 1), re-aggregating |
142 | | // the single entry is a no-op that also keeps the flag consistent. |
143 | 24 | rowset_meta.set_segments_key_bounds(segments_key_bounds, |
144 | 24 | spec_rowset_meta.is_segments_key_bounds_aggregated()); |
145 | 24 | std::vector<uint32_t> num_segment_rows; |
146 | 24 | spec_rowset_meta.get_num_segment_rows(&num_segment_rows); |
147 | 24 | rowset_meta.set_num_segment_rows(num_segment_rows); |
148 | 24 | if (spec_rowset_meta.has_commit_tso()) { |
149 | 6 | rowset_meta.set_commit_tso(spec_rowset_meta.commit_tso()); |
150 | 6 | } |
151 | 24 | if (spec_rowset_meta.is_row_binlog()) { |
152 | 0 | rowset_meta.mark_row_binlog(); |
153 | 0 | } |
154 | 24 | } |
155 | | |
156 | | } // namespace |
157 | | |
158 | 894 | SegmentFileCollection::~SegmentFileCollection() = default; |
159 | | |
160 | 2.38k | Status SegmentFileCollection::add(int seg_id, io::FileWriterPtr&& writer) { |
161 | 2.38k | std::lock_guard lock(_lock); |
162 | 2.38k | if (_closed) [[unlikely]] { |
163 | 0 | DCHECK(false) << writer->path(); |
164 | 0 | return Status::InternalError("add to closed SegmentFileCollection"); |
165 | 0 | } |
166 | | |
167 | 2.38k | _file_writers.emplace(seg_id, std::move(writer)); |
168 | 2.38k | return Status::OK(); |
169 | 2.38k | } |
170 | | |
171 | 63 | io::FileWriter* SegmentFileCollection::get(int seg_id) const { |
172 | 63 | std::lock_guard lock(_lock); |
173 | 63 | if (auto it = _file_writers.find(seg_id); it != _file_writers.end()) { |
174 | 63 | return it->second.get(); |
175 | 63 | } else { |
176 | 0 | return nullptr; |
177 | 0 | } |
178 | 63 | } |
179 | | |
180 | 861 | Status SegmentFileCollection::close() { |
181 | 861 | { |
182 | 861 | std::lock_guard lock(_lock); |
183 | 861 | if (_closed) [[unlikely]] { |
184 | 0 | DCHECK(false); |
185 | 0 | return Status::InternalError("double close SegmentFileCollection"); |
186 | 0 | } |
187 | 861 | _closed = true; |
188 | 861 | } |
189 | | |
190 | 2.38k | for (auto&& [_, writer] : _file_writers) { |
191 | 2.38k | DBUG_EXECUTE_IF("SegmentFileCollection.close.wait_dat_closed", { |
192 | 2.38k | auto before_state = writer->state(); |
193 | 2.38k | for (int i = 0; i < 3000 && writer->state() != io::FileWriter::State::CLOSED; ++i) { |
194 | 2.38k | std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
195 | 2.38k | } |
196 | 2.38k | LOG(INFO) << "SegmentFileCollection.close.wait_dat_closed path=" |
197 | 2.38k | << writer->path().native() |
198 | 2.38k | << " before_state=" << static_cast<int>(before_state) |
199 | 2.38k | << " after_state=" << static_cast<int>(writer->state()); |
200 | 2.38k | }); |
201 | 2.38k | if (writer->state() != io::FileWriter::State::CLOSED) { |
202 | 2.31k | RETURN_IF_ERROR(writer->close()); |
203 | 2.31k | } |
204 | 2.38k | } |
205 | | |
206 | 861 | return Status::OK(); |
207 | 861 | } |
208 | | |
209 | 0 | Result<std::vector<size_t>> SegmentFileCollection::segments_file_size(int seg_id_offset) { |
210 | 0 | std::lock_guard lock(_lock); |
211 | 0 | if (!_closed) [[unlikely]] { |
212 | 0 | DCHECK(false); |
213 | 0 | return ResultError(Status::InternalError("get segments file size without closed")); |
214 | 0 | } |
215 | | |
216 | 0 | Status st; |
217 | 0 | std::vector<size_t> seg_file_size(_file_writers.size(), 0); |
218 | 0 | bool succ = std::all_of(_file_writers.begin(), _file_writers.end(), [&](auto&& it) { |
219 | 0 | auto&& [seg_id, writer] = it; |
220 | |
|
221 | 0 | int idx = seg_id - seg_id_offset; |
222 | 0 | if (idx >= seg_file_size.size()) [[unlikely]] { |
223 | 0 | auto err_msg = fmt::format( |
224 | 0 | "invalid seg_id={} num_file_writers={} seg_id_offset={} path={}", seg_id, |
225 | 0 | seg_file_size.size(), seg_id_offset, writer->path().native()); |
226 | 0 | DCHECK(false) << err_msg; |
227 | 0 | st = Status::InternalError(err_msg); |
228 | 0 | return false; |
229 | 0 | } |
230 | | |
231 | 0 | auto& fsize = seg_file_size[idx]; |
232 | 0 | if (fsize != 0) { |
233 | | // File size should not been set |
234 | 0 | auto err_msg = |
235 | 0 | fmt::format("duplicate seg_id={} path={}", seg_id, writer->path().native()); |
236 | 0 | DCHECK(false) << err_msg; |
237 | 0 | st = Status::InternalError(err_msg); |
238 | 0 | return false; |
239 | 0 | } |
240 | | |
241 | 0 | fsize = writer->bytes_appended(); |
242 | 0 | if (fsize <= 0) { |
243 | 0 | auto err_msg = |
244 | 0 | fmt::format("invalid segment fsize={} path={}", fsize, writer->path().native()); |
245 | 0 | DCHECK(false) << err_msg; |
246 | 0 | st = Status::InternalError(err_msg); |
247 | 0 | return false; |
248 | 0 | } |
249 | | |
250 | 0 | return true; |
251 | 0 | }); |
252 | |
|
253 | 0 | if (succ) { |
254 | 0 | return seg_file_size; |
255 | 0 | } |
256 | | |
257 | 0 | return ResultError(st); |
258 | 0 | } |
259 | | |
260 | 894 | InvertedIndexFileCollection::~InvertedIndexFileCollection() = default; |
261 | | |
262 | 263 | Status InvertedIndexFileCollection::add(int seg_id, IndexFileWriterPtr&& index_writer) { |
263 | 263 | std::lock_guard lock(_lock); |
264 | 263 | if (_inverted_index_file_writers.find(seg_id) != _inverted_index_file_writers.end()) |
265 | 0 | [[unlikely]] { |
266 | 0 | DCHECK(false); |
267 | 0 | return Status::InternalError("The seg_id already exists, seg_id is {}", seg_id); |
268 | 0 | } |
269 | 263 | _inverted_index_file_writers.emplace(seg_id, std::move(index_writer)); |
270 | 263 | return Status::OK(); |
271 | 263 | } |
272 | | |
273 | 328 | Status InvertedIndexFileCollection::begin_close() { |
274 | 328 | std::lock_guard lock(_lock); |
275 | 328 | for (auto&& [id, writer] : _inverted_index_file_writers) { |
276 | 36 | RETURN_IF_ERROR(writer->begin_close()); |
277 | 36 | _total_size += writer->get_index_file_total_size(); |
278 | 36 | } |
279 | | |
280 | 328 | return Status::OK(); |
281 | 328 | } |
282 | | |
283 | 861 | Status InvertedIndexFileCollection::finish_close() { |
284 | 861 | std::lock_guard lock(_lock); |
285 | 861 | for (auto&& [id, writer] : _inverted_index_file_writers) { |
286 | 262 | RETURN_IF_ERROR(writer->finish_close()); |
287 | 262 | } |
288 | 861 | return Status::OK(); |
289 | 861 | } |
290 | | |
291 | | Result<std::vector<const InvertedIndexFileInfo*>> |
292 | 106 | InvertedIndexFileCollection::inverted_index_file_info(int seg_id_offset) { |
293 | 106 | std::lock_guard lock(_lock); |
294 | | |
295 | 106 | Status st; |
296 | 106 | std::vector<const InvertedIndexFileInfo*> idx_file_info(_inverted_index_file_writers.size()); |
297 | 106 | bool succ = std::all_of( |
298 | 106 | _inverted_index_file_writers.begin(), _inverted_index_file_writers.end(), |
299 | 213 | [&](auto&& it) { |
300 | 213 | auto&& [seg_id, writer] = it; |
301 | | |
302 | 213 | int idx = seg_id - seg_id_offset; |
303 | 213 | if (idx >= idx_file_info.size()) [[unlikely]] { |
304 | 0 | auto err_msg = |
305 | 0 | fmt::format("invalid seg_id={} num_file_writers={} seg_id_offset={}", |
306 | 0 | seg_id, idx_file_info.size(), seg_id_offset); |
307 | 0 | DCHECK(false) << err_msg; |
308 | 0 | st = Status::InternalError(err_msg); |
309 | 0 | return false; |
310 | 0 | } |
311 | 213 | idx_file_info[idx] = _inverted_index_file_writers[seg_id]->get_index_file_info(); |
312 | 213 | return true; |
313 | 213 | }); |
314 | | |
315 | 106 | if (succ) { |
316 | 106 | return idx_file_info; |
317 | 106 | } |
318 | | |
319 | 0 | return ResultError(st); |
320 | 106 | } |
321 | | |
322 | | BaseBetaRowsetWriter::BaseBetaRowsetWriter() |
323 | 894 | : _num_segment(0), |
324 | 894 | _segment_start_id(0), |
325 | 894 | _num_rows_written(0), |
326 | 894 | _total_data_size(0), |
327 | 894 | _total_index_size(0), |
328 | 894 | _segment_creator(_context, _seg_files, _idx_files) {} |
329 | | |
330 | | BetaRowsetWriter::BetaRowsetWriter(StorageEngine& engine) |
331 | 893 | : _engine(engine), _segcompaction_worker(std::make_shared<SegcompactionWorker>(this)) {} |
332 | | |
333 | 4 | RowBinlogRowsetWriter::RowBinlogRowsetWriter(StorageEngine& engine) : BetaRowsetWriter(engine) {} |
334 | | |
335 | 894 | BaseBetaRowsetWriter::~BaseBetaRowsetWriter() { |
336 | 894 | if (!_already_built && _rowset_meta->is_local()) { |
337 | | // abnormal exit, remove all files generated |
338 | 8 | auto& fs = io::global_local_filesystem(); |
339 | 8 | for (int i = _segment_start_id; i < _segment_creator.next_segment_id(); ++i) { |
340 | 0 | std::string seg_path = |
341 | 0 | local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), i); |
342 | | // Even if an error is encountered, these files that have not been cleaned up |
343 | | // will be cleaned up by the GC background. So here we only print the error |
344 | | // message when we encounter an error. |
345 | 0 | WARN_IF_ERROR(fs->delete_file(seg_path), |
346 | 0 | fmt::format("Failed to delete file={}", seg_path)); |
347 | 0 | } |
348 | 8 | } |
349 | 894 | if (_calc_delete_bitmap_token) { |
350 | 8 | _calc_delete_bitmap_token->cancel(); |
351 | 8 | } |
352 | 894 | } |
353 | | |
354 | 893 | BetaRowsetWriter::~BetaRowsetWriter() { |
355 | | /* Note that segcompaction is async and in parallel with load job. So we should handle carefully |
356 | | * when the job is cancelled. Although it is meaningless to continue segcompaction when the job |
357 | | * is cancelled, the objects involved in the job should be preserved during segcompaction to |
358 | | * avoid crashs for memory issues. */ |
359 | 893 | WARN_IF_ERROR(_wait_flying_segcompaction(), "segment compaction failed"); |
360 | 893 | } |
361 | | |
362 | 893 | Status BaseBetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) { |
363 | 893 | _context = rowset_writer_context; |
364 | 893 | DCHECK(_context.tablet_schema != nullptr); |
365 | 893 | _rowset_meta.reset(new RowsetMeta); |
366 | 893 | if (_context.storage_resource) { |
367 | 0 | _rowset_meta->set_remote_storage_resource(*_context.storage_resource); |
368 | 0 | } |
369 | 893 | _rowset_meta->set_rowset_id(_context.rowset_id); |
370 | 893 | _rowset_meta->set_partition_id(_context.partition_id); |
371 | 893 | _rowset_meta->set_tablet_id(_context.tablet_id); |
372 | 893 | _rowset_meta->set_db_id(_context.db_id); |
373 | 893 | _rowset_meta->set_table_id(_context.table_id); |
374 | 893 | _rowset_meta->set_index_id(_context.index_id); |
375 | 893 | _rowset_meta->set_tablet_schema_hash(_context.tablet_schema_hash); |
376 | 893 | _rowset_meta->set_rowset_type(_context.rowset_type); |
377 | 893 | _rowset_meta->set_rowset_state(_context.rowset_state); |
378 | 893 | _rowset_meta->set_segments_overlap(_context.segments_overlap); |
379 | 893 | if (_context.rowset_state == PREPARED || _context.rowset_state == COMMITTED) { |
380 | 30 | _is_pending = true; |
381 | 30 | _rowset_meta->set_txn_id(_context.txn_id); |
382 | 30 | _rowset_meta->set_load_id(_context.load_id); |
383 | 863 | } else { |
384 | 863 | _rowset_meta->set_version(_context.version); |
385 | 863 | _rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp); |
386 | 863 | } |
387 | 893 | _rowset_meta->set_tablet_uid(_context.tablet_uid); |
388 | 893 | _rowset_meta->set_tablet_schema(_context.tablet_schema); |
389 | 893 | _rowset_meta->set_compaction_level(_context.compaction_level); |
390 | 893 | if (_context.write_binlog_opt().enable) { |
391 | 4 | _rowset_meta->mark_row_binlog(); |
392 | 4 | } |
393 | 893 | _context.segment_collector = std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this); |
394 | 893 | _context.file_writer_creator = std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this); |
395 | 893 | return Status::OK(); |
396 | 893 | } |
397 | | |
398 | 1.36k | Status BaseBetaRowsetWriter::add_block(const Block* block) { |
399 | 1.36k | return _segment_creator.add_block(block); |
400 | 1.36k | } |
401 | | |
402 | 63 | Status BaseBetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) { |
403 | 63 | SCOPED_RAW_TIMER(&_delete_bitmap_ns); |
404 | 63 | if (_context.is_transient_rowset_writer || |
405 | 63 | !_context.tablet->enable_unique_key_merge_on_write() || |
406 | 63 | (_context.partial_update_info && _context.partial_update_info->is_partial_update())) { |
407 | 0 | return Status::OK(); |
408 | 0 | } |
409 | 63 | std::vector<RowsetSharedPtr> specified_rowsets; |
410 | 63 | { |
411 | 63 | std::shared_lock meta_rlock(_context.tablet->get_header_lock()); |
412 | 63 | specified_rowsets = |
413 | 63 | _context.tablet->get_rowset_by_ids(_context.mow_context->rowset_ids.get()); |
414 | 63 | } |
415 | | |
416 | | // Submit the entire delete bitmap calculation process to thread pool for async execution |
417 | | // This avoids blocking memtable flush thread while waiting for file upload to complete |
418 | | // The process includes: file_writer->close(), _build_tmp, load_segments, and calc_delete_bitmap |
419 | 63 | return _calc_delete_bitmap_token->submit_func( |
420 | 63 | [this, segment_id, specified_rowsets = std::move(specified_rowsets)]() -> Status { |
421 | 63 | Status st = Status::OK(); |
422 | | // Step 1: Close file_writer (must be done before load_segments) |
423 | 63 | auto* file_writer = _seg_files.get(segment_id); |
424 | 63 | if (file_writer && file_writer->state() != io::FileWriter::State::CLOSED) { |
425 | 63 | MonotonicStopWatch close_timer; |
426 | 63 | close_timer.start(); |
427 | 63 | st = file_writer->close(); |
428 | 63 | close_timer.stop(); |
429 | | |
430 | 63 | auto close_time_ms = close_timer.elapsed_time_milliseconds(); |
431 | 63 | if (close_time_ms > 1000) { |
432 | 0 | LOG(INFO) << "file_writer->close() took " << close_time_ms |
433 | 0 | << "ms for segment_id=" << segment_id |
434 | 0 | << ", tablet_id=" << _context.tablet_id |
435 | 0 | << ", rowset_id=" << _context.rowset_id; |
436 | 0 | } |
437 | 63 | if (!st.ok()) { |
438 | 0 | return st; |
439 | 0 | } |
440 | 63 | } |
441 | | |
442 | 63 | OlapStopWatch watch; |
443 | | // Step 2: Build tmp rowset (needs file_writer to be closed) |
444 | 63 | RowsetSharedPtr rowset_ptr; |
445 | 63 | st = _build_tmp(rowset_ptr); |
446 | 63 | if (!st.ok()) { |
447 | 0 | return st; |
448 | 0 | } |
449 | | |
450 | | // Step 3: Load segments (needs file_writer to be closed and rowset to be built) |
451 | 63 | auto* beta_rowset = reinterpret_cast<BetaRowset*>(rowset_ptr.get()); |
452 | 63 | std::vector<segment_v2::SegmentSharedPtr> segments; |
453 | 63 | st = beta_rowset->load_segments(segment_id, segment_id + 1, &segments); |
454 | 63 | if (!st.ok()) { |
455 | 0 | return st; |
456 | 0 | } |
457 | | |
458 | | // Step 4: Calculate delete bitmap |
459 | 63 | st = BaseTablet::calc_delete_bitmap( |
460 | 63 | _context.tablet, rowset_ptr, segments, specified_rowsets, |
461 | 63 | _context.mow_context->delete_bitmap, _context.mow_context->max_version, |
462 | 63 | nullptr, nullptr, nullptr); |
463 | 63 | if (!st.ok()) { |
464 | 0 | return st; |
465 | 0 | } |
466 | | |
467 | 63 | size_t total_rows = |
468 | 63 | std::accumulate(segments.begin(), segments.end(), 0, |
469 | 63 | [](size_t sum, const segment_v2::SegmentSharedPtr& s) { |
470 | 63 | return sum += s->num_rows(); |
471 | 63 | }); |
472 | 63 | LOG(INFO) << "[Memtable Flush] construct delete bitmap tablet: " |
473 | 63 | << _context.tablet->tablet_id() |
474 | 63 | << ", rowset_ids: " << _context.mow_context->rowset_ids->size() |
475 | 63 | << ", cur max_version: " << _context.mow_context->max_version |
476 | 63 | << ", transaction_id: " << _context.mow_context->txn_id |
477 | 63 | << ", delete_bitmap_count: " |
478 | 63 | << _context.mow_context->delete_bitmap->get_delete_bitmap_count() |
479 | 63 | << ", delete_bitmap_cardinality: " |
480 | 63 | << _context.mow_context->delete_bitmap->cardinality() |
481 | 63 | << ", cost: " << watch.get_elapse_time_us() |
482 | 63 | << "(us), total rows: " << total_rows; |
483 | 63 | return Status::OK(); |
484 | 63 | }); |
485 | 63 | } |
486 | | |
487 | 893 | Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) { |
488 | 893 | RETURN_IF_ERROR(BaseBetaRowsetWriter::init(rowset_writer_context)); |
489 | 893 | if (_segcompaction_worker) { |
490 | 893 | _segcompaction_worker->init_mem_tracker(rowset_writer_context); |
491 | 893 | } |
492 | 893 | if (_context.mow_context != nullptr) { |
493 | 8 | _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor_for_load()->create_token(); |
494 | 8 | } |
495 | 893 | return Status::OK(); |
496 | 893 | } |
497 | | |
498 | | Status BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, |
499 | 77 | int32_t segment_id) { |
500 | 77 | DCHECK(_rowset_meta->is_local()); |
501 | 77 | auto fs = _rowset_meta->fs(); |
502 | 77 | if (!fs) { |
503 | 0 | return Status::Error<INIT_FAILED>( |
504 | 0 | "BetaRowsetWriter::_load_noncompacted_segment _rowset_meta->fs get failed"); |
505 | 0 | } |
506 | 77 | auto path = |
507 | 77 | local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), segment_id); |
508 | 77 | io::FileReaderOptions reader_options { |
509 | 77 | .cache_type = |
510 | 77 | _context.write_file_cache |
511 | 77 | ? (config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE |
512 | 0 | : io::FileCachePolicy::NO_CACHE) |
513 | 77 | : io::FileCachePolicy::NO_CACHE, |
514 | 77 | .is_doris_table = true, |
515 | 77 | .cache_base_path {}, |
516 | 77 | .tablet_id = _rowset_meta->tablet_id(), |
517 | 77 | }; |
518 | 77 | auto s = segment_v2::Segment::open(fs, path, _rowset_meta->tablet_id(), segment_id, rowset_id(), |
519 | 77 | _context.tablet_schema, reader_options, &segment); |
520 | 77 | if (!s.ok()) { |
521 | 0 | LOG(WARNING) << "failed to open segment. " << path << ":" << s; |
522 | 0 | return s; |
523 | 0 | } |
524 | 77 | return Status::OK(); |
525 | 77 | } |
526 | | |
/* Policy for selecting segcompaction targets:
 * 1. Skip big segments.
 * 2. If a run of consecutive small segments ends at a big segment, compact the run,
 *    unless it consists of a single small segment.
 * 3. If a run of consecutive small segments ends at a small segment, compact the run
 *    only if its length exceeds (config::segcompaction_batch_size / 2).
 */
534 | | Status BetaRowsetWriter::_find_longest_consecutive_small_segment( |
535 | 15 | SegCompactionCandidatesSharedPtr& segments) { |
536 | 15 | segments = std::make_shared<SegCompactionCandidates>(); |
537 | | // skip last (maybe active) segment |
538 | 15 | int32_t last_segment = _num_segment - 1; |
539 | 15 | size_t task_bytes = 0; |
540 | 15 | uint32_t task_rows = 0; |
541 | 15 | int32_t segid; |
542 | 15 | for (segid = _segcompacted_point; |
543 | 86 | segid < last_segment && segments->size() < config::segcompaction_batch_size; segid++) { |
544 | 77 | segment_v2::SegmentSharedPtr segment; |
545 | 77 | RETURN_IF_ERROR(_load_noncompacted_segment(segment, segid)); |
546 | 77 | const auto segment_rows = segment->num_rows(); |
547 | 77 | const auto segment_bytes = segment->file_reader()->size(); |
548 | 77 | bool is_large_segment = segment_rows > config::segcompaction_candidate_max_rows || |
549 | 77 | segment_bytes > config::segcompaction_candidate_max_bytes; |
550 | 77 | if (is_large_segment) { |
551 | 14 | if (segid == _segcompacted_point) { |
552 | | // skip large segments at the front |
553 | 8 | auto dst_seg_id = _num_segcompacted.load(); |
554 | 8 | RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); |
555 | 8 | if (_segcompaction_worker->need_convert_delete_bitmap()) { |
556 | 4 | _segcompaction_worker->convert_segment_delete_bitmap( |
557 | 4 | _context.mow_context->delete_bitmap, segid, dst_seg_id); |
558 | 4 | } |
559 | 8 | continue; |
560 | 8 | } else { |
561 | | // stop because we need consecutive segments |
562 | 6 | break; |
563 | 6 | } |
564 | 14 | } |
565 | 63 | bool is_task_full = task_rows + segment_rows > config::segcompaction_task_max_rows || |
566 | 63 | task_bytes + segment_bytes > config::segcompaction_task_max_bytes; |
567 | 63 | if (is_task_full) { |
568 | 0 | break; |
569 | 0 | } |
570 | 63 | segments->push_back(segment); |
571 | 63 | task_rows += segment->num_rows(); |
572 | 63 | task_bytes += segment->file_reader()->size(); |
573 | 63 | } |
574 | 15 | size_t s = segments->size(); |
575 | 15 | if (segid == last_segment && s <= (config::segcompaction_batch_size / 2)) { |
576 | | // we didn't collect enough segments, better to do it in next |
577 | | // round to compact more at once |
578 | 0 | segments->clear(); |
579 | 0 | return Status::OK(); |
580 | 0 | } |
581 | 15 | if (s == 1) { // poor bachelor, let it go |
582 | 4 | VLOG_DEBUG << "only one candidate segment"; |
583 | 4 | auto src_seg_id = _segcompacted_point.load(); |
584 | 4 | auto dst_seg_id = _num_segcompacted.load(); |
585 | 4 | RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); |
586 | 4 | if (_segcompaction_worker->need_convert_delete_bitmap()) { |
587 | 2 | _segcompaction_worker->convert_segment_delete_bitmap( |
588 | 2 | _context.mow_context->delete_bitmap, src_seg_id, dst_seg_id); |
589 | 2 | } |
590 | 4 | segments->clear(); |
591 | 4 | return Status::OK(); |
592 | 4 | } |
593 | 11 | if (VLOG_DEBUG_IS_ON) { |
594 | 0 | vlog_buffer.clear(); |
595 | 0 | for (auto& segment : (*segments.get())) { |
596 | 0 | fmt::format_to(vlog_buffer, "[id:{} num_rows:{}]", segment->id(), segment->num_rows()); |
597 | 0 | } |
598 | 0 | VLOG_DEBUG << "candidate segments num:" << s |
599 | 0 | << " list of candidates:" << fmt::to_string(vlog_buffer); |
600 | 0 | } |
601 | 11 | return Status::OK(); |
602 | 15 | } |
603 | | |
604 | 11 | Status BetaRowsetWriter::_rename_compacted_segments(int64_t begin, int64_t end) { |
605 | 11 | int ret; |
606 | 11 | auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, |
607 | 11 | _context.rowset_id, begin, end); |
608 | 11 | auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), |
609 | 11 | _num_segcompacted); |
610 | 11 | ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); |
611 | 11 | if (ret) { |
612 | 0 | return Status::Error<ROWSET_RENAME_FILE_FAILED>( |
613 | 0 | "failed to rename {} to {}. ret:{}, errno:{}", src_seg_path, dst_seg_path, ret, |
614 | 0 | errno); |
615 | 0 | } |
616 | 11 | RETURN_IF_ERROR(_remove_segment_footer_cache(_num_segcompacted, dst_seg_path)); |
617 | | |
618 | | // rename inverted index files |
619 | 11 | RETURN_IF_ERROR(_rename_compacted_indices(begin, end, 0)); |
620 | | |
621 | 11 | _num_segcompacted++; |
622 | 11 | return Status::OK(); |
623 | 11 | } |
624 | | |
625 | | void BetaRowsetWriter::_clear_statistics_for_deleting_segments_unsafe(uint32_t begin, |
626 | 43 | uint32_t end) { |
627 | 43 | VLOG_DEBUG << "_segid_statistics_map clear record segid range from:" << begin << " to:" << end; |
628 | 134 | for (uint32_t i = begin; i <= end; ++i) { |
629 | 91 | _segid_statistics_map.erase(i); |
630 | 91 | } |
631 | 43 | } |
632 | | |
633 | 42 | Status BetaRowsetWriter::_rename_compacted_segment_plain(uint32_t seg_id) { |
634 | 42 | if (seg_id == _num_segcompacted) { |
635 | 10 | ++_num_segcompacted; |
636 | 10 | return Status::OK(); |
637 | 10 | } |
638 | | |
639 | 32 | auto src_seg_path = |
640 | 32 | local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), seg_id); |
641 | 32 | auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), |
642 | 32 | _num_segcompacted); |
643 | 32 | VLOG_DEBUG << "segcompaction skip this segment. rename " << src_seg_path << " to " |
644 | 0 | << dst_seg_path; |
645 | 32 | { |
646 | 32 | std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); |
647 | 32 | DCHECK_EQ(_segid_statistics_map.find(seg_id) == _segid_statistics_map.end(), false); |
648 | 32 | DCHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(), |
649 | 32 | true); |
650 | 32 | auto org = _segid_statistics_map[seg_id]; |
651 | 32 | _segid_statistics_map.emplace(_num_segcompacted, org); |
652 | 32 | _clear_statistics_for_deleting_segments_unsafe(seg_id, seg_id); |
653 | 32 | } |
654 | 32 | int ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); |
655 | 32 | if (ret) { |
656 | 0 | return Status::Error<ROWSET_RENAME_FILE_FAILED>( |
657 | 0 | "failed to rename {} to {}. ret:{}, errno:{}", src_seg_path, dst_seg_path, ret, |
658 | 0 | errno); |
659 | 0 | } |
660 | | |
661 | 32 | RETURN_IF_ERROR(_remove_segment_footer_cache(_num_segcompacted, dst_seg_path)); |
662 | | // rename remaining inverted index files |
663 | 32 | RETURN_IF_ERROR(_rename_compacted_indices(-1, -1, seg_id)); |
664 | | |
665 | 32 | ++_num_segcompacted; |
666 | 32 | return Status::OK(); |
667 | 32 | } |
668 | | |
669 | | Status BetaRowsetWriter::_remove_segment_footer_cache(const uint32_t seg_id, |
670 | 43 | const std::string& segment_path) { |
671 | 43 | auto* footer_page_cache = ExecEnv::GetInstance()->get_storage_page_cache(); |
672 | 43 | if (!footer_page_cache) { |
673 | 0 | return Status::OK(); |
674 | 0 | } |
675 | | |
676 | 43 | auto fs = _rowset_meta->fs(); |
677 | 43 | bool exists = false; |
678 | 43 | RETURN_IF_ERROR(fs->exists(segment_path, &exists)); |
679 | 43 | if (exists) { |
680 | 43 | io::FileReaderSPtr file_reader; |
681 | 43 | io::FileReaderOptions reader_options { |
682 | 43 | .cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE |
683 | 43 | : io::FileCachePolicy::NO_CACHE, |
684 | 43 | .is_doris_table = true, |
685 | 43 | .cache_base_path = "", |
686 | 43 | .file_size = _rowset_meta->segment_file_size(static_cast<int>(seg_id)), |
687 | 43 | .tablet_id = _rowset_meta->tablet_id(), |
688 | 43 | }; |
689 | 43 | RETURN_IF_ERROR(fs->open_file(segment_path, &file_reader, &reader_options)); |
690 | 43 | DCHECK(file_reader != nullptr); |
691 | 43 | auto cache_key = segment_v2::Segment::get_segment_footer_cache_key(file_reader); |
692 | 43 | footer_page_cache->erase(cache_key, segment_v2::PageTypePB::INDEX_PAGE); |
693 | 43 | } |
694 | 43 | return Status::OK(); |
695 | 43 | } |
696 | | |
697 | 43 | Status BetaRowsetWriter::_rename_compacted_indices(int64_t begin, int64_t end, uint64_t seg_id) { |
698 | 43 | int ret; |
699 | | |
700 | 43 | auto src_seg_path = begin < 0 ? local_segment_path(_context.tablet_path, |
701 | 32 | _context.rowset_id.to_string(), seg_id) |
702 | 43 | : BetaRowset::local_segment_path_segcompacted( |
703 | 11 | _context.tablet_path, _context.rowset_id, begin, end); |
704 | 43 | auto src_index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(src_seg_path); |
705 | 43 | auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), |
706 | 43 | _num_segcompacted); |
707 | 43 | auto dst_index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(dst_seg_path); |
708 | | |
709 | 43 | if (_context.tablet_schema->get_inverted_index_storage_format() >= |
710 | 43 | InvertedIndexStorageFormatPB::V2) { |
711 | 22 | if (_context.tablet_schema->has_inverted_index() || |
712 | 22 | _context.tablet_schema->has_ann_index()) { |
713 | 22 | auto src_idx_path = |
714 | 22 | InvertedIndexDescriptor::get_index_file_path_v2(src_index_path_prefix); |
715 | 22 | auto dst_idx_path = |
716 | 22 | InvertedIndexDescriptor::get_index_file_path_v2(dst_index_path_prefix); |
717 | | |
718 | 22 | ret = rename(src_idx_path.c_str(), dst_idx_path.c_str()); |
719 | 22 | if (ret) { |
720 | 0 | return Status::Error<ROWSET_RENAME_FILE_FAILED>( |
721 | 0 | "failed to rename {} to {}. ret:{}, errno:{}", src_idx_path, dst_idx_path, |
722 | 0 | ret, errno); |
723 | 0 | } |
724 | 22 | } |
725 | 22 | } |
726 | | // rename remaining inverted index files |
727 | 150 | for (auto column : _context.tablet_schema->columns()) { |
728 | 150 | auto index_infos = _context.tablet_schema->inverted_indexs(*column); |
729 | 150 | for (const auto& index_info : index_infos) { |
730 | 44 | auto index_id = index_info->index_id(); |
731 | 44 | if (_context.tablet_schema->get_inverted_index_storage_format() == |
732 | 44 | InvertedIndexStorageFormatPB::V1) { |
733 | 0 | auto src_idx_path = InvertedIndexDescriptor::get_index_file_path_v1( |
734 | 0 | src_index_path_prefix, index_id, index_info->get_index_suffix()); |
735 | 0 | auto dst_idx_path = InvertedIndexDescriptor::get_index_file_path_v1( |
736 | 0 | dst_index_path_prefix, index_id, index_info->get_index_suffix()); |
737 | 0 | VLOG_DEBUG << "segcompaction skip this index. rename " << src_idx_path << " to " |
738 | 0 | << dst_idx_path; |
739 | 0 | ret = rename(src_idx_path.c_str(), dst_idx_path.c_str()); |
740 | 0 | if (ret) { |
741 | 0 | return Status::Error<INVERTED_INDEX_RENAME_FILE_FAILED>( |
742 | 0 | "failed to rename {} to {}. ret:{}, errno:{}", src_idx_path, |
743 | 0 | dst_idx_path, ret, errno); |
744 | 0 | } |
745 | 0 | } |
746 | | // Erase the origin index file cache |
747 | 44 | auto src_idx_cache_key = InvertedIndexDescriptor::get_index_file_cache_key( |
748 | 44 | src_index_path_prefix, index_id, index_info->get_index_suffix()); |
749 | 44 | auto dst_idx_cache_key = InvertedIndexDescriptor::get_index_file_cache_key( |
750 | 44 | dst_index_path_prefix, index_id, index_info->get_index_suffix()); |
751 | 44 | RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(src_idx_cache_key)); |
752 | 44 | RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(dst_idx_cache_key)); |
753 | 44 | } |
754 | 150 | } |
755 | 43 | return Status::OK(); |
756 | 43 | } |
757 | | |
758 | 1.20k | Status BetaRowsetWriter::_segcompaction_if_necessary() { |
759 | 1.20k | Status status = Status::OK(); |
760 | | // if not doing segcompaction, just check segment number |
761 | 1.20k | if (!config::enable_segcompaction || !_context.enable_segcompaction || |
762 | 1.20k | _context.tablet_schema->num_variant_columns() > 0) { |
763 | 1.08k | return _check_segment_number_limit(_num_segment); |
764 | 1.08k | } |
765 | | // leave _is_doing_segcompaction as the last condition |
766 | | // otherwise _segcompacting_cond will never get notified |
767 | 121 | if (_is_doing_segcompaction.exchange(true)) { |
768 | 0 | return status; |
769 | 0 | } |
770 | 121 | if (_segcompaction_status.load() != OK) { |
771 | 0 | status = Status::Error<SEGCOMPACTION_FAILED>( |
772 | 0 | "BetaRowsetWriter::_segcompaction_if_necessary meet invalid state, error code: {}", |
773 | 0 | _segcompaction_status.load()); |
774 | 121 | } else { |
775 | 121 | status = _check_segment_number_limit(_num_segcompacted); |
776 | 121 | } |
777 | 121 | if (status.ok() && (_num_segment - _segcompacted_point) >= config::segcompaction_batch_size) { |
778 | 15 | SegCompactionCandidatesSharedPtr segments; |
779 | 15 | status = _find_longest_consecutive_small_segment(segments); |
780 | 15 | if (LIKELY(status.ok()) && (!segments->empty())) { |
781 | 11 | LOG(INFO) << "submit segcompaction task, tablet_id:" << _context.tablet_id |
782 | 11 | << " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment |
783 | 11 | << ", segcompacted_point:" << _segcompacted_point; |
784 | 11 | status = _engine.submit_seg_compaction_task(_segcompaction_worker, segments); |
785 | 11 | if (status.ok()) { |
786 | 11 | return status; |
787 | 11 | } |
788 | 11 | } |
789 | 15 | } |
790 | 110 | { |
791 | 110 | std::lock_guard lk(_is_doing_segcompaction_lock); |
792 | 110 | _is_doing_segcompaction = false; |
793 | 110 | _segcompacting_cond.notify_all(); |
794 | 110 | } |
795 | 110 | return status; |
796 | 121 | } |
797 | | |
798 | 533 | Status BetaRowsetWriter::_segcompaction_rename_last_segments() { |
799 | 533 | DCHECK_EQ(_is_doing_segcompaction, false); |
800 | 533 | if (!config::enable_segcompaction) { |
801 | 144 | return Status::OK(); |
802 | 144 | } |
803 | 389 | if (_segcompaction_status.load() != OK) { |
804 | 0 | return Status::Error<SEGCOMPACTION_FAILED>( |
805 | 0 | "BetaRowsetWriter::_segcompaction_rename_last_segments meet invalid state, error " |
806 | 0 | "code: {}", |
807 | 0 | _segcompaction_status.load()); |
808 | 0 | } |
809 | 389 | if (!is_segcompacted() || _segcompacted_point == _num_segment) { |
810 | | // no need if never segcompact before or all segcompacted |
811 | 380 | return Status::OK(); |
812 | 380 | } |
813 | | // currently we only rename remaining segments to reduce wait time |
814 | | // so that transaction can be committed ASAP |
815 | 9 | VLOG_DEBUG << "segcompaction last few segments"; |
816 | 39 | for (int32_t segid = _segcompacted_point; segid < _num_segment; segid++) { |
817 | 30 | auto dst_segid = _num_segcompacted.load(); |
818 | 30 | RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); |
819 | 30 | if (_segcompaction_worker->need_convert_delete_bitmap()) { |
820 | 16 | _segcompaction_worker->convert_segment_delete_bitmap( |
821 | 16 | _context.mow_context->delete_bitmap, segid, dst_segid); |
822 | 16 | } |
823 | 30 | } |
824 | 9 | return Status::OK(); |
825 | 9 | } |
826 | | |
827 | 10 | Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) { |
828 | 10 | assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET); |
829 | 10 | RETURN_IF_ERROR(rowset->link_files_to(_context.tablet_path, _context.rowset_id)); |
830 | 10 | _num_rows_written += rowset->num_rows(); |
831 | 10 | const auto& rowset_meta = rowset->rowset_meta(); |
832 | 10 | auto index_size = rowset_meta->index_disk_size(); |
833 | 10 | auto total_size = rowset_meta->total_disk_size(); |
834 | 10 | auto data_size = rowset_meta->data_disk_size(); |
835 | | // corrupted index size caused by bug before 2.1.5 or 3.0.0 version |
836 | | // try to get real index size from disk. |
837 | 10 | if (index_size < 0 || index_size > total_size * 2) { |
838 | 0 | LOG(ERROR) << "invalid index size:" << index_size << " total size:" << total_size |
839 | 0 | << " data size:" << data_size << " tablet:" << rowset_meta->tablet_id() |
840 | 0 | << " rowset:" << rowset_meta->rowset_id(); |
841 | 0 | index_size = 0; |
842 | 0 | auto st = rowset->get_inverted_index_size(&index_size); |
843 | 0 | if (!st.ok()) { |
844 | 0 | if (!st.is<NOT_FOUND>()) { |
845 | 0 | LOG(ERROR) << "failed to get inverted index size. res=" << st; |
846 | 0 | return st; |
847 | 0 | } |
848 | 0 | } |
849 | 0 | } |
850 | 10 | _total_data_size += data_size; |
851 | 10 | _total_index_size += index_size; |
852 | 10 | _num_segment += cast_set<int32_t>(rowset->num_segments()); |
853 | | // append key_bounds to current rowset |
854 | 10 | RETURN_IF_ERROR(rowset->get_segments_key_bounds(&_segments_encoded_key_bounds)); |
855 | 10 | rowset->get_num_segment_rows(&_segment_num_rows); |
856 | 10 | _segments_key_bounds_truncated = rowset->rowset_meta()->is_segments_key_bounds_truncated(); |
857 | | |
858 | | // TODO update zonemap |
859 | 10 | if (rowset->rowset_meta()->has_delete_predicate()) { |
860 | 0 | _rowset_meta->set_delete_predicate(rowset->rowset_meta()->delete_predicate()); |
861 | 0 | } |
862 | | // Update the tablet schema in the rowset metadata if the tablet schema contains a variant. |
863 | | // During the build process, _context.tablet_schema will be used as the rowset schema. |
864 | | // This situation may arise in the event of a linked schema change. If this schema is not set, |
865 | | // the subcolumns of the variant will be lost. |
866 | 10 | if (_context.tablet_schema->num_variant_columns() > 0 && rowset->tablet_schema() != nullptr) { |
867 | 0 | _context.tablet_schema = rowset->tablet_schema(); |
868 | 0 | } |
869 | 10 | return Status::OK(); |
870 | 10 | } |
871 | | |
872 | 0 | Status BaseBetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) { |
873 | | // TODO use schema_mapping to transfer zonemap |
874 | 0 | return add_rowset(rowset); |
875 | 0 | } |
876 | | |
877 | 1.03k | Status BaseBetaRowsetWriter::flush() { |
878 | 1.03k | return _segment_creator.flush(); |
879 | 1.03k | } |
880 | | |
881 | 12 | Status BaseBetaRowsetWriter::flush_memtable(Block* block, int32_t segment_id, int64_t* flush_size) { |
882 | 12 | if (block->rows() == 0) { |
883 | 0 | return Status::OK(); |
884 | 0 | } |
885 | | |
886 | 12 | { |
887 | 12 | SCOPED_RAW_TIMER(&_segment_writer_ns); |
888 | 12 | RETURN_IF_ERROR(_segment_creator.flush_single_block(block, segment_id, flush_size)); |
889 | 12 | } |
890 | 12 | return Status::OK(); |
891 | 12 | } |
892 | | |
893 | 0 | Status BaseBetaRowsetWriter::flush_single_block(const Block* block) { |
894 | 0 | return _segment_creator.flush_single_block(block); |
895 | 0 | } |
896 | | |
897 | 893 | Status BetaRowsetWriter::_wait_flying_segcompaction() { |
898 | 893 | std::unique_lock<std::mutex> l(_is_doing_segcompaction_lock); |
899 | 893 | uint64_t begin_wait = GetCurrentTimeMicros(); |
900 | 893 | while (_is_doing_segcompaction) { |
901 | | // change sync wait to async? |
902 | 0 | _segcompacting_cond.wait(l); |
903 | 0 | } |
904 | 893 | uint64_t elapsed = GetCurrentTimeMicros() - begin_wait; |
905 | 893 | if (elapsed >= MICROS_PER_SEC) { |
906 | 0 | LOG(INFO) << "wait flying segcompaction finish time:" << elapsed << "us"; |
907 | 0 | } |
908 | 893 | if (_segcompaction_status.load() != OK) { |
909 | 0 | return Status::Error<SEGCOMPACTION_FAILED>( |
910 | 0 | "BetaRowsetWriter meet invalid state, error code: {}", |
911 | 0 | _segcompaction_status.load()); |
912 | 0 | } |
913 | 893 | return Status::OK(); |
914 | 893 | } |
915 | | |
916 | 24 | RowsetSharedPtr BaseBetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& spec_rowset_meta) { |
917 | 24 | if (_rowset_meta->newest_write_timestamp() == -1) { |
918 | 0 | _rowset_meta->set_newest_write_timestamp(UnixSeconds()); |
919 | 0 | } |
920 | | |
921 | 24 | build_rowset_meta_with_spec_field(*_rowset_meta, *spec_rowset_meta); |
922 | 24 | RowsetSharedPtr rowset; |
923 | 24 | auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, |
924 | 24 | _rowset_meta, &rowset); |
925 | 24 | if (!status.ok()) { |
926 | 0 | LOG(WARNING) << "rowset init failed when build new rowset, res=" << status; |
927 | 0 | return nullptr; |
928 | 0 | } |
929 | 24 | _already_built = true; |
930 | 24 | return rowset; |
931 | 24 | } |
932 | | |
933 | 533 | Status BaseBetaRowsetWriter::_close_file_writers() { |
934 | | // Flush and close segment files |
935 | 533 | RETURN_NOT_OK_STATUS_WITH_WARN(_segment_creator.close(), |
936 | 533 | "failed to close segment creator when build new rowset"); |
937 | 533 | return Status::OK(); |
938 | 533 | } |
939 | | |
940 | 533 | Status BetaRowsetWriter::_close_file_writers() { |
941 | 533 | RETURN_IF_ERROR(BaseBetaRowsetWriter::_close_file_writers()); |
942 | | // if _segment_start_id is not zero, that means it's a transient rowset writer for |
943 | | // MoW partial update, don't need to do segment compaction. |
944 | 533 | if (_segment_start_id == 0) { |
945 | 533 | if (_segcompaction_worker->cancel()) { |
946 | 533 | std::lock_guard lk(_is_doing_segcompaction_lock); |
947 | 533 | _is_doing_segcompaction = false; |
948 | 533 | _segcompacting_cond.notify_all(); |
949 | 533 | } else { |
950 | 0 | RETURN_NOT_OK_STATUS_WITH_WARN(_wait_flying_segcompaction(), |
951 | 0 | "segcompaction failed when build new rowset"); |
952 | 0 | } |
953 | 533 | RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_rename_last_segments(), |
954 | 533 | "rename last segments failed when build new rowset"); |
955 | | // segcompaction worker would do file wrier's close function in compact_segments |
956 | 533 | if (auto& seg_comp_file_writer = _segcompaction_worker->get_file_writer(); |
957 | 533 | nullptr != seg_comp_file_writer && |
958 | 533 | seg_comp_file_writer->state() != io::FileWriter::State::CLOSED) { |
959 | 0 | RETURN_NOT_OK_STATUS_WITH_WARN(seg_comp_file_writer->close(), |
960 | 0 | "close segment compaction worker failed"); |
961 | 0 | } |
962 | | // process delete bitmap for mow table |
963 | 533 | if (is_segcompacted() && _segcompaction_worker->need_convert_delete_bitmap()) { |
964 | 4 | auto converted_delete_bitmap = _segcompaction_worker->get_converted_delete_bitmap(); |
965 | | // which means the segment compaction is triggerd |
966 | 4 | if (converted_delete_bitmap != nullptr) { |
967 | 4 | RowsetIdUnorderedSet rowsetids; |
968 | 4 | rowsetids.insert(rowset_id()); |
969 | 4 | context().tablet->add_sentinel_mark_to_delete_bitmap(converted_delete_bitmap.get(), |
970 | 4 | rowsetids); |
971 | 4 | context().mow_context->delete_bitmap->remove({rowset_id(), 0, 0}, |
972 | 4 | {rowset_id(), UINT32_MAX, INT64_MAX}); |
973 | 4 | context().mow_context->delete_bitmap->merge(*converted_delete_bitmap); |
974 | 4 | } |
975 | 4 | } |
976 | 533 | } |
977 | 533 | return Status::OK(); |
978 | 533 | } |
979 | | |
980 | 861 | Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) { |
981 | 861 | if (_calc_delete_bitmap_token != nullptr) { |
982 | 8 | RETURN_IF_ERROR(_calc_delete_bitmap_token->wait()); |
983 | 8 | } |
984 | 861 | RETURN_IF_ERROR(_close_file_writers()); |
985 | 861 | const auto total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted; |
986 | 861 | RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(total_segment_num), |
987 | 861 | "too many segments when build new rowset"); |
988 | 861 | RETURN_IF_ERROR(_build_rowset_meta(_rowset_meta.get(), true)); |
989 | 861 | if (_is_pending) { |
990 | 25 | _rowset_meta->set_rowset_state(COMMITTED); |
991 | 836 | } else { |
992 | 836 | _rowset_meta->set_rowset_state(VISIBLE); |
993 | 836 | } |
994 | | |
995 | 861 | if (_rowset_meta->newest_write_timestamp() == -1) { |
996 | 629 | _rowset_meta->set_newest_write_timestamp(UnixSeconds()); |
997 | 629 | } |
998 | | |
999 | 861 | _rowset_meta->set_tablet_schema(_context.tablet_schema); |
1000 | | |
1001 | | // If segment compaction occurs, the idx file info will become inaccurate. |
1002 | 861 | if ((_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) && |
1003 | 861 | _num_segcompacted == 0) { |
1004 | 106 | if (auto idx_files_info = _idx_files.inverted_index_file_info(_segment_start_id); |
1005 | 106 | !idx_files_info.has_value()) [[unlikely]] { |
1006 | 0 | LOG(ERROR) << "expected inverted index files info, but none presents: " |
1007 | 0 | << idx_files_info.error(); |
1008 | 106 | } else { |
1009 | 106 | _rowset_meta->add_inverted_index_files_info(idx_files_info.value()); |
1010 | 106 | } |
1011 | 106 | } |
1012 | | |
1013 | 861 | RETURN_NOT_OK_STATUS_WITH_WARN( |
1014 | 861 | RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, _rowset_meta, |
1015 | 861 | &rowset), |
1016 | 861 | "rowset init failed when build new rowset"); |
1017 | 861 | _already_built = true; |
1018 | 861 | return Status::OK(); |
1019 | 861 | } |
1020 | | |
1021 | 0 | int64_t BaseBetaRowsetWriter::_num_seg() const { |
1022 | 0 | return _num_segment; |
1023 | 0 | } |
1024 | | |
1025 | 924 | int64_t BetaRowsetWriter::_num_seg() const { |
1026 | 924 | return is_segcompacted() ? _num_segcompacted : _num_segment; |
1027 | 924 | } |
1028 | | |
1029 | 924 | Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool check_segment_num) { |
1030 | 924 | int64_t num_rows_written = 0; |
1031 | 924 | int64_t total_data_size = 0; |
1032 | 924 | int64_t total_index_size = 0; |
1033 | 924 | std::vector<KeyBoundsPB> segments_encoded_key_bounds; |
1034 | 924 | std::vector<uint32_t> segment_rows; |
1035 | 924 | std::optional<bool> segments_key_bounds_truncated; |
1036 | 924 | { |
1037 | 924 | std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); |
1038 | 1.50k | for (const auto& itr : _segid_statistics_map) { |
1039 | 1.50k | num_rows_written += itr.second.row_num; |
1040 | 1.50k | total_data_size += itr.second.data_size; |
1041 | 1.50k | total_index_size += itr.second.index_size; |
1042 | 1.50k | segments_encoded_key_bounds.push_back(itr.second.key_bounds); |
1043 | | // segcompaction don't modify _segment_num_rows, so we need to get segment rows from _segid_statistics_map for load |
1044 | 1.50k | segment_rows.push_back(cast_set<uint32_t>(itr.second.row_num)); |
1045 | 1.50k | } |
1046 | 924 | segments_key_bounds_truncated = _segments_key_bounds_truncated; |
1047 | 924 | } |
1048 | 924 | if (segment_rows.empty()) { |
1049 | | // vertical compaction and linked schema change will not record segment statistics, |
1050 | | // it will record segment rows in _segment_num_rows |
1051 | 308 | RETURN_IF_ERROR(get_segment_num_rows(&segment_rows)); |
1052 | 308 | } |
1053 | | |
1054 | 1.18k | for (auto& key_bound : _segments_encoded_key_bounds) { |
1055 | 1.18k | segments_encoded_key_bounds.push_back(key_bound); |
1056 | 1.18k | } |
1057 | 924 | if (segments_key_bounds_truncated.has_value()) { |
1058 | 406 | rowset_meta->set_segments_key_bounds_truncated(segments_key_bounds_truncated.value()); |
1059 | 406 | } |
1060 | 924 | rowset_meta->set_num_segment_rows(segment_rows); |
1061 | | // segment key bounds are empty in old version(before version 1.2.x). So we should not modify |
1062 | | // the overlap property when key bounds are empty. |
1063 | | // for mow table with cluster keys, the overlap is used for cluster keys, |
1064 | | // the key_bounds is primary keys |
1065 | 924 | if (!segments_encoded_key_bounds.empty() && |
1066 | 924 | !is_segment_overlapping(segments_encoded_key_bounds) && |
1067 | 924 | _context.tablet_schema->cluster_key_uids().empty()) { |
1068 | 439 | rowset_meta->set_segments_overlap(NONOVERLAPPING); |
1069 | 439 | } |
1070 | | |
1071 | 924 | auto segment_num = _num_seg(); |
1072 | 924 | if (check_segment_num && config::check_segment_when_build_rowset_meta) { |
1073 | 0 | auto segments_encoded_key_bounds_size = segments_encoded_key_bounds.size(); |
1074 | 0 | if (segments_encoded_key_bounds_size != segment_num) { |
1075 | 0 | return Status::InternalError( |
1076 | 0 | "segments_encoded_key_bounds_size should equal to _num_seg, " |
1077 | 0 | "segments_encoded_key_bounds_size " |
1078 | 0 | "is: {}, _num_seg is: {}", |
1079 | 0 | segments_encoded_key_bounds_size, segment_num); |
1080 | 0 | } |
1081 | 0 | if (segment_rows.size() != segment_num) { |
1082 | 0 | return Status::InternalError( |
1083 | 0 | "segment_rows size should equal to _num_seg, segment_rows size is: {}, " |
1084 | 0 | "_num_seg is {}, tablet={}, rowset={}, txn={}", |
1085 | 0 | segment_rows.size(), segment_num, _context.tablet_id, |
1086 | 0 | _context.rowset_id.to_string(), _context.txn_id); |
1087 | 0 | } |
1088 | 0 | } |
1089 | | |
1090 | 924 | rowset_meta->set_num_segments(segment_num); |
1091 | 924 | rowset_meta->set_num_rows(num_rows_written + _num_rows_written); |
1092 | 924 | rowset_meta->set_total_disk_size(total_data_size + _total_data_size + total_index_size + |
1093 | 924 | _total_index_size); |
1094 | 924 | rowset_meta->set_data_disk_size(total_data_size + _total_data_size); |
1095 | 924 | rowset_meta->set_index_disk_size(total_index_size + _total_index_size); |
1096 | 924 | bool aggregate_key_bounds = config::enable_aggregate_non_mow_key_bounds && |
1097 | 924 | !_context.enable_unique_key_merge_on_write; |
1098 | 924 | rowset_meta->set_segments_key_bounds(segments_encoded_key_bounds, aggregate_key_bounds); |
1099 | | // TODO write zonemap to meta |
1100 | 924 | rowset_meta->set_empty((num_rows_written + _num_rows_written) == 0); |
1101 | 924 | rowset_meta->set_creation_time(time(nullptr)); |
1102 | 924 | return Status::OK(); |
1103 | 924 | } |
1104 | | |
1105 | 63 | Status BaseBetaRowsetWriter::_build_tmp(RowsetSharedPtr& rowset_ptr) { |
1106 | 63 | Status status; |
1107 | 63 | std::shared_ptr<RowsetMeta> tmp_rs_meta = std::make_shared<RowsetMeta>(); |
1108 | 63 | tmp_rs_meta->init(_rowset_meta.get()); |
1109 | | |
1110 | 63 | status = _build_rowset_meta(tmp_rs_meta.get()); |
1111 | 63 | if (!status.ok()) { |
1112 | 0 | LOG(WARNING) << "failed to build rowset meta, res=" << status; |
1113 | 0 | return status; |
1114 | 0 | } |
1115 | | |
1116 | 63 | status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, tmp_rs_meta, |
1117 | 63 | &rowset_ptr); |
1118 | 63 | DBUG_EXECUTE_IF("BaseBetaRowsetWriter::_build_tmp.create_rowset_failed", |
1119 | 63 | { status = Status::InternalError("create rowset failed"); }); |
1120 | 63 | if (!status.ok()) { |
1121 | 0 | LOG(WARNING) << "rowset init failed when build new rowset, res=" << status; |
1122 | 0 | return status; |
1123 | 0 | } |
1124 | 63 | return Status::OK(); |
1125 | 63 | } |
1126 | | |
1127 | | Status BaseBetaRowsetWriter::_create_file_writer(const std::string& path, |
1128 | | io::FileWriterPtr& file_writer, |
1129 | 2.67k | bool is_index_file) { |
1130 | 2.67k | io::FileWriterOptions opts = _context.get_file_writer_options(is_index_file); |
1131 | 2.67k | Status st = _context.fs()->create_file(path, &file_writer, &opts); |
1132 | 2.67k | if (!st.ok()) { |
1133 | 0 | LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st; |
1134 | 0 | return st; |
1135 | 0 | } |
1136 | | |
1137 | 2.67k | DCHECK(file_writer != nullptr); |
1138 | 2.67k | return Status::OK(); |
1139 | 2.67k | } |
1140 | | |
1141 | | Status BaseBetaRowsetWriter::create_file_writer(uint32_t segment_id, io::FileWriterPtr& file_writer, |
1142 | 2.65k | FileType file_type) { |
1143 | 2.65k | auto segment_path = _context.segment_path(segment_id); |
1144 | 2.65k | if (file_type == FileType::INVERTED_INDEX_FILE) { |
1145 | 253 | std::string prefix = |
1146 | 253 | std::string {InvertedIndexDescriptor::get_index_file_path_prefix(segment_path)}; |
1147 | 253 | std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix); |
1148 | 253 | return _create_file_writer(index_path, file_writer, true /* is_index_file */); |
1149 | 2.40k | } else if (file_type == FileType::SEGMENT_FILE) { |
1150 | 2.40k | return _create_file_writer(segment_path, file_writer, false /* is_index_file */); |
1151 | 2.40k | } |
1152 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
1153 | 0 | fmt::format("failed to create file = {}, file type = {}", segment_path, file_type)); |
1154 | 2.65k | } |
1155 | | |
1156 | | Status BaseBetaRowsetWriter::create_index_file_writer(uint32_t segment_id, |
1157 | 226 | IndexFileWriterPtr* index_file_writer) { |
1158 | 226 | RETURN_IF_ERROR(RowsetWriter::create_index_file_writer(segment_id, index_file_writer)); |
1159 | | // used for inverted index format v1 |
1160 | 226 | (*index_file_writer) |
1161 | 226 | ->set_file_writer_opts(_context.get_file_writer_options(true /* is_index_file */)); |
1162 | 226 | return Status::OK(); |
1163 | 226 | } |
1164 | | |
1165 | | Status BetaRowsetWriter::create_segment_writer_for_segcompaction( |
1166 | 12 | std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, int64_t end) { |
1167 | 12 | DCHECK(begin >= 0 && end >= 0); |
1168 | 12 | std::string path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, |
1169 | 12 | _context.rowset_id, begin, end); |
1170 | 12 | io::FileWriterPtr file_writer; |
1171 | 12 | RETURN_IF_ERROR(_create_file_writer(path, file_writer, false /* is_index_file */)); |
1172 | | |
1173 | 12 | IndexFileWriterPtr index_file_writer; |
1174 | 12 | if (_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) { |
1175 | 8 | io::FileWriterPtr idx_file_writer; |
1176 | 8 | std::string prefix(InvertedIndexDescriptor::get_index_file_path_prefix(path)); |
1177 | 8 | if (_context.tablet_schema->get_inverted_index_storage_format() != |
1178 | 8 | InvertedIndexStorageFormatPB::V1) { |
1179 | 8 | std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix); |
1180 | 8 | RETURN_IF_ERROR( |
1181 | 8 | _create_file_writer(index_path, idx_file_writer, true /* is_index_file */)); |
1182 | 8 | } |
1183 | 8 | index_file_writer = std::make_unique<IndexFileWriter>( |
1184 | 8 | _context.fs(), prefix, _context.rowset_id.to_string(), _num_segcompacted, |
1185 | 8 | _context.tablet_schema->get_inverted_index_storage_format(), |
1186 | 8 | std::move(idx_file_writer), true /* can_use_ram_dir */, _context.tablet_id); |
1187 | 8 | } |
1188 | | |
1189 | 12 | segment_v2::SegmentWriterOptions writer_options; |
1190 | 12 | writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write; |
1191 | 12 | writer_options.rowset_ctx = &_context; |
1192 | 12 | writer_options.write_type = _context.write_type; |
1193 | 12 | writer_options.write_type = DataWriteType::TYPE_COMPACTION; |
1194 | 12 | writer_options.max_rows_per_segment = _context.max_rows_per_segment; |
1195 | 12 | writer_options.mow_ctx = _context.mow_context; |
1196 | | |
1197 | 12 | *writer = std::make_unique<segment_v2::SegmentWriter>( |
1198 | 12 | file_writer.get(), _num_segcompacted, _context.tablet_schema, _context.tablet, |
1199 | 12 | _context.data_dir, writer_options, index_file_writer.get()); |
1200 | 12 | if (auto& seg_writer = _segcompaction_worker->get_file_writer(); |
1201 | 12 | seg_writer != nullptr && seg_writer->state() != io::FileWriter::State::CLOSED) { |
1202 | 0 | RETURN_IF_ERROR(_segcompaction_worker->get_file_writer()->close()); |
1203 | 0 | } |
1204 | 12 | _segcompaction_worker->get_file_writer().reset(file_writer.release()); |
1205 | 12 | if (auto& idx_file_writer = _segcompaction_worker->get_inverted_index_file_writer(); |
1206 | 12 | idx_file_writer != nullptr) { |
1207 | 0 | RETURN_IF_ERROR(idx_file_writer->begin_close()); |
1208 | 0 | RETURN_IF_ERROR(idx_file_writer->finish_close()); |
1209 | 0 | } |
1210 | 12 | _segcompaction_worker->get_inverted_index_file_writer().reset(index_file_writer.release()); |
1211 | 12 | return Status::OK(); |
1212 | 12 | } |
1213 | | |
1214 | 0 | Status BaseBetaRowsetWriter::_check_segment_number_limit(size_t segnum) { |
1215 | 0 | DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments", |
1216 | 0 | { segnum = dp->param("segnum", 1024); }); |
1217 | 0 | if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) { |
1218 | 0 | return Status::Error<TOO_MANY_SEGMENTS>( |
1219 | 0 | "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, " |
1220 | 0 | "_num_segment:{}, rowset_num_rows:{}. Please check if the bucket number is too " |
1221 | 0 | "small or if the data is skewed.", |
1222 | 0 | _context.tablet_id, _context.rowset_id.to_string(), |
1223 | 0 | config::max_segment_num_per_rowset, _num_segment, get_rowset_num_rows()); |
1224 | 0 | } |
1225 | 0 | return Status::OK(); |
1226 | 0 | } |
1227 | | |
1228 | 2.06k | Status BetaRowsetWriter::_check_segment_number_limit(size_t segnum) { |
1229 | 2.06k | DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments", |
1230 | 2.06k | { segnum = dp->param("segnum", 1024); }); |
1231 | 2.06k | if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) { |
1232 | 0 | return Status::Error<TOO_MANY_SEGMENTS>( |
1233 | 0 | "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, _num_segment:{}, " |
1234 | 0 | "_segcompacted_point:{}, _num_segcompacted:{}, rowset_num_rows:{}. Please check if " |
1235 | 0 | "the bucket number is too small or if the data is skewed.", |
1236 | 0 | _context.tablet_id, _context.rowset_id.to_string(), |
1237 | 0 | config::max_segment_num_per_rowset, _num_segment, _segcompacted_point, |
1238 | 0 | _num_segcompacted, get_rowset_num_rows()); |
1239 | 0 | } |
1240 | 2.06k | return Status::OK(); |
1241 | 2.06k | } |
1242 | | |
1243 | 1.20k | Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) { |
1244 | 1.20k | uint32_t segid_offset = segment_id - _segment_start_id; |
1245 | 1.20k | bool key_bounds_truncated = false; |
1246 | 1.20k | SegmentStatistics stored_segstat = |
1247 | 1.20k | copy_segment_statistics_with_truncated_key_bounds(segstat, key_bounds_truncated); |
1248 | 1.20k | { |
1249 | 1.20k | std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); |
1250 | 1.20k | CHECK_EQ(_segid_statistics_map.find(segment_id) == _segid_statistics_map.end(), true); |
1251 | 1.20k | _segid_statistics_map.emplace(segment_id, std::move(stored_segstat)); |
1252 | 1.20k | if (segment_id >= _segment_num_rows.size()) { |
1253 | 1.20k | _segment_num_rows.resize(segment_id + 1); |
1254 | 1.20k | } |
1255 | 1.20k | _segment_num_rows[segid_offset] = cast_set<uint32_t>(segstat.row_num); |
1256 | 1.20k | if (key_bounds_truncated) { |
1257 | 702 | _segments_key_bounds_truncated = true; |
1258 | 702 | } |
1259 | 1.20k | } |
1260 | 1.20k | VLOG_DEBUG << "_segid_statistics_map add new record. segment_id:" << segment_id |
1261 | 0 | << " row_num:" << segstat.row_num << " data_size:" << segstat.data_size |
1262 | 0 | << " index_size:" << segstat.index_size; |
1263 | | |
1264 | 1.20k | { |
1265 | 1.20k | std::lock_guard<std::mutex> lock(_segment_set_mutex); |
1266 | 1.20k | _segment_set.add(segid_offset); |
1267 | 2.40k | while (_segment_set.contains(_num_segment)) { |
1268 | 1.20k | _num_segment++; |
1269 | 1.20k | } |
1270 | 1.20k | } |
1271 | | |
1272 | 1.20k | if (_context.mow_context != nullptr) { |
1273 | 63 | RETURN_IF_ERROR(_generate_delete_bitmap(segment_id)); |
1274 | 63 | } |
1275 | 1.20k | return Status::OK(); |
1276 | 1.20k | } |
1277 | | |
1278 | 1.20k | Status BetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) { |
1279 | 1.20k | RETURN_IF_ERROR(BaseBetaRowsetWriter::add_segment(segment_id, segstat)); |
1280 | 1.20k | return _segcompaction_if_necessary(); |
1281 | 1.20k | } |
1282 | | |
1283 | | Status BetaRowsetWriter::flush_segment_writer_for_segcompaction( |
1284 | | std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t index_size, |
1285 | 11 | KeyBoundsPB& key_bounds) { |
1286 | 11 | uint32_t segid = (*writer)->get_segment_id(); |
1287 | 11 | uint32_t row_num = (*writer)->row_count(); |
1288 | 11 | uint64_t segment_size; |
1289 | | |
1290 | 11 | auto s = (*writer)->finalize_footer(&segment_size); |
1291 | 11 | if (!s.ok()) { |
1292 | 0 | return Status::Error<WRITER_DATA_WRITE_ERROR>("failed to finalize segment: {}", |
1293 | 0 | s.to_string()); |
1294 | 0 | } |
1295 | 11 | int64_t inverted_index_file_size = 0; |
1296 | 11 | RETURN_IF_ERROR((*writer)->close_inverted_index(&inverted_index_file_size)); |
1297 | | |
1298 | 11 | SegmentStatistics segstat; |
1299 | 11 | segstat.row_num = row_num; |
1300 | 11 | segstat.data_size = segment_size; |
1301 | 11 | segstat.index_size = inverted_index_file_size; |
1302 | 11 | bool key_bounds_truncated = copy_key_bounds_with_truncation(key_bounds, &segstat.key_bounds); |
1303 | 11 | { |
1304 | 11 | std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); |
1305 | 11 | CHECK_EQ(_segid_statistics_map.find(segid) == _segid_statistics_map.end(), true); |
1306 | 11 | _segid_statistics_map.emplace(segid, std::move(segstat)); |
1307 | 11 | if (key_bounds_truncated) { |
1308 | 2 | _segments_key_bounds_truncated = true; |
1309 | 2 | } |
1310 | 11 | } |
1311 | 11 | VLOG_DEBUG << "_segid_statistics_map add new record. segid:" << segid << " row_num:" << row_num |
1312 | 0 | << " data_size:" << PrettyPrinter::print_bytes(segment_size) |
1313 | 0 | << " index_size:" << PrettyPrinter::print_bytes(inverted_index_file_size); |
1314 | | |
1315 | 11 | writer->reset(); |
1316 | | |
1317 | 11 | return Status::OK(); |
1318 | 11 | } |
1319 | | |
1320 | | } // namespace doris |