be/src/exec/sink/writer/vmysql_result_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/sink/writer/vmysql_result_writer.h" |
19 | | |
20 | | #include <fmt/core.h> |
21 | | #include <gen_cpp/Data_types.h> |
22 | | #include <gen_cpp/Metrics_types.h> |
23 | | #include <gen_cpp/PaloInternalService_types.h> |
24 | | #include <gen_cpp/internal_service.pb.h> |
25 | | #include <glog/logging.h> |
26 | | #include <stdint.h> |
27 | | #include <string.h> |
28 | | #include <sys/types.h> |
29 | | |
30 | | #include <ostream> |
31 | | #include <string> |
32 | | |
33 | | #include "common/cast_set.h" |
34 | | #include "common/compiler_util.h" // IWYU pragma: keep |
35 | | #include "common/config.h" |
36 | | #include "core/block/block.h" |
37 | | #include "core/block/column_with_type_and_name.h" |
38 | | #include "core/column/column.h" |
39 | | #include "core/column/column_const.h" |
40 | | #include "core/data_type/data_type_array.h" |
41 | | #include "core/data_type/data_type_decimal.h" |
42 | | #include "core/data_type/data_type_map.h" |
43 | | #include "core/data_type/data_type_nullable.h" |
44 | | #include "core/data_type/data_type_struct.h" |
45 | | #include "core/data_type/define_primitive_type.h" |
46 | | #include "core/data_type_serde/data_type_serde.h" |
47 | | #include "core/types.h" |
48 | | #include "exprs/aggregate/aggregate_function.h" |
49 | | #include "exprs/vexpr.h" |
50 | | #include "exprs/vexpr_context.h" |
51 | | #include "runtime/result_block_buffer.h" |
52 | | #include "runtime/runtime_state.h" |
53 | | #include "util/mysql_global.h" |
54 | | |
55 | | namespace doris { |
56 | | |
// Report a failed fetch back to the RPC caller: encode the (non-OK) status
// into the protobuf response and complete the brpc closure. After _done->Run()
// this context must not touch _result again.
void GetResultBatchCtx::on_failure(const Status& status) {
    DCHECK(!status.ok()) << "status is ok, errmsg=" << status;
    status.to_protobuf(_result->mutable_status());
    _done->Run();
}
62 | | |
63 | 1 | void GetResultBatchCtx::on_close(int64_t packet_seq, int64_t returned_rows) { |
64 | 1 | Status status; |
65 | 1 | status.to_protobuf(_result->mutable_status()); |
66 | 1 | PQueryStatistics* statistics = _result->mutable_query_statistics(); |
67 | 1 | statistics->set_returned_rows(returned_rows); |
68 | 1 | _result->set_packet_seq(packet_seq); |
69 | 1 | _result->set_eos(true); |
70 | 1 | _done->Run(); |
71 | 1 | } |
72 | | |
73 | | Status GetResultBatchCtx::on_data(const std::shared_ptr<TFetchDataResult>& t_result, |
74 | 3 | int64_t packet_seq, ResultBlockBufferBase* buffer) { |
75 | 3 | Status st = Status::OK(); |
76 | 3 | if (t_result != nullptr) { |
77 | 2 | uint8_t* buf = nullptr; |
78 | 2 | uint32_t len = 0; |
79 | 2 | ThriftSerializer ser(false, 4096); |
80 | 2 | RETURN_IF_ERROR(ser.serialize(&t_result->result_batch, &len, &buf)); |
81 | 2 | _result->set_row_batch(std::string((const char*)buf, len)); |
82 | 2 | } else { |
83 | 1 | _result->clear_row_batch(); |
84 | 1 | _result->set_empty_batch(true); |
85 | 1 | } |
86 | 3 | _result->set_packet_seq(packet_seq); |
87 | 3 | _result->set_eos(false); |
88 | | |
89 | | /// The size limit of proto buffer message is 2G |
90 | 3 | if (_result->ByteSizeLong() > _max_msg_size) { |
91 | 1 | st = Status::InternalError("Message size exceeds 2GB: {}", _result->ByteSizeLong()); |
92 | 1 | _result->clear_row_batch(); |
93 | 1 | _result->set_empty_batch(true); |
94 | 1 | } |
95 | 3 | st.to_protobuf(_result->mutable_status()); |
96 | 3 | _done->Run(); |
97 | 3 | return Status::OK(); |
98 | 3 | } |
99 | | |
// Construct a writer that converts executed blocks to the MySQL wire format
// and pushes them into `sinker`.
// - sinker: downcast to MySQLResultBlockBuffer; a non-MySQL buffer yields a
//   null _sinker (dynamic_pointer_cast) — presumably callers always pass the
//   MySQL variant; TODO confirm.
// - output_vexpr_ctxs: output expressions evaluated per block before writing.
// - parent_profile: may be null (e.g. PointQueryExecutor, see _init_profile).
// - is_binary_format: selects MySQL binary-protocol rows vs. text rows.
VMysqlResultWriter::VMysqlResultWriter(std::shared_ptr<ResultBlockBufferBase> sinker,
                                       const VExprContextSPtrs& output_vexpr_ctxs,
                                       RuntimeProfile* parent_profile, bool is_binary_format)
        : ResultWriter(),
          _sinker(std::dynamic_pointer_cast<MySQLResultBlockBuffer>(sinker)),
          _output_vexpr_ctxs(output_vexpr_ctxs),
          _parent_profile(parent_profile),
          _is_binary_format(is_binary_format) {}
108 | | |
109 | 0 | Status VMysqlResultWriter::init(RuntimeState* state) { |
110 | 0 | _init_profile(); |
111 | 0 | set_output_object_data(state->return_object_data_as_binary()); |
112 | 0 | _is_dry_run = state->query_options().dry_run_query; |
113 | 0 | return Status::OK(); |
114 | 0 | } |
115 | | |
116 | 0 | void VMysqlResultWriter::_init_profile() { |
117 | 0 | if (_parent_profile != nullptr) { |
118 | | // for PointQueryExecutor, _parent_profile is null |
119 | 0 | _append_row_batch_timer = ADD_TIMER(_parent_profile, "AppendBatchTime"); |
120 | 0 | _convert_tuple_timer = |
121 | 0 | ADD_CHILD_TIMER(_parent_profile, "TupleConvertTime", "AppendBatchTime"); |
122 | 0 | _result_send_timer = ADD_CHILD_TIMER(_parent_profile, "ResultSendTime", "AppendBatchTime"); |
123 | 0 | _sent_rows_counter = ADD_COUNTER(_parent_profile, "NumSentRows", TUnit::UNIT); |
124 | 0 | _bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent", TUnit::BYTES); |
125 | 0 | } |
126 | 0 | } |
127 | | |
// Convert one Block to MySQL wire-format rows and hand the batch to _sinker.
//
// Two phases, each under its own timer:
//   1. Conversion (_convert_tuple_timer): per column, pick a serde; then per
//      row serialize every column either as protocol text (default) or as
//      MySQL binary-row format (_is_binary_format).
//   2. Send (_result_send_timer): push the TFetchDataResult into the sink
//      unless this is a dry-run query.
//
// Returns the sink's status; conversion errors return early via
// Status::InternalError / RETURN_IF_ERROR.
Status VMysqlResultWriter::_write_one_block(RuntimeState* state, Block& block) {
    Status status = Status::OK();
    int num_rows = cast_set<int>(block.rows());
    // convert one batch
    auto result = std::make_shared<TFetchDataResult>();
    result->result_batch.rows.resize(num_rows);
    uint64_t bytes_sent = 0;
    {
        SCOPED_TIMER(_convert_tuple_timer);

        // Per-column conversion context, precomputed once before the row loop.
        struct Arguments {
            const IColumn* column; // unwrapped data column (const unpacked)
            bool is_const;         // true -> column holds a single shared row
            DataTypeSerDeSPtr serde;
            PrimitiveType type;
        };
        auto options = DataTypeSerDe::get_default_format_options();
        options.timezone = &state->timezone_obj();

        const size_t num_cols = _output_vexpr_ctxs.size();
        std::vector<Arguments> arguments;
        arguments.reserve(num_cols);

        for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
            const auto& [column_ptr, col_const] =
                    unpack_if_const(block.get_by_position(col_idx).column);
            int scale = _output_vexpr_ctxs[col_idx]->root()->data_type()->get_scale();
            // decimalv2 scale and precision is hard code, so we should get real scale and precision
            // from expr
            DataTypeSerDeSPtr serde;
            if (_output_vexpr_ctxs[col_idx]->root()->data_type()->get_primitive_type() ==
                PrimitiveType::TYPE_DECIMALV2) {
                // Build a serde with the expression's real scale (precision is
                // fixed at 27 for DECIMALV2 here), wrapping in a nullable
                // serde when the output expression is nullable.
                if (_output_vexpr_ctxs[col_idx]->root()->is_nullable()) {
                    auto nested_serde =
                            std::make_shared<DataTypeDecimalSerDe<TYPE_DECIMALV2>>(27, scale);
                    serde = std::make_shared<DataTypeNullableSerDe>(nested_serde);
                } else {
                    serde = std::make_shared<DataTypeDecimalSerDe<TYPE_DECIMALV2>>(27, scale);
                }
            } else {
                serde = block.get_by_position(col_idx).type->get_serde();
            }
            serde->set_return_object_as_string(output_object_data());
            arguments.emplace_back(column_ptr.get(), col_const, serde,
                                   block.get_by_position(col_idx).type->get_primitive_type());
        }

        // Sanity check: every non-const column must have at least num_rows rows.
        for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
            const auto& argument = arguments[col_idx];
            // const column will only have 1 row, see unpack_if_const
            if (argument.column->size() < num_rows && !argument.is_const) {
                return Status::InternalError(
                        "Required row size is out of range, need {} rows, column {} has {} "
                        "rows in fact.",
                        num_rows, argument.column->get_name(), argument.column->size());
            }
        }
        // Scratch string column: each serialized cell is committed as one
        // entry; write_buffer_index tracks the next entry to read back.
        auto mysql_output_tmp_col = ColumnString::create();
        BufferWriter write_buffer(*mysql_output_tmp_col);
        size_t write_buffer_index = 0;
        // For non-binary format, we need to call different serialization interfaces
        // write_column_to_mysql/presto/hive text
        if (!_is_binary_format) {
            const auto& serde_dialect = state->query_options().serde_dialect;
            // Dispatch on the session's serde dialect. The returned value is
            // used as "cell has a value" below; false means write SQL NULL —
            // presumably the write_column_to_*_text APIs follow that contract
            // for unknown dialects as well; TODO confirm.
            auto write_to_text = [serde_dialect](DataTypeSerDeSPtr& serde, const IColumn* column,
                                                 BufferWriter& write_buffer, size_t col_index,
                                                 const DataTypeSerDe::FormatOptions& options) {
                if (serde_dialect == TSerdeDialect::DORIS) {
                    return serde->write_column_to_mysql_text(*column, write_buffer, col_index,
                                                             options);
                } else if (serde_dialect == TSerdeDialect::PRESTO) {
                    return serde->write_column_to_presto_text(*column, write_buffer, col_index,
                                                              options);
                } else if (serde_dialect == TSerdeDialect::HIVE) {
                    return serde->write_column_to_hive_text(*column, write_buffer, col_index,
                                                            options);
                } else {
                    return false;
                }
            };

            for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
                auto& mysql_rows = result->result_batch.rows[row_idx];
                for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
                    // Const columns always read row 0.
                    const auto col_index = index_check_const(row_idx, arguments[col_idx].is_const);
                    const auto* column = arguments[col_idx].column;
                    if (write_to_text(arguments[col_idx].serde, column, write_buffer, col_index,
                                      options)) {
                        write_buffer.commit();
                        auto str = mysql_output_tmp_col->get_data_at(write_buffer_index);
                        direct_write_to_mysql_result_string(mysql_rows, str.data, str.size);
                        write_buffer_index++;
                    } else {
                        direct_write_to_mysql_result_null(mysql_rows);
                    }
                }
                bytes_sent += mysql_rows.size();
            }
        } else {
            // MySQL binary-protocol rows: each row starts with a header sized
            // by the column count (null bitmap), hence start_binary_row per row.
            MysqlRowBinaryBuffer row_buffer;

            row_buffer.start_binary_row(_output_vexpr_ctxs.size());

            for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
                for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
                    auto type = arguments[col_idx].type;
                    if (type == PrimitiveType::TYPE_ARRAY || type == PrimitiveType::TYPE_MAP ||
                        type == PrimitiveType::TYPE_STRUCT ||
                        type == PrimitiveType::TYPE_QUANTILE_STATE ||
                        type == PrimitiveType::TYPE_HLL || type == PrimitiveType::TYPE_BITMAP) {
                        // Complex types are not supported in binary format yet
                        // So use text format serialization interface here
                        const auto col_index =
                                index_check_const(row_idx, arguments[col_idx].is_const);
                        const auto* column = arguments[col_idx].column;
                        if (arguments[col_idx].serde->write_column_to_mysql_text(
                                    *column, write_buffer, col_index, options)) {
                            write_buffer.commit();
                            auto str = mysql_output_tmp_col->get_data_at(write_buffer_index);
                            row_buffer.push_string(str.data, str.size);
                            write_buffer_index++;
                        } else {
                            row_buffer.push_null();
                        }

                    } else {
                        // Binary serde handles const-row mapping internally
                        // (row_idx + is_const are passed through).
                        RETURN_IF_ERROR(arguments[col_idx].serde->write_column_to_mysql_binary(
                                *(arguments[col_idx].column), row_buffer, row_idx,
                                arguments[col_idx].is_const, options));
                    }
                }

                result->result_batch.rows[row_idx].append(row_buffer.buf(), row_buffer.length());
                bytes_sent += row_buffer.length();
                // Reset and open the header for the next row (the final call
                // after the last row is unused but harmless).
                row_buffer.reset();
                row_buffer.start_binary_row(_output_vexpr_ctxs.size());
            }
        }
    }
    {
        SCOPED_TIMER(_result_send_timer);
        // If this is a dry run task, no need to send data block
        if (!_is_dry_run) {
            status = _sinker->add_batch(state, result);
        }
        if (status.ok()) {
            // Rows count even for dry runs; bytes only when actually sent.
            _written_rows += num_rows;
            if (!_is_dry_run) {
                _bytes_sent += bytes_sent;
            }
        } else {
            LOG(WARNING) << "append result batch to sink failed.";
        }
    }
    return status;
}
284 | | |
285 | 1 | Status VMysqlResultWriter::write(RuntimeState* state, Block& input_block) { |
286 | 1 | SCOPED_TIMER(_append_row_batch_timer); |
287 | 1 | Status status = Status::OK(); |
288 | 1 | if (UNLIKELY(input_block.rows() == 0)) { |
289 | 0 | return status; |
290 | 0 | } |
291 | | |
292 | 1 | DCHECK(_output_vexpr_ctxs.empty() != true); |
293 | | |
294 | | // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec |
295 | | // failed, just return the error status |
296 | 1 | Block block; |
297 | 1 | RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs, |
298 | 1 | input_block, &block)); |
299 | 1 | const auto total_bytes = block.bytes(); |
300 | | |
301 | 1 | if (total_bytes > config::thrift_max_message_size) [[unlikely]] { |
302 | 0 | const auto total_rows = block.rows(); |
303 | 0 | const auto sub_block_count = (total_bytes + config::thrift_max_message_size - 1) / |
304 | 0 | config::thrift_max_message_size; |
305 | 0 | const auto sub_block_rows = (total_rows + sub_block_count - 1) / sub_block_count; |
306 | |
|
307 | 0 | size_t offset = 0; |
308 | 0 | while (offset < total_rows) { |
309 | 0 | size_t rows = std::min(static_cast<size_t>(sub_block_rows), total_rows - offset); |
310 | 0 | auto sub_block = block.clone_empty(); |
311 | 0 | for (size_t i = 0; i != block.columns(); ++i) { |
312 | 0 | sub_block.get_by_position(i).column = |
313 | 0 | block.get_by_position(i).column->cut(offset, rows); |
314 | 0 | } |
315 | 0 | offset += rows; |
316 | |
|
317 | 0 | RETURN_IF_ERROR(_write_one_block(state, sub_block)); |
318 | 0 | } |
319 | 0 | return Status::OK(); |
320 | 0 | } |
321 | | |
322 | 1 | return _write_one_block(state, block); |
323 | 1 | } |
324 | | |
325 | 0 | Status VMysqlResultWriter::close(Status) { |
326 | 0 | COUNTER_SET(_sent_rows_counter, _written_rows); |
327 | 0 | COUNTER_UPDATE(_bytes_sent_counter, _bytes_sent); |
328 | 0 | return Status::OK(); |
329 | 0 | } |
330 | | |
331 | | } // namespace doris |