Coverage Report

Created: 2026-05-19 04:22

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/exchange/vdata_stream_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/exchange/vdata_stream_mgr.h"
19
20
#include <gen_cpp/Types_types.h>
21
#include <gen_cpp/data.pb.h>
22
#include <gen_cpp/internal_service.pb.h>
23
#include <gen_cpp/types.pb.h>
24
#include <stddef.h>
25
26
#include <memory>
27
#include <ostream>
28
#include <string>
29
#include <vector>
30
31
#include "common/logging.h"
32
#include "exec/exchange/vdata_stream_recvr.h"
33
#include "util/hash_util.hpp"
34
35
namespace doris {
36
37
18
VDataStreamMgr::VDataStreamMgr() {
38
    // TODO: metric
39
18
}
40
41
14
VDataStreamMgr::~VDataStreamMgr() {
42
    // Has to call close here, because receiver will check if the receiver is closed.
43
    // It will core during graceful stop.
44
14
    auto receivers = std::vector<std::shared_ptr<VDataStreamRecvr>>();
45
14
    {
46
14
        SharedLockGuard l(_lock);
47
14
        auto receiver_iterator = _receiver_map.begin();
48
14
        while (receiver_iterator != _receiver_map.end()) {
49
            // Could not call close directly, because during close method, it will remove itself
50
            // from the map, and modify the map, it will core.
51
0
            receivers.push_back(receiver_iterator->second);
52
0
            receiver_iterator++;
53
0
        }
54
14
    }
55
14
    for (auto iter = receivers.begin(); iter != receivers.end(); ++iter) {
56
0
        (*iter)->close();
57
0
    }
58
14
}
59
60
inline uint32_t VDataStreamMgr::get_hash_value(const TUniqueId& fragment_instance_id,
61
2.93M
                                               PlanNodeId node_id) {
62
2.93M
    uint32_t value = HashUtil::hash(&fragment_instance_id.lo, 8, 0);
63
2.93M
    value = HashUtil::hash(&fragment_instance_id.hi, 8, value);
64
2.93M
    value = HashUtil::hash(&node_id, 4, value);
65
2.93M
    return value;
66
2.93M
}
67
68
std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr(
69
        RuntimeState* state, RuntimeProfile::HighWaterMarkCounter* memory_used_counter,
70
        const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
71
334k
        RuntimeProfile* profile, bool is_merging, size_t data_queue_capacity) {
72
334k
    DCHECK(profile != nullptr);
73
18.4E
    VLOG_FILE << "creating receiver for fragment=" << print_id(fragment_instance_id)
74
18.4E
              << ", node=" << dest_node_id;
75
334k
    std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(
76
334k
            this, memory_used_counter, state, fragment_instance_id, dest_node_id, num_senders,
77
334k
            is_merging, profile, data_queue_capacity));
78
334k
    uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
79
334k
    LockGuard l(_lock);
80
334k
    _fragment_stream_set.insert(std::make_pair(fragment_instance_id, dest_node_id));
81
334k
    _receiver_map.insert(std::make_pair(hash_value, recvr));
82
334k
    return recvr;
83
334k
}
84
85
Status VDataStreamMgr::_find_recvr(uint32_t hash_value, const TUniqueId& fragment_instance_id,
86
2.25M
                                   PlanNodeId node_id, std::shared_ptr<VDataStreamRecvr>* res) {
87
18.4E
    VLOG_ROW << "looking up fragment_instance_id=" << print_id(fragment_instance_id)
88
18.4E
             << ", node=" << node_id;
89
2.25M
    std::pair<StreamMap::iterator, StreamMap::iterator> range =
90
2.25M
            _receiver_map.equal_range(hash_value);
91
2.27M
    while (range.first != range.second) {
92
2.27M
        auto recvr = range.first->second;
93
2.27M
        if (recvr->fragment_instance_id() == fragment_instance_id &&
94
2.27M
            recvr->dest_node_id() == node_id) {
95
2.26M
            *res = recvr;
96
2.26M
            return Status::OK();
97
2.26M
        }
98
7.89k
        ++range.first;
99
7.89k
    }
100
18.4E
    return Status::InvalidArgument("Could not find local receiver for node {} with instance {}",
101
18.4E
                                   node_id, print_id(fragment_instance_id));
102
2.25M
}
103
104
Status VDataStreamMgr::find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id,
105
2.26M
                                  std::shared_ptr<VDataStreamRecvr>* res) {
106
2.26M
    SharedLockGuard recvr_lock(_lock);
107
2.26M
    uint32_t hash_value = get_hash_value(fragment_instance_id, node_id);
108
2.26M
    return _find_recvr(hash_value, fragment_instance_id, node_id, res);
109
2.26M
}
110
111
Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,
112
                                      ::google::protobuf::Closure** done,
113
718k
                                      const int64_t wait_for_worker) {
114
718k
    const PUniqueId& finst_id = request->finst_id();
115
718k
    TUniqueId t_finst_id;
116
718k
    t_finst_id.hi = finst_id.hi();
117
718k
    t_finst_id.lo = finst_id.lo();
118
718k
    std::shared_ptr<VDataStreamRecvr> recvr = nullptr;
119
718k
    ThreadCpuStopWatch cpu_time_stop_watch;
120
718k
    cpu_time_stop_watch.start();
121
718k
    static_cast<void>(find_recvr(t_finst_id, request->node_id(), &recvr));
122
718k
    if (recvr == nullptr) {
123
        // The receiver may remove itself from the receiver map via deregister_recvr()
124
        // at any time without considering the remaining number of senders.
125
        // As a consequence, find_recvr() may return an innocuous NULL if a thread
126
        // calling deregister_recvr() beat the thread calling find_recvr()
127
        // in acquiring _lock.
128
        //
129
        // e.g. for broadcast join build side, only one instance will build the hash table,
130
        // all other instances don't need build side data and will close the data stream receiver.
131
        //
132
        // TODO: Rethink the lifecycle of DataStreamRecvr to distinguish
133
        // errors from receiver-initiated teardowns.
134
3.62k
        return Status::EndOfFile("data stream receiver closed");
135
3.62k
    }
136
137
    // Lock the fragment context to ensure the runtime state and other objects are not
138
    // deconstructed
139
714k
    auto ctx_lock = recvr->task_exec_ctx();
140
714k
    if (ctx_lock == nullptr) {
141
        // Do not return internal error, because when query finished, the downstream node
142
        // may finish before upstream node. And the object maybe deconstructed. If return error
143
        // then the upstream node may report error status to FE, the query is failed.
144
0
        return Status::EndOfFile("data stream receiver is deconstructed");
145
0
    }
146
147
714k
    bool eos = request->eos();
148
714k
    Status exec_status =
149
714k
            request->has_exec_status() ? Status::create(request->exec_status()) : Status::OK();
150
151
714k
    auto sender_id = request->sender_id();
152
714k
    auto be_number = request->be_number();
153
714k
    if (!request->blocks().empty()) {
154
65.8k
        RETURN_IF_ERROR(recvr->add_blocks(request, done, wait_for_worker,
155
65.8k
                                          cpu_time_stop_watch.elapsed_time()));
156
648k
    } else if (request->has_block()) {
157
        // old logic, for compatibility
158
0
        std::unique_ptr<PBlock> pblock_ptr = std::make_unique<PBlock>();
159
0
        pblock_ptr->CopyFrom(request->block());
160
0
        RETURN_IF_ERROR(recvr->add_block(std::move(pblock_ptr), sender_id, be_number,
161
0
                                         request->packet_seq(), eos ? nullptr : done,
162
0
                                         wait_for_worker, cpu_time_stop_watch.elapsed_time()));
163
0
    }
164
165
714k
    if (eos) {
166
704k
        recvr->remove_sender(sender_id, be_number, exec_status);
167
704k
    }
168
714k
    return Status::OK();
169
714k
}
170
171
338k
Status VDataStreamMgr::deregister_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id) {
172
338k
    std::shared_ptr<VDataStreamRecvr> targert_recvr;
173
18.4E
    VLOG_QUERY << "deregister_recvr(): fragment_instance_id=" << print_id(fragment_instance_id)
174
18.4E
               << ", node=" << node_id;
175
338k
    uint32_t hash_value = get_hash_value(fragment_instance_id, node_id);
176
338k
    {
177
338k
        LockGuard l(_lock);
178
338k
        auto range = _receiver_map.equal_range(hash_value);
179
339k
        while (range.first != range.second) {
180
339k
            const std::shared_ptr<VDataStreamRecvr>& recvr = range.first->second;
181
339k
            if (recvr->fragment_instance_id() == fragment_instance_id &&
182
339k
                recvr->dest_node_id() == node_id) {
183
339k
                targert_recvr = recvr;
184
339k
                _fragment_stream_set.erase(
185
339k
                        std::make_pair(recvr->fragment_instance_id(), recvr->dest_node_id()));
186
339k
                _receiver_map.erase(range.first);
187
339k
                break;
188
339k
            }
189
0
            ++range.first;
190
0
        }
191
338k
    }
192
193
    // Notify concurrent add_data() requests that the stream has been terminated.
194
    // cancel_stream maybe take a long time, so we handle it out of lock.
195
339k
    if (targert_recvr) {
196
339k
        targert_recvr->cancel_stream(Status::OK());
197
339k
        return Status::OK();
198
18.4E
    } else {
199
18.4E
        return Status::InternalError("unknown row receiver id: fragment_instance_id={}, node_id={}",
200
18.4E
                                     print_id(fragment_instance_id), node_id);
201
18.4E
    }
202
338k
}
203
204
0
void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id, Status exec_status) {
205
0
    VLOG_QUERY << "cancelling all streams for fragment=" << print_id(fragment_instance_id);
206
0
    std::vector<std::shared_ptr<VDataStreamRecvr>> recvrs;
207
0
    {
208
0
        SharedLockGuard l(_lock);
209
0
        FragmentStreamSet::iterator i =
210
0
                _fragment_stream_set.lower_bound(std::make_pair(fragment_instance_id, 0));
211
0
        while (i != _fragment_stream_set.end() && i->first == fragment_instance_id) {
212
0
            std::shared_ptr<VDataStreamRecvr> recvr;
213
0
            uint32_t hash_value = get_hash_value(i->first, i->second);
214
0
            WARN_IF_ERROR(_find_recvr(hash_value, i->first, i->second, &recvr), "");
215
0
            if (recvr == nullptr) {
216
                // keep going but at least log it
217
0
                std::stringstream err;
218
0
                err << "cancel(): missing in stream_map: fragment=" << print_id(i->first)
219
0
                    << " node=" << i->second;
220
0
                LOG(ERROR) << err.str();
221
0
            } else {
222
0
                recvrs.push_back(recvr);
223
0
            }
224
0
            ++i;
225
0
        }
226
0
    }
227
228
    // cancel_stream maybe take a long time, so we handle it out of lock.
229
0
    for (auto& it : recvrs) {
230
0
        it->cancel_stream(exec_status);
231
0
    }
232
0
}
233
234
} // namespace doris