Coverage Report

Created: 2026-03-12 14:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/hdfs_file_writer.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "io/fs/hdfs_file_writer.h"
19
20
#include <fcntl.h>
21
#include <fmt/core.h>
22
23
#include <chrono>
24
#include <filesystem>
25
#include <ostream>
26
#include <random>
27
#include <string>
28
#include <thread>
29
#include <utility>
30
31
#include "common/config.h"
32
#include "common/logging.h"
33
#include "common/status.h"
34
#include "cpp/sync_point.h"
35
#include "io/cache/block_file_cache.h"
36
#include "io/cache/block_file_cache_factory.h"
37
#include "io/cache/file_cache_common.h"
38
#include "io/fs/err_utils.h"
39
#include "io/fs/file_writer.h"
40
#include "io/fs/hdfs_file_system.h"
41
#include "io/hdfs_util.h"
42
#include "runtime/exec_env.h"
43
#include "service/backend_options.h"
44
#include "util/bvar_helper.h"
45
#include "util/jni-util.h"
46
47
namespace doris::io {
48
#include "common/compile_check_begin.h"
49
50
bvar::Adder<uint64_t> hdfs_file_writer_total("hdfs_file_writer_total_num");
51
bvar::Adder<uint64_t> hdfs_bytes_written_total("hdfs_file_writer_bytes_written");
52
bvar::Adder<uint64_t> hdfs_file_created_total("hdfs_file_writer_file_created");
53
bvar::Adder<uint64_t> inflight_hdfs_file_writer("inflight_hdfs_file_writer");
54
bvar::Adder<uint64_t> hdfs_file_writer_async_close_queuing("hdfs_file_writer_async_close_queuing");
55
bvar::Adder<uint64_t> hdfs_file_writer_async_close_processing(
56
        "hdfs_file_writer_async_close_processing");
57
58
static constexpr size_t MB = 1024 * 1024;
59
#ifndef USE_LIBHDFS3
60
static constexpr size_t CLIENT_WRITE_PACKET_SIZE = 64 * 1024; // 64 KB
61
#endif
62
63
0
inline std::default_random_engine make_random_engine() {
64
0
    return std::default_random_engine(
65
0
            static_cast<uint32_t>(std::chrono::steady_clock::now().time_since_epoch().count()));
66
0
}
67
68
// In practice, we've found that if the import frequency to HDFS is too fast,
69
// it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI.
70
// For this, we should have a method to monitor how much JVM memory is currently being used.
71
// The HdfsWriteMemUsageRecorder class increments a recorded value during hdfsWrite when writing to HDFS.
72
// The HDFS client makes blocking calls to hdfsHSync or hdfsCloseFile,
73
// which ensures that the client's buffer is sent to the data node and returned with an acknowledgment before returning to the caller.
74
// HdfsWriteMemUsageRecorder would reduce the mem usage at that time.
75
// If the current usage exceeds the maximum set by the user, the current mem acquire would return failure.
76
// The caller can sleep for a while to wait for memory to be freed.
77
class HdfsWriteMemUsageRecorder {
78
public:
79
9
    HdfsWriteMemUsageRecorder() = default;
80
0
    ~HdfsWriteMemUsageRecorder() = default;
81
0
    size_t max_usage() const {
82
0
        return static_cast<size_t>(static_cast<double>(max_jvm_heap_size()) *
83
0
                                   config::max_hdfs_wirter_jni_heap_usage_ratio);
84
0
    }
85
0
    Status acquire_memory(size_t memory_size, int try_time) {
86
#if defined(USE_LIBHDFS3) || defined(BE_TEST)
87
        return Status::OK();
88
#else
89
0
        if (!config::enable_hdfs_mem_limiter) {
90
0
            return Status::OK();
91
0
        }
92
0
        auto unit = config::hdfs_jni_write_sleep_milliseconds;
93
0
        std::default_random_engine rng = make_random_engine();
94
0
        std::uniform_int_distribution<int64_t> u(unit, 2 * unit);
95
0
        std::uniform_int_distribution<int64_t> u2(2 * unit, 4 * unit);
96
0
        auto duration_ms =
97
0
                try_time < (config::hdfs_jni_write_max_retry_time / 2) ? u(rng) : u2(rng);
98
0
        std::unique_lock lck {cur_memory_latch};
99
0
        cv.wait_for(lck, std::chrono::milliseconds(duration_ms),
100
0
                    [&]() { return cur_memory_comsuption + memory_size <= max_usage(); });
101
0
        if (cur_memory_comsuption + memory_size > max_usage()) {
102
0
            lck.unlock();
103
0
            return Status::InternalError<false>(
104
0
                    "Run out of Jni jvm heap space, current limit size is {}, max heap size is {}, "
105
0
                    "ratio is {}",
106
0
                    max_usage(), max_jvm_heap_size(), config::max_hdfs_wirter_jni_heap_usage_ratio);
107
0
        }
108
0
        cur_memory_comsuption += memory_size;
109
0
        return Status::OK();
110
0
#endif
111
0
    }
112
113
0
    void release_memory(size_t memory_size) {
114
#if defined(USE_LIBHDFS3) || defined(BE_TEST)
115
#else
116
0
        if (!config::enable_hdfs_mem_limiter) {
117
0
            return;
118
0
        }
119
0
        std::unique_lock lck {cur_memory_latch};
120
0
        size_t origin_size = cur_memory_comsuption;
121
0
        cur_memory_comsuption -= memory_size;
122
0
        if (cur_memory_comsuption < max_usage() && origin_size > max_usage()) {
123
0
            cv.notify_all();
124
0
        }
125
0
#endif
126
0
    }
127
128
private:
129
    // clang-format off
130
0
    size_t max_jvm_heap_size() const {
131
0
        return Jni::Util::get_max_jni_heap_memory_size();
132
0
    }
133
    // clang-format on
134
    [[maybe_unused]] std::size_t cur_memory_comsuption {0};
135
    std::mutex cur_memory_latch;
136
    std::condition_variable cv;
137
};
138
139
static HdfsWriteMemUsageRecorder g_hdfs_write_rate_limiter;
140
141
HdfsFileWriter::HdfsFileWriter(Path path, std::shared_ptr<HdfsHandler> handler, hdfsFile hdfs_file,
142
                               std::string fs_name, const FileWriterOptions* opts)
143
1
        : _path(std::move(path)),
144
1
          _hdfs_handler(std::move(handler)),
145
1
          _hdfs_file(hdfs_file),
146
1
          _fs_name(std::move(fs_name)),
147
1
          _sync_file_data(opts ? opts->sync_file_data : true),
148
1
          _batch_buffer(MB * config::hdfs_write_batch_buffer_size_mb) {
149
1
    init_cache_builder(opts, _path);
150
1
    hdfs_file_writer_total << 1;
151
152
1
    TEST_SYNC_POINT("HdfsFileWriter");
153
1
}
154
155
1
HdfsFileWriter::~HdfsFileWriter() {
156
1
    if (_async_close_pack != nullptr) {
157
        // For thread safety
158
0
        std::ignore = _async_close_pack->future.get();
159
0
        _async_close_pack = nullptr;
160
0
    }
161
1
    if (_hdfs_file) {
162
0
        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency);
163
0
        hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
164
0
        inflight_hdfs_file_writer << -1;
165
0
        _flush_and_reset_approximate_jni_buffer_size();
166
0
    }
167
1
}
168
169
2
void HdfsFileWriter::_flush_and_reset_approximate_jni_buffer_size() {
170
2
    g_hdfs_write_rate_limiter.release_memory(_approximate_jni_buffer_size);
171
2
    _approximate_jni_buffer_size = 0;
172
2
}
173
174
5
Status HdfsFileWriter::_acquire_jni_memory(size_t size) {
175
#ifdef USE_LIBHDFS3
176
    return Status::OK();
177
#else
178
5
    size_t actual_size = std::max(CLIENT_WRITE_PACKET_SIZE, size);
179
5
    int try_time = 0;
180
5
    if (auto st = g_hdfs_write_rate_limiter.acquire_memory(actual_size, try_time); !st.ok()) {
181
0
        if (_approximate_jni_buffer_size > 0) {
182
0
            int ret;
183
0
            {
184
0
                SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_hflush_latency);
185
0
                ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsHFlush(_hdfs_handler->hdfs_fs, _hdfs_file),
186
0
                                                   "HdfsFileWriter::close::hdfsHFlush");
187
0
            }
188
0
            _flush_and_reset_approximate_jni_buffer_size();
189
0
            if (ret != 0) {
190
0
                return Status::InternalError(
191
0
                        "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}, "
192
0
                        "file_size={}",
193
0
                        BackendOptions::get_localhost(), _fs_name, _path.native(), hdfs_error(),
194
0
                        bytes_appended());
195
0
            }
196
0
        }
197
        // Other hdfs writers might have occupied too much memory, so we need to sleep for a while to wait for them
198
        // to release their memory
199
0
        for (; try_time < config::hdfs_jni_write_max_retry_time; try_time++) {
200
0
            if (g_hdfs_write_rate_limiter.acquire_memory(actual_size, try_time).ok()) {
201
0
                _approximate_jni_buffer_size += actual_size;
202
0
                return Status::OK();
203
0
            }
204
0
        }
205
0
        return st;
206
0
    }
207
208
5
    _approximate_jni_buffer_size += actual_size;
209
5
    return Status::OK();
210
5
#endif
211
5
}
212
213
1
Status HdfsFileWriter::close(bool non_block) {
214
1
    if (state() == State::CLOSED) {
215
0
        return Status::InternalError("HdfsFileWriter already closed, file path {}, fs name {}",
216
0
                                     _path.native(), _fs_name);
217
0
    }
218
1
    if (state() == State::ASYNC_CLOSING) {
219
0
        if (non_block) {
220
0
            return Status::InternalError("Don't submit async close multi times");
221
0
        }
222
0
        CHECK(_async_close_pack != nullptr);
223
0
        _st = _async_close_pack->future.get();
224
0
        _async_close_pack = nullptr;
225
        // We should wait for all the previous async tasks to finish
226
0
        _state = State::CLOSED;
227
        // The next time we call close(), no matter whether non_block is true or false, it will always return the
228
        // '_st' value because this writer is already closed.
229
0
        return _st;
230
0
    }
231
1
    if (non_block) {
232
0
        _state = State::ASYNC_CLOSING;
233
0
        _async_close_pack = std::make_unique<AsyncCloseStatusPack>();
234
0
        _async_close_pack->future = _async_close_pack->promise.get_future();
235
0
        hdfs_file_writer_async_close_queuing << 1;
236
0
        return ExecEnv::GetInstance()->non_block_close_thread_pool()->submit_func([&]() {
237
0
            hdfs_file_writer_async_close_queuing << -1;
238
0
            hdfs_file_writer_async_close_processing << 1;
239
0
            _async_close_pack->promise.set_value(_close_impl());
240
0
            hdfs_file_writer_async_close_processing << -1;
241
0
        });
242
0
    }
243
1
    _st = _close_impl();
244
1
    _state = State::CLOSED;
245
1
    return _st;
246
1
}
247
248
0
Status HdfsFileWriter::_close_impl() {
249
0
    if (_batch_buffer.size() != 0) {
250
0
        if (_st = _flush_buffer(); !_st.ok()) {
251
0
            return _st;
252
0
        }
253
0
    }
254
0
    int ret;
255
0
    if (_sync_file_data) {
256
0
        {
257
0
            SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_hsync_latency);
258
#ifdef USE_LIBHDFS3
259
            ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsSync(_hdfs_handler->hdfs_fs, _hdfs_file),
260
                                               "HdfsFileWriter::close::hdfsHSync");
261
#else
262
0
            ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file),
263
0
                                               "HdfsFileWriter::close::hdfsHSync");
264
0
            _flush_and_reset_approximate_jni_buffer_size();
265
0
#endif
266
0
        }
267
0
        TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfsSync",
268
0
                                               Status::InternalError("failed to sync hdfs file"));
269
270
0
        if (ret != 0) {
271
0
            _st = Status::InternalError(
272
0
                    "failed to sync hdfs file. fs_name={} path={} : {}, file_size={}", _fs_name,
273
0
                    _path.native(), hdfs_error(), bytes_appended());
274
0
            return _st;
275
0
        }
276
0
    }
277
278
0
    {
279
0
        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency);
280
        // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for
281
        // the HDFS response, but won't guarantee the synchronization of data to HDFS.
282
0
        ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file),
283
0
                                           "HdfsFileWriter::close::hdfsCloseFile");
284
0
        inflight_hdfs_file_writer << -1;
285
0
        _flush_and_reset_approximate_jni_buffer_size();
286
0
    }
287
0
    _hdfs_file = nullptr;
288
0
    TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfsCloseFile",
289
0
                                           Status::InternalError("failed to close hdfs file"));
290
0
    if (ret != 0) {
291
0
        _st = Status::InternalError(
292
0
                "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}, file_size={}",
293
0
                BackendOptions::get_localhost(), _fs_name, _path.native(), hdfs_error(),
294
0
                bytes_appended());
295
0
        return _st;
296
0
    }
297
0
    hdfs_file_created_total << 1;
298
0
    return Status::OK();
299
0
}
300
301
1
HdfsFileWriter::BatchBuffer::BatchBuffer(size_t capacity) {
302
1
    _batch_buffer.reserve(capacity);
303
1
}
304
305
2.41k
bool HdfsFileWriter::BatchBuffer::full() const {
306
2.41k
    return size() == capacity();
307
2.41k
}
308
309
0
const char* HdfsFileWriter::BatchBuffer::data() const {
310
0
    return _batch_buffer.data();
311
0
}
312
313
3.61k
size_t HdfsFileWriter::BatchBuffer::capacity() const {
314
3.61k
    return _batch_buffer.capacity();
315
3.61k
}
316
317
3.61k
size_t HdfsFileWriter::BatchBuffer::size() const {
318
3.61k
    return _batch_buffer.size();
319
3.61k
}
320
321
5
void HdfsFileWriter::BatchBuffer::clear() {
322
5
    _batch_buffer.clear();
323
5
}
324
325
// TODO(ByteYue): Refactor Upload Buffer to reduce this duplicate code
326
0
void HdfsFileWriter::_write_into_local_file_cache() {
327
0
    int64_t tablet_id = get_tablet_id(_path.native()).value_or(0);
328
0
    auto holder = _cache_builder->allocate_cache_holder(_bytes_appended - _batch_buffer.size(),
329
0
                                                        _batch_buffer.capacity(), tablet_id);
330
0
    size_t pos = 0;
331
0
    size_t data_remain_size = _batch_buffer.size();
332
0
    for (auto& block : holder->file_blocks) {
333
0
        if (data_remain_size == 0) {
334
0
            break;
335
0
        }
336
0
        size_t block_size = block->range().size();
337
0
        size_t append_size = std::min(data_remain_size, block_size);
338
0
        if (block->state() == FileBlock::State::EMPTY) {
339
0
            block->get_or_set_downloader();
340
0
            if (block->is_downloader()) {
341
0
                Slice s(_batch_buffer.data() + pos, append_size);
342
0
                Status st = block->append(s);
343
0
                if (st.ok()) {
344
0
                    st = block->finalize();
345
0
                }
346
0
                if (!st.ok()) {
347
0
                    LOG_WARNING("failed to append data to file cache").error(st);
348
0
                }
349
0
            }
350
0
        }
351
0
        data_remain_size -= append_size;
352
0
        pos += append_size;
353
0
    }
354
0
}
355
356
0
Status HdfsFileWriter::append_hdfs_file(std::string_view content) {
357
0
    RETURN_IF_ERROR(_acquire_jni_memory(content.size()));
358
0
    while (!content.empty()) {
359
0
        int64_t written_bytes;
360
0
        {
361
0
            TEST_INJECTION_POINT_CALLBACK("HdfsFileWriter::append_hdfs_file_delay");
362
0
            SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_write_latency);
363
0
            int64_t max_to_write = content.size();
364
0
            tSize to_write = static_cast<tSize>(std::min(
365
0
                    max_to_write, static_cast<int64_t>(std::numeric_limits<tSize>::max())));
366
0
            written_bytes = SYNC_POINT_HOOK_RETURN_VALUE(
367
0
                    hdfsWrite(_hdfs_handler->hdfs_fs, _hdfs_file, content.data(), to_write),
368
0
                    "HdfsFileWriter::append_hdfs_file::hdfsWrite", content);
369
0
            {
370
0
                TEST_INJECTION_POINT_RETURN_WITH_VALUE(
371
0
                        "HdfsFileWriter::append_hdfs_file_error",
372
0
                        Status::InternalError(
373
0
                                "write hdfs failed. fs_name: {}, path: {}, error: inject error",
374
0
                                _fs_name, _path.native()));
375
0
            }
376
0
        }
377
0
        if (written_bytes < 0) {
378
0
            return Status::InternalError(
379
0
                    "write hdfs failed. fs_name: {}, path: {}, error: {}, file_size={}", _fs_name,
380
0
                    _path.native(), hdfs_error(), bytes_appended());
381
0
        }
382
0
        hdfs_bytes_written_total << written_bytes;
383
0
        content.remove_prefix(written_bytes);
384
0
    }
385
0
    return Status::OK();
386
0
}
387
388
5
Status HdfsFileWriter::_flush_buffer() {
389
5
    RETURN_IF_ERROR(append_hdfs_file(_batch_buffer.content()));
390
5
    if (_cache_builder != nullptr) {
391
0
        _write_into_local_file_cache();
392
0
    }
393
5
    _batch_buffer.clear();
394
5
    return Status::OK();
395
5
}
396
397
1.20k
size_t HdfsFileWriter::BatchBuffer::append(std::string_view content) {
398
1.20k
    size_t append_size = std::min(capacity() - size(), content.size());
399
1.20k
    _batch_buffer.append(content.data(), append_size);
400
1.20k
    return append_size;
401
1.20k
}
402
403
5
std::string_view HdfsFileWriter::BatchBuffer::content() const {
404
5
    return _batch_buffer;
405
5
}
406
407
1.20k
Status HdfsFileWriter::_append(std::string_view content) {
408
2.40k
    while (!content.empty()) {
409
1.20k
        if (_batch_buffer.full()) {
410
0
            auto error_msg = fmt::format("invalid batch buffer status, capacity {}, size {}",
411
0
                                         _batch_buffer.capacity(), _batch_buffer.size());
412
0
            return Status::InternalError(error_msg);
413
0
        }
414
1.20k
        size_t append_size = _batch_buffer.append(content);
415
1.20k
        content.remove_prefix(append_size);
416
1.20k
        _bytes_appended += append_size;
417
1.20k
        if (_batch_buffer.full()) {
418
4
            RETURN_IF_ERROR(_flush_buffer());
419
4
        }
420
1.20k
    }
421
1.20k
    return Status::OK();
422
1.20k
}
423
424
1.20k
Status HdfsFileWriter::appendv(const Slice* data, size_t data_cnt) {
425
1.20k
    if (_state != State::OPENED) [[unlikely]] {
426
0
        return Status::InternalError("append to closed file: {}", _path.native());
427
0
    }
428
429
2.40k
    for (size_t i = 0; i < data_cnt; i++) {
430
1.20k
        RETURN_IF_ERROR(_append({data[i].get_data(), data[i].get_size()}));
431
1.20k
    }
432
1.20k
    return Status::OK();
433
1.20k
}
434
435
Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, std::shared_ptr<HdfsHandler> handler,
436
                                             const std::string& fs_name,
437
0
                                             const FileWriterOptions* opts) {
438
0
    auto path = convert_path(full_path, fs_name);
439
#ifdef USE_LIBHDFS3
440
    std::string hdfs_dir = path.parent_path().string();
441
    int exists = hdfsExists(handler->hdfs_fs, hdfs_dir.c_str());
442
    if (exists != 0) {
443
        VLOG_NOTICE << "hdfs dir doesn't exist, create it: " << hdfs_dir;
444
        int ret = hdfsCreateDirectory(handler->hdfs_fs, hdfs_dir.c_str());
445
        if (ret != 0) {
446
            std::stringstream ss;
447
            ss << "create dir failed. "
448
               << " fs_name: " << fs_name << " path: " << hdfs_dir << ", err: " << hdfs_error();
449
            LOG(WARNING) << ss.str();
450
            return ResultError(Status::InternalError(ss.str()));
451
        }
452
    }
453
#endif
454
    // open file
455
456
0
    hdfsFile hdfs_file = nullptr;
457
0
    {
458
0
        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_open_latency);
459
0
        hdfs_file = hdfsOpenFile(handler->hdfs_fs, path.c_str(), O_WRONLY, 0, 0, 0);
460
0
    }
461
0
    if (hdfs_file == nullptr) {
462
0
        std::stringstream ss;
463
0
        ss << "open file failed. "
464
0
           << " fs_name:" << fs_name << " path:" << path << ", err: " << hdfs_error();
465
0
        LOG(WARNING) << ss.str();
466
0
        return ResultError(Status::InternalError(ss.str()));
467
0
    }
468
0
    VLOG_NOTICE << "open file. fs_name:" << fs_name << ", path:" << path;
469
0
    inflight_hdfs_file_writer << 1;
470
0
    return std::make_unique<HdfsFileWriter>(std::move(path), handler, hdfs_file, fs_name, opts);
471
0
}
472
#include "common/compile_check_end.h"
473
474
} // namespace doris::io