be/src/runtime/result_buffer_mgr.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/result_buffer_mgr.h" |
19 | | |
20 | | #include <gen_cpp/Types_types.h> |
21 | | #include <gen_cpp/types.pb.h> |
22 | | #include <glog/logging.h> |
23 | | |
24 | | #include <cstdint> |
25 | | |
26 | | // IWYU pragma: no_include <bits/chrono.h> |
27 | | #include <chrono> // IWYU pragma: keep |
28 | | #include <memory> |
29 | | #include <ostream> |
30 | | #include <utility> |
31 | | |
32 | | #include "arrow/type_fwd.h" |
33 | | #include "common/metrics/doris_metrics.h" |
34 | | #include "common/metrics/metrics.h" |
35 | | #include "common/status.h" |
36 | | #include "exec/sink/writer/varrow_flight_result_writer.h" |
37 | | #include "exec/sink/writer/vmysql_result_writer.h" |
38 | | #include "runtime/result_block_buffer.h" |
39 | | #include "util/thread.h" |
40 | | #include "util/uid_util.h" |
41 | | |
42 | | namespace doris { |
43 | | |
44 | | DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(result_buffer_block_count, MetricUnit::NOUNIT); |
45 | | |
46 | 6 | ResultBufferMgr::ResultBufferMgr() : _stop_background_threads_latch(1) { |
47 | | // Each ResultBlockBufferBase has a limited queue size of 1024, it's not needed to count the |
48 | | // actual size of all ResultBlockBufferBase. |
49 | 6 | REGISTER_HOOK_METRIC(result_buffer_block_count, [this]() { |
50 | | // std::lock_guard<std::mutex> l(_buffer_map_lock); |
51 | 6 | return _buffer_map.size(); |
52 | 6 | }); |
53 | 6 | } |
54 | | |
55 | 0 | void ResultBufferMgr::stop() { |
56 | 0 | DEREGISTER_HOOK_METRIC(result_buffer_block_count); |
57 | 0 | _stop_background_threads_latch.count_down(); |
58 | 0 | if (_clean_thread) { |
59 | 0 | _clean_thread->join(); |
60 | 0 | } |
61 | 0 | } |
62 | | |
63 | 0 | Status ResultBufferMgr::init() { |
64 | 0 | RETURN_IF_ERROR(Thread::create( |
65 | 0 | "ResultBufferMgr", "cancel_timeout_result", [this]() { this->cancel_thread(); }, |
66 | 0 | &_clean_thread)); |
67 | 0 | return Status::OK(); |
68 | 0 | } |
69 | | |
70 | | Status ResultBufferMgr::create_sender(const TUniqueId& unique_id, int buffer_size, |
71 | | std::shared_ptr<ResultBlockBufferBase>* sender, |
72 | | RuntimeState* state, bool arrow_flight, |
73 | 7 | std::shared_ptr<arrow::Schema> schema) { |
74 | 7 | { |
75 | 7 | std::shared_lock<std::shared_mutex> rlock(_buffer_map_lock); |
76 | 7 | auto iter = _buffer_map.find(unique_id); |
77 | | |
78 | 7 | if (_buffer_map.end() != iter) { |
79 | 2 | return Status::InternalError("ResultBlockBuffer already exist, id={}", |
80 | 2 | print_id(unique_id)); |
81 | 2 | } |
82 | 7 | } |
83 | | |
84 | 5 | std::shared_ptr<ResultBlockBufferBase> control_block = nullptr; |
85 | | |
86 | 5 | if (arrow_flight) { |
87 | 1 | control_block = std::make_shared<ArrowFlightResultBlockBuffer>(unique_id, state, schema, |
88 | 1 | buffer_size); |
89 | 4 | } else { |
90 | 4 | control_block = std::make_shared<MySQLResultBlockBuffer>(unique_id, state, buffer_size); |
91 | 4 | } |
92 | | |
93 | 5 | { |
94 | 5 | std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock); |
95 | 5 | _buffer_map.insert(std::make_pair(unique_id, control_block)); |
96 | | // ResultBlockBufferBase should destroy after max_timeout |
97 | | // for exceed max_timeout FE will return timeout to client |
98 | | // otherwise in some case may block all fragment handle threads |
99 | | // details see issue https://github.com/apache/doris/issues/16203 |
100 | | // add extra 5s for avoid corner case |
101 | 5 | int64_t max_timeout = time(nullptr) + state->execution_timeout() + 5; |
102 | 5 | cancel_at_time(max_timeout, unique_id); |
103 | 5 | } |
104 | 5 | *sender = control_block; |
105 | 5 | return Status::OK(); |
106 | 7 | } |
107 | | |
108 | | template <typename ResultBlockBufferType> |
109 | | std::shared_ptr<ResultBlockBufferType> ResultBufferMgr::_find_control_block( |
110 | 1 | const TUniqueId& unique_id) { |
111 | 1 | std::shared_lock<std::shared_mutex> rlock(_buffer_map_lock); |
112 | 1 | auto iter = _buffer_map.find(unique_id); |
113 | | |
114 | 1 | if (_buffer_map.end() != iter) { |
115 | 1 | return std::dynamic_pointer_cast<ResultBlockBufferType>(iter->second); |
116 | 1 | } |
117 | | |
118 | 0 | return {}; |
119 | 1 | } Unexecuted instantiation: _ZN5doris15ResultBufferMgr19_find_control_blockINS_28ArrowFlightResultBlockBufferEEESt10shared_ptrIT_ERKNS_9TUniqueIdE _ZN5doris15ResultBufferMgr19_find_control_blockINS_17ResultBlockBufferINS_17GetResultBatchCtxEEEEESt10shared_ptrIT_ERKNS_9TUniqueIdE Line | Count | Source | 110 | 1 | const TUniqueId& unique_id) { | 111 | 1 | std::shared_lock<std::shared_mutex> rlock(_buffer_map_lock); | 112 | 1 | auto iter = _buffer_map.find(unique_id); | 113 | | | 114 | 1 | if (_buffer_map.end() != iter) { | 115 | 1 | return std::dynamic_pointer_cast<ResultBlockBufferType>(iter->second); | 116 | 1 | } | 117 | | | 118 | 0 | return {}; | 119 | 1 | } |
|
120 | | |
121 | | template <typename ResultBlockBufferType> |
122 | | Status ResultBufferMgr::find_buffer(const TUniqueId& finst_id, |
123 | 1 | std::shared_ptr<ResultBlockBufferType>& buffer) { |
124 | 1 | buffer = _find_control_block<ResultBlockBufferType>(finst_id); |
125 | 1 | return buffer == nullptr ? Status::InternalError( |
126 | 0 | "no arrow schema for this query, maybe query has been " |
127 | 0 | "canceled, finst_id={}", |
128 | 0 | print_id(finst_id)) |
129 | 1 | : Status::OK(); |
130 | 1 | } Unexecuted instantiation: _ZN5doris15ResultBufferMgr11find_bufferINS_28ArrowFlightResultBlockBufferEEENS_6StatusERKNS_9TUniqueIdERSt10shared_ptrIT_E _ZN5doris15ResultBufferMgr11find_bufferINS_17ResultBlockBufferINS_17GetResultBatchCtxEEEEENS_6StatusERKNS_9TUniqueIdERSt10shared_ptrIT_E Line | Count | Source | 123 | 1 | std::shared_ptr<ResultBlockBufferType>& buffer) { | 124 | 1 | buffer = _find_control_block<ResultBlockBufferType>(finst_id); | 125 | 1 | return buffer == nullptr ? Status::InternalError( | 126 | 0 | "no arrow schema for this query, maybe query has been " | 127 | 0 | "canceled, finst_id={}", | 128 | 0 | print_id(finst_id)) | 129 | 1 | : Status::OK(); | 130 | 1 | } |
|
131 | | |
132 | 2 | bool ResultBufferMgr::cancel(const TUniqueId& unique_id, const Status& reason) { |
133 | 2 | std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock); |
134 | 2 | auto iter = _buffer_map.find(unique_id); |
135 | | |
136 | 2 | auto exist = _buffer_map.end() != iter; |
137 | 2 | if (exist) { |
138 | 1 | iter->second->cancel(reason); |
139 | 1 | _buffer_map.erase(iter); |
140 | 1 | } |
141 | 2 | return exist; |
142 | 2 | } |
143 | | |
144 | 5 | void ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId& unique_id) { |
145 | 5 | std::lock_guard<std::mutex> l(_timeout_lock); |
146 | 5 | auto iter = _timeout_map.find(cancel_time); |
147 | | |
148 | 5 | if (_timeout_map.end() == iter) { |
149 | 5 | _timeout_map.insert( |
150 | 5 | std::pair<time_t, std::vector<TUniqueId>>(cancel_time, std::vector<TUniqueId>())); |
151 | 5 | iter = _timeout_map.find(cancel_time); |
152 | 5 | } |
153 | | |
154 | 5 | iter->second.push_back(unique_id); |
155 | 5 | } |
156 | | |
157 | 0 | void ResultBufferMgr::cancel_thread() { |
158 | 0 | LOG(INFO) << "result buffer manager cancel thread begin."; |
159 | |
|
160 | 0 | do { |
161 | | // get query |
162 | 0 | std::vector<TUniqueId> query_to_cancel; |
163 | 0 | time_t now_time = time(nullptr); |
164 | 0 | { |
165 | 0 | std::lock_guard<std::mutex> l(_timeout_lock); |
166 | 0 | auto end = _timeout_map.upper_bound(now_time + 1); |
167 | |
|
168 | 0 | for (auto iter = _timeout_map.begin(); iter != end; ++iter) { |
169 | 0 | for (const auto& id : iter->second) { |
170 | 0 | query_to_cancel.push_back(id); |
171 | 0 | } |
172 | 0 | } |
173 | |
|
174 | 0 | _timeout_map.erase(_timeout_map.begin(), end); |
175 | 0 | } |
176 | | |
177 | | // cancel query |
178 | 0 | for (const auto& id : query_to_cancel) { |
179 | 0 | cancel(id, Status::Cancelled("Clean up expired ResultBlockBuffer, queryId: {}", |
180 | 0 | print_id(id))); |
181 | 0 | } |
182 | 0 | } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1))); |
183 | |
|
184 | | LOG(INFO) << "result buffer manager cancel thread finish."; |
185 | 0 | } |
186 | | |
187 | | template Status ResultBufferMgr::find_buffer( |
188 | | const TUniqueId& finst_id, std::shared_ptr<doris::ArrowFlightResultBlockBuffer>& buffer); |
189 | | template Status ResultBufferMgr::find_buffer( |
190 | | const TUniqueId& finst_id, |
191 | | std::shared_ptr<doris::ResultBlockBuffer<doris::GetResultBatchCtx>>& buffer); |
192 | | |
193 | | } // namespace doris |