Coverage Report

Created: 2026-03-16 15:31

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/writer/vmysql_result_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/writer/vmysql_result_writer.h"
19
20
#include <fmt/core.h>
21
#include <gen_cpp/Data_types.h>
22
#include <gen_cpp/Metrics_types.h>
23
#include <gen_cpp/PaloInternalService_types.h>
24
#include <gen_cpp/internal_service.pb.h>
25
#include <glog/logging.h>
26
#include <stdint.h>
27
#include <string.h>
28
#include <sys/types.h>
29
30
#include <ostream>
31
#include <string>
32
33
#include "common/cast_set.h"
34
#include "common/compiler_util.h" // IWYU pragma: keep
35
#include "common/config.h"
36
#include "core/block/block.h"
37
#include "core/block/column_with_type_and_name.h"
38
#include "core/column/column.h"
39
#include "core/column/column_const.h"
40
#include "core/data_type/data_type_array.h"
41
#include "core/data_type/data_type_decimal.h"
42
#include "core/data_type/data_type_map.h"
43
#include "core/data_type/data_type_nullable.h"
44
#include "core/data_type/data_type_struct.h"
45
#include "core/data_type/define_primitive_type.h"
46
#include "core/data_type_serde/data_type_serde.h"
47
#include "core/types.h"
48
#include "exprs/aggregate/aggregate_function.h"
49
#include "exprs/vexpr.h"
50
#include "exprs/vexpr_context.h"
51
#include "runtime/result_block_buffer.h"
52
#include "runtime/runtime_state.h"
53
#include "util/mysql_global.h"
54
55
namespace doris {
56
#include "common/compile_check_begin.h"
57
58
1
void GetResultBatchCtx::on_failure(const Status& status) {
59
1
    DCHECK(!status.ok()) << "status is ok, errmsg=" << status;
60
1
    status.to_protobuf(_result->mutable_status());
61
1
    _done->Run();
62
1
}
63
64
1
void GetResultBatchCtx::on_close(int64_t packet_seq, int64_t returned_rows) {
65
1
    Status status;
66
1
    status.to_protobuf(_result->mutable_status());
67
1
    PQueryStatistics* statistics = _result->mutable_query_statistics();
68
1
    statistics->set_returned_rows(returned_rows);
69
1
    _result->set_packet_seq(packet_seq);
70
1
    _result->set_eos(true);
71
1
    _done->Run();
72
1
}
73
74
// Answers a pending fetch request with one data packet.
//
// A non-null `t_result` has its row batch Thrift-serialized into the response; a null
// one produces an explicitly empty batch. If the resulting response is larger than
// `_max_msg_size`, the payload is dropped, an empty batch is sent instead, and the
// error is reported to the client through the response status.
//
// Returns an error only when Thrift serialization fails (in that case the closure is
// not run here). Oversized messages still return OK because the failure is conveyed
// in the response itself. `buffer` is not used by this implementation.
Status GetResultBatchCtx::on_data(const std::shared_ptr<TFetchDataResult>& t_result,
                                  int64_t packet_seq, ResultBlockBufferBase* buffer) {
    if (t_result == nullptr) {
        _result->clear_row_batch();
        _result->set_empty_batch(true);
    } else {
        uint8_t* serialized = nullptr;
        uint32_t serialized_len = 0;
        ThriftSerializer serializer(false, 4096);
        RETURN_IF_ERROR(
                serializer.serialize(&t_result->result_batch, &serialized_len, &serialized));
        _result->set_row_batch(
                std::string(reinterpret_cast<const char*>(serialized), serialized_len));
    }
    _result->set_packet_seq(packet_seq);
    _result->set_eos(false);

    /// The size limit of proto buffer message is 2G
    Status reply_status = Status::OK();
    if (const auto message_size = _result->ByteSizeLong(); message_size > _max_msg_size) {
        reply_status = Status::InternalError("Message size exceeds 2GB: {}", message_size);
        _result->clear_row_batch();
        _result->set_empty_batch(true);
    }
    reply_status.to_protobuf(_result->mutable_status());
    _done->Run();
    return Status::OK();
}
100
101
VMysqlResultWriter::VMysqlResultWriter(std::shared_ptr<ResultBlockBufferBase> sinker,
102
                                       const VExprContextSPtrs& output_vexpr_ctxs,
103
                                       RuntimeProfile* parent_profile, bool is_binary_format)
104
1
        : ResultWriter(),
105
1
          _sinker(std::dynamic_pointer_cast<MySQLResultBlockBuffer>(sinker)),
106
1
          _output_vexpr_ctxs(output_vexpr_ctxs),
107
1
          _parent_profile(parent_profile),
108
1
          _is_binary_format(is_binary_format) {}
109
110
0
// Creates the profile counters and captures the per-query settings this writer needs:
// whether object types are returned as binary, and whether this is a dry run
// (in which case batches are built but never handed to the sink).
Status VMysqlResultWriter::init(RuntimeState* state) {
    _init_profile();

    const auto return_objects_as_binary = state->return_object_data_as_binary();
    set_output_object_data(return_objects_as_binary);

    const auto& query_options = state->query_options();
    _is_dry_run = query_options.dry_run_query;

    return Status::OK();
}
116
117
0
void VMysqlResultWriter::_init_profile() {
118
0
    if (_parent_profile != nullptr) {
119
        // for PointQueryExecutor, _parent_profile is null
120
0
        _append_row_batch_timer = ADD_TIMER(_parent_profile, "AppendBatchTime");
121
0
        _convert_tuple_timer =
122
0
                ADD_CHILD_TIMER(_parent_profile, "TupleConvertTime", "AppendBatchTime");
123
0
        _result_send_timer = ADD_CHILD_TIMER(_parent_profile, "ResultSendTime", "AppendBatchTime");
124
0
        _sent_rows_counter = ADD_COUNTER(_parent_profile, "NumSentRows", TUnit::UNIT);
125
0
        _bytes_sent_counter = ADD_COUNTER(_parent_profile, "BytesSent", TUnit::BYTES);
126
0
    }
127
0
}
128
129
1
// Encodes every row of `block` as a MySQL result row and hands the whole batch
// (one TFetchDataResult) to the result sink.
//
// Encoding:
//  - text (default): each value is serialized with the serde dialect selected by the
//    query's `serde_dialect` option (DORIS / PRESTO / HIVE); a serializer returning
//    false is written as SQL NULL.
//  - binary (`_is_binary_format`): MySQL binary-protocol rows; ARRAY, MAP, STRUCT,
//    QUANTILE_STATE, HLL and BITMAP columns still go through the text serializer.
// DECIMALV2 columns use a serde built with precision 27 and the scale taken from the
// output expression, because the block's own DECIMALV2 type does not carry the real
// scale/precision.
//
// In dry-run mode the batch is built but not sent, and only the row count is recorded.
//
// Returns an error if a non-const column has fewer rows than the block, if a binary
// serializer fails, or if the sink rejects the batch.
Status VMysqlResultWriter::_write_one_block(RuntimeState* state, Block& block) {
    Status status = Status::OK();
    int num_rows = cast_set<int>(block.rows());
    // convert one batch: one MySQL-encoded string per row
    auto result = std::make_shared<TFetchDataResult>();
    result->result_batch.rows.resize(num_rows);
    uint64_t bytes_sent = 0;
    {
        SCOPED_TIMER(_convert_tuple_timer);

        // Per-output-column state, resolved once before the row loops.
        struct Arguments {
            const IColumn* column;
            bool is_const;
            DataTypeSerDeSPtr serde;
            PrimitiveType type;
        };
        auto options = DataTypeSerDe::get_default_format_options();
        options.timezone = &state->timezone_obj();

        const size_t num_cols = _output_vexpr_ctxs.size();
        std::vector<Arguments> arguments;
        arguments.reserve(num_cols);

        for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
            const auto& [column_ptr, col_const] =
                    unpack_if_const(block.get_by_position(col_idx).column);
            int scale = _output_vexpr_ctxs[col_idx]->root()->data_type()->get_scale();
            // decimalv2 scale and precision is hard code, so we should get real scale and precision
            // from expr
            DataTypeSerDeSPtr serde;
            if (_output_vexpr_ctxs[col_idx]->root()->data_type()->get_primitive_type() ==
                PrimitiveType::TYPE_DECIMALV2) {
                if (_output_vexpr_ctxs[col_idx]->root()->is_nullable()) {
                    auto nested_serde =
                            std::make_shared<DataTypeDecimalSerDe<TYPE_DECIMALV2>>(27, scale);
                    serde = std::make_shared<DataTypeNullableSerDe>(nested_serde);
                } else {
                    serde = std::make_shared<DataTypeDecimalSerDe<TYPE_DECIMALV2>>(27, scale);
                }
            } else {
                serde = block.get_by_position(col_idx).type->get_serde();
            }
            serde->set_return_object_as_string(output_object_data());
            arguments.emplace_back(column_ptr.get(), col_const, serde,
                                   block.get_by_position(col_idx).type->get_primitive_type());
        }

        for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
            const auto& argument = arguments[col_idx];
            // const column will only have 1 row, see unpack_if_const
            // num_rows comes from cast_set<int>(block.rows()), so it is non-negative.
            if (argument.column->size() < static_cast<size_t>(num_rows) && !argument.is_const) {
                return Status::InternalError(
                        "Required row size is out of range, need {} rows, column {} has {} "
                        "rows in fact.",
                        num_rows, argument.column->get_name(), argument.column->size());
            }
        }
        // Scratch column that accumulates every text-serialized value of this block;
        // each committed value is read back by index and copied into the row.
        auto mysql_output_tmp_col = ColumnString::create();
        BufferWriter write_buffer(*mysql_output_tmp_col);
        size_t write_buffer_index = 0;
        // For non-binary format, we need to call different serialization interfaces
        // write_column_to_mysql/presto/hive text
        if (!_is_binary_format) {
            const auto& serde_dialect = state->query_options().serde_dialect;
            auto write_to_text = [serde_dialect](DataTypeSerDeSPtr& serde, const IColumn* column,
                                                 BufferWriter& write_buffer, size_t col_index,
                                                 const DataTypeSerDe::FormatOptions& options) {
                if (serde_dialect == TSerdeDialect::DORIS) {
                    return serde->write_column_to_mysql_text(*column, write_buffer, col_index,
                                                             options);
                } else if (serde_dialect == TSerdeDialect::PRESTO) {
                    return serde->write_column_to_presto_text(*column, write_buffer, col_index,
                                                              options);
                } else if (serde_dialect == TSerdeDialect::HIVE) {
                    return serde->write_column_to_hive_text(*column, write_buffer, col_index,
                                                            options);
                } else {
                    // Unknown dialect: the value is emitted as NULL by the caller.
                    return false;
                }
            };

            for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
                auto& mysql_rows = result->result_batch.rows[row_idx];
                for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
                    const auto col_index = index_check_const(row_idx, arguments[col_idx].is_const);
                    const auto* column = arguments[col_idx].column;
                    if (write_to_text(arguments[col_idx].serde, column, write_buffer, col_index,
                                      options)) {
                        write_buffer.commit();
                        auto str = mysql_output_tmp_col->get_data_at(write_buffer_index);
                        direct_write_to_mysql_result_string(mysql_rows, str.data, str.size);
                        write_buffer_index++;
                    } else {
                        direct_write_to_mysql_result_null(mysql_rows);
                    }
                }
                bytes_sent += mysql_rows.size();
            }
        } else {
            MysqlRowBinaryBuffer row_buffer;

            row_buffer.start_binary_row(num_cols);

            for (int row_idx = 0; row_idx < num_rows; ++row_idx) {
                for (size_t col_idx = 0; col_idx < num_cols; ++col_idx) {
                    auto type = arguments[col_idx].type;
                    if (type == PrimitiveType::TYPE_ARRAY || type == PrimitiveType::TYPE_MAP ||
                        type == PrimitiveType::TYPE_STRUCT ||
                        type == PrimitiveType::TYPE_QUANTILE_STATE ||
                        type == PrimitiveType::TYPE_HLL || type == PrimitiveType::TYPE_BITMAP) {
                        // Complex types are not supported in binary format yet
                        // So use text format serialization interface here
                        const auto col_index =
                                index_check_const(row_idx, arguments[col_idx].is_const);
                        const auto* column = arguments[col_idx].column;
                        if (arguments[col_idx].serde->write_column_to_mysql_text(
                                    *column, write_buffer, col_index, options)) {
                            write_buffer.commit();
                            auto str = mysql_output_tmp_col->get_data_at(write_buffer_index);
                            row_buffer.push_string(str.data, str.size);
                            write_buffer_index++;
                        } else {
                            row_buffer.push_null();
                        }

                    } else {
                        RETURN_IF_ERROR(arguments[col_idx].serde->write_column_to_mysql_binary(
                                *(arguments[col_idx].column), row_buffer, row_idx,
                                arguments[col_idx].is_const, options));
                    }
                }

                result->result_batch.rows[row_idx].append(row_buffer.buf(), row_buffer.length());
                bytes_sent += row_buffer.length();
                // Reuse the same buffer for the next row.
                row_buffer.reset();
                row_buffer.start_binary_row(num_cols);
            }
        }
    }
    {
        SCOPED_TIMER(_result_send_timer);
        // If this is a dry run task, no need to send data block
        if (!_is_dry_run) {
            status = _sinker->add_batch(state, result);
        }
        if (status.ok()) {
            _written_rows += num_rows;
            if (!_is_dry_run) {
                _bytes_sent += bytes_sent;
            }
        } else {
            LOG(WARNING) << "append result batch to sink failed: " << status;
        }
    }
    return status;
}
285
286
1
// Evaluates the output expressions over `input_block` and sends the result to the client.
//
// If the evaluated block is larger than `config::thrift_max_message_size` bytes, it is
// split into ceil(bytes / limit) row ranges of (nearly) equal row count, and each range
// is written as its own batch. Rows of very different widths can still leave a single
// range above the limit; the split only bounds the average size.
//
// Returns OK for an empty input block, otherwise the first error from expression
// evaluation or from writing a batch.
Status VMysqlResultWriter::write(RuntimeState* state, Block& input_block) {
    SCOPED_TIMER(_append_row_batch_timer);
    if (UNLIKELY(input_block.rows() == 0)) {
        return Status::OK();
    }

    DCHECK(!_output_vexpr_ctxs.empty());

    // Exec vectorized expr here to speed up, block.rows() == 0 means expr exec
    // failed, just return the error status
    Block block;
    RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_output_vexpr_ctxs,
                                                                       input_block, &block));
    const auto total_bytes = block.bytes();

    // Snapshot the limit once so the comparison and the divisions below see the same
    // value even if the config is changed concurrently. A non-positive limit disables
    // splitting; in particular 0 would otherwise be used as a divisor.
    const auto max_message_size = config::thrift_max_message_size;
    if (max_message_size > 0 && total_bytes > static_cast<size_t>(max_message_size))
            [[unlikely]] {
        const auto limit = static_cast<size_t>(max_message_size);
        const auto total_rows = block.rows();
        const auto sub_block_count = (total_bytes + limit - 1) / limit;
        // total_rows >= 1 and sub_block_count >= 1 here, so sub_block_rows >= 1 and the
        // loop below always advances.
        const auto sub_block_rows = (total_rows + sub_block_count - 1) / sub_block_count;

        size_t offset = 0;
        while (offset < total_rows) {
            size_t rows = std::min(static_cast<size_t>(sub_block_rows), total_rows - offset);
            auto sub_block = block.clone_empty();
            for (size_t i = 0; i != block.columns(); ++i) {
                sub_block.get_by_position(i).column =
                        block.get_by_position(i).column->cut(offset, rows);
            }
            offset += rows;

            RETURN_IF_ERROR(_write_one_block(state, sub_block));
        }
        return Status::OK();
    }

    return _write_one_block(state, block);
}
325
326
0
// Publishes the accumulated totals (rows written, bytes sent) to the profile counters.
// The counters are only created when the writer has a non-null parent profile
// (e.g. not for PointQueryExecutor), so each one is checked before use instead of
// being dereferenced unconditionally.
// NOTE(review): assumes the counter members default to nullptr in the header — confirm.
Status VMysqlResultWriter::close(Status) {
    if (_sent_rows_counter != nullptr) {
        COUNTER_SET(_sent_rows_counter, _written_rows);
    }
    if (_bytes_sent_counter != nullptr) {
        COUNTER_UPDATE(_bytes_sent_counter, _bytes_sent);
    }
    return Status::OK();
}
331
332
} // namespace doris