Coverage Report

Created: 2025-04-21 12:08

/root/doris/be/src/runtime/tablets_channel.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/tablets_channel.h"
19
20
#include <bvar/bvar.h>
21
#include <fmt/format.h>
22
#include <gen_cpp/internal_service.pb.h>
23
#include <gen_cpp/types.pb.h>
24
25
#include <ctime>
26
27
#include "common/compiler_util.h" // IWYU pragma: keep
28
#include "common/status.h"
29
// IWYU pragma: no_include <bits/chrono.h>
30
#include <chrono> // IWYU pragma: keep
31
#include <initializer_list>
32
#include <set>
33
#include <thread>
34
#include <utility>
35
36
#ifdef DEBUG
37
#include <unordered_set>
38
#endif
39
40
#include "common/logging.h"
41
#include "exec/tablet_info.h"
42
#include "olap/delta_writer.h"
43
#include "olap/storage_engine.h"
44
#include "olap/txn_manager.h"
45
#include "runtime/load_channel.h"
46
#include "util/defer_op.h"
47
#include "util/doris_metrics.h"
48
#include "util/metrics.h"
49
#include "vec/core/block.h"
50
51
namespace doris {
52
class SlotDescriptor;
53
54
// bvar adder exposed as "tablets_channel_send_data_allocated_size". add_batch() adds
// the allocated bytes of each deserialized block and subtracts them again when the
// call returns.
bvar::Adder<int64_t> g_tablets_channel_send_data_allocated_size(
55
        "tablets_channel_send_data_allocated_size");
56
57
// Gauge prototype for the `tablet_writer_count` hook metric that the
// BaseTabletsChannel constructor registers.
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT);
58
59
// Definition of the static writer counter: increased when delta writers are created
// in _open_all_writers()/incremental_open(), decreased in ~BaseTabletsChannel().
std::atomic<uint64_t> BaseTabletsChannel::_s_tablet_writer_count;
60
61
// Builds a channel in the kInitialized state, creates its profile counters and, the
// first time any channel is constructed, registers the `tablet_writer_count` hook
// metric that reports _s_tablet_writer_count.
//
// NOTE(review): _init_profile() is called from this base constructor. If it is declared
// virtual, only BaseTabletsChannel::_init_profile runs here -- confirm that subclasses
// do not depend on their override being invoked at construction time.
BaseTabletsChannel::BaseTabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id,
                                       bool is_high_priority, RuntimeProfile* profile)
        : _key(key),
          _state(kInitialized),
          _load_id(load_id),
          _closed_senders(64), // re-sized by open() once the sender count is known
          _is_high_priority(is_high_priority) {
    _init_profile(profile);

    // The hook metric is shared by all channels, so register it only once.
    static std::once_flag metric_hook_registered;
    std::call_once(metric_hook_registered, [] {
        REGISTER_HOOK_METRIC(tablet_writer_count, [&]() { return _s_tablet_writer_count.load(); });
    });
}
74
75
// Forwards the channel identity to BaseTabletsChannel and keeps a reference to the
// storage engine, which close()/cancel() use to reach the txn manager.
TabletsChannel::TabletsChannel(StorageEngine& engine, const TabletsChannelKey& key,
                               const UniqueId& load_id, bool is_high_priority,
                               RuntimeProfile* profile)
        : BaseTabletsChannel(key, load_id, is_high_priority, profile),
          _engine(engine) {
}
79
80
0
// Removes the writers owned by this channel from the shared writer counter.
BaseTabletsChannel::~BaseTabletsChannel() {
    const auto owned_writers = _tablet_writers.size();
    _s_tablet_writer_count.fetch_sub(owned_writers);
}
83
84
0
// Nothing extra to release here; the counter bookkeeping is done by ~BaseTabletsChannel().
TabletsChannel::~TabletsChannel() = default;
85
86
// Checks the channel state and the packet sequence of an add-block request.
//
// On success `cur_seq` is set to the next sequence number expected from the
// request's sender. Returns:
//   - `_close_status` when the channel is already finished;
//   - InternalError when the channel is in any other non-opened state;
//   - InternalError when the sender id is outside the range sized by open();
//   - InternalError when the packet sequence is ahead of the expected one
//     (an earlier packet was lost);
//   - OK otherwise. A packet_seq smaller than cur_seq (duplicate) is still OK here
//     and is filtered by the caller.
Status BaseTabletsChannel::_get_current_seq(int64_t& cur_seq,
                                            const PTabletWriterAddBlockRequest& request) {
    std::lock_guard<std::mutex> l(_lock);
    if (_state != kOpened) {
        return _state == kFinished ? _close_status
                                   : Status::InternalError("TabletsChannel {} state: {}",
                                                           _key.to_string(), _state);
    }
    // The sender id comes straight from the RPC request and is used as a vector index,
    // so reject values outside [0, _next_seqs.size()) instead of reading out of bounds.
    const int64_t sender_id = request.sender_id();
    if (sender_id < 0 || static_cast<size_t>(sender_id) >= _next_seqs.size()) {
        return Status::InternalError("TabletsChannel {} got invalid sender_id {}, num senders {}",
                                     _key.to_string(), sender_id, _next_seqs.size());
    }
    cur_seq = _next_seqs[sender_id];
    // check packet
    if (request.packet_seq() > cur_seq) {
        LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq
                     << ", recept_seq=" << request.packet_seq();
        return Status::InternalError("lost data packet");
    }
    return Status::OK();
}
103
104
0
// Creates the per-channel child profile ("TabletsChannel <key>") under `profile` and
// all counters/timers the channel reports into. Memory high-water marks are grouped
// in a nested "PeakMemoryUsage" child profile.
void BaseTabletsChannel::_init_profile(RuntimeProfile* profile) {
    const auto channel_profile_name = fmt::format("TabletsChannel {}", _key.to_string());
    _profile = profile->create_child(channel_profile_name, true, true);
    _add_batch_number_counter = ADD_COUNTER(_profile, "NumberBatchAdded", TUnit::UNIT);

    auto* peak_mem_profile = _profile->create_child("PeakMemoryUsage", true, true);
    _add_batch_timer = ADD_TIMER(_profile, "AddBatchTime");
    _write_block_timer = ADD_TIMER(_profile, "WriteBlockTime");
    _incremental_open_timer = ADD_TIMER(_profile, "IncrementalOpenTabletTime");

    // All peak-memory counters share the same unit; creation order is kept as-is.
    const auto add_peak_counter = [peak_mem_profile](const char* counter_name) {
        return peak_mem_profile->AddHighWaterMarkCounter(counter_name, TUnit::BYTES);
    };
    _memory_usage_counter = add_peak_counter("Total");
    _write_memory_usage_counter = add_peak_counter("Write");
    _flush_memory_usage_counter = add_peak_counter("Flush");
    _max_tablet_memory_usage_counter = add_peak_counter("MaxTablet");
    _max_tablet_write_memory_usage_counter = add_peak_counter("MaxTabletWrite");
    _max_tablet_flush_memory_usage_counter = add_peak_counter("MaxTabletFlush");
}
123
124
0
// Adds the slave-replica wait timer on top of the counters created by the base class.
void TabletsChannel::_init_profile(RuntimeProfile* profile) {
    // The base call creates _profile, which the timer below is attached to.
    BaseTabletsChannel::_init_profile(profile);

    _slave_replica_timer = ADD_TIMER(_profile, "SlaveReplicaTime");
}
128
129
0
// Opens the channel for the tablets listed in `request`: records txn/index ids,
// initializes the schema, sizes the per-sender bookkeeping and creates the delta
// writers. Returns OK without doing anything when the channel is already opened or
// finished; otherwise returns the first error from schema init or writer creation.
Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) {
    std::lock_guard<std::mutex> l(_lock);
    // kOpened: the normal case, another sender opened the channel already.
    // kFinished: another sender cancelled the channel already.
    const bool nothing_to_do = (_state == kOpened) || (_state == kFinished);
    if (nothing_to_do) {
        return Status::OK();
    }
    LOG(INFO) << fmt::format("open tablets channel {}, tablets num: {} timeout(s): {}",
                             _key.to_string(), request.tablets().size(),
                             request.load_channel_timeout_s());
    _txn_id = request.txn_id();
    _index_id = request.index_id();
    _schema = std::make_shared<OlapTableSchemaParam>();
    RETURN_IF_ERROR(_schema->init(request.schema()));
    _tuple_desc = _schema->tuple_desc();

    const int sender_capacity = request.num_senders();
    /*
     * A tablets channel on the receiver corresponds to a group of VNodeChannels on the
     * sender side, at most one per sender instance. There are two possibilities:
     *  1. The partitions were broadcast by FE up front, so every sender (instance) knows
     *     them from the start and open() is called directly, not via incremental_open().
     *     Once _state is kOpened, _open_by_incremental can never become true. In this
     *     case _num_remaining_senders equals the number of senders, and the channel
     *     closes when every sender has sent its close rpc. For auto-partition tables the
     *     close of such channels is held on the receiver and returned together, to avoid
     *     the close-then-incremental-open problem.
     *  2. The channel is first opened by incremental_open() from a sender's sink node.
     *     Only that sender knows the partition (this TabletsChannel) at that time, and how
     *     many senders will eventually know it depends on data distribution. open() is
     *     then called from incremental_open(), so _open_by_incremental is true and
     *     _num_remaining_senders is not set here; it is incremented each time
     *     incremental_open() sees a new sender. The same number of closes is needed to
     *     close the channel, but closing does not hang.
     */
    if (!_open_by_incremental) {
        _num_remaining_senders = sender_capacity;
    } else {
        DCHECK(_num_remaining_senders == 0) << _num_remaining_senders;
    }
    LOG(INFO) << fmt::format(
            "txn {}: TabletsChannel of index {} init senders {} with incremental {}", _txn_id,
            _index_id, _num_remaining_senders, _open_by_incremental ? "on" : "off");
    // Size the per-sender state by the total sender count in both modes, because we do
    // not know in advance how many senders will actually open this channel.
    _next_seqs.resize(sender_capacity, 0);
    _closed_senders.Reset(sender_capacity);

    RETURN_IF_ERROR(_open_all_writers(request));

    _state = kOpened;
    return Status::OK();
}
177
178
0
// Opens additional tablets on a channel, or opens the channel itself if this is the
// first request it receives.
//
// When the channel is still kInitialized, it is marked as opened-by-incremental and
// open() is called first. Afterwards, under _lock:
//   - for channels opened incrementally, each new sender id is counted once in
//     _num_remaining_senders (requests without a sender id are always counted, for
//     compatibility);
//   - a DeltaWriter is created for every tablet in `params` that has no writer yet.
// Returns InternalError if the channel's index id is not part of the schema, or any
// error returned by open().
//
// NOTE(review): _state is read below before _lock is taken, and the final assignment
// sets kOpened regardless of the previous state -- confirm that concurrent first opens
// and incremental opens after cancel() are excluded by the callers.
Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& params) {
    SCOPED_TIMER(_incremental_open_timer);

    // current node first opened by incremental open
    if (_state == kInitialized) {
        _open_by_incremental = true;
        RETURN_IF_ERROR(open(params));
    }

    std::lock_guard<std::mutex> l(_lock);

    // One sender may call incremental_open many times but closes only once, so each
    // sender must be counted a single time.
    if (_open_by_incremental) {
        if (params.has_sender_id() && !_recieved_senders.contains(params.sender_id())) {
            _recieved_senders.insert(params.sender_id());
            _num_remaining_senders++;
        } else if (!params.has_sender_id()) { // for compatible
            _num_remaining_senders++;
        }
        VLOG_DEBUG << fmt::format("txn {}: TabletsChannel {} inc senders to {}", _txn_id, _index_id,
                                  _num_remaining_senders);
    }

    // Locate the slots and schema hash of the index this channel writes to.
    std::vector<SlotDescriptor*>* index_slots = nullptr;
    int32_t schema_hash = 0;
    for (const auto& index : _schema->indexes()) {
        if (index->index_id == _index_id) {
            index_slots = &index->slots;
            schema_hash = index->schema_hash;
            break;
        }
    }
    if (index_slots == nullptr) {
        return Status::InternalError("unknown index id, key={}", _key.to_string());
    }
    // update tablets
    size_t incremental_tablet_num = 0;
    std::stringstream ss;
    ss << "LocalTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(params.id())
       << " incremental open delta writer: ";

    // Every modification of _tablet_writers happens under _lock, and so does this
    // lookup, so the find() does not need _tablet_writers_lock.
    for (const auto& tablet : params.tablets()) {
        if (_tablet_writers.find(tablet.tablet_id()) != _tablet_writers.end()) {
            continue;
        }
        incremental_tablet_num++;

        WriteRequest wrequest;
        wrequest.index_id = params.index_id();
        wrequest.tablet_id = tablet.tablet_id();
        wrequest.schema_hash = schema_hash;
        wrequest.txn_id = _txn_id;
        wrequest.partition_id = tablet.partition_id();
        wrequest.load_id = params.id();
        wrequest.tuple_desc = _tuple_desc;
        wrequest.slots = index_slots;
        wrequest.is_high_priority = _is_high_priority;
        wrequest.table_schema_param = _schema;
        // Keep in sync with _open_all_writers(): without this, writers opened
        // incrementally would ignore the request's file-cache setting.
        wrequest.write_file_cache = params.write_file_cache();

        // TODO(plat1ko): CloudDeltaWriter
        auto delta_writer = std::make_unique<DeltaWriter>(*StorageEngine::instance(), &wrequest,
                                                          _profile, _load_id);
        ss << "[" << tablet.tablet_id() << "]";
        {
            // _tablet_writers is modified here, so readers that only hold
            // _tablet_writers_lock (add_batch, refresh_profile) must be excluded.
            std::lock_guard<SpinLock> l(_tablet_writers_lock);
            _tablet_writers.emplace(tablet.tablet_id(), std::move(delta_writer));
        }
    }

    _s_tablet_writer_count += incremental_tablet_num;
    LOG(INFO) << ss.str();

    _state = kOpened;
    return Status::OK();
}
255
256
// Handles a close request from one sender.
//
// Records the sender as closed and merges its partition ids. When the last remaining
// sender closes, the channel becomes kFinished and all writers are finalized:
// writers of partitions that received data are closed, flushed, built into rowsets,
// have their delete bitmaps calculated and are committed; writers of other partitions
// are cancelled. Per-tablet failures are reported through `res->tablet_errors` and do
// not abort the remaining tablets; a wait_flush() failure is returned immediately.
// With single-replica write enabled, the call additionally waits (up to 0.9 of the
// load channel timeout) for the slave replicas to finish.
//
// `*finished` is set to whether all senders have closed, except on the early return
// taken when the channel is already kFinished, where it is left untouched.
Status TabletsChannel::close(LoadChannel* parent, const PTabletWriterAddBlockRequest& req,
                             PTabletWriterAddBlockResult* res, bool* finished) {
    int sender_id = req.sender_id();
    int64_t backend_id = req.backend_id();
    const auto& partition_ids = req.partition_ids();
    auto* tablet_errors = res->mutable_tablet_errors();
    std::lock_guard<std::mutex> l(_lock);
    if (_state == kFinished) {
        return _close_status;
    }
    if (_closed_senders.Get(sender_id)) {
        // Double close from one sender, just return OK
        *finished = (_num_remaining_senders == 0);
        return _close_status;
    }
    LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << sender_id
              << ", backend id: " << backend_id;
    for (auto pid : partition_ids) {
        _partition_ids.emplace(pid);
    }
    _closed_senders.Set(sender_id, true);
    _num_remaining_senders--;
    *finished = (_num_remaining_senders == 0);
    if (*finished) {
        _state = kFinished;
        // All senders are closed
        // 1. close all delta writers
        std::set<DeltaWriter*> need_wait_writers;
        for (auto&& [tablet_id, writer] : _tablet_writers) {
            if (_partition_ids.contains(writer->partition_id())) {
                auto st = writer->close();
                if (!st.ok()) {
                    auto err_msg = fmt::format(
                            "close tablet writer failed, tablet_id={}, "
                            "transaction_id={}, err={}",
                            tablet_id, _txn_id, st.to_string());
                    LOG(WARNING) << err_msg;
                    PTabletError* tablet_error = tablet_errors->Add();
                    tablet_error->set_tablet_id(tablet_id);
                    tablet_error->set_msg(st.to_string());
                    // just skip this tablet(writer) and continue to close others
                    continue;
                }
                // tablet writer in `_broken_tablets` should not call `build_rowset` and
                // `commit_txn` method, after that, the publish-version task will success,
                // which can cause the replica inconsistency.
                // NOTE(review): this lookup does not take _broken_tablets_lock, unlike
                // _build_tablet_to_rowidxs() -- confirm no add_batch can still be
                // inserting into _broken_tablets at this point.
                if (_is_broken_tablet(writer->tablet_id())) {
                    LOG(WARNING) << "SHOULD NOT HAPPEN, tablet writer is broken but not cancelled"
                                 << ", tablet_id=" << tablet_id << ", transaction_id=" << _txn_id;
                    continue;
                }
                need_wait_writers.insert(static_cast<DeltaWriter*>(writer.get()));
            } else {
                // No sender reported data for this writer's partition: drop it.
                auto st = writer->cancel();
                if (!st.ok()) {
                    LOG(WARNING) << "cancel tablet writer failed, tablet_id=" << tablet_id
                                 << ", transaction_id=" << _txn_id;
                    // just skip this tablet(writer) and continue to close others
                    continue;
                }
                VLOG_PROGRESS << "cancel tablet writer successfully, tablet_id=" << tablet_id
                              << ", transaction_id=" << _txn_id;
            }
        }

        _write_single_replica = req.write_single_replica();

        // 2. wait all writer finished flush.
        for (auto* writer : need_wait_writers) {
            RETURN_IF_ERROR((writer->wait_flush()));
        }

        // 3. build rowset
        for (auto it = need_wait_writers.begin(); it != need_wait_writers.end();) {
            Status st = (*it)->build_rowset();
            if (!st.ok()) {
                _add_error_tablet(tablet_errors, (*it)->tablet_id(), st);
                it = need_wait_writers.erase(it);
                continue;
            }
            // 3.1 calculate delete bitmap for Unique Key MoW tables
            st = (*it)->submit_calc_delete_bitmap_task();
            if (!st.ok()) {
                _add_error_tablet(tablet_errors, (*it)->tablet_id(), st);
                it = need_wait_writers.erase(it);
                continue;
            }
            ++it;
        }

        // 4. wait for delete bitmap calculation complete if necessary
        for (auto it = need_wait_writers.begin(); it != need_wait_writers.end();) {
            Status st = (*it)->wait_calc_delete_bitmap();
            if (!st.ok()) {
                _add_error_tablet(tablet_errors, (*it)->tablet_id(), st);
                it = need_wait_writers.erase(it);
                continue;
            }
            ++it;
        }

        // 5. commit all writers

        for (auto* writer : need_wait_writers) {
            // close may return failed, but no need to handle it here.
            // tablet_vec will only contains success tablet, and then let FE judge it.
            _commit_txn(writer, req, res);
        }

        if (_write_single_replica) {
            auto* success_slave_tablet_node_ids = res->mutable_success_slave_tablet_node_ids();
            // The operation waiting for all slave replicas to complete must end before the timeout,
            // so that there is enough time to collect completed replica. Otherwise, the task may
            // timeout and fail even though most of the replicas are completed. Here we set 0.9
            // times the timeout as the maximum waiting time.
            SCOPED_TIMER(_slave_replica_timer);
            while (!need_wait_writers.empty() &&
                   (time(nullptr) - parent->last_updated_time()) < (parent->timeout() * 0.9)) {
                for (auto it = need_wait_writers.begin(); it != need_wait_writers.end();) {
                    if ((*it)->check_slave_replicas_done(success_slave_tablet_node_ids)) {
                        it = need_wait_writers.erase(it);
                    } else {
                        ++it;
                    }
                }
                // Everything finished in this round: do not sleep another 100 ms while
                // still holding _lock.
                if (need_wait_writers.empty()) {
                    break;
                }
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }
            // Writers still pending after the wait report whatever replicas did finish.
            for (auto* writer : need_wait_writers) {
                writer->add_finished_slave_replicas(success_slave_tablet_node_ids);
            }
            _engine.txn_manager()->clear_txn_tablet_delta_writer(_txn_id);
        }
    }
    return Status::OK();
}
393
394
void TabletsChannel::_commit_txn(DeltaWriter* writer, const PTabletWriterAddBlockRequest& req,
395
0
                                 PTabletWriterAddBlockResult* res) {
396
0
    PSlaveTabletNodes slave_nodes;
397
0
    if (_write_single_replica) {
398
0
        auto& nodes_map = req.slave_tablet_nodes();
399
0
        auto it = nodes_map.find(writer->tablet_id());
400
0
        if (it != nodes_map.end()) {
401
0
            slave_nodes = it->second;
402
0
        }
403
0
    }
404
0
    Status st = writer->commit_txn(slave_nodes);
405
0
    if (st.ok()) [[likely]] {
406
0
        auto* tablet_vec = res->mutable_tablet_vec();
407
0
        PTabletInfo* tablet_info = tablet_vec->Add();
408
0
        tablet_info->set_tablet_id(writer->tablet_id());
409
        // unused required field.
410
0
        tablet_info->set_schema_hash(0);
411
0
        tablet_info->set_received_rows(writer->total_received_rows());
412
0
        tablet_info->set_num_rows_filtered(writer->num_rows_filtered());
413
0
        _total_received_rows += writer->total_received_rows();
414
0
        _num_rows_filtered += writer->num_rows_filtered();
415
0
    } else {
416
0
        _add_error_tablet(res->mutable_tablet_errors(), writer->tablet_id(), st);
417
0
    }
418
0
}
419
420
void BaseTabletsChannel::_add_error_tablet(
421
        google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors, int64_t tablet_id,
422
0
        Status error) const {
423
0
    PTabletError* tablet_error = tablet_errors->Add();
424
0
    tablet_error->set_tablet_id(tablet_id);
425
0
    tablet_error->set_msg(error.to_string());
426
0
    VLOG_PROGRESS << "close wait failed tablet " << tablet_id << " transaction_id " << _txn_id
427
0
                  << "err msg " << error;
428
0
}
429
430
0
void BaseTabletsChannel::refresh_profile() {
431
0
    int64_t write_mem_usage = 0;
432
0
    int64_t flush_mem_usage = 0;
433
0
    int64_t max_tablet_mem_usage = 0;
434
0
    int64_t max_tablet_write_mem_usage = 0;
435
0
    int64_t max_tablet_flush_mem_usage = 0;
436
0
    {
437
0
        std::lock_guard<SpinLock> l(_tablet_writers_lock);
438
0
        for (auto&& [tablet_id, writer] : _tablet_writers) {
439
0
            int64_t write_mem = writer->mem_consumption(MemType::WRITE);
440
0
            write_mem_usage += write_mem;
441
0
            int64_t flush_mem = writer->mem_consumption(MemType::FLUSH);
442
0
            flush_mem_usage += flush_mem;
443
0
            if (write_mem > max_tablet_write_mem_usage) {
444
0
                max_tablet_write_mem_usage = write_mem;
445
0
            }
446
0
            if (flush_mem > max_tablet_flush_mem_usage) {
447
0
                max_tablet_flush_mem_usage = flush_mem;
448
0
            }
449
0
            if (write_mem + flush_mem > max_tablet_mem_usage) {
450
0
                max_tablet_mem_usage = write_mem + flush_mem;
451
0
            }
452
0
        }
453
0
    }
454
0
    COUNTER_SET(_memory_usage_counter, write_mem_usage + flush_mem_usage);
455
0
    COUNTER_SET(_write_memory_usage_counter, write_mem_usage);
456
0
    COUNTER_SET(_flush_memory_usage_counter, flush_mem_usage);
457
0
    COUNTER_SET(_max_tablet_memory_usage_counter, max_tablet_mem_usage);
458
0
    COUNTER_SET(_max_tablet_write_memory_usage_counter, max_tablet_write_mem_usage);
459
0
    COUNTER_SET(_max_tablet_flush_memory_usage_counter, max_tablet_flush_mem_usage);
460
0
}
461
462
0
// Creates a DeltaWriter for every tablet in `request` that has no writer yet and adds
// the number of newly created writers to the shared writer counter.
// Called from open() with _lock held. Returns InternalError when the channel's index
// id is not present in the schema.
Status BaseTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request) {
    // Locate the slots and schema hash of the index this channel writes to.
    std::vector<SlotDescriptor*>* index_slots = nullptr;
    int32_t schema_hash = 0;
    for (const auto& index : _schema->indexes()) {
        if (index->index_id == _index_id) {
            index_slots = &index->slots;
            schema_hash = index->schema_hash;
            break;
        }
    }
    if (index_slots == nullptr) {
        return Status::InternalError("unknown index id, key={}", _key.to_string());
    }

#ifdef DEBUG
    // check: tablet ids should be unique
    {
        std::unordered_set<int64_t> tablet_ids;
        for (const auto& tablet : request.tablets()) {
            CHECK(tablet_ids.count(tablet.tablet_id()) == 0)
                    << "found duplicate tablet id: " << tablet.tablet_id();
            tablet_ids.insert(tablet.tablet_id());
        }
    }
#endif

    size_t new_writer_cnt = 0;
    // Lookups run under _lock, which every writer-map modification also holds, so only
    // the insertion itself needs _tablet_writers_lock.
    for (const auto& tablet : request.tablets()) {
        if (_tablet_writers.find(tablet.tablet_id()) != _tablet_writers.end()) {
            continue;
        }
        new_writer_cnt++;
        WriteRequest wrequest {
                .tablet_id = tablet.tablet_id(),
                .schema_hash = schema_hash,
                .txn_id = _txn_id,
                .index_id = request.index_id(),
                .partition_id = tablet.partition_id(),
                .load_id = request.id(),
                .tuple_desc = _tuple_desc,
                .slots = index_slots,
                .table_schema_param = _schema,
                .is_high_priority = _is_high_priority,
                .write_file_cache = request.write_file_cache(),
        };

        // TODO(plat1ko): CloudDeltaWriter
        auto writer = std::make_unique<DeltaWriter>(*StorageEngine::instance(), &wrequest, _profile,
                                                    _load_id);
        {
            std::lock_guard<SpinLock> l(_tablet_writers_lock);
            _tablet_writers.emplace(tablet.tablet_id(), std::move(writer));
        }
    }
    // Count only the writers created here (as incremental_open() does), so the counter
    // stays balanced with the destructor even if the map was not empty on entry.
    _s_tablet_writer_count += new_writer_cnt;
    DCHECK_EQ(_tablet_writers.size(), new_writer_cnt);
    return Status::OK();
}
521
522
0
// Cancels every writer and marks the channel finished. If the channel is already
// finished, returns the stored close status instead. Individual cancel results are
// deliberately ignored.
Status BaseTabletsChannel::cancel() {
    std::lock_guard<std::mutex> l(_lock);
    if (_state != kFinished) {
        for (auto& [tablet_id, writer] : _tablet_writers) {
            static_cast<void>(writer->cancel());
        }
        _state = kFinished;
        return Status::OK();
    }
    return _close_status;
}
534
535
0
// Runs the base cancel and, when single-replica write is enabled, also clears the
// delta writers registered for this transaction in the txn manager.
Status TabletsChannel::cancel() {
    RETURN_IF_ERROR(BaseTabletsChannel::cancel());

    if (!_write_single_replica) {
        return Status::OK();
    }
    _engine.txn_manager()->clear_txn_tablet_delta_writer(_txn_id);
    return Status::OK();
}
542
543
0
std::string TabletsChannelKey::to_string() const {
544
0
    std::stringstream ss;
545
0
    ss << *this;
546
0
    return ss.str();
547
0
}
548
549
0
std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key) {
550
0
    os << "(load_id=" << key.id << ", index_id=" << key.index_id << ")";
551
0
    return os;
552
0
}
553
554
// Writes the block carried by `request` into the delta writers of the tablets it
// targets.
//
// Flow: validate state and packet sequence (duplicates are acknowledged with OK and
// skipped), group row indexes by tablet, deserialize the block, write each tablet's
// rows, then advance the sender's expected sequence number.
// A failing tablet write does not fail the call: the error is added to
// `response->tablet_errors`, the writer is cancelled and the tablet is marked broken.
// Errors are returned for a bad state/sequence, a block that fails to deserialize, or
// a tablet id that has no writer.
Status BaseTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request,
                                     PTabletWriterAddBlockResult* response) {
    SCOPED_TIMER(_add_batch_timer);
    int64_t cur_seq = 0;
    _add_batch_number_counter->update(1);

    auto status = _get_current_seq(cur_seq, request);
    if (UNLIKELY(!status.ok())) {
        return status;
    }

    if (request.packet_seq() < cur_seq) {
        LOG(INFO) << "packet has already recept before, expect_seq=" << cur_seq
                  << ", recept_seq=" << request.packet_seq();
        return Status::OK();
    }

    std::unordered_map<int64_t /* tablet_id */, std::vector<uint32_t> /* row index */>
            tablet_to_rowidxs;
    _build_tablet_to_rowidxs(request, &tablet_to_rowidxs);

    vectorized::Block send_data;
    RETURN_IF_ERROR(send_data.deserialize(request.block()));
    CHECK(send_data.rows() == request.tablet_ids_size())
            << "block rows: " << send_data.rows()
            << ", tablet_ids_size: " << request.tablet_ids_size();

    // Account for the block's memory for the duration of this call only.
    g_tablets_channel_send_data_allocated_size << send_data.allocated_bytes();
    Defer defer {
            [&]() { g_tablets_channel_send_data_allocated_size << -send_data.allocated_bytes(); }};

    auto write_tablet_data = [&](int64_t tablet_id,
                                 std::function<Status(BaseDeltaWriter * writer)> write_func) {
        google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
                response->mutable_tablet_errors();

        // add_batch may run concurrently with incremental_open and does not hold _lock,
        // so the lookup is protected by _tablet_writers_lock. Only the raw writer
        // pointer leaves the critical section: an iterator could be invalidated by a
        // concurrent insertion once the lock is released, while the writer object
        // itself is owned by the map entry and is not moved by insertions.
        BaseDeltaWriter* tablet_writer = nullptr;
        {
            std::lock_guard<SpinLock> l(_tablet_writers_lock);
            auto tablet_writer_it = _tablet_writers.find(tablet_id);
            if (tablet_writer_it == _tablet_writers.end()) {
                return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id);
            }
            tablet_writer = tablet_writer_it->second.get();
        }

        Status st = write_func(tablet_writer);
        if (!st.ok()) {
            auto err_msg =
                    fmt::format("tablet writer write failed, tablet_id={}, txn_id={}, err={}",
                                tablet_id, _txn_id, st.to_string());
            LOG(WARNING) << err_msg;
            PTabletError* error = tablet_errors->Add();
            error->set_tablet_id(tablet_id);
            error->set_msg(err_msg);
            static_cast<void>(tablet_writer->cancel_with_status(st));
            _add_broken_tablet(tablet_id);
            // continue write to other tablet.
            // the error will return back to sender.
        }
        return Status::OK();
    };

    SCOPED_TIMER(_write_block_timer);
    for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
        RETURN_IF_ERROR(write_tablet_data(tablet_to_rowidxs_it.first, [&](BaseDeltaWriter* writer) {
            return writer->write(&send_data, tablet_to_rowidxs_it.second);
        }));
    }

    {
        // Only now is the packet considered consumed for this sender.
        std::lock_guard<std::mutex> l(_lock);
        _next_seqs[request.sender_id()] = cur_seq + 1;
    }
    return Status::OK();
}
631
632
0
void BaseTabletsChannel::_add_broken_tablet(int64_t tablet_id) {
633
0
    std::unique_lock<std::shared_mutex> wlock(_broken_tablets_lock);
634
0
    _broken_tablets.insert(tablet_id);
635
0
}
636
637
0
bool BaseTabletsChannel::_is_broken_tablet(int64_t tablet_id) const {
638
0
    return _broken_tablets.find(tablet_id) != _broken_tablets.end();
639
0
}
640
641
void BaseTabletsChannel::_build_tablet_to_rowidxs(
642
        const PTabletWriterAddBlockRequest& request,
643
0
        std::unordered_map<int64_t, std::vector<uint32_t>>* tablet_to_rowidxs) {
644
    // just add a coarse-grained read lock here rather than each time when visiting _broken_tablets
645
    // tests show that a relatively coarse-grained read lock here performs better under multicore scenario
646
    // see: https://github.com/apache/doris/pull/28552
647
0
    std::shared_lock<std::shared_mutex> rlock(_broken_tablets_lock);
648
0
    for (uint32_t i = 0; i < request.tablet_ids_size(); ++i) {
649
0
        if (request.is_single_tablet_block()) {
650
0
            break;
651
0
        }
652
0
        int64_t tablet_id = request.tablet_ids(i);
653
0
        if (_is_broken_tablet(tablet_id)) {
654
            // skip broken tablets
655
0
            VLOG_PROGRESS << "skip broken tablet tablet=" << tablet_id;
656
0
            continue;
657
0
        }
658
0
        auto it = tablet_to_rowidxs->find(tablet_id);
659
0
        if (it == tablet_to_rowidxs->end()) {
660
0
            tablet_to_rowidxs->emplace(tablet_id, std::initializer_list<uint32_t> {i});
661
0
        } else {
662
0
            it->second.emplace_back(i);
663
0
        }
664
0
    }
665
0
}
666
667
} // namespace doris