be/src/runtime/result_block_buffer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/result_block_buffer.h" |
19 | | |
20 | | #include <gen_cpp/Data_types.h> |
21 | | #include <gen_cpp/PaloInternalService_types.h> |
22 | | #include <gen_cpp/internal_service.pb.h> |
23 | | #include <glog/logging.h> |
24 | | #include <google/protobuf/stubs/callback.h> |
25 | | // IWYU pragma: no_include <bits/chrono.h> |
26 | | #include <chrono> // IWYU pragma: keep |
27 | | #include <limits> |
28 | | #include <ostream> |
29 | | #include <string> |
30 | | #include <utility> |
31 | | #include <vector> |
32 | | |
33 | | #include "arrow/type_fwd.h" |
34 | | #include "common/config.h" |
35 | | #include "core/block/block.h" |
36 | | #include "exec/pipeline/dependency.h" |
37 | | #include "exec/sink/writer/varrow_flight_result_writer.h" |
38 | | #include "exec/sink/writer/vmysql_result_writer.h" |
39 | | #include "runtime/runtime_profile.h" |
40 | | #include "runtime/thread_context.h" |
41 | | #include "util/thrift_util.h" |
42 | | |
43 | | namespace doris { |
44 | | #include "common/compile_check_begin.h" |
45 | | |
46 | | template <typename ResultCtxType> |
47 | | ResultBlockBuffer<ResultCtxType>::ResultBlockBuffer(TUniqueId id, RuntimeState* state, |
48 | | int buffer_size) |
49 | 13 | : _fragment_id(std::move(id)), |
50 | 13 | _is_close(false), |
51 | 13 | _batch_size(state->batch_size()), |
52 | 13 | _timezone(state->timezone()), |
53 | 13 | _be_exec_version(state->be_exec_version()), |
54 | 13 | _fragment_transmission_compression_type(state->fragement_transmission_compression_type()), |
55 | 13 | _buffer_limit(buffer_size) { |
56 | 13 | _mem_tracker = MemTrackerLimiter::create_shared( |
57 | 13 | MemTrackerLimiter::Type::QUERY, |
58 | 13 | fmt::format("ResultBlockBuffer#FragmentInstanceId={}", print_id(_fragment_id))); |
59 | 13 | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEEC2ENS_9TUniqueIdEPNS_12RuntimeStateEi Line | Count | Source | 49 | 5 | : _fragment_id(std::move(id)), | 50 | 5 | _is_close(false), | 51 | 5 | _batch_size(state->batch_size()), | 52 | 5 | _timezone(state->timezone()), | 53 | 5 | _be_exec_version(state->be_exec_version()), | 54 | 5 | _fragment_transmission_compression_type(state->fragement_transmission_compression_type()), | 55 | 5 | _buffer_limit(buffer_size) { | 56 | 5 | _mem_tracker = MemTrackerLimiter::create_shared( | 57 | 5 | MemTrackerLimiter::Type::QUERY, | 58 | 5 | fmt::format("ResultBlockBuffer#FragmentInstanceId={}", print_id(_fragment_id))); | 59 | 5 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEEC2ENS_9TUniqueIdEPNS_12RuntimeStateEi Line | Count | Source | 49 | 8 | : _fragment_id(std::move(id)), | 50 | 8 | _is_close(false), | 51 | 8 | _batch_size(state->batch_size()), | 52 | 8 | _timezone(state->timezone()), | 53 | 8 | _be_exec_version(state->be_exec_version()), | 54 | 8 | _fragment_transmission_compression_type(state->fragement_transmission_compression_type()), | 55 | 8 | _buffer_limit(buffer_size) { | 56 | 8 | _mem_tracker = MemTrackerLimiter::create_shared( | 57 | 8 | MemTrackerLimiter::Type::QUERY, | 58 | 8 | fmt::format("ResultBlockBuffer#FragmentInstanceId={}", print_id(_fragment_id))); | 59 | 8 | } |
|
60 | | |
61 | | template <typename ResultCtxType> |
62 | | Status ResultBlockBuffer<ResultCtxType>::close(const TUniqueId& id, Status exec_status, |
63 | 6 | int64_t num_rows, bool& is_fully_closed) { |
64 | 6 | std::unique_lock<std::mutex> l(_lock); |
65 | 6 | _returned_rows.fetch_add(num_rows); |
66 | | // close will be called multiple times and error status needs to be collected. |
67 | 6 | if (!exec_status.ok()) { |
68 | 4 | _status = exec_status; |
69 | 4 | } |
70 | | |
71 | 6 | auto it = _result_sink_dependencies.find(id); |
72 | 6 | if (it != _result_sink_dependencies.end()) { |
73 | 4 | it->second->set_always_ready(); |
74 | 4 | _result_sink_dependencies.erase(it); |
75 | 4 | } else { |
76 | 2 | _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer", |
77 | 2 | print_id(id)); |
78 | 2 | } |
79 | 6 | if (!_result_sink_dependencies.empty()) { |
80 | | // Still waiting for other instances to finish; this is not the final close. |
81 | 2 | is_fully_closed = false; |
82 | 2 | return _status; |
83 | 2 | } |
84 | | |
85 | | // All instances have closed: the buffer is now fully closed. |
86 | 4 | is_fully_closed = true; |
87 | 4 | _is_close = true; |
88 | 4 | _arrow_data_arrival.notify_all(); |
89 | | |
90 | 4 | if (!_waiting_rpc.empty()) { |
91 | 4 | if (_status.ok()) { |
92 | 2 | for (auto& ctx : _waiting_rpc) { |
93 | 2 | ctx->on_close(_packet_num, _returned_rows); |
94 | 2 | } |
95 | 2 | } else { |
96 | 2 | for (auto& ctx : _waiting_rpc) { |
97 | 2 | ctx->on_failure(_status); |
98 | 2 | } |
99 | 2 | } |
100 | 4 | _waiting_rpc.clear(); |
101 | 4 | } |
102 | | |
103 | 4 | return _status; |
104 | 6 | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE5closeERKNS_9TUniqueIdENS_6StatusElRb Line | Count | Source | 63 | 3 | int64_t num_rows, bool& is_fully_closed) { | 64 | 3 | std::unique_lock<std::mutex> l(_lock); | 65 | 3 | _returned_rows.fetch_add(num_rows); | 66 | | // close will be called multiple times and error status needs to be collected. | 67 | 3 | if (!exec_status.ok()) { | 68 | 2 | _status = exec_status; | 69 | 2 | } | 70 | | | 71 | 3 | auto it = _result_sink_dependencies.find(id); | 72 | 3 | if (it != _result_sink_dependencies.end()) { | 73 | 2 | it->second->set_always_ready(); | 74 | 2 | _result_sink_dependencies.erase(it); | 75 | 2 | } else { | 76 | 1 | _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer", | 77 | 1 | print_id(id)); | 78 | 1 | } | 79 | 3 | if (!_result_sink_dependencies.empty()) { | 80 | | // Still waiting for other instances to finish; this is not the final close. | 81 | 1 | is_fully_closed = false; | 82 | 1 | return _status; | 83 | 1 | } | 84 | | | 85 | | // All instances have closed: the buffer is now fully closed. | 86 | 2 | is_fully_closed = true; | 87 | 2 | _is_close = true; | 88 | 2 | _arrow_data_arrival.notify_all(); | 89 | | | 90 | 2 | if (!_waiting_rpc.empty()) { | 91 | 2 | if (_status.ok()) { | 92 | 1 | for (auto& ctx : _waiting_rpc) { | 93 | 1 | ctx->on_close(_packet_num, _returned_rows); | 94 | 1 | } | 95 | 1 | } else { | 96 | 1 | for (auto& ctx : _waiting_rpc) { | 97 | 1 | ctx->on_failure(_status); | 98 | 1 | } | 99 | 1 | } | 100 | 2 | _waiting_rpc.clear(); | 101 | 2 | } | 102 | | | 103 | 2 | return _status; | 104 | 3 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE5closeERKNS_9TUniqueIdENS_6StatusElRb Line | Count | Source | 63 | 3 | int64_t num_rows, bool& is_fully_closed) { | 64 | 3 | std::unique_lock<std::mutex> l(_lock); | 65 | 3 | _returned_rows.fetch_add(num_rows); | 66 | | // close will be called multiple times and error status needs to be collected. | 67 | 3 | if (!exec_status.ok()) { | 68 | 2 | _status = exec_status; | 69 | 2 | } | 70 | | | 71 | 3 | auto it = _result_sink_dependencies.find(id); | 72 | 3 | if (it != _result_sink_dependencies.end()) { | 73 | 2 | it->second->set_always_ready(); | 74 | 2 | _result_sink_dependencies.erase(it); | 75 | 2 | } else { | 76 | 1 | _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer", | 77 | 1 | print_id(id)); | 78 | 1 | } | 79 | 3 | if (!_result_sink_dependencies.empty()) { | 80 | | // Still waiting for other instances to finish; this is not the final close. | 81 | 1 | is_fully_closed = false; | 82 | 1 | return _status; | 83 | 1 | } | 84 | | | 85 | | // All instances have closed: the buffer is now fully closed. | 86 | 2 | is_fully_closed = true; | 87 | 2 | _is_close = true; | 88 | 2 | _arrow_data_arrival.notify_all(); | 89 | | | 90 | 2 | if (!_waiting_rpc.empty()) { | 91 | 2 | if (_status.ok()) { | 92 | 1 | for (auto& ctx : _waiting_rpc) { | 93 | 1 | ctx->on_close(_packet_num, _returned_rows); | 94 | 1 | } | 95 | 1 | } else { | 96 | 1 | for (auto& ctx : _waiting_rpc) { | 97 | 1 | ctx->on_failure(_status); | 98 | 1 | } | 99 | 1 | } | 100 | 2 | _waiting_rpc.clear(); | 101 | 2 | } | 102 | | | 103 | 2 | return _status; | 104 | 3 | } |
|
105 | | |
106 | | template <typename ResultCtxType> |
107 | 3 | void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) { |
108 | 3 | std::unique_lock<std::mutex> l(_lock); |
109 | 3 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); |
110 | 3 | if (_status.ok()) { |
111 | 3 | _status = reason; |
112 | 3 | } |
113 | 3 | _arrow_data_arrival.notify_all(); |
114 | 3 | for (auto& ctx : _waiting_rpc) { |
115 | 2 | ctx->on_failure(reason); |
116 | 2 | } |
117 | 3 | _waiting_rpc.clear(); |
118 | 3 | _update_dependency(); |
119 | 3 | _result_batch_queue.clear(); |
120 | 3 | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE6cancelERKNS_6StatusE Line | Count | Source | 107 | 1 | void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) { | 108 | 1 | std::unique_lock<std::mutex> l(_lock); | 109 | 1 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); | 110 | 1 | if (_status.ok()) { | 111 | 1 | _status = reason; | 112 | 1 | } | 113 | 1 | _arrow_data_arrival.notify_all(); | 114 | 1 | for (auto& ctx : _waiting_rpc) { | 115 | 1 | ctx->on_failure(reason); | 116 | 1 | } | 117 | 1 | _waiting_rpc.clear(); | 118 | 1 | _update_dependency(); | 119 | 1 | _result_batch_queue.clear(); | 120 | 1 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE6cancelERKNS_6StatusE Line | Count | Source | 107 | 2 | void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) { | 108 | 2 | std::unique_lock<std::mutex> l(_lock); | 109 | 2 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); | 110 | 2 | if (_status.ok()) { | 111 | 2 | _status = reason; | 112 | 2 | } | 113 | 2 | _arrow_data_arrival.notify_all(); | 114 | 2 | for (auto& ctx : _waiting_rpc) { | 115 | 1 | ctx->on_failure(reason); | 116 | 1 | } | 117 | 2 | _waiting_rpc.clear(); | 118 | 2 | _update_dependency(); | 119 | 2 | _result_batch_queue.clear(); | 120 | 2 | } |
|
121 | | |
122 | | template <typename ResultCtxType> |
123 | | void ResultBlockBuffer<ResultCtxType>::set_dependency( |
124 | 9 | const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) { |
125 | 9 | std::unique_lock<std::mutex> l(_lock); |
126 | 9 | _result_sink_dependencies[id] = result_sink_dependency; |
127 | 9 | _update_dependency(); |
128 | 9 | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE14set_dependencyERKNS_9TUniqueIdESt10shared_ptrINS_10DependencyEE Line | Count | Source | 124 | 5 | const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) { | 125 | 5 | std::unique_lock<std::mutex> l(_lock); | 126 | 5 | _result_sink_dependencies[id] = result_sink_dependency; | 127 | 5 | _update_dependency(); | 128 | 5 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE14set_dependencyERKNS_9TUniqueIdESt10shared_ptrINS_10DependencyEE Line | Count | Source | 124 | 4 | const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) { | 125 | 4 | std::unique_lock<std::mutex> l(_lock); | 126 | 4 | _result_sink_dependencies[id] = result_sink_dependency; | 127 | 4 | _update_dependency(); | 128 | 4 | } |
|
129 | | |
130 | | template <typename ResultCtxType> |
131 | 36 | void ResultBlockBuffer<ResultCtxType>::_update_dependency() { |
132 | 36 | if (!_status.ok()) { |
133 | 7 | for (auto it : _result_sink_dependencies) { |
134 | 6 | it.second->set_ready(); |
135 | 6 | } |
136 | 7 | return; |
137 | 7 | } |
138 | | |
139 | 29 | for (auto it : _result_sink_dependencies) { |
140 | 26 | if (_instance_rows[it.first] > _batch_size) { |
141 | 6 | it.second->block(); |
142 | 20 | } else { |
143 | 20 | it.second->set_ready(); |
144 | 20 | } |
145 | 26 | } |
146 | 29 | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE18_update_dependencyEv Line | Count | Source | 131 | 19 | void ResultBlockBuffer<ResultCtxType>::_update_dependency() { | 132 | 19 | if (!_status.ok()) { | 133 | 3 | for (auto it : _result_sink_dependencies) { | 134 | 3 | it.second->set_ready(); | 135 | 3 | } | 136 | 3 | return; | 137 | 3 | } | 138 | | | 139 | 16 | for (auto it : _result_sink_dependencies) { | 140 | 15 | if (_instance_rows[it.first] > _batch_size) { | 141 | 4 | it.second->block(); | 142 | 11 | } else { | 143 | 11 | it.second->set_ready(); | 144 | 11 | } | 145 | 15 | } | 146 | 16 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE18_update_dependencyEv Line | Count | Source | 131 | 17 | void ResultBlockBuffer<ResultCtxType>::_update_dependency() { | 132 | 17 | if (!_status.ok()) { | 133 | 4 | for (auto it : _result_sink_dependencies) { | 134 | 3 | it.second->set_ready(); | 135 | 3 | } | 136 | 4 | return; | 137 | 4 | } | 138 | | | 139 | 13 | for (auto it : _result_sink_dependencies) { | 140 | 11 | if (_instance_rows[it.first] > _batch_size) { | 141 | 2 | it.second->block(); | 142 | 9 | } else { | 143 | 9 | it.second->set_ready(); | 144 | 9 | } | 145 | 11 | } | 146 | 13 | } |
|
147 | | |
148 | | template <typename ResultCtxType> |
149 | 15 | Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) { |
150 | 15 | std::lock_guard<std::mutex> l(_lock); |
151 | 15 | SCOPED_ATTACH_TASK(_mem_tracker); |
152 | 15 | Defer defer {[&]() { _update_dependency(); }};_ZZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9get_batchESt10shared_ptrIS1_EENKUlvE_clEv Line | Count | Source | 152 | 8 | Defer defer {[&]() { _update_dependency(); }}; |
_ZZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9get_batchESt10shared_ptrIS1_EENKUlvE_clEv Line | Count | Source | 152 | 7 | Defer defer {[&]() { _update_dependency(); }}; |
|
153 | 15 | if (!_status.ok()) { |
154 | 2 | ctx->on_failure(_status); |
155 | 2 | return _status; |
156 | 2 | } |
157 | 13 | if (!_result_batch_queue.empty()) { |
158 | 3 | auto result = _result_batch_queue.front(); |
159 | 3 | _result_batch_queue.pop_front(); |
160 | 3 | for (auto it : _instance_rows_in_queue.front()) { |
161 | 3 | _instance_rows[it.first] -= it.second; |
162 | 3 | } |
163 | 3 | _instance_rows_in_queue.pop_front(); |
164 | 3 | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); |
165 | 3 | _packet_num++; |
166 | 3 | return Status::OK(); |
167 | 3 | } |
168 | 10 | if (_is_close) { |
169 | 2 | if (!_status.ok()) { |
170 | 0 | ctx->on_failure(_status); |
171 | 0 | return Status::OK(); |
172 | 0 | } |
173 | 2 | ctx->on_close(_packet_num, _returned_rows); |
174 | 2 | LOG(INFO) << fmt::format( |
175 | 2 | "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, " |
176 | 2 | "packet_num={}, peak_memory_usage={}", |
177 | 2 | print_id(_fragment_id), _is_close, !_status.ok(), _packet_num, |
178 | 2 | _mem_tracker->peak_consumption()); |
179 | 2 | return Status::OK(); |
180 | 2 | } |
181 | | // no ready data, push ctx to waiting list |
182 | 8 | _waiting_rpc.push_back(ctx); |
183 | 8 | return Status::OK(); |
184 | 10 | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9get_batchESt10shared_ptrIS1_E Line | Count | Source | 149 | 8 | Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) { | 150 | 8 | std::lock_guard<std::mutex> l(_lock); | 151 | 8 | SCOPED_ATTACH_TASK(_mem_tracker); | 152 | 8 | Defer defer {[&]() { _update_dependency(); }}; | 153 | 8 | if (!_status.ok()) { | 154 | 1 | ctx->on_failure(_status); | 155 | 1 | return _status; | 156 | 1 | } | 157 | 7 | if (!_result_batch_queue.empty()) { | 158 | 2 | auto result = _result_batch_queue.front(); | 159 | 2 | _result_batch_queue.pop_front(); | 160 | 2 | for (auto it : _instance_rows_in_queue.front()) { | 161 | 2 | _instance_rows[it.first] -= it.second; | 162 | 2 | } | 163 | 2 | _instance_rows_in_queue.pop_front(); | 164 | 2 | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); | 165 | 2 | _packet_num++; | 166 | 2 | return Status::OK(); | 167 | 2 | } | 168 | 5 | if (_is_close) { | 169 | 1 | if (!_status.ok()) { | 170 | 0 | ctx->on_failure(_status); | 171 | 0 | return Status::OK(); | 172 | 0 | } | 173 | 1 | ctx->on_close(_packet_num, _returned_rows); | 174 | 1 | LOG(INFO) << fmt::format( | 175 | 1 | "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, " | 176 | 1 | "packet_num={}, peak_memory_usage={}", | 177 | 1 | print_id(_fragment_id), _is_close, !_status.ok(), _packet_num, | 178 | 1 | _mem_tracker->peak_consumption()); | 179 | 1 | return Status::OK(); | 180 | 1 | } | 181 | | // no ready data, push ctx to waiting list | 182 | 4 | _waiting_rpc.push_back(ctx); | 183 | 4 | return Status::OK(); | 184 | 5 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9get_batchESt10shared_ptrIS1_E Line | Count | Source | 149 | 7 | Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) { | 150 | 7 | std::lock_guard<std::mutex> l(_lock); | 151 | 7 | SCOPED_ATTACH_TASK(_mem_tracker); | 152 | 7 | Defer defer {[&]() { _update_dependency(); }}; | 153 | 7 | if (!_status.ok()) { | 154 | 1 | ctx->on_failure(_status); | 155 | 1 | return _status; | 156 | 1 | } | 157 | 6 | if (!_result_batch_queue.empty()) { | 158 | 1 | auto result = _result_batch_queue.front(); | 159 | 1 | _result_batch_queue.pop_front(); | 160 | 1 | for (auto it : _instance_rows_in_queue.front()) { | 161 | 1 | _instance_rows[it.first] -= it.second; | 162 | 1 | } | 163 | 1 | _instance_rows_in_queue.pop_front(); | 164 | 1 | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); | 165 | 1 | _packet_num++; | 166 | 1 | return Status::OK(); | 167 | 1 | } | 168 | 5 | if (_is_close) { | 169 | 1 | if (!_status.ok()) { | 170 | 0 | ctx->on_failure(_status); | 171 | 0 | return Status::OK(); | 172 | 0 | } | 173 | 1 | ctx->on_close(_packet_num, _returned_rows); | 174 | 1 | LOG(INFO) << fmt::format( | 175 | 1 | "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, " | 176 | 1 | "packet_num={}, peak_memory_usage={}", | 177 | 1 | print_id(_fragment_id), _is_close, !_status.ok(), _packet_num, | 178 | 1 | _mem_tracker->peak_consumption()); | 179 | 1 | return Status::OK(); | 180 | 1 | } | 181 | | // no ready data, push ctx to waiting list | 182 | 4 | _waiting_rpc.push_back(ctx); | 183 | 4 | return Status::OK(); | 184 | 5 | } |
|
185 | | |
186 | | template <typename ResultCtxType> |
187 | | Status ResultBlockBuffer<ResultCtxType>::add_batch(RuntimeState* state, |
188 | 11 | std::shared_ptr<InBlockType>& result) { |
189 | 11 | std::unique_lock<std::mutex> l(_lock); |
190 | | |
191 | 11 | if (!_status.ok()) { |
192 | 2 | return _status; |
193 | 2 | } |
194 | | |
195 | 9 | if (_waiting_rpc.empty()) { |
196 | 7 | auto sz = 0; |
197 | 7 | auto num_rows = 0; |
198 | 7 | size_t batch_size = 0; |
199 | 7 | if constexpr (std::is_same_v<InBlockType, Block>) { |
200 | 4 | num_rows = cast_set<int>(result->rows()); |
201 | 4 | batch_size = result->bytes(); |
202 | 4 | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { |
203 | 3 | num_rows = cast_set<int>(result->result_batch.rows.size()); |
204 | 11 | for (const auto& row : result->result_batch.rows) { |
205 | 11 | batch_size += row.size(); |
206 | 11 | } |
207 | 3 | } |
208 | 7 | if (!_result_batch_queue.empty()) { |
209 | 3 | if constexpr (std::is_same_v<InBlockType, Block>) { |
210 | 2 | sz = cast_set<int>(_result_batch_queue.back()->rows()); |
211 | 2 | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { |
212 | 1 | sz = cast_set<int>(_result_batch_queue.back()->result_batch.rows.size()); |
213 | 1 | } |
214 | 3 | if (sz + num_rows < _buffer_limit && |
215 | 3 | (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) { |
216 | 3 | if constexpr (std::is_same_v<InBlockType, Block>) { |
217 | 2 | auto last_block = _result_batch_queue.back(); |
218 | 4 | for (size_t i = 0; i < last_block->columns(); i++) { |
219 | 2 | last_block->mutate_columns()[i]->insert_range_from( |
220 | 2 | *result->get_by_position(i).column, 0, num_rows); |
221 | 2 | } |
222 | 2 | } else { |
223 | 1 | std::vector<std::string>& back_rows = |
224 | 1 | _result_batch_queue.back()->result_batch.rows; |
225 | 1 | std::vector<std::string>& result_rows = result->result_batch.rows; |
226 | 1 | back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()), |
227 | 1 | std::make_move_iterator(result_rows.end())); |
228 | 1 | } |
229 | 3 | _last_batch_bytes += batch_size; |
230 | 3 | } else { |
231 | 0 | _instance_rows_in_queue.emplace_back(); |
232 | 0 | _result_batch_queue.push_back(std::move(result)); |
233 | 0 | _last_batch_bytes = batch_size; |
234 | 0 | _arrow_data_arrival |
235 | 0 | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) |
236 | 0 | } |
237 | 4 | } else { |
238 | 4 | _instance_rows_in_queue.emplace_back(); |
239 | 4 | _result_batch_queue.push_back(std::move(result)); |
240 | 4 | _last_batch_bytes = batch_size; |
241 | 4 | _arrow_data_arrival |
242 | 4 | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) |
243 | 4 | } |
244 | 7 | _instance_rows[state->fragment_instance_id()] += num_rows; |
245 | 7 | _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; |
246 | 7 | } else { |
247 | 2 | auto ctx = _waiting_rpc.front(); |
248 | 2 | _waiting_rpc.pop_front(); |
249 | 2 | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); |
250 | 2 | _packet_num++; |
251 | 2 | } |
252 | | |
253 | 9 | _update_dependency(); |
254 | 9 | return Status::OK(); |
255 | 9 | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9add_batchEPNS_12RuntimeStateERSt10shared_ptrINS_5BlockEE Line | Count | Source | 188 | 6 | std::shared_ptr<InBlockType>& result) { | 189 | 6 | std::unique_lock<std::mutex> l(_lock); | 190 | | | 191 | 6 | if (!_status.ok()) { | 192 | 1 | return _status; | 193 | 1 | } | 194 | | | 195 | 5 | if (_waiting_rpc.empty()) { | 196 | 4 | auto sz = 0; | 197 | 4 | auto num_rows = 0; | 198 | 4 | size_t batch_size = 0; | 199 | 4 | if constexpr (std::is_same_v<InBlockType, Block>) { | 200 | 4 | num_rows = cast_set<int>(result->rows()); | 201 | 4 | batch_size = result->bytes(); | 202 | | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { | 203 | | num_rows = cast_set<int>(result->result_batch.rows.size()); | 204 | | for (const auto& row : result->result_batch.rows) { | 205 | | batch_size += row.size(); | 206 | | } | 207 | | } | 208 | 4 | if (!_result_batch_queue.empty()) { | 209 | 2 | if constexpr (std::is_same_v<InBlockType, Block>) { | 210 | 2 | sz = cast_set<int>(_result_batch_queue.back()->rows()); | 211 | | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { | 212 | | sz = cast_set<int>(_result_batch_queue.back()->result_batch.rows.size()); | 213 | | } | 214 | 2 | if (sz + num_rows < _buffer_limit && | 215 | 2 | (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) { | 216 | 2 | if constexpr (std::is_same_v<InBlockType, Block>) { | 217 | 2 | auto last_block = _result_batch_queue.back(); | 218 | 4 | for (size_t i = 0; i < last_block->columns(); i++) { | 219 | 2 | last_block->mutate_columns()[i]->insert_range_from( | 220 | 2 | *result->get_by_position(i).column, 0, num_rows); | 221 | 2 | } | 222 | | } else { | 223 | | std::vector<std::string>& back_rows = | 224 | | _result_batch_queue.back()->result_batch.rows; | 225 | | std::vector<std::string>& result_rows = result->result_batch.rows; | 226 | | back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()), | 227 | | std::make_move_iterator(result_rows.end())); | 228 | | } | 229 | 2 | _last_batch_bytes += batch_size; | 230 | 2 | } else { | 231 | 0 | _instance_rows_in_queue.emplace_back(); | 232 | 0 | _result_batch_queue.push_back(std::move(result)); | 233 | 0 | _last_batch_bytes = batch_size; | 234 | 0 | _arrow_data_arrival | 235 | 0 | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) | 236 | 0 | } | 237 | 2 | } else { | 238 | 2 | _instance_rows_in_queue.emplace_back(); | 239 | 2 | _result_batch_queue.push_back(std::move(result)); | 240 | 2 | _last_batch_bytes = batch_size; | 241 | 2 | _arrow_data_arrival | 242 | 2 | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) | 243 | 2 | } | 244 | 4 | _instance_rows[state->fragment_instance_id()] += num_rows; | 245 | 4 | _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; | 246 | 4 | } else { | 247 | 1 | auto ctx = _waiting_rpc.front(); | 248 | 1 | _waiting_rpc.pop_front(); | 249 | 1 | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); | 250 | 1 | _packet_num++; | 251 | 1 | } | 252 | | | 253 | 5 | _update_dependency(); | 254 | 5 | return Status::OK(); | 255 | 5 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9add_batchEPNS_12RuntimeStateERSt10shared_ptrINS_16TFetchDataResultEE Line | Count | Source | 188 | 5 | std::shared_ptr<InBlockType>& result) { | 189 | 5 | std::unique_lock<std::mutex> l(_lock); | 190 | | | 191 | 5 | if (!_status.ok()) { | 192 | 1 | return _status; | 193 | 1 | } | 194 | | | 195 | 4 | if (_waiting_rpc.empty()) { | 196 | 3 | auto sz = 0; | 197 | 3 | auto num_rows = 0; | 198 | 3 | size_t batch_size = 0; | 199 | | if constexpr (std::is_same_v<InBlockType, Block>) { | 200 | | num_rows = cast_set<int>(result->rows()); | 201 | | batch_size = result->bytes(); | 202 | 3 | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { | 203 | 3 | num_rows = cast_set<int>(result->result_batch.rows.size()); | 204 | 11 | for (const auto& row : result->result_batch.rows) { | 205 | 11 | batch_size += row.size(); | 206 | 11 | } | 207 | 3 | } | 208 | 3 | if (!_result_batch_queue.empty()) { | 209 | | if constexpr (std::is_same_v<InBlockType, Block>) { | 210 | | sz = cast_set<int>(_result_batch_queue.back()->rows()); | 211 | 1 | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { | 212 | 1 | sz = cast_set<int>(_result_batch_queue.back()->result_batch.rows.size()); | 213 | 1 | } | 214 | 1 | if (sz + num_rows < _buffer_limit && | 215 | 1 | (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) { | 216 | | if constexpr (std::is_same_v<InBlockType, Block>) { | 217 | | auto last_block = _result_batch_queue.back(); | 218 | | for (size_t i = 0; i < last_block->columns(); i++) { | 219 | | last_block->mutate_columns()[i]->insert_range_from( | 220 | | *result->get_by_position(i).column, 0, num_rows); | 221 | | } | 222 | 1 | } else { | 223 | 1 | std::vector<std::string>& back_rows = | 224 | 1 | _result_batch_queue.back()->result_batch.rows; | 225 | 1 | std::vector<std::string>& result_rows = result->result_batch.rows; | 226 | 1 | back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()), | 227 | 1 | std::make_move_iterator(result_rows.end())); | 228 | 1 | } | 229 | 1 | _last_batch_bytes += batch_size; | 230 | 1 | } else { | 231 | 0 | _instance_rows_in_queue.emplace_back(); | 232 | 0 | _result_batch_queue.push_back(std::move(result)); | 233 | 0 | _last_batch_bytes = batch_size; | 234 | 0 | _arrow_data_arrival | 235 | 0 | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) | 236 | 0 | } | 237 | 2 | } else { | 238 | 2 | _instance_rows_in_queue.emplace_back(); | 239 | 2 | _result_batch_queue.push_back(std::move(result)); | 240 | 2 | _last_batch_bytes = batch_size; | 241 | 2 | _arrow_data_arrival | 242 | 2 | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) | 243 | 2 | } | 244 | 3 | _instance_rows[state->fragment_instance_id()] += num_rows; | 245 | 3 | _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; | 246 | 3 | } else { | 247 | 1 | auto ctx = _waiting_rpc.front(); | 248 | 1 | _waiting_rpc.pop_front(); | 249 | 1 | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); | 250 | 1 | _packet_num++; | 251 | 1 | } | 252 | | | 253 | 4 | _update_dependency(); | 254 | 4 | return Status::OK(); | 255 | 4 | } |
|
256 | | |
257 | | template class ResultBlockBuffer<GetArrowResultBatchCtx>; |
258 | | template class ResultBlockBuffer<GetResultBatchCtx>; |
259 | | |
260 | | #include "common/compile_check_end.h" |
261 | | } // namespace doris |