Coverage Report

Created: 2026-03-14 13:33

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/io/fs/file_writer.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <future>
21
#include <memory>
22
23
#include "common/status.h"
24
#include "io/cache/block_file_cache.h"
25
#include "io/cache/block_file_cache_factory.h"
26
#include "io/cache/file_cache_common.h"
27
#include "io/fs/file_reader_writer_fwd.h"
28
#include "io/fs/path.h"
29
#include "util/slice.h"
30
31
namespace doris::io {
32
class FileSystem;
33
struct FileCacheAllocatorBuilder;
34
struct EncryptionInfo;
35
36
// Options supplied when a file writer is created.
// Only affects remote file writers.
struct FileWriterOptions {
    // The S3 committer starts the multipart upload of every file on the BE side
    // and completes those multipart uploads on the FE side. A file whose parts
    // have not been completed is not visible, which makes a single file atomic;
    // atomicity across multiple files is still not guaranteed.
    // Because hive committers have best-effort semantics, this shortens the
    // inconsistent time window.
    bool used_by_s3_committer {false};
    // When set, FileWriter::init_cache_builder creates the file cache allocator
    // builder regardless of the available cache space.
    bool write_file_cache {false};
    // Passed through to the file cache allocator builder.
    bool is_cold_data {false};
    // Whether to flush data into the storage system.
    bool sync_file_data {true};
    // Relative time; passed through to the file cache allocator builder.
    uint64_t file_cache_expiration_time {0};
    // Approximate number of bytes that will be written, used for the file cache.
    uint64_t approximate_bytes_to_write {0};
};
52
53
struct AsyncCloseStatusPack {
54
    std::promise<Status> promise;
55
    std::future<Status> future;
56
};
57
58
class FileWriter {
59
public:
60
    enum class State : uint8_t {
61
        OPENED = 0,
62
        ASYNC_CLOSING,
63
        CLOSED,
64
    };
65
206k
    FileWriter() = default;
66
206k
    virtual ~FileWriter() = default;
67
68
    FileWriter(const FileWriter&) = delete;
69
    const FileWriter& operator=(const FileWriter&) = delete;
70
71
    // Normal close. Wait for all data to persist before returning.
72
    // If there is no data appended, an empty file will be persisted.
73
    virtual Status close(bool non_block = false) = 0;
74
75
171k
    Status append(const Slice& data) { return appendv(&data, 1); }
76
77
    virtual Status appendv(const Slice* data, size_t data_cnt) = 0;
78
79
    virtual const Path& path() const = 0;
80
81
    virtual size_t bytes_appended() const = 0;
82
83
    virtual State state() const = 0;
84
85
    // Returns true if this file's data was written to a packed file.
86
    // Used to determine whether to collect packed slice location from PackedFileManager.
87
0
    virtual bool is_in_packed_file() const { return false; }
88
89
5.36k
    FileCacheAllocatorBuilder* cache_builder() const {
90
5.36k
        return _cache_builder == nullptr ? nullptr : _cache_builder.get();
91
5.36k
    }
92
93
protected:
94
71.9k
    void init_cache_builder(const FileWriterOptions* opts, const Path& path) {
95
72.0k
        if (!config::enable_file_cache || opts == nullptr) {
96
1.60k
            return;
97
1.60k
        }
98
99
70.3k
        io::UInt128Wrapper path_hash = BlockFileCache::hash(path.filename().native());
100
70.3k
        BlockFileCache* file_cache_ptr = FileCacheFactory::instance()->get_by_path(path_hash);
101
102
70.3k
        bool has_enough_file_cache_space = config::enable_file_cache_adaptive_write &&
103
70.4k
                                           (opts->approximate_bytes_to_write > 0) &&
104
70.3k
                                           (file_cache_ptr->approximate_available_cache_size() >
105
6.33k
                                            opts->approximate_bytes_to_write);
106
107
70.3k
        VLOG_DEBUG << "path:" << path.filename().native()
108
194
                   << ", write_file_cache:" << opts->write_file_cache
109
194
                   << ", has_enough_file_cache_space:" << has_enough_file_cache_space
110
194
                   << ", approximate_bytes_to_write:" << opts->approximate_bytes_to_write
111
194
                   << ", file_cache_available_size:"
112
194
                   << file_cache_ptr->approximate_available_cache_size();
113
70.3k
        if (opts->write_file_cache || has_enough_file_cache_space) {
114
58.4k
            _cache_builder = std::make_unique<FileCacheAllocatorBuilder>(FileCacheAllocatorBuilder {
115
18.4E
                    opts ? opts->is_cold_data : false, opts ? opts->file_cache_expiration_time : 0,
116
58.4k
                    path_hash, file_cache_ptr});
117
58.4k
        }
118
70.3k
        return;
119
71.9k
    }
120
121
    std::unique_ptr<FileCacheAllocatorBuilder> _cache_builder =
122
            nullptr; // nullptr if disable write file cache
123
};
124
125
} // namespace doris::io