Coverage Report

Created: 2026-03-23 10:32

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/result_buffer_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/result_buffer_mgr.h"
19
20
#include <gen_cpp/Types_types.h>
21
#include <gen_cpp/types.pb.h>
22
#include <glog/logging.h>
23
24
#include <cstdint>
25
26
// IWYU pragma: no_include <bits/chrono.h>
27
#include <chrono> // IWYU pragma: keep
28
#include <memory>
29
#include <ostream>
30
#include <utility>
31
32
#include "arrow/type_fwd.h"
33
#include "common/metrics/doris_metrics.h"
34
#include "common/metrics/metrics.h"
35
#include "common/status.h"
36
#include "exec/sink/writer/varrow_flight_result_writer.h"
37
#include "exec/sink/writer/vmysql_result_writer.h"
38
#include "runtime/result_block_buffer.h"
39
#include "util/thread.h"
40
#include "util/uid_util.h"
41
42
namespace doris {
43
44
// Gauge metric prototype (no unit). Its value is supplied by the hook registered in the
// ResultBufferMgr constructor, which reports _buffer_map.size().
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(result_buffer_block_count, MetricUnit::NOUNIT);
45
46
11
// Initialises the stop latch with a count of 1 and installs the hook that feeds the
// result_buffer_block_count metric.
ResultBufferMgr::ResultBufferMgr() : _stop_background_threads_latch(1) {
    // Only the number of registered buffers is reported. Each ResultBlockBufferBase has a
    // limited queue size of 1024, so the actual size of every buffer is not counted.
    REGISTER_HOOK_METRIC(result_buffer_block_count, [this]() {
        // NOTE(review): size() is read without holding _buffer_map_lock (a lock_guard that
        // used to sit here is disabled). Confirm that an unsynchronised read is acceptable.
        return _buffer_map.size();
    });
}
54
55
2
void ResultBufferMgr::stop() {
56
2
    DEREGISTER_HOOK_METRIC(result_buffer_block_count);
57
2
    _stop_background_threads_latch.count_down();
58
2
    if (_clean_thread) {
59
2
        _clean_thread->join();
60
2
    }
61
2
}
62
63
5
// Starts the background thread that runs cancel_thread().
// Returns the error from Thread::create() if the thread cannot be created, OK otherwise.
Status ResultBufferMgr::init() {
    auto cleaner = [this]() { cancel_thread(); };
    RETURN_IF_ERROR(
            Thread::create("ResultBufferMgr", "cancel_timeout_result", cleaner, &_clean_thread));
    return Status::OK();
}
69
70
// Creates the result buffer for `unique_id`, registers it in _buffer_map and schedules its
// timeout-based cleanup.
//
// unique_id     key under which the buffer is registered
// buffer_size   forwarded to the buffer constructor
// sender        out-parameter, receives the newly created buffer on success
// state         forwarded to the buffer constructor; also provides execution_timeout()
// arrow_flight  true -> ArrowFlightResultBlockBuffer, false -> MySQLResultBlockBuffer
// schema        forwarded to the Arrow Flight buffer only
//
// Returns InternalError if a buffer with the same id is already registered, OK otherwise.
Status ResultBufferMgr::create_sender(const TUniqueId& unique_id, int buffer_size,
                                      std::shared_ptr<ResultBlockBufferBase>* sender,
                                      RuntimeState* state, bool arrow_flight,
                                      std::shared_ptr<arrow::Schema> schema) {
    {
        // Cheap pre-check under the shared lock so that an obvious duplicate does not pay
        // for constructing a buffer.
        std::shared_lock<std::shared_mutex> rlock(_buffer_map_lock);
        if (_buffer_map.find(unique_id) != _buffer_map.end()) {
            return Status::InternalError("ResultBlockBuffer already exist, id={}",
                                         print_id(unique_id));
        }
    }

    std::shared_ptr<ResultBlockBufferBase> control_block = nullptr;

    if (arrow_flight) {
        control_block = std::make_shared<ArrowFlightResultBlockBuffer>(unique_id, state, schema,
                                                                       buffer_size);
    } else {
        control_block = std::make_shared<MySQLResultBlockBuffer>(unique_id, state, buffer_size);
    }

    {
        std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
        // The shared lock was released above, so another caller may have registered the same
        // id in the meantime. Check the insert result instead of handing back a buffer that
        // never made it into the map.
        const bool inserted = _buffer_map.insert(std::make_pair(unique_id, control_block)).second;
        if (!inserted) {
            return Status::InternalError("ResultBlockBuffer already exist, id={}",
                                         print_id(unique_id));
        }
        // ResultBlockBufferBase should be destroyed after max_timeout: once it is exceeded
        // FE returns a timeout to the client, otherwise in some cases all fragment handle
        // threads may block. Details: https://github.com/apache/doris/issues/16203
        // The extra 5s avoids corner cases.
        int64_t max_timeout = time(nullptr) + state->execution_timeout() + 5;
        cancel_at_time(max_timeout, unique_id);
    }
    *sender = control_block;
    return Status::OK();
}
107
108
template <typename ResultBlockBufferType>
109
std::shared_ptr<ResultBlockBufferType> ResultBufferMgr::_find_control_block(
110
315k
        const TUniqueId& unique_id) {
111
315k
    std::shared_lock<std::shared_mutex> rlock(_buffer_map_lock);
112
315k
    auto iter = _buffer_map.find(unique_id);
113
114
315k
    if (_buffer_map.end() != iter) {
115
315k
        return std::dynamic_pointer_cast<ResultBlockBufferType>(iter->second);
116
315k
    }
117
118
18.4E
    return {};
119
315k
}
_ZN5doris15ResultBufferMgr19_find_control_blockINS_28ArrowFlightResultBlockBufferEEESt10shared_ptrIT_ERKNS_9TUniqueIdE
Line
Count
Source
110
14
        const TUniqueId& unique_id) {
111
14
    std::shared_lock<std::shared_mutex> rlock(_buffer_map_lock);
112
14
    auto iter = _buffer_map.find(unique_id);
113
114
14
    if (_buffer_map.end() != iter) {
115
14
        return std::dynamic_pointer_cast<ResultBlockBufferType>(iter->second);
116
14
    }
117
118
0
    return {};
119
14
}
_ZN5doris15ResultBufferMgr19_find_control_blockINS_17ResultBlockBufferINS_17GetResultBatchCtxEEEEESt10shared_ptrIT_ERKNS_9TUniqueIdE
Line
Count
Source
110
315k
        const TUniqueId& unique_id) {
111
315k
    std::shared_lock<std::shared_mutex> rlock(_buffer_map_lock);
112
315k
    auto iter = _buffer_map.find(unique_id);
113
114
315k
    if (_buffer_map.end() != iter) {
115
315k
        return std::dynamic_pointer_cast<ResultBlockBufferType>(iter->second);
116
315k
    }
117
118
18.4E
    return {};
119
315k
}
120
121
template <typename ResultBlockBufferType>
122
Status ResultBufferMgr::find_buffer(const TUniqueId& finst_id,
123
315k
                                    std::shared_ptr<ResultBlockBufferType>& buffer) {
124
315k
    buffer = _find_control_block<ResultBlockBufferType>(finst_id);
125
315k
    return buffer == nullptr ? Status::InternalError(
126
0
                                       "no arrow schema for this query, maybe query has been "
127
0
                                       "canceled, finst_id={}",
128
0
                                       print_id(finst_id))
129
315k
                             : Status::OK();
130
315k
}
_ZN5doris15ResultBufferMgr11find_bufferINS_28ArrowFlightResultBlockBufferEEENS_6StatusERKNS_9TUniqueIdERSt10shared_ptrIT_E
Line
Count
Source
123
14
                                    std::shared_ptr<ResultBlockBufferType>& buffer) {
124
14
    buffer = _find_control_block<ResultBlockBufferType>(finst_id);
125
14
    return buffer == nullptr ? Status::InternalError(
126
0
                                       "no arrow schema for this query, maybe query has been "
127
0
                                       "canceled, finst_id={}",
128
0
                                       print_id(finst_id))
129
14
                             : Status::OK();
130
14
}
_ZN5doris15ResultBufferMgr11find_bufferINS_17ResultBlockBufferINS_17GetResultBatchCtxEEEEENS_6StatusERKNS_9TUniqueIdERSt10shared_ptrIT_E
Line
Count
Source
123
315k
                                    std::shared_ptr<ResultBlockBufferType>& buffer) {
124
315k
    buffer = _find_control_block<ResultBlockBufferType>(finst_id);
125
315k
    return buffer == nullptr ? Status::InternalError(
126
0
                                       "no arrow schema for this query, maybe query has been "
127
0
                                       "canceled, finst_id={}",
128
0
                                       print_id(finst_id))
129
315k
                             : Status::OK();
130
315k
}
131
132
233k
bool ResultBufferMgr::cancel(const TUniqueId& unique_id, const Status& reason) {
133
233k
    std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
134
233k
    auto iter = _buffer_map.find(unique_id);
135
136
233k
    auto exist = _buffer_map.end() != iter;
137
233k
    if (exist) {
138
72.9k
        iter->second->cancel(reason);
139
72.9k
        _buffer_map.erase(iter);
140
72.9k
    }
141
233k
    return exist;
142
233k
}
143
144
487k
void ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId& unique_id) {
145
487k
    std::lock_guard<std::mutex> l(_timeout_lock);
146
487k
    auto iter = _timeout_map.find(cancel_time);
147
148
487k
    if (_timeout_map.end() == iter) {
149
8.25k
        _timeout_map.insert(
150
8.25k
                std::pair<time_t, std::vector<TUniqueId>>(cancel_time, std::vector<TUniqueId>()));
151
8.25k
        iter = _timeout_map.find(cancel_time);
152
8.25k
    }
153
154
487k
    iter->second.push_back(unique_id);
155
487k
}
156
157
5
void ResultBufferMgr::cancel_thread() {
158
5
    LOG(INFO) << "result buffer manager cancel thread begin.";
159
160
6.31k
    do {
161
        // get query
162
6.31k
        std::vector<TUniqueId> query_to_cancel;
163
6.31k
        time_t now_time = time(nullptr);
164
6.31k
        {
165
6.31k
            std::lock_guard<std::mutex> l(_timeout_lock);
166
6.31k
            auto end = _timeout_map.upper_bound(now_time + 1);
167
168
10.5k
            for (auto iter = _timeout_map.begin(); iter != end; ++iter) {
169
233k
                for (const auto& id : iter->second) {
170
233k
                    query_to_cancel.push_back(id);
171
233k
                }
172
4.20k
            }
173
174
6.31k
            _timeout_map.erase(_timeout_map.begin(), end);
175
6.31k
        }
176
177
        // cancel query
178
233k
        for (const auto& id : query_to_cancel) {
179
233k
            cancel(id, Status::Cancelled("Clean up expired ResultBlockBuffer, queryId: {}",
180
233k
                                         print_id(id)));
181
233k
        }
182
6.31k
    } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));
183
184
    LOG(INFO) << "result buffer manager cancel thread finish.";
185
5
}
186
187
// Explicit instantiations of find_buffer() for the two buffer types it is requested with;
// the template is defined in this file only.
template Status ResultBufferMgr::find_buffer<ArrowFlightResultBlockBuffer>(
        const TUniqueId& finst_id, std::shared_ptr<ArrowFlightResultBlockBuffer>& buffer);
template Status ResultBufferMgr::find_buffer<ResultBlockBuffer<GetResultBatchCtx>>(
        const TUniqueId& finst_id,
        std::shared_ptr<ResultBlockBuffer<GetResultBatchCtx>>& buffer);
192
193
} // namespace doris