be/src/exec/exchange/vdata_stream_mgr.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/exchange/vdata_stream_mgr.h" |
19 | | |
20 | | #include <gen_cpp/Types_types.h> |
21 | | #include <gen_cpp/data.pb.h> |
22 | | #include <gen_cpp/internal_service.pb.h> |
23 | | #include <gen_cpp/types.pb.h> |
24 | | #include <stddef.h> |
25 | | |
26 | | #include <memory> |
27 | | #include <ostream> |
28 | | #include <string> |
29 | | #include <vector> |
30 | | |
31 | | #include "common/logging.h" |
32 | | #include "exec/exchange/vdata_stream_recvr.h" |
33 | | #include "util/hash_util.hpp" |
34 | | |
35 | | namespace doris { |
36 | | #include "common/compile_check_begin.h" |
37 | | |
38 | 11 | VDataStreamMgr::VDataStreamMgr() { |
39 | | // TODO: metric |
40 | 11 | } |
41 | | |
42 | 7 | VDataStreamMgr::~VDataStreamMgr() { |
43 | | // Has to call close here, because receiver will check if the receiver is closed. |
44 | | // It will core during graceful stop. |
45 | 7 | auto receivers = std::vector<std::shared_ptr<VDataStreamRecvr>>(); |
46 | 7 | { |
47 | 7 | std::shared_lock l(_lock); |
48 | 7 | auto receiver_iterator = _receiver_map.begin(); |
49 | 7 | while (receiver_iterator != _receiver_map.end()) { |
50 | | // Could not call close directly, because during close method, it will remove itself |
51 | | // from the map, and modify the map, it will core. |
52 | 0 | receivers.push_back(receiver_iterator->second); |
53 | 0 | receiver_iterator++; |
54 | 0 | } |
55 | 7 | } |
56 | 7 | for (auto iter = receivers.begin(); iter != receivers.end(); ++iter) { |
57 | 0 | (*iter)->close(); |
58 | 0 | } |
59 | 7 | } |
60 | | |
61 | | inline uint32_t VDataStreamMgr::get_hash_value(const TUniqueId& fragment_instance_id, |
62 | 6.47M | PlanNodeId node_id) { |
63 | 6.47M | uint32_t value = HashUtil::hash(&fragment_instance_id.lo, 8, 0); |
64 | 6.47M | value = HashUtil::hash(&fragment_instance_id.hi, 8, value); |
65 | 6.47M | value = HashUtil::hash(&node_id, 4, value); |
66 | 6.47M | return value; |
67 | 6.47M | } |
68 | | |
69 | | std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr( |
70 | | RuntimeState* state, RuntimeProfile::HighWaterMarkCounter* memory_used_counter, |
71 | | const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders, |
72 | 491k | RuntimeProfile* profile, bool is_merging, size_t data_queue_capacity) { |
73 | 491k | DCHECK(profile != nullptr); |
74 | 491k | VLOG_FILE << "creating receiver for fragment=" << print_id(fragment_instance_id) |
75 | 552 | << ", node=" << dest_node_id; |
76 | 491k | std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr( |
77 | 491k | this, memory_used_counter, state, fragment_instance_id, dest_node_id, num_senders, |
78 | 491k | is_merging, profile, data_queue_capacity)); |
79 | 491k | uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id); |
80 | 491k | std::unique_lock l(_lock); |
81 | 491k | _fragment_stream_set.insert(std::make_pair(fragment_instance_id, dest_node_id)); |
82 | 491k | _receiver_map.insert(std::make_pair(hash_value, recvr)); |
83 | 491k | return recvr; |
84 | 491k | } |
85 | | |
86 | | Status VDataStreamMgr::find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id, |
87 | 5.49M | std::shared_ptr<VDataStreamRecvr>* res, bool acquire_lock) { |
88 | 18.4E | VLOG_ROW << "looking up fragment_instance_id=" << print_id(fragment_instance_id) |
89 | 18.4E | << ", node=" << node_id; |
90 | 5.49M | uint32_t hash_value = get_hash_value(fragment_instance_id, node_id); |
91 | | // Create lock guard and not own lock currently and will lock conditionally |
92 | 5.49M | std::shared_lock recvr_lock(_lock, std::defer_lock); |
93 | 5.52M | if (acquire_lock) { |
94 | 5.52M | recvr_lock.lock(); |
95 | 5.52M | } |
96 | 5.49M | std::pair<StreamMap::iterator, StreamMap::iterator> range = |
97 | 5.49M | _receiver_map.equal_range(hash_value); |
98 | 5.54M | while (range.first != range.second) { |
99 | 5.54M | auto recvr = range.first->second; |
100 | 5.54M | if (recvr->fragment_instance_id() == fragment_instance_id && |
101 | 5.56M | recvr->dest_node_id() == node_id) { |
102 | 5.56M | *res = recvr; |
103 | 5.56M | return Status::OK(); |
104 | 5.56M | } |
105 | 18.4E | ++range.first; |
106 | 18.4E | } |
107 | 18.4E | return Status::InvalidArgument("Could not find local receiver for node {} with instance {}", |
108 | 18.4E | node_id, print_id(fragment_instance_id)); |
109 | 5.49M | } |
110 | | |
// Entry point for an incoming transmit RPC: routes the request's block(s) to
// the local receiver identified by (finst_id, node_id).
//
// @param request         the RPC payload (instance id, node id, blocks, eos, ...).
// @param done            in/out brpc closure; a receiver may take ownership of
//                        *done to delay the RPC response for flow control.
// @param wait_for_worker time (ns) the request already waited before a worker
//                        picked it up; forwarded to the receiver for metrics.
// @return OK on success; EndOfFile when the receiver is already gone (a benign
//         race, see below) — never an error that would fail the query for it.
Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,
                                      ::google::protobuf::Closure** done,
                                      const int64_t wait_for_worker) {
    // Convert the protobuf instance id into the thrift form used as map key.
    const PUniqueId& finst_id = request->finst_id();
    TUniqueId t_finst_id;
    t_finst_id.hi = finst_id.hi();
    t_finst_id.lo = finst_id.lo();
    std::shared_ptr<VDataStreamRecvr> recvr = nullptr;
    // Measure CPU time spent handling this request; reported to the receiver.
    ThreadCpuStopWatch cpu_time_stop_watch;
    cpu_time_stop_watch.start();
    // Lookup failure is handled via the nullptr check below, so the Status is
    // deliberately discarded here.
    static_cast<void>(find_recvr(t_finst_id, request->node_id(), &recvr));
    if (recvr == nullptr) {
        // The receiver may remove itself from the receiver map via deregister_recvr()
        // at any time without considering the remaining number of senders.
        // As a consequence, find_recvr() may return an innocuous NULL if a thread
        // calling deregister_recvr() beat the thread calling find_recvr()
        // in acquiring _lock.
        //
        // e.g. for broadcast join build side, only one instance will build the hash table,
        // all other instances don't need build side data and will close the data stream receiver.
        //
        // TODO: Rethink the lifecycle of DataStreamRecvr to distinguish
        // errors from receiver-initiated teardowns.
        return Status::EndOfFile("data stream receiver closed");
    }

    // Lock the fragment context to ensure the runtime state and other objects are not
    // deconstructed
    auto ctx_lock = recvr->task_exec_ctx();
    if (ctx_lock == nullptr) {
        // Do not return internal error, because when query finished, the downstream node
        // may finish before upstream node. And the object maybe deconstructed. If return error
        // then the upstream node may report error status to FE, the query is failed.
        return Status::EndOfFile("data stream receiver is deconstructed");
    }

    bool eos = request->eos();
    // The sender may piggyback its final execution status on the last packet.
    Status exec_status =
            request->has_exec_status() ? Status::create(request->exec_status()) : Status::OK();

    auto sender_id = request->sender_id();
    auto be_number = request->be_number();
    if (!request->blocks().empty()) {
        // New protocol: a batch of blocks in one request.
        RETURN_IF_ERROR(recvr->add_blocks(request, done, wait_for_worker,
                                          cpu_time_stop_watch.elapsed_time()));
    } else if (request->has_block()) {
        // old logic, for compatibility
        // Copy the block out of the request since the receiver keeps it beyond
        // this RPC's lifetime. On eos, pass nullptr so the closure is not held.
        std::unique_ptr<PBlock> pblock_ptr = std::make_unique<PBlock>();
        pblock_ptr->CopyFrom(request->block());
        RETURN_IF_ERROR(recvr->add_block(std::move(pblock_ptr), sender_id, be_number,
                                         request->packet_seq(), eos ? nullptr : done,
                                         wait_for_worker, cpu_time_stop_watch.elapsed_time()));
    }

    // After the last packet, unregister this sender; the receiver finishes
    // once all senders are removed.
    if (eos) {
        recvr->remove_sender(sender_id, be_number, exec_status);
    }
    return Status::OK();
}
170 | | |
171 | 494k | Status VDataStreamMgr::deregister_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id) { |
172 | 494k | std::shared_ptr<VDataStreamRecvr> targert_recvr; |
173 | 18.4E | VLOG_QUERY << "deregister_recvr(): fragment_instance_id=" << print_id(fragment_instance_id) |
174 | 18.4E | << ", node=" << node_id; |
175 | 494k | uint32_t hash_value = get_hash_value(fragment_instance_id, node_id); |
176 | 494k | { |
177 | 494k | std::unique_lock l(_lock); |
178 | 494k | auto range = _receiver_map.equal_range(hash_value); |
179 | 495k | while (range.first != range.second) { |
180 | 495k | const std::shared_ptr<VDataStreamRecvr>& recvr = range.first->second; |
181 | 495k | if (recvr->fragment_instance_id() == fragment_instance_id && |
182 | 495k | recvr->dest_node_id() == node_id) { |
183 | 495k | targert_recvr = recvr; |
184 | 495k | _fragment_stream_set.erase( |
185 | 495k | std::make_pair(recvr->fragment_instance_id(), recvr->dest_node_id())); |
186 | 495k | _receiver_map.erase(range.first); |
187 | 495k | break; |
188 | 495k | } |
189 | 0 | ++range.first; |
190 | 0 | } |
191 | 494k | } |
192 | | |
193 | | // Notify concurrent add_data() requests that the stream has been terminated. |
194 | | // cancel_stream maybe take a long time, so we handle it out of lock. |
195 | 495k | if (targert_recvr) { |
196 | 495k | targert_recvr->cancel_stream(Status::OK()); |
197 | 495k | return Status::OK(); |
198 | 18.4E | } else { |
199 | 18.4E | return Status::InternalError("unknown row receiver id: fragment_instance_id={}, node_id={}", |
200 | 18.4E | print_id(fragment_instance_id), node_id); |
201 | 18.4E | } |
202 | 494k | } |
203 | | |
204 | 0 | void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id, Status exec_status) { |
205 | 0 | VLOG_QUERY << "cancelling all streams for fragment=" << print_id(fragment_instance_id); |
206 | 0 | std::vector<std::shared_ptr<VDataStreamRecvr>> recvrs; |
207 | 0 | { |
208 | 0 | std::shared_lock l(_lock); |
209 | 0 | FragmentStreamSet::iterator i = |
210 | 0 | _fragment_stream_set.lower_bound(std::make_pair(fragment_instance_id, 0)); |
211 | 0 | while (i != _fragment_stream_set.end() && i->first == fragment_instance_id) { |
212 | 0 | std::shared_ptr<VDataStreamRecvr> recvr; |
213 | 0 | WARN_IF_ERROR(find_recvr(i->first, i->second, &recvr, false), ""); |
214 | 0 | if (recvr == nullptr) { |
215 | | // keep going but at least log it |
216 | 0 | std::stringstream err; |
217 | 0 | err << "cancel(): missing in stream_map: fragment=" << print_id(i->first) |
218 | 0 | << " node=" << i->second; |
219 | 0 | LOG(ERROR) << err.str(); |
220 | 0 | } else { |
221 | 0 | recvrs.push_back(recvr); |
222 | 0 | } |
223 | 0 | ++i; |
224 | 0 | } |
225 | 0 | } |
226 | | |
227 | | // cancel_stream maybe take a long time, so we handle it out of lock. |
228 | 0 | for (auto& it : recvrs) { |
229 | 0 | it->cancel_stream(exec_status); |
230 | 0 | } |
231 | 0 | } |
232 | | |
233 | | } // namespace doris |