Coverage Report

Created: 2026-04-16 11:29

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/hdfs_file_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/hdfs_file_writer.h"
19
20
#include <fcntl.h>
21
#include <fmt/core.h>
22
23
#include <chrono>
24
#include <filesystem>
25
#include <ostream>
26
#include <random>
27
#include <string>
28
#include <thread>
29
#include <utility>
30
31
#include "common/config.h"
32
#include "common/logging.h"
33
#include "common/status.h"
34
#include "cpp/sync_point.h"
35
#include "io/cache/block_file_cache.h"
36
#include "io/cache/block_file_cache_factory.h"
37
#include "io/cache/file_cache_common.h"
38
#include "io/fs/err_utils.h"
39
#include "io/fs/file_writer.h"
40
#include "io/fs/hdfs_file_system.h"
41
#include "io/hdfs_util.h"
42
#include "runtime/exec_env.h"
43
#include "service/backend_options.h"
44
#include "util/bvar_helper.h"
45
#include "util/jni-util.h"
46
47
namespace doris::io {
48
49
// Exported bvar counters/gauges for the HDFS file writer.
// Number of HdfsFileWriter objects constructed.
bvar::Adder<uint64_t> hdfs_file_writer_total("hdfs_file_writer_total_num");
// Bytes handed to hdfsWrite successfully.
bvar::Adder<uint64_t> hdfs_bytes_written_total("hdfs_file_writer_bytes_written");
// Files that were closed successfully.
bvar::Adder<uint64_t> hdfs_file_created_total("hdfs_file_writer_file_created");
// Writers whose HDFS file handle is currently open.
bvar::Adder<uint64_t> inflight_hdfs_file_writer("inflight_hdfs_file_writer");
// Async close tasks submitted but not yet started / currently running.
bvar::Adder<uint64_t> hdfs_file_writer_async_close_queuing("hdfs_file_writer_async_close_queuing");
bvar::Adder<uint64_t> hdfs_file_writer_async_close_processing(
        "hdfs_file_writer_async_close_processing");

// One mebibyte, used to scale config::hdfs_write_batch_buffer_size_mb.
static constexpr size_t MB = size_t {1} << 20;
#ifndef USE_LIBHDFS3
// Lower bound of the memory accounted per write request.
static constexpr size_t CLIENT_WRITE_PACKET_SIZE = size_t {64} << 10; // 64 KB
#endif
61
62
2.74k
// Builds a random engine seeded from the current steady-clock tick count
// (truncated to 32 bits), so each call site gets its own jitter source.
inline std::default_random_engine make_random_engine() {
    const auto ticks = std::chrono::steady_clock::now().time_since_epoch().count();
    const auto seed = static_cast<uint32_t>(ticks);
    return std::default_random_engine(seed);
}
66
67
// In practice, we've found that if the import frequency to HDFS is too fast,
68
// it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI.
69
// For this, we should have a method to monitor how much JVM memory is currently being used.
70
// The HdfsWriteMemUsageRecorder class increments a recorded value during hdfsWrite when writing to HDFS.
71
// The HDFS client will blockingly call hdfsHsync or hdfsCloseFile
72
// which ensures that the client's buffer is sent to the data node and returned with an acknowledgment before returning to the caller.
73
// HdfsWriteMemUsageRecorder would reduce the mem usage at that time.
74
// If the current usage exceeds the maximum set by the user, the current mem acquire would return failure.
75
// The caller could do sleep to wait for free memory.
76
class HdfsWriteMemUsageRecorder {
77
public:
78
9
    HdfsWriteMemUsageRecorder() = default;
79
0
    ~HdfsWriteMemUsageRecorder() = default;
80
16.1k
    size_t max_usage() const {
81
16.1k
        return static_cast<size_t>(static_cast<double>(max_jvm_heap_size()) *
82
16.1k
                                   config::max_hdfs_wirter_jni_heap_usage_ratio);
83
16.1k
    }
84
2.74k
    Status acquire_memory(size_t memory_size, int try_time) {
85
#if defined(USE_LIBHDFS3) || defined(BE_TEST)
86
        return Status::OK();
87
#else
88
2.74k
        if (!config::enable_hdfs_mem_limiter) {
89
0
            return Status::OK();
90
0
        }
91
2.74k
        auto unit = config::hdfs_jni_write_sleep_milliseconds;
92
2.74k
        std::default_random_engine rng = make_random_engine();
93
2.74k
        std::uniform_int_distribution<int64_t> u(unit, 2 * unit);
94
2.74k
        std::uniform_int_distribution<int64_t> u2(2 * unit, 4 * unit);
95
2.74k
        auto duration_ms =
96
2.74k
                try_time < (config::hdfs_jni_write_max_retry_time / 2) ? u(rng) : u2(rng);
97
2.74k
        std::unique_lock lck {cur_memory_latch};
98
2.74k
        cv.wait_for(lck, std::chrono::milliseconds(duration_ms),
99
2.74k
                    [&]() { return cur_memory_comsuption + memory_size <= max_usage(); });
100
2.74k
        if (cur_memory_comsuption + memory_size > max_usage()) {
101
0
            lck.unlock();
102
0
            return Status::InternalError<false>(
103
0
                    "Run out of Jni jvm heap space, current limit size is {}, max heap size is {}, "
104
0
                    "ratio is {}",
105
0
                    max_usage(), max_jvm_heap_size(), config::max_hdfs_wirter_jni_heap_usage_ratio);
106
0
        }
107
2.74k
        cur_memory_comsuption += memory_size;
108
2.74k
        return Status::OK();
109
2.74k
#endif
110
2.74k
    }
111
112
5.33k
    void release_memory(size_t memory_size) {
113
#if defined(USE_LIBHDFS3) || defined(BE_TEST)
114
#else
115
5.33k
        if (!config::enable_hdfs_mem_limiter) {
116
0
            return;
117
0
        }
118
5.33k
        std::unique_lock lck {cur_memory_latch};
119
5.33k
        size_t origin_size = cur_memory_comsuption;
120
5.33k
        cur_memory_comsuption -= memory_size;
121
5.33k
        if (cur_memory_comsuption < max_usage() && origin_size > max_usage()) {
122
0
            cv.notify_all();
123
0
        }
124
5.33k
#endif
125
5.33k
    }
126
127
private:
128
    // clang-format off
129
16.1k
    size_t max_jvm_heap_size() const {
130
16.1k
        return Jni::Util::get_max_jni_heap_memory_size();
131
16.1k
    }
132
    // clang-format on
133
    [[maybe_unused]] std::size_t cur_memory_comsuption {0};
134
    std::mutex cur_memory_latch;
135
    std::condition_variable cv;
136
};
137
138
static HdfsWriteMemUsageRecorder g_hdfs_write_rate_limiter;
139
140
// Wraps an already opened HDFS file handle.
// `opts` may be null, in which case data is synced on close. The batch buffer is sized from
// config::hdfs_write_batch_buffer_size_mb.
HdfsFileWriter::HdfsFileWriter(Path path, std::shared_ptr<HdfsHandler> handler, hdfsFile hdfs_file,
                               std::string fs_name, const FileWriterOptions* opts)
        : _path(std::move(path)),
          _hdfs_handler(std::move(handler)),
          _hdfs_file(hdfs_file),
          _fs_name(std::move(fs_name)),
          _sync_file_data(opts == nullptr || opts->sync_file_data),
          _batch_buffer(MB * config::hdfs_write_batch_buffer_size_mb) {
    init_cache_builder(opts, _path);
    hdfs_file_writer_total << 1;

    TEST_SYNC_POINT("HdfsFileWriter");
}
153
154
2.74k
// Waits for a pending async close, then closes the HDFS handle if it is still open and
// returns the memory accounted for this writer.
HdfsFileWriter::~HdfsFileWriter() {
    if (_async_close_pack) {
        // For thread safety: the async close task still uses this object, so wait for it.
        std::ignore = _async_close_pack->future.get();
        _async_close_pack.reset();
    }
    if (_hdfs_file != nullptr) {
        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency);
        hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
        inflight_hdfs_file_writer << -1;
        _flush_and_reset_approximate_jni_buffer_size();
    }
}
167
168
5.34k
void HdfsFileWriter::_flush_and_reset_approximate_jni_buffer_size() {
169
5.34k
    g_hdfs_write_rate_limiter.release_memory(_approximate_jni_buffer_size);
170
5.34k
    _approximate_jni_buffer_size = 0;
171
5.34k
}
172
173
2.75k
// Reserves JNI memory for a write of `size` bytes (at least CLIENT_WRITE_PACKET_SIZE).
// If the first attempt fails, this writer's own buffered data is hflushed to give its
// reservation back, and the acquisition is retried up to
// config::hdfs_jni_write_max_retry_time times. Returns the status of the first failed attempt
// when all retries fail, or an error if the hflush itself fails.
Status HdfsFileWriter::_acquire_jni_memory(size_t size) {
#ifdef USE_LIBHDFS3
    return Status::OK();
#else
    const size_t actual_size = std::max(CLIENT_WRITE_PACKET_SIZE, size);

    Status first_attempt = g_hdfs_write_rate_limiter.acquire_memory(actual_size, 0);
    if (first_attempt.ok()) {
        _approximate_jni_buffer_size += actual_size;
        return Status::OK();
    }

    // Release what this writer itself is holding before waiting for others.
    if (_approximate_jni_buffer_size > 0) {
        int ret;
        {
            SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_hflush_latency);
            ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsHFlush(_hdfs_handler->hdfs_fs, _hdfs_file),
                                               "HdfsFileWriter::close::hdfsHFlush");
        }
        _flush_and_reset_approximate_jni_buffer_size();
        if (ret != 0) {
            return Status::InternalError(
                    "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}, "
                    "file_size={}",
                    BackendOptions::get_localhost(), _fs_name, _path.native(), hdfs_error(),
                    bytes_appended());
        }
    }

    // Other hdfs writers might have occupied too much memory, we need to sleep for a while to
    // wait for them releasing their memory (the sleep happens inside acquire_memory).
    for (int attempt = 0; attempt < config::hdfs_jni_write_max_retry_time; ++attempt) {
        if (g_hdfs_write_rate_limiter.acquire_memory(actual_size, attempt).ok()) {
            _approximate_jni_buffer_size += actual_size;
            return Status::OK();
        }
    }
    return first_attempt;
#endif
}
211
212
2.74k
// Closes the writer.
//  - non_block == false: flushes, (optionally) syncs and closes the file before returning. If an
//    async close is already in flight, waits for it and returns its result.
//  - non_block == true: submits the close to the non-block close thread pool and returns the
//    submission status; call close(false) later to collect the real result.
// Returns InternalError if the writer is already CLOSED or if an async close is submitted twice.
Status HdfsFileWriter::close(bool non_block) {
    if (state() == State::CLOSED) {
        return Status::InternalError("HdfsFileWriter already closed, file path {}, fs name {}",
                                     _path.native(), _fs_name);
    }
    if (state() == State::ASYNC_CLOSING) {
        if (non_block) {
            return Status::InternalError("Don't submit async close multi times");
        }
        CHECK(_async_close_pack != nullptr);
        // Wait for the previously submitted async close task to finish.
        _st = _async_close_pack->future.get();
        _async_close_pack = nullptr;
        // From now on the writer is closed; any further close() call hits the CLOSED branch
        // above.
        _state = State::CLOSED;
        return _st;
    }
    if (non_block) {
        _state = State::ASYNC_CLOSING;
        _async_close_pack = std::make_unique<AsyncCloseStatusPack>();
        _async_close_pack->future = _async_close_pack->promise.get_future();
        hdfs_file_writer_async_close_queuing << 1;
        // Capture `this` only: the closure outlives this call, so it must not hold references
        // to locals or parameters. The destructor waits on the future before the object dies.
        auto submit_st =
                ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func([this]() {
                    hdfs_file_writer_async_close_queuing << -1;
                    hdfs_file_writer_async_close_processing << 1;
                    _async_close_pack->promise.set_value(_close_impl());
                    hdfs_file_writer_async_close_processing << -1;
                });
        if (!submit_st.ok()) {
            // The task will never run (assumes submit_func does not invoke the closure when it
            // reports failure -- TODO confirm), so nobody would ever fulfil the promise.
            // Roll back, otherwise the destructor or a later blocking close() would wait on
            // the future forever and the queuing gauge would leak.
            hdfs_file_writer_async_close_queuing << -1;
            _async_close_pack = nullptr;
            _state = State::OPENED;
        }
        return submit_st;
    }
    _st = _close_impl();
    _state = State::CLOSED;
    return _st;
}
246
247
2.74k
// Performs the actual close: flush the batch buffer, hsync if `_sync_file_data` is set, then
// close the HDFS handle. The result is also recorded in `_st`. On a flush or sync failure the
// handle is left open (the destructor closes it).
Status HdfsFileWriter::_close_impl() {
    // Push whatever is still sitting in the batch buffer to HDFS first.
    if (_batch_buffer.size() > 0) {
        _st = _flush_buffer();
        if (!_st.ok()) {
            return _st;
        }
    }

    int ret;
    if (_sync_file_data) {
        {
            SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_hsync_latency);
#ifdef USE_LIBHDFS3
            ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsSync(_hdfs_handler->hdfs_fs, _hdfs_file),
                                               "HdfsFileWriter::close::hdfsHSync");
#else
            ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file),
                                               "HdfsFileWriter::close::hdfsHSync");
            // The hsync has returned, so give the reserved JNI memory back.
            _flush_and_reset_approximate_jni_buffer_size();
#endif
        }
        TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfsSync",
                                               Status::InternalError("failed to sync hdfs file"));

        const bool sync_failed = (ret != 0);
        if (sync_failed) {
            _st = Status::InternalError(
                    "failed to sync hdfs file. fs_name={} path={} : {}, file_size={}", _fs_name,
                    _path.native(), hdfs_error(), bytes_appended());
            return _st;
        }
    }

    {
        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency);
        // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait
        // for the HDFS response, but won't guarantee the synchronization of data to HDFS.
        ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file),
                                           "HdfsFileWriter::close::hdfsCloseFile");
        inflight_hdfs_file_writer << -1;
        _flush_and_reset_approximate_jni_buffer_size();
    }
    // The handle is cleared regardless of the close result, so the destructor will not close
    // it a second time.
    _hdfs_file = nullptr;
    TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfsCloseFile",
                                           Status::InternalError("failed to close hdfs file"));
    const bool close_failed = (ret != 0);
    if (close_failed) {
        _st = Status::InternalError(
                "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}, file_size={}",
                BackendOptions::get_localhost(), _fs_name, _path.native(), hdfs_error(),
                bytes_appended());
        return _st;
    }
    hdfs_file_created_total << 1;
    return Status::OK();
}
299
300
2.74k
HdfsFileWriter::BatchBuffer::BatchBuffer(size_t capacity) {
301
2.74k
    _batch_buffer.reserve(capacity);
302
2.74k
}
303
304
218k
bool HdfsFileWriter::BatchBuffer::full() const {
305
218k
    return size() == capacity();
306
218k
}
307
308
0
const char* HdfsFileWriter::BatchBuffer::data() const {
309
0
    return _batch_buffer.data();
310
0
}
311
312
327k
size_t HdfsFileWriter::BatchBuffer::capacity() const {
313
327k
    return _batch_buffer.capacity();
314
327k
}
315
316
330k
size_t HdfsFileWriter::BatchBuffer::size() const {
317
330k
    return _batch_buffer.size();
318
330k
}
319
320
2.75k
void HdfsFileWriter::BatchBuffer::clear() {
321
2.75k
    _batch_buffer.clear();
322
2.75k
}
323
324
// TODO(ByteYue): Refactor Upload Buffer to reduce this duplicate code
325
0
void HdfsFileWriter::_write_into_local_file_cache() {
326
0
    int64_t tablet_id = get_tablet_id(_path.native()).value_or(0);
327
0
    auto holder = _cache_builder->allocate_cache_holder(_bytes_appended - _batch_buffer.size(),
328
0
                                                        _batch_buffer.capacity(), tablet_id);
329
0
    size_t pos = 0;
330
0
    size_t data_remain_size = _batch_buffer.size();
331
0
    for (auto& block : holder->file_blocks) {
332
0
        if (data_remain_size == 0) {
333
0
            break;
334
0
        }
335
0
        size_t block_size = block->range().size();
336
0
        size_t append_size = std::min(data_remain_size, block_size);
337
0
        if (block->state() == FileBlock::State::EMPTY) {
338
0
            block->get_or_set_downloader();
339
0
            if (block->is_downloader()) {
340
0
                Slice s(_batch_buffer.data() + pos, append_size);
341
0
                Status st = block->append(s);
342
0
                if (st.ok()) {
343
0
                    st = block->finalize();
344
0
                }
345
0
                if (!st.ok()) {
346
0
                    LOG_WARNING("failed to append data to file cache").error(st);
347
0
                }
348
0
            }
349
0
        }
350
0
        data_remain_size -= append_size;
351
0
        pos += append_size;
352
0
    }
353
0
}
354
355
2.74k
// Writes `content` to the HDFS file, looping until every byte has been accepted by hdfsWrite.
// JNI memory for the whole content is reserved up front. Each hdfsWrite call is capped at the
// largest value representable by tSize. Returns InternalError if hdfsWrite reports a negative
// result.
Status HdfsFileWriter::append_hdfs_file(std::string_view content) {
    RETURN_IF_ERROR(_acquire_jni_memory(content.size()));
    constexpr auto max_chunk = static_cast<int64_t>(std::numeric_limits<tSize>::max());
    while (!content.empty()) {
        int64_t written_bytes;
        {
            TEST_INJECTION_POINT_CALLBACK("HdfsFileWriter::append_hdfs_file_delay");
            SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_write_latency);
            const auto remaining = static_cast<int64_t>(content.size());
            const auto to_write =
                    static_cast<tSize>(remaining < max_chunk ? remaining : max_chunk);
            written_bytes = SYNC_POINT_HOOK_RETURN_VALUE(
                    hdfsWrite(_hdfs_handler->hdfs_fs, _hdfs_file, content.data(), to_write),
                    "HdfsFileWriter::append_hdfs_file::hdfsWrite", content);
            {
                TEST_INJECTION_POINT_RETURN_WITH_VALUE(
                        "HdfsFileWriter::append_hdfs_file_error",
                        Status::InternalError(
                                "write hdfs failed. fs_name: {}, path: {}, error: inject error",
                                _fs_name, _path.native()));
            }
        }
        if (written_bytes < 0) {
            return Status::InternalError(
                    "write hdfs failed. fs_name: {}, path: {}, error: {}, file_size={}", _fs_name,
                    _path.native(), hdfs_error(), bytes_appended());
        }
        hdfs_bytes_written_total << written_bytes;
        // Advance past the bytes that were accepted; the rest is written on the next pass.
        content.remove_prefix(written_bytes);
    }
    return Status::OK();
}
386
387
2.75k
// Sends the batch buffer to HDFS, mirrors it into the local file cache when a cache builder
// is configured, and then empties the buffer. On a write error the buffer is left untouched.
Status HdfsFileWriter::_flush_buffer() {
    RETURN_IF_ERROR(append_hdfs_file(_batch_buffer.content()));
    if (_cache_builder) {
        _write_into_local_file_cache();
    }
    _batch_buffer.clear();
    return Status::OK();
}
395
396
109k
size_t HdfsFileWriter::BatchBuffer::append(std::string_view content) {
397
109k
    size_t append_size = std::min(capacity() - size(), content.size());
398
109k
    _batch_buffer.append(content.data(), append_size);
399
109k
    return append_size;
400
109k
}
401
402
2.75k
std::string_view HdfsFileWriter::BatchBuffer::content() const {
403
2.75k
    return _batch_buffer;
404
2.75k
}
405
406
109k
// Buffers `content`, flushing the batch buffer to HDFS every time it becomes full.
// A buffer that is already full on entry to an iteration indicates a broken invariant and is
// reported as InternalError.
Status HdfsFileWriter::_append(std::string_view content) {
    for (std::string_view pending = content; !pending.empty();) {
        if (_batch_buffer.full()) {
            return Status::InternalError(
                    fmt::format("invalid batch buffer status, capacity {}, size {}",
                                _batch_buffer.capacity(), _batch_buffer.size()));
        }
        const size_t taken = _batch_buffer.append(pending);
        pending.remove_prefix(taken);
        _bytes_appended += taken;
        if (_batch_buffer.full()) {
            RETURN_IF_ERROR(_flush_buffer());
        }
    }
    return Status::OK();
}
422
423
109k
// Appends `data_cnt` slices in order. Fails with InternalError unless the writer is in the
// OPENED state; stops at the first slice that cannot be appended.
Status HdfsFileWriter::appendv(const Slice* data, size_t data_cnt) {
    if (_state != State::OPENED) [[unlikely]] {
        return Status::InternalError("append to closed file: {}", _path.native());
    }

    const Slice* const end = data + data_cnt;
    for (const Slice* slice = data; slice != end; ++slice) {
        RETURN_IF_ERROR(_append(std::string_view(slice->get_data(), slice->get_size())));
    }
    return Status::OK();
}
433
434
// Opens `full_path` on HDFS for writing and wraps the handle in an HdfsFileWriter.
// With libhdfs3 the parent directory is created first when it does not exist.
// Returns the writer on success, or an InternalError (also logged as a warning) when the
// directory cannot be created or the file cannot be opened.
Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, std::shared_ptr<HdfsHandler> handler,
                                             const std::string& fs_name,
                                             const FileWriterOptions* opts) {
    auto path = convert_path(full_path, fs_name);
#ifdef USE_LIBHDFS3
    std::string hdfs_dir = path.parent_path().string();
    int exists = hdfsExists(handler->hdfs_fs, hdfs_dir.c_str());
    if (exists != 0) {
        VLOG_NOTICE << "hdfs dir doesn't exist, create it: " << hdfs_dir;
        int ret = hdfsCreateDirectory(handler->hdfs_fs, hdfs_dir.c_str());
        if (ret != 0) {
            std::stringstream ss;
            ss << "create dir failed. "
               << " fs_name: " << fs_name << " path: " << hdfs_dir << ", err: " << hdfs_error();
            LOG(WARNING) << ss.str();
            return ResultError(Status::InternalError(ss.str()));
        }
    }
#endif
    // open file

    hdfsFile hdfs_file = nullptr;
    {
        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_open_latency);
        hdfs_file = hdfsOpenFile(handler->hdfs_fs, path.c_str(), O_WRONLY, 0, 0, 0);
    }
    if (hdfs_file == nullptr) {
        std::stringstream ss;
        ss << "open file failed. "
           << " fs_name:" << fs_name << " path:" << path << ", err: " << hdfs_error();
        LOG(WARNING) << ss.str();
        return ResultError(Status::InternalError(ss.str()));
    }
    VLOG_NOTICE << "open file. fs_name:" << fs_name << ", path:" << path;
    inflight_hdfs_file_writer << 1;
    // `handler` is our own by-value copy and is not used past this point, so hand it over
    // instead of copying it (saves an atomic ref-count increment/decrement pair).
    return std::make_unique<HdfsFileWriter>(std::move(path), std::move(handler), hdfs_file,
                                            fs_name, opts);
}
471
472
} // namespace doris::io