Coverage Report

Created: 2026-04-13 08:21

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/load_path_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/load_path_mgr.h"
19
20
// IWYU pragma: no_include <bthread/errno.h>
21
#include <errno.h> // IWYU pragma: keep
22
#include <gen_cpp/Types_types.h>
23
#include <glog/logging.h>
24
#include <string.h>
25
#include <sys/stat.h>
26
27
#include <algorithm>
28
#include <boost/algorithm/string/join.hpp>
29
30
#include "common/cast_set.h"      // IWYU pragma: keep
31
#include "common/compiler_util.h" // IWYU pragma: keep
32
// IWYU pragma: no_include <bits/chrono.h>
33
#include <chrono> // IWYU pragma: keep
34
#include <memory>
35
#include <ostream>
36
#include <string>
37
38
#include "common/config.h"
39
#include "io/fs/file_system.h"
40
#include "io/fs/local_file_system.h"
41
#include "runtime/exec_env.h"
42
#include "storage/olap_define.h"
43
#include "storage/options.h"
44
#include "util/thread.h"
45
46
namespace doris {
47
48
// Lets ErrorCode constants (e.g. DISK_REACH_CAPACITY_LIMIT used by allocate_dir) be named unqualified.
using namespace ErrorCode;
49
50
// Number of "__shard_<n>" buckets that load directories and error log files are spread over.
static constexpr uint32_t MAX_SHARD_NUM = 1024;
51
// Name prefix of shard sub-directories; kept for compatibility with the on-disk layout.
static const std::string SHARD_PREFIX {"__shard_"};
52
53
LoadPathMgr::LoadPathMgr(ExecEnv* exec_env)
54
8
        : _exec_env(exec_env),
55
8
          _idx(0),
56
8
          _next_shard(0),
57
8
          _error_path_next_shard(0),
58
8
          _stop_background_threads_latch(1) {}
59
60
4
void LoadPathMgr::stop() {
61
4
    _stop_background_threads_latch.count_down();
62
4
    if (_clean_thread) {
63
3
        _clean_thread->join();
64
3
    }
65
4
}
66
67
7
// Prepares the error log directory under the first store path and starts the hourly
// "clean_expired_temp_path" background thread.
//
// Returns an error if there is no store path, if the error log directory cannot be
// created, or if the thread cannot be started.
Status LoadPathMgr::init() {
    // Load data paths are filled lazily by allocate_dir() (via _init_once); start empty.
    // NOTE(review): because of this, the log line below always prints an empty list.
    _path_vec.clear();
    LOG(INFO) << "Load path configured to [" << boost::join(_path_vec, ",") << "]";

    const auto& store_paths = _exec_env->store_paths();
    if (store_paths.empty()) {
        // Indexing [0] below would be undefined behavior without at least one store path.
        return Status::InternalError("no store path configured, cannot create load error log dir");
    }
    // error log is saved in first root path
    _error_log_dir = store_paths[0].path + "/" + ERROR_LOG_PREFIX;
    // check and make dir
    RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(_error_log_dir));

    _idx = 0;
    _reserved_hours = std::max<int64_t>(config::load_data_reserve_hours, 1L);
    RETURN_IF_ERROR(Thread::create(
            "LoadPathMgr", "clean_expired_temp_path",
            [this]() {
                // TODO(zc): add this thread to cgroup for control resource it use
                // Run clean() once per hour until stop() counts the latch down
                // (presumably wait_for() returns true once the latch reaches zero).
                while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(3600))) {
                    this->clean();
                }
            },
            &_clean_thread));
    return Status::OK();
}
89
90
// Creates a fresh directory for one load job and returns it through `prefix`.
//
// The layout is "<store_path>/<MINI_PREFIX>/<db>/__shard_<n>/<label>". Store paths are
// tried round-robin, at most once each. A path is skipped when its space information
// cannot be read, or when writing `file_bytes` would exceed the flood stage
// (see check_disk_space()).
//
// Returns OK on success. Returns DISK_REACH_CAPACITY_LIMIT when every path was skipped for
// space (also when there are no store paths). Otherwise returns the last
// create_directory() error.
Status LoadPathMgr::allocate_dir(const std::string& db, const std::string& label,
                                 std::string* prefix, int64_t file_bytes) {
    Status status = _init_once.call([this] {
        for (auto& store_path : _exec_env->store_paths()) {
            _path_vec.push_back(store_path.path + "/" + MINI_PREFIX);
        }
        return Status::OK();
    });
    const size_t size = _path_vec.size();
    size_t exceed_capacity_path_num = 0;
    for (size_t attempt = 0; attempt < size; ++attempt) {
        // Claim a candidate and advance the round-robin cursor in one critical section.
        // The cursor must advance even if the candidate is later rejected for space;
        // otherwise every remaining attempt would re-probe the same full disk instead of
        // trying the other store paths. Reading _idx under the lock also avoids racing
        // with concurrent callers.
        std::string candidate;
        {
            std::lock_guard<std::mutex> l(_lock);
            candidate = _path_vec[_idx];
            _idx = (_idx + 1) % size;
        }

        // Query space on the store root (the candidate without "/" + MINI_PREFIX).
        // If the query fails or the disk lacks room, count it and try the next path.
        std::string base_path = candidate.substr(0, candidate.find("/" + MINI_PREFIX));
        size_t disk_capacity_bytes = 0;
        size_t available_bytes = 0;
        if (!io::global_local_filesystem()
                     ->get_space_info(base_path, &disk_capacity_bytes, &available_bytes)
                     .ok() ||
            !check_disk_space(disk_capacity_bytes, available_bytes, file_bytes)) {
            ++exceed_capacity_path_num;
            continue;
        }

        // add SHARD_PREFIX for compatible purpose
        std::string shard;
        {
            std::lock_guard<std::mutex> l(_lock);
            shard = SHARD_PREFIX + std::to_string(_next_shard++ % MAX_SHARD_NUM);
        }
        // The path is local to this call, so the filesystem work does not need the lock.
        std::string path = candidate + "/" + db + "/" + shard + "/" + label;
        status = io::global_local_filesystem()->create_directory(path);
        if (LIKELY(status.ok())) {
            *prefix = path;
            return Status::OK();
        }
    }
    if (exceed_capacity_path_num == size) {
        return Status::Error<DISK_REACH_CAPACITY_LIMIT, false>("exceed capacity limit.");
    }
    return status;
}
132
133
bool LoadPathMgr::check_disk_space(size_t disk_capacity_bytes, size_t available_bytes,
134
22
                                   int64_t file_bytes) {
135
22
    bool is_available = false;
136
22
    int64_t remaining_bytes = available_bytes - file_bytes;
137
22
    double used_ratio =
138
22
            1.0 - cast_set<double>(remaining_bytes) / cast_set<double>(disk_capacity_bytes);
139
22
    is_available = !(used_ratio >= config::storage_flood_stage_usage_percent / 100.0 &&
140
22
                     remaining_bytes <= config::storage_flood_stage_left_capacity_bytes);
141
22
    if (!is_available) {
142
2
        LOG(WARNING) << "Exceed capacity limit. disk_capacity: " << disk_capacity_bytes
143
2
                     << ", available: " << available_bytes << ", file_bytes: " << file_bytes;
144
2
    }
145
22
    return is_available;
146
22
}
147
148
567
bool LoadPathMgr::is_too_old(time_t cur_time, const std::string& label_dir, int64_t reserve_hours) {
149
567
    struct stat dir_stat;
150
567
    if (stat(label_dir.c_str(), &dir_stat)) {
151
0
        char buf[64];
152
        // State failed, just information
153
0
        LOG(WARNING) << "stat directory failed.path=" << label_dir
154
0
                     << ",code=" << strerror_r(errno, buf, 64);
155
0
        return false;
156
0
    }
157
158
567
    if ((cur_time - dir_stat.st_mtime) < reserve_hours * 3600) {
159
567
        return false;
160
567
    }
161
162
0
    return true;
163
567
}
164
165
0
void LoadPathMgr::get_load_data_path(std::vector<std::string>* data_paths) {
166
0
    data_paths->insert(data_paths->end(), _path_vec.begin(), _path_vec.end());
167
0
    return;
168
0
}
169
170
// Leading component of every load error log file name produced by get_load_error_file_name().
const std::string ERROR_FILE_NAME("error_log");
171
172
// Picks the next error-log shard (round-robin over MAX_SHARD_NUM) and makes sure its
// directory exists under _error_log_dir.
//
// Writes the path of the error file, relative to _error_log_dir, into `error_path`:
// "__shard_<n>/error_log_<db>_<label>_<hi>_<lo>", where the fragment instance id halves
// are written in hex. Fails if the shard directory cannot be created.
Status LoadPathMgr::get_load_error_file_name(const std::string& db, const std::string& label,
                                             const TUniqueId& fragment_instance_id,
                                             std::string* error_path) {
    const std::string shard = [this]() {
        std::lock_guard<std::mutex> guard(_lock);
        return SHARD_PREFIX + std::to_string(_error_path_next_shard++ % MAX_SHARD_NUM);
    }();

    // The shard directory must exist before a file inside it is handed out.
    RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(_error_log_dir + "/" + shard));

    std::stringstream file_name;
    file_name << shard << "/" << ERROR_FILE_NAME << "_" << db << "_" << label << "_" << std::hex
              << fragment_instance_id.hi << "_" << fragment_instance_id.lo;
    *error_path = file_name.str();
    return Status::OK();
}
190
191
1.10k
std::string LoadPathMgr::get_load_error_absolute_path(const std::string& file_path) {
192
1.10k
    std::string path;
193
1.10k
    path.append(_error_log_dir);
194
1.10k
    path.append("/");
195
1.10k
    path.append(file_path);
196
1.10k
    return path;
197
1.10k
}
198
199
567
void LoadPathMgr::process_path(time_t now, const std::string& path, int64_t reserve_hours) {
200
567
    if (!is_too_old(now, path, reserve_hours)) {
201
567
        return;
202
567
    }
203
567
    LOG(INFO) << "Going to remove path. path=" << path;
204
0
    Status status = io::global_local_filesystem()->delete_directory_or_file(path);
205
0
    if (status.ok()) {
206
0
        LOG(INFO) << "Remove path success. path=" << path;
207
0
    } else {
208
0
        LOG(WARNING) << "Remove path failed. path=" << path << ", error=" << status;
209
0
    }
210
0
}
211
212
19
void LoadPathMgr::clean_files_in_path_vec(const std::string& path) {
213
    // Check if the path contains "/"+MINI_PREFIX. If not, return directly.
214
19
    if (path.find("/" + MINI_PREFIX) == std::string::npos) {
215
0
        return;
216
0
    }
217
218
19
    bool exists = false;
219
    // Check if the path exists
220
19
    Status status = io::global_local_filesystem()->exists(path, &exists);
221
19
    if (!status.ok()) {
222
0
        LOG(WARNING) << "Failed to check if path exists: " << path << ", error: " << status;
223
0
        return;
224
0
    }
225
19
    if (exists) {
226
        // If the path exists, delete the file or directory corresponding to that path
227
18
        status = io::global_local_filesystem()->delete_directory_or_file(path);
228
18
        if (status.ok()) {
229
18
            LOG(INFO) << "Delete path success: " << path;
230
18
        } else {
231
0
            LOG(WARNING) << "Delete path failed: " << path << ", error: " << status;
232
0
        }
233
18
    }
234
19
}
235
236
1
void LoadPathMgr::clean_one_path(const std::string& path) {
237
1
    bool exists = true;
238
1
    std::vector<io::FileInfo> dbs;
239
1
    Status st = io::global_local_filesystem()->list(path, false, &dbs, &exists);
240
1
    if (!st) {
241
0
        return;
242
0
    }
243
244
1
    Status status;
245
1
    time_t now = time(nullptr);
246
10
    for (auto& db : dbs) {
247
10
        if (db.is_file) {
248
0
            continue;
249
0
        }
250
10
        std::string db_dir = path + "/" + db.file_name;
251
10
        std::vector<io::FileInfo> sub_dirs;
252
10
        status = io::global_local_filesystem()->list(db_dir, false, &sub_dirs, &exists);
253
10
        if (!status.ok()) {
254
0
            LOG(WARNING) << "scan db of trash dir failed: " << status;
255
0
            continue;
256
0
        }
257
        // delete this file
258
18
        for (auto& sub_dir : sub_dirs) {
259
18
            if (sub_dir.is_file) {
260
0
                continue;
261
0
            }
262
18
            std::string sub_path = db_dir + "/" + sub_dir.file_name;
263
            // for compatible
264
18
            if (sub_dir.file_name.find(SHARD_PREFIX) == 0) {
265
                // sub_dir starts with SHARD_PREFIX
266
                // process shard sub dir
267
18
                std::vector<io::FileInfo> labels;
268
18
                status = io::global_local_filesystem()->list(sub_path, false, &labels, &exists);
269
18
                if (!status.ok()) {
270
0
                    LOG(WARNING) << "scan one path to delete directory failed: " << status;
271
0
                    continue;
272
0
                }
273
18
                for (auto& label : labels) {
274
0
                    std::string label_dir = sub_path + "/" + label.file_name;
275
0
                    process_path(now, label_dir, config::load_data_reserve_hours);
276
0
                }
277
18
            } else {
278
                // process label dir
279
0
                process_path(now, sub_path, config::load_data_reserve_hours);
280
0
            }
281
18
        }
282
10
    }
283
1
}
284
285
1
void LoadPathMgr::clean() {
286
1
    for (auto& path : _path_vec) {
287
1
        clean_one_path(path);
288
1
    }
289
1
    clean_error_log();
290
1
}
291
292
1
void LoadPathMgr::clean_error_log() {
293
1
    time_t now = time(nullptr);
294
1
    bool exists = true;
295
1
    std::vector<io::FileInfo> sub_dirs;
296
1
    {
297
1
        Status status =
298
1
                io::global_local_filesystem()->list(_error_log_dir, false, &sub_dirs, &exists);
299
1
        if (!status.ok()) {
300
0
            LOG(WARNING) << "scan error_log dir failed: " << status;
301
0
            return;
302
0
        }
303
1
    }
304
305
567
    for (auto& sub_dir : sub_dirs) {
306
567
        if (sub_dir.is_file) {
307
0
            continue;
308
0
        }
309
567
        std::string sub_path = _error_log_dir + "/" + sub_dir.file_name;
310
        // for compatible
311
567
        if (sub_dir.file_name.find(SHARD_PREFIX) == 0) {
312
            // sub_dir starts with SHARD_PREFIX
313
            // process shard sub dir
314
567
            std::vector<io::FileInfo> error_log_files;
315
567
            Status status =
316
567
                    io::global_local_filesystem()->list(sub_path, false, &error_log_files, &exists);
317
567
            if (!status.ok()) {
318
0
                LOG(WARNING) << "scan one path to delete directory failed: " << status;
319
0
                continue;
320
0
            }
321
567
            for (auto& error_log : error_log_files) {
322
567
                std::string error_log_path = sub_path + "/" + error_log.file_name;
323
567
                process_path(now, error_log_path, config::load_error_log_reserve_hours);
324
567
            }
325
567
        } else {
326
            // process error log file
327
0
            process_path(now, sub_path, config::load_error_log_reserve_hours);
328
0
        }
329
567
    }
330
1
}
331
332
} // namespace doris