Coverage Report

Created: 2026-03-14 20:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/writer/vhive_table_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/writer/vhive_table_writer.h"
19
20
#include "core/block/block.h"
21
#include "core/block/column_with_type_and_name.h"
22
#include "core/block/materialize_block.h"
23
#include "exec/sink/writer/vhive_partition_writer.h"
24
#include "exec/sink/writer/vhive_utils.h"
25
#include "exprs/vexpr.h"
26
#include "exprs/vexpr_context.h"
27
#include "runtime/runtime_profile.h"
28
#include "runtime/runtime_state.h"
29
30
namespace doris {
31
#include "common/compile_check_begin.h"
32
33
// Constructs a Hive table writer.
//
// t_sink           - sink description, stored in _t_sink; debug builds assert that its
//                    hive_table_sink field is set.
// output_expr_ctxs - output expression contexts, forwarded to AsyncResultWriter.
// dep / fin_dep    - dependencies forwarded to AsyncResultWriter. They are received by value,
//                    so they are moved (not copied again) into the base class.
VHiveTableWriter::VHiveTableWriter(const TDataSink& t_sink,
                                   const VExprContextSPtrs& output_expr_ctxs,
                                   std::shared_ptr<Dependency> dep,
                                   std::shared_ptr<Dependency> fin_dep)
        : AsyncResultWriter(output_expr_ctxs, std::move(dep), std::move(fin_dep)),
          _t_sink(t_sink) {
    DCHECK(_t_sink.__isset.hive_table_sink);
}
40
41
0
// No per-writer properties are initialized here: `pool` is not used and the call
// always reports success.
Status VHiveTableWriter::init_properties(ObjectPool* pool) {
    return Status::OK();
}
44
45
0
// Prepares the writer for use.
//
// Remembers `state` and `operator_profile`, registers this writer's counters and timers under
// the profile's "CustomCounters" child, and classifies every sink column:
//   - PARTITION_KEY: recorded as a partition input column and excluded from written data;
//   - REGULAR:       its output expression is kept for the data files;
//   - SYNTHESIZED:   excluded from written data.
// Throws doris::Exception(INTERNAL_ERROR) for any other column type.
Status VHiveTableWriter::open(RuntimeState* state, RuntimeProfile* operator_profile) {
    _state = state;
    _operator_profile = operator_profile;

    // Debug builds assert that the "CustomCounters" child profile is present.
    RuntimeProfile* counters_profile = _operator_profile->get_child("CustomCounters");
    DCHECK(counters_profile != nullptr);

    // Register every counter/timer this writer reports.
    _written_rows_counter = ADD_COUNTER(counters_profile, "WrittenRows", TUnit::UNIT);
    _send_data_timer = ADD_TIMER(counters_profile, "SendDataTime");
    _partition_writers_dispatch_timer =
            ADD_CHILD_TIMER(counters_profile, "PartitionsDispatchTime", "SendDataTime");
    _partition_writers_write_timer =
            ADD_CHILD_TIMER(counters_profile, "PartitionsWriteTime", "SendDataTime");
    _partition_writers_count = ADD_COUNTER(counters_profile, "PartitionsWriteCount", TUnit::UNIT);
    _open_timer = ADD_TIMER(counters_profile, "OpenTime");
    _close_timer = ADD_TIMER(counters_profile, "CloseTime");
    _write_file_counter = ADD_COUNTER(counters_profile, "WriteFileCount", TUnit::UNIT);

    SCOPED_TIMER(_open_timer);
    const auto& sink_columns = _t_sink.hive_table_sink.columns;
    for (int col_idx = 0; col_idx < sink_columns.size(); ++col_idx) {
        switch (sink_columns[col_idx].column_type) {
        case THiveColumnType::PARTITION_KEY: {
            _partition_columns_input_index.emplace_back(col_idx);
            _non_write_columns_indices.insert(col_idx);
            break;
        }
        case THiveColumnType::REGULAR: {
            _write_output_vexpr_ctxs.push_back(_vec_output_expr_ctxs[col_idx]);
            break;
        }
        case THiveColumnType::SYNTHESIZED: {
            _non_write_columns_indices.insert(col_idx);
            break;
        }
        default: {
            throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
                                   "Illegal hive column type {}, it should not be here.",
                                   to_string(sink_columns[col_idx].column_type));
        }
        }
    }
    return Status::OK();
}
87
88
0
// Evaluates the output expressions on `block` and writes the resulting rows through
// per-partition writers.
//
// Unpartitioned sink (no partition input columns): a single writer keyed by "" receives the
// whole block.
// Partitioned sink: each row's partition name is computed, rows are grouped per writer with a
// row filter, and each writer then receives its filtered copy of the block.
//
// A writer whose written_len() exceeds config::hive_sink_max_file_size is closed and replaced
// by a new writer that reuses the file name with file_name_index + 1 ("rollover").
//
// Returns the first error met. doris::Exception thrown while building partition values or
// writers is converted to a Status. A failed rollover close() is returned to the caller
// instead of being discarded, matching how close() reports partition-writer close failures.
// NOTE(review): this assumes a failed close means the rolled file's rows are not committed —
// confirm against VHivePartitionWriter::close().
//
// `state` is not used; the RuntimeState captured in open() is used instead.
Status VHiveTableWriter::write(RuntimeState* state, Block& block) {
    SCOPED_RAW_TIMER(&_send_data_ns);

    if (block.rows() == 0) {
        return Status::OK();
    }
    Block output_block;
    RETURN_IF_ERROR(VExprContext::get_output_block_after_execute_exprs(_vec_output_expr_ctxs, block,
                                                                       &output_block, false));
    materialize_block_inplace(output_block);

    // For each writer touched by this block: which rows of output_block belong to it.
    std::unordered_map<std::shared_ptr<VHivePartitionWriter>, IColumn::Filter> writer_positions;
    _row_count += output_block.rows();
    auto& hive_table_sink = _t_sink.hive_table_sink;

    if (_partition_columns_input_index.empty()) {
        std::shared_ptr<VHivePartitionWriter> writer;
        {
            SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);
            auto writer_iter = _partitions_to_writers.find("");
            if (writer_iter == _partitions_to_writers.end()) {
                try {
                    writer = _create_partition_writer(output_block, -1);
                } catch (doris::Exception& e) {
                    return e.to_status();
                }
                _partitions_to_writers.insert({"", writer});
                RETURN_IF_ERROR(writer->open(_state, _operator_profile));
            } else if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
                // Rollover: copy what the replacement needs before the old writer is dropped.
                std::string file_name(writer_iter->second->file_name());
                int file_name_index = writer_iter->second->file_name_index();
                Status close_status;
                {
                    SCOPED_RAW_TIMER(&_close_ns);
                    close_status = writer_iter->second->close(Status::OK());
                }
                // Unregister first so a failed writer is not closed again by close().
                _partitions_to_writers.erase(writer_iter);
                RETURN_IF_ERROR(close_status);
                try {
                    writer = _create_partition_writer(output_block, -1, &file_name,
                                                      file_name_index + 1);
                } catch (doris::Exception& e) {
                    return e.to_status();
                }
                _partitions_to_writers.insert({"", writer});
                RETURN_IF_ERROR(writer->open(_state, _operator_profile));
            } else {
                writer = writer_iter->second;
            }
        }
        SCOPED_RAW_TIMER(&_partition_writers_write_ns);
        output_block.erase(_non_write_columns_indices);
        RETURN_IF_ERROR(writer->write(output_block));
        return Status::OK();
    }

    {
        SCOPED_RAW_TIMER(&_partition_writers_dispatch_ns);

        // Creates and opens a writer for `name`, registers it, and marks row `position` as
        // belonging to it. Defined once per block rather than once per row.
        auto create_and_open_writer =
                [&](const std::string& name, int position, const std::string* file_name,
                    int file_name_index,
                    std::shared_ptr<VHivePartitionWriter>& writer_ptr) -> Status {
            try {
                auto writer = _create_partition_writer(output_block, position, file_name,
                                                       file_name_index);
                RETURN_IF_ERROR(writer->open(_state, _operator_profile));
                IColumn::Filter filter(output_block.rows(), 0);
                filter[position] = 1;
                writer_positions.insert({writer, std::move(filter)});
                _partitions_to_writers.insert({name, writer});
                writer_ptr = writer;
            } catch (doris::Exception& e) {
                return e.to_status();
            }
            return Status::OK();
        };

        for (int i = 0; i < output_block.rows(); ++i) {
            std::vector<std::string> partition_values;
            try {
                partition_values = _create_partition_values(output_block, i);
            } catch (doris::Exception& e) {
                return e.to_status();
            }
            std::string partition_name = VHiveUtils::make_partition_name(
                    hive_table_sink.columns, _partition_columns_input_index, partition_values);

            auto writer_iter = _partitions_to_writers.find(partition_name);
            if (writer_iter == _partitions_to_writers.end()) {
                std::shared_ptr<VHivePartitionWriter> writer;
                if (_partitions_to_writers.size() + 1 >
                    config::table_sink_partition_write_max_partition_nums_per_writer) {
                    return Status::InternalError(
                            "Too many open partitions {}",
                            config::table_sink_partition_write_max_partition_nums_per_writer);
                }
                RETURN_IF_ERROR(create_and_open_writer(partition_name, i, nullptr, 0, writer));
            } else {
                std::shared_ptr<VHivePartitionWriter> writer;
                if (writer_iter->second->written_len() > config::hive_sink_max_file_size) {
                    // Rollover: copy what the replacement needs before the old writer is dropped.
                    std::string file_name(writer_iter->second->file_name());
                    int file_name_index = writer_iter->second->file_name_index();
                    Status close_status;
                    {
                        SCOPED_RAW_TIMER(&_close_ns);
                        close_status = writer_iter->second->close(Status::OK());
                    }
                    // Unregister first so a failed writer is not closed again by close().
                    writer_positions.erase(writer_iter->second);
                    _partitions_to_writers.erase(writer_iter);
                    RETURN_IF_ERROR(close_status);
                    RETURN_IF_ERROR(create_and_open_writer(partition_name, i, &file_name,
                                                           file_name_index + 1, writer));
                } else {
                    writer = writer_iter->second;
                }
                auto writer_pos_iter = writer_positions.find(writer);
                if (writer_pos_iter == writer_positions.end()) {
                    IColumn::Filter filter(output_block.rows(), 0);
                    filter[i] = 1;
                    writer_positions.insert({writer, std::move(filter)});
                } else {
                    writer_pos_iter->second[i] = 1;
                }
            }
        }
    }
    SCOPED_RAW_TIMER(&_partition_writers_write_ns);
    output_block.erase(_non_write_columns_indices);
    for (auto it = writer_positions.begin(); it != writer_positions.end(); ++it) {
        Block filtered_block;
        RETURN_IF_ERROR(_filter_block(output_block, &it->second, &filtered_block));
        RETURN_IF_ERROR(it->first->write(filtered_block));
    }
    return Status::OK();
}
221
222
// Builds `*output_block` as a filtered copy of `block`.
//
// Every column of `block` is cloned at its full size (type and name are kept), the clones are
// assigned to `*output_block`, and `*filter` is then applied to all of its columns via
// Block::filter_block_internal. `block` itself is not modified here. Always returns OK.
Status VHiveTableWriter::_filter_block(doris::Block& block, const IColumn::Filter* filter,
                                       doris::Block* output_block) {
    const ColumnsWithTypeAndName& source_columns = block.get_columns_with_type_and_name();

    ColumnsWithTypeAndName cloned_columns;
    cloned_columns.reserve(source_columns.size());
    for (const auto& source : source_columns) {
        cloned_columns.emplace_back(source.column->clone_resized(source.column->size()),
                                    source.type, source.name);
    }
    *output_block = {std::move(cloned_columns)};

    // The filter applies to every column, so list all positions 0..columns()-1.
    int column_to_keep = output_block->columns();
    std::vector<uint32_t> columns_to_filter;
    columns_to_filter.reserve(column_to_keep);
    for (uint32_t position = 0; position < column_to_keep; ++position) {
        columns_to_filter.push_back(position);
    }

    Block::filter_block_internal(output_block, columns_to_filter, *filter);
    return Status::OK();
}
244
245
0
// Closes every open partition writer, passing `status` to each, then drops them all.
//
// A failing writer does not stop the loop: each failure is logged with the partition name and
// the failure status, and the first failure becomes the return value. When the incoming
// `status` is OK, the collected row/time/file statistics are published to the profile counters
// registered in open().
//
// Returns OK if every partition writer closed successfully, otherwise the first close error.
Status VHiveTableWriter::close(Status status) {
    Status result_status;
    // Captured before clear() so the profile can report how many writers were open.
    int64_t partitions_to_writers_size = _partitions_to_writers.size();
    {
        SCOPED_RAW_TIMER(&_close_ns);
        for (const auto& [partition_name, writer] : _partitions_to_writers) {
            Status st = writer->close(status);
            if (st.ok()) {
                continue;
            }
            LOG(WARNING) << fmt::format("partition writer close failed for partition '{}': {}",
                                        partition_name, st.to_string());
            if (result_status.ok()) {
                result_status = st;
            }
        }
        _partitions_to_writers.clear();
    }
    if (status.ok()) {
        SCOPED_TIMER(_operator_profile->total_time_counter());

        COUNTER_SET(_written_rows_counter, static_cast<int64_t>(_row_count));
        COUNTER_SET(_send_data_timer, _send_data_ns);
        COUNTER_SET(_partition_writers_dispatch_timer, _partition_writers_dispatch_ns);
        COUNTER_SET(_partition_writers_write_timer, _partition_writers_write_ns);
        COUNTER_SET(_partition_writers_count, partitions_to_writers_size);
        COUNTER_SET(_close_timer, _close_ns);
        COUNTER_SET(_write_file_counter, _write_file_count);
    }
    return result_status;
}
276
277
std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer(
278
0
        Block& block, int position, const std::string* file_name, int file_name_index) {
279
0
    auto& hive_table_sink = _t_sink.hive_table_sink;
280
0
    std::vector<std::string> partition_values;
281
0
    std::string partition_name;
282
0
    if (!_partition_columns_input_index.empty()) {
283
0
        partition_values = _create_partition_values(block, position);
284
0
        partition_name = VHiveUtils::make_partition_name(
285
0
                hive_table_sink.columns, _partition_columns_input_index, partition_values);
286
0
    }
287
0
    const std::vector<THivePartition>& partitions = hive_table_sink.partitions;
288
0
    const THiveLocationParams& write_location = hive_table_sink.location;
289
0
    const THivePartition* existing_partition = nullptr;
290
0
    bool existing_table = true;
291
0
    for (const auto& partition : partitions) {
292
0
        if (partition_values == partition.values) {
293
0
            existing_partition = &partition;
294
0
            break;
295
0
        }
296
0
    }
297
0
    TUpdateMode::type update_mode;
298
0
    VHivePartitionWriter::WriteInfo write_info;
299
0
    TFileFormatType::type file_format_type;
300
0
    TFileCompressType::type write_compress_type;
301
0
    if (existing_partition == nullptr) { // new partition
302
0
        if (existing_table == false) {   // new table
303
0
            update_mode = TUpdateMode::NEW;
304
0
            if (_partition_columns_input_index.empty()) { // new unpartitioned table
305
0
                write_info = {write_location.write_path,
306
0
                              write_location.original_write_path,
307
0
                              write_location.target_path,
308
0
                              write_location.file_type,
309
0
                              {}};
310
0
            } else { // a new partition in a new partitioned table
311
0
                auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
312
0
                auto original_write_path =
313
0
                        fmt::format("{}/{}", write_location.original_write_path, partition_name);
314
0
                auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
315
0
                write_info = {std::move(write_path),
316
0
                              std::move(original_write_path),
317
0
                              std::move(target_path),
318
0
                              write_location.file_type,
319
0
                              {}};
320
0
            }
321
0
        } else { // a new partition in an existing partitioned table, or an existing unpartitioned table
322
0
            if (_partition_columns_input_index.empty()) { // an existing unpartitioned table
323
0
                update_mode =
324
0
                        !hive_table_sink.overwrite ? TUpdateMode::APPEND : TUpdateMode::OVERWRITE;
325
0
                write_info = {write_location.write_path,
326
0
                              write_location.original_write_path,
327
0
                              write_location.target_path,
328
0
                              write_location.file_type,
329
0
                              {}};
330
0
            } else { // a new partition in an existing partitioned table
331
0
                update_mode = TUpdateMode::NEW;
332
0
                auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
333
0
                auto original_write_path =
334
0
                        fmt::format("{}/{}", write_location.original_write_path, partition_name);
335
0
                auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
336
0
                write_info = {std::move(write_path),
337
0
                              std::move(original_write_path),
338
0
                              std::move(target_path),
339
0
                              write_location.file_type,
340
0
                              {}};
341
0
            }
342
            // need to get schema from existing table ?
343
0
        }
344
0
        file_format_type = hive_table_sink.file_format;
345
0
        write_compress_type = hive_table_sink.compression_type;
346
0
    } else { // existing partition
347
0
        if (!hive_table_sink.overwrite) {
348
0
            update_mode = TUpdateMode::APPEND;
349
0
            auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
350
0
            auto original_write_path =
351
0
                    fmt::format("{}/{}", write_location.original_write_path, partition_name);
352
0
            auto target_path = fmt::format("{}", existing_partition->location.target_path);
353
0
            write_info = {std::move(write_path),
354
0
                          std::move(original_write_path),
355
0
                          std::move(target_path),
356
0
                          existing_partition->location.file_type,
357
0
                          {}};
358
0
            file_format_type = existing_partition->file_format;
359
0
            write_compress_type = hive_table_sink.compression_type;
360
0
        } else {
361
0
            update_mode = TUpdateMode::OVERWRITE;
362
0
            auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
363
0
            auto original_write_path =
364
0
                    fmt::format("{}/{}", write_location.original_write_path, partition_name);
365
0
            auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
366
0
            write_info = {std::move(write_path),
367
0
                          std::move(original_write_path),
368
0
                          std::move(target_path),
369
0
                          write_location.file_type,
370
0
                          {}};
371
0
            file_format_type = hive_table_sink.file_format;
372
0
            write_compress_type = hive_table_sink.compression_type;
373
            // need to get schema from existing table ?
374
0
        }
375
0
    }
376
0
    if (hive_table_sink.__isset.broker_addresses) {
377
0
        write_info.broker_addresses.assign(hive_table_sink.broker_addresses.begin(),
378
0
                                           hive_table_sink.broker_addresses.end());
379
0
    }
380
381
0
    _write_file_count++;
382
0
    std::vector<std::string> column_names;
383
0
    column_names.reserve(hive_table_sink.columns.size());
384
0
    for (int i = 0; i < hive_table_sink.columns.size(); i++) {
385
0
        if (_non_write_columns_indices.find(i) == _non_write_columns_indices.end()) {
386
0
            column_names.emplace_back(hive_table_sink.columns[i].name);
387
0
        }
388
0
    }
389
0
    return std::make_shared<VHivePartitionWriter>(
390
0
            _t_sink, std::move(partition_name), update_mode, _write_output_vexpr_ctxs,
391
0
            std::move(column_names), std::move(write_info),
392
0
            (file_name == nullptr) ? _compute_file_name() : *file_name, file_name_index,
393
0
            file_format_type, write_compress_type, &hive_table_sink.serde_properties,
394
0
            hive_table_sink.hadoop_config);
395
0
}
396
397
0
// Returns the textual partition values of row `position` in `block`, one per partition column,
// in the order of _partition_columns_input_index.
//
// Throws doris::Exception(INTERNAL_ERROR) when a value contains a byte outside printable ASCII
// (0x20 - 0x7E); the message lists the offending value as space-separated hex bytes.
// Exceptions thrown by _to_partition_value propagate unchanged.
//
// The block column is bound by reference and the value is moved into the result, so no
// per-row copies of the column descriptor or the value string are made.
std::vector<std::string> VHiveTableWriter::_create_partition_values(Block& block, int position) {
    std::vector<std::string> partition_values;
    partition_values.reserve(_partition_columns_input_index.size());

    for (int partition_column_idx : _partition_columns_input_index) {
        const ColumnWithTypeAndName& partition_column =
                block.get_by_position(partition_column_idx);
        std::string value = _to_partition_value(
                _vec_output_expr_ctxs[partition_column_idx]->root()->data_type(), partition_column,
                position);

        // Check if value contains only printable ASCII characters
        bool is_valid = true;
        for (char c : value) {
            if (c < 0x20 || c > 0x7E) {
                is_valid = false;
                break;
            }
        }

        if (!is_valid) {
            // Render the value as two-digit hex bytes separated by spaces for the error text.
            std::stringstream encoded;
            for (unsigned char c : value) {
                encoded << std::hex << std::setw(2) << std::setfill('0') << static_cast<int>(c);
                encoded << " ";
            }
            throw doris::Exception(
                    doris::ErrorCode::INTERNAL_ERROR,
                    "Hive partition values can only contain printable ASCII characters (0x20 - "
                    "0x7E). Invalid value: {}",
                    encoded.str());
        }

        partition_values.emplace_back(std::move(value));
    }

    return partition_values;
}
434
435
std::string VHiveTableWriter::_to_partition_value(const DataTypePtr& type_desc,
436
                                                  const ColumnWithTypeAndName& partition_column,
437
0
                                                  int position) {
438
0
    ColumnPtr column;
439
0
    if (auto* nullable_column = check_and_get_column<ColumnNullable>(*partition_column.column)) {
440
0
        auto* __restrict null_map_data = nullable_column->get_null_map_data().data();
441
0
        if (null_map_data[position]) {
442
0
            return "__HIVE_DEFAULT_PARTITION__";
443
0
        }
444
0
        column = nullable_column->get_nested_column_ptr();
445
0
    } else {
446
0
        column = partition_column.column;
447
0
    }
448
0
    auto [item, size] = column->get_data_at(position);
449
0
    switch (type_desc->get_primitive_type()) {
450
0
    case TYPE_BOOLEAN: {
451
0
        Field field = check_and_get_column<const ColumnUInt8>(*column)->operator[](position);
452
0
        return std::to_string(field.get<TYPE_BOOLEAN>());
453
0
    }
454
0
    case TYPE_TINYINT: {
455
0
        return std::to_string(*reinterpret_cast<const Int8*>(item));
456
0
    }
457
0
    case TYPE_SMALLINT: {
458
0
        return std::to_string(*reinterpret_cast<const Int16*>(item));
459
0
    }
460
0
    case TYPE_INT: {
461
0
        return std::to_string(*reinterpret_cast<const Int32*>(item));
462
0
    }
463
0
    case TYPE_BIGINT: {
464
0
        return std::to_string(*reinterpret_cast<const Int64*>(item));
465
0
    }
466
0
    case TYPE_FLOAT: {
467
0
        return std::to_string(*reinterpret_cast<const Float32*>(item));
468
0
    }
469
0
    case TYPE_DOUBLE: {
470
0
        return std::to_string(*reinterpret_cast<const Float64*>(item));
471
0
    }
472
0
    case TYPE_VARCHAR:
473
0
    case TYPE_CHAR:
474
0
    case TYPE_STRING: {
475
0
        return std::string(item, size);
476
0
    }
477
0
    case TYPE_DATE: {
478
0
        VecDateTimeValue value = binary_cast<int64_t, doris::VecDateTimeValue>(*(int64_t*)item);
479
480
0
        char buf[64];
481
0
        char* pos = value.to_string(buf);
482
0
        return std::string(buf, pos - buf - 1);
483
0
    }
484
0
    case TYPE_DATETIME: {
485
0
        VecDateTimeValue value = binary_cast<int64_t, doris::VecDateTimeValue>(*(int64_t*)item);
486
487
0
        char buf[64];
488
0
        char* pos = value.to_string(buf);
489
0
        return std::string(buf, pos - buf - 1);
490
0
    }
491
0
    case TYPE_DATEV2: {
492
0
        DateV2Value<DateV2ValueType> value =
493
0
                binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(int32_t*)item);
494
495
0
        char buf[64];
496
0
        char* pos = value.to_string(buf);
497
0
        return std::string(buf, pos - buf - 1);
498
0
    }
499
0
    case TYPE_DATETIMEV2: {
500
0
        DateV2Value<DateTimeV2ValueType> value =
501
0
                binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(int64_t*)item);
502
503
0
        char buf[64];
504
0
        char* pos = value.to_string(buf, type_desc->get_scale());
505
0
        return std::string(buf, pos - buf - 1);
506
0
    }
507
0
    case TYPE_DECIMALV2: {
508
0
        Decimal128V2 value = *(Decimal128V2*)(item);
509
0
        return value.to_string(type_desc->get_scale());
510
0
    }
511
0
    case TYPE_DECIMAL32: {
512
0
        Decimal32 value = *(Decimal32*)(item);
513
0
        return value.to_string(type_desc->get_scale());
514
0
    }
515
0
    case TYPE_DECIMAL64: {
516
0
        Decimal64 value = *(Decimal64*)(item);
517
0
        return value.to_string(type_desc->get_scale());
518
0
    }
519
0
    case TYPE_DECIMAL128I: {
520
0
        Decimal128V3 value = *(Decimal128V3*)(item);
521
0
        return value.to_string(type_desc->get_scale());
522
0
    }
523
0
    case TYPE_DECIMAL256: {
524
0
        Decimal256 value = *(Decimal256*)(item);
525
0
        return value.to_string(type_desc->get_scale());
526
0
    }
527
0
    default: {
528
0
        throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR,
529
0
                               "Unsupported type for partition {}", type_desc->get_name());
530
0
    }
531
0
    }
532
0
}
533
534
0
std::string VHiveTableWriter::_compute_file_name() {
535
0
    boost::uuids::uuid uuid = boost::uuids::random_generator()();
536
537
0
    std::string uuid_str = boost::uuids::to_string(uuid);
538
539
0
    return fmt::format("{}_{}", print_id(_state->query_id()), uuid_str);
540
0
}
541
542
} // namespace doris