Coverage Report

Created: 2026-04-15 19:34

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/writer/vhive_table_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/writer/vhive_table_writer.h"
19
20
#include "core/block/block.h"
21
#include "core/block/column_with_type_and_name.h"
22
#include "core/block/materialize_block.h"
23
#include "exec/sink/writer/vhive_partition_writer.h"
24
#include "exec/sink/writer/vhive_utils.h"
25
#include "exprs/vexpr.h"
26
#include "exprs/vexpr_context.h"
27
#include "runtime/runtime_profile.h"
28
#include "runtime/runtime_state.h"
29
30
namespace doris {
31
32
VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink,
33
                                   const VExprContextSPtrs& output_expr_ctxs,
34
                                   std::shared_ptr<Dependency> dep,
35
                                   std::shared_ptr<Dependency> fin_dep)
36
0
        : AsyncResultWriter(output_expr_ctxs, dep, fin_dep), _t_sink(t_sink) {
37
0
    DCHECK(_t_sink.__isset.hive_table_sink);
38
0
}
39
40
0
Status VHiveTableWriter::init_properties(ObjectPool* pool) {
41
0
    return Status::OK();
42
0
}
43
44
0
Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* operator_profile) {
45
0
    _state = state;
46
0
    _operator_profile = operator_profile;
47
0
    DCHECK(_operator_profile->get_child("CustomCounters") != nullptr);
48
0
    RuntimeProfile* custom_counters = _operator_profile->get_child("CustomCounters");
49
    // add all counter
50
0
    _written_rows_counter = ADD_COUNTER(custom_counters, "WrittenRows", TUnit::UNIT);
51
0
    _send_data_timer = ADD_TIMER(custom_counters, "SendDataTime");
52
0
    _partition_writers_dispatch_timer =
53
0
            ADD_CHILD_TIMER(custom_counters, "PartitionsDispatchTime", "SendDataTime");
54
0
    _partition_writers_write_timer =
55
0
            ADD_CHILD_TIMER(custom_counters, "PartitionsWriteTime", "SendDataTime");
56
0
    _partition_writers_count = ADD_COUNTER(custom_counters, "PartitionsWriteCount", TUnit::UNIT);
57
0
    _open_timer = ADD_TIMER(custom_counters, "OpenTime");
58
0
    _close_timer = ADD_TIMER(custom_counters, "CloseTime");
59
0
    _write_file_counter = ADD_COUNTER(custom_counters, "WriteFileCount", TUnit::UNIT);
60
61
0
    SCOPED_TIMER(_open_timer);
62
0
    for (int i = 0; i < _t_sink.hive_table_sink.columns.size(); ++i) {
63
0
        switch (_t_sink.hive_table_sink.columns[i].column_type) {
64
0
        case THiveColumnType::PARTITION_KEY: {
65
0
            _partition_columns_input_index.emplace_back(i);
66
0
            _non_write_columns_indices.insert(i);
67
0
            break;
68
0
        }
69
0
        case THiveColumnType::REGULAR: {
70
0
            _write_output_vexpr_ctxs.push_back(_vec_output_expr_ctxs[i]);
71
0
            break;
72
0
        }
73
0
        case THiveColumnType::SYNTHESIZED: {
74
0
            _non_write_columns_indices.insert(i);
75
0
            break;
76
0
        }
77
0
        default: {
78
0
            throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
79
0
                                   "Illegal hive column type {}, it should not be here.",
80
0
                                   to_string(_t_sink.hive_table_sink.columns[i].column_type));
81
0
        }
82
0
        }
83
0
    }
84
0
    return Status::OK();
85
0
}
86
87
0
Status VHiveTableWriter::write(RuntimeState* state, Block& block) {
88
0
    SCOPED_RAW_TIMER(&_send_data_ns);
89
90
0
    if (block.rows() == 0) {
91
0
        return Status::OK();
92
0
    }
93
0
    Block output_block;
94
0
    RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_vec_output_expr_ctxs, block,
95
0
                                                                       &output_block, false));
96
0
    materialize_block_inplace(output_block);
97
98
0
    std::unordered_map<std::shared_ptr<VHivePartitionWriter>, IColumn::Filter> writer_positions;
99
0
    _row_count += output_block.rows();
100
0
    auto& hive_table_sink = _t_sink.hive_table_sink;
101
102
0
    if (_partition_columns_input_index.empty()) {
103
0
        std::shared_ptr<VHivePartitionWriter> writer;
104
0
        {
105
0
            SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
106
0
            auto writer_iter = _partitions_to_writers.find("");
107
0
            if (writer_iter == _partitions_to_writers.end()) {
108
0
                try {
109
0
                    writer = _create_partition_writer(output_block, -1);
110
0
                } catch (doris::Exception& e) {
111
0
                    return e.to_status();
112
0
                }
113
0
                _partitions_to_writers.insert({"", writer});
114
0
                RETURN_IF_ERROR(writer->open(_state, _operator_profile));
115
0
            } else {
116
0
                if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
117
0
                    std::string file_name(writer_iter->second->file_name());
118
0
                    int file_name_index = writer_iter->second->file_name_index();
119
0
                    {
120
0
                        SCOPED_RAW_TIMER(&_close_ns);
121
0
                        static_cast<void>(writer_iter->second->close(Status::OK()));
122
0
                    }
123
0
                    _partitions_to_writers.erase(writer_iter);
124
0
                    try {
125
0
                        writer = _create_partition_writer(output_block, -1, &file_name,
126
0
                                                          file_name_index + 1);
127
0
                    } catch (doris::Exception& e) {
128
0
                        return e.to_status();
129
0
                    }
130
0
                    _partitions_to_writers.insert({"", writer});
131
0
                    RETURN_IF_ERROR(writer->open(_state, _operator_profile));
132
0
                } else {
133
0
                    writer = writer_iter->second;
134
0
                }
135
0
            }
136
0
        }
137
0
        SCOPED_RAW_TIMER(&_partition_writers_write_ns);
138
0
        output_block.erase(_non_write_columns_indices);
139
0
        RETURN_IF_ERROR(writer->write(output_block));
140
0
        return Status::OK();
141
0
    }
142
143
0
    {
144
0
        SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
145
0
        for (int i = 0; i < output_block.rows(); ++i) {
146
0
            std::vector<std::string> partition_values;
147
0
            try {
148
0
                partition_values = _create_partition_values(output_block, i);
149
0
            } catch (doris::Exception& e) {
150
0
                return e.to_status();
151
0
            }
152
0
            std::string partition_name = VHiveUtils::make_partition_name(
153
0
                    hive_table_sink.columns, _partition_columns_input_index, partition_values);
154
155
0
            auto create_and_open_writer =
156
0
                    [&](const std::string& partition_name, int position,
157
0
                        const std::string* file_name, int file_name_index,
158
0
                        std::shared_ptr<VHivePartitionWriter>& writer_ptr) -> Status {
159
0
                try {
160
0
                    auto writer = _create_partition_writer(output_block, position, file_name,
161
0
                                                           file_name_index);
162
0
                    RETURN_IF_ERROR(writer->open(_state, _operator_profile));
163
0
                    IColumn::Filter filter(output_block.rows(), 0);
164
0
                    filter[position] = 1;
165
0
                    writer_positions.insert({writer, std::move(filter)});
166
0
                    _partitions_to_writers.insert({partition_name, writer});
167
0
                    writer_ptr = writer;
168
0
                } catch (doris::Exception& e) {
169
0
                    return e.to_status();
170
0
                }
171
0
                return Status::OK();
172
0
            };
173
174
0
            auto writer_iter = _partitions_to_writers.find(partition_name);
175
0
            if (writer_iter == _partitions_to_writers.end()) {
176
0
                std::shared_ptr<VHivePartitionWriter> writer;
177
0
                if (_partitions_to_writers.size() + 1 >
178
0
                    config::table_sink_partition_write_max_partition_nums_per_writer) {
179
0
                    return Status::InternalError(
180
0
                            "Too many open partitions {}",
181
0
                            config::table_sink_partition_write_max_partition_nums_per_writer);
182
0
                }
183
0
                RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
184
0
            } else {
185
0
                std::shared_ptr<VHivePartitionWriter> writer;
186
0
                if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
187
0
                    std::string file_name(writer_iter->second->file_name());
188
0
                    int file_name_index = writer_iter->second->file_name_index();
189
0
                    {
190
0
                        SCOPED_RAW_TIMER(&_close_ns);
191
0
                        static_cast<void>(writer_iter->second->close(Status::OK()));
192
0
                    }
193
0
                    writer_positions.erase(writer_iter->second);
194
0
                    _partitions_to_writers.erase(writer_iter);
195
0
                    RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
196
0
                                                           file_name_index + 1, writer));
197
0
                } else {
198
0
                    writer = writer_iter->second;
199
0
                }
200
0
                auto writer_pos_iter = writer_positions.find(writer);
201
0
                if (writer_pos_iter == writer_positions.end()) {
202
0
                    IColumn::Filter filter(output_block.rows(), 0);
203
0
                    filter[i] = 1;
204
0
                    writer_positions.insert({writer, std::move(filter)});
205
0
                } else {
206
0
                    writer_pos_iter->second[i] = 1;
207
0
                }
208
0
            }
209
0
        }
210
0
    }
211
0
    SCOPED_RAW_TIMER(&_partition_writers_write_ns);
212
0
    output_block.erase(_non_write_columns_indices);
213
0
    for (auto it = writer_positions.begin(); it != writer_positions.end(); ++it) {
214
0
        Block filtered_block;
215
0
        RETURN_IF_ERROR(_filter_block(output_block, &it->second, &filtered_block));
216
0
        RETURN_IF_ERROR(it->first->write(filtered_block));
217
0
    }
218
0
    return Status::OK();
219
0
}
220
221
Status VHiveTableWriter::_filter_block(doris::Block& block, const IColumn::Filter* filter,
222
0
                                       doris::Block* output_block) {
223
0
    const ColumnsWithTypeAndName& columns_with_type_and_name =
224
0
            block.get_columns_with_type_and_name();
225
0
    ColumnsWithTypeAndName result_columns;
226
0
    for (int i = 0; i < columns_with_type_and_name.size(); ++i) {
227
0
        const auto& col = columns_with_type_and_name[i];
228
0
        result_columns.emplace_back(col.column->clone_resized(col.column->size()), col.type,
229
0
                                    col.name);
230
0
    }
231
0
    *output_block = {std::move(result_columns)};
232
233
0
    std::vector<uint32_t> columns_to_filter;
234
0
    int column_to_keep = output_block->columns();
235
0
    columns_to_filter.resize(column_to_keep);
236
0
    for (uint32_t i = 0; i < column_to_keep; ++i) {
237
0
        columns_to_filter[i] = i;
238
0
    }
239
240
0
    Block::filter_block_internal(output_block, columns_to_filter, *filter);
241
0
    return Status::OK();
242
0
}
243
244
0
Status VHiveTableWriter::close(Status status) {
245
0
    Status result_status;
246
0
    int64_t partitions_to_writers_size = _partitions_to_writers.size();
247
0
    {
248
0
        SCOPED_RAW_TIMER(&_close_ns);
249
0
        for (const auto& pair : _partitions_to_writers) {
250
0
            Status st = pair.second->close(status);
251
0
            if (!st.ok()) {
252
0
                LOG(WARNING) << fmt::format("partition writer close failed for partition {}",
253
0
                                            st.to_string());
254
0
                if (result_status.ok()) {
255
0
                    result_status = st;
256
0
                    continue;
257
0
                }
258
0
            }
259
0
        }
260
0
        _partitions_to_writers.clear();
261
0
    }
262
0
    if (status.ok()) {
263
0
        SCOPED_TIMER(_operator_profile->total_time_counter());
264
265
0
        COUNTER_SET(_written_rows_counter, static_cast<int64_t>(_row_count));
266
0
        COUNTER_SET(_send_data_timer, _send_data_ns);
267
0
        COUNTER_SET(_partition_writers_dispatch_timer, _partition_writers_dispatch_ns);
268
0
        COUNTER_SET(_partition_writers_write_timer, _partition_writers_write_ns);
269
0
        COUNTER_SET(_partition_writers_count, partitions_to_writers_size);
270
0
        COUNTER_SET(_close_timer, _close_ns);
271
0
        COUNTER_SET(_write_file_counter, _write_file_count);
272
0
    }
273
0
    return result_status;
274
0
}
275
276
std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer(
277
0
        Block& block, int position, const std::string* file_name, int file_name_index) {
278
0
    auto& hive_table_sink = _t_sink.hive_table_sink;
279
0
    std::vector<std::string> partition_values;
280
0
    std::string partition_name;
281
0
    if (!_partition_columns_input_index.empty()) {
282
0
        partition_values = _create_partition_values(block, position);
283
0
        partition_name = VHiveUtils::make_partition_name(
284
0
                hive_table_sink.columns, _partition_columns_input_index, partition_values);
285
0
    }
286
0
    const std::vector<THivePartition>& partitions = hive_table_sink.partitions;
287
0
    const THiveLocationParams& write_location = hive_table_sink.location;
288
0
    const THivePartition* existing_partition = nullptr;
289
0
    bool existing_table = true;
290
0
    for (const auto& partition : partitions) {
291
0
        if (partition_values == partition.values) {
292
0
            existing_partition = &partition;
293
0
            break;
294
0
        }
295
0
    }
296
0
    TUpdateMode::type update_mode;
297
0
    VHivePartitionWriter::WriteInfo write_info;
298
0
    TFileFormatType::type file_format_type;
299
0
    TFileCompressType::type write_compress_type;
300
0
    if (existing_partition == nullptr) { // new partition
301
0
        if (existing_table == false) {   // new table
302
0
            update_mode = TUpdateMode::NEW;
303
0
            if (_partition_columns_input_index.empty()) { // new unpartitioned table
304
0
                write_info = {write_location.write_path,
305
0
                              write_location.original_write_path,
306
0
                              write_location.target_path,
307
0
                              write_location.file_type,
308
0
                              {}};
309
0
            } else { // a new partition in a new partitioned table
310
0
                auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
311
0
                auto original_write_path =
312
0
                        fmt::format("{}/{}", write_location.original_write_path, partition_name);
313
0
                auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
314
0
                write_info = {std::move(write_path),
315
0
                              std::move(original_write_path),
316
0
                              std::move(target_path),
317
0
                              write_location.file_type,
318
0
                              {}};
319
0
            }
320
0
        } else { // a new partition in an existing partitioned table, or an existing unpartitioned table
321
0
            if (_partition_columns_input_index.empty()) { // an existing unpartitioned table
322
0
                update_mode =
323
0
                        !hive_table_sink.overwrite ? TUpdateMode::APPEND : TUpdateMode::OVERWRITE;
324
0
                write_info = {write_location.write_path,
325
0
                              write_location.original_write_path,
326
0
                              write_location.target_path,
327
0
                              write_location.file_type,
328
0
                              {}};
329
0
            } else { // a new partition in an existing partitioned table
330
0
                update_mode = TUpdateMode::NEW;
331
0
                auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
332
0
                auto original_write_path =
333
0
                        fmt::format("{}/{}", write_location.original_write_path, partition_name);
334
0
                auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
335
0
                write_info = {std::move(write_path),
336
0
                              std::move(original_write_path),
337
0
                              std::move(target_path),
338
0
                              write_location.file_type,
339
0
                              {}};
340
0
            }
341
            // need to get schema from existing table ?
342
0
        }
343
0
        file_format_type = hive_table_sink.file_format;
344
0
        write_compress_type = hive_table_sink.compression_type;
345
0
    } else { // existing partition
346
0
        if (!hive_table_sink.overwrite) {
347
0
            update_mode = TUpdateMode::APPEND;
348
0
            auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
349
0
            auto original_write_path =
350
0
                    fmt::format("{}/{}", write_location.original_write_path, partition_name);
351
0
            auto target_path = fmt::format("{}", existing_partition->location.target_path);
352
0
            write_info = {std::move(write_path),
353
0
                          std::move(original_write_path),
354
0
                          std::move(target_path),
355
0
                          existing_partition->location.file_type,
356
0
                          {}};
357
0
            file_format_type = existing_partition->file_format;
358
0
            write_compress_type = hive_table_sink.compression_type;
359
0
        } else {
360
0
            update_mode = TUpdateMode::OVERWRITE;
361
0
            auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
362
0
            auto original_write_path =
363
0
                    fmt::format("{}/{}", write_location.original_write_path, partition_name);
364
0
            auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
365
0
            write_info = {std::move(write_path),
366
0
                          std::move(original_write_path),
367
0
                          std::move(target_path),
368
0
                          write_location.file_type,
369
0
                          {}};
370
0
            file_format_type = hive_table_sink.file_format;
371
0
            write_compress_type = hive_table_sink.compression_type;
372
            // need to get schema from existing table ?
373
0
        }
374
0
    }
375
0
    if (hive_table_sink.__isset.broker_addresses) {
376
0
        write_info.broker_addresses.assign(hive_table_sink.broker_addresses.begin(),
377
0
                                           hive_table_sink.broker_addresses.end());
378
0
    }
379
380
0
    _write_file_count++;
381
0
    std::vector<std::string> column_names;
382
0
    column_names.reserve(hive_table_sink.columns.size());
383
0
    for (int i = 0; i < hive_table_sink.columns.size(); i++) {
384
0
        if (_non_write_columns_indices.find(i) == _non_write_columns_indices.end()) {
385
0
            column_names.emplace_back(hive_table_sink.columns[i].name);
386
0
        }
387
0
    }
388
0
    return std::make_shared<VHivePartitionWriter>(
389
0
            _t_sink, std::move(partition_name), update_mode, _write_output_vexpr_ctxs,
390
0
            std::move(column_names), std::move(write_info),
391
0
            (file_name == nullptr) ? _compute_file_name() : *file_name, file_name_index,
392
0
            file_format_type, write_compress_type, &hive_table_sink.serde_properties,
393
0
            hive_table_sink.hadoop_config);
394
0
}
395
396
0
std::vector<std::string> VHiveTableWriter::_create_partition_values(Block& block, int position) {
397
0
    std::vector<std::string> partition_values;
398
0
    for (int i = 0; i < _partition_columns_input_index.size(); ++i) {
399
0
        int partition_column_idx = _partition_columns_input_index[i];
400
0
        ColumnWithTypeAndName partition_column = block.get_by_position(partition_column_idx);
401
0
        std::string value = _to_partition_value(
402
0
                _vec_output_expr_ctxs[partition_column_idx]->root()->data_type(), partition_column,
403
0
                position);
404
405
        // Check if value contains only printable ASCII characters
406
0
        bool is_valid = true;
407
0
        for (char c : value) {
408
0
            if (c < 0x20 || c > 0x7E) {
409
0
                is_valid = false;
410
0
                break;
411
0
            }
412
0
        }
413
414
0
        if (!is_valid) {
415
            // Encode value using Base16 encoding with space separator
416
0
            std::stringstream encoded;
417
0
            for (unsigned char c : value) {
418
0
                encoded << std::hex << std::setw(2) << std::setfill('0') << (int)c;
419
0
                encoded << " ";
420
0
            }
421
0
            throw doris::Exception(
422
0
                    doris::ErrorCode::INTERNAL_ERROR,
423
0
                    "Hive partition values can only contain printable ASCII characters (0x20 - "
424
0
                    "0x7E). Invalid value: {}",
425
0
                    encoded.str());
426
0
        }
427
428
0
        partition_values.emplace_back(value);
429
0
    }
430
431
0
    return partition_values;
432
0
}
433
434
std::string VHiveTableWriter::_to_partition_value(const DataTypePtr& type_desc,
435
                                                  const ColumnWithTypeAndName& partition_column,
436
0
                                                  int position) {
437
0
    ColumnPtr column;
438
0
    if (auto* nullable_column = check_and_get_column<ColumnNullable>(*partition_column.column)) {
439
0
        auto* __restrict null_map_data = nullable_column->get_null_map_data().data();
440
0
        if (null_map_data[position]) {
441
0
            return "__HIVE_DEFAULT_PARTITION__";
442
0
        }
443
0
        column = nullable_column->get_nested_column_ptr();
444
0
    } else {
445
0
        column = partition_column.column;
446
0
    }
447
0
    auto [item, size] = column->get_data_at(position);
448
0
    switch (type_desc->get_primitive_type()) {
449
0
    case TYPE_BOOLEAN: {
450
0
        Field field = check_and_get_column<const ColumnUInt8>(*column)->operator[](position);
451
0
        return std::to_string(field.get<TYPE_BOOLEAN>());
452
0
    }
453
0
    case TYPE_TINYINT: {
454
0
        return std::to_string(*reinterpret_cast<const Int8*>(item));
455
0
    }
456
0
    case TYPE_SMALLINT: {
457
0
        return std::to_string(*reinterpret_cast<const Int16*>(item));
458
0
    }
459
0
    case TYPE_INT: {
460
0
        return std::to_string(*reinterpret_cast<const Int32*>(item));
461
0
    }
462
0
    case TYPE_BIGINT: {
463
0
        return std::to_string(*reinterpret_cast<const Int64*>(item));
464
0
    }
465
0
    case TYPE_FLOAT: {
466
0
        return std::to_string(*reinterpret_cast<const Float32*>(item));
467
0
    }
468
0
    case TYPE_DOUBLE: {
469
0
        return std::to_string(*reinterpret_cast<const Float64*>(item));
470
0
    }
471
0
    case TYPE_VARCHAR:
472
0
    case TYPE_CHAR:
473
0
    case TYPE_STRING: {
474
0
        return std::string(item, size);
475
0
    }
476
0
    case TYPE_DATE: {
477
0
        VecDateTimeValue value = binary_cast<int64_t, doris::VecDateTimeValue>(*(int64_t*)item);
478
479
0
        char buf[64];
480
0
        char* pos = value.to_string(buf);
481
0
        return std::string(buf, pos - buf - 1);
482
0
    }
483
0
    case TYPE_DATETIME: {
484
0
        VecDateTimeValue value = binary_cast<int64_t, doris::VecDateTimeValue>(*(int64_t*)item);
485
486
0
        char buf[64];
487
0
        char* pos = value.to_string(buf);
488
0
        return std::string(buf, pos - buf - 1);
489
0
    }
490
0
    case TYPE_DATEV2: {
491
0
        DateV2Value<DateV2ValueType> value =
492
0
                binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(int32_t*)item);
493
494
0
        char buf[64];
495
0
        char* pos = value.to_string(buf);
496
0
        return std::string(buf, pos - buf - 1);
497
0
    }
498
0
    case TYPE_DATETIMEV2: {
499
0
        DateV2Value<DateTimeV2ValueType> value =
500
0
                binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(int64_t*)item);
501
502
0
        char buf[64];
503
0
        char* pos = value.to_string(buf, type_desc->get_scale());
504
0
        return std::string(buf, pos - buf - 1);
505
0
    }
506
0
    case TYPE_DECIMALV2: {
507
0
        Decimal128V2 value = *(Decimal128V2*)(item);
508
0
        return value.to_string(type_desc->get_scale());
509
0
    }
510
0
    case TYPE_DECIMAL32: {
511
0
        Decimal32 value = *(Decimal32*)(item);
512
0
        return value.to_string(type_desc->get_scale());
513
0
    }
514
0
    case TYPE_DECIMAL64: {
515
0
        Decimal64 value = *(Decimal64*)(item);
516
0
        return value.to_string(type_desc->get_scale());
517
0
    }
518
0
    case TYPE_DECIMAL128I: {
519
0
        Decimal128V3 value = *(Decimal128V3*)(item);
520
0
        return value.to_string(type_desc->get_scale());
521
0
    }
522
0
    case TYPE_DECIMAL256: {
523
0
        Decimal256 value = *(Decimal256*)(item);
524
0
        return value.to_string(type_desc->get_scale());
525
0
    }
526
0
    default: {
527
0
        throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
528
0
                               "Unsupported type for partition {}", type_desc->get_name());
529
0
    }
530
0
    }
531
0
}
532
533
0
std::string VHiveTableWriter::_compute_file_name() {
534
0
    boost::uuids::uuid uuid = boost::uuids::random_generator()();
535
536
0
    std::string uuid_str = boost::uuids::to_string(uuid);
537
538
0
    return fmt::format("{}_{}", print_id(_state->query_id()), uuid_str);
539
0
}
540
541
} // namespace doris