Coverage Report

Created: 2026-03-23 14:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/writer/vfile_result_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/writer/vfile_result_writer.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Data_types.h>
22
#include <gen_cpp/Metrics_types.h>
23
#include <gen_cpp/PaloInternalService_types.h>
24
#include <gen_cpp/PlanNodes_types.h>
25
#include <glog/logging.h>
26
27
#include <ostream>
28
#include <utility>
29
30
#include "common/compiler_util.h" // IWYU pragma: keep
31
#include "common/consts.h"
32
#include "common/status.h"
33
#include "core/block/block.h"
34
#include "core/column/column_string.h"
35
#include "core/column/column_vector.h"
36
#include "core/data_type/define_primitive_type.h"
37
#include "core/data_type/primitive_type.h"
38
#include "core/value/decimalv2_value.h"
39
#include "core/value/large_int_value.h"
40
#include "exec/operator/result_sink_operator.h"
41
#include "exec/sink/writer/vmysql_result_writer.h"
42
#include "exprs/function/cast/cast_to_string.h"
43
#include "exprs/vexpr.h"
44
#include "exprs/vexpr_context.h"
45
#include "format/transformer/vcsv_transformer.h"
46
#include "format/transformer/vnative_transformer.h"
47
#include "format/transformer/vorc_transformer.h"
48
#include "format/transformer/vparquet_transformer.h"
49
#include "io/file_factory.h"
50
#include "io/fs/broker_file_system.h"
51
#include "io/fs/file_system.h"
52
#include "io/fs/file_writer.h"
53
#include "io/fs/hdfs_file_system.h"
54
#include "io/fs/local_file_system.h"
55
#include "io/fs/s3_file_system.h"
56
#include "io/hdfs_builder.h"
57
#include "runtime/descriptors.h"
58
#include "runtime/result_block_buffer.h"
59
#include "runtime/runtime_state.h"
60
#include "service/backend_options.h"
61
#include "util/mysql_row_buffer.h"
62
#include "util/s3_uri.h"
63
#include "util/s3_util.h"
64
#include "util/uid_util.h"
65
66
namespace doris {
67
68
// Divisor used in this file to turn a timer value into seconds
// (1e9, i.e. the timer values are treated as nanoseconds).
// Declared constexpr so it cannot be modified at runtime.
static constexpr double nons_to_second = 1000000000.00;
69
70
// Constructs a writer from a thrift sink description.
// Only the AsyncResultWriter base is initialized; `t_sink` is not read here.
VFileResultWriter::VFileResultWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
                                     std::shared_ptr<Dependency> dep,
                                     std::shared_ptr<Dependency> fin_dep)
        : AsyncResultWriter(output_exprs, dep, fin_dep) {
    // Nothing else to set up for this overload.
}
74
75
// Constructs a fully configured writer.
//   file_opts             - output file options (not owned; stored as a raw pointer)
//   storage_type          - storage backend the files are written to
//   fragment_instance_id  - id embedded into generated file names and URLs
//   output_vexpr_ctxs     - output expressions, forwarded to AsyncResultWriter
//   sinker                - result buffer; stored after a dynamic cast to
//                           MySQLResultBlockBuffer, so `_sinker` is null if the cast fails
//   output_block          - when non-null, the final statistics are written into this
//                           block instead of being sent through `_sinker`
//   output_object_data    - stored in `_output_object_data`
//   output_row_descriptor - describes the column types of the statistics block
VFileResultWriter::VFileResultWriter(
        const ResultFileOptions* file_opts, const TStorageBackendType::type storage_type,
        const TUniqueId fragment_instance_id, const VExprContextSPtrs& output_vexpr_ctxs,
        std::shared_ptr<ResultBlockBufferBase> sinker, Block* output_block, bool output_object_data,
        const RowDescriptor& output_row_descriptor, std::shared_ptr<Dependency> dep,
        std::shared_ptr<Dependency> fin_dep)
        : AsyncResultWriter(output_vexpr_ctxs, dep, fin_dep),
          _file_opts(file_opts),
          _storage_type(storage_type),
          _fragment_instance_id(fragment_instance_id),
          _sinker(std::dynamic_pointer_cast<MySQLResultBlockBuffer>(sinker)),
          _output_block(output_block),
          _output_row_descriptor(output_row_descriptor) {
    // Assigned in the body rather than in the initializer list, as in the original code.
    _output_object_data = output_object_data;
}
90
91
0
// Prepares the writer: remembers the runtime state, registers profile counters,
// validates the ORC writer version, optionally deletes the target directory and
// finally creates the writer for the first output file.
// Returns the first error encountered, otherwise the status of creating the first file writer.
Status VFileResultWriter::open(RuntimeState* state, RuntimeProfile* profile) {
    _state = state;
    _init_profile(profile);

    // check orc writer version
    const bool is_orc = _file_opts->file_format == TFileFormatType::FORMAT_ORC;
    if (is_orc && _file_opts->orc_writer_version < 1) {
        return Status::InternalError("orc writer version is less than 1.");
    }

    // Deprecated compatibility path. New FE already deletes the target directory in FE
    // and clears delete_existing_files before BE execution. Keep this branch only for
    // requests from older FE versions during rolling upgrade.
    if (_file_opts->delete_existing_files) {
        RETURN_IF_ERROR(_delete_dir());
    }

    return _create_next_file_writer();
}
107
108
0
// Creates the "VFileResultWriter" child profile under `parent_profile` and registers
// the timers and counters this writer updates.
void VFileResultWriter::_init_profile(RuntimeProfile* parent_profile) {
    RuntimeProfile* writer_profile = parent_profile->create_child("VFileResultWriter", true, true);

    // "AppendBatchTime" is registered first because the two child timers below name it
    // as their parent.
    _append_row_batch_timer = ADD_TIMER(writer_profile, "AppendBatchTime");
    _convert_tuple_timer = ADD_CHILD_TIMER(writer_profile, "TupleConvertTime", "AppendBatchTime");
    _file_write_timer = ADD_CHILD_TIMER(writer_profile, "FileWriteTime", "AppendBatchTime");

    _writer_close_timer = ADD_TIMER(writer_profile, "FileWriterCloseTime");

    _written_rows_counter = ADD_COUNTER(writer_profile, "NumWrittenRows", TUnit::UNIT);
    _written_data_bytes = ADD_COUNTER(writer_profile, "WrittenDataBytes", TUnit::BYTES);
}
117
118
0
// Computes the name of the next output file and creates a writer for it.
Status VFileResultWriter::_create_next_file_writer() {
    std::string next_file_name;
    RETURN_IF_ERROR(_get_next_file_name(&next_file_name));
    return _create_file_writer(next_file_name);
}
123
124
0
// Creates the low-level file writer for `file_name` and wraps it in the transformer
// that matches the configured output format, then opens that transformer.
// Returns an error if the storage type cannot be converted, the file writer cannot be
// created, the format is not one of CSV/Parquet/ORC/Native, or the transformer fails to open.
Status VFileResultWriter::_create_file_writer(const std::string& file_name) {
    auto file_type = DORIS_TRY(FileFactory::convert_storage_type(_storage_type));
    _file_writer_impl = DORIS_TRY(FileFactory::create_file_writer(
            file_type, _state->exec_env(), _file_opts->broker_addresses,
            _file_opts->broker_properties, file_name,
            {
                    .write_file_cache = false,
                    .sync_file_data = false,
            }));

    const auto format = _file_opts->file_format;
    switch (format) {
    case TFileFormatType::FORMAT_CSV_PLAIN: {
        _vfile_writer.reset(new VCSVTransformer(
                _state, _file_writer_impl.get(), _vec_output_expr_ctxs, _output_object_data,
                _header_type, _header, _file_opts->column_separator, _file_opts->line_delimiter,
                _file_opts->with_bom, _file_opts->compression_type));
        break;
    }
    case TFileFormatType::FORMAT_PARQUET: {
        _vfile_writer.reset(new VParquetTransformer(
                _state, _file_writer_impl.get(), _vec_output_expr_ctxs, _file_opts->parquet_schemas,
                _output_object_data,
                {_file_opts->parquet_commpression_type, _file_opts->parquet_version,
                 _file_opts->parquert_disable_dictionary, _file_opts->enable_int96_timestamps}));
        break;
    }
    case TFileFormatType::FORMAT_ORC: {
        _vfile_writer.reset(new VOrcTransformer(
                _state, _file_writer_impl.get(), _vec_output_expr_ctxs, _file_opts->orc_schema, {},
                _output_object_data, _file_opts->orc_compression_type));
        break;
    }
    case TFileFormatType::FORMAT_NATIVE: {
        // Doris Native binary format writer with configurable compression.
        _vfile_writer.reset(new VNativeTransformer(_state, _file_writer_impl.get(),
                                                   _vec_output_expr_ctxs, _output_object_data,
                                                   _file_opts->compression_type));
        break;
    }
    default:
        return Status::InternalError("unsupported file format: {}", format);
    }

    LOG(INFO) << "create file for exporting query result. file name: " << file_name
              << ". query id: " << print_id(_state->query_id()) << " format:" << format;

    return _vfile_writer->open();
}
167
168
// file name format as: my_prefix_{fragment_instance_id}_0.csv
169
0
Status VFileResultWriter::_get_next_file_name(std::string* file_name) {
170
0
    std::string suffix =
171
0
            _file_opts->file_suffix.empty() ? _file_format_to_name() : _file_opts->file_suffix;
172
0
    std::stringstream ss;
173
0
    ss << _file_opts->file_path << print_id(_fragment_instance_id) << "_" << (_file_idx++) << "."
174
0
       << suffix;
175
0
    *file_name = ss.str();
176
0
    if (_storage_type == TStorageBackendType::LOCAL) {
177
        // For local file writer, the file_path is a local dir.
178
        // Here we do a simple security verification by checking whether the file exists.
179
        // Because the file path is currently arbitrarily specified by the user,
180
        // Doris is not responsible for ensuring the correctness of the path.
181
        // This is just to prevent overwriting the existing file.
182
0
        bool exists = true;
183
0
        RETURN_IF_ERROR(io::global_local_filesystem()->exists(*file_name, &exists));
184
0
        if (exists) {
185
0
            return Status::InternalError("File already exists: {}", *file_name);
186
0
        }
187
0
    }
188
189
0
    return Status::OK();
190
0
}
191
192
// file url format as:
193
// LOCAL: file:///localhost_address/{file_path}{fragment_instance_id}_
194
// S3: {file_path}{fragment_instance_id}_
195
// BROKER: {file_path}{fragment_instance_id}_
196
197
0
void VFileResultWriter::_get_file_url(std::string* file_url) {
198
0
    std::stringstream ss;
199
0
    if (_storage_type == TStorageBackendType::LOCAL) {
200
0
        ss << "file:///" << BackendOptions::get_localhost();
201
0
    }
202
0
    ss << _file_opts->file_path;
203
0
    ss << print_id(_fragment_instance_id) << "_";
204
0
    *file_url = ss.str();
205
0
}
206
207
0
std::string VFileResultWriter::_file_format_to_name() {
208
0
    switch (_file_opts->file_format) {
209
0
    case TFileFormatType::FORMAT_CSV_PLAIN:
210
0
        return "csv" + _compression_type_to_name();
211
0
    case TFileFormatType::FORMAT_PARQUET:
212
0
        return "parquet";
213
0
    case TFileFormatType::FORMAT_ORC:
214
0
        return "orc";
215
0
    case TFileFormatType::FORMAT_NATIVE:
216
0
        return "native";
217
0
    default:
218
0
        return "unknown";
219
0
    }
220
0
}
221
222
0
std::string VFileResultWriter::_compression_type_to_name() {
223
0
    switch (_file_opts->compression_type) {
224
0
    case TFileCompressType::GZ:
225
0
        return ".gz";
226
0
    case TFileCompressType::BZ2:
227
0
        return ".bz2";
228
0
    case TFileCompressType::SNAPPYBLOCK:
229
0
        return ".snappy";
230
0
    case TFileCompressType::LZ4BLOCK:
231
0
        return ".lz4";
232
0
    case TFileCompressType::ZSTD:
233
0
        return ".zstd";
234
0
    default:
235
0
        return "";
236
0
    }
237
0
}
238
239
0
// Projects `block` through the output expressions and writes the result to the
// current file. Empty blocks are ignored. On success the number of rows of the
// input block is added to `_written_rows`.
Status VFileResultWriter::write(RuntimeState* state, Block& block) {
    const bool nothing_to_write = block.rows() == 0;
    if (nothing_to_write) {
        return Status::OK();
    }

    SCOPED_TIMER(_append_row_batch_timer);

    Block projected;
    RETURN_IF_ERROR(_projection_block(block, &projected));
    RETURN_IF_ERROR(_write_file(projected));

    // Row count is read from the input block after projection and write, as before.
    _written_rows += block.rows();
    return Status::OK();
}
251
252
0
// Writes `block` through the format transformer, refreshes the number of bytes
// written to the current file and rolls over to a new file when the size limit is reached.
Status VFileResultWriter::_write_file(const Block& block) {
    {
        // Only the transformer write itself is accounted to the file write timer.
        SCOPED_TIMER(_file_write_timer);
        RETURN_IF_ERROR(_vfile_writer->write(block));
    }

    // split file if exceed limit
    // the written len from file writer may not be updated in real-time,
    // because for orc writer, the orc OutputStream only flush when
    // bufferred data is larger than strip size(default 64MB).
    // So even if max_file_size_bytes set to 5MB, the file size will still
    // be 64MB.
    // TODO: opt this logic
    _current_written_bytes = _vfile_writer->written_len();

    return _create_new_file_if_exceed_size();
}
267
268
0
// Rolls over to a new output file once the current one has reached
// `max_file_size_bytes`; does nothing while the file is still below the limit.
Status VFileResultWriter::_create_new_file_if_exceed_size() {
    const bool below_limit = _current_written_bytes < _file_opts->max_file_size_bytes;
    if (below_limit) {
        return Status::OK();
    }

    // current file size exceed the max file size. close this file
    // and create new one
    {
        SCOPED_TIMER(_writer_close_timer);
        // `false`: more data follows, so the next file writer is created as part of the close.
        RETURN_IF_ERROR(_close_file_writer(false));
    }

    _current_written_bytes = 0;
    return Status::OK();
}
281
282
0
// Closes the current file. When `done` is false a writer for the next file is
// created; when `done` is true the final statistics are either sent through the
// sinker (no output block) or written into the output block.
Status VFileResultWriter::_close_file_writer(bool done) {
    if (_vfile_writer) {
        RETURN_IF_ERROR(_vfile_writer->close());
        // we can not use _current_written_bytes to COUNTER_UPDATE(_written_data_bytes, _current_written_bytes)
        // because it will call `write()` function of orc/parquet function in `_vfile_writer->close()`
        // and the real written_len will increase
        // and _current_written_bytes will less than _vfile_writer->written_len()
        COUNTER_UPDATE(_written_data_bytes, _vfile_writer->written_len());
        _vfile_writer.reset(nullptr);
    } else if (_file_writer_impl && _file_writer_impl->state() != io::FileWriter::State::CLOSED) {
        // No transformer exists; close the raw file writer directly if it is still open.
        RETURN_IF_ERROR(_file_writer_impl->close());
    }

    if (!done) {
        // not finished, create new file writer for next file
        return _create_next_file_writer();
    }

    // All data is written to file, send statistic result
    if (_output_block == nullptr) {
        return _send_result();
    }
    return _fill_result_block();
}
308
309
template <typename SRC>
310
0
void direct_write_to_mysql_result_int(std::string& mysql_rows, const SRC& value) {
311
0
    auto str = CastToString::from_number(value);
312
0
    direct_write_to_mysql_result_string(mysql_rows, str.c_str(), str.size());
313
0
}
Unexecuted instantiation: _ZN5doris32direct_write_to_mysql_result_intIiEEvRNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
Unexecuted instantiation: _ZN5doris32direct_write_to_mysql_result_intIlEEvRNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEERKT_
314
315
0
// Sends the single-row statistics result of the export through `_sinker`.
// The row is sent at most once; later calls return OK without doing anything.
// Returns an error if no sinker is available or if adding the batch fails.
Status VFileResultWriter::_send_result() {
    if (_is_result_sent) {
        return Status::OK();
    }
    // `_sinker` is the result of a dynamic cast (or left unset by the other constructor),
    // so it may be null. Fail with an error instead of dereferencing a null pointer.
    if (_sinker == nullptr) {
        return Status::InternalError("failed to send outfile result: result sinker is null");
    }
    _is_result_sent = true;

    // The final stat result include:
    // | FileNumber      | Int     |
    // | TotalRows       | Bigint  |
    // | FileSize        | Bigint  |
    // | URL             | Varchar |
    // | WriteTimeSec    | Varchar |
    // | WriteSpeedKB    | Varchar |
    // The type of these field should be consistent with types defined in OutFileClause.java of FE.

    const auto total_rows = _written_rows_counter->value();
    const auto total_bytes = _written_data_bytes->value();
    const auto write_time_ns = _file_write_timer->value();

    // URL: common prefix of all generated files followed by a wildcard.
    std::string file_url;
    _get_file_url(&file_url);
    file_url += "*";

    const double write_time = write_time_ns / nons_to_second;
    const std::string formatted_write_time = fmt::format("{:.3f}", write_time);

    const double write_speed = _get_write_speed(total_bytes, write_time_ns);
    const std::string formatted_write_speed = fmt::format("{:.2f}", write_speed);

    auto result = std::make_shared<TFetchDataResult>();
    result->result_batch.rows.resize(1);
    auto& mysql_output = result->result_batch.rows[0];

    direct_write_to_mysql_result_int(mysql_output, _file_idx);   // FileNumber
    direct_write_to_mysql_result_int(mysql_output, total_rows);  // TotalRows
    direct_write_to_mysql_result_int(mysql_output, total_bytes); // FileSize
    direct_write_to_mysql_result_string(mysql_output, file_url.c_str(),
                                        file_url.length()); // URL
    direct_write_to_mysql_result_string(mysql_output, formatted_write_time.c_str(),
                                        formatted_write_time.length()); // WriteTimeSec
    direct_write_to_mysql_result_string(mysql_output, formatted_write_speed.c_str(),
                                        formatted_write_speed.length()); // WriteSpeedKB

    // The same values are also attached as key/value pairs.
    std::map<std::string, std::string> attach_infos;
    attach_infos.emplace("FileNumber", std::to_string(_file_idx));
    attach_infos.emplace("TotalRows", std::to_string(total_rows));
    attach_infos.emplace("FileSize", std::to_string(total_bytes));
    attach_infos.emplace("URL", file_url);
    attach_infos.emplace("WriteTimeSec", formatted_write_time);
    attach_infos.emplace("WriteSpeedKB", formatted_write_speed);

    result->result_batch.__set_attached_infos(attach_infos);
    RETURN_NOT_OK_STATUS_WITH_WARN(_sinker->add_batch(_state, result),
                                   "failed to send outfile result");
    return Status::OK();
}
368
369
0
// Writes the export statistics into `_output_block`, one value per column:
//   0: file number, 1: written rows, 2: written bytes, 3: file URL prefix,
//   4: write time in seconds ("{:.3f}"), 5: write speed in KB/s ("{:.2f}").
// The column type for each position is taken from the first tuple descriptor of
// `_output_row_descriptor`. The block is filled at most once; later calls return OK.
// Returns an error if a slot has a type other than INT, BIGINT, VARCHAR, CHAR or STRING.
Status VFileResultWriter::_fill_result_block() {
    if (_is_result_sent) {
        return Status::OK();
    }
    _is_result_sent = true;

    // Appends the statistic that belongs to position `idx` to the freshly created
    // `column` and installs the column into the output block at that position.
    // Positions above 5 get an empty column. A generic lambda replaces the former
    // function-local macro so that nothing leaks into the rest of the translation unit.
    auto fill_column = [this](auto column, size_t idx) {
        if (idx == 0) {
            column->insert_data(reinterpret_cast<const char*>(&_file_idx), 0);
        } else if (idx == 1) {
            int64_t written_rows = _written_rows_counter->value();
            column->insert_data(reinterpret_cast<const char*>(&written_rows), 0);
        } else if (idx == 2) {
            int64_t written_data_bytes = _written_data_bytes->value();
            column->insert_data(reinterpret_cast<const char*>(&written_data_bytes), 0);
        } else if (idx == 3) {
            // NOTE(review): unlike _send_result, no "*" is appended to the URL here — confirm
            // whether that difference is intended.
            std::string file_url;
            _get_file_url(&file_url);
            column->insert_data(file_url.c_str(), file_url.size());
        } else if (idx == 4) {
            double write_time = _file_write_timer->value() / nons_to_second;
            std::string formatted_write_time = fmt::format("{:.3f}", write_time);
            column->insert_data(formatted_write_time.c_str(), formatted_write_time.size());
        } else if (idx == 5) {
            double write_speed =
                    _get_write_speed(_written_data_bytes->value(), _file_write_timer->value());
            std::string formatted_write_speed = fmt::format("{:.2f}", write_speed);
            column->insert_data(formatted_write_speed.c_str(), formatted_write_speed.size());
        }
        _output_block->replace_by_position(idx, std::move(column));
    };

    for (size_t i = 0; i < _output_block->columns(); ++i) {
        // Looked up inside the loop so that the descriptor is not touched for an empty block.
        const auto primitive_type = _output_row_descriptor.tuple_descriptors()[0]
                                            ->slots()[i]
                                            ->type()
                                            ->get_primitive_type();
        switch (primitive_type) {
        case TYPE_INT:
            fill_column(ColumnInt32::create(), i);
            break;
        case TYPE_BIGINT:
            fill_column(ColumnInt64::create(), i);
            break;
        case TYPE_VARCHAR:
        case TYPE_CHAR:
        case TYPE_STRING:
            fill_column(ColumnString::create(), i);
            break;
        default:
            return Status::InternalError("Invalid type to print: {}", primitive_type);
        }
    }
    return Status::OK();
}
434
435
0
// Deletes the directory part of `file_path` on the configured storage backend.
// Supported backends: LOCAL, BROKER, HDFS and S3; any other type yields NotSupported.
// Returns the status of creating the file system or of the delete operation.
Status VFileResultWriter::_delete_dir() {
    // get dir of file_path: everything up to and including the last '/'.
    // When file_path contains no '/', find_last_of returns npos and npos + 1 wraps
    // to 0, so `dir` is empty.
    // NOTE(review): an empty `dir` is passed on unchanged — confirm the file systems reject it.
    std::string dir = _file_opts->file_path.substr(0, _file_opts->file_path.find_last_of('/') + 1);

    switch (_storage_type) {
    case TStorageBackendType::LOCAL:
        return io::global_local_filesystem()->delete_directory(dir);
    case TStorageBackendType::BROKER: {
        // Indexing an empty vector is undefined behavior, so report an error instead.
        if (_file_opts->broker_addresses.empty()) {
            return Status::InternalError("no broker address available to delete directory: {}",
                                         dir);
        }
        auto fs = DORIS_TRY(io::BrokerFileSystem::create(_file_opts->broker_addresses[0],
                                                         _file_opts->broker_properties,
                                                         io::FileSystem::TMP_FS_ID));
        return fs->delete_directory(dir);
    }
    case TStorageBackendType::HDFS: {
        THdfsParams hdfs_params = parse_properties(_file_opts->broker_properties);
        auto fs = DORIS_TRY(io::HdfsFileSystem::create(hdfs_params, hdfs_params.fs_name,
                                                       io::FileSystem::TMP_FS_ID, nullptr));
        return fs->delete_directory(dir);
    }
    case TStorageBackendType::S3: {
        S3URI s3_uri(dir);
        RETURN_IF_ERROR(s3_uri.parse());
        S3Conf s3_conf;
        RETURN_IF_ERROR(S3ClientFactory::convert_properties_to_s3_conf(
                _file_opts->broker_properties, s3_uri, &s3_conf));
        auto fs = DORIS_TRY(io::S3FileSystem::create(s3_conf, io::FileSystem::TMP_FS_ID));
        return fs->delete_directory(dir);
    }
    default:
        return Status::NotSupported("Unsupported storage type: {}", std::to_string(_storage_type));
    }
}
467
468
0
double VFileResultWriter::_get_write_speed(int64_t write_bytes, int64_t write_time) {
469
0
    if (write_time <= 0) {
470
0
        return 0;
471
0
    }
472
    // KB / s
473
0
    return ((write_bytes * nons_to_second) / (write_time)) / 1024;
474
0
}
475
476
0
// Finishes the export. When `exec_status` is an error it is returned unchanged and
// nothing is closed here. Otherwise the written-rows counter is published (if the
// profile was initialized) and the current file is closed with `done == true`,
// which also emits the final statistics.
Status VFileResultWriter::close(Status exec_status) {
    if (!exec_status.ok()) {
        return exec_status;
    }

    // the following 2 profile "_written_rows_counter" and "_writer_close_timer"
    // must be outside the `_close_file_writer()`.
    // because `_close_file_writer()` may be called in deconstructor,
    // at that time, the RuntimeState may already been deconstructed,
    // so does the profile in RuntimeState.
    if (_written_rows_counter == nullptr) {
        // Profile counters were never registered; close without touching them.
        return _close_file_writer(true);
    }

    COUNTER_SET(_written_rows_counter, _written_rows);
    // The scoped timer must stay alive across the close call. Previously it was
    // declared inside a block that ended before `_close_file_writer()` ran, so the
    // close time was never measured.
    SCOPED_TIMER(_writer_close_timer);
    return _close_file_writer(true);
}
492
493
} // namespace doris