be/src/exec/sink/writer/vtvf_table_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/sink/writer/vtvf_table_writer.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | |
22 | | #include "common/status.h" |
23 | | #include "core/block/block.h" |
24 | | #include "exprs/vexpr.h" |
25 | | #include "exprs/vexpr_context.h" |
26 | | #include "io/file_factory.h" |
27 | | #include "runtime/runtime_state.h" |
28 | | |
29 | | namespace doris { |
30 | | |
31 | | VTVFTableWriter::VTVFTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs, |
32 | | std::shared_ptr<Dependency> dep, |
33 | | std::shared_ptr<Dependency> fin_dep) |
34 | 0 | : AsyncResultWriter(output_exprs, dep, fin_dep) { |
35 | 0 | _tvf_sink = t_sink.tvf_table_sink; |
36 | 0 | } |
37 | | |
38 | 0 | Status VTVFTableWriter::open(RuntimeState* state, RuntimeProfile* profile) { |
39 | 0 | _state = state; |
40 | | |
41 | | // Init profile counters |
42 | 0 | RuntimeProfile* writer_profile = profile->create_child("VTVFTableWriter", true, true); |
43 | 0 | _written_rows_counter = ADD_COUNTER(writer_profile, "NumWrittenRows", TUnit::UNIT); |
44 | 0 | _written_data_bytes = ADD_COUNTER(writer_profile, "WrittenDataBytes", TUnit::BYTES); |
45 | 0 | _file_write_timer = ADD_TIMER(writer_profile, "FileWriteTime"); |
46 | 0 | _writer_close_timer = ADD_TIMER(writer_profile, "FileWriterCloseTime"); |
47 | |
|
48 | 0 | _file_path = _tvf_sink.file_path; |
49 | 0 | _max_file_size_bytes = |
50 | 0 | _tvf_sink.__isset.max_file_size_bytes ? _tvf_sink.max_file_size_bytes : 0; |
51 | |
|
52 | 0 | VLOG_DEBUG << "TVF table writer open, query_id=" << print_id(_state->query_id()) |
53 | 0 | << ", tvf_name=" << _tvf_sink.tvf_name << ", file_path=" << _tvf_sink.file_path |
54 | 0 | << ", file_format=" << _tvf_sink.file_format << ", file_type=" << _tvf_sink.file_type |
55 | 0 | << ", max_file_size_bytes=" << _max_file_size_bytes |
56 | 0 | << ", columns_count=" << (_tvf_sink.__isset.columns ? _tvf_sink.columns.size() : 0); |
57 | |
|
58 | 0 | return _create_next_file_writer(); |
59 | 0 | } |
60 | | |
61 | 0 | Status VTVFTableWriter::write(RuntimeState* state, Block& block) { |
62 | 0 | COUNTER_UPDATE(_written_rows_counter, block.rows()); |
63 | 0 | state->update_num_rows_load_total(block.rows()); |
64 | |
|
65 | 0 | { |
66 | 0 | SCOPED_TIMER(_file_write_timer); |
67 | 0 | RETURN_IF_ERROR(_vfile_writer->write(block)); |
68 | 0 | } |
69 | | |
70 | 0 | _current_written_bytes = _vfile_writer->written_len(); |
71 | | |
72 | | // Auto-split if max file size is set |
73 | 0 | if (_max_file_size_bytes > 0) { |
74 | 0 | RETURN_IF_ERROR(_create_new_file_if_exceed_size()); |
75 | 0 | } |
76 | | |
77 | 0 | return Status::OK(); |
78 | 0 | } |
79 | | |
80 | 0 | Status VTVFTableWriter::close(Status status) { |
81 | 0 | if (!status.ok()) { |
82 | 0 | return status; |
83 | 0 | } |
84 | | |
85 | 0 | SCOPED_TIMER(_writer_close_timer); |
86 | 0 | return _close_file_writer(true); |
87 | 0 | } |
88 | | |
89 | 0 | Status VTVFTableWriter::_create_file_writer(const std::string& file_name) { |
90 | 0 | bool use_jni = _tvf_sink.__isset.writer_type && _tvf_sink.writer_type == TTVFWriterType::JNI; |
91 | |
|
92 | 0 | if (!use_jni) { |
93 | | // Native path: create file writer via FileFactory |
94 | 0 | TFileType::type file_type = _tvf_sink.file_type; |
95 | 0 | std::map<std::string, std::string> properties; |
96 | 0 | if (_tvf_sink.__isset.properties) { |
97 | 0 | properties = _tvf_sink.properties; |
98 | 0 | } |
99 | |
|
100 | 0 | _file_writer_impl = DORIS_TRY(FileFactory::create_file_writer( |
101 | 0 | file_type, _state->exec_env(), {}, properties, file_name, |
102 | 0 | {.write_file_cache = false, .sync_file_data = false})); |
103 | 0 | } |
104 | | |
105 | | // Factory creates either JNI or native transformer |
106 | 0 | RETURN_IF_ERROR(create_tvf_format_transformer(_tvf_sink, _state, |
107 | 0 | use_jni ? nullptr : _file_writer_impl.get(), |
108 | 0 | _vec_output_expr_ctxs, &_vfile_writer)); |
109 | | |
110 | 0 | VLOG_DEBUG << "TVF table writer created file: " << file_name |
111 | 0 | << ", format=" << _tvf_sink.file_format << ", use_jni=" << use_jni |
112 | 0 | << ", query_id=" << print_id(_state->query_id()); |
113 | |
|
114 | 0 | return _vfile_writer->open(); |
115 | 0 | } |
116 | | |
117 | 0 | Status VTVFTableWriter::_create_next_file_writer() { |
118 | 0 | std::string file_name; |
119 | 0 | RETURN_IF_ERROR(_get_next_file_name(&file_name)); |
120 | 0 | return _create_file_writer(file_name); |
121 | 0 | } |
122 | | |
123 | 0 | Status VTVFTableWriter::_close_file_writer(bool done) { |
124 | 0 | if (_vfile_writer) { |
125 | 0 | RETURN_IF_ERROR(_vfile_writer->close()); |
126 | 0 | COUNTER_UPDATE(_written_data_bytes, _vfile_writer->written_len()); |
127 | 0 | _vfile_writer.reset(nullptr); |
128 | 0 | } else if (_file_writer_impl && _file_writer_impl->state() != io::FileWriter::State::CLOSED) { |
129 | 0 | RETURN_IF_ERROR(_file_writer_impl->close()); |
130 | 0 | } |
131 | | |
132 | 0 | if (!done) { |
133 | 0 | RETURN_IF_ERROR(_create_next_file_writer()); |
134 | 0 | } |
135 | 0 | return Status::OK(); |
136 | 0 | } |
137 | | |
138 | 0 | Status VTVFTableWriter::_create_new_file_if_exceed_size() { |
139 | 0 | if (_max_file_size_bytes <= 0 || _current_written_bytes < _max_file_size_bytes) { |
140 | 0 | return Status::OK(); |
141 | 0 | } |
142 | 0 | SCOPED_TIMER(_writer_close_timer); |
143 | 0 | RETURN_IF_ERROR(_close_file_writer(false)); |
144 | 0 | _current_written_bytes = 0; |
145 | 0 | return Status::OK(); |
146 | 0 | } |
147 | | |
148 | 0 | Status VTVFTableWriter::_get_next_file_name(std::string* file_name) { |
149 | 0 | std::string ext; |
150 | 0 | switch (_tvf_sink.file_format) { |
151 | 0 | case TFileFormatType::FORMAT_CSV_PLAIN: |
152 | 0 | ext = "csv"; |
153 | 0 | break; |
154 | 0 | case TFileFormatType::FORMAT_PARQUET: |
155 | 0 | ext = "parquet"; |
156 | 0 | break; |
157 | 0 | case TFileFormatType::FORMAT_ORC: |
158 | 0 | ext = "orc"; |
159 | 0 | break; |
160 | 0 | default: |
161 | 0 | ext = "dat"; |
162 | 0 | break; |
163 | 0 | } |
164 | | |
165 | | // file_path is a prefix, generate: {prefix}{query_id}_{idx}.{ext} |
166 | 0 | std::string query_id_str = print_id(_state->query_id()); |
167 | 0 | *file_name = fmt::format("{}{}_{}.{}", _file_path, query_id_str, _file_idx, ext); |
168 | 0 | _file_idx++; |
169 | 0 | return Status::OK(); |
170 | 0 | } |
171 | | |
172 | | } // namespace doris |