Coverage Report

Created: 2026-03-12 17:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/load/load_path_mgr.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "load/load_path_mgr.h"
19
20
// IWYU pragma: no_include <bthread/errno.h>
21
#include <errno.h> // IWYU pragma: keep
22
#include <gen_cpp/Types_types.h>
23
#include <glog/logging.h>
24
#include <string.h>
25
#include <sys/stat.h>
26
27
#include <algorithm>
28
#include <boost/algorithm/string/join.hpp>
29
30
#include "common/cast_set.h"      // IWYU pragma: keep
31
#include "common/compiler_util.h" // IWYU pragma: keep
32
// IWYU pragma: no_include <bits/chrono.h>
33
#include <chrono> // IWYU pragma: keep
34
#include <memory>
35
#include <ostream>
36
#include <string>
37
38
#include "common/config.h"
39
#include "io/fs/file_system.h"
40
#include "io/fs/local_file_system.h"
41
#include "runtime/exec_env.h"
42
#include "storage/olap_define.h"
43
#include "storage/options.h"
44
#include "util/thread.h"
45
46
namespace doris {
47
48
#include "common/compile_check_begin.h"
49
50
using namespace ErrorCode;
51
52
// Shard directory names are built as SHARD_PREFIX + (counter % MAX_SHARD_NUM),
// so this bounds the number of distinct shard names that can be generated.
static const uint32_t MAX_SHARD_NUM = 1024;
53
// Name prefix of shard sub-directories. The cleaners also use it to tell shard
// directories apart from entries that sit directly under the parent directory.
static const std::string SHARD_PREFIX = "__shard_";
54
55
LoadPathMgr::LoadPathMgr(ExecEnv* exec_env)
56
8
        : _exec_env(exec_env),
57
8
          _idx(0),
58
8
          _next_shard(0),
59
8
          _error_path_next_shard(0),
60
8
          _stop_background_threads_latch(1) {}
61
62
4
void LoadPathMgr::stop() {
63
4
    _stop_background_threads_latch.count_down();
64
4
    if (_clean_thread) {
65
3
        _clean_thread->join();
66
3
    }
67
4
}
68
69
7
// Prepares the error-log directory and starts the background cleaner thread.
//
// Returns the failing Status if the error-log directory cannot be created or
// the thread cannot be started, Status::OK() otherwise.
Status LoadPathMgr::init() {
    // The load path list is reset here, so the message below always prints an
    // empty list; the list is filled in later by allocate_dir().
    _path_vec.clear();
    LOG(INFO) << "Load path configured to [" << boost::join(_path_vec, ",") << "]";

    // Error logs live under the first store path.
    // NOTE(review): store_paths() is indexed without an emptiness check -
    // confirm that ExecEnv guarantees at least one store path.
    _error_log_dir = _exec_env->store_paths()[0].path + "/" + ERROR_LOG_PREFIX;
    RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(_error_log_dir));

    _idx = 0;
    // Never keep load data for less than one hour.
    _reserved_hours = std::max<int64_t>(config::load_data_reserve_hours, 1L);

    // Body of the cleaner thread: run clean() once per hour until the stop
    // latch is released (presumably wait_for() returns true once stop() has
    // counted the latch down - confirm against CountDownLatch).
    // TODO(zc): add this thread to cgroup for control resource it use
    auto clean_loop = [this]() {
        while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(3600))) {
            this->clean();
        }
    };
    RETURN_IF_ERROR(Thread::create("LoadPathMgr", "clean_expired_temp_path", clean_loop,
                                   &_clean_thread));
    return Status::OK();
}
91
92
// Allocates a directory for a load job and stores its path in *prefix.
//
// Candidate root directories ("<store path>/MINI_PREFIX") are collected once on
// first use and then tried in round-robin order. A candidate is skipped when
// its disk space cannot be queried or check_disk_space() rejects it. The
// resulting directory is "<candidate>/<db>/<shard>/<label>".
//
// @param db          database name, used as a path component
// @param label       load label, used as the leaf directory name
// @param prefix      out: path of the created directory (written on success only)
// @param file_bytes  size of the incoming data, forwarded to check_disk_space()
// @return Status::OK() on success; DISK_REACH_CAPACITY_LIMIT when every
//         candidate failed the space check; otherwise the last recorded status.
Status LoadPathMgr::allocate_dir(const std::string& db, const std::string& label,
                                 std::string* prefix, int64_t file_bytes) {
    Status status = _init_once.call([this] {
        for (auto& store_path : _exec_env->store_paths()) {
            _path_vec.push_back(store_path.path + "/" + MINI_PREFIX);
        }
        return Status::OK();
    });
    if (!status.ok()) {
        return status;
    }

    const auto size = _path_vec.size();
    auto retry = size;
    // Same type as `size` so the final comparison is not a signed/unsigned mix.
    size_t exceed_capacity_path_num = 0;
    size_t disk_capacity_bytes = 0;
    size_t available_bytes = 0;
    while (retry--) {
        // Pick the current candidate and advance the round-robin cursor in one
        // critical section. Advancing here (instead of only after a successful
        // space check) makes every retry look at a different disk; otherwise a
        // single full disk would be re-tested `size` times and the remaining
        // disks would never be tried.
        std::string candidate;
        {
            std::lock_guard<std::mutex> l(_lock);
            candidate = _path_vec[_idx];
            _idx = (_idx + 1) % size;
        }

        // Call get_space_info to get disk space information.
        // If the call fails or the disk space is insufficient,
        // count the path as exceeding capacity and move on to the next one.
        const std::string base_path = candidate.substr(0, candidate.find("/" + MINI_PREFIX));
        if (!io::global_local_filesystem()
                     ->get_space_info(base_path, &disk_capacity_bytes, &available_bytes)
                     .ok() ||
            !check_disk_space(disk_capacity_bytes, available_bytes, file_bytes)) {
            ++exceed_capacity_path_num;
            continue;
        }

        // add SHARD_PREFIX for compatible purpose
        std::string shard;
        {
            std::lock_guard<std::mutex> l(_lock);
            shard = SHARD_PREFIX + std::to_string(_next_shard++ % MAX_SHARD_NUM);
        }
        const std::string path = candidate + "/" + db + "/" + shard + "/" + label;
        status = io::global_local_filesystem()->create_directory(path);
        if (LIKELY(status.ok())) {
            *prefix = path;
            return Status::OK();
        }
    }
    if (exceed_capacity_path_num == size) {
        return Status::Error<DISK_REACH_CAPACITY_LIMIT, false>("exceed capacity limit.");
    }
    // At least one candidate passed the space check but its directory could not
    // be created; report the last such failure.
    return status;
}
134
135
bool LoadPathMgr::check_disk_space(size_t disk_capacity_bytes, size_t available_bytes,
136
26
                                   int64_t file_bytes) {
137
26
    bool is_available = false;
138
26
    int64_t remaining_bytes = available_bytes - file_bytes;
139
26
    double used_ratio =
140
26
            1.0 - cast_set<double>(remaining_bytes) / cast_set<double>(disk_capacity_bytes);
141
26
    is_available = !(used_ratio >= config::storage_flood_stage_usage_percent / 100.0 &&
142
26
                     remaining_bytes <= config::storage_flood_stage_left_capacity_bytes);
143
26
    if (!is_available) {
144
2
        LOG(WARNING) << "Exceed capacity limit. disk_capacity: " << disk_capacity_bytes
145
2
                     << ", available: " << available_bytes << ", file_bytes: " << file_bytes;
146
2
    }
147
26
    return is_available;
148
26
}
149
150
577
bool LoadPathMgr::is_too_old(time_t cur_time, const std::string& label_dir, int64_t reserve_hours) {
151
577
    struct stat dir_stat;
152
577
    if (stat(label_dir.c_str(), &dir_stat)) {
153
0
        char buf[64];
154
        // State failed, just information
155
0
        LOG(WARNING) << "stat directory failed.path=" << label_dir
156
0
                     << ",code=" << strerror_r(errno, buf, 64);
157
0
        return false;
158
0
    }
159
160
577
    if ((cur_time - dir_stat.st_mtime) < reserve_hours * 3600) {
161
577
        return false;
162
577
    }
163
164
0
    return true;
165
577
}
166
167
0
void LoadPathMgr::get_load_data_path(std::vector<std::string>* data_paths) {
168
0
    data_paths->insert(data_paths->end(), _path_vec.begin(), _path_vec.end());
169
0
    return;
170
0
}
171
172
// Leading component of the error log file names built by get_load_error_file_name().
const std::string ERROR_FILE_NAME = "error_log";
173
174
// Builds the error log file name for one fragment instance and makes sure the
// shard directory that will hold it exists.
//
// The result is relative to the error log directory and has the form
// "<shard>/error_log_<db>_<label>_<hi in hex>_<lo in hex>".
//
// @param error_path  out: the relative file name (written on success only)
// @return the failing Status if the shard directory cannot be created,
//         Status::OK() otherwise.
Status LoadPathMgr::get_load_error_file_name(const std::string& db, const std::string& label,
                                             const TUniqueId& fragment_instance_id,
                                             std::string* error_path) {
    // The shard counter is shared state, so take the next value under the lock.
    std::string shard_name;
    {
        std::lock_guard<std::mutex> guard(_lock);
        shard_name = SHARD_PREFIX + std::to_string(_error_path_next_shard++ % MAX_SHARD_NUM);
    }

    // check and create shard path
    const std::string shard_dir = _error_log_dir + "/" + shard_name;
    RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(shard_dir));

    // add shard sub dir to file path
    std::stringstream name_builder;
    name_builder << shard_name << "/" << ERROR_FILE_NAME << "_" << db << "_" << label << "_"
                 << std::hex << fragment_instance_id.hi << "_" << fragment_instance_id.lo;
    *error_path = name_builder.str();
    return Status::OK();
}
192
193
1.10k
std::string LoadPathMgr::get_load_error_absolute_path(const std::string& file_path) {
194
1.10k
    std::string path;
195
1.10k
    path.append(_error_log_dir);
196
1.10k
    path.append("/");
197
1.10k
    path.append(file_path);
198
1.10k
    return path;
199
1.10k
}
200
201
577
void LoadPathMgr::process_path(time_t now, const std::string& path, int64_t reserve_hours) {
202
577
    if (!is_too_old(now, path, reserve_hours)) {
203
577
        return;
204
577
    }
205
577
    LOG(INFO) << "Going to remove path. path=" << path;
206
0
    Status status = io::global_local_filesystem()->delete_directory_or_file(path);
207
0
    if (status.ok()) {
208
0
        LOG(INFO) << "Remove path success. path=" << path;
209
0
    } else {
210
0
        LOG(WARNING) << "Remove path failed. path=" << path << ", error=" << status;
211
0
    }
212
0
}
213
214
23
void LoadPathMgr::clean_files_in_path_vec(const std::string& path) {
215
    // Check if the path contains "/"+MINI_PREFIX. If not, return directly.
216
23
    if (path.find("/" + MINI_PREFIX) == std::string::npos) {
217
0
        return;
218
0
    }
219
220
23
    bool exists = false;
221
    // Check if the path exists
222
23
    Status status = io::global_local_filesystem()->exists(path, &exists);
223
23
    if (!status.ok()) {
224
0
        LOG(WARNING) << "Failed to check if path exists: " << path << ", error: " << status;
225
0
        return;
226
0
    }
227
23
    if (exists) {
228
        // If the path exists, delete the file or directory corresponding to that path
229
22
        status = io::global_local_filesystem()->delete_directory_or_file(path);
230
22
        if (status.ok()) {
231
22
            LOG(INFO) << "Delete path success: " << path;
232
22
        } else {
233
0
            LOG(WARNING) << "Delete path failed: " << path << ", error: " << status;
234
0
        }
235
22
    }
236
23
}
237
238
3
void LoadPathMgr::clean_one_path(const std::string& path) {
239
3
    bool exists = true;
240
3
    std::vector<io::FileInfo> dbs;
241
3
    Status st = io::global_local_filesystem()->list(path, false, &dbs, &exists);
242
3
    if (!st) {
243
0
        return;
244
0
    }
245
246
3
    Status status;
247
3
    time_t now = time(nullptr);
248
12
    for (auto& db : dbs) {
249
12
        if (db.is_file) {
250
0
            continue;
251
0
        }
252
12
        std::string db_dir = path + "/" + db.file_name;
253
12
        std::vector<io::FileInfo> sub_dirs;
254
12
        status = io::global_local_filesystem()->list(db_dir, false, &sub_dirs, &exists);
255
12
        if (!status.ok()) {
256
0
            LOG(WARNING) << "scan db of trash dir failed: " << status;
257
0
            continue;
258
0
        }
259
        // delete this file
260
22
        for (auto& sub_dir : sub_dirs) {
261
22
            if (sub_dir.is_file) {
262
0
                continue;
263
0
            }
264
22
            std::string sub_path = db_dir + "/" + sub_dir.file_name;
265
            // for compatible
266
22
            if (sub_dir.file_name.find(SHARD_PREFIX) == 0) {
267
                // sub_dir starts with SHARD_PREFIX
268
                // process shard sub dir
269
22
                std::vector<io::FileInfo> labels;
270
22
                status = io::global_local_filesystem()->list(sub_path, false, &labels, &exists);
271
22
                if (!status.ok()) {
272
0
                    LOG(WARNING) << "scan one path to delete directory failed: " << status;
273
0
                    continue;
274
0
                }
275
22
                for (auto& label : labels) {
276
0
                    std::string label_dir = sub_path + "/" + label.file_name;
277
0
                    process_path(now, label_dir, config::load_data_reserve_hours);
278
0
                }
279
22
            } else {
280
                // process label dir
281
0
                process_path(now, sub_path, config::load_data_reserve_hours);
282
0
            }
283
22
        }
284
12
    }
285
3
}
286
287
3
void LoadPathMgr::clean() {
288
3
    for (auto& path : _path_vec) {
289
3
        clean_one_path(path);
290
3
    }
291
3
    clean_error_log();
292
3
}
293
294
3
void LoadPathMgr::clean_error_log() {
295
3
    time_t now = time(nullptr);
296
3
    bool exists = true;
297
3
    std::vector<io::FileInfo> sub_dirs;
298
3
    {
299
3
        Status status =
300
3
                io::global_local_filesystem()->list(_error_log_dir, false, &sub_dirs, &exists);
301
3
        if (!status.ok()) {
302
0
            LOG(WARNING) << "scan error_log dir failed: " << status;
303
0
            return;
304
0
        }
305
3
    }
306
307
577
    for (auto& sub_dir : sub_dirs) {
308
577
        if (sub_dir.is_file) {
309
0
            continue;
310
0
        }
311
577
        std::string sub_path = _error_log_dir + "/" + sub_dir.file_name;
312
        // for compatible
313
577
        if (sub_dir.file_name.find(SHARD_PREFIX) == 0) {
314
            // sub_dir starts with SHARD_PREFIX
315
            // process shard sub dir
316
577
            std::vector<io::FileInfo> error_log_files;
317
577
            Status status =
318
577
                    io::global_local_filesystem()->list(sub_path, false, &error_log_files, &exists);
319
577
            if (!status.ok()) {
320
0
                LOG(WARNING) << "scan one path to delete directory failed: " << status;
321
0
                continue;
322
0
            }
323
577
            for (auto& error_log : error_log_files) {
324
577
                std::string error_log_path = sub_path + "/" + error_log.file_name;
325
577
                process_path(now, error_log_path, config::load_error_log_reserve_hours);
326
577
            }
327
577
        } else {
328
            // process error log file
329
0
            process_path(now, sub_path, config::load_error_log_reserve_hours);
330
0
        }
331
577
    }
332
3
}
333
334
#include "common/compile_check_end.h"
335
336
} // namespace doris