Coverage Report

Created: 2026-05-09 01:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/channel/load_channel.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/channel/load_channel.h"
19
20
#include <gen_cpp/internal_service.pb.h>
21
#include <glog/logging.h>
22
23
#include "cloud/cloud_tablets_channel.h"
24
#include "cloud/config.h"
25
#include "common/logging.h"
26
#include "load/channel/tablets_channel.h"
27
#include "runtime/exec_env.h"
28
#include "runtime/fragment_mgr.h"
29
#include "runtime/memory/mem_tracker.h"
30
#include "runtime/thread_context.h"
31
#include "runtime/workload_group/workload_group_manager.h"
32
#include "storage/storage_engine.h"
33
#include "util/debug_points.h"
34
35
namespace doris {
36
37
bvar::Adder<int64_t> g_loadchannel_cnt("loadchannel_cnt");
38
39
LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_high_priority,
40
                         std::string sender_ip, int64_t backend_id, bool enable_profile,
41
                         int64_t wg_id)
42
29.2k
        : _load_id(load_id),
43
29.2k
          _timeout_s(timeout_s),
44
29.2k
          _is_high_priority(is_high_priority),
45
29.2k
          _sender_ip(std::move(sender_ip)),
46
29.2k
          _backend_id(backend_id),
47
29.2k
          _enable_profile(enable_profile) {
48
29.2k
    std::shared_ptr<QueryContext> query_context =
49
29.2k
            ExecEnv::GetInstance()->fragment_mgr()->get_query_ctx(_load_id.to_thrift());
50
51
29.2k
    if (query_context != nullptr) {
52
29.2k
        _resource_ctx = query_context->resource_ctx();
53
29.2k
    } else {
54
3
        _resource_ctx = ResourceContext::create_shared();
55
3
        _resource_ctx->task_controller()->set_task_id(_load_id.to_thrift());
56
        // when memtable on sink is not enabled, load can not find queryctx
57
3
        std::shared_ptr<MemTrackerLimiter> mem_tracker = MemTrackerLimiter::create_shared(
58
3
                MemTrackerLimiter::Type::LOAD,
59
3
                fmt::format("(FromLoadChannel)Load#Id={}", _load_id.to_string()));
60
3
        _resource_ctx->memory_context()->set_mem_tracker(mem_tracker);
61
3
        WorkloadGroupPtr wg_ptr = nullptr;
62
3
        if (wg_id > 0) {
63
3
            std::vector<uint64_t> id_set;
64
3
            id_set.push_back(wg_id);
65
3
            wg_ptr = ExecEnv::GetInstance()->workload_group_mgr()->get_group(id_set);
66
3
            _resource_ctx->set_workload_group(wg_ptr);
67
3
        }
68
3
    }
69
70
29.2k
    g_loadchannel_cnt << 1;
71
    // _last_updated_time should be set before being inserted to
72
    // _load_channels in load_channel_mgr, or it may be erased
73
    // immediately by gc thread.
74
29.2k
    _last_updated_time.store(time(nullptr));
75
29.2k
    if (enable_profile) {
76
3
        _init_profile();
77
3
    }
78
29.2k
}
79
80
29.2k
LoadChannel::~LoadChannel() {
81
29.2k
    g_loadchannel_cnt << -1;
82
29.2k
    std::stringstream rows_str;
83
29.9k
    for (const auto& entry : _tablets_channels_rows) {
84
29.9k
        rows_str << ", index id: " << entry.first << ", total_received_rows: " << entry.second.first
85
29.9k
                 << ", num_rows_filtered: " << entry.second.second;
86
29.9k
    }
87
29.2k
    LOG(INFO) << "load channel removed"
88
29.2k
              << " load_id=" << _load_id << ", is high priority=" << _is_high_priority
89
29.2k
              << ", sender_ip=" << _sender_ip << rows_str.str();
90
29.2k
}
91
92
3
void LoadChannel::_init_profile() {
93
3
    DCHECK(_enable_profile);
94
3
    _profile = std::make_unique<RuntimeProfile>("LoadChannels");
95
3
    _mgr_add_batch_timer = ADD_TIMER(_profile, "LoadChannelMgrAddBatchTime");
96
3
    _handle_mem_limit_timer = ADD_TIMER(_profile, "HandleMemLimitTime");
97
3
    _self_profile =
98
3
            _profile->create_child(fmt::format("LoadChannel load_id={} (host={}, backend_id={})",
99
3
                                               _load_id.to_string(), _sender_ip, _backend_id),
100
3
                                   true, true);
101
3
    _add_batch_number_counter = ADD_COUNTER(_self_profile, "NumberBatchAdded", TUnit::UNIT);
102
3
    _add_batch_timer = ADD_TIMER(_self_profile, "AddBatchTime");
103
3
    _handle_eos_timer = ADD_CHILD_TIMER(_self_profile, "HandleEosTime", "AddBatchTime");
104
3
    _add_batch_times = ADD_COUNTER(_self_profile, "AddBatchTimes", TUnit::UNIT);
105
3
}
106
107
55.6k
Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
108
55.6k
    if (config::is_cloud_mode() && params.txn_expiration() <= 0) {
109
0
        return Status::InternalError(
110
0
                "The txn expiration of PTabletWriterOpenRequest is invalid, value={}",
111
0
                params.txn_expiration());
112
0
    }
113
55.6k
    if (_resource_ctx->workload_group() != nullptr) {
114
55.6k
        RETURN_IF_ERROR(_resource_ctx->workload_group()->add_resource_ctx(
115
55.6k
                _resource_ctx->task_controller()->task_id(), _resource_ctx));
116
55.6k
    }
117
55.6k
    SCOPED_ATTACH_TASK(_resource_ctx);
118
119
55.6k
    int64_t index_id = params.index_id();
120
55.6k
    std::shared_ptr<BaseTabletsChannel> channel;
121
55.6k
    {
122
55.6k
        std::lock_guard<std::mutex> l(_lock);
123
55.6k
        auto it = _tablets_channels.find(index_id);
124
55.6k
        if (it != _tablets_channels.end()) {
125
25.5k
            channel = it->second;
126
30.0k
        } else {
127
            // just for VLOG
128
30.0k
            if (_txn_id == 0) [[unlikely]] {
129
29.2k
                _txn_id = params.txn_id();
130
29.2k
            }
131
            // create a new tablets channel
132
30.0k
            TabletsChannelKey key(params.id(), index_id);
133
30.0k
            BaseStorageEngine& engine = ExecEnv::GetInstance()->storage_engine();
134
30.0k
            if (config::is_cloud_mode()) {
135
29.5k
                channel = std::make_shared<CloudTabletsChannel>(engine.to_cloud(), key, _load_id,
136
29.5k
                                                                _is_high_priority, _self_profile);
137
29.5k
            } else {
138
463
                channel = std::make_shared<TabletsChannel>(engine.to_local(), key, _load_id,
139
463
                                                           _is_high_priority, _self_profile);
140
463
            }
141
30.0k
            {
142
30.0k
                std::lock_guard<std::mutex> lt(_tablets_channels_lock);
143
30.0k
                _tablets_channels.insert({index_id, channel});
144
30.0k
            }
145
30.0k
        }
146
55.6k
    }
147
148
55.6k
    if (params.is_incremental()) {
149
        // incremental open would ensure not to open tablet repeatedly
150
181
        RETURN_IF_ERROR(channel->incremental_open(params));
151
55.4k
    } else {
152
55.4k
        RETURN_IF_ERROR(channel->open(params));
153
55.4k
    }
154
155
55.6k
    _opened = true;
156
55.6k
    _last_updated_time.store(time(nullptr));
157
55.6k
    return Status::OK();
158
55.6k
}
159
160
Status LoadChannel::_get_tablets_channel(std::shared_ptr<BaseTabletsChannel>& channel,
161
57.6k
                                         bool& is_finished, const int64_t index_id) {
162
57.6k
    std::lock_guard<std::mutex> l(_lock);
163
57.6k
    auto it = _tablets_channels.find(index_id);
164
57.6k
    if (it == _tablets_channels.end()) {
165
0
        if (_finished_channel_ids.find(index_id) != _finished_channel_ids.end()) {
166
            // this channel is already finished, just return OK
167
0
            is_finished = true;
168
0
            return Status::OK();
169
0
        }
170
0
        std::stringstream ss;
171
0
        ss << "load channel " << _load_id << " add batch with unknown index id: " << index_id;
172
0
        return Status::InternalError(ss.str());
173
0
    }
174
175
57.6k
    is_finished = false;
176
57.6k
    channel = it->second;
177
57.6k
    return Status::OK();
178
57.6k
}
179
180
Status LoadChannel::add_batch(const PTabletWriterAddBlockRequest& request,
181
57.6k
                              PTabletWriterAddBlockResult* response) {
182
57.6k
    DBUG_EXECUTE_IF("LoadChannel.add_batch.failed",
183
57.6k
                    { return Status::InternalError("fault injection"); });
184
57.6k
    SCOPED_TIMER(_add_batch_timer);
185
57.6k
    if (_enable_profile) {
186
21
        COUNTER_UPDATE(_add_batch_times, 1);
187
21
    }
188
57.6k
    SCOPED_ATTACH_TASK(_resource_ctx);
189
57.6k
    int64_t index_id = request.index_id();
190
    // 1. get tablets channel
191
57.6k
    std::shared_ptr<BaseTabletsChannel> channel;
192
57.6k
    bool is_finished = false;
193
57.6k
    Status st = _get_tablets_channel(channel, is_finished, index_id);
194
57.7k
    if (!st.ok() || is_finished) {
195
0
        return st;
196
0
    }
197
198
    // 2. add block to tablets channel
199
57.6k
    if (request.has_block()) {
200
32.9k
        RETURN_IF_ERROR(channel->add_batch(request, response));
201
32.9k
        if (_enable_profile) {
202
4
            _add_batch_number_counter->update(1);
203
4
        }
204
32.9k
    }
205
206
    // 3. handle eos
207
    // if channel is incremental, maybe hang on close until all close request arrived.
208
57.7k
    if (request.has_eos() && request.eos()) {
209
55.3k
        st = _handle_eos(channel.get(), request, response);
210
55.3k
        _report_profile(response);
211
55.3k
        if (!st.ok()) {
212
47
            return st;
213
47
        }
214
55.3k
    } else if (_enable_profile && _add_batch_number_counter->value() % 100 == 1) {
215
0
        _report_profile(response);
216
0
    }
217
57.6k
    _last_updated_time.store(time(nullptr));
218
57.6k
    return st;
219
57.6k
}
220
221
Status LoadChannel::_handle_eos(BaseTabletsChannel* channel,
222
                                const PTabletWriterAddBlockRequest& request,
223
55.3k
                                PTabletWriterAddBlockResult* response) {
224
55.3k
    if (_enable_profile) {
225
21
        _self_profile->add_info_string("EosHost", fmt::format("{}", request.backend_id()));
226
21
    }
227
55.3k
    bool finished = false;
228
55.3k
    auto index_id = request.index_id();
229
230
55.3k
    RETURN_IF_ERROR(channel->close(this, request, response, &finished));
231
232
    // for init node, we close waiting(hang on) all close request and let them return together.
233
55.3k
    if (request.has_hang_wait() && request.hang_wait()) {
234
0
        DCHECK(!channel->is_incremental_channel());
235
0
        VLOG_DEBUG << fmt::format("txn {}: reciever index {} close waiting by sender {}", _txn_id,
236
0
                                  request.index_id(), request.sender_id());
237
0
        int count = 0;
238
0
        while (!channel->is_finished()) {
239
0
            bthread_usleep(1000);
240
0
            count++;
241
0
        }
242
        // now maybe finished or cancelled.
243
0
        VLOG_TRACE << "reciever close wait finished!" << request.sender_id();
244
0
        if (count >= 1000 * _timeout_s) { // maybe config::streaming_load_rpc_max_alive_time_sec
245
0
            return Status::InternalError("Tablets channel didn't wait all close");
246
0
        }
247
0
    }
248
249
55.2k
    if (finished) {
250
29.9k
        std::lock_guard<std::mutex> l(_lock);
251
29.9k
        {
252
29.9k
            std::lock_guard<std::mutex> lt(_tablets_channels_lock);
253
29.9k
            _tablets_channels_rows.insert(std::make_pair(
254
29.9k
                    index_id,
255
29.9k
                    std::make_pair(channel->total_received_rows(), channel->num_rows_filtered())));
256
29.9k
            _tablets_channels.erase(index_id);
257
29.9k
        }
258
29.9k
        LOG(INFO) << "txn " << _txn_id << " closed tablets_channel " << index_id;
259
29.9k
        _finished_channel_ids.emplace(index_id);
260
29.9k
    }
261
55.2k
    return Status::OK();
262
55.2k
}
263
264
55.3k
void LoadChannel::_report_profile(PTabletWriterAddBlockResult* response) {
265
55.3k
    if (!_enable_profile) {
266
55.3k
        return;
267
55.3k
    }
268
269
    // TabletSink and LoadChannel in BE are M: N relationship,
270
    // Every once in a while LoadChannel will randomly return its own runtime profile to a TabletSink,
271
    // so usually all LoadChannel runtime profiles are saved on each TabletSink,
272
    // and the timeliness of the same LoadChannel profile saved on different TabletSinks is different,
273
    // and each TabletSink will periodically send fe reports all the LoadChannel profiles saved by itself,
274
    // and ensures to update the latest LoadChannel profile according to the timestamp.
275
21
    _self_profile->set_timestamp(_last_updated_time);
276
277
21
    {
278
21
        std::lock_guard<std::mutex> l(_tablets_channels_lock);
279
21
        for (auto& it : _tablets_channels) {
280
18
            it.second->refresh_profile();
281
18
        }
282
21
    }
283
284
21
    TRuntimeProfileTree tprofile;
285
21
    ThriftSerializer ser(false, 4096);
286
21
    uint8_t* buf = nullptr;
287
21
    uint32_t len = 0;
288
21
    std::lock_guard<std::mutex> l(_profile_serialize_lock);
289
21
    _profile->to_thrift(&tprofile);
290
21
    auto st = ser.serialize(&tprofile, &len, &buf);
291
21
    if (st.ok()) {
292
21
        response->set_load_channel_profile(std::string((const char*)buf, len));
293
21
    } else {
294
0
        LOG(WARNING) << "load channel TRuntimeProfileTree serialize failed, errmsg=" << st;
295
0
    }
296
21
}
297
298
57.6k
bool LoadChannel::is_finished() {
299
57.6k
    if (!_opened) {
300
0
        return false;
301
0
    }
302
57.6k
    std::lock_guard<std::mutex> l(_lock);
303
57.6k
    return _tablets_channels.empty();
304
57.6k
}
305
306
130
Status LoadChannel::cancel() {
307
130
    _cancelled.store(true);
308
130
    std::lock_guard<std::mutex> l(_lock);
309
130
    for (auto& it : _tablets_channels) {
310
130
        static_cast<void>(it.second->cancel());
311
130
    }
312
130
    return Status::OK();
313
130
}
314
315
} // namespace doris