Coverage Report

Created: 2026-03-15 17:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/writer/vfile_result_writer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/Types_types.h>
21
#include <stddef.h>
22
#include <stdint.h>
23
24
#include <cstdint>
25
#include <iosfwd>
26
#include <memory>
27
#include <string>
28
#include <vector>
29
30
#include "common/status.h"
31
#include "core/block/block.h"
32
#include "exec/sink/writer/async_result_writer.h"
33
#include "format/transformer/vfile_format_transformer.h"
34
#include "io/fs/file_writer.h"
35
#include "runtime/descriptors.h"
36
#include "runtime/result_block_buffer.h"
37
#include "runtime/runtime_profile.h"
38
39
namespace doris {
40
class ResultBlockBufferBase;
41
class RuntimeState;
42
43
class GetResultBatchCtx;
44
using MySQLResultBlockBuffer = ResultBlockBuffer<GetResultBatchCtx>;
45
class VExprContext;
46
struct ResultFileOptions;
47
} // namespace doris
48
49
namespace doris {
50
51
// write result to file
52
class VFileResultWriter final : public AsyncResultWriter {
53
public:
54
    VFileResultWriter(const ResultFileOptions* file_option,
55
                      const TStorageBackendType::type storage_type,
56
                      const TUniqueId fragment_instance_id,
57
                      const VExprContextSPtrs& _output_vexpr_ctxs,
58
                      std::shared_ptr<ResultBlockBufferBase> sinker, Block* output_block,
59
                      bool output_object_data, const RowDescriptor& output_row_descriptor,
60
                      std::shared_ptr<Dependency> dep, std::shared_ptr<Dependency> fin_dep);
61
62
    VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
63
                      std::shared_ptr<Dependency> dep, std::shared_ptr<Dependency> fin_dep);
64
65
    Status write(RuntimeState* state, Block& block) override;
66
67
    Status close(Status exec_status) override;
68
69
    Status open(RuntimeState* state, RuntimeProfile* profile) override;
70
71
    // The file result writer always returns the statistic result in one row.
72
0
    int64_t get_written_rows() const override { return 1; }
73
74
0
    void set_header_info(const std::string& header_type, const std::string& header) {
75
0
        _header_type = header_type;
76
0
        _header = header;
77
0
    }
78
79
private:
80
    Status _write_file(const Block& block);
81
82
    void _init_profile(RuntimeProfile*);
83
84
    Status _create_file_writer(const std::string& file_name);
85
    Status _create_next_file_writer();
86
    // get next export file name
87
    Status _get_next_file_name(std::string* file_name);
88
    void _get_file_url(std::string* file_url);
89
    std::string _file_format_to_name();
90
    // close file writer, and if !done, it will create new writer for next file.
91
    Status _close_file_writer(bool done);
92
    // create a new file if current file size exceed limit
93
    Status _create_new_file_if_exceed_size();
94
    // send the final statistic result
95
    Status _send_result();
96
    // save result into batch rather than send it
97
    Status _fill_result_block();
98
    // delete the dir of file_path
99
    Status _delete_dir();
100
    double _get_write_speed(int64_t write_bytes, int64_t write_time);
101
    std::string _compression_type_to_name();
102
103
private:
104
    RuntimeState* _state = nullptr; // not owned, set when init; null until then
105
    const ResultFileOptions* _file_opts = nullptr;
106
    TStorageBackendType::type _storage_type;
107
    TUniqueId _fragment_instance_id;
108
109
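    // If the result file format is plain text, like CSV, this _file_writer_impl is owned by this VFileResultWriter.
110
    // If the result file format is Parquet, this _file_writer_impl is owned by _parquet_writer (NOTE(review): no _parquet_writer member is declared in this class; this sentence may be stale, confirm).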
111
    std::unique_ptr<doris::io::FileWriter> _file_writer_impl;
112
    // Used to buffer the export data of plain text
113
    // TODO(cmy): I simply use a stringstream to buffer the data, to avoid calling
114
    // file writer's write() for every single row.
115
    // But this cannot solve the problem of a row of data that is too large.
116
    // For example: bitmap_to_string() may return large volume of data.
117
    // And the speed is relatively low; in my test, it is about 6.5MB/s.
118
    std::stringstream _plain_text_outstream;
119
120
    // current written bytes, used for split data
121
    int64_t _current_written_bytes = 0;
122
    // the suffix idx of export file name, start at 0
123
    int _file_idx = 0;
124
125
    // total time cost on append batch operation
126
    RuntimeProfile::Counter* _append_row_batch_timer = nullptr;
127
    // tuple convert timer, child timer of _append_row_batch_timer
128
    RuntimeProfile::Counter* _convert_tuple_timer = nullptr;
129
    // file write timer, child timer of _append_row_batch_timer
130
    RuntimeProfile::Counter* _file_write_timer = nullptr;
131
    // time of closing the file writer
132
    RuntimeProfile::Counter* _writer_close_timer = nullptr;
133
    // number of written rows
134
    RuntimeProfile::Counter* _written_rows_counter = nullptr;
135
    // bytes of written data
136
    RuntimeProfile::Counter* _written_data_bytes = nullptr;
137
138
    // _sinker (shared) and _output_block (raw pointer) are not exclusively owned by VFileResultWriter
139
    std::shared_ptr<MySQLResultBlockBuffer> _sinker = nullptr;
140
    Block* _output_block = nullptr;
141
    // set to true if the final statistic result is sent
142
    bool _is_result_sent = false;
143
    RowDescriptor _output_row_descriptor;
144
    // convert block to parquet/orc/csv format
145
    std::unique_ptr<VFileFormatTransformer> _vfile_writer;
146
147
    std::string _header_type; // owning copy of the header type passed to set_header_info(); a view would dangle once the caller's string goes away
148
    std::string _header; // owning copy of the header text passed to set_header_info(); a view would dangle once the caller's string goes away
149
};
150
} // namespace doris