be/src/runtime/result_block_buffer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/result_block_buffer.h" |
19 | | |
20 | | #include <gen_cpp/Data_types.h> |
21 | | #include <gen_cpp/PaloInternalService_types.h> |
22 | | #include <gen_cpp/internal_service.pb.h> |
23 | | #include <glog/logging.h> |
24 | | #include <google/protobuf/stubs/callback.h> |
25 | | // IWYU pragma: no_include <bits/chrono.h> |
26 | | #include <chrono> // IWYU pragma: keep |
27 | | #include <limits> |
28 | | #include <ostream> |
29 | | #include <string> |
30 | | #include <utility> |
31 | | #include <vector> |
32 | | |
33 | | #include "arrow/type_fwd.h" |
34 | | #include "common/config.h" |
35 | | #include "core/block/block.h" |
36 | | #include "exec/pipeline/dependency.h" |
37 | | #include "exec/sink/writer/varrow_flight_result_writer.h" |
38 | | #include "exec/sink/writer/vmysql_result_writer.h" |
39 | | #include "runtime/runtime_profile.h" |
40 | | #include "runtime/thread_context.h" |
41 | | #include "util/thrift_util.h" |
42 | | |
43 | | namespace doris { |
44 | | #include "common/compile_check_begin.h" |
45 | | |
template <typename ResultCtxType>
/// Builds a result buffer for one fragment. Snapshots the per-query settings
/// (batch size, timezone, exec version, compression codec) from `state` so the
/// buffer stays usable after the fragment's RuntimeState is gone.
/// @param id          fragment instance id; taken by value and moved.
/// @param state       source of the query-level settings (not retained).
/// @param buffer_size upper bound on buffered rows (stored in _buffer_limit).
ResultBlockBuffer<ResultCtxType>::ResultBlockBuffer(TUniqueId id, RuntimeState* state,
                                                    int buffer_size)
        : _fragment_id(std::move(id)),
          _is_close(false),
          _batch_size(state->batch_size()),
          _timezone(state->timezone()),
          _be_exec_version(state->be_exec_version()),
          // NOTE: "fragement" is the (misspelled) name of the RuntimeState accessor.
          _fragment_transmission_compression_type(state->fragement_transmission_compression_type()),
          _buffer_limit(buffer_size) {
    // Dedicated QUERY-type tracker so this buffer's memory shows up under its
    // own fragment-instance label.
    _mem_tracker = MemTrackerLimiter::create_shared(
            MemTrackerLimiter::Type::QUERY,
            fmt::format("ResultBlockBuffer#FragmentInstanceId={}", print_id(_fragment_id)));
}
60 | | |
61 | | template <typename ResultCtxType> |
62 | | Status ResultBlockBuffer<ResultCtxType>::close(const TUniqueId& id, Status exec_status, |
63 | 359k | int64_t num_rows) { |
64 | 359k | std::unique_lock<std::mutex> l(_lock); |
65 | 359k | _returned_rows.fetch_add(num_rows); |
66 | | // close will be called multiple times and error status needs to be collected. |
67 | 359k | if (!exec_status.ok()) { |
68 | 1.24k | _status = exec_status; |
69 | 1.24k | } |
70 | | |
71 | 359k | auto it = _result_sink_dependencies.find(id); |
72 | 359k | if (it != _result_sink_dependencies.end()) { |
73 | 359k | it->second->set_always_ready(); |
74 | 359k | _result_sink_dependencies.erase(it); |
75 | 18.4E | } else { |
76 | 18.4E | _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer", |
77 | 18.4E | print_id(id)); |
78 | 18.4E | } |
79 | 359k | if (!_result_sink_dependencies.empty()) { |
80 | 180k | return _status; |
81 | 180k | } |
82 | | |
83 | 179k | _is_close = true; |
84 | 179k | _arrow_data_arrival.notify_all(); |
85 | | |
86 | 179k | if (!_waiting_rpc.empty()) { |
87 | 102k | if (_status.ok()) { |
88 | 101k | for (auto& ctx : _waiting_rpc) { |
89 | 101k | ctx->on_close(_packet_num, _returned_rows); |
90 | 101k | } |
91 | 101k | } else { |
92 | 1.23k | for (auto& ctx : _waiting_rpc) { |
93 | 1.20k | ctx->on_failure(_status); |
94 | 1.20k | } |
95 | 1.23k | } |
96 | 102k | _waiting_rpc.clear(); |
97 | 102k | } |
98 | | |
99 | 179k | return _status; |
100 | 359k | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE5closeERKNS_9TUniqueIdENS_6StatusEl Line | Count | Source | 63 | 6 | int64_t num_rows) { | 64 | 6 | std::unique_lock<std::mutex> l(_lock); | 65 | 6 | _returned_rows.fetch_add(num_rows); | 66 | | // close will be called multiple times and error status needs to be collected. | 67 | 6 | if (!exec_status.ok()) { | 68 | 2 | _status = exec_status; | 69 | 2 | } | 70 | | | 71 | 6 | auto it = _result_sink_dependencies.find(id); | 72 | 6 | if (it != _result_sink_dependencies.end()) { | 73 | 5 | it->second->set_always_ready(); | 74 | 5 | _result_sink_dependencies.erase(it); | 75 | 5 | } else { | 76 | 1 | _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer", | 77 | 1 | print_id(id)); | 78 | 1 | } | 79 | 6 | if (!_result_sink_dependencies.empty()) { | 80 | 1 | return _status; | 81 | 1 | } | 82 | | | 83 | 5 | _is_close = true; | 84 | 5 | _arrow_data_arrival.notify_all(); | 85 | | | 86 | 5 | if (!_waiting_rpc.empty()) { | 87 | 2 | if (_status.ok()) { | 88 | 1 | for (auto& ctx : _waiting_rpc) { | 89 | 1 | ctx->on_close(_packet_num, _returned_rows); | 90 | 1 | } | 91 | 1 | } else { | 92 | 1 | for (auto& ctx : _waiting_rpc) { | 93 | 1 | ctx->on_failure(_status); | 94 | 1 | } | 95 | 1 | } | 96 | 2 | _waiting_rpc.clear(); | 97 | 2 | } | 98 | | | 99 | 5 | return _status; | 100 | 6 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE5closeERKNS_9TUniqueIdENS_6StatusEl Line | Count | Source | 63 | 359k | int64_t num_rows) { | 64 | 359k | std::unique_lock<std::mutex> l(_lock); | 65 | 359k | _returned_rows.fetch_add(num_rows); | 66 | | // close will be called multiple times and error status needs to be collected. | 67 | 359k | if (!exec_status.ok()) { | 68 | 1.24k | _status = exec_status; | 69 | 1.24k | } | 70 | | | 71 | 359k | auto it = _result_sink_dependencies.find(id); | 72 | 359k | if (it != _result_sink_dependencies.end()) { | 73 | 359k | it->second->set_always_ready(); | 74 | 359k | _result_sink_dependencies.erase(it); | 75 | 18.4E | } else { | 76 | 18.4E | _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer", | 77 | 18.4E | print_id(id)); | 78 | 18.4E | } | 79 | 359k | if (!_result_sink_dependencies.empty()) { | 80 | 180k | return _status; | 81 | 180k | } | 82 | | | 83 | 179k | _is_close = true; | 84 | 179k | _arrow_data_arrival.notify_all(); | 85 | | | 86 | 179k | if (!_waiting_rpc.empty()) { | 87 | 102k | if (_status.ok()) { | 88 | 101k | for (auto& ctx : _waiting_rpc) { | 89 | 101k | ctx->on_close(_packet_num, _returned_rows); | 90 | 101k | } | 91 | 101k | } else { | 92 | 1.23k | for (auto& ctx : _waiting_rpc) { | 93 | 1.20k | ctx->on_failure(_status); | 94 | 1.20k | } | 95 | 1.23k | } | 96 | 102k | _waiting_rpc.clear(); | 97 | 102k | } | 98 | | | 99 | 179k | return _status; | 100 | 359k | } |
|
101 | | |
102 | | template <typename ResultCtxType> |
103 | 44.7k | void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) { |
104 | 44.7k | std::unique_lock<std::mutex> l(_lock); |
105 | 44.7k | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); |
106 | 44.7k | if (_status.ok()) { |
107 | 44.4k | _status = reason; |
108 | 44.4k | } |
109 | 44.7k | _arrow_data_arrival.notify_all(); |
110 | 44.7k | for (auto& ctx : _waiting_rpc) { |
111 | 2 | ctx->on_failure(reason); |
112 | 2 | } |
113 | 44.7k | _waiting_rpc.clear(); |
114 | 44.7k | _update_dependency(); |
115 | 44.7k | _result_batch_queue.clear(); |
116 | 44.7k | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE6cancelERKNS_6StatusE Line | Count | Source | 103 | 4 | void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) { | 104 | 4 | std::unique_lock<std::mutex> l(_lock); | 105 | 4 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); | 106 | 4 | if (_status.ok()) { | 107 | 4 | _status = reason; | 108 | 4 | } | 109 | 4 | _arrow_data_arrival.notify_all(); | 110 | 4 | for (auto& ctx : _waiting_rpc) { | 111 | 1 | ctx->on_failure(reason); | 112 | 1 | } | 113 | 4 | _waiting_rpc.clear(); | 114 | 4 | _update_dependency(); | 115 | 4 | _result_batch_queue.clear(); | 116 | 4 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE6cancelERKNS_6StatusE Line | Count | Source | 103 | 44.7k | void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) { | 104 | 44.7k | std::unique_lock<std::mutex> l(_lock); | 105 | 44.7k | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); | 106 | 44.7k | if (_status.ok()) { | 107 | 44.4k | _status = reason; | 108 | 44.4k | } | 109 | 44.7k | _arrow_data_arrival.notify_all(); | 110 | 44.7k | for (auto& ctx : _waiting_rpc) { | 111 | 1 | ctx->on_failure(reason); | 112 | 1 | } | 113 | 44.7k | _waiting_rpc.clear(); | 114 | 44.7k | _update_dependency(); | 115 | 44.7k | _result_batch_queue.clear(); | 116 | 44.7k | } |
|
117 | | |
118 | | template <typename ResultCtxType> |
119 | | void ResultBlockBuffer<ResultCtxType>::set_dependency( |
120 | 357k | const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) { |
121 | 357k | std::unique_lock<std::mutex> l(_lock); |
122 | 357k | _result_sink_dependencies[id] = result_sink_dependency; |
123 | 357k | _update_dependency(); |
124 | 357k | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE14set_dependencyERKNS_9TUniqueIdESt10shared_ptrINS_10DependencyEE Line | Count | Source | 120 | 8 | const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) { | 121 | 8 | std::unique_lock<std::mutex> l(_lock); | 122 | 8 | _result_sink_dependencies[id] = result_sink_dependency; | 123 | 8 | _update_dependency(); | 124 | 8 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE14set_dependencyERKNS_9TUniqueIdESt10shared_ptrINS_10DependencyEE Line | Count | Source | 120 | 357k | const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) { | 121 | 357k | std::unique_lock<std::mutex> l(_lock); | 122 | 357k | _result_sink_dependencies[id] = result_sink_dependency; | 123 | 357k | _update_dependency(); | 124 | 357k | } |
|
125 | | |
126 | | template <typename ResultCtxType> |
127 | 815k | void ResultBlockBuffer<ResultCtxType>::_update_dependency() { |
128 | 815k | if (!_status.ok()) { |
129 | 44.7k | for (auto it : _result_sink_dependencies) { |
130 | 6 | it.second->set_ready(); |
131 | 6 | } |
132 | 44.7k | return; |
133 | 44.7k | } |
134 | | |
135 | 2.00M | for (auto it : _result_sink_dependencies) { |
136 | 2.00M | if (_instance_rows[it.first] > _batch_size) { |
137 | 8 | it.second->block(); |
138 | 2.00M | } else { |
139 | 2.00M | it.second->set_ready(); |
140 | 2.00M | } |
141 | 2.00M | } |
142 | 771k | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE18_update_dependencyEv Line | Count | Source | 127 | 36 | void ResultBlockBuffer<ResultCtxType>::_update_dependency() { | 128 | 36 | if (!_status.ok()) { | 129 | 6 | for (auto it : _result_sink_dependencies) { | 130 | 3 | it.second->set_ready(); | 131 | 3 | } | 132 | 6 | return; | 133 | 6 | } | 134 | | | 135 | 30 | for (auto it : _result_sink_dependencies) { | 136 | 24 | if (_instance_rows[it.first] > _batch_size) { | 137 | 4 | it.second->block(); | 138 | 20 | } else { | 139 | 20 | it.second->set_ready(); | 140 | 20 | } | 141 | 24 | } | 142 | 30 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE18_update_dependencyEv Line | Count | Source | 127 | 815k | void ResultBlockBuffer<ResultCtxType>::_update_dependency() { | 128 | 815k | if (!_status.ok()) { | 129 | 44.7k | for (auto it : _result_sink_dependencies) { | 130 | 3 | it.second->set_ready(); | 131 | 3 | } | 132 | 44.7k | return; | 133 | 44.7k | } | 134 | | | 135 | 2.00M | for (auto it : _result_sink_dependencies) { | 136 | 2.00M | if (_instance_rows[it.first] > _batch_size) { | 137 | 4 | it.second->block(); | 138 | 2.00M | } else { | 139 | 2.00M | it.second->set_ready(); | 140 | 2.00M | } | 141 | 2.00M | } | 142 | 771k | } |
|
143 | | |
144 | | template <typename ResultCtxType> |
145 | 295k | Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) { |
146 | 295k | std::lock_guard<std::mutex> l(_lock); |
147 | 295k | SCOPED_ATTACH_TASK(_mem_tracker); |
148 | 295k | Defer defer {[&]() { _update_dependency(); }};_ZZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9get_batchESt10shared_ptrIS1_EENKUlvE_clEv Line | Count | Source | 148 | 8 | Defer defer {[&]() { _update_dependency(); }}; |
_ZZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9get_batchESt10shared_ptrIS1_EENKUlvE_clEv Line | Count | Source | 148 | 295k | Defer defer {[&]() { _update_dependency(); }}; |
|
149 | 295k | if (!_status.ok()) { |
150 | 11 | ctx->on_failure(_status); |
151 | 11 | return _status; |
152 | 11 | } |
153 | 295k | if (!_result_batch_queue.empty()) { |
154 | 6.59k | auto result = _result_batch_queue.front(); |
155 | 6.59k | _result_batch_queue.pop_front(); |
156 | 6.69k | for (auto it : _instance_rows_in_queue.front()) { |
157 | 6.69k | _instance_rows[it.first] -= it.second; |
158 | 6.69k | } |
159 | 6.59k | _instance_rows_in_queue.pop_front(); |
160 | 6.59k | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); |
161 | 6.59k | _packet_num++; |
162 | 6.59k | return Status::OK(); |
163 | 6.59k | } |
164 | 288k | if (_is_close) { |
165 | 76.3k | if (!_status.ok()) { |
166 | 0 | ctx->on_failure(_status); |
167 | 0 | return Status::OK(); |
168 | 0 | } |
169 | 76.3k | ctx->on_close(_packet_num, _returned_rows); |
170 | 76.3k | LOG(INFO) << fmt::format( |
171 | 76.3k | "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, " |
172 | 76.3k | "packet_num={}, peak_memory_usage={}", |
173 | 76.3k | print_id(_fragment_id), _is_close, !_status.ok(), _packet_num, |
174 | 76.3k | _mem_tracker->peak_consumption()); |
175 | 76.3k | return Status::OK(); |
176 | 76.3k | } |
177 | | // no ready data, push ctx to waiting list |
178 | 212k | _waiting_rpc.push_back(ctx); |
179 | 212k | return Status::OK(); |
180 | 288k | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9get_batchESt10shared_ptrIS1_E Line | Count | Source | 145 | 8 | Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) { | 146 | 8 | std::lock_guard<std::mutex> l(_lock); | 147 | 8 | SCOPED_ATTACH_TASK(_mem_tracker); | 148 | 8 | Defer defer {[&]() { _update_dependency(); }}; | 149 | 8 | if (!_status.ok()) { | 150 | 1 | ctx->on_failure(_status); | 151 | 1 | return _status; | 152 | 1 | } | 153 | 7 | if (!_result_batch_queue.empty()) { | 154 | 2 | auto result = _result_batch_queue.front(); | 155 | 2 | _result_batch_queue.pop_front(); | 156 | 2 | for (auto it : _instance_rows_in_queue.front()) { | 157 | 2 | _instance_rows[it.first] -= it.second; | 158 | 2 | } | 159 | 2 | _instance_rows_in_queue.pop_front(); | 160 | 2 | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); | 161 | 2 | _packet_num++; | 162 | 2 | return Status::OK(); | 163 | 2 | } | 164 | 5 | if (_is_close) { | 165 | 1 | if (!_status.ok()) { | 166 | 0 | ctx->on_failure(_status); | 167 | 0 | return Status::OK(); | 168 | 0 | } | 169 | 1 | ctx->on_close(_packet_num, _returned_rows); | 170 | 1 | LOG(INFO) << fmt::format( | 171 | 1 | "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, " | 172 | 1 | "packet_num={}, peak_memory_usage={}", | 173 | 1 | print_id(_fragment_id), _is_close, !_status.ok(), _packet_num, | 174 | 1 | _mem_tracker->peak_consumption()); | 175 | 1 | return Status::OK(); | 176 | 1 | } | 177 | | // no ready data, push ctx to waiting list | 178 | 4 | _waiting_rpc.push_back(ctx); | 179 | 4 | return Status::OK(); | 180 | 5 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9get_batchESt10shared_ptrIS1_E Line | Count | Source | 145 | 295k | Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) { | 146 | 295k | std::lock_guard<std::mutex> l(_lock); | 147 | 295k | SCOPED_ATTACH_TASK(_mem_tracker); | 148 | 295k | Defer defer {[&]() { _update_dependency(); }}; | 149 | 295k | if (!_status.ok()) { | 150 | 10 | ctx->on_failure(_status); | 151 | 10 | return _status; | 152 | 10 | } | 153 | 295k | if (!_result_batch_queue.empty()) { | 154 | 6.58k | auto result = _result_batch_queue.front(); | 155 | 6.58k | _result_batch_queue.pop_front(); | 156 | 6.69k | for (auto it : _instance_rows_in_queue.front()) { | 157 | 6.69k | _instance_rows[it.first] -= it.second; | 158 | 6.69k | } | 159 | 6.58k | _instance_rows_in_queue.pop_front(); | 160 | 6.58k | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); | 161 | 6.58k | _packet_num++; | 162 | 6.58k | return Status::OK(); | 163 | 6.58k | } | 164 | 288k | if (_is_close) { | 165 | 76.3k | if (!_status.ok()) { | 166 | 0 | ctx->on_failure(_status); | 167 | 0 | return Status::OK(); | 168 | 0 | } | 169 | 76.3k | ctx->on_close(_packet_num, _returned_rows); | 170 | 76.3k | LOG(INFO) << fmt::format( | 171 | 76.3k | "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, " | 172 | 76.3k | "packet_num={}, peak_memory_usage={}", | 173 | 76.3k | print_id(_fragment_id), _is_close, !_status.ok(), _packet_num, | 174 | 76.3k | _mem_tracker->peak_consumption()); | 175 | 76.3k | return Status::OK(); | 176 | 76.3k | } | 177 | | // no ready data, push ctx to waiting list | 178 | 212k | _waiting_rpc.push_back(ctx); | 179 | 212k | return Status::OK(); | 180 | 288k | } |
|
181 | | |
template <typename ResultCtxType>
/// Producer-side entry: offers one batch from a sink instance. If an RPC is
/// already waiting, the batch is delivered to it directly (no buffering).
/// Otherwise the batch is either merged into the tail of the queue — when the
/// combined row count stays under _buffer_limit and the combined byte size
/// stays under thrift_max_message_size — or appended as a new queue entry.
/// InBlockType is Block for the arrow path and TFetchDataResult for the mysql
/// path; the if-constexpr branches select the matching size/merge logic.
/// NOTE: `result` is consumed (moved from) only on the append paths.
Status ResultBlockBuffer<ResultCtxType>::add_batch(RuntimeState* state,
                                                   std::shared_ptr<InBlockType>& result) {
    std::unique_lock<std::mutex> l(_lock);

    // Buffer already failed/cancelled: reject the batch with the stored status.
    if (!_status.ok()) {
        return _status;
    }

    if (_waiting_rpc.empty()) {
        auto sz = 0;              // rows already in the tail queue entry
        auto num_rows = 0;        // rows in the incoming batch
        size_t batch_size = 0;    // byte size of the incoming batch
        if constexpr (std::is_same_v<InBlockType, Block>) {
            num_rows = cast_set<int>(result->rows());
            batch_size = result->bytes();
        } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
            num_rows = cast_set<int>(result->result_batch.rows.size());
            for (const auto& row : result->result_batch.rows) {
                batch_size += row.size();
            }
        }
        if (!_result_batch_queue.empty()) {
            if constexpr (std::is_same_v<InBlockType, Block>) {
                sz = cast_set<int>(_result_batch_queue.back()->rows());
            } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) {
                sz = cast_set<int>(_result_batch_queue.back()->result_batch.rows.size());
            }
            // Merge into the tail entry only if both the row budget and the
            // thrift message-size budget allow it.
            if (sz + num_rows < _buffer_limit &&
                (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) {
                if constexpr (std::is_same_v<InBlockType, Block>) {
                    // Column-wise append of the incoming block onto the tail block.
                    auto last_block = _result_batch_queue.back();
                    for (size_t i = 0; i < last_block->columns(); i++) {
                        last_block->mutate_columns()[i]->insert_range_from(
                                *result->get_by_position(i).column, 0, num_rows);
                    }
                } else {
                    // Move the serialized row strings onto the tail entry.
                    std::vector<std::string>& back_rows =
                            _result_batch_queue.back()->result_batch.rows;
                    std::vector<std::string>& result_rows = result->result_batch.rows;
                    back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()),
                                     std::make_move_iterator(result_rows.end()));
                }
                _last_batch_bytes += batch_size;
            } else {
                // Budgets exceeded: start a fresh queue entry.
                _instance_rows_in_queue.emplace_back();
                _result_batch_queue.push_back(std::move(result));
                _last_batch_bytes = batch_size;
                _arrow_data_arrival
                        .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,)
            }
        } else {
            // Empty queue: this batch becomes the first entry.
            _instance_rows_in_queue.emplace_back();
            _result_batch_queue.push_back(std::move(result));
            _last_batch_bytes = batch_size;
            _arrow_data_arrival
                    .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,)
        }
        // Per-instance accounting drives the back-pressure in _update_dependency().
        _instance_rows[state->fragment_instance_id()] += num_rows;
        _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows;
    } else {
        // A consumer is already waiting: hand the batch straight to its RPC.
        auto ctx = _waiting_rpc.front();
        _waiting_rpc.pop_front();
        RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this));
        _packet_num++;
    }

    _update_dependency();
    return Status::OK();
}
252 | | |
// Explicit instantiations for the two result-context types this file serves
// (arrow-flight and mysql result paths, per the writer headers included above).
template class ResultBlockBuffer<GetArrowResultBatchCtx>;
template class ResultBlockBuffer<GetResultBatchCtx>;
255 | | |
256 | | #include "common/compile_check_end.h" |
257 | | } // namespace doris |