Coverage Report

Created: 2026-04-15 19:34

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/writer/vmysql_result_writer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
#include <gen_cpp/PaloInternalService_types.h>
20
#include <stddef.h>
21
22
#include <memory>
23
#include <vector>
24
25
#include "common/status.h"
26
#include "core/data_type/data_type.h"
27
#include "core/data_type/define_primitive_type.h"
28
#include "exec/sink/writer/result_writer.h"
29
#include "exprs/vexpr_fwd.h"
30
#include "runtime/result_block_buffer.h"
31
#include "runtime/runtime_profile.h"
32
#include "util/mysql_row_buffer.h"
33
34
namespace doris {
35
class RuntimeState;
36
37
class Block;
38
39
class GetResultBatchCtx {
40
public:
41
    using ResultType = TFetchDataResult;
42
    ENABLE_FACTORY_CREATOR(GetResultBatchCtx)
43
    GetResultBatchCtx(PFetchDataResult* result, google::protobuf::Closure* done)
44
1
            : _result(result), _done(done) {}
45
#ifdef BE_TEST
46
3
    GetResultBatchCtx() = default;
47
#endif
48
4
    MOCK_FUNCTION ~GetResultBatchCtx() = default;
49
    MOCK_FUNCTION void on_failure(const Status& status);
50
    MOCK_FUNCTION void on_close(int64_t packet_seq, int64_t returned_rows = 0);
51
    MOCK_FUNCTION Status on_data(const std::shared_ptr<TFetchDataResult>& t_result,
52
                                 int64_t packet_seq, ResultBlockBufferBase* buffer);
53
54
private:
55
#ifndef BE_TEST
56
    const int32_t _max_msg_size = std::numeric_limits<int32_t>::max();
57
#else
58
    int32_t _max_msg_size = std::numeric_limits<int32_t>::max();
59
#endif
60
61
    PFetchDataResult* _result = nullptr;
62
    google::protobuf::Closure* _done = nullptr;
63
};
64
65
using MySQLResultBlockBuffer = ResultBlockBuffer<GetResultBatchCtx>;
66
67
class VMysqlResultWriter final : public ResultWriter {
68
public:
69
    VMysqlResultWriter(std::shared_ptr<ResultBlockBufferBase> sinker,
70
                       const VExprContextSPtrs& output_vexpr_ctxs, RuntimeProfile* parent_profile,
71
                       bool is_binary_format);
72
73
    Status init(RuntimeState* state) override;
74
75
    Status write(RuntimeState* state, Block& block) override;
76
77
    Status close(Status status) override;
78
79
private:
80
    void _init_profile();
81
    Status _write_one_block(RuntimeState* state, Block& block);
82
83
    std::shared_ptr<MySQLResultBlockBuffer> _sinker = nullptr;
84
85
    const VExprContextSPtrs& _output_vexpr_ctxs;
86
87
    RuntimeProfile* _parent_profile; // parent profile from result sink. not owned
88
    // total time cost on append batch operation
89
    RuntimeProfile::Counter* _append_row_batch_timer = nullptr;
90
    // tuple convert timer, child timer of _append_row_batch_timer
91
    RuntimeProfile::Counter* _convert_tuple_timer = nullptr;
92
    // file write timer, child timer of _append_row_batch_timer
93
    RuntimeProfile::Counter* _result_send_timer = nullptr;
94
    // number of sent rows
95
    RuntimeProfile::Counter* _sent_rows_counter = nullptr;
96
    // size of sent data
97
    RuntimeProfile::Counter* _bytes_sent_counter = nullptr;
98
    // If true, no block will be sent
99
    bool _is_dry_run = false;
100
101
    uint64_t _bytes_sent = 0;
102
103
    const bool _is_binary_format;
104
};
105
} // namespace doris