Coverage Report

Created: 2026-03-14 20:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/result_buffer_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/result_buffer_mgr.h"
19
20
#include <gen_cpp/Types_types.h>
21
#include <gen_cpp/types.pb.h>
22
#include <glog/logging.h>
23
24
#include <cstdint>
25
26
// IWYU pragma: no_include <bits/chrono.h>
27
#include <chrono> // IWYU pragma: keep
28
#include <memory>
29
#include <ostream>
30
#include <utility>
31
32
#include "arrow/type_fwd.h"
33
#include "common/metrics/doris_metrics.h"
34
#include "common/metrics/metrics.h"
35
#include "common/status.h"
36
#include "exec/sink/writer/varrow_flight_result_writer.h"
37
#include "exec/sink/writer/vmysql_result_writer.h"
38
#include "runtime/result_block_buffer.h"
39
#include "util/thread.h"
40
#include "util/uid_util.h"
41
42
namespace doris {
43
44
// Metric prototype for `result_buffer_block_count`. The ResultBufferMgr constructor registers a
// hook under this name that reports `_buffer_map.size()`, and stop() deregisters it.
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(result_buffer_block_count, MetricUnit::NOUNIT);
45
46
6
// Constructs the manager: initialises the stop latch with a count of 1 (stop() counts it down
// once) and registers the `result_buffer_block_count` hook metric.
ResultBufferMgr::ResultBufferMgr() : _stop_background_threads_latch(1) {
47
    // Each ResultBlockBufferBase has a bounded queue (limit 1024), so the metric reports only
48
    // the number of registered buffers rather than the total size of all their queues.
49
6
    REGISTER_HOOK_METRIC(result_buffer_block_count, [this]() {
50
        // NOTE(review): the map is read here without holding `_buffer_map_lock`, while every
        // other accessor in this file takes it. The lock below was disabled on purpose and
        // names std::mutex although the lock is now used as a std::shared_mutex; confirm the
        // unsynchronised size() read is acceptable before re-enabling or removing it.
        // std::lock_guard<std::mutex> l(_buffer_map_lock);
51
6
        return _buffer_map.size();
52
6
    });
53
6
}
54
55
0
void ResultBufferMgr::stop() {
56
0
    DEREGISTER_HOOK_METRIC(result_buffer_block_count);
57
0
    _stop_background_threads_latch.count_down();
58
0
    if (_clean_thread) {
59
0
        _clean_thread->join();
60
0
    }
61
0
}
62
63
0
/// Starts the background thread that runs cancel_thread().
///
/// @return the error reported by Thread::create if the thread could not be created,
///         Status::OK() otherwise. The thread handle is stored in `_clean_thread`.
Status ResultBufferMgr::init() {
    auto run_cancel_loop = [this]() { this->cancel_thread(); };
    RETURN_IF_ERROR(Thread::create("ResultBufferMgr", "cancel_timeout_result", run_cancel_loop,
                                   &_clean_thread));
    return Status::OK();
}
69
70
/// Creates the result buffer for `unique_id` and registers it with the manager.
///
/// @param unique_id    key under which the buffer is stored in `_buffer_map`.
/// @param buffer_size  forwarded to the buffer constructor.
/// @param sender       out-parameter; assigned the new buffer only on success.
/// @param state        forwarded to the buffer constructor; also supplies
///                     `execution_timeout()` for the expiry deadline.
/// @param arrow_flight when true an ArrowFlightResultBlockBuffer is created, otherwise a
///                     MySQLResultBlockBuffer.
/// @param schema       forwarded to the ArrowFlightResultBlockBuffer; not used otherwise.
/// @return Status::OK() on success; InternalError if a buffer with this id is already
///         registered, in which case `*sender` is left untouched.
Status ResultBufferMgr::create_sender(const TUniqueId& unique_id, int buffer_size,
                                      std::shared_ptr<ResultBlockBufferBase>* sender,
                                      RuntimeState* state, bool arrow_flight,
                                      std::shared_ptr<arrow::Schema> schema) {
    // Fast path: reject an obvious duplicate under the shared lock before constructing a
    // buffer. This check alone is not sufficient, see the re-check under the write lock.
    {
        std::shared_lock<std::shared_mutex> rlock(_buffer_map_lock);
        if (_buffer_map.find(unique_id) != _buffer_map.end()) {
            return Status::InternalError("ResultBlockBuffer already exist, id={}",
                                         print_id(unique_id));
        }
    }

    std::shared_ptr<ResultBlockBufferBase> control_block;
    if (arrow_flight) {
        control_block = std::make_shared<ArrowFlightResultBlockBuffer>(unique_id, state, schema,
                                                                       buffer_size);
    } else {
        control_block = std::make_shared<MySQLResultBlockBuffer>(unique_id, state, buffer_size);
    }

    {
        std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
        // The shared lock was released above, so another thread may have registered the same
        // id in the meantime. The insertion result is authoritative: without this check the
        // caller would receive a buffer that is not in the map and a second timeout entry
        // would be queued for the id.
        if (!_buffer_map.emplace(unique_id, control_block).second) {
            return Status::InternalError("ResultBlockBuffer already exist, id={}",
                                         print_id(unique_id));
        }
        // The buffer must be destroyed once the execution timeout has passed: beyond that
        // point FE reports a timeout to the client, and a buffer left behind may block all
        // fragment handling threads.
        // Details: https://github.com/apache/doris/issues/16203
        // An extra 5s is added to avoid corner cases.
        int64_t max_timeout = time(nullptr) + state->execution_timeout() + 5;
        cancel_at_time(max_timeout, unique_id);
    }
    *sender = control_block;
    return Status::OK();
}
107
108
template <typename ResultBlockBufferType>
109
std::shared_ptr<ResultBlockBufferType> ResultBufferMgr::_find_control_block(
110
1
        const TUniqueId& unique_id) {
111
1
    std::shared_lock<std::shared_mutex> rlock(_buffer_map_lock);
112
1
    auto iter = _buffer_map.find(unique_id);
113
114
1
    if (_buffer_map.end() != iter) {
115
1
        return std::dynamic_pointer_cast<ResultBlockBufferType>(iter->second);
116
1
    }
117
118
0
    return {};
119
1
}
Unexecuted instantiation: _ZN5doris15ResultBufferMgr19_find_control_blockINS_28ArrowFlightResultBlockBufferEEESt10shared_ptrIT_ERKNS_9TUniqueIdE
_ZN5doris15ResultBufferMgr19_find_control_blockINS_17ResultBlockBufferINS_17GetResultBatchCtxEEEEESt10shared_ptrIT_ERKNS_9TUniqueIdE
Line
Count
Source
110
1
        const TUniqueId& unique_id) {
111
1
    std::shared_lock<std::shared_mutex> rlock(_buffer_map_lock);
112
1
    auto iter = _buffer_map.find(unique_id);
113
114
1
    if (_buffer_map.end() != iter) {
115
1
        return std::dynamic_pointer_cast<ResultBlockBufferType>(iter->second);
116
1
    }
117
118
0
    return {};
119
1
}
120
121
template <typename ResultBlockBufferType>
122
Status ResultBufferMgr::find_buffer(const TUniqueId& finst_id,
123
1
                                    std::shared_ptr<ResultBlockBufferType>& buffer) {
124
1
    buffer = _find_control_block<ResultBlockBufferType>(finst_id);
125
1
    return buffer == nullptr ? Status::InternalError(
126
0
                                       "no arrow schema for this query, maybe query has been "
127
0
                                       "canceled, finst_id={}",
128
0
                                       print_id(finst_id))
129
1
                             : Status::OK();
130
1
}
Unexecuted instantiation: _ZN5doris15ResultBufferMgr11find_bufferINS_28ArrowFlightResultBlockBufferEEENS_6StatusERKNS_9TUniqueIdERSt10shared_ptrIT_E
_ZN5doris15ResultBufferMgr11find_bufferINS_17ResultBlockBufferINS_17GetResultBatchCtxEEEEENS_6StatusERKNS_9TUniqueIdERSt10shared_ptrIT_E
Line
Count
Source
123
1
                                    std::shared_ptr<ResultBlockBufferType>& buffer) {
124
1
    buffer = _find_control_block<ResultBlockBufferType>(finst_id);
125
1
    return buffer == nullptr ? Status::InternalError(
126
0
                                       "no arrow schema for this query, maybe query has been "
127
0
                                       "canceled, finst_id={}",
128
0
                                       print_id(finst_id))
129
1
                             : Status::OK();
130
1
}
131
132
2
bool ResultBufferMgr::cancel(const TUniqueId& unique_id, const Status& reason) {
133
2
    std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
134
2
    auto iter = _buffer_map.find(unique_id);
135
136
2
    auto exist = _buffer_map.end() != iter;
137
2
    if (exist) {
138
1
        iter->second->cancel(reason);
139
1
        _buffer_map.erase(iter);
140
1
    }
141
2
    return exist;
142
2
}
143
144
5
void ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId& unique_id) {
145
5
    std::lock_guard<std::mutex> l(_timeout_lock);
146
5
    auto iter = _timeout_map.find(cancel_time);
147
148
5
    if (_timeout_map.end() == iter) {
149
5
        _timeout_map.insert(
150
5
                std::pair<time_t, std::vector<TUniqueId>>(cancel_time, std::vector<TUniqueId>()));
151
5
        iter = _timeout_map.find(cancel_time);
152
5
    }
153
154
5
    iter->second.push_back(unique_id);
155
5
}
156
157
0
void ResultBufferMgr::cancel_thread() {
158
0
    LOG(INFO) << "result buffer manager cancel thread begin.";
159
160
0
    do {
161
        // get query
162
0
        std::vector<TUniqueId> query_to_cancel;
163
0
        time_t now_time = time(nullptr);
164
0
        {
165
0
            std::lock_guard<std::mutex> l(_timeout_lock);
166
0
            auto end = _timeout_map.upper_bound(now_time + 1);
167
168
0
            for (auto iter = _timeout_map.begin(); iter != end; ++iter) {
169
0
                for (const auto& id : iter->second) {
170
0
                    query_to_cancel.push_back(id);
171
0
                }
172
0
            }
173
174
0
            _timeout_map.erase(_timeout_map.begin(), end);
175
0
        }
176
177
        // cancel query
178
0
        for (const auto& id : query_to_cancel) {
179
0
            cancel(id, Status::Cancelled("Clean up expired ResultBlockBuffer, queryId: {}",
180
0
                                         print_id(id)));
181
0
        }
182
0
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));
183
184
    LOG(INFO) << "result buffer manager cancel thread finish.";
185
0
}
186
187
// Explicit instantiations of find_buffer. The template is defined in this source file, so
// the two buffer types it is instantiated for are listed here.
// Instantiation for the Arrow Flight buffer type.
template Status ResultBufferMgr::find_buffer(
188
        const TUniqueId& finst_id, std::shared_ptr<doris::ArrowFlightResultBlockBuffer>& buffer);
189
// Instantiation for ResultBlockBuffer<GetResultBatchCtx>.
template Status ResultBufferMgr::find_buffer(
190
        const TUniqueId& finst_id,
191
        std::shared_ptr<doris::ResultBlockBuffer<doris::GetResultBatchCtx>>& buffer);
192
193
} // namespace doris