be/src/exec/sink/writer/vmysql_result_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/sink/writer/vmysql_result_writer.h" |
19 | | |
20 | | #include <fmt/core.h> |
21 | | #include <gen_cpp/Data_types.h> |
22 | | #include <gen_cpp/Metrics_types.h> |
23 | | #include <gen_cpp/PaloInternalService_types.h> |
24 | | #include <gen_cpp/internal_service.pb.h> |
25 | | #include <glog/logging.h> |
26 | | #include <stdint.h> |
27 | | #include <string.h> |
28 | | #include <sys/types.h> |
29 | | |
30 | | #include <ostream> |
31 | | #include <string> |
32 | | |
33 | | #include "common/cast_set.h" |
34 | | #include "common/compiler_util.h" // IWYU pragma: keep |
35 | | #include "common/config.h" |
36 | | #include "core/block/block.h" |
37 | | #include "core/block/column_with_type_and_name.h" |
38 | | #include "core/column/column.h" |
39 | | #include "core/column/column_const.h" |
40 | | #include "core/data_type/data_type_array.h" |
41 | | #include "core/data_type/data_type_decimal.h" |
42 | | #include "core/data_type/data_type_map.h" |
43 | | #include "core/data_type/data_type_nullable.h" |
44 | | #include "core/data_type/data_type_struct.h" |
45 | | #include "core/data_type/define_primitive_type.h" |
46 | | #include "core/data_type_serde/data_type_serde.h" |
47 | | #include "core/types.h" |
48 | | #include "exprs/aggregate/aggregate_function.h" |
49 | | #include "exprs/vexpr.h" |
50 | | #include "exprs/vexpr_context.h" |
51 | | #include "runtime/result_block_buffer.h" |
52 | | #include "runtime/runtime_state.h" |
53 | | #include "util/mysql_global.h" |
54 | | |
55 | | namespace doris { |
56 | | #include "common/compile_check_begin.h" |
57 | | |
// Finish the fetch-result RPC with a failure: serialize the (expected non-OK)
// status into the response protobuf and invoke the completion callback.
void GetResultBatchCtx::on_failure(const Status& status) {
    DCHECK(!status.ok()) << "status is ok, errmsg=" << status;
    status.to_protobuf(_result->mutable_status());
    _done->Run();
}
63 | | |
// Finish the fetch-result RPC at end-of-stream: report an OK status, record the
// total number of rows returned in the query statistics, stamp the packet
// sequence, set eos so the client stops fetching, then fire the callback.
void GetResultBatchCtx::on_close(int64_t packet_seq, int64_t returned_rows) {
    Status status; // default-constructed Status is OK
    status.to_protobuf(_result->mutable_status());
    PQueryStatistics* statistics = _result->mutable_query_statistics();
    statistics->set_returned_rows(returned_rows);
    _result->set_packet_seq(packet_seq);
    _result->set_eos(true);
    _done->Run();
}
73 | | |
74 | | Status GetResultBatchCtx::on_data(const std::shared_ptr<TFetchDataResult>& t_result, |
75 | 3 | int64_t packet_seq, ResultBlockBufferBase* buffer) { |
76 | 3 | Status st = Status::OK(); |
77 | 3 | if (t_result != nullptr) { |
78 | 2 | uint8_t* buf = nullptr; |
79 | 2 | uint32_t len = 0; |
80 | 2 | ThriftSerializer ser(false, 4096); |
81 | 2 | RETURN_IF_ERROR(ser.serialize(&t_result->result_batch, &len, &buf)); |
82 | 2 | _result->set_row_batch(std::string((const char*)buf, len)); |
83 | 2 | } else { |
84 | 1 | _result->clear_row_batch(); |
85 | 1 | _result->set_empty_batch(true); |
86 | 1 | } |
87 | 3 | _result->set_packet_seq(packet_seq); |
88 | 3 | _result->set_eos(false); |
89 | | |
90 | | /// The size limit of proto buffer message is 2G |
91 | 3 | if (_result->ByteSizeLong() > _max_msg_size) { |
92 | 1 | st = Status::InternalError("Message size exceeds 2GB: {}", _result->ByteSizeLong()); |
93 | 1 | _result->clear_row_batch(); |
94 | 1 | _result->set_empty_batch(true); |
95 | 1 | } |
96 | 3 | st.to_protobuf(_result->mutable_status()); |
97 | 3 | _done->Run(); |
98 | 3 | return Status::OK(); |
99 | 3 | } |
100 | | |
// Construct a MySQL-protocol result writer.
// @param sinker             result buffer shared with the RPC side; assumed to
//                           really be a MySQLResultBlockBuffer — the
//                           dynamic_pointer_cast yields null otherwise
//                           (NOTE(review): not checked here; verify callers).
// @param output_vexpr_ctxs  expressions producing the output columns.
// @param parent_profile     may be null (e.g. PointQueryExecutor); profile
//                           counters are then never created.
// @param is_binary_format   emit MySQL binary row protocol instead of text.
VMysqlResultWriter::VMysqlResultWriter(std::shared_ptr<ResultBlockBufferBase> sinker,
                                       const VExprContextSPtrs& output_vexpr_ctxs,
                                       RuntimeProfile* parent_profile, bool is_binary_format)
        : ResultWriter(),
          _sinker(std::dynamic_pointer_cast<MySQLResultBlockBuffer>(sinker)),
          _output_vexpr_ctxs(output_vexpr_ctxs),
          _parent_profile(parent_profile),
          _is_binary_format(is_binary_format) {}
109 | | |
110 | 0 | Status VMysqlResultWriter::init(RuntimeState* state) { |
111 | 0 | _init_profile(); |
112 | 0 | set_output_object_data(state->return_object_data_as_binary()); |
113 | 0 | _is_dry_run = state->query_options().dry_run_query; |
114 | 0 | return Status::OK(); |
115 | 0 | } |
116 | | |
117 | 0 | void VMysqlResultWriter::_init_profile() { |
118 | 0 | if (_parent_profile != nullptr) { |
119 | | // for PointQueryExecutor, _parent_profile is null |
120 | 0 | _append_row_batch_timer = ADD_TIMER(_parent_profile, "AppendBatchTime"); |
121 | 0 | _convert_tuple_timer = |
122 | 0 | ADD_CHILD_TIMER(_parent_profile, "TupleConvertTime", "AppendBatchTime"); |
123 | 0 | _result_send_timer = ADD_CHILD_TIMER(_parent_profile, "ResultSendTime", "AppendBatchTime"); |
124 | 0 | _sent_rows_counter = ADD_COUNTER(_parent_profile, "NumSentRows", TUnit::UNIT); |
125 | 0 | _bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent", TUnit::BYTES); |
126 | 0 | } |
127 | 0 | } |
128 | | |
// Serialize one Block into a TFetchDataResult (MySQL text or binary row
// encoding, chosen by _is_binary_format) and hand it to the sinker.
// On success, _written_rows and (unless dry-run) _bytes_sent are updated.
// @param state  runtime state: timezone, serde dialect, query options.
// @param block  rows already produced by the output expressions.
Status VMysqlResultWriter::_write_one_block(RuntimeState* state, Block& block) {
    Status status = Status::OK();
    int num_rows = cast_set<int>(block.rows());
    // convert one batch
    auto result = std::make_shared<TFetchDataResult>();
    result->result_batch.rows.resize(num_rows);
    uint64_t bytes_sent = 0;
    {
        SCOPED_TIMER(_convert_tuple_timer);

        // Per-column serialization context gathered once, used per row below.
        struct Arguments {
            const IColumn* column; // unpacked (non-const-wrapped) column
            bool is_const;         // true if the original column was a ColumnConst
            DataTypeSerDeSPtr serde;
            PrimitiveType type;
        };
        auto options = DataTypeSerDe::get_default_format_options();
        options.timezone = &state->timezone_obj();

        const size_t num_cols = _output_vexpr_ctxs.size();
        std::vector<Arguments> arguments;
        arguments.reserve(num_cols);

        for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
            const auto& [column_ptr, col_const] =
                    unpack_if_const(block.get_by_position(col_idx).column);
            int scale = _output_vexpr_ctxs[col_idx]->root()->data_type()->get_scale();
            // decimalv2 scale and precision is hard code, so we should get real scale and precision
            // from expr
            DataTypeSerDeSPtr serde;
            if (_output_vexpr_ctxs[col_idx]->root()->data_type()->get_primitive_type() ==
                PrimitiveType::TYPE_DECIMALV2) {
                // Build a decimal serde with the expression's real scale
                // (precision 27), wrapped in a nullable serde when needed.
                if (_output_vexpr_ctxs[col_idx]->root()->is_nullable()) {
                    auto nested_serde =
                            std::make_shared<DataTypeDecimalSerDe<TYPE_DECIMALV2>>(27, scale);
                    serde = std::make_shared<DataTypeNullableSerDe>(nested_serde);
                } else {
                    serde = std::make_shared<DataTypeDecimalSerDe<TYPE_DECIMALV2>>(27, scale);
                }
            } else {
                serde = block.get_by_position(col_idx).type->get_serde();
            }
            serde->set_return_object_as_string(output_object_data());
            arguments.emplace_back(column_ptr.get(), col_const, serde,
                                   block.get_by_position(col_idx).type->get_primitive_type());
        }

        // Sanity check: every non-const column must cover all rows of the block.
        for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
            const auto& argument = arguments[col_idx];
            // const column will only have 1 row, see unpack_if_const
            if (argument.column->size() < num_rows && !argument.is_const) {
                return Status::InternalError(
                        "Required row size is out of range, need {} rows, column {} has {} "
                        "rows in fact.",
                        num_rows, argument.column->get_name(), argument.column->size());
            }
        }
        // Scratch string column: each serialized cell is committed into it and
        // then read back via get_data_at(write_buffer_index).
        auto mysql_output_tmp_col = ColumnString::create();
        BufferWriter write_buffer(*mysql_output_tmp_col);
        size_t write_buffer_index = 0;
        // For non-binary format, we need to call different serialization interfaces
        // write_column_to_mysql/presto/hive text
        if (!_is_binary_format) {
            const auto& serde_dialect = state->query_options().serde_dialect;
            // Dispatch on the session's serde dialect; returning false means
            // the cell is NULL (see use below).
            auto write_to_text = [serde_dialect](DataTypeSerDeSPtr& serde, const IColumn* column,
                                                 BufferWriter& write_buffer, size_t col_index,
                                                 const DataTypeSerDe::FormatOptions& options) {
                if (serde_dialect == TSerdeDialect::DORIS) {
                    return serde->write_column_to_mysql_text(*column, write_buffer, col_index,
                                                             options);
                } else if (serde_dialect == TSerdeDialect::PRESTO) {
                    return serde->write_column_to_presto_text(*column, write_buffer, col_index,
                                                              options);
                } else if (serde_dialect == TSerdeDialect::HIVE) {
                    return serde->write_column_to_hive_text(*column, write_buffer, col_index,
                                                            options);
                } else {
                    return false;
                }
            };

            for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
                auto& mysql_rows = result->result_batch.rows[row_idx];
                for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
                    // Const columns always read row 0.
                    const auto col_index = index_check_const(row_idx, arguments[col_idx].is_const);
                    const auto* column = arguments[col_idx].column;
                    if (write_to_text(arguments[col_idx].serde, column, write_buffer, col_index,
                                      options)) {
                        write_buffer.commit();
                        auto str = mysql_output_tmp_col->get_data_at(write_buffer_index);
                        direct_write_to_mysql_result_string(mysql_rows, str.data, str.size);
                        write_buffer_index++;
                    } else {
                        direct_write_to_mysql_result_null(mysql_rows);
                    }
                }
                bytes_sent += mysql_rows.size();
            }
        } else {
            // MySQL binary protocol path: one MysqlRowBinaryBuffer reused per row.
            MysqlRowBinaryBuffer row_buffer;

            row_buffer.start_binary_row(_output_vexpr_ctxs.size());

            for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
                for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
                    auto type = arguments[col_idx].type;
                    if (type == PrimitiveType::TYPE_ARRAY || type == PrimitiveType::TYPE_MAP ||
                        type == PrimitiveType::TYPE_STRUCT ||
                        type == PrimitiveType::TYPE_QUANTILE_STATE ||
                        type == PrimitiveType::TYPE_HLL || type == PrimitiveType::TYPE_BITMAP) {
                        // Complex types are not supported in binary format yet
                        // So use text format serialization interface here
                        const auto col_index =
                                index_check_const(row_idx, arguments[col_idx].is_const);
                        const auto* column = arguments[col_idx].column;
                        if (arguments[col_idx].serde->write_column_to_mysql_text(
                                    *column, write_buffer, col_index, options)) {
                            write_buffer.commit();
                            auto str = mysql_output_tmp_col->get_data_at(write_buffer_index);
                            row_buffer.push_string(str.data, str.size);
                            write_buffer_index++;
                        } else {
                            row_buffer.push_null();
                        }

                    } else {
                        RETURN_IF_ERROR(arguments[col_idx].serde->write_column_to_mysql_binary(
                                *(arguments[col_idx].column), row_buffer, row_idx,
                                arguments[col_idx].is_const, options));
                    }
                }

                // Flush the finished binary row and restart the buffer.
                result->result_batch.rows[row_idx].append(row_buffer.buf(), row_buffer.length());
                bytes_sent += row_buffer.length();
                row_buffer.reset();
                row_buffer.start_binary_row(_output_vexpr_ctxs.size());
            }
        }
    }
    {
        SCOPED_TIMER(_result_send_timer);
        // If this is a dry run task, no need to send data block
        if (!_is_dry_run) {
            status = _sinker->add_batch(state, result);
        }
        if (status.ok()) {
            _written_rows += num_rows;
            if (!_is_dry_run) {
                _bytes_sent += bytes_sent;
            }
        } else {
            LOG(WARNING) << "append result batch to sink failed.";
        }
    }
    return status;
}
285 | | |
286 | 1 | Status VMysqlResultWriter::write(RuntimeState* state, Block& input_block) { |
287 | 1 | SCOPED_TIMER(_append_row_batch_timer); |
288 | 1 | Status status = Status::OK(); |
289 | 1 | if (UNLIKELY(input_block.rows() == 0)) { |
290 | 0 | return status; |
291 | 0 | } |
292 | | |
293 | 1 | DCHECK(_output_vexpr_ctxs.empty() != true); |
294 | | |
295 | | // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec |
296 | | // failed, just return the error status |
297 | 1 | Block block; |
298 | 1 | RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs, |
299 | 1 | input_block, &block)); |
300 | 1 | const auto total_bytes = block.bytes(); |
301 | | |
302 | 1 | if (total_bytes > config::thrift_max_message_size) [[unlikely]] { |
303 | 0 | const auto total_rows = block.rows(); |
304 | 0 | const auto sub_block_count = (total_bytes + config::thrift_max_message_size - 1) / |
305 | 0 | config::thrift_max_message_size; |
306 | 0 | const auto sub_block_rows = (total_rows + sub_block_count - 1) / sub_block_count; |
307 | |
|
308 | 0 | size_t offset = 0; |
309 | 0 | while (offset < total_rows) { |
310 | 0 | size_t rows = std::min(static_cast<size_t>(sub_block_rows), total_rows - offset); |
311 | 0 | auto sub_block = block.clone_empty(); |
312 | 0 | for (size_t i = 0; i != block.columns(); ++i) { |
313 | 0 | sub_block.get_by_position(i).column = |
314 | 0 | block.get_by_position(i).column->cut(offset, rows); |
315 | 0 | } |
316 | 0 | offset += rows; |
317 | |
|
318 | 0 | RETURN_IF_ERROR(_write_one_block(state, sub_block)); |
319 | 0 | } |
320 | 0 | return Status::OK(); |
321 | 0 | } |
322 | | |
323 | 1 | return _write_one_block(state, block); |
324 | 1 | } |
325 | | |
// Flush final row/byte totals into the profile counters. The incoming Status
// is intentionally ignored; close always reports OK.
// NOTE(review): the counters are only created when _parent_profile != nullptr
// (see _init_profile) — confirm COUNTER_SET/COUNTER_UPDATE tolerate null
// counters on the PointQueryExecutor path.
Status VMysqlResultWriter::close(Status) {
    COUNTER_SET(_sent_rows_counter, _written_rows);
    COUNTER_UPDATE(_bytes_sent_counter, _bytes_sent);
    return Status::OK();
}
331 | | |
332 | | } // namespace doris |