be/src/storage/rowset/beta_rowset_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "storage/rowset/beta_rowset_writer.h" |
19 | | |
20 | | #include <assert.h> |
21 | | // IWYU pragma: no_include <bthread/errno.h> |
22 | | #include <errno.h> // IWYU pragma: keep |
23 | | #include <fmt/format.h> |
24 | | #include <stdio.h> |
25 | | |
26 | | #include <ctime> // time |
27 | | #include <filesystem> |
28 | | #include <memory> |
29 | | #include <mutex> |
30 | | #include <sstream> |
31 | | #include <utility> |
32 | | #include <vector> |
33 | | |
34 | | // IWYU pragma: no_include <opentelemetry/common/threadlocal.h> |
35 | | #include "common/cast_set.h" |
36 | | #include "common/compiler_util.h" // IWYU pragma: keep |
37 | | #include "common/config.h" |
38 | | #include "common/logging.h" |
39 | | #include "common/status.h" |
40 | | #include "core/block/block.h" |
41 | | #include "core/column/column.h" |
42 | | #include "core/data_type/data_type_factory.hpp" |
43 | | #include "io/fs/file_reader.h" |
44 | | #include "io/fs/file_system.h" |
45 | | #include "io/fs/file_writer.h" |
46 | | #include "runtime/thread_context.h" |
47 | | #include "storage/index/inverted/inverted_index_cache.h" |
48 | | #include "storage/index/inverted/inverted_index_desc.h" |
49 | | #include "storage/olap_define.h" |
50 | | #include "storage/rowset/beta_rowset.h" |
51 | | #include "storage/rowset/rowset_factory.h" |
52 | | #include "storage/rowset/rowset_writer.h" |
53 | | #include "storage/rowset/segcompaction.h" |
54 | | #include "storage/schema_change/schema_change.h" |
55 | | #include "storage/segment/segment.h" |
56 | | #include "storage/segment/segment_writer.h" |
57 | | #include "storage/storage_engine.h" |
58 | | #include "storage/tablet/tablet_schema.h" |
59 | | #include "util/debug_points.h" |
60 | | #include "util/pretty_printer.h" |
61 | | #include "util/slice.h" |
62 | | #include "util/stopwatch.hpp" |
63 | | #include "util/time.h" |
64 | | |
65 | | namespace doris { |
66 | | #include "common/compile_check_begin.h" |
67 | | using namespace ErrorCode; |
68 | | |
69 | | namespace { |
70 | | |
71 | 689 | bool is_segment_overlapping(const std::vector<KeyBoundsPB>& segments_encoded_key_bounds) { |
72 | 689 | std::string_view last; |
73 | 3.40k | for (auto&& segment_encode_key : segments_encoded_key_bounds) { |
74 | 3.40k | auto&& cur_min = segment_encode_key.min_key(); |
75 | 3.40k | auto&& cur_max = segment_encode_key.max_key(); |
76 | 3.40k | if (cur_min <= last) { |
77 | 136 | return true; |
78 | 136 | } |
79 | 3.27k | last = cur_max; |
80 | 3.27k | } |
81 | 553 | return false; |
82 | 689 | } |
83 | | |
84 | | void build_rowset_meta_with_spec_field(RowsetMeta& rowset_meta, |
85 | 24 | const RowsetMeta& spec_rowset_meta) { |
86 | 24 | rowset_meta.set_num_rows(spec_rowset_meta.num_rows()); |
87 | 24 | rowset_meta.set_total_disk_size(spec_rowset_meta.total_disk_size()); |
88 | 24 | rowset_meta.set_data_disk_size(spec_rowset_meta.data_disk_size()); |
89 | 24 | rowset_meta.set_index_disk_size(spec_rowset_meta.index_disk_size()); |
90 | | // TODO write zonemap to meta |
91 | 24 | rowset_meta.set_empty(spec_rowset_meta.num_rows() == 0); |
92 | 24 | rowset_meta.set_creation_time(time(nullptr)); |
93 | 24 | rowset_meta.set_num_segments(spec_rowset_meta.num_segments()); |
94 | 24 | rowset_meta.set_segments_overlap(spec_rowset_meta.segments_overlap()); |
95 | 24 | rowset_meta.set_rowset_state(spec_rowset_meta.rowset_state()); |
96 | 24 | rowset_meta.set_segments_key_bounds_truncated( |
97 | 24 | spec_rowset_meta.is_segments_key_bounds_truncated()); |
98 | 24 | std::vector<KeyBoundsPB> segments_key_bounds; |
99 | 24 | spec_rowset_meta.get_segments_key_bounds(&segments_key_bounds); |
100 | 24 | rowset_meta.set_segments_key_bounds(segments_key_bounds); |
101 | 24 | std::vector<uint32_t> num_segment_rows; |
102 | 24 | spec_rowset_meta.get_num_segment_rows(&num_segment_rows); |
103 | 24 | rowset_meta.set_num_segment_rows(num_segment_rows); |
104 | 24 | } |
105 | | |
106 | | } // namespace |
107 | | |
108 | 985 | SegmentFileCollection::~SegmentFileCollection() = default; |
109 | | |
110 | 5.19k | Status SegmentFileCollection::add(int seg_id, io::FileWriterPtr&& writer) { |
111 | 5.19k | std::lock_guard lock(_lock); |
112 | 5.19k | if (_closed) [[unlikely]] { |
113 | 0 | DCHECK(false) << writer->path(); |
114 | 0 | return Status::InternalError("add to closed SegmentFileCollection"); |
115 | 0 | } |
116 | | |
117 | 5.19k | _file_writers.emplace(seg_id, std::move(writer)); |
118 | 5.19k | return Status::OK(); |
119 | 5.19k | } |
120 | | |
121 | 63 | io::FileWriter* SegmentFileCollection::get(int seg_id) const { |
122 | 63 | std::lock_guard lock(_lock); |
123 | 63 | if (auto it = _file_writers.find(seg_id); it != _file_writers.end()) { |
124 | 63 | return it->second.get(); |
125 | 63 | } else { |
126 | 0 | return nullptr; |
127 | 0 | } |
128 | 63 | } |
129 | | |
130 | 953 | Status SegmentFileCollection::close() { |
131 | 953 | { |
132 | 953 | std::lock_guard lock(_lock); |
133 | 953 | if (_closed) [[unlikely]] { |
134 | 0 | DCHECK(false); |
135 | 0 | return Status::InternalError("double close SegmentFileCollection"); |
136 | 0 | } |
137 | 953 | _closed = true; |
138 | 953 | } |
139 | | |
140 | 5.19k | for (auto&& [_, writer] : _file_writers) { |
141 | 5.19k | if (writer->state() != io::FileWriter::State::CLOSED) { |
142 | 5.13k | RETURN_IF_ERROR(writer->close()); |
143 | 5.13k | } |
144 | 5.19k | } |
145 | | |
146 | 953 | return Status::OK(); |
147 | 953 | } |
148 | | |
149 | 0 | Result<std::vector<size_t>> SegmentFileCollection::segments_file_size(int seg_id_offset) { |
150 | 0 | std::lock_guard lock(_lock); |
151 | 0 | if (!_closed) [[unlikely]] { |
152 | 0 | DCHECK(false); |
153 | 0 | return ResultError(Status::InternalError("get segments file size without closed")); |
154 | 0 | } |
155 | | |
156 | 0 | Status st; |
157 | 0 | std::vector<size_t> seg_file_size(_file_writers.size(), 0); |
158 | 0 | bool succ = std::all_of(_file_writers.begin(), _file_writers.end(), [&](auto&& it) { |
159 | 0 | auto&& [seg_id, writer] = it; |
160 | |
|
161 | 0 | int idx = seg_id - seg_id_offset; |
162 | 0 | if (idx >= seg_file_size.size()) [[unlikely]] { |
163 | 0 | auto err_msg = fmt::format( |
164 | 0 | "invalid seg_id={} num_file_writers={} seg_id_offset={} path={}", seg_id, |
165 | 0 | seg_file_size.size(), seg_id_offset, writer->path().native()); |
166 | 0 | DCHECK(false) << err_msg; |
167 | 0 | st = Status::InternalError(err_msg); |
168 | 0 | return false; |
169 | 0 | } |
170 | | |
171 | 0 | auto& fsize = seg_file_size[idx]; |
172 | 0 | if (fsize != 0) { |
173 | | // File size should not been set |
174 | 0 | auto err_msg = |
175 | 0 | fmt::format("duplicate seg_id={} path={}", seg_id, writer->path().native()); |
176 | 0 | DCHECK(false) << err_msg; |
177 | 0 | st = Status::InternalError(err_msg); |
178 | 0 | return false; |
179 | 0 | } |
180 | | |
181 | 0 | fsize = writer->bytes_appended(); |
182 | 0 | if (fsize <= 0) { |
183 | 0 | auto err_msg = |
184 | 0 | fmt::format("invalid segment fsize={} path={}", fsize, writer->path().native()); |
185 | 0 | DCHECK(false) << err_msg; |
186 | 0 | st = Status::InternalError(err_msg); |
187 | 0 | return false; |
188 | 0 | } |
189 | | |
190 | 0 | return true; |
191 | 0 | }); |
192 | |
|
193 | 0 | if (succ) { |
194 | 0 | return seg_file_size; |
195 | 0 | } |
196 | | |
197 | 0 | return ResultError(st); |
198 | 0 | } |
199 | | |
200 | 985 | InvertedIndexFileCollection::~InvertedIndexFileCollection() = default; |
201 | | |
202 | 265 | Status InvertedIndexFileCollection::add(int seg_id, IndexFileWriterPtr&& index_writer) { |
203 | 265 | std::lock_guard lock(_lock); |
204 | 265 | if (_inverted_index_file_writers.find(seg_id) != _inverted_index_file_writers.end()) |
205 | 0 | [[unlikely]] { |
206 | 0 | DCHECK(false); |
207 | 0 | return Status::InternalError("The seg_id already exists, seg_id is {}", seg_id); |
208 | 0 | } |
209 | 265 | _inverted_index_file_writers.emplace(seg_id, std::move(index_writer)); |
210 | 265 | return Status::OK(); |
211 | 265 | } |
212 | | |
213 | 322 | Status InvertedIndexFileCollection::begin_close() { |
214 | 322 | std::lock_guard lock(_lock); |
215 | 322 | for (auto&& [id, writer] : _inverted_index_file_writers) { |
216 | 36 | RETURN_IF_ERROR(writer->begin_close()); |
217 | 36 | _total_size += writer->get_index_file_total_size(); |
218 | 36 | } |
219 | | |
220 | 322 | return Status::OK(); |
221 | 322 | } |
222 | | |
223 | 953 | Status InvertedIndexFileCollection::finish_close() { |
224 | 953 | std::lock_guard lock(_lock); |
225 | 953 | for (auto&& [id, writer] : _inverted_index_file_writers) { |
226 | 264 | RETURN_IF_ERROR(writer->finish_close()); |
227 | 264 | } |
228 | 953 | return Status::OK(); |
229 | 953 | } |
230 | | |
231 | | Result<std::vector<const InvertedIndexFileInfo*>> |
232 | 106 | InvertedIndexFileCollection::inverted_index_file_info(int seg_id_offset) { |
233 | 106 | std::lock_guard lock(_lock); |
234 | | |
235 | 106 | Status st; |
236 | 106 | std::vector<const InvertedIndexFileInfo*> idx_file_info(_inverted_index_file_writers.size()); |
237 | 106 | bool succ = std::all_of( |
238 | 106 | _inverted_index_file_writers.begin(), _inverted_index_file_writers.end(), |
239 | 215 | [&](auto&& it) { |
240 | 215 | auto&& [seg_id, writer] = it; |
241 | | |
242 | 215 | int idx = seg_id - seg_id_offset; |
243 | 215 | if (idx >= idx_file_info.size()) [[unlikely]] { |
244 | 0 | auto err_msg = |
245 | 0 | fmt::format("invalid seg_id={} num_file_writers={} seg_id_offset={}", |
246 | 0 | seg_id, idx_file_info.size(), seg_id_offset); |
247 | 0 | DCHECK(false) << err_msg; |
248 | 0 | st = Status::InternalError(err_msg); |
249 | 0 | return false; |
250 | 0 | } |
251 | 215 | idx_file_info[idx] = _inverted_index_file_writers[seg_id]->get_index_file_info(); |
252 | 215 | return true; |
253 | 215 | }); |
254 | | |
255 | 106 | if (succ) { |
256 | 106 | return idx_file_info; |
257 | 106 | } |
258 | | |
259 | 0 | return ResultError(st); |
260 | 106 | } |
261 | | |
262 | | BaseBetaRowsetWriter::BaseBetaRowsetWriter() |
263 | 985 | : _num_segment(0), |
264 | 985 | _segment_start_id(0), |
265 | 985 | _num_rows_written(0), |
266 | 985 | _total_data_size(0), |
267 | 985 | _total_index_size(0), |
268 | 985 | _segment_creator(_context, _seg_files, _idx_files) {} |
269 | | |
270 | | BetaRowsetWriter::BetaRowsetWriter(StorageEngine& engine) |
271 | 984 | : _engine(engine), _segcompaction_worker(std::make_shared<SegcompactionWorker>(this)) {} |
272 | | |
273 | 985 | BaseBetaRowsetWriter::~BaseBetaRowsetWriter() { |
274 | 985 | if (!_already_built && _rowset_meta->is_local()) { |
275 | | // abnormal exit, remove all files generated |
276 | 7 | auto& fs = io::global_local_filesystem(); |
277 | 7 | for (int i = _segment_start_id; i < _segment_creator.next_segment_id(); ++i) { |
278 | 0 | std::string seg_path = |
279 | 0 | local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), i); |
280 | | // Even if an error is encountered, these files that have not been cleaned up |
281 | | // will be cleaned up by the GC background. So here we only print the error |
282 | | // message when we encounter an error. |
283 | 0 | WARN_IF_ERROR(fs->delete_file(seg_path), |
284 | 0 | fmt::format("Failed to delete file={}", seg_path)); |
285 | 0 | } |
286 | 7 | } |
287 | 985 | if (_calc_delete_bitmap_token) { |
288 | 8 | _calc_delete_bitmap_token->cancel(); |
289 | 8 | } |
290 | 985 | } |
291 | | |
292 | 984 | BetaRowsetWriter::~BetaRowsetWriter() { |
293 | | /* Note that segcompaction is async and in parallel with load job. So we should handle carefully |
294 | | * when the job is cancelled. Although it is meaningless to continue segcompaction when the job |
295 | | * is cancelled, the objects involved in the job should be preserved during segcompaction to |
296 | | * avoid crashs for memory issues. */ |
297 | 984 | WARN_IF_ERROR(_wait_flying_segcompaction(), "segment compaction failed"); |
298 | 984 | } |
299 | | |
300 | 984 | Status BaseBetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) { |
301 | 984 | _context = rowset_writer_context; |
302 | 984 | _rowset_meta.reset(new RowsetMeta); |
303 | 984 | if (_context.storage_resource) { |
304 | 0 | _rowset_meta->set_remote_storage_resource(*_context.storage_resource); |
305 | 0 | } |
306 | 984 | _rowset_meta->set_rowset_id(_context.rowset_id); |
307 | 984 | _rowset_meta->set_partition_id(_context.partition_id); |
308 | 984 | _rowset_meta->set_tablet_id(_context.tablet_id); |
309 | 984 | _rowset_meta->set_tablet_schema_hash(_context.tablet_schema_hash); |
310 | 984 | _rowset_meta->set_rowset_type(_context.rowset_type); |
311 | 984 | _rowset_meta->set_rowset_state(_context.rowset_state); |
312 | 984 | _rowset_meta->set_segments_overlap(_context.segments_overlap); |
313 | 984 | if (_context.rowset_state == PREPARED || _context.rowset_state == COMMITTED) { |
314 | 30 | _is_pending = true; |
315 | 30 | _rowset_meta->set_txn_id(_context.txn_id); |
316 | 30 | _rowset_meta->set_load_id(_context.load_id); |
317 | 954 | } else { |
318 | 954 | _rowset_meta->set_version(_context.version); |
319 | 954 | _rowset_meta->set_newest_write_timestamp(_context.newest_write_timestamp); |
320 | 954 | } |
321 | 984 | _rowset_meta->set_tablet_uid(_context.tablet_uid); |
322 | 984 | _rowset_meta->set_tablet_schema(_context.tablet_schema); |
323 | 984 | _context.segment_collector = std::make_shared<SegmentCollectorT<BaseBetaRowsetWriter>>(this); |
324 | 984 | _context.file_writer_creator = std::make_shared<FileWriterCreatorT<BaseBetaRowsetWriter>>(this); |
325 | 984 | return Status::OK(); |
326 | 984 | } |
327 | | |
328 | 1.35k | Status BaseBetaRowsetWriter::add_block(const Block* block) { |
329 | 1.35k | return _segment_creator.add_block(block); |
330 | 1.35k | } |
331 | | |
332 | 63 | Status BaseBetaRowsetWriter::_generate_delete_bitmap(int32_t segment_id) { |
333 | 63 | SCOPED_RAW_TIMER(&_delete_bitmap_ns); |
334 | 63 | if (!_context.tablet->enable_unique_key_merge_on_write() || |
335 | 63 | (_context.partial_update_info && _context.partial_update_info->is_partial_update())) { |
336 | 0 | return Status::OK(); |
337 | 0 | } |
338 | 63 | std::vector<RowsetSharedPtr> specified_rowsets; |
339 | 63 | { |
340 | 63 | std::shared_lock meta_rlock(_context.tablet->get_header_lock()); |
341 | 63 | specified_rowsets = |
342 | 63 | _context.tablet->get_rowset_by_ids(_context.mow_context->rowset_ids.get()); |
343 | 63 | } |
344 | | |
345 | | // Submit the entire delete bitmap calculation process to thread pool for async execution |
346 | | // This avoids blocking memtable flush thread while waiting for file upload to complete |
347 | | // The process includes: file_writer->close(), _build_tmp, load_segments, and calc_delete_bitmap |
348 | 63 | return _calc_delete_bitmap_token->submit_func( |
349 | 63 | [this, segment_id, specified_rowsets = std::move(specified_rowsets)]() -> Status { |
350 | 63 | Status st = Status::OK(); |
351 | | // Step 1: Close file_writer (must be done before load_segments) |
352 | 63 | auto* file_writer = _seg_files.get(segment_id); |
353 | 63 | if (file_writer && file_writer->state() != io::FileWriter::State::CLOSED) { |
354 | 63 | MonotonicStopWatch close_timer; |
355 | 63 | close_timer.start(); |
356 | 63 | st = file_writer->close(); |
357 | 63 | close_timer.stop(); |
358 | | |
359 | 63 | auto close_time_ms = close_timer.elapsed_time_milliseconds(); |
360 | 63 | if (close_time_ms > 1000) { |
361 | 0 | LOG(INFO) << "file_writer->close() took " << close_time_ms |
362 | 0 | << "ms for segment_id=" << segment_id |
363 | 0 | << ", tablet_id=" << _context.tablet_id |
364 | 0 | << ", rowset_id=" << _context.rowset_id; |
365 | 0 | } |
366 | 63 | if (!st.ok()) { |
367 | 0 | return st; |
368 | 0 | } |
369 | 63 | } |
370 | | |
371 | 63 | OlapStopWatch watch; |
372 | | // Step 2: Build tmp rowset (needs file_writer to be closed) |
373 | 63 | RowsetSharedPtr rowset_ptr; |
374 | 63 | st = _build_tmp(rowset_ptr); |
375 | 63 | if (!st.ok()) { |
376 | 0 | return st; |
377 | 0 | } |
378 | | |
379 | | // Step 3: Load segments (needs file_writer to be closed and rowset to be built) |
380 | 63 | auto* beta_rowset = reinterpret_cast<BetaRowset*>(rowset_ptr.get()); |
381 | 63 | std::vector<segment_v2::SegmentSharedPtr> segments; |
382 | 63 | st = beta_rowset->load_segments(segment_id, segment_id + 1, &segments); |
383 | 63 | if (!st.ok()) { |
384 | 0 | return st; |
385 | 0 | } |
386 | | |
387 | | // Step 4: Calculate delete bitmap |
388 | 63 | st = BaseTablet::calc_delete_bitmap( |
389 | 63 | _context.tablet, rowset_ptr, segments, specified_rowsets, |
390 | 63 | _context.mow_context->delete_bitmap, _context.mow_context->max_version, |
391 | 63 | nullptr, nullptr, nullptr); |
392 | 63 | if (!st.ok()) { |
393 | 0 | return st; |
394 | 0 | } |
395 | | |
396 | 63 | size_t total_rows = |
397 | 63 | std::accumulate(segments.begin(), segments.end(), 0, |
398 | 63 | [](size_t sum, const segment_v2::SegmentSharedPtr& s) { |
399 | 63 | return sum += s->num_rows(); |
400 | 63 | }); |
401 | 63 | LOG(INFO) << "[Memtable Flush] construct delete bitmap tablet: " |
402 | 63 | << _context.tablet->tablet_id() |
403 | 63 | << ", rowset_ids: " << _context.mow_context->rowset_ids->size() |
404 | 63 | << ", cur max_version: " << _context.mow_context->max_version |
405 | 63 | << ", transaction_id: " << _context.mow_context->txn_id |
406 | 63 | << ", delete_bitmap_count: " |
407 | 63 | << _context.mow_context->delete_bitmap->get_delete_bitmap_count() |
408 | 63 | << ", delete_bitmap_cardinality: " |
409 | 63 | << _context.mow_context->delete_bitmap->cardinality() |
410 | 63 | << ", cost: " << watch.get_elapse_time_us() |
411 | 63 | << "(us), total rows: " << total_rows; |
412 | 63 | return Status::OK(); |
413 | 63 | }); |
414 | 63 | } |
415 | | |
416 | 984 | Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) { |
417 | 984 | RETURN_IF_ERROR(BaseBetaRowsetWriter::init(rowset_writer_context)); |
418 | 984 | if (_segcompaction_worker) { |
419 | 984 | _segcompaction_worker->init_mem_tracker(rowset_writer_context); |
420 | 984 | } |
421 | 984 | if (_context.mow_context != nullptr) { |
422 | 8 | _calc_delete_bitmap_token = _engine.calc_delete_bitmap_executor_for_load()->create_token(); |
423 | 8 | } |
424 | 984 | return Status::OK(); |
425 | 984 | } |
426 | | |
427 | | Status BetaRowsetWriter::_load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment, |
428 | 77 | int32_t segment_id) { |
429 | 77 | DCHECK(_rowset_meta->is_local()); |
430 | 77 | auto fs = _rowset_meta->fs(); |
431 | 77 | if (!fs) { |
432 | 0 | return Status::Error<INIT_FAILED>( |
433 | 0 | "BetaRowsetWriter::_load_noncompacted_segment _rowset_meta->fs get failed"); |
434 | 0 | } |
435 | 77 | auto path = |
436 | 77 | local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), segment_id); |
437 | 77 | io::FileReaderOptions reader_options { |
438 | 77 | .cache_type = |
439 | 77 | _context.write_file_cache |
440 | 77 | ? (config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE |
441 | 0 | : io::FileCachePolicy::NO_CACHE) |
442 | 77 | : io::FileCachePolicy::NO_CACHE, |
443 | 77 | .is_doris_table = true, |
444 | 77 | .cache_base_path {}, |
445 | 77 | }; |
446 | 77 | auto s = segment_v2::Segment::open(fs, path, _rowset_meta->tablet_id(), segment_id, rowset_id(), |
447 | 77 | _context.tablet_schema, reader_options, &segment); |
448 | 77 | if (!s.ok()) { |
449 | 0 | LOG(WARNING) << "failed to open segment. " << path << ":" << s; |
450 | 0 | return s; |
451 | 0 | } |
452 | 77 | return Status::OK(); |
453 | 77 | } |
454 | | |
455 | | /* policy of segcompaction target selection: |
456 | | * 1. skip big segments |
457 | | * 2. if the consecutive smalls end up with a big, compact the smalls, except |
458 | | * single small |
459 | | * 3. if the consecutive smalls end up with small, compact the smalls if the |
460 | | * length is beyond (config::segcompaction_batch_size / 2) |
461 | | */ |
462 | | Status BetaRowsetWriter::_find_longest_consecutive_small_segment( |
463 | 15 | SegCompactionCandidatesSharedPtr& segments) { |
464 | 15 | segments = std::make_shared<SegCompactionCandidates>(); |
465 | | // skip last (maybe active) segment |
466 | 15 | int32_t last_segment = _num_segment - 1; |
467 | 15 | size_t task_bytes = 0; |
468 | 15 | uint32_t task_rows = 0; |
469 | 15 | int32_t segid; |
470 | 15 | for (segid = _segcompacted_point; |
471 | 86 | segid < last_segment && segments->size() < config::segcompaction_batch_size; segid++) { |
472 | 77 | segment_v2::SegmentSharedPtr segment; |
473 | 77 | RETURN_IF_ERROR(_load_noncompacted_segment(segment, segid)); |
474 | 77 | const auto segment_rows = segment->num_rows(); |
475 | 77 | const auto segment_bytes = segment->file_reader()->size(); |
476 | 77 | bool is_large_segment = segment_rows > config::segcompaction_candidate_max_rows || |
477 | 77 | segment_bytes > config::segcompaction_candidate_max_bytes; |
478 | 77 | if (is_large_segment) { |
479 | 14 | if (segid == _segcompacted_point) { |
480 | | // skip large segments at the front |
481 | 8 | auto dst_seg_id = _num_segcompacted.load(); |
482 | 8 | RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); |
483 | 8 | if (_segcompaction_worker->need_convert_delete_bitmap()) { |
484 | 4 | _segcompaction_worker->convert_segment_delete_bitmap( |
485 | 4 | _context.mow_context->delete_bitmap, segid, dst_seg_id); |
486 | 4 | } |
487 | 8 | continue; |
488 | 8 | } else { |
489 | | // stop because we need consecutive segments |
490 | 6 | break; |
491 | 6 | } |
492 | 14 | } |
493 | 63 | bool is_task_full = task_rows + segment_rows > config::segcompaction_task_max_rows || |
494 | 63 | task_bytes + segment_bytes > config::segcompaction_task_max_bytes; |
495 | 63 | if (is_task_full) { |
496 | 0 | break; |
497 | 0 | } |
498 | 63 | segments->push_back(segment); |
499 | 63 | task_rows += segment->num_rows(); |
500 | 63 | task_bytes += segment->file_reader()->size(); |
501 | 63 | } |
502 | 15 | size_t s = segments->size(); |
503 | 15 | if (segid == last_segment && s <= (config::segcompaction_batch_size / 2)) { |
504 | | // we didn't collect enough segments, better to do it in next |
505 | | // round to compact more at once |
506 | 0 | segments->clear(); |
507 | 0 | return Status::OK(); |
508 | 0 | } |
509 | 15 | if (s == 1) { // poor bachelor, let it go |
510 | 4 | VLOG_DEBUG << "only one candidate segment"; |
511 | 4 | auto src_seg_id = _segcompacted_point.load(); |
512 | 4 | auto dst_seg_id = _num_segcompacted.load(); |
513 | 4 | RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++)); |
514 | 4 | if (_segcompaction_worker->need_convert_delete_bitmap()) { |
515 | 2 | _segcompaction_worker->convert_segment_delete_bitmap( |
516 | 2 | _context.mow_context->delete_bitmap, src_seg_id, dst_seg_id); |
517 | 2 | } |
518 | 4 | segments->clear(); |
519 | 4 | return Status::OK(); |
520 | 4 | } |
521 | 11 | if (VLOG_DEBUG_IS_ON) { |
522 | 0 | vlog_buffer.clear(); |
523 | 0 | for (auto& segment : (*segments.get())) { |
524 | 0 | fmt::format_to(vlog_buffer, "[id:{} num_rows:{}]", segment->id(), segment->num_rows()); |
525 | 0 | } |
526 | 0 | VLOG_DEBUG << "candidate segments num:" << s |
527 | 0 | << " list of candidates:" << fmt::to_string(vlog_buffer); |
528 | 0 | } |
529 | 11 | return Status::OK(); |
530 | 15 | } |
531 | | |
532 | 11 | Status BetaRowsetWriter::_rename_compacted_segments(int64_t begin, int64_t end) { |
533 | 11 | int ret; |
534 | 11 | auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, |
535 | 11 | _context.rowset_id, begin, end); |
536 | 11 | auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), |
537 | 11 | _num_segcompacted); |
538 | 11 | ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); |
539 | 11 | if (ret) { |
540 | 0 | return Status::Error<ROWSET_RENAME_FILE_FAILED>( |
541 | 0 | "failed to rename {} to {}. ret:{}, errno:{}", src_seg_path, dst_seg_path, ret, |
542 | 0 | errno); |
543 | 0 | } |
544 | 11 | RETURN_IF_ERROR(_remove_segment_footer_cache(_num_segcompacted, dst_seg_path)); |
545 | | |
546 | | // rename inverted index files |
547 | 11 | RETURN_IF_ERROR(_rename_compacted_indices(begin, end, 0)); |
548 | | |
549 | 11 | _num_segcompacted++; |
550 | 11 | return Status::OK(); |
551 | 11 | } |
552 | | |
553 | | void BetaRowsetWriter::_clear_statistics_for_deleting_segments_unsafe(uint32_t begin, |
554 | 43 | uint32_t end) { |
555 | 43 | VLOG_DEBUG << "_segid_statistics_map clear record segid range from:" << begin << " to:" << end; |
556 | 134 | for (uint32_t i = begin; i <= end; ++i) { |
557 | 91 | _segid_statistics_map.erase(i); |
558 | 91 | } |
559 | 43 | } |
560 | | |
561 | 42 | Status BetaRowsetWriter::_rename_compacted_segment_plain(uint32_t seg_id) { |
562 | 42 | if (seg_id == _num_segcompacted) { |
563 | 10 | ++_num_segcompacted; |
564 | 10 | return Status::OK(); |
565 | 10 | } |
566 | | |
567 | 32 | auto src_seg_path = |
568 | 32 | local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), seg_id); |
569 | 32 | auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), |
570 | 32 | _num_segcompacted); |
571 | 32 | VLOG_DEBUG << "segcompaction skip this segment. rename " << src_seg_path << " to " |
572 | 0 | << dst_seg_path; |
573 | 32 | { |
574 | 32 | std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); |
575 | 32 | DCHECK_EQ(_segid_statistics_map.find(seg_id) == _segid_statistics_map.end(), false); |
576 | 32 | DCHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(), |
577 | 32 | true); |
578 | 32 | auto org = _segid_statistics_map[seg_id]; |
579 | 32 | _segid_statistics_map.emplace(_num_segcompacted, org); |
580 | 32 | _clear_statistics_for_deleting_segments_unsafe(seg_id, seg_id); |
581 | 32 | } |
582 | 32 | int ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); |
583 | 32 | if (ret) { |
584 | 0 | return Status::Error<ROWSET_RENAME_FILE_FAILED>( |
585 | 0 | "failed to rename {} to {}. ret:{}, errno:{}", src_seg_path, dst_seg_path, ret, |
586 | 0 | errno); |
587 | 0 | } |
588 | | |
589 | 32 | RETURN_IF_ERROR(_remove_segment_footer_cache(_num_segcompacted, dst_seg_path)); |
590 | | // rename remaining inverted index files |
591 | 32 | RETURN_IF_ERROR(_rename_compacted_indices(-1, -1, seg_id)); |
592 | | |
593 | 32 | ++_num_segcompacted; |
594 | 32 | return Status::OK(); |
595 | 32 | } |
596 | | |
597 | | Status BetaRowsetWriter::_remove_segment_footer_cache(const uint32_t seg_id, |
598 | 43 | const std::string& segment_path) { |
599 | 43 | auto* footer_page_cache = ExecEnv::GetInstance()->get_storage_page_cache(); |
600 | 43 | if (!footer_page_cache) { |
601 | 0 | return Status::OK(); |
602 | 0 | } |
603 | | |
604 | 43 | auto fs = _rowset_meta->fs(); |
605 | 43 | bool exists = false; |
606 | 43 | RETURN_IF_ERROR(fs->exists(segment_path, &exists)); |
607 | 43 | if (exists) { |
608 | 43 | io::FileReaderSPtr file_reader; |
609 | 43 | io::FileReaderOptions reader_options { |
610 | 43 | .cache_type = config::enable_file_cache ? io::FileCachePolicy::FILE_BLOCK_CACHE |
611 | 43 | : io::FileCachePolicy::NO_CACHE, |
612 | 43 | .is_doris_table = true, |
613 | 43 | .cache_base_path = "", |
614 | 43 | .file_size = _rowset_meta->segment_file_size(static_cast<int>(seg_id)), |
615 | 43 | .tablet_id = _rowset_meta->tablet_id(), |
616 | 43 | }; |
617 | 43 | RETURN_IF_ERROR(fs->open_file(segment_path, &file_reader, &reader_options)); |
618 | 43 | DCHECK(file_reader != nullptr); |
619 | 43 | auto cache_key = segment_v2::Segment::get_segment_footer_cache_key(file_reader); |
620 | 43 | footer_page_cache->erase(cache_key, segment_v2::PageTypePB::INDEX_PAGE); |
621 | 43 | } |
622 | 43 | return Status::OK(); |
623 | 43 | } |
624 | | |
625 | 43 | Status BetaRowsetWriter::_rename_compacted_indices(int64_t begin, int64_t end, uint64_t seg_id) { |
626 | 43 | int ret; |
627 | | |
628 | 43 | auto src_seg_path = begin < 0 ? local_segment_path(_context.tablet_path, |
629 | 32 | _context.rowset_id.to_string(), seg_id) |
630 | 43 | : BetaRowset::local_segment_path_segcompacted( |
631 | 11 | _context.tablet_path, _context.rowset_id, begin, end); |
632 | 43 | auto src_index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(src_seg_path); |
633 | 43 | auto dst_seg_path = local_segment_path(_context.tablet_path, _context.rowset_id.to_string(), |
634 | 43 | _num_segcompacted); |
635 | 43 | auto dst_index_path_prefix = InvertedIndexDescriptor::get_index_file_path_prefix(dst_seg_path); |
636 | | |
637 | 43 | if (_context.tablet_schema->get_inverted_index_storage_format() >= |
638 | 43 | InvertedIndexStorageFormatPB::V2) { |
639 | 22 | if (_context.tablet_schema->has_inverted_index() || |
640 | 22 | _context.tablet_schema->has_ann_index()) { |
641 | 22 | auto src_idx_path = |
642 | 22 | InvertedIndexDescriptor::get_index_file_path_v2(src_index_path_prefix); |
643 | 22 | auto dst_idx_path = |
644 | 22 | InvertedIndexDescriptor::get_index_file_path_v2(dst_index_path_prefix); |
645 | | |
646 | 22 | ret = rename(src_idx_path.c_str(), dst_idx_path.c_str()); |
647 | 22 | if (ret) { |
648 | 0 | return Status::Error<ROWSET_RENAME_FILE_FAILED>( |
649 | 0 | "failed to rename {} to {}. ret:{}, errno:{}", src_idx_path, dst_idx_path, |
650 | 0 | ret, errno); |
651 | 0 | } |
652 | 22 | } |
653 | 22 | } |
654 | | // rename remaining inverted index files |
655 | 150 | for (auto column : _context.tablet_schema->columns()) { |
656 | 150 | auto index_infos = _context.tablet_schema->inverted_indexs(*column); |
657 | 150 | for (const auto& index_info : index_infos) { |
658 | 44 | auto index_id = index_info->index_id(); |
659 | 44 | if (_context.tablet_schema->get_inverted_index_storage_format() == |
660 | 44 | InvertedIndexStorageFormatPB::V1) { |
661 | 0 | auto src_idx_path = InvertedIndexDescriptor::get_index_file_path_v1( |
662 | 0 | src_index_path_prefix, index_id, index_info->get_index_suffix()); |
663 | 0 | auto dst_idx_path = InvertedIndexDescriptor::get_index_file_path_v1( |
664 | 0 | dst_index_path_prefix, index_id, index_info->get_index_suffix()); |
665 | 0 | VLOG_DEBUG << "segcompaction skip this index. rename " << src_idx_path << " to " |
666 | 0 | << dst_idx_path; |
667 | 0 | ret = rename(src_idx_path.c_str(), dst_idx_path.c_str()); |
668 | 0 | if (ret) { |
669 | 0 | return Status::Error<INVERTED_INDEX_RENAME_FILE_FAILED>( |
670 | 0 | "failed to rename {} to {}. ret:{}, errno:{}", src_idx_path, |
671 | 0 | dst_idx_path, ret, errno); |
672 | 0 | } |
673 | 0 | } |
674 | | // Erase the origin index file cache |
675 | 44 | auto src_idx_cache_key = InvertedIndexDescriptor::get_index_file_cache_key( |
676 | 44 | src_index_path_prefix, index_id, index_info->get_index_suffix()); |
677 | 44 | auto dst_idx_cache_key = InvertedIndexDescriptor::get_index_file_cache_key( |
678 | 44 | dst_index_path_prefix, index_id, index_info->get_index_suffix()); |
679 | 44 | RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(src_idx_cache_key)); |
680 | 44 | RETURN_IF_ERROR(InvertedIndexSearcherCache::instance()->erase(dst_idx_cache_key)); |
681 | 44 | } |
682 | 150 | } |
683 | 43 | return Status::OK(); |
684 | 43 | } |
685 | | |
// Decide whether to launch a background segment-compaction task for this rowset.
// Called after each segment is added. If segcompaction is disabled (globally, per
// context, or because variant columns are present) it only enforces the segment
// count limit. Otherwise it atomically claims the "doing segcompaction" slot,
// validates state, and submits a task when enough small consecutive segments
// have accumulated. On the submit path the flag is left set (the worker clears
// it); on all other paths the flag is cleared and waiters are notified.
Status BetaRowsetWriter::_segcompaction_if_necessary() {
    Status status = Status::OK();
    // if not doing segcompaction, just check segment number
    if (!config::enable_segcompaction || !_context.enable_segcompaction ||
        _context.tablet_schema->num_variant_columns() > 0) {
        return _check_segment_number_limit(_num_segment);
    }
    // leave _is_doing_segcompaction as the last condition
    // otherwise _segcompacting_cond will never get notified
    if (_is_doing_segcompaction.exchange(true)) {
        // Another segcompaction is already in flight; nothing to do.
        return status;
    }
    if (_segcompaction_status.load() != OK) {
        // A previous background compaction failed; surface the sticky error.
        status = Status::Error<SEGCOMPACTION_FAILED>(
                "BetaRowsetWriter::_segcompaction_if_necessary meet invalid state, error code: {}",
                _segcompaction_status.load());
    } else {
        status = _check_segment_number_limit(_num_segcompacted);
    }
    // Only compact once a full batch of uncompacted segments has accumulated.
    if (status.ok() && (_num_segment - _segcompacted_point) >= config::segcompaction_batch_size) {
        SegCompactionCandidatesSharedPtr segments;
        status = _find_longest_consecutive_small_segment(segments);
        if (LIKELY(status.ok()) && (!segments->empty())) {
            LOG(INFO) << "submit segcompaction task, tablet_id:" << _context.tablet_id
                      << " rowset_id:" << _context.rowset_id << " segment num:" << _num_segment
                      << ", segcompacted_point:" << _segcompacted_point;
            status = _engine.submit_seg_compaction_task(_segcompaction_worker, segments);
            if (status.ok()) {
                // Task submitted: the worker now owns _is_doing_segcompaction and
                // will reset it when done, so we must NOT clear it here.
                return status;
            }
        }
    }
    {
        // No task submitted (or submission failed): release the slot and wake
        // any thread blocked in _wait_flying_segcompaction().
        std::lock_guard lk(_is_doing_segcompaction_lock);
        _is_doing_segcompaction = false;
        _segcompacting_cond.notify_all();
    }
    return status;
}
725 | | |
// Rename the trailing segments that were never picked up by segcompaction so the
// rowset's segment ids form a contiguous sequence before commit. Must be called
// with no segcompaction in flight (DCHECK below). Also remaps the MoW delete
// bitmap entries from the old segment ids to the renamed ids when needed.
Status BetaRowsetWriter::_segcompaction_rename_last_segments() {
    DCHECK_EQ(_is_doing_segcompaction, false);
    if (!config::enable_segcompaction) {
        return Status::OK();
    }
    if (_segcompaction_status.load() != OK) {
        // Propagate a sticky failure from an earlier background compaction.
        return Status::Error<SEGCOMPACTION_FAILED>(
                "BetaRowsetWriter::_segcompaction_rename_last_segments meet invalid state, error "
                "code: {}",
                _segcompaction_status.load());
    }
    if (!is_segcompacted() || _segcompacted_point == _num_segment) {
        // no need if never segcompact before or all segcompacted
        return Status::OK();
    }
    // currently we only rename remaining segments to reduce wait time
    // so that transaction can be committed ASAP
    VLOG_DEBUG << "segcompaction last few segments";
    for (int32_t segid = _segcompacted_point; segid < _num_segment; segid++) {
        // _rename_compacted_segment_plain advances _num_segcompacted, so capture
        // the destination id before the rename.
        auto dst_segid = _num_segcompacted.load();
        RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
        if (_segcompaction_worker->need_convert_delete_bitmap()) {
            // Re-key the delete bitmap from the original segment id to the
            // post-rename id so deletes still match the right segment.
            _segcompaction_worker->convert_segment_delete_bitmap(
                    _context.mow_context->delete_bitmap, segid, dst_segid);
        }
    }
    return Status::OK();
}
754 | | |
// Absorb an existing rowset into the one being written by hard-linking its files
// under this writer's rowset id and accumulating its statistics (rows, sizes,
// segment count, key bounds, delete predicate). Used by linked schema change.
// @param rowset  source rowset; must be of type BETA_ROWSET (asserted).
// @return error if linking files or reading key bounds / index size fails.
Status BaseBetaRowsetWriter::add_rowset(RowsetSharedPtr rowset) {
    assert(rowset->rowset_meta()->rowset_type() == BETA_ROWSET);
    RETURN_IF_ERROR(rowset->link_files_to(_context.tablet_path, _context.rowset_id));
    _num_rows_written += rowset->num_rows();
    const auto& rowset_meta = rowset->rowset_meta();
    auto index_size = rowset_meta->index_disk_size();
    auto total_size = rowset_meta->total_disk_size();
    auto data_size = rowset_meta->data_disk_size();
    // corrupted index size caused by bug before 2.1.5 or 3.0.0 version
    // try to get real index size from disk.
    if (index_size < 0 || index_size > total_size * 2) {
        LOG(ERROR) << "invalid index size:" << index_size << " total size:" << total_size
                   << " data size:" << data_size << " tablet:" << rowset_meta->tablet_id()
                   << " rowset:" << rowset_meta->rowset_id();
        index_size = 0;
        auto st = rowset->get_inverted_index_size(&index_size);
        if (!st.ok()) {
            // NOT_FOUND means the rowset simply has no index files; treat the
            // size as 0 and keep going. Any other error is fatal here.
            if (!st.is<NOT_FOUND>()) {
                LOG(ERROR) << "failed to get inverted index size. res=" << st;
                return st;
            }
        }
    }
    _total_data_size += data_size;
    _total_index_size += index_size;
    _num_segment += cast_set<int32_t>(rowset->num_segments());
    // append key_bounds to current rowset
    RETURN_IF_ERROR(rowset->get_segments_key_bounds(&_segments_encoded_key_bounds));
    rowset->get_num_segment_rows(&_segment_num_rows);
    _segments_key_bounds_truncated = rowset->rowset_meta()->is_segments_key_bounds_truncated();

    // TODO update zonemap
    if (rowset->rowset_meta()->has_delete_predicate()) {
        _rowset_meta->set_delete_predicate(rowset->rowset_meta()->delete_predicate());
    }
    // Update the tablet schema in the rowset metadata if the tablet schema contains a variant.
    // During the build process, _context.tablet_schema will be used as the rowset schema.
    // This situation may arise in the event of a linked schema change. If this schema is not set,
    // the subcolumns of the variant will be lost.
    if (_context.tablet_schema->num_variant_columns() > 0 && rowset->tablet_schema() != nullptr) {
        _context.tablet_schema = rowset->tablet_schema();
    }
    return Status::OK();
}
799 | | |
// Linked-schema-change entry point; currently identical to add_rowset() since
// no column mapping is applied yet (see TODO).
Status BaseBetaRowsetWriter::add_rowset_for_linked_schema_change(RowsetSharedPtr rowset) {
    // TODO use schema_mapping to transfer zonemap
    return add_rowset(rowset);
}
804 | | |
// Flush any buffered data in the segment creator to the current segment file.
Status BaseBetaRowsetWriter::flush() {
    return _segment_creator.flush();
}
808 | | |
809 | 12 | Status BaseBetaRowsetWriter::flush_memtable(Block* block, int32_t segment_id, int64_t* flush_size) { |
810 | 12 | if (block->rows() == 0) { |
811 | 0 | return Status::OK(); |
812 | 0 | } |
813 | | |
814 | 12 | { |
815 | 12 | SCOPED_RAW_TIMER(&_segment_writer_ns); |
816 | 12 | RETURN_IF_ERROR(_segment_creator.flush_single_block(block, segment_id, flush_size)); |
817 | 12 | } |
818 | 12 | return Status::OK(); |
819 | 12 | } |
820 | | |
821 | 0 | Status BaseBetaRowsetWriter::flush_single_block(const Block* block) { |
822 | 0 | return _segment_creator.flush_single_block(block); |
823 | 0 | } |
824 | | |
825 | 984 | Status BetaRowsetWriter::_wait_flying_segcompaction() { |
826 | 984 | std::unique_lock<std::mutex> l(_is_doing_segcompaction_lock); |
827 | 984 | uint64_t begin_wait = GetCurrentTimeMicros(); |
828 | 984 | while (_is_doing_segcompaction) { |
829 | | // change sync wait to async? |
830 | 0 | _segcompacting_cond.wait(l); |
831 | 0 | } |
832 | 984 | uint64_t elapsed = GetCurrentTimeMicros() - begin_wait; |
833 | 984 | if (elapsed >= MICROS_PER_SEC) { |
834 | 0 | LOG(INFO) << "wait flying segcompaction finish time:" << elapsed << "us"; |
835 | 0 | } |
836 | 984 | if (_segcompaction_status.load() != OK) { |
837 | 0 | return Status::Error<SEGCOMPACTION_FAILED>( |
838 | 0 | "BetaRowsetWriter meet invalid state, error code: {}", |
839 | 0 | _segcompaction_status.load()); |
840 | 0 | } |
841 | 984 | return Status::OK(); |
842 | 984 | } |
843 | | |
844 | 24 | RowsetSharedPtr BaseBetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& spec_rowset_meta) { |
845 | 24 | if (_rowset_meta->newest_write_timestamp() == -1) { |
846 | 0 | _rowset_meta->set_newest_write_timestamp(UnixSeconds()); |
847 | 0 | } |
848 | | |
849 | 24 | build_rowset_meta_with_spec_field(*_rowset_meta, *spec_rowset_meta); |
850 | 24 | RowsetSharedPtr rowset; |
851 | 24 | auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, |
852 | 24 | _rowset_meta, &rowset); |
853 | 24 | if (!status.ok()) { |
854 | 0 | LOG(WARNING) << "rowset init failed when build new rowset, res=" << status; |
855 | 0 | return nullptr; |
856 | 0 | } |
857 | 24 | _already_built = true; |
858 | 24 | return rowset; |
859 | 24 | } |
860 | | |
// Flush and close the segment creator, which owns the open segment file
// writers; warns and propagates the status on failure.
Status BaseBetaRowsetWriter::_close_file_writers() {
    // Flush and close segment files
    RETURN_NOT_OK_STATUS_WITH_WARN(_segment_creator.close(),
                                   "failed to close segment creator when build new rowset");
    return Status::OK();
}
867 | | |
// Close all file writers and wind down segcompaction for this rowset:
// cancel (or wait for) the worker, rename the remaining uncompacted segments,
// close the worker's file writer if still open, and fold the converted MoW
// delete bitmap back into the context's bitmap.
Status BetaRowsetWriter::_close_file_writers() {
    RETURN_IF_ERROR(BaseBetaRowsetWriter::_close_file_writers());
    // if _segment_start_id is not zero, that means it's a transient rowset writer for
    // MoW partial update, don't need to do segment compaction.
    if (_segment_start_id == 0) {
        if (_segcompaction_worker->cancel()) {
            // Cancel succeeded: no task is running, so clear the flag ourselves
            // and wake anyone waiting on it.
            std::lock_guard lk(_is_doing_segcompaction_lock);
            _is_doing_segcompaction = false;
            _segcompacting_cond.notify_all();
        } else {
            // A task is already running; wait for it to complete.
            RETURN_NOT_OK_STATUS_WITH_WARN(_wait_flying_segcompaction(),
                                           "segcompaction failed when build new rowset");
        }
        RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_rename_last_segments(),
                                       "rename last segments failed when build new rowset");
        // segcompaction worker would do file writer's close function in compact_segments
        if (auto& seg_comp_file_writer = _segcompaction_worker->get_file_writer();
            nullptr != seg_comp_file_writer &&
            seg_comp_file_writer->state() != io::FileWriter::State::CLOSED) {
            RETURN_NOT_OK_STATUS_WITH_WARN(seg_comp_file_writer->close(),
                                           "close segment compaction worker failed");
        }
        // process delete bitmap for mow table
        if (is_segcompacted() && _segcompaction_worker->need_convert_delete_bitmap()) {
            auto converted_delete_bitmap = _segcompaction_worker->get_converted_delete_bitmap();
            // which means the segment compaction is triggered
            if (converted_delete_bitmap != nullptr) {
                RowsetIdUnorderedSet rowsetids;
                rowsetids.insert(rowset_id());
                context().tablet->add_sentinel_mark_to_delete_bitmap(converted_delete_bitmap.get(),
                                                                     rowsetids);
                // Drop all stale entries for this rowset, then merge in the
                // bitmap re-keyed to the post-compaction segment ids.
                context().mow_context->delete_bitmap->remove({rowset_id(), 0, 0},
                                                             {rowset_id(), UINT32_MAX, INT64_MAX});
                context().mow_context->delete_bitmap->merge(*converted_delete_bitmap);
            }
        }
    }
    return Status::OK();
}
907 | | |
// Finalize the rowset: wait for delete-bitmap calculation, close writers,
// enforce the segment limit, build the rowset meta, set its state/schema,
// attach inverted-index file info (when accurate), and create the Rowset
// object into `rowset` (out parameter).
Status BetaRowsetWriter::build(RowsetSharedPtr& rowset) {
    if (_calc_delete_bitmap_token != nullptr) {
        RETURN_IF_ERROR(_calc_delete_bitmap_token->wait());
    }
    RETURN_IF_ERROR(_close_file_writers());
    const auto total_segment_num = _num_segment - _segcompacted_point + 1 + _num_segcompacted;
    RETURN_NOT_OK_STATUS_WITH_WARN(_check_segment_number_limit(total_segment_num),
                                   "too many segments when build new rowset");
    RETURN_IF_ERROR(_build_rowset_meta(_rowset_meta.get(), true));
    // Pending rowsets (from a txn) are COMMITTED; otherwise directly VISIBLE.
    if (_is_pending) {
        _rowset_meta->set_rowset_state(COMMITTED);
    } else {
        _rowset_meta->set_rowset_state(VISIBLE);
    }

    if (_rowset_meta->newest_write_timestamp() == -1) {
        _rowset_meta->set_newest_write_timestamp(UnixSeconds());
    }

    _rowset_meta->set_tablet_schema(_context.tablet_schema);

    // If segment compaction occurs, the idx file info will become inaccurate.
    if ((_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) &&
        _num_segcompacted == 0) {
        if (auto idx_files_info = _idx_files.inverted_index_file_info(_segment_start_id);
            !idx_files_info.has_value()) [[unlikely]] {
            // Missing info is logged but not fatal; the meta simply omits it.
            LOG(ERROR) << "expected inverted index files info, but none presents: "
                       << idx_files_info.error();
        } else {
            _rowset_meta->add_inverted_index_files_info(idx_files_info.value());
        }
    }

    RETURN_NOT_OK_STATUS_WITH_WARN(
            RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, _rowset_meta,
                                         &rowset),
            "rowset init failed when build new rowset");
    _already_built = true;
    return Status::OK();
}
948 | | |
// Number of segments in the rowset being written (base writer never compacts).
int64_t BaseBetaRowsetWriter::_num_seg() const {
    return _num_segment;
}
952 | | |
953 | 1.01k | int64_t BetaRowsetWriter::_num_seg() const { |
954 | 1.01k | return is_segcompacted() ? _num_segcompacted : _num_segment; |
955 | 1.01k | } |
956 | | |
// Populate `rowset_meta` from the per-segment statistics gathered during the
// write: row counts, data/index sizes, per-segment rows, key bounds and the
// overlap property. When `check_segment_num` is set (and the config enables
// it), validates that the collected key bounds / row vectors match the final
// segment count before committing them to the meta.
Status BaseBetaRowsetWriter::_build_rowset_meta(RowsetMeta* rowset_meta, bool check_segment_num) {
    int64_t num_rows_written = 0;
    int64_t total_data_size = 0;
    int64_t total_index_size = 0;
    std::vector<KeyBoundsPB> segments_encoded_key_bounds;
    std::vector<uint32_t> segment_rows;
    {
        // Snapshot the per-segment statistics under the map lock.
        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
        for (const auto& itr : _segid_statistics_map) {
            num_rows_written += itr.second.row_num;
            total_data_size += itr.second.data_size;
            total_index_size += itr.second.index_size;
            segments_encoded_key_bounds.push_back(itr.second.key_bounds);
            // segcompaction don't modify _segment_num_rows, so we need to get segment rows from _segid_statistics_map for load
            segment_rows.push_back(cast_set<uint32_t>(itr.second.row_num));
        }
    }
    if (segment_rows.empty()) {
        // vertical compaction and linked schema change will not record segment statistics,
        // it will record segment rows in _segment_num_rows
        RETURN_IF_ERROR(get_segment_num_rows(&segment_rows));
    }

    // Append key bounds accumulated via add_rowset() (e.g. linked schema change).
    for (auto& key_bound : _segments_encoded_key_bounds) {
        segments_encoded_key_bounds.push_back(key_bound);
    }
    if (_segments_key_bounds_truncated.has_value()) {
        rowset_meta->set_segments_key_bounds_truncated(_segments_key_bounds_truncated.value());
    }
    rowset_meta->set_num_segment_rows(segment_rows);
    // segment key bounds are empty in old version(before version 1.2.x). So we should not modify
    // the overlap property when key bounds are empty.
    // for mow table with cluster keys, the overlap is used for cluster keys,
    // the key_bounds is primary keys
    if (!segments_encoded_key_bounds.empty() &&
        !is_segment_overlapping(segments_encoded_key_bounds) &&
        _context.tablet_schema->cluster_key_uids().empty()) {
        rowset_meta->set_segments_overlap(NONOVERLAPPING);
    }

    auto segment_num = _num_seg();
    if (check_segment_num && config::check_segment_when_build_rowset_meta) {
        // Sanity checks: both vectors must have exactly one entry per segment.
        auto segments_encoded_key_bounds_size = segments_encoded_key_bounds.size();
        if (segments_encoded_key_bounds_size != segment_num) {
            return Status::InternalError(
                    "segments_encoded_key_bounds_size should equal to _num_seg, "
                    "segments_encoded_key_bounds_size "
                    "is: {}, _num_seg is: {}",
                    segments_encoded_key_bounds_size, segment_num);
        }
        if (segment_rows.size() != segment_num) {
            return Status::InternalError(
                    "segment_rows size should equal to _num_seg, segment_rows size is: {}, "
                    "_num_seg is {}, tablet={}, rowset={}, txn={}",
                    segment_rows.size(), segment_num, _context.tablet_id,
                    _context.rowset_id.to_string(), _context.txn_id);
        }
    }

    // Totals combine freshly-written stats with those absorbed via add_rowset().
    rowset_meta->set_num_segments(segment_num);
    rowset_meta->set_num_rows(num_rows_written + _num_rows_written);
    rowset_meta->set_total_disk_size(total_data_size + _total_data_size + total_index_size +
                                     _total_index_size);
    rowset_meta->set_data_disk_size(total_data_size + _total_data_size);
    rowset_meta->set_index_disk_size(total_index_size + _total_index_size);
    rowset_meta->set_segments_key_bounds(segments_encoded_key_bounds);
    // TODO write zonemap to meta
    rowset_meta->set_empty((num_rows_written + _num_rows_written) == 0);
    rowset_meta->set_creation_time(time(nullptr));
    return Status::OK();
}
1028 | | |
// Build a temporary (non-final) rowset view of the data written so far without
// marking the writer as built: copies the current meta, fills it from the
// in-progress statistics, and creates a Rowset into `rowset_ptr` (out).
Status BaseBetaRowsetWriter::_build_tmp(RowsetSharedPtr& rowset_ptr) {
    Status status;
    // Work on a copy of the meta so the writer's own meta stays untouched.
    std::shared_ptr<RowsetMeta> tmp_rs_meta = std::make_shared<RowsetMeta>();
    tmp_rs_meta->init(_rowset_meta.get());

    status = _build_rowset_meta(tmp_rs_meta.get());
    if (!status.ok()) {
        LOG(WARNING) << "failed to build rowset meta, res=" << status;
        return status;
    }

    status = RowsetFactory::create_rowset(_context.tablet_schema, _context.tablet_path, tmp_rs_meta,
                                          &rowset_ptr);
    // Test hook: force a creation failure under the named debug point.
    DBUG_EXECUTE_IF("BaseBetaRowsetWriter::_build_tmp.create_rowset_failed",
                    { status = Status::InternalError("create rowset failed"); });
    if (!status.ok()) {
        LOG(WARNING) << "rowset init failed when build new rowset, res=" << status;
        return status;
    }
    return Status::OK();
}
1050 | | |
1051 | | Status BaseBetaRowsetWriter::_create_file_writer(const std::string& path, |
1052 | | io::FileWriterPtr& file_writer, |
1053 | 5.49k | bool is_index_file) { |
1054 | 5.49k | io::FileWriterOptions opts = _context.get_file_writer_options(is_index_file); |
1055 | 5.49k | Status st = _context.fs()->create_file(path, &file_writer, &opts); |
1056 | 5.49k | if (!st.ok()) { |
1057 | 0 | LOG(WARNING) << "failed to create writable file. path=" << path << ", err: " << st; |
1058 | 0 | return st; |
1059 | 0 | } |
1060 | | |
1061 | 5.49k | DCHECK(file_writer != nullptr); |
1062 | 5.49k | return Status::OK(); |
1063 | 5.49k | } |
1064 | | |
1065 | | Status BaseBetaRowsetWriter::create_file_writer(uint32_t segment_id, io::FileWriterPtr& file_writer, |
1066 | 5.47k | FileType file_type) { |
1067 | 5.47k | auto segment_path = _context.segment_path(segment_id); |
1068 | 5.47k | if (file_type == FileType::INVERTED_INDEX_FILE) { |
1069 | 255 | std::string prefix = |
1070 | 255 | std::string {InvertedIndexDescriptor::get_index_file_path_prefix(segment_path)}; |
1071 | 255 | std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix); |
1072 | 255 | return _create_file_writer(index_path, file_writer, true /* is_index_file */); |
1073 | 5.21k | } else if (file_type == FileType::SEGMENT_FILE) { |
1074 | 5.21k | return _create_file_writer(segment_path, file_writer, false /* is_index_file */); |
1075 | 5.21k | } |
1076 | 0 | return Status::Error<ErrorCode::INTERNAL_ERROR>( |
1077 | 0 | fmt::format("failed to create file = {}, file type = {}", segment_path, file_type)); |
1078 | 5.47k | } |
1079 | | |
// Create the index file writer for segment `segment_id` via the base class,
// then attach the index-specific file-writer options (needed by the v1
// inverted-index format, which opens its own per-index files).
Status BaseBetaRowsetWriter::create_index_file_writer(uint32_t segment_id,
                                                      IndexFileWriterPtr* index_file_writer) {
    RETURN_IF_ERROR(RowsetWriter::create_index_file_writer(segment_id, index_file_writer));
    // used for inverted index format v1
    (*index_file_writer)
            ->set_file_writer_opts(_context.get_file_writer_options(true /* is_index_file */));
    return Status::OK();
}
1088 | | |
1089 | | Status BetaRowsetWriter::create_segment_writer_for_segcompaction( |
1090 | 12 | std::unique_ptr<segment_v2::SegmentWriter>* writer, int64_t begin, int64_t end) { |
1091 | 12 | DCHECK(begin >= 0 && end >= 0); |
1092 | 12 | std::string path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, |
1093 | 12 | _context.rowset_id, begin, end); |
1094 | 12 | io::FileWriterPtr file_writer; |
1095 | 12 | RETURN_IF_ERROR(_create_file_writer(path, file_writer, false /* is_index_file */)); |
1096 | | |
1097 | 12 | IndexFileWriterPtr index_file_writer; |
1098 | 12 | if (_context.tablet_schema->has_inverted_index() || _context.tablet_schema->has_ann_index()) { |
1099 | 8 | io::FileWriterPtr idx_file_writer; |
1100 | 8 | std::string prefix(InvertedIndexDescriptor::get_index_file_path_prefix(path)); |
1101 | 8 | if (_context.tablet_schema->get_inverted_index_storage_format() != |
1102 | 8 | InvertedIndexStorageFormatPB::V1) { |
1103 | 8 | std::string index_path = InvertedIndexDescriptor::get_index_file_path_v2(prefix); |
1104 | 8 | RETURN_IF_ERROR( |
1105 | 8 | _create_file_writer(index_path, idx_file_writer, true /* is_index_file */)); |
1106 | 8 | } |
1107 | 8 | index_file_writer = std::make_unique<IndexFileWriter>( |
1108 | 8 | _context.fs(), prefix, _context.rowset_id.to_string(), _num_segcompacted, |
1109 | 8 | _context.tablet_schema->get_inverted_index_storage_format(), |
1110 | 8 | std::move(idx_file_writer)); |
1111 | 8 | } |
1112 | | |
1113 | 12 | segment_v2::SegmentWriterOptions writer_options; |
1114 | 12 | writer_options.enable_unique_key_merge_on_write = _context.enable_unique_key_merge_on_write; |
1115 | 12 | writer_options.rowset_ctx = &_context; |
1116 | 12 | writer_options.write_type = _context.write_type; |
1117 | 12 | writer_options.write_type = DataWriteType::TYPE_COMPACTION; |
1118 | 12 | writer_options.max_rows_per_segment = _context.max_rows_per_segment; |
1119 | 12 | writer_options.mow_ctx = _context.mow_context; |
1120 | | |
1121 | 12 | *writer = std::make_unique<segment_v2::SegmentWriter>( |
1122 | 12 | file_writer.get(), _num_segcompacted, _context.tablet_schema, _context.tablet, |
1123 | 12 | _context.data_dir, writer_options, index_file_writer.get()); |
1124 | 12 | if (auto& seg_writer = _segcompaction_worker->get_file_writer(); |
1125 | 12 | seg_writer != nullptr && seg_writer->state() != io::FileWriter::State::CLOSED) { |
1126 | 0 | RETURN_IF_ERROR(_segcompaction_worker->get_file_writer()->close()); |
1127 | 0 | } |
1128 | 12 | _segcompaction_worker->get_file_writer().reset(file_writer.release()); |
1129 | 12 | if (auto& idx_file_writer = _segcompaction_worker->get_inverted_index_file_writer(); |
1130 | 12 | idx_file_writer != nullptr) { |
1131 | 0 | RETURN_IF_ERROR(idx_file_writer->begin_close()); |
1132 | 0 | RETURN_IF_ERROR(idx_file_writer->finish_close()); |
1133 | 0 | } |
1134 | 12 | _segcompaction_worker->get_inverted_index_file_writer().reset(index_file_writer.release()); |
1135 | 12 | return Status::OK(); |
1136 | 12 | } |
1137 | | |
// Fail with TOO_MANY_SEGMENTS if `segnum` exceeds the configured per-rowset
// segment cap; otherwise OK. The debug point can override `segnum` in tests.
Status BaseBetaRowsetWriter::_check_segment_number_limit(size_t segnum) {
    DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments",
                    { segnum = dp->param("segnum", 1024); });
    if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) {
        return Status::Error<TOO_MANY_SEGMENTS>(
                "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, "
                "_num_segment:{}, rowset_num_rows:{}. Please check if the bucket number is too "
                "small or if the data is skewed.",
                _context.tablet_id, _context.rowset_id.to_string(),
                config::max_segment_num_per_rowset, _num_segment, get_rowset_num_rows());
    }
    return Status::OK();
}
1151 | | |
1152 | 2.14k | Status BetaRowsetWriter::_check_segment_number_limit(size_t segnum) { |
1153 | 2.14k | DBUG_EXECUTE_IF("BetaRowsetWriter._check_segment_number_limit_too_many_segments", |
1154 | 2.14k | { segnum = dp->param("segnum", 1024); }); |
1155 | 2.14k | if (UNLIKELY(segnum > config::max_segment_num_per_rowset)) { |
1156 | 0 | return Status::Error<TOO_MANY_SEGMENTS>( |
1157 | 0 | "too many segments in rowset. tablet_id:{}, rowset_id:{}, max:{}, _num_segment:{}, " |
1158 | 0 | "_segcompacted_point:{}, _num_segcompacted:{}, rowset_num_rows:{}. Please check if " |
1159 | 0 | "the bucket number is too small or if the data is skewed.", |
1160 | 0 | _context.tablet_id, _context.rowset_id.to_string(), |
1161 | 0 | config::max_segment_num_per_rowset, _num_segment, _segcompacted_point, |
1162 | 0 | _num_segcompacted, get_rowset_num_rows()); |
1163 | 0 | } |
1164 | 2.14k | return Status::OK(); |
1165 | 2.14k | } |
1166 | | |
1167 | 1.19k | Status BaseBetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) { |
1168 | 1.19k | uint32_t segid_offset = segment_id - _segment_start_id; |
1169 | 1.19k | { |
1170 | 1.19k | std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); |
1171 | 1.19k | CHECK_EQ(_segid_statistics_map.find(segment_id) == _segid_statistics_map.end(), true); |
1172 | 1.19k | _segid_statistics_map.emplace(segment_id, segstat); |
1173 | 1.19k | if (segment_id >= _segment_num_rows.size()) { |
1174 | 1.19k | _segment_num_rows.resize(segment_id + 1); |
1175 | 1.19k | } |
1176 | 1.19k | _segment_num_rows[segid_offset] = cast_set<uint32_t>(segstat.row_num); |
1177 | 1.19k | } |
1178 | 1.19k | VLOG_DEBUG << "_segid_statistics_map add new record. segment_id:" << segment_id |
1179 | 0 | << " row_num:" << segstat.row_num << " data_size:" << segstat.data_size |
1180 | 0 | << " index_size:" << segstat.index_size; |
1181 | | |
1182 | 1.19k | { |
1183 | 1.19k | std::lock_guard<std::mutex> lock(_segment_set_mutex); |
1184 | 1.19k | _segment_set.add(segid_offset); |
1185 | 2.38k | while (_segment_set.contains(_num_segment)) { |
1186 | 1.19k | _num_segment++; |
1187 | 1.19k | } |
1188 | 1.19k | } |
1189 | | |
1190 | 1.19k | if (_context.mow_context != nullptr) { |
1191 | 63 | RETURN_IF_ERROR(_generate_delete_bitmap(segment_id)); |
1192 | 63 | } |
1193 | 1.19k | return Status::OK(); |
1194 | 1.19k | } |
1195 | | |
1196 | 1.19k | Status BetaRowsetWriter::add_segment(uint32_t segment_id, const SegmentStatistics& segstat) { |
1197 | 1.19k | RETURN_IF_ERROR(BaseBetaRowsetWriter::add_segment(segment_id, segstat)); |
1198 | 1.19k | return _segcompaction_if_necessary(); |
1199 | 1.19k | } |
1200 | | |
// Finalize a segcompaction output segment: write its footer, close its
// inverted index, record its statistics under the segment's id, and release
// the writer. `key_bounds` carries the compacted segment's key range.
// NOTE(review): the `index_size` parameter is not read in this body — the
// index size is recomputed from close_inverted_index(); confirm whether the
// parameter is vestigial before removing it from the interface.
Status BetaRowsetWriter::flush_segment_writer_for_segcompaction(
        std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t index_size,
        KeyBoundsPB& key_bounds) {
    uint32_t segid = (*writer)->get_segment_id();
    uint32_t row_num = (*writer)->row_count();
    uint64_t segment_size;

    auto s = (*writer)->finalize_footer(&segment_size);
    if (!s.ok()) {
        return Status::Error<WRITER_DATA_WRITE_ERROR>("failed to finalize segment: {}",
                                                      s.to_string());
    }
    int64_t inverted_index_file_size = 0;
    RETURN_IF_ERROR((*writer)->close_inverted_index(&inverted_index_file_size));

    SegmentStatistics segstat;
    segstat.row_num = row_num;
    segstat.data_size = segment_size;
    segstat.index_size = inverted_index_file_size;
    segstat.key_bounds = key_bounds;
    {
        std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
        // Each compacted segment id must be recorded exactly once.
        CHECK_EQ(_segid_statistics_map.find(segid) == _segid_statistics_map.end(), true);
        _segid_statistics_map.emplace(segid, segstat);
    }
    VLOG_DEBUG << "_segid_statistics_map add new record. segid:" << segid << " row_num:" << row_num
               << " data_size:" << PrettyPrinter::print_bytes(segment_size)
               << " index_size:" << PrettyPrinter::print_bytes(inverted_index_file_size);

    // Destroy the writer; the underlying file writer is owned by the
    // segcompaction worker and closed separately.
    writer->reset();

    return Status::OK();
}
1234 | | |
1235 | | #include "common/compile_check_end.h" |
1236 | | } // namespace doris |