Coverage Report

Created: 2026-03-12 14:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/channel/tablets_channel.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/channel/tablets_channel.h"
19
20
#include <bvar/bvar.h>
21
#include <fmt/format.h>
22
#include <gen_cpp/internal_service.pb.h>
23
#include <gen_cpp/types.pb.h>
24
25
#include <ctime>
26
27
#include "common/compiler_util.h" // IWYU pragma: keep
28
#include "common/status.h"
29
// IWYU pragma: no_include <bits/chrono.h>
30
#include <chrono> // IWYU pragma: keep
31
#include <initializer_list>
32
#include <set>
33
#include <thread>
34
#include <utility>
35
36
#ifdef DEBUG
37
#include <unordered_set>
38
#endif
39
40
#include "common/logging.h"
41
#include "common/metrics/doris_metrics.h"
42
#include "common/metrics/metrics.h"
43
#include "core/block/block.h"
44
#include "load/channel/load_channel.h"
45
#include "load/delta_writer/delta_writer.h"
46
#include "storage/storage_engine.h"
47
#include "storage/tablet_info.h"
48
#include "storage/txn/txn_manager.h"
49
#include "util/defer_op.h"
50
51
namespace doris {
52
class SlotDescriptor;
53
54
// Running total of bytes allocated by blocks being written in _write_block_data():
// the block's allocated_bytes() is added after deserialization and subtracted again
// when that function returns.
bvar::Adder<int64_t> g_tablets_channel_send_data_allocated_size(
55
        "tablets_channel_send_data_allocated_size");
56
57
// Gauge metric prototype for "tablet_writer_count"; the hook that feeds it is registered
// once in the BaseTabletsChannel constructor.
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT);
58
59
// Storage for the static counter of delta writers held by all tablets channels.
// It is increased when writers are created and decreased in ~BaseTabletsChannel().
std::atomic<uint64_t> BaseTabletsChannel::_s_tablet_writer_count;
60
61
// Constructs the shared part of a tablets channel.
//
// The channel starts in kInitialized, remembers its key / load id / priority, sizes the
// closed-sender bitmap to 64 entries (it is Reset() to the real sender count in open()),
// and creates its profile counters under `profile`.
BaseTabletsChannel::BaseTabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id,
                                       bool is_high_priority, RuntimeProfile* profile)
        : _key(key),
          _state(kInitialized),
          _load_id(load_id),
          _closed_senders(64),
          _is_high_priority(is_high_priority) {
    _init_profile(profile);

    // The hook only reads the static writer counter, so it is registered a single time
    // no matter how many channels get constructed.
    static std::once_flag metric_hook_registered;
    std::call_once(metric_hook_registered, [] {
        REGISTER_HOOK_METRIC(tablet_writer_count, [&]() { return _s_tablet_writer_count.load(); });
    });
}
74
75
// Constructs a tablets channel backed by the local storage engine.
//
// engine:           storage engine handed to every DeltaWriter created by this channel.
// key/load_id/is_high_priority/profile: forwarded unchanged to BaseTabletsChannel.
TabletsChannel::TabletsChannel(StorageEngine& engine, const TabletsChannelKey& key,
                               const UniqueId& load_id, bool is_high_priority,
                               RuntimeProfile* profile)
        : BaseTabletsChannel(key, load_id, is_high_priority, profile), _engine(engine) {
    // BaseTabletsChannel's constructor calls _init_profile() while only the base subobject
    // exists, so that call always runs BaseTabletsChannel::_init_profile() and never
    // TabletsChannel::_init_profile(). Create the slave-replica timer here so that
    // close() does not use a timer that was never set up. _profile has already been
    // created by the base constructor at this point.
    _slave_replica_timer = ADD_TIMER(_profile, "SlaveReplicaTime");
}
79
80
29.1k
// Removes this channel's writers from the process-wide writer counter.
BaseTabletsChannel::~BaseTabletsChannel() {
    const auto owned_writers = _tablet_writers.size();
    _s_tablet_writer_count.fetch_sub(owned_writers);
}
83
84
// Nothing extra to release here; the writer-count bookkeeping happens in ~BaseTabletsChannel().
TabletsChannel::~TabletsChannel() = default;
85
86
// Looks up the next packet sequence number expected from the sender of `request`.
//
// cur_seq: out-parameter, set to the expected sequence number for request.sender_id().
// request: add-block request carrying sender_id and packet_seq.
//
// Returns:
//   - _close_status if the channel is already finished,
//   - InternalError if the channel is in any other non-opened state,
//   - InternalError if sender_id is outside the range sized in open(),
//   - InternalError("lost data packet") if the request is ahead of the expected sequence,
//   - OK otherwise (including packet_seq < cur_seq, which the caller treats as a duplicate).
Status BaseTabletsChannel::_get_current_seq(int64_t& cur_seq,
                                            const PTabletWriterAddBlockRequest& request) {
    std::lock_guard<std::mutex> l(_lock);
    if (_state != kOpened) {
        return _state == kFinished ? _close_status
                                   : Status::InternalError("TabletsChannel {} state: {}",
                                                           _key.to_string(), _state);
    }
    // sender_id comes straight from the RPC request; indexing _next_seqs with an
    // out-of-range value would be undefined behaviour, so reject it explicitly.
    const auto sender_id = request.sender_id();
    if (sender_id < 0 || static_cast<size_t>(sender_id) >= _next_seqs.size()) {
        return Status::InternalError("TabletsChannel {} got invalid sender id {}, senders: {}",
                                     _key.to_string(), sender_id, _next_seqs.size());
    }
    cur_seq = _next_seqs[sender_id];
    // check packet
    if (request.packet_seq() > cur_seq) {
        LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq
                     << ", recept_seq=" << request.packet_seq();
        return Status::InternalError("lost data packet");
    }
    return Status::OK();
}
103
104
29.1k
// Creates this channel's child profile under `profile` and registers the counters and
// timers used by the channel. Memory high-water-mark counters live in a nested
// "PeakMemoryUsage" child profile.
void BaseTabletsChannel::_init_profile(RuntimeProfile* profile) {
    const auto channel_profile_name = fmt::format("TabletsChannel {}", _key.to_string());
    _profile = profile->create_child(channel_profile_name, true, true);
    _add_batch_number_counter = ADD_COUNTER(_profile, "NumberBatchAdded", TUnit::UNIT);

    auto* peak_memory = _profile->create_child("PeakMemoryUsage", true, true);
    _add_batch_timer = ADD_TIMER(_profile, "AddBatchTime");
    _write_block_timer = ADD_TIMER(_profile, "WriteBlockTime");
    _incremental_open_timer = ADD_TIMER(_profile, "IncrementalOpenTabletTime");

    // All peak-memory counters share the same unit and parent profile.
    const auto add_peak_counter = [peak_memory](const char* counter_name) {
        return peak_memory->AddHighWaterMarkCounter(counter_name, TUnit::BYTES);
    };
    _memory_usage_counter = add_peak_counter("Total");
    _write_memory_usage_counter = add_peak_counter("Write");
    _flush_memory_usage_counter = add_peak_counter("Flush");
    _max_tablet_memory_usage_counter = add_peak_counter("MaxTablet");
    _max_tablet_write_memory_usage_counter = add_peak_counter("MaxTabletWrite");
    _max_tablet_flush_memory_usage_counter = add_peak_counter("MaxTabletFlush");
}
123
124
0
// Runs the base profile setup and then adds the "SlaveReplicaTime" timer, which close()
// uses while waiting for slave replicas.
// NOTE(review): the only visible call to _init_profile() is inside BaseTabletsChannel's
// constructor, where the base implementation is the one that runs; this report shows a
// count of 0 for this function. Confirm that something actually invokes it.
void TabletsChannel::_init_profile(RuntimeProfile* profile) {
125
0
    BaseTabletsChannel::_init_profile(profile);
126
0
    _slave_replica_timer = ADD_TIMER(_profile, "SlaveReplicaTime");
127
0
}
128
129
66.7k
// Opens the channel for the load described by `request`.
//
// Only the first call does real work; later calls (channel already opened, or already
// finished/cancelled by another sender) return OK immediately. The first call records
// txn/index ids, initialises the table schema, sizes per-sender bookkeeping and creates
// a delta writer for every tablet in the request.
//
// Returns the error from schema initialisation or writer creation, otherwise OK.
Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) {
    std::lock_guard<std::mutex> guard(_lock);
    const bool nothing_to_do = (_state == kOpened) || (_state == kFinished);
    if (nothing_to_do) {
        return Status::OK();
    }

    _txn_id = request.txn_id();
    _index_id = request.index_id();
    _schema = std::make_shared<OlapTableSchemaParam>();
    RETURN_IF_ERROR(_schema->init(request.schema()));
    _tuple_desc = _schema->tuple_desc();

    const int sender_count = request.num_senders();
    // Two ways to get here:
    //  1. A regular open(): the expected number of close requests is the sender count
    //     from the request, so it is recorded right away.
    //  2. The first incremental_open() (which sets _open_by_incremental before calling
    //     open()): the number of senders is not known up front. incremental_open()
    //     increments _num_remaining_senders itself, so it must still be zero here.
    if (!_open_by_incremental) {
        _num_remaining_senders = sender_count;
    } else {
        DCHECK(_num_remaining_senders == 0) << _num_remaining_senders;
    }
    LOG(INFO) << fmt::format(
            "open tablets channel {}, tablets num: {} timeout(s): {}, init senders {} with "
            "incremental {}",
            _key.to_string(), request.tablets().size(), request.load_channel_timeout_s(),
            _num_remaining_senders, _open_by_incremental ? "on" : "off");

    // Per-sender state is always sized by the request's sender count, incremental or not,
    // because the number of senders that will eventually show up is unknown.
    _next_seqs.resize(sender_count, 0);
    _closed_senders.Reset(sender_count);

    RETURN_IF_ERROR(_open_all_writers(request));

    _state = kOpened;
    return Status::OK();
}
176
177
179
// Adds delta writers for tablets that were not part of the original open request.
//
// If the channel has never been opened, this call opens it first and marks it as
// "opened by incremental". In that mode every distinct sender that calls in is counted
// once towards _num_remaining_senders (senders without a sender_id are counted on every
// call, for compatibility). Tablets that already have a writer are skipped.
//
// Returns InternalError if the channel's index id is not present in the schema, the
// error from open() if the initial open fails, otherwise OK.
Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& params) {
    SCOPED_TIMER(_incremental_open_timer);

    // current node first opened by incremental open
    if (_state == kInitialized) {
        _open_by_incremental = true;
        RETURN_IF_ERROR(open(params));
    }

    std::lock_guard<std::mutex> l(_lock);

    // one sender may incremental_open many times, but closes only once, so a sender is
    // counted only the first time it is seen.
    if (_open_by_incremental) {
        if (params.has_sender_id() && !_recieved_senders.contains(params.sender_id())) {
            _recieved_senders.insert(params.sender_id());
            _num_remaining_senders++;
        } else if (!params.has_sender_id()) { // for compatible
            _num_remaining_senders++;
        }
        VLOG_DEBUG << fmt::format("txn {}: TabletsChannel {} inc senders to {}", _txn_id, _index_id,
                                  _num_remaining_senders);
    }

    std::vector<SlotDescriptor*>* index_slots = nullptr;
    int32_t schema_hash = 0;
    for (const auto& index : _schema->indexes()) {
        if (index->index_id == _index_id) {
            index_slots = &index->slots;
            schema_hash = index->schema_hash;
            break;
        }
    }
    if (index_slots == nullptr) {
        return Status::InternalError("unknown index id, key={}", _key.to_string());
    }
    // update tablets
    size_t incremental_tablet_num = 0;
    std::stringstream ss;
    ss << "LocalTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(params.id())
       << " incremental open delta writer: ";

    // Every modification of _tablet_writers happens under _lock, which is held here, so
    // this lookup does not need _tablet_writers_lock as well.
    for (const auto& tablet : params.tablets()) {
        if (_tablet_writers.find(tablet.tablet_id()) != _tablet_writers.end()) {
            continue;
        }
        incremental_tablet_num++;

        // Built the same way as in _open_all_writers() so that incrementally opened
        // tablets get the same settings, including write_file_cache, and no field of the
        // request is left without a value.
        WriteRequest wrequest {
                .tablet_id = tablet.tablet_id(),
                .schema_hash = schema_hash,
                .txn_id = _txn_id,
                .txn_expiration = params.txn_expiration(), // Required by CLOUD.
                .index_id = params.index_id(),
                .partition_id = tablet.partition_id(),
                .load_id = params.id(),
                .tuple_desc = _tuple_desc,
                .slots = index_slots,
                .table_schema_param = _schema,
                .is_high_priority = _is_high_priority,
                .write_file_cache = params.write_file_cache(),
                .storage_vault_id = params.storage_vault_id(),
        };

        auto delta_writer = create_delta_writer(wrequest);
        {
            // _tablet_writers is modified here, and _write_block_data() reads it
            // holding only _tablet_writers_lock, so this lock is required.
            std::lock_guard<std::mutex> lt(_tablet_writers_lock);
            _tablet_writers.emplace(tablet.tablet_id(), std::move(delta_writer));
        }

        ss << "[" << tablet.tablet_id() << "]";
    }

    _s_tablet_writer_count += incremental_tablet_num;
    LOG(INFO) << ss.str();

    _state = kOpened;
    return Status::OK();
}
255
256
90
std::unique_ptr<BaseDeltaWriter> TabletsChannel::create_delta_writer(const WriteRequest& request) {
257
90
    return std::make_unique<DeltaWriter>(_engine, request, _profile, _load_id);
258
90
}
259
260
// Handles the close request of one sender.
//
// parent:   owning load channel; its last-updated time and timeout bound the wait for
//           slave replicas.
// req:      the sender's final request (sender id, backend id, partitions to commit,
//           single-replica settings).
// res:      receives the committed tablets, per-tablet errors and, in single-replica
//           mode, the slave nodes that finished.
// finished: set to true when no sender remains open.
//
// Until the last sender closes, the call only records the sender and its partitions.
// The last close finishes the channel: writers of the collected partitions are closed,
// flushed, built into rowsets and committed, while writers of other partitions are
// cancelled. Per-tablet failures are reported through `res`; a flush failure is
// returned directly.
Status TabletsChannel::close(LoadChannel* parent, const PTabletWriterAddBlockRequest& req,
                             PTabletWriterAddBlockResult* res, bool* finished) {
    const int sender_id = req.sender_id();
    const int64_t backend_id = req.backend_id();
    const auto& closing_partitions = req.partition_ids();
    auto* tablet_errors = res->mutable_tablet_errors();

    std::lock_guard<std::mutex> guard(_lock);
    if (_state == kFinished) {
        return _close_status;
    }
    if (_closed_senders.Get(sender_id)) {
        // The same sender closed twice; report the current state and do nothing else.
        *finished = (_num_remaining_senders == 0);
        return _close_status;
    }

    for (auto partition_id : closing_partitions) {
        _partition_ids.emplace(partition_id);
    }
    _closed_senders.Set(sender_id, true);
    _num_remaining_senders--;
    *finished = (_num_remaining_senders == 0);

    LOG(INFO) << fmt::format(
            "txn {}: close tablets channel of index {} , sender id: {}, backend {}, remain "
            "senders: {}",
            _txn_id, _index_id, sender_id, backend_id, _num_remaining_senders);

    if (!*finished) {
        return Status::OK();
    }

    // Every sender has closed.
    _state = kFinished;

    // 1. Close the writers whose partition received a close; cancel all the others.
    //    _lock is held, so _tablet_writers_lock is not needed for this iteration.
    std::set<DeltaWriter*> pending_writers;
    for (auto&& [tablet_id, writer] : _tablet_writers) {
        if (!_partition_ids.contains(writer->partition_id())) {
            auto cancel_st = writer->cancel();
            if (!cancel_st.ok()) {
                LOG(WARNING) << "cancel tablet writer failed, tablet_id=" << tablet_id
                             << ", transaction_id=" << _txn_id;
                // skip this writer and carry on with the rest
                continue;
            }
            VLOG_PROGRESS << "cancel tablet writer successfully, tablet_id=" << tablet_id
                          << ", transaction_id=" << _txn_id;
            continue;
        }

        auto close_st = writer->close();
        if (!close_st.ok()) {
            auto err_msg = fmt::format(
                    "close tablet writer failed, tablet_id={}, "
                    "transaction_id={}, err={}",
                    tablet_id, _txn_id, close_st.to_string());
            LOG(WARNING) << err_msg;
            PTabletError* tablet_error = tablet_errors->Add();
            tablet_error->set_tablet_id(tablet_id);
            tablet_error->set_msg(close_st.to_string());
            // skip this writer and carry on with the rest
            continue;
        }
        // A writer recorded in `_broken_tablets` must not reach `build_rowset` /
        // `commit_txn`: the publish-version task would then succeed and the replicas
        // could become inconsistent.
        if (_is_broken_tablet(writer->tablet_id())) {
            LOG(WARNING) << "SHOULD NOT HAPPEN, tablet writer is broken but not cancelled"
                         << ", tablet_id=" << tablet_id << ", transaction_id=" << _txn_id;
            continue;
        }
        pending_writers.insert(static_cast<DeltaWriter*>(writer.get()));
    }

    _write_single_replica = req.write_single_replica();

    // 2. Wait until every closed writer has finished flushing.
    for (auto* writer : pending_writers) {
        RETURN_IF_ERROR((writer->wait_flush()));
    }

    // 3. Build rowsets and 3.1 submit the delete-bitmap calculation (Unique Key MoW
    //    tables). A writer that fails either step is reported and dropped.
    for (auto it = pending_writers.begin(); it != pending_writers.end();) {
        Status step_st = (*it)->build_rowset();
        if (step_st.ok()) {
            step_st = (*it)->submit_calc_delete_bitmap_task();
        }
        if (!step_st.ok()) {
            _add_error_tablet(tablet_errors, (*it)->tablet_id(), step_st);
            it = pending_writers.erase(it);
            continue;
        }
        ++it;
    }

    // 4. Wait for the delete-bitmap calculation where one was submitted.
    for (auto it = pending_writers.begin(); it != pending_writers.end();) {
        Status wait_st = (*it)->wait_calc_delete_bitmap();
        if (wait_st.ok()) {
            ++it;
            continue;
        }
        _add_error_tablet(tablet_errors, (*it)->tablet_id(), wait_st);
        it = pending_writers.erase(it);
    }

    // 5. Commit the remaining writers. Commit failures are recorded in `res` rather than
    //    returned; tablet_vec only lists the successful tablets and FE judges the result.
    for (auto* writer : pending_writers) {
        _commit_txn(writer, req, res);
    }

    if (_write_single_replica) {
        auto* success_slave_tablet_node_ids = res->mutable_success_slave_tablet_node_ids();
        // Waiting for slave replicas has to stop before the load times out so there is
        // still time to collect the replicas that did finish; otherwise the task could
        // time out even though most replicas completed. The wait is capped at 0.9 times
        // the timeout.
        SCOPED_TIMER(_slave_replica_timer);
        while (!pending_writers.empty() &&
               (time(nullptr) - parent->last_updated_time()) < (parent->timeout() * 0.9)) {
            for (auto it = pending_writers.begin(); it != pending_writers.end();) {
                if ((*it)->check_slave_replicas_done(success_slave_tablet_node_ids)) {
                    it = pending_writers.erase(it);
                } else {
                    ++it;
                }
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        }
        for (auto* writer : pending_writers) {
            writer->add_finished_slave_replicas(success_slave_tablet_node_ids);
        }
        _engine.txn_manager()->clear_txn_tablet_delta_writer(_txn_id);
    }

    return Status::OK();
}
406
407
void TabletsChannel::_commit_txn(DeltaWriter* writer, const PTabletWriterAddBlockRequest& req,
408
54
                                 PTabletWriterAddBlockResult* res) {
409
54
    PSlaveTabletNodes slave_nodes;
410
54
    if (_write_single_replica) {
411
0
        auto& nodes_map = req.slave_tablet_nodes();
412
0
        auto it = nodes_map.find(writer->tablet_id());
413
0
        if (it != nodes_map.end()) {
414
0
            slave_nodes = it->second;
415
0
        }
416
0
    }
417
54
    Status st = writer->commit_txn(slave_nodes);
418
54
    if (st.ok()) [[likely]] {
419
54
        auto* tablet_vec = res->mutable_tablet_vec();
420
54
        PTabletInfo* tablet_info = tablet_vec->Add();
421
54
        tablet_info->set_tablet_id(writer->tablet_id());
422
        // unused required field.
423
54
        tablet_info->set_schema_hash(0);
424
54
        tablet_info->set_received_rows(writer->total_received_rows());
425
54
        tablet_info->set_num_rows_filtered(writer->num_rows_filtered());
426
54
        _total_received_rows += writer->total_received_rows();
427
54
        _num_rows_filtered += writer->num_rows_filtered();
428
54
    } else {
429
0
        _add_error_tablet(res->mutable_tablet_errors(), writer->tablet_id(), st);
430
0
    }
431
54
}
432
433
void BaseTabletsChannel::_add_error_tablet(
434
        google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors, int64_t tablet_id,
435
0
        Status error) const {
436
0
    PTabletError* tablet_error = tablet_errors->Add();
437
0
    tablet_error->set_tablet_id(tablet_id);
438
0
    tablet_error->set_msg(error.to_string());
439
0
    VLOG_PROGRESS << "close wait failed tablet " << tablet_id << " transaction_id " << _txn_id
440
0
                  << "err msg " << error;
441
0
}
442
443
27
void BaseTabletsChannel::refresh_profile() {
444
27
    int64_t write_mem_usage = 0;
445
27
    int64_t flush_mem_usage = 0;
446
27
    int64_t max_tablet_mem_usage = 0;
447
27
    int64_t max_tablet_write_mem_usage = 0;
448
27
    int64_t max_tablet_flush_mem_usage = 0;
449
27
    {
450
27
        std::lock_guard<std::mutex> l(_tablet_writers_lock);
451
240
        for (auto&& [tablet_id, writer] : _tablet_writers) {
452
240
            int64_t write_mem = writer->mem_consumption(MemType::WRITE_FINISHED);
453
240
            write_mem_usage += write_mem;
454
240
            int64_t flush_mem = writer->mem_consumption(MemType::FLUSH);
455
240
            flush_mem_usage += flush_mem;
456
240
            if (write_mem > max_tablet_write_mem_usage) {
457
0
                max_tablet_write_mem_usage = write_mem;
458
0
            }
459
240
            if (flush_mem > max_tablet_flush_mem_usage) {
460
0
                max_tablet_flush_mem_usage = flush_mem;
461
0
            }
462
240
            if (write_mem + flush_mem > max_tablet_mem_usage) {
463
0
                max_tablet_mem_usage = write_mem + flush_mem;
464
0
            }
465
240
        }
466
27
    }
467
27
    COUNTER_SET(_memory_usage_counter, write_mem_usage + flush_mem_usage);
468
27
    COUNTER_SET(_write_memory_usage_counter, write_mem_usage);
469
27
    COUNTER_SET(_flush_memory_usage_counter, flush_mem_usage);
470
27
    COUNTER_SET(_max_tablet_memory_usage_counter, max_tablet_mem_usage);
471
27
    COUNTER_SET(_max_tablet_write_memory_usage_counter, max_tablet_write_mem_usage);
472
27
    COUNTER_SET(_max_tablet_flush_memory_usage_counter, max_tablet_flush_mem_usage);
473
27
}
474
475
29.1k
// Creates a delta writer for every tablet listed in `request` that does not have one yet.
// Called from open() with _lock held.
//
// Returns InternalError if the channel's index id is not found in the schema,
// otherwise OK.
Status BaseTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request) {
    std::vector<SlotDescriptor*>* slots_of_index = nullptr;
    int32_t hash_of_index = 0;
    for (const auto& index : _schema->indexes()) {
        if (index->index_id != _index_id) {
            continue;
        }
        slots_of_index = &index->slots;
        hash_of_index = index->schema_hash;
        break;
    }
    if (slots_of_index == nullptr) {
        return Status::InternalError("unknown index id, key={}", _key.to_string());
    }

#ifdef DEBUG
    // Sanity check: a request must not list the same tablet twice.
    {
        std::unordered_set<int64_t> seen_tablet_ids;
        for (const auto& tablet : request.tablets()) {
            CHECK(seen_tablet_ids.count(tablet.tablet_id()) == 0)
                    << "found duplicate tablet id: " << tablet.tablet_id();
            seen_tablet_ids.insert(tablet.tablet_id());
        }
    }
#endif

    int created_writers = 0;
    // _lock is held by the caller, so the lookup needs no _tablet_writers_lock.
    for (const auto& tablet : request.tablets()) {
        const auto tablet_id = tablet.tablet_id();
        if (_tablet_writers.find(tablet_id) != _tablet_writers.end()) {
            continue;
        }
        ++created_writers;
        WriteRequest wrequest {
                .tablet_id = tablet_id,
                .schema_hash = hash_of_index,
                .txn_id = _txn_id,
                .txn_expiration = request.txn_expiration(), // Required by CLOUD.
                .index_id = request.index_id(),
                .partition_id = tablet.partition_id(),
                .load_id = request.id(),
                .tuple_desc = _tuple_desc,
                .slots = slots_of_index,
                .table_schema_param = _schema,
                .is_high_priority = _is_high_priority,
                .write_file_cache = request.write_file_cache(),
                .storage_vault_id = request.storage_vault_id(),
        };

        auto new_writer = create_delta_writer(wrequest);
        {
            // Insertion is guarded by _tablet_writers_lock as well, which is the lock
            // taken by readers that do not hold _lock.
            std::lock_guard<std::mutex> writers_guard(_tablet_writers_lock);
            _tablet_writers.emplace(tablet_id, std::move(new_writer));
        }
    }
    _s_tablet_writer_count += _tablet_writers.size();
    DCHECK_EQ(_tablet_writers.size(), created_writers);
    return Status::OK();
}
534
535
126
// Cancels every delta writer and marks the channel as finished.
// If the channel is already finished, returns _close_status without touching the writers.
// The status of each individual writer cancel is deliberately ignored.
Status BaseTabletsChannel::cancel() {
    std::lock_guard<std::mutex> guard(_lock);
    if (_state == kFinished) {
        return _close_status;
    }
    for (auto& entry : _tablet_writers) {
        auto& writer = entry.second;
        static_cast<void>(writer->cancel());
    }
    _state = kFinished;

    return Status::OK();
}
547
548
0
// Cancels the channel through the base implementation and, in single-replica mode,
// also removes this transaction's delta writers from the txn manager.
// Returns the base cancel's error if it fails, otherwise OK.
Status TabletsChannel::cancel() {
    if (Status base_st = BaseTabletsChannel::cancel(); !base_st.ok()) {
        return base_st;
    }
    if (!_write_single_replica) {
        return Status::OK();
    }
    _engine.txn_manager()->clear_txn_tablet_delta_writer(_txn_id);
    return Status::OK();
}
555
556
58.3k
std::string TabletsChannelKey::to_string() const {
557
58.3k
    std::stringstream ss;
558
58.3k
    ss << *this;
559
58.3k
    return ss.str();
560
58.3k
}
561
562
125k
std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key) {
563
125k
    os << "(load_id=" << key.id << ", index_id=" << key.index_id << ")";
564
125k
    return os;
565
125k
}
566
567
// Deserializes the block carried by `request` and writes each tablet's rows to that
// tablet's delta writer.
//
// request:           add-block request holding the serialized block and the sender id.
// cur_seq:           sequence number of this packet; the sender's next expected sequence
//                    becomes cur_seq + 1 once the whole block has been handed out.
// tablet_to_rowidxs: for every target tablet, the row indexes of the block that belong
//                    to it.
// response:          receives per-tablet write errors and the tablets' load rowset info.
//
// Returns the deserialization error, or InternalError when a tablet id has no writer.
// A failed write of a single tablet is not returned: it is recorded in `response`, the
// writer is cancelled, the tablet is marked broken, and the remaining tablets are still
// written.
Status BaseTabletsChannel::_write_block_data(
        const PTabletWriterAddBlockRequest& request, int64_t cur_seq,
        std::unordered_map<int64_t, DorisVector<uint32_t>>& tablet_to_rowidxs,
        PTabletWriterAddBlockResult* response) {
    Block send_data;
    [[maybe_unused]] size_t uncompressed_size = 0;
    [[maybe_unused]] int64_t uncompressed_time = 0;
    RETURN_IF_ERROR(send_data.deserialize(request.block(), &uncompressed_size, &uncompressed_time));
    CHECK(send_data.rows() == request.tablet_ids_size())
            << "block rows: " << send_data.rows()
            << ", tablet_ids_size: " << request.tablet_ids_size();

    // Account for the block's memory while this function runs; the Defer undoes it on
    // every exit path.
    g_tablets_channel_send_data_allocated_size << send_data.allocated_bytes();
    Defer defer {
            [&]() { g_tablets_channel_send_data_allocated_size << -send_data.allocated_bytes(); }};

    // add_batch may run concurrently with incremental_open() and does not hold _lock, so
    // the map must only be touched under _tablet_writers_lock. An insertion may also
    // invalidate map iterators, therefore only the writer pointer (owned by the map's
    // unique_ptr, which this file never erases before destruction) leaves the locked
    // section, never an iterator.
    auto find_writer = [this](int64_t tablet_id) -> BaseDeltaWriter* {
        std::lock_guard<std::mutex> l(_tablet_writers_lock);
        auto it = _tablet_writers.find(tablet_id);
        return it == _tablet_writers.end() ? nullptr : it->second.get();
    };

    SCOPED_TIMER(_write_block_timer);
    auto* tablet_errors = response->mutable_tablet_errors();
    auto* tablet_load_infos = response->mutable_tablet_load_rowset_num_infos();
    for (const auto& [tablet_id, row_idxs] : tablet_to_rowidxs) {
        BaseDeltaWriter* writer = find_writer(tablet_id);
        if (writer == nullptr) {
            return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id);
        }

        Status st = writer->write(&send_data, row_idxs);
        if (!st.ok()) {
            auto err_msg =
                    fmt::format("tablet writer write failed, tablet_id={}, txn_id={}, err={}",
                                tablet_id, _txn_id, st.to_string());
            LOG(WARNING) << err_msg;
            PTabletError* error = tablet_errors->Add();
            error->set_tablet_id(tablet_id);
            error->set_msg(err_msg);
            static_cast<void>(writer->cancel_with_status(st));
            _add_broken_tablet(tablet_id);
            // keep writing to the other tablets; the error goes back to the sender
            // through `response`.
        }

        writer->set_tablet_load_rowset_num_info(tablet_load_infos);
    }

    {
        std::lock_guard<std::mutex> l(_lock);
        _next_seqs[request.sender_id()] = cur_seq + 1;
    }
    return Status::OK();
}
635
636
// Writes one block received from a sender.
//
// The packet sequence is validated first: a packet that is ahead of the expected
// sequence (or a channel that is not open) yields an error from _get_current_seq(); a
// packet that was already received is acknowledged with OK and ignored. Otherwise the
// rows are grouped by tablet and handed to _write_block_data().
Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request,
                                 PTabletWriterAddBlockResult* response) {
    SCOPED_TIMER(_add_batch_timer);
    _add_batch_number_counter->update(1);

    int64_t expected_seq = 0;
    if (auto seq_st = _get_current_seq(expected_seq, request); UNLIKELY(!seq_st.ok())) {
        return seq_st;
    }

    const bool already_received = request.packet_seq() < expected_seq;
    if (already_received) {
        LOG(INFO) << "packet has already recept before, expect_seq=" << expected_seq
                  << ", recept_seq=" << request.packet_seq();
        return Status::OK();
    }

    // tablet id -> indexes of the block rows that belong to that tablet
    std::unordered_map<int64_t, DorisVector<uint32_t>> rows_by_tablet;
    _build_tablet_to_rowidxs(request, &rows_by_tablet);

    return _write_block_data(request, expected_seq, rows_by_tablet, response);
}
659
660
0
void BaseTabletsChannel::_add_broken_tablet(int64_t tablet_id) {
661
0
    std::unique_lock<std::shared_mutex> wlock(_broken_tablets_lock);
662
0
    _broken_tablets.insert(tablet_id);
663
0
}
664
665
35.8M
bool BaseTabletsChannel::_is_broken_tablet(int64_t tablet_id) const {
666
35.8M
    return _broken_tablets.find(tablet_id) != _broken_tablets.end();
667
35.8M
}
668
669
void BaseTabletsChannel::_build_tablet_to_rowidxs(
670
        const PTabletWriterAddBlockRequest& request,
671
32.1k
        std::unordered_map<int64_t, DorisVector<uint32_t>>* tablet_to_rowidxs) {
672
    // just add a coarse-grained read lock here rather than each time when visiting _broken_tablets
673
    // tests show that a relatively coarse-grained read lock here performs better under multicore scenario
674
    // see: https://github.com/apache/doris/pull/28552
675
32.1k
    std::shared_lock<std::shared_mutex> rlock(_broken_tablets_lock);
676
32.1k
    if (request.is_single_tablet_block()) {
677
        // The cloud mode need the tablet ids to prepare rowsets.
678
0
        int64_t tablet_id = request.tablet_ids(0);
679
0
        tablet_to_rowidxs->emplace(tablet_id, std::initializer_list<uint32_t> {0});
680
0
        return;
681
0
    }
682
35.7M
    for (uint32_t i = 0; i < request.tablet_ids_size(); ++i) {
683
35.6M
        int64_t tablet_id = request.tablet_ids(i);
684
35.6M
        if (_is_broken_tablet(tablet_id)) {
685
            // skip broken tablets
686
0
            VLOG_PROGRESS << "skip broken tablet tablet=" << tablet_id;
687
0
            continue;
688
0
        }
689
35.6M
        auto it = tablet_to_rowidxs->find(tablet_id);
690
35.6M
        if (it == tablet_to_rowidxs->end()) {
691
114k
            tablet_to_rowidxs->emplace(tablet_id, std::initializer_list<uint32_t> {i});
692
35.5M
        } else {
693
35.5M
            it->second.emplace_back(i);
694
35.5M
        }
695
35.6M
    }
696
32.1k
}
697
698
} // namespace doris