Coverage Report

Created: 2026-03-12 17:15

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/hdfs_file_writer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include "common/status.h"
21
#include "io/fs/file_writer.h"
22
#include "io/fs/hdfs.h"
23
#include "io/fs/path.h"
24
25
namespace doris {
26
struct Slice;
27
namespace io {
28
29
class HdfsHandler;
30
class BlockFileCache;
31
struct FileCacheAllocatorBuilder;
32
struct AsyncCloseStatusPack;
33
34
class HdfsFileWriter final : public FileWriter {
35
public:
36
    // Accepted path formats:
37
    // - fs_name/path_to_file
38
    // - /path_to_file
39
    // TODO(plat1ko): Support relative paths for cloud mode
40
    static Result<FileWriterPtr> create(Path path, std::shared_ptr<HdfsHandler> handler,
41
                                        const std::string& fs_name,
42
                                        const FileWriterOptions* opts = nullptr);
43
44
    HdfsFileWriter(Path path, std::shared_ptr<HdfsHandler> handler, hdfsFile hdfs_file,
45
                   std::string fs_name, const FileWriterOptions* opts = nullptr);
46
    ~HdfsFileWriter() override;
47
48
    Status appendv(const Slice* data, size_t data_cnt) override;
49
356
    const Path& path() const override { return _path; }
50
2.88k
    size_t bytes_appended() const override { return _bytes_appended; }
51
5.56k
    State state() const override { return _state; }
52
53
    Status close(bool non_block = false) override;
54
55
private:
56
    Status _close_impl();
57
    // Flushes buffered data into the HDFS client and writes it to the local file cache if enabled.
58
    // **Notice**: this clears the underlying buffer.
59
    Status _flush_buffer();
60
    Status append_hdfs_file(std::string_view content);
61
    void _write_into_local_file_cache();
62
    Status _append(std::string_view content);
63
    void _flush_and_reset_approximate_jni_buffer_size();
64
    Status _acquire_jni_memory(size_t size);
65
66
    Path _path;
67
    std::shared_ptr<HdfsHandler> _hdfs_handler = nullptr;
68
    hdfsFile _hdfs_file = nullptr;
69
    std::string _fs_name;
70
    size_t _bytes_appended = 0;
71
    bool _sync_file_data;
72
    class BatchBuffer {
73
    public:
74
        BatchBuffer(size_t capacity);
75
        size_t append(std::string_view content);
76
        bool full() const;
77
        const char* data() const;
78
        size_t capacity() const;
79
        size_t size() const;
80
        void clear();
81
        std::string_view content() const;
82
83
    private:
84
        std::string _batch_buffer;
85
    };
86
    BatchBuffer _batch_buffer;
87
    size_t _approximate_jni_buffer_size = 0;
88
    std::unique_ptr<AsyncCloseStatusPack> _async_close_pack;
89
    // The return value of close_impl must stay consistent across calls,
90
    // so this field stores the status returned by the first call to close_impl.
91
    Status _st;
92
    State _state {State::OPENED};
93
};
94
95
} // namespace io
96
} // namespace doris