Coverage Report

Created: 2026-07-03 19:21

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/writer/vhive_partition_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/writer/vhive_partition_writer.h"
19
20
#include <aws/s3/model/CompletedPart.h>
21
22
#include "core/block/materialize_block.h"
23
#include "core/column/column_map.h"
24
#include "format/transformer/vcsv_transformer.h"
25
#include "format/transformer/vorc_transformer.h"
26
#include "format/transformer/vparquet_transformer.h"
27
#include "io/file_factory.h"
28
#include "io/fs/s3_file_writer.h"
29
#include "runtime/runtime_state.h"
30
31
namespace doris {
32
33
// Constructs a writer for a single file of one Hive partition.
//
// The by-value arguments (partition name, column names, write info and file
// name) are moved into their members; the remaining arguments are stored as
// passed (copied or bound, depending on the member declarations, which live
// in the header). The TDataSink argument is not used by this constructor.
//
// Nothing is opened here: the file system, the file writer and the format
// transformer are created by open().
VHivePartitionWriter::VHivePartitionWriter(
        const TDataSink& /*t_sink*/, std::string partition_name, TUpdateMode::type update_mode,
        const VExprContextSPtrs& write_output_expr_ctxs,
        std::vector<std::string> write_column_names, WriteInfo write_info, std::string file_name,
        int file_name_index, TFileFormatType::type file_format_type,
        TFileCompressType::type hive_compress_type,
        const THiveSerDeProperties* hive_serde_properties,
        const std::map<std::string, std::string>& hadoop_conf)
        : _partition_name(std::move(partition_name)),
          _update_mode(update_mode),
          _write_output_expr_ctxs(write_output_expr_ctxs),
          _write_column_names(std::move(write_column_names)),
          _write_info(std::move(write_info)),
          _file_name(std::move(file_name)),
          _file_name_index(file_name_index),
          _file_format_type(file_format_type),
          _hive_compress_type(hive_compress_type),
          _hive_serde_properties(hive_serde_properties),
          _hadoop_conf(hadoop_conf) {}
54
55
0
namespace {

// Translates the Hive-level compression setting into the codec handed to the
// Parquet transformer. On success the codec is stored in
// `*parquet_compression_type`; settings without a Parquet mapping yield an
// InternalError and leave the output untouched.
Status hive_compress_type_to_parquet(TFileCompressType::type hive_compress_type,
                                     TParquetCompressionType::type* parquet_compression_type) {
    switch (hive_compress_type) {
    case TFileCompressType::PLAIN: {
        *parquet_compression_type = TParquetCompressionType::UNCOMPRESSED;
        return Status::OK();
    }
    case TFileCompressType::SNAPPYBLOCK: {
        *parquet_compression_type = TParquetCompressionType::SNAPPY;
        return Status::OK();
    }
    case TFileCompressType::ZSTD: {
        *parquet_compression_type = TParquetCompressionType::ZSTD;
        return Status::OK();
    }
    case TFileCompressType::LZ4BLOCK: {
        // Hadoop-framed Parquet LZ4 (not LZ4_RAW) for cross-engine compatibility.
        *parquet_compression_type = TParquetCompressionType::LZ4_HADOOP;
        return Status::OK();
    }
    default: {
        return Status::InternalError("Unsupported hive compress type {} with parquet",
                                     to_string(hive_compress_type));
    }
    }
}

} // namespace

// Prepares the writer for write() calls.
//
// Steps, in order:
//   1. remember `state` (used later when partition updates are registered);
//   2. create the file system for the configured file type / hadoop conf /
//      broker addresses and create the target file
//      "<write_path>/<target file name>" on it;
//   3. build the format transformer matching the configured file format
//      (Parquet, ORC or CSV/text) and open it.
//
// Returns the first error encountered: file system / file creation failure,
// an unsupported file format or compression type, missing serde properties
// for the text format, or the transformer's own open() failure.
// `operator_profile` is not used by this function.
Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* operator_profile) {
    _state = state;

    io::FSPropertiesRef fs_properties(_write_info.file_type);
    fs_properties.properties = &_hadoop_conf;
    if (!_write_info.broker_addresses.empty()) {
        fs_properties.broker_addresses = &(_write_info.broker_addresses);
    }
    io::FileDescription file_description = {
            .path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name()),
            .fs_name {}};
    _fs = DORIS_TRY(FileFactory::create_fs(fs_properties, file_description));
    // NOTE(review): presumably this makes an S3 writer leave its multipart upload
    // pending so it can be completed or aborted later -- confirm against
    // io::FileWriterOptions.
    io::FileWriterOptions file_writer_options = {.used_by_s3_committer = true};
    RETURN_IF_ERROR(_fs->create_file(file_description.path, &_file_writer, &file_writer_options));

    switch (_file_format_type) {
    case TFileFormatType::FORMAT_PARQUET: {
        TParquetCompressionType::type parquet_compression_type =
                TParquetCompressionType::UNCOMPRESSED;
        RETURN_IF_ERROR(
                hive_compress_type_to_parquet(_hive_compress_type, &parquet_compression_type));
        // TODO: INT96 is kept for Hive 2/3 compatibility. Add an explicit option before
        // changing the default Hive parquet timestamp encoding to standard logical types.
        ParquetFileOptions parquet_options = {parquet_compression_type,
                                              TParquetVersion::PARQUET_1_0, false, true};
        _file_format_transformer = std::make_unique<VParquetTransformer>(
                state, _file_writer.get(), _write_output_expr_ctxs, _write_column_names, false,
                parquet_options);
        return _file_format_transformer->open();
    }
    case TFileFormatType::FORMAT_ORC: {
        _file_format_transformer = std::make_unique<VOrcTransformer>(
                state, _file_writer.get(), _write_output_expr_ctxs, "", _write_column_names, false,
                _hive_compress_type);
        return _file_format_transformer->open();
    }
    case TFileFormatType::FORMAT_CSV_PLAIN: {
        // The delimiters are read through this pointer, so fail cleanly instead of
        // dereferencing null when the writer was constructed without serde properties.
        if (_hive_serde_properties == nullptr) {
            return Status::InternalError(
                    "Hive serde properties are required to write hive text files");
        }
        // TODO(syt): support hive csv table, only hive text file is supported now
        _file_format_transformer = std::make_unique<VCSVTransformer>(
                state, _file_writer.get(), _write_output_expr_ctxs, false, "csv", "",
                _hive_serde_properties->field_delim, _hive_serde_properties->line_delim, false,
                _hive_compress_type, _hive_serde_properties);
        return _file_format_transformer->open();
    }
    default: {
        return Status::InternalError("Unsupported file format type {}",
                                     to_string(_file_format_type));
    }
    }
}
125
126
0
// Finishes this partition file and either publishes or rolls back the result.
//
// `status` is the caller's outcome for the write so far. The close is treated
// as successful only when both `status` and the transformer's close() are OK.
//   - success: the partition update is built and registered with the runtime
//     state;
//   - failure: any pending S3 multipart upload is registered for rollback and
//     the target file is deleted on a best-effort basis (a failed delete is
//     only logged).
//
// Returns the status of closing the format transformer (OK when no
// transformer exists). A non-OK `status` argument is not folded into the
// return value.
Status VHivePartitionWriter::close(const Status& status) {
    Status transformer_status;
    if (_file_format_transformer != nullptr) {
        transformer_status = _file_format_transformer->close();
        if (!transformer_status.ok()) {
            LOG(WARNING) << fmt::format("_file_format_transformer close failed, reason: {}",
                                        transformer_status.to_string());
        }
    }

    const bool succeeded = transformer_status.ok() && status.ok();
    if (succeeded) {
        auto committed_update = _build_partition_update();
        _state->add_hive_partition_updates(committed_update);
        return transformer_status;
    }

    // Failure path: report the dangling multipart upload first, then remove the file.
    _add_s3_mpu_pending_upload_for_rollback();
    if (_fs != nullptr) {
        // Remove the file that was actually created so no orphan is left behind.
        auto orphan_path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name());
        Status delete_status = _fs->delete_file(orphan_path);
        if (!delete_status.ok()) {
            LOG(WARNING) << fmt::format("Delete file {} failed, reason: {}", orphan_path,
                                        delete_status.to_string());
        }
    }
    return transformer_status;
}
154
155
0
// Hands `block` to the format transformer and, once that succeeded, adds the
// block's row count to the running total reported in the partition update.
// A transformer error is returned as-is and leaves the row count unchanged.
// Expects open() to have created the transformer; it is dereferenced unchecked.
Status VHivePartitionWriter::write(Block& block) {
    RETURN_IF_ERROR(_file_format_transformer->write(block));

    const auto written_rows = block.rows();
    _row_count += written_rows;
    return Status::OK();
}
160
161
0
// Assembles the THivePartitionUpdate describing the file produced by this
// writer: partition name, update mode, write/target location, the single
// target file name, the accumulated row count and the number of bytes the
// transformer reports as written. When an S3 multipart upload is pending for
// the file, its description is attached as well.
//
// Requires the format transformer to exist (checked with DCHECK only).
THivePartitionUpdate VHivePartitionWriter::_build_partition_update() {
    DCHECK(_file_format_transformer != nullptr);

    THiveLocationParams location_params;
    location_params.__set_write_path(_write_info.original_write_path);
    location_params.__set_target_path(_write_info.target_path);

    THivePartitionUpdate update;
    update.__set_name(_partition_name);
    update.__set_update_mode(_update_mode);
    update.__set_location(location_params);
    update.__set_file_names({_get_target_file_name()});
    update.__set_row_count(_row_count);
    update.__set_file_size(_file_format_transformer->written_len());

    // Only present for S3 writes that actually started a multipart upload.
    if (TS3MPUPendingUpload pending; _build_s3_mpu_pending_upload(&pending)) {
        update.__set_s3_mpu_pending_uploads({pending});
    }
    return update;
}
180
181
0
// Fills `pending_upload` with the S3 multipart upload that backs the current
// file: bucket, key, upload id and a part-number -> etag map of the parts
// completed so far.
//
// Returns true when `pending_upload` was filled. Returns false, leaving it
// untouched, when
//   - the write does not target S3 or no file writer exists,
//   - the file writer is not an S3FileWriter, or
//   - the writer reports an empty upload id.
//
// `pending_upload` must not be null.
bool VHivePartitionWriter::_build_s3_mpu_pending_upload(TS3MPUPendingUpload* pending_upload) {
    DCHECK(pending_upload != nullptr);
    if (_write_info.file_type != TFileType::FILE_S3 || _file_writer == nullptr) {
        return false;
    }

    auto* s3_mpu_file_writer = dynamic_cast<doris::io::S3FileWriter*>(_file_writer.get());
    DCHECK(s3_mpu_file_writer != nullptr);
    // DCHECK compiles away in release builds, so guard the dereferences below explicitly.
    if (s3_mpu_file_writer == nullptr) {
        LOG(WARNING) << "file writer of an S3 hive write is not an S3FileWriter, "
                        "no pending multipart upload can be reported";
        return false;
    }

    const std::string& upload_id = s3_mpu_file_writer->upload_id();
    if (upload_id.empty()) {
        return false;
    }

    pending_upload->__set_bucket(s3_mpu_file_writer->bucket());
    pending_upload->__set_key(s3_mpu_file_writer->key());
    pending_upload->__set_upload_id(upload_id);

    std::map<int, std::string> etags;
    for (const auto& completed_part : s3_mpu_file_writer->completed_parts()) {
        etags.emplace(completed_part.part_num, completed_part.etag);
    }
    pending_upload->__set_etags(etags);
    return true;
}
206
207
0
void VHivePartitionWriter::_add_s3_mpu_pending_upload_for_rollback() {
208
0
    TS3MPUPendingUpload s3_mpu_pending_upload;
209
0
    if (!_build_s3_mpu_pending_upload(&s3_mpu_pending_upload)) {
210
0
        return;
211
0
    }
212
213
0
    THivePartitionUpdate hive_partition_update;
214
0
    hive_partition_update.__set_name(_partition_name);
215
0
    hive_partition_update.__set_update_mode(_update_mode);
216
0
    THiveLocationParams location;
217
0
    location.__set_write_path(_write_info.original_write_path);
218
0
    location.__set_target_path(_write_info.target_path);
219
0
    hive_partition_update.__set_location(location);
220
0
    hive_partition_update.__set_file_names({});
221
0
    hive_partition_update.__set_row_count(0);
222
0
    hive_partition_update.__set_file_size(0);
223
0
    hive_partition_update.__set_s3_mpu_pending_uploads({s3_mpu_pending_upload});
224
0
    _state->add_hive_partition_updates(hive_partition_update);
225
0
}
226
227
std::string VHivePartitionWriter::_get_file_extension(TFileFormatType::type file_format_type,
228
0
                                                      TFileCompressType::type write_compress_type) {
229
0
    std::string compress_name;
230
0
    switch (write_compress_type) {
231
0
    case TFileCompressType::SNAPPYBLOCK: {
232
0
        compress_name = ".snappy";
233
0
        break;
234
0
    }
235
0
    case TFileCompressType::ZLIB: {
236
0
        compress_name = ".zlib";
237
0
        break;
238
0
    }
239
0
    case TFileCompressType::ZSTD: {
240
0
        compress_name = ".zst";
241
0
        break;
242
0
    }
243
0
    case TFileCompressType::GZ: {
244
0
        compress_name = ".gz";
245
0
        break;
246
0
    }
247
0
    case TFileCompressType::BZ2: {
248
0
        compress_name = ".bz2";
249
0
        break;
250
0
    }
251
0
    case TFileCompressType::LZ4BLOCK: {
252
0
        compress_name = ".lz4";
253
0
        break;
254
0
    }
255
0
    default: {
256
0
        compress_name = "";
257
0
        break;
258
0
    }
259
0
    }
260
261
0
    std::string file_format_name;
262
0
    switch (file_format_type) {
263
0
    case TFileFormatType::FORMAT_PARQUET: {
264
0
        file_format_name = ".parquet";
265
0
        break;
266
0
    }
267
0
    case TFileFormatType::FORMAT_ORC: {
268
0
        file_format_name = ".orc";
269
0
        break;
270
0
    }
271
0
    default: {
272
0
        file_format_name = "";
273
0
        break;
274
0
    }
275
0
    }
276
0
    return fmt::format("{}{}", compress_name, file_format_name);
277
0
}
278
279
0
std::string VHivePartitionWriter::_get_target_file_name() {
280
0
    return fmt::format("{}-{}{}", _file_name, _file_name_index,
281
0
                       _get_file_extension(_file_format_type, _hive_compress_type));
282
0
}
283
284
} // namespace doris