Coverage Report

Created: 2026-03-13 05:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/packed_file_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/packed_file_writer.h"
19
20
#include <bvar/recorder.h>
21
#include <bvar/window.h>
22
#include <glog/logging.h>
23
24
#include "cloud/config.h"
25
#include "common/config.h"
26
#include "common/status.h"
27
#include "runtime/exec_env.h"
28
#include "util/slice.h"
29
30
namespace doris::io {
31
32
bvar::IntRecorder packed_file_writer_first_append_to_close_ms_recorder;
33
bvar::Window<bvar::IntRecorder> packed_file_writer_first_append_to_close_ms_window(
34
        "packed_file_writer_first_append_to_close_ms",
35
        &packed_file_writer_first_append_to_close_ms_recorder, /*window_size=*/10);
36
37
PackedFileWriter::PackedFileWriter(FileWriterPtr inner_writer, Path path,
38
                                   PackedAppendContext append_info)
39
63.4k
        : _inner_writer(std::move(inner_writer)),
40
63.4k
          _file_path(path.native()),
41
63.4k
          _packed_file_manager(PackedFileManager::instance()),
42
63.4k
          _append_info(std::move(append_info)) {
43
63.4k
    DCHECK(_inner_writer != nullptr);
44
63.4k
    DCHECK(!_file_path.empty());
45
63.4k
}
46
47
63.7k
PackedFileWriter::~PackedFileWriter() {
48
63.7k
    if (_state == State::OPENED) {
49
68
        LOG(WARNING) << "PackedFileWriter destroyed without being closed, file: " << _file_path;
50
68
    }
51
63.7k
}
52
53
2.29M
Status PackedFileWriter::appendv(const Slice* data, size_t data_cnt) {
54
2.29M
    if (_state != State::OPENED) {
55
0
        return Status::InternalError("Cannot append to closed or closing writer for file: " +
56
0
                                     _file_path);
57
0
    }
58
59
2.29M
    if (!_first_append_timestamp.has_value()) {
60
63.6k
        _first_append_timestamp = std::chrono::steady_clock::now();
61
63.6k
    }
62
63
    // Calculate total size to append
64
2.29M
    size_t total_size = 0;
65
9.10M
    for (size_t i = 0; i < data_cnt; ++i) {
66
6.80M
        total_size += data[i].size;
67
6.80M
    }
68
69
2.29M
    if (total_size == 0) {
70
1
        return Status::OK();
71
1
    }
72
73
    // Check if we should switch to direct write mode
74
2.29M
    if (!_is_direct_write && _bytes_appended + total_size > config::small_file_threshold_bytes) {
75
232
        RETURN_IF_ERROR(_switch_to_direct_write());
76
232
        _is_direct_write = true;
77
232
    }
78
79
    // Write data based on current mode
80
2.29M
    if (_is_direct_write) {
81
51.9k
        RETURN_IF_ERROR(_inner_writer->appendv(data, data_cnt));
82
2.24M
    } else {
83
        // Buffer small file data
84
8.89M
        for (size_t i = 0; i < data_cnt; ++i) {
85
6.64M
            _buffer.append(data[i].data, data[i].size);
86
6.64M
        }
87
2.24M
    }
88
89
2.29M
    _bytes_appended += total_size;
90
2.29M
    return Status::OK();
91
2.29M
}
92
93
127k
Status PackedFileWriter::close(bool non_block) {
94
127k
    if (_state == State::CLOSED) {
95
0
        return Status::OK();
96
0
    }
97
98
127k
    auto record_close_latency = [this]() {
99
63.5k
        if (_close_latency_recorded || !_first_append_timestamp.has_value()) {
100
58
            return;
101
58
        }
102
63.4k
        auto now = std::chrono::steady_clock::now();
103
63.4k
        auto latency_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
104
63.4k
                                  now - *_first_append_timestamp)
105
63.4k
                                  .count();
106
63.4k
        packed_file_writer_first_append_to_close_ms_recorder << latency_ms;
107
63.5k
        if (auto* sampler = packed_file_writer_first_append_to_close_ms_recorder.get_sampler()) {
108
63.5k
            sampler->take_sample();
109
63.5k
        }
110
63.4k
        _close_latency_recorded = true;
111
63.4k
    };
112
113
127k
    if (_state == State::ASYNC_CLOSING) {
114
63.6k
        if (non_block) {
115
0
            return Status::InternalError("Don't submit async close multi times");
116
0
        }
117
63.6k
        if (!_is_direct_write) {
118
63.4k
            RETURN_IF_ERROR(_wait_packed_upload());
119
63.4k
        } else {
120
233
            RETURN_IF_ERROR(_inner_writer->close(false));
121
233
        }
122
63.6k
        _state = State::CLOSED;
123
63.6k
        if (!non_block) {
124
63.5k
            record_close_latency();
125
63.5k
        }
126
63.6k
        return Status::OK();
127
63.6k
    }
128
129
63.5k
    if (non_block) {
130
63.5k
        return _close_async();
131
63.5k
    } else {
132
2
        return _close_sync();
133
2
    }
134
63.5k
}
135
136
63.5k
Status PackedFileWriter::_close_async() {
137
63.5k
    if (!_is_direct_write) {
138
        // Send small file data to packed manager
139
63.3k
        RETURN_IF_ERROR(_send_to_packed_manager());
140
63.3k
    } else {
141
        // For large files, just close the inner writer asynchronously
142
242
        RETURN_IF_ERROR(_inner_writer->close(true));
143
242
    }
144
63.5k
    _state = State::ASYNC_CLOSING;
145
63.5k
    return Status::OK();
146
63.5k
}
147
148
1
Status PackedFileWriter::_close_sync() {
149
1
    if (!_is_direct_write) {
150
        // Send small file data to pack manager and wait for upload
151
1
        RETURN_IF_ERROR(_send_to_packed_manager());
152
0
        RETURN_IF_ERROR(_wait_packed_upload());
153
0
    } else {
154
        // For large files, close the inner writer synchronously
155
0
        RETURN_IF_ERROR(_inner_writer->close(false));
156
0
    }
157
0
    _state = State::CLOSED;
158
0
    if (!_close_latency_recorded) {
159
0
        auto now = std::chrono::steady_clock::now();
160
0
        if (_first_append_timestamp.has_value()) {
161
0
            auto latency_ms = std::chrono::duration_cast<std::chrono::milliseconds>(
162
0
                                      now - *_first_append_timestamp)
163
0
                                      .count();
164
0
            packed_file_writer_first_append_to_close_ms_recorder << latency_ms;
165
0
            if (auto* sampler =
166
0
                        packed_file_writer_first_append_to_close_ms_recorder.get_sampler()) {
167
0
                sampler->take_sample();
168
0
            }
169
0
            _close_latency_recorded = true;
170
0
        }
171
0
    }
172
0
    return Status::OK();
173
1
}
174
175
63.4k
Status PackedFileWriter::_wait_packed_upload() {
176
63.4k
    DCHECK(!_is_direct_write);
177
    // Only wait if we have data that was sent to packed manager
178
63.4k
    if (_bytes_appended > 0 && _packed_file_manager != nullptr) {
179
63.3k
        return _packed_file_manager->wait_upload_done(_file_path);
180
63.3k
    }
181
49
    return Status::OK();
182
63.4k
}
183
184
232
Status PackedFileWriter::_switch_to_direct_write() {
185
232
    DCHECK(!_is_direct_write);
186
187
    // If we have buffered data, write it to inner writer first
188
232
    if (_buffer.size() > 0) {
189
231
        Slice buffer_slice(_buffer.data(), _buffer.size());
190
231
        RETURN_IF_ERROR(_inner_writer->appendv(&buffer_slice, 1));
191
231
        _buffer.clear();
192
231
    }
193
194
232
    return Status::OK();
195
232
}
196
197
63.3k
Status PackedFileWriter::_send_to_packed_manager() {
198
63.3k
    DCHECK(!_is_direct_write);
199
200
63.3k
    if (_packed_file_manager == nullptr) {
201
0
        return Status::InternalError("PackedFileManager is not available");
202
0
    }
203
63.3k
    LOG(INFO) << "send_to_packed_manager: " << _file_path << " buffer size: " << _buffer.size();
204
205
63.3k
    if (_append_info.resource_id.empty()) {
206
0
        return Status::InternalError("Missing resource id for packed file append");
207
0
    }
208
209
63.3k
    if (_append_info.txn_id <= 0) {
210
0
        return Status::InvalidArgument("Missing valid txn id for packed file append: " +
211
0
                                       _file_path);
212
0
    }
213
214
63.3k
    Slice data_slice(_buffer.data(), _buffer.size());
215
63.3k
    RETURN_IF_ERROR(_packed_file_manager->append_small_file(_file_path, data_slice, _append_info));
216
63.3k
    _buffer.clear();
217
63.3k
    return Status::OK();
218
63.3k
}
219
220
1.00k
Status PackedFileWriter::get_packed_slice_location(PackedSliceLocation* location) const {
221
1.00k
    DCHECK(_state == State::CLOSED)
222
0
            << " file_path: " << _file_path << " bytes_appended: " << _bytes_appended;
223
1.00k
    if (_is_direct_write) {
224
0
        *location = PackedSliceLocation {};
225
0
        return Status::OK();
226
0
    }
227
1.00k
    RETURN_IF_ERROR(_packed_file_manager->get_packed_slice_location(_file_path, location));
228
1.00k
    LOG(INFO) << "get_packed_slice_location: " << _file_path
229
1.00k
              << " packed_path: " << location->packed_file_path << " " << location->offset << " "
230
1.00k
              << location->size;
231
1.00k
    return Status::OK();
232
1.00k
}
233
} // namespace doris::io