Coverage Report

Created: 2026-03-18 18:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/spill/spill_file_manager.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/spill/spill_file_manager.h"
19
20
#include <fmt/format.h>
21
#include <glog/logging.h>
22
23
#include <algorithm>
24
#include <filesystem>
25
#include <memory>
26
#include <string>
27
28
#include "common/logging.h"
29
#include "common/metrics/doris_metrics.h"
30
#include "exec/spill/spill_file.h"
31
#include "io/fs/file_system.h"
32
#include "io/fs/local_file_system.h"
33
#include "storage/olap_define.h"
34
#include "util/parse_util.h"
35
#include "util/pretty_printer.h"
36
#include "util/time.h"
37
38
namespace doris {
39
#include "common/compile_check_begin.h"
40
41
155
// Removes the "spill" metric entity that _init_metrics() registered.
//
// init() can return early on an error before it reaches _init_metrics(); in
// that case nothing in this file has assigned _entity, so there is nothing to
// deregister and the registry is left untouched.
SpillFileManager::~SpillFileManager() {
    if (_entity != nullptr) {
        DorisMetrics::instance()->metric_registry()->deregister_entity(_entity);
    }
}
44
45
// Takes over the map of spill directories (moved from the caller) and creates
// the stop latch with a count of 1; the GC thread loop waits on that latch
// between rounds. No I/O happens here, see init().
SpillFileManager::SpillFileManager(
        std::unordered_map<std::string, std::unique_ptr<SpillDataDir>>&& spill_store_map)
        : _spill_store_map(std::move(spill_store_map)), _stop_background_threads_latch(1) {}
48
49
170
// Prepares every spill store and starts the background GC thread.
//
// Steps:
//   1. Initialize each store (_init_spill_store_map()).
//   2. For each store, make sure the GC root directory exists.
//   3. For each store, make sure the spill directory exists. If it already
//      exists, it is moved under the GC root (into a sub directory named after
//      the current time) so the GC thread can delete it later, and a fresh
//      spill directory is created in its place.
//   4. Start the GC thread and register the spill metrics.
//
// Returns the first error reported by store initialization, an existence
// check, a directory creation, or thread creation.
Status SpillFileManager::init() {
    LOG(INFO) << "init spill stream manager";
    RETURN_IF_ERROR(_init_spill_store_map());

    for (const auto& [path, store] : _spill_store_map) {
        auto gc_dir_root_dir = store->get_spill_data_gc_path();
        bool exists = true;
        RETURN_IF_ERROR(io::global_local_filesystem()->exists(gc_dir_root_dir, &exists));
        if (!exists) {
            RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(gc_dir_root_dir));
        }

        auto spill_dir = store->get_spill_data_path();
        RETURN_IF_ERROR(io::global_local_filesystem()->exists(spill_dir, &exists));
        if (!exists) {
            RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(spill_dir));
        } else {
            auto suffix = ToStringFromUnixMillis(UnixMillis());
            auto gc_dir = store->get_spill_data_gc_path(suffix);
            // Non-throwing overload (same as gc()): this check only feeds a
            // warning and must not abort initialization with an exception.
            std::error_code ec;
            if (std::filesystem::exists(gc_dir, ec)) {
                LOG(WARNING) << "gc dir already exists: " << gc_dir;
            }
            // Moving the old spill data is best effort: a failure is reported
            // but does not fail init().
            auto rename_status = io::global_local_filesystem()->rename(spill_dir, gc_dir);
            if (!rename_status.ok()) {
                LOG(WARNING) << "failed to move spill dir " << spill_dir << " to gc dir " << gc_dir
                             << ": " << rename_status.to_string();
            }
            RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(spill_dir));
        }
    }

    RETURN_IF_ERROR(Thread::create(
            "Spill", "spill_gc_thread", [this]() { this->_spill_gc_thread_callback(); },
            &_spill_gc_thread));
    LOG(INFO) << "spill gc thread started";

    _init_metrics();

    return Status::OK();
}
85
86
170
void SpillFileManager::_init_metrics() {
87
170
    _entity = DorisMetrics::instance()->metric_registry()->register_entity("spill",
88
170
                                                                           {{"name", "spill"}});
89
90
170
    _spill_write_bytes_metric = std::make_unique<doris::MetricPrototype>(
91
170
            doris::MetricType::COUNTER, doris::MetricUnit::BYTES, "spill_write_bytes");
92
170
    _spill_write_bytes_counter = (IntAtomicCounter*)(_entity->register_metric<IntAtomicCounter>(
93
170
            _spill_write_bytes_metric.get()));
94
95
170
    _spill_read_bytes_metric = std::make_unique<doris::MetricPrototype>(
96
170
            doris::MetricType::COUNTER, doris::MetricUnit::BYTES, "spill_read_bytes");
97
170
    _spill_read_bytes_counter = (IntAtomicCounter*)(_entity->register_metric<IntAtomicCounter>(
98
170
            _spill_read_bytes_metric.get()));
99
170
}
100
101
// clean up stale spilled files
102
170
void SpillFileManager::_spill_gc_thread_callback() {
103
11.5k
    while (!_stop_background_threads_latch.wait_for(
104
11.5k
            std::chrono::milliseconds(config::spill_gc_interval_ms))) {
105
11.3k
        gc(config::spill_gc_work_time_ms);
106
11.9k
        for (auto& [path, dir] : _spill_store_map) {
107
11.9k
            static_cast<void>(dir->update_capacity());
108
11.9k
        }
109
11.3k
    }
110
170
}
111
112
170
// Calls init() on every spill directory; stops at and returns the first error.
Status SpillFileManager::_init_spill_store_map() {
    for (const auto& [path, data_dir] : _spill_store_map) {
        RETURN_IF_ERROR(data_dir->init());
    }
    return Status::OK();
}
119
120
std::vector<SpillDataDir*> SpillFileManager::_get_stores_for_spill(
121
652
        TStorageMedium::type storage_medium) {
122
652
    std::vector<std::pair<SpillDataDir*, double>> stores_with_usage;
123
658
    for (auto& [_, store] : _spill_store_map) {
124
658
        if (store->storage_medium() == storage_medium && !store->reach_capacity_limit(0)) {
125
329
            stores_with_usage.emplace_back(store.get(), store->_get_disk_usage(0));
126
329
        }
127
658
    }
128
652
    if (stores_with_usage.empty()) {
129
323
        return {};
130
323
    }
131
132
329
    std::ranges::sort(stores_with_usage, [](auto&& a, auto&& b) { return a.second < b.second; });
133
134
329
    std::vector<SpillDataDir*> stores;
135
329
    for (const auto& [store, _] : stores_with_usage) {
136
329
        stores.emplace_back(store);
137
329
    }
138
329
    return stores;
139
652
}
140
141
// Creates a SpillFile for `relative_path` on the least-used available spill
// directory and stores it in `spill_file`.
//
// SSD directories are tried first; HDD directories are only consulted when no
// SSD directory is available. Returns NO_AVAILABLE_ROOT_PATH when neither
// medium has a usable directory (`spill_file` is left untouched then).
Status SpillFileManager::create_spill_file(const std::string& relative_path,
                                           SpillFileSPtr& spill_file) {
    std::vector<SpillDataDir*> candidates;
    for (const auto medium : {TStorageMedium::type::SSD, TStorageMedium::type::HDD}) {
        candidates = _get_stores_for_spill(medium);
        if (!candidates.empty()) {
            break;
        }
    }

    if (candidates.empty()) {
        return Status::Error<ErrorCode::NO_AVAILABLE_ROOT_PATH>(
                "no available disk can be used for spill.");
    }

    // Candidates are sorted by usage ascending, so the front one is the least used.
    spill_file = std::make_shared<SpillFile>(candidates.front(), relative_path);
    return Status::OK();
}
157
158
112
// Hands the given spill file over to its own gc(). A null pointer is only
// reported with a warning.
void SpillFileManager::delete_spill_file(SpillFileSPtr spill_file) {
    if (spill_file) {
        spill_file->gc();
        return;
    }
    LOG(WARNING) << "[spill][delete] null spill_file";
}
165
166
11.4k
void SpillFileManager::gc(int32_t max_work_time_ms) {
167
11.4k
    bool exists = true;
168
11.4k
    bool has_work = false;
169
11.4k
    int64_t max_work_time_ns = max_work_time_ms * 1000L * 1000L;
170
11.4k
    MonotonicStopWatch watch;
171
11.4k
    watch.start();
172
11.4k
    Defer defer {[&]() {
173
11.3k
        if (has_work) {
174
14
            std::string msg(
175
14
                    fmt::format("spill gc time: {}",
176
14
                                PrettyPrinter::print(watch.elapsed_time(), TUnit::TIME_NS)));
177
14
            msg += ", spill storage:\n";
178
22
            for (const auto& [path, store_dir] : _spill_store_map) {
179
22
                msg += "    " + store_dir->debug_string();
180
22
                msg += "\n";
181
22
            }
182
14
            LOG(INFO) << msg;
183
14
        }
184
11.3k
    }};
185
11.9k
    for (const auto& [path, store_dir] : _spill_store_map) {
186
11.9k
        std::string gc_root_dir = store_dir->get_spill_data_gc_path();
187
188
11.9k
        std::error_code ec;
189
11.9k
        exists = std::filesystem::exists(gc_root_dir, ec);
190
11.9k
        if (ec || !exists) {
191
8.17k
            continue;
192
8.17k
        }
193
        // dirs of queries
194
3.73k
        std::vector<io::FileInfo> dirs;
195
3.73k
        auto st = io::global_local_filesystem()->list(gc_root_dir, false, &dirs, &exists);
196
3.73k
        if (!st.ok()) {
197
0
            continue;
198
0
        }
199
200
3.73k
        for (const auto& dir : dirs) {
201
164
            has_work = true;
202
164
            if (dir.is_file) {
203
0
                continue;
204
0
            }
205
164
            std::string abs_dir = fmt::format("{}/{}", gc_root_dir, dir.file_name);
206
            // operator spill sub dirs of a query
207
164
            std::vector<io::FileInfo> files;
208
164
            st = io::global_local_filesystem()->list(abs_dir, false, &files, &exists);
209
164
            if (!st.ok()) {
210
0
                continue;
211
0
            }
212
164
            if (files.empty()) {
213
110
                static_cast<void>(io::global_local_filesystem()->delete_directory(abs_dir));
214
110
                continue;
215
110
            }
216
217
204
            for (const auto& file : files) {
218
204
                auto abs_file_path = fmt::format("{}/{}", abs_dir, file.file_name);
219
204
                if (file.is_file) {
220
0
                    static_cast<void>(io::global_local_filesystem()->delete_file(abs_file_path));
221
204
                } else {
222
204
                    static_cast<void>(
223
204
                            io::global_local_filesystem()->delete_directory(abs_file_path));
224
204
                }
225
204
                if (watch.elapsed_time() > max_work_time_ns) {
226
0
                    break;
227
0
                }
228
204
            }
229
54
        }
230
3.73k
    }
231
11.4k
}
232
233
// Gauge prototypes for the per-directory spill metrics registered in the
// SpillDataDir constructor.
// NOTE(review): update_capacity() only ever stores 0 or 1 in the two
// "has_spill_*" gauges, yet they are declared with MetricUnit::BYTES; confirm
// whether a unit-less declaration was intended.
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_capacity, MetricUnit::BYTES);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_limit, MetricUnit::BYTES);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_avail_capacity, MetricUnit::BYTES);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_data_size, MetricUnit::BYTES);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_has_spill_data, MetricUnit::BYTES);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(spill_disk_has_spill_gc_data, MetricUnit::BYTES);
239
240
// Records the directory path, its initial capacity and its storage medium, then
// registers a metric entity named "spill_data_dir.<path>" (labelled with the
// spill sub directory of that path) together with the six spill gauges.
SpillDataDir::SpillDataDir(std::string path, int64_t capacity_bytes,
                           TStorageMedium::type storage_medium)
        : _path(std::move(path)),
          _disk_capacity_bytes(capacity_bytes),
          _storage_medium(storage_medium) {
    // `path` has been moved from above; only the member `_path` is used below.
    const std::string entity_name = std::string("spill_data_dir.") + _path;
    const std::string path_label = _path + "/" + SPILL_DIR_PREFIX;
    spill_data_dir_metric_entity = DorisMetrics::instance()->metric_registry()->register_entity(
            entity_name, {{"path", path_label}});

    INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_capacity);
    INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_limit);
    INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_avail_capacity);
    INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_data_size);
    INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_has_spill_data);
    INT_GAUGE_METRIC_REGISTER(spill_data_dir_metric_entity, spill_disk_has_spill_gc_data);
}
254
255
7.81k
// Reports whether `dir` is a directory with no entries.
//
// Returns:
//   * false when `dir` is not a directory (this includes a path that does not
//     exist);
//   * true when `dir` is a directory without entries;
//   * true when the filesystem reports an error while the path is inspected or
//     opened for iteration.
//
// The directory may be moved to the spill GC directory by another thread while
// this function runs, so filesystem errors are expected. They are handled via
// the error_code overloads rather than by catching exceptions.
bool is_directory_empty(const std::filesystem::path& dir) {
    std::error_code ec;

    // file_type::none means the status could not be determined at all (a real
    // error); a missing path is reported as file_type::not_found instead.
    const std::filesystem::file_status status = std::filesystem::status(dir, ec);
    if (status.type() == std::filesystem::file_type::none) {
        return true;
    }
    if (!std::filesystem::is_directory(status)) {
        return false;
    }

    const std::filesystem::directory_iterator entries(dir, ec);
    if (ec) {
        return true;
    }
    return entries == std::filesystem::directory_iterator {};
}
266
267
174
// Verifies that the store path exists, refreshes the capacity figures and logs
// them. Returns an IOError when the path is missing, or the error from the
// existence check / update_capacity().
Status SpillDataDir::init() {
    bool path_exists = false;
    RETURN_IF_ERROR(io::global_local_filesystem()->exists(_path, &path_exists));
    if (!path_exists) {
        RETURN_NOT_OK_STATUS_WITH_WARN(Status::IOError("opendir failed, path={}", _path),
                                       "check file exist failed");
    }

    RETURN_IF_ERROR(update_capacity());

    const auto capacity_str = PrettyPrinter::print_bytes(_disk_capacity_bytes);
    const auto limit_str = PrettyPrinter::print_bytes(_spill_data_limit_bytes);
    const auto available_str = PrettyPrinter::print_bytes(_available_bytes);
    LOG(INFO) << fmt::format("spill storage path: {}, capacity: {}, limit: {}, available: {}",
                             _path, capacity_str, limit_str, available_str);
    return Status::OK();
}
283
284
4.41k
std::string SpillDataDir::get_spill_data_path(const std::string& query_id) const {
285
4.41k
    auto dir = fmt::format("{}/{}", _path, SPILL_DIR_PREFIX);
286
4.41k
    if (!query_id.empty()) {
287
0
        dir = fmt::format("{}/{}", dir, query_id);
288
0
    }
289
4.41k
    return dir;
290
4.41k
}
291
292
16.1k
std::string SpillDataDir::get_spill_data_gc_path(const std::string& sub_dir_name) const {
293
16.1k
    auto dir = fmt::format("{}/{}", _path, SPILL_GC_DIR_PREFIX);
294
16.1k
    if (!sub_dir_name.empty()) {
295
110
        dir = fmt::format("{}/{}", dir, sub_dir_name);
296
110
    }
297
16.1k
    return dir;
298
16.1k
}
299
300
12.0k
// Refreshes the disk capacity / available bytes of this directory, recomputes
// the spill data limit from config::spill_storage_limit and publishes the
// values to the gauges. Runs under _mutex.
//
// The limit is the parsed config value (scaled by
// config::storage_flood_stage_usage_percent when it was given as a percentage),
// capped at the flood-stage share of the disk capacity.
//
// Returns the error from get_space_info(), or InvalidArgument when the
// configured limit parses to a non-positive value.
Status SpillDataDir::update_capacity() {
    std::lock_guard<std::mutex> l(_mutex);
    RETURN_IF_ERROR(io::global_local_filesystem()->get_space_info(_path, &_disk_capacity_bytes,
                                                                  &_available_bytes));
    spill_disk_capacity->set_value(_disk_capacity_bytes);
    spill_disk_avail_capacity->set_value(_available_bytes);

    // Upper bound for spill data: the flood-stage percentage of the whole disk.
    const auto flood_stage_cap_bytes = static_cast<int64_t>(
            _disk_capacity_bytes * config::storage_flood_stage_usage_percent / 100);

    bool limit_is_percent = true;
    _spill_data_limit_bytes = ParseUtil::parse_mem_spec(config::spill_storage_limit, -1,
                                                        _disk_capacity_bytes, &limit_is_percent);
    if (_spill_data_limit_bytes <= 0) {
        spill_disk_limit->set_value(_spill_data_limit_bytes);
        auto err_msg = fmt::format("Failed to parse spill storage limit from '{}'",
                                   config::spill_storage_limit);
        LOG(WARNING) << err_msg;
        return Status::InvalidArgument(err_msg);
    }
    if (limit_is_percent) {
        _spill_data_limit_bytes = static_cast<int64_t>(
                _spill_data_limit_bytes * config::storage_flood_stage_usage_percent / 100);
    }
    _spill_data_limit_bytes = std::min(_spill_data_limit_bytes, flood_stage_cap_bytes);
    spill_disk_limit->set_value(_spill_data_limit_bytes);

    // The two "has data" gauges are 0/1 flags derived from directory emptiness.
    const bool spill_dir_empty = is_directory_empty(get_spill_data_path());
    const bool gc_dir_empty = is_directory_empty(get_spill_data_gc_path());
    spill_disk_has_spill_data->set_value(spill_dir_empty ? 0 : 1);
    spill_disk_has_spill_gc_data->set_value(gc_dir_empty ? 0 : 1);

    return Status::OK();
}
332
333
780
bool SpillDataDir::_reach_disk_capacity_limit(int64_t incoming_data_size) {
334
780
    double used_pct = _get_disk_usage(incoming_data_size);
335
780
    int64_t left_bytes = _available_bytes - incoming_data_size;
336
780
    if (used_pct >= config::storage_flood_stage_usage_percent / 100.0 &&
337
780
        left_bytes <= config::storage_flood_stage_left_capacity_bytes) {
338
0
        LOG(WARNING) << "reach capacity limit. used pct: " << used_pct
339
0
                     << ", left bytes: " << left_bytes << ", path: " << _path;
340
0
        return true;
341
0
    }
342
780
    return false;
343
780
}
344
780
bool SpillDataDir::reach_capacity_limit(int64_t incoming_data_size) {
345
780
    std::lock_guard<std::mutex> l(_mutex);
346
780
    if (_reach_disk_capacity_limit(incoming_data_size)) {
347
0
        return true;
348
0
    }
349
780
    if (_spill_data_bytes + incoming_data_size > _spill_data_limit_bytes) {
350
0
        LOG_EVERY_T(WARNING, 1) << fmt::format(
351
0
                "spill data reach limit, path: {}, capacity: {}, limit: {}, used: {}, "
352
0
                "available: "
353
0
                "{}, "
354
0
                "incoming "
355
0
                "bytes: {}",
356
0
                _path, PrettyPrinter::print_bytes(_disk_capacity_bytes),
357
0
                PrettyPrinter::print_bytes(_spill_data_limit_bytes),
358
0
                PrettyPrinter::print_bytes(_spill_data_bytes),
359
0
                PrettyPrinter::print_bytes(_available_bytes),
360
0
                PrettyPrinter::print_bytes(incoming_data_size));
361
0
        return true;
362
0
    }
363
780
    return false;
364
780
}
365
22
std::string SpillDataDir::debug_string() {
366
22
    return fmt::format(
367
22
            "path: {}, capacity: {}, limit: {}, used: {}, available: "
368
22
            "{}",
369
22
            _path, PrettyPrinter::print_bytes(_disk_capacity_bytes),
370
22
            PrettyPrinter::print_bytes(_spill_data_limit_bytes),
371
22
            PrettyPrinter::print_bytes(_spill_data_bytes),
372
22
            PrettyPrinter::print_bytes(_available_bytes));
373
22
}
374
} // namespace doris