be/src/exec/sink/writer/vhive_table_writer.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/sink/writer/vhive_table_writer.h" |
19 | | |
20 | | #include "core/block/block.h" |
21 | | #include "core/block/column_with_type_and_name.h" |
22 | | #include "core/block/materialize_block.h" |
23 | | #include "exec/sink/writer/vhive_partition_writer.h" |
24 | | #include "exec/sink/writer/vhive_utils.h" |
25 | | #include "exprs/vexpr.h" |
26 | | #include "exprs/vexpr_context.h" |
27 | | #include "runtime/runtime_profile.h" |
28 | | #include "runtime/runtime_state.h" |
29 | | |
30 | | namespace doris { |
31 | | #include "common/compile_check_begin.h" |
32 | | |
// Constructs the Hive table writer.
// `t_sink` must carry a hive_table_sink payload (DCHECKed below). The output
// expression contexts and the two pipeline dependencies are forwarded verbatim
// to the AsyncResultWriter base class.
VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink,
                                   const VExprContextSPtrs& output_expr_ctxs,
                                   std::shared_ptr<Dependency> dep,
                                   std::shared_ptr<Dependency> fin_dep)
        : AsyncResultWriter(output_expr_ctxs, dep, fin_dep), _t_sink(t_sink) {
    DCHECK(_t_sink.__isset.hive_table_sink);
}
40 | | |
// No sink-specific properties to initialize for Hive; present only to satisfy
// the AsyncResultWriter interface. `pool` is intentionally unused.
Status VHiveTableWriter::init_properties(ObjectPool* pool) {
    return Status::OK();
}
44 | | |
45 | 0 | Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* operator_profile) { |
46 | 0 | _state = state; |
47 | 0 | _operator_profile = operator_profile; |
48 | 0 | DCHECK(_operator_profile->get_child("CustomCounters") != nullptr); |
49 | 0 | RuntimeProfile* custom_counters = _operator_profile->get_child("CustomCounters"); |
50 | | // add all counter |
51 | 0 | _written_rows_counter = ADD_COUNTER(custom_counters, "WrittenRows", TUnit::UNIT); |
52 | 0 | _send_data_timer = ADD_TIMER(custom_counters, "SendDataTime"); |
53 | 0 | _partition_writers_dispatch_timer = |
54 | 0 | ADD_CHILD_TIMER(custom_counters, "PartitionsDispatchTime", "SendDataTime"); |
55 | 0 | _partition_writers_write_timer = |
56 | 0 | ADD_CHILD_TIMER(custom_counters, "PartitionsWriteTime", "SendDataTime"); |
57 | 0 | _partition_writers_count = ADD_COUNTER(custom_counters, "PartitionsWriteCount", TUnit::UNIT); |
58 | 0 | _open_timer = ADD_TIMER(custom_counters, "OpenTime"); |
59 | 0 | _close_timer = ADD_TIMER(custom_counters, "CloseTime"); |
60 | 0 | _write_file_counter = ADD_COUNTER(custom_counters, "WriteFileCount", TUnit::UNIT); |
61 | |
|
62 | 0 | SCOPED_TIMER(_open_timer); |
63 | 0 | for (int i = 0; i < _t_sink.hive_table_sink.columns.size(); ++i) { |
64 | 0 | switch (_t_sink.hive_table_sink.columns[i].column_type) { |
65 | 0 | case THiveColumnType::PARTITION_KEY: { |
66 | 0 | _partition_columns_input_index.emplace_back(i); |
67 | 0 | _non_write_columns_indices.insert(i); |
68 | 0 | break; |
69 | 0 | } |
70 | 0 | case THiveColumnType::REGULAR: { |
71 | 0 | _write_output_vexpr_ctxs.push_back(_vec_output_expr_ctxs[i]); |
72 | 0 | break; |
73 | 0 | } |
74 | 0 | case THiveColumnType::SYNTHESIZED: { |
75 | 0 | _non_write_columns_indices.insert(i); |
76 | 0 | break; |
77 | 0 | } |
78 | 0 | default: { |
79 | 0 | throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, |
80 | 0 | "Illegal hive column type {}, it should not be here.", |
81 | 0 | to_string(_t_sink.hive_table_sink.columns[i].column_type)); |
82 | 0 | } |
83 | 0 | } |
84 | 0 | } |
85 | 0 | return Status::OK(); |
86 | 0 | } |
87 | | |
// Dispatches one input batch to the per-partition writers.
//
// After evaluating the output expressions:
//  * unpartitioned table: the whole block goes to a single writer keyed by ""
//    in _partitions_to_writers, rolling to a new file (same base name, next
//    index) once the current file exceeds config::hive_sink_max_file_size;
//  * partitioned table: each row's partition name is computed, a writer is
//    lazily created per partition (bounded by
//    config::table_sink_partition_write_max_partition_nums_per_writer), a
//    per-writer 0/1 row filter is accumulated, and finally each writer gets a
//    filtered copy of the block.
Status VHiveTableWriter::write(RuntimeState* state, Block& block) {
    SCOPED_RAW_TIMER(&_send_data_ns);

    if (block.rows() == 0) {
        return Status::OK();
    }
    Block output_block;
    RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_vec_output_expr_ctxs, block,
                                                                       &output_block, false));
    materialize_block_inplace(output_block);

    // Maps a writer to a 0/1 filter over output_block's rows: filter[i] == 1
    // means row i belongs to that writer's partition.
    std::unordered_map<std::shared_ptr<VHivePartitionWriter>, IColumn::Filter> writer_positions;
    _row_count += output_block.rows();
    auto& hive_table_sink = _t_sink.hive_table_sink;

    if (_partition_columns_input_index.empty()) {
        // Unpartitioned table: single writer stored under the empty key.
        std::shared_ptr<VHivePartitionWriter> writer;
        {
            SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
            auto writer_iter = _partitions_to_writers.find("");
            if (writer_iter == _partitions_to_writers.end()) {
                try {
                    writer = _create_partition_writer(output_block, -1);
                } catch (doris::Exception& e) {
                    return e.to_status();
                }
                _partitions_to_writers.insert({"", writer});
                RETURN_IF_ERROR(writer->open(_state, _operator_profile));
            } else {
                if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
                    // Current file is full: close it, then roll over to a new
                    // file with the same base name and the next index.
                    std::string file_name(writer_iter->second->file_name());
                    int file_name_index = writer_iter->second->file_name_index();
                    {
                        SCOPED_RAW_TIMER(&_close_ns);
                        static_cast<void>(writer_iter->second->close(Status::OK()));
                    }
                    _partitions_to_writers.erase(writer_iter);
                    try {
                        writer = _create_partition_writer(output_block, -1, &file_name,
                                                          file_name_index + 1);
                    } catch (doris::Exception& e) {
                        return e.to_status();
                    }
                    _partitions_to_writers.insert({"", writer});
                    RETURN_IF_ERROR(writer->open(_state, _operator_profile));
                } else {
                    writer = writer_iter->second;
                }
            }
        }
        SCOPED_RAW_TIMER(&_partition_writers_write_ns);
        // Partition-key and synthesized columns are never written to the file.
        output_block.erase(_non_write_columns_indices);
        RETURN_IF_ERROR(writer->write(output_block));
        return Status::OK();
    }

    {
        SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
        for (int i = 0; i < output_block.rows(); ++i) {
            std::vector<std::string> partition_values;
            try {
                partition_values = _create_partition_values(output_block, i);
            } catch (doris::Exception& e) {
                return e.to_status();
            }
            std::string partition_name = VHiveUtils::make_partition_name(
                    hive_table_sink.columns, _partition_columns_input_index, partition_values);

            // Creates and opens a writer for `partition_name`, registering a
            // fresh one-row filter for it. Used both for brand-new partitions
            // and after rolling a full file.
            auto create_and_open_writer =
                    [&](const std::string& partition_name, int position,
                        const std::string* file_name, int file_name_index,
                        std::shared_ptr<VHivePartitionWriter>& writer_ptr) -> Status {
                try {
                    auto writer = _create_partition_writer(output_block, position, file_name,
                                                           file_name_index);
                    RETURN_IF_ERROR(writer->open(_state, _operator_profile));
                    IColumn::Filter filter(output_block.rows(), 0);
                    filter[position] = 1;
                    writer_positions.insert({writer, std::move(filter)});
                    _partitions_to_writers.insert({partition_name, writer});
                    writer_ptr = writer;
                } catch (doris::Exception& e) {
                    return e.to_status();
                }
                return Status::OK();
            };

            auto writer_iter = _partitions_to_writers.find(partition_name);
            if (writer_iter == _partitions_to_writers.end()) {
                std::shared_ptr<VHivePartitionWriter> writer;
                // Guard against unbounded writer fan-out across partitions.
                if (_partitions_to_writers.size() + 1 >
                    config::table_sink_partition_write_max_partition_nums_per_writer) {
                    return Status::InternalError(
                            "Too many open partitions {}",
                            config::table_sink_partition_write_max_partition_nums_per_writer);
                }
                RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
            } else {
                std::shared_ptr<VHivePartitionWriter> writer;
                if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
                    // Roll over: close the full file, drop its pending filter,
                    // and start a new file with the next index.
                    std::string file_name(writer_iter->second->file_name());
                    int file_name_index = writer_iter->second->file_name_index();
                    {
                        SCOPED_RAW_TIMER(&_close_ns);
                        static_cast<void>(writer_iter->second->close(Status::OK()));
                    }
                    writer_positions.erase(writer_iter->second);
                    _partitions_to_writers.erase(writer_iter);
                    RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
                                                           file_name_index + 1, writer));
                } else {
                    writer = writer_iter->second;
                }
                // Mark row i for this writer (creating its filter on first use).
                auto writer_pos_iter = writer_positions.find(writer);
                if (writer_pos_iter == writer_positions.end()) {
                    IColumn::Filter filter(output_block.rows(), 0);
                    filter[i] = 1;
                    writer_positions.insert({writer, std::move(filter)});
                } else {
                    writer_pos_iter->second[i] = 1;
                }
            }
        }
    }
    SCOPED_RAW_TIMER(&_partition_writers_write_ns);
    output_block.erase(_non_write_columns_indices);
    // Hand each writer a filtered copy of the (now write-column-only) block.
    for (auto it = writer_positions.begin(); it != writer_positions.end(); ++it) {
        Block filtered_block;
        RETURN_IF_ERROR(_filter_block(output_block, &it->second, &filtered_block));
        RETURN_IF_ERROR(it->first->write(filtered_block));
    }
    return Status::OK();
}
221 | | |
222 | | Status VHiveTableWriter::_filter_block(doris::Block& block, const IColumn::Filter* filter, |
223 | 0 | doris::Block* output_block) { |
224 | 0 | const ColumnsWithTypeAndName& columns_with_type_and_name = |
225 | 0 | block.get_columns_with_type_and_name(); |
226 | 0 | ColumnsWithTypeAndName result_columns; |
227 | 0 | for (int i = 0; i < columns_with_type_and_name.size(); ++i) { |
228 | 0 | const auto& col = columns_with_type_and_name[i]; |
229 | 0 | result_columns.emplace_back(col.column->clone_resized(col.column->size()), col.type, |
230 | 0 | col.name); |
231 | 0 | } |
232 | 0 | *output_block = {std::move(result_columns)}; |
233 | |
|
234 | 0 | std::vector<uint32_t> columns_to_filter; |
235 | 0 | int column_to_keep = output_block->columns(); |
236 | 0 | columns_to_filter.resize(column_to_keep); |
237 | 0 | for (uint32_t i = 0; i < column_to_keep; ++i) { |
238 | 0 | columns_to_filter[i] = i; |
239 | 0 | } |
240 | |
|
241 | 0 | Block::filter_block_internal(output_block, columns_to_filter, *filter); |
242 | 0 | return Status::OK(); |
243 | 0 | } |
244 | | |
245 | 0 | Status VHiveTableWriter::close(Status status) { |
246 | 0 | Status result_status; |
247 | 0 | int64_t partitions_to_writers_size = _partitions_to_writers.size(); |
248 | 0 | { |
249 | 0 | SCOPED_RAW_TIMER(&_close_ns); |
250 | 0 | for (const auto& pair : _partitions_to_writers) { |
251 | 0 | Status st = pair.second->close(status); |
252 | 0 | if (!st.ok()) { |
253 | 0 | LOG(WARNING) << fmt::format("partition writer close failed for partition {}", |
254 | 0 | st.to_string()); |
255 | 0 | if (result_status.ok()) { |
256 | 0 | result_status = st; |
257 | 0 | continue; |
258 | 0 | } |
259 | 0 | } |
260 | 0 | } |
261 | 0 | _partitions_to_writers.clear(); |
262 | 0 | } |
263 | 0 | if (status.ok()) { |
264 | 0 | SCOPED_TIMER(_operator_profile->total_time_counter()); |
265 | |
|
266 | 0 | COUNTER_SET(_written_rows_counter, static_cast<int64_t>(_row_count)); |
267 | 0 | COUNTER_SET(_send_data_timer, _send_data_ns); |
268 | 0 | COUNTER_SET(_partition_writers_dispatch_timer, _partition_writers_dispatch_ns); |
269 | 0 | COUNTER_SET(_partition_writers_write_timer, _partition_writers_write_ns); |
270 | 0 | COUNTER_SET(_partition_writers_count, partitions_to_writers_size); |
271 | 0 | COUNTER_SET(_close_timer, _close_ns); |
272 | 0 | COUNTER_SET(_write_file_counter, _write_file_count); |
273 | 0 | } |
274 | 0 | return result_status; |
275 | 0 | } |
276 | | |
// Builds a VHivePartitionWriter for the partition that row `position` of
// `block` belongs to (for unpartitioned tables `position` is unused and
// `partition_name` stays empty).
//
// `file_name`/`file_name_index` support file roll-over within a partition:
// when `file_name` is null a fresh random name is generated via
// _compute_file_name(). Propagates doris::Exception from
// _create_partition_values on invalid partition values.
std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer(
        Block& block, int position, const std::string* file_name, int file_name_index) {
    auto& hive_table_sink = _t_sink.hive_table_sink;
    std::vector<std::string> partition_values;
    std::string partition_name;
    if (!_partition_columns_input_index.empty()) {
        partition_values = _create_partition_values(block, position);
        partition_name = VHiveUtils::make_partition_name(
                hive_table_sink.columns, _partition_columns_input_index, partition_values);
    }
    const std::vector<THivePartition>& partitions = hive_table_sink.partitions;
    const THiveLocationParams& write_location = hive_table_sink.location;
    const THivePartition* existing_partition = nullptr;
    // NOTE(review): existing_table is never set to false in this function, so
    // the "new table" branch below is currently dead code — confirm whether a
    // real table-existence check was intended here.
    bool existing_table = true;
    // Linear scan for a catalog partition matching this row's values.
    for (const auto& partition : partitions) {
        if (partition_values == partition.values) {
            existing_partition = &partition;
            break;
        }
    }
    TUpdateMode::type update_mode;
    VHivePartitionWriter::WriteInfo write_info;
    TFileFormatType::type file_format_type;
    TFileCompressType::type write_compress_type;
    if (existing_partition == nullptr) { // new partition
        if (existing_table == false) {   // new table (dead branch, see NOTE above)
            update_mode = TUpdateMode::NEW;
            if (_partition_columns_input_index.empty()) { // new unpartitioned table
                write_info = {write_location.write_path,
                              write_location.original_write_path,
                              write_location.target_path,
                              write_location.file_type,
                              {}};
            } else { // a new partition in a new partitioned table
                auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
                auto original_write_path =
                        fmt::format("{}/{}", write_location.original_write_path, partition_name);
                auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
                write_info = {std::move(write_path),
                              std::move(original_write_path),
                              std::move(target_path),
                              write_location.file_type,
                              {}};
            }
        } else { // a new partition in an existing partitioned table, or an existing unpartitioned table
            if (_partition_columns_input_index.empty()) { // an existing unpartitioned table
                update_mode =
                        !hive_table_sink.overwrite ? TUpdateMode::APPEND : TUpdateMode::OVERWRITE;
                write_info = {write_location.write_path,
                              write_location.original_write_path,
                              write_location.target_path,
                              write_location.file_type,
                              {}};
            } else { // a new partition in an existing partitioned table
                update_mode = TUpdateMode::NEW;
                auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
                auto original_write_path =
                        fmt::format("{}/{}", write_location.original_write_path, partition_name);
                auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
                write_info = {std::move(write_path),
                              std::move(original_write_path),
                              std::move(target_path),
                              write_location.file_type,
                              {}};
            }
            // need to get schema from existing table ?
        }
        file_format_type = hive_table_sink.file_format;
        write_compress_type = hive_table_sink.compression_type;
    } else { // existing partition
        if (!hive_table_sink.overwrite) {
            update_mode = TUpdateMode::APPEND;
            auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
            auto original_write_path =
                    fmt::format("{}/{}", write_location.original_write_path, partition_name);
            // Append keeps the partition's existing target location and file
            // format; only the compression comes from the sink settings.
            auto target_path = fmt::format("{}", existing_partition->location.target_path);
            write_info = {std::move(write_path),
                          std::move(original_write_path),
                          std::move(target_path),
                          existing_partition->location.file_type,
                          {}};
            file_format_type = existing_partition->file_format;
            write_compress_type = hive_table_sink.compression_type;
        } else {
            update_mode = TUpdateMode::OVERWRITE;
            auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
            auto original_write_path =
                    fmt::format("{}/{}", write_location.original_write_path, partition_name);
            auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
            write_info = {std::move(write_path),
                          std::move(original_write_path),
                          std::move(target_path),
                          write_location.file_type,
                          {}};
            file_format_type = hive_table_sink.file_format;
            write_compress_type = hive_table_sink.compression_type;
            // need to get schema from existing table ?
        }
    }
    if (hive_table_sink.__isset.broker_addresses) {
        write_info.broker_addresses.assign(hive_table_sink.broker_addresses.begin(),
                                           hive_table_sink.broker_addresses.end());
    }

    _write_file_count++;
    // Only columns not in _non_write_columns_indices (i.e. REGULAR columns)
    // appear in the data file.
    std::vector<std::string> column_names;
    column_names.reserve(hive_table_sink.columns.size());
    for (int i = 0; i < hive_table_sink.columns.size(); i++) {
        if (_non_write_columns_indices.find(i) == _non_write_columns_indices.end()) {
            column_names.emplace_back(hive_table_sink.columns[i].name);
        }
    }
    return std::make_shared<VHivePartitionWriter>(
            _t_sink, std::move(partition_name), update_mode, _write_output_vexpr_ctxs,
            std::move(column_names), std::move(write_info),
            (file_name == nullptr) ? _compute_file_name() : *file_name, file_name_index,
            file_format_type, write_compress_type, &hive_table_sink.serde_properties,
            hive_table_sink.hadoop_config);
}
396 | | |
397 | 0 | std::vector<std::string> VHiveTableWriter::_create_partition_values(Block& block, int position) { |
398 | 0 | std::vector<std::string> partition_values; |
399 | 0 | for (int i = 0; i < _partition_columns_input_index.size(); ++i) { |
400 | 0 | int partition_column_idx = _partition_columns_input_index[i]; |
401 | 0 | ColumnWithTypeAndName partition_column = block.get_by_position(partition_column_idx); |
402 | 0 | std::string value = _to_partition_value( |
403 | 0 | _vec_output_expr_ctxs[partition_column_idx]->root()->data_type(), partition_column, |
404 | 0 | position); |
405 | | |
406 | | // Check if value contains only printable ASCII characters |
407 | 0 | bool is_valid = true; |
408 | 0 | for (char c : value) { |
409 | 0 | if (c < 0x20 || c > 0x7E) { |
410 | 0 | is_valid = false; |
411 | 0 | break; |
412 | 0 | } |
413 | 0 | } |
414 | |
|
415 | 0 | if (!is_valid) { |
416 | | // Encode value using Base16 encoding with space separator |
417 | 0 | std::stringstream encoded; |
418 | 0 | for (unsigned char c : value) { |
419 | 0 | encoded << std::hex << std::setw(2) << std::setfill('0') << (int)c; |
420 | 0 | encoded << " "; |
421 | 0 | } |
422 | 0 | throw doris::Exception( |
423 | 0 | doris::ErrorCode::INTERNAL_ERROR, |
424 | 0 | "Hive partition values can only contain printable ASCII characters (0x20 - " |
425 | 0 | "0x7E). Invalid value: {}", |
426 | 0 | encoded.str()); |
427 | 0 | } |
428 | | |
429 | 0 | partition_values.emplace_back(value); |
430 | 0 | } |
431 | | |
432 | 0 | return partition_values; |
433 | 0 | } |
434 | | |
// Renders one partition-key cell (`partition_column` at row `position`) as the
// string used in the Hive partition directory name.
//
// NULL values map to Hive's sentinel "__HIVE_DEFAULT_PARTITION__". Otherwise
// the nested (non-nullable) column's raw bytes are reinterpreted per the
// primitive type of `type_desc` and formatted. Throws doris::Exception for
// types that cannot serve as partition keys.
std::string VHiveTableWriter::_to_partition_value(const DataTypePtr& type_desc,
                                                  const ColumnWithTypeAndName& partition_column,
                                                  int position) {
    ColumnPtr column;
    if (auto* nullable_column = check_and_get_column<ColumnNullable>(*partition_column.column)) {
        auto* __restrict null_map_data = nullable_column->get_null_map_data().data();
        if (null_map_data[position]) {
            // Hive's conventional directory name for a NULL partition value.
            return "__HIVE_DEFAULT_PARTITION__";
        }
        column = nullable_column->get_nested_column_ptr();
    } else {
        column = partition_column.column;
    }
    // Raw pointer + byte length of the cell; the numeric cases below
    // reinterpret `item` as the type's in-memory representation.
    auto [item, size] = column->get_data_at(position);
    switch (type_desc->get_primitive_type()) {
    case TYPE_BOOLEAN: {
        // Booleans go through Field to read the stored UInt8 value.
        Field field = check_and_get_column<const ColumnUInt8>(*column)->operator[](position);
        return std::to_string(field.get<TYPE_BOOLEAN>());
    }
    case TYPE_TINYINT: {
        return std::to_string(*reinterpret_cast<const Int8*>(item));
    }
    case TYPE_SMALLINT: {
        return std::to_string(*reinterpret_cast<const Int16*>(item));
    }
    case TYPE_INT: {
        return std::to_string(*reinterpret_cast<const Int32*>(item));
    }
    case TYPE_BIGINT: {
        return std::to_string(*reinterpret_cast<const Int64*>(item));
    }
    case TYPE_FLOAT: {
        return std::to_string(*reinterpret_cast<const Float32*>(item));
    }
    case TYPE_DOUBLE: {
        return std::to_string(*reinterpret_cast<const Float64*>(item));
    }
    case TYPE_VARCHAR:
    case TYPE_CHAR:
    case TYPE_STRING: {
        // String data is used verbatim (printability is enforced by the caller).
        return std::string(item, size);
    }
    case TYPE_DATE: {
        VecDateTimeValue value = binary_cast<int64_t, doris::VecDateTimeValue>(*(int64_t*)item);

        char buf[64];
        char* pos = value.to_string(buf);
        // NOTE(review): the "- 1" assumes to_string returns one past the
        // written NUL terminator — confirm against VecDateTimeValue::to_string.
        return std::string(buf, pos - buf - 1);
    }
    case TYPE_DATETIME: {
        VecDateTimeValue value = binary_cast<int64_t, doris::VecDateTimeValue>(*(int64_t*)item);

        char buf[64];
        char* pos = value.to_string(buf);
        // Same "- 1" terminator-dropping convention as TYPE_DATE above.
        return std::string(buf, pos - buf - 1);
    }
    case TYPE_DATEV2: {
        DateV2Value<DateV2ValueType> value =
                binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(int32_t*)item);

        char buf[64];
        char* pos = value.to_string(buf);
        return std::string(buf, pos - buf - 1);
    }
    case TYPE_DATETIMEV2: {
        DateV2Value<DateTimeV2ValueType> value =
                binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(int64_t*)item);

        char buf[64];
        // DATETIMEV2 formatting honors the type's fractional-second scale.
        char* pos = value.to_string(buf, type_desc->get_scale());
        return std::string(buf, pos - buf - 1);
    }
    case TYPE_DECIMALV2: {
        Decimal128V2 value = *(Decimal128V2*)(item);
        return value.to_string(type_desc->get_scale());
    }
    case TYPE_DECIMAL32: {
        Decimal32 value = *(Decimal32*)(item);
        return value.to_string(type_desc->get_scale());
    }
    case TYPE_DECIMAL64: {
        Decimal64 value = *(Decimal64*)(item);
        return value.to_string(type_desc->get_scale());
    }
    case TYPE_DECIMAL128I: {
        Decimal128V3 value = *(Decimal128V3*)(item);
        return value.to_string(type_desc->get_scale());
    }
    case TYPE_DECIMAL256: {
        Decimal256 value = *(Decimal256*)(item);
        return value.to_string(type_desc->get_scale());
    }
    default: {
        throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
                               "Unsupported type for partition {}", type_desc->get_name());
    }
    }
}
533 | | |
534 | 0 | std::string VHiveTableWriter::_compute_file_name() { |
535 | 0 | boost::uuids::uuid uuid = boost::uuids::random_generator()(); |
536 | |
|
537 | 0 | std::string uuid_str = boost::uuids::to_string(uuid); |
538 | |
|
539 | 0 | return fmt::format("{}_{}", print_id(_state->query_id()), uuid_str); |
540 | 0 | } |
541 | | |
542 | | } // namespace doris |