Coverage Report

Created: 2026-03-15 17:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/runtime/small_file_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "runtime/small_file_mgr.h"
19
20
// IWYU pragma: no_include <bthread/errno.h>
21
#include <absl/strings/str_split.h>
22
#include <errno.h> // IWYU pragma: keep
23
#include <gen_cpp/HeartbeatService_types.h>
24
#include <gen_cpp/Types_types.h>
25
#include <glog/logging.h>
26
#include <stdint.h>
27
#include <stdio.h>
28
29
#include <charconv>
#include <cstring>
#include <memory>
#include <sstream>
#include <system_error>
#include <utility>
#include <vector>
34
35
#include "common/metrics/doris_metrics.h"
36
#include "common/metrics/metrics.h"
37
#include "common/status.h"
38
#include "io/fs/file_system.h"
39
#include "io/fs/local_file_system.h"
40
#include "runtime/exec_env.h"
41
#include "service/http/http_client.h"
42
#include "util/md5.h"
43
#include "util/string_util.h"
44
45
namespace doris {
46
47
// Gauge metric prototype for the number of entries in the small-file cache. Its value is
// reported by the hook that the SmallFileMgr constructor registers (_file_cache.size()).
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(small_file_cache_count, MetricUnit::NOUNIT);
48
49
// Creates the manager for small files stored under `local_path` and registers a hook for the
// small_file_cache_count gauge that reports the current number of cached entries.
// The on-disk files are not scanned here; init() does that via _load_local_files().
SmallFileMgr::SmallFileMgr(ExecEnv* env, const std::string& local_path)
50
0
        : _exec_env(env), _local_path(local_path) {
51
0
    REGISTER_HOOK_METRIC(small_file_cache_count, [this]() {
52
        // std::lock_guard<std::mutex> l(_lock);
        // NOTE(review): _file_cache is read here without _lock, while get_file() and
        // _download_file() modify it under _lock, which looks like a data race on the
        // container. The lock above is commented out, presumably so metric collection is not
        // blocked while get_file() holds _lock during a download. Confirm the intent.
53
0
        return _file_cache.size();
54
0
    });
55
0
}
56
57
0
// Deregisters the small_file_cache_count hook metric registered in the constructor.
// Nothing else is released here: _file_cache and the files under _local_path are untouched.
SmallFileMgr::~SmallFileMgr() {
58
0
    DEREGISTER_HOOK_METRIC(small_file_cache_count);
59
0
}
60
61
0
// Initializes the manager by scanning _local_path (creating it if needed) and loading the
// small files found there into the in-memory cache.
//
// @return the status of that scan: OK, or the first directory-level error it hit.
Status SmallFileMgr::init() {
    // _load_local_files() already yields OK on success; forward its status unchanged.
    return _load_local_files();
}
65
66
0
// Makes sure _local_path exists, then passes every directory entry flagged as a file to
// _load_single_file(). Files that fail to load are logged and skipped. Only failures to
// create or iterate the directory are returned to the caller.
Status SmallFileMgr::_load_local_files() {
    const auto& local_fs = io::global_local_filesystem();
    RETURN_IF_ERROR(local_fs->create_directory(_local_path));

    auto load_one = [this](const io::FileInfo& info) {
        if (info.is_file) {
            Status load_status = _load_single_file(_local_path, info.file_name);
            if (!load_status.ok()) {
                LOG(WARNING) << "load small file failed: " << load_status;
            }
        }
        // Always continue: one bad entry must not stop the rest of the scan.
        return true;
    };

    RETURN_IF_ERROR(local_fs->iterate_directory(_local_path, load_one));
    return Status::OK();
}
83
84
0
// Registers one file found in the local small-file directory in _file_cache.
//
// `file_name` must have the form "<file_id>.<md5>", which is the name _download_file gives a
// completed download. The file is accepted only if all of these hold:
//   - <file_id> is a valid integer;
//   - that id is not cached yet;
//   - the MD5 of the file content equals <md5>, compared case-insensitively as _check_file
//     and _download_file do.
//
// @param path       directory that contains the file
// @param file_name  bare file name inside `path`
// @return OK if the file was added to the cache, an InternalError status otherwise
Status SmallFileMgr::_load_single_file(const std::string& path, const std::string& file_name) {
    std::vector<std::string> parts = absl::StrSplit(file_name, ".");
    if (parts.size() != 2) {
        return Status::InternalError("Not a valid file name: {}", file_name);
    }

    // Parse the id without exceptions and require the whole token to be consumed.
    // std::stol throws on names such as "abc.txt", and that exception would escape the
    // directory scan instead of being logged and skipped. It also accepts prefixes like "12x".
    const std::string& id_str = parts[0];
    int64_t file_id = 0;
    const char* const id_end = id_str.data() + id_str.size();
    const auto [parse_end, parse_err] = std::from_chars(id_str.data(), id_end, file_id);
    if (parse_err != std::errc() || parse_end != id_end) {
        return Status::InternalError("Not a valid file name: {}", file_name);
    }
    const std::string& md5 = parts[1];

    if (_file_cache.find(file_id) != _file_cache.end()) {
        return Status::InternalError("File with same id is already been loaded: {}", file_id);
    }

    const std::string full_path = path + "/" + file_name;
    std::string file_md5;
    RETURN_IF_ERROR(io::global_local_filesystem()->md5sum(full_path, &file_md5));
    // Compare case-insensitively, consistent with _check_file/_download_file. The name carries
    // the md5 exactly as the FE supplied it, which may differ in case from md5sum()'s output.
    if (!iequal(file_md5, md5)) {
        return Status::InternalError("Invalid md5 of file: {}", file_name);
    }

    CacheEntry entry;
    entry.path = full_path;
    entry.md5 = std::move(file_md5);
    _file_cache.emplace(file_id, std::move(entry));
    return Status::OK();
}
111
112
0
// Stores in *file_path the local path of small file `file_id`, whose expected MD5 is `md5`.
//
// A cached entry is reused if its file still exists and its recorded MD5 equals `md5`
// (case-insensitively, see _check_file). Otherwise the stale entry and its file are dropped,
// and the file is downloaded again from the master FE via _download_file().
// _lock is held for the entire call, including any download.
//
// @return OK on success; an error if a stale file cannot be removed or the download fails.
Status SmallFileMgr::get_file(int64_t file_id, const std::string& md5, std::string* file_path) {
    std::unique_lock<std::mutex> l(_lock);

    auto it = _file_cache.find(file_id);
    if (it != _file_cache.end()) {
        const CacheEntry& entry = it->second;
        if (_check_file(entry, md5).ok()) {
            *file_path = entry.path;
            return Status::OK();
        }

        // The cached copy is unusable: delete it so it can be downloaded from FE again.
        // A missing file (ENOENT) is one of the reasons _check_file fails. It must not count
        // as a removal error; otherwise the stale entry would never be evicted and every
        // later call for this id would fail.
        if (remove(entry.path.c_str()) != 0) {
            const int err = errno;
            if (err != ENOENT) {
                return Status::InternalError("failed to remove file: {}, err: {}", file_id,
                                             std::strerror(err));
            }
        }
        _file_cache.erase(it);
    }

    // Not cached, or just evicted above: download it from FE.
    return _download_file(file_id, md5, file_path);
}
139
140
0
// Validates a cached entry against the MD5 the caller expects:
//   - the file at entry.path must still exist;
//   - entry.md5 must equal `md5`, ignoring case.
// Only the recorded MD5 is compared; the file content is not re-hashed here. Errors from
// the existence check itself are returned unchanged.
Status SmallFileMgr::_check_file(const CacheEntry& entry, const std::string& md5) {
    bool file_exists = false;
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(entry.path, &file_exists));
    if (!file_exists) {
        return Status::InternalError("file not exist: {}", entry.path);
    }
    if (iequal(md5, entry.md5)) {
        return Status::OK();
    }
    return Status::InternalError("invalid MD5 of file: {}", entry.path);
}
151
152
// Downloads small file `file_id` from the master FE into _local_path. It then checks that
// the content MD5 equals `md5` (case-insensitively) and registers the file in _file_cache.
//
// Data is first written to "<local_path>/<file_id>.tmp". The file is renamed to
// "<local_path>/<file_id>.<md5>" only after the checksum matched and the file was closed
// successfully. Every failure path removes the temporary file.
//
// @param file_id    id of the small file on the FE
// @param md5        expected MD5 (hex) of the content; also used in the final file name
// @param file_path  receives the final local path on success
// Called from get_file(), which holds _lock.
Status SmallFileMgr::_download_file(int64_t file_id, const std::string& md5,
                                    std::string* file_path) {
    std::stringstream ss;
    ss << _local_path << "/" << file_id << ".tmp";
    std::string tmp_file = ss.str();

    // The guard owns the temp file until it is released below. If it is ever closed through
    // this deleter, the download failed, so the partial file is deleted as well.
    auto fp_closer = [&tmp_file](FILE* f) {
        fclose(f);
        remove(tmp_file.c_str());
    };

    std::unique_ptr<FILE, decltype(fp_closer)> fp(fopen(tmp_file.c_str(), "w"), fp_closer);
    if (fp == nullptr) {
        LOG(WARNING) << "fail to open file, file=" << tmp_file;
        return Status::InternalError("fail to open file");
    }

    HttpClient client;

    ClusterInfo* cluster_info = _exec_env->cluster_info();
    std::stringstream url_ss;
    url_ss << cluster_info->master_fe_addr.hostname << ":" << cluster_info->master_fe_http_port
           << "/api/get_small_file?"
           << "file_id=" << file_id;
    // Capture the loggable URL before the auth token is appended, so the token stays out of
    // the logs.
    const std::string url_for_log = url_ss.str();
    url_ss << "&token=" << cluster_info->token;
    const std::string url = url_ss.str();

    LOG(INFO) << "download file from: " << url_for_log;

    RETURN_IF_ERROR(client.init(url));
    Status status;
    Md5Digest digest;
    auto download_cb = [&status, &tmp_file, &fp, &digest](const void* data, size_t length) {
        digest.update(data, length);
        // Compare the byte count. fwrite(data, length, 1, ...) returns 0 items when
        // length == 0, which would turn a zero-length chunk into a bogus write error.
        if (fwrite(data, 1, length, fp.get()) != length) {
            LOG(WARNING) << "fail to write data to file, file=" << tmp_file
                         << ", error=" << ferror(fp.get());
            status = Status::InternalError("fail to write data when download");
            return false;
        }
        return true;
    };
    RETURN_IF_ERROR(client.execute(download_cb));
    RETURN_IF_ERROR(status);
    digest.digest();

    if (!iequal(digest.hex(), md5)) {
        LOG(WARNING) << "file's checksum is not equal, download: " << digest.hex()
                     << ", expected: " << md5 << ", file: " << file_id;
        return Status::InternalError("download with invalid md5");
    }

    // Take the FILE* back from the guard so the temp file is kept, and check fclose().
    // Buffered data is flushed here, and a failed flush (e.g. disk full) must not leave a
    // truncated file that then gets cached under a valid name.
    if (fclose(fp.release()) != 0) {
        const int err = errno;
        char buf[64];
        LOG(WARNING) << "fail to close file, file=" << tmp_file << ", errno=" << err
                     << ", errmsg=" << strerror_r(err, buf, sizeof(buf));
        remove(tmp_file.c_str());
        return Status::InternalError("fail to close file");
    }

    // Rename the temporary file to its final "<file_id>.<md5>" name.
    std::stringstream real_ss;
    real_ss << _local_path << "/" << file_id << "." << md5;
    std::string real_file_path = real_ss.str();
    if (rename(tmp_file.c_str(), real_file_path.c_str()) != 0) {
        const int err = errno; // capture before logging can clobber it
        char buf[64];
        LOG(WARNING) << "fail to rename file from=" << tmp_file << ", to=" << real_file_path
                     << ", errno=" << err << ", errmsg=" << strerror_r(err, buf, sizeof(buf));
        remove(tmp_file.c_str());
        remove(real_file_path.c_str());
        return Status::InternalError("fail to rename file");
    }

    // Add to the file cache.
    CacheEntry entry;
    entry.path = real_file_path;
    entry.md5 = md5;
    _file_cache.emplace(file_id, std::move(entry));

    *file_path = real_file_path;

    LOG(INFO) << "finished to download file: " << *file_path;
    return Status::OK();
}
234
235
} // end namespace doris