/root/doris/be/src/olap/delta_writer.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "olap/delta_writer.h" |
19 | | |
20 | | #include <brpc/controller.h> |
21 | | #include <butil/errno.h> |
22 | | #include <fmt/format.h> |
23 | | #include <gen_cpp/internal_service.pb.h> |
24 | | #include <gen_cpp/olap_file.pb.h> |
25 | | |
26 | | #include <filesystem> |
27 | | #include <ostream> |
28 | | #include <string> |
29 | | #include <utility> |
30 | | |
31 | | // IWYU pragma: no_include <opentelemetry/common/threadlocal.h> |
32 | | #include "common/compiler_util.h" // IWYU pragma: keep |
33 | | #include "common/config.h" |
34 | | #include "common/logging.h" |
35 | | #include "common/status.h" |
36 | | #include "exec/tablet_info.h" |
37 | | #include "gutil/integral_types.h" |
38 | | #include "gutil/strings/numbers.h" |
39 | | #include "io/fs/file_writer.h" // IWYU pragma: keep |
40 | | #include "olap/data_dir.h" |
41 | | #include "olap/memtable.h" |
42 | | #include "olap/memtable_flush_executor.h" |
43 | | #include "olap/olap_define.h" |
44 | | #include "olap/rowset/beta_rowset.h" |
45 | | #include "olap/rowset/beta_rowset_writer.h" |
46 | | #include "olap/rowset/rowset_meta.h" |
47 | | #include "olap/rowset/rowset_writer.h" |
48 | | #include "olap/rowset/rowset_writer_context.h" |
49 | | #include "olap/rowset/segment_v2/inverted_index_desc.h" |
50 | | #include "olap/rowset/segment_v2/segment.h" |
51 | | #include "olap/schema.h" |
52 | | #include "olap/schema_change.h" |
53 | | #include "olap/storage_engine.h" |
54 | | #include "olap/tablet_manager.h" |
55 | | #include "olap/tablet_meta.h" |
56 | | #include "olap/txn_manager.h" |
57 | | #include "runtime/exec_env.h" |
58 | | #include "runtime/load_channel_mgr.h" |
59 | | #include "runtime/memory/mem_tracker.h" |
60 | | #include "service/backend_options.h" |
61 | | #include "util/brpc_client_cache.h" |
62 | | #include "util/mem_info.h" |
63 | | #include "util/ref_count_closure.h" |
64 | | #include "util/stopwatch.hpp" |
65 | | #include "util/time.h" |
66 | | #include "vec/core/block.h" |
67 | | |
68 | | namespace doris { |
69 | | using namespace ErrorCode; |
70 | | |
71 | | Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer, RuntimeProfile* profile, |
72 | 8 | const UniqueId& load_id) { |
73 | 8 | *writer = new DeltaWriter(req, StorageEngine::instance(), profile, load_id); |
74 | 8 | return Status::OK(); |
75 | 8 | } |
76 | | |
77 | | DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, RuntimeProfile* profile, |
78 | | const UniqueId& load_id) |
79 | | : _req(*req), |
80 | | _tablet(nullptr), |
81 | | _cur_rowset(nullptr), |
82 | | _rowset_writer(nullptr), |
83 | | _tablet_schema(new TabletSchema), |
84 | | _delta_written_success(false), |
85 | | _storage_engine(storage_engine), |
86 | 8 | _load_id(load_id) { |
87 | 8 | _init_profile(profile); |
88 | 8 | } |
89 | | |
90 | 8 | void DeltaWriter::_init_profile(RuntimeProfile* profile) { |
91 | 8 | _profile = profile->create_child(fmt::format("DeltaWriter {}", _req.tablet_id), true, true); |
92 | 8 | _lock_timer = ADD_TIMER(_profile, "LockTime"); |
93 | 8 | _sort_timer = ADD_TIMER(_profile, "MemTableSortTime"); |
94 | 8 | _agg_timer = ADD_TIMER(_profile, "MemTableAggTime"); |
95 | 8 | _memtable_duration_timer = ADD_TIMER(_profile, "MemTableDurationTime"); |
96 | 8 | _segment_writer_timer = ADD_TIMER(_profile, "SegmentWriterTime"); |
97 | 8 | _wait_flush_timer = ADD_TIMER(_profile, "MemTableWaitFlushTime"); |
98 | 8 | _put_into_output_timer = ADD_TIMER(_profile, "MemTablePutIntoOutputTime"); |
99 | 8 | _delete_bitmap_timer = ADD_TIMER(_profile, "MemTableDeleteBitmapTime"); |
100 | 8 | _close_wait_timer = ADD_TIMER(_profile, "DeltaWriterCloseWaitTime"); |
101 | 8 | _sort_times = ADD_COUNTER(_profile, "MemTableSortTimes", TUnit::UNIT); |
102 | 8 | _agg_times = ADD_COUNTER(_profile, "MemTableAggTimes", TUnit::UNIT); |
103 | 8 | _segment_num = ADD_COUNTER(_profile, "SegmentNum", TUnit::UNIT); |
104 | 8 | _raw_rows_num = ADD_COUNTER(_profile, "RawRowNum", TUnit::UNIT); |
105 | 8 | _merged_rows_num = ADD_COUNTER(_profile, "MergedRowNum", TUnit::UNIT); |
106 | 8 | } |
107 | | |
108 | 8 | DeltaWriter::~DeltaWriter() { |
109 | 8 | if (_is_init && !_delta_written_success) { |
110 | 0 | _garbage_collection(); |
111 | 0 | } |
112 | | |
113 | 8 | if (!_is_init) { |
114 | 0 | return; |
115 | 0 | } |
116 | | |
117 | 8 | if (_flush_token != nullptr) { |
118 | | // cancel and wait all memtables in flush queue to be finished |
119 | 8 | _flush_token->cancel(); |
120 | | |
121 | 8 | if (_tablet != nullptr) { |
122 | 8 | const FlushStatistic& stat = _flush_token->get_stats(); |
123 | 8 | _tablet->flush_bytes->increment(stat.flush_size_bytes); |
124 | 8 | _tablet->flush_finish_count->increment(stat.flush_finish_count); |
125 | 8 | } |
126 | 8 | } |
127 | | |
128 | 8 | if (_calc_delete_bitmap_token != nullptr) { |
129 | 8 | _calc_delete_bitmap_token->cancel(); |
130 | 8 | } |
131 | | |
132 | 8 | _mem_table.reset(); |
133 | 8 | } |
134 | | |
135 | 0 | void DeltaWriter::_garbage_collection() { |
136 | 0 | Status rollback_status = Status::OK(); |
137 | 0 | TxnManager* txn_mgr = _storage_engine->txn_manager(); |
138 | 0 | if (_tablet != nullptr) { |
139 | 0 | rollback_status = txn_mgr->rollback_txn(_req.partition_id, _tablet, _req.txn_id); |
140 | 0 | } |
141 | | // has to check rollback status, because the rowset maybe committed in this thread and |
142 | | // published in another thread, then rollback will failed. |
143 | | // when rollback failed should not delete rowset |
144 | 0 | if (rollback_status.ok()) { |
145 | 0 | _storage_engine->add_unused_rowset(_cur_rowset); |
146 | 0 | } |
147 | 0 | } |
148 | | |
149 | 8 | Status DeltaWriter::init() { |
150 | 8 | TabletManager* tablet_mgr = _storage_engine->tablet_manager(); |
151 | 8 | _tablet = tablet_mgr->get_tablet(_req.tablet_id); |
152 | 8 | if (_tablet == nullptr) { |
153 | 0 | return Status::Error<TABLE_NOT_FOUND>("fail to find tablet. tablet_id={}, schema_hash={}", |
154 | 0 | _req.tablet_id, _req.schema_hash); |
155 | 0 | } |
156 | | |
157 | | // get rowset ids snapshot |
158 | 8 | if (_tablet->enable_unique_key_merge_on_write()) { |
159 | 2 | std::lock_guard<std::shared_mutex> lck(_tablet->get_header_lock()); |
160 | 2 | _cur_max_version = _tablet->max_version_unlocked().second; |
161 | | // tablet is under alter process. The delete bitmap will be calculated after conversion. |
162 | 2 | if (_tablet->tablet_state() == TABLET_NOTREADY) { |
163 | | // Disable 'partial_update' when the tablet is undergoing a 'schema changing process' |
164 | 0 | if (_req.table_schema_param->is_partial_update()) { |
165 | 0 | return Status::InternalError( |
166 | 0 | "Unable to do 'partial_update' when " |
167 | 0 | "the tablet is undergoing a 'schema changing process'"); |
168 | 0 | } |
169 | 0 | _rowset_ids.clear(); |
170 | 2 | } else { |
171 | 2 | RETURN_IF_ERROR(_tablet->all_rs_id(_cur_max_version, &_rowset_ids)); |
172 | 2 | } |
173 | 2 | _rowset_ptrs = _tablet->get_rowset_by_ids(&_rowset_ids); |
174 | 2 | } |
175 | | |
176 | | // check tablet version number |
177 | 8 | if (!config::disable_auto_compaction && |
178 | 8 | !_tablet->tablet_meta()->tablet_schema()->disable_auto_compaction() && |
179 | 8 | _tablet->exceed_version_limit(config::max_tablet_version_num - 100) && |
180 | 8 | !MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) { |
181 | | //trigger compaction |
182 | 0 | StorageEngine::instance()->submit_compaction_task( |
183 | 0 | _tablet, CompactionType::CUMULATIVE_COMPACTION, true); |
184 | 0 | if (_tablet->version_count() > config::max_tablet_version_num) { |
185 | 0 | return Status::Error<TOO_MANY_VERSION>( |
186 | 0 | "failed to init delta writer. version count: {}, exceed limit: {}, tablet: {}", |
187 | 0 | _tablet->version_count(), config::max_tablet_version_num, _tablet->full_name()); |
188 | 0 | } |
189 | 0 | } |
190 | | |
191 | 8 | int version_count = _tablet->version_count() + _tablet->stale_version_count(); |
192 | 8 | if (_tablet->avg_rs_meta_serialize_size() * version_count > |
193 | 8 | config::tablet_meta_serialize_size_limit) { |
194 | 0 | return Status::Error<TOO_MANY_VERSION>( |
195 | 0 | "failed to init rowset builder. meta serialize size : {}, exceed limit: {}, " |
196 | 0 | "tablet: {}", |
197 | 0 | _tablet->avg_rs_meta_serialize_size() * version_count, |
198 | 0 | config::tablet_meta_serialize_size_limit, _tablet->tablet_id()); |
199 | 0 | } |
200 | | |
201 | 8 | { |
202 | 8 | std::shared_lock base_migration_rlock(_tablet->get_migration_lock(), std::defer_lock); |
203 | 8 | if (!base_migration_rlock.try_lock_for( |
204 | 8 | std::chrono::milliseconds(config::migration_lock_timeout_ms))) { |
205 | 0 | return Status::Error<TRY_LOCK_FAILED>("try_lock migration lock failed after {}ms", |
206 | 0 | config::migration_lock_timeout_ms); |
207 | 0 | } |
208 | 8 | std::lock_guard<std::mutex> push_lock(_tablet->get_push_lock()); |
209 | 8 | RETURN_IF_ERROR(_storage_engine->txn_manager()->prepare_txn(_req.partition_id, _tablet, |
210 | 8 | _req.txn_id, _req.load_id)); |
211 | 8 | } |
212 | 8 | if (_tablet->enable_unique_key_merge_on_write() && _delete_bitmap == nullptr) { |
213 | 2 | _delete_bitmap.reset(new DeleteBitmap(_tablet->tablet_id())); |
214 | 2 | } |
215 | | // build tablet schema in request level |
216 | 8 | _build_current_tablet_schema(_req.index_id, _req.table_schema_param, *_tablet->tablet_schema()); |
217 | 8 | RowsetWriterContext context; |
218 | 8 | context.txn_id = _req.txn_id; |
219 | 8 | context.load_id = _req.load_id; |
220 | 8 | context.rowset_state = PREPARED; |
221 | 8 | context.segments_overlap = OVERLAPPING; |
222 | 8 | context.tablet_schema = _tablet_schema; |
223 | 8 | context.newest_write_timestamp = UnixSeconds(); |
224 | 8 | context.tablet_id = _tablet->table_id(); |
225 | 8 | context.tablet = _tablet; |
226 | 8 | context.write_type = DataWriteType::TYPE_DIRECT; |
227 | 8 | context.mow_context = std::make_shared<MowContext>(_cur_max_version, _req.txn_id, _rowset_ids, |
228 | 8 | _rowset_ptrs, _delete_bitmap); |
229 | 8 | context.partial_update_info = _partial_update_info; |
230 | 8 | RETURN_IF_ERROR(_tablet->create_rowset_writer(context, &_rowset_writer)); |
231 | | |
232 | 8 | _schema.reset(new Schema(_tablet_schema)); |
233 | 8 | _reset_mem_table(); |
234 | | |
235 | | // create flush handler |
236 | | // by assigning segment_id to memtable before submiting to flush executor, |
237 | | // we can make sure same keys sort in the same order in all replicas. |
238 | 8 | bool should_serial = false; |
239 | 8 | RETURN_IF_ERROR(_storage_engine->memtable_flush_executor()->create_flush_token( |
240 | 8 | &_flush_token, _rowset_writer->type(), should_serial, _req.is_high_priority)); |
241 | 8 | _calc_delete_bitmap_token = _storage_engine->calc_delete_bitmap_executor()->create_token(); |
242 | | |
243 | 8 | _is_init = true; |
244 | 8 | return Status::OK(); |
245 | 8 | } |
246 | | |
247 | 9 | Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>& row_idxs) { |
248 | 9 | if (UNLIKELY(row_idxs.empty())) { |
249 | 0 | return Status::OK(); |
250 | 0 | } |
251 | 9 | _lock_watch.start(); |
252 | 9 | std::lock_guard<std::mutex> l(_lock); |
253 | 9 | _lock_watch.stop(); |
254 | 9 | if (!_is_init && !_is_cancelled) { |
255 | 6 | RETURN_IF_ERROR(init()); |
256 | 6 | } |
257 | | |
258 | 9 | if (_is_cancelled) { |
259 | 0 | return _cancel_status; |
260 | 0 | } |
261 | | |
262 | 9 | if (_is_closed) { |
263 | 0 | return Status::Error<ALREADY_CLOSED>( |
264 | 0 | "write block after closed tablet_id={}, load_id={}-{}, txn_id={}", _req.tablet_id, |
265 | 0 | _req.load_id.hi(), _req.load_id.lo(), _req.txn_id); |
266 | 0 | } |
267 | | |
268 | 9 | _total_received_rows += row_idxs.size(); |
269 | 9 | _mem_table->insert(block, row_idxs); |
270 | | |
271 | 9 | if (UNLIKELY(_mem_table->need_agg() && config::enable_shrink_memory)) { |
272 | 0 | _mem_table->shrink_memtable_by_agg(); |
273 | 0 | } |
274 | 9 | if (UNLIKELY(_mem_table->need_flush())) { |
275 | 0 | auto s = _flush_memtable_async(); |
276 | 0 | _reset_mem_table(); |
277 | 0 | if (UNLIKELY(!s.ok())) { |
278 | 0 | return s; |
279 | 0 | } |
280 | 0 | } |
281 | | |
282 | 9 | return Status::OK(); |
283 | 9 | } |
284 | | |
285 | 8 | Status DeltaWriter::_flush_memtable_async() { |
286 | 8 | if (_mem_table->empty()) { |
287 | 2 | return Status::OK(); |
288 | 2 | } |
289 | 6 | _mem_table->assign_segment_id(); |
290 | 6 | return _flush_token->submit(std::move(_mem_table)); |
291 | 8 | } |
292 | | |
293 | 0 | Status DeltaWriter::flush_memtable_and_wait(bool need_wait) { |
294 | 0 | std::lock_guard<std::mutex> l(_lock); |
295 | 0 | if (!_is_init) { |
296 | | // This writer is not initialized before flushing. Do nothing |
297 | | // But we return OK instead of Status::Error<ALREADY_CANCELLED>(), |
298 | | // Because this method maybe called when trying to reduce mem consumption, |
299 | | // and at that time, the writer may not be initialized yet and that is a normal case. |
300 | 0 | return Status::OK(); |
301 | 0 | } |
302 | | |
303 | 0 | if (_is_cancelled) { |
304 | 0 | return _cancel_status; |
305 | 0 | } |
306 | | |
307 | 0 | VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: " |
308 | 0 | << _mem_table->memory_usage() << ", tablet: " << _req.tablet_id |
309 | 0 | << ", load id: " << print_id(_req.load_id); |
310 | 0 | auto s = _flush_memtable_async(); |
311 | 0 | _reset_mem_table(); |
312 | 0 | if (UNLIKELY(!s.ok())) { |
313 | 0 | return s; |
314 | 0 | } |
315 | | |
316 | 0 | if (need_wait) { |
317 | | // wait all memtables in flush queue to be flushed. |
318 | 0 | SCOPED_TIMER(_wait_flush_timer); |
319 | 0 | RETURN_IF_ERROR(_flush_token->wait()); |
320 | 0 | } |
321 | 0 | return Status::OK(); |
322 | 0 | } |
323 | | |
324 | 4 | Status DeltaWriter::wait_flush() { |
325 | 4 | { |
326 | 4 | std::lock_guard<std::mutex> l(_lock); |
327 | 4 | if (!_is_init) { |
328 | | // return OK instead of Status::Error<ALREADY_CANCELLED>() for same reason |
329 | | // as described in flush_memtable_and_wait() |
330 | 0 | return Status::OK(); |
331 | 0 | } |
332 | 4 | if (_is_cancelled) { |
333 | 0 | return _cancel_status; |
334 | 0 | } |
335 | 4 | } |
336 | 4 | SCOPED_TIMER(_wait_flush_timer); |
337 | 4 | RETURN_IF_ERROR(_flush_token->wait()); |
338 | 4 | return Status::OK(); |
339 | 4 | } |
340 | | |
341 | 8 | void DeltaWriter::_reset_mem_table() { |
342 | | #ifndef BE_TEST |
343 | | auto mem_table_insert_tracker = std::make_shared<MemTracker>( |
344 | | fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}", |
345 | | std::to_string(tablet_id()), _mem_table_num, _load_id.to_string()), |
346 | | ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker()); |
347 | | auto mem_table_flush_tracker = std::make_shared<MemTracker>( |
348 | | fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}", |
349 | | std::to_string(tablet_id()), _mem_table_num++, _load_id.to_string()), |
350 | | ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker()); |
351 | | #else |
352 | 8 | auto mem_table_insert_tracker = std::make_shared<MemTracker>( |
353 | 8 | fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}", |
354 | 8 | std::to_string(tablet_id()), _mem_table_num, _load_id.to_string())); |
355 | 8 | auto mem_table_flush_tracker = std::make_shared<MemTracker>( |
356 | 8 | fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}", |
357 | 8 | std::to_string(tablet_id()), _mem_table_num++, _load_id.to_string())); |
358 | 8 | #endif |
359 | 8 | { |
360 | 8 | std::lock_guard<SpinLock> l(_mem_table_tracker_lock); |
361 | 8 | _mem_table_insert_trackers.push_back(mem_table_insert_tracker); |
362 | 8 | _mem_table_flush_trackers.push_back(mem_table_flush_tracker); |
363 | 8 | } |
364 | 8 | auto mow_context = std::make_shared<MowContext>(_cur_max_version, _req.txn_id, _rowset_ids, |
365 | 8 | _rowset_ptrs, _delete_bitmap); |
366 | 8 | _mem_table.reset(new MemTable(_tablet, _schema.get(), _tablet_schema.get(), _req.slots, |
367 | 8 | _req.tuple_desc, _rowset_writer.get(), mow_context, |
368 | 8 | _partial_update_info.get(), mem_table_insert_tracker, |
369 | 8 | mem_table_flush_tracker)); |
370 | | |
371 | 8 | COUNTER_UPDATE(_segment_num, 1); |
372 | 8 | _mem_table->set_callback([this](MemTableStat& stat) { |
373 | 6 | _memtable_stat += stat; |
374 | 6 | COUNTER_SET(_sort_timer, _memtable_stat.sort_ns); |
375 | 6 | COUNTER_SET(_agg_timer, _memtable_stat.agg_ns); |
376 | 6 | COUNTER_SET(_memtable_duration_timer, _memtable_stat.duration_ns); |
377 | 6 | COUNTER_SET(_segment_writer_timer, _memtable_stat.segment_writer_ns); |
378 | 6 | COUNTER_SET(_delete_bitmap_timer, _memtable_stat.delete_bitmap_ns); |
379 | 6 | COUNTER_SET(_put_into_output_timer, _memtable_stat.put_into_output_ns); |
380 | 6 | COUNTER_SET(_sort_times, _memtable_stat.sort_times); |
381 | 6 | COUNTER_SET(_agg_times, _memtable_stat.agg_times); |
382 | 6 | COUNTER_SET(_raw_rows_num, _memtable_stat.raw_rows); |
383 | 6 | COUNTER_SET(_merged_rows_num, _memtable_stat.merged_rows); |
384 | 6 | }); |
385 | 8 | } |
386 | | |
387 | 8 | Status DeltaWriter::close() { |
388 | 8 | _lock_watch.start(); |
389 | 8 | std::lock_guard<std::mutex> l(_lock); |
390 | 8 | _lock_watch.stop(); |
391 | 8 | if (!_is_init && !_is_cancelled) { |
392 | | // if this delta writer is not initialized, but close() is called. |
393 | | // which means this tablet has no data loaded, but at least one tablet |
394 | | // in same partition has data loaded. |
395 | | // so we have to also init this DeltaWriter, so that it can create an empty rowset |
396 | | // for this tablet when being closed. |
397 | 2 | RETURN_IF_ERROR(init()); |
398 | 2 | } |
399 | | |
400 | 8 | if (_is_cancelled) { |
401 | 0 | return _cancel_status; |
402 | 0 | } |
403 | | |
404 | 8 | if (_is_closed) { |
405 | 0 | LOG(WARNING) << "close after closed tablet_id=" << _req.tablet_id |
406 | 0 | << " load_id=" << _req.load_id << " txn_id=" << _req.txn_id; |
407 | 0 | return Status::OK(); |
408 | 0 | } |
409 | | |
410 | 8 | auto s = _flush_memtable_async(); |
411 | 8 | _mem_table.reset(); |
412 | 8 | _is_closed = true; |
413 | 8 | if (UNLIKELY(!s.ok())) { |
414 | 0 | return s; |
415 | 8 | } else { |
416 | 8 | return Status::OK(); |
417 | 8 | } |
418 | 8 | } |
419 | | |
420 | 8 | Status DeltaWriter::build_rowset() { |
421 | 8 | std::lock_guard<std::mutex> l(_lock); |
422 | 8 | DCHECK(_is_init) |
423 | 0 | << "delta writer is supposed be to initialized before build_rowset() being called"; |
424 | | |
425 | 8 | if (_is_cancelled) { |
426 | 0 | return _cancel_status; |
427 | 0 | } |
428 | | |
429 | 8 | Status st; |
430 | | // return error if previous flush failed |
431 | 8 | { |
432 | 8 | SCOPED_TIMER(_wait_flush_timer); |
433 | 8 | st = _flush_token->wait(); |
434 | 8 | } |
435 | 8 | if (UNLIKELY(!st.ok())) { |
436 | 0 | LOG(WARNING) << "previous flush failed tablet " << _tablet->tablet_id(); |
437 | 0 | return st; |
438 | 0 | } |
439 | | |
440 | 8 | _mem_table.reset(); |
441 | | |
442 | 8 | if (_rowset_writer->num_rows() + _memtable_stat.merged_rows != _total_received_rows) { |
443 | 0 | LOG(WARNING) << "the rows number written doesn't match, rowset num rows written to file: " |
444 | 0 | << _rowset_writer->num_rows() |
445 | 0 | << ", merged_rows: " << _memtable_stat.merged_rows |
446 | 0 | << ", total received rows: " << _total_received_rows; |
447 | 0 | return Status::InternalError("rows number written by delta writer dosen't match"); |
448 | 0 | } |
449 | | // use rowset meta manager to save meta |
450 | 8 | RETURN_NOT_OK_STATUS_WITH_WARN(_rowset_writer->build(_cur_rowset), "fail to build rowset"); |
451 | 8 | return Status::OK(); |
452 | 8 | } |
453 | | |
454 | 4 | Status DeltaWriter::submit_calc_delete_bitmap_task() { |
455 | 4 | if (!_tablet->enable_unique_key_merge_on_write()) { |
456 | 2 | return Status::OK(); |
457 | 2 | } |
458 | | |
459 | 2 | std::lock_guard<std::mutex> l(_lock); |
460 | | // tablet is under alter process. The delete bitmap will be calculated after conversion. |
461 | 2 | if (_tablet->tablet_state() == TABLET_NOTREADY) { |
462 | 0 | LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, " |
463 | 0 | "tablet_id: " |
464 | 0 | << _tablet->tablet_id() << " txn_id: " << _req.txn_id; |
465 | 0 | return Status::OK(); |
466 | 0 | } |
467 | 2 | auto beta_rowset = reinterpret_cast<BetaRowset*>(_cur_rowset.get()); |
468 | 2 | std::vector<segment_v2::SegmentSharedPtr> segments; |
469 | 2 | RETURN_IF_ERROR(beta_rowset->load_segments(&segments)); |
470 | 2 | if (segments.size() > 1) { |
471 | | // calculate delete bitmap between segments |
472 | 0 | RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments(_cur_rowset, segments, |
473 | 0 | _delete_bitmap)); |
474 | 0 | } |
475 | | |
476 | | // For partial update, we need to fill in the entire row of data, during the calculation |
477 | | // of the delete bitmap. This operation is resource-intensive, and we need to minimize |
478 | | // the number of times it occurs. Therefore, we skip this operation here. |
479 | 2 | if (_partial_update_info->is_partial_update) { |
480 | 0 | return Status::OK(); |
481 | 0 | } |
482 | | |
483 | 2 | LOG(INFO) << "submit calc delete bitmap task to executor, tablet_id: " << _tablet->tablet_id() |
484 | 2 | << ", txn_id: " << _req.txn_id; |
485 | 2 | return _tablet->commit_phase_update_delete_bitmap(_cur_rowset, _rowset_ids, _delete_bitmap, |
486 | 2 | segments, _req.txn_id, |
487 | 2 | _calc_delete_bitmap_token.get(), nullptr); |
488 | 2 | } |
489 | | |
490 | 4 | Status DeltaWriter::wait_calc_delete_bitmap() { |
491 | 4 | if (!_tablet->enable_unique_key_merge_on_write() || _partial_update_info->is_partial_update) { |
492 | 2 | return Status::OK(); |
493 | 2 | } |
494 | 2 | std::lock_guard<std::mutex> l(_lock); |
495 | 2 | RETURN_IF_ERROR(_calc_delete_bitmap_token->wait()); |
496 | 2 | LOG(INFO) << "Got result of calc delete bitmap task from executor, tablet_id: " |
497 | 2 | << _tablet->tablet_id() << ", txn_id: " << _req.txn_id; |
498 | 2 | return Status::OK(); |
499 | 2 | } |
500 | | |
501 | | Status DeltaWriter::commit_txn(const PSlaveTabletNodes& slave_tablet_nodes, |
502 | 8 | const bool write_single_replica) { |
503 | 8 | if (_tablet->enable_unique_key_merge_on_write() && |
504 | 8 | config::enable_merge_on_write_correctness_check && _cur_rowset->num_rows() != 0 && |
505 | 8 | _tablet->tablet_state() != TABLET_NOTREADY) { |
506 | 2 | auto st = _tablet->check_delete_bitmap_correctness( |
507 | 2 | _delete_bitmap, _cur_rowset->end_version() - 1, _req.txn_id, _rowset_ids); |
508 | 2 | if (!st.ok()) { |
509 | 0 | LOG(WARNING) << fmt::format( |
510 | 0 | "[tablet_id:{}][txn_id:{}][load_id:{}][partition_id:{}] " |
511 | 0 | "delete bitmap correctness check failed in commit phase!", |
512 | 0 | _req.tablet_id, _req.txn_id, UniqueId(_req.load_id).to_string(), |
513 | 0 | _req.partition_id); |
514 | 0 | return st; |
515 | 0 | } |
516 | 2 | } |
517 | | |
518 | 8 | std::lock_guard<std::mutex> l(_lock); |
519 | 8 | SCOPED_TIMER(_close_wait_timer); |
520 | 8 | Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, _tablet, _req.txn_id, |
521 | 8 | _req.load_id, _cur_rowset, false); |
522 | | |
523 | 8 | if (!res && !res.is<PUSH_TRANSACTION_ALREADY_EXIST>()) { |
524 | 0 | LOG(WARNING) << "Failed to commit txn: " << _req.txn_id |
525 | 0 | << " for rowset: " << _cur_rowset->rowset_id(); |
526 | 0 | return res; |
527 | 0 | } |
528 | 8 | if (_tablet->enable_unique_key_merge_on_write()) { |
529 | 2 | _storage_engine->txn_manager()->set_txn_related_delete_bitmap( |
530 | 2 | _req.partition_id, _req.txn_id, _tablet->tablet_id(), _tablet->schema_hash(), |
531 | 2 | _tablet->tablet_uid(), true, _delete_bitmap, _rowset_ids, _partial_update_info); |
532 | 2 | } |
533 | | |
534 | 8 | _delta_written_success = true; |
535 | | |
536 | | // const FlushStatistic& stat = _flush_token->get_stats(); |
537 | | // print slow log if wait more than 1s |
538 | | /*if (_wait_flush_timer->elapsed_time() > 1000UL * 1000 * 1000) { |
539 | | LOG(INFO) << "close delta writer for tablet: " << _tablet->tablet_id() |
540 | | << ", load id: " << print_id(_req.load_id) << ", wait close for " |
541 | | << _wait_flush_timer->elapsed_time() << "(ns), stats: " << stat; |
542 | | }*/ |
543 | | |
544 | 8 | if (write_single_replica) { |
545 | 0 | for (auto node_info : slave_tablet_nodes.slave_nodes()) { |
546 | 0 | _request_slave_tablet_pull_rowset(node_info); |
547 | 0 | } |
548 | 0 | } |
549 | 8 | COUNTER_UPDATE(_lock_timer, _lock_watch.elapsed_time() / 1000); |
550 | 8 | return Status::OK(); |
551 | 8 | } |
552 | | |
553 | | bool DeltaWriter::check_slave_replicas_done( |
554 | 0 | google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>* success_slave_tablet_node_ids) { |
555 | 0 | std::lock_guard<std::shared_mutex> lock(_slave_node_lock); |
556 | 0 | if (_unfinished_slave_node.empty()) { |
557 | 0 | success_slave_tablet_node_ids->insert({_tablet->tablet_id(), _success_slave_node_ids}); |
558 | 0 | return true; |
559 | 0 | } |
560 | 0 | return false; |
561 | 0 | } |
562 | | |
563 | | void DeltaWriter::add_finished_slave_replicas( |
564 | 0 | google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>* success_slave_tablet_node_ids) { |
565 | 0 | std::lock_guard<std::shared_mutex> lock(_slave_node_lock); |
566 | 0 | success_slave_tablet_node_ids->insert({_tablet->tablet_id(), _success_slave_node_ids}); |
567 | 0 | } |
568 | | |
569 | 0 | Status DeltaWriter::cancel() { |
570 | 0 | return cancel_with_status(Status::Cancelled("already cancelled")); |
571 | 0 | } |
572 | | |
573 | 0 | Status DeltaWriter::cancel_with_status(const Status& st) { |
574 | 0 | std::lock_guard<std::mutex> l(_lock); |
575 | 0 | if (_is_cancelled) { |
576 | 0 | return Status::OK(); |
577 | 0 | } |
578 | 0 | if (_rowset_writer && _rowset_writer->is_doing_segcompaction()) { |
579 | 0 | _rowset_writer->wait_flying_segcompaction(); /* already cancel, ignore the return status */ |
580 | 0 | } |
581 | 0 | _mem_table.reset(); |
582 | 0 | if (_flush_token != nullptr) { |
583 | | // cancel and wait all memtables in flush queue to be finished |
584 | 0 | _flush_token->cancel(); |
585 | 0 | } |
586 | 0 | if (_calc_delete_bitmap_token != nullptr) { |
587 | 0 | _calc_delete_bitmap_token->cancel(); |
588 | 0 | } |
589 | 0 | _is_cancelled = true; |
590 | 0 | _cancel_status = st; |
591 | 0 | return Status::OK(); |
592 | 0 | } |
593 | | |
594 | 0 | int64_t DeltaWriter::mem_consumption(MemType mem) { |
595 | 0 | if (_flush_token == nullptr) { |
596 | | // This method may be called before this writer is initialized. |
597 | | // So _flush_token may be null. |
598 | 0 | return 0; |
599 | 0 | } |
600 | 0 | int64_t mem_usage = 0; |
601 | 0 | { |
602 | 0 | std::lock_guard<SpinLock> l(_mem_table_tracker_lock); |
603 | 0 | if ((mem & MemType::WRITE) == MemType::WRITE) { // 3 & 2 = 2 |
604 | 0 | for (auto mem_table_tracker : _mem_table_insert_trackers) { |
605 | 0 | mem_usage += mem_table_tracker->consumption(); |
606 | 0 | } |
607 | 0 | } |
608 | 0 | if ((mem & MemType::FLUSH) == MemType::FLUSH) { // 3 & 1 = 1 |
609 | 0 | for (auto mem_table_tracker : _mem_table_flush_trackers) { |
610 | 0 | mem_usage += mem_table_tracker->consumption(); |
611 | 0 | } |
612 | 0 | } |
613 | 0 | } |
614 | 0 | return mem_usage; |
615 | 0 | } |
616 | | |
617 | 0 | int64_t DeltaWriter::active_memtable_mem_consumption() { |
618 | 0 | std::lock_guard<std::mutex> l(_lock); |
619 | 0 | return _mem_table != nullptr ? _mem_table->memory_usage() : 0; |
620 | 0 | } |
621 | | |
622 | 0 | int64_t DeltaWriter::partition_id() const { |
623 | 0 | return _req.partition_id; |
624 | 0 | } |
625 | | |
626 | | void DeltaWriter::_build_current_tablet_schema(int64_t index_id, |
627 | | const OlapTableSchemaParam* table_schema_param, |
628 | 8 | const TabletSchema& ori_tablet_schema) { |
629 | 8 | _tablet_schema->copy_from(ori_tablet_schema); |
630 | | // find the right index id |
631 | 8 | int i = 0; |
632 | 8 | auto indexes = table_schema_param->indexes(); |
633 | 8 | for (; i < indexes.size(); i++) { |
634 | 0 | if (indexes[i]->index_id == index_id) { |
635 | 0 | break; |
636 | 0 | } |
637 | 0 | } |
638 | | |
639 | 8 | if (indexes.size() > 0 && indexes[i]->columns.size() != 0 && |
640 | 8 | indexes[i]->columns[0]->unique_id() >= 0) { |
641 | 0 | _tablet_schema->build_current_tablet_schema(index_id, table_schema_param->version(), |
642 | 0 | indexes[i], ori_tablet_schema); |
643 | 0 | } |
644 | 8 | if (_tablet_schema->schema_version() > ori_tablet_schema.schema_version()) { |
645 | 0 | _tablet->update_max_version_schema(_tablet_schema); |
646 | 0 | } |
647 | | |
648 | 8 | _tablet_schema->set_table_id(table_schema_param->table_id()); |
649 | | // set partial update columns info |
650 | 8 | _partial_update_info = std::make_shared<PartialUpdateInfo>(); |
651 | 8 | _partial_update_info->init( |
652 | 8 | *_tablet_schema, table_schema_param->is_partial_update(), |
653 | 8 | table_schema_param->partial_update_input_columns(), |
654 | 8 | table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(), |
655 | 8 | table_schema_param->nano_seconds(), table_schema_param->timezone(), _cur_max_version); |
656 | 8 | } |
657 | | |
658 | 0 | void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) { |
659 | 0 | std::shared_ptr<PBackendService_Stub> stub = |
660 | 0 | ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client( |
661 | 0 | node_info.host(), node_info.async_internal_port()); |
662 | 0 | if (stub == nullptr) { |
663 | 0 | LOG(WARNING) << "failed to send pull rowset request to slave replica. get rpc stub failed, " |
664 | 0 | "slave host=" |
665 | 0 | << node_info.host() << ", port=" << node_info.async_internal_port() |
666 | 0 | << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _req.txn_id; |
667 | 0 | return; |
668 | 0 | } |
669 | | |
670 | 0 | _storage_engine->txn_manager()->add_txn_tablet_delta_writer(_req.txn_id, _tablet->tablet_id(), |
671 | 0 | this); |
672 | 0 | { |
673 | 0 | std::lock_guard<std::shared_mutex> lock(_slave_node_lock); |
674 | 0 | _unfinished_slave_node.insert(node_info.id()); |
675 | 0 | } |
676 | |
|
677 | 0 | std::vector<int64_t> indices_ids; |
678 | 0 | auto tablet_schema = _cur_rowset->rowset_meta()->tablet_schema(); |
679 | 0 | if (!tablet_schema->skip_write_index_on_load()) { |
680 | 0 | for (auto& column : tablet_schema->columns()) { |
681 | 0 | const TabletIndex* index_meta = tablet_schema->get_inverted_index(column.unique_id()); |
682 | 0 | if (index_meta) { |
683 | 0 | indices_ids.emplace_back(index_meta->index_id()); |
684 | 0 | } |
685 | 0 | } |
686 | 0 | } |
687 | |
|
688 | 0 | PTabletWriteSlaveRequest request; |
689 | 0 | RowsetMetaPB rowset_meta_pb = _cur_rowset->rowset_meta()->get_rowset_pb(); |
690 | | // TODO(dx): remove log after fix partition id eq 0 bug |
691 | 0 | if (!rowset_meta_pb.has_partition_id() || rowset_meta_pb.partition_id() == 0) { |
692 | 0 | rowset_meta_pb.set_partition_id(_req.partition_id); |
693 | 0 | LOG(WARNING) << "cant get partition id from local rs pb, get from _req, partition_id=" |
694 | 0 | << rowset_meta_pb.partition_id(); |
695 | 0 | } |
696 | |
|
697 | 0 | request.set_allocated_rowset_meta(&rowset_meta_pb); |
698 | 0 | request.set_host(BackendOptions::get_localhost()); |
699 | 0 | request.set_http_port(config::webserver_port); |
700 | 0 | string tablet_path = _tablet->tablet_path(); |
701 | 0 | request.set_rowset_path(tablet_path); |
702 | 0 | request.set_token(ExecEnv::GetInstance()->token()); |
703 | 0 | request.set_brpc_port(config::brpc_port); |
704 | 0 | request.set_node_id(node_info.id()); |
705 | 0 | for (int segment_id = 0; segment_id < _cur_rowset->rowset_meta()->num_segments(); |
706 | 0 | segment_id++) { |
707 | 0 | std::stringstream segment_name; |
708 | 0 | segment_name << _cur_rowset->rowset_id() << "_" << segment_id << ".dat"; |
709 | 0 | int64_t segment_size = std::filesystem::file_size(tablet_path + "/" + segment_name.str()); |
710 | 0 | request.mutable_segments_size()->insert({segment_id, segment_size}); |
711 | |
|
712 | 0 | if (!indices_ids.empty()) { |
713 | 0 | for (auto index_id : indices_ids) { |
714 | 0 | std::string inverted_index_file = InvertedIndexDescriptor::get_index_file_name( |
715 | 0 | tablet_path + "/" + segment_name.str(), index_id); |
716 | 0 | int64_t size = std::filesystem::file_size(inverted_index_file); |
717 | 0 | PTabletWriteSlaveRequest::IndexSize index_size; |
718 | 0 | index_size.set_indexid(index_id); |
719 | 0 | index_size.set_size(size); |
720 | | // Fetch the map value for the current segment_id. |
721 | | // If it doesn't exist, this will insert a new default-constructed IndexSizeMapValue |
722 | 0 | auto& index_size_map_value = (*request.mutable_inverted_indices_size())[segment_id]; |
723 | | // Add the new index size to the map value. |
724 | 0 | *index_size_map_value.mutable_index_sizes()->Add() = std::move(index_size); |
725 | 0 | } |
726 | 0 | } |
727 | 0 | } |
728 | 0 | RefCountClosure<PTabletWriteSlaveResult>* closure = |
729 | 0 | new RefCountClosure<PTabletWriteSlaveResult>(); |
730 | 0 | closure->ref(); |
731 | 0 | closure->ref(); |
732 | 0 | closure->cntl.set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000); |
733 | 0 | closure->cntl.ignore_eovercrowded(); |
734 | 0 | stub->request_slave_tablet_pull_rowset(&closure->cntl, &request, &closure->result, closure); |
735 | 0 | static_cast<void>(request.release_rowset_meta()); |
736 | |
|
737 | 0 | closure->join(); |
738 | 0 | if (closure->cntl.Failed()) { |
739 | 0 | if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available( |
740 | 0 | stub, node_info.host(), node_info.async_internal_port())) { |
741 | 0 | ExecEnv::GetInstance()->brpc_internal_client_cache()->erase( |
742 | 0 | closure->cntl.remote_side()); |
743 | 0 | } |
744 | 0 | LOG(WARNING) << "failed to send pull rowset request to slave replica, error=" |
745 | 0 | << berror(closure->cntl.ErrorCode()) |
746 | 0 | << ", error_text=" << closure->cntl.ErrorText() |
747 | 0 | << ". slave host: " << node_info.host() |
748 | 0 | << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _req.txn_id; |
749 | 0 | std::lock_guard<std::shared_mutex> lock(_slave_node_lock); |
750 | 0 | _unfinished_slave_node.erase(node_info.id()); |
751 | 0 | } |
752 | |
|
753 | 0 | if (closure->unref()) { |
754 | 0 | delete closure; |
755 | 0 | } |
756 | 0 | closure = nullptr; |
757 | 0 | } |
758 | | |
759 | 0 | void DeltaWriter::finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed) { |
760 | 0 | std::lock_guard<std::shared_mutex> lock(_slave_node_lock); |
761 | 0 | if (is_succeed) { |
762 | 0 | _success_slave_node_ids.add_slave_node_ids(node_id); |
763 | 0 | VLOG_CRITICAL << "record successful slave replica for txn [" << _req.txn_id |
764 | 0 | << "], tablet_id=" << _tablet->tablet_id() << ", node_id=" << node_id; |
765 | 0 | } |
766 | 0 | _unfinished_slave_node.erase(node_id); |
767 | 0 | } |
768 | | |
769 | 0 | int64_t DeltaWriter::num_rows_filtered() const { |
770 | 0 | return _rowset_writer == nullptr ? 0 : _rowset_writer->num_rows_filtered(); |
771 | 0 | } |
772 | | |
773 | | } // namespace doris |