be/src/exec/sink/writer/vhive_partition_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/sink/writer/vhive_partition_writer.h" |
19 | | |
20 | | #include <aws/s3/model/CompletedPart.h> |
21 | | |
22 | | #include "core/block/materialize_block.h" |
23 | | #include "core/column/column_map.h" |
24 | | #include "format/transformer/vcsv_transformer.h" |
25 | | #include "format/transformer/vorc_transformer.h" |
26 | | #include "format/transformer/vparquet_transformer.h" |
27 | | #include "io/file_factory.h" |
28 | | #include "io/fs/s3_file_writer.h" |
29 | | #include "runtime/runtime_state.h" |
30 | | |
31 | | namespace doris { |
32 | | |
// Constructs a writer bound to one Hive partition (or the unpartitioned table root).
// All heavy arguments (names, write info, column list) are taken by value and moved
// into members; `hadoop_conf` is copied into `_hadoop_conf`.
// Note: `t_sink` is currently unused by this constructor.
// `hive_serde_properties` is stored as a raw pointer; it is dereferenced only for
// text-format tables in open() — presumably non-null in that case (owned by the
// sink descriptor) — TODO confirm lifetime against the caller.
VHivePartitionWriter::VHivePartitionWriter(const TDataSink& t_sink, std::string partition_name,
                                           TUpdateMode::type update_mode,
                                           const VExprContextSPtrs& write_output_expr_ctxs,
                                           std::vector<std::string> write_column_names,
                                           WriteInfo write_info, std::string file_name,
                                           int file_name_index,
                                           TFileFormatType::type file_format_type,
                                           TFileCompressType::type hive_compress_type,
                                           const THiveSerDeProperties* hive_serde_properties,
                                           const std::map<std::string, std::string>& hadoop_conf)
        : _partition_name(std::move(partition_name)),
          _update_mode(update_mode),
          _write_output_expr_ctxs(write_output_expr_ctxs),
          _write_column_names(std::move(write_column_names)),
          _write_info(std::move(write_info)),
          _file_name(std::move(file_name)),
          _file_name_index(file_name_index),
          _file_format_type(file_format_type),
          _hive_compress_type(hive_compress_type),
          _hive_serde_properties(hive_serde_properties),
          _hadoop_conf(hadoop_conf) {}
54 | | |
55 | 0 | Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* operator_profile) { |
56 | 0 | _state = state; |
57 | |
|
58 | 0 | io::FSPropertiesRef fs_properties(_write_info.file_type); |
59 | 0 | fs_properties.properties = &_hadoop_conf; |
60 | 0 | if (!_write_info.broker_addresses.empty()) { |
61 | 0 | fs_properties.broker_addresses = &(_write_info.broker_addresses); |
62 | 0 | } |
63 | 0 | io::FileDescription file_description = { |
64 | 0 | .path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name()), |
65 | 0 | .fs_name {}}; |
66 | 0 | _fs = DORIS_TRY(FileFactory::create_fs(fs_properties, file_description)); |
67 | 0 | io::FileWriterOptions file_writer_options = {.used_by_s3_committer = true}; |
68 | 0 | RETURN_IF_ERROR(_fs->create_file(file_description.path, &_file_writer, &file_writer_options)); |
69 | | |
70 | 0 | switch (_file_format_type) { |
71 | 0 | case TFileFormatType::FORMAT_PARQUET: { |
72 | 0 | TParquetCompressionType::type parquet_compression_type; |
73 | 0 | switch (_hive_compress_type) { |
74 | 0 | case TFileCompressType::PLAIN: { |
75 | 0 | parquet_compression_type = TParquetCompressionType::UNCOMPRESSED; |
76 | 0 | break; |
77 | 0 | } |
78 | 0 | case TFileCompressType::SNAPPYBLOCK: { |
79 | 0 | parquet_compression_type = TParquetCompressionType::SNAPPY; |
80 | 0 | break; |
81 | 0 | } |
82 | 0 | case TFileCompressType::ZSTD: { |
83 | 0 | parquet_compression_type = TParquetCompressionType::ZSTD; |
84 | 0 | break; |
85 | 0 | } |
86 | 0 | default: { |
87 | 0 | return Status::InternalError("Unsupported hive compress type {} with parquet", |
88 | 0 | to_string(_hive_compress_type)); |
89 | 0 | } |
90 | 0 | } |
91 | 0 | ParquetFileOptions parquet_options = {parquet_compression_type, |
92 | 0 | TParquetVersion::PARQUET_1_0, false, true}; |
93 | 0 | _file_format_transformer = std::make_unique<VParquetTransformer>( |
94 | 0 | state, _file_writer.get(), _write_output_expr_ctxs, _write_column_names, false, |
95 | 0 | parquet_options); |
96 | 0 | return _file_format_transformer->open(); |
97 | 0 | } |
98 | 0 | case TFileFormatType::FORMAT_ORC: { |
99 | 0 | _file_format_transformer = std::make_unique<VOrcTransformer>( |
100 | 0 | state, _file_writer.get(), _write_output_expr_ctxs, "", _write_column_names, false, |
101 | 0 | _hive_compress_type); |
102 | 0 | return _file_format_transformer->open(); |
103 | 0 | } |
104 | 0 | case TFileFormatType::FORMAT_CSV_PLAIN: { |
105 | | // TODO(syt): support hive csv table, only hive text file is supportted now |
106 | 0 | _file_format_transformer = std::make_unique<VCSVTransformer>( |
107 | 0 | state, _file_writer.get(), _write_output_expr_ctxs, false, "csv", "", |
108 | 0 | _hive_serde_properties->field_delim, _hive_serde_properties->line_delim, false, |
109 | 0 | _hive_compress_type, _hive_serde_properties); |
110 | 0 | return _file_format_transformer->open(); |
111 | 0 | } |
112 | 0 | default: { |
113 | 0 | return Status::InternalError("Unsupported file format type {}", |
114 | 0 | to_string(_file_format_type)); |
115 | 0 | } |
116 | 0 | } |
117 | 0 | } |
118 | | |
119 | 0 | Status VHivePartitionWriter::close(const Status& status) { |
120 | 0 | Status result_status; |
121 | 0 | if (_file_format_transformer != nullptr) { |
122 | 0 | result_status = _file_format_transformer->close(); |
123 | 0 | if (!result_status.ok()) { |
124 | 0 | LOG(WARNING) << fmt::format("_file_format_transformer close failed, reason: {}", |
125 | 0 | result_status.to_string()); |
126 | 0 | } |
127 | 0 | } |
128 | 0 | bool status_ok = result_status.ok() && status.ok(); |
129 | 0 | if (!status_ok && _fs != nullptr) { |
130 | 0 | auto path = fmt::format("{}/{}", _write_info.write_path, _file_name); |
131 | 0 | Status st = _fs->delete_file(path); |
132 | 0 | if (!st.ok()) { |
133 | 0 | LOG(WARNING) << fmt::format("Delete file {} failed, reason: {}", path, st.to_string()); |
134 | 0 | } |
135 | 0 | } |
136 | 0 | if (status_ok) { |
137 | 0 | auto partition_update = _build_partition_update(); |
138 | 0 | _state->add_hive_partition_updates(partition_update); |
139 | 0 | } |
140 | 0 | return result_status; |
141 | 0 | } |
142 | | |
143 | 0 | Status VHivePartitionWriter::write(Block& block) { |
144 | 0 | RETURN_IF_ERROR(_file_format_transformer->write(block)); |
145 | 0 | _row_count += block.rows(); |
146 | 0 | return Status::OK(); |
147 | 0 | } |
148 | | |
149 | 0 | THivePartitionUpdate VHivePartitionWriter::_build_partition_update() { |
150 | 0 | THivePartitionUpdate hive_partition_update; |
151 | 0 | hive_partition_update.__set_name(_partition_name); |
152 | 0 | hive_partition_update.__set_update_mode(_update_mode); |
153 | 0 | THiveLocationParams location; |
154 | 0 | location.__set_write_path(_write_info.original_write_path); |
155 | 0 | location.__set_target_path(_write_info.target_path); |
156 | 0 | hive_partition_update.__set_location(location); |
157 | 0 | hive_partition_update.__set_file_names({_get_target_file_name()}); |
158 | 0 | hive_partition_update.__set_row_count(_row_count); |
159 | 0 | DCHECK(_file_format_transformer != nullptr); |
160 | 0 | hive_partition_update.__set_file_size(_file_format_transformer->written_len()); |
161 | |
|
162 | 0 | if (_write_info.file_type == TFileType::FILE_S3) { |
163 | 0 | DCHECK(_file_writer != nullptr); |
164 | 0 | doris::io::S3FileWriter* s3_mpu_file_writer = |
165 | 0 | dynamic_cast<doris::io::S3FileWriter*>(_file_writer.get()); |
166 | 0 | DCHECK(s3_mpu_file_writer != nullptr); |
167 | 0 | TS3MPUPendingUpload s3_mpu_pending_upload; |
168 | 0 | s3_mpu_pending_upload.__set_bucket(s3_mpu_file_writer->bucket()); |
169 | 0 | s3_mpu_pending_upload.__set_key(s3_mpu_file_writer->key()); |
170 | 0 | s3_mpu_pending_upload.__set_upload_id(s3_mpu_file_writer->upload_id()); |
171 | |
|
172 | 0 | std::map<int, std::string> etags; |
173 | 0 | for (auto& completed_part : s3_mpu_file_writer->completed_parts()) { |
174 | 0 | etags.insert({completed_part.part_num, completed_part.etag}); |
175 | 0 | } |
176 | 0 | s3_mpu_pending_upload.__set_etags(etags); |
177 | 0 | hive_partition_update.__set_s3_mpu_pending_uploads({s3_mpu_pending_upload}); |
178 | 0 | } |
179 | 0 | return hive_partition_update; |
180 | 0 | } |
181 | | |
182 | | std::string VHivePartitionWriter::_get_file_extension(TFileFormatType::type file_format_type, |
183 | 0 | TFileCompressType::type write_compress_type) { |
184 | 0 | std::string compress_name; |
185 | 0 | switch (write_compress_type) { |
186 | 0 | case TFileCompressType::SNAPPYBLOCK: { |
187 | 0 | compress_name = ".snappy"; |
188 | 0 | break; |
189 | 0 | } |
190 | 0 | case TFileCompressType::ZLIB: { |
191 | 0 | compress_name = ".zlib"; |
192 | 0 | break; |
193 | 0 | } |
194 | 0 | case TFileCompressType::ZSTD: { |
195 | 0 | compress_name = ".zst"; |
196 | 0 | break; |
197 | 0 | } |
198 | 0 | case TFileCompressType::GZ: { |
199 | 0 | compress_name = ".gz"; |
200 | 0 | break; |
201 | 0 | } |
202 | 0 | case TFileCompressType::BZ2: { |
203 | 0 | compress_name = ".bz2"; |
204 | 0 | break; |
205 | 0 | } |
206 | 0 | case TFileCompressType::LZ4BLOCK: { |
207 | 0 | compress_name = ".lz4"; |
208 | 0 | break; |
209 | 0 | } |
210 | 0 | default: { |
211 | 0 | compress_name = ""; |
212 | 0 | break; |
213 | 0 | } |
214 | 0 | } |
215 | | |
216 | 0 | std::string file_format_name; |
217 | 0 | switch (file_format_type) { |
218 | 0 | case TFileFormatType::FORMAT_PARQUET: { |
219 | 0 | file_format_name = ".parquet"; |
220 | 0 | break; |
221 | 0 | } |
222 | 0 | case TFileFormatType::FORMAT_ORC: { |
223 | 0 | file_format_name = ".orc"; |
224 | 0 | break; |
225 | 0 | } |
226 | 0 | default: { |
227 | 0 | file_format_name = ""; |
228 | 0 | break; |
229 | 0 | } |
230 | 0 | } |
231 | 0 | return fmt::format("{}{}", compress_name, file_format_name); |
232 | 0 | } |
233 | | |
234 | 0 | std::string VHivePartitionWriter::_get_target_file_name() { |
235 | 0 | return fmt::format("{}-{}{}", _file_name, _file_name_index, |
236 | 0 | _get_file_extension(_file_format_type, _hive_compress_type)); |
237 | 0 | } |
238 | | |
239 | | } // namespace doris |