be/src/storage/rowset/beta_rowset_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/rowset/beta_rowset_writer.h" |
19 | | |
20 | | #include <assert.h> |
21 | | // IWYU pragma: no_include <bthread/errno.h> |
22 | | #include <errno.h> // IWYU pragma: keep |
23 | | #include <fmt/format.h> |
24 | | #include <stdio.h> |
25 | | |
26 | | #include <chrono> |
27 | | #include <ctime> // time |
28 | | #include <filesystem> |
29 | | #include <memory> |
30 | | #include <mutex> |
31 | | #include <sstream> |
32 | | #include <thread> |
33 | | #include <utility> |
34 | | #include <vector> |
35 | | |
36 | | // IWYU pragma: no_include <opentelemetry/common/threadlocal.h> |
37 | | #include "common/cast_set.h" |
38 | | #include "common/compiler_util.h" // IWYU pragma: keep |
39 | | #include "common/config.h" |
40 | | #include "common/logging.h" |
41 | | #include "common/status.h" |
42 | | #include "core/block/block.h" |
43 | | #include "core/column/column.h" |
44 | | #include "core/data_type/data_type_factory.hpp" |
45 | | #include "io/fs/file_reader.h" |
46 | | #include "io/fs/file_system.h" |
47 | | #include "io/fs/file_writer.h" |
48 | | #include "runtime/thread_context.h" |
49 | | #include "storage/index/inverted/inverted_index_cache.h" |
50 | | #include "storage/index/inverted/inverted_index_desc.h" |
51 | | #include "storage/olap_define.h" |
52 | | #include "storage/rowset/beta_rowset.h" |
53 | | #include "storage/rowset/rowset_factory.h" |
54 | | #include "storage/rowset/rowset_writer.h" |
55 | | #include "storage/rowset/segcompaction.h" |
56 | | #include "storage/schema_change/schema_change.h" |
57 | | #include "storage/segment/segment.h" |
58 | | #include "storage/segment/segment_writer.h" |
59 | | #include "storage/storage_engine.h" |
60 | | #include "storage/tablet/tablet_schema.h" |
61 | | #include "util/debug_points.h" |
62 | | #include "util/pretty_printer.h" |
63 | | #include "util/slice.h" |
64 | | #include "util/stopwatch.hpp" |
65 | | #include "util/time.h" |
66 | | |
67 | | namespace doris { |
68 | | using namespace ErrorCode; |
69 | | |
70 | | namespace { |
71 | | |
72 | 704 | bool is_segment_overlapping(const std::vector<KeyBoundsPB>& segments_encoded_key_bounds) { |
73 | 704 | std::string_view last; |
74 | 1.85k | for (auto&& segment_encode_key : segments_encoded_key_bounds) { |
75 | 1.85k | auto&& cur_min = segment_encode_key.min_key(); |
76 | 1.85k | auto&& cur_max = segment_encode_key.max_key(); |
77 | 1.85k | if (cur_min <= last) { |
78 | 143 | return true; |
79 | 143 | } |
80 | 1.71k | last = cur_max; |
81 | 1.71k | } |
82 | 561 | return false; |
83 | 704 | } |
84 | | |
85 | | void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta, |
86 | 24 | const RowsetMeta& spec_rowset_meta) { |
87 | 24 | rowset_meta.set_num_rows(spec_rowset_meta.num_rows()); |
88 | 24 | rowset_meta.set_total_disk_size(spec_rowset_meta.total_disk_size()); |
89 | 24 | rowset_meta.set_data_disk_size(spec_rowset_meta.data_disk_size()); |
90 | 24 | rowset_meta.set_index_disk_size(spec_rowset_meta.index_disk_size()); |
91 | | // TODO write zonemap to meta |
92 | 24 | rowset_meta.set_empty(spec_rowset_meta.num_rows() == 0); |
93 | 24 | rowset_meta.set_creation_time(time(nullptr)); |
94 | 24 | rowset_meta.set_num_segments(spec_rowset_meta.num_segments()); |
95 | 24 | rowset_meta.set_segments_overlap(spec_rowset_meta.segments_overlap()); |
96 | 24 | rowset_meta.set_rowset_state(spec_rowset_meta.rowset_state()); |
97 | 24 | rowset_meta.set_segments_key_bounds_truncated( |
98 | 24 | spec_rowset_meta.is_segments_key_bounds_truncated()); |
99 | 24 | rowset_meta.set_db_id(spec_rowset_meta.db_id()); |
100 | 24 | rowset_meta.set_table_id(spec_rowset_meta.table_id()); |
101 | 24 | std::vector<KeyBoundsPB> segments_key_bounds; |
102 | 24 | spec_rowset_meta.get_segments_key_bounds(&segments_key_bounds); |
103 | | // Preserve source layout: if source was aggregated (size 1), re-aggregating |
104 | | // the single entry is a no-op that also keeps the flag consistent. |
105 | 24 | rowset_meta.set_segments_key_bounds(segments_key_bounds, |
106 | 24 | spec_rowset_meta.is_segments_key_bounds_aggregated()); |
107 | 24 | std::vector<uint32_t> num_segment_rows; |
108 | 24 | spec_rowset_meta.get_num_segment_rows(&num_segment_rows); |
109 | 24 | rowset_meta.set_num_segment_rows(num_segment_rows); |
110 | 24 | if (spec_rowset_meta.is_row_binlog()) { |
111 | 0 | rowset_meta.mark_row_binlog(); |
112 | 0 | } |
113 | 24 | } |
114 | | |
115 | | } // namespace |
116 | | |
// Defaulted destructor, defined out of line in this translation unit.
117 | 1.01k | SegmentFileCollection::~SegmentFileCollection() = default;
118 | | |
119 | 2.38k | Status SegmentFileCollection::add(int seg_id, io::FileWriterPtr&& writer) { |
120 | 2.38k | std::lock_guard lock(_lock); |
121 | 2.38k | if (_closed) [[unlikely]] { |
122 | 0 | DCHECK(false) << writer->path(); |
123 | 0 | return Status::InternalError("add to closed SegmentFileCollection"); |
124 | 0 | } |
125 | | |
126 | 2.38k | _file_writers.emplace(seg_id, std::move(writer)); |
127 | 2.38k | return Status::OK(); |
128 | 2.38k | } |
129 | | |
130 | 63 | io::FileWriter* SegmentFileCollection::get(int seg_id) const { |
131 | 63 | std::lock_guard lock(_lock); |
132 | 63 | if (auto it = _file_writers.find(seg_id); it != _file_writers.end()) { |
133 | 63 | return it->second.get(); |
134 | 63 | } else { |
135 | 0 | return nullptr; |
136 | 0 | } |
137 | 63 | } |
138 | | |
139 | 982 | Status SegmentFileCollection::close() { |
140 | 982 | { |
141 | 982 | std::lock_guard lock(_lock); |
142 | 982 | if (_closed) [[unlikely]] { |
143 | 0 | DCHECK(false); |
144 | 0 | return Status::InternalError("double close SegmentFileCollection"); |
145 | 0 | } |
146 | 982 | _closed = true; |
147 | 982 | } |
148 | | |
149 | 2.38k | for (auto&& [_, writer] : _file_writers) { |
150 | 2.38k | DBUG_EXECUTE_IF("SegmentFileCollection.close.wait_dat_closed", { |
151 | 2.38k | auto before_state = writer->state(); |
152 | 2.38k | for (int i = 0; i < 3000 && writer->state() != io::FileWriter::State::CLOSED; ++i) { |
153 | 2.38k | std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
154 | 2.38k | } |
155 | 2.38k | LOG(INFO) << "SegmentFileCollection.close.wait_dat_closed path=" |
156 | 2.38k | << writer->path().native() |
157 | 2.38k | << " before_state=" << static_cast<int>(before_state) |
158 | 2.38k | << " after_state=" << static_cast<int>(writer->state()); |
159 | 2.38k | }); |
160 | 2.38k | if (writer->state() != io::FileWriter::State::CLOSED) { |
161 | 2.32k | RETURN_IF_ERROR(writer->close()); |
162 | 2.32k | } |
163 | 2.38k | } |
164 | | |
165 | 982 | return Status::OK(); |
166 | 982 | } |
167 | | |
168 | 0 | Result<std::vector<size_t>> SegmentFileCollection::segments_file_size(int seg_id_offset) { |
169 | 0 | std::lock_guard lock(_lock); |
170 | 0 | if (!_closed) [[unlikely]] { |
171 | 0 | DCHECK(false); |
172 | 0 | return ResultError(Status::InternalError("get segments file size without closed")); |
173 | 0 | } |
174 | | |
175 | 0 | Status st; |
176 | 0 | std::vector<size_t> seg_file_size(_file_writers.size(), 0); |
177 | 0 | bool succ = std::all_of(_file_writers.begin(), _file_writers.end(), [&](auto&& it) { |
178 | 0 | auto&& [seg_id, writer] = it; |
179 | |
|
180 | 0 | int idx = seg_id - seg_id_offset; |
181 | 0 | if (idx >= seg_file_size.size()) [[unlikely]] { |
182 | 0 | auto err_msg = fmt::format( |
183 | 0 | "invalid seg_id={} num_file_writers={} seg_id_offset={} path={}", seg_id, |
184 | 0 | seg_file_size.size(), seg_id_offset, writer->path().native()); |
185 | 0 | DCHECK(false) << err_msg; |
186 | 0 | st = Status::InternalError(err_msg); |
187 | 0 | return false; |
188 | 0 | } |
189 | | |
190 | 0 | auto& fsize = seg_file_size[idx]; |
191 | 0 | if (fsize != 0) { |
192 | | // File size should not been set |
193 | 0 | auto err_msg = |
194 | 0 | fmt::format("duplicate seg_id={} path={}", seg_id, writer->path().native()); |
195 | 0 | DCHECK(false) << err_msg; |
196 | 0 | st = Status::InternalError(err_msg); |
197 | 0 | return false; |
198 | 0 | } |
199 | | |
200 | 0 | fsize = writer->bytes_appended(); |
201 | 0 | if (fsize <= 0) { |
202 | 0 | auto err_msg = |
203 | 0 | fmt::format("invalid segment fsize={} path={}", fsize, writer->path().native()); |
204 | 0 | DCHECK(false) << err_msg; |
205 | 0 | st = Status::InternalError(err_msg); |
206 | 0 | return false; |
207 | 0 | } |
208 | | |
209 | 0 | return true; |
210 | 0 | }); |
211 | |
|
212 | 0 | if (succ) { |
213 | 0 | return seg_file_size; |
214 | 0 | } |
215 | | |
216 | 0 | return ResultError(st); |
217 | 0 | } |
218 | | |
// Defaulted destructor, defined out of line in this translation unit.
219 | 1.01k | InvertedIndexFileCollection::~InvertedIndexFileCollection() = default;
220 | | |
221 | 265 | Status InvertedIndexFileCollection::add(int seg_id, IndexFileWriterPtr&& index_writer) { |
222 | 265 | std::lock_guard lock(_lock); |
223 | 265 | if (_inverted_index_file_writers.find(seg_id) != _inverted_index_file_writers.end()) |
224 | 0 | [[unlikely]] { |
225 | 0 | DCHECK(false); |
226 | 0 | return Status::InternalError("The seg_id already exists, seg_id is {}", seg_id); |
227 | 0 | } |
228 | 265 | _inverted_index_file_writers.emplace(seg_id, std::move(index_writer)); |
229 | 265 | return Status::OK(); |
230 | 265 | } |
231 | | |
232 | 328 | Status InvertedIndexFileCollection::begin_close() { |
233 | 328 | std::lock_guard lock(_lock); |
234 | 328 | for (auto&& [id, writer] : _inverted_index_file_writers) { |
235 | 36 | RETURN_IF_ERROR(writer->begin_close()); |
236 | 36 | _total_size += writer->get_index_file_total_size(); |
237 | 36 | } |
238 | | |
239 | 328 | return Status::OK(); |
240 | 328 | } |
241 | | |
242 | 982 | Status InvertedIndexFileCollection::finish_close() { |
243 | 982 | std::lock_guard lock(_lock); |
244 | 982 | for (auto&& [id, writer] : _inverted_index_file_writers) { |
245 | 264 | RETURN_IF_ERROR(writer->finish_close()); |
246 | 264 | } |
247 | 982 | return Status::OK(); |
248 | 982 | } |
249 | | |
250 | | Result<std::vector<const InvertedIndexFileInfo*>> |
251 | 106 | InvertedIndexFileCollection::inverted_index_file_info(int seg_id_offset) { |
252 | 106 | std::lock_guard lock(_lock); |
253 | | |
254 | 106 | Status st; |
255 | 106 | std::vector<const InvertedIndexFileInfo*> idx_file_info(_inverted_index_file_writers.size()); |
256 | 106 | bool succ = std::all_of( |
257 | 106 | _inverted_index_file_writers.begin(), _inverted_index_file_writers.end(), |
258 | 215 | [&](auto&& it) { |
259 | 215 | auto&& [seg_id, writer] = it; |
260 | | |
261 | 215 | int idx = seg_id - seg_id_offset; |
262 | 215 | if (idx >= idx_file_info.size()) [[unlikely]] { |
263 | 0 | auto err_msg = |
264 | 0 | fmt::format("invalid seg_id={} num_file_writers={} seg_id_offset={}", |
265 | 0 | seg_id, idx_file_info.size(), seg_id_offset); |
266 | 0 | DCHECK(false) << err_msg; |
267 | 0 | st = Status::InternalError(err_msg); |
268 | 0 | return false; |
269 | 0 | } |
270 | 215 | idx_file_info[idx] = _inverted_index_file_writers[seg_id]->get_index_file_info(); |
271 | 215 | return true; |
272 | 215 | }); |
273 | | |
274 | 106 | if (succ) { |
275 | 106 | return idx_file_info; |
276 | 106 | } |
277 | | |
278 | 0 | return ResultError(st); |
279 | 106 | } |
280 | | |
// Zero-initializes the segment, row and size counters and hands `_context`,
// `_seg_files` and `_idx_files` to the segment creator. `_context` itself is
// only assigned later, in init().
281 | | BaseBetaRowsetWriter::BaseBetaRowsetWriter() |
282 | 1.01k | : _num_segment(0),
283 | 1.01k | _segment_start_id(0),
284 | 1.01k | _num_rows_written(0),
285 | 1.01k | _total_data_size(0),
286 | 1.01k | _total_index_size(0),
287 | 1.01k | _segment_creator(_context, _seg_files, _idx_files) {}
288 | | |
// Stores the storage engine reference and creates the segcompaction worker,
// which is given a pointer back to this writer.
289 | | BetaRowsetWriter::BetaRowsetWriter(StorageEngine& engine) |
290 | 1.01k | : _engine(engine), _segcompaction_worker(std::make_shared<SegcompactionWorker>(this)) {}
291 | | |
// Forwards `engine` to the BetaRowsetWriter constructor; adds no state of its own here.
292 | 9 | RowBinlogRowsetWriter::RowBinlogRowsetWriter(StorageEngine& engine) : BetaRowsetWriter(engine) {}
293 | | |
294 | 1.01k | BaseBetaRowsetWriter::~BaseBetaRowsetWriter() { |
295 | 1.01k | if (!_already_built && _rowset_meta->is_local()) { |
296 | | // abnormal exit, remove all files generated |
297 | 10 | auto& fs = io::global_local_filesystem(); |
298 | 12 | for (int i = _segment_start_id; i < _segment_creator.next_segment_id(); ++i) { |
299 | 2 | std::string seg_path = |
300 | 2 | local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), i); |
301 | | // Even if an error is encountered, these files that have not been cleaned up |
302 | | // will be cleaned up by the GC background. So here we only print the error |
303 | | // message when we encounter an error. |
304 | 2 | WARN_IF_ERROR(fs->delete_file(seg_path), |
305 | 2 | fmt::format("Failed to delete file={}", seg_path)); |
306 | 2 | } |
307 | 10 | } |
308 | 1.01k | if (_calc_delete_bitmap_token) { |
309 | 9 | _calc_delete_bitmap_token->cancel(); |
310 | 9 | } |
311 | 1.01k | } |
312 | | |
// Waits for any in-flight segcompaction before the writer goes away; a failure
// is only logged as a warning.
313 | 1.01k | BetaRowsetWriter::~BetaRowsetWriter() {
314 | | /* Note that segcompaction is async and in parallel with load job. So we should handle carefully
315 | | * when the job is cancelled. Although it is meaningless to continue segcompaction when the job
316 | | * is cancelled, the objects involved in the job should be preserved during segcompaction to
317 | | * avoid crashes for memory issues. */ |
318 | 1.01k | WARN_IF_ERROR(_wait_flying_segcompaction(), "segment compaction failed");
319 | 1.01k | }
320 | | |
321 | 1.01k | Status BaseBetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) { |
322 | 1.01k | _context = rowset_writer_context; |
323 | 1.01k | DCHECK(_context.tablet_schema != nullptr); |
324 | 1.01k | _rowset_meta.reset(new RowsetMeta); |
325 | 1.01k | if (_context.storage_resource) { |
326 | 0 | _rowset_meta->set_remote_storage_resource(*_context.storage_resource); |
327 | 0 | } |
328 | 1.01k | _rowset_meta->set_rowset_id(_context.rowset_id); |
329 | 1.01k | _rowset_meta->set_partition_id(_context.partition_id); |
330 | 1.01k | _rowset_meta->set_tablet_id(_context.tablet_id); |
331 | 1.01k | _rowset_meta->set_db_id(_context.db_id); |
332 | 1.01k | _rowset_meta->set_table_id(_context.table_id); |
333 | 1.01k | _rowset_meta->set_index_id(_context.index_id); |
334 | 1.01k | _rowset_meta->set_tablet_schema_hash(_context.tablet_schema_hash); |
335 | 1.01k | _rowset_meta->set_rowset_type(_context.rowset_type); |
336 | 1.01k | _rowset_meta->set_rowset_state(_context.rowset_state); |
337 | 1.01k | _rowset_meta->set_segments_overlap(_context.segments_overlap); |
338 | 1.01k | if (_context.rowset_state == PREPARED || _context.rowset_state == COMMITTED) { |
339 | 36 | _is_pending = true; |
340 | 36 | _rowset_meta->set_txn_id(_context.txn_id); |
341 | 36 | _rowset_meta->set_load_id(_context.load_id); |
342 | 979 | } else { |
343 | 979 | _rowset_meta->set_version(_context.version); |
344 | 979 | _rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp); |
345 | 979 | } |
346 | 1.01k | _rowset_meta->set_tablet_uid(_context.tablet_uid); |
347 | 1.01k | _rowset_meta->set_tablet_schema(_context.tablet_schema); |
348 | 1.01k | if (_context.write_binlog_opt().enable) { |
349 | 9 | _rowset_meta->mark_row_binlog(); |
350 | 9 | } |
351 | 1.01k | _rowset_meta->set_compaction_level(_context.compaction_level); |
352 | 1.01k | _context.segment_collector = std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this); |
353 | 1.01k | _context.file_writer_creator = std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this); |
354 | 1.01k | return Status::OK(); |
355 | 1.01k | } |
356 | | |
// Forwards `block` to the segment creator and returns its status unchanged.
357 | 1.36k | Status BaseBetaRowsetWriter::add_block(const Block* block) {
358 | 1.36k | return _segment_creator.add_block(block);
359 | 1.36k | }
360 | | |
361 | 63 | Status BaseBetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) { |
362 | 63 | SCOPED_RAW_TIMER(&_delete_bitmap_ns); |
363 | 63 | if (_context.is_transient_rowset_writer || |
364 | 63 | !_context.tablet->enable_unique_key_merge_on_write() || |
365 | 63 | (_context.partial_update_info && _context.partial_update_info->is_partial_update())) { |
366 | 0 | return Status::OK(); |
367 | 0 | } |
368 | 63 | std::vector<RowsetSharedPtr> specified_rowsets; |
369 | 63 | { |
370 | 63 | std::shared_lock meta_rlock(_context.tablet->get_header_lock()); |
371 | 63 | specified_rowsets = |
372 | 63 | _context.tablet->get_rowset_by_ids(_context.mow_context->rowset_ids.get()); |
373 | 63 | } |
374 | | |
375 | | // Submit the entire delete bitmap calculation process to thread pool for async execution |
376 | | // This avoids blocking memtable flush thread while waiting for file upload to complete |
377 | | // The process includes: file_writer->close(), _build_tmp, load_segments, and calc_delete_bitmap |
378 | 63 | return _calc_delete_bitmap_token->submit_func( |
379 | 63 | [this, segment_id, specified_rowsets = std::move(specified_rowsets)]() -> Status { |
380 | 63 | Status st = Status::OK(); |
381 | | // Step 1: Close file_writer (must be done before load_segments) |
382 | 63 | auto* file_writer = _seg_files.get(segment_id); |
383 | 63 | if (file_writer && file_writer->state() != io::FileWriter::State::CLOSED) { |
384 | 63 | MonotonicStopWatch close_timer; |
385 | 63 | close_timer.start(); |
386 | 63 | st = file_writer->close(); |
387 | 63 | close_timer.stop(); |
388 | | |
389 | 63 | auto close_time_ms = close_timer.elapsed_time_milliseconds(); |
390 | 63 | if (close_time_ms > 1000) { |
391 | 0 | LOG(INFO) << "file_writer->close() took " << close_time_ms |
392 | 0 | << "ms for segment_id=" << segment_id |
393 | 0 | << ", tablet_id=" << _context.tablet_id |
394 | 0 | << ", rowset_id=" << _context.rowset_id; |
395 | 0 | } |
396 | 63 | if (!st.ok()) { |
397 | 0 | return st; |
398 | 0 | } |
399 | 63 | } |
400 | | |
401 | 63 | OlapStopWatch watch; |
402 | | // Step 2: Build tmp rowset (needs file_writer to be closed) |
403 | 63 | RowsetSharedPtr rowset_ptr; |
404 | 63 | st = _build_tmp(rowset_ptr); |
405 | 63 | if (!st.ok()) { |
406 | 0 | return st; |
407 | 0 | } |
408 | | |
409 | | // Step 3: Load segments (needs file_writer to be closed and rowset to be built) |
410 | 63 | auto* beta_rowset = reinterpret_cast<BetaRowset*>(rowset_ptr.get()); |
411 | 63 | std::vector<segment_v2::SegmentSharedPtr> segments; |
412 | 63 | st = beta_rowset->load_segments(segment_id, segment_id + 1, &segments); |
413 | 63 | if (!st.ok()) { |
414 | 0 | return st; |
415 | 0 | } |
416 | | |
417 | | // Step 4: Calculate delete bitmap |
418 | 63 | st = BaseTablet::calc_delete_bitmap( |
419 | 63 | _context.tablet, rowset_ptr, segments, specified_rowsets, |
420 | 63 | _context.mow_context->delete_bitmap, _context.mow_context->max_version, |
421 | 63 | nullptr, nullptr, nullptr); |
422 | 63 | if (!st.ok()) { |
423 | 0 | return st; |
424 | 0 | } |
425 | | |
426 | 63 | size_t total_rows = |
427 | 63 | std::accumulate(segments.begin(), segments.end(), 0, |
428 | 63 | [](size_t sum, const segment_v2::SegmentSharedPtr& s) { |
429 | 63 | return sum += s->num_rows(); |
430 | 63 | }); |
431 | 63 | LOG(INFO) << "[Memtable Flush] construct delete bitmap tablet: " |
432 | 63 | << _context.tablet->tablet_id() |
433 | 63 | << ", rowset_ids: " << _context.mow_context->rowset_ids->size() |
434 | 63 | << ", cur max_version: " << _context.mow_context->max_version |
435 | 63 | << ", transaction_id: " << _context.mow_context->txn_id |
436 | 63 | << ", delete_bitmap_count: " |
437 | 63 | << _context.mow_context->delete_bitmap->get_delete_bitmap_count() |
438 | 63 | << ", delete_bitmap_cardinality: " |
439 | 63 | << _context.mow_context->delete_bitmap->cardinality() |
440 | 63 | << ", cost: " << watch.get_elapse_time_us() |
441 | 63 | << "(us), total rows: " << total_rows; |
442 | 63 | return Status::OK(); |
443 | 63 | }); |
444 | 63 | } |
445 | | |
446 | 1.01k | Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) { |
447 | 1.01k | RETURN_IF_ERROR(BaseBetaRowsetWriter::init(rowset_writer_context)); |
448 | 1.01k | if (_segcompaction_worker) { |
449 | 1.01k | _segcompaction_worker->init_mem_tracker(rowset_writer_context); |
450 | 1.01k | } |
451 | 1.01k | if (_context.mow_context != nullptr) { |
452 | 9 | _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor_for_load()->create_token(); |
453 | 9 | } |
454 | 1.01k | return Status::OK(); |
455 | 1.01k | } |
456 | | |
457 | | Status BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, |
458 | 77 | int32_t segment_id) { |
459 | 77 | DCHECK(_rowset_meta->is_local()); |
460 | 77 | auto fs = _rowset_meta->fs(); |
461 | 77 | if (!fs) { |
462 | 0 | return Status::Error<INIT_FAILED>( |
463 | 0 | "BetaRowsetWriter::_load_noncompacted_segment _rowset_meta->fs get failed"); |
464 | 0 | } |
465 | 77 | auto path = |
466 | 77 | local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), segment_id); |
467 | 77 | io::FileReaderOptions reader_options { |
468 | 77 | .cache_type = |
469 | 77 | _context.write_file_cache |
470 | 77 | ? (config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE |
471 | 0 | : io::FileCachePolicy::NO_CACHE) |
472 | 77 | : io::FileCachePolicy::NO_CACHE, |
473 | 77 | .is_doris_table = true, |
474 | 77 | .cache_base_path {}, |
475 | 77 | .tablet_id = _rowset_meta->tablet_id(), |
476 | 77 | }; |
477 | 77 | auto s = segment_v2::Segment::open(fs, path, _rowset_meta->tablet_id(), segment_id, rowset_id(), |
478 | 77 | _context.tablet_schema, reader_options, &segment); |
479 | 77 | if (!s.ok()) { |
480 | 0 | LOG(WARNING) << "failed to open segment. " << path << ":" << s; |
481 | 0 | return s; |
482 | 0 | } |
483 | 77 | return Status::OK(); |
484 | 77 | } |
485 | | |
486 | | /* policy of segcompaction target selection: |
487 | | * 1. skip big segments |
488 | | * 2. if the consecutive smalls end up with a big, compact the smalls, except |
489 | | * single small |
490 | | * 3. if the consecutive smalls end up with small, compact the smalls if the |
491 | | * length is beyond (config::segcompaction_batch_size / 2) |
492 | | */ |
// Fills `segments` with the next run of consecutive small segments to compact,
// scanning from `_segcompacted_point` up to (but excluding) the last segment.
//
// `segments` is always replaced by a fresh list. It is left empty when the
// scan reached the last segment with at most half a batch collected, or when
// exactly one candidate was found (that one is renamed in place instead).
// Large segments at the front of the scan are renamed in place and skipped.
// Returns the first error from loading or renaming a segment, otherwise OK.
493 | | Status BetaRowsetWriter::_find_longest_consecutive_small_segment( |
494 | 15 | SegCompactionCandidatesSharedPtr& segments) {
495 | 15 | segments = std::make_shared<SegCompactionCandidates>();
496 | | // skip last (maybe active) segment |
497 | 15 | int32_t last_segment = _num_segment - 1;
498 | 15 | size_t task_bytes = 0;
499 | 15 | uint32_t task_rows = 0;
500 | 15 | int32_t segid;
// Stop at the last segment or once a full batch of candidates is collected.
501 | 15 | for (segid = _segcompacted_point;
502 | 86 | segid < last_segment && segments->size() < config::segcompaction_batch_size; segid++) {
503 | 77 | segment_v2::SegmentSharedPtr segment;
504 | 77 | RETURN_IF_ERROR(_load_noncompacted_segment(segment, segid));
505 | 77 | const auto segment_rows = segment->num_rows();
506 | 77 | const auto segment_bytes = segment->file_reader()->size();
// A segment is "large" when it exceeds either the row or the byte candidate limit.
507 | 77 | bool is_large_segment = segment_rows > config::segcompaction_candidate_max_rows ||
508 | 77 | segment_bytes > config::segcompaction_candidate_max_bytes;
509 | 77 | if (is_large_segment) {
510 | 14 | if (segid == _segcompacted_point) {
511 | | // skip large segments at the front |
// Capture the destination id before the rename advances `_num_segcompacted`.
512 | 8 | auto dst_seg_id = _num_segcompacted.load();
513 | 8 | RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
514 | 8 | if (_segcompaction_worker->need_convert_delete_bitmap()) {
515 | 4 | _segcompaction_worker->convert_segment_delete_bitmap(
516 | 4 | _context.mow_context->delete_bitmap, segid, dst_seg_id);
517 | 4 | }
518 | 8 | continue;
519 | 8 | } else {
520 | | // stop because we need consecutive segments |
521 | 6 | break;
522 | 6 | }
523 | 14 | }
// Adding this segment must not push the task over its row or byte budget.
524 | 63 | bool is_task_full = task_rows + segment_rows > config::segcompaction_task_max_rows ||
525 | 63 | task_bytes + segment_bytes > config::segcompaction_task_max_bytes;
526 | 63 | if (is_task_full) {
527 | 0 | break;
528 | 0 | }
529 | 63 | segments->push_back(segment);
530 | 63 | task_rows += segment->num_rows();
531 | 63 | task_bytes += segment->file_reader()->size();
532 | 63 | }
533 | 15 | size_t s = segments->size();
534 | 15 | if (segid == last_segment && s <= (config::segcompaction_batch_size / 2)) {
535 | | // we didn't collect enough segments, better to do it in next |
536 | | // round to compact more at once |
537 | 0 | segments->clear();
538 | 0 | return Status::OK();
539 | 0 | }
540 | 15 | if (s == 1) { // poor bachelor, let it go
541 | 4 | VLOG_DEBUG << "only one candidate segment";
// Capture both ids before the rename advances the two counters.
542 | 4 | auto src_seg_id = _segcompacted_point.load();
543 | 4 | auto dst_seg_id = _num_segcompacted.load();
544 | 4 | RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
545 | 4 | if (_segcompaction_worker->need_convert_delete_bitmap()) {
546 | 2 | _segcompaction_worker->convert_segment_delete_bitmap(
547 | 2 | _context.mow_context->delete_bitmap, src_seg_id, dst_seg_id);
548 | 2 | }
549 | 4 | segments->clear();
550 | 4 | return Status::OK();
551 | 4 | }
// Debug-only dump of the selected candidates.
552 | 11 | if (VLOG_DEBUG_IS_ON) {
553 | 0 | vlog_buffer.clear();
554 | 0 | for (auto& segment : (*segments.get())) {
555 | 0 | fmt::format_to(vlog_buffer, "[id:{} num_rows:{}]", segment->id(), segment->num_rows());
556 | 0 | }
557 | 0 | VLOG_DEBUG << "candidate segments num:" << s
558 | 0 | << " list of candidates:" << fmt::to_string(vlog_buffer);
559 | 0 | }
560 | 11 | return Status::OK();
561 | 15 | }
562 | | |
563 | 11 | Status BetaRowsetWriter::_rename_compacted_segments(int64_t begin, int64_t end) { |
564 | 11 | int ret; |
565 | 11 | auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, |
566 | 11 | _context.rowset_id, begin, end); |
567 | 11 | auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), |
568 | 11 | _num_segcompacted); |
569 | 11 | ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); |
570 | 11 | if (ret) { |
571 | 0 | return Status::Error<ROWSET_RENAME_FILE_FAILED>( |
572 | 0 | "failed to rename {} to {}. ret:{}, errno:{}", src_seg_path, dst_seg_path, ret, |
573 | 0 | errno); |
574 | 0 | } |
575 | 11 | RETURN_IF_ERROR(_remove_segment_footer_cache(_num_segcompacted, dst_seg_path)); |
576 | | |
577 | | // rename inverted index files |
578 | 11 | RETURN_IF_ERROR(_rename_compacted_indices(begin, end, 0)); |
579 | | |
580 | 11 | _num_segcompacted++; |
581 | 11 | return Status::OK(); |
582 | 11 | } |
583 | | |
584 | | void BetaRowsetWriter::_clear_statistics_for_deleting_segments_unsafe(uint32_t begin, |
585 | 43 | uint32_t end) { |
586 | 43 | VLOG_DEBUG << "_segid_statistics_map clear record segid range from:" << begin << " to:" << end; |
587 | 134 | for (uint32_t i = begin; i <= end; ++i) { |
588 | 91 | _segid_statistics_map.erase(i); |
589 | 91 | } |
590 | 43 | } |
591 | | |
592 | 42 | Status BetaRowsetWriter::_rename_compacted_segment_plain(uint32_t seg_id) { |
593 | 42 | if (seg_id == _num_segcompacted) { |
594 | 10 | ++_num_segcompacted; |
595 | 10 | return Status::OK(); |
596 | 10 | } |
597 | | |
598 | 32 | auto src_seg_path = |
599 | 32 | local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), seg_id); |
600 | 32 | auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), |
601 | 32 | _num_segcompacted); |
602 | 32 | VLOG_DEBUG << "segcompaction skip this segment. rename " << src_seg_path << " to " |
603 | 0 | << dst_seg_path; |
604 | 32 | { |
605 | 32 | std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); |
606 | 32 | DCHECK_EQ(_segid_statistics_map.find(seg_id) == _segid_statistics_map.end(), false); |
607 | 32 | DCHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(), |
608 | 32 | true); |
609 | 32 | auto org = _segid_statistics_map[seg_id]; |
610 | 32 | _segid_statistics_map.emplace(_num_segcompacted, org); |
611 | 32 | _clear_statistics_for_deleting_segments_unsafe(seg_id, seg_id); |
612 | 32 | } |
613 | 32 | int ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); |
614 | 32 | if (ret) { |
615 | 0 | return Status::Error<ROWSET_RENAME_FILE_FAILED>( |
616 | 0 | "failed to rename {} to {}. ret:{}, errno:{}", src_seg_path, dst_seg_path, ret, |
617 | 0 | errno); |
618 | 0 | } |
619 | | |
620 | 32 | RETURN_IF_ERROR(_remove_segment_footer_cache(_num_segcompacted, dst_seg_path)); |
621 | | // rename remaining inverted index files |
622 | 32 | RETURN_IF_ERROR(_rename_compacted_indices(-1, -1, seg_id)); |
623 | | |
624 | 32 | ++_num_segcompacted; |
625 | 32 | return Status::OK(); |
626 | 32 | } |
627 | | |
628 | | Status BetaRowsetWriter::_remove_segment_footer_cache(const uint32_t seg_id, |
629 | 43 | const std::string& segment_path) { |
630 | 43 | auto* footer_page_cache = ExecEnv::GetInstance()->get_storage_page_cache(); |
631 | 43 | if (!footer_page_cache) { |
632 | 0 | return Status::OK(); |
633 | 0 | } |
634 | | |
635 | 43 | auto fs = _rowset_meta->fs(); |
636 | 43 | bool exists = false; |
637 | 43 | RETURN_IF_ERROR(fs->exists(segment_path, &exists)); |
638 | 43 | if (exists) { |
639 | 43 | io::FileReaderSPtr file_reader; |
640 | 43 | io::FileReaderOptions reader_options { |
641 | 43 | .cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE |
642 | 43 | : io::FileCachePolicy::NO_CACHE, |
643 | 43 | .is_doris_table = true, |
644 | 43 | .cache_base_path = "", |
645 | 43 | .file_size = _rowset_meta->segment_file_size(static_cast<int>(seg_id)), |
646 | 43 | .tablet_id = _rowset_meta->tablet_id(), |
647 | 43 | }; |
648 | 43 | RETURN_IF_ERROR(fs->open_file(segment_path, &file_reader, &reader_options)); |
649 | 43 | DCHECK(file_reader != nullptr); |
650 | 43 | auto cache_key = segment_v2::Segment::get_segment_footer_cache_key(file_reader); |
651 | 43 | footer_page_cache->erase(cache_key, segment_v2::PageTypePB::INDEX_PAGE); |
652 | 43 | } |
653 | 43 | return Status::OK(); |
654 | 43 | } |
655 | | |
// Renames the inverted-index file(s) of a source segment so that they belong to the
// destination segment numbered `_num_segcompacted`, then evicts the related entries from
// InvertedIndexSearcherCache.
//
// Source selection: when `begin < 0` the source is the plain segment `seg_id`; otherwise it
// is the segcompacted segment identified by (`begin`, `end`).
// Storage format >= V2: one index file per segment is renamed, and only when the schema has
// an inverted index or an ANN index.
// Storage format V1: one index file per (column, index) pair is renamed inside the loop.
//
// Returns ROWSET_RENAME_FILE_FAILED / INVERTED_INDEX_RENAME_FILE_FAILED when rename() fails
// (the message carries rename()'s return value and errno), or the error returned by a cache
// erase; Status::OK() otherwise.
656 | 43 | Status BetaRowsetWriter::_rename_compacted_indices(int64_t begin, int64_t end, uint64_t seg_id) { |
657 | 43 | int ret; |
658 | | 
659 | 43 | auto src_seg_path = begin < 0 ? local_segment_path(_context.tablet_path, |
660 | 32 | _context.rowset_id.to_string(), seg_id) |
661 | 43 | : BetaRowset::local_segment_path_segcompacted( |
662 | 11 | _context.tablet_path, _context.rowset_id, begin, end); |
663 | 43 | auto src_index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(src_seg_path); |
664 | 43 | auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), |
665 | 43 | _num_segcompacted); |
666 | 43 | auto dst_index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(dst_seg_path); |
667 | | 
668 | 43 | if (_context.tablet_schema->get_inverted_index_storage_format() >= |
669 | 43 | InvertedIndexStorageFormatPB::V2) { |
670 | 22 | if (_context.tablet_schema->has_inverted_index() || |
671 | 22 | _context.tablet_schema->has_ann_index()) { |
672 | 22 | auto src_idx_path = |
673 | 22 | InvertedIndexDescriptor::get_index_file_path_v2(src_index_path_prefix); |
674 | 22 | auto dst_idx_path = |
675 | 22 | InvertedIndexDescriptor::get_index_file_path_v2(dst_index_path_prefix); |
676 | | 
677 | 22 | ret = rename(src_idx_path.c_str(), dst_idx_path.c_str()); |
678 | 22 | if (ret) { |
679 | 0 | return Status::Error<ROWSET_RENAME_FILE_FAILED>( |
680 | 0 | "failed to rename {} to {}. ret:{}, errno:{}", src_idx_path, dst_idx_path, |
681 | 0 | ret, errno); |
682 | 0 | } |
683 | 22 | } |
684 | 22 | } |
685 | | // rename remaining inverted index files |
// Bind each column by const reference: iterating by value copies the column handle on
// every iteration, which is unnecessary because the loop only reads through it.
686 | 150 | for (const auto& column : _context.tablet_schema->columns()) { |
687 | 150 | auto index_infos = _context.tablet_schema->inverted_indexs(*column); |
688 | 150 | for (const auto& index_info : index_infos) { |
689 | 44 | auto index_id = index_info->index_id(); |
690 | 44 | if (_context.tablet_schema->get_inverted_index_storage_format() == |
691 | 44 | InvertedIndexStorageFormatPB::V1) { |
692 | 0 | auto src_idx_path = InvertedIndexDescriptor::get_index_file_path_v1( |
693 | 0 | src_index_path_prefix, index_id, index_info->get_index_suffix()); |
694 | 0 | auto dst_idx_path = InvertedIndexDescriptor::get_index_file_path_v1( |
695 | 0 | dst_index_path_prefix, index_id, index_info->get_index_suffix()); |
696 | 0 | VLOG_DEBUG << "segcompaction skip this index. rename " << src_idx_path << " to " |
697 | 0 | << dst_idx_path; |
698 | 0 | ret = rename(src_idx_path.c_str(), dst_idx_path.c_str()); |
699 | 0 | if (ret) { |
700 | 0 | return Status::Error<INVERTED_INDEX_RENAME_FILE_FAILED>( |
701 | 0 | "failed to rename {} to {}. ret:{}, errno:{}", src_idx_path, |
702 | 0 | dst_idx_path, ret, errno); |
703 | 0 | } |
704 | 0 | } |
705 | | // Erase the origin index file cache |
// Both the source and the destination cache keys are erased, for every storage format.
706 | 44 | auto src_idx_cache_key = InvertedIndexDescriptor::get_index_file_cache_key( |
707 | 44 | src_index_path_prefix, index_id, index_info->get_index_suffix()); |
708 | 44 | auto dst_idx_cache_key = InvertedIndexDescriptor::get_index_file_cache_key( |
709 | 44 | dst_index_path_prefix, index_id, index_info->get_index_suffix()); |
710 | 44 | RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(src_idx_cache_key)); |
711 | 44 | RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(dst_idx_cache_key)); |
712 | 44 | } |
713 | 150 | } |
714 | 43 | return Status::OK(); |
715 | 43 | } |
716 | | |
// Decides whether a segment-compaction task should be submitted.
//
// Flow, as written below:
//  1. If segcompaction is disabled (by config or by the rowset context) or the schema has
//     variant columns, only the segment-count limit is checked against _num_segment.
//  2. _is_doing_segcompaction is atomically set to true; if it was already true the
//     function returns OK without doing anything else.
//  3. A failure recorded in _segcompaction_status becomes a SEGCOMPACTION_FAILED status;
//     otherwise the segment-count limit is checked against _num_segcompacted.
//  4. When at least config::segcompaction_batch_size segments lie past
//     _segcompacted_point, candidates are searched and, if any exist, submitted to the
//     engine. On a successful submit the function returns with the flag still set.
//  5. On every other path the flag is cleared under _is_doing_segcompaction_lock and all
//     waiters on _segcompacting_cond are notified.
//
// NOTE(review): presumably the submitted task clears _is_doing_segcompaction when it
// finishes; that code is not in this function -- confirm in the segcompaction worker.
717 | 1.20k | Status BetaRowsetWriter::_segcompaction_if_necessary() { |
718 | 1.20k | Status status = Status::OK(); |
719 | | // if not doing segcompaction, just check segment number |
720 | 1.20k | if (!config::enable_segcompaction || !_context.enable_segcompaction || |
721 | 1.20k | _context.tablet_schema->num_variant_columns() > 0) { |
722 | 1.08k | return _check_segment_number_limit(_num_segment); |
723 | 1.08k | } |
724 | | // leave _is_doing_segcompaction as the last condition |
725 | | // otherwise _segcompacting_cond will never get notified |
726 | 121 | if (_is_doing_segcompaction.exchange(true)) { |
727 | 0 | return status; |
728 | 0 | } |
729 | 121 | if (_segcompaction_status.load() != OK) { |
730 | 0 | status = Status::Error<SEGCOMPACTION_FAILED>( |
731 | 0 | "BetaRowsetWriter::_segcompaction_if_necessary meet invalid state, error code: {}", |
732 | 0 | _segcompaction_status.load()); |
733 | 121 | } else { |
734 | 121 | status = _check_segment_number_limit(_num_segcompacted); |
735 | 121 | } |
736 | 121 | if (status.ok() && (_num_segment - _segcompacted_point) >= config::segcompaction_batch_size) { |
737 | 15 | SegCompactionCandidatesSharedPtr segments; |
738 | 15 | status = _find_longest_consecutive_small_segment(segments); |
739 | 15 | if (LIKELY(status.ok()) && (!segments->empty())) { |
740 | 11 | LOG(INFO) << "submit segcompaction task, tablet_id:" << _context.tablet_id |
741 | 11 | << " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment |
742 | 11 | << ", segcompacted_point:" << _segcompacted_point; |
743 | 11 | status = _engine.submit_seg_compaction_task(_segcompaction_worker, segments); |
// Successful submit: return without clearing the flag (see the header note).
744 | 11 | if (status.ok()) { |
745 | 11 | return status; |
746 | 11 | } |
747 | 11 | } |
748 | 15 | } |
// Nothing was submitted (or an error occurred): release the flag and wake any waiter.
749 | 110 | { |
750 | 110 | std::lock_guard lk(_is_doing_segcompaction_lock); |
751 | 110 | _is_doing_segcompaction = false; |
752 | 110 | _segcompacting_cond.notify_all(); |
753 | 110 | } |
754 | 110 | return status; |
755 | 121 | } |
756 | | |
// Renames the segments that have not been segcompacted yet, i.e. ids in
// [_segcompacted_point, _num_segment), by calling _rename_compacted_segment_plain() on
// each of them. When the worker reports need_convert_delete_bitmap(), the delete bitmap
// entry of each renamed segment is converted from its old id to the destination id.
//
// Returns OK immediately when segcompaction is disabled by config, when the rowset was
// never segcompacted, or when every segment has already been compacted. Returns
// SEGCOMPACTION_FAILED when _segcompaction_status is not OK, or the first rename error.
// Must be called with no segcompaction in flight (checked by the DCHECK).
757 | 654 | Status BetaRowsetWriter::_segcompaction_rename_last_segments() { |
758 | 654 | DCHECK_EQ(_is_doing_segcompaction, false); |
759 | 654 | if (!config::enable_segcompaction) { |
760 | 144 | return Status::OK(); |
761 | 144 | } |
762 | 510 | if (_segcompaction_status.load() != OK) { |
763 | 0 | return Status::Error<SEGCOMPACTION_FAILED>( |
764 | 0 | "BetaRowsetWriter::_segcompaction_rename_last_segments meet invalid state, error " |
765 | 0 | "code: {}", |
766 | 0 | _segcompaction_status.load()); |
767 | 0 | } |
768 | 510 | if (!is_segcompacted() || _segcompacted_point == _num_segment) { |
769 | | // no need if never segcompact before or all segcompacted |
770 | 501 | return Status::OK(); |
771 | 501 | } |
772 | | // currently we only rename remaining segments to reduce wait time |
773 | | // so that transaction can be committed ASAP |
774 | 9 | VLOG_DEBUG << "segcompaction last few segments"; |
775 | 39 | for (int32_t segid = _segcompacted_point; segid < _num_segment; segid++) { |
// The destination id is read before the rename.
// NOTE(review): presumably _rename_compacted_segment_plain() advances
// _num_segcompacted, which is why it is sampled first -- confirm.
776 | 30 | auto dst_segid = _num_segcompacted.load(); |
777 | 30 | RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); |
778 | 30 | if (_segcompaction_worker->need_convert_delete_bitmap()) { |
779 | 16 | _segcompaction_worker->convert_segment_delete_bitmap( |
780 | 16 | _context.mow_context->delete_bitmap, segid, dst_segid); |
781 | 16 | } |
782 | 30 | } |
783 | 9 | return Status::OK(); |
784 | 9 | } |
785 | | |
// Adds an existing beta rowset to the rowset being written.
//
// The rowset's files are linked into this writer's tablet path under this writer's rowset
// id, and the writer's counters (_num_rows_written, _total_data_size, _total_index_size,
// _num_segment) are increased by the rowset's values. Segment key bounds and per-segment
// row counts are appended, the "key bounds truncated" flag and any delete predicate are
// copied from the rowset's meta.
//
// Returns the first error from link_files_to(), get_segments_key_bounds(), or a
// non-NOT_FOUND error from get_inverted_index_size(); Status::OK() otherwise.
786 | 9 | Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) { |
787 | 9 | assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET); |
788 | 9 | RETURN_IF_ERROR(rowset->link_files_to(_context.tablet_path, _context.rowset_id)); |
789 | 9 | _num_rows_written += rowset->num_rows(); |
790 | 9 | const auto& rowset_meta = rowset->rowset_meta(); |
791 | 9 | auto index_size = rowset_meta->index_disk_size(); |
792 | 9 | auto total_size = rowset_meta->total_disk_size(); |
793 | 9 | auto data_size = rowset_meta->data_disk_size(); |
794 | | // corrupted index size caused by bug before 2.1.5 or 3.0.0 version |
795 | | // try to get real index size from disk. |
// "Corrupted" here means negative or more than twice the total disk size.
796 | 9 | if (index_size < 0 || index_size > total_size * 2) { |
797 | 0 | LOG(ERROR) << "invalid index size:" << index_size << " total size:" << total_size |
798 | 0 | << " data size:" << data_size << " tablet:" << rowset_meta->tablet_id() |
799 | 0 | << " rowset:" << rowset_meta->rowset_id(); |
800 | 0 | index_size = 0; |
801 | 0 | auto st = rowset->get_inverted_index_size(&index_size); |
// A NOT_FOUND result is tolerated; any other failure aborts the add.
802 | 0 | if (!st.ok()) { |
803 | 0 | if (!st.is<NOT_FOUND>()) { |
804 | 0 | LOG(ERROR) << "failed to get inverted index size. res=" << st; |
805 | 0 | return st; |
806 | 0 | } |
807 | 0 | } |
808 | 0 | } |
809 | 9 | _total_data_size += data_size; |
810 | 9 | _total_index_size += index_size; |
811 | 9 | _num_segment += cast_set<int32_t>(rowset->num_segments()); |
812 | | // append key_bounds to current rowset |
813 | 9 | RETURN_IF_ERROR(rowset->get_segments_key_bounds(&_segments_encoded_key_bounds)); |
814 | 9 | rowset->get_num_segment_rows(&_segment_num_rows); |
815 | 9 | _segments_key_bounds_truncated = rowset->rowset_meta()->is_segments_key_bounds_truncated(); |
816 | | 
817 | | // TODO update zonemap |
818 | 9 | if (rowset->rowset_meta()->has_delete_predicate()) { |
819 | 0 | _rowset_meta->set_delete_predicate(rowset->rowset_meta()->delete_predicate()); |
820 | 0 | } |
821 | | // Update the tablet schema in the rowset metadata if the tablet schema contains a variant. |
822 | | // During the build process, _context.tablet_schema will be used as the rowset schema. |
823 | | // This situation may arise in the event of a linked schema change. If this schema is not set, |
824 | | // the subcolumns of the variant will be lost. |
825 | 9 | if (_context.tablet_schema->num_variant_columns() > 0 && rowset->tablet_schema() != nullptr) { |
826 | 0 | _context.tablet_schema = rowset->tablet_schema(); |
827 | 0 | } |
828 | 9 | return Status::OK(); |
829 | 9 | } |
830 | | |
// Entry point used for linked schema change. It currently forwards to add_rowset()
// unchanged and returns its status.
831 | 0 | Status BaseBetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) { |
832 | | // TODO use schema_mapping to transfer zonemap |
833 | 0 | return add_rowset(rowset); |
834 | 0 | } |
835 | | |
// Flushes buffered data by delegating to _segment_creator.flush() and returns its status.
836 | 1.15k | Status BaseBetaRowsetWriter::flush() { |
837 | 1.15k | return _segment_creator.flush(); |
838 | 1.15k | } |
839 | | |
// Writes `block` as the segment `segment_id` through _segment_creator.flush_single_block(),
// passing `flush_size` through to that call. A block with zero rows is a no-op that
// returns OK. The time spent in the flush is accumulated into _segment_writer_ns.
840 | 12 | Status BaseBetaRowsetWriter::flush_memtable(Block* block, int32_t segment_id, int64_t* flush_size) { |
841 | 12 | if (block->rows() == 0) { |
842 | 0 | return Status::OK(); |
843 | 0 | } |
844 | | 
845 | 12 | { |
846 | 12 | SCOPED_RAW_TIMER(&_segment_writer_ns); |
847 | 12 | RETURN_IF_ERROR(_segment_creator.flush_single_block(block, segment_id, flush_size)); |
848 | 12 | } |
849 | 12 | return Status::OK(); |
850 | 12 | } |
851 | | |
// Flushes one block by delegating to _segment_creator.flush_single_block(block) and
// returns its status.
852 | 4 | Status BaseBetaRowsetWriter::flush_single_block(const Block* block) { |
853 | 4 | return _segment_creator.flush_single_block(block); |
854 | 4 | } |
855 | | |
// Blocks the caller until _is_doing_segcompaction becomes false, waiting on
// _segcompacting_cond while holding _is_doing_segcompaction_lock. Logs the elapsed time
// when it is at least MICROS_PER_SEC microseconds.
// Returns SEGCOMPACTION_FAILED when _segcompaction_status is not OK after the wait,
// Status::OK() otherwise.
856 | 1.01k | Status BetaRowsetWriter::_wait_flying_segcompaction() { |
857 | 1.01k | std::unique_lock<std::mutex> l(_is_doing_segcompaction_lock); |
858 | 1.01k | uint64_t begin_wait = GetCurrentTimeMicros(); |
// Loop rather than a single wait: the flag is re-checked after every wake-up.
859 | 1.01k | while (_is_doing_segcompaction) { |
860 | | // change sync wait to async? |
861 | 0 | _segcompacting_cond.wait(l); |
862 | 0 | } |
863 | 1.01k | uint64_t elapsed = GetCurrentTimeMicros() - begin_wait; |
864 | 1.01k | if (elapsed >= MICROS_PER_SEC) { |
865 | 0 | LOG(INFO) << "wait flying segcompaction finish time:" << elapsed << "us"; |
866 | 0 | } |
867 | 1.01k | if (_segcompaction_status.load() != OK) { |
868 | 0 | return Status::Error<SEGCOMPACTION_FAILED>( |
869 | 0 | "BetaRowsetWriter meet invalid state, error code: {}", |
870 | 0 | _segcompaction_status.load()); |
871 | 0 | } |
872 | 1.01k | return Status::OK(); |
873 | 1.01k | } |
874 | | |
// Builds a rowset from _rowset_meta after applying fields from `spec_rowset_meta` via
// build_rowset_meta_with_spec_field().
// If the newest write timestamp is still -1 it is set to UnixSeconds() first.
// Returns the created rowset, or nullptr (after logging a warning) when
// RowsetFactory::create_rowset() fails. _already_built is set only on success.
875 | 24 | RowsetSharedPtr BaseBetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& spec_rowset_meta) { |
876 | 24 | if (_rowset_meta->newest_write_timestamp() == -1) { |
877 | 0 | _rowset_meta->set_newest_write_timestamp(UnixSeconds()); |
878 | 0 | } |
879 | | 
880 | 24 | build_rowset_meta_with_spec_field(*_rowset_meta, *spec_rowset_meta); |
881 | 24 | RowsetSharedPtr rowset; |
882 | 24 | auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, |
883 | 24 | _rowset_meta, &rowset); |
884 | 24 | if (!status.ok()) { |
885 | 0 | LOG(WARNING) << "rowset init failed when build new rowset, res=" << status; |
886 | 0 | return nullptr; |
887 | 0 | } |
888 | 24 | _already_built = true; |
889 | 24 | return rowset; |
890 | 24 | } |
891 | | |
// Closes the segment creator. A failure is logged as a warning and returned to the caller
// (via RETURN_NOT_OK_STATUS_WITH_WARN); otherwise returns Status::OK().
892 | 654 | Status BaseBetaRowsetWriter::_close_file_writers() { |
893 | | // Flush and close segment files |
894 | 654 | RETURN_NOT_OK_STATUS_WITH_WARN(_segment_creator.close(), |
895 | 654 | "failed to close segment creator when build new rowset"); |
896 | 654 | return Status::OK(); |
897 | 654 | } |
898 | | |
// Closes the base writer's files and then finishes segment compaction for this rowset.
//
// When _segment_start_id is 0 the following steps run in order:
//  1. Try to cancel the segcompaction worker. If cancel() returns true, the
//     "doing segcompaction" flag is cleared and waiters are notified; otherwise the call
//     waits for the in-flight segcompaction to finish.
//  2. Rename the remaining (not yet compacted) segments.
//  3. Close the worker's file writer if it exists and is not already CLOSED.
//  4. For a segcompacted rowset whose worker needs delete-bitmap conversion, replace this
//     rowset's entries in the MoW delete bitmap with the converted bitmap.
// Returns the first error encountered, Status::OK() otherwise.
899 | 654 | Status BetaRowsetWriter::_close_file_writers() { |
900 | 654 | RETURN_IF_ERROR(BaseBetaRowsetWriter::_close_file_writers()); |
901 | | // if _segment_start_id is not zero, that means it's a transient rowset writer for |
902 | | // MoW partial update, don't need to do segment compaction. |
903 | 654 | if (_segment_start_id == 0) { |
904 | 654 | if (_segcompaction_worker->cancel()) { |
905 | 654 | std::lock_guard lk(_is_doing_segcompaction_lock); |
906 | 654 | _is_doing_segcompaction = false; |
907 | 654 | _segcompacting_cond.notify_all(); |
908 | 654 | } else { |
909 | 0 | RETURN_NOT_OK_STATUS_WITH_WARN(_wait_flying_segcompaction(), |
910 | 0 | "segcompaction failed when build new rowset"); |
911 | 0 | } |
912 | 654 | RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_rename_last_segments(), |
913 | 654 | "rename last segments failed when build new rowset"); |
914 | | // segcompaction worker would do file writer's close function in compact_segments |
915 | 654 | if (auto& seg_comp_file_writer = _segcompaction_worker->get_file_writer(); |
916 | 654 | nullptr != seg_comp_file_writer && |
917 | 654 | seg_comp_file_writer->state() != io::FileWriter::State::CLOSED) { |
918 | 0 | RETURN_NOT_OK_STATUS_WITH_WARN(seg_comp_file_writer->close(), |
919 | 0 | "close segment compaction worker failed"); |
920 | 0 | } |
921 | | // process delete bitmap for mow table |
922 | 654 | if (is_segcompacted() && _segcompaction_worker->need_convert_delete_bitmap()) { |
923 | 4 | auto converted_delete_bitmap = _segcompaction_worker->get_converted_delete_bitmap(); |
924 | | // which means the segment compaction is triggered |
925 | 4 | if (converted_delete_bitmap != nullptr) { |
926 | 4 | RowsetIdUnorderedSet rowsetids; |
927 | 4 | rowsetids.insert(rowset_id()); |
928 | 4 | context().tablet->add_sentinel_mark_to_delete_bitmap(converted_delete_bitmap.get(), |
929 | 4 | rowsetids); |
// Drop every existing entry of this rowset, then merge in the converted bitmap.
930 | 4 | context().mow_context->delete_bitmap->remove({rowset_id(), 0, 0}, |
931 | 4 | {rowset_id(), UINT32_MAX, INT64_MAX}); |
932 | 4 | context().mow_context->delete_bitmap->merge(*converted_delete_bitmap); |
933 | 4 | } |
934 | 4 | } |
935 | 654 | } |
936 | 654 | return Status::OK(); |
937 | 654 | } |
938 | | |
// Finalizes the writer and produces the rowset in `rowset`.
//
// Steps: wait for the delete-bitmap calculation token (if any), close file writers, check
// the segment-count limit, fill _rowset_meta via _build_rowset_meta(..., true), set the
// rowset state (COMMITTED when _is_pending, VISIBLE otherwise), default the newest write
// timestamp to UnixSeconds() when it is -1, attach the tablet schema, optionally attach
// inverted-index file info, and create the rowset through RowsetFactory.
// Returns the first error encountered; on success sets _already_built and returns OK.
939 | 982 | Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) { |
940 | 982 | if (_calc_delete_bitmap_token != nullptr) { |
941 | 9 | RETURN_IF_ERROR(_calc_delete_bitmap_token->wait()); |
942 | 9 | } |
943 | 982 | RETURN_IF_ERROR(_close_file_writers()); |
// NOTE(review): the value checked is (remaining segments + compacted segments) + 1;
// confirm the extra 1 is intentional.
944 | 982 | const auto total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted; |
945 | 982 | RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(total_segment_num), |
946 | 982 | "too many segments when build new rowset"); |
947 | 982 | RETURN_IF_ERROR(_build_rowset_meta(_rowset_meta.get(), true)); |
948 | 982 | if (_is_pending) { |
949 | 30 | _rowset_meta->set_rowset_state(COMMITTED); |
950 | 952 | } else { |
951 | 952 | _rowset_meta->set_rowset_state(VISIBLE); |
952 | 952 | } |
953 | | 
954 | 982 | if (_rowset_meta->newest_write_timestamp() == -1) { |
955 | 633 | _rowset_meta->set_newest_write_timestamp(UnixSeconds()); |
956 | 633 | } |
957 | | 
958 | 982 | _rowset_meta->set_tablet_schema(_context.tablet_schema); |
959 | | 
960 | | // If segment compaction occurs, the idx file info will become inaccurate. |
// Hence the info is attached only when no segment has been segcompacted. Missing info
// is logged as an error but does not fail the build.
961 | 982 | if ((_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) && |
962 | 982 | _num_segcompacted == 0) { |
963 | 106 | if (auto idx_files_info = _idx_files.inverted_index_file_info(_segment_start_id); |
964 | 106 | !idx_files_info.has_value()) [[unlikely]] { |
965 | 0 | LOG(ERROR) << "expected inverted index files info, but none presents: " |
966 | 0 | << idx_files_info.error(); |
967 | 106 | } else { |
968 | 106 | _rowset_meta->add_inverted_index_files_info(idx_files_info.value()); |
969 | 106 | } |
970 | 106 | } |
971 | | 
972 | 982 | RETURN_NOT_OK_STATUS_WITH_WARN( |
973 | 982 | RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, _rowset_meta, |
974 | 982 | &rowset), |
975 | 982 | "rowset init failed when build new rowset"); |
976 | 982 | _already_built = true; |
977 | 982 | return Status::OK(); |
978 | 982 | } |
979 | | |
// Returns the number of segments of this writer, which for the base writer is _num_segment.
980 | 0 | int64_t BaseBetaRowsetWriter::_num_seg() const { |
981 | 0 | return _num_segment; |
982 | 0 | } |
983 | | |
// Returns _num_segcompacted when is_segcompacted() is true, otherwise _num_segment.
984 | 1.04k | int64_t BetaRowsetWriter::_num_seg() const { |
985 | 1.04k | return is_segcompacted() ? _num_segcompacted : _num_segment; |
986 | 1.04k | } |
987 | | |
// Fills `rowset_meta` with the statistics gathered by this writer.
//
// Row count, data size, index size, key bounds and per-segment row counts are summed or
// collected from _segid_statistics_map (under _segid_statistics_map_mutex) and combined
// with the writer-level totals (_num_rows_written, _total_data_size, _total_index_size,
// _segments_encoded_key_bounds).
//
// `check_segment_num`: when true and config::check_segment_when_build_rowset_meta is set,
// the number of key bounds and the number of per-segment row counts must both equal
// _num_seg(); a mismatch returns InternalError.
// Also returns any error from get_segment_num_rows(); Status::OK() otherwise.
988 | 1.04k | Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool check_segment_num) { |
989 | 1.04k | int64_t num_rows_written = 0; |
990 | 1.04k | int64_t total_data_size = 0; |
991 | 1.04k | int64_t total_index_size = 0; |
992 | 1.04k | std::vector<KeyBoundsPB> segments_encoded_key_bounds; |
993 | 1.04k | std::vector<uint32_t> segment_rows; |
994 | 1.04k | { |
995 | 1.04k | std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); |
996 | 1.50k | for (const auto& itr : _segid_statistics_map) { |
997 | 1.50k | num_rows_written += itr.second.row_num; |
998 | 1.50k | total_data_size += itr.second.data_size; |
999 | 1.50k | total_index_size += itr.second.index_size; |
1000 | 1.50k | segments_encoded_key_bounds.push_back(itr.second.key_bounds); |
1001 | | // segcompaction don't modify _segment_num_rows, so we need to get segment rows from _segid_statistics_map for load |
1002 | 1.50k | segment_rows.push_back(cast_set<uint32_t>(itr.second.row_num)); |
1003 | 1.50k | } |
1004 | 1.04k | } |
1005 | 1.04k | if (segment_rows.empty()) { |
1006 | | // vertical compaction and linked schema change will not record segment statistics, |
1007 | | // it will record segment rows in _segment_num_rows |
1008 | 424 | RETURN_IF_ERROR(get_segment_num_rows(&segment_rows)); |
1009 | 424 | } |
1010 | | 
// Key bounds held at writer level are appended after the per-segment ones.
1011 | 1.18k | for (auto& key_bound : _segments_encoded_key_bounds) { |
1012 | 1.18k | segments_encoded_key_bounds.push_back(key_bound); |
1013 | 1.18k | } |
1014 | 1.04k | if (_segments_key_bounds_truncated.has_value()) { |
1015 | 9 | rowset_meta->set_segments_key_bounds_truncated(_segments_key_bounds_truncated.value()); |
1016 | 9 | } |
1017 | 1.04k | rowset_meta->set_num_segment_rows(segment_rows); |
1018 | | // segment key bounds are empty in old version(before version 1.2.x). So we should not modify |
1019 | | // the overlap property when key bounds are empty. |
1020 | | // for mow table with cluster keys, the overlap is used for cluster keys, |
1021 | | // the key_bounds is primary keys |
1022 | 1.04k | if (!segments_encoded_key_bounds.empty() && |
1023 | 1.04k | !is_segment_overlapping(segments_encoded_key_bounds) && |
1024 | 1.04k | _context.tablet_schema->cluster_key_uids().empty()) { |
1025 | 559 | rowset_meta->set_segments_overlap(NONOVERLAPPING); |
1026 | 559 | } |
1027 | | 
1028 | 1.04k | auto segment_num = _num_seg(); |
1029 | 1.04k | if (check_segment_num && config::check_segment_when_build_rowset_meta) { |
1030 | 0 | auto segments_encoded_key_bounds_size = segments_encoded_key_bounds.size(); |
1031 | 0 | if (segments_encoded_key_bounds_size != segment_num) { |
1032 | 0 | return Status::InternalError( |
1033 | 0 | "segments_encoded_key_bounds_size should equal to _num_seg, " |
1034 | 0 | "segments_encoded_key_bounds_size " |
1035 | 0 | "is: {}, _num_seg is: {}", |
1036 | 0 | segments_encoded_key_bounds_size, segment_num); |
1037 | 0 | } |
1038 | 0 | if (segment_rows.size() != segment_num) { |
1039 | 0 | return Status::InternalError( |
1040 | 0 | "segment_rows size should equal to _num_seg, segment_rows size is: {}, " |
1041 | 0 | "_num_seg is {}, tablet={}, rowset={}, txn={}", |
1042 | 0 | segment_rows.size(), segment_num, _context.tablet_id, |
1043 | 0 | _context.rowset_id.to_string(), _context.txn_id); |
1044 | 0 | } |
1045 | 0 | } |
1046 | | 
1047 | 1.04k | rowset_meta->set_num_segments(segment_num); |
1048 | 1.04k | rowset_meta->set_num_rows(num_rows_written + _num_rows_written); |
// Total disk size is data size plus index size, each including the writer-level totals.
1049 | 1.04k | rowset_meta->set_total_disk_size(total_data_size + _total_data_size + total_index_size + |
1050 | 1.04k | _total_index_size); |
1051 | 1.04k | rowset_meta->set_data_disk_size(total_data_size + _total_data_size); |
1052 | 1.04k | rowset_meta->set_index_disk_size(total_index_size + _total_index_size); |
// Key bounds are aggregated only when the config enables it and the table is not
// unique-key merge-on-write.
1053 | 1.04k | bool aggregate_key_bounds = config::enable_aggregate_non_mow_key_bounds && |
1054 | 1.04k | !_context.enable_unique_key_merge_on_write; |
1055 | 1.04k | rowset_meta->set_segments_key_bounds(segments_encoded_key_bounds, aggregate_key_bounds); |
1056 | | // TODO write zonemap to meta |
1057 | 1.04k | rowset_meta->set_empty((num_rows_written + _num_rows_written) == 0); |
1058 | 1.04k | rowset_meta->set_creation_time(time(nullptr)); |
1059 | 1.04k | return Status::OK(); |
1060 | 1.04k | } |
1061 | | |
// Builds a temporary rowset into `rowset_ptr` without touching _rowset_meta: a fresh
// RowsetMeta is initialized from _rowset_meta, filled by _build_rowset_meta() (called
// without the check_segment_num argument), and handed to RowsetFactory::create_rowset().
// Failures are logged as warnings and returned. The debug point
// "BaseBetaRowsetWriter::_build_tmp.create_rowset_failed" forces a create failure.
1062 | 63 | Status BaseBetaRowsetWriter::_build_tmp(RowsetSharedPtr& rowset_ptr) { |
1063 | 63 | Status status; |
1064 | 63 | std::shared_ptr<RowsetMeta> tmp_rs_meta = std::make_shared<RowsetMeta>(); |
1065 | 63 | tmp_rs_meta->init(_rowset_meta.get()); |
1066 | | 
1067 | 63 | status = _build_rowset_meta(tmp_rs_meta.get()); |
1068 | 63 | if (!status.ok()) { |
1069 | 0 | LOG(WARNING) << "failed to build rowset meta, res=" << status; |
1070 | 0 | return status; |
1071 | 0 | } |
1072 | | 
1073 | 63 | status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, tmp_rs_meta, |
1074 | 63 | &rowset_ptr); |
1075 | 63 | DBUG_EXECUTE_IF("BaseBetaRowsetWriter::_build_tmp.create_rowset_failed", |
1076 | 63 | { status = Status::InternalError("create rowset failed"); }); |
1077 | 63 | if (!status.ok()) { |
1078 | 0 | LOG(WARNING) << "rowset init failed when build new rowset, res=" << status; |
1079 | 0 | return status; |
1080 | 0 | } |
1081 | 63 | return Status::OK(); |
1082 | 63 | } |
1083 | | |
// Creates a file writer for `path` on the context's file system, using the writer options
// the context returns for `is_index_file`. The result is stored in `file_writer`.
// On failure a warning with the path is logged and the error is returned.
1084 | | Status BaseBetaRowsetWriter::_create_file_writer(const std::string& path, |
1085 | | io::FileWriterPtr& file_writer, |
1086 | 2.68k | bool is_index_file) { |
1087 | 2.68k | io::FileWriterOptions opts = _context.get_file_writer_options(is_index_file); |
1088 | 2.68k | Status st = _context.fs()->create_file(path, &file_writer, &opts); |
1089 | 2.68k | if (!st.ok()) { |
1090 | 0 | LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st; |
1091 | 0 | return st; |
1092 | 0 | } |
1093 | | 
1094 | 2.68k | DCHECK(file_writer != nullptr); |
1095 | 2.68k | return Status::OK(); |
1096 | 2.68k | } |
1097 | | |
// Creates the file writer for segment `segment_id` according to `file_type`:
//  - INVERTED_INDEX_FILE: the V2 index file path derived from the segment path.
//  - SEGMENT_FILE: the segment path itself.
// Any other file type returns INTERNAL_ERROR.
1098 | | Status BaseBetaRowsetWriter::create_file_writer(uint32_t segment_id, io::FileWriterPtr& file_writer, |
1099 | 2.66k | FileType file_type) { |
1100 | 2.66k | auto segment_path = _context.segment_path(segment_id); |
1101 | 2.66k | if (file_type == FileType::INVERTED_INDEX_FILE) { |
1102 | 255 | std::string prefix = |
1103 | 255 | std::string {InvertedIndexDescriptor::get_index_file_path_prefix(segment_path)}; |
1104 | 255 | std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix); |
1105 | 255 | return _create_file_writer(index_path, file_writer, true /* is_index_file */); |
1106 | 2.41k | } else if (file_type == FileType::SEGMENT_FILE) { |
1107 | 2.41k | return _create_file_writer(segment_path, file_writer, false /* is_index_file */); |
1108 | 2.41k | } |
1109 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
1110 | 0 | fmt::format("failed to create file = {}, file type = {}", segment_path, file_type)); |
1111 | 2.66k | } |
1112 | | |
// Creates the index file writer for `segment_id` through the RowsetWriter implementation,
// then sets on it the file-writer options the context returns for index files.
// Returns the error from RowsetWriter::create_index_file_writer(), or Status::OK().
1113 | | Status BaseBetaRowsetWriter::create_index_file_writer(uint32_t segment_id, |
1114 | 228 | IndexFileWriterPtr* index_file_writer) { |
1115 | 228 | RETURN_IF_ERROR(RowsetWriter::create_index_file_writer(segment_id, index_file_writer)); |
1116 | | // used for inverted index format v1 |
1117 | 228 | (*index_file_writer) |
1118 | 228 | ->set_file_writer_opts(_context.get_file_writer_options(true /* is_index_file */)); |
1119 | 228 | return Status::OK(); |
1120 | 228 | } |
1121 | | |
// Creates the SegmentWriter used by segment compaction for the segcompacted segment
// identified by (`begin`, `end`) and stores it in `*writer`.
//
// A data file writer is created for the segcompacted segment path. When the schema has an
// inverted or ANN index, an IndexFileWriter is created as well; a dedicated index file
// writer is opened only when the storage format is not V1.
// The SegmentWriter is numbered `_num_segcompacted` and always uses
// DataWriteType::TYPE_COMPACTION.
//
// Ownership of the new file writer and index file writer is handed to
// _segcompaction_worker. Before that, a previous worker file writer that is not CLOSED is
// closed, and a previous index file writer is closed via begin_close()/finish_close().
// Returns the first error encountered, Status::OK() otherwise.
1122 | | Status BetaRowsetWriter::create_segment_writer_for_segcompaction( |
1123 | 12 | std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, int64_t end) { |
1124 | 12 | DCHECK(begin >= 0 && end >= 0); |
1125 | 12 | std::string path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, |
1126 | 12 | _context.rowset_id, begin, end); |
1127 | 12 | io::FileWriterPtr file_writer; |
1128 | 12 | RETURN_IF_ERROR(_create_file_writer(path, file_writer, false /* is_index_file */)); |
1129 | | 
1130 | 12 | IndexFileWriterPtr index_file_writer; |
1131 | 12 | if (_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) { |
1132 | 8 | io::FileWriterPtr idx_file_writer; |
1133 | 8 | std::string prefix(InvertedIndexDescriptor::get_index_file_path_prefix(path)); |
1134 | 8 | if (_context.tablet_schema->get_inverted_index_storage_format() != |
1135 | 8 | InvertedIndexStorageFormatPB::V1) { |
1136 | 8 | std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix); |
1137 | 8 | RETURN_IF_ERROR( |
1138 | 8 | _create_file_writer(index_path, idx_file_writer, true /* is_index_file */)); |
1139 | 8 | } |
1140 | 8 | index_file_writer = std::make_unique<IndexFileWriter>( |
1141 | 8 | _context.fs(), prefix, _context.rowset_id.to_string(), _num_segcompacted, |
1142 | 8 | _context.tablet_schema->get_inverted_index_storage_format(), |
1143 | 8 | std::move(idx_file_writer), true /* can_use_ram_dir */, _context.tablet_id); |
1144 | 8 | } |
1145 | | 
1146 | 12 | segment_v2::SegmentWriterOptions writer_options; |
1147 | 12 | writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write; |
1148 | 12 | writer_options.rowset_ctx = &_context; |
// The write type is always TYPE_COMPACTION here. The former assignment from
// _context.write_type was a dead store (overwritten by this line) and has been removed.
1150 | 12 | writer_options.write_type = DataWriteType::TYPE_COMPACTION; |
1151 | 12 | writer_options.max_rows_per_segment = _context.max_rows_per_segment; |
1152 | 12 | writer_options.mow_ctx = _context.mow_context; |
1153 | | 
1154 | 12 | *writer = std::make_unique<segment_v2::SegmentWriter>( |
1155 | 12 | file_writer.get(), _num_segcompacted, _context.tablet_schema, _context.tablet, |
1156 | 12 | _context.data_dir, writer_options, index_file_writer.get()); |
// Close the worker's previous writers (if still open) before replacing them.
1157 | 12 | if (auto& seg_writer = _segcompaction_worker->get_file_writer(); |
1158 | 12 | seg_writer != nullptr && seg_writer->state() != io::FileWriter::State::CLOSED) { |
1159 | 0 | RETURN_IF_ERROR(_segcompaction_worker->get_file_writer()->close()); |
1160 | 0 | } |
1161 | 12 | _segcompaction_worker->get_file_writer().reset(file_writer.release()); |
1162 | 12 | if (auto& idx_file_writer = _segcompaction_worker->get_inverted_index_file_writer(); |
1163 | 12 | idx_file_writer != nullptr) { |
1164 | 0 | RETURN_IF_ERROR(idx_file_writer->begin_close()); |
1165 | 0 | RETURN_IF_ERROR(idx_file_writer->finish_close()); |
1166 | 0 | } |
1167 | 12 | _segcompaction_worker->get_inverted_index_file_writer().reset(index_file_writer.release()); |
1168 | 12 | return Status::OK(); |
1169 | 12 | } |
1170 | | |
// Returns TOO_MANY_SEGMENTS when `segnum` is greater than
// config::max_segment_num_per_rowset, Status::OK() otherwise.
// The debug point "BetaRowsetWriter._check_segment_number_limit_too_many_segments"
// replaces `segnum` with the debug point's "segnum" parameter.
// Note: the error message reports _num_segment, not the `segnum` argument.
1171 | 0 | Status BaseBetaRowsetWriter::_check_segment_number_limit(size_t segnum) { |
1172 | 0 | DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments", |
1173 | 0 | { segnum = dp->param("segnum", 1024); }); |
1174 | 0 | if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) { |
1175 | 0 | return Status::Error<TOO_MANY_SEGMENTS>( |
1176 | 0 | "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, " |
1177 | 0 | "_num_segment:{}, rowset_num_rows:{}. Please check if the bucket number is too " |
1178 | 0 | "small or if the data is skewed.", |
1179 | 0 | _context.tablet_id, _context.rowset_id.to_string(), |
1180 | 0 | config::max_segment_num_per_rowset, _num_segment, get_rowset_num_rows()); |
1181 | 0 | } |
1182 | 0 | return Status::OK(); |
1183 | 0 | } |
1184 | | |
// Returns TOO_MANY_SEGMENTS when `segnum` is greater than
// config::max_segment_num_per_rowset, Status::OK() otherwise.
// Same check as the BaseBetaRowsetWriter version; the error message additionally reports
// _segcompacted_point and _num_segcompacted.
// The debug point "BetaRowsetWriter._check_segment_number_limit_too_many_segments"
// replaces `segnum` with the debug point's "segnum" parameter.
1185 | 2.19k | Status BetaRowsetWriter::_check_segment_number_limit(size_t segnum) { |
1186 | 2.19k | DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments", |
1187 | 2.19k | { segnum = dp->param("segnum", 1024); }); |
1188 | 2.19k | if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) { |
1189 | 0 | return Status::Error<TOO_MANY_SEGMENTS>( |
1190 | 0 | "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, _num_segment:{}, " |
1191 | 0 | "_segcompacted_point:{}, _num_segcompacted:{}, rowset_num_rows:{}. Please check if " |
1192 | 0 | "the bucket number is too small or if the data is skewed.", |
1193 | 0 | _context.tablet_id, _context.rowset_id.to_string(), |
1194 | 0 | config::max_segment_num_per_rowset, _num_segment, _segcompacted_point, |
1195 | 0 | _num_segcompacted, get_rowset_num_rows()); |
1196 | 0 | } |
1197 | 2.19k | return Status::OK(); |
1198 | 2.19k | } |
1199 | | |
// Registers a newly written segment and its statistics.
//
// Under _segid_statistics_map_mutex: records `segstat` in _segid_statistics_map keyed by
// `segment_id` (a duplicate id trips the CHECK) and stores the row count in
// _segment_num_rows at the offset `segment_id - _segment_start_id`.
// Under _segment_set_mutex: adds the offset to _segment_set and advances _num_segment
// while _segment_set contains it.
// When a MoW context is present, the delete bitmap for the segment is generated; its
// error, if any, is returned. Returns Status::OK() otherwise.
1200 | 1.20k | Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) { |
1201 | 1.20k | uint32_t segid_offset = segment_id - _segment_start_id; |
1202 | 1.20k | { |
1203 | 1.20k | std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); |
1204 | 1.20k | CHECK_EQ(_segid_statistics_map.find(segment_id) == _segid_statistics_map.end(), true); |
1205 | 1.20k | _segid_statistics_map.emplace(segment_id, segstat); |
// NOTE(review): the resize is driven by segment_id while the store below indexes by
// segid_offset; with a non-zero _segment_start_id the vector grows larger than the
// index requires. Confirm this is intended.
1206 | 1.20k | if (segment_id >= _segment_num_rows.size()) { |
1207 | 1.20k | _segment_num_rows.resize(segment_id + 1); |
1208 | 1.20k | } |
1209 | 1.20k | _segment_num_rows[segid_offset] = cast_set<uint32_t>(segstat.row_num); |
1210 | 1.20k | } |
1211 | 1.20k | VLOG_DEBUG << "_segid_statistics_map add new record. segment_id:" << segment_id |
1212 | 0 | << " row_num:" << segstat.row_num << " data_size:" << segstat.data_size |
1213 | 0 | << " index_size:" << segstat.index_size; |
1214 | | 
1215 | 1.20k | { |
1216 | 1.20k | std::lock_guard<std::mutex> lock(_segment_set_mutex); |
1217 | 1.20k | _segment_set.add(segid_offset); |
// _num_segment only advances over offsets that are already present in _segment_set.
1218 | 2.41k | while (_segment_set.contains(_num_segment)) { |
1219 | 1.20k | _num_segment++; |
1220 | 1.20k | } |
1221 | 1.20k | } |
1222 | | 
1223 | 1.20k | if (_context.mow_context != nullptr) { |
1224 | 63 | RETURN_IF_ERROR(_generate_delete_bitmap(segment_id)); |
1225 | 63 | } |
1226 | 1.20k | return Status::OK(); |
1227 | 1.20k | } |
1228 | | |
// Registers the segment through BaseBetaRowsetWriter::add_segment() and then runs
// _segcompaction_if_necessary(), returning its status.
1229 | 1.20k | Status BetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) { |
1230 | 1.20k | RETURN_IF_ERROR(BaseBetaRowsetWriter::add_segment(segment_id, segstat)); |
1231 | 1.20k | return _segcompaction_if_necessary(); |
1232 | 1.20k | } |
1233 | | |
// Finalizes the segcompaction segment writer `*writer`, records its statistics, and
// destroys the writer.
//
// The footer is finalized (yielding the segment size), the inverted index is closed
// (yielding the index file size), and a SegmentStatistics entry with row count, data
// size, index size and `key_bounds` is inserted into _segid_statistics_map under
// _segid_statistics_map_mutex; a duplicate segment id trips the CHECK.
// Returns WRITER_DATA_WRITE_ERROR when finalize_footer() fails, the error from
// close_inverted_index(), or Status::OK().
// Note: the `index_size` parameter is not read in this function; the recorded index size
// is the value produced by close_inverted_index().
1234 | | Status BetaRowsetWriter::flush_segment_writer_for_segcompaction( |
1235 | | std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t index_size, |
1236 | 11 | KeyBoundsPB& key_bounds) { |
1237 | 11 | uint32_t segid = (*writer)->get_segment_id(); |
1238 | 11 | uint32_t row_num = (*writer)->row_count(); |
1239 | 11 | uint64_t segment_size; |
1240 | | 
1241 | 11 | auto s = (*writer)->finalize_footer(&segment_size); |
1242 | 11 | if (!s.ok()) { |
1243 | 0 | return Status::Error<WRITER_DATA_WRITE_ERROR>("failed to finalize segment: {}", |
1244 | 0 | s.to_string()); |
1245 | 0 | } |
1246 | 11 | int64_t inverted_index_file_size = 0; |
1247 | 11 | RETURN_IF_ERROR((*writer)->close_inverted_index(&inverted_index_file_size)); |
1248 | | 
1249 | 11 | SegmentStatistics segstat; |
1250 | 11 | segstat.row_num = row_num; |
1251 | 11 | segstat.data_size = segment_size; |
1252 | 11 | segstat.index_size = inverted_index_file_size; |
1253 | 11 | segstat.key_bounds = key_bounds; |
1254 | 11 | { |
1255 | 11 | std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); |
1256 | 11 | CHECK_EQ(_segid_statistics_map.find(segid) == _segid_statistics_map.end(), true); |
1257 | 11 | _segid_statistics_map.emplace(segid, segstat); |
1258 | 11 | } |
1259 | 11 | VLOG_DEBUG << "_segid_statistics_map add new record. segid:" << segid << " row_num:" << row_num |
1260 | 0 | << " data_size:" << PrettyPrinter::print_bytes(segment_size) |
1261 | 0 | << " index_size:" << PrettyPrinter::print_bytes(inverted_index_file_size); |
1262 | | 
// Release the segment writer now that its statistics have been recorded.
1263 | 11 | writer->reset(); |
1264 | | 
1265 | 11 | return Status::OK(); |
1266 | 11 | } |
1267 | | |
1268 | | } // namespace doris |