Coverage Report

Created: 2026-05-09 01:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/channel/tablets_channel.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/channel/tablets_channel.h"
19
20
#include <bvar/bvar.h>
21
#include <fmt/format.h>
22
#include <gen_cpp/internal_service.pb.h>
23
#include <gen_cpp/types.pb.h>
24
25
#include <ctime>
26
27
#include "common/compiler_util.h" // IWYU pragma: keep
28
#include "common/status.h"
29
// IWYU pragma: no_include <bits/chrono.h>
30
#include <chrono> // IWYU pragma: keep
31
#include <initializer_list>
32
#include <optional>
33
#include <set>
34
#include <thread>
35
#include <utility>
36
37
#ifdef DEBUG
38
#include <unordered_set>
39
#endif
40
41
#include "common/logging.h"
42
#include "common/metrics/doris_metrics.h"
43
#include "common/metrics/metrics.h"
44
#include "core/block/block.h"
45
#include "load/channel/load_channel.h"
46
#include "load/delta_writer/delta_writer.h"
47
#include "storage/storage_engine.h"
48
#include "storage/tablet_info.h"
49
#include "storage/txn/txn_manager.h"
50
#include "util/defer_op.h"
51
52
namespace doris {
53
class SlotDescriptor;
54
55
bvar::Adder<int64_t> g_tablets_channel_send_data_allocated_size(
56
        "tablets_channel_send_data_allocated_size");
57
58
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT);
59
60
std::atomic<uint64_t> BaseTabletsChannel::_s_tablet_writer_count;
61
62
BaseTabletsChannel::BaseTabletsChannel(const TabletsChannelKey& key, const UniqueId& load_id,
63
                                       bool is_high_priority, RuntimeProfile* profile)
64
30.0k
        : _key(key),
65
30.0k
          _state(kInitialized),
66
30.0k
          _load_id(load_id),
67
30.0k
          _closed_senders(64),
68
30.0k
          _is_high_priority(is_high_priority) {
69
30.0k
    static std::once_flag once_flag;
70
30.0k
    if (profile != nullptr) {
71
3
        _init_profile(profile);
72
3
    }
73
30.0k
    std::call_once(once_flag, [] {
74
5
        REGISTER_HOOK_METRIC(tablet_writer_count, [&]() { return _s_tablet_writer_count.load(); });
75
5
    });
76
30.0k
}
77
78
TabletsChannel::TabletsChannel(StorageEngine& engine, const TabletsChannelKey& key,
79
                               const UniqueId& load_id, bool is_high_priority,
80
                               RuntimeProfile* profile)
81
464
        : BaseTabletsChannel(key, load_id, is_high_priority, profile), _engine(engine) {}
82
83
30.0k
BaseTabletsChannel::~BaseTabletsChannel() {
84
30.0k
    _s_tablet_writer_count -= _tablet_writers.size();
85
30.0k
}
86
87
TabletsChannel::~TabletsChannel() = default;
88
89
Status BaseTabletsChannel::_get_current_seq(int64_t& cur_seq,
90
32.9k
                                            const PTabletWriterAddBlockRequest& request) {
91
32.9k
    std::lock_guard<std::mutex> l(_lock);
92
32.9k
    if (_state != kOpened) {
93
0
        return _state == kFinished ? _close_status
94
0
                                   : Status::InternalError("TabletsChannel {} state: {}",
95
0
                                                           _key.to_string(), _state);
96
0
    }
97
32.9k
    cur_seq = _next_seqs[request.sender_id()];
98
    // check packet
99
32.9k
    if (request.packet_seq() > cur_seq) {
100
0
        LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq
101
0
                     << ", recept_seq=" << request.packet_seq();
102
0
        return Status::InternalError("lost data packet");
103
0
    }
104
32.9k
    return Status::OK();
105
32.9k
}
106
107
3
void BaseTabletsChannel::_init_profile(RuntimeProfile* profile) {
108
3
    DCHECK(profile != nullptr);
109
3
    _profile =
110
3
            profile->create_child(fmt::format("TabletsChannel {}", _key.to_string()), true, true);
111
3
    _add_batch_number_counter = ADD_COUNTER(_profile, "NumberBatchAdded", TUnit::UNIT);
112
113
3
    auto* memory_usage = _profile->create_child("PeakMemoryUsage", true, true);
114
3
    _add_batch_timer = ADD_TIMER(_profile, "AddBatchTime");
115
3
    _write_block_timer = ADD_TIMER(_profile, "WriteBlockTime");
116
3
    _incremental_open_timer = ADD_TIMER(_profile, "IncrementalOpenTabletTime");
117
3
    _memory_usage_counter = memory_usage->AddHighWaterMarkCounter("Total", TUnit::BYTES);
118
3
    _write_memory_usage_counter = memory_usage->AddHighWaterMarkCounter("Write", TUnit::BYTES);
119
3
    _flush_memory_usage_counter = memory_usage->AddHighWaterMarkCounter("Flush", TUnit::BYTES);
120
3
    _max_tablet_memory_usage_counter =
121
3
            memory_usage->AddHighWaterMarkCounter("MaxTablet", TUnit::BYTES);
122
3
    _max_tablet_write_memory_usage_counter =
123
3
            memory_usage->AddHighWaterMarkCounter("MaxTabletWrite", TUnit::BYTES);
124
3
    _max_tablet_flush_memory_usage_counter =
125
3
            memory_usage->AddHighWaterMarkCounter("MaxTabletFlush", TUnit::BYTES);
126
3
}
127
128
0
void TabletsChannel::_init_profile(RuntimeProfile* profile) {
129
0
    DCHECK(profile != nullptr);
130
0
    BaseTabletsChannel::_init_profile(profile);
131
0
    _slave_replica_timer = ADD_TIMER(_profile, "SlaveReplicaTime");
132
0
}
133
134
55.4k
Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) {
135
55.4k
    std::lock_guard<std::mutex> l(_lock);
136
    // if _state is kOpened, it's a normal case, already open by other sender
137
    // if _state is kFinished, already cancelled by other sender
138
55.4k
    if (_state == kOpened || _state == kFinished) {
139
25.3k
        return Status::OK();
140
25.3k
    }
141
30.0k
    _txn_id = request.txn_id();
142
30.0k
    _index_id = request.index_id();
143
30.0k
    _schema = std::make_shared<OlapTableSchemaParam>();
144
30.0k
    RETURN_IF_ERROR(_schema->init(request.schema()));
145
30.0k
    _tuple_desc = _schema->tuple_desc();
146
147
30.0k
    int max_sender = request.num_senders();
148
    /*
149
     * a tablets channel in the receiver is related to a bulk of VNodeChannel of senders. each instance has one or none.
150
     * there are two possibilities:
151
     *  1. there's partitions originally broadcasted by FE. so all sender(instance) know it at start. and open() will be 
152
     *     called directly, not by incremental_open(). and after _state changes to kOpened. _open_by_incremental will never 
153
     *     be true. in this case, _num_remaining_senders will keep same with senders number. when all sender sent close rpc,
154
     *     the tablets channel will close. and if for an auto partition table, these channels' closing will hang on the receiver and
155
     *     return together to avoid close-then-incremental-open problem.
156
     *  2. this tablets channel is opened by incremental_open of sender's sink node. so only this sender will know this partition
157
     *     (this TabletsChannel) at that time. and we are not sure how many sender will know in the end. it depends on data
158
     *     distribution. in this situation open() is called by incremental_open() at first time. so _open_by_incremental is true.
159
     *     then _num_remaining_senders will not be set here. but inc every time when incremental_open() called. so it's dynamic
160
     *     and also need same number of senders' close to close. but will not hang.
161
     */
162
30.0k
    if (_open_by_incremental) {
163
0
        DCHECK(_num_remaining_senders == 0) << _num_remaining_senders;
164
30.0k
    } else {
165
30.0k
        _num_remaining_senders = max_sender;
166
30.0k
    }
167
30.0k
    LOG(INFO) << fmt::format(
168
30.0k
            "open tablets channel {}, tablets num: {} timeout(s): {}, init senders {} with "
169
30.0k
            "incremental {}",
170
30.0k
            _key.to_string(), request.tablets().size(), request.load_channel_timeout_s(),
171
30.0k
            _num_remaining_senders, _open_by_incremental ? "on" : "off");
172
    // just use max_sender no matter incremental or not because we don't know how many senders will open.
173
30.0k
    _next_seqs.resize(max_sender, 0);
174
30.0k
    _closed_senders.Reset(max_sender);
175
176
30.0k
    RETURN_IF_ERROR(_open_all_writers(request));
177
178
30.0k
    _state = kOpened;
179
30.0k
    return Status::OK();
180
30.0k
}
181
182
181
Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& params) {
183
181
    SCOPED_TIMER(_incremental_open_timer);
184
185
    // current node first opened by incremental open
186
181
    if (_state == kInitialized) {
187
0
        _open_by_incremental = true;
188
0
        RETURN_IF_ERROR(open(params));
189
0
    }
190
191
181
    std::lock_guard<std::mutex> l(_lock);
192
193
    // one sender may call incremental_open many times, but closes only once. so don't count it twice.
194
181
    if (_open_by_incremental) {
195
0
        if (params.has_sender_id() && !_recieved_senders.contains(params.sender_id())) {
196
0
            _recieved_senders.insert(params.sender_id());
197
0
            _num_remaining_senders++;
198
0
        } else if (!params.has_sender_id()) { // for compatible
199
0
            _num_remaining_senders++;
200
0
        }
201
0
        VLOG_DEBUG << fmt::format("txn {}: TabletsChannel {} inc senders to {}", _txn_id, _index_id,
202
0
                                  _num_remaining_senders);
203
0
    }
204
205
181
    std::vector<SlotDescriptor*>* index_slots = nullptr;
206
181
    int32_t schema_hash = 0;
207
208
181
    for (const auto& index : _schema->indexes()) {
209
181
        if (index->index_id == _index_id) {
210
181
            index_slots = &index->slots;
211
181
            schema_hash = index->schema_hash;
212
181
            break;
213
181
        }
214
181
    }
215
181
    if (index_slots == nullptr) {
216
0
        return Status::InternalError("unknown index id, key={}", _key.to_string());
217
0
    }
218
    // update tablets
219
181
    size_t incremental_tablet_num = 0;
220
181
    std::stringstream ss;
221
181
    ss << "LocalTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(params.id())
222
181
       << " incremental open delta writer: ";
223
224
    // every change will hold _lock. this find is under _lock too. so no need for _tablet_writers_lock again.
225
6.57k
    for (const auto& tablet : params.tablets()) {
226
6.57k
        if (_tablet_writers.find(tablet.tablet_id()) != _tablet_writers.end()) {
227
96
            continue;
228
96
        }
229
6.48k
        incremental_tablet_num++;
230
231
6.48k
        WriteRequest wrequest;
232
6.48k
        wrequest.index_id = params.index_id();
233
6.48k
        wrequest.tablet_id = tablet.tablet_id();
234
6.48k
        wrequest.schema_hash = schema_hash;
235
6.48k
        wrequest.txn_id = _txn_id;
236
6.48k
        wrequest.partition_id = tablet.partition_id();
237
6.48k
        wrequest.load_id = params.id();
238
6.48k
        wrequest.tuple_desc = _tuple_desc;
239
6.48k
        wrequest.slots = index_slots;
240
6.48k
        wrequest.is_high_priority = _is_high_priority;
241
6.48k
        wrequest.table_schema_param = _schema;
242
6.48k
        wrequest.txn_expiration = params.txn_expiration(); // Required by CLOUD.
243
6.48k
        wrequest.storage_vault_id = params.storage_vault_id();
244
245
6.48k
        auto delta_writer = create_delta_writer(wrequest);
246
6.48k
        {
247
            // here we modify _tablet_writers. so need lock.
248
6.48k
            std::lock_guard<std::mutex> lt(_tablet_writers_lock);
249
6.48k
            _tablet_writers.emplace(tablet.tablet_id(), std::move(delta_writer));
250
6.48k
        }
251
252
6.48k
        ss << "[" << tablet.tablet_id() << "]";
253
6.48k
    }
254
255
181
    _s_tablet_writer_count += incremental_tablet_num;
256
181
    LOG(INFO) << ss.str();
257
258
181
    _state = kOpened;
259
181
    return Status::OK();
260
181
}
261
262
3.33k
std::unique_ptr<BaseDeltaWriter> TabletsChannel::create_delta_writer(const WriteRequest& request) {
263
3.33k
    return std::make_unique<DeltaWriter>(_engine, request, _profile, _load_id);
264
3.33k
}
265
266
Status TabletsChannel::close(LoadChannel* parent, const PTabletWriterAddBlockRequest& req,
267
1.12k
                             PTabletWriterAddBlockResult* res, bool* finished) {
268
1.12k
    int sender_id = req.sender_id();
269
1.12k
    int64_t backend_id = req.backend_id();
270
1.12k
    const auto& partition_ids = req.partition_ids();
271
1.12k
    auto* tablet_errors = res->mutable_tablet_errors();
272
1.12k
    std::lock_guard<std::mutex> l(_lock);
273
1.12k
    if (_state == kFinished) {
274
0
        return _close_status;
275
0
    }
276
1.12k
    if (_closed_senders.Get(sender_id)) {
277
        // Double close from one sender, just return OK
278
0
        *finished = (_num_remaining_senders == 0);
279
0
        return _close_status;
280
0
    }
281
282
1.12k
    for (auto pid : partition_ids) {
283
396
        _partition_ids.emplace(pid);
284
396
    }
285
1.12k
    _closed_senders.Set(sender_id, true);
286
1.12k
    _num_remaining_senders--;
287
1.12k
    *finished = (_num_remaining_senders == 0);
288
289
1.12k
    LOG(INFO) << fmt::format(
290
1.12k
            "txn {}: close tablets channel of index {} , sender id: {}, backend {}, remain "
291
1.12k
            "senders: {}",
292
1.12k
            _txn_id, _index_id, sender_id, backend_id, _num_remaining_senders);
293
294
1.12k
    if (!*finished) {
295
658
        return Status::OK();
296
658
    }
297
298
462
    _state = kFinished;
299
    // All senders are closed
300
    // 1. close all delta writers
301
462
    std::set<DeltaWriter*> need_wait_writers;
302
    // under _lock. no need _tablet_writers_lock again.
303
3.33k
    for (auto&& [tablet_id, writer] : _tablet_writers) {
304
3.33k
        if (_partition_ids.contains(writer->partition_id())) {
305
2.32k
            auto st = writer->close();
306
2.32k
            if (!st.ok()) {
307
0
                auto err_msg = fmt::format(
308
0
                        "close tablet writer failed, tablet_id={}, "
309
0
                        "transaction_id={}, err={}",
310
0
                        tablet_id, _txn_id, st.to_string());
311
0
                LOG(WARNING) << err_msg;
312
0
                PTabletError* tablet_error = tablet_errors->Add();
313
0
                tablet_error->set_tablet_id(tablet_id);
314
0
                tablet_error->set_msg(st.to_string());
315
                // just skip this tablet(writer) and continue to close others
316
0
                continue;
317
0
            }
318
            // tablet writer in `_broken_tablets` should not call `build_rowset` and
319
            // `commit_txn` method, after that, the publish-version task will success,
320
            // which can cause the replica inconsistency.
321
2.32k
            if (_is_broken_tablet(writer->tablet_id())) {
322
0
                LOG(WARNING) << "SHOULD NOT HAPPEN, tablet writer is broken but not cancelled"
323
0
                             << ", tablet_id=" << tablet_id << ", transaction_id=" << _txn_id;
324
0
                continue;
325
0
            }
326
2.32k
            need_wait_writers.insert(static_cast<DeltaWriter*>(writer.get()));
327
2.32k
        } else {
328
1.01k
            auto st = writer->cancel();
329
1.01k
            if (!st.ok()) {
330
0
                LOG(WARNING) << "cancel tablet writer failed, tablet_id=" << tablet_id
331
0
                             << ", transaction_id=" << _txn_id;
332
                // just skip this tablet(writer) and continue to close others
333
0
                continue;
334
0
            }
335
1.01k
            VLOG_PROGRESS << "cancel tablet writer successfully, tablet_id=" << tablet_id
336
0
                          << ", transaction_id=" << _txn_id;
337
1.01k
        }
338
3.33k
    }
339
340
462
    _write_single_replica = req.write_single_replica();
341
342
    // 2. wait all writer finished flush.
343
2.32k
    for (auto* writer : need_wait_writers) {
344
2.32k
        RETURN_IF_ERROR((writer->wait_flush()));
345
2.32k
    }
346
347
    // 3. build rowset
348
2.78k
    for (auto it = need_wait_writers.begin(); it != need_wait_writers.end();) {
349
2.32k
        Status st = (*it)->build_rowset();
350
2.32k
        if (!st.ok()) {
351
0
            _add_error_tablet(tablet_errors, (*it)->tablet_id(), st);
352
0
            it = need_wait_writers.erase(it);
353
0
            continue;
354
0
        }
355
        // 3.1 calculate delete bitmap for Unique Key MoW tables
356
2.32k
        st = (*it)->submit_calc_delete_bitmap_task();
357
2.32k
        if (!st.ok()) {
358
0
            _add_error_tablet(tablet_errors, (*it)->tablet_id(), st);
359
0
            it = need_wait_writers.erase(it);
360
0
            continue;
361
0
        }
362
2.32k
        it++;
363
2.32k
    }
364
365
    // 4. wait for delete bitmap calculation complete if necessary
366
2.78k
    for (auto it = need_wait_writers.begin(); it != need_wait_writers.end();) {
367
2.32k
        Status st = (*it)->wait_calc_delete_bitmap();
368
2.32k
        if (!st.ok()) {
369
0
            _add_error_tablet(tablet_errors, (*it)->tablet_id(), st);
370
0
            it = need_wait_writers.erase(it);
371
0
            continue;
372
0
        }
373
2.32k
        it++;
374
2.32k
    }
375
376
    // 5. commit all writers
377
378
2.32k
    for (auto* writer : need_wait_writers) {
379
        // close may return failed, but no need to handle it here.
380
        // tablet_vec will only contains success tablet, and then let FE judge it.
381
2.32k
        _commit_txn(writer, req, res);
382
2.32k
    }
383
384
462
    if (_write_single_replica) {
385
0
        auto* success_slave_tablet_node_ids = res->mutable_success_slave_tablet_node_ids();
386
        // The operation waiting for all slave replicas to complete must end before the timeout,
387
        // so that there is enough time to collect completed replica. Otherwise, the task may
388
        // timeout and fail even though most of the replicas are completed. Here we set 0.9
389
        // times the timeout as the maximum waiting time.
390
0
        SCOPED_TIMER(_slave_replica_timer);
391
0
        while (!need_wait_writers.empty() &&
392
0
               (time(nullptr) - parent->last_updated_time()) < (parent->timeout() * 0.9)) {
393
0
            std::set<DeltaWriter*>::iterator it;
394
0
            for (it = need_wait_writers.begin(); it != need_wait_writers.end();) {
395
0
                bool is_done = (*it)->check_slave_replicas_done(success_slave_tablet_node_ids);
396
0
                if (is_done) {
397
0
                    need_wait_writers.erase(it++);
398
0
                } else {
399
0
                    it++;
400
0
                }
401
0
            }
402
0
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
403
0
        }
404
0
        for (auto* writer : need_wait_writers) {
405
0
            writer->add_finished_slave_replicas(success_slave_tablet_node_ids);
406
0
        }
407
0
        _engine.txn_manager()->clear_txn_tablet_delta_writer(_txn_id);
408
0
    }
409
410
462
    return Status::OK();
411
462
}
412
413
void TabletsChannel::_commit_txn(DeltaWriter* writer, const PTabletWriterAddBlockRequest& req,
414
2.32k
                                 PTabletWriterAddBlockResult* res) {
415
2.32k
    PSlaveTabletNodes slave_nodes;
416
2.32k
    if (_write_single_replica) {
417
0
        auto& nodes_map = req.slave_tablet_nodes();
418
0
        auto it = nodes_map.find(writer->tablet_id());
419
0
        if (it != nodes_map.end()) {
420
0
            slave_nodes = it->second;
421
0
        }
422
0
    }
423
2.32k
    Status st = writer->commit_txn(slave_nodes);
424
2.32k
    if (st.ok()) [[likely]] {
425
2.32k
        auto* tablet_vec = res->mutable_tablet_vec();
426
2.32k
        PTabletInfo* tablet_info = tablet_vec->Add();
427
2.32k
        tablet_info->set_tablet_id(writer->tablet_id());
428
        // unused required field.
429
2.32k
        tablet_info->set_schema_hash(0);
430
2.32k
        tablet_info->set_received_rows(writer->total_received_rows());
431
2.32k
        tablet_info->set_num_rows_filtered(writer->num_rows_filtered());
432
2.32k
        _total_received_rows += writer->total_received_rows();
433
2.32k
        _num_rows_filtered += writer->num_rows_filtered();
434
2.32k
    } else {
435
0
        _add_error_tablet(res->mutable_tablet_errors(), writer->tablet_id(), st);
436
0
    }
437
2.32k
}
438
439
void BaseTabletsChannel::_add_error_tablet(
440
        google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors, int64_t tablet_id,
441
0
        Status error) const {
442
0
    PTabletError* tablet_error = tablet_errors->Add();
443
0
    tablet_error->set_tablet_id(tablet_id);
444
0
    tablet_error->set_msg(error.to_string());
445
0
    VLOG_PROGRESS << "close wait failed tablet " << tablet_id << " transaction_id " << _txn_id
446
0
                  << "err msg " << error;
447
0
}
448
449
18
void BaseTabletsChannel::refresh_profile() {
450
18
    int64_t write_mem_usage = 0;
451
18
    int64_t flush_mem_usage = 0;
452
18
    int64_t max_tablet_mem_usage = 0;
453
18
    int64_t max_tablet_write_mem_usage = 0;
454
18
    int64_t max_tablet_flush_mem_usage = 0;
455
18
    {
456
18
        std::lock_guard<std::mutex> l(_tablet_writers_lock);
457
150
        for (auto&& [tablet_id, writer] : _tablet_writers) {
458
150
            int64_t write_mem = writer->mem_consumption(MemType::WRITE_FINISHED);
459
150
            write_mem_usage += write_mem;
460
150
            int64_t flush_mem = writer->mem_consumption(MemType::FLUSH);
461
150
            flush_mem_usage += flush_mem;
462
150
            if (write_mem > max_tablet_write_mem_usage) {
463
0
                max_tablet_write_mem_usage = write_mem;
464
0
            }
465
150
            if (flush_mem > max_tablet_flush_mem_usage) {
466
0
                max_tablet_flush_mem_usage = flush_mem;
467
0
            }
468
150
            if (write_mem + flush_mem > max_tablet_mem_usage) {
469
0
                max_tablet_mem_usage = write_mem + flush_mem;
470
0
            }
471
150
        }
472
18
    }
473
18
    COUNTER_SET(_memory_usage_counter, write_mem_usage + flush_mem_usage);
474
18
    COUNTER_SET(_write_memory_usage_counter, write_mem_usage);
475
18
    COUNTER_SET(_flush_memory_usage_counter, flush_mem_usage);
476
18
    COUNTER_SET(_max_tablet_memory_usage_counter, max_tablet_mem_usage);
477
18
    COUNTER_SET(_max_tablet_write_memory_usage_counter, max_tablet_write_mem_usage);
478
18
    COUNTER_SET(_max_tablet_flush_memory_usage_counter, max_tablet_flush_mem_usage);
479
18
}
480
481
30.0k
Status BaseTabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request) {
482
30.0k
    std::vector<SlotDescriptor*>* index_slots = nullptr;
483
30.0k
    int32_t schema_hash = 0;
484
34.0k
    for (const auto& index : _schema->indexes()) {
485
34.0k
        if (index->index_id == _index_id) {
486
30.0k
            index_slots = &index->slots;
487
30.0k
            schema_hash = index->schema_hash;
488
30.0k
            break;
489
30.0k
        }
490
34.0k
    }
491
30.0k
    if (index_slots == nullptr) {
492
0
        return Status::InternalError("unknown index id, key={}", _key.to_string());
493
0
    }
494
495
#ifdef DEBUG
496
    // check: tablet ids should be unique
497
    {
498
        std::unordered_set<int64_t> tablet_ids;
499
        for (const auto& tablet : request.tablets()) {
500
            CHECK(tablet_ids.count(tablet.tablet_id()) == 0)
501
                    << "found duplicate tablet id: " << tablet.tablet_id();
502
            tablet_ids.insert(tablet.tablet_id());
503
        }
504
    }
505
#endif
506
507
30.0k
    int tablet_cnt = 0;
508
    // under _lock. no need _tablet_writers_lock again.
509
258k
    for (const auto& tablet : request.tablets()) {
510
258k
        if (_tablet_writers.find(tablet.tablet_id()) != _tablet_writers.end()) {
511
0
            continue;
512
0
        }
513
258k
        tablet_cnt++;
514
258k
        WriteRequest wrequest {
515
258k
                .tablet_id = tablet.tablet_id(),
516
258k
                .schema_hash = schema_hash,
517
258k
                .txn_id = _txn_id,
518
258k
                .txn_expiration = request.txn_expiration(), // Required by CLOUD.
519
258k
                .index_id = request.index_id(),
520
258k
                .partition_id = tablet.partition_id(),
521
258k
                .load_id = request.id(),
522
258k
                .tuple_desc = _tuple_desc,
523
258k
                .slots = index_slots,
524
258k
                .table_schema_param = _schema,
525
258k
                .is_high_priority = _is_high_priority,
526
258k
                .write_file_cache = request.write_file_cache(),
527
258k
                .storage_vault_id = request.storage_vault_id(),
528
258k
        };
529
530
258k
        auto delta_writer = create_delta_writer(wrequest);
531
258k
        {
532
258k
            std::lock_guard<std::mutex> l(_tablet_writers_lock);
533
258k
            _tablet_writers.emplace(tablet.tablet_id(), std::move(delta_writer));
534
258k
        }
535
258k
    }
536
30.0k
    _s_tablet_writer_count += _tablet_writers.size();
537
30.0k
    DCHECK_EQ(_tablet_writers.size(), tablet_cnt);
538
30.0k
    return Status::OK();
539
30.0k
}
540
541
130
Status BaseTabletsChannel::cancel() {
542
130
    std::lock_guard<std::mutex> l(_lock);
543
130
    if (_state == kFinished) {
544
94
        return _close_status;
545
94
    }
546
1.44k
    for (auto& it : _tablet_writers) {
547
1.44k
        static_cast<void>(it.second->cancel());
548
1.44k
    }
549
36
    _state = kFinished;
550
551
36
    return Status::OK();
552
130
}
553
554
0
Status TabletsChannel::cancel() {
555
0
    RETURN_IF_ERROR(BaseTabletsChannel::cancel());
556
0
    if (_write_single_replica) {
557
0
        _engine.txn_manager()->clear_txn_tablet_delta_writer(_txn_id);
558
0
    }
559
0
    return Status::OK();
560
0
}
561
562
30.0k
std::string TabletsChannelKey::to_string() const {
563
30.0k
    std::stringstream ss;
564
30.0k
    ss << *this;
565
30.0k
    return ss.str();
566
30.0k
}
567
568
84.2k
std::ostream& operator<<(std::ostream& os, const TabletsChannelKey& key) {
569
84.2k
    os << "(load_id=" << key.id << ", index_id=" << key.index_id << ")";
570
84.2k
    return os;
571
84.2k
}
572
573
Status BaseTabletsChannel::_write_block_data(
574
        const PTabletWriterAddBlockRequest& request, int64_t cur_seq,
575
        std::unordered_map<int64_t, DorisVector<uint32_t>>& tablet_to_rowidxs,
576
32.9k
        PTabletWriterAddBlockResult* response) {
577
32.9k
    Block send_data;
578
32.9k
    [[maybe_unused]] size_t uncompressed_size = 0;
579
32.9k
    [[maybe_unused]] int64_t uncompressed_time = 0;
580
32.9k
    RETURN_IF_ERROR(send_data.deserialize(request.block(), &uncompressed_size, &uncompressed_time));
581
18.4E
    CHECK(send_data.rows() == request.tablet_ids_size())
582
18.4E
            << "block rows: " << send_data.rows()
583
18.4E
            << ", tablet_ids_size: " << request.tablet_ids_size();
584
585
32.9k
    g_tablets_channel_send_data_allocated_size << send_data.allocated_bytes();
586
32.9k
    Defer defer {
587
32.9k
            [&]() { g_tablets_channel_send_data_allocated_size << -send_data.allocated_bytes(); }};
588
589
32.9k
    auto write_tablet_data = [&](int64_t tablet_id,
590
115k
                                 std::function<Status(BaseDeltaWriter * writer)> write_func) {
591
115k
        google::protobuf::RepeatedPtrField<PTabletError>* tablet_errors =
592
115k
                response->mutable_tablet_errors();
593
594
        // add_batch may concurrency with inc_open but not under _lock.
595
        // so need to protect it with _tablet_writers_lock.
596
115k
        decltype(_tablet_writers.find(tablet_id)) tablet_writer_it;
597
115k
        {
598
115k
            std::lock_guard<std::mutex> l(_tablet_writers_lock);
599
115k
            tablet_writer_it = _tablet_writers.find(tablet_id);
600
115k
            if (tablet_writer_it == _tablet_writers.end()) {
601
0
                return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id);
602
0
            }
603
115k
        }
604
605
115k
        Status st = write_func(tablet_writer_it->second.get());
606
115k
        if (!st.ok()) {
607
0
            auto err_msg =
608
0
                    fmt::format("tablet writer write failed, tablet_id={}, txn_id={}, err={}",
609
0
                                tablet_id, _txn_id, st.to_string());
610
0
            LOG(WARNING) << err_msg;
611
0
            PTabletError* error = tablet_errors->Add();
612
0
            error->set_tablet_id(tablet_id);
613
0
            error->set_msg(err_msg);
614
0
            static_cast<void>(tablet_writer_it->second->cancel_with_status(st));
615
0
            _add_broken_tablet(tablet_id);
616
            // continue write to other tablet.
617
            // the error will return back to sender.
618
0
        }
619
115k
        return Status::OK();
620
115k
    };
621
622
32.9k
    SCOPED_TIMER(_write_block_timer);
623
32.9k
    auto* tablet_load_infos = response->mutable_tablet_load_rowset_num_infos();
624
115k
    for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) {
625
115k
        RETURN_IF_ERROR(write_tablet_data(tablet_to_rowidxs_it.first, [&](BaseDeltaWriter* writer) {
626
115k
            return writer->write(&send_data, tablet_to_rowidxs_it.second);
627
115k
        }));
628
629
115k
        auto tablet_writer_it = _tablet_writers.find(tablet_to_rowidxs_it.first);
630
115k
        if (tablet_writer_it != _tablet_writers.end()) {
631
115k
            tablet_writer_it->second->set_tablet_load_rowset_num_info(tablet_load_infos);
632
115k
        }
633
115k
    }
634
635
32.9k
    {
636
32.9k
        std::lock_guard<std::mutex> l(_lock);
637
32.9k
        _next_seqs[request.sender_id()] = cur_seq + 1;
638
32.9k
    }
639
32.9k
    return Status::OK();
640
32.9k
}
641
642
Status TabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request,
643
396
                                 PTabletWriterAddBlockResult* response) {
644
396
    SCOPED_TIMER(_add_batch_timer);
645
396
    int64_t cur_seq = 0;
646
396
    if (_add_batch_number_counter) {
647
0
        _add_batch_number_counter->update(1);
648
0
    }
649
650
396
    auto status = _get_current_seq(cur_seq, request);
651
396
    if (UNLIKELY(!status.ok())) {
652
0
        return status;
653
0
    }
654
655
396
    if (request.packet_seq() < cur_seq) {
656
0
        LOG(INFO) << "packet has already recept before, expect_seq=" << cur_seq
657
0
                  << ", recept_seq=" << request.packet_seq();
658
0
        return Status::OK();
659
0
    }
660
661
396
    std::unordered_map<int64_t /* tablet_id */, DorisVector<uint32_t> /* row index */>
662
396
            tablet_to_rowidxs;
663
396
    _build_tablet_to_rowidxs(request, &tablet_to_rowidxs);
664
665
396
    return _write_block_data(request, cur_seq, tablet_to_rowidxs, response);
666
396
}
667
668
0
void BaseTabletsChannel::_add_broken_tablet(int64_t tablet_id) {
669
0
    std::unique_lock<std::shared_mutex> wlock(_broken_tablets_lock);
670
0
    _broken_tablets.insert(tablet_id);
671
0
}
672
673
35.7M
bool BaseTabletsChannel::_is_broken_tablet(int64_t tablet_id) const {
674
35.7M
    return _broken_tablets.find(tablet_id) != _broken_tablets.end();
675
35.7M
}
676
677
void BaseTabletsChannel::_build_tablet_to_rowidxs(
678
        const PTabletWriterAddBlockRequest& request,
679
32.9k
        std::unordered_map<int64_t, DorisVector<uint32_t>>* tablet_to_rowidxs) {
680
    // just add a coarse-grained read lock here rather than each time when visiting _broken_tablets
681
    // tests show that a relatively coarse-grained read lock here performs better under multicore scenario
682
    // see: https://github.com/apache/doris/pull/28552
683
32.9k
    std::shared_lock<std::shared_mutex> rlock(_broken_tablets_lock);
684
32.9k
    if (request.is_single_tablet_block()) {
685
        // The cloud mode need the tablet ids to prepare rowsets.
686
0
        int64_t tablet_id = request.tablet_ids(0);
687
0
        tablet_to_rowidxs->emplace(tablet_id, std::initializer_list<uint32_t> {0});
688
0
        return;
689
0
    }
690
35.6M
    for (uint32_t i = 0; i < request.tablet_ids_size(); ++i) {
691
35.6M
        int64_t tablet_id = request.tablet_ids(i);
692
35.6M
        if (_is_broken_tablet(tablet_id)) {
693
            // skip broken tablets
694
0
            VLOG_PROGRESS << "skip broken tablet tablet=" << tablet_id;
695
0
            continue;
696
0
        }
697
35.6M
        auto it = tablet_to_rowidxs->find(tablet_id);
698
35.6M
        if (it == tablet_to_rowidxs->end()) {
699
115k
            tablet_to_rowidxs->emplace(tablet_id, std::initializer_list<uint32_t> {i});
700
35.5M
        } else {
701
35.5M
            it->second.emplace_back(i);
702
35.5M
        }
703
35.6M
    }
704
32.9k
}
705
706
} // namespace doris