Coverage Report

Created: 2026-03-16 21:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/exchange/vdata_stream_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/exchange/vdata_stream_mgr.h"
19
20
#include <gen_cpp/Types_types.h>
21
#include <gen_cpp/data.pb.h>
22
#include <gen_cpp/internal_service.pb.h>
23
#include <gen_cpp/types.pb.h>
24
#include <stddef.h>
25
26
#include <memory>
27
#include <ostream>
28
#include <string>
29
#include <vector>
30
31
#include "common/logging.h"
32
#include "exec/exchange/vdata_stream_recvr.h"
33
#include "util/hash_util.hpp"
34
35
namespace doris {
36
#include "common/compile_check_begin.h"
37
38
4
// Constructs the manager; no explicit initialization is performed here.
// TODO: metric
VDataStreamMgr::VDataStreamMgr() = default;
41
42
4
// Closes every receiver that is still registered when the manager is destroyed.
// The receiver checks whether it has been closed, and skipping this step cores
// during graceful stop.
VDataStreamMgr::~VDataStreamMgr() {
    std::vector<std::shared_ptr<VDataStreamRecvr>> pending;
    {
        std::shared_lock guard(_lock);
        // close() must not be invoked while walking the map: closing a receiver
        // removes it from the map, and modifying the map mid-iteration cores.
        // Take a snapshot of the receivers first.
        for (const auto& entry : _receiver_map) {
            pending.push_back(entry.second);
        }
    }
    for (const auto& recvr : pending) {
        recvr->close();
    }
}
60
61
inline uint32_t VDataStreamMgr::get_hash_value(const TUniqueId& fragment_instance_id,
62
17
                                               PlanNodeId node_id) {
63
17
    uint32_t value = HashUtil::hash(&fragment_instance_id.lo, 8, 0);
64
17
    value = HashUtil::hash(&fragment_instance_id.hi, 8, value);
65
17
    value = HashUtil::hash(&node_id, 4, value);
66
17
    return value;
67
17
}
68
69
std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr(
70
        RuntimeState* state, RuntimeProfile::HighWaterMarkCounter* memory_used_counter,
71
        const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
72
7
        RuntimeProfile* profile, bool is_merging, size_t data_queue_capacity) {
73
7
    DCHECK(profile != nullptr);
74
7
    VLOG_FILE << "creating receiver for fragment=" << print_id(fragment_instance_id)
75
0
              << ", node=" << dest_node_id;
76
7
    std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(
77
7
            this, memory_used_counter, state, fragment_instance_id, dest_node_id, num_senders,
78
7
            is_merging, profile, data_queue_capacity));
79
7
    uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
80
7
    std::unique_lock l(_lock);
81
7
    _fragment_stream_set.insert(std::make_pair(fragment_instance_id, dest_node_id));
82
7
    _receiver_map.insert(std::make_pair(hash_value, recvr));
83
7
    return recvr;
84
7
}
85
86
// Looks up the receiver registered for (fragment_instance_id, node_id).
// On success stores it in `*res` and returns OK; otherwise `*res` is left
// untouched and InvalidArgument is returned. When `acquire_lock` is false the
// shared lock on _lock is not taken by this function.
Status VDataStreamMgr::find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id,
                                  std::shared_ptr<VDataStreamRecvr>* res, bool acquire_lock) {
    VLOG_ROW << "looking up fragment_instance_id=" << print_id(fragment_instance_id)
             << ", node=" << node_id;
    const uint32_t bucket = get_hash_value(fragment_instance_id, node_id);

    // The guard is created without owning the lock and only locks on request.
    std::shared_lock guard(_lock, std::defer_lock);
    if (acquire_lock) {
        guard.lock();
    }

    // Several receivers may share a hash value, so confirm the real identity.
    auto [it, last] = _receiver_map.equal_range(bucket);
    for (; it != last; ++it) {
        const auto& candidate = it->second;
        const bool matches = candidate->fragment_instance_id() == fragment_instance_id &&
                             candidate->dest_node_id() == node_id;
        if (matches) {
            *res = candidate;
            return Status::OK();
        }
    }
    return Status::InvalidArgument("Could not find local receiver for node {} with instance {}",
                                   node_id, print_id(fragment_instance_id));
}
110
111
// Routes an incoming transmit request to the receiver identified by the
// request's fragment instance id and node id. Returns EndOfFile when the
// receiver is already gone, the error from add_blocks()/add_block() when adding
// data fails, and OK otherwise. On eos the sender is removed from the receiver.
Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,
                                      ::google::protobuf::Closure** done,
                                      const int64_t wait_for_worker) {
    // Copy the protobuf instance id into a TUniqueId for the lookup.
    const PUniqueId& pb_finst_id = request->finst_id();
    TUniqueId lookup_id;
    lookup_id.hi = pb_finst_id.hi();
    lookup_id.lo = pb_finst_id.lo();

    std::shared_ptr<VDataStreamRecvr> recvr = nullptr;
    ThreadCpuStopWatch cpu_timer;
    cpu_timer.start();
    // The lookup status is deliberately ignored; a missing receiver is detected
    // through the null pointer below.
    static_cast<void>(find_recvr(lookup_id, request->node_id(), &recvr));
    if (!recvr) {
        // A receiver may take itself out of the receiver map through
        // deregister_recvr() at any moment, regardless of how many senders are
        // still active. find_recvr() can therefore come back empty, harmlessly,
        // when the deregistering thread won the race for _lock.
        //
        // e.g. for broadcast join build side, only one instance will build the hash table,
        // all other instances don't need build side data and will close the data stream receiver.
        //
        // TODO: Rethink the lifecycle of DataStreamRecvr to distinguish
        // errors from receiver-initiated teardowns.
        return Status::EndOfFile("data stream receiver closed");
    }

    // Hold the fragment context so the runtime state and related objects are not
    // destructed while this request is being processed.
    auto ctx_lock = recvr->task_exec_ctx();
    if (ctx_lock == nullptr) {
        // Not reported as an internal error: when a query finishes, the downstream
        // node may complete before the upstream node and the object may already be
        // destructed. Returning an error would make the upstream node report a
        // failure to FE and fail the query.
        return Status::EndOfFile("data stream receiver is deconstructed");
    }

    const bool eos = request->eos();
    Status exec_status =
            request->has_exec_status() ? Status::create(request->exec_status()) : Status::OK();

    auto sender_id = request->sender_id();
    auto be_number = request->be_number();
    if (!request->blocks().empty()) {
        RETURN_IF_ERROR(recvr->add_blocks(request, done, wait_for_worker,
                                          cpu_timer.elapsed_time()));
    } else if (request->has_block()) {
        // old logic, for compatibility
        auto legacy_block = std::make_unique<PBlock>();
        legacy_block->CopyFrom(request->block());
        RETURN_IF_ERROR(recvr->add_block(std::move(legacy_block), sender_id, be_number,
                                         request->packet_seq(), eos ? nullptr : done,
                                         wait_for_worker, cpu_timer.elapsed_time()));
    }

    if (eos) {
        recvr->remove_sender(sender_id, be_number, exec_status);
    }
    return Status::OK();
}
170
171
7
// Removes the receiver for (fragment_instance_id, node_id) from both the
// fragment stream set and the receiver map, then cancels its stream with an OK
// status. Returns InternalError when no such receiver is registered.
Status VDataStreamMgr::deregister_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id) {
    VLOG_QUERY << "deregister_recvr(): fragment_instance_id=" << print_id(fragment_instance_id)
               << ", node=" << node_id;
    const uint32_t bucket = get_hash_value(fragment_instance_id, node_id);

    std::shared_ptr<VDataStreamRecvr> removed;
    {
        std::unique_lock guard(_lock);
        auto [it, last] = _receiver_map.equal_range(bucket);
        for (; it != last; ++it) {
            const auto& candidate = it->second;
            const bool matches = candidate->fragment_instance_id() == fragment_instance_id &&
                                 candidate->dest_node_id() == node_id;
            if (!matches) {
                continue;
            }
            // Take our own reference before the map entry is erased.
            removed = candidate;
            _fragment_stream_set.erase(
                    std::make_pair(removed->fragment_instance_id(), removed->dest_node_id()));
            _receiver_map.erase(it);
            break;
        }
    }

    if (!removed) {
        return Status::InternalError("unknown row receiver id: fragment_instance_id={}, node_id={}",
                                     print_id(fragment_instance_id), node_id);
    }

    // Notify concurrent add_data() requests that the stream has been terminated.
    // cancel_stream may take a long time, so it runs after the lock is released.
    removed->cancel_stream(Status::OK());
    return Status::OK();
}
203
204
0
void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id, Status exec_status) {
205
0
    VLOG_QUERY << "cancelling all streams for fragment=" << print_id(fragment_instance_id);
206
0
    std::vector<std::shared_ptr<VDataStreamRecvr>> recvrs;
207
0
    {
208
0
        std::shared_lock l(_lock);
209
0
        FragmentStreamSet::iterator i =
210
0
                _fragment_stream_set.lower_bound(std::make_pair(fragment_instance_id, 0));
211
0
        while (i != _fragment_stream_set.end() && i->first == fragment_instance_id) {
212
0
            std::shared_ptr<VDataStreamRecvr> recvr;
213
0
            WARN_IF_ERROR(find_recvr(i->first, i->second, &recvr, false), "");
214
0
            if (recvr == nullptr) {
215
                // keep going but at least log it
216
0
                std::stringstream err;
217
0
                err << "cancel(): missing in stream_map: fragment=" << print_id(i->first)
218
0
                    << " node=" << i->second;
219
0
                LOG(ERROR) << err.str();
220
0
            } else {
221
0
                recvrs.push_back(recvr);
222
0
            }
223
0
            ++i;
224
0
        }
225
0
    }
226
227
    // cancel_stream maybe take a long time, so we handle it out of lock.
228
0
    for (auto& it : recvrs) {
229
0
        it->cancel_stream(exec_status);
230
0
    }
231
0
}
232
233
} // namespace doris