be/src/exec/sink/writer/vfile_result_writer.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <gen_cpp/Types_types.h> |
21 | | #include <stddef.h> |
22 | | #include <stdint.h> |
23 | | |
24 | | #include <cstdint> |
25 | | #include <iosfwd> |
26 | | #include <memory> |
27 | | #include <string> |
28 | | #include <vector> |
29 | | |
30 | | #include "common/status.h" |
31 | | #include "core/block/block.h" |
32 | | #include "exec/sink/writer/async_result_writer.h" |
33 | | #include "format/transformer/vfile_format_transformer.h" |
34 | | #include "io/fs/file_writer.h" |
35 | | #include "runtime/descriptors.h" |
36 | | #include "runtime/result_block_buffer.h" |
37 | | #include "runtime/runtime_profile.h" |
38 | | |
39 | | namespace doris { |
40 | | class ResultBlockBufferBase; |
41 | | class RuntimeState; |
42 | | |
43 | | class GetResultBatchCtx; |
44 | | using MySQLResultBlockBuffer = ResultBlockBuffer<GetResultBatchCtx>; |
45 | | class VExprContext; |
46 | | struct ResultFileOptions; |
47 | | } // namespace doris |
48 | | |
49 | | namespace doris { |
50 | | |
51 | | // write result to file |
52 | | class VFileResultWriter final : public AsyncResultWriter { |
53 | | public: |
54 | | VFileResultWriter(const ResultFileOptions* file_option, |
55 | | const TStorageBackendType::type storage_type, |
56 | | const TUniqueId fragment_instance_id, |
57 | | const VExprContextSPtrs& _output_vexpr_ctxs, |
58 | | std::shared_ptr<ResultBlockBufferBase> sinker, Block* output_block, |
59 | | bool output_object_data, const RowDescriptor& output_row_descriptor, |
60 | | std::shared_ptr<Dependency> dep, std::shared_ptr<Dependency> fin_dep); |
61 | | |
62 | | VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs, |
63 | | std::shared_ptr<Dependency> dep, std::shared_ptr<Dependency> fin_dep); |
64 | | |
65 | | Status write(RuntimeState* state, Block& block) override; |
66 | | |
67 | | Status close(Status exec_status) override; |
68 | | |
69 | | Status open(RuntimeState* state, RuntimeProfile* profile) override; |
70 | | |
71 | | // file result writer always return statistic result in one row |
72 | 0 | int64_t get_written_rows() const override { return 1; } |
73 | | |
74 | 0 | void set_header_info(const std::string& header_type, const std::string& header) { |
75 | 0 | _header_type = header_type; |
76 | 0 | _header = header; |
77 | 0 | } |
78 | | |
79 | | private: |
80 | | Status _write_file(const Block& block); |
81 | | |
82 | | void _init_profile(RuntimeProfile*); |
83 | | |
84 | | Status _create_file_writer(const std::string& file_name); |
85 | | Status _create_next_file_writer(); |
86 | | // get next export file name |
87 | | Status _get_next_file_name(std::string* file_name); |
88 | | void _get_file_url(std::string* file_url); |
89 | | std::string _file_format_to_name(); |
90 | | // close file writer, and if !done, it will create new writer for next file. |
91 | | Status _close_file_writer(bool done); |
92 | | // create a new file if current file size exceed limit |
93 | | Status _create_new_file_if_exceed_size(); |
94 | | // send the final statistic result |
95 | | Status _send_result(); |
96 | | // save result into batch rather than send it |
97 | | Status _fill_result_block(); |
98 | | // delete the dir of file_path |
99 | | Status _delete_dir(); |
100 | | double _get_write_speed(int64_t write_bytes, int64_t write_time); |
101 | | std::string _compression_type_to_name(); |
102 | | |
103 | | private: |
104 | | RuntimeState* _state; // not owned, set when init |
105 | | const ResultFileOptions* _file_opts = nullptr; |
106 | | TStorageBackendType::type _storage_type; |
107 | | TUniqueId _fragment_instance_id; |
108 | | |
109 | | // If the result file format is plain text, like CSV, this _file_writer is owned by this FileResultWriter. |
110 | | // If the result file format is Parquet, this _file_writer is owned by _parquet_writer. |
111 | | std::unique_ptr<doris::io::FileWriter> _file_writer_impl; |
112 | | // Used to buffer the export data of plain text |
113 | | // TODO(cmy): I simply use a stringstrteam to buffer the data, to avoid calling |
114 | | // file writer's write() for every single row. |
115 | | // But this cannot solve the problem of a row of data that is too large. |
116 | | // For example: bitmap_to_string() may return large volume of data. |
117 | | // And the speed is relative low, in my test, is about 6.5MB/s. |
118 | | std::stringstream _plain_text_outstream; |
119 | | |
120 | | // current written bytes, used for split data |
121 | | int64_t _current_written_bytes = 0; |
122 | | // the suffix idx of export file name, start at 0 |
123 | | int _file_idx = 0; |
124 | | |
125 | | // total time cost on append batch operation |
126 | | RuntimeProfile::Counter* _append_row_batch_timer = nullptr; |
127 | | // tuple convert timer, child timer of _append_row_batch_timer |
128 | | RuntimeProfile::Counter* _convert_tuple_timer = nullptr; |
129 | | // file write timer, child timer of _append_row_batch_timer |
130 | | RuntimeProfile::Counter* _file_write_timer = nullptr; |
131 | | // time of closing the file writer |
132 | | RuntimeProfile::Counter* _writer_close_timer = nullptr; |
133 | | // number of written rows |
134 | | RuntimeProfile::Counter* _written_rows_counter = nullptr; |
135 | | // bytes of written data |
136 | | RuntimeProfile::Counter* _written_data_bytes = nullptr; |
137 | | |
138 | | // _sinker and _output_batch are not owned by FileResultWriter |
139 | | std::shared_ptr<MySQLResultBlockBuffer> _sinker = nullptr; |
140 | | Block* _output_block = nullptr; |
141 | | // set to true if the final statistic result is sent |
142 | | bool _is_result_sent = false; |
143 | | RowDescriptor _output_row_descriptor; |
144 | | // convert block to parquet/orc/csv fomrat |
145 | | std::unique_ptr<VFileFormatTransformer> _vfile_writer; |
146 | | |
147 | | std::string_view _header_type; |
148 | | std::string_view _header; |
149 | | }; |
150 | | } // namespace doris |