/root/doris/be/src/runtime/load_channel.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/load_channel.h" |
19 | | |
20 | | #include <gen_cpp/internal_service.pb.h> |
21 | | #include <glog/logging.h> |
22 | | |
23 | | #include "runtime/memory/mem_tracker.h" |
24 | | #include "runtime/tablets_channel.h" |
25 | | |
26 | | namespace doris { |
27 | | |
28 | | LoadChannel::LoadChannel(const UniqueId& load_id, std::unique_ptr<MemTracker> mem_tracker, // Builds the channel for one load; takes ownership of mem_tracker.
29 | | int64_t timeout_s, bool is_high_priority, const std::string& sender_ip,
30 | | int64_t backend_id, bool enable_profile)
31 | | : _load_id(load_id),
32 | | _mem_tracker(std::move(mem_tracker)), // ownership of the tracker moves into the member
33 | | _timeout_s(timeout_s),
34 | | _is_high_priority(is_high_priority),
35 | | _sender_ip(sender_ip),
36 | | _backend_id(backend_id),
37 | 0 | _enable_profile(enable_profile) { // when false, _report_profile() returns without doing anything
38 | | // _last_updated_time must be set before this object is inserted into
39 | | // _load_channels in load_channel_mgr; otherwise the gc thread may
40 | | // erase it immediately.
41 | 0 | _last_updated_time.store(time(nullptr));
42 | 0 | _init_profile(); // creates _profile, _self_profile and all counters/timers
43 | 0 | }
44 | | |
45 | 0 | LoadChannel::~LoadChannel() { // Logs a one-line summary of the channel while it is being destroyed.
46 | 0 | LOG(INFO) << "load channel removed. mem peak usage=" << _mem_tracker->peak_consumption() // dereferences _mem_tracker: assumes it is non-null — TODO confirm against callers
47 | 0 | << ", info=" << _mem_tracker->debug_string() << ", load_id=" << _load_id
48 | 0 | << ", is high priority=" << _is_high_priority << ", sender_ip=" << _sender_ip;
49 | 0 | }
50 | | |
51 | 0 | void LoadChannel::_init_profile() { // Builds the profile tree (root "LoadChannels" plus one child for this channel) and registers its counters and timers.
52 | 0 | _profile = std::make_unique<RuntimeProfile>("LoadChannels");
53 | 0 | _mgr_add_batch_timer = ADD_TIMER(_profile, "LoadChannelMgrAddBatchTime"); // the next two timers are registered on the root profile
54 | 0 | _handle_mem_limit_timer = ADD_TIMER(_profile, "HandleMemLimitTime");
55 | 0 | _self_profile =
56 | 0 | _profile->create_child(fmt::format("LoadChannel load_id={} (host={}, backend_id={})", // child name embeds load id, sender ip and backend id
57 | 0 | _load_id.to_string(), _sender_ip, _backend_id),
58 | 0 | true, true); // NOTE(review): meaning of the two boolean flags is defined by RuntimeProfile::create_child — confirm there
59 | 0 | _add_batch_number_counter = ADD_COUNTER(_self_profile, "NumberBatchAdded", TUnit::UNIT); // incremented in add_batch() after each block is added
60 | 0 | _peak_memory_usage_counter = ADD_COUNTER(_self_profile, "PeakMemoryUsage", TUnit::BYTES); // set from _mem_tracker in _report_profile()
61 | 0 | _add_batch_timer = ADD_TIMER(_self_profile, "AddBatchTime"); // scoped around the whole of add_batch()
62 | 0 | _handle_eos_timer = ADD_CHILD_TIMER(_self_profile, "HandleEosTime", "AddBatchTime"); // registered with "AddBatchTime" as its parent name
63 | 0 | _add_batch_times = ADD_COUNTER(_self_profile, "AddBatchTimes", TUnit::UNIT); // incremented on every add_batch() call
64 | 0 | }
65 | | |
66 | 0 | Status LoadChannel::open(const PTabletWriterOpenRequest& params) { // Finds or creates the TabletsChannel for params.index_id(), opens it, then marks this load channel as opened.
67 | 0 | int64_t index_id = params.index_id();
68 | 0 | std::shared_ptr<TabletsChannel> channel;
69 | 0 | {
70 | 0 | std::lock_guard<std::mutex> l(_lock); // scoped to the lookup/insert only
71 | 0 | auto it = _tablets_channels.find(index_id);
72 | 0 | if (it != _tablets_channels.end()) {
73 | 0 | channel = it->second; // a channel is already registered for this index id: reuse it
74 | 0 | } else {
75 | | // No channel for this index id yet: create a new tablets channel.
76 | 0 | TabletsChannelKey key(params.id(), index_id);
77 | 0 | channel = std::make_shared<TabletsChannel>(key, _load_id, _is_high_priority,
78 | 0 | _self_profile);
79 | 0 | {
80 | 0 | std::lock_guard<SpinLock> l(_tablets_channels_lock); // same lock _report_profile() holds while iterating the map; NOTE(review): this guard shadows the outer guard `l`
81 | 0 | _tablets_channels.insert({index_id, channel});
82 | 0 | }
83 | 0 | }
84 | 0 | }
85 |

86 | 0 | RETURN_IF_ERROR(channel->open(params)); // called after _lock is released; RETURN_IF_ERROR presumably returns the failing Status — confirm macro
87 | |
88 | 0 | _opened = true; // is_finished() returns false until this flag is set
89 | 0 | _last_updated_time.store(time(nullptr));
90 | 0 | return Status::OK();
91 | 0 | }
92 | | |
93 | | Status LoadChannel::_get_tablets_channel(std::shared_ptr<TabletsChannel>& channel, // Looks up the channel for index_id under _lock; results are written to the `channel` and `is_finished` out-parameters.
94 | 0 | bool& is_finished, const int64_t index_id) {
95 | 0 | std::lock_guard<std::mutex> l(_lock);
96 | 0 | auto it = _tablets_channels.find(index_id);
97 | 0 | if (it == _tablets_channels.end()) {
98 | 0 | if (_finished_channel_ids.find(index_id) != _finished_channel_ids.end()) {
99 | | // This channel has already finished; just return OK.
100 | 0 | is_finished = true; // `channel` is left unassigned on this path
101 | 0 | return Status::OK();
102 | 0 | }
103 | 0 | std::stringstream ss; // index id is neither live nor finished: build the error message
104 | 0 | ss << "load channel " << _load_id << " add batch with unknown index id: " << index_id;
105 | 0 | return Status::InternalError(ss.str()); // neither out-parameter is written on this path
106 | 0 | }
107 | |
108 | 0 | is_finished = false;
109 | 0 | channel = it->second;
110 | 0 | return Status::OK();
111 | 0 | }
112 | | |
113 | | Status LoadChannel::add_batch(const PTabletWriterAddBlockRequest& request, // Routes one add-block request to the tablets channel of request.index_id(), handles eos, and attaches the profile to `response` when due.
114 | 0 | PTabletWriterAddBlockResult* response) {
115 | 0 | SCOPED_TIMER(_add_batch_timer);
116 | 0 | COUNTER_UPDATE(_add_batch_times, 1); // counts calls, including ones that fail or carry no block
117 | 0 | int64_t index_id = request.index_id();
118 | | // 1. Look up the tablets channel for this index id.
119 | 0 | std::shared_ptr<TabletsChannel> channel;
120 | 0 | bool is_finished = false;
121 | 0 | Status st = _get_tablets_channel(channel, is_finished, index_id);
122 | 0 | if (!st.ok() || is_finished) { // lookup error, or channel already finished (st is OK in that case)
123 | 0 | return st;
124 | 0 | }
125 | |
126 | | // 2. Add the block to the tablets channel.
127 | 0 | if (request.has_block()) { // a request without a block goes straight to the eos check
128 | 0 | RETURN_IF_ERROR(channel->add_batch(request, response));
129 | 0 | _add_batch_number_counter->update(1);
130 | 0 | }
131 | |
132 | | // 3. Handle eos.
133 | 0 | if (request.has_eos() && request.eos()) {
134 | 0 | st = _handle_eos(channel, request, response); // _handle_eos is defined outside the code shown here
135 | 0 | _report_profile(response); // called before st is checked, so it also runs when eos handling failed
136 | 0 | if (!st.ok()) {
137 | 0 | return st; // NOTE(review): returns without refreshing _last_updated_time
138 | 0 | }
139 | 0 | } else if (_add_batch_number_counter->value() % 100 == 1) { // non-eos: report whenever the added-block count is 1 modulo 100
140 | 0 | _report_profile(response);
141 | 0 | }
142 | 0 | _last_updated_time.store(time(nullptr));
143 | 0 | return st;
144 | 0 | }
145 | | |
146 | 0 | void LoadChannel::_report_profile(PTabletWriterAddBlockResult* response) { // Serializes the profile tree with Thrift and stores the bytes in `response`; does nothing when profiling is disabled.
147 | 0 | if (!_enable_profile) {
148 | 0 | return;
149 | 0 | }
150 | |
151 | 0 | COUNTER_SET(_peak_memory_usage_counter, _mem_tracker->peak_consumption());
152 | | // TabletSink and LoadChannel instances in BE have an M:N relationship.
153 | | // From time to time a LoadChannel returns its own runtime profile to one of the TabletSinks,
154 | | // so each TabletSink usually ends up holding the profiles of all LoadChannels,
155 | | // although copies of the same LoadChannel profile held by different TabletSinks differ in freshness.
156 | | // Each TabletSink periodically reports every LoadChannel profile it holds to FE,
157 | | // and the timestamp set below is used to keep the latest copy of each LoadChannel profile.
158 | 0 | _self_profile->set_timestamp(_last_updated_time);
159 |

160 | 0 | {
161 | 0 | std::lock_guard<SpinLock> l(_tablets_channels_lock); // same lock open() takes when inserting into the map
162 | 0 | for (auto& it : _tablets_channels) {
163 | 0 | it.second->refresh_profile();
164 | 0 | }
165 | 0 | }
166 |

167 | 0 | TRuntimeProfileTree tprofile;
168 | 0 | ThriftSerializer ser(false, 4096); // NOTE(review): argument meanings are defined by ThriftSerializer — confirm there
169 | 0 | uint8_t* buf = nullptr;
170 | 0 | uint32_t len = 0;
171 | 0 | std::lock_guard<SpinLock> l(_profile_serialize_lock); // held until return: covers to_thrift, serialize and the copy into response
172 | 0 | _profile->to_thrift(&tprofile);
173 | 0 | auto st = ser.serialize(&tprofile, &len, &buf);
174 | 0 | if (st.ok()) {
175 | 0 | response->set_load_channel_profile(std::string((const char*)buf, len)); // copies `len` bytes starting at `buf` into the response
176 | 0 | } else {
177 | 0 | LOG(WARNING) << "load channel TRuntimeProfileTree serialize failed, errmsg=" << st; // failure is only logged; the response profile field is not set
178 | 0 | }
179 | 0 | }
180 | | |
181 | 0 | bool LoadChannel::is_finished() { // Returns true only when the channel has been opened and no tablets channels remain registered.
182 | 0 | if (!_opened) { // never opened: not finished, even though the map is empty
183 | 0 | return false;
184 | 0 | }
185 | 0 | std::lock_guard<std::mutex> l(_lock);
186 | 0 | return _tablets_channels.empty(); // NOTE(review): entries are not removed anywhere in the code shown here — presumably in _handle_eos; confirm
187 | 0 | }
188 | | |
189 | 0 | Status LoadChannel::cancel() { // Calls cancel() on every registered tablets channel while holding _lock; always returns OK.
190 | 0 | std::lock_guard<std::mutex> l(_lock);
191 | 0 | for (auto& it : _tablets_channels) {
192 | 0 | it.second->cancel(); // any result of the per-channel cancel() is not inspected here
193 | 0 | }
194 | 0 | return Status::OK();
195 | 0 | }
196 | | |
197 | | } // namespace doris |