Coverage Report

Created: 2026-06-24 07:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/cloud/cloud_tablets_channel.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "cloud/cloud_tablets_channel.h"
19
20
#include <mutex>
21
22
#include "cloud/cloud_delta_writer.h"
23
#include "cloud/cloud_meta_mgr.h"
24
#include "cloud/cloud_storage_engine.h"
25
#include "cloud/config.h"
26
#include "load/channel/tablets_channel.h"
27
#include "load/delta_writer/delta_writer.h"
28
#include "storage/tablet_info.h"
29
30
namespace doris {
31
32
CloudTabletsChannel::CloudTabletsChannel(CloudStorageEngine& engine, const TabletsChannelKey& key,
33
                                         const UniqueId& load_id, bool is_high_priority,
34
                                         RuntimeProfile* profile)
35
29.9k
        : BaseTabletsChannel(key, load_id, is_high_priority, profile), _engine(engine) {}
36
37
29.9k
CloudTabletsChannel::~CloudTabletsChannel() = default;
38
39
std::unique_ptr<BaseDeltaWriter> CloudTabletsChannel::create_delta_writer(
40
266k
        const WriteRequest& request) {
41
266k
    return std::make_unique<CloudDeltaWriter>(_engine, request, _profile, _load_id);
42
266k
}
43
44
Status CloudTabletsChannel::add_batch(const PTabletWriterAddBlockRequest& request,
45
32.8k
                                      PTabletWriterAddBlockResult* response) {
46
32.8k
    if (_schema != nullptr && _schema->row_binlog_index_schema() != nullptr) {
47
0
        return Status::NotSupported("cloud mode does not support binlog<row> now");
48
0
    }
49
    // FIXME(plat1ko): Too many duplicate code with `TabletsChannel`
50
32.8k
    SCOPED_TIMER(_add_batch_timer);
51
32.8k
    int64_t cur_seq = 0;
52
32.8k
    if (_add_batch_number_counter != nullptr) {
53
3
        _add_batch_number_counter->update(1);
54
3
    }
55
56
32.8k
    auto status = _get_current_seq(cur_seq, request);
57
32.8k
    if (UNLIKELY(!status.ok())) {
58
0
        return status;
59
0
    }
60
61
32.8k
    if (request.packet_seq() < cur_seq) {
62
0
        LOG(INFO) << "packet has already recept before, expect_seq=" << cur_seq
63
0
                  << ", recept_seq=" << request.packet_seq();
64
0
        return Status::OK();
65
0
    }
66
67
32.8k
    if (request.is_receiver_side_random_bucket()) {
68
4.27k
        std::unordered_map<int64_t, DorisVector<uint32_t>> partition_to_rowidxs;
69
4.27k
        RETURN_IF_ERROR(_build_partition_to_rowidxs_for_receiver_side_random_bucket(
70
4.27k
                request, &partition_to_rowidxs));
71
4.27k
        if (!partition_to_rowidxs.empty() && !config::skip_writing_empty_rowset_metadata) {
72
0
            std::unordered_set<int64_t> partition_ids;
73
0
            partition_ids.reserve(partition_to_rowidxs.size());
74
0
            for (const auto& [partition_id, _] : partition_to_rowidxs) {
75
0
                partition_ids.insert(partition_id);
76
0
            }
77
0
            {
78
0
                std::lock_guard<std::mutex> l(_tablet_writers_lock);
79
0
                RETURN_IF_ERROR(_init_writers_by_partition_ids(partition_ids));
80
0
            }
81
0
        }
82
4.27k
        return _write_block_data_for_receiver_side_random_bucket(request, cur_seq,
83
4.27k
                                                                 partition_to_rowidxs, response);
84
4.27k
    }
85
86
28.5k
    std::unordered_map<int64_t, DorisVector<uint32_t>> tablet_to_rowidxs;
87
28.5k
    _build_tablet_to_rowidxs(request, &tablet_to_rowidxs);
88
89
28.5k
    std::unordered_set<int64_t> partition_ids;
90
28.5k
    std::vector<CloudDeltaWriter*> writers;
91
28.5k
    {
92
        // add_batch may concurrency with inc_open but not under _lock.
93
        // so need to protect it with _tablet_writers_lock.
94
28.5k
        std::lock_guard<std::mutex> l(_tablet_writers_lock);
95
110k
        for (auto& [tablet_id, _] : tablet_to_rowidxs) {
96
110k
            auto tablet_writer_it = _tablet_writers.find(tablet_id);
97
110k
            if (tablet_writer_it == _tablet_writers.end()) {
98
0
                return Status::InternalError("unknown tablet to append data, tablet={}", tablet_id);
99
0
            }
100
110k
            partition_ids.insert(tablet_writer_it->second->partition_id());
101
110k
            writers.push_back(static_cast<CloudDeltaWriter*>(tablet_writer_it->second.get()));
102
110k
        }
103
28.5k
        if (config::skip_writing_empty_rowset_metadata && !writers.empty()) {
104
28.5k
            RETURN_IF_ERROR(CloudDeltaWriter::batch_init(writers));
105
18.4E
        } else if (!partition_ids.empty()) {
106
0
            RETURN_IF_ERROR(_init_writers_by_partition_ids(partition_ids));
107
0
        }
108
28.5k
    }
109
110
28.5k
    return _write_block_data(request, cur_seq, tablet_to_rowidxs, response);
111
28.5k
}
112
113
4.38k
Status CloudTabletsChannel::_prepare_receiver_side_random_bucket_writer(BaseDeltaWriter* writer) {
114
4.38k
    auto* cloud_writer = static_cast<CloudDeltaWriter*>(writer);
115
4.38k
    if (!cloud_writer->is_init()) {
116
3.70k
        return CloudDeltaWriter::batch_init({cloud_writer});
117
3.70k
    }
118
681
    return Status::OK();
119
4.38k
}
120
121
Status CloudTabletsChannel::_init_writers_by_partition_ids(
122
0
        const std::unordered_set<int64_t>& partition_ids) {
123
0
    std::vector<CloudDeltaWriter*> writers;
124
0
    for (auto&& [tablet_id, base_writer] : _tablet_writers) {
125
0
        auto* writer = static_cast<CloudDeltaWriter*>(base_writer.get());
126
0
        if (partition_ids.contains(writer->partition_id()) && !writer->is_init()) {
127
0
            writers.push_back(writer);
128
0
        }
129
0
    }
130
0
    if (!writers.empty()) {
131
0
        RETURN_IF_ERROR(CloudDeltaWriter::batch_init(writers));
132
0
    }
133
0
    return Status::OK();
134
0
}
135
136
Status CloudTabletsChannel::close(LoadChannel* parent, const PTabletWriterAddBlockRequest& req,
137
43.8k
                                  PTabletWriterAddBlockResult* res, bool* finished) {
138
    // FIXME(plat1ko): Too many duplicate code with `TabletsChannel`
139
43.8k
    std::lock_guard l(_lock);
140
43.8k
    if (_state == kFinished) {
141
0
        return _close_status;
142
0
    }
143
144
43.8k
    auto sender_id = req.sender_id();
145
43.8k
    if (_closed_senders.Get(sender_id)) {
146
        // Double close from one sender, just return OK
147
0
        *finished = (_num_remaining_senders == 0);
148
0
        return _close_status;
149
0
    }
150
151
3.21M
    for (auto pid : req.partition_ids()) {
152
3.21M
        _partition_ids.emplace(pid);
153
3.21M
    }
154
155
43.8k
    _closed_senders.Set(sender_id, true);
156
43.8k
    _num_remaining_senders--;
157
43.8k
    *finished = (_num_remaining_senders == 0);
158
159
43.8k
    LOG(INFO) << "close tablets channel: " << _key << ", sender id: " << sender_id
160
43.8k
              << ", backend id: " << req.backend_id()
161
43.8k
              << " remaining sender: " << _num_remaining_senders;
162
163
43.8k
    if (!*finished) {
164
13.9k
        return Status::OK();
165
13.9k
    }
166
167
29.9k
    auto* tablet_errors = res->mutable_tablet_errors();
168
29.9k
    auto* tablet_vec = res->mutable_tablet_vec();
169
29.9k
    _state = kFinished;
170
171
    // All senders are closed
172
    // 1. close all delta writers. under _lock.
173
29.9k
    std::vector<CloudDeltaWriter*> writers_to_commit;
174
29.9k
    writers_to_commit.reserve(_tablet_writers.size());
175
29.9k
    bool success = true;
176
177
265k
    for (auto&& [tablet_id, base_writer] : _tablet_writers) {
178
265k
        auto* writer = static_cast<CloudDeltaWriter*>(base_writer.get());
179
        // ATTN: the strict mode means strict filtering of column type conversions during import.
180
        // Sometimes all inputs are filtered, but the partition ID is still set, and the writer is
181
        // not initialized.
182
265k
        if (_partition_ids.contains(writer->partition_id())) {
183
174k
            if (!success) { // Already failed, cancel all remain writers
184
0
                static_cast<void>(writer->cancel());
185
0
                continue;
186
0
            }
187
188
174k
            if (writer->is_init()) {
189
54.0k
                auto st = writer->close();
190
54.0k
                if (!st.ok()) {
191
0
                    LOG(WARNING) << "close tablet writer failed, tablet_id=" << tablet_id
192
0
                                 << ", txn_id=" << _txn_id << ", err=" << st;
193
0
                    PTabletError* tablet_error = tablet_errors->Add();
194
0
                    tablet_error->set_tablet_id(tablet_id);
195
0
                    tablet_error->set_msg(st.to_string());
196
0
                    success = false;
197
0
                    _close_status = std::move(st);
198
0
                    continue;
199
0
                }
200
54.0k
            }
201
202
            // to make sure tablet writer in `_broken_tablets` won't call `close_wait` method.
203
174k
            if (_is_broken_tablet(writer->tablet_id())) {
204
0
                LOG(WARNING) << "SHOULD NOT HAPPEN, tablet writer is broken but not cancelled"
205
0
                             << ", tablet_id=" << tablet_id << ", transaction_id=" << _txn_id;
206
0
                continue;
207
0
            }
208
209
174k
            writers_to_commit.push_back(writer);
210
174k
        } else {
211
91.0k
            auto st = writer->cancel();
212
91.0k
            if (!st.ok()) {
213
0
                LOG(WARNING) << "cancel tablet writer failed, tablet_id=" << tablet_id
214
0
                             << ", txn_id=" << _txn_id;
215
                // just skip this tablet(writer) and continue to close others
216
0
                continue;
217
0
            }
218
91.0k
        }
219
265k
    }
220
221
29.9k
    if (!success) {
222
0
        return _close_status;
223
0
    }
224
225
    // 2. wait delta writers
226
29.9k
    using namespace std::chrono;
227
29.9k
    auto build_start = steady_clock::now();
228
174k
    for (auto* writer : writers_to_commit) {
229
174k
        if (!writer->is_init()) {
230
120k
            continue;
231
120k
        }
232
233
54.0k
        auto st = writer->build_rowset();
234
54.0k
        if (!st.ok()) {
235
46
            LOG(WARNING) << "failed to close wait DeltaWriter. tablet_id=" << writer->tablet_id()
236
46
                         << ", err=" << st;
237
46
            PTabletError* tablet_error = tablet_errors->Add();
238
46
            tablet_error->set_tablet_id(writer->tablet_id());
239
46
            tablet_error->set_msg(st.to_string());
240
46
            _close_status = std::move(st);
241
46
            return _close_status;
242
46
        }
243
54.0k
    }
244
29.8k
    int64_t build_latency = duration_cast<milliseconds>(steady_clock::now() - build_start).count();
245
246
    // 3. commit rowsets to meta-service
247
29.8k
    auto commit_start = steady_clock::now();
248
29.8k
    std::vector<std::function<Status()>> tasks;
249
29.8k
    tasks.reserve(writers_to_commit.size());
250
174k
    for (auto* writer : writers_to_commit) {
251
174k
        tasks.emplace_back([writer] { return writer->commit_rowset(); });
252
174k
    }
253
29.8k
    _close_status = cloud::bthread_fork_join(tasks, 10);
254
29.8k
    if (!_close_status.ok()) {
255
0
        return _close_status;
256
0
    }
257
258
29.8k
    int64_t commit_latency =
259
29.8k
            duration_cast<milliseconds>(steady_clock::now() - commit_start).count();
260
261
    // 4. calculate delete bitmap for Unique Key MoW tables
262
174k
    for (auto* writer : writers_to_commit) {
263
174k
        auto st = writer->submit_calc_delete_bitmap_task();
264
174k
        if (!st.ok()) {
265
0
            LOG(WARNING) << "failed to close wait DeltaWriter. tablet_id=" << writer->tablet_id()
266
0
                         << ", err=" << st;
267
0
            _add_error_tablet(tablet_errors, writer->tablet_id(), st);
268
0
            _close_status = std::move(st);
269
0
            return _close_status;
270
0
        }
271
174k
    }
272
273
    // 5. wait for delete bitmap calculation complete if necessary
274
174k
    for (auto* writer : writers_to_commit) {
275
174k
        auto st = writer->wait_calc_delete_bitmap();
276
174k
        if (!st.ok()) {
277
0
            LOG(WARNING) << "failed to close wait DeltaWriter. tablet_id=" << writer->tablet_id()
278
0
                         << ", err=" << st;
279
0
            _add_error_tablet(tablet_errors, writer->tablet_id(), st);
280
0
            _close_status = std::move(st);
281
0
            return _close_status;
282
0
        }
283
174k
    }
284
285
    // 6. set txn related info if necessary
286
204k
    for (auto it = writers_to_commit.begin(); it != writers_to_commit.end();) {
287
174k
        auto st = (*it)->set_txn_related_info();
288
174k
        if (!st.ok()) {
289
0
            _add_error_tablet(tablet_errors, (*it)->tablet_id(), st);
290
0
            _close_status = std::move(st);
291
0
            return _close_status;
292
0
        }
293
174k
        it++;
294
174k
    }
295
296
29.8k
    tablet_vec->Reserve(static_cast<int>(writers_to_commit.size()));
297
174k
    for (auto* writer : writers_to_commit) {
298
174k
        PTabletInfo* tablet_info = tablet_vec->Add();
299
174k
        tablet_info->set_tablet_id(writer->tablet_id());
300
        // unused required field.
301
174k
        tablet_info->set_schema_hash(0);
302
174k
        tablet_info->set_received_rows(writer->total_received_rows());
303
174k
        tablet_info->set_num_rows_filtered(writer->num_rows_filtered());
304
        // These stats may be larger than the actual value if the txn is aborted
305
174k
        writer->update_tablet_stats();
306
174k
    }
307
29.8k
    res->set_build_rowset_latency_ms(build_latency);
308
29.8k
    res->set_commit_rowset_latency_ms(commit_latency);
309
29.8k
    return Status::OK();
310
29.8k
}
311
312
} // namespace doris