be/src/runtime/result_block_buffer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/result_block_buffer.h" |
19 | | |
20 | | #include <gen_cpp/Data_types.h> |
21 | | #include <gen_cpp/PaloInternalService_types.h> |
22 | | #include <gen_cpp/internal_service.pb.h> |
23 | | #include <glog/logging.h> |
24 | | #include <google/protobuf/stubs/callback.h> |
25 | | // IWYU pragma: no_include <bits/chrono.h> |
26 | | #include <chrono> // IWYU pragma: keep |
27 | | #include <limits> |
28 | | #include <ostream> |
29 | | #include <string> |
30 | | #include <utility> |
31 | | #include <vector> |
32 | | |
33 | | #include "arrow/type_fwd.h" |
34 | | #include "common/config.h" |
35 | | #include "core/block/block.h" |
36 | | #include "exec/pipeline/dependency.h" |
37 | | #include "exec/sink/writer/varrow_flight_result_writer.h" |
38 | | #include "exec/sink/writer/vmysql_result_writer.h" |
39 | | #include "runtime/runtime_profile.h" |
40 | | #include "runtime/thread_context.h" |
41 | | #include "util/thrift_util.h" |
42 | | |
43 | | namespace doris { |
44 | | #include "common/compile_check_begin.h" |
45 | | |
46 | | template <typename ResultCtxType> |
47 | | ResultBlockBuffer<ResultCtxType>::ResultBlockBuffer(TUniqueId id, RuntimeState* state, |
48 | | int buffer_size) |
49 | 257k | : _fragment_id(std::move(id)), |
50 | 257k | _is_close(false), |
51 | 257k | _batch_size(state->batch_size()), |
52 | 257k | _timezone(state->timezone()), |
53 | 257k | _be_exec_version(state->be_exec_version()), |
54 | 257k | _fragment_transmission_compression_type(state->fragement_transmission_compression_type()), |
55 | 257k | _buffer_limit(buffer_size) { |
56 | 257k | _mem_tracker = MemTrackerLimiter::create_shared( |
57 | 257k | MemTrackerLimiter::Type::QUERY, |
58 | 257k | fmt::format("ResultBlockBuffer#FragmentInstanceId={}", print_id(_fragment_id))); |
59 | 257k | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEEC2ENS_9TUniqueIdEPNS_12RuntimeStateEi Line | Count | Source | 49 | 106 | : _fragment_id(std::move(id)), | 50 | 106 | _is_close(false), | 51 | 106 | _batch_size(state->batch_size()), | 52 | 106 | _timezone(state->timezone()), | 53 | 106 | _be_exec_version(state->be_exec_version()), | 54 | 106 | _fragment_transmission_compression_type(state->fragement_transmission_compression_type()), | 55 | 106 | _buffer_limit(buffer_size) { | 56 | 106 | _mem_tracker = MemTrackerLimiter::create_shared( | 57 | 106 | MemTrackerLimiter::Type::QUERY, | 58 | 106 | fmt::format("ResultBlockBuffer#FragmentInstanceId={}", print_id(_fragment_id))); | 59 | 106 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEEC2ENS_9TUniqueIdEPNS_12RuntimeStateEi Line | Count | Source | 49 | 257k | : _fragment_id(std::move(id)), | 50 | 257k | _is_close(false), | 51 | 257k | _batch_size(state->batch_size()), | 52 | 257k | _timezone(state->timezone()), | 53 | 257k | _be_exec_version(state->be_exec_version()), | 54 | 257k | _fragment_transmission_compression_type(state->fragement_transmission_compression_type()), | 55 | 257k | _buffer_limit(buffer_size) { | 56 | 257k | _mem_tracker = MemTrackerLimiter::create_shared( | 57 | 257k | MemTrackerLimiter::Type::QUERY, | 58 | 257k | fmt::format("ResultBlockBuffer#FragmentInstanceId={}", print_id(_fragment_id))); | 59 | 257k | } |
|
60 | | |
61 | | template <typename ResultCtxType> |
62 | | Status ResultBlockBuffer<ResultCtxType>::close(const TUniqueId& id, Status exec_status, |
63 | 455k | int64_t num_rows) { |
64 | 455k | std::unique_lock<std::mutex> l(_lock); |
65 | 455k | _returned_rows.fetch_add(num_rows); |
66 | | // close will be called multiple times and error status needs to be collected. |
67 | 455k | if (!exec_status.ok()) { |
68 | 1.31k | _status = exec_status; |
69 | 1.31k | } |
70 | | |
71 | 455k | auto it = _result_sink_dependencies.find(id); |
72 | 455k | if (it != _result_sink_dependencies.end()) { |
73 | 455k | it->second->set_always_ready(); |
74 | 455k | _result_sink_dependencies.erase(it); |
75 | 18.4E | } else { |
76 | 18.4E | _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer", |
77 | 18.4E | print_id(id)); |
78 | 18.4E | } |
79 | 455k | if (!_result_sink_dependencies.empty()) { |
80 | 197k | return _status; |
81 | 197k | } |
82 | | |
83 | 258k | _is_close = true; |
84 | 258k | _arrow_data_arrival.notify_all(); |
85 | | |
86 | 258k | if (!_waiting_rpc.empty()) { |
87 | 147k | if (_status.ok()) { |
88 | 146k | for (auto& ctx : _waiting_rpc) { |
89 | 146k | ctx->on_close(_packet_num, _returned_rows); |
90 | 146k | } |
91 | 146k | } else { |
92 | 1.26k | for (auto& ctx : _waiting_rpc) { |
93 | 1.25k | ctx->on_failure(_status); |
94 | 1.25k | } |
95 | 1.26k | } |
96 | 147k | _waiting_rpc.clear(); |
97 | 147k | } |
98 | | |
99 | 258k | return _status; |
100 | 455k | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE5closeERKNS_9TUniqueIdENS_6StatusEl Line | Count | Source | 63 | 104 | int64_t num_rows) { | 64 | 104 | std::unique_lock<std::mutex> l(_lock); | 65 | 104 | _returned_rows.fetch_add(num_rows); | 66 | | // close will be called multiple times and error status needs to be collected. | 67 | 104 | if (!exec_status.ok()) { | 68 | 2 | _status = exec_status; | 69 | 2 | } | 70 | | | 71 | 104 | auto it = _result_sink_dependencies.find(id); | 72 | 104 | if (it != _result_sink_dependencies.end()) { | 73 | 103 | it->second->set_always_ready(); | 74 | 103 | _result_sink_dependencies.erase(it); | 75 | 103 | } else { | 76 | 1 | _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer", | 77 | 1 | print_id(id)); | 78 | 1 | } | 79 | 104 | if (!_result_sink_dependencies.empty()) { | 80 | 1 | return _status; | 81 | 1 | } | 82 | | | 83 | 103 | _is_close = true; | 84 | 103 | _arrow_data_arrival.notify_all(); | 85 | | | 86 | 103 | if (!_waiting_rpc.empty()) { | 87 | 2 | if (_status.ok()) { | 88 | 1 | for (auto& ctx : _waiting_rpc) { | 89 | 1 | ctx->on_close(_packet_num, _returned_rows); | 90 | 1 | } | 91 | 1 | } else { | 92 | 1 | for (auto& ctx : _waiting_rpc) { | 93 | 1 | ctx->on_failure(_status); | 94 | 1 | } | 95 | 1 | } | 96 | 2 | _waiting_rpc.clear(); | 97 | 2 | } | 98 | | | 99 | 103 | return _status; | 100 | 104 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE5closeERKNS_9TUniqueIdENS_6StatusEl Line | Count | Source | 63 | 455k | int64_t num_rows) { | 64 | 455k | std::unique_lock<std::mutex> l(_lock); | 65 | 455k | _returned_rows.fetch_add(num_rows); | 66 | | // close will be called multiple times and error status needs to be collected. | 67 | 455k | if (!exec_status.ok()) { | 68 | 1.31k | _status = exec_status; | 69 | 1.31k | } | 70 | | | 71 | 455k | auto it = _result_sink_dependencies.find(id); | 72 | 455k | if (it != _result_sink_dependencies.end()) { | 73 | 455k | it->second->set_always_ready(); | 74 | 455k | _result_sink_dependencies.erase(it); | 75 | 18.4E | } else { | 76 | 18.4E | _status = Status::InternalError("Instance {} is not found in ResultBlockBuffer", | 77 | 18.4E | print_id(id)); | 78 | 18.4E | } | 79 | 455k | if (!_result_sink_dependencies.empty()) { | 80 | 197k | return _status; | 81 | 197k | } | 82 | | | 83 | 258k | _is_close = true; | 84 | 258k | _arrow_data_arrival.notify_all(); | 85 | | | 86 | 258k | if (!_waiting_rpc.empty()) { | 87 | 147k | if (_status.ok()) { | 88 | 146k | for (auto& ctx : _waiting_rpc) { | 89 | 146k | ctx->on_close(_packet_num, _returned_rows); | 90 | 146k | } | 91 | 146k | } else { | 92 | 1.26k | for (auto& ctx : _waiting_rpc) { | 93 | 1.25k | ctx->on_failure(_status); | 94 | 1.25k | } | 95 | 1.26k | } | 96 | 147k | _waiting_rpc.clear(); | 97 | 147k | } | 98 | | | 99 | 258k | return _status; | 100 | 455k | } |
|
101 | | |
102 | | template <typename ResultCtxType> |
103 | 63.3k | void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) { |
104 | 63.3k | std::unique_lock<std::mutex> l(_lock); |
105 | 63.3k | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); |
106 | 63.3k | if (_status.ok()) { |
107 | 63.2k | _status = reason; |
108 | 63.2k | } |
109 | 63.3k | _arrow_data_arrival.notify_all(); |
110 | 63.3k | for (auto& ctx : _waiting_rpc) { |
111 | 2 | ctx->on_failure(reason); |
112 | 2 | } |
113 | 63.3k | _waiting_rpc.clear(); |
114 | 63.3k | _update_dependency(); |
115 | 63.3k | _result_batch_queue.clear(); |
116 | 63.3k | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE6cancelERKNS_6StatusE Line | Count | Source | 103 | 102 | void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) { | 104 | 102 | std::unique_lock<std::mutex> l(_lock); | 105 | 102 | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); | 106 | 102 | if (_status.ok()) { | 107 | 102 | _status = reason; | 108 | 102 | } | 109 | 102 | _arrow_data_arrival.notify_all(); | 110 | 102 | for (auto& ctx : _waiting_rpc) { | 111 | 1 | ctx->on_failure(reason); | 112 | 1 | } | 113 | 102 | _waiting_rpc.clear(); | 114 | 102 | _update_dependency(); | 115 | 102 | _result_batch_queue.clear(); | 116 | 102 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE6cancelERKNS_6StatusE Line | Count | Source | 103 | 63.2k | void ResultBlockBuffer<ResultCtxType>::cancel(const Status& reason) { | 104 | 63.2k | std::unique_lock<std::mutex> l(_lock); | 105 | 63.2k | SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker); | 106 | 63.2k | if (_status.ok()) { | 107 | 63.1k | _status = reason; | 108 | 63.1k | } | 109 | 63.2k | _arrow_data_arrival.notify_all(); | 110 | 63.2k | for (auto& ctx : _waiting_rpc) { | 111 | 1 | ctx->on_failure(reason); | 112 | 1 | } | 113 | 63.2k | _waiting_rpc.clear(); | 114 | 63.2k | _update_dependency(); | 115 | 63.2k | _result_batch_queue.clear(); | 116 | 63.2k | } |
|
117 | | |
118 | | template <typename ResultCtxType> |
119 | | void ResultBlockBuffer<ResultCtxType>::set_dependency( |
120 | 450k | const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) { |
121 | 450k | std::unique_lock<std::mutex> l(_lock); |
122 | 450k | _result_sink_dependencies[id] = result_sink_dependency; |
123 | 450k | _update_dependency(); |
124 | 450k | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE14set_dependencyERKNS_9TUniqueIdESt10shared_ptrINS_10DependencyEE Line | Count | Source | 120 | 106 | const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) { | 121 | 106 | std::unique_lock<std::mutex> l(_lock); | 122 | 106 | _result_sink_dependencies[id] = result_sink_dependency; | 123 | 106 | _update_dependency(); | 124 | 106 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE14set_dependencyERKNS_9TUniqueIdESt10shared_ptrINS_10DependencyEE Line | Count | Source | 120 | 450k | const TUniqueId& id, std::shared_ptr<Dependency> result_sink_dependency) { | 121 | 450k | std::unique_lock<std::mutex> l(_lock); | 122 | 450k | _result_sink_dependencies[id] = result_sink_dependency; | 123 | 450k | _update_dependency(); | 124 | 450k | } |
|
125 | | |
126 | | template <typename ResultCtxType> |
127 | 1.10M | void ResultBlockBuffer<ResultCtxType>::_update_dependency() { |
128 | 1.10M | if (!_status.ok()) { |
129 | 63.4k | for (auto it : _result_sink_dependencies) { |
130 | 6 | it.second->set_ready(); |
131 | 6 | } |
132 | 63.4k | return; |
133 | 63.4k | } |
134 | | |
135 | 2.50M | for (auto it : _result_sink_dependencies) { |
136 | 2.50M | if (_instance_rows[it.first] > _batch_size) { |
137 | 6 | it.second->block(); |
138 | 2.50M | } else { |
139 | 2.50M | it.second->set_ready(); |
140 | 2.50M | } |
141 | 2.50M | } |
142 | 1.04M | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE18_update_dependencyEv Line | Count | Source | 127 | 540 | void ResultBlockBuffer<ResultCtxType>::_update_dependency() { | 128 | 540 | if (!_status.ok()) { | 129 | 104 | for (auto it : _result_sink_dependencies) { | 130 | 3 | it.second->set_ready(); | 131 | 3 | } | 132 | 104 | return; | 133 | 104 | } | 134 | | | 135 | 436 | for (auto it : _result_sink_dependencies) { | 136 | 248 | if (_instance_rows[it.first] > _batch_size) { | 137 | 4 | it.second->block(); | 138 | 244 | } else { | 139 | 244 | it.second->set_ready(); | 140 | 244 | } | 141 | 248 | } | 142 | 436 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE18_update_dependencyEv Line | Count | Source | 127 | 1.10M | void ResultBlockBuffer<ResultCtxType>::_update_dependency() { | 128 | 1.10M | if (!_status.ok()) { | 129 | 63.3k | for (auto it : _result_sink_dependencies) { | 130 | 3 | it.second->set_ready(); | 131 | 3 | } | 132 | 63.3k | return; | 133 | 63.3k | } | 134 | | | 135 | 2.50M | for (auto it : _result_sink_dependencies) { | 136 | 2.50M | if (_instance_rows[it.first] > _batch_size) { | 137 | 2 | it.second->block(); | 138 | 2.50M | } else { | 139 | 2.50M | it.second->set_ready(); | 140 | 2.50M | } | 141 | 2.50M | } | 142 | 1.04M | } |
|
143 | | |
144 | | template <typename ResultCtxType> |
145 | 423k | Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) { |
146 | 423k | std::lock_guard<std::mutex> l(_lock); |
147 | 423k | SCOPED_ATTACH_TASK(_mem_tracker); |
148 | 423k | Defer defer {[&]() { _update_dependency(); }};_ZZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9get_batchESt10shared_ptrIS1_EENKUlvE_clEv Line | Count | Source | 148 | 8 | Defer defer {[&]() { _update_dependency(); }}; |
_ZZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9get_batchESt10shared_ptrIS1_EENKUlvE_clEv Line | Count | Source | 148 | 423k | Defer defer {[&]() { _update_dependency(); }}; |
|
149 | 423k | if (!_status.ok()) { |
150 | 18 | ctx->on_failure(_status); |
151 | 18 | return _status; |
152 | 18 | } |
153 | 423k | if (!_result_batch_queue.empty()) { |
154 | 10.5k | auto result = _result_batch_queue.front(); |
155 | 10.5k | _result_batch_queue.pop_front(); |
156 | 10.7k | for (auto it : _instance_rows_in_queue.front()) { |
157 | 10.7k | _instance_rows[it.first] -= it.second; |
158 | 10.7k | } |
159 | 10.5k | _instance_rows_in_queue.pop_front(); |
160 | 10.5k | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); |
161 | 10.5k | _packet_num++; |
162 | 10.5k | return Status::OK(); |
163 | 10.5k | } |
164 | 413k | if (_is_close) { |
165 | 109k | if (!_status.ok()) { |
166 | 0 | ctx->on_failure(_status); |
167 | 0 | return Status::OK(); |
168 | 0 | } |
169 | 109k | ctx->on_close(_packet_num, _returned_rows); |
170 | 109k | LOG(INFO) << fmt::format( |
171 | 109k | "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, " |
172 | 109k | "packet_num={}, peak_memory_usage={}", |
173 | 109k | print_id(_fragment_id), _is_close, !_status.ok(), _packet_num, |
174 | 109k | _mem_tracker->peak_consumption()); |
175 | 109k | return Status::OK(); |
176 | 109k | } |
177 | | // no ready data, push ctx to waiting list |
178 | 303k | _waiting_rpc.push_back(ctx); |
179 | 303k | return Status::OK(); |
180 | 413k | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9get_batchESt10shared_ptrIS1_E Line | Count | Source | 145 | 8 | Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) { | 146 | 8 | std::lock_guard<std::mutex> l(_lock); | 147 | 8 | SCOPED_ATTACH_TASK(_mem_tracker); | 148 | 8 | Defer defer {[&]() { _update_dependency(); }}; | 149 | 8 | if (!_status.ok()) { | 150 | 1 | ctx->on_failure(_status); | 151 | 1 | return _status; | 152 | 1 | } | 153 | 7 | if (!_result_batch_queue.empty()) { | 154 | 2 | auto result = _result_batch_queue.front(); | 155 | 2 | _result_batch_queue.pop_front(); | 156 | 2 | for (auto it : _instance_rows_in_queue.front()) { | 157 | 2 | _instance_rows[it.first] -= it.second; | 158 | 2 | } | 159 | 2 | _instance_rows_in_queue.pop_front(); | 160 | 2 | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); | 161 | 2 | _packet_num++; | 162 | 2 | return Status::OK(); | 163 | 2 | } | 164 | 5 | if (_is_close) { | 165 | 1 | if (!_status.ok()) { | 166 | 0 | ctx->on_failure(_status); | 167 | 0 | return Status::OK(); | 168 | 0 | } | 169 | 1 | ctx->on_close(_packet_num, _returned_rows); | 170 | 1 | LOG(INFO) << fmt::format( | 171 | 1 | "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, " | 172 | 1 | "packet_num={}, peak_memory_usage={}", | 173 | 1 | print_id(_fragment_id), _is_close, !_status.ok(), _packet_num, | 174 | 1 | _mem_tracker->peak_consumption()); | 175 | 1 | return Status::OK(); | 176 | 1 | } | 177 | | // no ready data, push ctx to waiting list | 178 | 4 | _waiting_rpc.push_back(ctx); | 179 | 4 | return Status::OK(); | 180 | 5 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9get_batchESt10shared_ptrIS1_E Line | Count | Source | 145 | 423k | Status ResultBlockBuffer<ResultCtxType>::get_batch(std::shared_ptr<ResultCtxType> ctx) { | 146 | 423k | std::lock_guard<std::mutex> l(_lock); | 147 | 423k | SCOPED_ATTACH_TASK(_mem_tracker); | 148 | 423k | Defer defer {[&]() { _update_dependency(); }}; | 149 | 423k | if (!_status.ok()) { | 150 | 17 | ctx->on_failure(_status); | 151 | 17 | return _status; | 152 | 17 | } | 153 | 423k | if (!_result_batch_queue.empty()) { | 154 | 10.5k | auto result = _result_batch_queue.front(); | 155 | 10.5k | _result_batch_queue.pop_front(); | 156 | 10.7k | for (auto it : _instance_rows_in_queue.front()) { | 157 | 10.7k | _instance_rows[it.first] -= it.second; | 158 | 10.7k | } | 159 | 10.5k | _instance_rows_in_queue.pop_front(); | 160 | 10.5k | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); | 161 | 10.5k | _packet_num++; | 162 | 10.5k | return Status::OK(); | 163 | 10.5k | } | 164 | 413k | if (_is_close) { | 165 | 109k | if (!_status.ok()) { | 166 | 0 | ctx->on_failure(_status); | 167 | 0 | return Status::OK(); | 168 | 0 | } | 169 | 109k | ctx->on_close(_packet_num, _returned_rows); | 170 | 109k | LOG(INFO) << fmt::format( | 171 | 109k | "ResultBlockBuffer finished, fragment_id={}, is_close={}, is_cancelled={}, " | 172 | 109k | "packet_num={}, peak_memory_usage={}", | 173 | 109k | print_id(_fragment_id), _is_close, !_status.ok(), _packet_num, | 174 | 109k | _mem_tracker->peak_consumption()); | 175 | 109k | return Status::OK(); | 176 | 109k | } | 177 | | // no ready data, push ctx to waiting list | 178 | 303k | _waiting_rpc.push_back(ctx); | 179 | 303k | return Status::OK(); | 180 | 413k | } |
|
181 | | |
182 | | template <typename ResultCtxType> |
183 | | Status ResultBlockBuffer<ResultCtxType>::add_batch(RuntimeState* state, |
184 | 168k | std::shared_ptr<InBlockType>& result) { |
185 | 168k | std::unique_lock<std::mutex> l(_lock); |
186 | | |
187 | 168k | if (!_status.ok()) { |
188 | 2 | return _status; |
189 | 2 | } |
190 | | |
191 | 168k | if (_waiting_rpc.empty()) { |
192 | 12.4k | auto sz = 0; |
193 | 12.4k | auto num_rows = 0; |
194 | 12.4k | size_t batch_size = 0; |
195 | 12.4k | if constexpr (std::is_same_v<InBlockType, Block>) { |
196 | 119 | num_rows = cast_set<int>(result->rows()); |
197 | 119 | batch_size = result->bytes(); |
198 | 12.3k | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { |
199 | 12.3k | num_rows = cast_set<int>(result->result_batch.rows.size()); |
200 | 23.5k | for (const auto& row : result->result_batch.rows) { |
201 | 23.5k | batch_size += row.size(); |
202 | 23.5k | } |
203 | 12.3k | } |
204 | 12.4k | if (!_result_batch_queue.empty()) { |
205 | 1.60k | if constexpr (std::is_same_v<InBlockType, Block>) { |
206 | 12 | sz = cast_set<int>(_result_batch_queue.back()->rows()); |
207 | 1.59k | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { |
208 | 1.59k | sz = cast_set<int>(_result_batch_queue.back()->result_batch.rows.size()); |
209 | 1.59k | } |
210 | 1.60k | if (sz + num_rows < _buffer_limit && |
211 | 1.60k | (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) { |
212 | 1.60k | if constexpr (std::is_same_v<InBlockType, Block>) { |
213 | 12 | auto last_block = _result_batch_queue.back(); |
214 | 44 | for (size_t i = 0; i < last_block->columns(); i++) { |
215 | 32 | last_block->mutate_columns()[i]->insert_range_from( |
216 | 32 | *result->get_by_position(i).column, 0, num_rows); |
217 | 32 | } |
218 | 1.59k | } else { |
219 | 1.59k | std::vector<std::string>& back_rows = |
220 | 1.59k | _result_batch_queue.back()->result_batch.rows; |
221 | 1.59k | std::vector<std::string>& result_rows = result->result_batch.rows; |
222 | 1.59k | back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()), |
223 | 1.59k | std::make_move_iterator(result_rows.end())); |
224 | 1.59k | } |
225 | 1.60k | _last_batch_bytes += batch_size; |
226 | 1.60k | } else { |
227 | 0 | _instance_rows_in_queue.emplace_back(); |
228 | 0 | _result_batch_queue.push_back(std::move(result)); |
229 | 0 | _last_batch_bytes = batch_size; |
230 | 0 | _arrow_data_arrival |
231 | 0 | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) |
232 | 0 | } |
233 | 10.8k | } else { |
234 | 10.8k | _instance_rows_in_queue.emplace_back(); |
235 | 10.8k | _result_batch_queue.push_back(std::move(result)); |
236 | 10.8k | _last_batch_bytes = batch_size; |
237 | 10.8k | _arrow_data_arrival |
238 | 10.8k | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) |
239 | 10.8k | } |
240 | 12.4k | _instance_rows[state->fragment_instance_id()] += num_rows; |
241 | 12.4k | _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; |
242 | 156k | } else { |
243 | 156k | auto ctx = _waiting_rpc.front(); |
244 | 156k | _waiting_rpc.pop_front(); |
245 | 156k | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); |
246 | 156k | _packet_num++; |
247 | 156k | } |
248 | | |
249 | 168k | _update_dependency(); |
250 | 168k | return Status::OK(); |
251 | 168k | } _ZN5doris17ResultBlockBufferINS_22GetArrowResultBatchCtxEE9add_batchEPNS_12RuntimeStateERSt10shared_ptrINS_5BlockEE Line | Count | Source | 184 | 121 | std::shared_ptr<InBlockType>& result) { | 185 | 121 | std::unique_lock<std::mutex> l(_lock); | 186 | | | 187 | 121 | if (!_status.ok()) { | 188 | 1 | return _status; | 189 | 1 | } | 190 | | | 191 | 120 | if (_waiting_rpc.empty()) { | 192 | 119 | auto sz = 0; | 193 | 119 | auto num_rows = 0; | 194 | 119 | size_t batch_size = 0; | 195 | 119 | if constexpr (std::is_same_v<InBlockType, Block>) { | 196 | 119 | num_rows = cast_set<int>(result->rows()); | 197 | 119 | batch_size = result->bytes(); | 198 | | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { | 199 | | num_rows = cast_set<int>(result->result_batch.rows.size()); | 200 | | for (const auto& row : result->result_batch.rows) { | 201 | | batch_size += row.size(); | 202 | | } | 203 | | } | 204 | 119 | if (!_result_batch_queue.empty()) { | 205 | 12 | if constexpr (std::is_same_v<InBlockType, Block>) { | 206 | 12 | sz = cast_set<int>(_result_batch_queue.back()->rows()); | 207 | | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { | 208 | | sz = cast_set<int>(_result_batch_queue.back()->result_batch.rows.size()); | 209 | | } | 210 | 12 | if (sz + num_rows < _buffer_limit && | 211 | 12 | (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) { | 212 | 12 | if constexpr (std::is_same_v<InBlockType, Block>) { | 213 | 12 | auto last_block = _result_batch_queue.back(); | 214 | 44 | for (size_t i = 0; i < last_block->columns(); i++) { | 215 | 32 | last_block->mutate_columns()[i]->insert_range_from( | 216 | 32 | *result->get_by_position(i).column, 0, num_rows); | 217 | 32 | } | 218 | | } else { | 219 | | std::vector<std::string>& back_rows = | 220 | | _result_batch_queue.back()->result_batch.rows; | 221 | | std::vector<std::string>& result_rows = result->result_batch.rows; | 222 | | back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()), | 223 | | std::make_move_iterator(result_rows.end())); | 224 | | } | 225 | 12 | _last_batch_bytes += batch_size; | 226 | 12 | } else { | 227 | 0 | _instance_rows_in_queue.emplace_back(); | 228 | 0 | _result_batch_queue.push_back(std::move(result)); | 229 | 0 | _last_batch_bytes = batch_size; | 230 | 0 | _arrow_data_arrival | 231 | 0 | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) | 232 | 0 | } | 233 | 107 | } else { | 234 | 107 | _instance_rows_in_queue.emplace_back(); | 235 | 107 | _result_batch_queue.push_back(std::move(result)); | 236 | 107 | _last_batch_bytes = batch_size; | 237 | 107 | _arrow_data_arrival | 238 | 107 | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) | 239 | 107 | } | 240 | 119 | _instance_rows[state->fragment_instance_id()] += num_rows; | 241 | 119 | _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; | 242 | 119 | } else { | 243 | 1 | auto ctx = _waiting_rpc.front(); | 244 | 1 | _waiting_rpc.pop_front(); | 245 | 1 | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); | 246 | 1 | _packet_num++; | 247 | 1 | } | 248 | | | 249 | 120 | _update_dependency(); | 250 | 120 | return Status::OK(); | 251 | 120 | } |
_ZN5doris17ResultBlockBufferINS_17GetResultBatchCtxEE9add_batchEPNS_12RuntimeStateERSt10shared_ptrINS_16TFetchDataResultEE Line | Count | Source | 184 | 168k | std::shared_ptr<InBlockType>& result) { | 185 | 168k | std::unique_lock<std::mutex> l(_lock); | 186 | | | 187 | 168k | if (!_status.ok()) { | 188 | 1 | return _status; | 189 | 1 | } | 190 | | | 191 | 168k | if (_waiting_rpc.empty()) { | 192 | 12.3k | auto sz = 0; | 193 | 12.3k | auto num_rows = 0; | 194 | 12.3k | size_t batch_size = 0; | 195 | | if constexpr (std::is_same_v<InBlockType, Block>) { | 196 | | num_rows = cast_set<int>(result->rows()); | 197 | | batch_size = result->bytes(); | 198 | 12.3k | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { | 199 | 12.3k | num_rows = cast_set<int>(result->result_batch.rows.size()); | 200 | 23.5k | for (const auto& row : result->result_batch.rows) { | 201 | 23.5k | batch_size += row.size(); | 202 | 23.5k | } | 203 | 12.3k | } | 204 | 12.3k | if (!_result_batch_queue.empty()) { | 205 | | if constexpr (std::is_same_v<InBlockType, Block>) { | 206 | | sz = cast_set<int>(_result_batch_queue.back()->rows()); | 207 | 1.59k | } else if constexpr (std::is_same_v<InBlockType, TFetchDataResult>) { | 208 | 1.59k | sz = cast_set<int>(_result_batch_queue.back()->result_batch.rows.size()); | 209 | 1.59k | } | 210 | 1.59k | if (sz + num_rows < _buffer_limit && | 211 | 1.59k | (batch_size + _last_batch_bytes) <= config::thrift_max_message_size) { | 212 | | if constexpr (std::is_same_v<InBlockType, Block>) { | 213 | | auto last_block = _result_batch_queue.back(); | 214 | | for (size_t i = 0; i < last_block->columns(); i++) { | 215 | | last_block->mutate_columns()[i]->insert_range_from( | 216 | | *result->get_by_position(i).column, 0, num_rows); | 217 | | } | 218 | 1.59k | } else { | 219 | 1.59k | std::vector<std::string>& back_rows = | 220 | 1.59k | _result_batch_queue.back()->result_batch.rows; | 221 | 1.59k | std::vector<std::string>& result_rows = result->result_batch.rows; | 222 | 1.59k | back_rows.insert(back_rows.end(), std::make_move_iterator(result_rows.begin()), | 223 | 1.59k | std::make_move_iterator(result_rows.end())); | 224 | 1.59k | } | 225 | 1.59k | _last_batch_bytes += batch_size; | 226 | 1.59k | } else { | 227 | 0 | _instance_rows_in_queue.emplace_back(); | 228 | 0 | _result_batch_queue.push_back(std::move(result)); | 229 | 0 | _last_batch_bytes = batch_size; | 230 | 0 | _arrow_data_arrival | 231 | 0 | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) | 232 | 0 | } | 233 | 10.7k | } else { | 234 | 10.7k | _instance_rows_in_queue.emplace_back(); | 235 | 10.7k | _result_batch_queue.push_back(std::move(result)); | 236 | 10.7k | _last_batch_bytes = batch_size; | 237 | 10.7k | _arrow_data_arrival | 238 | 10.7k | .notify_one(); // Only valid for get_arrow_batch(std::shared_ptr<Block>,) | 239 | 10.7k | } | 240 | 12.3k | _instance_rows[state->fragment_instance_id()] += num_rows; | 241 | 12.3k | _instance_rows_in_queue.back()[state->fragment_instance_id()] += num_rows; | 242 | 156k | } else { | 243 | 156k | auto ctx = _waiting_rpc.front(); | 244 | 156k | _waiting_rpc.pop_front(); | 245 | 156k | RETURN_IF_ERROR(ctx->on_data(result, _packet_num, this)); | 246 | 156k | _packet_num++; | 247 | 156k | } | 248 | | | 249 | 168k | _update_dependency(); | 250 | 168k | return Status::OK(); | 251 | 168k | } |
|
252 | | |
253 | | template class ResultBlockBuffer<GetArrowResultBatchCtx>; |
254 | | template class ResultBlockBuffer<GetResultBatchCtx>; |
255 | | |
256 | | #include "common/compile_check_end.h" |
257 | | } // namespace doris |