Coverage Report

Created: 2026-04-10 18:35

Legend / shortcuts: jump to next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/exchange/vdata_stream_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/exchange/vdata_stream_mgr.h"
19
20
#include <gen_cpp/Types_types.h>
21
#include <gen_cpp/data.pb.h>
22
#include <gen_cpp/internal_service.pb.h>
23
#include <gen_cpp/types.pb.h>
24
#include <stddef.h>
25
26
#include <memory>
27
#include <ostream>
28
#include <string>
29
#include <vector>
30
31
#include "common/logging.h"
32
#include "exec/exchange/vdata_stream_recvr.h"
33
#include "util/hash_util.hpp"
34
35
namespace doris {
36
37
9
VDataStreamMgr::VDataStreamMgr() {
38
    // TODO: metric
39
9
}
40
41
6
VDataStreamMgr::~VDataStreamMgr() {
42
    // Has to call close here, because receiver will check if the receiver is closed.
43
    // It will core during graceful stop.
44
6
    auto receivers = std::vector<std::shared_ptr<VDataStreamRecvr>>();
45
6
    {
46
6
        std::shared_lock l(_lock);
47
6
        auto receiver_iterator = _receiver_map.begin();
48
6
        while (receiver_iterator != _receiver_map.end()) {
49
            // Could not call close directly, because during close method, it will remove itself
50
            // from the map, and modify the map, it will core.
51
0
            receivers.push_back(receiver_iterator->second);
52
0
            receiver_iterator++;
53
0
        }
54
6
    }
55
6
    for (auto iter = receivers.begin(); iter != receivers.end(); ++iter) {
56
0
        (*iter)->close();
57
0
    }
58
6
}
59
60
inline uint32_t VDataStreamMgr::get_hash_value(const TUniqueId& fragment_instance_id,
61
4.64M
                                               PlanNodeId node_id) {
62
4.64M
    uint32_t value = HashUtil::hash(&fragment_instance_id.lo, 8, 0);
63
4.64M
    value = HashUtil::hash(&fragment_instance_id.hi, 8, value);
64
4.64M
    value = HashUtil::hash(&node_id, 4, value);
65
4.64M
    return value;
66
4.64M
}
67
68
std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr(
69
        RuntimeState* state, RuntimeProfile::HighWaterMarkCounter* memory_used_counter,
70
        const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
71
413k
        RuntimeProfile* profile, bool is_merging, size_t data_queue_capacity) {
72
413k
    DCHECK(profile != nullptr);
73
18.4E
    VLOG_FILE << "creating receiver for fragment=" << print_id(fragment_instance_id)
74
18.4E
              << ", node=" << dest_node_id;
75
413k
    std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(
76
413k
            this, memory_used_counter, state, fragment_instance_id, dest_node_id, num_senders,
77
413k
            is_merging, profile, data_queue_capacity));
78
413k
    uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
79
413k
    std::unique_lock l(_lock);
80
413k
    _fragment_stream_set.insert(std::make_pair(fragment_instance_id, dest_node_id));
81
413k
    _receiver_map.insert(std::make_pair(hash_value, recvr));
82
413k
    return recvr;
83
413k
}
84
85
Status VDataStreamMgr::find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id,
86
3.80M
                                  std::shared_ptr<VDataStreamRecvr>* res, bool acquire_lock) {
87
3.80M
    VLOG_ROW << "looking up fragment_instance_id=" << print_id(fragment_instance_id)
88
1.93k
             << ", node=" << node_id;
89
3.80M
    uint32_t hash_value = get_hash_value(fragment_instance_id, node_id);
90
    // Create lock guard and not own lock currently and will lock conditionally
91
3.80M
    std::shared_lock recvr_lock(_lock, std::defer_lock);
92
3.83M
    if (acquire_lock) {
93
3.83M
        recvr_lock.lock();
94
3.83M
    }
95
3.80M
    std::pair<StreamMap::iterator, StreamMap::iterator> range =
96
3.80M
            _receiver_map.equal_range(hash_value);
97
3.84M
    while (range.first != range.second) {
98
3.84M
        auto recvr = range.first->second;
99
3.84M
        if (recvr->fragment_instance_id() == fragment_instance_id &&
100
3.84M
            recvr->dest_node_id() == node_id) {
101
3.84M
            *res = recvr;
102
3.84M
            return Status::OK();
103
3.84M
        }
104
4.10k
        ++range.first;
105
4.10k
    }
106
18.4E
    return Status::InvalidArgument("Could not find local receiver for node {} with instance {}",
107
18.4E
                                   node_id, print_id(fragment_instance_id));
108
3.80M
}
109
110
Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,
111
                                      ::google::protobuf::Closure** done,
112
2.59M
                                      const int64_t wait_for_worker) {
113
2.59M
    const PUniqueId& finst_id = request->finst_id();
114
2.59M
    TUniqueId t_finst_id;
115
2.59M
    t_finst_id.hi = finst_id.hi();
116
2.59M
    t_finst_id.lo = finst_id.lo();
117
2.59M
    std::shared_ptr<VDataStreamRecvr> recvr = nullptr;
118
2.59M
    ThreadCpuStopWatch cpu_time_stop_watch;
119
2.59M
    cpu_time_stop_watch.start();
120
2.59M
    static_cast<void>(find_recvr(t_finst_id, request->node_id(), &recvr));
121
2.59M
    if (recvr == nullptr) {
122
        // The receiver may remove itself from the receiver map via deregister_recvr()
123
        // at any time without considering the remaining number of senders.
124
        // As a consequence, find_recvr() may return an innocuous NULL if a thread
125
        // calling deregister_recvr() beat the thread calling find_recvr()
126
        // in acquiring _lock.
127
        //
128
        // e.g. for broadcast join build side, only one instance will build the hash table,
129
        // all other instances don't need build side data and will close the data stream receiver.
130
        //
131
        // TODO: Rethink the lifecycle of DataStreamRecvr to distinguish
132
        // errors from receiver-initiated teardowns.
133
7.59k
        return Status::EndOfFile("data stream receiver closed");
134
7.59k
    }
135
136
    // Lock the fragment context to ensure the runtime state and other objects are not
137
    // deconstructed
138
2.58M
    auto ctx_lock = recvr->task_exec_ctx();
139
2.58M
    if (ctx_lock == nullptr) {
140
        // Do not return internal error, because when query finished, the downstream node
141
        // may finish before upstream node. And the object maybe deconstructed. If return error
142
        // then the upstream node may report error status to FE, the query is failed.
143
0
        return Status::EndOfFile("data stream receiver is deconstructed");
144
0
    }
145
146
2.58M
    bool eos = request->eos();
147
2.58M
    Status exec_status =
148
2.58M
            request->has_exec_status() ? Status::create(request->exec_status()) : Status::OK();
149
150
2.58M
    auto sender_id = request->sender_id();
151
2.58M
    auto be_number = request->be_number();
152
2.58M
    if (!request->blocks().empty()) {
153
70.0k
        RETURN_IF_ERROR(recvr->add_blocks(request, done, wait_for_worker,
154
70.0k
                                          cpu_time_stop_watch.elapsed_time()));
155
2.51M
    } else if (request->has_block()) {
156
        // old logic, for compatibility
157
0
        std::unique_ptr<PBlock> pblock_ptr = std::make_unique<PBlock>();
158
0
        pblock_ptr->CopyFrom(request->block());
159
0
        RETURN_IF_ERROR(recvr->add_block(std::move(pblock_ptr), sender_id, be_number,
160
0
                                         request->packet_seq(), eos ? nullptr : done,
161
0
                                         wait_for_worker, cpu_time_stop_watch.elapsed_time()));
162
0
    }
163
164
2.58M
    if (eos) {
165
2.57M
        recvr->remove_sender(sender_id, be_number, exec_status);
166
2.57M
    }
167
2.58M
    return Status::OK();
168
2.58M
}
169
170
416k
Status VDataStreamMgr::deregister_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id) {
171
416k
    std::shared_ptr<VDataStreamRecvr> targert_recvr;
172
18.4E
    VLOG_QUERY << "deregister_recvr(): fragment_instance_id=" << print_id(fragment_instance_id)
173
18.4E
               << ", node=" << node_id;
174
416k
    uint32_t hash_value = get_hash_value(fragment_instance_id, node_id);
175
416k
    {
176
416k
        std::unique_lock l(_lock);
177
416k
        auto range = _receiver_map.equal_range(hash_value);
178
417k
        while (range.first != range.second) {
179
417k
            const std::shared_ptr<VDataStreamRecvr>& recvr = range.first->second;
180
417k
            if (recvr->fragment_instance_id() == fragment_instance_id &&
181
417k
                recvr->dest_node_id() == node_id) {
182
417k
                targert_recvr = recvr;
183
417k
                _fragment_stream_set.erase(
184
417k
                        std::make_pair(recvr->fragment_instance_id(), recvr->dest_node_id()));
185
417k
                _receiver_map.erase(range.first);
186
417k
                break;
187
417k
            }
188
0
            ++range.first;
189
0
        }
190
416k
    }
191
192
    // Notify concurrent add_data() requests that the stream has been terminated.
193
    // cancel_stream maybe take a long time, so we handle it out of lock.
194
417k
    if (targert_recvr) {
195
417k
        targert_recvr->cancel_stream(Status::OK());
196
417k
        return Status::OK();
197
18.4E
    } else {
198
18.4E
        return Status::InternalError("unknown row receiver id: fragment_instance_id={}, node_id={}",
199
18.4E
                                     print_id(fragment_instance_id), node_id);
200
18.4E
    }
201
416k
}
202
203
0
void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id, Status exec_status) {
204
0
    VLOG_QUERY << "cancelling all streams for fragment=" << print_id(fragment_instance_id);
205
0
    std::vector<std::shared_ptr<VDataStreamRecvr>> recvrs;
206
0
    {
207
0
        std::shared_lock l(_lock);
208
0
        FragmentStreamSet::iterator i =
209
0
                _fragment_stream_set.lower_bound(std::make_pair(fragment_instance_id, 0));
210
0
        while (i != _fragment_stream_set.end() && i->first == fragment_instance_id) {
211
0
            std::shared_ptr<VDataStreamRecvr> recvr;
212
0
            WARN_IF_ERROR(find_recvr(i->first, i->second, &recvr, false), "");
213
0
            if (recvr == nullptr) {
214
                // keep going but at least log it
215
0
                std::stringstream err;
216
0
                err << "cancel(): missing in stream_map: fragment=" << print_id(i->first)
217
0
                    << " node=" << i->second;
218
0
                LOG(ERROR) << err.str();
219
0
            } else {
220
0
                recvrs.push_back(recvr);
221
0
            }
222
0
            ++i;
223
0
        }
224
0
    }
225
226
    // cancel_stream maybe take a long time, so we handle it out of lock.
227
0
    for (auto& it : recvrs) {
228
0
        it->cancel_stream(exec_status);
229
0
    }
230
0
}
231
232
} // namespace doris