Coverage Report

Created: 2026-05-17 18:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/exchange/vdata_stream_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/exchange/vdata_stream_mgr.h"
19
20
#include <gen_cpp/Types_types.h>
21
#include <gen_cpp/data.pb.h>
22
#include <gen_cpp/internal_service.pb.h>
23
#include <gen_cpp/types.pb.h>
24
#include <stddef.h>
25
26
#include <memory>
27
#include <ostream>
28
#include <string>
29
#include <vector>
30
31
#include "common/logging.h"
32
#include "exec/exchange/vdata_stream_recvr.h"
33
#include "util/hash_util.hpp"
34
35
namespace doris {
36
37
17
VDataStreamMgr::VDataStreamMgr() {
38
    // TODO: metric
39
17
}
40
41
14
VDataStreamMgr::~VDataStreamMgr() {
42
    // Has to call close here, because receiver will check if the receiver is closed.
43
    // It will core during graceful stop.
44
14
    auto receivers = std::vector<std::shared_ptr<VDataStreamRecvr>>();
45
14
    {
46
14
        std::shared_lock l(_lock);
47
14
        auto receiver_iterator = _receiver_map.begin();
48
14
        while (receiver_iterator != _receiver_map.end()) {
49
            // Could not call close directly, because during close method, it will remove itself
50
            // from the map, and modify the map, it will core.
51
0
            receivers.push_back(receiver_iterator->second);
52
0
            receiver_iterator++;
53
0
        }
54
14
    }
55
14
    for (auto iter = receivers.begin(); iter != receivers.end(); ++iter) {
56
0
        (*iter)->close();
57
0
    }
58
14
}
59
60
inline uint32_t VDataStreamMgr::get_hash_value(const TUniqueId& fragment_instance_id,
61
809
                                               PlanNodeId node_id) {
62
809
    uint32_t value = HashUtil::hash(&fragment_instance_id.lo, 8, 0);
63
809
    value = HashUtil::hash(&fragment_instance_id.hi, 8, value);
64
809
    value = HashUtil::hash(&node_id, 4, value);
65
809
    return value;
66
809
}
67
68
std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr(
69
        RuntimeState* state, RuntimeProfile::HighWaterMarkCounter* memory_used_counter,
70
        const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
71
269
        RuntimeProfile* profile, bool is_merging, size_t data_queue_capacity) {
72
269
    DCHECK(profile != nullptr);
73
269
    VLOG_FILE << "creating receiver for fragment=" << print_id(fragment_instance_id)
74
0
              << ", node=" << dest_node_id;
75
269
    std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(
76
269
            this, memory_used_counter, state, fragment_instance_id, dest_node_id, num_senders,
77
269
            is_merging, profile, data_queue_capacity));
78
269
    uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
79
269
    std::unique_lock l(_lock);
80
269
    _fragment_stream_set.insert(std::make_pair(fragment_instance_id, dest_node_id));
81
269
    _receiver_map.insert(std::make_pair(hash_value, recvr));
82
269
    return recvr;
83
269
}
84
85
Status VDataStreamMgr::find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id,
86
271
                                  std::shared_ptr<VDataStreamRecvr>* res, bool acquire_lock) {
87
271
    VLOG_ROW << "looking up fragment_instance_id=" << print_id(fragment_instance_id)
88
0
             << ", node=" << node_id;
89
271
    uint32_t hash_value = get_hash_value(fragment_instance_id, node_id);
90
    // Create lock guard and not own lock currently and will lock conditionally
91
271
    std::shared_lock recvr_lock(_lock, std::defer_lock);
92
271
    if (acquire_lock) {
93
269
        recvr_lock.lock();
94
269
    }
95
271
    std::pair<StreamMap::iterator, StreamMap::iterator> range =
96
271
            _receiver_map.equal_range(hash_value);
97
271
    while (range.first != range.second) {
98
271
        auto recvr = range.first->second;
99
271
        if (recvr->fragment_instance_id() == fragment_instance_id &&
100
271
            recvr->dest_node_id() == node_id) {
101
271
            *res = recvr;
102
271
            return Status::OK();
103
271
        }
104
0
        ++range.first;
105
0
    }
106
0
    return Status::InvalidArgument("Could not find local receiver for node {} with instance {}",
107
0
                                   node_id, print_id(fragment_instance_id));
108
271
}
109
110
Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,
111
                                      ::google::protobuf::Closure** done,
112
107
                                      const int64_t wait_for_worker) {
113
107
    const PUniqueId& finst_id = request->finst_id();
114
107
    TUniqueId t_finst_id;
115
107
    t_finst_id.hi = finst_id.hi();
116
107
    t_finst_id.lo = finst_id.lo();
117
107
    std::shared_ptr<VDataStreamRecvr> recvr = nullptr;
118
107
    ThreadCpuStopWatch cpu_time_stop_watch;
119
107
    cpu_time_stop_watch.start();
120
107
    static_cast<void>(find_recvr(t_finst_id, request->node_id(), &recvr));
121
107
    if (recvr == nullptr) {
122
        // The receiver may remove itself from the receiver map via deregister_recvr()
123
        // at any time without considering the remaining number of senders.
124
        // As a consequence, find_recvr() may return an innocuous NULL if a thread
125
        // calling deregister_recvr() beat the thread calling find_recvr()
126
        // in acquiring _lock.
127
        //
128
        // e.g. for broadcast join build side, only one instance will build the hash table,
129
        // all other instances don't need build side data and will close the data stream receiver.
130
        //
131
        // TODO: Rethink the lifecycle of DataStreamRecvr to distinguish
132
        // errors from receiver-initiated teardowns.
133
0
        return Status::EndOfFile("data stream receiver closed");
134
0
    }
135
136
    // Lock the fragment context to ensure the runtime state and other objects are not
137
    // deconstructed
138
107
    auto ctx_lock = recvr->task_exec_ctx();
139
107
    if (ctx_lock == nullptr) {
140
        // Do not return internal error, because when query finished, the downstream node
141
        // may finish before upstream node. And the object maybe deconstructed. If return error
142
        // then the upstream node may report error status to FE, the query is failed.
143
0
        return Status::EndOfFile("data stream receiver is deconstructed");
144
0
    }
145
146
107
    bool eos = request->eos();
147
107
    Status exec_status =
148
107
            request->has_exec_status() ? Status::create(request->exec_status()) : Status::OK();
149
150
107
    auto sender_id = request->sender_id();
151
107
    auto be_number = request->be_number();
152
107
    if (!request->blocks().empty()) {
153
107
        RETURN_IF_ERROR(recvr->add_blocks(request, done, wait_for_worker,
154
107
                                          cpu_time_stop_watch.elapsed_time()));
155
107
    } else if (request->has_block()) {
156
        // old logic, for compatibility
157
0
        std::unique_ptr<PBlock> pblock_ptr = std::make_unique<PBlock>();
158
0
        pblock_ptr->CopyFrom(request->block());
159
0
        RETURN_IF_ERROR(recvr->add_block(std::move(pblock_ptr), sender_id, be_number,
160
0
                                         request->packet_seq(), eos ? nullptr : done,
161
0
                                         wait_for_worker, cpu_time_stop_watch.elapsed_time()));
162
0
    }
163
164
107
    if (eos) {
165
106
        recvr->remove_sender(sender_id, be_number, exec_status);
166
106
    }
167
107
    return Status::OK();
168
107
}
169
170
269
Status VDataStreamMgr::deregister_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id) {
171
269
    std::shared_ptr<VDataStreamRecvr> targert_recvr;
172
269
    VLOG_QUERY << "deregister_recvr(): fragment_instance_id=" << print_id(fragment_instance_id)
173
0
               << ", node=" << node_id;
174
269
    uint32_t hash_value = get_hash_value(fragment_instance_id, node_id);
175
269
    {
176
269
        std::unique_lock l(_lock);
177
269
        auto range = _receiver_map.equal_range(hash_value);
178
269
        while (range.first != range.second) {
179
269
            const std::shared_ptr<VDataStreamRecvr>& recvr = range.first->second;
180
269
            if (recvr->fragment_instance_id() == fragment_instance_id &&
181
269
                recvr->dest_node_id() == node_id) {
182
269
                targert_recvr = recvr;
183
269
                _fragment_stream_set.erase(
184
269
                        std::make_pair(recvr->fragment_instance_id(), recvr->dest_node_id()));
185
269
                _receiver_map.erase(range.first);
186
269
                break;
187
269
            }
188
0
            ++range.first;
189
0
        }
190
269
    }
191
192
    // Notify concurrent add_data() requests that the stream has been terminated.
193
    // cancel_stream maybe take a long time, so we handle it out of lock.
194
269
    if (targert_recvr) {
195
269
        targert_recvr->cancel_stream(Status::OK());
196
269
        return Status::OK();
197
269
    } else {
198
0
        return Status::InternalError("unknown row receiver id: fragment_instance_id={}, node_id={}",
199
0
                                     print_id(fragment_instance_id), node_id);
200
0
    }
201
269
}
202
203
0
void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id, Status exec_status) {
204
0
    VLOG_QUERY << "cancelling all streams for fragment=" << print_id(fragment_instance_id);
205
0
    std::vector<std::shared_ptr<VDataStreamRecvr>> recvrs;
206
0
    {
207
0
        std::shared_lock l(_lock);
208
0
        FragmentStreamSet::iterator i =
209
0
                _fragment_stream_set.lower_bound(std::make_pair(fragment_instance_id, 0));
210
0
        while (i != _fragment_stream_set.end() && i->first == fragment_instance_id) {
211
0
            std::shared_ptr<VDataStreamRecvr> recvr;
212
0
            WARN_IF_ERROR(find_recvr(i->first, i->second, &recvr, false), "");
213
0
            if (recvr == nullptr) {
214
                // keep going but at least log it
215
0
                std::stringstream err;
216
0
                err << "cancel(): missing in stream_map: fragment=" << print_id(i->first)
217
0
                    << " node=" << i->second;
218
0
                LOG(ERROR) << err.str();
219
0
            } else {
220
0
                recvrs.push_back(recvr);
221
0
            }
222
0
            ++i;
223
0
        }
224
0
    }
225
226
    // cancel_stream maybe take a long time, so we handle it out of lock.
227
0
    for (auto& it : recvrs) {
228
0
        it->cancel_stream(exec_status);
229
0
    }
230
0
}
231
232
} // namespace doris