Coverage Report

Created: 2026-03-13 09:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/channel/load_channel_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/channel/load_channel_mgr.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/internal_service.pb.h>
22
23
#include <algorithm>
24
// IWYU pragma: no_include <bits/chrono.h>
25
#include <chrono> // IWYU pragma: keep
26
#include <ctime>
27
#include <memory>
28
#include <ostream>
29
#include <string>
30
#include <vector>
31
32
#include "common/config.h"
33
#include "common/logging.h"
34
#include "common/metrics/doris_metrics.h"
35
#include "common/metrics/metrics.h"
36
#include "load/channel/load_channel.h"
37
#include "runtime/exec_env.h"
38
#include "util/thread.h"
39
40
namespace doris {
41
42
#ifndef BE_TEST
43
constexpr uint32_t START_BG_INTERVAL = 60;
44
#else
45
constexpr uint32_t START_BG_INTERVAL = 1;
46
#endif
47
48
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(load_channel_count, MetricUnit::NOUNIT);
49
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(load_channel_mem_consumption, MetricUnit::BYTES, "",
50
                                   mem_consumption, Labels({{"type", "load"}}));
51
52
28.6k
static int64_t calc_channel_timeout_s(int64_t timeout_in_req_s) {
53
28.6k
    int64_t load_channel_timeout_s = config::streaming_load_rpc_max_alive_time_sec;
54
28.6k
    if (timeout_in_req_s > 0) {
55
28.6k
        load_channel_timeout_s = std::max<int64_t>(load_channel_timeout_s, timeout_in_req_s);
56
28.6k
    }
57
28.6k
    return load_channel_timeout_s;
58
28.6k
}
59
60
7
// Builds the manager and exposes the number of live load channels through the
// `load_channel_count` gauge. The state cache and the background cleaner are set up
// later, in init().
LoadChannelMgr::LoadChannelMgr() : _stop_background_threads_latch(1) {
    REGISTER_HOOK_METRIC(load_channel_count, [this]() {
        // NOTE(review): this reads _load_channels without holding _lock (the original
        // lock_guard here was commented out), so it can race with writers. Confirm a
        // best-effort size is acceptable for this gauge.
        const auto live_channels = _load_channels.size();
        return live_channels;
    });
}
66
67
3
void LoadChannelMgr::stop() {
68
3
    DEREGISTER_HOOK_METRIC(load_channel_count);
69
3
    DEREGISTER_HOOK_METRIC(load_channel_mem_consumption);
70
3
    _stop_background_threads_latch.count_down();
71
3
    if (_load_channels_clean_thread) {
72
3
        _load_channels_clean_thread->join();
73
3
    }
74
3
}
75
76
7
// Initializes the manager.
//
// Allocates the cache that remembers finished and cancelled loads, which is filled by
// _finish_load_channel() and cancel(). Then starts the background cleaner thread.
// `process_mem_limit` is currently unused.
Status LoadChannelMgr::init(int64_t process_mem_limit) {
    // Constructor argument of LoadStateChannelCache; its unit is defined by that class.
    constexpr int kStateCacheCapacity = 1024;
    _load_state_channels = std::make_unique<LoadStateChannelCache>(kStateCacheCapacity);
    RETURN_IF_ERROR(_start_bg_worker());
    return Status::OK();
}
81
82
67.9k
// Opens the load channel for the load id carried by `params`.
//
// The first open request for a load id creates a LoadChannel and registers it in
// _load_channels. Later requests with the same id reuse that channel. Lookup and
// creation happen under _lock. The channel-level open() runs after the lock is
// released.
Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) {
    UniqueId load_id(params.id());
    std::shared_ptr<LoadChannel> channel;
    {
        std::lock_guard<std::mutex> l(_lock);
        if (auto it = _load_channels.find(load_id); it != _load_channels.end()) {
            channel = it->second;
        } else {
            // -1 marks "no timeout in request"; calc_channel_timeout_s then falls back
            // to the configured value.
            const int64_t timeout_in_req_s =
                    params.has_load_channel_timeout_s() ? params.load_channel_timeout_s() : -1;
            const int64_t channel_timeout_s = calc_channel_timeout_s(timeout_in_req_s);
            const bool is_high_priority =
                    params.has_is_high_priority() && params.is_high_priority();
            // -1 when the request does not name a workload group.
            const int64_t wg_id =
                    params.has_workload_group_id() ? params.workload_group_id() : -1;

            channel.reset(new LoadChannel(load_id, channel_timeout_s, is_high_priority,
                                          params.sender_ip(), params.backend_id(),
                                          params.enable_profile(), wg_id));
            _load_channels.insert({load_id, channel});
        }
    }

    RETURN_IF_ERROR(channel->open(params));
    return Status::OK();
}
112
113
// Resolves the live LoadChannel for `load_id`. The lookup runs under _lock.
//
// On success `channel` is set and `is_eof` stays false. If the id is not live, the
// state cache (filled by cancel() and _finish_load_channel()) is consulted:
//  - entry holding a non-empty cancel reason -> Status::Cancelled(reason);
//  - entry with a null value (finished load) and the request has eos=true
//    -> Status::OK() with `is_eof` set to true;
//  - anything else -> InternalError("unknown load_id ...").
Status LoadChannelMgr::_get_load_channel(std::shared_ptr<LoadChannel>& channel, bool& is_eof,
                                         const UniqueId& load_id,
                                         const PTabletWriterAddBlockRequest& request) {
    is_eof = false;
    std::lock_guard<std::mutex> l(_lock);
    auto it = _load_channels.find(load_id);
    if (it != _load_channels.end()) {
        channel = it->second;
        return Status::OK();
    }

    Cache::Handle* handle = _load_state_channels->lookup(load_id.to_string());
    if (handle != nullptr) {
        if (auto* value = _load_state_channels->value(handle); value != nullptr) {
            // load is cancelled.
            // Copy the reason while the handle still pins the cache entry. After
            // release() the cache may evict and free the CacheValue, so a reference
            // into it must not be used past that point.
            const std::string cancel_reason =
                    reinterpret_cast<CacheValue*>(value)->_cancel_reason;
            _load_state_channels->release(handle);
            if (!cancel_reason.empty()) {
                LOG(INFO) << fmt::format(
                        "The channel has been cancelled, load_id = {}, error = {}",
                        print_id(load_id), cancel_reason);
                return Status::Cancelled(cancel_reason);
            }
        } else {
            // load is success, success only when eos be true
            _load_state_channels->release(handle);
            if (request.has_eos() && request.eos()) {
                is_eof = true;
                return Status::OK();
            }
        }
    }

    return Status::InternalError<false>(
            "Fail to add batch in load channel: unknown load_id={}. "
            "This may be due to a BE restart. Please retry the load.",
            load_id.to_string());
}
150
151
// Routes one add-block request to its load channel.
//
// Steps:
//  1. Resolve the channel, or return early for a cancelled or finished load.
//  2. For normal-priority loads, apply memtable memory back-pressure.
//  3. Forward the request to the channel.
//  4. Remove the channel from the manager once it reports it is finished.
//
// If the channel rejects the batch, the channel is cancelled and the add_batch error
// is returned. A failure to cancel is logged but does not replace that error.
Status LoadChannelMgr::add_batch(const PTabletWriterAddBlockRequest& request,
                                 PTabletWriterAddBlockResult* response) {
    UniqueId load_id(request.id());
    // 1. get load channel
    std::shared_ptr<LoadChannel> channel;
    bool is_eof = false;
    auto status = _get_load_channel(channel, is_eof, load_id, request);
    if (!status.ok() || is_eof) {
        // Either an error, or an eos request for a load that already finished.
        return status;
    }
    SCOPED_TIMER(channel->get_mgr_add_batch_timer());

    if (!channel->is_high_priority()) {
        // 2. check if mem consumption exceed limit
        // High-priority loads skip this step: it may block for a while, which could
        // lead to an rpc timeout.
        SCOPED_TIMER(channel->get_handle_mem_limit_timer());
        // The callback presumably lets the limiter stop waiting once this channel is
        // cancelled.
        ExecEnv::GetInstance()->memtable_memory_limiter()->handle_memtable_flush(
                [channel]() { return channel->is_cancelled(); });
        if (channel->is_cancelled()) {
            return Status::Cancelled("LoadChannel has been cancelled: {}.", load_id.to_string());
        }
    }

    // 3. add batch to load channel
    // batch may not exist in request(eg: eos request without batch),
    // this case will be handled in load channel's add batch method.
    Status st = channel->add_batch(request, response);
    if (UNLIKELY(!st.ok())) {
        // The add_batch failure is the root cause and is what the caller must see.
        // A failed cancel is only logged so it cannot mask that error.
        Status cancel_st = channel->cancel();
        if (!cancel_st.ok()) {
            LOG(WARNING) << "failed to cancel load channel " << load_id
                         << " after add_batch error: " << cancel_st.to_string();
        }
        return st;
    }

    // 4. handle finish
    if (channel->is_finished()) {
        _finish_load_channel(load_id);
    }
    return Status::OK();
}
190
191
28.9k
void LoadChannelMgr::_finish_load_channel(const UniqueId load_id) {
192
28.9k
    VLOG_NOTICE << "removing load channel " << load_id << " because it's finished";
193
28.9k
    {
194
28.9k
        std::lock_guard<std::mutex> l(_lock);
195
28.9k
        if (_load_channels.contains(load_id)) {
196
28.6k
            _load_channels.erase(load_id);
197
28.6k
        }
198
28.9k
        auto* handle = _load_state_channels->insert(load_id.to_string(), nullptr, 1, 1);
199
28.9k
        _load_state_channels->release(handle);
200
28.9k
    }
201
18.4E
    VLOG_CRITICAL << "removed load channel " << load_id;
202
28.9k
}
203
204
94
// Cancels the load identified by params.id().
//
// Under _lock:
//  - detaches the live channel, if any, from _load_channels;
//  - if the state cache has no entry for the id yet and a non-empty reason was
//    supplied, records the reason so later add_batch calls can report it. Only the
//    first recorded reason is kept.
// Outside the lock, the detached channel (if any) is cancelled.
Status LoadChannelMgr::cancel(const PTabletWriterCancelRequest& params) {
    UniqueId load_id(params.id());
    std::shared_ptr<LoadChannel> cancelled_channel;
    {
        std::lock_guard<std::mutex> l(_lock);
        if (auto it = _load_channels.find(load_id); it != _load_channels.end()) {
            cancelled_channel = it->second;
            _load_channels.erase(it);
        }

        const std::string state_key = load_id.to_string();
        auto* existing_handle = _load_state_channels->lookup(state_key);
        if (existing_handle != nullptr) {
            // An entry already exists (earlier cancel or finish); keep it unchanged.
            _load_state_channels->release(existing_handle);
        } else if (params.has_cancel_reason() && !params.cancel_reason().empty()) {
            auto reason_value = std::make_unique<CacheValue>();
            reason_value->_cancel_reason = params.cancel_reason();
            const size_t value_bytes =
                    reason_value->_cancel_reason.capacity() + sizeof(CacheValue);
            auto* handle = _load_state_channels->insert(state_key, reason_value.get(), 1,
                                                        value_bytes);
            // Ownership passes to the cache: the unique_ptr lets go right after insert.
            reason_value.release();
            _load_state_channels->release(handle);
            LOG(INFO) << fmt::format("load_id = {}, record_error reason = {}",
                                     print_id(load_id), params.cancel_reason());
        }
    }

    if (cancelled_channel == nullptr) {
        return Status::OK();
    }
    RETURN_IF_ERROR(cancelled_channel->cancel());
    LOG(INFO) << "load channel has been cancelled: " << load_id;
    return Status::OK();
}
240
241
7
// Starts the "cancel_timeout_load_channels" thread.
//
// The thread runs _start_load_channels_clean() every START_BG_INTERVAL seconds until
// _stop_background_threads_latch is counted down by stop(). The thread handle is kept
// in _load_channels_clean_thread so stop() can join it.
Status LoadChannelMgr::_start_bg_worker() {
    auto clean_loop = [this]() {
        const auto interval = std::chrono::seconds(START_BG_INTERVAL);
        while (!_stop_background_threads_latch.wait_for(interval)) {
            // A failed sweep is ignored; the next tick sweeps again.
            static_cast<void>(_start_load_channels_clean());
        }
    };
    RETURN_IF_ERROR(Thread::create("LoadChannelMgr", "cancel_timeout_load_channels", clean_loop,
                                   &_load_channels_clean_thread));
    return Status::OK();
}
254
255
229
// Cancels every load channel whose idle time (now - last_updated_time()) has
// reached its timeout().
//
// Timed-out channels are detached from _load_channels under _lock and cancelled
// outside it. Detached channels are destroyed when this function returns, and they
// must be cancelled before that happens. So every detached channel is cancelled even
// if an earlier cancel() fails. Returns the first cancel error, or OK.
Status LoadChannelMgr::_start_load_channels_clean() {
    std::vector<std::shared_ptr<LoadChannel>> need_delete_channels;
    LOG(INFO) << "cleaning timed out load channels";
    time_t now = time(nullptr);
    {
        std::vector<UniqueId> need_delete_channel_ids;
        std::lock_guard<std::mutex> l(_lock);
        int i = 0;
        for (auto& kv : _load_channels) {
            VLOG_CRITICAL << "load channel[" << i++ << "]: " << *(kv.second);
            time_t last_updated_time = kv.second->last_updated_time();
            if (difftime(now, last_updated_time) >= kv.second->timeout()) {
                need_delete_channel_ids.emplace_back(kv.first);
                need_delete_channels.emplace_back(kv.second);
            }
        }

        // Erase in a second pass so the map is not modified while it is iterated.
        for (auto& key : need_delete_channel_ids) {
            _load_channels.erase(key);
            LOG(INFO) << "erase timeout load channel: " << key;
        }
    }

    // we must cancel these load channels before destroying them.
    // otherwise some object may be invalid before trying to visit it.
    // eg: MemTracker in load channel
    // Do not stop at the first failure: the remaining channels are already detached
    // and would otherwise be destroyed without being cancelled.
    Status first_error = Status::OK();
    for (auto& channel : need_delete_channels) {
        Status st = channel->cancel();
        if (!st.ok()) {
            LOG(WARNING) << "failed to cancel timed out load channel: " << channel->load_id()
                         << ", error: " << st.to_string();
            if (first_error.ok()) {
                first_error = st;
            }
            continue;
        }
        LOG(INFO) << "load channel has been safely deleted: " << channel->load_id()
                  << ", timeout(s): " << channel->timeout();
    }

    return first_error;
}
289
} // namespace doris