Coverage Report

Created: 2026-03-13 09:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/exchange/vdata_stream_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/exchange/vdata_stream_mgr.h"
19
20
#include <gen_cpp/Types_types.h>
21
#include <gen_cpp/data.pb.h>
22
#include <gen_cpp/internal_service.pb.h>
23
#include <gen_cpp/types.pb.h>
24
#include <stddef.h>
25
26
#include <memory>
27
#include <ostream>
28
#include <string>
29
#include <vector>
30
31
#include "common/logging.h"
32
#include "exec/exchange/vdata_stream_recvr.h"
33
#include "util/hash_util.hpp"
34
35
namespace doris {
36
#include "common/compile_check_begin.h"
37
38
11
VDataStreamMgr::VDataStreamMgr() {
39
    // TODO: metric
40
11
}
41
42
7
VDataStreamMgr::~VDataStreamMgr() {
43
    // Has to call close here, because receiver will check if the receiver is closed.
44
    // It will core during graceful stop.
45
7
    auto receivers = std::vector<std::shared_ptr<VDataStreamRecvr>>();
46
7
    {
47
7
        std::shared_lock l(_lock);
48
7
        auto receiver_iterator = _receiver_map.begin();
49
7
        while (receiver_iterator != _receiver_map.end()) {
50
            // Could not call close directly, because during close method, it will remove itself
51
            // from the map, and modify the map, it will core.
52
0
            receivers.push_back(receiver_iterator->second);
53
0
            receiver_iterator++;
54
0
        }
55
7
    }
56
7
    for (auto iter = receivers.begin(); iter != receivers.end(); ++iter) {
57
0
        (*iter)->close();
58
0
    }
59
7
}
60
61
inline uint32_t VDataStreamMgr::get_hash_value(const TUniqueId& fragment_instance_id,
62
6.47M
                                               PlanNodeId node_id) {
63
6.47M
    uint32_t value = HashUtil::hash(&fragment_instance_id.lo, 8, 0);
64
6.47M
    value = HashUtil::hash(&fragment_instance_id.hi, 8, value);
65
6.47M
    value = HashUtil::hash(&node_id, 4, value);
66
6.47M
    return value;
67
6.47M
}
68
69
std::shared_ptr<VDataStreamRecvr> VDataStreamMgr::create_recvr(
70
        RuntimeState* state, RuntimeProfile::HighWaterMarkCounter* memory_used_counter,
71
        const TUniqueId& fragment_instance_id, PlanNodeId dest_node_id, int num_senders,
72
491k
        RuntimeProfile* profile, bool is_merging, size_t data_queue_capacity) {
73
491k
    DCHECK(profile != nullptr);
74
491k
    VLOG_FILE << "creating receiver for fragment=" << print_id(fragment_instance_id)
75
552
              << ", node=" << dest_node_id;
76
491k
    std::shared_ptr<VDataStreamRecvr> recvr(new VDataStreamRecvr(
77
491k
            this, memory_used_counter, state, fragment_instance_id, dest_node_id, num_senders,
78
491k
            is_merging, profile, data_queue_capacity));
79
491k
    uint32_t hash_value = get_hash_value(fragment_instance_id, dest_node_id);
80
491k
    std::unique_lock l(_lock);
81
491k
    _fragment_stream_set.insert(std::make_pair(fragment_instance_id, dest_node_id));
82
491k
    _receiver_map.insert(std::make_pair(hash_value, recvr));
83
491k
    return recvr;
84
491k
}
85
86
Status VDataStreamMgr::find_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id,
87
5.49M
                                  std::shared_ptr<VDataStreamRecvr>* res, bool acquire_lock) {
88
18.4E
    VLOG_ROW << "looking up fragment_instance_id=" << print_id(fragment_instance_id)
89
18.4E
             << ", node=" << node_id;
90
5.49M
    uint32_t hash_value = get_hash_value(fragment_instance_id, node_id);
91
    // Create lock guard and not own lock currently and will lock conditionally
92
5.49M
    std::shared_lock recvr_lock(_lock, std::defer_lock);
93
5.52M
    if (acquire_lock) {
94
5.52M
        recvr_lock.lock();
95
5.52M
    }
96
5.49M
    std::pair<StreamMap::iterator, StreamMap::iterator> range =
97
5.49M
            _receiver_map.equal_range(hash_value);
98
5.54M
    while (range.first != range.second) {
99
5.54M
        auto recvr = range.first->second;
100
5.54M
        if (recvr->fragment_instance_id() == fragment_instance_id &&
101
5.56M
            recvr->dest_node_id() == node_id) {
102
5.56M
            *res = recvr;
103
5.56M
            return Status::OK();
104
5.56M
        }
105
18.4E
        ++range.first;
106
18.4E
    }
107
18.4E
    return Status::InvalidArgument("Could not find local receiver for node {} with instance {}",
108
18.4E
                                   node_id, print_id(fragment_instance_id));
109
5.49M
}
110
111
Status VDataStreamMgr::transmit_block(const PTransmitDataParams* request,
112
                                      ::google::protobuf::Closure** done,
113
2.73M
                                      const int64_t wait_for_worker) {
114
2.73M
    const PUniqueId& finst_id = request->finst_id();
115
2.73M
    TUniqueId t_finst_id;
116
2.73M
    t_finst_id.hi = finst_id.hi();
117
2.73M
    t_finst_id.lo = finst_id.lo();
118
2.73M
    std::shared_ptr<VDataStreamRecvr> recvr = nullptr;
119
2.73M
    ThreadCpuStopWatch cpu_time_stop_watch;
120
2.73M
    cpu_time_stop_watch.start();
121
2.73M
    static_cast<void>(find_recvr(t_finst_id, request->node_id(), &recvr));
122
2.73M
    if (recvr == nullptr) {
123
        // The receiver may remove itself from the receiver map via deregister_recvr()
124
        // at any time without considering the remaining number of senders.
125
        // As a consequence, find_recvr() may return an innocuous NULL if a thread
126
        // calling deregister_recvr() beat the thread calling find_recvr()
127
        // in acquiring _lock.
128
        //
129
        // e.g. for broadcast join build side, only one instance will build the hash table,
130
        // all other instances don't need build side data and will close the data stream receiver.
131
        //
132
        // TODO: Rethink the lifecycle of DataStreamRecvr to distinguish
133
        // errors from receiver-initiated teardowns.
134
15.1k
        return Status::EndOfFile("data stream receiver closed");
135
15.1k
    }
136
137
    // Lock the fragment context to ensure the runtime state and other objects are not
138
    // deconstructed
139
2.71M
    auto ctx_lock = recvr->task_exec_ctx();
140
2.71M
    if (ctx_lock == nullptr) {
141
        // Do not return internal error, because when query finished, the downstream node
142
        // may finish before upstream node. And the object maybe deconstructed. If return error
143
        // then the upstream node may report error status to FE, the query is failed.
144
0
        return Status::EndOfFile("data stream receiver is deconstructed");
145
0
    }
146
147
2.71M
    bool eos = request->eos();
148
2.71M
    Status exec_status =
149
2.71M
            request->has_exec_status() ? Status::create(request->exec_status()) : Status::OK();
150
151
2.71M
    auto sender_id = request->sender_id();
152
2.71M
    auto be_number = request->be_number();
153
2.71M
    if (!request->blocks().empty()) {
154
88.7k
        RETURN_IF_ERROR(recvr->add_blocks(request, done, wait_for_worker,
155
88.7k
                                          cpu_time_stop_watch.elapsed_time()));
156
2.62M
    } else if (request->has_block()) {
157
        // old logic, for compatibility
158
0
        std::unique_ptr<PBlock> pblock_ptr = std::make_unique<PBlock>();
159
0
        pblock_ptr->CopyFrom(request->block());
160
0
        RETURN_IF_ERROR(recvr->add_block(std::move(pblock_ptr), sender_id, be_number,
161
0
                                         request->packet_seq(), eos ? nullptr : done,
162
0
                                         wait_for_worker, cpu_time_stop_watch.elapsed_time()));
163
0
    }
164
165
2.71M
    if (eos) {
166
2.69M
        recvr->remove_sender(sender_id, be_number, exec_status);
167
2.69M
    }
168
2.71M
    return Status::OK();
169
2.71M
}
170
171
494k
Status VDataStreamMgr::deregister_recvr(const TUniqueId& fragment_instance_id, PlanNodeId node_id) {
172
494k
    std::shared_ptr<VDataStreamRecvr> targert_recvr;
173
18.4E
    VLOG_QUERY << "deregister_recvr(): fragment_instance_id=" << print_id(fragment_instance_id)
174
18.4E
               << ", node=" << node_id;
175
494k
    uint32_t hash_value = get_hash_value(fragment_instance_id, node_id);
176
494k
    {
177
494k
        std::unique_lock l(_lock);
178
494k
        auto range = _receiver_map.equal_range(hash_value);
179
495k
        while (range.first != range.second) {
180
495k
            const std::shared_ptr<VDataStreamRecvr>& recvr = range.first->second;
181
495k
            if (recvr->fragment_instance_id() == fragment_instance_id &&
182
495k
                recvr->dest_node_id() == node_id) {
183
495k
                targert_recvr = recvr;
184
495k
                _fragment_stream_set.erase(
185
495k
                        std::make_pair(recvr->fragment_instance_id(), recvr->dest_node_id()));
186
495k
                _receiver_map.erase(range.first);
187
495k
                break;
188
495k
            }
189
0
            ++range.first;
190
0
        }
191
494k
    }
192
193
    // Notify concurrent add_data() requests that the stream has been terminated.
194
    // cancel_stream maybe take a long time, so we handle it out of lock.
195
495k
    if (targert_recvr) {
196
495k
        targert_recvr->cancel_stream(Status::OK());
197
495k
        return Status::OK();
198
18.4E
    } else {
199
18.4E
        return Status::InternalError("unknown row receiver id: fragment_instance_id={}, node_id={}",
200
18.4E
                                     print_id(fragment_instance_id), node_id);
201
18.4E
    }
202
494k
}
203
204
0
void VDataStreamMgr::cancel(const TUniqueId& fragment_instance_id, Status exec_status) {
205
0
    VLOG_QUERY << "cancelling all streams for fragment=" << print_id(fragment_instance_id);
206
0
    std::vector<std::shared_ptr<VDataStreamRecvr>> recvrs;
207
0
    {
208
0
        std::shared_lock l(_lock);
209
0
        FragmentStreamSet::iterator i =
210
0
                _fragment_stream_set.lower_bound(std::make_pair(fragment_instance_id, 0));
211
0
        while (i != _fragment_stream_set.end() && i->first == fragment_instance_id) {
212
0
            std::shared_ptr<VDataStreamRecvr> recvr;
213
0
            WARN_IF_ERROR(find_recvr(i->first, i->second, &recvr, false), "");
214
0
            if (recvr == nullptr) {
215
                // keep going but at least log it
216
0
                std::stringstream err;
217
0
                err << "cancel(): missing in stream_map: fragment=" << print_id(i->first)
218
0
                    << " node=" << i->second;
219
0
                LOG(ERROR) << err.str();
220
0
            } else {
221
0
                recvrs.push_back(recvr);
222
0
            }
223
0
            ++i;
224
0
        }
225
0
    }
226
227
    // cancel_stream maybe take a long time, so we handle it out of lock.
228
0
    for (auto& it : recvrs) {
229
0
        it->cancel_stream(exec_status);
230
0
    }
231
0
}
232
233
} // namespace doris