Coverage Report

Created: 2026-06-09 17:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/channel/tablets_channel.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/channel/tablets_channel.h"
19
20
#include <bvar/bvar.h>
21
#include <fmt/format.h>
22
#include <gen_cpp/internal_service.pb.h>
23
#include <gen_cpp/types.pb.h>
24
25
#include <ctime>
26
27
#include "common/compiler_util.h" // IWYU pragma: keep
28
#include "common/status.h"
29
// IWYU pragma: no_include <bits/chrono.h>
30
#include <chrono> // IWYU pragma: keep
31
#include <initializer_list>
32
#include <optional>
33
#include <set>
34
#include <thread>
35
#include <utility>
36
37
#ifdef DEBUG
38
#include <unordered_set>
39
#endif
40
41
#include "common/logging.h"
42
#include "common/metrics/doris_metrics.h"
43
#include "common/metrics/metrics.h"
44
#include "core/block/block.h"
45
#include "load/channel/load_channel.h"
46
#include "load/delta_writer/delta_writer.h"
47
#include "storage/storage_engine.h"
48
#include "storage/tablet/tablet_manager.h"
49
#include "storage/tablet_info.h"
50
#include "storage/txn/txn_manager.h"
51
#include "util/defer_op.h"
52
53
namespace doris {
54
class SlotDescriptor;
55
56
bvar::Adder<int64_t> g_tablets_channel_send_data_allocated_size(
57
        "tablets_channel_send_data_allocated_size");
58
59
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT);
60
61
std::atomic<uint64_t> BaseTabletsChannel::_s_tablet_writer_count;
62
63
// Common construction for every tablets channel: stores the channel identity and priority,
// builds the optional runtime profile and, once per process, registers the hook that publishes
// _s_tablet_writer_count through the tablet_writer_count gauge.
BaseTabletsChannel::BaseTabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id,
                                       bool is_high_priority, RuntimeProfile* profile)
        : _key(key),
          _state(kInitialized),
          _load_id(load_id),
          _closed_senders(64),
          _is_high_priority(is_high_priority) {
    // Profiling is optional: callers may hand in nullptr.
    if (profile) {
        _init_profile(profile);
    }

    // The hook only reads a static counter, so registering it a single time is enough no matter
    // how many channels are created.
    static std::once_flag s_register_writer_count_hook;
    std::call_once(s_register_writer_count_hook, [] {
        REGISTER_HOOK_METRIC(tablet_writer_count, [&]() { return _s_tablet_writer_count.load(); });
    });
}
78
79
// Tablets channel that writes to the local storage engine.
//
// The base constructor calls _init_profile(profile), but while the base subobject is being
// constructed that call can only reach BaseTabletsChannel::_init_profile — never the
// TabletsChannel override. The TabletsChannel-specific "SlaveReplicaTime" timer (used by close()
// in write_single_replica mode) is therefore created here, on the _profile the base already built.
TabletsChannel::TabletsChannel(StorageEngine& engine, const TabletsChannelKey& key,
                               const UniqueId& load_id, bool is_high_priority,
                               RuntimeProfile* profile)
        : BaseTabletsChannel(key, load_id, is_high_priority, profile), _engine(engine) {
    if (profile != nullptr) {
        // _profile was created by BaseTabletsChannel::_init_profile() in the base constructor.
        _slave_replica_timer = ADD_TIMER(_profile, "SlaveReplicaTime");
    }
}
83
84
0
BaseTabletsChannel::~BaseTabletsChannel() {
85
0
    _s_tablet_writer_count -= _tablet_writers.size();
86
0
}
87
88
0
TabletsChannel::~TabletsChannel() = default;
89
90
// Validates channel state and packet ordering for an add-block request.
//
// On success `cur_seq` holds the next packet sequence expected from request.sender_id().
// Returns:
//  - _close_status if the channel already finished;
//  - InternalError if the channel is not opened yet;
//  - InternalError if sender_id is outside [0, num_senders) given to open();
//  - InternalError("lost data packet") if packet_seq is ahead of the expected sequence.
Status BaseTabletsChannel::_get_current_seq(int64_t& cur_seq,
                                            const PTabletWriterAddBlockRequest& request) {
    std::lock_guard<std::mutex> l(_lock);
    if (_state != kOpened) {
        return _state == kFinished ? _close_status
                                   : Status::InternalError("TabletsChannel {} state: {}",
                                                           _key.to_string(), _state);
    }
    const auto sender_id = request.sender_id();
    // sender_id comes from the RPC and indexes _next_seqs (sized to num_senders in open());
    // reject ids outside that range instead of reading out of bounds.
    if (sender_id < 0 || static_cast<size_t>(sender_id) >= _next_seqs.size()) {
        return Status::InternalError("TabletsChannel {} got invalid sender id {}, num senders: {}",
                                     _key.to_string(), sender_id, _next_seqs.size());
    }
    cur_seq = _next_seqs[sender_id];
    // check packet
    if (request.packet_seq() > cur_seq) {
        LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq
                     << ", recept_seq=" << request.packet_seq();
        return Status::InternalError("lost data packet");
    }
    return Status::OK();
}
107
108
0
// Creates this channel's child profile "TabletsChannel <key>" under `profile` with the
// add-batch counter and timers, plus a "PeakMemoryUsage" child that holds byte-valued
// high-water-mark counters. Creation order is kept as-is because it is the display order.
void BaseTabletsChannel::_init_profile(RuntimeProfile* profile) {
    DCHECK(profile != nullptr);
    const std::string child_name = fmt::format("TabletsChannel {}", _key.to_string());
    _profile = profile->create_child(child_name, true, true);
    _add_batch_number_counter = ADD_COUNTER(_profile, "NumberBatchAdded", TUnit::UNIT);

    auto* peak_mem_profile = _profile->create_child("PeakMemoryUsage", true, true);
    _add_batch_timer = ADD_TIMER(_profile, "AddBatchTime");
    _write_block_timer = ADD_TIMER(_profile, "WriteBlockTime");
    _incremental_open_timer = ADD_TIMER(_profile, "IncrementalOpenTabletTime");

    // Every memory counter is a high-water mark in bytes living under PeakMemoryUsage.
    auto add_peak_bytes = [peak_mem_profile](const char* counter_name) {
        return peak_mem_profile->AddHighWaterMarkCounter(counter_name, TUnit::BYTES);
    };
    _memory_usage_counter = add_peak_bytes("Total");
    _write_memory_usage_counter = add_peak_bytes("Write");
    _flush_memory_usage_counter = add_peak_bytes("Flush");
    _max_tablet_memory_usage_counter = add_peak_bytes("MaxTablet");
    _max_tablet_write_memory_usage_counter = add_peak_bytes("MaxTabletWrite");
    _max_tablet_flush_memory_usage_counter = add_peak_bytes("MaxTabletFlush");
}
128
129
0
// Everything BaseTabletsChannel::_init_profile sets up, plus the "SlaveReplicaTime" timer that
// close() uses while waiting for slave replicas.
void TabletsChannel::_init_profile(RuntimeProfile* profile) {
    DCHECK(nullptr != profile);
    BaseTabletsChannel::_init_profile(profile); // creates and assigns _profile
    _slave_replica_timer = ADD_TIMER(_profile, "SlaveReplicaTime");
}
134
135
0
// Opens the channel on its first call.
//
// The first call records txn/index ids, initializes the schema, sizes the per-sender bookkeeping,
// creates a delta writer for every tablet in `request`, and sets up receiver-side random-bucket
// state. Any later call returns OK and does nothing. That covers a channel already opened by
// another sender and a channel already finished (e.g. cancelled by another sender).
//
// A tablets channel on the receiver serves the VNodeChannels of the senders (each sender
// instance has one or none). How many sender closes finish the channel depends on how it
// was opened:
//  1. Directly through open(). The partitions were broadcast by FE, so every sender knows them
//     from the start, and _open_by_incremental stays false. _num_remaining_senders is set to
//     num_senders here, and each sender's close counts it down. For auto-partition tables the
//     close of such channels is held on the receiver and returns together, which avoids the
//     close-then-incremental-open problem.
//  2. First through incremental_open(). A sender's sink found a new partition, so only that
//     sender knows this channel at first. The final sender count depends on data distribution.
//     _open_by_incremental is true, and _num_remaining_senders is not set here. Instead,
//     incremental_open() increments it for every new sender. The channel still needs the same
//     number of closes, but the close does not hang.
Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) {
    std::lock_guard<std::mutex> l(_lock);
    const bool already_handled = (_state == kOpened) || (_state == kFinished);
    if (already_handled) {
        return Status::OK();
    }

    _txn_id = request.txn_id();
    _index_id = request.index_id();
    _schema = std::make_shared<OlapTableSchemaParam>();
    RETURN_IF_ERROR(_schema->init(request.schema()));
    _tuple_desc = _schema->tuple_desc();

    const int num_senders = request.num_senders();
    if (!_open_by_incremental) {
        _num_remaining_senders = num_senders;
    } else {
        // incremental_open() owns the sender count of channels it opened.
        DCHECK(_num_remaining_senders == 0) << _num_remaining_senders;
    }
    LOG(INFO) << fmt::format(
            "open tablets channel {}, tablets num: {} timeout(s): {}, init senders {} with "
            "incremental {}",
            _key.to_string(), request.tablets().size(), request.load_channel_timeout_s(),
            _num_remaining_senders, _open_by_incremental ? "on" : "off");

    // Sized by num_senders in both modes: for incrementally opened channels the final number of
    // senders is not known yet.
    _next_seqs.resize(num_senders, 0);
    _closed_senders.Reset(num_senders);

    RETURN_IF_ERROR(_open_all_writers(request));
    RETURN_IF_ERROR(_init_receiver_side_random_bucket_state(request));

    _state = kOpened;
    return Status::OK();
}
182
183
0
// Adds delta writers for tablets in `params` that this channel does not have yet, e.g. for
// partitions created on the fly by auto-partition loads.
//
// If the channel was never opened, it is first opened through open() in incremental mode. In that
// mode every distinct sender calling this method adds one to the number of senders that must
// close the channel; requests without sender_id are counted every time (compatibility).
//
// Returns _close_status without changing anything if the channel has already finished (closed or
// cancelled): a finished channel must not get new writers or be switched back to kOpened, and a
// channel cancelled before it was opened has no schema to build writers from.
Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& params) {
    SCOPED_TIMER(_incremental_open_timer);

    // Decide under _lock whether this call performs the first open, so reading _state and setting
    // _open_by_incremental cannot race with a concurrent open()/close()/cancel(). open() acquires
    // _lock itself, therefore it is invoked after this guard is released.
    bool first_open = false;
    {
        std::lock_guard<std::mutex> l(_lock);
        if (_state == kInitialized) {
            _open_by_incremental = true;
            first_open = true;
        }
    }
    if (first_open) {
        RETURN_IF_ERROR(open(params));
    }

    std::lock_guard<std::mutex> l(_lock);

    if (_state == kFinished) {
        return _close_status;
    }

    // one sender may incremental_open many times. but only close one time. so dont count duplicately.
    if (_open_by_incremental) {
        if (params.has_sender_id() && !_recieved_senders.contains(params.sender_id())) {
            _recieved_senders.insert(params.sender_id());
            _num_remaining_senders++;
        } else if (!params.has_sender_id()) { // for compatible
            _num_remaining_senders++;
        }
        VLOG_DEBUG << fmt::format("txn {}: TabletsChannel {} inc senders to {}", _txn_id, _index_id,
                                  _num_remaining_senders);
    }

    std::vector<SlotDescriptor*>* index_slots = nullptr;
    int32_t schema_hash = 0;

    for (const auto& index : _schema->indexes()) {
        if (index->index_id == _index_id) {
            index_slots = &index->slots;
            schema_hash = index->schema_hash;
            break;
        }
    }
    if (index_slots == nullptr) {
        return Status::InternalError("unknown index id, key={}", _key.to_string());
    }
    // update tablets
    size_t incremental_tablet_num = 0;
    std::stringstream ss;
    ss << "LocalTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(params.id())
       << " incremental open delta writer: ";

    // Every change of _tablet_writers happens under _lock and this lookup holds _lock as well,
    // so reading the map here needs no _tablet_writers_lock.
    for (const auto& tablet : params.tablets()) {
        if (_tablet_writers.find(tablet.tablet_id()) != _tablet_writers.end()) {
            continue;
        }
        incremental_tablet_num++;

        WriteRequest wrequest;
        wrequest.index_id = params.index_id();
        wrequest.tablet_id = tablet.tablet_id();
        wrequest.schema_hash = schema_hash;
        wrequest.txn_id = _txn_id;
        wrequest.partition_id = tablet.partition_id();
        wrequest.load_id = params.id();
        wrequest.tuple_desc = _tuple_desc;
        wrequest.slots = index_slots;
        wrequest.is_high_priority = _is_high_priority;
        wrequest.table_schema_param = _schema;
        wrequest.txn_expiration = params.txn_expiration(); // Required by CLOUD.
        wrequest.write_file_cache = params.write_file_cache();
        wrequest.storage_vault_id = params.storage_vault_id();
        wrequest.enable_table_memtable_backpressure = params.is_receiver_side_random_bucket();

        auto delta_writer = create_delta_writer(wrequest);
        {
            // Insertion also takes _tablet_writers_lock: refresh_profile() reads the map under
            // that lock only.
            std::lock_guard<std::mutex> lt(_tablet_writers_lock);
            _tablet_writers.emplace(tablet.tablet_id(), std::move(delta_writer));
        }

        ss << "[" << tablet.tablet_id() << "]";
    }

    _s_tablet_writer_count += incremental_tablet_num;
    LOG(INFO) << ss.str();
    RETURN_IF_ERROR(_init_receiver_side_random_bucket_state(params));

    _state = kOpened;
    return Status::OK();
}
265
266
// Prepares adaptive random-bucket state for receiver-side random bucketing.
//
// Does nothing unless the request enables receiver-side random bucketing and carries tablets.
// Otherwise it creates _adaptive_random_bucket_state if needed and rebuilds
// _random_bucket_partition_params from request.random_bucket_partitions(). It then calls
// init_partition() for every partition with identity positions 0..n-1 over its ordered tablet
// ids and 0 as the last argument (meaning defined by AdaptiveRandomBucketState::init_partition).
// Returns InternalError if a partition has no ordered tablet ids.
Status BaseTabletsChannel::_init_receiver_side_random_bucket_state(
        const PTabletWriterOpenRequest& request) {
    const bool enabled = request.is_receiver_side_random_bucket() && !request.tablets().empty();
    if (!enabled) {
        return Status::OK();
    }
    if (!_adaptive_random_bucket_state) {
        _adaptive_random_bucket_state = std::make_shared<AdaptiveRandomBucketState>(_load_id);
    }

    _random_bucket_partition_params.clear();
    _random_bucket_partition_params.reserve(request.random_bucket_partitions_size());
    for (const auto& pb_partition : request.random_bucket_partitions()) {
        RandomBucketPartitionParam partition_param;
        const auto& pb_tablet_ids = pb_partition.ordered_tablet_ids();
        partition_param.ordered_tablet_ids.assign(pb_tablet_ids.begin(), pb_tablet_ids.end());
        _random_bucket_partition_params.emplace(pb_partition.partition_id(),
                                                std::move(partition_param));
    }

    for (const auto& [partition_id, partition_param] : _random_bucket_partition_params) {
        const auto& tablet_ids = partition_param.ordered_tablet_ids;
        if (tablet_ids.empty()) {
            return Status::InternalError(
                    "ordered_tablet_ids is empty for receiver-side random bucket, load_id={}, "
                    "partition_id={}",
                    print_id(_load_id), partition_id);
        }
        // Identity positions: one entry per ordered tablet id, valued 0..n-1.
        std::vector<int32_t> positions(tablet_ids.size());
        for (size_t pos = 0; pos < positions.size(); ++pos) {
            positions[pos] = cast_set<int32_t>(pos);
        }
        _adaptive_random_bucket_state->init_partition(partition_id, tablet_ids, positions, 0);
    }
    return Status::OK();
}
302
303
0
std::unique_ptr<BaseDeltaWriter> TabletsChannel::create_delta_writer(const WriteRequest& request) {
304
0
    DCHECK(request.write_req_type == WriteRequestType::DATA);
305
0
    DCHECK(request.table_schema_param != nullptr);
306
307
0
    int64_t row_binlog_index_id = 0;
308
0
    for (const auto* index_schema : request.table_schema_param->indexes()) {
309
0
        if (index_schema->index_id == request.index_id) {
310
0
            row_binlog_index_id = index_schema->row_binlog_id;
311
0
            break;
312
0
        }
313
0
    }
314
0
    if (row_binlog_index_id <= 0) {
315
0
        return std::make_unique<DeltaWriter>(_engine, request, _profile, _load_id);
316
0
    }
317
318
0
    const auto* row_binlog_index_schema = request.table_schema_param->row_binlog_index_schema();
319
0
    DCHECK(row_binlog_index_schema != nullptr);
320
0
    DCHECK(row_binlog_index_schema->index_id == row_binlog_index_id);
321
322
    // group_build_req is only for the group wrapper itself. It provides the group semantics and
323
    // metadata used by BaseDeltaWriter/GroupRowsetBuilder to expose tablet_id, txn_id,
324
    // partition_id, load_id and profile information, while concrete rowset builders use the
325
    // sub requests below.
326
0
    WriteRequest group_build_req = request;
327
0
    group_build_req.write_req_type = WriteRequestType::GROUP;
328
329
0
    WriteRequest sub_data_req = request;
330
0
    sub_data_req.write_req_type = WriteRequestType::DATA;
331
332
0
    WriteRequest sub_row_binlog_req = request;
333
0
    sub_row_binlog_req.write_req_type = WriteRequestType::ROW_BINLOG;
334
0
    sub_row_binlog_req.index_id = row_binlog_index_schema->index_id;
335
0
    sub_row_binlog_req.schema_hash = row_binlog_index_schema->schema_hash;
336
337
0
    return std::make_unique<DeltaWriter>(_engine, group_build_req, sub_data_req, sub_row_binlog_req,
338
0
                                         _profile, _load_id);
339
0
}
340
341
// Handles the close request of one sender.
//
// Until the last sender closes, this only records the sender and the partitions it reports.
// The last close marks the channel kFinished and then runs these steps:
//   1. Close writers of the reported partitions and cancel all other writers.
//   2. Wait for the closed writers to flush.
//   3. Build rowsets and submit delete-bitmap calculation.
//   4. Wait for delete-bitmap calculation.
//   5. Commit the txn of every remaining writer.
// In write_single_replica mode it then polls slave replicas until all are done, or until
// 0.9 * parent->timeout() seconds have passed since parent->last_updated_time().
// Per-tablet failures go to res->tablet_errors and committed tablets to res->tablet_vec.
// *finished is true once every sender has closed. A flush failure aborts the close; it is also
// stored in _close_status so that later close() calls report it instead of OK.
Status TabletsChannel::close(LoadChannel* parent, const PTabletWriterAddBlockRequest& req,
                             PTabletWriterAddBlockResult* res, bool* finished) {
    int sender_id = req.sender_id();
    int64_t backend_id = req.backend_id();
    const auto& partition_ids = req.partition_ids();
    auto* tablet_errors = res->mutable_tablet_errors();
    std::lock_guard<std::mutex> l(_lock);
    if (_state == kFinished) {
        return _close_status;
    }
    if (_closed_senders.Get(sender_id)) {
        // Double close from one sender, just return OK
        *finished = (_num_remaining_senders == 0);
        return _close_status;
    }

    for (auto pid : partition_ids) {
        _partition_ids.emplace(pid);
    }
    _closed_senders.Set(sender_id, true);
    _num_remaining_senders--;
    *finished = (_num_remaining_senders == 0);

    LOG(INFO) << fmt::format(
            "txn {}: close tablets channel of index {} , sender id: {}, backend {}, remain "
            "senders: {}",
            _txn_id, _index_id, sender_id, backend_id, _num_remaining_senders);

    if (!*finished) {
        return Status::OK();
    }

    _state = kFinished;
    // All senders are closed
    // 1. close all delta writers
    std::set<DeltaWriter*> need_wait_writers;
    // under _lock. no need _tablet_writers_lock again.
    for (auto&& [tablet_id, writer] : _tablet_writers) {
        if (_partition_ids.contains(writer->partition_id())) {
            auto st = writer->close();
            if (!st.ok()) {
                auto err_msg = fmt::format(
                        "close tablet writer failed, tablet_id={}, "
                        "transaction_id={}, err={}",
                        tablet_id, _txn_id, st.to_string());
                LOG(WARNING) << err_msg;
                PTabletError* tablet_error = tablet_errors->Add();
                tablet_error->set_tablet_id(tablet_id);
                tablet_error->set_msg(st.to_string());
                // just skip this tablet(writer) and continue to close others
                continue;
            }
            // tablet writer in `_broken_tablets` should not call `build_rowset` and
            // `commit_txn` method, after that, the publish-version task will success,
            // which can cause the replica inconsistency.
            if (_is_broken_tablet(writer->tablet_id())) {
                LOG(WARNING) << "SHOULD NOT HAPPEN, tablet writer is broken but not cancelled"
                             << ", tablet_id=" << tablet_id << ", transaction_id=" << _txn_id;
                continue;
            }
            need_wait_writers.insert(static_cast<DeltaWriter*>(writer.get()));
        } else {
            auto st = writer->cancel();
            if (!st.ok()) {
                LOG(WARNING) << "cancel tablet writer failed, tablet_id=" << tablet_id
                             << ", transaction_id=" << _txn_id;
                // just skip this tablet(writer) and continue to close others
                continue;
            }
            VLOG_PROGRESS << "cancel tablet writer successfully, tablet_id=" << tablet_id
                          << ", transaction_id=" << _txn_id;
        }
    }

    _write_single_replica = req.write_single_replica();

    // 2. wait all writer finished flush.
    for (auto* writer : need_wait_writers) {
        Status st = writer->wait_flush();
        if (!st.ok()) {
            // _state is already kFinished, so any later close() returns _close_status; keep the
            // failure there instead of letting a retried close report OK.
            _close_status = st;
            return st;
        }
    }

    // 3. build rowset
    for (auto it = need_wait_writers.begin(); it != need_wait_writers.end();) {
        Status st = (*it)->build_rowset();
        if (!st.ok()) {
            _add_error_tablet(tablet_errors, (*it)->tablet_id(), st);
            it = need_wait_writers.erase(it);
            continue;
        }
        // 3.1 calculate delete bitmap for Unique Key MoW tables
        st = (*it)->submit_calc_delete_bitmap_task();
        if (!st.ok()) {
            _add_error_tablet(tablet_errors, (*it)->tablet_id(), st);
            it = need_wait_writers.erase(it);
            continue;
        }
        ++it;
    }

    // 4. wait for delete bitmap calculation complete if necessary
    for (auto it = need_wait_writers.begin(); it != need_wait_writers.end();) {
        Status st = (*it)->wait_calc_delete_bitmap();
        if (!st.ok()) {
            _add_error_tablet(tablet_errors, (*it)->tablet_id(), st);
            it = need_wait_writers.erase(it);
            continue;
        }
        ++it;
    }

    // 5. commit all writers
    for (auto* writer : need_wait_writers) {
        // close may return failed, but no need to handle it here.
        // tablet_vec will only contains success tablet, and then let FE judge it.
        _commit_txn(writer, req, res);
    }

    if (_write_single_replica) {
        auto* success_slave_tablet_node_ids = res->mutable_success_slave_tablet_node_ids();
        // The operation waiting for all slave replicas to complete must end before the timeout,
        // so that there is enough time to collect completed replica. Otherwise, the task may
        // timeout and fail even though most of the replicas are completed. Here we set 0.9
        // times the timeout as the maximum waiting time.
        SCOPED_TIMER(_slave_replica_timer);
        while (!need_wait_writers.empty() &&
               (time(nullptr) - parent->last_updated_time()) < (parent->timeout() * 0.9)) {
            for (auto it = need_wait_writers.begin(); it != need_wait_writers.end();) {
                if ((*it)->check_slave_replicas_done(success_slave_tablet_node_ids)) {
                    it = need_wait_writers.erase(it);
                } else {
                    ++it;
                }
            }
            // Only back off when some replicas are still pending; otherwise the loop ends now.
            if (!need_wait_writers.empty()) {
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }
        }
        for (auto* writer : need_wait_writers) {
            writer->add_finished_slave_replicas(success_slave_tablet_node_ids);
        }
        _engine.txn_manager()->clear_txn_tablet_delta_writer(_txn_id);
    }

    return Status::OK();
}
487
488
void TabletsChannel::_commit_txn(DeltaWriter* writer, const PTabletWriterAddBlockRequest& req,
489
0
                                 PTabletWriterAddBlockResult* res) {
490
0
    PSlaveTabletNodes slave_nodes;
491
0
    if (_write_single_replica) {
492
0
        auto& nodes_map = req.slave_tablet_nodes();
493
0
        auto it = nodes_map.find(writer->tablet_id());
494
0
        if (it != nodes_map.end()) {
495
0
            slave_nodes = it->second;
496
0
        }
497
0
    }
498
0
    Status st = writer->commit_txn(slave_nodes);
499
0
    if (st.ok()) [[likely]] {
500
0
        auto* tablet_vec = res->mutable_tablet_vec();
501
0
        PTabletInfo* tablet_info = tablet_vec->Add();
502
0
        tablet_info->set_tablet_id(writer->tablet_id());
503
        // unused required field.
504
0
        tablet_info->set_schema_hash(0);
505
0
        tablet_info->set_received_rows(writer->total_received_rows());
506
0
        tablet_info->set_num_rows_filtered(writer->num_rows_filtered());
507
0
        _total_received_rows += writer->total_received_rows();
508
0
        _num_rows_filtered += writer->num_rows_filtered();
509
0
    } else {
510
0
        _add_error_tablet(res->mutable_tablet_errors(), writer->tablet_id(), st);
511
0
    }
512
0
}
513
514
void BaseTabletsChannel::_add_error_tablet(
515
        google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors, int64_t tablet_id,
516
0
        Status error) const {
517
0
    PTabletError* tablet_error = tablet_errors->Add();
518
0
    tablet_error->set_tablet_id(tablet_id);
519
0
    tablet_error->set_msg(error.to_string());
520
0
    VLOG_PROGRESS << "close wait failed tablet " << tablet_id << " transaction_id " << _txn_id
521
0
                  << "err msg " << error;
522
0
}
523
524
0
void BaseTabletsChannel::refresh_profile() {
525
0
    int64_t write_mem_usage = 0;
526
0
    int64_t flush_mem_usage = 0;
527
0
    int64_t max_tablet_mem_usage = 0;
528
0
    int64_t max_tablet_write_mem_usage = 0;
529
0
    int64_t max_tablet_flush_mem_usage = 0;
530
0
    {
531
0
        std::lock_guard<std::mutex> l(_tablet_writers_lock);
532
0
        for (auto&& [tablet_id, writer] : _tablet_writers) {
533
0
            int64_t write_mem = writer->mem_consumption(MemType::WRITE_FINISHED);
534
0
            write_mem_usage += write_mem;
535
0
            int64_t flush_mem = writer->mem_consumption(MemType::FLUSH);
536
0
            flush_mem_usage += flush_mem;
537
0
            if (write_mem > max_tablet_write_mem_usage) {
538
0
                max_tablet_write_mem_usage = write_mem;
539
0
            }
540
0
            if (flush_mem > max_tablet_flush_mem_usage) {
541
0
                max_tablet_flush_mem_usage = flush_mem;
542
0
            }
543
0
            if (write_mem + flush_mem > max_tablet_mem_usage) {
544
0
                max_tablet_mem_usage = write_mem + flush_mem;
545
0
            }
546
0
        }
547
0
    }
548
0
    COUNTER_SET(_memory_usage_counter, write_mem_usage + flush_mem_usage);
549
0
    COUNTER_SET(_write_memory_usage_counter, write_mem_usage);
550
0
    COUNTER_SET(_flush_memory_usage_counter, flush_mem_usage);
551
0
    COUNTER_SET(_max_tablet_memory_usage_counter, max_tablet_mem_usage);
552
0
    COUNTER_SET(_max_tablet_write_memory_usage_counter, max_tablet_write_mem_usage);
553
0
    COUNTER_SET(_max_tablet_flush_memory_usage_counter, max_tablet_flush_mem_usage);
554
0
}
555
556
0
// Creates one delta writer per distinct tablet in `request` for index _index_id.
//
// Called from open() while _lock is held. _tablet_writers_lock is still taken for each insertion
// because refresh_profile() reads the map under that lock only. Only the writers created here
// are added to _s_tablet_writer_count, as incremental_open() also does.
// Returns InternalError if _index_id is not part of the schema.
Status BaseTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request) {
    std::vector<SlotDescriptor*>* index_slots = nullptr;
    int32_t schema_hash = 0;
    for (const auto& index : _schema->indexes()) {
        if (index->index_id == _index_id) {
            index_slots = &index->slots;
            schema_hash = index->schema_hash;
            break;
        }
    }
    if (index_slots == nullptr) {
        return Status::InternalError("unknown index id, key={}", _key.to_string());
    }

#ifdef DEBUG
    // check: tablet ids should be unique
    {
        std::unordered_set<int64_t> tablet_ids;
        for (const auto& tablet : request.tablets()) {
            CHECK(tablet_ids.insert(tablet.tablet_id()).second)
                    << "found duplicate tablet id: " << tablet.tablet_id();
        }
    }
#endif

    size_t tablet_cnt = 0;
    // under _lock. no need _tablet_writers_lock again.
    for (const auto& tablet : request.tablets()) {
        if (_tablet_writers.find(tablet.tablet_id()) != _tablet_writers.end()) {
            continue;
        }
        tablet_cnt++;
        WriteRequest wrequest {
                .tablet_id = tablet.tablet_id(),
                .schema_hash = schema_hash,
                .txn_id = _txn_id,
                .txn_expiration = request.txn_expiration(), // Required by CLOUD.
                .index_id = request.index_id(),
                .partition_id = tablet.partition_id(),
                .load_id = request.id(),
                .tuple_desc = _tuple_desc,
                .slots = index_slots,
                .table_schema_param = _schema,
                .is_high_priority = _is_high_priority,
                .write_file_cache = request.write_file_cache(),
                .storage_vault_id = request.storage_vault_id(),
                .enable_table_memtable_backpressure = request.is_receiver_side_random_bucket(),
        };

        auto delta_writer = create_delta_writer(wrequest);
        {
            std::lock_guard<std::mutex> l(_tablet_writers_lock);
            _tablet_writers.emplace(tablet.tablet_id(), std::move(delta_writer));
        }
    }
    // Count exactly the writers created by this call so the process-wide gauge stays in step
    // with what ~BaseTabletsChannel() subtracts (the final _tablet_writers size).
    _s_tablet_writer_count += tablet_cnt;
    DCHECK_EQ(_tablet_writers.size(), tablet_cnt);
    return Status::OK();
}
616
617
0
// Cancels every tablet writer of this channel and moves the channel to kFinished.
// If the channel is already finished, the stored close status is returned and the
// writers are left untouched. Each writer's cancel() result is discarded, so one
// failing writer does not stop the others from being cancelled.
Status BaseTabletsChannel::cancel() {
    std::lock_guard<std::mutex> guard(_lock);
    if (_state != kFinished) {
        for (auto& entry : _tablet_writers) {
            static_cast<void>(entry.second->cancel());
        }
        _state = kFinished;
        return Status::OK();
    }
    return _close_status;
}
629
630
0
// Runs the base-class cancellation. For single-replica writes, it then asks the txn manager
// to clear the delta writers registered for this transaction.
// A failure from the base cancellation is returned unchanged, and the txn manager is not touched.
Status TabletsChannel::cancel() {
    RETURN_IF_ERROR(BaseTabletsChannel::cancel());
    if (!_write_single_replica) {
        return Status::OK();
    }
    _engine.txn_manager()->clear_txn_tablet_delta_writer(_txn_id);
    return Status::OK();
}
637
638
0
std::string TabletsChannelKey::to_string() const {
639
0
    std::stringstream ss;
640
0
    ss << *this;
641
0
    return ss.str();
642
0
}
643
644
0
std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key) {
645
0
    os << "(load_id=" << key.id << ", index_id=" << key.index_id << ")";
646
0
    return os;
647
0
}
648
649
// Deserializes the block in `request` and hands each tablet in `tablet_to_rowidxs` its rows
// (listed by row index) through that tablet's delta writer.
//
// A failed tablet write does not abort the request. The error is appended to
// `response->tablet_errors`, the writer is cancelled, the tablet is marked broken, and the
// loop moves on to the remaining tablets.
//
// The request IS aborted with InternalError in three cases:
//   * the block's row count does not match the request's id list;
//   * a tablet has no writer;
//   * a memtable flushes under receiver-side random bucketing and the adaptive bucket state is missing.
//
// On success, the next expected sequence for `request.sender_id()` becomes `cur_seq + 1`.
Status BaseTabletsChannel::_write_block_data(
        const PTabletWriterAddBlockRequest& request, int64_t cur_seq,
        std::unordered_map<int64_t, DorisVector<uint32_t>>& tablet_to_rowidxs,
        PTabletWriterAddBlockResult* response) {
    Block send_data;
    [[maybe_unused]] size_t uncompressed_size = 0;
    [[maybe_unused]] int64_t uncompressed_time = 0;
    RETURN_IF_ERROR(send_data.deserialize(request.block(), &uncompressed_size, &uncompressed_time));
    int request_rows = request.is_receiver_side_random_bucket() ? request.partition_ids_size()
                                                                : request.tablet_ids_size();
    if (send_data.rows() != request_rows) {
        return Status::InternalError(
                "invalid add block request row count, load_id={}, index_id={}, packet_seq={}, "
                "block_rows={}, request_rows={}",
                print_id(_load_id), _index_id, request.packet_seq(), send_data.rows(),
                request_rows);
    }

    g_tablets_channel_send_data_allocated_size << send_data.allocated_bytes();
    Defer defer {
            [&]() { g_tablets_channel_send_data_allocated_size << -send_data.allocated_bytes(); }};

    // add_batch may run concurrently with incremental open, which inserts into _tablet_writers
    // without holding _lock. The lookup therefore happens under _tablet_writers_lock. Only the
    // element's raw pointer leaves the critical section: unordered_map iterators can be
    // invalidated by a concurrent rehash, but element addresses stay valid.
    auto find_writer = [&](int64_t tablet_id) -> BaseDeltaWriter* {
        std::lock_guard<std::mutex> l(_tablet_writers_lock);
        auto it = _tablet_writers.find(tablet_id);
        return it == _tablet_writers.end() ? nullptr : it->second.get();
    };

    SCOPED_TIMER(_write_block_timer);
    auto* tablet_errors = response->mutable_tablet_errors();
    auto* tablet_load_infos = response->mutable_tablet_load_rowset_num_infos();
    for (const auto& [tablet_id, row_idxs] : tablet_to_rowidxs) {
        BaseDeltaWriter* writer = find_writer(tablet_id);
        if (writer == nullptr) {
            return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id);
        }

        bool memtable_flushed = false;
        Status st = writer->write(&send_data, row_idxs, &memtable_flushed);
        if (!st.ok()) {
            auto err_msg =
                    fmt::format("tablet writer write failed, tablet_id={}, txn_id={}, err={}",
                                tablet_id, _txn_id, st.to_string());
            LOG(WARNING) << err_msg;
            PTabletError* error = tablet_errors->Add();
            error->set_tablet_id(tablet_id);
            error->set_msg(err_msg);
            static_cast<void>(writer->cancel_with_status(st));
            _add_broken_tablet(tablet_id);
            // Keep writing the other tablets; the sender learns about this one via tablet_errors.
        }

        if (memtable_flushed && request.is_receiver_side_random_bucket()) {
            if (_adaptive_random_bucket_state == nullptr) {
                return Status::InternalError(
                        "receiver-side random bucket state is not initialized, load_id={}, "
                        "index_id={}, packet_seq={}",
                        print_id(_load_id), _index_id, request.packet_seq());
            }
            _adaptive_random_bucket_state->rotate_by_tablet(tablet_id);
        }

        writer->set_tablet_load_rowset_num_info(tablet_load_infos);
    }

    {
        std::lock_guard<std::mutex> l(_lock);
        _next_seqs[request.sender_id()] = cur_seq + 1;
    }
    return Status::OK();
}
733
734
0
std::shared_ptr<std::mutex> BaseTabletsChannel::_get_partition_route_lock(int64_t partition_id) {
735
0
    std::lock_guard<std::mutex> l(_partition_route_locks_lock);
736
0
    auto& lock = _partition_route_locks[partition_id];
737
0
    if (lock == nullptr) {
738
0
        lock = std::make_shared<std::mutex>();
739
0
    }
740
0
    return lock;
741
0
}
742
743
// Receiver-side random bucketing: the sender supplies only partition ids. For each partition,
// this function asks the adaptive bucket state for the partition's current tablet, writes all
// of the partition's rows to that tablet, and rotates to another tablet whenever the write
// reports a memtable flush.
//
// For each partition, the route lookup, the write and any rotation all happen under the
// partition's route lock.
// Rows routed to an already-broken tablet are skipped with an INFO log. A failed write records
// a tablet error, cancels the writer and marks the tablet broken; the request still continues.
// On success, the next expected sequence for `request.sender_id()` becomes `cur_seq + 1`.
Status BaseTabletsChannel::_write_block_data_for_receiver_side_random_bucket(
        const PTabletWriterAddBlockRequest& request, int64_t cur_seq,
        std::unordered_map<int64_t, DorisVector<uint32_t>>& partition_to_rowidxs,
        PTabletWriterAddBlockResult* response) {
    Block received_block;
    [[maybe_unused]] size_t uncompressed_size = 0;
    [[maybe_unused]] int64_t uncompressed_time = 0;
    RETURN_IF_ERROR(
            received_block.deserialize(request.block(), &uncompressed_size, &uncompressed_time));
    const auto partition_id_count = request.partition_ids_size();
    if (received_block.rows() != partition_id_count) {
        return Status::InternalError(
                "invalid receiver-side random bucket add block request row count, load_id={}, "
                "index_id={}, packet_seq={}, block_rows={}, partition_ids_size={}",
                print_id(_load_id), _index_id, request.packet_seq(), received_block.rows(),
                partition_id_count);
    }

    // Record every partition touched by this request.
    {
        std::lock_guard<std::mutex> guard(_lock);
        for (const auto& entry : partition_to_rowidxs) {
            _partition_ids.emplace(entry.first);
        }
    }

    g_tablets_channel_send_data_allocated_size << received_block.allocated_bytes();
    Defer defer {[&]() {
        g_tablets_channel_send_data_allocated_size << -received_block.allocated_bytes();
    }};

    auto* errors_out = response->mutable_tablet_errors();
    auto* rowset_infos_out = response->mutable_tablet_load_rowset_num_infos();

    // Routes and writes the rows of one partition while holding that partition's route lock.
    auto route_and_write = [&](int64_t partition_id,
                               const DorisVector<uint32_t>& row_idxs) -> Status {
        std::shared_ptr<std::mutex> route_lock = _get_partition_route_lock(partition_id);
        std::lock_guard<std::mutex> route_guard(*route_lock);

        if (_adaptive_random_bucket_state == nullptr) {
            return Status::InternalError(
                    "receiver-side random bucket state is not initialized, load_id={}, "
                    "index_id={}, packet_seq={}, partition_id={}",
                    print_id(_load_id), _index_id, request.packet_seq(), partition_id);
        }
        const int64_t tablet_id = _adaptive_random_bucket_state->current_tablet(partition_id);
        if (tablet_id < 0) {
            return Status::InternalError(
                    "invalid current tablet for receiver-side random bucket, load_id={}, "
                    "index_id={}, packet_seq={}, partition_id={}",
                    print_id(_load_id), _index_id, request.packet_seq(), partition_id);
        }
        VLOG_DEBUG << "FIND_TABLET_RANDOM_BUCKET: route+write begin"
                   << ", load_id=" << _load_id << ", index_id=" << _index_id
                   << ", sender_id=" << request.sender_id()
                   << ", packet_seq=" << request.packet_seq() << ", partition_id=" << partition_id
                   << ", tablet_id=" << tablet_id << ", row_count=" << row_idxs.size();

        bool tablet_is_broken = false;
        {
            std::shared_lock<std::shared_mutex> broken_rlock(_broken_tablets_lock);
            tablet_is_broken = _is_broken_tablet(tablet_id);
        }
        if (tablet_is_broken) {
            LOG(INFO) << "FIND_TABLET_RANDOM_BUCKET: skip broken tablet"
                      << ", load_id=" << _load_id << ", index_id=" << _index_id
                      << ", sender_id=" << request.sender_id()
                      << ", packet_seq=" << request.packet_seq()
                      << ", partition_id=" << partition_id << ", tablet_id=" << tablet_id;
            return Status::OK();
        }

        // Resolve the writer under _tablet_writers_lock; only its raw pointer escapes.
        BaseDeltaWriter* writer = nullptr;
        {
            std::lock_guard<std::mutex> writers_guard(_tablet_writers_lock);
            const auto found = _tablet_writers.find(tablet_id);
            if (found == _tablet_writers.end()) {
                return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id);
            }
            writer = found->second.get();
        }

        bool memtable_flushed = false;
        Status write_status = writer->write(&received_block, row_idxs, &memtable_flushed);
        if (!write_status.ok()) {
            auto err_msg =
                    fmt::format("tablet writer write failed, tablet_id={}, txn_id={}, err={}",
                                tablet_id, _txn_id, write_status.to_string());
            LOG(WARNING) << err_msg;
            PTabletError* error = errors_out->Add();
            error->set_tablet_id(tablet_id);
            error->set_msg(err_msg);
            static_cast<void>(writer->cancel_with_status(write_status));
            _add_broken_tablet(tablet_id);
            return Status::OK();
        }

        VLOG_DEBUG << "FIND_TABLET_RANDOM_BUCKET: route+write done"
                   << ", load_id=" << _load_id << ", index_id=" << _index_id
                   << ", sender_id=" << request.sender_id()
                   << ", packet_seq=" << request.packet_seq() << ", partition_id=" << partition_id
                   << ", tablet_id=" << tablet_id << ", row_count=" << row_idxs.size()
                   << ", memtable_flushed=" << memtable_flushed;
        if (memtable_flushed) {
            _adaptive_random_bucket_state->rotate_by_tablet(tablet_id);
        }
        writer->set_tablet_load_rowset_num_info(rowset_infos_out);
        return Status::OK();
    };

    SCOPED_TIMER(_write_block_timer);
    for (const auto& entry : partition_to_rowidxs) {
        RETURN_IF_ERROR(route_and_write(entry.first, entry.second));
    }

    {
        std::lock_guard<std::mutex> guard(_lock);
        _next_seqs[request.sender_id()] = cur_seq + 1;
    }
    return Status::OK();
}
858
859
// Groups row indexes of a receiver-side random bucket request by partition id.
// Row i belongs to `request.partition_ids(i)`, and rows are appended in ascending order.
// Returns InternalError if the adaptive bucket state is missing or the request carries no
// partition ids.
Status BaseTabletsChannel::_build_partition_to_rowidxs_for_receiver_side_random_bucket(
        const PTabletWriterAddBlockRequest& request,
        std::unordered_map<int64_t, DorisVector<uint32_t>>* partition_to_rowidxs) {
    if (_adaptive_random_bucket_state == nullptr) {
        return Status::InternalError(
                "receiver-side random bucket state is not initialized, load_id={}, index_id={}, "
                "packet_seq={}",
                print_id(_load_id), _index_id, request.packet_seq());
    }
    const int row_count = request.partition_ids_size();
    if (row_count == 0) {
        return Status::InternalError(
                "empty partition ids for receiver-side random bucket add block, load_id={}, "
                "index_id={}, packet_seq={}",
                print_id(_load_id), _index_id, request.packet_seq());
    }
    for (uint32_t row = 0; row < static_cast<uint32_t>(row_count); ++row) {
        (*partition_to_rowidxs)[request.partition_ids(row)].emplace_back(row);
    }
    return Status::OK();
}
885
886
// Entry point for one add-block RPC.
// Packets whose sequence is below the sender's expected sequence are acknowledged and ignored.
// Otherwise the rows are routed either by partition (receiver-side random bucket) or by the
// tablet id of each row, and are then written through the matching delta writers.
Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request,
                                 PTabletWriterAddBlockResult* response) {
    SCOPED_TIMER(_add_batch_timer);
    if (_add_batch_number_counter) {
        _add_batch_number_counter->update(1);
    }

    int64_t cur_seq = 0;
    if (Status seq_status = _get_current_seq(cur_seq, request); UNLIKELY(!seq_status.ok())) {
        return seq_status;
    }

    // Duplicate (already applied) packet: acknowledge without writing again.
    if (request.packet_seq() < cur_seq) {
        LOG(INFO) << "packet has already recept before, expect_seq=" << cur_seq
                  << ", recept_seq=" << request.packet_seq();
        return Status::OK();
    }

    if (!request.is_receiver_side_random_bucket()) {
        std::unordered_map<int64_t /* tablet_id */, DorisVector<uint32_t> /* row index */>
                tablet_to_rowidxs;
        _build_tablet_to_rowidxs(request, &tablet_to_rowidxs);
        return _write_block_data(request, cur_seq, tablet_to_rowidxs, response);
    }

    std::unordered_map<int64_t /* partition_id */, DorisVector<uint32_t> /* row index */>
            partition_to_rowidxs;
    RETURN_IF_ERROR(_build_partition_to_rowidxs_for_receiver_side_random_bucket(
            request, &partition_to_rowidxs));
    return _write_block_data_for_receiver_side_random_bucket(request, cur_seq,
                                                             partition_to_rowidxs, response);
}
919
920
0
void BaseTabletsChannel::_add_broken_tablet(int64_t tablet_id) {
921
0
    std::unique_lock<std::shared_mutex> wlock(_broken_tablets_lock);
922
0
    _broken_tablets.insert(tablet_id);
923
0
}
924
925
0
bool BaseTabletsChannel::_is_broken_tablet(int64_t tablet_id) const {
926
0
    return _broken_tablets.find(tablet_id) != _broken_tablets.end();
927
0
}
928
929
void BaseTabletsChannel::_build_tablet_to_rowidxs(
930
        const PTabletWriterAddBlockRequest& request,
931
0
        std::unordered_map<int64_t, DorisVector<uint32_t>>* tablet_to_rowidxs) {
932
    // just add a coarse-grained read lock here rather than each time when visiting _broken_tablets
933
    // tests show that a relatively coarse-grained read lock here performs better under multicore scenario
934
    // see: https://github.com/apache/doris/pull/28552
935
0
    std::shared_lock<std::shared_mutex> rlock(_broken_tablets_lock);
936
0
    if (request.is_single_tablet_block()) {
937
        // The cloud mode need the tablet ids to prepare rowsets.
938
0
        int64_t tablet_id = request.tablet_ids(0);
939
0
        tablet_to_rowidxs->emplace(tablet_id, std::initializer_list<uint32_t> {0});
940
0
        return;
941
0
    }
942
0
    for (uint32_t i = 0; i < request.tablet_ids_size(); ++i) {
943
0
        int64_t tablet_id = request.tablet_ids(i);
944
0
        if (_is_broken_tablet(tablet_id)) {
945
            // skip broken tablets
946
0
            VLOG_PROGRESS << "skip broken tablet tablet=" << tablet_id;
947
0
            continue;
948
0
        }
949
0
        auto it = tablet_to_rowidxs->find(tablet_id);
950
0
        if (it == tablet_to_rowidxs->end()) {
951
0
            tablet_to_rowidxs->emplace(tablet_id, std::initializer_list<uint32_t> {i});
952
0
        } else {
953
0
            it->second.emplace_back(i);
954
0
        }
955
0
    }
956
0
}
957
958
} // namespace doris