be/src/runtime/result_block_buffer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/result_block_buffer.h" |
19 | | |
20 | | #include <gen_cpp/Data_types.h> |
21 | | #include <gen_cpp/PaloInternalService_types.h> |
22 | | #include <gen_cpp/internal_service.pb.h> |
23 | | #include <glog/logging.h> |
24 | | #include <google/protobuf/stubs/callback.h> |
25 | | // IWYU pragma: no_include <bits/chrono.h> |
26 | | #include <chrono> // IWYU pragma: keep |
27 | | #include <limits> |
28 | | #include <ostream> |
29 | | #include <string> |
30 | | #include <utility> |
31 | | #include <vector> |
32 | | |
33 | | #include "arrow/type_fwd.h" |
34 | | #include "common/config.h" |
35 | | #include "core/block/block.h" |
36 | | #include "exec/pipeline/dependency.h" |
37 | | #include "exec/sink/writer/varrow_flight_result_writer.h" |
38 | | #include "exec/sink/writer/vmysql_result_writer.h" |
39 | | #include "runtime/runtime_profile.h" |
40 | | #include "runtime/thread_context.h" |
41 | | #include "util/thrift_util.h" |
42 | | |
43 | | namespace doris { |
44 | | |
45 | | template <typename ResultCtxType> |
46 | | ResultBlockBuffer<ResultCtxType>::ResultBlockBuffer(TUniqueId id, RuntimeState* state, |
47 | | int buffer_size) |
48 | 13 | : _fragment_id(std::move(id)), |
49 | 13 | _is_close(false), |
50 | 13 | _batch_size(state->batch_size()), |
51 | 13 | _timezone(state->timezone()), |
52 | 13 | _be_exec_version(state->be_exec_version()), |
53 | 13 | _fragment_transmission_compression_type(state->fragement_transmission_compression_type()), |
54 | 13 | _buffer_limit(buffer_size) { |
55 | 13 | _mem_tracker = MemTrackerLimiter::create_shared( |
56 | 13 | MemTrackerLimiter::Type::QUERY, |
57 | 13 | fmt::format("ResultBlockBuffer#FragmentInstanceId={}", print_id(_fragment_id))); |
58 | 13 | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEEC2ENS_9TUniqueIdEPNS_12RuntimeStateEi Line | Count | Source | 48 | 5 | : _fragment_id(std::move(id)), | 49 | 5 | _is_close(false), | 50 | 5 | _batch_size(state->batch_size()), | 51 | 5 | _timezone(state->timezone()), | 52 | 5 | _be_exec_version(state->be_exec_version()), | 53 | 5 | _fragment_transmission_compression_type(state->fragement_transmission_compression_type()), | 54 | 5 | _buffer_limit(buffer_size) { | 55 | 5 | _mem_tracker = MemTrackerLimiter::create_shared( | 56 | 5 | MemTrackerLimiter::Type::QUERY, | 57 | 5 | fmt::format("ResultBlockBuffer#FragmentInstanceId={}", print_id(_fragment_id))); | 58 | 5 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEEC2ENS_9TUniqueIdEPNS_12RuntimeStateEi Line | Count | Source | 48 | 8 | : _fragment_id(std::move(id)), | 49 | 8 | _is_close(false), | 50 | 8 | _batch_size(state->batch_size()), | 51 | 8 | _timezone(state->timezone()), | 52 | 8 | _be_exec_version(state->be_exec_version()), | 53 | 8 | _fragment_transmission_compression_type(state->fragement_transmission_compression_type()), | 54 | 8 | _buffer_limit(buffer_size) { | 55 | 8 | _mem_tracker = MemTrackerLimiter::create_shared( | 56 | 8 | MemTrackerLimiter::Type::QUERY, | 57 | 8 | fmt::format("ResultBlockBuffer#FragmentInstanceId={}", print_id(_fragment_id))); | 58 | 8 | } |
|
59 | | |
60 | | template <typename ResultCtxType> |
61 | | Status ResultBlockBuffer<ResultCtxType>::close(const TUniqueId& id, Status exec_status, |
62 | 6 | int64_t num_rows, bool& is_fully_closed) { |
63 | 6 | std::unique_lock<std::mutex> l(_lock); |
64 | 6 | _returned_rows.fetch_add(num_rows); |
65 | | // close will be called multiple times and error status needs to be collected. |
66 | 6 | if (!exec_status.ok()) { |
67 | 4 | _status = exec_status; |
68 | 4 | } |
69 | | |
70 | 6 | auto it = _result_sink_dependencies.find(id); |
71 | 6 | if (it != _result_sink_dependencies.end()) { |
72 | 4 | it->second->set_always_ready(); |
73 | 4 | _result_sink_dependencies.erase(it); |
74 | 4 | } else { |
75 | 2 | _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer", |
76 | 2 | print_id(id)); |
77 | 2 | } |
78 | 6 | if (!_result_sink_dependencies.empty()) { |
79 | | // Still waiting for other instances to finish; this is not the final close. |
80 | 2 | is_fully_closed = false; |
81 | 2 | return _status; |
82 | 2 | } |
83 | | |
84 | | // All instances have closed: the buffer is now fully closed. |
85 | 4 | is_fully_closed = true; |
86 | 4 | _is_close = true; |
87 | 4 | _arrow_data_arrival.notify_all(); |
88 | | |
89 | 4 | if (!_waiting_rpc.empty()) { |
90 | 4 | if (_status.ok()) { |
91 | 2 | for (auto& ctx : _waiting_rpc) { |
92 | 2 | ctx->on_close(_packet_num, _returned_rows); |
93 | 2 | } |
94 | 2 | } else { |
95 | 2 | for (auto& ctx : _waiting_rpc) { |
96 | 2 | ctx->on_failure(_status); |
97 | 2 | } |
98 | 2 | } |
99 | 4 | _waiting_rpc.clear(); |
100 | 4 | } |
101 | | |
102 | 4 | return _status; |
103 | 6 | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE5closeERKNS_9TUniqueIdENS_6StatusElRb Line | Count | Source | 62 | 3 | int64_t num_rows, bool& is_fully_closed) { | 63 | 3 | std::unique_lock<std::mutex> l(_lock); | 64 | 3 | _returned_rows.fetch_add(num_rows); | 65 | | // close will be called multiple times and error status needs to be collected. | 66 | 3 | if (!exec_status.ok()) { | 67 | 2 | _status = exec_status; | 68 | 2 | } | 69 | | | 70 | 3 | auto it = _result_sink_dependencies.find(id); | 71 | 3 | if (it != _result_sink_dependencies.end()) { | 72 | 2 | it->second->set_always_ready(); | 73 | 2 | _result_sink_dependencies.erase(it); | 74 | 2 | } else { | 75 | 1 | _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer", | 76 | 1 | print_id(id)); | 77 | 1 | } | 78 | 3 | if (!_result_sink_dependencies.empty()) { | 79 | | // Still waiting for other instances to finish; this is not the final close. | 80 | 1 | is_fully_closed = false; | 81 | 1 | return _status; | 82 | 1 | } | 83 | | | 84 | | // All instances have closed: the buffer is now fully closed. | 85 | 2 | is_fully_closed = true; | 86 | 2 | _is_close = true; | 87 | 2 | _arrow_data_arrival.notify_all(); | 88 | | | 89 | 2 | if (!_waiting_rpc.empty()) { | 90 | 2 | if (_status.ok()) { | 91 | 1 | for (auto& ctx : _waiting_rpc) { | 92 | 1 | ctx->on_close(_packet_num, _returned_rows); | 93 | 1 | } | 94 | 1 | } else { | 95 | 1 | for (auto& ctx : _waiting_rpc) { | 96 | 1 | ctx->on_failure(_status); | 97 | 1 | } | 98 | 1 | } | 99 | 2 | _waiting_rpc.clear(); | 100 | 2 | } | 101 | | | 102 | 2 | return _status; | 103 | 3 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE5closeERKNS_9TUniqueIdENS_6StatusElRb Line | Count | Source | 62 | 3 | int64_t num_rows, bool& is_fully_closed) { | 63 | 3 | std::unique_lock<std::mutex> l(_lock); | 64 | 3 | _returned_rows.fetch_add(num_rows); | 65 | | // close will be called multiple times and error status needs to be collected. | 66 | 3 | if (!exec_status.ok()) { | 67 | 2 | _status = exec_status; | 68 | 2 | } | 69 | | | 70 | 3 | auto it = _result_sink_dependencies.find(id); | 71 | 3 | if (it != _result_sink_dependencies.end()) { | 72 | 2 | it->second->set_always_ready(); | 73 | 2 | _result_sink_dependencies.erase(it); | 74 | 2 | } else { | 75 | 1 | _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer", | 76 | 1 | print_id(id)); | 77 | 1 | } | 78 | 3 | if (!_result_sink_dependencies.empty()) { | 79 | | // Still waiting for other instances to finish; this is not the final close. | 80 | 1 | is_fully_closed = false; | 81 | 1 | return _status; | 82 | 1 | } | 83 | | | 84 | | // All instances have closed: the buffer is now fully closed. | 85 | 2 | is_fully_closed = true; | 86 | 2 | _is_close = true; | 87 | 2 | _arrow_data_arrival.notify_all(); | 88 | | | 89 | 2 | if (!_waiting_rpc.empty()) { | 90 | 2 | if (_status.ok()) { | 91 | 1 | for (auto& ctx : _waiting_rpc) { | 92 | 1 | ctx->on_close(_packet_num, _returned_rows); | 93 | 1 | } | 94 | 1 | } else { | 95 | 1 | for (auto& ctx : _waiting_rpc) { | 96 | 1 | ctx->on_failure(_status); | 97 | 1 | } | 98 | 1 | } | 99 | 2 | _waiting_rpc.clear(); | 100 | 2 | } | 101 | | | 102 | 2 | return _status; | 103 | 3 | } |
|
104 | | |
105 | | template <typename ResultCtxType> |
106 | 3 | void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) { |
107 | 3 | std::unique_lock<std::mutex> l(_lock); |
108 | 3 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); |
109 | 3 | if (_status.ok()) { |
110 | 3 | _status = reason; |
111 | 3 | } |
112 | 3 | _arrow_data_arrival.notify_all(); |
113 | 3 | for (auto& ctx : _waiting_rpc) { |
114 | 2 | ctx->on_failure(reason); |
115 | 2 | } |
116 | 3 | _waiting_rpc.clear(); |
117 | 3 | _update_dependency(); |
118 | 3 | _result_batch_queue.clear(); |
119 | 3 | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE6cancelERKNS_6StatusE Line | Count | Source | 106 | 1 | void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) { | 107 | 1 | std::unique_lock<std::mutex> l(_lock); | 108 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); | 109 | 1 | if (_status.ok()) { | 110 | 1 | _status = reason; | 111 | 1 | } | 112 | 1 | _arrow_data_arrival.notify_all(); | 113 | 1 | for (auto& ctx : _waiting_rpc) { | 114 | 1 | ctx->on_failure(reason); | 115 | 1 | } | 116 | 1 | _waiting_rpc.clear(); | 117 | 1 | _update_dependency(); | 118 | 1 | _result_batch_queue.clear(); | 119 | 1 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE6cancelERKNS_6StatusE Line | Count | Source | 106 | 2 | void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) { | 107 | 2 | std::unique_lock<std::mutex> l(_lock); | 108 | 2 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); | 109 | 2 | if (_status.ok()) { | 110 | 2 | _status = reason; | 111 | 2 | } | 112 | 2 | _arrow_data_arrival.notify_all(); | 113 | 2 | for (auto& ctx : _waiting_rpc) { | 114 | 1 | ctx->on_failure(reason); | 115 | 1 | } | 116 | 2 | _waiting_rpc.clear(); | 117 | 2 | _update_dependency(); | 118 | 2 | _result_batch_queue.clear(); | 119 | 2 | } |
|
120 | | |
121 | | template <typename ResultCtxType> |
122 | | void ResultBlockBuffer<ResultCtxType>::set_dependency( |
123 | 9 | const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) { |
124 | 9 | std::unique_lock<std::mutex> l(_lock); |
125 | 9 | _result_sink_dependencies[id] = result_sink_dependency; |
126 | 9 | _update_dependency(); |
127 | 9 | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE14set_dependencyERKNS_9TUniqueIdESt10shared_ptrINS_10DependencyEE Line | Count | Source | 123 | 5 | const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) { | 124 | 5 | std::unique_lock<std::mutex> l(_lock); | 125 | 5 | _result_sink_dependencies[id] = result_sink_dependency; | 126 | 5 | _update_dependency(); | 127 | 5 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE14set_dependencyERKNS_9TUniqueIdESt10shared_ptrINS_10DependencyEE Line | Count | Source | 123 | 4 | const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) { | 124 | 4 | std::unique_lock<std::mutex> l(_lock); | 125 | 4 | _result_sink_dependencies[id] = result_sink_dependency; | 126 | 4 | _update_dependency(); | 127 | 4 | } |
|
128 | | |
129 | | template <typename ResultCtxType> |
130 | 36 | void ResultBlockBuffer<ResultCtxType>::_update_dependency() { |
131 | 36 | if (!_status.ok()) { |
132 | 7 | for (auto it : _result_sink_dependencies) { |
133 | 6 | it.second->set_ready(); |
134 | 6 | } |
135 | 7 | return; |
136 | 7 | } |
137 | | |
138 | 29 | for (auto it : _result_sink_dependencies) { |
139 | 26 | if (_instance_rows[it.first] > _batch_size) { |
140 | 6 | it.second->block(); |
141 | 20 | } else { |
142 | 20 | it.second->set_ready(); |
143 | 20 | } |
144 | 26 | } |
145 | 29 | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE18_update_dependencyEv Line | Count | Source | 130 | 19 | void ResultBlockBuffer<ResultCtxType>::_update_dependency() { | 131 | 19 | if (!_status.ok()) { | 132 | 3 | for (auto it : _result_sink_dependencies) { | 133 | 3 | it.second->set_ready(); | 134 | 3 | } | 135 | 3 | return; | 136 | 3 | } | 137 | | | 138 | 16 | for (auto it : _result_sink_dependencies) { | 139 | 15 | if (_instance_rows[it.first] > _batch_size) { | 140 | 4 | it.second->block(); | 141 | 11 | } else { | 142 | 11 | it.second->set_ready(); | 143 | 11 | } | 144 | 15 | } | 145 | 16 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE18_update_dependencyEv Line | Count | Source | 130 | 17 | void ResultBlockBuffer<ResultCtxType>::_update_dependency() { | 131 | 17 | if (!_status.ok()) { | 132 | 4 | for (auto it : _result_sink_dependencies) { | 133 | 3 | it.second->set_ready(); | 134 | 3 | } | 135 | 4 | return; | 136 | 4 | } | 137 | | | 138 | 13 | for (auto it : _result_sink_dependencies) { | 139 | 11 | if (_instance_rows[it.first] > _batch_size) { | 140 | 2 | it.second->block(); | 141 | 9 | } else { | 142 | 9 | it.second->set_ready(); | 143 | 9 | } | 144 | 11 | } | 145 | 13 | } |
|
146 | | |
147 | | template <typename ResultCtxType> |
148 | 15 | Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) { |
149 | 15 | std::lock_guard<std::mutex> l(_lock); |
150 | 15 | SCOPED_ATTACH_TASK(_mem_tracker); |
151 | 15 | Defer defer {[&]() { _update_dependency(); }};_ZZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9get_batchESt10shared_ptrIS1_EENKUlvE_clEv Line | Count | Source | 151 | 8 | Defer defer {[&]() { _update_dependency(); }}; |
_ZZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9get_batchESt10shared_ptrIS1_EENKUlvE_clEv Line | Count | Source | 151 | 7 | Defer defer {[&]() { _update_dependency(); }}; |
|
152 | 15 | if (!_status.ok()) { |
153 | 2 | ctx->on_failure(_status); |
154 | 2 | return _status; |
155 | 2 | } |
156 | 13 | if (!_result_batch_queue.empty()) { |
157 | 3 | auto result = _result_batch_queue.front(); |
158 | 3 | _result_batch_queue.pop_front(); |
159 | 3 | for (auto it : _instance_rows_in_queue.front()) { |
160 | 3 | _instance_rows[it.first] -= it.second; |
161 | 3 | } |
162 | 3 | _instance_rows_in_queue.pop_front(); |
163 | 3 | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); |
164 | 3 | _packet_num++; |
165 | 3 | return Status::OK(); |
166 | 3 | } |
167 | 10 | if (_is_close) { |
168 | 2 | if (!_status.ok()) { |
169 | 0 | ctx->on_failure(_status); |
170 | 0 | return Status::OK(); |
171 | 0 | } |
172 | 2 | ctx->on_close(_packet_num, _returned_rows); |
173 | 2 | LOG(INFO) << fmt::format( |
174 | 2 | "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, " |
175 | 2 | "packet_num={}, peak_memory_usage={}", |
176 | 2 | print_id(_fragment_id), _is_close, !_status.ok(), _packet_num, |
177 | 2 | _mem_tracker->peak_consumption()); |
178 | 2 | return Status::OK(); |
179 | 2 | } |
180 | | // no ready data, push ctx to waiting list |
181 | 8 | _waiting_rpc.push_back(ctx); |
182 | 8 | return Status::OK(); |
183 | 10 | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9get_batchESt10shared_ptrIS1_E Line | Count | Source | 148 | 8 | Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) { | 149 | 8 | std::lock_guard<std::mutex> l(_lock); | 150 | 8 | SCOPED_ATTACH_TASK(_mem_tracker); | 151 | 8 | Defer defer {[&]() { _update_dependency(); }}; | 152 | 8 | if (!_status.ok()) { | 153 | 1 | ctx->on_failure(_status); | 154 | 1 | return _status; | 155 | 1 | } | 156 | 7 | if (!_result_batch_queue.empty()) { | 157 | 2 | auto result = _result_batch_queue.front(); | 158 | 2 | _result_batch_queue.pop_front(); | 159 | 2 | for (auto it : _instance_rows_in_queue.front()) { | 160 | 2 | _instance_rows[it.first] -= it.second; | 161 | 2 | } | 162 | 2 | _instance_rows_in_queue.pop_front(); | 163 | 2 | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); | 164 | 2 | _packet_num++; | 165 | 2 | return Status::OK(); | 166 | 2 | } | 167 | 5 | if (_is_close) { | 168 | 1 | if (!_status.ok()) { | 169 | 0 | ctx->on_failure(_status); | 170 | 0 | return Status::OK(); | 171 | 0 | } | 172 | 1 | ctx->on_close(_packet_num, _returned_rows); | 173 | 1 | LOG(INFO) << fmt::format( | 174 | 1 | "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, " | 175 | 1 | "packet_num={}, peak_memory_usage={}", | 176 | 1 | print_id(_fragment_id), _is_close, !_status.ok(), _packet_num, | 177 | 1 | _mem_tracker->peak_consumption()); | 178 | 1 | return Status::OK(); | 179 | 1 | } | 180 | | // no ready data, push ctx to waiting list | 181 | 4 | _waiting_rpc.push_back(ctx); | 182 | 4 | return Status::OK(); | 183 | 5 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9get_batchESt10shared_ptrIS1_E Line | Count | Source | 148 | 7 | Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) { | 149 | 7 | std::lock_guard<std::mutex> l(_lock); | 150 | 7 | SCOPED_ATTACH_TASK(_mem_tracker); | 151 | 7 | Defer defer {[&]() { _update_dependency(); }}; | 152 | 7 | if (!_status.ok()) { | 153 | 1 | ctx->on_failure(_status); | 154 | 1 | return _status; | 155 | 1 | } | 156 | 6 | if (!_result_batch_queue.empty()) { | 157 | 1 | auto result = _result_batch_queue.front(); | 158 | 1 | _result_batch_queue.pop_front(); | 159 | 1 | for (auto it : _instance_rows_in_queue.front()) { | 160 | 1 | _instance_rows[it.first] -= it.second; | 161 | 1 | } | 162 | 1 | _instance_rows_in_queue.pop_front(); | 163 | 1 | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); | 164 | 1 | _packet_num++; | 165 | 1 | return Status::OK(); | 166 | 1 | } | 167 | 5 | if (_is_close) { | 168 | 1 | if (!_status.ok()) { | 169 | 0 | ctx->on_failure(_status); | 170 | 0 | return Status::OK(); | 171 | 0 | } | 172 | 1 | ctx->on_close(_packet_num, _returned_rows); | 173 | 1 | LOG(INFO) << fmt::format( | 174 | 1 | "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, " | 175 | 1 | "packet_num={}, peak_memory_usage={}", | 176 | 1 | print_id(_fragment_id), _is_close, !_status.ok(), _packet_num, | 177 | 1 | _mem_tracker->peak_consumption()); | 178 | 1 | return Status::OK(); | 179 | 1 | } | 180 | | // no ready data, push ctx to waiting list | 181 | 4 | _waiting_rpc.push_back(ctx); | 182 | 4 | return Status::OK(); | 183 | 5 | } |
|
184 | | |
185 | | template <typename ResultCtxType> |
186 | | Status ResultBlockBuffer<ResultCtxType>::add_batch(RuntimeState* state, |
187 | 11 | std::shared_ptr<InBlockType>& result) { |
188 | 11 | std::unique_lock<std::mutex> l(_lock); |
189 | | |
190 | 11 | if (!_status.ok()) { |
191 | 2 | return _status; |
192 | 2 | } |
193 | | |
194 | 9 | if (_waiting_rpc.empty()) { |
195 | 7 | auto sz = 0; |
196 | 7 | auto num_rows = 0; |
197 | 7 | size_t batch_size = 0; |
198 | 7 | if constexpr (std::is_same_v<InBlockType, Block>) { |
199 | 4 | num_rows = cast_set<int>(result->rows()); |
200 | 4 | batch_size = result->bytes(); |
201 | 4 | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { |
202 | 3 | num_rows = cast_set<int>(result->result_batch.rows.size()); |
203 | 11 | for (const auto& row : result->result_batch.rows) { |
204 | 11 | batch_size += row.size(); |
205 | 11 | } |
206 | 3 | } |
207 | 7 | if (!_result_batch_queue.empty()) { |
208 | 3 | if constexpr (std::is_same_v<InBlockType, Block>) { |
209 | 2 | sz = cast_set<int>(_result_batch_queue.back()->rows()); |
210 | 2 | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { |
211 | 1 | sz = cast_set<int>(_result_batch_queue.back()->result_batch.rows.size()); |
212 | 1 | } |
213 | 3 | if (sz + num_rows < _buffer_limit && |
214 | 3 | (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) { |
215 | 3 | if constexpr (std::is_same_v<InBlockType, Block>) { |
216 | 2 | auto last_block = _result_batch_queue.back(); |
217 | 4 | for (size_t i = 0; i < last_block->columns(); i++) { |
218 | 2 | last_block->mutate_columns()[i]->insert_range_from( |
219 | 2 | *result->get_by_position(i).column, 0, num_rows); |
220 | 2 | } |
221 | 2 | } else { |
222 | 1 | std::vector<std::string>& back_rows = |
223 | 1 | _result_batch_queue.back()->result_batch.rows; |
224 | 1 | std::vector<std::string>& result_rows = result->result_batch.rows; |
225 | 1 | back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()), |
226 | 1 | std::make_move_iterator(result_rows.end())); |
227 | 1 | } |
228 | 3 | _last_batch_bytes += batch_size; |
229 | 3 | } else { |
230 | 0 | _instance_rows_in_queue.emplace_back(); |
231 | 0 | _result_batch_queue.push_back(std::move(result)); |
232 | 0 | _last_batch_bytes = batch_size; |
233 | 0 | _arrow_data_arrival |
234 | 0 | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) |
235 | 0 | } |
236 | 4 | } else { |
237 | 4 | _instance_rows_in_queue.emplace_back(); |
238 | 4 | _result_batch_queue.push_back(std::move(result)); |
239 | 4 | _last_batch_bytes = batch_size; |
240 | 4 | _arrow_data_arrival |
241 | 4 | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) |
242 | 4 | } |
243 | 7 | _instance_rows[state->fragment_instance_id()] += num_rows; |
244 | 7 | _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; |
245 | 7 | } else { |
246 | 2 | auto ctx = _waiting_rpc.front(); |
247 | 2 | _waiting_rpc.pop_front(); |
248 | 2 | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); |
249 | 2 | _packet_num++; |
250 | 2 | } |
251 | | |
252 | 9 | _update_dependency(); |
253 | 9 | return Status::OK(); |
254 | 9 | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9add_batchEPNS_12RuntimeStateERSt10shared_ptrINS_5BlockEE Line | Count | Source | 187 | 6 | std::shared_ptr<InBlockType>& result) { | 188 | 6 | std::unique_lock<std::mutex> l(_lock); | 189 | | | 190 | 6 | if (!_status.ok()) { | 191 | 1 | return _status; | 192 | 1 | } | 193 | | | 194 | 5 | if (_waiting_rpc.empty()) { | 195 | 4 | auto sz = 0; | 196 | 4 | auto num_rows = 0; | 197 | 4 | size_t batch_size = 0; | 198 | 4 | if constexpr (std::is_same_v<InBlockType, Block>) { | 199 | 4 | num_rows = cast_set<int>(result->rows()); | 200 | 4 | batch_size = result->bytes(); | 201 | | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { | 202 | | num_rows = cast_set<int>(result->result_batch.rows.size()); | 203 | | for (const auto& row : result->result_batch.rows) { | 204 | | batch_size += row.size(); | 205 | | } | 206 | | } | 207 | 4 | if (!_result_batch_queue.empty()) { | 208 | 2 | if constexpr (std::is_same_v<InBlockType, Block>) { | 209 | 2 | sz = cast_set<int>(_result_batch_queue.back()->rows()); | 210 | | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { | 211 | | sz = cast_set<int>(_result_batch_queue.back()->result_batch.rows.size()); | 212 | | } | 213 | 2 | if (sz + num_rows < _buffer_limit && | 214 | 2 | (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) { | 215 | 2 | if constexpr (std::is_same_v<InBlockType, Block>) { | 216 | 2 | auto last_block = _result_batch_queue.back(); | 217 | 4 | for (size_t i = 0; i < last_block->columns(); i++) { | 218 | 2 | last_block->mutate_columns()[i]->insert_range_from( | 219 | 2 | *result->get_by_position(i).column, 0, num_rows); | 220 | 2 | } | 221 | | } else { | 222 | | std::vector<std::string>& back_rows = | 223 | | _result_batch_queue.back()->result_batch.rows; | 224 | | std::vector<std::string>& result_rows = result->result_batch.rows; | 225 | | back_rows.insert(back_rows.end(), 
std::make_move_iterator(result_rows.begin()), | 226 | | std::make_move_iterator(result_rows.end())); | 227 | | } | 228 | 2 | _last_batch_bytes += batch_size; | 229 | 2 | } else { | 230 | 0 | _instance_rows_in_queue.emplace_back(); | 231 | 0 | _result_batch_queue.push_back(std::move(result)); | 232 | 0 | _last_batch_bytes = batch_size; | 233 | 0 | _arrow_data_arrival | 234 | 0 | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) | 235 | 0 | } | 236 | 2 | } else { | 237 | 2 | _instance_rows_in_queue.emplace_back(); | 238 | 2 | _result_batch_queue.push_back(std::move(result)); | 239 | 2 | _last_batch_bytes = batch_size; | 240 | 2 | _arrow_data_arrival | 241 | 2 | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) | 242 | 2 | } | 243 | 4 | _instance_rows[state->fragment_instance_id()] += num_rows; | 244 | 4 | _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; | 245 | 4 | } else { | 246 | 1 | auto ctx = _waiting_rpc.front(); | 247 | 1 | _waiting_rpc.pop_front(); | 248 | 1 | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); | 249 | 1 | _packet_num++; | 250 | 1 | } | 251 | | | 252 | 5 | _update_dependency(); | 253 | 5 | return Status::OK(); | 254 | 5 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9add_batchEPNS_12RuntimeStateERSt10shared_ptrINS_16TFetchDataResultEE Line | Count | Source | 187 | 5 | std::shared_ptr<InBlockType>& result) { | 188 | 5 | std::unique_lock<std::mutex> l(_lock); | 189 | | | 190 | 5 | if (!_status.ok()) { | 191 | 1 | return _status; | 192 | 1 | } | 193 | | | 194 | 4 | if (_waiting_rpc.empty()) { | 195 | 3 | auto sz = 0; | 196 | 3 | auto num_rows = 0; | 197 | 3 | size_t batch_size = 0; | 198 | | if constexpr (std::is_same_v<InBlockType, Block>) { | 199 | | num_rows = cast_set<int>(result->rows()); | 200 | | batch_size = result->bytes(); | 201 | 3 | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { | 202 | 3 | num_rows = cast_set<int>(result->result_batch.rows.size()); | 203 | 11 | for (const auto& row : result->result_batch.rows) { | 204 | 11 | batch_size += row.size(); | 205 | 11 | } | 206 | 3 | } | 207 | 3 | if (!_result_batch_queue.empty()) { | 208 | | if constexpr (std::is_same_v<InBlockType, Block>) { | 209 | | sz = cast_set<int>(_result_batch_queue.back()->rows()); | 210 | 1 | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { | 211 | 1 | sz = cast_set<int>(_result_batch_queue.back()->result_batch.rows.size()); | 212 | 1 | } | 213 | 1 | if (sz + num_rows < _buffer_limit && | 214 | 1 | (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) { | 215 | | if constexpr (std::is_same_v<InBlockType, Block>) { | 216 | | auto last_block = _result_batch_queue.back(); | 217 | | for (size_t i = 0; i < last_block->columns(); i++) { | 218 | | last_block->mutate_columns()[i]->insert_range_from( | 219 | | *result->get_by_position(i).column, 0, num_rows); | 220 | | } | 221 | 1 | } else { | 222 | 1 | std::vector<std::string>& back_rows = | 223 | 1 | _result_batch_queue.back()->result_batch.rows; | 224 | 1 | std::vector<std::string>& result_rows = result->result_batch.rows; | 225 | 1 | back_rows.insert(back_rows.end(), 
std::make_move_iterator(result_rows.begin()), | 226 | 1 | std::make_move_iterator(result_rows.end())); | 227 | 1 | } | 228 | 1 | _last_batch_bytes += batch_size; | 229 | 1 | } else { | 230 | 0 | _instance_rows_in_queue.emplace_back(); | 231 | 0 | _result_batch_queue.push_back(std::move(result)); | 232 | 0 | _last_batch_bytes = batch_size; | 233 | 0 | _arrow_data_arrival | 234 | 0 | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) | 235 | 0 | } | 236 | 2 | } else { | 237 | 2 | _instance_rows_in_queue.emplace_back(); | 238 | 2 | _result_batch_queue.push_back(std::move(result)); | 239 | 2 | _last_batch_bytes = batch_size; | 240 | 2 | _arrow_data_arrival | 241 | 2 | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) | 242 | 2 | } | 243 | 3 | _instance_rows[state->fragment_instance_id()] += num_rows; | 244 | 3 | _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; | 245 | 3 | } else { | 246 | 1 | auto ctx = _waiting_rpc.front(); | 247 | 1 | _waiting_rpc.pop_front(); | 248 | 1 | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); | 249 | 1 | _packet_num++; | 250 | 1 | } | 251 | | | 252 | 4 | _update_dependency(); | 253 | 4 | return Status::OK(); | 254 | 4 | } |
|
255 | | |
// Explicit instantiations for the two supported result context types
// (arrow-flight and mysql-protocol result delivery).
template class ResultBlockBuffer<GetArrowResultBatchCtx>;
template class ResultBlockBuffer<GetResultBatchCtx>;
258 | | |
259 | | } // namespace doris |