Coverage Report

Created: 2024-11-20 10:55

/root/doris/be/src/runtime/load_channel.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/load_channel.h"
19
20
#include <gen_cpp/internal_service.pb.h>
21
#include <glog/logging.h>
22
23
#include "cloud/cloud_tablets_channel.h"
24
#include "cloud/config.h"
25
#include "common/logging.h"
26
#include "olap/storage_engine.h"
27
#include "runtime/exec_env.h"
28
#include "runtime/fragment_mgr.h"
29
#include "runtime/memory/mem_tracker.h"
30
#include "runtime/tablets_channel.h"
31
#include "runtime/thread_context.h"
32
#include "runtime/workload_group/workload_group_manager.h"
33
34
namespace doris {
35
36
// Process-wide counter of live LoadChannel objects: the constructor adds 1 and the
// destructor adds -1, so the exported value is the number of channels currently alive.
bvar::Adder<int64_t> g_loadchannel_cnt {"loadchannel_cnt"};
37
38
// Creates the load channel for `load_id`.
//
// Memory accounting: if FragmentMgr returns a QueryContext for this load, the
// channel reuses that query's mem tracker and workload group. Otherwise a dedicated
// LOAD-type MemTrackerLimiter is created, and when `wg_id` is positive and resolves
// to a workload group, the tracker is registered with that group.
LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_high_priority,
                         std::string sender_ip, int64_t backend_id, bool enable_profile,
                         int64_t wg_id)
        : _load_id(load_id),
          _timeout_s(timeout_s),
          _is_high_priority(is_high_priority),
          _sender_ip(std::move(sender_ip)),
          _backend_id(backend_id),
          _enable_profile(enable_profile) {
    ExecEnv* const env = ExecEnv::GetInstance();
    std::shared_ptr<MemTrackerLimiter> tracker;
    WorkloadGroupPtr group;

    // Held for the whole constructor, exactly like the original local.
    const std::shared_ptr<QueryContext> query_context =
            env->fragment_mgr()->get_or_erase_query_ctx_with_lock(_load_id.to_thrift());
    if (query_context == nullptr) {
        // when memtable on sink is not enabled, load can not find queryctx
        tracker = MemTrackerLimiter::create_shared(
                MemTrackerLimiter::Type::LOAD,
                fmt::format("(FromLoadChannel)Load#Id={}", _load_id.to_string()));
        if (wg_id > 0) {
            if (WorkloadGroupPtr found = env->workload_group_mgr()->get_task_group_by_id(wg_id)) {
                group = found;
                group->add_mem_tracker_limiter(tracker);
            }
        }
    } else {
        tracker = query_context->query_mem_tracker;
        group = query_context->workload_group();
    }

    _query_thread_context = {_load_id.to_thrift(), tracker, group};

    g_loadchannel_cnt << 1;
    // _last_updated_time should be set before being inserted to
    // _load_channels in load_channel_mgr, or it may be erased
    // immediately by gc thread.
    _last_updated_time.store(time(nullptr));
    _init_profile();
}
79
80
0
// Decrements the live-channel counter and logs a summary that includes, for every
// finished index, its received and filtered row counts.
LoadChannel::~LoadChannel() {
    g_loadchannel_cnt << -1;

    std::stringstream per_index_rows;
    for (const auto& [index_id, row_stats] : _tablets_channels_rows) {
        const auto& [received_rows, filtered_rows] = row_stats;
        per_index_rows << ", index id: " << index_id << ", total_received_rows: " << received_rows
                       << ", num_rows_filtered: " << filtered_rows;
    }

    LOG(INFO) << "load channel removed load_id=" << _load_id
              << ", is high priority=" << _is_high_priority << ", sender_ip=" << _sender_ip
              << per_index_rows.str();
}
91
92
0
void LoadChannel::_init_profile() {
93
0
    _profile = std::make_unique<RuntimeProfile>("LoadChannels");
94
0
    _mgr_add_batch_timer = ADD_TIMER(_profile, "LoadChannelMgrAddBatchTime");
95
0
    _handle_mem_limit_timer = ADD_TIMER(_profile, "HandleMemLimitTime");
96
0
    _self_profile =
97
0
            _profile->create_child(fmt::format("LoadChannel load_id={} (host={}, backend_id={})",
98
0
                                               _load_id.to_string(), _sender_ip, _backend_id),
99
0
                                   true, true);
100
0
    _add_batch_number_counter = ADD_COUNTER(_self_profile, "NumberBatchAdded", TUnit::UNIT);
101
0
    _peak_memory_usage_counter = ADD_COUNTER(_self_profile, "PeakMemoryUsage", TUnit::BYTES);
102
0
    _add_batch_timer = ADD_TIMER(_self_profile, "AddBatchTime");
103
0
    _handle_eos_timer = ADD_CHILD_TIMER(_self_profile, "HandleEosTime", "AddBatchTime");
104
0
    _add_batch_times = ADD_COUNTER(_self_profile, "AddBatchTimes", TUnit::UNIT);
105
0
}
106
107
0
// Opens (or incrementally opens) the tablets channel for `params.index_id()`.
//
// The channel is created on first use for the index (CloudTabletsChannel in cloud
// mode, TabletsChannel otherwise) and inserted into _tablets_channels while both
// _lock and _tablets_channels_lock are held. The open itself runs outside the locks.
// In cloud mode a non-positive txn expiration is rejected up front.
Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
    if (config::is_cloud_mode() && params.txn_expiration() <= 0) {
        return Status::InternalError(
                "The txn expiration of PTabletWriterOpenRequest is invalid, value={}",
                params.txn_expiration());
    }
    SCOPED_ATTACH_TASK(_query_thread_context);

    const int64_t index_id = params.index_id();
    std::shared_ptr<BaseTabletsChannel> channel;
    {
        std::lock_guard<std::mutex> guard(_lock);
        if (auto existing = _tablets_channels.find(index_id);
            existing != _tablets_channels.end()) {
            channel = existing->second;
        } else {
            // just for VLOG
            if (_txn_id == 0) [[unlikely]] {
                _txn_id = params.txn_id();
            }

            // First open for this index: create a new tablets channel.
            TabletsChannelKey key(params.id(), index_id);
            BaseStorageEngine& engine = ExecEnv::GetInstance()->storage_engine();
            if (!config::is_cloud_mode()) {
                channel = std::make_shared<TabletsChannel>(engine.to_local(), key, _load_id,
                                                           _is_high_priority, _self_profile);
            } else {
                channel = std::make_shared<CloudTabletsChannel>(engine.to_cloud(), key, _load_id,
                                                                _is_high_priority, _self_profile);
            }

            std::lock_guard<std::mutex> channels_guard(_tablets_channels_lock);
            _tablets_channels.insert({index_id, channel});
        }
    }

    // incremental open would ensure not to open tablet repeatedly
    Status open_status =
            params.is_incremental() ? channel->incremental_open(params) : channel->open(params);
    RETURN_IF_ERROR(open_status);

    _opened = true;
    _last_updated_time.store(time(nullptr));
    return Status::OK();
}
155
156
// Looks up the tablets channel for `index_id` under _lock.
//
// On success `channel` is set and `is_finished` is false. If the index was already
// finished (listed in _finished_channel_ids), `is_finished` is set to true and OK is
// returned without touching `channel`. An index that is unknown in both places
// yields an InternalError.
Status LoadChannel::_get_tablets_channel(std::shared_ptr<BaseTabletsChannel>& channel,
                                         bool& is_finished, const int64_t index_id) {
    std::lock_guard<std::mutex> guard(_lock);

    if (auto found = _tablets_channels.find(index_id); found != _tablets_channels.end()) {
        is_finished = false;
        channel = found->second;
        return Status::OK();
    }

    if (_finished_channel_ids.find(index_id) != _finished_channel_ids.end()) {
        // this channel is already finished, just return OK
        is_finished = true;
        return Status::OK();
    }

    std::stringstream message;
    message << "load channel " << _load_id << " add batch with unknown index id: " << index_id;
    return Status::InternalError(message.str());
}
175
176
// Routes one add-block request to its tablets channel.
//
// Steps: (1) resolve the channel; a request for an already-finished index returns
// OK immediately. (2) Forward the block if the request carries one. (3) On EOS, run
// the close logic and report the profile; otherwise report the profile whenever the
// batch counter modulo 100 equals 1.
Status LoadChannel::add_batch(const PTabletWriterAddBlockRequest& request,
                              PTabletWriterAddBlockResult* response) {
    SCOPED_TIMER(_add_batch_timer);
    COUNTER_UPDATE(_add_batch_times, 1);
    SCOPED_ATTACH_TASK(_query_thread_context);
    const int64_t index_id = request.index_id();

    // Step 1: resolve the tablets channel.
    std::shared_ptr<BaseTabletsChannel> channel;
    bool is_finished = false;
    Status status = _get_tablets_channel(channel, is_finished, index_id);
    if (is_finished || !status.ok()) {
        return status;
    }

    // Step 2: forward the block, if any.
    if (request.has_block()) {
        RETURN_IF_ERROR(channel->add_batch(request, response));
        _add_batch_number_counter->update(1);
    }

    // Step 3: handle eos.
    // if channel is incremental, maybe hang on close until all close request arrived.
    const bool is_eos = request.has_eos() && request.eos();
    if (!is_eos) {
        if (_add_batch_number_counter->value() % 100 == 1) {
            _report_profile(response);
        }
    } else {
        status = _handle_eos(channel.get(), request, response);
        _report_profile(response);
        if (!status.ok()) {
            return status;
        }
    }

    _last_updated_time.store(time(nullptr));
    return status;
}
210
211
// Handles the EOS flag of an add-block request for `channel`.
//
// Closes this sender's side of the channel. For a hang_wait request (the init node)
// it then polls channel->is_finished() every 1ms. The poll is bounded by roughly
// _timeout_s seconds so the RPC thread cannot block forever; if the channel is still
// unfinished after that bound, an InternalError is returned.
//
// When close() reports the channel as finished, its row statistics are recorded in
// _tablets_channels_rows, it is erased from _tablets_channels, and its index id is
// added to _finished_channel_ids.
Status LoadChannel::_handle_eos(BaseTabletsChannel* channel,
                                const PTabletWriterAddBlockRequest& request,
                                PTabletWriterAddBlockResult* response) {
    _self_profile->add_info_string("EosHost", fmt::format("{}", request.backend_id()));
    bool finished = false;
    auto index_id = request.index_id();

    RETURN_IF_ERROR(channel->close(this, request, response, &finished));

    // for init node, we close waiting(hang on) all close request and let them return together.
    if (request.has_hang_wait() && request.hang_wait()) {
        DCHECK(!channel->is_incremental_channel());
        VLOG_DEBUG << fmt::format("txn {}: reciever index {} close waiting by sender {}", _txn_id,
                                  request.index_id(), request.sender_id());
        // Each iteration sleeps 1ms, so 1000 * _timeout_s iterations is about the load
        // timeout. The bound must be part of the loop condition; checking it only
        // after an unbounded loop can never cut the wait short.
        const int64_t max_wait_count = 1000 * _timeout_s;
        int64_t count = 0;
        while (!channel->is_finished() && count < max_wait_count) {
            bthread_usleep(1000);
            count++;
        }
        // now maybe finished or cancelled.
        VLOG_TRACE << "reciever close wait finished!" << request.sender_id();
        // Fail only if the wait really timed out. A slow but successful wait must still
        // run the finished bookkeeping below. Possibly related:
        // config::streaming_load_rpc_max_alive_time_sec.
        if (!channel->is_finished()) {
            return Status::InternalError("Tablets channel didn't wait all close");
        }
    }

    if (finished) {
        std::lock_guard<std::mutex> guard(_lock);
        {
            std::lock_guard<std::mutex> channels_guard(_tablets_channels_lock);
            _tablets_channels_rows.insert(std::make_pair(
                    index_id,
                    std::make_pair(channel->total_received_rows(), channel->num_rows_filtered())));
            _tablets_channels.erase(index_id);
        }
        LOG(INFO) << "txn " << _txn_id << " closed tablets_channel " << index_id;
        _finished_channel_ids.emplace(index_id);
    }
    return Status::OK();
}
251
252
0
// Serializes this channel's runtime profile into `response` when profiling is
// enabled. It first stamps _self_profile with _last_updated_time and refreshes every
// live tablets channel's profile under _tablets_channels_lock. A serialization
// failure is logged and the response is left without a profile.
void LoadChannel::_report_profile(PTabletWriterAddBlockResult* response) {
    if (!_enable_profile) {
        return;
    }

    // TabletSink and LoadChannel in BE are M: N relationship,
    // Every once in a while LoadChannel will randomly return its own runtime profile to a TabletSink,
    // so usually all LoadChannel runtime profiles are saved on each TabletSink,
    // and the timeliness of the same LoadChannel profile saved on different TabletSinks is different,
    // and each TabletSink will periodically send fe reports all the LoadChannel profiles saved by itself,
    // and ensures to update the latest LoadChannel profile according to the timestamp.
    _self_profile->set_timestamp(_last_updated_time.load());

    {
        std::lock_guard<std::mutex> channels_guard(_tablets_channels_lock);
        for (const auto& [index_id, tablets_channel] : _tablets_channels) {
            tablets_channel->refresh_profile();
        }
    }

    TRuntimeProfileTree profile_tree;
    ThriftSerializer serializer(false, 4096);
    uint8_t* serialized = nullptr;
    uint32_t serialized_len = 0;
    // Declared after the buffers so it is released first, matching the original scope.
    std::lock_guard<SpinLock> serialize_guard(_profile_serialize_lock);
    _profile->to_thrift(&profile_tree);
    auto status = serializer.serialize(&profile_tree, &serialized_len, &serialized);
    if (!status.ok()) {
        LOG(WARNING) << "load channel TRuntimeProfileTree serialize failed, errmsg=" << status;
        return;
    }
    response->set_load_channel_profile(
            std::string(reinterpret_cast<const char*>(serialized), serialized_len));
}
285
286
0
bool LoadChannel::is_finished() {
287
0
    if (!_opened) {
288
0
        return false;
289
0
    }
290
0
    std::lock_guard<std::mutex> l(_lock);
291
0
    return _tablets_channels.empty();
292
0
}
293
294
0
// Cancels every tablets channel still registered in _tablets_channels, under _lock.
// Always returns OK.
Status LoadChannel::cancel() {
    std::lock_guard<std::mutex> guard(_lock);
    for (const auto& [index_id, tablets_channel] : _tablets_channels) {
        // Each channel's cancel status is deliberately discarded so that every
        // remaining channel still gets cancelled.
        static_cast<void>(tablets_channel->cancel());
    }
    return Status::OK();
}
301
302
} // namespace doris