Coverage Report

Created: 2026-03-14 04:23

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/writer/vtvf_table_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/sink/writer/vtvf_table_writer.h"
19
20
#include <fmt/format.h>
21
22
#include "common/status.h"
23
#include "core/block/block.h"
24
#include "exprs/vexpr.h"
25
#include "exprs/vexpr_context.h"
26
#include "io/file_factory.h"
27
#include "runtime/runtime_state.h"
28
29
namespace doris {
30
31
// Builds a TVF table writer. The output expressions and the two dependencies are forwarded
// unchanged to AsyncResultWriter; the TVF-specific sink descriptor (`tvf_table_sink`) is
// copied out of the generic TDataSink so later calls can read it without the original sink.
VTVFTableWriter::VTVFTableWriter(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
                                 std::shared_ptr<Dependency> dep,
                                 std::shared_ptr<Dependency> fin_dep)
        : AsyncResultWriter(output_exprs, dep, fin_dep), _tvf_sink(t_sink.tvf_table_sink) {}
37
38
0
// Prepares the writer for use.
//   - Remembers `state` for later file-name generation and file-system access.
//   - Registers rows/bytes counters and write/close timers under a "VTVFTableWriter" child profile.
//   - Caches the output path prefix and the optional per-file size limit
//     (0 when unset, which disables auto-splitting in write()).
//   - Opens the first output file.
// Returns the status of creating that first file writer.
Status VTVFTableWriter::open(RuntimeState* state, RuntimeProfile* profile) {
    _state = state;

    RuntimeProfile* child_profile = profile->create_child("VTVFTableWriter", true, true);
    _written_rows_counter = ADD_COUNTER(child_profile, "NumWrittenRows", TUnit::UNIT);
    _written_data_bytes = ADD_COUNTER(child_profile, "WrittenDataBytes", TUnit::BYTES);
    _file_write_timer = ADD_TIMER(child_profile, "FileWriteTime");
    _writer_close_timer = ADD_TIMER(child_profile, "FileWriterCloseTime");

    _file_path = _tvf_sink.file_path;

    // An unset limit is stored as 0, which write() treats as "never split".
    _max_file_size_bytes = 0;
    if (_tvf_sink.__isset.max_file_size_bytes) {
        _max_file_size_bytes = _tvf_sink.max_file_size_bytes;
    }

    VLOG_DEBUG << "TVF table writer open, query_id=" << print_id(_state->query_id())
               << ", tvf_name=" << _tvf_sink.tvf_name << ", file_path=" << _tvf_sink.file_path
               << ", file_format=" << _tvf_sink.file_format << ", file_type=" << _tvf_sink.file_type
               << ", max_file_size_bytes=" << _max_file_size_bytes
               << ", columns_count=" << (_tvf_sink.__isset.columns ? _tvf_sink.columns.size() : 0);

    // The first file is opened eagerly so write() can always assume a live writer.
    return _create_next_file_writer();
}
60
61
0
// Appends one block to the current output file.
// The row counters (profile and load-total) are bumped first. The block is then handed to
// the format transformer while the write timer runs. Afterwards the transformer's reported
// length is recorded, and if a size limit is configured the writer may roll to a new file.
Status VTVFTableWriter::write(RuntimeState* state, Block& block) {
    const auto row_count = block.rows();
    COUNTER_UPDATE(_written_rows_counter, row_count);
    state->update_num_rows_load_total(row_count);

    {
        SCOPED_TIMER(_file_write_timer);
        RETURN_IF_ERROR(_vfile_writer->write(block));
    }
    _current_written_bytes = _vfile_writer->written_len();

    // No limit configured: keep appending to the same file.
    if (_max_file_size_bytes <= 0) {
        return Status::OK();
    }
    return _create_new_file_if_exceed_size();
}
79
80
0
// Finishes writing.
// If the pipeline already failed (`status` not OK), that status is returned as-is and no
// close is attempted on the current file. Otherwise the current file is closed (timed
// under FileWriterCloseTime) without opening a successor.
Status VTVFTableWriter::close(Status status) {
    if (status.ok()) {
        SCOPED_TIMER(_writer_close_timer);
        return _close_file_writer(true);
    }
    return status;
}
88
89
0
// Creates and opens the output writer for `file_name`.
// When the sink requests the JNI writer type, no native file writer is created and the
// transformer receives a null file writer. Otherwise a native io file writer is built via
// FileFactory from the sink's file type and optional properties, with file-cache writes
// and data sync disabled. In both cases create_tvf_format_transformer builds _vfile_writer,
// which is then opened.
Status VTVFTableWriter::_create_file_writer(const std::string& file_name) {
    const bool use_jni =
            _tvf_sink.__isset.writer_type && _tvf_sink.writer_type == TTVFWriterType::JNI;

    if (!use_jni) {
        std::map<std::string, std::string> props;
        if (_tvf_sink.__isset.properties) {
            props = _tvf_sink.properties;
        }
        _file_writer_impl = DORIS_TRY(FileFactory::create_file_writer(
                _tvf_sink.file_type, _state->exec_env(), {}, props, file_name,
                {.write_file_cache = false, .sync_file_data = false}));
    }

    RETURN_IF_ERROR(create_tvf_format_transformer(_tvf_sink, _state,
                                                  use_jni ? nullptr : _file_writer_impl.get(),
                                                  _vec_output_expr_ctxs, &_vfile_writer));

    VLOG_DEBUG << "TVF table writer created file: " << file_name
               << ", format=" << _tvf_sink.file_format << ", use_jni=" << use_jni
               << ", query_id=" << print_id(_state->query_id());

    return _vfile_writer->open();
}
116
117
0
// Generates the next sequential file name, which advances the file index, and opens a
// writer for it.
Status VTVFTableWriter::_create_next_file_writer() {
    std::string next_name;
    RETURN_IF_ERROR(_get_next_file_name(&next_name));
    return _create_file_writer(next_name);
}
122
123
0
// Closes whatever is currently open.
//   - If a format transformer exists, it is closed, its final length is added to
//     WrittenDataBytes, and it is released.
//   - Otherwise a raw native file writer that is not already CLOSED is closed directly.
// When `done` is false, a writer for the next file is created immediately afterwards.
Status VTVFTableWriter::_close_file_writer(bool done) {
    if (_vfile_writer != nullptr) {
        RETURN_IF_ERROR(_vfile_writer->close());
        // The length is read after close(); presumably close() can still emit bytes
        // (e.g. trailing format metadata), so it should not be read before.
        COUNTER_UPDATE(_written_data_bytes, _vfile_writer->written_len());
        _vfile_writer.reset();
    } else if (_file_writer_impl != nullptr &&
               _file_writer_impl->state() != io::FileWriter::State::CLOSED) {
        RETURN_IF_ERROR(_file_writer_impl->close());
    }

    return done ? Status::OK() : _create_next_file_writer();
}
137
138
0
// Rolls to a new output file once the current one has reached the configured limit.
// It does nothing when no positive limit is set or the current file is still below it.
// Otherwise it closes the current file, opens the next one (timed under
// FileWriterCloseTime) and resets the per-file byte count.
Status VTVFTableWriter::_create_new_file_if_exceed_size() {
    const bool reached_limit =
            _max_file_size_bytes > 0 && _current_written_bytes >= _max_file_size_bytes;
    if (!reached_limit) {
        return Status::OK();
    }
    SCOPED_TIMER(_writer_close_timer);
    RETURN_IF_ERROR(_close_file_writer(false));
    _current_written_bytes = 0;
    return Status::OK();
}
147
148
0
// Produces the next output file name as "{prefix}{query_id}_{idx}.{ext}" and advances the
// per-writer file index.
// `_file_path` is used as a raw prefix with nothing inserted after it. The extension is
// "csv", "parquet" or "orc" for those formats, and "dat" for any other format.
Status VTVFTableWriter::_get_next_file_name(std::string* file_name) {
    const char* const extension = [](auto format) -> const char* {
        switch (format) {
        case TFileFormatType::FORMAT_CSV_PLAIN:
            return "csv";
        case TFileFormatType::FORMAT_PARQUET:
            return "parquet";
        case TFileFormatType::FORMAT_ORC:
            return "orc";
        default:
            return "dat";
        }
    }(_tvf_sink.file_format);

    const auto index = _file_idx;
    ++_file_idx;
    *file_name = fmt::format("{}{}_{}.{}", _file_path, print_id(_state->query_id()), index,
                             extension);
    return Status::OK();
}
171
172
} // namespace doris