be/src/storage/rowset/beta_rowset_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/rowset/beta_rowset_writer.h" |
19 | | |
20 | | #include <assert.h> |
21 | | // IWYU pragma: no_include <bthread/errno.h> |
22 | | #include <errno.h> // IWYU pragma: keep |
23 | | #include <fmt/format.h> |
24 | | #include <stdio.h> |
25 | | |
26 | | #include <chrono> |
27 | | #include <ctime> // time |
28 | | #include <filesystem> |
29 | | #include <memory> |
30 | | #include <mutex> |
31 | | #include <sstream> |
32 | | #include <thread> |
33 | | #include <utility> |
34 | | #include <vector> |
35 | | |
36 | | // IWYU pragma: no_include <opentelemetry/common/threadlocal.h> |
37 | | #include "common/cast_set.h" |
38 | | #include "common/compiler_util.h" // IWYU pragma: keep |
39 | | #include "common/config.h" |
40 | | #include "common/logging.h" |
41 | | #include "common/status.h" |
42 | | #include "core/block/block.h" |
43 | | #include "core/column/column.h" |
44 | | #include "core/data_type/data_type_factory.hpp" |
45 | | #include "io/fs/file_reader.h" |
46 | | #include "io/fs/file_system.h" |
47 | | #include "io/fs/file_writer.h" |
48 | | #include "runtime/thread_context.h" |
49 | | #include "storage/index/inverted/inverted_index_cache.h" |
50 | | #include "storage/index/inverted/inverted_index_desc.h" |
51 | | #include "storage/olap_define.h" |
52 | | #include "storage/rowset/beta_rowset.h" |
53 | | #include "storage/rowset/rowset_factory.h" |
54 | | #include "storage/rowset/rowset_writer.h" |
55 | | #include "storage/rowset/segcompaction.h" |
56 | | #include "storage/schema_change/schema_change.h" |
57 | | #include "storage/segment/segment.h" |
58 | | #include "storage/segment/segment_writer.h" |
59 | | #include "storage/storage_engine.h" |
60 | | #include "storage/tablet/tablet_schema.h" |
61 | | #include "util/debug_points.h" |
62 | | #include "util/pretty_printer.h" |
63 | | #include "util/slice.h" |
64 | | #include "util/stopwatch.hpp" |
65 | | #include "util/time.h" |
66 | | |
67 | | namespace doris { |
68 | | using namespace ErrorCode; |
69 | | |
70 | | namespace { |
71 | | |
72 | 702 | bool is_segment_overlapping(const std::vector<KeyBoundsPB>& segments_encoded_key_bounds) { |
73 | 702 | std::string_view last; |
74 | 1.78k | for (auto&& segment_encode_key : segments_encoded_key_bounds) { |
75 | 1.78k | auto&& cur_min = segment_encode_key.min_key(); |
76 | 1.78k | auto&& cur_max = segment_encode_key.max_key(); |
77 | 1.78k | if (cur_min <= last) { |
78 | 258 | return true; |
79 | 258 | } |
80 | 1.53k | last = cur_max; |
81 | 1.53k | } |
82 | 444 | return false; |
83 | 702 | } |
84 | | |
85 | 1.21k | bool truncate_key_bounds(KeyBoundsPB* key_bounds) { |
86 | 1.21k | DCHECK(key_bounds != nullptr); |
87 | 1.21k | if (config::random_segments_key_bounds_truncation) { |
88 | 0 | return false; |
89 | 0 | } |
90 | 1.21k | const int32_t truncation_threshold = config::segments_key_bounds_truncation_threshold; |
91 | 1.21k | if (truncation_threshold <= 0) { |
92 | 355 | return false; |
93 | 355 | } |
94 | 862 | const size_t truncation_size = cast_set<size_t>(truncation_threshold); |
95 | | |
96 | 862 | bool truncated = false; |
97 | 862 | if (key_bounds->min_key().size() > truncation_size) { |
98 | 700 | key_bounds->mutable_min_key()->resize(truncation_size); |
99 | 700 | truncated = true; |
100 | 700 | } |
101 | 862 | if (key_bounds->max_key().size() > truncation_size) { |
102 | 695 | key_bounds->mutable_max_key()->resize(truncation_size); |
103 | 695 | truncated = true; |
104 | 695 | } |
105 | 862 | return truncated; |
106 | 1.21k | } |
107 | | |
108 | | void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta, |
109 | 24 | const RowsetMeta& spec_rowset_meta) { |
110 | 24 | rowset_meta.set_num_rows(spec_rowset_meta.num_rows()); |
111 | 24 | rowset_meta.set_total_disk_size(spec_rowset_meta.total_disk_size()); |
112 | 24 | rowset_meta.set_data_disk_size(spec_rowset_meta.data_disk_size()); |
113 | 24 | rowset_meta.set_index_disk_size(spec_rowset_meta.index_disk_size()); |
114 | | // TODO write zonemap to meta |
115 | 24 | rowset_meta.set_empty(spec_rowset_meta.num_rows() == 0); |
116 | 24 | rowset_meta.set_creation_time(time(nullptr)); |
117 | 24 | rowset_meta.set_num_segments(spec_rowset_meta.num_segments()); |
118 | 24 | rowset_meta.set_segments_overlap(spec_rowset_meta.segments_overlap()); |
119 | 24 | rowset_meta.set_rowset_state(spec_rowset_meta.rowset_state()); |
120 | 24 | rowset_meta.set_segments_key_bounds_truncated( |
121 | 24 | spec_rowset_meta.is_segments_key_bounds_truncated()); |
122 | 24 | rowset_meta.set_db_id(spec_rowset_meta.db_id()); |
123 | 24 | rowset_meta.set_table_id(spec_rowset_meta.table_id()); |
124 | 24 | std::vector<KeyBoundsPB> segments_key_bounds; |
125 | 24 | spec_rowset_meta.get_segments_key_bounds(&segments_key_bounds); |
126 | | // Preserve source layout: if source was aggregated (size 1), re-aggregating |
127 | | // the single entry is a no-op that also keeps the flag consistent. |
128 | 24 | rowset_meta.set_segments_key_bounds(segments_key_bounds, |
129 | 24 | spec_rowset_meta.is_segments_key_bounds_aggregated()); |
130 | 24 | std::vector<uint32_t> num_segment_rows; |
131 | 24 | spec_rowset_meta.get_num_segment_rows(&num_segment_rows); |
132 | 24 | rowset_meta.set_num_segment_rows(num_segment_rows); |
133 | 24 | if (spec_rowset_meta.is_row_binlog()) { |
134 | 0 | rowset_meta.mark_row_binlog(); |
135 | 0 | } |
136 | 24 | } |
137 | | |
138 | | } // namespace |
139 | | |
140 | 1.01k | SegmentFileCollection::~SegmentFileCollection() = default; |
141 | | |
142 | 2.38k | Status SegmentFileCollection::add(int seg_id, io::FileWriterPtr&& writer) { |
143 | 2.38k | std::lock_guard lock(_lock); |
144 | 2.38k | if (_closed) [[unlikely]] { |
145 | 0 | DCHECK(false) << writer->path(); |
146 | 0 | return Status::InternalError("add to closed SegmentFileCollection"); |
147 | 0 | } |
148 | | |
149 | 2.38k | _file_writers.emplace(seg_id, std::move(writer)); |
150 | 2.38k | return Status::OK(); |
151 | 2.38k | } |
152 | | |
153 | 63 | io::FileWriter* SegmentFileCollection::get(int seg_id) const { |
154 | 63 | std::lock_guard lock(_lock); |
155 | 63 | if (auto it = _file_writers.find(seg_id); it != _file_writers.end()) { |
156 | 63 | return it->second.get(); |
157 | 63 | } else { |
158 | 0 | return nullptr; |
159 | 0 | } |
160 | 63 | } |
161 | | |
162 | 982 | Status SegmentFileCollection::close() { |
163 | 982 | { |
164 | 982 | std::lock_guard lock(_lock); |
165 | 982 | if (_closed) [[unlikely]] { |
166 | 0 | DCHECK(false); |
167 | 0 | return Status::InternalError("double close SegmentFileCollection"); |
168 | 0 | } |
169 | 982 | _closed = true; |
170 | 982 | } |
171 | | |
172 | 2.38k | for (auto&& [_, writer] : _file_writers) { |
173 | 2.38k | DBUG_EXECUTE_IF("SegmentFileCollection.close.wait_dat_closed", { |
174 | 2.38k | auto before_state = writer->state(); |
175 | 2.38k | for (int i = 0; i < 3000 && writer->state() != io::FileWriter::State::CLOSED; ++i) { |
176 | 2.38k | std::this_thread::sleep_for(std::chrono::milliseconds(10)); |
177 | 2.38k | } |
178 | 2.38k | LOG(INFO) << "SegmentFileCollection.close.wait_dat_closed path=" |
179 | 2.38k | << writer->path().native() |
180 | 2.38k | << " before_state=" << static_cast<int>(before_state) |
181 | 2.38k | << " after_state=" << static_cast<int>(writer->state()); |
182 | 2.38k | }); |
183 | 2.38k | if (writer->state() != io::FileWriter::State::CLOSED) { |
184 | 2.32k | RETURN_IF_ERROR(writer->close()); |
185 | 2.32k | } |
186 | 2.38k | } |
187 | | |
188 | 982 | return Status::OK(); |
189 | 982 | } |
190 | | |
191 | 0 | Result<std::vector<size_t>> SegmentFileCollection::segments_file_size(int seg_id_offset) { |
192 | 0 | std::lock_guard lock(_lock); |
193 | 0 | if (!_closed) [[unlikely]] { |
194 | 0 | DCHECK(false); |
195 | 0 | return ResultError(Status::InternalError("get segments file size without closed")); |
196 | 0 | } |
197 | | |
198 | 0 | Status st; |
199 | 0 | std::vector<size_t> seg_file_size(_file_writers.size(), 0); |
200 | 0 | bool succ = std::all_of(_file_writers.begin(), _file_writers.end(), [&](auto&& it) { |
201 | 0 | auto&& [seg_id, writer] = it; |
202 | |
|
203 | 0 | int idx = seg_id - seg_id_offset; |
204 | 0 | if (idx >= seg_file_size.size()) [[unlikely]] { |
205 | 0 | auto err_msg = fmt::format( |
206 | 0 | "invalid seg_id={} num_file_writers={} seg_id_offset={} path={}", seg_id, |
207 | 0 | seg_file_size.size(), seg_id_offset, writer->path().native()); |
208 | 0 | DCHECK(false) << err_msg; |
209 | 0 | st = Status::InternalError(err_msg); |
210 | 0 | return false; |
211 | 0 | } |
212 | | |
213 | 0 | auto& fsize = seg_file_size[idx]; |
214 | 0 | if (fsize != 0) { |
215 | | // File size should not been set |
216 | 0 | auto err_msg = |
217 | 0 | fmt::format("duplicate seg_id={} path={}", seg_id, writer->path().native()); |
218 | 0 | DCHECK(false) << err_msg; |
219 | 0 | st = Status::InternalError(err_msg); |
220 | 0 | return false; |
221 | 0 | } |
222 | | |
223 | 0 | fsize = writer->bytes_appended(); |
224 | 0 | if (fsize <= 0) { |
225 | 0 | auto err_msg = |
226 | 0 | fmt::format("invalid segment fsize={} path={}", fsize, writer->path().native()); |
227 | 0 | DCHECK(false) << err_msg; |
228 | 0 | st = Status::InternalError(err_msg); |
229 | 0 | return false; |
230 | 0 | } |
231 | | |
232 | 0 | return true; |
233 | 0 | }); |
234 | |
|
235 | 0 | if (succ) { |
236 | 0 | return seg_file_size; |
237 | 0 | } |
238 | | |
239 | 0 | return ResultError(st); |
240 | 0 | } |
241 | | |
242 | 1.01k | InvertedIndexFileCollection::~InvertedIndexFileCollection() = default; |
243 | | |
244 | 263 | Status InvertedIndexFileCollection::add(int seg_id, IndexFileWriterPtr&& index_writer) { |
245 | 263 | std::lock_guard lock(_lock); |
246 | 263 | if (_inverted_index_file_writers.find(seg_id) != _inverted_index_file_writers.end()) |
247 | 0 | [[unlikely]] { |
248 | 0 | DCHECK(false); |
249 | 0 | return Status::InternalError("The seg_id already exists, seg_id is {}", seg_id); |
250 | 0 | } |
251 | 263 | _inverted_index_file_writers.emplace(seg_id, std::move(index_writer)); |
252 | 263 | return Status::OK(); |
253 | 263 | } |
254 | | |
255 | 328 | Status InvertedIndexFileCollection::begin_close() { |
256 | 328 | std::lock_guard lock(_lock); |
257 | 328 | for (auto&& [id, writer] : _inverted_index_file_writers) { |
258 | 36 | RETURN_IF_ERROR(writer->begin_close()); |
259 | 36 | _total_size += writer->get_index_file_total_size(); |
260 | 36 | } |
261 | | |
262 | 328 | return Status::OK(); |
263 | 328 | } |
264 | | |
265 | 982 | Status InvertedIndexFileCollection::finish_close() { |
266 | 982 | std::lock_guard lock(_lock); |
267 | 982 | for (auto&& [id, writer] : _inverted_index_file_writers) { |
268 | 262 | RETURN_IF_ERROR(writer->finish_close()); |
269 | 262 | } |
270 | 982 | return Status::OK(); |
271 | 982 | } |
272 | | |
273 | | Result<std::vector<const InvertedIndexFileInfo*>> |
274 | 106 | InvertedIndexFileCollection::inverted_index_file_info(int seg_id_offset) { |
275 | 106 | std::lock_guard lock(_lock); |
276 | | |
277 | 106 | Status st; |
278 | 106 | std::vector<const InvertedIndexFileInfo*> idx_file_info(_inverted_index_file_writers.size()); |
279 | 106 | bool succ = std::all_of( |
280 | 106 | _inverted_index_file_writers.begin(), _inverted_index_file_writers.end(), |
281 | 213 | [&](auto&& it) { |
282 | 213 | auto&& [seg_id, writer] = it; |
283 | | |
284 | 213 | int idx = seg_id - seg_id_offset; |
285 | 213 | if (idx >= idx_file_info.size()) [[unlikely]] { |
286 | 0 | auto err_msg = |
287 | 0 | fmt::format("invalid seg_id={} num_file_writers={} seg_id_offset={}", |
288 | 0 | seg_id, idx_file_info.size(), seg_id_offset); |
289 | 0 | DCHECK(false) << err_msg; |
290 | 0 | st = Status::InternalError(err_msg); |
291 | 0 | return false; |
292 | 0 | } |
293 | 213 | idx_file_info[idx] = _inverted_index_file_writers[seg_id]->get_index_file_info(); |
294 | 213 | return true; |
295 | 213 | }); |
296 | | |
297 | 106 | if (succ) { |
298 | 106 | return idx_file_info; |
299 | 106 | } |
300 | | |
301 | 0 | return ResultError(st); |
302 | 106 | } |
303 | | |
304 | | BaseBetaRowsetWriter::BaseBetaRowsetWriter() |
305 | 1.01k | : _num_segment(0), |
306 | 1.01k | _segment_start_id(0), |
307 | 1.01k | _num_rows_written(0), |
308 | 1.01k | _total_data_size(0), |
309 | 1.01k | _total_index_size(0), |
310 | 1.01k | _segment_creator(_context, _seg_files, _idx_files) {} |
311 | | |
312 | | BetaRowsetWriter::BetaRowsetWriter(StorageEngine& engine) |
313 | 1.01k | : _engine(engine), _segcompaction_worker(std::make_shared<SegcompactionWorker>(this)) {} |
314 | | |
315 | 9 | RowBinlogRowsetWriter::RowBinlogRowsetWriter(StorageEngine& engine) : BetaRowsetWriter(engine) {} |
316 | | |
317 | 1.01k | BaseBetaRowsetWriter::~BaseBetaRowsetWriter() { |
318 | 1.01k | if (!_already_built && _rowset_meta->is_local()) { |
319 | | // abnormal exit, remove all files generated |
320 | 10 | auto& fs = io::global_local_filesystem(); |
321 | 12 | for (int i = _segment_start_id; i < _segment_creator.next_segment_id(); ++i) { |
322 | 2 | std::string seg_path = |
323 | 2 | local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), i); |
324 | | // Even if an error is encountered, these files that have not been cleaned up |
325 | | // will be cleaned up by the GC background. So here we only print the error |
326 | | // message when we encounter an error. |
327 | 2 | WARN_IF_ERROR(fs->delete_file(seg_path), |
328 | 2 | fmt::format("Failed to delete file={}", seg_path)); |
329 | 2 | } |
330 | 10 | } |
331 | 1.01k | if (_calc_delete_bitmap_token) { |
332 | 9 | _calc_delete_bitmap_token->cancel(); |
333 | 9 | } |
334 | 1.01k | } |
335 | | |
336 | 1.01k | BetaRowsetWriter::~BetaRowsetWriter() { |
337 | | /* Note that segcompaction is async and in parallel with load job. So we should handle carefully |
338 | | * when the job is cancelled. Although it is meaningless to continue segcompaction when the job |
339 | | * is cancelled, the objects involved in the job should be preserved during segcompaction to |
340 | | * avoid crashs for memory issues. */ |
341 | 1.01k | WARN_IF_ERROR(_wait_flying_segcompaction(), "segment compaction failed"); |
342 | 1.01k | } |
343 | | |
344 | 1.01k | Status BaseBetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) { |
345 | 1.01k | _context = rowset_writer_context; |
346 | 1.01k | DCHECK(_context.tablet_schema != nullptr); |
347 | 1.01k | _rowset_meta.reset(new RowsetMeta); |
348 | 1.01k | if (_context.storage_resource) { |
349 | 0 | _rowset_meta->set_remote_storage_resource(*_context.storage_resource); |
350 | 0 | } |
351 | 1.01k | _rowset_meta->set_rowset_id(_context.rowset_id); |
352 | 1.01k | _rowset_meta->set_partition_id(_context.partition_id); |
353 | 1.01k | _rowset_meta->set_tablet_id(_context.tablet_id); |
354 | 1.01k | _rowset_meta->set_db_id(_context.db_id); |
355 | 1.01k | _rowset_meta->set_table_id(_context.table_id); |
356 | 1.01k | _rowset_meta->set_index_id(_context.index_id); |
357 | 1.01k | _rowset_meta->set_tablet_schema_hash(_context.tablet_schema_hash); |
358 | 1.01k | _rowset_meta->set_rowset_type(_context.rowset_type); |
359 | 1.01k | _rowset_meta->set_rowset_state(_context.rowset_state); |
360 | 1.01k | _rowset_meta->set_segments_overlap(_context.segments_overlap); |
361 | 1.01k | if (_context.rowset_state == PREPARED || _context.rowset_state == COMMITTED) { |
362 | 36 | _is_pending = true; |
363 | 36 | _rowset_meta->set_txn_id(_context.txn_id); |
364 | 36 | _rowset_meta->set_load_id(_context.load_id); |
365 | 979 | } else { |
366 | 979 | _rowset_meta->set_version(_context.version); |
367 | 979 | _rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp); |
368 | 979 | } |
369 | 1.01k | _rowset_meta->set_tablet_uid(_context.tablet_uid); |
370 | 1.01k | _rowset_meta->set_tablet_schema(_context.tablet_schema); |
371 | 1.01k | if (_context.write_binlog_opt().enable) { |
372 | 9 | _rowset_meta->mark_row_binlog(); |
373 | 9 | } |
374 | 1.01k | _rowset_meta->set_compaction_level(_context.compaction_level); |
375 | 1.01k | _context.segment_collector = std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this); |
376 | 1.01k | _context.file_writer_creator = std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this); |
377 | 1.01k | return Status::OK(); |
378 | 1.01k | } |
379 | | |
380 | 1.36k | Status BaseBetaRowsetWriter::add_block(const Block* block) { |
381 | 1.36k | return _segment_creator.add_block(block); |
382 | 1.36k | } |
383 | | |
384 | 63 | Status BaseBetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) { |
385 | 63 | SCOPED_RAW_TIMER(&_delete_bitmap_ns); |
386 | 63 | if (_context.is_transient_rowset_writer || |
387 | 63 | !_context.tablet->enable_unique_key_merge_on_write() || |
388 | 63 | (_context.partial_update_info && _context.partial_update_info->is_partial_update())) { |
389 | 0 | return Status::OK(); |
390 | 0 | } |
391 | 63 | std::vector<RowsetSharedPtr> specified_rowsets; |
392 | 63 | { |
393 | 63 | std::shared_lock meta_rlock(_context.tablet->get_header_lock()); |
394 | 63 | specified_rowsets = |
395 | 63 | _context.tablet->get_rowset_by_ids(_context.mow_context->rowset_ids.get()); |
396 | 63 | } |
397 | | |
398 | | // Submit the entire delete bitmap calculation process to thread pool for async execution |
399 | | // This avoids blocking memtable flush thread while waiting for file upload to complete |
400 | | // The process includes: file_writer->close(), _build_tmp, load_segments, and calc_delete_bitmap |
401 | 63 | return _calc_delete_bitmap_token->submit_func( |
402 | 63 | [this, segment_id, specified_rowsets = std::move(specified_rowsets)]() -> Status { |
403 | 63 | Status st = Status::OK(); |
404 | | // Step 1: Close file_writer (must be done before load_segments) |
405 | 63 | auto* file_writer = _seg_files.get(segment_id); |
406 | 63 | if (file_writer && file_writer->state() != io::FileWriter::State::CLOSED) { |
407 | 63 | MonotonicStopWatch close_timer; |
408 | 63 | close_timer.start(); |
409 | 63 | st = file_writer->close(); |
410 | 63 | close_timer.stop(); |
411 | | |
412 | 63 | auto close_time_ms = close_timer.elapsed_time_milliseconds(); |
413 | 63 | if (close_time_ms > 1000) { |
414 | 0 | LOG(INFO) << "file_writer->close() took " << close_time_ms |
415 | 0 | << "ms for segment_id=" << segment_id |
416 | 0 | << ", tablet_id=" << _context.tablet_id |
417 | 0 | << ", rowset_id=" << _context.rowset_id; |
418 | 0 | } |
419 | 63 | if (!st.ok()) { |
420 | 0 | return st; |
421 | 0 | } |
422 | 63 | } |
423 | | |
424 | 63 | OlapStopWatch watch; |
425 | | // Step 2: Build tmp rowset (needs file_writer to be closed) |
426 | 63 | RowsetSharedPtr rowset_ptr; |
427 | 63 | st = _build_tmp(rowset_ptr); |
428 | 63 | if (!st.ok()) { |
429 | 0 | return st; |
430 | 0 | } |
431 | | |
432 | | // Step 3: Load segments (needs file_writer to be closed and rowset to be built) |
433 | 63 | auto* beta_rowset = reinterpret_cast<BetaRowset*>(rowset_ptr.get()); |
434 | 63 | std::vector<segment_v2::SegmentSharedPtr> segments; |
435 | 63 | st = beta_rowset->load_segments(segment_id, segment_id + 1, &segments); |
436 | 63 | if (!st.ok()) { |
437 | 0 | return st; |
438 | 0 | } |
439 | | |
440 | | // Step 4: Calculate delete bitmap |
441 | 63 | st = BaseTablet::calc_delete_bitmap( |
442 | 63 | _context.tablet, rowset_ptr, segments, specified_rowsets, |
443 | 63 | _context.mow_context->delete_bitmap, _context.mow_context->max_version, |
444 | 63 | nullptr, nullptr, nullptr); |
445 | 63 | if (!st.ok()) { |
446 | 0 | return st; |
447 | 0 | } |
448 | | |
449 | 63 | size_t total_rows = |
450 | 63 | std::accumulate(segments.begin(), segments.end(), 0, |
451 | 63 | [](size_t sum, const segment_v2::SegmentSharedPtr& s) { |
452 | 63 | return sum += s->num_rows(); |
453 | 63 | }); |
454 | 63 | LOG(INFO) << "[Memtable Flush] construct delete bitmap tablet: " |
455 | 63 | << _context.tablet->tablet_id() |
456 | 63 | << ", rowset_ids: " << _context.mow_context->rowset_ids->size() |
457 | 63 | << ", cur max_version: " << _context.mow_context->max_version |
458 | 63 | << ", transaction_id: " << _context.mow_context->txn_id |
459 | 63 | << ", delete_bitmap_count: " |
460 | 63 | << _context.mow_context->delete_bitmap->get_delete_bitmap_count() |
461 | 63 | << ", delete_bitmap_cardinality: " |
462 | 63 | << _context.mow_context->delete_bitmap->cardinality() |
463 | 63 | << ", cost: " << watch.get_elapse_time_us() |
464 | 63 | << "(us), total rows: " << total_rows; |
465 | 63 | return Status::OK(); |
466 | 63 | }); |
467 | 63 | } |
468 | | |
469 | 1.01k | Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) { |
470 | 1.01k | RETURN_IF_ERROR(BaseBetaRowsetWriter::init(rowset_writer_context)); |
471 | 1.01k | if (_segcompaction_worker) { |
472 | 1.01k | _segcompaction_worker->init_mem_tracker(rowset_writer_context); |
473 | 1.01k | } |
474 | 1.01k | if (_context.mow_context != nullptr) { |
475 | 9 | _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor_for_load()->create_token(); |
476 | 9 | } |
477 | 1.01k | return Status::OK(); |
478 | 1.01k | } |
479 | | |
480 | | Status BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, |
481 | 77 | int32_t segment_id) { |
482 | 77 | DCHECK(_rowset_meta->is_local()); |
483 | 77 | auto fs = _rowset_meta->fs(); |
484 | 77 | if (!fs) { |
485 | 0 | return Status::Error<INIT_FAILED>( |
486 | 0 | "BetaRowsetWriter::_load_noncompacted_segment _rowset_meta->fs get failed"); |
487 | 0 | } |
488 | 77 | auto path = |
489 | 77 | local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), segment_id); |
490 | 77 | io::FileReaderOptions reader_options { |
491 | 77 | .cache_type = |
492 | 77 | _context.write_file_cache |
493 | 77 | ? (config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE |
494 | 0 | : io::FileCachePolicy::NO_CACHE) |
495 | 77 | : io::FileCachePolicy::NO_CACHE, |
496 | 77 | .is_doris_table = true, |
497 | 77 | .cache_base_path {}, |
498 | 77 | .tablet_id = _rowset_meta->tablet_id(), |
499 | 77 | }; |
500 | 77 | auto s = segment_v2::Segment::open(fs, path, _rowset_meta->tablet_id(), segment_id, rowset_id(), |
501 | 77 | _context.tablet_schema, reader_options, &segment); |
502 | 77 | if (!s.ok()) { |
503 | 0 | LOG(WARNING) << "failed to open segment. " << path << ":" << s; |
504 | 0 | return s; |
505 | 0 | } |
506 | 77 | return Status::OK(); |
507 | 77 | } |
508 | | |
509 | | /* policy of segcompaction target selection: |
510 | | * 1. skip big segments |
511 | | * 2. if the consecutive smalls end up with a big, compact the smalls, except |
512 | | * single small |
513 | | * 3. if the consecutive smalls end up with small, compact the smalls if the |
514 | | * length is beyond (config::segcompaction_batch_size / 2) |
515 | | */ |
516 | | Status BetaRowsetWriter::_find_longest_consecutive_small_segment( |
517 | 15 | SegCompactionCandidatesSharedPtr& segments) { |
518 | 15 | segments = std::make_shared<SegCompactionCandidates>(); |
519 | | // skip last (maybe active) segment |
520 | 15 | int32_t last_segment = _num_segment - 1; |
521 | 15 | size_t task_bytes = 0; |
522 | 15 | uint32_t task_rows = 0; |
523 | 15 | int32_t segid; |
524 | 15 | for (segid = _segcompacted_point; |
525 | 86 | segid < last_segment && segments->size() < config::segcompaction_batch_size; segid++) { |
526 | 77 | segment_v2::SegmentSharedPtr segment; |
527 | 77 | RETURN_IF_ERROR(_load_noncompacted_segment(segment, segid)); |
528 | 77 | const auto segment_rows = segment->num_rows(); |
529 | 77 | const auto segment_bytes = segment->file_reader()->size(); |
530 | 77 | bool is_large_segment = segment_rows > config::segcompaction_candidate_max_rows || |
531 | 77 | segment_bytes > config::segcompaction_candidate_max_bytes; |
532 | 77 | if (is_large_segment) { |
533 | 14 | if (segid == _segcompacted_point) { |
534 | | // skip large segments at the front |
535 | 8 | auto dst_seg_id = _num_segcompacted.load(); |
536 | 8 | RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); |
537 | 8 | if (_segcompaction_worker->need_convert_delete_bitmap()) { |
538 | 4 | _segcompaction_worker->convert_segment_delete_bitmap( |
539 | 4 | _context.mow_context->delete_bitmap, segid, dst_seg_id); |
540 | 4 | } |
541 | 8 | continue; |
542 | 8 | } else { |
543 | | // stop because we need consecutive segments |
544 | 6 | break; |
545 | 6 | } |
546 | 14 | } |
547 | 63 | bool is_task_full = task_rows + segment_rows > config::segcompaction_task_max_rows || |
548 | 63 | task_bytes + segment_bytes > config::segcompaction_task_max_bytes; |
549 | 63 | if (is_task_full) { |
550 | 0 | break; |
551 | 0 | } |
552 | 63 | segments->push_back(segment); |
553 | 63 | task_rows += segment->num_rows(); |
554 | 63 | task_bytes += segment->file_reader()->size(); |
555 | 63 | } |
556 | 15 | size_t s = segments->size(); |
557 | 15 | if (segid == last_segment && s <= (config::segcompaction_batch_size / 2)) { |
558 | | // we didn't collect enough segments, better to do it in next |
559 | | // round to compact more at once |
560 | 0 | segments->clear(); |
561 | 0 | return Status::OK(); |
562 | 0 | } |
563 | 15 | if (s == 1) { // poor bachelor, let it go |
564 | 4 | VLOG_DEBUG << "only one candidate segment"; |
565 | 4 | auto src_seg_id = _segcompacted_point.load(); |
566 | 4 | auto dst_seg_id = _num_segcompacted.load(); |
567 | 4 | RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); |
568 | 4 | if (_segcompaction_worker->need_convert_delete_bitmap()) { |
569 | 2 | _segcompaction_worker->convert_segment_delete_bitmap( |
570 | 2 | _context.mow_context->delete_bitmap, src_seg_id, dst_seg_id); |
571 | 2 | } |
572 | 4 | segments->clear(); |
573 | 4 | return Status::OK(); |
574 | 4 | } |
575 | 11 | if (VLOG_DEBUG_IS_ON) { |
576 | 0 | vlog_buffer.clear(); |
577 | 0 | for (auto& segment : (*segments.get())) { |
578 | 0 | fmt::format_to(vlog_buffer, "[id:{} num_rows:{}]", segment->id(), segment->num_rows()); |
579 | 0 | } |
580 | 0 | VLOG_DEBUG << "candidate segments num:" << s |
581 | 0 | << " list of candidates:" << fmt::to_string(vlog_buffer); |
582 | 0 | } |
583 | 11 | return Status::OK(); |
584 | 15 | } |
585 | | |
586 | 11 | Status BetaRowsetWriter::_rename_compacted_segments(int64_t begin, int64_t end) { |
587 | 11 | int ret; |
588 | 11 | auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, |
589 | 11 | _context.rowset_id, begin, end); |
590 | 11 | auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), |
591 | 11 | _num_segcompacted); |
592 | 11 | ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); |
593 | 11 | if (ret) { |
594 | 0 | return Status::Error<ROWSET_RENAME_FILE_FAILED>( |
595 | 0 | "failed to rename {} to {}. ret:{}, errno:{}", src_seg_path, dst_seg_path, ret, |
596 | 0 | errno); |
597 | 0 | } |
598 | 11 | RETURN_IF_ERROR(_remove_segment_footer_cache(_num_segcompacted, dst_seg_path)); |
599 | | |
600 | | // rename inverted index files |
601 | 11 | RETURN_IF_ERROR(_rename_compacted_indices(begin, end, 0)); |
602 | | |
603 | 11 | _num_segcompacted++; |
604 | 11 | return Status::OK(); |
605 | 11 | } |
606 | | |
607 | | void BetaRowsetWriter::_clear_statistics_for_deleting_segments_unsafe(uint32_t begin, |
608 | 43 | uint32_t end) { |
609 | 43 | VLOG_DEBUG << "_segid_statistics_map clear record segid range from:" << begin << " to:" << end; |
610 | 134 | for (uint32_t i = begin; i <= end; ++i) { |
611 | 91 | _segid_statistics_map.erase(i); |
612 | 91 | } |
613 | 43 | } |
614 | | |
615 | 42 | Status BetaRowsetWriter::_rename_compacted_segment_plain(uint32_t seg_id) { |
616 | 42 | if (seg_id == _num_segcompacted) { |
617 | 10 | ++_num_segcompacted; |
618 | 10 | return Status::OK(); |
619 | 10 | } |
620 | | |
621 | 32 | auto src_seg_path = |
622 | 32 | local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), seg_id); |
623 | 32 | auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), |
624 | 32 | _num_segcompacted); |
625 | 32 | VLOG_DEBUG << "segcompaction skip this segment. rename " << src_seg_path << " to " |
626 | 0 | << dst_seg_path; |
627 | 32 | { |
628 | 32 | std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); |
629 | 32 | DCHECK_EQ(_segid_statistics_map.find(seg_id) == _segid_statistics_map.end(), false); |
630 | 32 | DCHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(), |
631 | 32 | true); |
632 | 32 | auto org = _segid_statistics_map[seg_id]; |
633 | 32 | _segid_statistics_map.emplace(_num_segcompacted, org); |
634 | 32 | _clear_statistics_for_deleting_segments_unsafe(seg_id, seg_id); |
635 | 32 | } |
636 | 32 | int ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); |
637 | 32 | if (ret) { |
638 | 0 | return Status::Error<ROWSET_RENAME_FILE_FAILED>( |
639 | 0 | "failed to rename {} to {}. ret:{}, errno:{}", src_seg_path, dst_seg_path, ret, |
640 | 0 | errno); |
641 | 0 | } |
642 | | |
643 | 32 | RETURN_IF_ERROR(_remove_segment_footer_cache(_num_segcompacted, dst_seg_path)); |
644 | | // rename remaining inverted index files |
645 | 32 | RETURN_IF_ERROR(_rename_compacted_indices(-1, -1, seg_id)); |
646 | | |
647 | 32 | ++_num_segcompacted; |
648 | 32 | return Status::OK(); |
649 | 32 | } |
650 | | |
651 | | Status BetaRowsetWriter::_remove_segment_footer_cache(const uint32_t seg_id, |
652 | 43 | const std::string& segment_path) { |
653 | 43 | auto* footer_page_cache = ExecEnv::GetInstance()->get_storage_page_cache(); |
654 | 43 | if (!footer_page_cache) { |
655 | 0 | return Status::OK(); |
656 | 0 | } |
657 | | |
658 | 43 | auto fs = _rowset_meta->fs(); |
659 | 43 | bool exists = false; |
660 | 43 | RETURN_IF_ERROR(fs->exists(segment_path, &exists)); |
661 | 43 | if (exists) { |
662 | 43 | io::FileReaderSPtr file_reader; |
663 | 43 | io::FileReaderOptions reader_options { |
664 | 43 | .cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE |
665 | 43 | : io::FileCachePolicy::NO_CACHE, |
666 | 43 | .is_doris_table = true, |
667 | 43 | .cache_base_path = "", |
668 | 43 | .file_size = _rowset_meta->segment_file_size(static_cast<int>(seg_id)), |
669 | 43 | .tablet_id = _rowset_meta->tablet_id(), |
670 | 43 | }; |
671 | 43 | RETURN_IF_ERROR(fs->open_file(segment_path, &file_reader, &reader_options)); |
672 | 43 | DCHECK(file_reader != nullptr); |
673 | 43 | auto cache_key = segment_v2::Segment::get_segment_footer_cache_key(file_reader); |
674 | 43 | footer_page_cache->erase(cache_key, segment_v2::PageTypePB::INDEX_PAGE); |
675 | 43 | } |
676 | 43 | return Status::OK(); |
677 | 43 | } |
678 | | |
679 | 43 | Status BetaRowsetWriter::_rename_compacted_indices(int64_t begin, int64_t end, uint64_t seg_id) { |
680 | 43 | int ret; |
681 | | |
682 | 43 | auto src_seg_path = begin < 0 ? local_segment_path(_context.tablet_path, |
683 | 32 | _context.rowset_id.to_string(), seg_id) |
684 | 43 | : BetaRowset::local_segment_path_segcompacted( |
685 | 11 | _context.tablet_path, _context.rowset_id, begin, end); |
686 | 43 | auto src_index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(src_seg_path); |
687 | 43 | auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), |
688 | 43 | _num_segcompacted); |
689 | 43 | auto dst_index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(dst_seg_path); |
690 | | |
691 | 43 | if (_context.tablet_schema->get_inverted_index_storage_format() >= |
692 | 43 | InvertedIndexStorageFormatPB::V2) { |
693 | 22 | if (_context.tablet_schema->has_inverted_index() || |
694 | 22 | _context.tablet_schema->has_ann_index()) { |
695 | 22 | auto src_idx_path = |
696 | 22 | InvertedIndexDescriptor::get_index_file_path_v2(src_index_path_prefix); |
697 | 22 | auto dst_idx_path = |
698 | 22 | InvertedIndexDescriptor::get_index_file_path_v2(dst_index_path_prefix); |
699 | | |
700 | 22 | ret = rename(src_idx_path.c_str(), dst_idx_path.c_str()); |
701 | 22 | if (ret) { |
702 | 0 | return Status::Error<ROWSET_RENAME_FILE_FAILED>( |
703 | 0 | "failed to rename {} to {}. ret:{}, errno:{}", src_idx_path, dst_idx_path, |
704 | 0 | ret, errno); |
705 | 0 | } |
706 | 22 | } |
707 | 22 | } |
708 | | // rename remaining inverted index files |
709 | 150 | for (auto column : _context.tablet_schema->columns()) { |
710 | 150 | auto index_infos = _context.tablet_schema->inverted_indexs(*column); |
711 | 150 | for (const auto& index_info : index_infos) { |
712 | 44 | auto index_id = index_info->index_id(); |
713 | 44 | if (_context.tablet_schema->get_inverted_index_storage_format() == |
714 | 44 | InvertedIndexStorageFormatPB::V1) { |
715 | 0 | auto src_idx_path = InvertedIndexDescriptor::get_index_file_path_v1( |
716 | 0 | src_index_path_prefix, index_id, index_info->get_index_suffix()); |
717 | 0 | auto dst_idx_path = InvertedIndexDescriptor::get_index_file_path_v1( |
718 | 0 | dst_index_path_prefix, index_id, index_info->get_index_suffix()); |
719 | 0 | VLOG_DEBUG << "segcompaction skip this index. rename " << src_idx_path << " to " |
720 | 0 | << dst_idx_path; |
721 | 0 | ret = rename(src_idx_path.c_str(), dst_idx_path.c_str()); |
722 | 0 | if (ret) { |
723 | 0 | return Status::Error<INVERTED_INDEX_RENAME_FILE_FAILED>( |
724 | 0 | "failed to rename {} to {}. ret:{}, errno:{}", src_idx_path, |
725 | 0 | dst_idx_path, ret, errno); |
726 | 0 | } |
727 | 0 | } |
728 | | // Erase the origin index file cache |
729 | 44 | auto src_idx_cache_key = InvertedIndexDescriptor::get_index_file_cache_key( |
730 | 44 | src_index_path_prefix, index_id, index_info->get_index_suffix()); |
731 | 44 | auto dst_idx_cache_key = InvertedIndexDescriptor::get_index_file_cache_key( |
732 | 44 | dst_index_path_prefix, index_id, index_info->get_index_suffix()); |
733 | 44 | RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(src_idx_cache_key)); |
734 | 44 | RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(dst_idx_cache_key)); |
735 | 44 | } |
736 | 150 | } |
737 | 43 | return Status::OK(); |
738 | 43 | } |
739 | | |
740 | 1.20k | Status BetaRowsetWriter::_segcompaction_if_necessary() { |
741 | 1.20k | Status status = Status::OK(); |
742 | | // if not doing segcompaction, just check segment number |
743 | 1.20k | if (!config::enable_segcompaction || !_context.enable_segcompaction || |
744 | 1.20k | _context.tablet_schema->num_variant_columns() > 0) { |
745 | 1.08k | return _check_segment_number_limit(_num_segment); |
746 | 1.08k | } |
747 | | // leave _is_doing_segcompaction as the last condition |
748 | | // otherwise _segcompacting_cond will never get notified |
749 | 121 | if (_is_doing_segcompaction.exchange(true)) { |
750 | 0 | return status; |
751 | 0 | } |
752 | 121 | if (_segcompaction_status.load() != OK) { |
753 | 0 | status = Status::Error<SEGCOMPACTION_FAILED>( |
754 | 0 | "BetaRowsetWriter::_segcompaction_if_necessary meet invalid state, error code: {}", |
755 | 0 | _segcompaction_status.load()); |
756 | 121 | } else { |
757 | 121 | status = _check_segment_number_limit(_num_segcompacted); |
758 | 121 | } |
759 | 121 | if (status.ok() && (_num_segment - _segcompacted_point) >= config::segcompaction_batch_size) { |
760 | 15 | SegCompactionCandidatesSharedPtr segments; |
761 | 15 | status = _find_longest_consecutive_small_segment(segments); |
762 | 15 | if (LIKELY(status.ok()) && (!segments->empty())) { |
763 | 11 | LOG(INFO) << "submit segcompaction task, tablet_id:" << _context.tablet_id |
764 | 11 | << " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment |
765 | 11 | << ", segcompacted_point:" << _segcompacted_point; |
766 | 11 | status = _engine.submit_seg_compaction_task(_segcompaction_worker, segments); |
767 | 11 | if (status.ok()) { |
768 | 11 | return status; |
769 | 11 | } |
770 | 11 | } |
771 | 15 | } |
772 | 110 | { |
773 | 110 | std::lock_guard lk(_is_doing_segcompaction_lock); |
774 | 110 | _is_doing_segcompaction = false; |
775 | 110 | _segcompacting_cond.notify_all(); |
776 | 110 | } |
777 | 110 | return status; |
778 | 121 | } |
779 | | |
780 | 654 | Status BetaRowsetWriter::_segcompaction_rename_last_segments() { |
781 | 654 | DCHECK_EQ(_is_doing_segcompaction, false); |
782 | 654 | if (!config::enable_segcompaction) { |
783 | 144 | return Status::OK(); |
784 | 144 | } |
785 | 510 | if (_segcompaction_status.load() != OK) { |
786 | 0 | return Status::Error<SEGCOMPACTION_FAILED>( |
787 | 0 | "BetaRowsetWriter::_segcompaction_rename_last_segments meet invalid state, error " |
788 | 0 | "code: {}", |
789 | 0 | _segcompaction_status.load()); |
790 | 0 | } |
791 | 510 | if (!is_segcompacted() || _segcompacted_point == _num_segment) { |
792 | | // no need if never segcompact before or all segcompacted |
793 | 501 | return Status::OK(); |
794 | 501 | } |
795 | | // currently we only rename remaining segments to reduce wait time |
796 | | // so that transaction can be committed ASAP |
797 | 9 | VLOG_DEBUG << "segcompaction last few segments"; |
798 | 39 | for (int32_t segid = _segcompacted_point; segid < _num_segment; segid++) { |
799 | 30 | auto dst_segid = _num_segcompacted.load(); |
800 | 30 | RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); |
801 | 30 | if (_segcompaction_worker->need_convert_delete_bitmap()) { |
802 | 16 | _segcompaction_worker->convert_segment_delete_bitmap( |
803 | 16 | _context.mow_context->delete_bitmap, segid, dst_segid); |
804 | 16 | } |
805 | 30 | } |
806 | 9 | return Status::OK(); |
807 | 9 | } |
808 | | |
809 | 9 | Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) { |
810 | 9 | assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET); |
811 | 9 | RETURN_IF_ERROR(rowset->link_files_to(_context.tablet_path, _context.rowset_id)); |
812 | 9 | _num_rows_written += rowset->num_rows(); |
813 | 9 | const auto& rowset_meta = rowset->rowset_meta(); |
814 | 9 | auto index_size = rowset_meta->index_disk_size(); |
815 | 9 | auto total_size = rowset_meta->total_disk_size(); |
816 | 9 | auto data_size = rowset_meta->data_disk_size(); |
817 | | // corrupted index size caused by bug before 2.1.5 or 3.0.0 version |
818 | | // try to get real index size from disk. |
819 | 9 | if (index_size < 0 || index_size > total_size * 2) { |
820 | 0 | LOG(ERROR) << "invalid index size:" << index_size << " total size:" << total_size |
821 | 0 | << " data size:" << data_size << " tablet:" << rowset_meta->tablet_id() |
822 | 0 | << " rowset:" << rowset_meta->rowset_id(); |
823 | 0 | index_size = 0; |
824 | 0 | auto st = rowset->get_inverted_index_size(&index_size); |
825 | 0 | if (!st.ok()) { |
826 | 0 | if (!st.is<NOT_FOUND>()) { |
827 | 0 | LOG(ERROR) << "failed to get inverted index size. res=" << st; |
828 | 0 | return st; |
829 | 0 | } |
830 | 0 | } |
831 | 0 | } |
832 | 9 | _total_data_size += data_size; |
833 | 9 | _total_index_size += index_size; |
834 | 9 | _num_segment += cast_set<int32_t>(rowset->num_segments()); |
835 | | // append key_bounds to current rowset |
836 | 9 | RETURN_IF_ERROR(rowset->get_segments_key_bounds(&_segments_encoded_key_bounds)); |
837 | 9 | rowset->get_num_segment_rows(&_segment_num_rows); |
838 | 9 | _segments_key_bounds_truncated = rowset->rowset_meta()->is_segments_key_bounds_truncated(); |
839 | | |
840 | | // TODO update zonemap |
841 | 9 | if (rowset->rowset_meta()->has_delete_predicate()) { |
842 | 0 | _rowset_meta->set_delete_predicate(rowset->rowset_meta()->delete_predicate()); |
843 | 0 | } |
844 | | // Update the tablet schema in the rowset metadata if the tablet schema contains a variant. |
845 | | // During the build process, _context.tablet_schema will be used as the rowset schema. |
846 | | // This situation may arise in the event of a linked schema change. If this schema is not set, |
847 | | // the subcolumns of the variant will be lost. |
848 | 9 | if (_context.tablet_schema->num_variant_columns() > 0 && rowset->tablet_schema() != nullptr) { |
849 | 0 | _context.tablet_schema = rowset->tablet_schema(); |
850 | 0 | } |
851 | 9 | return Status::OK(); |
852 | 9 | } |
853 | | |
854 | 0 | Status BaseBetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) { |
855 | | // TODO use schema_mapping to transfer zonemap |
856 | 0 | return add_rowset(rowset); |
857 | 0 | } |
858 | | |
859 | 1.15k | Status BaseBetaRowsetWriter::flush() { |
860 | 1.15k | return _segment_creator.flush(); |
861 | 1.15k | } |
862 | | |
863 | 12 | Status BaseBetaRowsetWriter::flush_memtable(Block* block, int32_t segment_id, int64_t* flush_size) { |
864 | 12 | if (block->rows() == 0) { |
865 | 0 | return Status::OK(); |
866 | 0 | } |
867 | | |
868 | 12 | { |
869 | 12 | SCOPED_RAW_TIMER(&_segment_writer_ns); |
870 | 12 | RETURN_IF_ERROR(_segment_creator.flush_single_block(block, segment_id, flush_size)); |
871 | 12 | } |
872 | 12 | return Status::OK(); |
873 | 12 | } |
874 | | |
875 | 4 | Status BaseBetaRowsetWriter::flush_single_block(const Block* block) { |
876 | 4 | return _segment_creator.flush_single_block(block); |
877 | 4 | } |
878 | | |
879 | 1.01k | Status BetaRowsetWriter::_wait_flying_segcompaction() { |
880 | 1.01k | std::unique_lock<std::mutex> l(_is_doing_segcompaction_lock); |
881 | 1.01k | uint64_t begin_wait = GetCurrentTimeMicros(); |
882 | 1.01k | while (_is_doing_segcompaction) { |
883 | | // change sync wait to async? |
884 | 0 | _segcompacting_cond.wait(l); |
885 | 0 | } |
886 | 1.01k | uint64_t elapsed = GetCurrentTimeMicros() - begin_wait; |
887 | 1.01k | if (elapsed >= MICROS_PER_SEC) { |
888 | 0 | LOG(INFO) << "wait flying segcompaction finish time:" << elapsed << "us"; |
889 | 0 | } |
890 | 1.01k | if (_segcompaction_status.load() != OK) { |
891 | 0 | return Status::Error<SEGCOMPACTION_FAILED>( |
892 | 0 | "BetaRowsetWriter meet invalid state, error code: {}", |
893 | 0 | _segcompaction_status.load()); |
894 | 0 | } |
895 | 1.01k | return Status::OK(); |
896 | 1.01k | } |
897 | | |
898 | 24 | RowsetSharedPtr BaseBetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& spec_rowset_meta) { |
899 | 24 | if (_rowset_meta->newest_write_timestamp() == -1) { |
900 | 0 | _rowset_meta->set_newest_write_timestamp(UnixSeconds()); |
901 | 0 | } |
902 | | |
903 | 24 | build_rowset_meta_with_spec_field(*_rowset_meta, *spec_rowset_meta); |
904 | 24 | RowsetSharedPtr rowset; |
905 | 24 | auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, |
906 | 24 | _rowset_meta, &rowset); |
907 | 24 | if (!status.ok()) { |
908 | 0 | LOG(WARNING) << "rowset init failed when build new rowset, res=" << status; |
909 | 0 | return nullptr; |
910 | 0 | } |
911 | 24 | _already_built = true; |
912 | 24 | return rowset; |
913 | 24 | } |
914 | | |
915 | 654 | Status BaseBetaRowsetWriter::_close_file_writers() { |
916 | | // Flush and close segment files |
917 | 654 | RETURN_NOT_OK_STATUS_WITH_WARN(_segment_creator.close(), |
918 | 654 | "failed to close segment creator when build new rowset"); |
919 | 654 | return Status::OK(); |
920 | 654 | } |
921 | | |
922 | 654 | Status BetaRowsetWriter::_close_file_writers() { |
923 | 654 | RETURN_IF_ERROR(BaseBetaRowsetWriter::_close_file_writers()); |
924 | | // if _segment_start_id is not zero, that means it's a transient rowset writer for |
925 | | // MoW partial update, don't need to do segment compaction. |
926 | 654 | if (_segment_start_id == 0) { |
927 | 654 | if (_segcompaction_worker->cancel()) { |
928 | 654 | std::lock_guard lk(_is_doing_segcompaction_lock); |
929 | 654 | _is_doing_segcompaction = false; |
930 | 654 | _segcompacting_cond.notify_all(); |
931 | 654 | } else { |
932 | 0 | RETURN_NOT_OK_STATUS_WITH_WARN(_wait_flying_segcompaction(), |
933 | 0 | "segcompaction failed when build new rowset"); |
934 | 0 | } |
935 | 654 | RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_rename_last_segments(), |
936 | 654 | "rename last segments failed when build new rowset"); |
937 | | // segcompaction worker would do file wrier's close function in compact_segments |
938 | 654 | if (auto& seg_comp_file_writer = _segcompaction_worker->get_file_writer(); |
939 | 654 | nullptr != seg_comp_file_writer && |
940 | 654 | seg_comp_file_writer->state() != io::FileWriter::State::CLOSED) { |
941 | 0 | RETURN_NOT_OK_STATUS_WITH_WARN(seg_comp_file_writer->close(), |
942 | 0 | "close segment compaction worker failed"); |
943 | 0 | } |
944 | | // process delete bitmap for mow table |
945 | 654 | if (is_segcompacted() && _segcompaction_worker->need_convert_delete_bitmap()) { |
946 | 4 | auto converted_delete_bitmap = _segcompaction_worker->get_converted_delete_bitmap(); |
947 | | // which means the segment compaction is triggerd |
948 | 4 | if (converted_delete_bitmap != nullptr) { |
949 | 4 | RowsetIdUnorderedSet rowsetids; |
950 | 4 | rowsetids.insert(rowset_id()); |
951 | 4 | context().tablet->add_sentinel_mark_to_delete_bitmap(converted_delete_bitmap.get(), |
952 | 4 | rowsetids); |
953 | 4 | context().mow_context->delete_bitmap->remove({rowset_id(), 0, 0}, |
954 | 4 | {rowset_id(), UINT32_MAX, INT64_MAX}); |
955 | 4 | context().mow_context->delete_bitmap->merge(*converted_delete_bitmap); |
956 | 4 | } |
957 | 4 | } |
958 | 654 | } |
959 | 654 | return Status::OK(); |
960 | 654 | } |
961 | | |
962 | 982 | Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) { |
963 | 982 | if (_calc_delete_bitmap_token != nullptr) { |
964 | 9 | RETURN_IF_ERROR(_calc_delete_bitmap_token->wait()); |
965 | 9 | } |
966 | 982 | RETURN_IF_ERROR(_close_file_writers()); |
967 | 982 | const auto total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted; |
968 | 982 | RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(total_segment_num), |
969 | 982 | "too many segments when build new rowset"); |
970 | 982 | RETURN_IF_ERROR(_build_rowset_meta(_rowset_meta.get(), true)); |
971 | 982 | if (_is_pending) { |
972 | 30 | _rowset_meta->set_rowset_state(COMMITTED); |
973 | 952 | } else { |
974 | 952 | _rowset_meta->set_rowset_state(VISIBLE); |
975 | 952 | } |
976 | | |
977 | 982 | if (_rowset_meta->newest_write_timestamp() == -1) { |
978 | 633 | _rowset_meta->set_newest_write_timestamp(UnixSeconds()); |
979 | 633 | } |
980 | | |
981 | 982 | _rowset_meta->set_tablet_schema(_context.tablet_schema); |
982 | | |
983 | | // If segment compaction occurs, the idx file info will become inaccurate. |
984 | 982 | if ((_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) && |
985 | 982 | _num_segcompacted == 0) { |
986 | 106 | if (auto idx_files_info = _idx_files.inverted_index_file_info(_segment_start_id); |
987 | 106 | !idx_files_info.has_value()) [[unlikely]] { |
988 | 0 | LOG(ERROR) << "expected inverted index files info, but none presents: " |
989 | 0 | << idx_files_info.error(); |
990 | 106 | } else { |
991 | 106 | _rowset_meta->add_inverted_index_files_info(idx_files_info.value()); |
992 | 106 | } |
993 | 106 | } |
994 | | |
995 | 982 | RETURN_NOT_OK_STATUS_WITH_WARN( |
996 | 982 | RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, _rowset_meta, |
997 | 982 | &rowset), |
998 | 982 | "rowset init failed when build new rowset"); |
999 | 982 | _already_built = true; |
1000 | 982 | return Status::OK(); |
1001 | 982 | } |
1002 | | |
1003 | 0 | int64_t BaseBetaRowsetWriter::_num_seg() const { |
1004 | 0 | return _num_segment; |
1005 | 0 | } |
1006 | | |
1007 | 1.04k | int64_t BetaRowsetWriter::_num_seg() const { |
1008 | 1.04k | return is_segcompacted() ? _num_segcompacted : _num_segment; |
1009 | 1.04k | } |
1010 | | |
1011 | 1.04k | Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool check_segment_num) { |
1012 | 1.04k | int64_t num_rows_written = 0; |
1013 | 1.04k | int64_t total_data_size = 0; |
1014 | 1.04k | int64_t total_index_size = 0; |
1015 | 1.04k | std::vector<KeyBoundsPB> segments_encoded_key_bounds; |
1016 | 1.04k | std::vector<uint32_t> segment_rows; |
1017 | 1.04k | std::optional<bool> segments_key_bounds_truncated; |
1018 | 1.04k | { |
1019 | 1.04k | std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); |
1020 | 1.50k | for (const auto& itr : _segid_statistics_map) { |
1021 | 1.50k | num_rows_written += itr.second.row_num; |
1022 | 1.50k | total_data_size += itr.second.data_size; |
1023 | 1.50k | total_index_size += itr.second.index_size; |
1024 | 1.50k | segments_encoded_key_bounds.push_back(itr.second.key_bounds); |
1025 | | // segcompaction don't modify _segment_num_rows, so we need to get segment rows from _segid_statistics_map for load |
1026 | 1.50k | segment_rows.push_back(cast_set<uint32_t>(itr.second.row_num)); |
1027 | 1.50k | } |
1028 | 1.04k | segments_key_bounds_truncated = _segments_key_bounds_truncated; |
1029 | 1.04k | } |
1030 | 1.04k | if (segment_rows.empty()) { |
1031 | | // vertical compaction and linked schema change will not record segment statistics, |
1032 | | // it will record segment rows in _segment_num_rows |
1033 | 426 | RETURN_IF_ERROR(get_segment_num_rows(&segment_rows)); |
1034 | 426 | } |
1035 | | |
1036 | 1.18k | for (auto& key_bound : _segments_encoded_key_bounds) { |
1037 | 1.18k | segments_encoded_key_bounds.push_back(key_bound); |
1038 | 1.18k | } |
1039 | 1.04k | if (segments_key_bounds_truncated.has_value()) { |
1040 | 405 | rowset_meta->set_segments_key_bounds_truncated(segments_key_bounds_truncated.value()); |
1041 | 405 | } |
1042 | 1.04k | rowset_meta->set_num_segment_rows(segment_rows); |
1043 | | // segment key bounds are empty in old version(before version 1.2.x). So we should not modify |
1044 | | // the overlap property when key bounds are empty. |
1045 | | // for mow table with cluster keys, the overlap is used for cluster keys, |
1046 | | // the key_bounds is primary keys |
1047 | 1.04k | if (!segments_encoded_key_bounds.empty() && |
1048 | 1.04k | !is_segment_overlapping(segments_encoded_key_bounds) && |
1049 | 1.04k | _context.tablet_schema->cluster_key_uids().empty()) { |
1050 | 442 | rowset_meta->set_segments_overlap(NONOVERLAPPING); |
1051 | 442 | } |
1052 | | |
1053 | 1.04k | auto segment_num = _num_seg(); |
1054 | 1.04k | if (check_segment_num && config::check_segment_when_build_rowset_meta) { |
1055 | 0 | auto segments_encoded_key_bounds_size = segments_encoded_key_bounds.size(); |
1056 | 0 | if (segments_encoded_key_bounds_size != segment_num) { |
1057 | 0 | return Status::InternalError( |
1058 | 0 | "segments_encoded_key_bounds_size should equal to _num_seg, " |
1059 | 0 | "segments_encoded_key_bounds_size " |
1060 | 0 | "is: {}, _num_seg is: {}", |
1061 | 0 | segments_encoded_key_bounds_size, segment_num); |
1062 | 0 | } |
1063 | 0 | if (segment_rows.size() != segment_num) { |
1064 | 0 | return Status::InternalError( |
1065 | 0 | "segment_rows size should equal to _num_seg, segment_rows size is: {}, " |
1066 | 0 | "_num_seg is {}, tablet={}, rowset={}, txn={}", |
1067 | 0 | segment_rows.size(), segment_num, _context.tablet_id, |
1068 | 0 | _context.rowset_id.to_string(), _context.txn_id); |
1069 | 0 | } |
1070 | 0 | } |
1071 | | |
1072 | 1.04k | rowset_meta->set_num_segments(segment_num); |
1073 | 1.04k | rowset_meta->set_num_rows(num_rows_written + _num_rows_written); |
1074 | 1.04k | rowset_meta->set_total_disk_size(total_data_size + _total_data_size + total_index_size + |
1075 | 1.04k | _total_index_size); |
1076 | 1.04k | rowset_meta->set_data_disk_size(total_data_size + _total_data_size); |
1077 | 1.04k | rowset_meta->set_index_disk_size(total_index_size + _total_index_size); |
1078 | 1.04k | bool aggregate_key_bounds = config::enable_aggregate_non_mow_key_bounds && |
1079 | 1.04k | !_context.enable_unique_key_merge_on_write; |
1080 | 1.04k | rowset_meta->set_segments_key_bounds(segments_encoded_key_bounds, aggregate_key_bounds); |
1081 | | // TODO write zonemap to meta |
1082 | 1.04k | rowset_meta->set_empty((num_rows_written + _num_rows_written) == 0); |
1083 | 1.04k | rowset_meta->set_creation_time(time(nullptr)); |
1084 | 1.04k | return Status::OK(); |
1085 | 1.04k | } |
1086 | | |
1087 | 63 | Status BaseBetaRowsetWriter::_build_tmp(RowsetSharedPtr& rowset_ptr) { |
1088 | 63 | Status status; |
1089 | 63 | std::shared_ptr<RowsetMeta> tmp_rs_meta = std::make_shared<RowsetMeta>(); |
1090 | 63 | tmp_rs_meta->init(_rowset_meta.get()); |
1091 | | |
1092 | 63 | status = _build_rowset_meta(tmp_rs_meta.get()); |
1093 | 63 | if (!status.ok()) { |
1094 | 0 | LOG(WARNING) << "failed to build rowset meta, res=" << status; |
1095 | 0 | return status; |
1096 | 0 | } |
1097 | | |
1098 | 63 | status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, tmp_rs_meta, |
1099 | 63 | &rowset_ptr); |
1100 | 63 | DBUG_EXECUTE_IF("BaseBetaRowsetWriter::_build_tmp.create_rowset_failed", |
1101 | 63 | { status = Status::InternalError("create rowset failed"); }); |
1102 | 63 | if (!status.ok()) { |
1103 | 0 | LOG(WARNING) << "rowset init failed when build new rowset, res=" << status; |
1104 | 0 | return status; |
1105 | 0 | } |
1106 | 63 | return Status::OK(); |
1107 | 63 | } |
1108 | | |
1109 | | Status BaseBetaRowsetWriter::_create_file_writer(const std::string& path, |
1110 | | io::FileWriterPtr& file_writer, |
1111 | 2.68k | bool is_index_file) { |
1112 | 2.68k | io::FileWriterOptions opts = _context.get_file_writer_options(is_index_file); |
1113 | 2.68k | Status st = _context.fs()->create_file(path, &file_writer, &opts); |
1114 | 2.68k | if (!st.ok()) { |
1115 | 0 | LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st; |
1116 | 0 | return st; |
1117 | 0 | } |
1118 | | |
1119 | 2.68k | DCHECK(file_writer != nullptr); |
1120 | 2.68k | return Status::OK(); |
1121 | 2.68k | } |
1122 | | |
1123 | | Status BaseBetaRowsetWriter::create_file_writer(uint32_t segment_id, io::FileWriterPtr& file_writer, |
1124 | 2.66k | FileType file_type) { |
1125 | 2.66k | auto segment_path = _context.segment_path(segment_id); |
1126 | 2.66k | if (file_type == FileType::INVERTED_INDEX_FILE) { |
1127 | 253 | std::string prefix = |
1128 | 253 | std::string {InvertedIndexDescriptor::get_index_file_path_prefix(segment_path)}; |
1129 | 253 | std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix); |
1130 | 253 | return _create_file_writer(index_path, file_writer, true /* is_index_file */); |
1131 | 2.40k | } else if (file_type == FileType::SEGMENT_FILE) { |
1132 | 2.40k | return _create_file_writer(segment_path, file_writer, false /* is_index_file */); |
1133 | 2.40k | } |
1134 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
1135 | 0 | fmt::format("failed to create file = {}, file type = {}", segment_path, file_type)); |
1136 | 2.66k | } |
1137 | | |
1138 | | Status BaseBetaRowsetWriter::create_index_file_writer(uint32_t segment_id, |
1139 | 226 | IndexFileWriterPtr* index_file_writer) { |
1140 | 226 | RETURN_IF_ERROR(RowsetWriter::create_index_file_writer(segment_id, index_file_writer)); |
1141 | | // used for inverted index format v1 |
1142 | 226 | (*index_file_writer) |
1143 | 226 | ->set_file_writer_opts(_context.get_file_writer_options(true /* is_index_file */)); |
1144 | 226 | return Status::OK(); |
1145 | 226 | } |
1146 | | |
1147 | | Status BetaRowsetWriter::create_segment_writer_for_segcompaction( |
1148 | 12 | std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, int64_t end) { |
1149 | 12 | DCHECK(begin >= 0 && end >= 0); |
1150 | 12 | std::string path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, |
1151 | 12 | _context.rowset_id, begin, end); |
1152 | 12 | io::FileWriterPtr file_writer; |
1153 | 12 | RETURN_IF_ERROR(_create_file_writer(path, file_writer, false /* is_index_file */)); |
1154 | | |
1155 | 12 | IndexFileWriterPtr index_file_writer; |
1156 | 12 | if (_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) { |
1157 | 8 | io::FileWriterPtr idx_file_writer; |
1158 | 8 | std::string prefix(InvertedIndexDescriptor::get_index_file_path_prefix(path)); |
1159 | 8 | if (_context.tablet_schema->get_inverted_index_storage_format() != |
1160 | 8 | InvertedIndexStorageFormatPB::V1) { |
1161 | 8 | std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix); |
1162 | 8 | RETURN_IF_ERROR( |
1163 | 8 | _create_file_writer(index_path, idx_file_writer, true /* is_index_file */)); |
1164 | 8 | } |
1165 | 8 | index_file_writer = std::make_unique<IndexFileWriter>( |
1166 | 8 | _context.fs(), prefix, _context.rowset_id.to_string(), _num_segcompacted, |
1167 | 8 | _context.tablet_schema->get_inverted_index_storage_format(), |
1168 | 8 | std::move(idx_file_writer), true /* can_use_ram_dir */, _context.tablet_id); |
1169 | 8 | } |
1170 | | |
1171 | 12 | segment_v2::SegmentWriterOptions writer_options; |
1172 | 12 | writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write; |
1173 | 12 | writer_options.rowset_ctx = &_context; |
1174 | 12 | writer_options.write_type = _context.write_type; |
1175 | 12 | writer_options.write_type = DataWriteType::TYPE_COMPACTION; |
1176 | 12 | writer_options.max_rows_per_segment = _context.max_rows_per_segment; |
1177 | 12 | writer_options.mow_ctx = _context.mow_context; |
1178 | | |
1179 | 12 | *writer = std::make_unique<segment_v2::SegmentWriter>( |
1180 | 12 | file_writer.get(), _num_segcompacted, _context.tablet_schema, _context.tablet, |
1181 | 12 | _context.data_dir, writer_options, index_file_writer.get()); |
1182 | 12 | if (auto& seg_writer = _segcompaction_worker->get_file_writer(); |
1183 | 12 | seg_writer != nullptr && seg_writer->state() != io::FileWriter::State::CLOSED) { |
1184 | 0 | RETURN_IF_ERROR(_segcompaction_worker->get_file_writer()->close()); |
1185 | 0 | } |
1186 | 12 | _segcompaction_worker->get_file_writer().reset(file_writer.release()); |
1187 | 12 | if (auto& idx_file_writer = _segcompaction_worker->get_inverted_index_file_writer(); |
1188 | 12 | idx_file_writer != nullptr) { |
1189 | 0 | RETURN_IF_ERROR(idx_file_writer->begin_close()); |
1190 | 0 | RETURN_IF_ERROR(idx_file_writer->finish_close()); |
1191 | 0 | } |
1192 | 12 | _segcompaction_worker->get_inverted_index_file_writer().reset(index_file_writer.release()); |
1193 | 12 | return Status::OK(); |
1194 | 12 | } |
1195 | | |
1196 | 0 | Status BaseBetaRowsetWriter::_check_segment_number_limit(size_t segnum) { |
1197 | 0 | DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments", |
1198 | 0 | { segnum = dp->param("segnum", 1024); }); |
1199 | 0 | if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) { |
1200 | 0 | return Status::Error<TOO_MANY_SEGMENTS>( |
1201 | 0 | "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, " |
1202 | 0 | "_num_segment:{}, rowset_num_rows:{}. Please check if the bucket number is too " |
1203 | 0 | "small or if the data is skewed.", |
1204 | 0 | _context.tablet_id, _context.rowset_id.to_string(), |
1205 | 0 | config::max_segment_num_per_rowset, _num_segment, get_rowset_num_rows()); |
1206 | 0 | } |
1207 | 0 | return Status::OK(); |
1208 | 0 | } |
1209 | | |
1210 | 2.18k | Status BetaRowsetWriter::_check_segment_number_limit(size_t segnum) { |
1211 | 2.18k | DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments", |
1212 | 2.18k | { segnum = dp->param("segnum", 1024); }); |
1213 | 2.18k | if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) { |
1214 | 0 | return Status::Error<TOO_MANY_SEGMENTS>( |
1215 | 0 | "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, _num_segment:{}, " |
1216 | 0 | "_segcompacted_point:{}, _num_segcompacted:{}, rowset_num_rows:{}. Please check if " |
1217 | 0 | "the bucket number is too small or if the data is skewed.", |
1218 | 0 | _context.tablet_id, _context.rowset_id.to_string(), |
1219 | 0 | config::max_segment_num_per_rowset, _num_segment, _segcompacted_point, |
1220 | 0 | _num_segcompacted, get_rowset_num_rows()); |
1221 | 0 | } |
1222 | 2.18k | return Status::OK(); |
1223 | 2.18k | } |
1224 | | |
1225 | 1.20k | Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) { |
1226 | 1.20k | uint32_t segid_offset = segment_id - _segment_start_id; |
1227 | 1.20k | SegmentStatistics stored_segstat = segstat; |
1228 | 1.20k | const bool key_bounds_truncated = truncate_key_bounds(&stored_segstat.key_bounds); |
1229 | 1.20k | { |
1230 | 1.20k | std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); |
1231 | 1.20k | CHECK_EQ(_segid_statistics_map.find(segment_id) == _segid_statistics_map.end(), true); |
1232 | 1.20k | _segid_statistics_map.emplace(segment_id, std::move(stored_segstat)); |
1233 | 1.20k | if (segment_id >= _segment_num_rows.size()) { |
1234 | 1.20k | _segment_num_rows.resize(segment_id + 1); |
1235 | 1.20k | } |
1236 | 1.20k | _segment_num_rows[segid_offset] = cast_set<uint32_t>(segstat.row_num); |
1237 | 1.20k | if (key_bounds_truncated) { |
1238 | 702 | _segments_key_bounds_truncated = true; |
1239 | 702 | } |
1240 | 1.20k | } |
1241 | 1.20k | VLOG_DEBUG << "_segid_statistics_map add new record. segment_id:" << segment_id |
1242 | 0 | << " row_num:" << segstat.row_num << " data_size:" << segstat.data_size |
1243 | 0 | << " index_size:" << segstat.index_size; |
1244 | | |
1245 | 1.20k | { |
1246 | 1.20k | std::lock_guard<std::mutex> lock(_segment_set_mutex); |
1247 | 1.20k | _segment_set.add(segid_offset); |
1248 | 2.41k | while (_segment_set.contains(_num_segment)) { |
1249 | 1.20k | _num_segment++; |
1250 | 1.20k | } |
1251 | 1.20k | } |
1252 | | |
1253 | 1.20k | if (_context.mow_context != nullptr) { |
1254 | 63 | RETURN_IF_ERROR(_generate_delete_bitmap(segment_id)); |
1255 | 63 | } |
1256 | 1.20k | return Status::OK(); |
1257 | 1.20k | } |
1258 | | |
1259 | 1.20k | Status BetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) { |
1260 | 1.20k | RETURN_IF_ERROR(BaseBetaRowsetWriter::add_segment(segment_id, segstat)); |
1261 | 1.20k | return _segcompaction_if_necessary(); |
1262 | 1.20k | } |
1263 | | |
1264 | | Status BetaRowsetWriter::flush_segment_writer_for_segcompaction( |
1265 | | std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t index_size, |
1266 | 11 | KeyBoundsPB& key_bounds) { |
1267 | 11 | uint32_t segid = (*writer)->get_segment_id(); |
1268 | 11 | uint32_t row_num = (*writer)->row_count(); |
1269 | 11 | uint64_t segment_size; |
1270 | | |
1271 | 11 | auto s = (*writer)->finalize_footer(&segment_size); |
1272 | 11 | if (!s.ok()) { |
1273 | 0 | return Status::Error<WRITER_DATA_WRITE_ERROR>("failed to finalize segment: {}", |
1274 | 0 | s.to_string()); |
1275 | 0 | } |
1276 | 11 | int64_t inverted_index_file_size = 0; |
1277 | 11 | RETURN_IF_ERROR((*writer)->close_inverted_index(&inverted_index_file_size)); |
1278 | | |
1279 | 11 | SegmentStatistics segstat; |
1280 | 11 | segstat.row_num = row_num; |
1281 | 11 | segstat.data_size = segment_size; |
1282 | 11 | segstat.index_size = inverted_index_file_size; |
1283 | 11 | segstat.key_bounds = key_bounds; |
1284 | 11 | const bool key_bounds_truncated = truncate_key_bounds(&segstat.key_bounds); |
1285 | 11 | { |
1286 | 11 | std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); |
1287 | 11 | CHECK_EQ(_segid_statistics_map.find(segid) == _segid_statistics_map.end(), true); |
1288 | 11 | _segid_statistics_map.emplace(segid, std::move(segstat)); |
1289 | 11 | if (key_bounds_truncated) { |
1290 | 2 | _segments_key_bounds_truncated = true; |
1291 | 2 | } |
1292 | 11 | } |
1293 | 11 | VLOG_DEBUG << "_segid_statistics_map add new record. segid:" << segid << " row_num:" << row_num |
1294 | 0 | << " data_size:" << PrettyPrinter::print_bytes(segment_size) |
1295 | 0 | << " index_size:" << PrettyPrinter::print_bytes(inverted_index_file_size); |
1296 | | |
1297 | 11 | writer->reset(); |
1298 | | |
1299 | 11 | return Status::OK(); |
1300 | 11 | } |
1301 | | |
1302 | | } // namespace doris |