Coverage Report

Created: 2024-11-20 12:56

/root/doris/be/src/olap/delta_writer.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "olap/delta_writer.h"
19
20
#include <brpc/controller.h>
21
#include <butil/errno.h>
22
#include <fmt/format.h>
23
#include <gen_cpp/internal_service.pb.h>
24
#include <gen_cpp/olap_file.pb.h>
25
26
#include <filesystem>
27
#include <ostream>
28
#include <string>
29
#include <utility>
30
31
// IWYU pragma: no_include <opentelemetry/common/threadlocal.h>
32
#include "common/compiler_util.h" // IWYU pragma: keep
33
#include "common/config.h"
34
#include "common/logging.h"
35
#include "common/status.h"
36
#include "exec/tablet_info.h"
37
#include "gutil/integral_types.h"
38
#include "gutil/strings/numbers.h"
39
#include "io/fs/file_writer.h" // IWYU pragma: keep
40
#include "olap/data_dir.h"
41
#include "olap/memtable.h"
42
#include "olap/memtable_flush_executor.h"
43
#include "olap/olap_define.h"
44
#include "olap/rowset/beta_rowset.h"
45
#include "olap/rowset/beta_rowset_writer.h"
46
#include "olap/rowset/rowset_meta.h"
47
#include "olap/rowset/rowset_writer.h"
48
#include "olap/rowset/rowset_writer_context.h"
49
#include "olap/rowset/segment_v2/inverted_index_desc.h"
50
#include "olap/rowset/segment_v2/segment.h"
51
#include "olap/schema.h"
52
#include "olap/schema_change.h"
53
#include "olap/storage_engine.h"
54
#include "olap/tablet_manager.h"
55
#include "olap/tablet_meta.h"
56
#include "olap/txn_manager.h"
57
#include "runtime/exec_env.h"
58
#include "runtime/load_channel_mgr.h"
59
#include "runtime/memory/mem_tracker.h"
60
#include "service/backend_options.h"
61
#include "util/brpc_client_cache.h"
62
#include "util/mem_info.h"
63
#include "util/ref_count_closure.h"
64
#include "util/stopwatch.hpp"
65
#include "util/time.h"
66
#include "vec/core/block.h"
67
68
namespace doris {
69
using namespace ErrorCode;
70
71
// Factory for DeltaWriter bound to the global StorageEngine.
// The newly heap-allocated writer is returned through `writer`; nothing else keeps the
// pointer, so the caller is responsible for it. No tablet work happens here: init() is
// deferred until the first write() or close().
Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer, RuntimeProfile* profile,
                         const UniqueId& load_id) {
    StorageEngine* const engine = StorageEngine::instance();
    DeltaWriter* const created = new DeltaWriter(req, engine, profile, load_id);
    *writer = created;
    return Status::OK();
}
76
77
// Copies the write request and records the storage engine and load id. Tablet lookup,
// rowset writer and memtable creation are all deferred to init(); only the profile
// counters are registered eagerly.
DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, RuntimeProfile* profile,
                         const UniqueId& load_id)
        : _req(*req),
          _tablet(nullptr),
          _cur_rowset(nullptr),
          _rowset_writer(nullptr),
          _tablet_schema(std::make_shared<TabletSchema>()),
          _delta_written_success(false),
          _storage_engine(storage_engine),
          _load_id(load_id) {
    _init_profile(profile);
}
89
90
8
// Creates a child profile "DeltaWriter <tablet_id>" under `profile` and registers every
// timer and counter this writer updates later (memtable sort/agg/flush stats, lock wait,
// close wait, segment and row counts).
void DeltaWriter::_init_profile(RuntimeProfile* profile) {
    auto* const child =
            profile->create_child(fmt::format("DeltaWriter {}", _req.tablet_id), true, true);
    _profile = child;

    // Time-based counters.
    _lock_timer = ADD_TIMER(child, "LockTime");
    _sort_timer = ADD_TIMER(child, "MemTableSortTime");
    _agg_timer = ADD_TIMER(child, "MemTableAggTime");
    _memtable_duration_timer = ADD_TIMER(child, "MemTableDurationTime");
    _segment_writer_timer = ADD_TIMER(child, "SegmentWriterTime");
    _wait_flush_timer = ADD_TIMER(child, "MemTableWaitFlushTime");
    _put_into_output_timer = ADD_TIMER(child, "MemTablePutIntoOutputTime");
    _delete_bitmap_timer = ADD_TIMER(child, "MemTableDeleteBitmapTime");
    _close_wait_timer = ADD_TIMER(child, "DeltaWriterCloseWaitTime");

    // Plain unit counters.
    _sort_times = ADD_COUNTER(child, "MemTableSortTimes", TUnit::UNIT);
    _agg_times = ADD_COUNTER(child, "MemTableAggTimes", TUnit::UNIT);
    _segment_num = ADD_COUNTER(child, "SegmentNum", TUnit::UNIT);
    _raw_rows_num = ADD_COUNTER(child, "RawRowNum", TUnit::UNIT);
    _merged_rows_num = ADD_COUNTER(child, "MergedRowNum", TUnit::UNIT);
}
107
108
8
// Tears down an initialized writer. If the data was never committed successfully, the
// transaction is rolled back first (_garbage_collection). Pending flush and delete-bitmap
// work is then cancelled, flush statistics are reported to the tablet's metrics, and the
// active memtable is released. A writer that never reached init() owns nothing to clean up.
DeltaWriter::~DeltaWriter() {
    if (!_is_init) {
        return;
    }

    if (!_delta_written_success) {
        _garbage_collection();
    }

    if (_flush_token) {
        // cancel and wait all memtables in flush queue to be finished
        _flush_token->cancel();

        if (_tablet) {
            const FlushStatistic& flush_stats = _flush_token->get_stats();
            _tablet->flush_bytes->increment(flush_stats.flush_size_bytes);
            _tablet->flush_finish_count->increment(flush_stats.flush_finish_count);
        }
    }

    if (_calc_delete_bitmap_token) {
        _calc_delete_bitmap_token->cancel();
    }

    _mem_table.reset();
}
134
135
0
void DeltaWriter::_garbage_collection() {
136
0
    Status rollback_status = Status::OK();
137
0
    TxnManager* txn_mgr = _storage_engine->txn_manager();
138
0
    if (_tablet != nullptr) {
139
0
        rollback_status = txn_mgr->rollback_txn(_req.partition_id, _tablet, _req.txn_id);
140
0
    }
141
    // has to check rollback status, because the rowset maybe committed in this thread and
142
    // published in another thread, then rollback will failed.
143
    // when rollback failed should not delete rowset
144
0
    if (rollback_status.ok()) {
145
0
        _storage_engine->add_unused_rowset(_cur_rowset);
146
0
    }
147
0
}
148
149
8
// Lazily prepares this writer; called from write() or close() the first time either runs.
//
// Steps:
//  1. resolve the tablet by _req.tablet_id;
//  2. for merge-on-write tablets, snapshot the max version and the rowset ids/ptrs visible
//     at that version (empty while the tablet is under schema change);
//  3. reject the load if the tablet has too many versions or its serialized rowset metas are
//     too large, triggering a cumulative compaction when close to the version limit;
//  4. register the txn while holding the migration (shared) and push locks;
//  5. build the request-level tablet schema, the rowset writer, the first memtable and the
//     flush / delete-bitmap tokens.
// _is_init is set only when every step succeeds.
//
// Returns TABLE_NOT_FOUND, InternalError (partial update during schema change),
// TOO_MANY_VERSION, TRY_LOCK_FAILED, or any error propagated from the calls below.
Status DeltaWriter::init() {
    TabletManager* tablet_mgr = _storage_engine->tablet_manager();
    _tablet = tablet_mgr->get_tablet(_req.tablet_id);
    if (_tablet == nullptr) {
        return Status::Error<TABLE_NOT_FOUND>("fail to find tablet. tablet_id={}, schema_hash={}",
                                              _req.tablet_id, _req.schema_hash);
    }

    // get rowset ids snapshot
    if (_tablet->enable_unique_key_merge_on_write()) {
        std::lock_guard<std::shared_mutex> lck(_tablet->get_header_lock());
        _cur_max_version = _tablet->max_version_unlocked().second;
        // tablet is under alter process. The delete bitmap will be calculated after conversion.
        if (_tablet->tablet_state() == TABLET_NOTREADY) {
            // Disable 'partial_update' when the tablet is undergoing a 'schema changing process'
            if (_req.table_schema_param->is_partial_update()) {
                return Status::InternalError(
                        "Unable to do 'partial_update' when "
                        "the tablet is undergoing a 'schema changing process'");
            }
            _rowset_ids.clear();
        } else {
            RETURN_IF_ERROR(_tablet->all_rs_id(_cur_max_version, &_rowset_ids));
        }
        _rowset_ptrs = _tablet->get_rowset_by_ids(&_rowset_ids);
    }

    // check tablet version number
    if (!config::disable_auto_compaction &&
        !_tablet->tablet_meta()->tablet_schema()->disable_auto_compaction() &&
        _tablet->exceed_version_limit(config::max_tablet_version_num - 100) &&
        !MemInfo::is_exceed_soft_mem_limit(GB_EXCHANGE_BYTE)) {
        // Close to the version limit: trigger a cumulative compaction on this writer's engine.
        // This is best effort (its result is deliberately discarded); the load itself only
        // fails once the hard limit below is already exceeded.
        static_cast<void>(_storage_engine->submit_compaction_task(
                _tablet, CompactionType::CUMULATIVE_COMPACTION, true));
        if (_tablet->version_count() > config::max_tablet_version_num) {
            return Status::Error<TOO_MANY_VERSION>(
                    "failed to init delta writer. version count: {}, exceed limit: {}, tablet: {}",
                    _tablet->version_count(), config::max_tablet_version_num, _tablet->full_name());
        }
    }

    int version_count = _tablet->version_count() + _tablet->stale_version_count();
    if (_tablet->avg_rs_meta_serialize_size() * version_count >
        config::tablet_meta_serialize_size_limit) {
        return Status::Error<TOO_MANY_VERSION>(
                "failed to init delta writer. meta serialize size : {}, exceed limit: {}, "
                "tablet: {}",
                _tablet->avg_rs_meta_serialize_size() * version_count,
                config::tablet_meta_serialize_size_limit, _tablet->tablet_id());
    }

    {
        std::shared_lock base_migration_rlock(_tablet->get_migration_lock(), std::defer_lock);
        if (!base_migration_rlock.try_lock_for(
                    std::chrono::milliseconds(config::migration_lock_timeout_ms))) {
            return Status::Error<TRY_LOCK_FAILED>("try_lock migration lock failed after {}ms",
                                                  config::migration_lock_timeout_ms);
        }
        std::lock_guard<std::mutex> push_lock(_tablet->get_push_lock());
        RETURN_IF_ERROR(_storage_engine->txn_manager()->prepare_txn(_req.partition_id, _tablet,
                                                                    _req.txn_id, _req.load_id));
    }
    if (_tablet->enable_unique_key_merge_on_write() && _delete_bitmap == nullptr) {
        _delete_bitmap.reset(new DeleteBitmap(_tablet->tablet_id()));
    }
    // build tablet schema in request level
    _build_current_tablet_schema(_req.index_id, _req.table_schema_param, *_tablet->tablet_schema());
    RowsetWriterContext context;
    context.txn_id = _req.txn_id;
    context.load_id = _req.load_id;
    context.rowset_state = PREPARED;
    context.segments_overlap = OVERLAPPING;
    context.tablet_schema = _tablet_schema;
    context.newest_write_timestamp = UnixSeconds();
    // The tablet id, not the table id: this field must match `context.tablet` below.
    context.tablet_id = _tablet->tablet_id();
    context.tablet = _tablet;
    context.write_type = DataWriteType::TYPE_DIRECT;
    context.mow_context = std::make_shared<MowContext>(_cur_max_version, _req.txn_id, _rowset_ids,
                                                       _rowset_ptrs, _delete_bitmap);
    context.partial_update_info = _partial_update_info;
    RETURN_IF_ERROR(_tablet->create_rowset_writer(context, &_rowset_writer));

    _schema.reset(new Schema(_tablet_schema));
    _reset_mem_table();

    // create flush handler
    // by assigning segment_id to memtable before submitting to flush executor,
    // we can make sure same keys sort in the same order in all replicas.
    bool should_serial = false;
    RETURN_IF_ERROR(_storage_engine->memtable_flush_executor()->create_flush_token(
            &_flush_token, _rowset_writer->type(), should_serial, _req.is_high_priority));
    _calc_delete_bitmap_token = _storage_engine->calc_delete_bitmap_executor()->create_token();

    _is_init = true;
    return Status::OK();
}
246
247
9
// Appends the rows of `block` selected by `row_idxs` to the active memtable.
// The writer is initialized lazily on the first call (unless it was cancelled). Returns the
// cancel status for cancelled writers and ALREADY_CLOSED after close(). When the memtable
// reports need_flush(), it is submitted asynchronously and replaced by a fresh one; the new
// memtable is installed even if submission fails, and that failure is returned.
Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>& row_idxs) {
    if (UNLIKELY(row_idxs.empty())) {
        return Status::OK();
    }

    _lock_watch.start();
    std::lock_guard<std::mutex> l(_lock);
    _lock_watch.stop();

    const bool needs_init = !_is_init && !_is_cancelled;
    if (needs_init) {
        RETURN_IF_ERROR(init());
    }
    if (_is_cancelled) {
        return _cancel_status;
    }
    if (_is_closed) {
        return Status::Error<ALREADY_CLOSED>(
                "write block after closed tablet_id={}, load_id={}-{}, txn_id={}", _req.tablet_id,
                _req.load_id.hi(), _req.load_id.lo(), _req.txn_id);
    }

    _total_received_rows += row_idxs.size();
    _mem_table->insert(block, row_idxs);

    if (UNLIKELY(_mem_table->need_agg() && config::enable_shrink_memory)) {
        _mem_table->shrink_memtable_by_agg();
    }

    if (LIKELY(!_mem_table->need_flush())) {
        return Status::OK();
    }

    Status flush_status = _flush_memtable_async();
    _reset_mem_table();
    if (UNLIKELY(!flush_status.ok())) {
        return flush_status;
    }
    return Status::OK();
}
284
285
8
// Hands the active memtable to the flush token after assigning its segment id, moving
// ownership out of _mem_table. An empty memtable is left in place and not submitted.
// Every caller in this file holds _lock.
Status DeltaWriter::_flush_memtable_async() {
    if (!_mem_table->empty()) {
        _mem_table->assign_segment_id();
        return _flush_token->submit(std::move(_mem_table));
    }
    return Status::OK();
}
292
293
0
// Flushes the active memtable asynchronously (used to reduce memory consumption) and, when
// `need_wait` is true, waits for every memtable queued on the flush token.
//
// Returns OK for a writer that is not initialized yet and the cancel status for a cancelled
// writer. After close() (or build_rowset()) there is no active memtable: close() has already
// submitted and released it. In that case only the optional wait is performed. Dereferencing
// the null memtable would crash, and _reset_mem_table() would resurrect an active memtable
// on a closed writer.
Status DeltaWriter::flush_memtable_and_wait(bool need_wait) {
    std::lock_guard<std::mutex> l(_lock);
    if (!_is_init) {
        // This writer is not initialized before flushing. Do nothing
        // But we return OK instead of Status::Error<ALREADY_CANCELLED>(),
        // Because this method maybe called when trying to reduce mem consumption,
        // and at that time, the writer may not be initialized yet and that is a normal case.
        return Status::OK();
    }

    if (_is_cancelled) {
        return _cancel_status;
    }

    if (!_is_closed && _mem_table != nullptr) {
        VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: "
                    << _mem_table->memory_usage() << ", tablet: " << _req.tablet_id
                    << ", load id: " << print_id(_req.load_id);
        auto s = _flush_memtable_async();
        _reset_mem_table();
        if (UNLIKELY(!s.ok())) {
            return s;
        }
    }

    if (need_wait) {
        // wait all memtables in flush queue to be flushed.
        SCOPED_TIMER(_wait_flush_timer);
        RETURN_IF_ERROR(_flush_token->wait());
    }
    return Status::OK();
}
323
324
4
// Blocks until every memtable submitted to the flush token has been flushed.
// The state checks run under _lock; the wait itself runs without holding it.
// An uninitialized writer returns OK (same reasoning as flush_memtable_and_wait());
// a cancelled writer returns its cancel status.
Status DeltaWriter::wait_flush() {
    {
        std::lock_guard<std::mutex> guard(_lock);
        if (!_is_init) {
            return Status::OK();
        }
        if (_is_cancelled) {
            return _cancel_status;
        }
    }

    SCOPED_TIMER(_wait_flush_timer);
    RETURN_IF_ERROR(_flush_token->wait());
    return Status::OK();
}
340
341
8
void DeltaWriter::_reset_mem_table() {
342
#ifndef BE_TEST
343
    auto mem_table_insert_tracker = std::make_shared<MemTracker>(
344
            fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
345
                        std::to_string(tablet_id()), _mem_table_num, _load_id.to_string()),
346
            ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
347
    auto mem_table_flush_tracker = std::make_shared<MemTracker>(
348
            fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}",
349
                        std::to_string(tablet_id()), _mem_table_num++, _load_id.to_string()),
350
            ExecEnv::GetInstance()->load_channel_mgr()->mem_tracker());
351
#else
352
8
    auto mem_table_insert_tracker = std::make_shared<MemTracker>(
353
8
            fmt::format("MemTableManualInsert:TabletId={}:MemTableNum={}#loadID={}",
354
8
                        std::to_string(tablet_id()), _mem_table_num, _load_id.to_string()));
355
8
    auto mem_table_flush_tracker = std::make_shared<MemTracker>(
356
8
            fmt::format("MemTableHookFlush:TabletId={}:MemTableNum={}#loadID={}",
357
8
                        std::to_string(tablet_id()), _mem_table_num++, _load_id.to_string()));
358
8
#endif
359
8
    {
360
8
        std::lock_guard<SpinLock> l(_mem_table_tracker_lock);
361
8
        _mem_table_insert_trackers.push_back(mem_table_insert_tracker);
362
8
        _mem_table_flush_trackers.push_back(mem_table_flush_tracker);
363
8
    }
364
8
    auto mow_context = std::make_shared<MowContext>(_cur_max_version, _req.txn_id, _rowset_ids,
365
8
                                                    _rowset_ptrs, _delete_bitmap);
366
8
    _mem_table.reset(new MemTable(_tablet, _schema.get(), _tablet_schema.get(), _req.slots,
367
8
                                  _req.tuple_desc, _rowset_writer.get(), mow_context,
368
8
                                  _partial_update_info.get(), mem_table_insert_tracker,
369
8
                                  mem_table_flush_tracker));
370
371
8
    COUNTER_UPDATE(_segment_num, 1);
372
8
    _mem_table->set_callback([this](MemTableStat& stat) {
373
6
        _memtable_stat += stat;
374
6
        COUNTER_SET(_sort_timer, _memtable_stat.sort_ns);
375
6
        COUNTER_SET(_agg_timer, _memtable_stat.agg_ns);
376
6
        COUNTER_SET(_memtable_duration_timer, _memtable_stat.duration_ns);
377
6
        COUNTER_SET(_segment_writer_timer, _memtable_stat.segment_writer_ns);
378
6
        COUNTER_SET(_delete_bitmap_timer, _memtable_stat.delete_bitmap_ns);
379
6
        COUNTER_SET(_put_into_output_timer, _memtable_stat.put_into_output_ns);
380
6
        COUNTER_SET(_sort_times, _memtable_stat.sort_times);
381
6
        COUNTER_SET(_agg_times, _memtable_stat.agg_times);
382
6
        COUNTER_SET(_raw_rows_num, _memtable_stat.raw_rows);
383
6
        COUNTER_SET(_merged_rows_num, _memtable_stat.merged_rows);
384
6
    });
385
8
}
386
387
8
// Closes the writer for further writes: submits the last memtable for flushing, releases
// the active memtable and marks the writer closed. The flush is not awaited here.
// Returns the cancel status for a cancelled writer. A repeated close() only logs a warning
// and returns OK. A submission failure is returned after the writer is already marked closed.
Status DeltaWriter::close() {
    _lock_watch.start();
    std::lock_guard<std::mutex> l(_lock);
    _lock_watch.stop();

    // if this delta writer is not initialized, but close() is called.
    // which means this tablet has no data loaded, but at least one tablet
    // in same partition has data loaded.
    // so we have to also init this DeltaWriter, so that it can create an empty rowset
    // for this tablet when being closed.
    const bool needs_init = !_is_init && !_is_cancelled;
    if (needs_init) {
        RETURN_IF_ERROR(init());
    }

    if (_is_cancelled) {
        return _cancel_status;
    }

    if (_is_closed) {
        LOG(WARNING) << "close after closed tablet_id=" << _req.tablet_id
                     << " load_id=" << _req.load_id << " txn_id=" << _req.txn_id;
        return Status::OK();
    }

    Status flush_status = _flush_memtable_async();
    _mem_table.reset();
    _is_closed = true;
    if (UNLIKELY(!flush_status.ok())) {
        return flush_status;
    }
    return Status::OK();
}
419
420
8
// Waits for all submitted flushes, verifies the row accounting and builds _cur_rowset from
// the rowset writer. Expected to run after init(); this is DCHECKed.
// The check requires that every received row was either written (rowset num_rows) or merged
// away in a memtable (_memtable_stat.merged_rows). A mismatch returns InternalError.
Status DeltaWriter::build_rowset() {
    std::lock_guard<std::mutex> l(_lock);
    DCHECK(_is_init)
            << "delta writer is supposed be to initialized before build_rowset() being called";

    if (_is_cancelled) {
        return _cancel_status;
    }

    // return error if previous flush failed
    Status flush_status;
    {
        SCOPED_TIMER(_wait_flush_timer);
        flush_status = _flush_token->wait();
    }
    if (UNLIKELY(!flush_status.ok())) {
        LOG(WARNING) << "previous flush failed tablet " << _tablet->tablet_id();
        return flush_status;
    }

    _mem_table.reset();

    const auto written_rows = _rowset_writer->num_rows();
    const bool rows_match = written_rows + _memtable_stat.merged_rows == _total_received_rows;
    if (!rows_match) {
        LOG(WARNING) << "the rows number written doesn't match, rowset num rows written to file: "
                     << written_rows << ", merged_rows: " << _memtable_stat.merged_rows
                     << ", total received rows: " << _total_received_rows;
        return Status::InternalError("rows number written by delta writer dosen't match");
    }

    // use rowset meta manager to save meta
    RETURN_NOT_OK_STATUS_WITH_WARN(_rowset_writer->build(_cur_rowset), "fail to build rowset");
    return Status::OK();
}
453
454
4
// Computes the delete bitmap for the rowset built by build_rowset() (_cur_rowset) on
// merge-on-write tablets.
// First, conflicts between the rowset's own segments are resolved synchronously (when it has
// more than one segment). Then, except for partial updates, commit_phase_update_delete_bitmap
// is called with _calc_delete_bitmap_token; the log line describes this as submitting a task
// to the executor, and wait_calc_delete_bitmap() waits on that token.
// Returns OK without doing anything for non-MoW tablets and for tablets under schema change,
// whose delete bitmap is calculated after conversion.
Status DeltaWriter::submit_calc_delete_bitmap_task() {
    if (!_tablet->enable_unique_key_merge_on_write()) {
        return Status::OK();
    }

    std::lock_guard<std::mutex> l(_lock);
    // tablet is under alter process. The delete bitmap will be calculated after conversion.
    if (_tablet->tablet_state() == TABLET_NOTREADY) {
        LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
                     "tablet_id: "
                  << _tablet->tablet_id() << " txn_id: " << _req.txn_id;
        return Status::OK();
    }
    // _cur_rowset is assumed to be a BetaRowset. static_cast is the correct base-to-derived
    // downcast; reinterpret_cast would silently ignore any base-subobject offset.
    auto* beta_rowset = static_cast<BetaRowset*>(_cur_rowset.get());
    std::vector<segment_v2::SegmentSharedPtr> segments;
    RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
    if (segments.size() > 1) {
        // calculate delete bitmap between segments
        RETURN_IF_ERROR(_tablet->calc_delete_bitmap_between_segments(_cur_rowset, segments,
                                                                     _delete_bitmap));
    }

    // For partial update, we need to fill in the entire row of data, during the calculation
    // of the delete bitmap. This operation is resource-intensive, and we need to minimize
    // the number of times it occurs. Therefore, we skip this operation here.
    if (_partial_update_info->is_partial_update) {
        return Status::OK();
    }

    LOG(INFO) << "submit calc delete bitmap task to executor, tablet_id: " << _tablet->tablet_id()
              << ", txn_id: " << _req.txn_id;
    return _tablet->commit_phase_update_delete_bitmap(_cur_rowset, _rowset_ids, _delete_bitmap,
                                                      segments, _req.txn_id,
                                                      _calc_delete_bitmap_token.get(), nullptr);
}
489
490
4
// Waits for the delete-bitmap work queued on _calc_delete_bitmap_token and propagates its
// error. Non-MoW tablets and partial updates never queue such work, so they return OK
// immediately.
Status DeltaWriter::wait_calc_delete_bitmap() {
    const bool nothing_queued = !_tablet->enable_unique_key_merge_on_write() ||
                                _partial_update_info->is_partial_update;
    if (nothing_queued) {
        return Status::OK();
    }

    std::lock_guard<std::mutex> guard(_lock);
    RETURN_IF_ERROR(_calc_delete_bitmap_token->wait());
    LOG(INFO) << "Got result of calc delete bitmap task from executor, tablet_id: "
              << _tablet->tablet_id() << ", txn_id: " << _req.txn_id;
    return Status::OK();
}
500
501
// Commits the load transaction for this tablet with _cur_rowset.
//
// For merge-on-write tablets, the delete bitmap's correctness is optionally checked first.
// This happens when config::enable_merge_on_write_correctness_check is on, the rowset is
// non-empty and the tablet is not under schema change. After the commit, the txn's delete
// bitmap is registered with the txn manager. A PUSH_TRANSACTION_ALREADY_EXIST result counts
// as success. On success the writer is marked as written, so the destructor will not roll
// back. When `write_single_replica` is set, every slave node is asked to pull the rowset.
Status DeltaWriter::commit_txn(const PSlaveTabletNodes& slave_tablet_nodes,
                               const bool write_single_replica) {
    if (_tablet->enable_unique_key_merge_on_write() &&
        config::enable_merge_on_write_correctness_check && _cur_rowset->num_rows() != 0 &&
        _tablet->tablet_state() != TABLET_NOTREADY) {
        auto st = _tablet->check_delete_bitmap_correctness(
                _delete_bitmap, _cur_rowset->end_version() - 1, _req.txn_id, _rowset_ids);
        if (!st.ok()) {
            LOG(WARNING) << fmt::format(
                    "[tablet_id:{}][txn_id:{}][load_id:{}][partition_id:{}] "
                    "delete bitmap correctness check failed in commit phase!",
                    _req.tablet_id, _req.txn_id, UniqueId(_req.load_id).to_string(),
                    _req.partition_id);
            return st;
        }
    }

    std::lock_guard<std::mutex> l(_lock);
    SCOPED_TIMER(_close_wait_timer);
    Status res = _storage_engine->txn_manager()->commit_txn(_req.partition_id, _tablet, _req.txn_id,
                                                            _req.load_id, _cur_rowset, false);

    if (!res && !res.is<PUSH_TRANSACTION_ALREADY_EXIST>()) {
        LOG(WARNING) << "Failed to commit txn: " << _req.txn_id
                     << " for rowset: " << _cur_rowset->rowset_id();
        return res;
    }
    if (_tablet->enable_unique_key_merge_on_write()) {
        _storage_engine->txn_manager()->set_txn_related_delete_bitmap(
                _req.partition_id, _req.txn_id, _tablet->tablet_id(), _tablet->schema_hash(),
                _tablet->tablet_uid(), true, _delete_bitmap, _rowset_ids, _partial_update_info);
    }

    _delta_written_success = true;

    if (write_single_replica) {
        // Bind by const reference: copying each PNodeInfo protobuf per iteration is wasted work.
        for (const auto& node_info : slave_tablet_nodes.slave_nodes()) {
            _request_slave_tablet_pull_rowset(node_info);
        }
    }
    COUNTER_UPDATE(_lock_timer, _lock_watch.elapsed_time() / 1000);
    return Status::OK();
}
552
553
bool DeltaWriter::check_slave_replicas_done(
554
0
        google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>* success_slave_tablet_node_ids) {
555
0
    std::lock_guard<std::shared_mutex> lock(_slave_node_lock);
556
0
    if (_unfinished_slave_node.empty()) {
557
0
        success_slave_tablet_node_ids->insert({_tablet->tablet_id(), _success_slave_node_ids});
558
0
        return true;
559
0
    }
560
0
    return false;
561
0
}
562
563
void DeltaWriter::add_finished_slave_replicas(
564
0
        google::protobuf::Map<int64_t, PSuccessSlaveTabletNodeIds>* success_slave_tablet_node_ids) {
565
0
    std::lock_guard<std::shared_mutex> lock(_slave_node_lock);
566
0
    success_slave_tablet_node_ids->insert({_tablet->tablet_id(), _success_slave_node_ids});
567
0
}
568
569
0
// Cancels the writer with a generic Cancelled("already cancelled") status.
// See cancel_with_status().
Status DeltaWriter::cancel() {
    const Status reason = Status::Cancelled("already cancelled");
    return cancel_with_status(reason);
}
572
573
0
// Cancels the writer and remembers `st` as the status later returned by write(), close(),
// build_rowset() and the flush/wait methods. The call is idempotent: only the first
// cancellation takes effect, and later calls return OK.
// In-flight segment compaction is awaited (its status ignored), the active memtable is
// dropped, and the flush and delete-bitmap tokens are cancelled when they exist.
Status DeltaWriter::cancel_with_status(const Status& st) {
    std::lock_guard<std::mutex> guard(_lock);
    if (_is_cancelled) {
        return Status::OK();
    }

    const bool segcompaction_in_flight =
            _rowset_writer && _rowset_writer->is_doing_segcompaction();
    if (segcompaction_in_flight) {
        _rowset_writer->wait_flying_segcompaction(); /* already cancel, ignore the return status */
    }

    _mem_table.reset();

    if (_flush_token) {
        // cancel and wait all memtables in flush queue to be finished
        _flush_token->cancel();
    }
    if (_calc_delete_bitmap_token) {
        _calc_delete_bitmap_token->cancel();
    }

    _is_cancelled = true;
    _cancel_status = st;
    return Status::OK();
}
593
594
0
// Sums the current consumption of this writer's memtable trackers.
// `mem` selects which trackers are included: the WRITE bit covers the insert-side trackers
// and the FLUSH bit covers the flush-side trackers; both bits sum both sets. Returns 0
// before init(), when no flush token exists yet.
int64_t DeltaWriter::mem_consumption(MemType mem) {
    if (_flush_token == nullptr) {
        // This method may be called before this writer is initialized.
        // So _flush_token may be null.
        return 0;
    }
    int64_t mem_usage = 0;
    {
        std::lock_guard<SpinLock> l(_mem_table_tracker_lock);
        // Iterate by const reference: copying each shared_ptr would do an atomic
        // refcount increment/decrement per tracker while holding the spin lock.
        if ((mem & MemType::WRITE) == MemType::WRITE) { // 3 & 2 = 2
            for (const auto& mem_table_tracker : _mem_table_insert_trackers) {
                mem_usage += mem_table_tracker->consumption();
            }
        }
        if ((mem & MemType::FLUSH) == MemType::FLUSH) { // 3 & 1 = 1
            for (const auto& mem_table_tracker : _mem_table_flush_trackers) {
                mem_usage += mem_table_tracker->consumption();
            }
        }
    }
    return mem_usage;
}
616
617
0
int64_t DeltaWriter::active_memtable_mem_consumption() {
618
0
    std::lock_guard<std::mutex> l(_lock);
619
0
    return _mem_table != nullptr ? _mem_table->memory_usage() : 0;
620
0
}
621
622
0
int64_t DeltaWriter::partition_id() const {
623
0
    return _req.partition_id;
624
0
}
625
626
void DeltaWriter::_build_current_tablet_schema(int64_t index_id,
627
                                               const OlapTableSchemaParam* table_schema_param,
628
8
                                               const TabletSchema& ori_tablet_schema) {
629
8
    _tablet_schema->copy_from(ori_tablet_schema);
630
    // find the right index id
631
8
    int i = 0;
632
8
    auto indexes = table_schema_param->indexes();
633
8
    for (; i < indexes.size(); i++) {
634
0
        if (indexes[i]->index_id == index_id) {
635
0
            break;
636
0
        }
637
0
    }
638
639
8
    if (indexes.size() > 0 && indexes[i]->columns.size() != 0 &&
640
8
        indexes[i]->columns[0]->unique_id() >= 0) {
641
0
        _tablet_schema->build_current_tablet_schema(index_id, table_schema_param->version(),
642
0
                                                    indexes[i], ori_tablet_schema);
643
0
    }
644
8
    if (_tablet_schema->schema_version() > ori_tablet_schema.schema_version()) {
645
0
        _tablet->update_max_version_schema(_tablet_schema);
646
0
    }
647
648
8
    _tablet_schema->set_table_id(table_schema_param->table_id());
649
    // set partial update columns info
650
8
    _partial_update_info = std::make_shared<PartialUpdateInfo>();
651
8
    _partial_update_info->init(
652
8
            *_tablet_schema, table_schema_param->is_partial_update(),
653
8
            table_schema_param->partial_update_input_columns(),
654
8
            table_schema_param->is_strict_mode(), table_schema_param->timestamp_ms(),
655
8
            table_schema_param->nano_seconds(), table_schema_param->timezone(), _cur_max_version);
656
8
}
657
658
0
// Asks the slave replica described by `node_info` to pull the rowset this writer produced
// (`_cur_rowset`), blocking until the brpc call completes or times out.
//
// Before sending, the writer registers itself with the txn manager and adds the node to
// `_unfinished_slave_node`. The node is removed again here if the request cannot be built
// (a segment or inverted-index file size cannot be read) or if the RPC fails. On success the
// node stays pending; presumably finish_slave_tablet_pull_rowset() clears it once the slave
// reports back — TODO confirm against the caller.
void DeltaWriter::_request_slave_tablet_pull_rowset(PNodeInfo node_info) {
    std::shared_ptr<PBackendService_Stub> stub =
            ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
                    node_info.host(), node_info.async_internal_port());
    if (stub == nullptr) {
        LOG(WARNING) << "failed to send pull rowset request to slave replica. get rpc stub failed, "
                        "slave host="
                     << node_info.host() << ", port=" << node_info.async_internal_port()
                     << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _req.txn_id;
        return;
    }

    _storage_engine->txn_manager()->add_txn_tablet_delta_writer(_req.txn_id, _tablet->tablet_id(),
                                                                this);
    {
        std::lock_guard<std::shared_mutex> lock(_slave_node_lock);
        _unfinished_slave_node.insert(node_info.id());
    }

    // Removes this node from the pending set when the request cannot be sent or fails.
    auto mark_slave_node_finished = [&]() {
        std::lock_guard<std::shared_mutex> lock(_slave_node_lock);
        _unfinished_slave_node.erase(node_info.id());
    };

    // Reads the size of `path` into `*size`; logs and returns false when it cannot be read.
    // Uses the non-throwing overload so a missing file cannot escape as an exception.
    auto get_file_size = [&](const std::string& path, int64_t* size) -> bool {
        std::error_code ec;
        const auto file_size = std::filesystem::file_size(path, ec);
        if (ec) {
            LOG(WARNING) << "failed to send pull rowset request to slave replica. get file size "
                            "failed, path="
                         << path << ", error=" << ec.message()
                         << ", slave host=" << node_info.host()
                         << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _req.txn_id;
            return false;
        }
        *size = static_cast<int64_t>(file_size);
        return true;
    };

    // Ids of the inverted indexes whose files accompany each segment.
    std::vector<int64_t> indices_ids;
    auto tablet_schema = _cur_rowset->rowset_meta()->tablet_schema();
    if (!tablet_schema->skip_write_index_on_load()) {
        for (auto& column : tablet_schema->columns()) {
            const TabletIndex* index_meta = tablet_schema->get_inverted_index(column.unique_id());
            if (index_meta) {
                indices_ids.emplace_back(index_meta->index_id());
            }
        }
    }

    PTabletWriteSlaveRequest request;
    // The request owns its own copy of the rowset meta. Handing it a stack object through
    // set_allocated_rowset_meta() would make the request's destructor free stack memory on
    // any path (early return, exception) that skipped the matching release_rowset_meta().
    RowsetMetaPB* rowset_meta_pb = request.mutable_rowset_meta();
    *rowset_meta_pb = _cur_rowset->rowset_meta()->get_rowset_pb();
    // TODO(dx): remove log after fix partition id eq 0 bug
    if (!rowset_meta_pb->has_partition_id() || rowset_meta_pb->partition_id() == 0) {
        rowset_meta_pb->set_partition_id(_req.partition_id);
        LOG(WARNING) << "cant get partition id from local rs pb, get from _req, partition_id="
                     << rowset_meta_pb->partition_id();
    }

    request.set_host(BackendOptions::get_localhost());
    request.set_http_port(config::webserver_port);
    string tablet_path = _tablet->tablet_path();
    request.set_rowset_path(tablet_path);
    request.set_token(ExecEnv::GetInstance()->token());
    request.set_brpc_port(config::brpc_port);
    request.set_node_id(node_info.id());
    for (int segment_id = 0; segment_id < _cur_rowset->rowset_meta()->num_segments();
         segment_id++) {
        std::stringstream segment_name;
        segment_name << _cur_rowset->rowset_id() << "_" << segment_id << ".dat";
        const std::string segment_path = tablet_path + "/" + segment_name.str();
        int64_t segment_size = 0;
        if (!get_file_size(segment_path, &segment_size)) {
            mark_slave_node_finished();
            return;
        }
        request.mutable_segments_size()->insert({segment_id, segment_size});

        for (auto index_id : indices_ids) {
            std::string inverted_index_file =
                    InvertedIndexDescriptor::get_index_file_name(segment_path, index_id);
            int64_t size = 0;
            if (!get_file_size(inverted_index_file, &size)) {
                mark_slave_node_finished();
                return;
            }
            PTabletWriteSlaveRequest::IndexSize index_size;
            index_size.set_indexid(index_id);
            index_size.set_size(size);
            // Fetch the map value for the current segment_id.
            // If it doesn't exist, this will insert a new default-constructed IndexSizeMapValue
            auto& index_size_map_value = (*request.mutable_inverted_indices_size())[segment_id];
            // Add the new index size to the map value.
            *index_size_map_value.mutable_index_sizes()->Add() = std::move(index_size);
        }
    }

    // Two references: one released by brpc when the call completes, one held here until join().
    RefCountClosure<PTabletWriteSlaveResult>* closure =
            new RefCountClosure<PTabletWriteSlaveResult>();
    closure->ref();
    closure->ref();
    closure->cntl.set_timeout_ms(config::slave_replica_writer_rpc_timeout_sec * 1000);
    closure->cntl.ignore_eovercrowded();
    stub->request_slave_tablet_pull_rowset(&closure->cntl, &request, &closure->result, closure);

    closure->join();
    if (closure->cntl.Failed()) {
        if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available(
                    stub, node_info.host(), node_info.async_internal_port())) {
            ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
                    closure->cntl.remote_side());
        }
        LOG(WARNING) << "failed to send pull rowset request to slave replica, error="
                     << berror(closure->cntl.ErrorCode())
                     << ", error_text=" << closure->cntl.ErrorText()
                     << ". slave host: " << node_info.host()
                     << ", tablet_id=" << _tablet->tablet_id() << ", txn_id=" << _req.txn_id;
        mark_slave_node_finished();
    }

    if (closure->unref()) {
        delete closure;
    }
}
758
759
0
void DeltaWriter::finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed) {
760
0
    std::lock_guard<std::shared_mutex> lock(_slave_node_lock);
761
0
    if (is_succeed) {
762
0
        _success_slave_node_ids.add_slave_node_ids(node_id);
763
0
        VLOG_CRITICAL << "record successful slave replica for txn [" << _req.txn_id
764
0
                      << "], tablet_id=" << _tablet->tablet_id() << ", node_id=" << node_id;
765
0
    }
766
0
    _unfinished_slave_node.erase(node_id);
767
0
}
768
769
0
int64_t DeltaWriter::num_rows_filtered() const {
770
0
    return _rowset_writer == nullptr ? 0 : _rowset_writer->num_rows_filtered();
771
0
}
772
773
} // namespace doris