be/src/storage/rowset/beta_rowset_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/rowset/beta_rowset_writer.h" |
19 | | |
20 | | #include <assert.h> |
21 | | // IWYU pragma: no_include <bthread/errno.h> |
22 | | #include <errno.h> // IWYU pragma: keep |
23 | | #include <fmt/format.h> |
24 | | #include <stdio.h> |
25 | | |
26 | | #include <chrono> |
27 | | #include <ctime> // time |
28 | | #include <filesystem> |
29 | | #include <memory> |
30 | | #include <mutex> |
31 | | #include <sstream> |
32 | | #include <thread> |
33 | | #include <utility> |
34 | | #include <vector> |
35 | | |
36 | | // IWYU pragma: no_include <opentelemetry/common/threadlocal.h> |
37 | | #include "common/cast_set.h" |
38 | | #include "common/compiler_util.h" // IWYU pragma: keep |
39 | | #include "common/config.h" |
40 | | #include "common/logging.h" |
41 | | #include "common/status.h" |
42 | | #include "core/block/block.h" |
43 | | #include "core/column/column.h" |
44 | | #include "core/data_type/data_type_factory.hpp" |
45 | | #include "io/fs/file_reader.h" |
46 | | #include "io/fs/file_system.h" |
47 | | #include "io/fs/file_writer.h" |
48 | | #include "runtime/thread_context.h" |
49 | | #include "storage/index/inverted/inverted_index_cache.h" |
50 | | #include "storage/index/inverted/inverted_index_desc.h" |
51 | | #include "storage/olap_define.h" |
52 | | #include "storage/rowset/beta_rowset.h" |
53 | | #include "storage/rowset/rowset_factory.h" |
54 | | #include "storage/rowset/rowset_writer.h" |
55 | | #include "storage/rowset/segcompaction.h" |
56 | | #include "storage/schema_change/schema_change.h" |
57 | | #include "storage/segment/segment.h" |
58 | | #include "storage/segment/segment_writer.h" |
59 | | #include "storage/storage_engine.h" |
60 | | #include "storage/tablet/tablet_schema.h" |
61 | | #include "util/debug_points.h" |
62 | | #include "util/pretty_printer.h" |
63 | | #include "util/slice.h" |
64 | | #include "util/stopwatch.hpp" |
65 | | #include "util/time.h" |
66 | | |
67 | | namespace doris { |
68 | | using namespace ErrorCode; |
69 | | |
70 | | namespace { |
71 | | |
72 | 701 | bool is_segment_overlapping(const std::vector<KeyBoundsPB>& segments_encoded_key_bounds) { |
73 | 701 | std::string_view last; |
74 | 1.85k | for (auto&& segment_encode_key : segments_encoded_key_bounds) { |
75 | 1.85k | auto&& cur_min = segment_encode_key.min_key(); |
76 | 1.85k | auto&& cur_max = segment_encode_key.max_key(); |
77 | 1.85k | if (cur_min <= last) { |
78 | 143 | return true; |
79 | 143 | } |
80 | 1.70k | last = cur_max; |
81 | 1.70k | } |
82 | 558 | return false; |
83 | 701 | } |
84 | | |
85 | | void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta, |
86 | 24 | const RowsetMeta& spec_rowset_meta) { |
87 | 24 | rowset_meta.set_num_rows(spec_rowset_meta.num_rows()); |
88 | 24 | rowset_meta.set_total_disk_size(spec_rowset_meta.total_disk_size()); |
89 | 24 | rowset_meta.set_data_disk_size(spec_rowset_meta.data_disk_size()); |
90 | 24 | rowset_meta.set_index_disk_size(spec_rowset_meta.index_disk_size()); |
91 | | // TODO write zonemap to meta |
92 | 24 | rowset_meta.set_empty(spec_rowset_meta.num_rows() == 0); |
93 | 24 | rowset_meta.set_creation_time(time(nullptr)); |
94 | 24 | rowset_meta.set_num_segments(spec_rowset_meta.num_segments()); |
95 | 24 | rowset_meta.set_segments_overlap(spec_rowset_meta.segments_overlap()); |
96 | 24 | rowset_meta.set_rowset_state(spec_rowset_meta.rowset_state()); |
97 | 24 | rowset_meta.set_segments_key_bounds_truncated( |
98 | 24 | spec_rowset_meta.is_segments_key_bounds_truncated()); |
99 | 24 | std::vector<KeyBoundsPB> segments_key_bounds; |
100 | 24 | spec_rowset_meta.get_segments_key_bounds(&segments_key_bounds); |
101 | | // Preserve source layout: if source was aggregated (size 1), re-aggregating |
102 | | // the single entry is a no-op that also keeps the flag consistent. |
103 | 24 | rowset_meta.set_segments_key_bounds(segments_key_bounds, |
104 | 24 | spec_rowset_meta.is_segments_key_bounds_aggregated()); |
105 | 24 | std::vector<uint32_t> num_segment_rows; |
106 | 24 | spec_rowset_meta.get_num_segment_rows(&num_segment_rows); |
107 | 24 | rowset_meta.set_num_segment_rows(num_segment_rows); |
108 | 24 | if (spec_rowset_meta.is_row_binlog()) { |
109 | 0 | rowset_meta.mark_row_binlog(); |
110 | 0 | } |
111 | 24 | } |
112 | | |
113 | | } // namespace |
114 | | |
115 | 1.00k | SegmentFileCollection::~SegmentFileCollection() = default; |
116 | | |
117 | 2.38k | Status SegmentFileCollection::add(int seg_id, io::FileWriterPtr&& writer) { |
118 | 2.38k | std::lock_guard lock(_lock); |
119 | 2.38k | if (_closed) [[unlikely]] { |
120 | 0 | DCHECK(false) << writer->path(); |
121 | 0 | return Status::InternalError("add to closed SegmentFileCollection"); |
122 | 0 | } |
123 | | |
124 | 2.38k | _file_writers.emplace(seg_id, std::move(writer)); |
125 | 2.38k | return Status::OK(); |
126 | 2.38k | } |
127 | | |
128 | 63 | io::FileWriter* SegmentFileCollection::get(int seg_id) const { |
129 | 63 | std::lock_guard lock(_lock); |
130 | 63 | if (auto it = _file_writers.find(seg_id); it != _file_writers.end()) { |
131 | 63 | return it->second.get(); |
132 | 63 | } else { |
133 | 0 | return nullptr; |
134 | 0 | } |
135 | 63 | } |
136 | | |
// Seals the collection: marks it closed under the lock, then (outside the
// lock) closes every writer that has not already reached CLOSED state.
// Double-close is a programming error. Returns the first close() failure.
Status SegmentFileCollection::close() {
    {
        std::lock_guard lock(_lock);
        if (_closed) [[unlikely]] {
            DCHECK(false);
            return Status::InternalError("double close SegmentFileCollection");
        }
        _closed = true;
    }

    // Iterating without the lock: once _closed is set, add() refuses any
    // further mutation of _file_writers.
    for (auto&& [_, writer] : _file_writers) {
        // Debug-only hook: poll up to ~30s (3000 * 10ms) for the writer to be
        // closed elsewhere, then log the observed state transition.
        DBUG_EXECUTE_IF("SegmentFileCollection.close.wait_dat_closed", {
            auto before_state = writer->state();
            for (int i = 0; i < 3000 && writer->state() != io::FileWriter::State::CLOSED; ++i) {
                std::this_thread::sleep_for(std::chrono::milliseconds(10));
            }
            LOG(INFO) << "SegmentFileCollection.close.wait_dat_closed path="
                      << writer->path().native()
                      << " before_state=" << static_cast<int>(before_state)
                      << " after_state=" << static_cast<int>(writer->state());
        });
        if (writer->state() != io::FileWriter::State::CLOSED) {
            RETURN_IF_ERROR(writer->close());
        }
    }

    return Status::OK();
}
165 | | |
166 | 0 | Result<std::vector<size_t>> SegmentFileCollection::segments_file_size(int seg_id_offset) { |
167 | 0 | std::lock_guard lock(_lock); |
168 | 0 | if (!_closed) [[unlikely]] { |
169 | 0 | DCHECK(false); |
170 | 0 | return ResultError(Status::InternalError("get segments file size without closed")); |
171 | 0 | } |
172 | | |
173 | 0 | Status st; |
174 | 0 | std::vector<size_t> seg_file_size(_file_writers.size(), 0); |
175 | 0 | bool succ = std::all_of(_file_writers.begin(), _file_writers.end(), [&](auto&& it) { |
176 | 0 | auto&& [seg_id, writer] = it; |
177 | |
|
178 | 0 | int idx = seg_id - seg_id_offset; |
179 | 0 | if (idx >= seg_file_size.size()) [[unlikely]] { |
180 | 0 | auto err_msg = fmt::format( |
181 | 0 | "invalid seg_id={} num_file_writers={} seg_id_offset={} path={}", seg_id, |
182 | 0 | seg_file_size.size(), seg_id_offset, writer->path().native()); |
183 | 0 | DCHECK(false) << err_msg; |
184 | 0 | st = Status::InternalError(err_msg); |
185 | 0 | return false; |
186 | 0 | } |
187 | | |
188 | 0 | auto& fsize = seg_file_size[idx]; |
189 | 0 | if (fsize != 0) { |
190 | | // File size should not been set |
191 | 0 | auto err_msg = |
192 | 0 | fmt::format("duplicate seg_id={} path={}", seg_id, writer->path().native()); |
193 | 0 | DCHECK(false) << err_msg; |
194 | 0 | st = Status::InternalError(err_msg); |
195 | 0 | return false; |
196 | 0 | } |
197 | | |
198 | 0 | fsize = writer->bytes_appended(); |
199 | 0 | if (fsize <= 0) { |
200 | 0 | auto err_msg = |
201 | 0 | fmt::format("invalid segment fsize={} path={}", fsize, writer->path().native()); |
202 | 0 | DCHECK(false) << err_msg; |
203 | 0 | st = Status::InternalError(err_msg); |
204 | 0 | return false; |
205 | 0 | } |
206 | | |
207 | 0 | return true; |
208 | 0 | }); |
209 | |
|
210 | 0 | if (succ) { |
211 | 0 | return seg_file_size; |
212 | 0 | } |
213 | | |
214 | 0 | return ResultError(st); |
215 | 0 | } |
216 | | |
217 | 1.00k | InvertedIndexFileCollection::~InvertedIndexFileCollection() = default; |
218 | | |
219 | 265 | Status InvertedIndexFileCollection::add(int seg_id, IndexFileWriterPtr&& index_writer) { |
220 | 265 | std::lock_guard lock(_lock); |
221 | 265 | if (_inverted_index_file_writers.find(seg_id) != _inverted_index_file_writers.end()) |
222 | 0 | [[unlikely]] { |
223 | 0 | DCHECK(false); |
224 | 0 | return Status::InternalError("The seg_id already exists, seg_id is {}", seg_id); |
225 | 0 | } |
226 | 265 | _inverted_index_file_writers.emplace(seg_id, std::move(index_writer)); |
227 | 265 | return Status::OK(); |
228 | 265 | } |
229 | | |
230 | 328 | Status InvertedIndexFileCollection::begin_close() { |
231 | 328 | std::lock_guard lock(_lock); |
232 | 328 | for (auto&& [id, writer] : _inverted_index_file_writers) { |
233 | 36 | RETURN_IF_ERROR(writer->begin_close()); |
234 | 36 | _total_size += writer->get_index_file_total_size(); |
235 | 36 | } |
236 | | |
237 | 328 | return Status::OK(); |
238 | 328 | } |
239 | | |
240 | 969 | Status InvertedIndexFileCollection::finish_close() { |
241 | 969 | std::lock_guard lock(_lock); |
242 | 969 | for (auto&& [id, writer] : _inverted_index_file_writers) { |
243 | 264 | RETURN_IF_ERROR(writer->finish_close()); |
244 | 264 | } |
245 | 969 | return Status::OK(); |
246 | 969 | } |
247 | | |
248 | | Result<std::vector<const InvertedIndexFileInfo*>> |
249 | 106 | InvertedIndexFileCollection::inverted_index_file_info(int seg_id_offset) { |
250 | 106 | std::lock_guard lock(_lock); |
251 | | |
252 | 106 | Status st; |
253 | 106 | std::vector<const InvertedIndexFileInfo*> idx_file_info(_inverted_index_file_writers.size()); |
254 | 106 | bool succ = std::all_of( |
255 | 106 | _inverted_index_file_writers.begin(), _inverted_index_file_writers.end(), |
256 | 215 | [&](auto&& it) { |
257 | 215 | auto&& [seg_id, writer] = it; |
258 | | |
259 | 215 | int idx = seg_id - seg_id_offset; |
260 | 215 | if (idx >= idx_file_info.size()) [[unlikely]] { |
261 | 0 | auto err_msg = |
262 | 0 | fmt::format("invalid seg_id={} num_file_writers={} seg_id_offset={}", |
263 | 0 | seg_id, idx_file_info.size(), seg_id_offset); |
264 | 0 | DCHECK(false) << err_msg; |
265 | 0 | st = Status::InternalError(err_msg); |
266 | 0 | return false; |
267 | 0 | } |
268 | 215 | idx_file_info[idx] = _inverted_index_file_writers[seg_id]->get_index_file_info(); |
269 | 215 | return true; |
270 | 215 | }); |
271 | | |
272 | 106 | if (succ) { |
273 | 106 | return idx_file_info; |
274 | 106 | } |
275 | | |
276 | 0 | return ResultError(st); |
277 | 106 | } |
278 | | |
// Zeroes all running counters and wires the segment creator to this writer's
// context and file collections. Note: _segment_creator captures references to
// _context/_seg_files/_idx_files, so those members must outlive it (they do,
// as siblings of the same object).
BaseBetaRowsetWriter::BaseBetaRowsetWriter()
        : _num_segment(0),
          _segment_start_id(0),
          _num_rows_written(0),
          _total_data_size(0),
          _total_index_size(0),
          _segment_creator(_context, _seg_files, _idx_files) {}
286 | | |
// Binds the writer to the storage engine and creates its segment-compaction
// worker. The worker holds a raw back-pointer to this writer, so it must not
// outlive us (see ~BetaRowsetWriter, which waits for in-flight segcompaction).
BetaRowsetWriter::BetaRowsetWriter(StorageEngine& engine)
        : _engine(engine), _segcompaction_worker(std::make_shared<SegcompactionWorker>(this)) {}
289 | | |
290 | 2 | RowBinlogRowsetWriter::RowBinlogRowsetWriter(StorageEngine& engine) : BetaRowsetWriter(engine) {} |
291 | | |
292 | 1.00k | BaseBetaRowsetWriter::~BaseBetaRowsetWriter() { |
293 | 1.00k | if (!_already_built && _rowset_meta->is_local()) { |
294 | | // abnormal exit, remove all files generated |
295 | 8 | auto& fs = io::global_local_filesystem(); |
296 | 8 | for (int i = _segment_start_id; i < _segment_creator.next_segment_id(); ++i) { |
297 | 0 | std::string seg_path = |
298 | 0 | local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), i); |
299 | | // Even if an error is encountered, these files that have not been cleaned up |
300 | | // will be cleaned up by the GC background. So here we only print the error |
301 | | // message when we encounter an error. |
302 | 0 | WARN_IF_ERROR(fs->delete_file(seg_path), |
303 | 0 | fmt::format("Failed to delete file={}", seg_path)); |
304 | 0 | } |
305 | 8 | } |
306 | 1.00k | if (_calc_delete_bitmap_token) { |
307 | 8 | _calc_delete_bitmap_token->cancel(); |
308 | 8 | } |
309 | 1.00k | } |
310 | | |
BetaRowsetWriter::~BetaRowsetWriter() {
    /* Note that segcompaction is async and in parallel with load job. So we should handle carefully
     * when the job is cancelled. Although it is meaningless to continue segcompaction when the job
     * is cancelled, the objects involved in the job should be preserved during segcompaction to
     * avoid crashes caused by memory issues. */
    // Block until any in-flight segcompaction finishes; failures are only logged.
    WARN_IF_ERROR(_wait_flying_segcompaction(), "segment compaction failed");
}
318 | | |
// Initializes the writer from the caller-supplied context: copies the context,
// creates a fresh RowsetMeta, and populates its identity/state fields.
// PREPARED/COMMITTED rowsets are "pending" (txn-scoped, carry txn/load ids);
// all other states carry a version and write timestamp instead.
Status BaseBetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) {
    _context = rowset_writer_context;
    _rowset_meta.reset(new RowsetMeta);
    if (_context.storage_resource) {
        // Remote (cloud) storage target; local rowsets leave this unset.
        _rowset_meta->set_remote_storage_resource(*_context.storage_resource);
    }
    _rowset_meta->set_rowset_id(_context.rowset_id);
    _rowset_meta->set_partition_id(_context.partition_id);
    _rowset_meta->set_tablet_id(_context.tablet_id);
    _rowset_meta->set_index_id(_context.index_id);
    _rowset_meta->set_tablet_schema_hash(_context.tablet_schema_hash);
    _rowset_meta->set_rowset_type(_context.rowset_type);
    _rowset_meta->set_rowset_state(_context.rowset_state);
    _rowset_meta->set_segments_overlap(_context.segments_overlap);
    if (_context.rowset_state == PREPARED || _context.rowset_state == COMMITTED) {
        _is_pending = true;
        _rowset_meta->set_txn_id(_context.txn_id);
        _rowset_meta->set_load_id(_context.load_id);
    } else {
        _rowset_meta->set_version(_context.version);
        _rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp);
    }
    _rowset_meta->set_tablet_uid(_context.tablet_uid);
    _rowset_meta->set_tablet_schema(_context.tablet_schema);
    if (_context.write_binlog_opt().is_binlog_writer()) {
        _rowset_meta->mark_row_binlog();
    }
    // Hand the context callbacks that route segment/file-writer creation back
    // through this writer instance.
    _context.segment_collector = std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this);
    _context.file_writer_creator = std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this);
    return Status::OK();
}
350 | | |
// Appends a block of rows to the rowset; delegates entirely to the segment
// creator, which handles segment rollover and flushing.
Status BaseBetaRowsetWriter::add_block(const Block* block) {
    return _segment_creator.add_block(block);
}
354 | | |
355 | 63 | Status BaseBetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) { |
356 | 63 | SCOPED_RAW_TIMER(&_delete_bitmap_ns); |
357 | 63 | if (!_context.tablet->enable_unique_key_merge_on_write() || |
358 | 63 | (_context.partial_update_info && _context.partial_update_info->is_partial_update())) { |
359 | 0 | return Status::OK(); |
360 | 0 | } |
361 | 63 | std::vector<RowsetSharedPtr> specified_rowsets; |
362 | 63 | { |
363 | 63 | std::shared_lock meta_rlock(_context.tablet->get_header_lock()); |
364 | 63 | specified_rowsets = |
365 | 63 | _context.tablet->get_rowset_by_ids(_context.mow_context->rowset_ids.get()); |
366 | 63 | } |
367 | | |
368 | | // Submit the entire delete bitmap calculation process to thread pool for async execution |
369 | | // This avoids blocking memtable flush thread while waiting for file upload to complete |
370 | | // The process includes: file_writer->close(), _build_tmp, load_segments, and calc_delete_bitmap |
371 | 63 | return _calc_delete_bitmap_token->submit_func( |
372 | 63 | [this, segment_id, specified_rowsets = std::move(specified_rowsets)]() -> Status { |
373 | 63 | Status st = Status::OK(); |
374 | | // Step 1: Close file_writer (must be done before load_segments) |
375 | 63 | auto* file_writer = _seg_files.get(segment_id); |
376 | 63 | if (file_writer && file_writer->state() != io::FileWriter::State::CLOSED) { |
377 | 63 | MonotonicStopWatch close_timer; |
378 | 63 | close_timer.start(); |
379 | 63 | st = file_writer->close(); |
380 | 63 | close_timer.stop(); |
381 | | |
382 | 63 | auto close_time_ms = close_timer.elapsed_time_milliseconds(); |
383 | 63 | if (close_time_ms > 1000) { |
384 | 0 | LOG(INFO) << "file_writer->close() took " << close_time_ms |
385 | 0 | << "ms for segment_id=" << segment_id |
386 | 0 | << ", tablet_id=" << _context.tablet_id |
387 | 0 | << ", rowset_id=" << _context.rowset_id; |
388 | 0 | } |
389 | 63 | if (!st.ok()) { |
390 | 0 | return st; |
391 | 0 | } |
392 | 63 | } |
393 | | |
394 | 63 | OlapStopWatch watch; |
395 | | // Step 2: Build tmp rowset (needs file_writer to be closed) |
396 | 63 | RowsetSharedPtr rowset_ptr; |
397 | 63 | st = _build_tmp(rowset_ptr); |
398 | 63 | if (!st.ok()) { |
399 | 0 | return st; |
400 | 0 | } |
401 | | |
402 | | // Step 3: Load segments (needs file_writer to be closed and rowset to be built) |
403 | 63 | auto* beta_rowset = reinterpret_cast<BetaRowset*>(rowset_ptr.get()); |
404 | 63 | std::vector<segment_v2::SegmentSharedPtr> segments; |
405 | 63 | st = beta_rowset->load_segments(segment_id, segment_id + 1, &segments); |
406 | 63 | if (!st.ok()) { |
407 | 0 | return st; |
408 | 0 | } |
409 | | |
410 | | // Step 4: Calculate delete bitmap |
411 | 63 | st = BaseTablet::calc_delete_bitmap( |
412 | 63 | _context.tablet, rowset_ptr, segments, specified_rowsets, |
413 | 63 | _context.mow_context->delete_bitmap, _context.mow_context->max_version, |
414 | 63 | nullptr, nullptr, nullptr); |
415 | 63 | if (!st.ok()) { |
416 | 0 | return st; |
417 | 0 | } |
418 | | |
419 | 63 | size_t total_rows = |
420 | 63 | std::accumulate(segments.begin(), segments.end(), 0, |
421 | 63 | [](size_t sum, const segment_v2::SegmentSharedPtr& s) { |
422 | 63 | return sum += s->num_rows(); |
423 | 63 | }); |
424 | 63 | LOG(INFO) << "[Memtable Flush] construct delete bitmap tablet: " |
425 | 63 | << _context.tablet->tablet_id() |
426 | 63 | << ", rowset_ids: " << _context.mow_context->rowset_ids->size() |
427 | 63 | << ", cur max_version: " << _context.mow_context->max_version |
428 | 63 | << ", transaction_id: " << _context.mow_context->txn_id |
429 | 63 | << ", delete_bitmap_count: " |
430 | 63 | << _context.mow_context->delete_bitmap->get_delete_bitmap_count() |
431 | 63 | << ", delete_bitmap_cardinality: " |
432 | 63 | << _context.mow_context->delete_bitmap->cardinality() |
433 | 63 | << ", cost: " << watch.get_elapse_time_us() |
434 | 63 | << "(us), total rows: " << total_rows; |
435 | 63 | return Status::OK(); |
436 | 63 | }); |
437 | 63 | } |
438 | | |
439 | 1.00k | Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) { |
440 | 1.00k | RETURN_IF_ERROR(BaseBetaRowsetWriter::init(rowset_writer_context)); |
441 | 1.00k | if (_segcompaction_worker) { |
442 | 1.00k | _segcompaction_worker->init_mem_tracker(rowset_writer_context); |
443 | 1.00k | } |
444 | 1.00k | if (_context.mow_context != nullptr) { |
445 | 8 | _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor_for_load()->create_token(); |
446 | 8 | } |
447 | 1.00k | return Status::OK(); |
448 | 1.00k | } |
449 | | |
// Opens a not-yet-segcompacted local segment file as a Segment object.
// Only valid for local rowsets (DCHECK'd); segment_id addresses the original
// (pre-compaction) file layout under the tablet path.
Status BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment,
                                                    int32_t segment_id) {
    DCHECK(_rowset_meta->is_local());
    auto fs = _rowset_meta->fs();
    if (!fs) {
        return Status::Error<INIT_FAILED>(
                "BetaRowsetWriter::_load_noncompacted_segment _rowset_meta->fs get failed");
    }
    auto path =
            local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), segment_id);
    // Use the file block cache only when both the context asked for it and the
    // global file cache feature is enabled; otherwise bypass the cache.
    io::FileReaderOptions reader_options {
            .cache_type =
                    _context.write_file_cache
                            ? (config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE
                                                         : io::FileCachePolicy::NO_CACHE)
                            : io::FileCachePolicy::NO_CACHE,
            .is_doris_table = true,
            .cache_base_path {},
            .tablet_id = _rowset_meta->tablet_id(),
    };
    auto s = segment_v2::Segment::open(fs, path, _rowset_meta->tablet_id(), segment_id, rowset_id(),
                                       _context.tablet_schema, reader_options, &segment);
    if (!s.ok()) {
        LOG(WARNING) << "failed to open segment. " << path << ":" << s;
        return s;
    }
    return Status::OK();
}
478 | | |
/* policy of segcompaction target selection:
 * 1. skip big segments
 * 2. if the consecutive smalls end up with a big, compact the smalls, except
 *    single small
 * 3. if the consecutive smalls end up with small, compact the smalls if the
 *    length is beyond (config::segcompaction_batch_size / 2)
 */
// Scans forward from _segcompacted_point, collecting consecutive "small"
// segments (bounded by per-segment and per-task row/byte limits) into
// `segments`. Large segments at the front are skipped (renamed into their
// final slot, with their delete bitmaps converted); a large segment in the
// middle ends the run. An empty `segments` on return means "nothing to do
// this round".
Status BetaRowsetWriter::_find_longest_consecutive_small_segment(
        SegCompactionCandidatesSharedPtr& segments) {
    segments = std::make_shared<SegCompactionCandidates>();
    // skip last (maybe active) segment
    int32_t last_segment = _num_segment - 1;
    size_t task_bytes = 0;
    uint32_t task_rows = 0;
    int32_t segid;
    for (segid = _segcompacted_point;
         segid < last_segment && segments->size() < config::segcompaction_batch_size; segid++) {
        segment_v2::SegmentSharedPtr segment;
        RETURN_IF_ERROR(_load_noncompacted_segment(segment, segid));
        const auto segment_rows = segment->num_rows();
        const auto segment_bytes = segment->file_reader()->size();
        bool is_large_segment = segment_rows > config::segcompaction_candidate_max_rows ||
                                segment_bytes > config::segcompaction_candidate_max_bytes;
        if (is_large_segment) {
            if (segid == _segcompacted_point) {
                // skip large segments at the front: rename them into their final
                // position and advance the compacted point past them.
                auto dst_seg_id = _num_segcompacted.load();
                RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
                if (_segcompaction_worker->need_convert_delete_bitmap()) {
                    // MoW tablets: remap the segment's delete bitmap to its new id.
                    _segcompaction_worker->convert_segment_delete_bitmap(
                            _context.mow_context->delete_bitmap, segid, dst_seg_id);
                }
                continue;
            } else {
                // stop because we need consecutive segments
                break;
            }
        }
        // Stop before the task itself would exceed its row/byte budget.
        bool is_task_full = task_rows + segment_rows > config::segcompaction_task_max_rows ||
                            task_bytes + segment_bytes > config::segcompaction_task_max_bytes;
        if (is_task_full) {
            break;
        }
        segments->push_back(segment);
        task_rows += segment->num_rows();
        task_bytes += segment->file_reader()->size();
    }
    size_t s = segments->size();
    if (segid == last_segment && s <= (config::segcompaction_batch_size / 2)) {
        // we didn't collect enough segments, better to do it in next
        // round to compact more at once
        segments->clear();
        return Status::OK();
    }
    if (s == 1) { // poor bachelor, let it go
        // A single candidate is not worth compacting: rename it into place and
        // convert its delete bitmap, same as a skipped large segment.
        VLOG_DEBUG << "only one candidate segment";
        auto src_seg_id = _segcompacted_point.load();
        auto dst_seg_id = _num_segcompacted.load();
        RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
        if (_segcompaction_worker->need_convert_delete_bitmap()) {
            _segcompaction_worker->convert_segment_delete_bitmap(
                    _context.mow_context->delete_bitmap, src_seg_id, dst_seg_id);
        }
        segments->clear();
        return Status::OK();
    }
    if (VLOG_DEBUG_IS_ON) {
        vlog_buffer.clear();
        for (auto& segment : (*segments.get())) {
            fmt::format_to(vlog_buffer, "[id:{} num_rows:{}]", segment->id(), segment->num_rows());
        }
        VLOG_DEBUG << "candidate segments num:" << s
                   << " list of candidates:" << fmt::to_string(vlog_buffer);
    }
    return Status::OK();
}
555 | | |
556 | 11 | Status BetaRowsetWriter::_rename_compacted_segments(int64_t begin, int64_t end) { |
557 | 11 | int ret; |
558 | 11 | auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, |
559 | 11 | _context.rowset_id, begin, end); |
560 | 11 | auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), |
561 | 11 | _num_segcompacted); |
562 | 11 | ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); |
563 | 11 | if (ret) { |
564 | 0 | return Status::Error<ROWSET_RENAME_FILE_FAILED>( |
565 | 0 | "failed to rename {} to {}. ret:{}, errno:{}", src_seg_path, dst_seg_path, ret, |
566 | 0 | errno); |
567 | 0 | } |
568 | 11 | RETURN_IF_ERROR(_remove_segment_footer_cache(_num_segcompacted, dst_seg_path)); |
569 | | |
570 | | // rename inverted index files |
571 | 11 | RETURN_IF_ERROR(_rename_compacted_indices(begin, end, 0)); |
572 | | |
573 | 11 | _num_segcompacted++; |
574 | 11 | return Status::OK(); |
575 | 11 | } |
576 | | |
577 | | void BetaRowsetWriter::_clear_statistics_for_deleting_segments_unsafe(uint32_t begin, |
578 | 43 | uint32_t end) { |
579 | 43 | VLOG_DEBUG << "_segid_statistics_map clear record segid range from:" << begin << " to:" << end; |
580 | 134 | for (uint32_t i = begin; i <= end; ++i) { |
581 | 91 | _segid_statistics_map.erase(i); |
582 | 91 | } |
583 | 43 | } |
584 | | |
// Moves an uncompacted segment `seg_id` into its final slot _num_segcompacted
// without compacting it. When the ids already match only the counter advances;
// otherwise the statistics entry is re-keyed, the file is renamed, its footer
// cache entry is dropped, and its inverted index files follow.
Status BetaRowsetWriter::_rename_compacted_segment_plain(uint32_t seg_id) {
    if (seg_id == _num_segcompacted) {
        // Already in place — nothing to rename.
        ++_num_segcompacted;
        return Status::OK();
    }

    auto src_seg_path =
            local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), seg_id);
    auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(),
                                           _num_segcompacted);
    VLOG_DEBUG << "segcompaction skip this segment. rename " << src_seg_path << " to "
               << dst_seg_path;
    {
        // Re-key the statistics entry from seg_id to _num_segcompacted under
        // the map mutex: source must exist, destination must not.
        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
        DCHECK_EQ(_segid_statistics_map.find(seg_id) == _segid_statistics_map.end(), false);
        DCHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(),
                  true);
        auto org = _segid_statistics_map[seg_id];
        _segid_statistics_map.emplace(_num_segcompacted, org);
        _clear_statistics_for_deleting_segments_unsafe(seg_id, seg_id);
    }
    int ret = rename(src_seg_path.c_str(), dst_seg_path.c_str());
    if (ret) {
        return Status::Error<ROWSET_RENAME_FILE_FAILED>(
                "failed to rename {} to {}. ret:{}, errno:{}", src_seg_path, dst_seg_path, ret,
                errno);
    }

    // The destination path may have a stale footer cache entry; evict it.
    RETURN_IF_ERROR(_remove_segment_footer_cache(_num_segcompacted, dst_seg_path));
    // rename remaining inverted index files
    RETURN_IF_ERROR(_rename_compacted_indices(-1, -1, seg_id));

    ++_num_segcompacted;
    return Status::OK();
}
620 | | |
621 | | Status BetaRowsetWriter::_remove_segment_footer_cache(const uint32_t seg_id, |
622 | 43 | const std::string& segment_path) { |
623 | 43 | auto* footer_page_cache = ExecEnv::GetInstance()->get_storage_page_cache(); |
624 | 43 | if (!footer_page_cache) { |
625 | 0 | return Status::OK(); |
626 | 0 | } |
627 | | |
628 | 43 | auto fs = _rowset_meta->fs(); |
629 | 43 | bool exists = false; |
630 | 43 | RETURN_IF_ERROR(fs->exists(segment_path, &exists)); |
631 | 43 | if (exists) { |
632 | 43 | io::FileReaderSPtr file_reader; |
633 | 43 | io::FileReaderOptions reader_options { |
634 | 43 | .cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE |
635 | 43 | : io::FileCachePolicy::NO_CACHE, |
636 | 43 | .is_doris_table = true, |
637 | 43 | .cache_base_path = "", |
638 | 43 | .file_size = _rowset_meta->segment_file_size(static_cast<int>(seg_id)), |
639 | 43 | .tablet_id = _rowset_meta->tablet_id(), |
640 | 43 | }; |
641 | 43 | RETURN_IF_ERROR(fs->open_file(segment_path, &file_reader, &reader_options)); |
642 | 43 | DCHECK(file_reader != nullptr); |
643 | 43 | auto cache_key = segment_v2::Segment::get_segment_footer_cache_key(file_reader); |
644 | 43 | footer_page_cache->erase(cache_key, segment_v2::PageTypePB::INDEX_PAGE); |
645 | 43 | } |
646 | 43 | return Status::OK(); |
647 | 43 | } |
648 | | |
// Rename the inverted/ANN index files that accompany a segment being moved to
// its post-segcompaction id (_num_segcompacted), and drop stale cache entries.
// begin < 0 selects a plain (uncompacted) source segment identified by seg_id;
// otherwise the source is the segcompacted output covering [begin, end].
Status BetaRowsetWriter::_rename_compacted_indices(int64_t begin, int64_t end, uint64_t seg_id) {
    int ret;

    // Source path depends on whether the segment was produced by segcompaction.
    auto src_seg_path = begin < 0 ? local_segment_path(_context.tablet_path,
                                                       _context.rowset_id.to_string(), seg_id)
                                  : BetaRowset::local_segment_path_segcompacted(
                                            _context.tablet_path, _context.rowset_id, begin, end);
    auto src_index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(src_seg_path);
    auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(),
                                           _num_segcompacted);
    auto dst_index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(dst_seg_path);

    // V2+ keeps all indexes of a segment in one combined file: a single rename.
    if (_context.tablet_schema->get_inverted_index_storage_format() >=
        InvertedIndexStorageFormatPB::V2) {
        if (_context.tablet_schema->has_inverted_index() ||
            _context.tablet_schema->has_ann_index()) {
            auto src_idx_path =
                    InvertedIndexDescriptor::get_index_file_path_v2(src_index_path_prefix);
            auto dst_idx_path =
                    InvertedIndexDescriptor::get_index_file_path_v2(dst_index_path_prefix);

            ret = rename(src_idx_path.c_str(), dst_idx_path.c_str());
            if (ret) {
                return Status::Error<ROWSET_RENAME_FILE_FAILED>(
                        "failed to rename {} to {}. ret:{}, errno:{}", src_idx_path, dst_idx_path,
                        ret, errno);
            }
        }
    }
    // rename remaining inverted index files (V1 keeps one file per index)
    for (auto column : _context.tablet_schema->columns()) {
        auto index_infos = _context.tablet_schema->inverted_indexs(*column);
        for (const auto& index_info : index_infos) {
            auto index_id = index_info->index_id();
            if (_context.tablet_schema->get_inverted_index_storage_format() ==
                InvertedIndexStorageFormatPB::V1) {
                auto src_idx_path = InvertedIndexDescriptor::get_index_file_path_v1(
                        src_index_path_prefix, index_id, index_info->get_index_suffix());
                auto dst_idx_path = InvertedIndexDescriptor::get_index_file_path_v1(
                        dst_index_path_prefix, index_id, index_info->get_index_suffix());
                VLOG_DEBUG << "segcompaction skip this index. rename " << src_idx_path << " to "
                           << dst_idx_path;
                ret = rename(src_idx_path.c_str(), dst_idx_path.c_str());
                if (ret) {
                    return Status::Error<INVERTED_INDEX_RENAME_FILE_FAILED>(
                            "failed to rename {} to {}. ret:{}, errno:{}", src_idx_path,
                            dst_idx_path, ret, errno);
                }
            }
            // Erase the origin index file cache for both old and new paths so a
            // later lookup cannot hit a searcher built from the pre-rename file.
            auto src_idx_cache_key = InvertedIndexDescriptor::get_index_file_cache_key(
                    src_index_path_prefix, index_id, index_info->get_index_suffix());
            auto dst_idx_cache_key = InvertedIndexDescriptor::get_index_file_cache_key(
                    dst_index_path_prefix, index_id, index_info->get_index_suffix());
            RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(src_idx_cache_key));
            RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(dst_idx_cache_key));
        }
    }
    return Status::OK();
}
709 | | |
// Decide whether to launch an asynchronous segment compaction task. Called
// after each segment is added. Returns early when segcompaction is disabled,
// already in flight, or there are not yet enough small segments to batch.
Status BetaRowsetWriter::_segcompaction_if_necessary() {
    Status status = Status::OK();
    // if not doing segcompaction, just check segment number
    if (!config::enable_segcompaction || !_context.enable_segcompaction ||
        _context.tablet_schema->num_variant_columns() > 0) {
        return _check_segment_number_limit(_num_segment);
    }
    // leave _is_doing_segcompaction as the last condition
    // otherwise _segcompacting_cond will never get notified
    if (_is_doing_segcompaction.exchange(true)) {
        // Another segcompaction is already running; it owns the flag.
        return status;
    }
    if (_segcompaction_status.load() != OK) {
        // A previous background compaction failed; surface the sticky error.
        status = Status::Error<SEGCOMPACTION_FAILED>(
                "BetaRowsetWriter::_segcompaction_if_necessary meet invalid state, error code: {}",
                _segcompaction_status.load());
    } else {
        status = _check_segment_number_limit(_num_segcompacted);
    }
    if (status.ok() && (_num_segment - _segcompacted_point) >= config::segcompaction_batch_size) {
        SegCompactionCandidatesSharedPtr segments;
        status = _find_longest_consecutive_small_segment(segments);
        if (LIKELY(status.ok()) && (!segments->empty())) {
            LOG(INFO) << "submit segcompaction task, tablet_id:" << _context.tablet_id
                      << " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment
                      << ", segcompacted_point:" << _segcompacted_point;
            status = _engine.submit_seg_compaction_task(_segcompaction_worker, segments);
            if (status.ok()) {
                // Task submitted: the worker now owns _is_doing_segcompaction
                // and will clear it / notify waiters when done.
                return status;
            }
        }
    }
    {
        // Nothing submitted (or submission failed): release the flag ourselves
        // and wake anyone blocked in _wait_flying_segcompaction().
        std::lock_guard lk(_is_doing_segcompaction_lock);
        _is_doing_segcompaction = false;
        _segcompacting_cond.notify_all();
    }
    return status;
}
749 | | |
// After writing finishes, rename the trailing segments that were never
// segcompacted so final segment ids are contiguous, and remap their delete
// bitmaps for merge-on-write tables. Must run with no segcompaction in flight.
Status BetaRowsetWriter::_segcompaction_rename_last_segments() {
    DCHECK_EQ(_is_doing_segcompaction, false);
    if (!config::enable_segcompaction) {
        return Status::OK();
    }
    if (_segcompaction_status.load() != OK) {
        return Status::Error<SEGCOMPACTION_FAILED>(
                "BetaRowsetWriter::_segcompaction_rename_last_segments meet invalid state, error "
                "code: {}",
                _segcompaction_status.load());
    }
    if (!is_segcompacted() || _segcompacted_point == _num_segment) {
        // no need if never segcompact before or all segcompacted
        return Status::OK();
    }
    // currently we only rename remaining segments to reduce wait time
    // so that transaction can be committed ASAP
    VLOG_DEBUG << "segcompaction last few segments";
    for (int32_t segid = _segcompacted_point; segid < _num_segment; segid++) {
        // Capture the destination id before the rename; the helper presumably
        // advances _num_segcompacted as a side effect — TODO confirm.
        auto dst_segid = _num_segcompacted.load();
        RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
        if (_segcompaction_worker->need_convert_delete_bitmap()) {
            // Remap delete-bitmap entries from the old segment id to the new one.
            _segcompaction_worker->convert_segment_delete_bitmap(
                    _context.mow_context->delete_bitmap, segid, dst_segid);
        }
    }
    return Status::OK();
}
778 | | |
// Absorb an existing rowset into the one being written: hard-link its files
// under this writer's rowset id and accumulate its statistics (row count,
// disk sizes, key bounds, per-segment rows). Used by linked schema change.
Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
    assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET);
    RETURN_IF_ERROR(rowset->link_files_to(_context.tablet_path, _context.rowset_id));
    _num_rows_written += rowset->num_rows();
    const auto& rowset_meta = rowset->rowset_meta();
    auto index_size = rowset_meta->index_disk_size();
    auto total_size = rowset_meta->total_disk_size();
    auto data_size = rowset_meta->data_disk_size();
    // corrupted index size caused by bug before 2.1.5 or 3.0.0 version
    // try to get real index size from disk.
    if (index_size < 0 || index_size > total_size * 2) {
        LOG(ERROR) << "invalid index size:" << index_size << " total size:" << total_size
                   << " data size:" << data_size << " tablet:" << rowset_meta->tablet_id()
                   << " rowset:" << rowset_meta->rowset_id();
        index_size = 0;
        auto st = rowset->get_inverted_index_size(&index_size);
        if (!st.ok()) {
            if (!st.is<NOT_FOUND>()) {
                LOG(ERROR) << "failed to get inverted index size. res=" << st;
                return st;
            }
            // NOT_FOUND is tolerated: no index files on disk, index_size stays 0.
        }
    }
    _total_data_size += data_size;
    _total_index_size += index_size;
    _num_segment += cast_set<int32_t>(rowset->num_segments());
    // append key_bounds to current rowset
    RETURN_IF_ERROR(rowset->get_segments_key_bounds(&_segments_encoded_key_bounds));
    rowset->get_num_segment_rows(&_segment_num_rows);
    _segments_key_bounds_truncated = rowset->rowset_meta()->is_segments_key_bounds_truncated();

    // TODO update zonemap
    if (rowset->rowset_meta()->has_delete_predicate()) {
        _rowset_meta->set_delete_predicate(rowset->rowset_meta()->delete_predicate());
    }
    // Update the tablet schema in the rowset metadata if the tablet schema contains a variant.
    // During the build process, _context.tablet_schema will be used as the rowset schema.
    // This situation may arise in the event of a linked schema change. If this schema is not set,
    // the subcolumns of the variant will be lost.
    if (_context.tablet_schema->num_variant_columns() > 0 && rowset->tablet_schema() != nullptr) {
        _context.tablet_schema = rowset->tablet_schema();
    }
    return Status::OK();
}
823 | | |
824 | 0 | Status BaseBetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) { |
825 | | // TODO use schema_mapping to transfer zonemap |
826 | 0 | return add_rowset(rowset); |
827 | 0 | } |
828 | | |
829 | 1.14k | Status BaseBetaRowsetWriter::flush() { |
830 | 1.14k | return _segment_creator.flush(); |
831 | 1.14k | } |
832 | | |
833 | 12 | Status BaseBetaRowsetWriter::flush_memtable(Block* block, int32_t segment_id, int64_t* flush_size) { |
834 | 12 | if (block->rows() == 0) { |
835 | 0 | return Status::OK(); |
836 | 0 | } |
837 | | |
838 | 12 | { |
839 | 12 | SCOPED_RAW_TIMER(&_segment_writer_ns); |
840 | 12 | RETURN_IF_ERROR(_segment_creator.flush_single_block(block, segment_id, flush_size)); |
841 | 12 | } |
842 | 12 | return Status::OK(); |
843 | 12 | } |
844 | | |
845 | 0 | Status BaseBetaRowsetWriter::flush_single_block(const Block* block) { |
846 | 0 | return _segment_creator.flush_single_block(block); |
847 | 0 | } |
848 | | |
849 | 1.00k | Status BetaRowsetWriter::_wait_flying_segcompaction() { |
850 | 1.00k | std::unique_lock<std::mutex> l(_is_doing_segcompaction_lock); |
851 | 1.00k | uint64_t begin_wait = GetCurrentTimeMicros(); |
852 | 1.00k | while (_is_doing_segcompaction) { |
853 | | // change sync wait to async? |
854 | 0 | _segcompacting_cond.wait(l); |
855 | 0 | } |
856 | 1.00k | uint64_t elapsed = GetCurrentTimeMicros() - begin_wait; |
857 | 1.00k | if (elapsed >= MICROS_PER_SEC) { |
858 | 0 | LOG(INFO) << "wait flying segcompaction finish time:" << elapsed << "us"; |
859 | 0 | } |
860 | 1.00k | if (_segcompaction_status.load() != OK) { |
861 | 0 | return Status::Error<SEGCOMPACTION_FAILED>( |
862 | 0 | "BetaRowsetWriter meet invalid state, error code: {}", |
863 | 0 | _segcompaction_status.load()); |
864 | 0 | } |
865 | 1.00k | return Status::OK(); |
866 | 1.00k | } |
867 | | |
868 | 24 | RowsetSharedPtr BaseBetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& spec_rowset_meta) { |
869 | 24 | if (_rowset_meta->newest_write_timestamp() == -1) { |
870 | 0 | _rowset_meta->set_newest_write_timestamp(UnixSeconds()); |
871 | 0 | } |
872 | | |
873 | 24 | build_rowset_meta_with_spec_field(*_rowset_meta, *spec_rowset_meta); |
874 | 24 | RowsetSharedPtr rowset; |
875 | 24 | auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, |
876 | 24 | _rowset_meta, &rowset); |
877 | 24 | if (!status.ok()) { |
878 | 0 | LOG(WARNING) << "rowset init failed when build new rowset, res=" << status; |
879 | 0 | return nullptr; |
880 | 0 | } |
881 | 24 | _already_built = true; |
882 | 24 | return rowset; |
883 | 24 | } |
884 | | |
// Flush and close all segment files owned by the segment creator.
// Logs a warning and returns the error status if closing fails.
Status BaseBetaRowsetWriter::_close_file_writers() {
    // Flush and close segment files
    RETURN_NOT_OK_STATUS_WITH_WARN(_segment_creator.close(),
                                   "failed to close segment creator when build new rowset");
    return Status::OK();
}
891 | | |
// Close all writers and finalize segment compaction: cancel or wait for the
// worker, rename trailing segments, close the worker's file writer, and merge
// the converted delete bitmap back for merge-on-write tables.
Status BetaRowsetWriter::_close_file_writers() {
    RETURN_IF_ERROR(BaseBetaRowsetWriter::_close_file_writers());
    // if _segment_start_id is not zero, that means it's a transient rowset writer for
    // MoW partial update, don't need to do segment compaction.
    if (_segment_start_id == 0) {
        if (_segcompaction_worker->cancel()) {
            // Worker was idle and is now cancelled: clear the flag and wake waiters.
            std::lock_guard lk(_is_doing_segcompaction_lock);
            _is_doing_segcompaction = false;
            _segcompacting_cond.notify_all();
        } else {
            // Worker busy: block until the in-flight segcompaction finishes.
            RETURN_NOT_OK_STATUS_WITH_WARN(_wait_flying_segcompaction(),
                                           "segcompaction failed when build new rowset");
        }
        RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_rename_last_segments(),
                                       "rename last segments failed when build new rowset");
        // segcompaction worker would do file writer's close function in compact_segments
        if (auto& seg_comp_file_writer = _segcompaction_worker->get_file_writer();
            nullptr != seg_comp_file_writer &&
            seg_comp_file_writer->state() != io::FileWriter::State::CLOSED) {
            RETURN_NOT_OK_STATUS_WITH_WARN(seg_comp_file_writer->close(),
                                           "close segment compaction worker failed");
        }
        // process delete bitmap for mow table
        if (is_segcompacted() && _segcompaction_worker->need_convert_delete_bitmap()) {
            auto converted_delete_bitmap = _segcompaction_worker->get_converted_delete_bitmap();
            // non-null means the segment compaction was actually triggered
            if (converted_delete_bitmap != nullptr) {
                RowsetIdUnorderedSet rowsetids;
                rowsetids.insert(rowset_id());
                context().tablet->add_sentinel_mark_to_delete_bitmap(converted_delete_bitmap.get(),
                                                                     rowsetids);
                // Replace this rowset's pre-compaction bitmap entries with the
                // converted ones (old segment ids are no longer valid).
                context().mow_context->delete_bitmap->remove({rowset_id(), 0, 0},
                                                             {rowset_id(), UINT32_MAX, INT64_MAX});
                context().mow_context->delete_bitmap->merge(*converted_delete_bitmap);
            }
        }
    }
    return Status::OK();
}
931 | | |
// Finalize this writer into a Rowset: wait for delete-bitmap calculation,
// close all file writers, validate the segment count, build the rowset meta,
// and create the Rowset object. Seals the writer on success.
Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
    if (_calc_delete_bitmap_token != nullptr) {
        RETURN_IF_ERROR(_calc_delete_bitmap_token->wait());
    }
    RETURN_IF_ERROR(_close_file_writers());
    // NOTE(review): the "+ 1" presumably reserves headroom for a segment that a
    // just-finished segcompaction batch may still produce — TODO confirm.
    const auto total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted;
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(total_segment_num),
                                   "too many segments when build new rowset");
    RETURN_IF_ERROR(_build_rowset_meta(_rowset_meta.get(), true));
    if (_is_pending) {
        _rowset_meta->set_rowset_state(COMMITTED);
    } else {
        _rowset_meta->set_rowset_state(VISIBLE);
    }

    if (_rowset_meta->newest_write_timestamp() == -1) {
        _rowset_meta->set_newest_write_timestamp(UnixSeconds());
    }

    _rowset_meta->set_tablet_schema(_context.tablet_schema);

    // If segment compaction occurs, the idx file info will become inaccurate.
    if ((_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) &&
        _num_segcompacted == 0) {
        if (auto idx_files_info = _idx_files.inverted_index_file_info(_segment_start_id);
            !idx_files_info.has_value()) [[unlikely]] {
            LOG(ERROR) << "expected inverted index files info, but none presents: "
                       << idx_files_info.error();
        } else {
            _rowset_meta->add_inverted_index_files_info(idx_files_info.value());
        }
    }

    RETURN_NOT_OK_STATUS_WITH_WARN(
            RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, _rowset_meta,
                                         &rowset),
            "rowset init failed when build new rowset");
    _already_built = true;
    return Status::OK();
}
972 | | |
973 | 0 | int64_t BaseBetaRowsetWriter::_num_seg() const { |
974 | 0 | return _num_segment; |
975 | 0 | } |
976 | | |
977 | 1.03k | int64_t BetaRowsetWriter::_num_seg() const { |
978 | 1.03k | return is_segcompacted() ? _num_segcompacted : _num_segment; |
979 | 1.03k | } |
980 | | |
// Aggregate per-segment statistics into rowset_meta: row counts, data/index
// disk sizes, per-segment rows, key bounds and overlap property. When
// check_segment_num is set (final build), optionally cross-checks that the
// collected statistics match the effective segment count.
Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool check_segment_num) {
    int64_t num_rows_written = 0;
    int64_t total_data_size = 0;
    int64_t total_index_size = 0;
    std::vector<KeyBoundsPB> segments_encoded_key_bounds;
    std::vector<uint32_t> segment_rows;
    {
        // Snapshot the per-segment statistics under the map lock.
        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
        for (const auto& itr : _segid_statistics_map) {
            num_rows_written += itr.second.row_num;
            total_data_size += itr.second.data_size;
            total_index_size += itr.second.index_size;
            segments_encoded_key_bounds.push_back(itr.second.key_bounds);
            // segcompaction don't modify _segment_num_rows, so we need to get segment rows from _segid_statistics_map for load
            segment_rows.push_back(cast_set<uint32_t>(itr.second.row_num));
        }
    }
    if (segment_rows.empty()) {
        // vertical compaction and linked schema change will not record segment statistics,
        // it will record segment rows in _segment_num_rows
        RETURN_IF_ERROR(get_segment_num_rows(&segment_rows));
    }

    // Append key bounds carried over from absorbed rowsets (see add_rowset).
    for (auto& key_bound : _segments_encoded_key_bounds) {
        segments_encoded_key_bounds.push_back(key_bound);
    }
    if (_segments_key_bounds_truncated.has_value()) {
        rowset_meta->set_segments_key_bounds_truncated(_segments_key_bounds_truncated.value());
    }
    rowset_meta->set_num_segment_rows(segment_rows);
    // segment key bounds are empty in old version(before version 1.2.x). So we should not modify
    // the overlap property when key bounds are empty.
    // for mow table with cluster keys, the overlap is used for cluster keys,
    // the key_bounds is primary keys
    if (!segments_encoded_key_bounds.empty() &&
        !is_segment_overlapping(segments_encoded_key_bounds) &&
        _context.tablet_schema->cluster_key_uids().empty()) {
        rowset_meta->set_segments_overlap(NONOVERLAPPING);
    }

    auto segment_num = _num_seg();
    if (check_segment_num && config::check_segment_when_build_rowset_meta) {
        // Sanity check: statistics vectors must line up with the segment count.
        auto segments_encoded_key_bounds_size = segments_encoded_key_bounds.size();
        if (segments_encoded_key_bounds_size != segment_num) {
            return Status::InternalError(
                    "segments_encoded_key_bounds_size should equal to _num_seg, "
                    "segments_encoded_key_bounds_size "
                    "is: {}, _num_seg is: {}",
                    segments_encoded_key_bounds_size, segment_num);
        }
        if (segment_rows.size() != segment_num) {
            return Status::InternalError(
                    "segment_rows size should equal to _num_seg, segment_rows size is: {}, "
                    "_num_seg is {}, tablet={}, rowset={}, txn={}",
                    segment_rows.size(), segment_num, _context.tablet_id,
                    _context.rowset_id.to_string(), _context.txn_id);
        }
    }

    // Totals combine freshly-written segments with sizes absorbed via add_rowset.
    rowset_meta->set_num_segments(segment_num);
    rowset_meta->set_num_rows(num_rows_written + _num_rows_written);
    rowset_meta->set_total_disk_size(total_data_size + _total_data_size + total_index_size +
                                     _total_index_size);
    rowset_meta->set_data_disk_size(total_data_size + _total_data_size);
    rowset_meta->set_index_disk_size(total_index_size + _total_index_size);
    bool aggregate_key_bounds = config::enable_aggregate_non_mow_key_bounds &&
                                !_context.enable_unique_key_merge_on_write;
    rowset_meta->set_segments_key_bounds(segments_encoded_key_bounds, aggregate_key_bounds);
    // TODO write zonemap to meta
    rowset_meta->set_empty((num_rows_written + _num_rows_written) == 0);
    rowset_meta->set_creation_time(time(nullptr));
    return Status::OK();
}
1054 | | |
// Build an intermediate rowset from the current state WITHOUT sealing the
// writer: copies the meta, fills it from current statistics (segment-count
// checks skipped), and creates a Rowset over it.
Status BaseBetaRowsetWriter::_build_tmp(RowsetSharedPtr& rowset_ptr) {
    Status status;
    std::shared_ptr<RowsetMeta> tmp_rs_meta = std::make_shared<RowsetMeta>();
    tmp_rs_meta->init(_rowset_meta.get());

    // check_segment_num defaults to false here: intermediate state may not match.
    status = _build_rowset_meta(tmp_rs_meta.get());
    if (!status.ok()) {
        LOG(WARNING) << "failed to build rowset meta, res=" << status;
        return status;
    }

    status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, tmp_rs_meta,
                                          &rowset_ptr);
    // Debug point lets tests inject a creation failure.
    DBUG_EXECUTE_IF("BaseBetaRowsetWriter::_build_tmp.create_rowset_failed",
                    { status = Status::InternalError("create rowset failed"); });
    if (!status.ok()) {
        LOG(WARNING) << "rowset init failed when build new rowset, res=" << status;
        return status;
    }
    return Status::OK();
}
1076 | | |
1077 | | Status BaseBetaRowsetWriter::_create_file_writer(const std::string& path, |
1078 | | io::FileWriterPtr& file_writer, |
1079 | 2.68k | bool is_index_file) { |
1080 | 2.68k | io::FileWriterOptions opts = _context.get_file_writer_options(is_index_file); |
1081 | 2.68k | Status st = _context.fs()->create_file(path, &file_writer, &opts); |
1082 | 2.68k | if (!st.ok()) { |
1083 | 0 | LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st; |
1084 | 0 | return st; |
1085 | 0 | } |
1086 | | |
1087 | 2.68k | DCHECK(file_writer != nullptr); |
1088 | 2.68k | return Status::OK(); |
1089 | 2.68k | } |
1090 | | |
1091 | | Status BaseBetaRowsetWriter::create_file_writer(uint32_t segment_id, io::FileWriterPtr& file_writer, |
1092 | 2.66k | FileType file_type) { |
1093 | 2.66k | auto segment_path = _context.segment_path(segment_id); |
1094 | 2.66k | if (file_type == FileType::INVERTED_INDEX_FILE) { |
1095 | 255 | std::string prefix = |
1096 | 255 | std::string {InvertedIndexDescriptor::get_index_file_path_prefix(segment_path)}; |
1097 | 255 | std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix); |
1098 | 255 | return _create_file_writer(index_path, file_writer, true /* is_index_file */); |
1099 | 2.40k | } else if (file_type == FileType::SEGMENT_FILE) { |
1100 | 2.40k | return _create_file_writer(segment_path, file_writer, false /* is_index_file */); |
1101 | 2.40k | } |
1102 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
1103 | 0 | fmt::format("failed to create file = {}, file type = {}", segment_path, file_type)); |
1104 | 2.66k | } |
1105 | | |
1106 | | Status BaseBetaRowsetWriter::create_index_file_writer(uint32_t segment_id, |
1107 | 228 | IndexFileWriterPtr* index_file_writer) { |
1108 | 228 | RETURN_IF_ERROR(RowsetWriter::create_index_file_writer(segment_id, index_file_writer)); |
1109 | | // used for inverted index format v1 |
1110 | 228 | (*index_file_writer) |
1111 | 228 | ->set_file_writer_opts(_context.get_file_writer_options(true /* is_index_file */)); |
1112 | 228 | return Status::OK(); |
1113 | 228 | } |
1114 | | |
1115 | | Status BetaRowsetWriter::create_segment_writer_for_segcompaction( |
1116 | 12 | std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, int64_t end) { |
1117 | 12 | DCHECK(begin >= 0 && end >= 0); |
1118 | 12 | std::string path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, |
1119 | 12 | _context.rowset_id, begin, end); |
1120 | 12 | io::FileWriterPtr file_writer; |
1121 | 12 | RETURN_IF_ERROR(_create_file_writer(path, file_writer, false /* is_index_file */)); |
1122 | | |
1123 | 12 | IndexFileWriterPtr index_file_writer; |
1124 | 12 | if (_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) { |
1125 | 8 | io::FileWriterPtr idx_file_writer; |
1126 | 8 | std::string prefix(InvertedIndexDescriptor::get_index_file_path_prefix(path)); |
1127 | 8 | if (_context.tablet_schema->get_inverted_index_storage_format() != |
1128 | 8 | InvertedIndexStorageFormatPB::V1) { |
1129 | 8 | std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix); |
1130 | 8 | RETURN_IF_ERROR( |
1131 | 8 | _create_file_writer(index_path, idx_file_writer, true /* is_index_file */)); |
1132 | 8 | } |
1133 | 8 | index_file_writer = std::make_unique<IndexFileWriter>( |
1134 | 8 | _context.fs(), prefix, _context.rowset_id.to_string(), _num_segcompacted, |
1135 | 8 | _context.tablet_schema->get_inverted_index_storage_format(), |
1136 | 8 | std::move(idx_file_writer), true /* can_use_ram_dir */, _context.tablet_id); |
1137 | 8 | } |
1138 | | |
1139 | 12 | segment_v2::SegmentWriterOptions writer_options; |
1140 | 12 | writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write; |
1141 | 12 | writer_options.rowset_ctx = &_context; |
1142 | 12 | writer_options.write_type = _context.write_type; |
1143 | 12 | writer_options.write_type = DataWriteType::TYPE_COMPACTION; |
1144 | 12 | writer_options.max_rows_per_segment = _context.max_rows_per_segment; |
1145 | 12 | writer_options.mow_ctx = _context.mow_context; |
1146 | | |
1147 | 12 | *writer = std::make_unique<segment_v2::SegmentWriter>( |
1148 | 12 | file_writer.get(), _num_segcompacted, _context.tablet_schema, _context.tablet, |
1149 | 12 | _context.data_dir, writer_options, index_file_writer.get()); |
1150 | 12 | if (auto& seg_writer = _segcompaction_worker->get_file_writer(); |
1151 | 12 | seg_writer != nullptr && seg_writer->state() != io::FileWriter::State::CLOSED) { |
1152 | 0 | RETURN_IF_ERROR(_segcompaction_worker->get_file_writer()->close()); |
1153 | 0 | } |
1154 | 12 | _segcompaction_worker->get_file_writer().reset(file_writer.release()); |
1155 | 12 | if (auto& idx_file_writer = _segcompaction_worker->get_inverted_index_file_writer(); |
1156 | 12 | idx_file_writer != nullptr) { |
1157 | 0 | RETURN_IF_ERROR(idx_file_writer->begin_close()); |
1158 | 0 | RETURN_IF_ERROR(idx_file_writer->finish_close()); |
1159 | 0 | } |
1160 | 12 | _segcompaction_worker->get_inverted_index_file_writer().reset(index_file_writer.release()); |
1161 | 12 | return Status::OK(); |
1162 | 12 | } |
1163 | | |
// Reject the write with TOO_MANY_SEGMENTS when the prospective segment count
// exceeds the configured per-rowset cap.
Status BaseBetaRowsetWriter::_check_segment_number_limit(size_t segnum) {
    // Debug point lets tests force an over-limit segment count.
    DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
                    { segnum = dp->param("segnum", 1024); });
    if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) {
        return Status::Error<TOO_MANY_SEGMENTS>(
                "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, "
                "_num_segment:{}, rowset_num_rows:{}. Please check if the bucket number is too "
                "small or if the data is skewed.",
                _context.tablet_id, _context.rowset_id.to_string(),
                config::max_segment_num_per_rowset, _num_segment, get_rowset_num_rows());
    }
    return Status::OK();
}
1177 | | |
// Segcompaction-aware variant: same cap check as the base class, but the error
// message additionally reports segcompaction progress for diagnosis.
Status BetaRowsetWriter::_check_segment_number_limit(size_t segnum) {
    // Debug point lets tests force an over-limit segment count.
    DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
                    { segnum = dp->param("segnum", 1024); });
    if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) {
        return Status::Error<TOO_MANY_SEGMENTS>(
                "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, _num_segment:{}, "
                "_segcompacted_point:{}, _num_segcompacted:{}, rowset_num_rows:{}. Please check if "
                "the bucket number is too small or if the data is skewed.",
                _context.tablet_id, _context.rowset_id.to_string(),
                config::max_segment_num_per_rowset, _num_segment, _segcompacted_point,
                _num_segcompacted, get_rowset_num_rows());
    }
    return Status::OK();
}
1192 | | |
// Record statistics for a finished segment. Segments may complete out of
// order from concurrent flushes; _num_segment only advances past a contiguous
// prefix of completed segment offsets.
Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) {
    uint32_t segid_offset = segment_id - _segment_start_id;
    {
        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
        // Each segment id must be reported exactly once.
        CHECK_EQ(_segid_statistics_map.find(segment_id) == _segid_statistics_map.end(), true);
        _segid_statistics_map.emplace(segment_id, segstat);
        if (segment_id >= _segment_num_rows.size()) {
            // NOTE(review): resized by absolute segment_id but indexed by
            // segid_offset below — oversized (harmless) when _segment_start_id > 0;
            // confirm whether resize(segid_offset + 1) was intended.
            _segment_num_rows.resize(segment_id + 1);
        }
        _segment_num_rows[segid_offset] = cast_set<uint32_t>(segstat.row_num);
    }
    VLOG_DEBUG << "_segid_statistics_map add new record. segment_id:" << segment_id
               << " row_num:" << segstat.row_num << " data_size:" << segstat.data_size
               << " index_size:" << segstat.index_size;

    {
        std::lock_guard<std::mutex> lock(_segment_set_mutex);
        _segment_set.add(segid_offset);
        // Advance the counter across the contiguous run of completed segments.
        while (_segment_set.contains(_num_segment)) {
            _num_segment++;
        }
    }

    // Merge-on-write tables compute the segment's delete bitmap at add time.
    if (_context.mow_context != nullptr) {
        RETURN_IF_ERROR(_generate_delete_bitmap(segment_id));
    }
    return Status::OK();
}
1221 | | |
1222 | 1.20k | Status BetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) { |
1223 | 1.20k | RETURN_IF_ERROR(BaseBetaRowsetWriter::add_segment(segment_id, segstat)); |
1224 | 1.20k | return _segcompaction_if_necessary(); |
1225 | 1.20k | } |
1226 | | |
// Finalize a segcompaction output segment: write the footer, close its
// inverted index, and record the segment's statistics. Destroys the writer.
// NOTE(review): the index_size parameter is not used by this body — the real
// index size comes from close_inverted_index(); confirm whether callers still
// need to pass it.
Status BetaRowsetWriter::flush_segment_writer_for_segcompaction(
        std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t index_size,
        KeyBoundsPB& key_bounds) {
    uint32_t segid = (*writer)->get_segment_id();
    uint32_t row_num = (*writer)->row_count();
    uint64_t segment_size;

    auto s = (*writer)->finalize_footer(&segment_size);
    if (!s.ok()) {
        return Status::Error<WRITER_DATA_WRITE_ERROR>("failed to finalize segment: {}",
                                                      s.to_string());
    }
    int64_t inverted_index_file_size = 0;
    RETURN_IF_ERROR((*writer)->close_inverted_index(&inverted_index_file_size));

    SegmentStatistics segstat;
    segstat.row_num = row_num;
    segstat.data_size = segment_size;
    segstat.index_size = inverted_index_file_size;
    segstat.key_bounds = key_bounds;
    {
        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
        // Each segcompacted segment id must be recorded exactly once.
        CHECK_EQ(_segid_statistics_map.find(segid) == _segid_statistics_map.end(), true);
        _segid_statistics_map.emplace(segid, segstat);
    }
    VLOG_DEBUG << "_segid_statistics_map add new record. segid:" << segid << " row_num:" << row_num
               << " data_size:" << PrettyPrinter::print_bytes(segment_size)
               << " index_size:" << PrettyPrinter::print_bytes(inverted_index_file_size);

    // Release the SegmentWriter; its underlying file writer is owned by the
    // segcompaction worker (see create_segment_writer_for_segcompaction).
    writer->reset();

    return Status::OK();
}
1260 | | |
1261 | | } // namespace doris |