be/src/runtime/result_block_buffer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/result_block_buffer.h" |
19 | | |
20 | | #include <gen_cpp/Data_types.h> |
21 | | #include <gen_cpp/PaloInternalService_types.h> |
22 | | #include <gen_cpp/internal_service.pb.h> |
23 | | #include <glog/logging.h> |
24 | | #include <google/protobuf/stubs/callback.h> |
25 | | // IWYU pragma: no_include <bits/chrono.h> |
26 | | #include <chrono> // IWYU pragma: keep |
27 | | #include <limits> |
28 | | #include <ostream> |
29 | | #include <string> |
30 | | #include <utility> |
31 | | #include <vector> |
32 | | |
33 | | #include "arrow/type_fwd.h" |
34 | | #include "common/config.h" |
35 | | #include "core/block/block.h" |
36 | | #include "exec/pipeline/dependency.h" |
37 | | #include "exec/sink/writer/varrow_flight_result_writer.h" |
38 | | #include "exec/sink/writer/vmysql_result_writer.h" |
39 | | #include "runtime/runtime_profile.h" |
40 | | #include "runtime/thread_context.h" |
41 | | #include "util/thrift_util.h" |
42 | | |
43 | | namespace doris { |
44 | | |
45 | | template <typename ResultCtxType> |
46 | | ResultBlockBuffer<ResultCtxType>::ResultBlockBuffer(TUniqueId id, RuntimeState* state, |
47 | | int buffer_size) |
48 | 250k | : _fragment_id(std::move(id)), |
49 | 250k | _is_close(false), |
50 | 250k | _batch_size(state->batch_size()), |
51 | 250k | _timezone(state->timezone()), |
52 | 250k | _be_exec_version(state->be_exec_version()), |
53 | 250k | _fragment_transmission_compression_type(state->fragement_transmission_compression_type()), |
54 | 250k | _buffer_limit(buffer_size) { |
55 | 250k | _mem_tracker = MemTrackerLimiter::create_shared( |
56 | 250k | MemTrackerLimiter::Type::QUERY, |
57 | 250k | fmt::format("ResultBlockBuffer#FragmentInstanceId={}", print_id(_fragment_id))); |
58 | 250k | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEEC2ENS_9TUniqueIdEPNS_12RuntimeStateEi Line | Count | Source | 48 | 106 | : _fragment_id(std::move(id)), | 49 | 106 | _is_close(false), | 50 | 106 | _batch_size(state->batch_size()), | 51 | 106 | _timezone(state->timezone()), | 52 | 106 | _be_exec_version(state->be_exec_version()), | 53 | 106 | _fragment_transmission_compression_type(state->fragement_transmission_compression_type()), | 54 | 106 | _buffer_limit(buffer_size) { | 55 | 106 | _mem_tracker = MemTrackerLimiter::create_shared( | 56 | 106 | MemTrackerLimiter::Type::QUERY, | 57 | 106 | fmt::format("ResultBlockBuffer#FragmentInstanceId={}", print_id(_fragment_id))); | 58 | 106 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEEC2ENS_9TUniqueIdEPNS_12RuntimeStateEi Line | Count | Source | 48 | 250k | : _fragment_id(std::move(id)), | 49 | 250k | _is_close(false), | 50 | 250k | _batch_size(state->batch_size()), | 51 | 250k | _timezone(state->timezone()), | 52 | 250k | _be_exec_version(state->be_exec_version()), | 53 | 250k | _fragment_transmission_compression_type(state->fragement_transmission_compression_type()), | 54 | 250k | _buffer_limit(buffer_size) { | 55 | 250k | _mem_tracker = MemTrackerLimiter::create_shared( | 56 | 250k | MemTrackerLimiter::Type::QUERY, | 57 | 250k | fmt::format("ResultBlockBuffer#FragmentInstanceId={}", print_id(_fragment_id))); | 58 | 250k | } |
|
59 | | |
60 | | template <typename ResultCtxType> |
61 | | Status ResultBlockBuffer<ResultCtxType>::close(const TUniqueId& id, Status exec_status, |
62 | 427k | int64_t num_rows, bool& is_fully_closed) { |
63 | 427k | std::unique_lock<std::mutex> l(_lock); |
64 | 427k | _returned_rows.fetch_add(num_rows); |
65 | | // close will be called multiple times and error status needs to be collected. |
66 | 427k | if (!exec_status.ok()) { |
67 | 1.31k | _status = exec_status; |
68 | 1.31k | } |
69 | | |
70 | 427k | auto it = _result_sink_dependencies.find(id); |
71 | 427k | if (it != _result_sink_dependencies.end()) { |
72 | 427k | it->second->set_always_ready(); |
73 | 427k | _result_sink_dependencies.erase(it); |
74 | 18.4E | } else { |
75 | 18.4E | _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer", |
76 | 18.4E | print_id(id)); |
77 | 18.4E | } |
78 | 427k | if (!_result_sink_dependencies.empty()) { |
79 | | // Still waiting for other instances to finish; this is not the final close. |
80 | 177k | is_fully_closed = false; |
81 | 177k | return _status; |
82 | 177k | } |
83 | | |
84 | | // All instances have closed: the buffer is now fully closed. |
85 | 250k | is_fully_closed = true; |
86 | 250k | _is_close = true; |
87 | 250k | _arrow_data_arrival.notify_all(); |
88 | | |
89 | 250k | if (!_waiting_rpc.empty()) { |
90 | 145k | if (_status.ok()) { |
91 | 144k | for (auto& ctx : _waiting_rpc) { |
92 | 144k | ctx->on_close(_packet_num, _returned_rows); |
93 | 144k | } |
94 | 144k | } else { |
95 | 1.29k | for (auto& ctx : _waiting_rpc) { |
96 | 1.27k | ctx->on_failure(_status); |
97 | 1.27k | } |
98 | 1.29k | } |
99 | 145k | _waiting_rpc.clear(); |
100 | 145k | } |
101 | | |
102 | 250k | return _status; |
103 | 427k | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE5closeERKNS_9TUniqueIdENS_6StatusElRb Line | Count | Source | 62 | 104 | int64_t num_rows, bool& is_fully_closed) { | 63 | 104 | std::unique_lock<std::mutex> l(_lock); | 64 | 104 | _returned_rows.fetch_add(num_rows); | 65 | | // close will be called multiple times and error status needs to be collected. | 66 | 104 | if (!exec_status.ok()) { | 67 | 2 | _status = exec_status; | 68 | 2 | } | 69 | | | 70 | 104 | auto it = _result_sink_dependencies.find(id); | 71 | 104 | if (it != _result_sink_dependencies.end()) { | 72 | 103 | it->second->set_always_ready(); | 73 | 103 | _result_sink_dependencies.erase(it); | 74 | 103 | } else { | 75 | 1 | _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer", | 76 | 1 | print_id(id)); | 77 | 1 | } | 78 | 104 | if (!_result_sink_dependencies.empty()) { | 79 | | // Still waiting for other instances to finish; this is not the final close. | 80 | 1 | is_fully_closed = false; | 81 | 1 | return _status; | 82 | 1 | } | 83 | | | 84 | | // All instances have closed: the buffer is now fully closed. | 85 | 103 | is_fully_closed = true; | 86 | 103 | _is_close = true; | 87 | 103 | _arrow_data_arrival.notify_all(); | 88 | | | 89 | 103 | if (!_waiting_rpc.empty()) { | 90 | 2 | if (_status.ok()) { | 91 | 1 | for (auto& ctx : _waiting_rpc) { | 92 | 1 | ctx->on_close(_packet_num, _returned_rows); | 93 | 1 | } | 94 | 1 | } else { | 95 | 1 | for (auto& ctx : _waiting_rpc) { | 96 | 1 | ctx->on_failure(_status); | 97 | 1 | } | 98 | 1 | } | 99 | 2 | _waiting_rpc.clear(); | 100 | 2 | } | 101 | | | 102 | 103 | return _status; | 103 | 104 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE5closeERKNS_9TUniqueIdENS_6StatusElRb Line | Count | Source | 62 | 427k | int64_t num_rows, bool& is_fully_closed) { | 63 | 427k | std::unique_lock<std::mutex> l(_lock); | 64 | 427k | _returned_rows.fetch_add(num_rows); | 65 | | // close will be called multiple times and error status needs to be collected. | 66 | 427k | if (!exec_status.ok()) { | 67 | 1.31k | _status = exec_status; | 68 | 1.31k | } | 69 | | | 70 | 427k | auto it = _result_sink_dependencies.find(id); | 71 | 427k | if (it != _result_sink_dependencies.end()) { | 72 | 427k | it->second->set_always_ready(); | 73 | 427k | _result_sink_dependencies.erase(it); | 74 | 18.4E | } else { | 75 | 18.4E | _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer", | 76 | 18.4E | print_id(id)); | 77 | 18.4E | } | 78 | 427k | if (!_result_sink_dependencies.empty()) { | 79 | | // Still waiting for other instances to finish; this is not the final close. | 80 | 177k | is_fully_closed = false; | 81 | 177k | return _status; | 82 | 177k | } | 83 | | | 84 | | // All instances have closed: the buffer is now fully closed. | 85 | 250k | is_fully_closed = true; | 86 | 250k | _is_close = true; | 87 | 250k | _arrow_data_arrival.notify_all(); | 88 | | | 89 | 250k | if (!_waiting_rpc.empty()) { | 90 | 145k | if (_status.ok()) { | 91 | 144k | for (auto& ctx : _waiting_rpc) { | 92 | 144k | ctx->on_close(_packet_num, _returned_rows); | 93 | 144k | } | 94 | 144k | } else { | 95 | 1.29k | for (auto& ctx : _waiting_rpc) { | 96 | 1.27k | ctx->on_failure(_status); | 97 | 1.27k | } | 98 | 1.29k | } | 99 | 145k | _waiting_rpc.clear(); | 100 | 145k | } | 101 | | | 102 | 250k | return _status; | 103 | 427k | } |
|
104 | | |
105 | | template <typename ResultCtxType> |
106 | 202k | void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) { |
107 | 202k | std::unique_lock<std::mutex> l(_lock); |
108 | 202k | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); |
109 | 202k | if (_status.ok()) { |
110 | 202k | _status = reason; |
111 | 202k | } |
112 | 202k | _arrow_data_arrival.notify_all(); |
113 | 202k | for (auto& ctx : _waiting_rpc) { |
114 | 2 | ctx->on_failure(reason); |
115 | 2 | } |
116 | 202k | _waiting_rpc.clear(); |
117 | 202k | _update_dependency(); |
118 | 202k | _result_batch_queue.clear(); |
119 | 202k | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE6cancelERKNS_6StatusE Line | Count | Source | 106 | 102 | void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) { | 107 | 102 | std::unique_lock<std::mutex> l(_lock); | 108 | 102 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); | 109 | 102 | if (_status.ok()) { | 110 | 102 | _status = reason; | 111 | 102 | } | 112 | 102 | _arrow_data_arrival.notify_all(); | 113 | 102 | for (auto& ctx : _waiting_rpc) { | 114 | 1 | ctx->on_failure(reason); | 115 | 1 | } | 116 | 102 | _waiting_rpc.clear(); | 117 | 102 | _update_dependency(); | 118 | 102 | _result_batch_queue.clear(); | 119 | 102 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE6cancelERKNS_6StatusE Line | Count | Source | 106 | 202k | void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) { | 107 | 202k | std::unique_lock<std::mutex> l(_lock); | 108 | 202k | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); | 109 | 202k | if (_status.ok()) { | 110 | 202k | _status = reason; | 111 | 202k | } | 112 | 202k | _arrow_data_arrival.notify_all(); | 113 | 202k | for (auto& ctx : _waiting_rpc) { | 114 | 1 | ctx->on_failure(reason); | 115 | 1 | } | 116 | 202k | _waiting_rpc.clear(); | 117 | 202k | _update_dependency(); | 118 | 202k | _result_batch_queue.clear(); | 119 | 202k | } |
|
120 | | |
121 | | template <typename ResultCtxType> |
122 | | void ResultBlockBuffer<ResultCtxType>::set_dependency( |
123 | 423k | const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) { |
124 | 423k | std::unique_lock<std::mutex> l(_lock); |
125 | 423k | _result_sink_dependencies[id] = result_sink_dependency; |
126 | 423k | _update_dependency(); |
127 | 423k | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE14set_dependencyERKNS_9TUniqueIdESt10shared_ptrINS_10DependencyEE Line | Count | Source | 123 | 106 | const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) { | 124 | 106 | std::unique_lock<std::mutex> l(_lock); | 125 | 106 | _result_sink_dependencies[id] = result_sink_dependency; | 126 | 106 | _update_dependency(); | 127 | 106 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE14set_dependencyERKNS_9TUniqueIdESt10shared_ptrINS_10DependencyEE Line | Count | Source | 123 | 423k | const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) { | 124 | 423k | std::unique_lock<std::mutex> l(_lock); | 125 | 423k | _result_sink_dependencies[id] = result_sink_dependency; | 126 | 423k | _update_dependency(); | 127 | 423k | } |
|
128 | | |
129 | | template <typename ResultCtxType> |
130 | 1.21M | void ResultBlockBuffer<ResultCtxType>::_update_dependency() { |
131 | 1.21M | if (!_status.ok()) { |
132 | 202k | for (auto it : _result_sink_dependencies) { |
133 | 6 | it.second->set_ready(); |
134 | 6 | } |
135 | 202k | return; |
136 | 202k | } |
137 | | |
138 | 2.13M | for (auto it : _result_sink_dependencies) { |
139 | 2.13M | if (_instance_rows[it.first] > _batch_size) { |
140 | 11 | it.second->block(); |
141 | 2.13M | } else { |
142 | 2.13M | it.second->set_ready(); |
143 | 2.13M | } |
144 | 2.13M | } |
145 | 1.01M | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE18_update_dependencyEv Line | Count | Source | 130 | 568 | void ResultBlockBuffer<ResultCtxType>::_update_dependency() { | 131 | 568 | if (!_status.ok()) { | 132 | 104 | for (auto it : _result_sink_dependencies) { | 133 | 3 | it.second->set_ready(); | 134 | 3 | } | 135 | 104 | return; | 136 | 104 | } | 137 | | | 138 | 464 | for (auto it : _result_sink_dependencies) { | 139 | 274 | if (_instance_rows[it.first] > _batch_size) { | 140 | 4 | it.second->block(); | 141 | 270 | } else { | 142 | 270 | it.second->set_ready(); | 143 | 270 | } | 144 | 274 | } | 145 | 464 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE18_update_dependencyEv Line | Count | Source | 130 | 1.21M | void ResultBlockBuffer<ResultCtxType>::_update_dependency() { | 131 | 1.21M | if (!_status.ok()) { | 132 | 202k | for (auto it : _result_sink_dependencies) { | 133 | 3 | it.second->set_ready(); | 134 | 3 | } | 135 | 202k | return; | 136 | 202k | } | 137 | | | 138 | 2.13M | for (auto it : _result_sink_dependencies) { | 139 | 2.13M | if (_instance_rows[it.first] > _batch_size) { | 140 | 7 | it.second->block(); | 141 | 2.13M | } else { | 142 | 2.13M | it.second->set_ready(); | 143 | 2.13M | } | 144 | 2.13M | } | 145 | 1.01M | } |
|
146 | | |
147 | | template <typename ResultCtxType> |
148 | 416k | Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) { |
149 | 416k | std::lock_guard<std::mutex> l(_lock); |
150 | 416k | SCOPED_ATTACH_TASK(_mem_tracker); |
151 | 416k | Defer defer {[&]() { _update_dependency(); }};_ZZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9get_batchESt10shared_ptrIS1_EENKUlvE_clEv Line | Count | Source | 151 | 8 | Defer defer {[&]() { _update_dependency(); }}; |
_ZZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9get_batchESt10shared_ptrIS1_EENKUlvE_clEv Line | Count | Source | 151 | 416k | Defer defer {[&]() { _update_dependency(); }}; |
|
152 | 416k | if (!_status.ok()) { |
153 | 9 | ctx->on_failure(_status); |
154 | 9 | return _status; |
155 | 9 | } |
156 | 416k | if (!_result_batch_queue.empty()) { |
157 | 11.4k | auto result = _result_batch_queue.front(); |
158 | 11.4k | _result_batch_queue.pop_front(); |
159 | 11.6k | for (auto it : _instance_rows_in_queue.front()) { |
160 | 11.6k | _instance_rows[it.first] -= it.second; |
161 | 11.6k | } |
162 | 11.4k | _instance_rows_in_queue.pop_front(); |
163 | 11.4k | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); |
164 | 11.4k | _packet_num++; |
165 | 11.4k | return Status::OK(); |
166 | 11.4k | } |
167 | 404k | if (_is_close) { |
168 | 101k | if (!_status.ok()) { |
169 | 0 | ctx->on_failure(_status); |
170 | 0 | return Status::OK(); |
171 | 0 | } |
172 | 101k | ctx->on_close(_packet_num, _returned_rows); |
173 | 101k | LOG(INFO) << fmt::format( |
174 | 101k | "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, " |
175 | 101k | "packet_num={}, peak_memory_usage={}", |
176 | 101k | print_id(_fragment_id), _is_close, !_status.ok(), _packet_num, |
177 | 101k | _mem_tracker->peak_consumption()); |
178 | 101k | return Status::OK(); |
179 | 101k | } |
180 | | // no ready data, push ctx to waiting list |
181 | 302k | _waiting_rpc.push_back(ctx); |
182 | 302k | return Status::OK(); |
183 | 404k | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9get_batchESt10shared_ptrIS1_E Line | Count | Source | 148 | 8 | Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) { | 149 | 8 | std::lock_guard<std::mutex> l(_lock); | 150 | 8 | SCOPED_ATTACH_TASK(_mem_tracker); | 151 | 8 | Defer defer {[&]() { _update_dependency(); }}; | 152 | 8 | if (!_status.ok()) { | 153 | 1 | ctx->on_failure(_status); | 154 | 1 | return _status; | 155 | 1 | } | 156 | 7 | if (!_result_batch_queue.empty()) { | 157 | 2 | auto result = _result_batch_queue.front(); | 158 | 2 | _result_batch_queue.pop_front(); | 159 | 2 | for (auto it : _instance_rows_in_queue.front()) { | 160 | 2 | _instance_rows[it.first] -= it.second; | 161 | 2 | } | 162 | 2 | _instance_rows_in_queue.pop_front(); | 163 | 2 | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); | 164 | 2 | _packet_num++; | 165 | 2 | return Status::OK(); | 166 | 2 | } | 167 | 5 | if (_is_close) { | 168 | 1 | if (!_status.ok()) { | 169 | 0 | ctx->on_failure(_status); | 170 | 0 | return Status::OK(); | 171 | 0 | } | 172 | 1 | ctx->on_close(_packet_num, _returned_rows); | 173 | 1 | LOG(INFO) << fmt::format( | 174 | 1 | "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, " | 175 | 1 | "packet_num={}, peak_memory_usage={}", | 176 | 1 | print_id(_fragment_id), _is_close, !_status.ok(), _packet_num, | 177 | 1 | _mem_tracker->peak_consumption()); | 178 | 1 | return Status::OK(); | 179 | 1 | } | 180 | | // no ready data, push ctx to waiting list | 181 | 4 | _waiting_rpc.push_back(ctx); | 182 | 4 | return Status::OK(); | 183 | 5 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9get_batchESt10shared_ptrIS1_E Line | Count | Source | 148 | 416k | Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) { | 149 | 416k | std::lock_guard<std::mutex> l(_lock); | 150 | 416k | SCOPED_ATTACH_TASK(_mem_tracker); | 151 | 416k | Defer defer {[&]() { _update_dependency(); }}; | 152 | 416k | if (!_status.ok()) { | 153 | 8 | ctx->on_failure(_status); | 154 | 8 | return _status; | 155 | 8 | } | 156 | 416k | if (!_result_batch_queue.empty()) { | 157 | 11.4k | auto result = _result_batch_queue.front(); | 158 | 11.4k | _result_batch_queue.pop_front(); | 159 | 11.6k | for (auto it : _instance_rows_in_queue.front()) { | 160 | 11.6k | _instance_rows[it.first] -= it.second; | 161 | 11.6k | } | 162 | 11.4k | _instance_rows_in_queue.pop_front(); | 163 | 11.4k | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); | 164 | 11.4k | _packet_num++; | 165 | 11.4k | return Status::OK(); | 166 | 11.4k | } | 167 | 404k | if (_is_close) { | 168 | 101k | if (!_status.ok()) { | 169 | 0 | ctx->on_failure(_status); | 170 | 0 | return Status::OK(); | 171 | 0 | } | 172 | 101k | ctx->on_close(_packet_num, _returned_rows); | 173 | 101k | LOG(INFO) << fmt::format( | 174 | 101k | "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, " | 175 | 101k | "packet_num={}, peak_memory_usage={}", | 176 | 101k | print_id(_fragment_id), _is_close, !_status.ok(), _packet_num, | 177 | 101k | _mem_tracker->peak_consumption()); | 178 | 101k | return Status::OK(); | 179 | 101k | } | 180 | | // no ready data, push ctx to waiting list | 181 | 302k | _waiting_rpc.push_back(ctx); | 182 | 302k | return Status::OK(); | 183 | 404k | } |
|
184 | | |
185 | | template <typename ResultCtxType> |
186 | | Status ResultBlockBuffer<ResultCtxType>::add_batch(RuntimeState* state, |
187 | 171k | std::shared_ptr<InBlockType>& result) { |
188 | 171k | std::unique_lock<std::mutex> l(_lock); |
189 | | |
190 | 171k | if (!_status.ok()) { |
191 | 2 | return _status; |
192 | 2 | } |
193 | | |
194 | 171k | if (_waiting_rpc.empty()) { |
195 | 13.6k | auto sz = 0; |
196 | 13.6k | auto num_rows = 0; |
197 | 13.6k | size_t batch_size = 0; |
198 | 13.6k | if constexpr (std::is_same_v<InBlockType, Block>) { |
199 | 133 | num_rows = cast_set<int>(result->rows()); |
200 | 133 | batch_size = result->bytes(); |
201 | 13.5k | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { |
202 | 13.5k | num_rows = cast_set<int>(result->result_batch.rows.size()); |
203 | 18.6k | for (const auto& row : result->result_batch.rows) { |
204 | 18.6k | batch_size += row.size(); |
205 | 18.6k | } |
206 | 13.5k | } |
207 | 13.6k | if (!_result_batch_queue.empty()) { |
208 | 1.81k | if constexpr (std::is_same_v<InBlockType, Block>) { |
209 | 12 | sz = cast_set<int>(_result_batch_queue.back()->rows()); |
210 | 1.80k | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { |
211 | 1.80k | sz = cast_set<int>(_result_batch_queue.back()->result_batch.rows.size()); |
212 | 1.80k | } |
213 | 1.81k | if (sz + num_rows < _buffer_limit && |
214 | 1.81k | (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) { |
215 | 1.81k | if constexpr (std::is_same_v<InBlockType, Block>) { |
216 | 12 | auto last_block = _result_batch_queue.back(); |
217 | 44 | for (size_t i = 0; i < last_block->columns(); i++) { |
218 | 32 | last_block->mutate_columns()[i]->insert_range_from( |
219 | 32 | *result->get_by_position(i).column, 0, num_rows); |
220 | 32 | } |
221 | 1.80k | } else { |
222 | 1.80k | std::vector<std::string>& back_rows = |
223 | 1.80k | _result_batch_queue.back()->result_batch.rows; |
224 | 1.80k | std::vector<std::string>& result_rows = result->result_batch.rows; |
225 | 1.80k | back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()), |
226 | 1.80k | std::make_move_iterator(result_rows.end())); |
227 | 1.80k | } |
228 | 1.81k | _last_batch_bytes += batch_size; |
229 | 1.81k | } else { |
230 | 0 | _instance_rows_in_queue.emplace_back(); |
231 | 0 | _result_batch_queue.push_back(std::move(result)); |
232 | 0 | _last_batch_bytes = batch_size; |
233 | 0 | _arrow_data_arrival |
234 | 0 | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) |
235 | 0 | } |
236 | 11.8k | } else { |
237 | 11.8k | _instance_rows_in_queue.emplace_back(); |
238 | 11.8k | _result_batch_queue.push_back(std::move(result)); |
239 | 11.8k | _last_batch_bytes = batch_size; |
240 | 11.8k | _arrow_data_arrival |
241 | 11.8k | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) |
242 | 11.8k | } |
243 | 13.6k | _instance_rows[state->fragment_instance_id()] += num_rows; |
244 | 13.6k | _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; |
245 | 157k | } else { |
246 | 157k | auto ctx = _waiting_rpc.front(); |
247 | 157k | _waiting_rpc.pop_front(); |
248 | 157k | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); |
249 | 157k | _packet_num++; |
250 | 157k | } |
251 | | |
252 | 171k | _update_dependency(); |
253 | 171k | return Status::OK(); |
254 | 171k | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9add_batchEPNS_12RuntimeStateERSt10shared_ptrINS_5BlockEE Line | Count | Source | 187 | 135 | std::shared_ptr<InBlockType>& result) { | 188 | 135 | std::unique_lock<std::mutex> l(_lock); | 189 | | | 190 | 135 | if (!_status.ok()) { | 191 | 1 | return _status; | 192 | 1 | } | 193 | | | 194 | 134 | if (_waiting_rpc.empty()) { | 195 | 133 | auto sz = 0; | 196 | 133 | auto num_rows = 0; | 197 | 133 | size_t batch_size = 0; | 198 | 133 | if constexpr (std::is_same_v<InBlockType, Block>) { | 199 | 133 | num_rows = cast_set<int>(result->rows()); | 200 | 133 | batch_size = result->bytes(); | 201 | | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { | 202 | | num_rows = cast_set<int>(result->result_batch.rows.size()); | 203 | | for (const auto& row : result->result_batch.rows) { | 204 | | batch_size += row.size(); | 205 | | } | 206 | | } | 207 | 133 | if (!_result_batch_queue.empty()) { | 208 | 12 | if constexpr (std::is_same_v<InBlockType, Block>) { | 209 | 12 | sz = cast_set<int>(_result_batch_queue.back()->rows()); | 210 | | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { | 211 | | sz = cast_set<int>(_result_batch_queue.back()->result_batch.rows.size()); | 212 | | } | 213 | 12 | if (sz + num_rows < _buffer_limit && | 214 | 12 | (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) { | 215 | 12 | if constexpr (std::is_same_v<InBlockType, Block>) { | 216 | 12 | auto last_block = _result_batch_queue.back(); | 217 | 44 | for (size_t i = 0; i < last_block->columns(); i++) { | 218 | 32 | last_block->mutate_columns()[i]->insert_range_from( | 219 | 32 | *result->get_by_position(i).column, 0, num_rows); | 220 | 32 | } | 221 | | } else { | 222 | | std::vector<std::string>& back_rows = | 223 | | _result_batch_queue.back()->result_batch.rows; | 224 | | std::vector<std::string>& result_rows = result->result_batch.rows; | 225 | | back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()), | 226 | | std::make_move_iterator(result_rows.end())); | 227 | | } | 228 | 12 | _last_batch_bytes += batch_size; | 229 | 12 | } else { | 230 | 0 | _instance_rows_in_queue.emplace_back(); | 231 | 0 | _result_batch_queue.push_back(std::move(result)); | 232 | 0 | _last_batch_bytes = batch_size; | 233 | 0 | _arrow_data_arrival | 234 | 0 | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) | 235 | 0 | } | 236 | 121 | } else { | 237 | 121 | _instance_rows_in_queue.emplace_back(); | 238 | 121 | _result_batch_queue.push_back(std::move(result)); | 239 | 121 | _last_batch_bytes = batch_size; | 240 | 121 | _arrow_data_arrival | 241 | 121 | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) | 242 | 121 | } | 243 | 133 | _instance_rows[state->fragment_instance_id()] += num_rows; | 244 | 133 | _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; | 245 | 133 | } else { | 246 | 1 | auto ctx = _waiting_rpc.front(); | 247 | 1 | _waiting_rpc.pop_front(); | 248 | 1 | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); | 249 | 1 | _packet_num++; | 250 | 1 | } | 251 | | | 252 | 134 | _update_dependency(); | 253 | 134 | return Status::OK(); | 254 | 134 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9add_batchEPNS_12RuntimeStateERSt10shared_ptrINS_16TFetchDataResultEE Line | Count | Source | 187 | 170k | std::shared_ptr<InBlockType>& result) { | 188 | 170k | std::unique_lock<std::mutex> l(_lock); | 189 | | | 190 | 170k | if (!_status.ok()) { | 191 | 1 | return _status; | 192 | 1 | } | 193 | | | 194 | 170k | if (_waiting_rpc.empty()) { | 195 | 13.5k | auto sz = 0; | 196 | 13.5k | auto num_rows = 0; | 197 | 13.5k | size_t batch_size = 0; | 198 | | if constexpr (std::is_same_v<InBlockType, Block>) { | 199 | | num_rows = cast_set<int>(result->rows()); | 200 | | batch_size = result->bytes(); | 201 | 13.5k | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { | 202 | 13.5k | num_rows = cast_set<int>(result->result_batch.rows.size()); | 203 | 18.6k | for (const auto& row : result->result_batch.rows) { | 204 | 18.6k | batch_size += row.size(); | 205 | 18.6k | } | 206 | 13.5k | } | 207 | 13.5k | if (!_result_batch_queue.empty()) { | 208 | | if constexpr (std::is_same_v<InBlockType, Block>) { | 209 | | sz = cast_set<int>(_result_batch_queue.back()->rows()); | 210 | 1.80k | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { | 211 | 1.80k | sz = cast_set<int>(_result_batch_queue.back()->result_batch.rows.size()); | 212 | 1.80k | } | 213 | 1.80k | if (sz + num_rows < _buffer_limit && | 214 | 1.80k | (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) { | 215 | | if constexpr (std::is_same_v<InBlockType, Block>) { | 216 | | auto last_block = _result_batch_queue.back(); | 217 | | for (size_t i = 0; i < last_block->columns(); i++) { | 218 | | last_block->mutate_columns()[i]->insert_range_from( | 219 | | *result->get_by_position(i).column, 0, num_rows); | 220 | | } | 221 | 1.80k | } else { | 222 | 1.80k | std::vector<std::string>& back_rows = | 223 | 1.80k | _result_batch_queue.back()->result_batch.rows; | 224 | 1.80k | std::vector<std::string>& result_rows = result->result_batch.rows; | 225 | 1.80k | back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()), | 226 | 1.80k | std::make_move_iterator(result_rows.end())); | 227 | 1.80k | } | 228 | 1.80k | _last_batch_bytes += batch_size; | 229 | 1.80k | } else { | 230 | 0 | _instance_rows_in_queue.emplace_back(); | 231 | 0 | _result_batch_queue.push_back(std::move(result)); | 232 | 0 | _last_batch_bytes = batch_size; | 233 | 0 | _arrow_data_arrival | 234 | 0 | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) | 235 | 0 | } | 236 | 11.7k | } else { | 237 | 11.7k | _instance_rows_in_queue.emplace_back(); | 238 | 11.7k | _result_batch_queue.push_back(std::move(result)); | 239 | 11.7k | _last_batch_bytes = batch_size; | 240 | 11.7k | _arrow_data_arrival | 241 | 11.7k | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) | 242 | 11.7k | } | 243 | 13.5k | _instance_rows[state->fragment_instance_id()] += num_rows; | 244 | 13.5k | _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; | 245 | 157k | } else { | 246 | 157k | auto ctx = _waiting_rpc.front(); | 247 | 157k | _waiting_rpc.pop_front(); | 248 | 157k | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); | 249 | 157k | _packet_num++; | 250 | 157k | } | 251 | | | 252 | 170k | _update_dependency(); | 253 | 170k | return Status::OK(); | 254 | 170k | } |
|
255 | | |
256 | | template class ResultBlockBuffer<GetArrowResultBatchCtx>; |
257 | | template class ResultBlockBuffer<GetResultBatchCtx>; |
258 | | |
259 | | } // namespace doris |