be/src/exec/sink/writer/vfile_result_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/sink/writer/vfile_result_writer.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/Data_types.h> |
22 | | #include <gen_cpp/Metrics_types.h> |
23 | | #include <gen_cpp/PaloInternalService_types.h> |
24 | | #include <gen_cpp/PlanNodes_types.h> |
25 | | #include <glog/logging.h> |
26 | | |
27 | | #include <ostream> |
28 | | #include <utility> |
29 | | |
30 | | #include "common/compiler_util.h" // IWYU pragma: keep |
31 | | #include "common/consts.h" |
32 | | #include "common/status.h" |
33 | | #include "core/block/block.h" |
34 | | #include "core/column/column_string.h" |
35 | | #include "core/column/column_vector.h" |
36 | | #include "core/data_type/define_primitive_type.h" |
37 | | #include "core/data_type/primitive_type.h" |
38 | | #include "core/value/decimalv2_value.h" |
39 | | #include "core/value/large_int_value.h" |
40 | | #include "exec/operator/result_sink_operator.h" |
41 | | #include "exec/sink/writer/vmysql_result_writer.h" |
42 | | #include "exprs/function/cast/cast_to_string.h" |
43 | | #include "exprs/vexpr.h" |
44 | | #include "exprs/vexpr_context.h" |
45 | | #include "format/transformer/vcsv_transformer.h" |
46 | | #include "format/transformer/vnative_transformer.h" |
47 | | #include "format/transformer/vorc_transformer.h" |
48 | | #include "format/transformer/vparquet_transformer.h" |
49 | | #include "io/file_factory.h" |
50 | | #include "io/fs/broker_file_system.h" |
51 | | #include "io/fs/file_system.h" |
52 | | #include "io/fs/file_writer.h" |
53 | | #include "io/fs/hdfs_file_system.h" |
54 | | #include "io/fs/local_file_system.h" |
55 | | #include "io/fs/s3_file_system.h" |
56 | | #include "io/hdfs_builder.h" |
57 | | #include "runtime/descriptors.h" |
58 | | #include "runtime/result_block_buffer.h" |
59 | | #include "runtime/runtime_state.h" |
60 | | #include "service/backend_options.h" |
61 | | #include "util/mysql_row_buffer.h" |
62 | | #include "util/s3_uri.h" |
63 | | #include "util/s3_util.h" |
64 | | #include "util/uid_util.h" |
65 | | |
66 | | namespace doris { |
67 | | |
68 | | static double nons_to_second = 1000000000.00; |
69 | | |
70 | | VFileResultWriter::VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs, |
71 | | std::shared_ptr<Dependency> dep, |
72 | | std::shared_ptr<Dependency> fin_dep) |
73 | 0 | : AsyncResultWriter(output_exprs, dep, fin_dep) {} |
74 | | |
75 | | VFileResultWriter::VFileResultWriter( |
76 | | const ResultFileOptions* file_opts, const TStorageBackendType::type storage_type, |
77 | | const TUniqueId fragment_instance_id, const VExprContextSPtrs& output_vexpr_ctxs, |
78 | | std::shared_ptr<ResultBlockBufferBase> sinker, Block* output_block, bool output_object_data, |
79 | | const RowDescriptor& output_row_descriptor, std::shared_ptr<Dependency> dep, |
80 | | std::shared_ptr<Dependency> fin_dep) |
81 | 0 | : AsyncResultWriter(output_vexpr_ctxs, dep, fin_dep), |
82 | 0 | _file_opts(file_opts), |
83 | 0 | _storage_type(storage_type), |
84 | 0 | _fragment_instance_id(fragment_instance_id), |
85 | 0 | _sinker(std::dynamic_pointer_cast<MySQLResultBlockBuffer>(sinker)), |
86 | 0 | _output_block(output_block), |
87 | 0 | _output_row_descriptor(output_row_descriptor) { |
88 | 0 | _output_object_data = output_object_data; |
89 | 0 | } |
90 | | |
91 | 0 | Status VFileResultWriter::open(RuntimeState* state, RuntimeProfile* profile) { |
92 | 0 | _state = state; |
93 | 0 | _init_profile(profile); |
94 | | // check orc writer version |
95 | 0 | if (_file_opts->file_format == TFileFormatType::FORMAT_ORC && |
96 | 0 | _file_opts->orc_writer_version < 1) { |
97 | 0 | return Status::InternalError("orc writer version is less than 1."); |
98 | 0 | } |
99 | | // Delete existing files |
100 | 0 | if (_file_opts->delete_existing_files) { |
101 | 0 | RETURN_IF_ERROR(_delete_dir()); |
102 | 0 | } |
103 | 0 | return _create_next_file_writer(); |
104 | 0 | } |
105 | | |
106 | 0 | void VFileResultWriter::_init_profile(RuntimeProfile* parent_profile) { |
107 | 0 | RuntimeProfile* profile = parent_profile->create_child("VFileResultWriter", true, true); |
108 | 0 | _append_row_batch_timer = ADD_TIMER(profile, "AppendBatchTime"); |
109 | 0 | _convert_tuple_timer = ADD_CHILD_TIMER(profile, "TupleConvertTime", "AppendBatchTime"); |
110 | 0 | _file_write_timer = ADD_CHILD_TIMER(profile, "FileWriteTime", "AppendBatchTime"); |
111 | 0 | _writer_close_timer = ADD_TIMER(profile, "FileWriterCloseTime"); |
112 | 0 | _written_rows_counter = ADD_COUNTER(profile, "NumWrittenRows", TUnit::UNIT); |
113 | 0 | _written_data_bytes = ADD_COUNTER(profile, "WrittenDataBytes", TUnit::BYTES); |
114 | 0 | } |
115 | | |
116 | 0 | Status VFileResultWriter::_create_next_file_writer() { |
117 | 0 | std::string file_name; |
118 | 0 | RETURN_IF_ERROR(_get_next_file_name(&file_name)); |
119 | 0 | return _create_file_writer(file_name); |
120 | 0 | } |
121 | | |
122 | 0 | Status VFileResultWriter::_create_file_writer(const std::string& file_name) { |
123 | 0 | auto file_type = DORIS_TRY(FileFactory::convert_storage_type(_storage_type)); |
124 | 0 | _file_writer_impl = DORIS_TRY(FileFactory::create_file_writer( |
125 | 0 | file_type, _state->exec_env(), _file_opts->broker_addresses, |
126 | 0 | _file_opts->broker_properties, file_name, |
127 | 0 | { |
128 | 0 | .write_file_cache = false, |
129 | 0 | .sync_file_data = false, |
130 | 0 | })); |
131 | 0 | switch (_file_opts->file_format) { |
132 | 0 | case TFileFormatType::FORMAT_CSV_PLAIN: |
133 | 0 | _vfile_writer.reset(new VCSVTransformer( |
134 | 0 | _state, _file_writer_impl.get(), _vec_output_expr_ctxs, _output_object_data, |
135 | 0 | _header_type, _header, _file_opts->column_separator, _file_opts->line_delimiter, |
136 | 0 | _file_opts->with_bom, _file_opts->compression_type)); |
137 | 0 | break; |
138 | 0 | case TFileFormatType::FORMAT_PARQUET: |
139 | 0 | _vfile_writer.reset(new VParquetTransformer( |
140 | 0 | _state, _file_writer_impl.get(), _vec_output_expr_ctxs, _file_opts->parquet_schemas, |
141 | 0 | _output_object_data, |
142 | 0 | {_file_opts->parquet_commpression_type, _file_opts->parquet_version, |
143 | 0 | _file_opts->parquert_disable_dictionary, _file_opts->enable_int96_timestamps})); |
144 | 0 | break; |
145 | 0 | case TFileFormatType::FORMAT_ORC: |
146 | 0 | _vfile_writer.reset(new VOrcTransformer( |
147 | 0 | _state, _file_writer_impl.get(), _vec_output_expr_ctxs, _file_opts->orc_schema, {}, |
148 | 0 | _output_object_data, _file_opts->orc_compression_type)); |
149 | 0 | break; |
150 | 0 | case TFileFormatType::FORMAT_NATIVE: |
151 | | // Doris Native binary format writer with configurable compression. |
152 | 0 | _vfile_writer.reset(new VNativeTransformer(_state, _file_writer_impl.get(), |
153 | 0 | _vec_output_expr_ctxs, _output_object_data, |
154 | 0 | _file_opts->compression_type)); |
155 | 0 | break; |
156 | 0 | default: |
157 | 0 | return Status::InternalError("unsupported file format: {}", _file_opts->file_format); |
158 | 0 | } |
159 | 0 | LOG(INFO) << "create file for exporting query result. file name: " << file_name |
160 | 0 | << ". query id: " << print_id(_state->query_id()) |
161 | 0 | << " format:" << _file_opts->file_format; |
162 | |
|
163 | 0 | return _vfile_writer->open(); |
164 | 0 | } |
165 | | |
166 | | // file name format as: my_prefix_{fragment_instance_id}_0.csv |
167 | 0 | Status VFileResultWriter::_get_next_file_name(std::string* file_name) { |
168 | 0 | std::string suffix = |
169 | 0 | _file_opts->file_suffix.empty() ? _file_format_to_name() : _file_opts->file_suffix; |
170 | 0 | std::stringstream ss; |
171 | 0 | ss << _file_opts->file_path << print_id(_fragment_instance_id) << "_" << (_file_idx++) << "." |
172 | 0 | << suffix; |
173 | 0 | *file_name = ss.str(); |
174 | 0 | if (_storage_type == TStorageBackendType::LOCAL) { |
175 | | // For local file writer, the file_path is a local dir. |
176 | | // Here we do a simple security verification by checking whether the file exists. |
177 | | // Because the file path is currently arbitrarily specified by the user, |
178 | | // Doris is not responsible for ensuring the correctness of the path. |
179 | | // This is just to prevent overwriting the existing file. |
180 | 0 | bool exists = true; |
181 | 0 | RETURN_IF_ERROR(io::global_local_filesystem()->exists(*file_name, &exists)); |
182 | 0 | if (exists) { |
183 | 0 | return Status::InternalError("File already exists: {}", *file_name); |
184 | 0 | } |
185 | 0 | } |
186 | | |
187 | 0 | return Status::OK(); |
188 | 0 | } |
189 | | |
190 | | // file url format as: |
191 | | // LOCAL: file:///localhost_address/{file_path}{fragment_instance_id}_ |
192 | | // S3: {file_path}{fragment_instance_id}_ |
193 | | // BROKER: {file_path}{fragment_instance_id}_ |
194 | | |
195 | 0 | void VFileResultWriter::_get_file_url(std::string* file_url) { |
196 | 0 | std::stringstream ss; |
197 | 0 | if (_storage_type == TStorageBackendType::LOCAL) { |
198 | 0 | ss << "file:///" << BackendOptions::get_localhost(); |
199 | 0 | } |
200 | 0 | ss << _file_opts->file_path; |
201 | 0 | ss << print_id(_fragment_instance_id) << "_"; |
202 | 0 | *file_url = ss.str(); |
203 | 0 | } |
204 | | |
205 | 0 | std::string VFileResultWriter::_file_format_to_name() { |
206 | 0 | switch (_file_opts->file_format) { |
207 | 0 | case TFileFormatType::FORMAT_CSV_PLAIN: |
208 | 0 | return "csv" + _compression_type_to_name(); |
209 | 0 | case TFileFormatType::FORMAT_PARQUET: |
210 | 0 | return "parquet"; |
211 | 0 | case TFileFormatType::FORMAT_ORC: |
212 | 0 | return "orc"; |
213 | 0 | case TFileFormatType::FORMAT_NATIVE: |
214 | 0 | return "native"; |
215 | 0 | default: |
216 | 0 | return "unknown"; |
217 | 0 | } |
218 | 0 | } |
219 | | |
220 | 0 | std::string VFileResultWriter::_compression_type_to_name() { |
221 | 0 | switch (_file_opts->compression_type) { |
222 | 0 | case TFileCompressType::GZ: |
223 | 0 | return ".gz"; |
224 | 0 | case TFileCompressType::BZ2: |
225 | 0 | return ".bz2"; |
226 | 0 | case TFileCompressType::SNAPPYBLOCK: |
227 | 0 | return ".snappy"; |
228 | 0 | case TFileCompressType::LZ4BLOCK: |
229 | 0 | return ".lz4"; |
230 | 0 | case TFileCompressType::ZSTD: |
231 | 0 | return ".zstd"; |
232 | 0 | default: |
233 | 0 | return ""; |
234 | 0 | } |
235 | 0 | } |
236 | | |
237 | 0 | Status VFileResultWriter::write(RuntimeState* state, Block& block) { |
238 | 0 | if (block.rows() == 0) { |
239 | 0 | return Status::OK(); |
240 | 0 | } |
241 | 0 | SCOPED_TIMER(_append_row_batch_timer); |
242 | 0 | Block output_block; |
243 | 0 | RETURN_IF_ERROR(_projection_block(block, &output_block)); |
244 | 0 | RETURN_IF_ERROR(_write_file(output_block)); |
245 | | |
246 | 0 | _written_rows += block.rows(); |
247 | 0 | return Status::OK(); |
248 | 0 | } |
249 | | |
250 | 0 | Status VFileResultWriter::_write_file(const Block& block) { |
251 | 0 | { |
252 | 0 | SCOPED_TIMER(_file_write_timer); |
253 | 0 | RETURN_IF_ERROR(_vfile_writer->write(block)); |
254 | 0 | } |
255 | | // split file if exceed limit |
256 | | // the written len from file writer may not be updated in real-time, |
257 | | // because for orc writer, the orc OutputStream only flush when |
258 | | // bufferred data is larger than strip size(default 64MB). |
259 | | // So even if max_file_size_bytes set to 5MB, the file size will still |
260 | | // be 64MB. |
261 | | // TODO: opt this logic |
262 | 0 | _current_written_bytes = _vfile_writer->written_len(); |
263 | 0 | return _create_new_file_if_exceed_size(); |
264 | 0 | } |
265 | | |
266 | 0 | Status VFileResultWriter::_create_new_file_if_exceed_size() { |
267 | 0 | if (_current_written_bytes < _file_opts->max_file_size_bytes) { |
268 | 0 | return Status::OK(); |
269 | 0 | } |
270 | | // current file size exceed the max file size. close this file |
271 | | // and create new one |
272 | 0 | { |
273 | 0 | SCOPED_TIMER(_writer_close_timer); |
274 | 0 | RETURN_IF_ERROR(_close_file_writer(false)); |
275 | 0 | } |
276 | 0 | _current_written_bytes = 0; |
277 | 0 | return Status::OK(); |
278 | 0 | } |
279 | | |
280 | 0 | Status VFileResultWriter::_close_file_writer(bool done) { |
281 | 0 | if (_vfile_writer) { |
282 | 0 | RETURN_IF_ERROR(_vfile_writer->close()); |
283 | | // we can not use _current_written_bytes to COUNTER_UPDATE(_written_data_bytes, _current_written_bytes) |
284 | | // because it will call `write()` function of orc/parquet function in `_vfile_writer->close()` |
285 | | // and the real written_len will increase |
286 | | // and _current_written_bytes will less than _vfile_writer->written_len() |
287 | 0 | COUNTER_UPDATE(_written_data_bytes, _vfile_writer->written_len()); |
288 | 0 | _vfile_writer.reset(nullptr); |
289 | 0 | } else if (_file_writer_impl && _file_writer_impl->state() != io::FileWriter::State::CLOSED) { |
290 | 0 | RETURN_IF_ERROR(_file_writer_impl->close()); |
291 | 0 | } |
292 | | |
293 | 0 | if (!done) { |
294 | | // not finished, create new file writer for next file |
295 | 0 | RETURN_IF_ERROR(_create_next_file_writer()); |
296 | 0 | } else { |
297 | | // All data is written to file, send statistic result |
298 | 0 | if (_output_block == nullptr) { |
299 | 0 | RETURN_IF_ERROR(_send_result()); |
300 | 0 | } else { |
301 | 0 | RETURN_IF_ERROR(_fill_result_block()); |
302 | 0 | } |
303 | 0 | } |
304 | 0 | return Status::OK(); |
305 | 0 | } |
306 | | |
307 | | template <typename SRC> |
308 | 0 | void direct_write_to_mysql_result_int(std::string& mysql_rows, const SRC& value) { |
309 | 0 | auto str = CastToString::from_number(value); |
310 | 0 | direct_write_to_mysql_result_string(mysql_rows, str.c_str(), str.size()); |
311 | 0 | } Unexecuted instantiation: _ZN5doris32direct_write_to_mysql_result_intIiEEvRNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ Unexecuted instantiation: _ZN5doris32direct_write_to_mysql_result_intIlEEvRNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_ |
312 | | |
313 | 0 | Status VFileResultWriter::_send_result() { |
314 | 0 | if (_is_result_sent) { |
315 | 0 | return Status::OK(); |
316 | 0 | } |
317 | 0 | _is_result_sent = true; |
318 | | |
319 | | // The final stat result include: |
320 | | // | FileNumber | Int | |
321 | | // | TotalRows | Bigint | |
322 | | // | FileSize | Bigint | |
323 | | // | URL | Varchar | |
324 | | // | WriteTimeSec | Varchar | |
325 | | // | WriteSpeedKB | Varchar | |
326 | | // The type of these field should be consistent with types defined in OutFileClause.java of FE. |
327 | |
|
328 | 0 | auto result = std::make_shared<TFetchDataResult>(); |
329 | 0 | result->result_batch.rows.resize(1); |
330 | 0 | auto& mysql_output = result->result_batch.rows[0]; |
331 | |
|
332 | 0 | direct_write_to_mysql_result_int(mysql_output, _file_idx); // FileNumber |
333 | 0 | direct_write_to_mysql_result_int(mysql_output, _written_rows_counter->value()); // TotalRows |
334 | 0 | direct_write_to_mysql_result_int(mysql_output, _written_data_bytes->value()); // FileSize |
335 | 0 | std::string file_url; |
336 | 0 | _get_file_url(&file_url); |
337 | 0 | std::stringstream ss; |
338 | 0 | ss << file_url << "*"; |
339 | 0 | file_url = ss.str(); |
340 | 0 | direct_write_to_mysql_result_string(mysql_output, file_url.c_str(), |
341 | 0 | file_url.length()); // URL |
342 | 0 | double write_time = _file_write_timer->value() / nons_to_second; |
343 | 0 | std::string formatted_write_time = fmt::format("{:.3f}", write_time); |
344 | 0 | direct_write_to_mysql_result_string(mysql_output, formatted_write_time.c_str(), |
345 | 0 | formatted_write_time.length()); // WriteTimeSec |
346 | |
|
347 | 0 | double write_speed = _get_write_speed(_written_data_bytes->value(), _file_write_timer->value()); |
348 | 0 | std::string formatted_write_speed = fmt::format("{:.2f}", write_speed); |
349 | 0 | direct_write_to_mysql_result_string(mysql_output, formatted_write_speed.c_str(), |
350 | 0 | formatted_write_speed.length()); // WriteSpeedKB |
351 | |
|
352 | 0 | std::map<std::string, std::string> attach_infos; |
353 | 0 | attach_infos.insert(std::make_pair("FileNumber", std::to_string(_file_idx))); |
354 | 0 | attach_infos.insert( |
355 | 0 | std::make_pair("TotalRows", std::to_string(_written_rows_counter->value()))); |
356 | 0 | attach_infos.insert(std::make_pair("FileSize", std::to_string(_written_data_bytes->value()))); |
357 | 0 | attach_infos.insert(std::make_pair("URL", file_url)); |
358 | 0 | attach_infos.insert(std::make_pair("WriteTimeSec", formatted_write_time)); |
359 | 0 | attach_infos.insert(std::make_pair("WriteSpeedKB", formatted_write_speed)); |
360 | |
|
361 | 0 | result->result_batch.__set_attached_infos(attach_infos); |
362 | 0 | RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(_state, result), |
363 | 0 | "failed to send outfile result"); |
364 | 0 | return Status::OK(); |
365 | 0 | } |
366 | | |
367 | 0 | Status VFileResultWriter::_fill_result_block() { |
368 | 0 | if (_is_result_sent) { |
369 | 0 | return Status::OK(); |
370 | 0 | } |
371 | 0 | _is_result_sent = true; |
372 | |
|
373 | 0 | #ifndef INSERT_TO_COLUMN |
374 | 0 | #define INSERT_TO_COLUMN \ |
375 | 0 | if (i == 0) { \ |
376 | 0 | column->insert_data(reinterpret_cast<const char*>(&_file_idx), 0); \ |
377 | 0 | } else if (i == 1) { \ |
378 | 0 | int64_t written_rows = _written_rows_counter->value(); \ |
379 | 0 | column->insert_data(reinterpret_cast<const char*>(&written_rows), 0); \ |
380 | 0 | } else if (i == 2) { \ |
381 | 0 | int64_t written_data_bytes = _written_data_bytes->value(); \ |
382 | 0 | column->insert_data(reinterpret_cast<const char*>(&written_data_bytes), 0); \ |
383 | 0 | } else if (i == 3) { \ |
384 | 0 | std::string file_url; \ |
385 | 0 | static_cast<void>(_get_file_url(&file_url)); \ |
386 | 0 | column->insert_data(file_url.c_str(), file_url.size()); \ |
387 | 0 | } else if (i == 4) { \ |
388 | 0 | double write_time = _file_write_timer->value() / nons_to_second; \ |
389 | 0 | std::string formatted_write_time = fmt::format("{:.3f}", write_time); \ |
390 | 0 | column->insert_data(formatted_write_time.c_str(), formatted_write_time.size()); \ |
391 | 0 | } else if (i == 5) { \ |
392 | 0 | double write_speed = \ |
393 | 0 | _get_write_speed(_written_data_bytes->value(), _file_write_timer->value()); \ |
394 | 0 | std::string formatted_write_speed = fmt::format("{:.2f}", write_speed); \ |
395 | 0 | column->insert_data(formatted_write_speed.c_str(), formatted_write_speed.size()); \ |
396 | 0 | } \ |
397 | 0 | _output_block->replace_by_position(i, std::move(column)); |
398 | 0 | #endif |
399 | |
|
400 | 0 | for (int i = 0; i < _output_block->columns(); i++) { |
401 | 0 | switch (_output_row_descriptor.tuple_descriptors()[0] |
402 | 0 | ->slots()[i] |
403 | 0 | ->type() |
404 | 0 | ->get_primitive_type()) { |
405 | 0 | case TYPE_INT: { |
406 | 0 | auto column = ColumnInt32::create(); |
407 | 0 | INSERT_TO_COLUMN; |
408 | 0 | break; |
409 | 0 | } |
410 | 0 | case TYPE_BIGINT: { |
411 | 0 | auto column = ColumnInt64::create(); |
412 | 0 | INSERT_TO_COLUMN; |
413 | 0 | break; |
414 | 0 | } |
415 | 0 | case TYPE_VARCHAR: |
416 | 0 | case TYPE_CHAR: |
417 | 0 | case TYPE_STRING: { |
418 | 0 | auto column = ColumnString::create(); |
419 | 0 | INSERT_TO_COLUMN; |
420 | 0 | break; |
421 | 0 | } |
422 | 0 | default: |
423 | 0 | return Status::InternalError("Invalid type to print: {}", |
424 | 0 | _output_row_descriptor.tuple_descriptors()[0] |
425 | 0 | ->slots()[i] |
426 | 0 | ->type() |
427 | 0 | ->get_primitive_type()); |
428 | 0 | } |
429 | 0 | } |
430 | 0 | return Status::OK(); |
431 | 0 | } |
432 | | |
433 | 0 | Status VFileResultWriter::_delete_dir() { |
434 | | // get dir of file_path |
435 | 0 | std::string dir = _file_opts->file_path.substr(0, _file_opts->file_path.find_last_of('/') + 1); |
436 | 0 | switch (_storage_type) { |
437 | 0 | case TStorageBackendType::LOCAL: |
438 | 0 | return io::global_local_filesystem()->delete_directory(dir); |
439 | 0 | case TStorageBackendType::BROKER: { |
440 | 0 | auto fs = DORIS_TRY(io::BrokerFileSystem::create(_file_opts->broker_addresses[0], |
441 | 0 | _file_opts->broker_properties, |
442 | 0 | io::FileSystem::TMP_FS_ID)); |
443 | 0 | return fs->delete_directory(dir); |
444 | 0 | } |
445 | 0 | case TStorageBackendType::HDFS: { |
446 | 0 | THdfsParams hdfs_params = parse_properties(_file_opts->broker_properties); |
447 | 0 | auto fs = DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name, |
448 | 0 | io::FileSystem::TMP_FS_ID, nullptr)); |
449 | 0 | return fs->delete_directory(dir); |
450 | 0 | } |
451 | 0 | case TStorageBackendType::S3: { |
452 | 0 | S3URI s3_uri(dir); |
453 | 0 | RETURN_IF_ERROR(s3_uri.parse()); |
454 | 0 | S3Conf s3_conf; |
455 | 0 | std::shared_ptr<io::S3FileSystem> s3_fs = nullptr; |
456 | 0 | RETURN_IF_ERROR(S3ClientFactory::convert_properties_to_s3_conf( |
457 | 0 | _file_opts->broker_properties, s3_uri, &s3_conf)); |
458 | 0 | auto fs = DORIS_TRY(io::S3FileSystem::create(s3_conf, io::FileSystem::TMP_FS_ID)); |
459 | 0 | return fs->delete_directory(dir); |
460 | 0 | } |
461 | 0 | default: |
462 | 0 | return Status::NotSupported("Unsupported storage type: {}", std::to_string(_storage_type)); |
463 | 0 | } |
464 | 0 | } |
465 | | |
466 | 0 | double VFileResultWriter::_get_write_speed(int64_t write_bytes, int64_t write_time) { |
467 | 0 | if (write_time <= 0) { |
468 | 0 | return 0; |
469 | 0 | } |
470 | | // KB / s |
471 | 0 | return ((write_bytes * nons_to_second) / (write_time)) / 1024; |
472 | 0 | } |
473 | | |
474 | 0 | Status VFileResultWriter::close(Status exec_status) { |
475 | 0 | Status st = exec_status; |
476 | 0 | if (st.ok()) { |
477 | | // the following 2 profile "_written_rows_counter" and "_writer_close_timer" |
478 | | // must be outside the `_close_file_writer()`. |
479 | | // because `_close_file_writer()` may be called in deconstructor, |
480 | | // at that time, the RuntimeState may already been deconstructed, |
481 | | // so does the profile in RuntimeState. |
482 | 0 | if (_written_rows_counter) { |
483 | 0 | COUNTER_SET(_written_rows_counter, _written_rows); |
484 | 0 | SCOPED_TIMER(_writer_close_timer); |
485 | 0 | } |
486 | 0 | st = _close_file_writer(true); |
487 | 0 | } |
488 | 0 | return st; |
489 | 0 | } |
490 | | |
491 | | } // namespace doris |